You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/07/21 18:45:09 UTC
[50/52] [abbrv] [partial] nifi-minifi-cpp git commit: MINIFI-6: More
infra works
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/FlowControlProtocol.cpp
----------------------------------------------------------------------
diff --git a/src/FlowControlProtocol.cpp b/src/FlowControlProtocol.cpp
new file mode 100644
index 0000000..6aaa969
--- /dev/null
+++ b/src/FlowControlProtocol.cpp
@@ -0,0 +1,540 @@
+/**
+ * @file FlowControlProtocol.cpp
+ * FlowControlProtocol class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <sys/time.h>
+#include <stdio.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <random>
+#include <netinet/tcp.h>
+#include <iostream>
+#include "FlowController.h"
+#include "FlowControlProtocol.h"
+
+int FlowControlProtocol::connectServer(const char *host, uint16_t port)
+{
+ in_addr_t addr;
+ int sock = 0;
+ struct hostent *h;
+#ifdef __MACH__
+ h = gethostbyname(host);
+#else
+ char buf[1024];
+ struct hostent he;
+ int hh_errno;
+ gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
+#endif
+ memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
+ sock = socket(AF_INET, SOCK_STREAM, 0);
+ if (sock < 0)
+ {
+ _logger->log_error("Could not create socket to hostName %s", host);
+ return 0;
+ }
+
+#ifndef __MACH__
+ int opt = 1;
+ bool nagle_off = true;
+
+ if (nagle_off)
+ {
+ if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0)
+ {
+ _logger->log_error("setsockopt() TCP_NODELAY failed");
+ close(sock);
+ return 0;
+ }
+ if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+ (char *)&opt, sizeof(opt)) < 0)
+ {
+ _logger->log_error("setsockopt() SO_REUSEADDR failed");
+ close(sock);
+ return 0;
+ }
+ }
+
+ int sndsize = 256*1024;
+ if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0)
+ {
+ _logger->log_error("setsockopt() SO_SNDBUF failed");
+ close(sock);
+ return 0;
+ }
+#endif
+
+ struct sockaddr_in sa;
+ socklen_t socklen;
+ int status;
+
+ memset(&sa, 0, sizeof(sa));
+ sa.sin_family = AF_INET;
+ sa.sin_addr.s_addr = htonl(INADDR_ANY);
+ sa.sin_port = htons(0);
+ socklen = sizeof(sa);
+ if (bind(sock, (struct sockaddr *)&sa, socklen) < 0)
+ {
+ _logger->log_error("socket bind failed");
+ close(sock);
+ return 0;
+ }
+
+ memset(&sa, 0, sizeof(sa));
+ sa.sin_family = AF_INET;
+ sa.sin_addr.s_addr = addr;
+ sa.sin_port = htons(port);
+ socklen = sizeof(sa);
+
+ status = connect(sock, (struct sockaddr *)&sa, socklen);
+
+ if (status < 0)
+ {
+ _logger->log_error("socket connect failed to %s %d", host, port);
+ close(sock);
+ return 0;
+ }
+
+ _logger->log_info("Flow Control Protocol socket %d connect to server %s port %d success", sock, host, port);
+
+ return sock;
+}
+
+int FlowControlProtocol::sendData(uint8_t *buf, int buflen)
+{
+ int ret = 0, bytes = 0;
+
+ while (bytes < buflen)
+ {
+ ret = send(_socket, buf+bytes, buflen-bytes, 0);
+ //check for errors
+ if (ret == -1)
+ {
+ return ret;
+ }
+ bytes+=ret;
+ }
+
+ return bytes;
+}
+
+int FlowControlProtocol::selectClient(int msec)
+{
+ fd_set fds;
+ struct timeval tv;
+ int retval;
+ int fd = _socket;
+
+ FD_ZERO(&fds);
+ FD_SET(fd, &fds);
+
+ tv.tv_sec = msec/1000;
+ tv.tv_usec = (msec % 1000) * 1000;
+
+ if (msec > 0)
+ retval = select(fd+1, &fds, NULL, NULL, &tv);
+ else
+ retval = select(fd+1, &fds, NULL, NULL, NULL);
+
+ if (retval <= 0)
+ return retval;
+ if (FD_ISSET(fd, &fds))
+ return retval;
+ else
+ return 0;
+}
+
+int FlowControlProtocol::readData(uint8_t *buf, int buflen)
+{
+ int sendSize = buflen;
+
+ while (buflen)
+ {
+ int status;
+ status = selectClient(MAX_READ_TIMEOUT);
+ if (status <= 0)
+ {
+ return status;
+ }
+#ifndef __MACH__
+ status = read(_socket, buf, buflen);
+#else
+ status = recv(_socket, buf, buflen, 0);
+#endif
+ if (status <= 0)
+ {
+ return status;
+ }
+ buflen -= status;
+ buf += status;
+ }
+
+ return sendSize;
+}
+
+int FlowControlProtocol::readHdr(FlowControlProtocolHeader *hdr)
+{
+ uint8_t buffer[sizeof(FlowControlProtocolHeader)];
+
+ uint8_t *data = buffer;
+
+ int status = readData(buffer, sizeof(FlowControlProtocolHeader));
+ if (status <= 0)
+ return status;
+
+ uint32_t value;
+ data = this->decode(data, value);
+ hdr->msgType = value;
+
+ data = this->decode(data, value);
+ hdr->seqNumber = value;
+
+ data = this->decode(data, value);
+ hdr->status = value;
+
+ data = this->decode(data, value);
+ hdr->payloadLen = value;
+
+ return sizeof(FlowControlProtocolHeader);
+}
+
+void FlowControlProtocol::start()
+{
+ if (_running)
+ return;
+ _running = true;
+ _logger->log_info("FlowControl Protocol Start");
+ _thread = new std::thread(run, this);
+ _thread->detach();
+}
+
+void FlowControlProtocol::stop()
+{
+ if (!_running)
+ return;
+ _running = false;
+ _logger->log_info("FlowControl Protocol Stop");
+ delete _thread;
+}
+
+void FlowControlProtocol::run(FlowControlProtocol *protocol)
+{
+ while (protocol->_running)
+ {
+ std::this_thread::sleep_for(std::chrono::milliseconds(protocol->_reportInterval));
+ if (!protocol->_registered)
+ {
+ // if it is not register yet
+ protocol->sendRegisterReq();
+ // protocol->_controller->reload("flow.xml");
+ }
+ else
+ protocol->sendReportReq();
+ }
+ return;
+}
+
+int FlowControlProtocol::sendRegisterReq()
+{
+ if (_registered)
+ {
+ _logger->log_info("Already registered");
+ return -1;
+ }
+
+ uint16_t port = this->_serverPort;
+
+ if (this->_socket <= 0)
+ this->_socket = connectServer(_serverName.c_str(), port);
+
+ if (this->_socket <= 0)
+ return -1;
+
+ // Calculate the total payload msg size
+ uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_SERIAL_NUMBER, 0) +
+ FlowControlMsgIDEncodingLen(FLOW_XML_NAME, this->_controller->getName().size()+1);
+ uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
+
+ uint8_t *data = new uint8_t[size];
+ uint8_t *start = data;
+
+ // encode the HDR
+ FlowControlProtocolHeader hdr;
+ hdr.msgType = REGISTER_REQ;
+ hdr.payloadLen = payloadSize;
+ hdr.seqNumber = this->_seqNumber;
+ hdr.status = RESP_SUCCESS;
+ data = this->encode(data, hdr.msgType);
+ data = this->encode(data, hdr.seqNumber);
+ data = this->encode(data, hdr.status);
+ data = this->encode(data, hdr.payloadLen);
+
+ // encode the serial number
+ data = this->encode(data, FLOW_SERIAL_NUMBER);
+ data = this->encode(data, this->_serialNumber, 8);
+
+ // encode the XML name
+ data = this->encode(data, FLOW_XML_NAME);
+ data = this->encode(data, this->_controller->getName());
+
+ // send it
+ int status = sendData(start, size);
+ delete[] start;
+ if (status <= 0)
+ {
+ close(_socket);
+ _socket = 0;
+ _logger->log_error("Flow Control Protocol Send Register Req failed");
+ return -1;
+ }
+
+ // Looking for register respond
+ status = readHdr(&hdr);
+
+ if (status <= 0)
+ {
+ close(_socket);
+ _socket = 0;
+ _logger->log_error("Flow Control Protocol Read Register Resp header failed");
+ return -1;
+ }
+ _logger->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+ _logger->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber);
+ _logger->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+ _logger->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen);
+
+ if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber)
+ {
+ this->_registered = true;
+ this->_seqNumber++;
+ _logger->log_info("Flow Control Protocol Register success");
+ uint8_t *payload = new uint8_t[hdr.payloadLen];
+ uint8_t *payloadPtr = payload;
+ status = readData(payload, hdr.payloadLen);
+ if (status <= 0)
+ {
+ delete[] payload;
+ _logger->log_info("Flow Control Protocol Register Read Payload fail");
+ close(_socket);
+ _socket = 0;
+ return -1;
+ }
+ while (payloadPtr < (payload + hdr.payloadLen))
+ {
+ uint32_t msgID;
+ payloadPtr = this->decode(payloadPtr, msgID);
+ if (((FlowControlMsgID) msgID) == REPORT_INTERVAL)
+ {
+ // Fixed 4 bytes
+ uint32_t reportInterval;
+ payloadPtr = this->decode(payloadPtr, reportInterval);
+ _logger->log_info("Flow Control Protocol receive report interval %d ms", reportInterval);
+ this->_reportInterval = reportInterval;
+ }
+ else if (((FlowControlMsgID) msgID) == FLOW_XML_CONTENT)
+ {
+ uint32_t xmlLen;
+ payloadPtr = this->decode(payloadPtr, xmlLen);
+ _logger->log_info("Flow Control Protocol receive XML content length %d", xmlLen);
+ time_t rawtime;
+ struct tm *timeinfo;
+ time(&rawtime);
+ timeinfo = localtime(&rawtime);
+ std::string xmlFileName = "flow.";
+ xmlFileName += asctime(timeinfo);
+ xmlFileName += ".xml";
+ std::ofstream fs;
+ fs.open(xmlFileName.c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
+ if (fs.is_open())
+ {
+ fs.write((const char *)payloadPtr, xmlLen);
+ fs.close();
+ this->_controller->reload(xmlFileName.c_str());
+ }
+ }
+ else
+ {
+ break;
+ }
+ }
+ delete[] payload;
+ close(_socket);
+ _socket = 0;
+ return 0;
+ }
+ else
+ {
+ _logger->log_info("Flow Control Protocol Register fail");
+ close(_socket);
+ _socket = 0;
+ return -1;
+ }
+}
+
+
+int FlowControlProtocol::sendReportReq()
+{
+ uint16_t port = this->_serverPort;
+
+ if (this->_socket <= 0)
+ this->_socket = connectServer(_serverName.c_str(), port);
+
+ if (this->_socket <= 0)
+ return -1;
+
+ // Calculate the total payload msg size
+ uint32_t payloadSize =
+ FlowControlMsgIDEncodingLen(FLOW_XML_NAME, this->_controller->getName().size()+1);
+ uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
+
+ uint8_t *data = new uint8_t[size];
+ uint8_t *start = data;
+
+ // encode the HDR
+ FlowControlProtocolHeader hdr;
+ hdr.msgType = REPORT_REQ;
+ hdr.payloadLen = payloadSize;
+ hdr.seqNumber = this->_seqNumber;
+ hdr.status = RESP_SUCCESS;
+ data = this->encode(data, hdr.msgType);
+ data = this->encode(data, hdr.seqNumber);
+ data = this->encode(data, hdr.status);
+ data = this->encode(data, hdr.payloadLen);
+
+ // encode the XML name
+ data = this->encode(data, FLOW_XML_NAME);
+ data = this->encode(data, this->_controller->getName());
+
+ // send it
+ int status = sendData(start, size);
+ delete[] start;
+ if (status <= 0)
+ {
+ close(_socket);
+ _socket = 0;
+ _logger->log_error("Flow Control Protocol Send Report Req failed");
+ return -1;
+ }
+
+ // Looking for report respond
+ status = readHdr(&hdr);
+
+ if (status <= 0)
+ {
+ close(_socket);
+ _socket = 0;
+ _logger->log_error("Flow Control Protocol Read Report Resp header failed");
+ return -1;
+ }
+ _logger->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+ _logger->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber);
+ _logger->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+ _logger->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen);
+
+ if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber)
+ {
+ this->_seqNumber++;
+ uint8_t *payload = new uint8_t[hdr.payloadLen];
+ uint8_t *payloadPtr = payload;
+ status = readData(payload, hdr.payloadLen);
+ if (status <= 0)
+ {
+ delete[] payload;
+ _logger->log_info("Flow Control Protocol Report Resp Read Payload fail");
+ close(_socket);
+ _socket = 0;
+ return -1;
+ }
+ std::string processor;
+ std::string propertyName;
+ std::string propertyValue;
+ while (payloadPtr < (payload + hdr.payloadLen))
+ {
+ uint32_t msgID;
+ payloadPtr = this->decode(payloadPtr, msgID);
+ if (((FlowControlMsgID) msgID) == PROCESSOR_NAME)
+ {
+ uint32_t len;
+ payloadPtr = this->decode(payloadPtr, len);
+ processor = (const char *) payloadPtr;
+ payloadPtr += len;
+ _logger->log_info("Flow Control Protocol receive report resp processor %s", processor.c_str());
+ }
+ else if (((FlowControlMsgID) msgID) == PROPERTY_NAME)
+ {
+ uint32_t len;
+ payloadPtr = this->decode(payloadPtr, len);
+ propertyName = (const char *) payloadPtr;
+ payloadPtr += len;
+ _logger->log_info("Flow Control Protocol receive report resp property name %s", propertyName.c_str());
+ }
+ else if (((FlowControlMsgID) msgID) == PROPERTY_VALUE)
+ {
+ uint32_t len;
+ payloadPtr = this->decode(payloadPtr, len);
+ propertyValue = (const char *) payloadPtr;
+ payloadPtr += len;
+ _logger->log_info("Flow Control Protocol receive report resp property value %s", propertyValue.c_str());
+ this->_controller->updatePropertyValue(processor, propertyName, propertyValue);
+ }
+ else
+ {
+ break;
+ }
+ }
+ delete[] payload;
+ close(_socket);
+ _socket = 0;
+ return 0;
+ }
+ else if (hdr.status == RESP_TRIGGER_REGISTER && hdr.seqNumber == this->_seqNumber)
+ {
+ _logger->log_info("Flow Control Protocol trigger reregister");
+ this->_registered = false;
+ this->_seqNumber++;
+ close(_socket);
+ _socket = 0;
+ return 0;
+ }
+ else if (hdr.status == RESP_STOP_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber)
+ {
+ _logger->log_info("Flow Control Protocol stop flow controller");
+ this->_controller->stop(true);
+ this->_seqNumber++;
+ close(_socket);
+ _socket = 0;
+ return 0;
+ }
+ else if (hdr.status == RESP_START_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber)
+ {
+ _logger->log_info("Flow Control Protocol start flow controller");
+ this->_controller->start();
+ this->_seqNumber++;
+ close(_socket);
+ _socket = 0;
+ return 0;
+ }
+ else
+ {
+ _logger->log_info("Flow Control Protocol Report fail");
+ close(_socket);
+ _socket = 0;
+ return -1;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/src/FlowController.cpp b/src/FlowController.cpp
index 0525ccc..a2cafbc 100644
--- a/src/FlowController.cpp
+++ b/src/FlowController.cpp
@@ -44,6 +44,7 @@ FlowController::FlowController(std::string name)
_initialized = false;
_root = NULL;
_logger = Logger::getLogger();
+ _protocol = new FlowControlProtocol(this);
// NiFi config properties
_configure = Configure::getConfigure();
@@ -58,6 +59,7 @@ FlowController::~FlowController()
{
stop(true);
unload();
+ delete _protocol;
}
bool FlowController::isRunning()
@@ -75,6 +77,11 @@ void FlowController::stop(bool force)
if (_running)
{
_logger->log_info("Stop Flow Controller");
+ this->_timerScheduler.stop();
+ // Wait for sometime for thread stop
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ if (this->_root)
+ this->_root->stopProcessing(&this->_timerScheduler);
_running = false;
}
}
@@ -88,12 +95,36 @@ void FlowController::unload()
if (_initialized)
{
_logger->log_info("Unload Flow Controller");
+ if (_root)
+ delete _root;
+ _root = NULL;
_initialized = false;
+ _name = "";
}
return;
}
+void FlowController::reload(std::string xmlFile)
+{
+ _logger->log_info("Starting to reload Flow Controller with xml %s", xmlFile.c_str());
+ stop(true);
+ unload();
+ std::string oldxmlFile = this->_xmlFileName;
+ this->_xmlFileName = xmlFile;
+ load();
+ start();
+ if (!this->_root)
+ {
+ this->_xmlFileName = oldxmlFile;
+ _logger->log_info("Rollback Flow Controller to xml %s", oldxmlFile.c_str());
+ stop(true);
+ unload();
+ load();
+ start();
+ }
+}
+
Processor *FlowController::createProcessor(std::string name, uuid_t uuid)
{
Processor *processor = NULL;
@@ -291,6 +322,7 @@ void FlowController::parseRootProcessGroup(xmlDoc *doc, xmlNode *node)
}
// Set the root process group
this->_root = group;
+ this->_name = name;
xmlFree(name);
}
}
@@ -504,6 +536,20 @@ void FlowController::parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, Pro
xmlFree(temp);
}
}
+ else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0)
+ {
+ char *temp = (char *) xmlNodeGetContent(currentNode);
+ if (temp)
+ {
+ int64_t maxConcurrentTasks;
+ if (Property::StringToInt(temp, maxConcurrentTasks))
+ {
+ _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
+ processor->setMaxConcurrentTasks(maxConcurrentTasks);
+ }
+ xmlFree(temp);
+ }
+ }
else if (xmlStrcmp(currentNode->name, BAD_CAST "runDurationNanos") == 0)
{
char *temp = (char *) xmlNodeGetContent(currentNode);
@@ -548,12 +594,13 @@ void FlowController::load()
}
if (!_initialized)
{
- _logger->log_info("Load Flow Controller");
+ _logger->log_info("Load Flow Controller from file %s", _xmlFileName.c_str());
xmlDoc *doc = xmlReadFile(_xmlFileName.c_str(), NULL, XML_PARSE_NONET);
if (doc == NULL)
{
_logger->log_error("xmlReadFile returned NULL when reading [%s]", _xmlFileName.c_str());
+ _initialized = true;
return;
}
@@ -637,6 +684,7 @@ bool FlowController::start()
if (this->_root)
this->_root->startProcessing(&this->_timerScheduler);
_running = true;
+ this->_protocol->start();
}
return true;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/src/FlowFileRecord.cpp b/src/FlowFileRecord.cpp
index 601dcde..2dda47a 100644
--- a/src/FlowFileRecord.cpp
+++ b/src/FlowFileRecord.cpp
@@ -22,6 +22,9 @@
#include <map>
#include <sys/time.h>
#include <time.h>
+#include <iostream>
+#include <fstream>
+#include <cstdio>
#include "FlowFileRecord.h"
#include "Relationship.h"
@@ -83,6 +86,7 @@ FlowFileRecord::~FlowFileRecord()
if (_claim->getFlowFileRecordOwnedCount() == 0)
{
_logger->log_debug("Delete Resource Claim %s", _claim->getContentFullPath().c_str());
+ std::remove(_claim->getContentFullPath().c_str());
delete _claim;
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/LogAttribute.cpp
----------------------------------------------------------------------
diff --git a/src/LogAttribute.cpp b/src/LogAttribute.cpp
index f4548af..67ed74e 100644
--- a/src/LogAttribute.cpp
+++ b/src/LogAttribute.cpp
@@ -25,6 +25,7 @@
#include <time.h>
#include <sstream>
#include <string.h>
+#include <iostream>
#include "TimeUtil.h"
#include "LogAttribute.h"
@@ -103,6 +104,24 @@ void LogAttribute::onTrigger(ProcessContext *context, ProcessSession *session)
{
message << "\n" << "Content Claim:" << claim->getContentFullPath();
}
+ if (logPayload && flow->getSize() <= 1024*1024)
+ {
+ message << "\n" << "Payload:" << "\n";
+ ReadCallback callback(flow->getSize());
+ session->read(flow, &callback);
+ for (int i = 0, j = 0; i < callback._readSize; i++)
+ {
+ char temp[8];
+ sprintf(temp, "%02x ", (unsigned char) (callback._buffer[i]));
+ message << temp;
+ j++;
+ if (j == 16)
+ {
+ message << '\n';
+ j = 0;
+ }
+ }
+ }
message << "\n" << dashLine << std::ends;
std::string output = message.str();
@@ -127,6 +146,12 @@ void LogAttribute::onTrigger(ProcessContext *context, ProcessSession *session)
break;
}
+ // Test Import
+ FlowFileRecord *importRecord = session->create();
+ session->import(claim->getContentFullPath(), importRecord);
+ session->transfer(importRecord, Success);
+
+
// Transfer to the relationship
session->transfer(flow, Success);
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/src/ProcessGroup.cpp b/src/ProcessGroup.cpp
index 78ae1a4..a5fd773 100644
--- a/src/ProcessGroup.cpp
+++ b/src/ProcessGroup.cpp
@@ -46,7 +46,24 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid,
ProcessGroup::~ProcessGroup()
{
+ for (std::set<Connection *>::iterator it = _connections.begin(); it != _connections.end(); ++it)
+ {
+ Connection *connection = *it;
+ connection->drain();
+ delete connection;
+ }
+
+ for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
+ {
+ ProcessGroup *processGroup(*it);
+ delete processGroup;
+ }
+ for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it)
+ {
+ Processor *processor(*it);
+ delete processor;
+ }
}
bool ProcessGroup::isRootProcessGroup()
@@ -198,6 +215,28 @@ Processor *ProcessGroup::findProcessor(uuid_t uuid)
return ret;
}
+void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue)
+{
+ std::lock_guard<std::mutex> lock(_mtx);
+
+ for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it)
+ {
+ Processor *processor(*it);
+ if (processor->getName() == processorName)
+ {
+ processor->setProperty(propertyName, propertyValue);
+ }
+ }
+
+ for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
+ {
+ ProcessGroup *processGroup(*it);
+ processGroup->updatePropertyValue(processorName, propertyName, propertyValue);
+ }
+
+ return;
+}
+
void ProcessGroup::addConnection(Connection *connection)
{
std::lock_guard<std::mutex> lock(_mtx);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/src/ProcessSession.cpp b/src/ProcessSession.cpp
index cf8249d..32d8920 100644
--- a/src/ProcessSession.cpp
+++ b/src/ProcessSession.cpp
@@ -207,8 +207,9 @@ void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback *callback)
}
flow->_claim = claim;
claim->increaseFlowFileRecordOwnedCount();
+ /*
_logger->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
- flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str());
+ flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
fs.close();
}
else
@@ -273,8 +274,9 @@ void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback *callback
{
uint64_t appendSize = fs.tellp() - oldPos;
flow->_size += appendSize;
+ /*
_logger->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s",
- flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str());
+ flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
fs.close();
}
else
@@ -321,8 +323,9 @@ void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback *callback)
if (fs.good())
{
callback->process(&fs);
+ /*
_logger->log_debug("Read offset %d size %d content %s for FlowFile UUID %s",
- flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str());
+ flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
fs.close();
}
else
@@ -348,6 +351,94 @@ void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback *callback)
}
}
+void ProcessSession::import(std::string source, FlowFileRecord *flow)
+{
+ ResourceClaim *claim = NULL;
+
+ claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY);
+ char *buf = NULL;
+ int size = 4096;
+ buf = new char [size];
+
+ try
+ {
+ std::ofstream fs;
+ fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
+ std::ifstream input;
+ input.open(source.c_str(), std::fstream::in | std::fstream::binary);
+
+ if (fs.is_open() && input.is_open())
+ {
+ // Open the source file and stream to the flow file
+ while (input.good())
+ {
+ input.read(buf, size);
+ if (input)
+ fs.write(buf, size);
+ else
+ fs.write(buf, input.gcount());
+ }
+
+ if (fs.good() && fs.tellp() >= 0)
+ {
+ flow->_size = fs.tellp();
+ flow->_offset = 0;
+ if (flow->_claim)
+ {
+ // Remove the old claim
+ flow->_claim->decreaseFlowFileRecordOwnedCount();
+ flow->_claim = NULL;
+ }
+ flow->_claim = claim;
+ claim->increaseFlowFileRecordOwnedCount();
+ /*
+ _logger->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s",
+ flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
+ fs.close();
+ input.close();
+ }
+ else
+ {
+ fs.close();
+ input.close();
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+ }
+ }
+ else
+ {
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+ }
+
+ delete[] buf;
+ }
+ catch (std::exception &exception)
+ {
+ if (flow && flow->_claim == claim)
+ {
+ flow->_claim->decreaseFlowFileRecordOwnedCount();
+ flow->_claim = NULL;
+ }
+ if (claim)
+ delete claim;
+ _logger->log_debug("Caught Exception %s", exception.what());
+ delete[] buf;
+ throw;
+ }
+ catch (...)
+ {
+ if (flow && flow->_claim == claim)
+ {
+ flow->_claim->decreaseFlowFileRecordOwnedCount();
+ flow->_claim = NULL;
+ }
+ if (claim)
+ delete claim;
+ _logger->log_debug("Caught Exception during process session write");
+ delete[] buf;
+ throw;
+ }
+}
+
void ProcessSession::commit()
{
try
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/Processor.cpp
----------------------------------------------------------------------
diff --git a/src/Processor.cpp b/src/Processor.cpp
index 5c695af..d5e1a6a 100644
--- a/src/Processor.cpp
+++ b/src/Processor.cpp
@@ -40,7 +40,7 @@ Processor::Processor(std::string name, uuid_t uuid)
uuid_copy(_uuid, uuid);
char uuidStr[37];
- uuid_parse(uuidStr, _uuid);
+ uuid_unparse(_uuid, uuidStr);
_uuidStr = uuidStr;
// Setup the default values
@@ -58,7 +58,7 @@ Processor::Processor(std::string name, uuid_t uuid)
_incomingConnectionsIter = this->_incomingConnections.begin();
_logger = Logger::getLogger();
- _logger->log_info("Processor %s created", _name.c_str());
+ _logger->log_info("Processor %s created UUID %s", _name.c_str(), _uuidStr.c_str());
}
Processor::~Processor()
@@ -208,13 +208,6 @@ bool Processor::getProperty(std::string name, std::string &value)
bool Processor::setProperty(std::string name, std::string value)
{
- if (isRunning())
- {
- _logger->log_info("Can not set processor property while the process %s is running",
- _name.c_str());
- return false;
- }
-
std::lock_guard<std::mutex> lock(_mtx);
std::map<std::string, Property>::iterator it = _properties.find(name);
if (it != _properties.end())
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/src/SchedulingAgent.cpp b/src/SchedulingAgent.cpp
index 45f5d10..211c328 100644
--- a/src/SchedulingAgent.cpp
+++ b/src/SchedulingAgent.cpp
@@ -60,6 +60,7 @@ bool SchedulingAgent::onTrigger(Processor *processor)
try
{
processor->onTrigger();
+ processor->decrementActiveTask();
}
catch (Exception &exception)
{
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/TimerDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/src/TimerDrivenSchedulingAgent.cpp b/src/TimerDrivenSchedulingAgent.cpp
index a5f8f3c..3ce57ae 100644
--- a/src/TimerDrivenSchedulingAgent.cpp
+++ b/src/TimerDrivenSchedulingAgent.cpp
@@ -74,7 +74,7 @@ void TimerDrivenSchedulingAgent::schedule(Processor *processor)
_logger->log_info("Scheduled Time Driven thread %d running for process %s", thread->get_id(),
processor->getName().c_str());
}
- _threads[processor->getName().c_str()] = threads;
+ _threads[processor->getUUIDStr().c_str()] = threads;
return;
}
@@ -105,6 +105,7 @@ void TimerDrivenSchedulingAgent::unschedule(Processor *processor)
delete thread;
}
_threads.erase(processor->getUUIDStr());
+ processor->clearActiveTask();
return;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/target/conf/nifi.properties
----------------------------------------------------------------------
diff --git a/target/conf/nifi.properties b/target/conf/nifi.properties
index c4f7dff..b1902b1 100644
--- a/target/conf/nifi.properties
+++ b/target/conf/nifi.properties
@@ -183,3 +183,9 @@ nifi.cluster.manager.safemode.duration=0 sec
# kerberos #
nifi.kerberos.krb5.file=
+
+# Server
+nifi.server.name=localhost
+nifi.server.port=9000
+nifi.server.report.interval=1000 ms
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/test/Server.cpp
----------------------------------------------------------------------
diff --git a/test/Server.cpp b/test/Server.cpp
new file mode 100644
index 0000000..e7b3452
--- /dev/null
+++ b/test/Server.cpp
@@ -0,0 +1,607 @@
+/* A simple server in the internet domain using TCP
+ The port number is passed as an argument */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <errno.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <string>
+#include <errno.h>
+#include <chrono>
+#include <thread>
+#include <iostream> // std::cout
+#include <fstream> // std::ifstream
+#include <signal.h>
+
+#define DEFAULT_NIFI_SERVER_PORT 9000
+#define DEFAULT_REPORT_INTERVAL 1000 // 1 sec
+#define MAX_READ_TIMEOUT 30000 // 30 seconds
+
+//! FlowControl Protocol Msg Type
+typedef enum {
+ REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow xml version
+ REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.xml from server ask device to apply and also device report interval
+ REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow xml name/version and other period report info
+ REPORT_RESP, // Report Respond from server to device, may ask device to update flow xml or processor property
+ MAX_FLOW_CONTROL_MSG_TYPE
+} FlowControlMsgType;
+
+//! FlowControl Protocol Msg Type String
+static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] =
+{
+ "REGISTER_REQ",
+ "REGISTER_RESP",
+ "REPORT_REQ",
+ "REPORT_RESP"
+};
+
+//! Flow Control Msg Type to String
+inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type)
+{
+ if (type < MAX_FLOW_CONTROL_MSG_TYPE)
+ return FlowControlMsgTypeStr[type];
+ else
+ return NULL;
+}
+
+//! FlowControll Protocol Msg ID (Some Messages are fix length, Some are variable length (TLV)
+typedef enum {
+ //Fix length 8 bytes: client to server in register request, required field
+ FLOW_SERIAL_NUMBER,
+ // Flow XML name TLV: client to server in register request and report request, required field
+ FLOW_XML_NAME,
+ // Flow XML content, TLV: server to client in register respond, option field in case server want to ask client to load xml from server
+ FLOW_XML_CONTENT,
+ // Fix length, 4 bytes Report interval in msec: server to client in register respond, option field
+ REPORT_INTERVAL,
+ // Processor Name TLV: server to client in report respond, option field in case server want to ask client to update processor property
+ PROCESSOR_NAME,
+ // Processor Property Name TLV: server to client in report respond, option field in case server want to ask client to update processor property
+ PROPERTY_NAME,
+ // Processor Property Value TLV: server to client in report respond, option field in case server want to ask client to update processor property
+ PROPERTY_VALUE,
+ // Report Blob TLV: client to server in report request, option field in case client want to pickyback the report blob in report request to server
+ REPORT_BLOB,
+ MAX_FLOW_MSG_ID
+} FlowControlMsgID;
+
+//! FlowControl Protocol Msg ID String
+static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] =
+{
+ "FLOW_SERIAL_NUMBER",
+ "FLOW_XML_NAME",
+ "FLOW_XML_CONTENT",
+ "REPORT_INTERVAL",
+ "PROCESSOR_NAME"
+ "PROPERTY_NAME",
+ "PROPERTY_VALUE",
+ "REPORT_BLOB"
+};
+
+#define TYPE_HDR_LEN 4 // Fix Hdr Type
+#define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes
+
+//! FlowControl Protocol Msg Len
+inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen)
+{
+ if (id == FLOW_SERIAL_NUMBER)
+ return (TYPE_HDR_LEN + 8);
+ else if (id == REPORT_INTERVAL)
+ return (TYPE_HDR_LEN + 4);
+ else if (id < MAX_FLOW_MSG_ID)
+ return (TLV_HDR_LEN + payLoadLen);
+ else
+ return -1;
+}
+
+//! Flow Control Msg Id to String
+inline const char *FlowControlMsgIdToStr(FlowControlMsgID id)
+{
+ if (id < MAX_FLOW_MSG_ID)
+ return FlowControlMsgIDStr[id];
+ else
+ return NULL;
+}
+
+//! Flow Control Respond status code
+typedef enum {
+ RESP_SUCCESS,
+ RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register
+ RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow controller
+ RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow controller
+ RESP_FAILURE,
+ MAX_RESP_CODE
+} FlowControlRespCode;
+
+//! FlowControl Resp Code str
+static const char *FlowControlRespCodeStr[MAX_RESP_CODE] =
+{
+ "RESP_SUCCESS",
+ "RESP_TRIGGER_REGISTER",
+ "RESP_START_FLOW_CONTROLLER",
+ "RESP_STOP_FLOW_CONTROLLER",
+ "RESP_FAILURE"
+};
+
+//! Flow Control Resp Code to String
+inline const char *FlowControlRespCodeToStr(FlowControlRespCode code)
+{
+ if (code < MAX_RESP_CODE)
+ return FlowControlRespCodeStr[code];
+ else
+ return NULL;
+}
+
+//! Common FlowControlProtocol Header
+typedef struct {
+ uint32_t msgType; //! Msg Type
+ uint32_t seqNumber; //! Seq Number to match Req with Resp
+ uint32_t status; //! Resp Code, see FlowControlRespCode
+ uint32_t payloadLen; //! Msg Payload length
+} FlowControlProtocolHeader;
+
+
+//! encode uint32_t
+uint8_t *encode(uint8_t *buf, uint32_t value)
+{
+ *buf++ = (value & 0xFF000000) >> 24;
+ *buf++ = (value & 0x00FF0000) >> 16;
+ *buf++ = (value & 0x0000FF00) >> 8;
+ *buf++ = (value & 0x000000FF);
+ return buf;
+}
+
+//! encode uint32_t
+uint8_t *decode(uint8_t *buf, uint32_t &value)
+{
+ value = ((buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|(buf[3]));
+ return (buf + 4);
+}
+
+//! encode byte array
+uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size)
+{
+ memcpy(buf, bufArray, size);
+ buf += size;
+ return buf;
+}
+
+//! encode std::string
+uint8_t *encode(uint8_t *buf, std::string value)
+{
+ // add the \0 for size
+ buf = encode(buf, value.size()+1);
+ buf = encode(buf, (uint8_t *) value.c_str(), value.size()+1);
+ return buf;
+}
+
+int sendData(int socket, uint8_t *buf, int buflen)
+{
+ int ret = 0, bytes = 0;
+
+ while (bytes < buflen)
+ {
+ ret = send(socket, buf+bytes, buflen-bytes, 0);
+ //check for errors
+ if (ret == -1)
+ {
+ return ret;
+ }
+ bytes+=ret;
+ }
+
+ return bytes;
+}
+
+void error(const char *msg)
+{
+ perror(msg);
+ exit(1);
+}
+
+/* readline - read a '\n' terminated line from socket fd
+ into buffer bufptr of size len. The line in the
+ buffer is terminated with '\0'.
+ It returns -1 in case of error or if
+ the capacity of the buffer is exceeded.
+ It returns 0 if EOF is encountered before reading '\n'.
+ */
+int readline( int fd, char *bufptr, size_t len )
+{
+ /* Note that this function is very tricky. It uses the
+ static variables bp, cnt, and b to establish a local buffer.
+ The recv call requests large chunks of data (the size of the buffer).
+ Then if the recv call reads more than one line, the overflow
+ remains in the buffer and it is made available to the next call
+ to readline.
+ Notice also that this routine reads up to '\n' and overwrites
+ it with '\0'. Thus if the line is really terminated with
+ "\r\n", the '\r' will remain unchanged.
+ */
+ char *bufx = bufptr;
+ static char *bp;
+ static int cnt = 0;
+ static char b[ 4096 ];
+ char c;
+
+ while ( --len > 0 )
+ {
+ if ( --cnt <= 0 )
+ {
+ cnt = recv( fd, b, sizeof( b ), 0 );
+ if ( cnt < 0 )
+ {
+ if ( errno == EINTR )
+ {
+ len++; /* the while will decrement */
+ continue;
+ }
+ return -1;
+ }
+ if ( cnt == 0 )
+ return 0;
+ bp = b;
+ }
+ c = *bp++;
+ *bufptr++ = c;
+ if ( c == '\n' )
+ {
+ *bufptr = '\0';
+ return bufptr - bufx;
+ }
+ }
+ return -1;
+}
+
+int readData(int socket, uint8_t *buf, int buflen)
+{
+ int sendSize = buflen;
+ int status;
+
+ while (buflen)
+ {
+#ifndef __MACH__
+ status = read(socket, buf, buflen);
+#else
+ status = recv(socket, buf, buflen, 0);
+#endif
+ if (status <= 0)
+ {
+ return status;
+ }
+ buflen -= status;
+ buf += status;
+ }
+
+ return sendSize;
+}
+
+int readHdr(int socket, FlowControlProtocolHeader *hdr)
+{
+ uint8_t buffer[sizeof(FlowControlProtocolHeader)];
+
+ uint8_t *data = buffer;
+
+ int status = readData(socket, buffer, sizeof(FlowControlProtocolHeader));
+ if (status <= 0)
+ return status;
+
+ uint32_t value;
+ data = decode(data, value);
+ hdr->msgType = value;
+
+ data = decode(data, value);
+ hdr->seqNumber = value;
+
+ data = decode(data, value);
+ hdr->status = value;
+
+ data = decode(data, value);
+ hdr->payloadLen = value;
+
+ return sizeof(FlowControlProtocolHeader);
+}
+
+int readXML(char **xmlContent)
+{
+ std::ifstream is ("conf/flowServer.xml", std::ifstream::binary);
+ if (is) {
+ // get length of file:
+ is.seekg (0, is.end);
+ int length = is.tellg();
+ is.seekg (0, is.beg);
+
+ char * buffer = new char [length];
+
+ printf("Reading %s len %d\n", "conf/flowServer.xml", length);
+ // read data as a block:
+ is.read (buffer,length);
+
+ is.close();
+
+ // ...buffer contains the entire file...
+ *xmlContent = buffer;
+
+ return length;
+ }
+ return 0;
+}
+
+static int sockfd = 0, newsockfd = 0;
+void sigHandler(int signal)
+{
+ if (signal == SIGINT || signal == SIGTERM)
+ {
+ close(newsockfd);
+ close(sockfd);
+ exit(1);
+ }
+}
+
+int main(int argc, char *argv[])
+{
+ int portno;
+ socklen_t clilen;
+ struct sockaddr_in serv_addr, cli_addr;
+ char buffer[4096];
+ int flag = 0;
+ int number = 0;
+
+ int n;
+ if (argc < 2) {
+ fprintf(stderr,"ERROR, no port provided\n");
+ exit(1);
+ }
+
+ if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR)
+ {
+
+ return -1;
+ }
+ sockfd = socket(AF_INET, SOCK_STREAM, 0);
+ if (sockfd < 0)
+ error("ERROR opening socket");
+ bzero((char *) &serv_addr, sizeof(serv_addr));
+ portno = atoi(argv[1]);
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_addr.s_addr = INADDR_ANY;
+ serv_addr.sin_port = htons(portno);
+ if (bind(sockfd, (struct sockaddr *) &serv_addr,
+ sizeof(serv_addr)) < 0)
+ error("ERROR on binding");
+ listen(sockfd,5);
+ if (portno == DEFAULT_NIFI_SERVER_PORT)
+ {
+ while (true)
+ {
+ clilen = sizeof(cli_addr);
+ newsockfd = accept(sockfd,
+ (struct sockaddr *) &cli_addr,
+ &clilen);
+ if (newsockfd < 0)
+ {
+ error("ERROR on accept");
+ break;
+ }
+ // process request
+ FlowControlProtocolHeader hdr;
+ int status = readHdr(newsockfd, &hdr);
+ if (status > 0)
+ {
+ printf("Flow Control Protocol receive MsgType %s\n", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+ printf("Flow Control Protocol receive Seq Num %d\n", hdr.seqNumber);
+ printf("Flow Control Protocol receive Resp Code %s\n", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+ printf("Flow Control Protocol receive Payload len %d\n", hdr.payloadLen);
+ if (((FlowControlMsgType) hdr.msgType) == REGISTER_REQ)
+ {
+ printf("Flow Control Protocol Register Req receive\n");
+ uint8_t *payload = new uint8_t[hdr.payloadLen];
+ uint8_t *payloadPtr = payload;
+ status = readData(newsockfd, payload, hdr.payloadLen);
+ while (status > 0 && payloadPtr < (payload + hdr.payloadLen))
+ {
+ uint32_t msgID = 0xFFFFFFFF;
+ payloadPtr = decode(payloadPtr, msgID);
+ if (((FlowControlMsgID) msgID) == FLOW_SERIAL_NUMBER)
+ {
+ // Fixed 8 bytes
+ uint8_t seqNum[8];
+ memcpy(seqNum, payloadPtr, 8);
+ printf("Flow Control Protocol Register Req receive serial num\n");
+ payloadPtr += 8;
+ }
+ else if (((FlowControlMsgID) msgID) == FLOW_XML_NAME)
+ {
+ uint32_t len;
+ payloadPtr = decode(payloadPtr, len);
+ printf("Flow Control Protocol receive XML name length %d\n", len);
+ std::string flowName = (const char *) payloadPtr;
+ payloadPtr += len;
+ printf("Flow Control Protocol receive XML name %s\n", flowName.c_str());
+ }
+ else
+ {
+ break;
+ }
+ }
+ delete[] payload;
+ // Send Register Respond
+ // Calculate the total payload msg size
+ char *xmlContent;
+ uint32_t xmlLen = readXML(&xmlContent);
+ uint32_t payloadSize = FlowControlMsgIDEncodingLen(REPORT_INTERVAL, 0);
+ if (xmlLen > 0)
+ payloadSize += FlowControlMsgIDEncodingLen(FLOW_XML_CONTENT, xmlLen);
+
+ uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
+ uint8_t *data = new uint8_t[size];
+ uint8_t *start = data;
+
+ // encode the HDR
+ hdr.msgType = REGISTER_RESP;
+ hdr.payloadLen = payloadSize;
+ hdr.status = RESP_SUCCESS;
+ data = encode(data, hdr.msgType);
+ data = encode(data, hdr.seqNumber);
+ data = encode(data, hdr.status);
+ data = encode(data, hdr.payloadLen);
+
+ // encode the report interval
+ data = encode(data, REPORT_INTERVAL);
+ data = encode(data, DEFAULT_REPORT_INTERVAL);
+
+ // encode the XML content
+ if (xmlLen > 0)
+ {
+ data = encode(data, FLOW_XML_CONTENT);
+ data = encode(data, xmlLen);
+ data = encode(data, (uint8_t *) xmlContent, xmlLen);
+ delete[] xmlContent;
+ }
+
+ // send it
+ status = sendData(newsockfd, start, size);
+ delete[] start;
+ }
+ else if (((FlowControlMsgType) hdr.msgType) == REPORT_REQ)
+ {
+ printf("Flow Control Protocol Report Req receive\n");
+ uint8_t *payload = new uint8_t[hdr.payloadLen];
+ uint8_t *payloadPtr = payload;
+ status = readData(newsockfd, payload, hdr.payloadLen);
+ while (status > 0 && payloadPtr < (payload + hdr.payloadLen))
+ {
+ uint32_t msgID = 0xFFFFFFFF;
+ payloadPtr = decode(payloadPtr, msgID);
+ if (((FlowControlMsgID) msgID) == FLOW_XML_NAME)
+ {
+ uint32_t len;
+ payloadPtr = decode(payloadPtr, len);
+ printf("Flow Control Protocol receive XML name length %d\n", len);
+ std::string flowName = (const char *) payloadPtr;
+ payloadPtr += len;
+ printf("Flow Control Protocol receive XML name %s\n", flowName.c_str());
+ }
+ else
+ {
+ break;
+ }
+ }
+ delete[] payload;
+ // Send Register Respond
+ // Calculate the total payload msg size
+ std::string processor = "RealTimeDataCollector";
+ std::string propertyName1 = "real Time Message ID";
+ std::string propertyValue1 = "41";
+ std::string propertyName2 = "Batch Message ID";
+ std::string propertyValue2 = "172,30,48";
+ if (flag == 0)
+ {
+ propertyName1 = "Real Time Message ID";
+ propertyValue1 = "41";
+ propertyName2 = "Batch Message ID";
+ propertyValue2 = "172,48";
+ flag = 1;
+ }
+ else if (flag == 1)
+ {
+ propertyName1 = "Real Time Message ID";
+ propertyValue1 = "172,48";
+ propertyName2 = "Batch Message ID";
+ propertyValue2 = "41";
+ flag = 0;
+ }
+ uint32_t payloadSize = FlowControlMsgIDEncodingLen(PROCESSOR_NAME, processor.size()+1);
+ payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName1.size()+1);
+ payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue1.size()+1);
+ payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName2.size()+1);
+ payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue2.size()+1);
+
+ uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
+ uint8_t *data = new uint8_t[size];
+ uint8_t *start = data;
+
+ // encode the HDR
+ hdr.msgType = REPORT_RESP;
+ hdr.payloadLen = payloadSize;
+ hdr.status = RESP_SUCCESS;
+
+ if (number >= 10 && number < 20)
+ {
+ // After 10 second report, stop the flow controller for 10 second
+ hdr.status = RESP_STOP_FLOW_CONTROLLER;
+ }
+ else if (number == 20)
+ {
+ // restart the flow controller after 10 second
+ hdr.status = RESP_START_FLOW_CONTROLLER;
+ }
+ else if (number == 30)
+ {
+ // retrigger register
+ hdr.status = RESP_TRIGGER_REGISTER;
+ number = 0;
+ }
+
+ number++;
+
+ data = encode(data, hdr.msgType);
+ data = encode(data, hdr.seqNumber);
+ data = encode(data, hdr.status);
+ data = encode(data, hdr.payloadLen);
+
+ // encode the processorName
+ data = encode(data, PROCESSOR_NAME);
+ data = encode(data, processor);
+
+ // encode the propertyName and value TLV
+ data = encode(data, PROPERTY_NAME);
+ data = encode(data, propertyName1);
+ data = encode(data, PROPERTY_VALUE);
+ data = encode(data, propertyValue1);
+ data = encode(data, PROPERTY_NAME);
+ data = encode(data, propertyName2);
+ data = encode(data, PROPERTY_VALUE);
+ data = encode(data, propertyValue2);
+ // send it
+ status = sendData(newsockfd, start, size);
+ delete[] start;
+ }
+ }
+ close(newsockfd);
+ }
+ close(sockfd);
+ }
+ else
+ {
+ clilen = sizeof(cli_addr);
+ newsockfd = accept(sockfd,
+ (struct sockaddr *) &cli_addr,
+ &clilen);
+ if (newsockfd < 0)
+ error("ERROR on accept");
+ while (1)
+ {
+ bzero(buffer,4096);
+ n = readline(newsockfd,buffer,4095);
+ if (n <= 0 )
+ {
+ close(newsockfd);
+ newsockfd = accept(sockfd,
+ (struct sockaddr *) &cli_addr,
+ &clilen);
+ continue;
+ }
+ printf("%s",buffer);
+ }
+ close(newsockfd);
+ close(sockfd);
+ }
+ return 0;
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/thirdparty/libxml2/AUTHORS
----------------------------------------------------------------------
diff --git a/thirdparty/libxml2/AUTHORS b/thirdparty/libxml2/AUTHORS
new file mode 100644
index 0000000..cf2e9a6
--- /dev/null
+++ b/thirdparty/libxml2/AUTHORS
@@ -0,0 +1,5 @@
+Daniel Veillard <da...@veillard.com>
+Bjorn Reese <br...@users.sourceforge.net>
+William Brack <wb...@mmm.com.hk>
+Igor Zlatkovic <ig...@zlatkovic.com> for the Windows port
+Aleksey Sanin <al...@aleksey.com>