You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/10/13 15:07:24 UTC
[06/18] nifi-minifi-cpp git commit: MINIFI-34 Establishing CMake
build system to provide build functionality equivalent to pre-existing
Makefile.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/ListenSyslog.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ListenSyslog.cpp b/libminifi/src/ListenSyslog.cpp
new file mode 100644
index 0000000..ace37d7
--- /dev/null
+++ b/libminifi/src/ListenSyslog.cpp
@@ -0,0 +1,342 @@
+/**
+ * @file ListenSyslog.cpp
+ * ListenSyslog class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <queue>
+#include <stdio.h>
+#include <string>
+#include "TimeUtil.h"
+#include "ListenSyslog.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+// Name under which this processor type is registered.
+const std::string ListenSyslog::ProcessorName("ListenSyslog");
+
+// Supported properties, each given as (name, description, default value).
+Property ListenSyslog::RecvBufSize(
+    "Receive Buffer Size",
+    "The size of each buffer used to receive Syslog messages.",
+    "65507 B");
+Property ListenSyslog::MaxSocketBufSize(
+    "Max Size of Socket Buffer",
+    "The maximum size of the socket buffer that should be used.",
+    "1 MB");
+Property ListenSyslog::MaxConnections(
+    "Max Number of TCP Connections",
+    "The maximum number of concurrent connections to accept Syslog messages in TCP mode.",
+    "2");
+Property ListenSyslog::MaxBatchSize(
+    "Max Batch Size",
+    "The maximum number of Syslog events to add to a single FlowFile.",
+    "1");
+Property ListenSyslog::MessageDelimiter(
+    "Message Delimiter",
+    "Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> property).",
+    "\n");
+// NOTE(review): the description below ends mid-sentence ("will only.");
+// it looks truncated - confirm the intended wording.
+Property ListenSyslog::ParseMessages(
+    "Parse Messages",
+    "Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.",
+    "false");
+Property ListenSyslog::Protocol(
+    "Protocol",
+    "The protocol for Syslog communication.",
+    "UDP");
+Property ListenSyslog::Port(
+    "Port",
+    "The port for Syslog communication.",
+    "514");
+
+// Relationships, each given as (name, description).
+Relationship ListenSyslog::Success(
+    "success",
+    "All files are routed to success");
+Relationship ListenSyslog::Invalid(
+    "invalid",
+    "SysLog message format invalid");
+
+/**
+ * Registers the properties and relationships supported by ListenSyslog.
+ */
+void ListenSyslog::initialize()
+{
+    // Every configurable property of this processor.
+    std::set<Property> supportedProperties {
+        RecvBufSize,
+        MaxSocketBufSize,
+        MaxConnections,
+        MaxBatchSize,
+        MessageDelimiter,
+        ParseMessages,
+        Protocol,
+        Port
+    };
+    setSupportedProperties(supportedProperties);
+
+    // Every relationship a FlowFile can be routed to.
+    std::set<Relationship> supportedRelationships { Success, Invalid };
+    setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Launches the background socket thread unless one was already created.
+ *
+ * The thread runs ListenSyslog::run(this) and is detached right after
+ * creation; _thread is kept only as the "already started" marker.
+ */
+void ListenSyslog::startSocketThread()
+{
+    // A non-null _thread means the thread has been launched before.
+    if (!_thread)
+    {
+        _logger->log_info("ListenSysLog Socket Thread Start");
+        _serverTheadRunning = true;
+        _thread = new std::thread(run, this);
+        _thread->detach();
+    }
+}
+
+/**
+ * Thread entry point: forwards to runThread() on the given processor.
+ *
+ * @param process processor instance whose socket loop should be executed
+ */
+void ListenSyslog::run(ListenSyslog *process)
+{
+    ListenSyslog &self = *process;
+    self.runThread();
+}
+
+/**
+ * Body of the background socket thread.
+ *
+ * While _serverTheadRunning is set, each iteration:
+ *  - closes all sockets if a reset was requested through _resetServerSocket,
+ *  - creates and binds the server socket (TCP or UDP, chosen by _protocol)
+ *    when none is open; a value <= 0 in _serverSocket means "no socket",
+ *  - waits up to 100 msec in select() for readable descriptors,
+ *  - accepts a new TCP client or receives one UDP datagram from the server
+ *    socket,
+ *  - reads one line from every readable TCP client.
+ *
+ * Received data is copied into a heap buffer and handed to putEvent() as
+ * long as the bytes already queued plus the new data fit in _recvBufSize;
+ * otherwise the data is dropped.
+ *
+ * The loop ends when socket creation, bind() or select() fails.
+ */
+void ListenSyslog::runThread()
+{
+    while (_serverTheadRunning)
+    {
+        if (_resetServerSocket)
+        {
+            _resetServerSocket = false;
+            // Drop every client connection and the server socket so that the
+            // block below re-creates it with the current protocol and port.
+            std::vector<int>::iterator it;
+            for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it)
+            {
+                int clientSocket = *it;
+                close(clientSocket);
+            }
+            _clientSockets.clear();
+            if (_serverSocket > 0)
+            {
+                close(_serverSocket);
+                _serverSocket = 0;
+            }
+        }
+
+        if (_serverSocket <= 0)
+        {
+            uint16_t portno = _port;
+            struct sockaddr_in serv_addr;
+            int sockfd;
+            if (_protocol == "TCP")
+                sockfd = socket(AF_INET, SOCK_STREAM, 0);
+            else
+                sockfd = socket(AF_INET, SOCK_DGRAM, 0);
+            if (sockfd < 0)
+            {
+                _logger->log_info("ListenSysLog Server socket creation failed");
+                break;
+            }
+            bzero((char *) &serv_addr, sizeof(serv_addr));
+            serv_addr.sin_family = AF_INET;
+            serv_addr.sin_addr.s_addr = INADDR_ANY;
+            serv_addr.sin_port = htons(portno);
+            if (bind(sockfd, (struct sockaddr *) &serv_addr,
+                     sizeof(serv_addr)) < 0)
+            {
+                _logger->log_error("ListenSysLog Server socket bind failed");
+                // The descriptor is not stored anywhere yet; close it here
+                // so it is not leaked when the thread exits.
+                close(sockfd);
+                break;
+            }
+            if (_protocol == "TCP")
+                listen(sockfd, 5);
+            _serverSocket = sockfd;
+            _logger->log_error("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno);
+        }
+
+        // Rebuild the descriptor set: server socket plus all TCP clients.
+        FD_ZERO(&_readfds);
+        FD_SET(_serverSocket, &_readfds);
+        _maxFds = _serverSocket;
+        std::vector<int>::iterator it;
+        for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it)
+        {
+            int clientSocket = *it;
+            if (clientSocket >= _maxFds)
+                _maxFds = clientSocket;
+            FD_SET(clientSocket, &_readfds);
+        }
+
+        // select() modifies the set it is given, so work on a copy.
+        fd_set fds;
+        struct timeval tv;
+        int retval;
+        fds = _readfds;
+        tv.tv_sec = 0;
+        // 100 msec
+        tv.tv_usec = 100000;
+        retval = select(_maxFds+1, &fds, NULL, NULL, &tv);
+        if (retval < 0)
+            break;
+        if (retval == 0)
+            continue;
+
+        if (FD_ISSET(_serverSocket, &fds))
+        {
+            // server socket, either we have UDP datagram or TCP connection request
+            if (_protocol == "TCP")
+            {
+                socklen_t clilen;
+                struct sockaddr_in cli_addr;
+                clilen = sizeof(cli_addr);
+                int newsockfd = accept(_serverSocket,
+                                       (struct sockaddr *) &cli_addr,
+                                       &clilen);
+                if (newsockfd > 0)
+                {
+                    if (_clientSockets.size() < _maxConnections)
+                    {
+                        _clientSockets.push_back(newsockfd);
+                        _logger->log_info("ListenSysLog new client socket %d connection", newsockfd);
+                        continue;
+                    }
+                    else
+                    {
+                        // Connection limit reached: refuse the new client.
+                        close(newsockfd);
+                    }
+                }
+            }
+            else
+            {
+                socklen_t clilen;
+                struct sockaddr_in cli_addr;
+                clilen = sizeof(cli_addr);
+                int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0,
+                                       (struct sockaddr *)&cli_addr, &clilen);
+                if (recvlen > 0 && (recvlen + getEventQueueByteSize()) <= _recvBufSize)
+                {
+                    uint8_t *payload = new uint8_t[recvlen];
+                    memcpy(payload, _buffer, recvlen);
+                    putEvent(payload, recvlen);
+                }
+            }
+        }
+
+        it = _clientSockets.begin();
+        while (it != _clientSockets.end())
+        {
+            int clientSocket = *it;
+            if (!FD_ISSET(clientSocket, &fds))
+            {
+                // Nothing to read on this client. The iterator must still
+                // advance, otherwise this loop never terminates.
+                ++it;
+                continue;
+            }
+
+            int recvlen = readline(clientSocket, (char *)_buffer, sizeof(_buffer));
+            if (recvlen <= 0)
+            {
+                // Peer closed the connection or the read failed.
+                close(clientSocket);
+                _logger->log_info("ListenSysLog client socket %d close", clientSocket);
+                it = _clientSockets.erase(it);
+            }
+            else
+            {
+                if ((recvlen + getEventQueueByteSize()) <= _recvBufSize)
+                {
+                    uint8_t *payload = new uint8_t[recvlen];
+                    memcpy(payload, _buffer, recvlen);
+                    putEvent(payload, recvlen);
+                }
+                ++it;
+            }
+        }
+    }
+    return;
+}
+
+
+/**
+ * Reads one newline-terminated line from a socket into the caller's buffer.
+ *
+ * Data is fetched from the socket with recv() in chunks of up to 2048 bytes
+ * into a function-static buffer and then handed out one character at a time.
+ *
+ * NOTE(review): the chunk buffer, its cursor and the remaining-byte counter
+ * are function statics, so bytes left over from one call appear to be
+ * served to the next call regardless of which fd it passes - confirm this
+ * is acceptable when more than one client is connected.
+ * NOTE(review): on a newline a second '\n' is stored after the received one
+ * and counted in the return value; a terminating '\0' may have been
+ * intended - confirm before changing.
+ *
+ * @param fd      socket descriptor to read from
+ * @param bufptr  destination buffer
+ * @param len     size of the destination buffer in bytes
+ * @return number of bytes stored in bufptr when a '\n' was found,
+ *         0 when recv() returns 0,
+ *         -1 on a recv() error other than EINTR, when the buffer fills up
+ *         before a newline arrives, or when bufptr is NULL or len is 0.
+ */
+int ListenSyslog::readline( int fd, char *bufptr, size_t len )
+{
+    // len is unsigned: without this guard "--len" on 0 would wrap around
+    // and allow writing far beyond the caller's buffer.
+    if ( bufptr == NULL || len == 0 )
+        return -1;
+
+    char *bufx = bufptr;
+    static char *bp;
+    static int cnt = 0;
+    static char b[ 2048 ];
+    char c;
+
+    while ( --len > 0 )
+    {
+        if ( --cnt <= 0 )
+        {
+            // Static chunk exhausted: refill it from the socket.
+            cnt = recv( fd, b, sizeof( b ), 0 );
+            if ( cnt < 0 )
+            {
+                if ( errno == EINTR )
+                {
+                    len++; /* the while will decrement */
+                    continue;
+                }
+                return -1;
+            }
+            if ( cnt == 0 )
+                return 0;
+            bp = b;
+        }
+        c = *bp++;
+        *bufptr++ = c;
+        if ( c == '\n' )
+        {
+            *bufptr = '\n';
+            return bufptr - bufx + 1;
+        }
+    }
+    return -1;
+}
+
+/**
+ * Refreshes the processor configuration, makes sure the socket thread is
+ * running and turns queued Syslog events into one FlowFile.
+ *
+ * Up to _maxBatchSize events are polled from the event queue. The first
+ * event is written into a newly created FlowFile and every further event is
+ * appended to it. The FlowFile gets the "syslog.protocol" and "syslog.port"
+ * attributes and is transferred to Success. When the queue is empty the
+ * context is yielded instead.
+ *
+ * A change of the Protocol or Port property requests a reset of the server
+ * socket through _resetServerSocket.
+ *
+ * @param context  source of the property values; yielded when idle
+ * @param session  session used to create, write and transfer the FlowFile
+ */
+void ListenSyslog::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+    std::string value;
+    bool needResetServerSocket = false;
+    if (context->getProperty(Protocol.getName(), value))
+    {
+        if (_protocol != value)
+            needResetServerSocket = true;
+        _protocol = value;
+    }
+    if (context->getProperty(RecvBufSize.getName(), value))
+    {
+        Property::StringToInt(value, _recvBufSize);
+    }
+    if (context->getProperty(MaxSocketBufSize.getName(), value))
+    {
+        Property::StringToInt(value, _maxSocketBufSize);
+    }
+    if (context->getProperty(MaxConnections.getName(), value))
+    {
+        Property::StringToInt(value, _maxConnections);
+    }
+    if (context->getProperty(MessageDelimiter.getName(), value))
+    {
+        _messageDelimiter = value;
+    }
+    if (context->getProperty(ParseMessages.getName(), value))
+    {
+        Property::StringToBool(value, _parseMessages);
+    }
+    if (context->getProperty(Port.getName(), value))
+    {
+        int64_t oldPort = _port;
+        Property::StringToInt(value, _port);
+        if (_port != oldPort)
+            needResetServerSocket = true;
+    }
+    if (context->getProperty(MaxBatchSize.getName(), value))
+    {
+        Property::StringToInt(value, _maxBatchSize);
+    }
+
+    if (needResetServerSocket)
+        _resetServerSocket = true;
+
+    startSocketThread();
+
+    // read from the event queue
+    if (isEventQueueEmpty())
+    {
+        context->yield();
+        return;
+    }
+
+    std::queue<SysLogEvent> eventQueue;
+    pollEvent(eventQueue, _maxBatchSize);
+    FlowFileRecord *flowFile = NULL;
+    while (!eventQueue.empty())
+    {
+        SysLogEvent event = eventQueue.front();
+        eventQueue.pop();
+        if (!flowFile)
+        {
+            // First event of the batch: it creates the FlowFile.
+            flowFile = session->create();
+            if (!flowFile)
+            {
+                // The payloads are heap buffers owned by this function once
+                // polled; free all of them before bailing out.
+                delete[] event.payload;
+                while (!eventQueue.empty())
+                {
+                    delete[] eventQueue.front().payload;
+                    eventQueue.pop();
+                }
+                return;
+            }
+            ListenSyslog::WriteCallback callback((char *)event.payload, event.len);
+            session->write(flowFile, &callback);
+        }
+        else
+        {
+            ListenSyslog::WriteCallback callback((char *)event.payload, event.len);
+            session->append(flowFile, &callback);
+        }
+        delete[] event.payload;
+    }
+
+    // No event was polled, so there is no FlowFile to annotate or transfer.
+    if (!flowFile)
+        return;
+
+    flowFile->addAttribute("syslog.protocol", _protocol);
+    flowFile->addAttribute("syslog.port", std::to_string(_port));
+    session->transfer(flowFile, Success);
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/LogAttribute.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/LogAttribute.cpp b/libminifi/src/LogAttribute.cpp
new file mode 100644
index 0000000..82130f8
--- /dev/null
+++ b/libminifi/src/LogAttribute.cpp
@@ -0,0 +1,158 @@
+/**
+ * @file LogAttribute.cpp
+ * LogAttribute class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <sstream>
+#include <string.h>
+#include <iostream>
+
+#include "TimeUtil.h"
+#include "LogAttribute.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+// Name under which this processor type is registered.
+const std::string LogAttribute::ProcessorName("LogAttribute");
+
+// Supported properties, each given as (name, description, default value).
+Property LogAttribute::LogLevel(
+    "Log Level",
+    "The Log Level to use when logging the Attributes",
+    "info");
+Property LogAttribute::AttributesToLog(
+    "Attributes to Log",
+    "A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.",
+    "");
+Property LogAttribute::AttributesToIgnore(
+    "Attributes to Ignore",
+    "A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.",
+    "");
+Property LogAttribute::LogPayload(
+    "Log Payload",
+    "If true, the FlowFile's payload will be logged, in addition to its attributes; otherwise, just the Attributes will be logged.",
+    "false");
+Property LogAttribute::LogPrefix(
+    "Log prefix",
+    "Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.",
+    "");
+
+// Relationship, given as (name, description).
+Relationship LogAttribute::Success(
+    "success",
+    "success operational on the flow record");
+
+/**
+ * Registers the properties and the relationship supported by LogAttribute.
+ */
+void LogAttribute::initialize()
+{
+    // Every configurable property of this processor.
+    std::set<Property> supportedProperties {
+        LogLevel,
+        AttributesToLog,
+        AttributesToIgnore,
+        LogPayload,
+        LogPrefix
+    };
+    setSupportedProperties(supportedProperties);
+
+    // The only relationship a FlowFile can be routed to.
+    std::set<Relationship> supportedRelationships { Success };
+    setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Logs a description of the next FlowFile of the session and forwards the
+ * FlowFile to Success.
+ *
+ * The message contains the standard fields (UUID, entry date, lineage start
+ * date, size, offset), every attribute of the FlowFile, the content claim
+ * path when a claim exists and, when the Log Payload property is true and
+ * the FlowFile is at most 1 MiB, a hex dump of the payload with 16 bytes
+ * per line. The message is emitted at the level selected by the Log Level
+ * property (info when not set).
+ *
+ * Returns without doing anything when the session has no FlowFile.
+ *
+ * @param context  source of the property values
+ * @param session  session providing the FlowFile and performing the transfer
+ */
+void LogAttribute::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+    std::string dashLine = "--------------------------------------------------";
+    LogAttrLevel level = LogAttrLevelInfo;
+    bool logPayload = false;
+    std::ostringstream message;
+
+    FlowFileRecord *flow = session->get();
+
+    if (!flow)
+        return;
+
+    std::string value;
+    if (context->getProperty(LogLevel.getName(), value))
+    {
+        logLevelStringToEnum(value, level);
+    }
+    if (context->getProperty(LogPrefix.getName(), value))
+    {
+        // The prefix replaces the default separator line.
+        dashLine = "-----" + value + "-----";
+    }
+    if (context->getProperty(LogPayload.getName(), value))
+    {
+        Property::StringToBool(value, logPayload);
+    }
+
+    message << "Logging for flow file " << "\n";
+    message << dashLine;
+    message << "\nStandard FlowFile Attributes";
+    message << "\n" << "UUID:" << flow->getUUIDStr();
+    message << "\n" << "EntryDate:" << getTimeStr(flow->getEntryDate());
+    message << "\n" << "lineageStartDate:" << getTimeStr(flow->getlineageStartDate());
+    message << "\n" << "Size:" << flow->getSize() << " Offset:" << flow->getOffset();
+    message << "\nFlowFile Attributes Map Content";
+    std::map<std::string, std::string> attrs = flow->getAttributes();
+    std::map<std::string, std::string>::iterator it;
+    for (it = attrs.begin(); it != attrs.end(); ++it)
+    {
+        message << "\n" << "key:" << it->first << " value:" << it->second;
+    }
+    message << "\nFlowFile Resource Claim Content";
+    ResourceClaim *claim = flow->getResourceClaim();
+    if (claim)
+    {
+        message << "\n" << "Content Claim:" << claim->getContentFullPath();
+    }
+    if (logPayload && flow->getSize() <= 1024*1024)
+    {
+        message << "\n" << "Payload:" << "\n";
+        ReadCallback callback(flow->getSize());
+        session->read(flow, &callback);
+        for (unsigned int i = 0; i < callback._readSize; i++)
+        {
+            char temp[8];
+            // Bounded formatting: two hex digits and a space per byte.
+            snprintf(temp, sizeof(temp), "%02x ", (unsigned char) (callback._buffer[i]));
+            message << temp;
+            // Break the dump after every 16th byte.
+            if ((i + 1) % 16 == 0)
+            {
+                message << '\n';
+            }
+        }
+    }
+    // No std::ends here: with an ostringstream it would only add an
+    // embedded NUL character to the resulting std::string.
+    message << "\n" << dashLine;
+    std::string output = message.str();
+
+    switch (level)
+    {
+    case LogAttrLevelInfo:
+        _logger->log_info("%s", output.c_str());
+        break;
+    case LogAttrLevelDebug:
+        _logger->log_debug("%s", output.c_str());
+        break;
+    case LogAttrLevelError:
+        _logger->log_error("%s", output.c_str());
+        break;
+    case LogAttrLevelTrace:
+        _logger->log_trace("%s", output.c_str());
+        break;
+    case LogAttrLevelWarn:
+        _logger->log_warn("%s", output.c_str());
+        break;
+    default:
+        break;
+    }
+
+    // Test Import
+    /*
+    FlowFileRecord *importRecord = session->create();
+    session->import(claim->getContentFullPath(), importRecord);
+    session->transfer(importRecord, Success); */
+
+
+    // Transfer to the relationship
+    session->transfer(flow, Success);
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/Logger.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Logger.cpp b/libminifi/src/Logger.cpp
new file mode 100644
index 0000000..984f609
--- /dev/null
+++ b/libminifi/src/Logger.cpp
@@ -0,0 +1,27 @@
+/**
+ * @file Logger.cpp
+ * Logger class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+
+#include "Logger.h"
+
+// Definition of the static Logger pointer declared in Logger.h; starts out null.
+Logger *Logger::_logger = NULL;
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessGroup.cpp b/libminifi/src/ProcessGroup.cpp
new file mode 100644
index 0000000..70ee9d7
--- /dev/null
+++ b/libminifi/src/ProcessGroup.cpp
@@ -0,0 +1,314 @@
+/**
+ * @file ProcessGroup.cpp
+ * ProcessGroup class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+
+#include "ProcessGroup.h"
+#include "Processor.h"
+
+/**
+ * Creates a process group.
+ *
+ * @param type    kind of process group
+ * @param name    name of the group, also used in log messages
+ * @param uuid    UUID to copy into the group; when null a new UUID is generated
+ * @param parent  parent process group
+ */
+ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, ProcessGroup *parent)
+: _name(name),
+  _type(type),
+  _parentProcessGroup(parent)
+{
+    if (uuid)
+        uuid_copy(_uuid, uuid);
+    else
+        // No UUID supplied: generate one for this process group
+        uuid_generate(_uuid);
+
+    _yieldPeriodMsec = 0;
+    _transmitting = false;
+
+    _logger = Logger::getLogger();
+    _logger->log_info("ProcessGroup %s created", _name.c_str());
+}
+
+/**
+ * Destroys the group together with everything registered in it:
+ * connections (drained first), child process groups and processors.
+ */
+ProcessGroup::~ProcessGroup()
+{
+    for (Connection *connection : _connections)
+    {
+        connection->drain();
+        delete connection;
+    }
+
+    for (ProcessGroup *childGroup : _childProcessGroups)
+    {
+        delete childGroup;
+    }
+
+    for (Processor *processor : _processors)
+    {
+        delete processor;
+    }
+}
+
+/**
+ * @return true when this group's type is ROOT_PROCESS_GROUP
+ */
+bool ProcessGroup::isRootProcessGroup()
+{
+    std::lock_guard<std::mutex> guard(_mtx);
+    const bool isRoot = (ROOT_PROCESS_GROUP == _type);
+    return isRoot;
+}
+
+/**
+ * Adds a processor to this group; does nothing when it is already present.
+ *
+ * @param processor processor to register
+ */
+void ProcessGroup::addProcessor(Processor *processor)
+{
+    std::lock_guard<std::mutex> guard(_mtx);
+
+    // insert() reports through .second whether the processor was new.
+    const bool inserted = _processors.insert(processor).second;
+    if (inserted)
+    {
+        _logger->log_info("Add processor %s into process group %s",
+                          processor->getName().c_str(), _name.c_str());
+    }
+}
+
+/**
+ * Removes a processor from this group; does nothing when it is not present.
+ * The processor itself is not deleted.
+ *
+ * @param processor processor to unregister
+ */
+void ProcessGroup::removeProcessor(Processor *processor)
+{
+    std::lock_guard<std::mutex> guard(_mtx);
+
+    // erase() returns how many elements were removed (0 when absent).
+    if (_processors.erase(processor) > 0)
+    {
+        _logger->log_info("Remove processor %s from process group %s",
+                          processor->getName().c_str(), _name.c_str());
+    }
+}
+
+/**
+ * Adds a child process group; does nothing when it is already present.
+ *
+ * @param child process group to register as a child
+ */
+void ProcessGroup::addProcessGroup(ProcessGroup *child)
+{
+    std::lock_guard<std::mutex> guard(_mtx);
+
+    // insert() reports through .second whether the child was new.
+    const bool inserted = _childProcessGroups.insert(child).second;
+    if (inserted)
+    {
+        _logger->log_info("Add child process group %s into process group %s",
+                          child->getName().c_str(), _name.c_str());
+    }
+}
+
+/**
+ * Removes a child process group; does nothing when it is not present.
+ * The child itself is not deleted.
+ *
+ * @param child process group to unregister
+ */
+void ProcessGroup::removeProcessGroup(ProcessGroup *child)
+{
+    std::lock_guard<std::mutex> guard(_mtx);
+
+    // erase() returns how many elements were removed (0 when absent).
+    if (_childProcessGroups.erase(child) > 0)
+    {
+        _logger->log_info("Remove child process group %s from process group %s",
+                          child->getName().c_str(), _name.c_str());
+    }
+}
+
+/**
+ * Schedules every timer-driven processor of this group that is neither
+ * running nor disabled, then does the same for all child process groups.
+ * Exceptions are logged at debug level and rethrown.
+ *
+ * @param timeScheduler scheduling agent the processors are handed to
+ */
+void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler)
+{
+    std::lock_guard<std::mutex> guard(_mtx);
+
+    try
+    {
+        // Start all the processor node, input and output ports
+        for (Processor *processor : _processors)
+        {
+            const bool startable =
+                !processor->isRunning() && processor->getScheduledState() != DISABLED;
+            if (startable && processor->getSchedulingStrategy() == TIMER_DRIVEN)
+            {
+                timeScheduler->schedule(processor);
+            }
+        }
+
+        // Recurse into the nested groups
+        for (ProcessGroup *childGroup : _childProcessGroups)
+        {
+            childGroup->startProcessing(timeScheduler);
+        }
+    }
+    catch (std::exception &e)
+    {
+        _logger->log_debug("Caught Exception %s", e.what());
+        throw;
+    }
+    catch (...)
+    {
+        _logger->log_debug("Caught Exception during process group start processing");
+        throw;
+    }
+}
+
+/**
+ * Unschedules every timer-driven processor of this group, then does the
+ * same for all child process groups.
+ * Exceptions are logged at debug level and rethrown.
+ *
+ * @param timeScheduler scheduling agent the processors are removed from
+ */
+void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler)
+{
+    std::lock_guard<std::mutex> guard(_mtx);
+
+    try
+    {
+        // Stop all the processor node, input and output ports
+        for (Processor *processor : _processors)
+        {
+            if (processor->getSchedulingStrategy() != TIMER_DRIVEN)
+                continue;
+            timeScheduler->unschedule(processor);
+        }
+
+        // Recurse into the nested groups
+        for (ProcessGroup *childGroup : _childProcessGroups)
+        {
+            childGroup->stopProcessing(timeScheduler);
+        }
+    }
+    catch (std::exception &e)
+    {
+        _logger->log_debug("Caught Exception %s", e.what());
+        throw;
+    }
+    catch (...)
+    {
+        _logger->log_debug("Caught Exception during process group stop processing");
+        throw;
+    }
+}
+
+/**
+ * Looks up a processor by UUID in this group and, recursively, in all
+ * child process groups.
+ *
+ * _mtx is not taken here: addConnection() and removeConnection() call this
+ * function while already holding it.
+ *
+ * @param uuid UUID of the processor to find
+ * @return the first matching processor, or NULL when there is none
+ */
+Processor *ProcessGroup::findProcessor(uuid_t uuid)
+{
+    // Processors registered directly in this group
+    for (Processor *candidate : _processors)
+    {
+        uuid_t candidateUUID;
+        if (!candidate->getUUID(candidateUUID))
+            continue;
+        if (uuid_compare(candidateUUID, uuid) == 0)
+            return candidate;
+    }
+
+    // Processors of the nested groups
+    for (ProcessGroup *childGroup : _childProcessGroups)
+    {
+        Processor *found = childGroup->findProcessor(uuid);
+        if (found)
+            return found;
+    }
+
+    return NULL;
+}
+
+/**
+ * Looks up a processor by name in this group and, recursively, in all
+ * child process groups. Every processor inspected in this group is logged
+ * at debug level.
+ *
+ * @param processorName name of the processor to find
+ * @return the first matching processor, or NULL when there is none
+ */
+Processor *ProcessGroup::findProcessor(std::string processorName)
+{
+    // Processors registered directly in this group
+    for (Processor *candidate : _processors)
+    {
+        _logger->log_debug("Current processor is %s", candidate->getName().c_str());
+        if (candidate->getName() == processorName)
+            return candidate;
+    }
+
+    // Processors of the nested groups
+    for (ProcessGroup *childGroup : _childProcessGroups)
+    {
+        Processor *found = childGroup->findProcessor(processorName);
+        if (found)
+            return found;
+    }
+
+    return NULL;
+}
+
+/**
+ * Sets a property on every processor with the given name, in this group
+ * and, recursively, in all child process groups.
+ *
+ * @param processorName  name of the processors to update
+ * @param propertyName   name of the property to set
+ * @param propertyValue  new value of the property
+ */
+void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue)
+{
+    std::lock_guard<std::mutex> guard(_mtx);
+
+    // Processors registered directly in this group
+    for (Processor *processor : _processors)
+    {
+        if (processor->getName() != processorName)
+            continue;
+        processor->setProperty(propertyName, propertyValue);
+    }
+
+    // Processors of the nested groups
+    for (ProcessGroup *childGroup : _childProcessGroups)
+    {
+        childGroup->updatePropertyValue(processorName, propertyName, propertyValue);
+    }
+}
+
+/**
+ * Adds a connection to this group and attaches it to its source and
+ * destination processors when they can be found. Does nothing when the
+ * connection is already present. A processor that is both source and
+ * destination gets the connection only once.
+ *
+ * @param connection connection to register
+ */
+void ProcessGroup::addConnection(Connection *connection)
+{
+    std::lock_guard<std::mutex> guard(_mtx);
+
+    // Already registered: nothing to do
+    if (_connections.find(connection) != _connections.end())
+        return;
+
+    _connections.insert(connection);
+    _logger->log_info("Add connection %s into process group %s",
+                      connection->getName().c_str(), _name.c_str());
+
+    // Attach to the source processor
+    uuid_t sourceUUID;
+    connection->getSourceProcessorUUID(sourceUUID);
+    Processor *source = this->findProcessor(sourceUUID);
+    if (source)
+        source->addConnection(connection);
+
+    // Attach to the destination processor unless it is the source itself
+    uuid_t destinationUUID;
+    connection->getDestinationProcessorUUID(destinationUUID);
+    Processor *destination = this->findProcessor(destinationUUID);
+    if (destination && destination != source)
+        destination->addConnection(connection);
+}
+
+/**
+ * Removes a connection from this group and detaches it from its source and
+ * destination processors when they can be found. Does nothing when the
+ * connection is not present. The connection itself is not deleted.
+ *
+ * @param connection connection to unregister
+ */
+void ProcessGroup::removeConnection(Connection *connection)
+{
+    std::lock_guard<std::mutex> lock(_mtx);
+
+    if (_connections.find(connection) != _connections.end())
+    {
+        // The connection is registered in this process group: remove it
+        _connections.erase(connection);
+        _logger->log_info("Remove connection %s from process group %s",
+                          connection->getName().c_str(), _name.c_str());
+        uuid_t sourceUUID;
+        Processor *source = NULL;
+        connection->getSourceProcessorUUID(sourceUUID);
+        source = this->findProcessor(sourceUUID);
+        if (source)
+            source->removeConnection(connection);
+        Processor *destination = NULL;
+        uuid_t destinationUUID;
+        connection->getDestinationProcessorUUID(destinationUUID);
+        destination = this->findProcessor(destinationUUID);
+        // A processor that is both source and destination was already handled
+        if (destination && destination != source)
+            destination->removeConnection(connection);
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ProcessSession.cpp b/libminifi/src/ProcessSession.cpp
new file mode 100644
index 0000000..4f526c3
--- /dev/null
+++ b/libminifi/src/ProcessSession.cpp
@@ -0,0 +1,731 @@
+/**
+ * @file ProcessSession.cpp
+ * ProcessSession class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <iostream>
+
+#include "ProcessSession.h"
+
+// Allocates a FlowFile with no attributes and records it in the session's
+// map of added FlowFiles, keyed by its UUID string.
+FlowFileRecord* ProcessSession::create()
+{
+  std::map<std::string, std::string> noAttributes;
+  FlowFileRecord *created = new FlowFileRecord(noAttributes);
+
+  if (!created)
+    return created;
+
+  _addedFlowFiles[created->getUUIDStr()] = created;
+  _logger->log_debug("Create FlowFile with UUID %s", created->getUUIDStr().c_str());
+  return created;
+}
+
+// Creates a FlowFile (see create()) that inherits the parent's attributes and
+// lineage. The parent's UUID string is added to the child's lineage
+// identifiers.
+FlowFileRecord* ProcessSession::create(FlowFileRecord *parent)
+{
+  FlowFileRecord *child = this->create();
+  if (!child)
+    return child;
+
+  // Inherit every attribute except the special ones that must not be copied
+  std::map<std::string, std::string> inherited = parent->getAttributes();
+  for (std::map<std::string, std::string>::iterator attr = inherited.begin();
+       attr != inherited.end(); ++attr)
+  {
+    const bool special =
+        attr->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) ||
+        attr->first == FlowAttributeKey(DISCARD_REASON) ||
+        attr->first == FlowAttributeKey(UUID);
+    if (!special)
+      child->setAttribute(attr->first, attr->second);
+  }
+
+  // Carry the lineage forward and record the parent as an ancestor
+  child->_lineageStartDate = parent->_lineageStartDate;
+  child->_lineageIdentifiers = parent->_lineageIdentifiers;
+  child->_lineageIdentifiers.insert(parent->_uuidStr);
+
+  return child;
+}
+
+// Creates a child FlowFile of parent (see create(FlowFileRecord *)) that also
+// refers to the parent's resource claim with the same offset and size.
+FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent)
+{
+  FlowFileRecord *copy = this->create(parent);
+  if (!copy)
+    return copy;
+
+  // Point the copy at the parent's content claim
+  copy->_claim = parent->_claim;
+  if (copy->_claim)
+  {
+    copy->_offset = parent->_offset;
+    copy->_size = parent->_size;
+    // One more FlowFile now refers to this claim
+    copy->_claim->increaseFlowFileRecordOwnedCount();
+  }
+  return copy;
+}
+
+// Clones parent while the session is being committed. The clone is recorded
+// in _clonedFlowFiles (not _addedFlowFiles), inherits the parent's attributes
+// and lineage, and refers to the parent's resource claim.
+FlowFileRecord* ProcessSession::cloneDuringTransfer(FlowFileRecord *parent)
+{
+  std::map<std::string, std::string> noAttributes;
+  FlowFileRecord *copy = new FlowFileRecord(noAttributes);
+
+  if (!copy)
+    return copy;
+
+  this->_clonedFlowFiles[copy->getUUIDStr()] = copy;
+  _logger->log_debug("Clone FlowFile with UUID %s during transfer", copy->getUUIDStr().c_str());
+
+  // Inherit every attribute except the special ones that must not be copied
+  std::map<std::string, std::string> inherited = parent->getAttributes();
+  for (std::map<std::string, std::string>::iterator attr = inherited.begin();
+       attr != inherited.end(); ++attr)
+  {
+    const bool special =
+        attr->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) ||
+        attr->first == FlowAttributeKey(DISCARD_REASON) ||
+        attr->first == FlowAttributeKey(UUID);
+    if (!special)
+      copy->setAttribute(attr->first, attr->second);
+  }
+
+  // Carry the lineage forward and record the parent as an ancestor
+  copy->_lineageStartDate = parent->_lineageStartDate;
+  copy->_lineageIdentifiers = parent->_lineageIdentifiers;
+  copy->_lineageIdentifiers.insert(parent->_uuidStr);
+
+  // Point the copy at the parent's content claim
+  copy->_claim = parent->_claim;
+  if (copy->_claim)
+  {
+    copy->_offset = parent->_offset;
+    copy->_size = parent->_size;
+    copy->_claim->increaseFlowFileRecordOwnedCount();
+  }
+
+  return copy;
+}
+
+// Creates a child FlowFile of parent that refers to a sub-range of the
+// parent's content.
+//
+// parent: FlowFile whose attributes, lineage and claim are inherited
+// offset: start of the sub-range, relative to the parent's own content offset
+// size:   length of the sub-range in bytes
+//
+// Returns the new FlowFile, or NULL when the parent has a claim and
+// offset + size exceeds the parent's size; in that case the FlowFile created
+// for the clone is removed from the session and destroyed. When the parent
+// has no claim the child is returned without any content.
+FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent, long offset, long size)
+{
+  FlowFileRecord *record = this->create(parent);
+  if (!record || !parent->_claim)
+    return record;
+
+  if ((offset + size) > (long) parent->_size)
+  {
+    // Requested range does not fit into the parent's content
+    _logger->log_error("clone offset %ld and size %ld exceed parent size %llu",
+        offset, size, (unsigned long long) parent->_size);
+    // Undo the registration done by create() before destroying the record
+    this->_addedFlowFiles.erase(record->getUUIDStr());
+    delete record;
+    return NULL;
+  }
+
+  // The child's range starts 'offset' bytes into the parent's range
+  record->_offset = parent->_offset + offset;
+  record->_size = size;
+  // Share the parent's resource claim
+  record->_claim = parent->_claim;
+  record->_claim->increaseFlowFileRecordOwnedCount();
+
+  return record;
+}
+
+// Marks the FlowFile as deleted and records it in the session's map of
+// deleted FlowFiles, keyed by its UUID string.
+void ProcessSession::remove(FlowFileRecord *flow)
+{
+  flow->_markedDelete = true;
+  const std::string &flowId = flow->getUUIDStr();
+  _deletedFlowFiles[flowId] = flow;
+}
+
+// Sets attribute 'key' to 'value' on the given FlowFile by forwarding to
+// FlowFileRecord::setAttribute.
+void ProcessSession::putAttribute(FlowFileRecord *flow, std::string key, std::string value)
+{
+  FlowFileRecord &target = *flow;
+  target.setAttribute(key, value);
+}
+
+// Removes attribute 'key' from the given FlowFile by forwarding to
+// FlowFileRecord::removeAttribute.
+void ProcessSession::removeAttribute(FlowFileRecord *flow, std::string key)
+{
+  FlowFileRecord &target = *flow;
+  target.removeAttribute(key);
+}
+
+// Sets the FlowFile's penalty expiration to the current time in milliseconds
+// plus the penalization period of the session's processor.
+void ProcessSession::penalize(FlowFileRecord *flow)
+{
+  flow->_penaltyExpirationMs =
+      getTimeMillis() + this->_processContext->getProcessor()->getPenalizationPeriodMsec();
+}
+
+// Records the relationship the FlowFile is to be transferred to, keyed by
+// the FlowFile's UUID string. commit() reads this map to route the FlowFile.
+void ProcessSession::transfer(FlowFileRecord *flow, Relationship relationship)
+{
+  const std::string &flowId = flow->getUUIDStr();
+  _transferRelationship[flowId] = relationship;
+}
+
+// Writes new content for the FlowFile. A new resource claim is created, the
+// callback is invoked with an output stream on the claim's content file
+// (opened truncated), and on success the FlowFile's previous claim, if any,
+// is released and replaced by the new one.
+//
+// Throws Exception(FILE_OPERATION_EXCEPTION) when the file cannot be opened
+// or the stream is not in a good state after the callback; any exception is
+// logged and rethrown after the new claim has been destroyed.
+void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback *callback)
+{
+  ResourceClaim *claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY);
+
+  try
+  {
+    std::ofstream out;
+    out.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
+    if (!out.is_open())
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+
+    // Let the callback produce the content
+    callback->process(&out);
+    if (!(out.good() && out.tellp() >= 0))
+    {
+      out.close();
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
+    }
+
+    // The put position after the callback is the content length
+    flow->_size = out.tellp();
+    flow->_offset = 0;
+    if (flow->_claim)
+    {
+      // Release the claim the FlowFile referred to so far
+      flow->_claim->decreaseFlowFileRecordOwnedCount();
+      flow->_claim = NULL;
+    }
+    flow->_claim = claim;
+    claim->increaseFlowFileRecordOwnedCount();
+    out.close();
+  }
+  catch (std::exception &exception)
+  {
+    // Detach the new claim from the FlowFile if it was already attached
+    const bool attached = flow && flow->_claim == claim;
+    if (attached)
+    {
+      flow->_claim->decreaseFlowFileRecordOwnedCount();
+      flow->_claim = NULL;
+    }
+    if (claim)
+      delete claim;
+    _logger->log_debug("Caught Exception %s", exception.what());
+    throw;
+  }
+  catch (...)
+  {
+    const bool attached = flow && flow->_claim == claim;
+    if (attached)
+    {
+      flow->_claim->decreaseFlowFileRecordOwnedCount();
+      flow->_claim = NULL;
+    }
+    if (claim)
+      delete claim;
+    _logger->log_debug("Caught Exception during process session write");
+    throw;
+  }
+}
+
+// Appends content to the FlowFile's existing resource claim. The callback is
+// invoked with an output stream opened in append mode on the claim's content
+// file, and the FlowFile's size grows by the number of bytes the callback
+// wrote. A FlowFile without a claim is handled by write() instead.
+//
+// Throws Exception(FILE_OPERATION_EXCEPTION) when the file cannot be opened
+// or the stream is not in a good state after the callback; any exception is
+// logged and rethrown.
+void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback *callback)
+{
+  if (flow->_claim == NULL)
+  {
+    // No existing claim to append to, so create a new one
+    return write(flow, callback);
+  }
+
+  ResourceClaim *claim = flow->_claim;
+
+  try
+  {
+    std::ofstream fs;
+    fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app);
+    if (fs.is_open())
+    {
+      // Append mode only repositions to the end of file before each write;
+      // right after open() the put position may still be 0. Seek to the end
+      // explicitly so that oldPos is the current file length and the
+      // difference below is exactly the number of appended bytes.
+      fs.seekp(0, fs.end);
+      std::streampos oldPos = fs.tellp();
+      // Call the callback to write the content
+      callback->process(&fs);
+      if (fs.good() && fs.tellp() >= 0)
+      {
+        uint64_t appendSize = fs.tellp() - oldPos;
+        flow->_size += appendSize;
+        fs.close();
+      }
+      else
+      {
+        fs.close();
+        throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
+      }
+    }
+    else
+    {
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+    }
+  }
+  catch (std::exception &exception)
+  {
+    _logger->log_debug("Caught Exception %s", exception.what());
+    throw;
+  }
+  catch (...)
+  {
+    _logger->log_debug("Caught Exception during process session append");
+    throw;
+  }
+}
+
+// Reads the FlowFile's content. The claim's content file is opened in binary
+// mode, positioned at the FlowFile's offset, and passed to the callback.
+//
+// Throws Exception(FILE_OPERATION_EXCEPTION) when the FlowFile has no claim,
+// the file cannot be opened, or the seek leaves the stream in a bad state;
+// any exception is logged and rethrown.
+void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback *callback)
+{
+  try
+  {
+    // Content can only be read from an existing claim
+    if (flow->_claim == NULL)
+      throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read");
+
+    ResourceClaim *source = flow->_claim;
+    std::ifstream in;
+    in.open(source->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary);
+    if (!in.is_open())
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
+
+    // Position the stream at the start of this FlowFile's content
+    in.seekg(flow->_offset, in.beg);
+    if (!in.good())
+    {
+      in.close();
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error");
+    }
+
+    callback->process(&in);
+    in.close();
+  }
+  catch (std::exception &exception)
+  {
+    _logger->log_debug("Caught Exception %s", exception.what());
+    throw;
+  }
+  catch (...)
+  {
+    _logger->log_debug("Caught Exception during process session read");
+    throw;
+  }
+}
+
+// Imports the content of a file into the FlowFile. A new resource claim is
+// created and the source file is copied into the claim's content file; on
+// success the FlowFile's previous claim, if any, is released and replaced.
+//
+// source:     path of the file to import
+// flow:       FlowFile that receives the content
+// keepSource: when false, the source file is removed after a successful import
+// offset:     position in the source file at which copying starts
+//
+// Throws Exception(FILE_OPERATION_EXCEPTION) when either file cannot be
+// opened or the output stream is not in a good state after copying; any
+// exception is logged and rethrown after the new claim has been destroyed.
+void ProcessSession::import(std::string source, FlowFileRecord *flow, bool keepSource, uint64_t offset)
+{
+  ResourceClaim *claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY);
+
+  try
+  {
+    // Copy buffer; allocated inside the try block so that an allocation
+    // failure still destroys the claim, and released automatically on
+    // every exit path
+    std::vector<char> buf(4096);
+
+    std::ofstream fs;
+    fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
+    std::ifstream input;
+    input.open(source.c_str(), std::fstream::in | std::fstream::binary);
+
+    if (!fs.is_open() || !input.is_open())
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+
+    // Stream the source file, starting at offset, into the claim
+    input.seekg(offset, input.beg);
+    while (input.good())
+    {
+      input.read(&buf[0], buf.size());
+      // gcount() is the full buffer size after a complete read and the
+      // number of remaining bytes after the final, partial one
+      fs.write(&buf[0], input.gcount());
+    }
+
+    if (!(fs.good() && fs.tellp() >= 0))
+    {
+      fs.close();
+      input.close();
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+    }
+
+    flow->_size = fs.tellp();
+    flow->_offset = 0;
+    if (flow->_claim)
+    {
+      // Release the claim the FlowFile referred to so far
+      flow->_claim->decreaseFlowFileRecordOwnedCount();
+      flow->_claim = NULL;
+    }
+    flow->_claim = claim;
+    claim->increaseFlowFileRecordOwnedCount();
+    fs.close();
+    input.close();
+    if (!keepSource)
+      std::remove(source.c_str());
+  }
+  catch (std::exception &exception)
+  {
+    // Detach the new claim from the FlowFile if it was already attached
+    if (flow && flow->_claim == claim)
+    {
+      flow->_claim->decreaseFlowFileRecordOwnedCount();
+      flow->_claim = NULL;
+    }
+    if (claim)
+      delete claim;
+    _logger->log_debug("Caught Exception %s", exception.what());
+    throw;
+  }
+  catch (...)
+  {
+    if (flow && flow->_claim == claim)
+    {
+      flow->_claim->decreaseFlowFileRecordOwnedCount();
+      flow->_claim = NULL;
+    }
+    if (claim)
+      delete claim;
+    _logger->log_debug("Caught Exception during process session import");
+    throw;
+  }
+}
+
+// Commits the session in three phases:
+//  1. Every updated or added FlowFile that is not marked deleted is routed:
+//     its transfer relationship is looked up and the processor's outgoing
+//     connections for that relationship are resolved. The first connection
+//     is assigned to the FlowFile itself; every further connection receives
+//     a clone (see cloneDuringTransfer). With no connection, the FlowFile is
+//     removed if the relationship is auto terminated.
+//  2. Updated, added and cloned FlowFiles are put onto their connection, or
+//     destroyed when they have none.
+//  3. Deleted FlowFiles and the snapshots taken by get() are destroyed and
+//     all session maps are cleared.
+//
+// Throws Exception(PROCESS_SESSION_EXCEPTION) when a FlowFile has no transfer
+// relationship, when a relationship that is not auto terminated has no
+// connection, or when a clone cannot be created. Any exception is logged
+// and rethrown.
+void ProcessSession::commit()
+{
+  try
+  {
+    typedef std::map<std::string, FlowFileRecord *> FlowFileMap;
+    FlowFileMap::iterator it;
+
+    // Phase 1: route the updated FlowFiles first, then the added ones
+    FlowFileMap *toRoute[] = { &_updatedFlowFiles, &_addedFlowFiles };
+    for (size_t m = 0; m < sizeof(toRoute) / sizeof(toRoute[0]); ++m)
+    {
+      for (it = toRoute[m]->begin(); it != toRoute[m]->end(); ++it)
+      {
+        FlowFileRecord *record = it->second;
+        if (record->_markedDelete)
+          continue;
+
+        std::map<std::string, Relationship>::iterator itRelationship =
+            this->_transferRelationship.find(record->getUUIDStr());
+        if (itRelationship == _transferRelationship.end())
+        {
+          // Can not find relationship for the flow
+          throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow");
+        }
+
+        Relationship relationship = itRelationship->second;
+        // Find the connections for that relationship
+        std::set<Connection *> connections =
+            _processContext->getProcessor()->getOutGoingConnections(relationship.getName());
+        if (connections.empty())
+        {
+          if (!_processContext->getProcessor()->isAutoTerminated(relationship))
+          {
+            // Not auto terminated, so a connection is required
+            std::string message = "Connect empty for non auto terminated relationship " + relationship.getName();
+            throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
+          }
+          // Auto terminated
+          remove(record);
+          continue;
+        }
+
+        for (std::set<Connection *>::iterator itConnection = connections.begin();
+             itConnection != connections.end(); ++itConnection)
+        {
+          Connection *connection(*itConnection);
+          if (itConnection == connections.begin())
+          {
+            // First connection: the FlowFile itself is routed to it
+            record->_connection = connection;
+            continue;
+          }
+          // Every further connection gets its own clone of the FlowFile
+          FlowFileRecord *cloneRecord = this->cloneDuringTransfer(record);
+          if (!cloneRecord)
+            throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer");
+          cloneRecord->_connection = connection;
+        }
+      }
+    }
+
+    // Phase 2: send the FlowFiles to their queues
+    FlowFileMap *toDeliver[] = { &_updatedFlowFiles, &_addedFlowFiles, &_clonedFlowFiles };
+    for (size_t m = 0; m < sizeof(toDeliver) / sizeof(toDeliver[0]); ++m)
+    {
+      for (it = toDeliver[m]->begin(); it != toDeliver[m]->end(); ++it)
+      {
+        FlowFileRecord *record = it->second;
+        if (record->_markedDelete)
+          continue;  // destroyed below through _deletedFlowFiles
+        if (record->_connection)
+          record->_connection->put(record);
+        else
+          delete record;
+      }
+    }
+
+    // Phase 3: destroy the deleted FlowFiles and the snapshots
+    FlowFileMap *toDestroy[] = { &_deletedFlowFiles, &_originalFlowFiles };
+    for (size_t m = 0; m < sizeof(toDestroy) / sizeof(toDestroy[0]); ++m)
+    {
+      for (it = toDestroy[m]->begin(); it != toDestroy[m]->end(); ++it)
+        delete it->second;
+    }
+
+    // All done
+    _updatedFlowFiles.clear();
+    _addedFlowFiles.clear();
+    _clonedFlowFiles.clear();
+    _deletedFlowFiles.clear();
+    _originalFlowFiles.clear();
+    _logger->log_trace("ProcessSession committed for %s", _processContext->getProcessor()->getName().c_str());
+  }
+  catch (std::exception &exception)
+  {
+    _logger->log_debug("Caught Exception %s", exception.what());
+    throw;
+  }
+  catch (...)
+  {
+    _logger->log_debug("Caught Exception during process session commit");
+    throw;
+  }
+}
+
+
+// Rolls the session back: every snapshot taken by get() is put back onto its
+// original connection (or destroyed when it has none), all cloned, added and
+// updated FlowFiles are destroyed, and the session maps are cleared. Any
+// exception is logged and rethrown.
+void ProcessSession::rollback()
+{
+  try
+  {
+    typedef std::map<std::string, FlowFileRecord *> FlowFileMap;
+
+    // Requeue the snapshots of the FlowFiles
+    for (FlowFileMap::iterator snap = _originalFlowFiles.begin();
+         snap != _originalFlowFiles.end(); ++snap)
+    {
+      FlowFileRecord *snapshot = snap->second;
+      if (!snapshot->_orginalConnection)
+      {
+        delete snapshot;
+        continue;
+      }
+      snapshot->_snapshot = false;
+      snapshot->_orginalConnection->put(snapshot);
+    }
+    _originalFlowFiles.clear();
+
+    // Destroy everything the session produced or modified, in this order:
+    // cloned, added, updated
+    FlowFileMap *discarded[] = { &_clonedFlowFiles, &_addedFlowFiles, &_updatedFlowFiles };
+    for (size_t m = 0; m < sizeof(discarded) / sizeof(discarded[0]); ++m)
+    {
+      for (FlowFileMap::iterator entry = discarded[m]->begin();
+           entry != discarded[m]->end(); ++entry)
+        delete entry->second;
+      discarded[m]->clear();
+    }
+
+    // The records in this map are not destroyed here, only forgotten
+    _deletedFlowFiles.clear();
+    _logger->log_trace("ProcessSession rollback for %s", _processContext->getProcessor()->getName().c_str());
+  }
+  catch (std::exception &exception)
+  {
+    _logger->log_debug("Caught Exception %s", exception.what());
+    throw;
+  }
+  catch (...)
+  {
+    _logger->log_debug("Caught Exception during process session roll back");
+    throw;
+  }
+}
+
+// Polls the processor's incoming connections, starting with the one returned
+// by getNextIncomingConnection(), until a FlowFile is obtained or the
+// rotation comes back to the first connection. Expired FlowFiles reported by
+// a poll are destroyed. A FlowFile that is obtained is recorded as updated
+// in this session together with a snapshot that rollback() can requeue.
+// Returns NULL when there is no incoming connection or nothing was polled.
+FlowFileRecord *ProcessSession::get()
+{
+  Connection *first = _processContext->getProcessor()->getNextIncomingConnection();
+  if (first == NULL)
+    return NULL;
+
+  for (Connection *current = first; ; )
+  {
+    std::set<FlowFileRecord *> expired;
+    FlowFileRecord *polled = current->poll(expired);
+
+    // Destroy the expired flow records reported by the poll
+    for (std::set<FlowFileRecord *>::iterator stale = expired.begin();
+         stale != expired.end(); ++stale)
+      delete (*stale);
+
+    if (polled)
+    {
+      // Track the flow record in the session's update map
+      polled->_markedDelete = false;
+      _updatedFlowFiles[polled->getUUIDStr()] = polled;
+      // Keep a snapshot of its state at the time it was polled
+      std::map<std::string, std::string> noAttributes;
+      FlowFileRecord *snapshot = new FlowFileRecord(noAttributes);
+      _logger->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr().c_str());
+      snapshot->duplicate(polled);
+      _originalFlowFiles[snapshot->getUUIDStr()] = snapshot;
+      return polled;
+    }
+
+    // Move on; stop once there is no connection or we are back at the start
+    current = _processContext->getProcessor()->getNextIncomingConnection();
+    if (current == NULL || current == first)
+      break;
+  }
+
+  return NULL;
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Processor.cpp b/libminifi/src/Processor.cpp
new file mode 100644
index 0000000..cc136dc
--- /dev/null
+++ b/libminifi/src/Processor.cpp
@@ -0,0 +1,451 @@
+/**
+ * @file Processor.cpp
+ * Processor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+
+#include "Processor.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+// Constructs a processor with the given name. The supplied UUID is copied
+// when present, otherwise a new one is generated. All scheduling settings
+// start at their defaults and the processor starts in the DISABLED state.
+Processor::Processor(std::string name, uuid_t uuid)
+: _name(name)
+{
+  if (uuid)
+    uuid_copy(_uuid, uuid);
+  else
+    // No UUID supplied, generate one
+    uuid_generate(_uuid);
+
+  // Keep the textual form of the UUID for logging and lookups
+  char unparsed[37];
+  uuid_unparse(_uuid, unparsed);
+  _uuidStr = unparsed;
+
+  // Default state and scheduling
+  _state = DISABLED;
+  _strategy = TIMER_DRIVEN;
+  _lossTolerant = false;
+  _triggerWhenEmpty = false;
+  _schedulingPeriodNano = MINIMUM_SCHEDULING_NANOS;
+  _runDurantionNano = 0;
+  _yieldPeriodMsec = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
+  _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
+  _maxConcurrentTasks = 1;
+  _activeTasks = 0;
+  _yieldExpiration = 0;
+
+  // Rotation over the incoming connections starts at the beginning
+  _incomingConnectionsIter = this->_incomingConnections.begin();
+
+  _logger = Logger::getLogger();
+  _logger->log_info("Processor %s created UUID %s", _name.c_str(), _uuidStr.c_str());
+}
+
+// Destructor; the body performs no explicit cleanup.
+Processor::~Processor()
+{
+}
+
+// Returns true only when the state is RUNNING and at least one task is
+// active.
+bool Processor::isRunning()
+{
+  if (_state != RUNNING)
+    return false;
+  return _activeTasks > 0;
+}
+
+// Replaces the processor's properties with the given set, keyed by property
+// name. Returns false without changing anything while the processor is
+// running, true otherwise.
+bool Processor::setSupportedProperties(std::set<Property> properties)
+{
+  if (isRunning())
+  {
+    _logger->log_info("Can not set processor property while the process %s is running",
+        _name.c_str());
+    return false;
+  }
+
+  std::lock_guard<std::mutex> guard(_mtx);
+
+  _properties.clear();
+  std::set<Property>::iterator cursor = properties.begin();
+  while (cursor != properties.end())
+  {
+    Property supported(*cursor);
+    _properties[supported.getName()] = supported;
+    _logger->log_info("Processor %s supported property name %s", _name.c_str(), supported.getName().c_str());
+    ++cursor;
+  }
+
+  return true;
+}
+
+// Replaces the processor's supported relationships with the given set, keyed
+// by relationship name. Returns false without changing anything while the
+// processor is running, true otherwise.
+bool Processor::setSupportedRelationships(std::set<Relationship> relationships)
+{
+  if (isRunning())
+  {
+    _logger->log_info("Can not set processor supported relationship while the process %s is running",
+        _name.c_str());
+    return false;
+  }
+
+  std::lock_guard<std::mutex> guard(_mtx);
+
+  _relationships.clear();
+  std::set<Relationship>::iterator cursor = relationships.begin();
+  while (cursor != relationships.end())
+  {
+    Relationship supported(*cursor);
+    _relationships[supported.getName()] = supported;
+    _logger->log_info("Processor %s supported relationship name %s", _name.c_str(), supported.getName().c_str());
+    ++cursor;
+  }
+
+  return true;
+}
+
+// Replaces the processor's auto terminated relationships with the given set,
+// keyed by relationship name. Returns false without changing anything while
+// the processor is running, true otherwise.
+bool Processor::setAutoTerminatedRelationships(std::set<Relationship> relationships)
+{
+  if (isRunning())
+  {
+    _logger->log_info("Can not set processor auto terminated relationship while the process %s is running",
+        _name.c_str());
+    return false;
+  }
+
+  std::lock_guard<std::mutex> guard(_mtx);
+
+  _autoTerminatedRelationships.clear();
+  std::set<Relationship>::iterator cursor = relationships.begin();
+  while (cursor != relationships.end())
+  {
+    Relationship terminated(*cursor);
+    _autoTerminatedRelationships[terminated.getName()] = terminated;
+    _logger->log_info("Processor %s auto terminated relationship name %s", _name.c_str(), terminated.getName().c_str());
+    ++cursor;
+  }
+
+  return true;
+}
+
+// Returns true when a relationship with the same name is registered as auto
+// terminated.
+//
+// The mutex is taken only while the processor is not running:
+// setAutoTerminatedRelationships() refuses to modify the map while the
+// processor is running. The lock is held through a std::unique_lock so it
+// is released on every exit path, including exceptions.
+bool Processor::isAutoTerminated(Relationship relationship)
+{
+  std::unique_lock<std::mutex> lock(_mtx, std::defer_lock);
+  if (!isRunning())
+    lock.lock();
+
+  return _autoTerminatedRelationships.find(relationship.getName())
+      != _autoTerminatedRelationships.end();
+}
+
+// Returns true when a relationship with the same name is registered as
+// supported.
+//
+// The mutex is taken only while the processor is not running:
+// setSupportedRelationships() refuses to modify the map while the processor
+// is running. The lock is held through a std::unique_lock so it is released
+// on every exit path, including exceptions.
+bool Processor::isSupportedRelationship(Relationship relationship)
+{
+  std::unique_lock<std::mutex> lock(_mtx, std::defer_lock);
+  if (!isRunning())
+    lock.lock();
+
+  return _relationships.find(relationship.getName()) != _relationships.end();
+}
+
+// Looks up the property called 'name' and copies its value into 'value'.
+// Returns true when the property exists; otherwise returns false and leaves
+// 'value' untouched.
+//
+// The mutex is taken only while the processor is not running, to avoid a
+// race condition with the setters. It is held through a std::unique_lock so
+// that it is released on every exit path, including an exception thrown
+// while the value string is copied.
+bool Processor::getProperty(std::string name, std::string &value)
+{
+  std::unique_lock<std::mutex> lock(_mtx, std::defer_lock);
+  if (!isRunning())
+    lock.lock();
+
+  std::map<std::string, Property>::iterator it = _properties.find(name);
+  if (it == _properties.end())
+    return false;
+
+  value = it->second.getValue();
+  return true;
+}
+
+// Sets the value of the property called 'name'. Returns false when no such
+// property is registered, true after the value has been stored.
+bool Processor::setProperty(std::string name, std::string value)
+{
+  std::lock_guard<std::mutex> guard(_mtx);
+
+  std::map<std::string, Property>::iterator found = _properties.find(name);
+  if (found == _properties.end())
+    return false;
+
+  // Update a copy and store it back under the property's own name
+  Property updated = found->second;
+  updated.setValue(value);
+  _properties[updated.getName()] = updated;
+  _logger->log_info("Processor %s property name %s value %s", _name.c_str(), updated.getName().c_str(), value.c_str());
+  return true;
+}
+
+// Returns a copy of the set of outgoing connections registered for the given
+// relationship name, or an empty set when there is none.
+std::set<Connection *> Processor::getOutGoingConnections(std::string relationship)
+{
+  std::map<std::string, std::set<Connection *>>::iterator found =
+      _outGoingConnections.find(relationship);
+  if (found == _outGoingConnections.end())
+    return std::set<Connection *>();
+
+  return found->second;
+}
+
+// Attaches a connection to this processor. The connection is registered as
+// incoming when its destination UUID matches this processor and as outgoing
+// (under its relationship name) when its source UUID matches; both may apply.
+// Returns true when the connection was newly registered in at least one
+// role, false otherwise or while the processor is running.
+bool Processor::addConnection(Connection *connection)
+{
+  if (isRunning())
+  {
+    _logger->log_info("Can not add connection while the process %s is running",
+        _name.c_str());
+    return false;
+  }
+
+  std::lock_guard<std::mutex> guard(_mtx);
+
+  uuid_t sourceId;
+  uuid_t destinationId;
+  connection->getSourceProcessorUUID(sourceId);
+  connection->getDestinationProcessorUUID(destinationId);
+
+  bool added = false;
+
+  // This processor is the destination and does not know the connection yet
+  if (uuid_compare(_uuid, destinationId) == 0 &&
+      _incomingConnections.find(connection) == _incomingConnections.end())
+  {
+    _incomingConnections.insert(connection);
+    connection->setDestinationProcessor(this);
+    _logger->log_info("Add connection %s into Processor %s incoming connection",
+        connection->getName().c_str(), _name.c_str());
+    // Restart the rotation over the incoming connections
+    _incomingConnectionsIter = this->_incomingConnections.begin();
+    added = true;
+  }
+
+  // This processor is the source of the connection
+  if (uuid_compare(_uuid, sourceId) == 0)
+  {
+    std::string relationship = connection->getRelationship().getName();
+    std::map<std::string, std::set<Connection *>>::iterator known =
+        _outGoingConnections.find(relationship);
+    if (known == _outGoingConnections.end())
+    {
+      // First outgoing connection for this relationship
+      std::set<Connection *> fresh;
+      fresh.insert(connection);
+      connection->setSourceProcessor(this);
+      _outGoingConnections[relationship] = fresh;
+      _logger->log_info("Add connection %s into Processor %s outgoing connection for relationship %s",
+          connection->getName().c_str(), _name.c_str(), relationship.c_str());
+      added = true;
+    }
+    else
+    {
+      // The relationship already has connections; add this one if missing
+      std::set<Connection *> extended = known->second;
+      if (extended.find(connection) == extended.end())
+      {
+        extended.insert(connection);
+        connection->setSourceProcessor(this);
+        _outGoingConnections[relationship] = extended;
+        _logger->log_info("Add connection %s into Processor %s outgoing connection for relationship %s",
+            connection->getName().c_str(), _name.c_str(), relationship.c_str());
+        added = true;
+      }
+    }
+  }
+
+  return added;
+}
+
+// Detaches a connection from this processor. It is removed from the incoming
+// connections when its destination UUID matches this processor and from the
+// outgoing connections of its relationship when its source UUID matches.
+// Nothing happens while the processor is running.
+void Processor::removeConnection(Connection *connection)
+{
+  if (isRunning())
+  {
+    _logger->log_info("Can not remove connection while the process %s is running",
+        _name.c_str());
+    return;
+  }
+
+  std::lock_guard<std::mutex> guard(_mtx);
+
+  uuid_t sourceId;
+  uuid_t destinationId;
+  connection->getSourceProcessorUUID(sourceId);
+  connection->getDestinationProcessorUUID(destinationId);
+
+  // This processor is the destination and knows the connection
+  if (uuid_compare(_uuid, destinationId) == 0 &&
+      _incomingConnections.find(connection) != _incomingConnections.end())
+  {
+    _incomingConnections.erase(connection);
+    connection->setDestinationProcessor(NULL);
+    _logger->log_info("Remove connection %s into Processor %s incoming connection",
+        connection->getName().c_str(), _name.c_str());
+    // Restart the rotation over the incoming connections
+    _incomingConnectionsIter = this->_incomingConnections.begin();
+  }
+
+  // Only the source side is left to handle
+  if (uuid_compare(_uuid, sourceId) != 0)
+    return;
+
+  std::string relationship = connection->getRelationship().getName();
+  std::map<std::string, std::set<Connection *>>::iterator known =
+      _outGoingConnections.find(relationship);
+  if (known == _outGoingConnections.end())
+    return;
+
+  std::set<Connection *> &outgoing = known->second;
+  if (outgoing.find(connection) != outgoing.end())
+  {
+    outgoing.erase(connection);
+    connection->setSourceProcessor(NULL);
+    _logger->log_info("Remove connection %s into Processor %s outgoing connection for relationship %s",
+        connection->getName().c_str(), _name.c_str(), relationship.c_str());
+  }
+}
+
+// Returns the incoming connection the rotation iterator currently points at
+// and advances the iterator, wrapping around at the end. Returns NULL when
+// the processor has no incoming connection.
+Connection *Processor::getNextIncomingConnection()
+{
+  std::lock_guard<std::mutex> guard(_mtx);
+
+  if (_incomingConnections.empty())
+    return NULL;
+
+  // Wrap around before dereferencing
+  if (_incomingConnectionsIter == _incomingConnections.end())
+    _incomingConnectionsIter = _incomingConnections.begin();
+
+  Connection *next = *_incomingConnectionsIter;
+  ++_incomingConnectionsIter;
+
+  // Wrap around again so the iterator never rests on end()
+  if (_incomingConnectionsIter == _incomingConnections.end())
+    _incomingConnectionsIter = _incomingConnections.begin();
+
+  return next;
+}
+
+// Returns true when at least one incoming connection reports a queue size
+// greater than zero.
+bool Processor::flowFilesQueued()
+{
+  std::lock_guard<std::mutex> guard(_mtx);
+
+  if (_incomingConnections.empty())
+    return false;
+
+  std::set<Connection *>::iterator incoming = _incomingConnections.begin();
+  while (incoming != _incomingConnections.end())
+  {
+    if ((*incoming)->getQueueSize() > 0)
+      return true;
+    ++incoming;
+  }
+
+  return false;
+}
+
+// Returns true when at least one outgoing connection, of any relationship,
+// reports that it is full.
+bool Processor::flowFilesOutGoingFull()
+{
+  std::lock_guard<std::mutex> guard(_mtx);
+
+  for (std::map<std::string, std::set<Connection *>>::iterator byRelationship = _outGoingConnections.begin();
+       byRelationship != _outGoingConnections.end(); ++byRelationship)
+  {
+    // Check every connection registered for this relationship
+    const std::set<Connection *> &outgoing = byRelationship->second;
+    for (std::set<Connection *>::const_iterator target = outgoing.begin();
+         target != outgoing.end(); ++target)
+    {
+      if ((*target)->isFull())
+        return true;
+    }
+  }
+
+  return false;
+}
+
+// Runs one trigger of the processor: a process context and a session are
+// created, the onTrigger(context, session) overload is invoked, and the
+// session is committed. When anything throws, the exception is logged, the
+// session is rolled back and the exception is rethrown.
+//
+// The context and session are automatic objects, so they are destroyed on
+// every exit path (session first, then context), including when rollback()
+// itself throws.
+void Processor::onTrigger()
+{
+  ProcessContext context(this);
+  ProcessSession session(&context);
+  try {
+    // Call the child onTrigger function
+    this->onTrigger(&context, &session);
+    session.commit();
+  }
+  catch (std::exception &exception)
+  {
+    _logger->log_debug("Caught Exception %s", exception.what());
+    session.rollback();
+    throw;
+  }
+  catch (...)
+  {
+    _logger->log_debug("Caught Exception Processor::onTrigger");
+    session.rollback();
+    throw;
+  }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/RealTimeDataCollector.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RealTimeDataCollector.cpp b/libminifi/src/RealTimeDataCollector.cpp
new file mode 100644
index 0000000..c7118ff
--- /dev/null
+++ b/libminifi/src/RealTimeDataCollector.cpp
@@ -0,0 +1,482 @@
+/**
+ * @file RealTimeDataCollector.cpp
+ * RealTimeDataCollector class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <random>
+#include <netinet/tcp.h>
+
+#include "RealTimeDataCollector.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+//! Static definitions for the RealTimeDataCollector processor.
+//! NOTE(review): each Property looks like it is built from (name, description, default value) -- confirm against the Property constructor.
+//! Name of this processor type
+const std::string RealTimeDataCollector::ProcessorName("RealTimeDataCollector");
+//! Input data file
+Property RealTimeDataCollector::FILENAME("File Name", "File Name for the real time processor to process", "data.osp");
+//! Host/port of the real time server
+Property RealTimeDataCollector::REALTIMESERVERNAME("Real Time Server Name", "Real Time Server Name", "localhost");
+Property RealTimeDataCollector::REALTIMESERVERPORT("Real Time Server Port", "Real Time Server Port", "10000");
+//! Host/port of the batch server
+Property RealTimeDataCollector::BATCHSERVERNAME("Batch Server Name", "Batch Server Name", "localhost");
+Property RealTimeDataCollector::BATCHSERVERPORT("Batch Server Port", "Batch Server Port", "10001");
+//! Whether the sample file is iterated
+Property RealTimeDataCollector::ITERATION("Iteration",
+ "If true, sample osp file will be iterated", "true");
+//! Comma separated message IDs; the first comma separated field of each data line is compared against these
+Property RealTimeDataCollector::REALTIMEMSGID("Real Time Message ID", "Real Time Message ID", "41");
+Property RealTimeDataCollector::BATCHMSGID("Batch Message ID", "Batch Message ID", "172, 30, 48");
+//! Intervals; parsed with Property::StringToTime and converted to nanoseconds by onTrigger
+Property RealTimeDataCollector::REALTIMEINTERVAL("Real Time Interval", "Real Time Data Collection Interval in msec", "10 ms");
+Property RealTimeDataCollector::BATCHINTERVAL("Batch Time Interval", "Batch Processing Interval in msec", "100 ms");
+//! Upper bound for the queued batch data
+Property RealTimeDataCollector::BATCHMAXBUFFERSIZE("Batch Max Buffer Size", "Batch Buffer Maximum size in bytes", "262144");
+//! The only relationship registered by initialize()
+Relationship RealTimeDataCollector::Success("success", "success operational on the flow record");
+
+/**
+ * Register the properties and the relationship this processor supports.
+ */
+void RealTimeDataCollector::initialize()
+{
+    //! Supported properties
+    std::set<Property> supportedProperties = {
+        FILENAME,
+        REALTIMESERVERNAME,
+        REALTIMESERVERPORT,
+        BATCHSERVERNAME,
+        BATCHSERVERPORT,
+        ITERATION,
+        REALTIMEMSGID,
+        BATCHMSGID,
+        REALTIMEINTERVAL,
+        BATCHINTERVAL,
+        BATCHMAXBUFFERSIZE
+    };
+    setSupportedProperties(supportedProperties);
+
+    //! Supported relationships
+    std::set<Relationship> supportedRelationships = { Success };
+    setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Resolve host, create a TCP socket and connect it to host:port.
+ *
+ * @param host server host name to resolve (IPv4 only)
+ * @param port server TCP port in host byte order
+ * @return the connected socket descriptor, or 0 on any failure (name
+ *         resolution, socket creation, socket options, bind or connect).
+ *         Callers in this file treat a value <= 0 as "not connected".
+ */
+int RealTimeDataCollector::connectServer(const char *host, uint16_t port)
+{
+    in_addr_t addr;
+    int sock = 0;
+    struct hostent *h = nullptr;
+#ifdef __MACH__
+    h = gethostbyname(host);
+#else
+    char buf[1024];
+    struct hostent he;
+    int hh_errno;
+    // A non-zero return means the lookup itself failed (for example the
+    // scratch buffer was too small); do not trust h in that case.
+    if (gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno) != 0)
+        h = nullptr;
+#endif
+    // Guard against an unresolved host and against non-IPv4 answers whose
+    // address would not fit into in_addr_t.
+    if (h == nullptr || h->h_addrtype != AF_INET || h->h_length != (int) sizeof(addr) || h->h_addr_list[0] == nullptr)
+    {
+        _logger->log_error("Could not resolve hostName %s", host);
+        return 0;
+    }
+    memcpy((char *) &addr, h->h_addr_list[0], sizeof(addr));
+    sock = socket(AF_INET, SOCK_STREAM, 0);
+    if (sock < 0)
+    {
+        _logger->log_error("Could not create socket to hostName %s", host);
+        return 0;
+    }
+
+#ifndef __MACH__
+    int opt = 1;
+    bool nagle_off = true;
+
+    if (nagle_off)
+    {
+        if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0)
+        {
+            _logger->log_error("setsockopt() TCP_NODELAY failed");
+            close(sock);
+            return 0;
+        }
+        if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+                (char *)&opt, sizeof(opt)) < 0)
+        {
+            _logger->log_error("setsockopt() SO_REUSEADDR failed");
+            close(sock);
+            return 0;
+        }
+    }
+
+    int sndsize = 256*1024;
+    if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0)
+    {
+        _logger->log_error("setsockopt() SO_SNDBUF failed");
+        close(sock);
+        return 0;
+    }
+#endif
+
+    struct sockaddr_in sa;
+    socklen_t socklen;
+    int status;
+
+    //TODO bind socket to the interface
+    // For now bind to any local address and let the system pick the port
+    memset(&sa, 0, sizeof(sa));
+    sa.sin_family = AF_INET;
+    sa.sin_addr.s_addr = htonl(INADDR_ANY);
+    sa.sin_port = htons(0);
+    socklen = sizeof(sa);
+    if (bind(sock, (struct sockaddr *)&sa, socklen) < 0)
+    {
+        _logger->log_error("socket bind failed");
+        close(sock);
+        return 0;
+    }
+
+    // Remote endpoint: resolved address (already in network byte order) and port
+    memset(&sa, 0, sizeof(sa));
+    sa.sin_family = AF_INET;
+    sa.sin_addr.s_addr = addr;
+    sa.sin_port = htons(port);
+    socklen = sizeof(sa);
+
+    status = connect(sock, (struct sockaddr *)&sa, socklen);
+
+    if (status < 0)
+    {
+        _logger->log_error("socket connect failed to %s %d", host, port);
+        close(sock);
+        return 0;
+    }
+
+    _logger->log_info("socket %d connect to server %s port %d success", sock, host, port);
+
+    return sock;
+}
+
+/**
+ * Send the whole buffer over the socket, looping over partial sends.
+ *
+ * @param socket connected socket descriptor
+ * @param buf    start of the data to send
+ * @param buflen number of bytes to send
+ * @return the total number of bytes sent (buflen) on success, 0 when there
+ *         was nothing to send, or -1 when send() fails or makes no progress.
+ *         Callers only need to test for a negative value.
+ */
+int RealTimeDataCollector::sendData(int socket, const char *buf, int buflen)
+{
+    int sent = 0;
+
+    while (sent < buflen)
+    {
+        const int ret = send(socket, buf + sent, buflen - sent, 0);
+        // -1 is an error; 0 would mean no progress and must not spin forever
+        if (ret <= 0)
+        {
+            return -1;
+        }
+        sent += ret;
+    }
+
+    if (sent)
+        _logger->log_debug("Send data size %d over socket %d", buflen, socket);
+
+    return sent;
+}
+
+/**
+ * Real time half of the collector; runs on the thread recorded as
+ * _realTimeThreadId.
+ *
+ * Once the accumulated scheduling time reaches _realTimeInterval it:
+ *  - re-reads the real time and batch message ID lists from the properties,
+ *  - (re)opens the data file if it is not open,
+ *  - reads lines until the first line whose leading comma separated field
+ *    matches a real time ID (or until end of file). Lines matching a batch
+ *    ID are queued for the batch thread. The real time line is sent at once
+ *    over the real time socket and, if it cannot be sent, queued as well.
+ *
+ * The file is closed on end of file so that the next run reopens it.
+ *
+ * @param context process context, used to read the scheduling period
+ * @param session process session (not used here)
+ */
+void RealTimeDataCollector::onTriggerRealTime(ProcessContext *context, ProcessSession *session)
+{
+    if (_realTimeAccumulated >= this->_realTimeInterval)
+    {
+        // Queue one line for the batch thread under _mtx, evicting the oldest
+        // entries while the queue would exceed _batchMaxBufferSize. The empty
+        // check keeps a line larger than the whole budget from calling
+        // front()/pop() on an empty queue.
+        auto enqueueForBatch = [this](const std::string &data, const std::string &msgId, const char *pushLogFormat)
+        {
+            std::lock_guard<std::mutex> lock(_mtx);
+            while (!_queue.empty() && (_queuedDataSize + data.size()) > _batchMaxBufferSize)
+            {
+                const std::string::size_type droppedSize = _queue.front().size();
+                _queuedDataSize -= droppedSize;
+                // %d expects an int, so narrow the size_t explicitly
+                _logger->log_debug("Pop item size %d from batch queue, queue buffer size %d", static_cast<int>(droppedSize), _queuedDataSize);
+                _queue.pop();
+            }
+            _queue.push(data);
+            _queuedDataSize += data.size();
+            _logger->log_debug(pushLogFormat, msgId.c_str(), _queuedDataSize);
+        };
+
+        std::string value;
+        if (this->getProperty(REALTIMEMSGID.getName(), value))
+        {
+            this->_realTimeMsgID.clear();
+            this->_logger->log_info("Real Time Msg IDs %s", value.c_str());
+            std::stringstream lineStream(value);
+            std::string cell;
+
+            while (std::getline(lineStream, cell, ','))
+            {
+                // Trim like the batch IDs: the cell compared below is trimmed,
+                // so an untrimmed " 42" from "41, 42" could never match.
+                cell = Property::trim(cell);
+                this->_realTimeMsgID.push_back(cell);
+            }
+        }
+        if (this->getProperty(BATCHMSGID.getName(), value))
+        {
+            this->_batchMsgID.clear();
+            this->_logger->log_info("Batch Msg IDs %s", value.c_str());
+            std::stringstream lineStream(value);
+            std::string cell;
+
+            while (std::getline(lineStream, cell, ','))
+            {
+                cell = Property::trim(cell);
+                this->_batchMsgID.push_back(cell);
+            }
+        }
+        // Open the file
+        if (!this->_fileStream.is_open())
+        {
+            _fileStream.open(this->_fileName.c_str(), std::ifstream::in);
+            if (this->_fileStream.is_open())
+                _logger->log_debug("open %s", _fileName.c_str());
+        }
+        if (!_fileStream.good())
+        {
+            // The accumulator is left untouched, so the next trigger retries
+            _logger->log_error("load data file failed %s", _fileName.c_str());
+            return;
+        }
+        if (this->_fileStream.is_open())
+        {
+            std::string line;
+
+            while (std::getline(_fileStream, line))
+            {
+                line += "\n";
+                std::stringstream lineStream(line);
+                std::string cell;
+                if (std::getline(lineStream, cell, ','))
+                {
+                    cell = Property::trim(cell);
+                    // Check whether it matches the batch traffic
+                    for (const std::string &batchId : _batchMsgID)
+                    {
+                        if (cell == batchId)
+                        {
+                            // push the batch data to the queue
+                            enqueueForBatch(line, cell, "Push batch msg ID %s into batch queue, queue buffer size %d");
+                        }
+                    }
+                    bool findRealTime = false;
+                    // Check whether it matches the real time traffic
+                    for (const std::string &realTimeId : _realTimeMsgID)
+                    {
+                        if (cell == realTimeId)
+                        {
+                            int status = 0;
+                            if (this->_realTimeSocket <= 0)
+                            {
+                                // Connect the LTE socket
+                                uint16_t port = _realTimeServerPort;
+                                this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port);
+                            }
+                            if (this->_realTimeSocket > 0)
+                            {
+                                // try to send the data
+                                status = sendData(_realTimeSocket, line.data(), line.size());
+                                if (status < 0)
+                                {
+                                    close(_realTimeSocket);
+                                    _realTimeSocket = 0;
+                                }
+                            }
+                            if (this->_realTimeSocket <= 0 || status < 0)
+                            {
+                                // could not deliver in real time: fall back to the batch queue
+                                enqueueForBatch(line, cell, "Push real time msg ID %s into batch queue, queue buffer size %d");
+                            }
+                            // find real time
+                            findRealTime = true;
+                        } // cell
+                    } // for real time pattern
+                    if (findRealTime)
+                        // we break the while once we find the first real time
+                        break;
+                } // if get line
+            } // while
+            if (_fileStream.eof())
+            {
+                _fileStream.close();
+            }
+        } // if open
+        _realTimeAccumulated = 0;
+    }
+    _realTimeAccumulated += context->getProcessor()->getSchedulingPeriodNano();
+}
+
+/**
+ * Batch half of the collector; runs on the thread recorded as
+ * _batchThreadId.
+ *
+ * Once the accumulated scheduling time reaches _batchInterval it connects
+ * the batch socket if needed and drains the queue over it while holding
+ * _mtx. A line is removed from the queue only after it was sent
+ * successfully; on a send failure the socket is closed and the remaining
+ * lines (including the failed one) stay queued for the next run.
+ *
+ * @param context process context, used to read the scheduling period
+ * @param session process session (not used here)
+ */
+void RealTimeDataCollector::onTriggerBatch(ProcessContext *context, ProcessSession *session)
+{
+    if (_batchAcccumulated >= this->_batchInterval)
+    {
+        // dequeue the batch and send over WIFI
+        if (this->_batchSocket <= 0)
+        {
+            // Connect the WIFI socket
+            uint16_t port = _batchServerPort;
+            this->_batchSocket = connectServer(_batchServerName.c_str(), port);
+        }
+        if (this->_batchSocket > 0)
+        {
+            std::lock_guard<std::mutex> lock(_mtx);
+
+            while (!_queue.empty())
+            {
+                // Reference only; it is not used after pop() below
+                const std::string &line = _queue.front();
+                if (sendData(_batchSocket, line.data(), line.size()) < 0)
+                {
+                    // Keep the unsent line queued instead of dropping it
+                    close(_batchSocket);
+                    _batchSocket = 0;
+                    break;
+                }
+                _queuedDataSize -= line.size();
+                _queue.pop();
+            }
+        }
+        _batchAcccumulated = 0;
+    }
+    _batchAcccumulated += context->getProcessor()->getSchedulingPeriodNano();
+}
+
+/**
+ * Entry point called by the scheduler; dispatches on the calling thread.
+ *
+ *  - The thread recorded as _realTimeThreadId runs onTriggerRealTime().
+ *  - The thread recorded as _batchThreadId runs onTriggerBatch().
+ *  - Any other thread runs the setup below under _mtx. While
+ *    _firstInvoking is false, the configuration is loaded, the data file is
+ *    opened, both sockets are connected and the caller becomes the real
+ *    time thread. Otherwise the caller becomes the batch thread and
+ *    _firstInvoking is reset to false.
+ *
+ * The data file is opened before the sockets are connected. If the open
+ * fails, the setup is retried on the next call, and no socket has been
+ * opened that the retry would overwrite and leak. For the same reason the
+ * message ID lists are cleared before they are filled.
+ *
+ * @param context process context handed on to the per-thread handlers
+ * @param session process session handed on to the per-thread handlers
+ */
+void RealTimeDataCollector::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+    std::thread::id id = std::this_thread::get_id();
+
+    if (id == _realTimeThreadId)
+    {
+        onTriggerRealTime(context, session);
+        return;
+    }
+    if (id == _batchThreadId)
+    {
+        onTriggerBatch(context, session);
+        return;
+    }
+
+    std::lock_guard<std::mutex> lock(_mtx);
+    if (!this->_firstInvoking)
+    {
+        this->_fileName = "data.osp";
+        std::string value;
+        if (this->getProperty(FILENAME.getName(), value))
+        {
+            this->_fileName = value;
+            this->_logger->log_info("Data Collector File Name %s", _fileName.c_str());
+        }
+        this->_realTimeServerName = "localhost";
+        if (this->getProperty(REALTIMESERVERNAME.getName(), value))
+        {
+            this->_realTimeServerName = value;
+            this->_logger->log_info("Real Time Server Name %s", this->_realTimeServerName.c_str());
+        }
+        this->_realTimeServerPort = 10000;
+        if (this->getProperty(REALTIMESERVERPORT.getName(), value))
+        {
+            Property::StringToInt(value, _realTimeServerPort);
+            this->_logger->log_info("Real Time Server Port %d", _realTimeServerPort);
+        }
+        // Same fallback as the real time server name
+        this->_batchServerName = "localhost";
+        if (this->getProperty(BATCHSERVERNAME.getName(), value))
+        {
+            this->_batchServerName = value;
+            this->_logger->log_info("Batch Server Name %s", this->_batchServerName.c_str());
+        }
+        this->_batchServerPort = 10001;
+        if (this->getProperty(BATCHSERVERPORT.getName(), value))
+        {
+            Property::StringToInt(value, _batchServerPort);
+            this->_logger->log_info("Batch Server Port %d", _batchServerPort);
+        }
+        if (this->getProperty(ITERATION.getName(), value))
+        {
+            Property::StringToBool(value, this->_iteration);
+            _logger->log_info("Iteration %d", _iteration);
+        }
+        this->_realTimeInterval = 10000000; //10 msec
+        if (this->getProperty(REALTIMEINTERVAL.getName(), value))
+        {
+            TimeUnit unit;
+            if (Property::StringToTime(value, _realTimeInterval, unit) &&
+                    Property::ConvertTimeUnitToNS(_realTimeInterval, unit, _realTimeInterval))
+            {
+                _logger->log_info("Real Time Interval: [%d] ns", _realTimeInterval);
+            }
+        }
+        this->_batchInterval = 100000000; //100 msec
+        if (this->getProperty(BATCHINTERVAL.getName(), value))
+        {
+            TimeUnit unit;
+            if (Property::StringToTime(value, _batchInterval, unit) &&
+                    Property::ConvertTimeUnitToNS(_batchInterval, unit, _batchInterval))
+            {
+                _logger->log_info("Batch Time Interval: [%d] ns", _batchInterval);
+            }
+        }
+        this->_batchMaxBufferSize = 256*1024;
+        if (this->getProperty(BATCHMAXBUFFERSIZE.getName(), value))
+        {
+            Property::StringToInt(value, _batchMaxBufferSize);
+            this->_logger->log_info("Batch Max Buffer Size %d", _batchMaxBufferSize);
+        }
+        if (this->getProperty(REALTIMEMSGID.getName(), value))
+        {
+            // Start from an empty list so a retried setup does not add duplicates
+            this->_realTimeMsgID.clear();
+            this->_logger->log_info("Real Time Msg IDs %s", value.c_str());
+            std::stringstream lineStream(value);
+            std::string cell;
+
+            while (std::getline(lineStream, cell, ','))
+            {
+                // Trim like the batch IDs so "41, 42" yields "42", not " 42"
+                cell = Property::trim(cell);
+                this->_realTimeMsgID.push_back(cell);
+                this->_logger->log_info("Real Time Msg ID %s", cell.c_str());
+            }
+        }
+        if (this->getProperty(BATCHMSGID.getName(), value))
+        {
+            this->_batchMsgID.clear();
+            this->_logger->log_info("Batch Msg IDs %s", value.c_str());
+            std::stringstream lineStream(value);
+            std::string cell;
+
+            while (std::getline(lineStream, cell, ','))
+            {
+                cell = Property::trim(cell);
+                this->_batchMsgID.push_back(cell);
+                this->_logger->log_info("Batch Msg ID %s", cell.c_str());
+            }
+        }
+
+        // Open the file first: bailing out here must not leave sockets behind
+        _fileStream.open(this->_fileName.c_str(), std::ifstream::in);
+        if (!_fileStream.good())
+        {
+            _logger->log_error("load data file failed %s", _fileName.c_str());
+            return;
+        }
+        _logger->log_debug("open %s", _fileName.c_str());
+
+        // Connect the LTE socket
+        uint16_t port = _realTimeServerPort;
+        this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port);
+
+        // Connect the WIFI socket
+        port = _batchServerPort;
+        this->_batchSocket = connectServer(_batchServerName.c_str(), port);
+
+        _realTimeThreadId = id;
+        this->_firstInvoking = true;
+    }
+    else
+    {
+        // Setup is done: the next unknown thread becomes the batch thread
+        if (id != _realTimeThreadId)
+            _batchThreadId = id;
+        this->_firstInvoking = false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
new file mode 100644
index 0000000..9d849ae
--- /dev/null
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -0,0 +1,100 @@
+/**
+ * @file RemoteProcessorGroupPort.cpp
+ * RemoteProcessorGroupPort class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <sstream>
+#include <string.h>
+#include <iostream>
+
+#include "TimeUtil.h"
+#include "RemoteProcessorGroupPort.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+//! Static definitions for the RemoteProcessorGroupPort processor.
+//! NOTE(review): each Property looks like it is built from (name, description, default value) -- confirm against the Property constructor.
+//! Name of this processor type
+const std::string RemoteProcessorGroupPort::ProcessorName("RemoteProcessorGroupPort");
+//! Remote host; when set, onTrigger applies it to the peer
+Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host Name.", "localhost");
+//! Remote port; when set and parseable as an integer, onTrigger applies it to the peer
+Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999");
+//! Default-constructed relationship registered by initialize()
+Relationship RemoteProcessorGroupPort::relation;
+
+/**
+ * Register the properties and the relationship this port supports.
+ */
+void RemoteProcessorGroupPort::initialize()
+{
+    //! Supported properties
+    std::set<Property> supportedProperties = { hostName, port };
+    setSupportedProperties(supportedProperties);
+
+    //! Supported relationships
+    std::set<Relationship> supportedRelationships = { relation };
+    setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Exchange flow files with the remote peer through the site to site protocol.
+ *
+ * Does nothing unless the port is transmitting. The peer's host and port
+ * are refreshed from the "Host Name" / "Port" properties; if either changed
+ * the protocol is torn down before it is bootstrapped. When the bootstrap
+ * fails the context yields and an error is logged. Otherwise flow files are
+ * received or transferred depending on _direction.
+ *
+ * @param context process context providing properties and yield
+ * @param session process session handed to the protocol
+ */
+void RemoteProcessorGroupPort::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+    if (!_transmitting)
+        return;
+
+    // Start from what the peer currently uses
+    std::string targetHost = _peer->getHostName();
+    uint16_t targetPort = _peer->getPort();
+
+    // Apply the property values where they are present
+    std::string propertyValue;
+    if (context->getProperty(hostName.getName(), propertyValue))
+        targetHost = propertyValue;
+
+    int64_t parsedPort;
+    if (context->getProperty(port.getName(), propertyValue) && Property::StringToInt(propertyValue, parsedPort))
+        targetPort = static_cast<uint16_t>(parsedPort);
+
+    // Push any change to the peer and remember that a reset is required
+    bool peerChanged = false;
+    if (targetHost != _peer->getHostName())
+    {
+        _peer->setHostName(targetHost);
+        peerChanged = true;
+    }
+    if (targetPort != _peer->getPort())
+    {
+        _peer->setPort(targetPort);
+        peerChanged = true;
+    }
+    if (peerChanged)
+        _protocol->tearDown();
+
+    // bootstrap the client protocol if needed
+    if (_protocol->bootstrap())
+    {
+        if (_direction == RECEIVE)
+            _protocol->receiveFlowFiles(context, session);
+        else
+            _protocol->transferFlowFiles(context, session);
+        return;
+    }
+
+    context->yield();
+    _logger->log_error("Site2Site bootstrap failed yield period %d peer timeout %d", context->getProcessor()->getYieldPeriodMsec(), _protocol->getPeer()->getTimeOut());
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/ResourceClaim.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp
new file mode 100644
index 0000000..3c22ac9
--- /dev/null
+++ b/libminifi/src/ResourceClaim.cpp
@@ -0,0 +1,45 @@
+/**
+ * @file ResourceClaim.cpp
+ * ResourceClaim class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+
+#include "ResourceClaim.h"
+
+//! Process-wide counter handing out the local resource claim IDs
+std::atomic<uint64_t> ResourceClaim::_localResourceClaimNumber(0);
+
+/**
+ * Create a resource claim with a fresh local ID and a fresh UUID.
+ *
+ * The local ID is taken and the counter advanced in one atomic step, so two
+ * claims created concurrently cannot end up with the same ID. The content
+ * path is "<contentDirectory>/<uuid>".
+ *
+ * @param contentDirectory directory under which the claim content is stored
+ */
+ResourceClaim::ResourceClaim(const std::string contentDirectory)
+: _id(_localResourceClaimNumber.fetch_add(1)),
+  _flowFileRecordOwnedCount(0)
+{
+    // 36 characters of textual UUID plus the terminating NUL
+    char uuidStr[37];
+
+    // Generate the global UUID for the resource claim
+    uuid_generate(_uuid);
+    uuid_unparse(_uuid, uuidStr);
+    // Create the full content path for the content
+    _contentFullPath = contentDirectory + "/" + uuidStr;
+
+    _configure = Configure::getConfigure();
+    _logger = Logger::getLogger();
+    _logger->log_debug("Resource Claim created %s", _contentFullPath.c_str());
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
new file mode 100644
index 0000000..211c328
--- /dev/null
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -0,0 +1,86 @@
+/**
+ * @file SchedulingAgent.cpp
+ * SchedulingAgent class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <chrono>
+#include <thread>
+#include <iostream>
+#include "Exception.h"
+#include "SchedulingAgent.h"
+
+/**
+ * Decide whether the processor has anything to do right now.
+ *
+ * @param processor processor to inspect
+ * @return true if the processor triggers when empty, has no incoming
+ *         connections, or has flow files queued on an incoming connection
+ */
+bool SchedulingAgent::hasWorkToDo(Processor *processor)
+{
+    if (processor->getTriggerWhenEmpty())
+        return true;
+
+    if (!processor->hasIncomingConnections())
+        return true;
+
+    return processor->flowFilesQueued();
+}
+
+/**
+ * Tell whether back pressure applies to the processor.
+ *
+ * @param processor processor to inspect
+ * @return the processor's own flowFilesOutGoingFull() answer
+ */
+bool SchedulingAgent::hasTooMuchOutGoing(Processor *processor)
+{
+    const bool outGoingFull = processor->flowFilesOutGoingFull();
+    return outGoingFull;
+}
+
+/**
+ * Trigger the processor once if it is eligible to run.
+ *
+ * @param processor processor to trigger
+ * @return false when the processor is currently yielding or after it was
+ *         triggered (whether or not it threw); true when it was skipped
+ *         because it has no work to do or because back pressure applies
+ */
+bool SchedulingAgent::onTrigger(Processor *processor)
+{
+    if (processor->isYield())
+        return false;
+
+    // No need to yield, reset yield expiration to 0
+    processor->clearYield();
+
+    // Skip when there is no work to do or when back pressure applies; the
+    // back pressure check only runs if there is work.
+    if (!hasWorkToDo(processor) || hasTooMuchOutGoing(processor))
+        return true;
+
+    //TODO runDuration
+
+    processor->incrementActiveTasks();
+    try
+    {
+        processor->onTrigger();
+    }
+    catch (Exception &exception)
+    {
+        // Normal exception
+        _logger->log_debug("Caught Exception %s", exception.what());
+    }
+    catch (std::exception &exception)
+    {
+        _logger->log_debug("Caught Exception %s", exception.what());
+        processor->yield(_administrativeYieldDuration);
+    }
+    catch (...)
+    {
+        _logger->log_debug("Caught Exception during SchedulingAgent::onTrigger");
+        processor->yield(_administrativeYieldDuration);
+    }
+    // Every handler above swallows its exception, so this is always reached
+    processor->decrementActiveTask();
+
+    return false;
+}
+