You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/06/06 16:33:06 UTC
[3/9] nifi-minifi-cpp git commit: MINIFI-331: Apply formatter with
increased line length to source
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/InvokeHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/InvokeHTTP.cpp b/libminifi/src/processors/InvokeHTTP.cpp
index 295560f..fd39a64 100644
--- a/libminifi/src/processors/InvokeHTTP.cpp
+++ b/libminifi/src/processors/InvokeHTTP.cpp
@@ -51,73 +51,37 @@ namespace processors {
const char *InvokeHTTP::ProcessorName = "InvokeHTTP";
-core::Property InvokeHTTP::Method(
- "HTTP Method",
- "HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS). "
- "Arbitrary methods are also supported. Methods other than POST, PUT and PATCH will be sent without a message body.",
- "GET");
-core::Property InvokeHTTP::URL(
- "Remote URL",
- "Remote URL which will be connected to, including scheme, host, port, path.",
- "");
-core::Property InvokeHTTP::ConnectTimeout(
- "Connection Timeout", "Max wait time for connection to remote service.",
- "5 secs");
-core::Property InvokeHTTP::ReadTimeout(
- "Read Timeout", "Max wait time for response from remote service.",
- "15 secs");
-core::Property InvokeHTTP::DateHeader(
- "Include Date Header", "Include an RFC-2616 Date header in the request.",
- "True");
-core::Property InvokeHTTP::FollowRedirects(
- "Follow Redirects", "Follow HTTP redirects issued by remote server.",
- "True");
-core::Property InvokeHTTP::AttributesToSend(
- "Attributes to Send",
- "Regular expression that defines which attributes to send as HTTP"
- " headers in the request. If not defined, no attributes are sent as headers.",
- "");
-core::Property InvokeHTTP::SSLContext(
- "SSL Context Service",
- "The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.",
- "");
-core::Property InvokeHTTP::ProxyHost(
- "Proxy Host",
- "The fully qualified hostname or IP address of the proxy server", "");
-core::Property InvokeHTTP::ProxyPort("Proxy Port",
- "The port of the proxy server", "");
-core::Property InvokeHTTP::ProxyUser(
- "invokehttp-proxy-user",
- "Username to set when authenticating against proxy", "");
-core::Property InvokeHTTP::ProxyPassword(
- "invokehttp-proxy-password",
- "Password to set when authenticating against proxy", "");
-core::Property InvokeHTTP::ContentType(
- "Content-type",
- "The Content-Type to specify for when content is being transmitted through a PUT, "
- "POST or PATCH. In the case of an empty value after evaluating an expression language expression, "
- "Content-Type defaults to",
- "application/octet-stream");
-core::Property InvokeHTTP::SendBody(
- "send-message-body",
- "If true, sends the HTTP message body on POST/PUT/PATCH requests (default). "
- "If false, suppresses the message body and content-type header for these requests.",
- "true");
-
-core::Property InvokeHTTP::PropPutOutputAttributes(
- "Put Response Body in Attribute",
- "If set, the response body received back will be put into an attribute of the original "
- "FlowFile instead of a separate FlowFile. The attribute key to put to is determined by evaluating value of this property. ",
- "");
-core::Property InvokeHTTP::AlwaysOutputResponse(
- "Always Output Response",
- "Will force a response FlowFile to be generated and routed to the 'Response' relationship "
- "regardless of what the server status code received is ",
- "false");
-core::Property InvokeHTTP::PenalizeOnNoRetry(
- "Penalize on \"No Retry\"",
- "Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" relationship.",
- "false");
+core::Property InvokeHTTP::Method("HTTP Method", "HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS). "
+ "Arbitrary methods are also supported. Methods other than POST, PUT and PATCH will be sent without a message body.",
+ "GET");
+core::Property InvokeHTTP::URL("Remote URL", "Remote URL which will be connected to, including scheme, host, port, path.", "");
+core::Property InvokeHTTP::ConnectTimeout("Connection Timeout", "Max wait time for connection to remote service.", "5 secs");
+core::Property InvokeHTTP::ReadTimeout("Read Timeout", "Max wait time for response from remote service.", "15 secs");
+core::Property InvokeHTTP::DateHeader("Include Date Header", "Include an RFC-2616 Date header in the request.", "True");
+core::Property InvokeHTTP::FollowRedirects("Follow Redirects", "Follow HTTP redirects issued by remote server.", "True");
+core::Property InvokeHTTP::AttributesToSend("Attributes to Send", "Regular expression that defines which attributes to send as HTTP"
+ " headers in the request. If not defined, no attributes are sent as headers.",
+ "");
+core::Property InvokeHTTP::SSLContext("SSL Context Service", "The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.", "");
+core::Property InvokeHTTP::ProxyHost("Proxy Host", "The fully qualified hostname or IP address of the proxy server", "");
+core::Property InvokeHTTP::ProxyPort("Proxy Port", "The port of the proxy server", "");
+core::Property InvokeHTTP::ProxyUser("invokehttp-proxy-user", "Username to set when authenticating against proxy", "");
+core::Property InvokeHTTP::ProxyPassword("invokehttp-proxy-password", "Password to set when authenticating against proxy", "");
+core::Property InvokeHTTP::ContentType("Content-type", "The Content-Type to specify for when content is being transmitted through a PUT, "
+ "POST or PATCH. In the case of an empty value after evaluating an expression language expression, "
+ "Content-Type defaults to",
+ "application/octet-stream");
+core::Property InvokeHTTP::SendBody("send-message-body", "If true, sends the HTTP message body on POST/PUT/PATCH requests (default). "
+ "If false, suppresses the message body and content-type header for these requests.",
+ "true");
+
+core::Property InvokeHTTP::PropPutOutputAttributes("Put Response Body in Attribute", "If set, the response body received back will be put into an attribute of the original "
+ "FlowFile instead of a separate FlowFile. The attribute key to put to is determined by evaluating value of this property. ",
+ "");
+core::Property InvokeHTTP::AlwaysOutputResponse("Always Output Response", "Will force a response FlowFile to be generated and routed to the 'Response' relationship "
+ "regardless of what the server status code received is ",
+ "false");
+core::Property InvokeHTTP::PenalizeOnNoRetry("Penalize on \"No Retry\"", "Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" relationship.", "false");
const char* InvokeHTTP::STATUS_CODE = "invokehttp.status.code";
const char* InvokeHTTP::STATUS_MESSAGE = "invokehttp.status.message";
@@ -128,31 +92,22 @@ const char* InvokeHTTP::REMOTE_DN = "invokehttp.remote.dn";
const char* InvokeHTTP::EXCEPTION_CLASS = "invokehttp.java.exception.class";
const char* InvokeHTTP::EXCEPTION_MESSAGE = "invokehttp.java.exception.message";
-core::Relationship InvokeHTTP::Success("success",
- "All files are routed to success");
+core::Relationship InvokeHTTP::Success("success", "All files are routed to success");
-core::Relationship InvokeHTTP::RelResponse("response",
- "Represents a response flowfile");
+core::Relationship InvokeHTTP::RelResponse("response", "Represents a response flowfile");
-core::Relationship InvokeHTTP::RelRetry(
- "retry",
- "The original FlowFile will be routed on any status code that can be retried "
- "(5xx status codes). It will have new attributes detailing the request.");
+core::Relationship InvokeHTTP::RelRetry("retry", "The original FlowFile will be routed on any status code that can be retried "
+ "(5xx status codes). It will have new attributes detailing the request.");
-core::Relationship InvokeHTTP::RelNoRetry(
- "no retry",
- "The original FlowFile will be routed on any status code that should NOT "
- "be retried (1xx, 3xx, 4xx status codes). It will have new attributes detailing the request.");
+core::Relationship InvokeHTTP::RelNoRetry("no retry", "The original FlowFile will be routed on any status code that should NOT "
+ "be retried (1xx, 3xx, 4xx status codes). It will have new attributes detailing the request.");
-core::Relationship InvokeHTTP::RelFailure(
- "failure",
- "The original FlowFile will be routed on any type of connection failure, "
- "timeout or general exception. It will have new attributes detailing the request.");
+core::Relationship InvokeHTTP::RelFailure("failure", "The original FlowFile will be routed on any type of connection failure, "
+ "timeout or general exception. It will have new attributes detailing the request.");
void InvokeHTTP::set_request_method(CURL *curl, const std::string &method) {
std::string my_method = method;
- std::transform(my_method.begin(), my_method.end(), my_method.begin(),
- ::toupper);
+ std::transform(my_method.begin(), my_method.end(), my_method.begin(), ::toupper);
if (my_method == "POST") {
curl_easy_setopt(curl, CURLOPT_POST, 1);
} else if (my_method == "PUT") {
@@ -190,19 +145,14 @@ void InvokeHTTP::initialize() {
setSupportedRelationships(relationships);
}
-void InvokeHTTP::onSchedule(core::ProcessContext *context,
- core::ProcessSessionFactory *sessionFactory) {
+void InvokeHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
if (!context->getProperty(Method.getName(), method_)) {
- logger_->log_info(
- "%s attribute is missing, so default value of %s will be used",
- Method.getName().c_str(), Method.getValue().c_str());
+ logger_->log_info("%s attribute is missing, so default value of %s will be used", Method.getName().c_str(), Method.getValue().c_str());
return;
}
if (!context->getProperty(URL.getName(), url_)) {
- logger_->log_info(
- "%s attribute is missing, so default value of %s will be used",
- URL.getName().c_str(), URL.getValue().c_str());
+ logger_->log_info("%s attribute is missing, so default value of %s will be used", URL.getName().c_str(), URL.getValue().c_str());
return;
}
@@ -213,9 +163,7 @@ void InvokeHTTP::onSchedule(core::ProcessContext *context,
// set the timeout in curl options.
} else {
- logger_->log_info(
- "%s attribute is missing, so default value of %s will be used",
- ConnectTimeout.getName().c_str(), ConnectTimeout.getValue().c_str());
+ logger_->log_info("%s attribute is missing, so default value of %s will be used", ConnectTimeout.getName().c_str(), ConnectTimeout.getValue().c_str());
return;
}
@@ -224,67 +172,43 @@ void InvokeHTTP::onSchedule(core::ProcessContext *context,
core::Property::StringToInt(timeoutStr, read_timeout_);
} else {
- logger_->log_info(
- "%s attribute is missing, so default value of %s will be used",
- ReadTimeout.getName().c_str(), ReadTimeout.getValue().c_str());
+ logger_->log_info("%s attribute is missing, so default value of %s will be used", ReadTimeout.getName().c_str(), ReadTimeout.getValue().c_str());
}
std::string dateHeaderStr;
if (!context->getProperty(DateHeader.getName(), dateHeaderStr)) {
- logger_->log_info(
- "%s attribute is missing, so default value of %s will be used",
- DateHeader.getName().c_str(), DateHeader.getValue().c_str());
+ logger_->log_info("%s attribute is missing, so default value of %s will be used", DateHeader.getName().c_str(), DateHeader.getValue().c_str());
}
- date_header_include_ = utils::StringUtils::StringToBool(dateHeaderStr,
- date_header_include_);
+ date_header_include_ = utils::StringUtils::StringToBool(dateHeaderStr, date_header_include_);
- if (!context->getProperty(PropPutOutputAttributes.getName(),
- put_attribute_name_)) {
- logger_->log_info(
- "%s attribute is missing, so default value of %s will be used",
- PropPutOutputAttributes.getName().c_str(),
- PropPutOutputAttributes.getValue().c_str());
+ if (!context->getProperty(PropPutOutputAttributes.getName(), put_attribute_name_)) {
+ logger_->log_info("%s attribute is missing, so default value of %s will be used", PropPutOutputAttributes.getName().c_str(), PropPutOutputAttributes.getValue().c_str());
}
- if (!context->getProperty(AttributesToSend.getName(),
- attribute_to_send_regex_)) {
- logger_->log_info(
- "%s attribute is missing, so default value of %s will be used",
- AttributesToSend.getName().c_str(),
- AttributesToSend.getValue().c_str());
+ if (!context->getProperty(AttributesToSend.getName(), attribute_to_send_regex_)) {
+ logger_->log_info("%s attribute is missing, so default value of %s will be used", AttributesToSend.getName().c_str(), AttributesToSend.getValue().c_str());
}
std::string always_output_response = "false";
- if (!context->getProperty(AlwaysOutputResponse.getName(),
- always_output_response)) {
- logger_->log_info(
- "%s attribute is missing, so default value of %s will be used",
- AttributesToSend.getName().c_str(),
- AttributesToSend.getValue().c_str());
+ if (!context->getProperty(AlwaysOutputResponse.getName(), always_output_response)) {
+ logger_->log_info("%s attribute is missing, so default value of %s will be used", AttributesToSend.getName().c_str(), AttributesToSend.getValue().c_str());
}
- utils::StringUtils::StringToBool(always_output_response,
- always_output_response_);
+ utils::StringUtils::StringToBool(always_output_response, always_output_response_);
std::string penalize_no_retry = "false";
if (!context->getProperty(PenalizeOnNoRetry.getName(), penalize_no_retry)) {
- logger_->log_info(
- "%s attribute is missing, so default value of %s will be used",
- AttributesToSend.getName().c_str(),
- AttributesToSend.getValue().c_str());
+ logger_->log_info("%s attribute is missing, so default value of %s will be used", AttributesToSend.getName().c_str(), AttributesToSend.getValue().c_str());
}
utils::StringUtils::StringToBool(penalize_no_retry, penalize_no_retry_);
std::string context_name;
- if (context->getProperty(SSLContext.getName(), context_name)
- && !IsNullOrEmpty(context_name)) {
- std::shared_ptr<core::controller::ControllerService> service = context
- ->getControllerService(context_name);
+ if (context->getProperty(SSLContext.getName(), context_name) && !IsNullOrEmpty(context_name)) {
+ std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(context_name);
if (nullptr != service) {
- ssl_context_service_ = std::static_pointer_cast<
- minifi::controllers::SSLContextService>(service);
+ ssl_context_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
}
}
}
@@ -293,8 +217,7 @@ InvokeHTTP::~InvokeHTTP() {
curl_global_cleanup();
}
-inline bool InvokeHTTP::matches(const std::string &value,
- const std::string &sregex) {
+inline bool InvokeHTTP::matches(const std::string &value, const std::string &sregex) {
if (sregex == ".*")
return true;
@@ -322,9 +245,7 @@ bool InvokeHTTP::emitFlowFile(const std::string &method) {
return ("POST" == method || "PUT" == method || "PATCH" == method);
}
-struct curl_slist *InvokeHTTP::build_header_list(
- CURL *curl, std::string regex,
- const std::map<std::string, std::string> &attributes) {
+struct curl_slist *InvokeHTTP::build_header_list(CURL *curl, std::string regex, const std::map<std::string, std::string> &attributes) {
struct curl_slist *list = NULL;
if (curl) {
for (auto attribute : attributes) {
@@ -345,8 +266,7 @@ bool InvokeHTTP::isSecure(const std::string &url) {
}
CURLcode InvokeHTTP::configure_ssl_context(CURL *curl, void *ctx, void *param) {
- minifi::controllers::SSLContextService *ssl_context_service =
- static_cast<minifi::controllers::SSLContextService*>(param);
+ minifi::controllers::SSLContextService *ssl_context_service = static_cast<minifi::controllers::SSLContextService*>(param);
if (!ssl_context_service->configure_ssl_context(static_cast<SSL_CTX*>(ctx))) {
return CURLE_FAILED_INIT;
}
@@ -354,26 +274,20 @@ CURLcode InvokeHTTP::configure_ssl_context(CURL *curl, void *ctx, void *param) {
}
void InvokeHTTP::configure_secure_connection(CURL *http_session) {
- logger_->log_debug("InvokeHTTP -- Using certificate file %s",
- ssl_context_service_->getCertificateFile());
+ logger_->log_debug("InvokeHTTP -- Using certificate file %s", ssl_context_service_->getCertificateFile());
curl_easy_setopt(http_session, CURLOPT_VERBOSE, 1L);
- curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION,
- &InvokeHTTP::configure_ssl_context);
- curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA,
- static_cast<void*>(ssl_context_service_.get()));
+ curl_easy_setopt(http_session, CURLOPT_SSL_CTX_FUNCTION, &InvokeHTTP::configure_ssl_context);
+ curl_easy_setopt(http_session, CURLOPT_SSL_CTX_DATA, static_cast<void*>(ssl_context_service_.get()));
}
-void InvokeHTTP::onTrigger(core::ProcessContext *context,
- core::ProcessSession *session) {
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
- FlowFileRecord>(session->get());
+void InvokeHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
logger_->log_info("onTrigger InvokeHTTP with %s", method_.c_str());
if (flowFile == nullptr) {
if (!emitFlowFile(method_)) {
- logger_->log_info("InvokeHTTP -- create flow file with %s",
- method_.c_str());
+ logger_->log_info("InvokeHTTP -- create flow file with %s", method_.c_str());
flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
} else {
logger_->log_info("exiting because method is %s", method_.c_str());
@@ -402,11 +316,9 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context,
curl_easy_setopt(http_session, CURLOPT_TIMEOUT, read_timeout_);
}
HTTPRequestResponse content;
- curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION,
- &HTTPRequestResponse::recieve_write);
+ curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, &HTTPRequestResponse::recieve_write);
- curl_easy_setopt(http_session, CURLOPT_WRITEDATA,
- static_cast<void*>(&content));
+ curl_easy_setopt(http_session, CURLOPT_WRITEDATA, static_cast<void*>(&content));
if (emitFlowFile(method_)) {
logger_->log_info("InvokeHTTP -- reading flowfile");
@@ -419,12 +331,9 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context,
callbackObj->pos = 0;
logger_->log_info("InvokeHTTP -- Setting callback");
curl_easy_setopt(http_session, CURLOPT_UPLOAD, 1L);
- curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE,
- (curl_off_t)callback->getBufferSize());
- curl_easy_setopt(http_session, CURLOPT_READFUNCTION,
- &HTTPRequestResponse::send_write);
- curl_easy_setopt(http_session, CURLOPT_READDATA,
- static_cast<void*>(callbackObj));
+ curl_easy_setopt(http_session, CURLOPT_INFILESIZE_LARGE, (curl_off_t)callback->getBufferSize());
+ curl_easy_setopt(http_session, CURLOPT_READFUNCTION, &HTTPRequestResponse::send_write);
+ curl_easy_setopt(http_session, CURLOPT_READDATA, static_cast<void*>(callbackObj));
} else {
logger_->log_error("InvokeHTTP -- no resource claim");
}
@@ -434,9 +343,7 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context,
}
// append all headers
- struct curl_slist *headers = build_header_list(http_session,
- attribute_to_send_regex_,
- flowFile->getAttributes());
+ struct curl_slist *headers = build_header_list(http_session, attribute_to_send_regex_, flowFile->getAttributes());
curl_easy_setopt(http_session, CURLOPT_HTTPHEADER, headers);
logger_->log_info("InvokeHTTP -- curl performed");
@@ -459,10 +366,8 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context,
flowFile->addAttribute(REQUEST_URL, url_);
flowFile->addAttribute(TRANSACTION_ID, tx_id);
- bool isSuccess = ((int32_t) (http_code / 100)) == 2
- && res != CURLE_ABORTED_BY_CALLBACK;
- bool output_body_to_requestAttr = (!isSuccess || putToAttribute)
- && flowFile != nullptr;
+ bool isSuccess = ((int32_t) (http_code / 100)) == 2 && res != CURLE_ABORTED_BY_CALLBACK;
+ bool output_body_to_requestAttr = (!isSuccess || putToAttribute) && flowFile != nullptr;
bool output_body_to_content = isSuccess && !putToAttribute;
bool body_empty = IsNullOrEmpty(content.data);
@@ -471,11 +376,9 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context,
if (output_body_to_content) {
if (flowFile != nullptr) {
- response_flow = std::static_pointer_cast<FlowFileRecord>(
- session->create(flowFile));
+ response_flow = std::static_pointer_cast<FlowFileRecord>(session->create(flowFile));
} else {
- response_flow = std::static_pointer_cast<FlowFileRecord>(
- session->create());
+ response_flow = std::static_pointer_cast<FlowFileRecord>(session->create());
}
std::string ct = content_type;
@@ -484,28 +387,22 @@ void InvokeHTTP::onTrigger(core::ProcessContext *context,
response_flow->addAttribute(STATUS_MESSAGE, response_body);
response_flow->addAttribute(REQUEST_URL, url_);
response_flow->addAttribute(TRANSACTION_ID, tx_id);
- io::DataStream stream((const uint8_t*) content.data.data(),
- content.data.size());
+ io::DataStream stream((const uint8_t*) content.data.data(), content.data.size());
// need an import from the data stream.
session->importFrom(stream, response_flow);
} else {
logger_->log_info("Cannot output body to content");
- response_flow = std::static_pointer_cast<FlowFileRecord>(
- session->create());
+ response_flow = std::static_pointer_cast<FlowFileRecord>(session->create());
}
route(flowFile, response_flow, session, context, isSuccess, http_code);
} else {
- logger_->log_error("InvokeHTTP -- curl_easy_perform() failed %s\n",
- curl_easy_strerror(res));
+ logger_->log_error("InvokeHTTP -- curl_easy_perform() failed %s\n", curl_easy_strerror(res));
}
curl_slist_free_all(headers);
curl_easy_cleanup(http_session);
}
-void InvokeHTTP::route(std::shared_ptr<FlowFileRecord> &request,
- std::shared_ptr<FlowFileRecord> &response,
- core::ProcessSession *session,
- core::ProcessContext *context, bool isSuccess,
+void InvokeHTTP::route(std::shared_ptr<FlowFileRecord> &request, std::shared_ptr<FlowFileRecord> &response, core::ProcessSession *session, core::ProcessContext *context, bool isSuccess,
int statusCode) {
// check if we should yield the processor
if (!isSuccess && request == nullptr) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/ListenHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ListenHTTP.cpp b/libminifi/src/processors/ListenHTTP.cpp
index 9fce69a..c26b41d 100644
--- a/libminifi/src/processors/ListenHTTP.cpp
+++ b/libminifi/src/processors/ListenHTTP.cpp
@@ -42,38 +42,20 @@ namespace nifi {
namespace minifi {
namespace processors {
-core::Property ListenHTTP::BasePath("Base Path",
- "Base path for incoming connections",
- "contentListener");
-core::Property ListenHTTP::Port(
- "Listening Port", "The Port to listen on for incoming connections", "");
-core::Property ListenHTTP::AuthorizedDNPattern(
- "Authorized DN Pattern",
- "A Regular Expression to apply against the Distinguished Name of incoming"
- " connections. If the Pattern does not match the DN, the connection will be refused.",
- ".*");
-core::Property ListenHTTP::SSLCertificate(
- "SSL Certificate",
- "File containing PEM-formatted file including TLS/SSL certificate and key",
- "");
-core::Property ListenHTTP::SSLCertificateAuthority(
- "SSL Certificate Authority",
- "File containing trusted PEM-formatted certificates", "");
-core::Property ListenHTTP::SSLVerifyPeer(
- "SSL Verify Peer",
- "Whether or not to verify the client's certificate (yes/no)", "no");
-core::Property ListenHTTP::SSLMinimumVersion(
- "SSL Minimum Version",
- "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)",
- "SSL2");
-core::Property ListenHTTP::HeadersAsAttributesRegex(
- "HTTP Headers to receive as Attributes (Regex)",
- "Specifies the Regular Expression that determines the names of HTTP Headers that"
- " should be passed along as FlowFile attributes",
- "");
-
-core::Relationship ListenHTTP::Success("success",
- "All files are routed to success");
+core::Property ListenHTTP::BasePath("Base Path", "Base path for incoming connections", "contentListener");
+core::Property ListenHTTP::Port("Listening Port", "The Port to listen on for incoming connections", "");
+core::Property ListenHTTP::AuthorizedDNPattern("Authorized DN Pattern", "A Regular Expression to apply against the Distinguished Name of incoming"
+ " connections. If the Pattern does not match the DN, the connection will be refused.",
+ ".*");
+core::Property ListenHTTP::SSLCertificate("SSL Certificate", "File containing PEM-formatted file including TLS/SSL certificate and key", "");
+core::Property ListenHTTP::SSLCertificateAuthority("SSL Certificate Authority", "File containing trusted PEM-formatted certificates", "");
+core::Property ListenHTTP::SSLVerifyPeer("SSL Verify Peer", "Whether or not to verify the client's certificate (yes/no)", "no");
+core::Property ListenHTTP::SSLMinimumVersion("SSL Minimum Version", "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", "SSL2");
+core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as Attributes (Regex)", "Specifies the Regular Expression that determines the names of HTTP Headers that"
+ " should be passed along as FlowFile attributes",
+ "");
+
+core::Relationship ListenHTTP::Success("success", "All files are routed to success");
void ListenHTTP::initialize() {
logger_->log_info("Initializing ListenHTTP");
@@ -95,14 +77,11 @@ void ListenHTTP::initialize() {
setSupportedRelationships(relationships);
}
-void ListenHTTP::onSchedule(core::ProcessContext *context,
- core::ProcessSessionFactory *sessionFactory) {
+void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
std::string basePath;
if (!context->getProperty(BasePath.getName(), basePath)) {
- logger_->log_info(
- "%s attribute is missing, so default value of %s will be used",
- BasePath.getName().c_str(), BasePath.getValue().c_str());
+ logger_->log_info("%s attribute is missing, so default value of %s will be used", BasePath.getName().c_str(), BasePath.getValue().c_str());
basePath = BasePath.getValue();
}
@@ -111,26 +90,20 @@ void ListenHTTP::onSchedule(core::ProcessContext *context,
std::string listeningPort;
if (!context->getProperty(Port.getName(), listeningPort)) {
- logger_->log_error("%s attribute is missing or invalid",
- Port.getName().c_str());
+ logger_->log_error("%s attribute is missing or invalid", Port.getName().c_str());
return;
}
std::string authDNPattern;
- if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern)
- && !authDNPattern.empty()) {
- logger_->log_info("ListenHTTP using %s: %s",
- AuthorizedDNPattern.getName().c_str(),
- authDNPattern.c_str());
+ if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern) && !authDNPattern.empty()) {
+ logger_->log_info("ListenHTTP using %s: %s", AuthorizedDNPattern.getName().c_str(), authDNPattern.c_str());
}
std::string sslCertFile;
- if (context->getProperty(SSLCertificate.getName(), sslCertFile)
- && !sslCertFile.empty()) {
- logger_->log_info("ListenHTTP using %s: %s",
- SSLCertificate.getName().c_str(), sslCertFile.c_str());
+ if (context->getProperty(SSLCertificate.getName(), sslCertFile) && !sslCertFile.empty()) {
+ logger_->log_info("ListenHTTP using %s: %s", SSLCertificate.getName().c_str(), sslCertFile.c_str());
}
// Read further TLS/SSL options only if TLS/SSL usage is implied by virtue of certificate value being set
@@ -139,12 +112,8 @@ void ListenHTTP::onSchedule(core::ProcessContext *context,
std::string sslMinVer;
if (!sslCertFile.empty()) {
- if (context->getProperty(SSLCertificateAuthority.getName(),
- sslCertAuthorityFile)
- && !sslCertAuthorityFile.empty()) {
- logger_->log_info("ListenHTTP using %s: %s",
- SSLCertificateAuthority.getName().c_str(),
- sslCertAuthorityFile.c_str());
+ if (context->getProperty(SSLCertificateAuthority.getName(), sslCertAuthorityFile) && !sslCertAuthorityFile.empty()) {
+ logger_->log_info("ListenHTTP using %s: %s", SSLCertificateAuthority.getName().c_str(), sslCertAuthorityFile.c_str());
}
if (context->getProperty(SSLVerifyPeer.getName(), sslVerifyPeer)) {
@@ -158,26 +127,19 @@ void ListenHTTP::onSchedule(core::ProcessContext *context,
}
if (context->getProperty(SSLMinimumVersion.getName(), sslMinVer)) {
- logger_->log_info("ListenHTTP using %s: %s",
- SSLMinimumVersion.getName().c_str(), sslMinVer.c_str());
+ logger_->log_info("ListenHTTP using %s: %s", SSLMinimumVersion.getName().c_str(), sslMinVer.c_str());
}
}
std::string headersAsAttributesPattern;
- if (context->getProperty(HeadersAsAttributesRegex.getName(),
- headersAsAttributesPattern)
- && !headersAsAttributesPattern.empty()) {
- logger_->log_info("ListenHTTP using %s: %s",
- HeadersAsAttributesRegex.getName().c_str(),
- headersAsAttributesPattern.c_str());
+ if (context->getProperty(HeadersAsAttributesRegex.getName(), headersAsAttributesPattern) && !headersAsAttributesPattern.empty()) {
+ logger_->log_info("ListenHTTP using %s: %s", HeadersAsAttributesRegex.getName().c_str(), headersAsAttributesPattern.c_str());
}
auto numThreads = getMaxConcurrentTasks();
- logger_->log_info(
- "ListenHTTP starting HTTP server on port %s and path %s with %d threads",
- listeningPort.c_str(), basePath.c_str(), numThreads);
+ logger_->log_info("ListenHTTP starting HTTP server on port %s and path %s with %d threads", listeningPort.c_str(), basePath.c_str(), numThreads);
// Initialize web server
std::vector<std::string> options;
@@ -231,19 +193,15 @@ void ListenHTTP::onSchedule(core::ProcessContext *context,
}
_server.reset(new CivetServer(options));
- _handler.reset(
- new Handler(context, sessionFactory, std::move(authDNPattern),
- std::move(headersAsAttributesPattern)));
+ _handler.reset(new Handler(context, sessionFactory, std::move(authDNPattern), std::move(headersAsAttributesPattern)));
_server->addHandler(basePath, _handler.get());
}
ListenHTTP::~ListenHTTP() {
}
-void ListenHTTP::onTrigger(core::ProcessContext *context,
- core::ProcessSession *session) {
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
- FlowFileRecord>(session->get());
+void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
// Do nothing if there are no incoming files
if (!flowFile) {
@@ -251,10 +209,7 @@ void ListenHTTP::onTrigger(core::ProcessContext *context,
}
}
-ListenHTTP::Handler::Handler(core::ProcessContext *context,
- core::ProcessSessionFactory *sessionFactory,
- std::string &&authDNPattern,
- std::string &&headersAsAttributesPattern)
+ListenHTTP::Handler::Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern)
: _authDNRegex(std::move(authDNPattern)),
_headersAsAttributesRegex(std::move(headersAsAttributesPattern)),
logger_(logging::LoggerFactory<ListenHTTP::Handler>::getLogger()) {
@@ -268,11 +223,9 @@ void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn) {
"Content-Length: 0\r\n\r\n");
}
-bool ListenHTTP::Handler::handlePost(CivetServer *server,
- struct mg_connection *conn) {
+bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *conn) {
auto req_info = mg_get_request_info(conn);
- logger_->log_info("ListenHTTP handling POST request of length %d",
- req_info->content_length);
+ logger_->log_info("ListenHTTP handling POST request of length %d", req_info->content_length);
// If this is a two-way TLS connection, authorize the peer against the configured pattern
if (req_info->is_ssl && req_info->client_cert != nullptr) {
@@ -280,8 +233,7 @@ bool ListenHTTP::Handler::handlePost(CivetServer *server,
mg_printf(conn, "HTTP/1.1 403 Forbidden\r\n"
"Content-Type: text/html\r\n"
"Content-Length: 0\r\n\r\n");
- logger_->log_warn("ListenHTTP client DN not authorized: %s",
- req_info->client_cert->subject);
+ logger_->log_warn("ListenHTTP client DN not authorized: %s", req_info->client_cert->subject);
return true;
}
}
@@ -337,8 +289,8 @@ bool ListenHTTP::Handler::handlePost(CivetServer *server,
return true;
}
-ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo) :
- logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger()) {
+ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo)
+ : logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger()) {
_conn = conn;
_reqInfo = reqInfo;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/ListenSyslog.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ListenSyslog.cpp b/libminifi/src/processors/ListenSyslog.cpp
index e7d2e7b..054d585 100644
--- a/libminifi/src/processors/ListenSyslog.cpp
+++ b/libminifi/src/processors/ListenSyslog.cpp
@@ -35,37 +35,18 @@ namespace nifi {
namespace minifi {
namespace processors {
-core::Property ListenSyslog::RecvBufSize(
- "Receive Buffer Size",
- "The size of each buffer used to receive Syslog messages.", "65507 B");
-core::Property ListenSyslog::MaxSocketBufSize(
- "Max Size of Socket Buffer",
- "The maximum size of the socket buffer that should be used.", "1 MB");
-core::Property ListenSyslog::MaxConnections(
- "Max Number of TCP Connections",
- "The maximum number of concurrent connections to accept Syslog messages in TCP mode.",
- "2");
-core::Property ListenSyslog::MaxBatchSize(
- "Max Batch Size",
- "The maximum number of Syslog events to add to a single FlowFile.", "1");
-core::Property ListenSyslog::MessageDelimiter(
- "Message Delimiter",
- "Specifies the delimiter to place between Syslog messages when multiple "
- "messages are bundled together (see <Max Batch Size> core::Property).",
- "\n");
-core::Property ListenSyslog::ParseMessages(
- "Parse Messages",
- "Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.",
- "false");
-core::Property ListenSyslog::Protocol("Protocol",
- "The protocol for Syslog communication.",
- "UDP");
-core::Property ListenSyslog::Port("Port", "The port for Syslog communication.",
- "514");
-core::Relationship ListenSyslog::Success("success",
- "All files are routed to success");
-core::Relationship ListenSyslog::Invalid("invalid",
- "SysLog message format invalid");
+core::Property ListenSyslog::RecvBufSize("Receive Buffer Size", "The size of each buffer used to receive Syslog messages.", "65507 B");
+core::Property ListenSyslog::MaxSocketBufSize("Max Size of Socket Buffer", "The maximum size of the socket buffer that should be used.", "1 MB");
+core::Property ListenSyslog::MaxConnections("Max Number of TCP Connections", "The maximum number of concurrent connections to accept Syslog messages in TCP mode.", "2");
+core::Property ListenSyslog::MaxBatchSize("Max Batch Size", "The maximum number of Syslog events to add to a single FlowFile.", "1");
+core::Property ListenSyslog::MessageDelimiter("Message Delimiter", "Specifies the delimiter to place between Syslog messages when multiple "
+ "messages are bundled together (see <Max Batch Size> core::Property).",
+ "\n");
+core::Property ListenSyslog::ParseMessages("Parse Messages", "Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.", "false");
+core::Property ListenSyslog::Protocol("Protocol", "The protocol for Syslog communication.", "UDP");
+core::Property ListenSyslog::Port("Port", "The port for Syslog communication.", "514");
+core::Relationship ListenSyslog::Success("success", "All files are routed to success");
+core::Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid");
void ListenSyslog::initialize() {
// Set the supported properties
@@ -140,8 +121,7 @@ void ListenSyslog::runThread() {
if (_protocol == "TCP")
listen(sockfd, 5);
_serverSocket = sockfd;
- logger_->log_error("ListenSysLog Server socket %d bind OK to port %d",
- _serverSocket, portno);
+ logger_->log_error("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno);
}
FD_ZERO(&_readfds);
FD_SET(_serverSocket, &_readfds);
@@ -171,14 +151,11 @@ void ListenSyslog::runThread() {
socklen_t clilen;
struct sockaddr_in cli_addr;
clilen = sizeof(cli_addr);
- int newsockfd = accept(_serverSocket,
- reinterpret_cast<struct sockaddr *>(&cli_addr),
- &clilen);
+ int newsockfd = accept(_serverSocket, reinterpret_cast<struct sockaddr *>(&cli_addr), &clilen);
if (newsockfd > 0) {
if (_clientSockets.size() < _maxConnections) {
_clientSockets.push_back(newsockfd);
- logger_->log_info("ListenSysLog new client socket %d connection",
- newsockfd);
+ logger_->log_info("ListenSysLog new client socket %d connection", newsockfd);
continue;
} else {
close(newsockfd);
@@ -188,10 +165,8 @@ void ListenSyslog::runThread() {
socklen_t clilen;
struct sockaddr_in cli_addr;
clilen = sizeof(cli_addr);
- int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0,
- (struct sockaddr *) &cli_addr, &clilen);
- if (recvlen > 0
- && (recvlen + getEventQueueByteSize()) <= _recvBufSize) {
+ int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0, (struct sockaddr *) &cli_addr, &clilen);
+ if (recvlen > 0 && (recvlen + getEventQueueByteSize()) <= _recvBufSize) {
uint8_t *payload = new uint8_t[recvlen];
memcpy(payload, _buffer, recvlen);
putEvent(payload, recvlen);
@@ -205,8 +180,7 @@ void ListenSyslog::runThread() {
int recvlen = readline(clientSocket, _buffer, sizeof(_buffer));
if (recvlen <= 0) {
close(clientSocket);
- logger_->log_info("ListenSysLog client socket %d close",
- clientSocket);
+ logger_->log_info("ListenSysLog client socket %d close", clientSocket);
it = _clientSockets.erase(it);
} else {
if ((recvlen + getEventQueueByteSize()) <= _recvBufSize) {
@@ -253,8 +227,7 @@ int ListenSyslog::readline(int fd, char *bufptr, size_t len) {
return -1;
}
-void ListenSyslog::onTrigger(core::ProcessContext *context,
- core::ProcessSession *session) {
+void ListenSyslog::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
std::string value;
bool needResetServerSocket = false;
if (context->getProperty(Protocol.getName(), value)) {
@@ -275,8 +248,7 @@ void ListenSyslog::onTrigger(core::ProcessContext *context,
_messageDelimiter = value;
}
if (context->getProperty(ParseMessages.getName(), value)) {
- org::apache::nifi::minifi::utils::StringUtils::StringToBool(value,
- _parseMessages);
+ org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, _parseMessages);
}
if (context->getProperty(Port.getName(), value)) {
int64_t oldPort = _port;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/LogAttribute.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/LogAttribute.cpp b/libminifi/src/processors/LogAttribute.cpp
index d2dcd10..e308901 100644
--- a/libminifi/src/processors/LogAttribute.cpp
+++ b/libminifi/src/processors/LogAttribute.cpp
@@ -39,27 +39,14 @@ namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
-core::Property LogAttribute::LogLevel(
- "Log Level", "The Log Level to use when logging the Attributes", "info");
-core::Property LogAttribute::AttributesToLog(
- "Attributes to Log",
- "A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.",
- "");
-core::Property LogAttribute::AttributesToIgnore(
- "Attributes to Ignore",
- "A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.",
- "");
-core::Property LogAttribute::LogPayload(
- "Log Payload",
- "If true, the FlowFile's payload will be logged, in addition to its attributes;"
- "otherwise, just the Attributes will be logged.",
- "false");
-core::Property LogAttribute::LogPrefix(
- "Log prefix",
- "Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.",
- "");
-core::Relationship LogAttribute::Success(
- "success", "success operational on the flow record");
+core::Property LogAttribute::LogLevel("Log Level", "The Log Level to use when logging the Attributes", "info");
+core::Property LogAttribute::AttributesToLog("Attributes to Log", "A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.", "");
+core::Property LogAttribute::AttributesToIgnore("Attributes to Ignore", "A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.", "");
+core::Property LogAttribute::LogPayload("Log Payload", "If true, the FlowFile's payload will be logged, in addition to its attributes;"
+ "otherwise, just the Attributes will be logged.",
+ "false");
+core::Property LogAttribute::LogPrefix("Log prefix", "Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.", "");
+core::Relationship LogAttribute::Success("success", "success operational on the flow record");
void LogAttribute::initialize() {
// Set the supported properties
@@ -76,8 +63,7 @@ void LogAttribute::initialize() {
setSupportedRelationships(relationships);
}
-void LogAttribute::onTrigger(core::ProcessContext *context,
- core::ProcessSession *session) {
+void LogAttribute::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
std::string dashLine = "--------------------------------------------------";
LogAttrLevel level = LogAttrLevelInfo;
bool logPayload = false;
@@ -96,8 +82,7 @@ void LogAttribute::onTrigger(core::ProcessContext *context,
dashLine = "-----" + value + "-----";
}
if (context->getProperty(LogPayload.getName(), value)) {
- org::apache::nifi::minifi::utils::StringUtils::StringToBool(value,
- logPayload);
+ org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, logPayload);
}
message << "Logging for flow file " << "\n";
@@ -105,10 +90,8 @@ void LogAttribute::onTrigger(core::ProcessContext *context,
message << "\nStandard FlowFile Attributes";
message << "\n" << "UUID:" << flow->getUUIDStr();
message << "\n" << "EntryDate:" << getTimeStr(flow->getEntryDate());
- message << "\n" << "lineageStartDate:"
- << getTimeStr(flow->getlineageStartDate());
- message << "\n" << "Size:" << flow->getSize() << " Offset:"
- << flow->getOffset();
+ message << "\n" << "lineageStartDate:" << getTimeStr(flow->getlineageStartDate());
+ message << "\n" << "Size:" << flow->getSize() << " Offset:" << flow->getOffset();
message << "\nFlowFile Attributes Map Content";
std::map<std::string, std::string> attrs = flow->getAttributes();
std::map<std::string, std::string>::iterator it;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/PutFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/PutFile.cpp b/libminifi/src/processors/PutFile.cpp
index d8832c7..0aba3d7 100644
--- a/libminifi/src/processors/PutFile.cpp
+++ b/libminifi/src/processors/PutFile.cpp
@@ -38,19 +38,12 @@ namespace nifi {
namespace minifi {
namespace processors {
-core::Property PutFile::Directory("Output Directory",
- "The output directory to which to put files",
- ".");
-core::Property PutFile::ConflictResolution(
- "Conflict Resolution Strategy",
- "Indicates what should happen when a file with the same name already exists in the output directory",
- CONFLICT_RESOLUTION_STRATEGY_FAIL);
-
-core::Relationship PutFile::Success("success",
- "All files are routed to success");
-core::Relationship PutFile::Failure(
- "failure",
- "Failed files (conflict, write failure, etc.) are transferred to failure");
+core::Property PutFile::Directory("Output Directory", "The output directory to which to put files", ".");
+core::Property PutFile::ConflictResolution("Conflict Resolution Strategy", "Indicates what should happen when a file with the same name already exists in the output directory",
+ CONFLICT_RESOLUTION_STRATEGY_FAIL);
+
+core::Relationship PutFile::Success("success", "All files are routed to success");
+core::Relationship PutFile::Failure("failure", "Failed files (conflict, write failure, etc.) are transferred to failure");
void PutFile::initialize() {
// Set the supported properties
@@ -65,28 +58,23 @@ void PutFile::initialize() {
setSupportedRelationships(relationships);
}
-void PutFile::onSchedule(core::ProcessContext *context,
- core::ProcessSessionFactory *sessionFactory) {
+void PutFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
if (!context->getProperty(Directory.getName(), directory_)) {
logger_->log_error("Directory attribute is missing or invalid");
}
- if (!context->getProperty(ConflictResolution.getName(),
- conflict_resolution_)) {
- logger_->log_error(
- "Conflict Resolution Strategy attribute is missing or invalid");
+ if (!context->getProperty(ConflictResolution.getName(), conflict_resolution_)) {
+ logger_->log_error("Conflict Resolution Strategy attribute is missing or invalid");
}
}
-void PutFile::onTrigger(core::ProcessContext *context,
- core::ProcessSession *session) {
+void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
if (IsNullOrEmpty(directory_) || IsNullOrEmpty(conflict_resolution_)) {
context->yield();
return;
}
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
- FlowFileRecord>(session->get());
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get());
// Do nothing if there are no incoming files
if (!flowFile) {
@@ -111,16 +99,13 @@ void PutFile::onTrigger(core::ProcessContext *context,
destFileSs << directory_ << "/" << filename;
std::string destFile = destFileSs.str();
- logger_->log_info("PutFile writing file %s into directory %s",
- filename.c_str(), directory_.c_str());
+ logger_->log_info("PutFile writing file %s into directory %s", filename.c_str(), directory_.c_str());
// If file exists, apply conflict resolution strategy
struct stat statResult;
if (stat(destFile.c_str(), &statResult) == 0) {
- logger_->log_info(
- "Destination file %s exists; applying Conflict Resolution Strategy: %s",
- destFile.c_str(), conflict_resolution_.c_str());
+ logger_->log_info("Destination file %s exists; applying Conflict Resolution Strategy: %s", destFile.c_str(), conflict_resolution_.c_str());
if (conflict_resolution_ == CONFLICT_RESOLUTION_STRATEGY_REPLACE) {
putFile(session, flowFile, tmpFile, destFile);
@@ -134,9 +119,7 @@ void PutFile::onTrigger(core::ProcessContext *context,
}
}
-bool PutFile::putFile(core::ProcessSession *session,
- std::shared_ptr<FlowFileRecord> flowFile,
- const std::string &tmpFile, const std::string &destFile) {
+bool PutFile::putFile(core::ProcessSession *session, std::shared_ptr<FlowFileRecord> flowFile, const std::string &tmpFile, const std::string &destFile) {
ReadCallback cb(tmpFile, destFile);
session->read(flowFile, &cb);
@@ -149,8 +132,7 @@ bool PutFile::putFile(core::ProcessSession *session,
return false;
}
-PutFile::ReadCallback::ReadCallback(const std::string &tmpFile,
- const std::string &destFile)
+PutFile::ReadCallback::ReadCallback(const std::string &tmpFile, const std::string &destFile)
: _tmpFile(tmpFile),
_tmpFileOs(tmpFile),
_destFile(destFile),
@@ -170,25 +152,19 @@ void PutFile::ReadCallback::process(std::ifstream *stream) {
bool PutFile::ReadCallback::commit() {
bool success = false;
- logger_->log_info("PutFile committing put file operation to %s",
- _destFile.c_str());
+ logger_->log_info("PutFile committing put file operation to %s", _destFile.c_str());
if (_writeSucceeded) {
_tmpFileOs.close();
if (rename(_tmpFile.c_str(), _destFile.c_str())) {
- logger_->log_info(
- "PutFile commit put file operation to %s failed because rename() call failed",
- _destFile.c_str());
+ logger_->log_info("PutFile commit put file operation to %s failed because rename() call failed", _destFile.c_str());
} else {
success = true;
- logger_->log_info("PutFile commit put file operation to %s succeeded",
- _destFile.c_str());
+ logger_->log_info("PutFile commit put file operation to %s succeeded", _destFile.c_str());
}
} else {
- logger_->log_error(
- "PutFile commit put file operation to %s failed because write failed",
- _destFile.c_str());
+ logger_->log_error("PutFile commit put file operation to %s failed because write failed", _destFile.c_str());
}
return success;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/processors/TailFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp
index abb02ca..d4f1b80 100644
--- a/libminifi/src/processors/TailFile.cpp
+++ b/libminifi/src/processors/TailFile.cpp
@@ -46,16 +46,11 @@ namespace nifi {
namespace minifi {
namespace processors {
-core::Property TailFile::FileName(
- "File to Tail",
- "Fully-qualified filename of the file that should be tailed", "");
-core::Property TailFile::StateFile(
- "State File",
- "Specifies the file that should be used for storing state about"
- " what data has been ingested so that upon restart NiFi can resume from where it left off",
- "TailFileState");
-core::Relationship TailFile::Success("success",
- "All files are routed to success");
+core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of the file that should be tailed", "");
+core::Property TailFile::StateFile("State File", "Specifies the file that should be used for storing state about"
+ " what data has been ingested so that upon restart NiFi can resume from where it left off",
+ "TailFileState");
+core::Relationship TailFile::Success("success", "All files are routed to success");
void TailFile::initialize() {
// Set the supported properties
@@ -84,8 +79,7 @@ void TailFile::parseStateFileLine(char *buf) {
++line;
char first = line[0];
- if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n')
- || (first == '=')) {
+ if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') || (first == '=')) {
return;
}
@@ -125,8 +119,7 @@ void TailFile::recoverState() {
return;
}
char buf[BUFFER_SIZE];
- for (file.getline(buf, BUFFER_SIZE); file.good();
- file.getline(buf, BUFFER_SIZE)) {
+ for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) {
parseStateFileLine(buf);
}
}
@@ -142,12 +135,10 @@ void TailFile::storeState() {
file.close();
}
-static bool sortTailMatchedFileItem(TailMatchedFileItem i,
- TailMatchedFileItem j) {
+static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j) {
return (i.modifiedTime < j.modifiedTime);
}
-void TailFile::checkRollOver(const std::string &fileLocation,
- const std::string &fileName) {
+void TailFile::checkRollOver(const std::string &fileLocation, const std::string &fileName) {
struct stat statbuf;
std::vector<TailMatchedFileItem> matchedFiles;
std::string fullPath = fileLocation + "/" + _currentTailFileName;
@@ -157,8 +148,7 @@ void TailFile::checkRollOver(const std::string &fileLocation,
// there are new input for the current tail file
return;
- uint64_t modifiedTimeCurrentTailFile =
- ((uint64_t) (statbuf.st_mtime) * 1000);
+ uint64_t modifiedTimeCurrentTailFile = ((uint64_t) (statbuf.st_mtime) * 1000);
std::string pattern = fileName;
std::size_t found = fileName.find_last_of(".");
if (found != std::string::npos)
@@ -176,10 +166,8 @@ void TailFile::checkRollOver(const std::string &fileLocation,
if (!(entry->d_type & DT_DIR)) {
std::string fileName = d_name;
std::string fileFullName = fileLocation + "/" + d_name;
- if (fileFullName.find(pattern) != std::string::npos
- && stat(fileFullName.c_str(), &statbuf) == 0) {
- if (((uint64_t) (statbuf.st_mtime) * 1000)
- >= modifiedTimeCurrentTailFile) {
+ if (fileFullName.find(pattern) != std::string::npos && stat(fileFullName.c_str(), &statbuf) == 0) {
+ if (((uint64_t) (statbuf.st_mtime) * 1000) >= modifiedTimeCurrentTailFile) {
TailMatchedFileItem item;
item.fileName = fileName;
item.modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000);
@@ -191,18 +179,14 @@ void TailFile::checkRollOver(const std::string &fileLocation,
closedir(d);
// Sort the list based on modified time
- std::sort(matchedFiles.begin(), matchedFiles.end(),
- sortTailMatchedFileItem);
- for (std::vector<TailMatchedFileItem>::iterator it = matchedFiles.begin();
- it != matchedFiles.end(); ++it) {
+ std::sort(matchedFiles.begin(), matchedFiles.end(), sortTailMatchedFileItem);
+ for (std::vector<TailMatchedFileItem>::iterator it = matchedFiles.begin(); it != matchedFiles.end(); ++it) {
TailMatchedFileItem item = *it;
if (item.fileName == _currentTailFileName) {
++it;
if (it != matchedFiles.end()) {
TailMatchedFileItem nextItem = *it;
- logger_->log_info("TailFile File Roll Over from %s to %s",
- _currentTailFileName.c_str(),
- nextItem.fileName.c_str());
+ logger_->log_info("TailFile File Roll Over from %s to %s", _currentTailFileName.c_str(), nextItem.fileName.c_str());
_currentTailFileName = nextItem.fileName;
_currentTailFilePosition = 0;
storeState();
@@ -215,8 +199,7 @@ void TailFile::checkRollOver(const std::string &fileLocation,
}
}
-void TailFile::onTrigger(core::ProcessContext *context,
- core::ProcessSession *session) {
+void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
std::string value;
std::string fileLocation = "";
@@ -245,8 +228,7 @@ void TailFile::onTrigger(core::ProcessContext *context,
context->yield();
return;
}
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
- FlowFileRecord>(session->create());
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
if (!flowFile)
return;
std::size_t found = _currentTailFileName.find_last_of(".");
@@ -256,12 +238,8 @@ void TailFile::onTrigger(core::ProcessContext *context,
flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
session->import(fullPath, flowFile, true, this->_currentTailFilePosition);
session->transfer(flowFile, Success);
- logger_->log_info("TailFile %s for %d bytes", _currentTailFileName.c_str(),
- flowFile->getSize());
- std::string logName = baseName + "."
- + std::to_string(_currentTailFilePosition) + "-"
- + std::to_string(_currentTailFilePosition + flowFile->getSize()) + "."
- + extension;
+ logger_->log_info("TailFile %s for %d bytes", _currentTailFileName.c_str(), flowFile->getSize());
+ std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." + extension;
flowFile->updateKeyedAttribute(FILENAME, logName);
this->_currentTailFilePosition += flowFile->getSize();
storeState();
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/provenance/Provenance.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index 083d0b2..ff6a149 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -34,49 +34,37 @@ namespace nifi {
namespace minifi {
namespace provenance {
-const char *ProvenanceEventRecord::ProvenanceEventTypeStr[REPLAY+1] =
-{ "CREATE", "RECEIVE", "FETCH", "SEND", "DOWNLOAD", "DROP", "EXPIRE", "FORK",
- "JOIN", "CLONE", "CONTENT_MODIFIED", "ATTRIBUTES_MODIFIED", "ROUTE",
- "ADDINFO", "REPLAY"};
+const char *ProvenanceEventRecord::ProvenanceEventTypeStr[REPLAY + 1] = { "CREATE", "RECEIVE", "FETCH", "SEND", "DOWNLOAD", "DROP", "EXPIRE", "FORK", "JOIN", "CLONE", "CONTENT_MODIFIED",
+ "ATTRIBUTES_MODIFIED", "ROUTE", "ADDINFO", "REPLAY" };
// DeSerialize
-bool ProvenanceEventRecord::DeSerialize(
- const std::shared_ptr<core::Repository> &repo, std::string key) {
+bool ProvenanceEventRecord::DeSerialize(const std::shared_ptr<core::Repository> &repo, std::string key) {
std::string value;
bool ret;
ret = repo->Get(key, value);
if (!ret) {
- logger_->log_error("NiFi Provenance Store event %s can not found",
- key.c_str());
+ logger_->log_error("NiFi Provenance Store event %s can not found", key.c_str());
return false;
} else {
- logger_->log_debug("NiFi Provenance Read event %s length %d", key.c_str(),
- value.length());
+ logger_->log_debug("NiFi Provenance Read event %s length %d", key.c_str(), value.length());
}
- org::apache::nifi::minifi::io::DataStream stream(
- (const uint8_t*) value.data(), value.length());
+ org::apache::nifi::minifi::io::DataStream stream((const uint8_t*) value.data(), value.length());
ret = DeSerialize(stream);
if (ret) {
- logger_->log_debug(
- "NiFi Provenance retrieve event %s size %d eventType %d success",
- _eventIdStr.c_str(), stream.getSize(), _eventType);
+ logger_->log_debug("NiFi Provenance retrieve event %s size %d eventType %d success", _eventIdStr.c_str(), stream.getSize(), _eventType);
} else {
- logger_->log_debug(
- "NiFi Provenance retrieve event %s size %d eventType %d fail",
- _eventIdStr.c_str(), stream.getSize(), _eventType);
+ logger_->log_debug("NiFi Provenance retrieve event %s size %d eventType %d fail", _eventIdStr.c_str(), stream.getSize(), _eventType);
}
return ret;
}
-bool ProvenanceEventRecord::Serialize(
- const std::shared_ptr<core::Repository> &repo) {
-
+bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::Repository> &repo) {
org::apache::nifi::minifi::io::DataStream outStream;
int ret;
@@ -170,9 +158,7 @@ bool ProvenanceEventRecord::Serialize(
return false;
}
- if (this->_eventType == ProvenanceEventRecord::FORK
- || this->_eventType == ProvenanceEventRecord::CLONE
- || this->_eventType == ProvenanceEventRecord::JOIN) {
+ if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType == ProvenanceEventRecord::CLONE || this->_eventType == ProvenanceEventRecord::JOIN) {
// write UUIDs
uint32_t number = this->_parentUuids.size();
ret = write(number, &outStream);
@@ -196,8 +182,7 @@ bool ProvenanceEventRecord::Serialize(
return false;
}
}
- } else if (this->_eventType == ProvenanceEventRecord::SEND
- || this->_eventType == ProvenanceEventRecord::FETCH) {
+ } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) {
ret = writeUTF(this->_transitUri, &outStream);
if (ret <= 0) {
return false;
@@ -213,19 +198,15 @@ bool ProvenanceEventRecord::Serialize(
}
}
// Persistent to the DB
- if (repo->Put(_eventIdStr, const_cast<uint8_t*>(outStream.getBuffer()),
- outStream.getSize())) {
- logger_->log_debug("NiFi Provenance Store event %s size %d success",
- _eventIdStr.c_str(), outStream.getSize());
+ if (repo->Put(_eventIdStr, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
+ logger_->log_debug("NiFi Provenance Store event %s size %d success", _eventIdStr.c_str(), outStream.getSize());
} else {
- logger_->log_error("NiFi Provenance Store event %s size %d fail",
- _eventIdStr.c_str(), outStream.getSize());
+ logger_->log_error("NiFi Provenance Store event %s size %d fail", _eventIdStr.c_str(), outStream.getSize());
}
return true;
}
-bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer,
- const int bufferSize) {
+bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
int ret;
org::apache::nifi::minifi::io::DataStream outStream(buffer, bufferSize);
@@ -325,9 +306,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer,
return false;
}
- if (this->_eventType == ProvenanceEventRecord::FORK
- || this->_eventType == ProvenanceEventRecord::CLONE
- || this->_eventType == ProvenanceEventRecord::JOIN) {
+ if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType == ProvenanceEventRecord::CLONE || this->_eventType == ProvenanceEventRecord::JOIN) {
// read UUIDs
uint32_t number = 0;
ret = read(number, &outStream);
@@ -356,8 +335,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer,
}
this->addChildUuid(childUUID);
}
- } else if (this->_eventType == ProvenanceEventRecord::SEND
- || this->_eventType == ProvenanceEventRecord::FETCH) {
+ } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) {
ret = readUTF(this->_transitUri, &outStream);
if (ret <= 0) {
return false;
@@ -386,8 +364,7 @@ void ProvenanceReporter::commit() {
}
}
-void ProvenanceReporter::create(std::shared_ptr<core::FlowFile> flow,
- std::string detail) {
+void ProvenanceReporter::create(std::shared_ptr<core::FlowFile> flow, std::string detail) {
ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CREATE, flow);
if (event) {
@@ -396,9 +373,7 @@ void ProvenanceReporter::create(std::shared_ptr<core::FlowFile> flow,
}
}
-void ProvenanceReporter::route(std::shared_ptr<core::FlowFile> flow,
- core::Relationship relation, std::string detail,
- uint64_t processingDuration) {
+void ProvenanceReporter::route(std::shared_ptr<core::FlowFile> flow, core::Relationship relation, std::string detail, uint64_t processingDuration) {
ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::ROUTE, flow);
if (event) {
@@ -409,10 +384,8 @@ void ProvenanceReporter::route(std::shared_ptr<core::FlowFile> flow,
}
}
-void ProvenanceReporter::modifyAttributes(std::shared_ptr<core::FlowFile> flow,
- std::string detail) {
- ProvenanceEventRecord *event = allocate(
- ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow);
+void ProvenanceReporter::modifyAttributes(std::shared_ptr<core::FlowFile> flow, std::string detail) {
+ ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow);
if (event) {
event->setDetails(detail);
@@ -420,11 +393,8 @@ void ProvenanceReporter::modifyAttributes(std::shared_ptr<core::FlowFile> flow,
}
}
-void ProvenanceReporter::modifyContent(std::shared_ptr<core::FlowFile> flow,
- std::string detail,
- uint64_t processingDuration) {
- ProvenanceEventRecord *event = allocate(
- ProvenanceEventRecord::CONTENT_MODIFIED, flow);
+void ProvenanceReporter::modifyContent(std::shared_ptr<core::FlowFile> flow, std::string detail, uint64_t processingDuration) {
+ ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CONTENT_MODIFIED, flow);
if (event) {
event->setDetails(detail);
@@ -433,8 +403,7 @@ void ProvenanceReporter::modifyContent(std::shared_ptr<core::FlowFile> flow,
}
}
-void ProvenanceReporter::clone(std::shared_ptr<core::FlowFile> parent,
- std::shared_ptr<core::FlowFile> child) {
+void ProvenanceReporter::clone(std::shared_ptr<core::FlowFile> parent, std::shared_ptr<core::FlowFile> child) {
ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CLONE, parent);
if (event) {
@@ -444,10 +413,7 @@ void ProvenanceReporter::clone(std::shared_ptr<core::FlowFile> parent,
}
}
-void ProvenanceReporter::join(
- std::vector<std::shared_ptr<core::FlowFile> > parents,
- std::shared_ptr<core::FlowFile> child, std::string detail,
- uint64_t processingDuration) {
+void ProvenanceReporter::join(std::vector<std::shared_ptr<core::FlowFile> > parents, std::shared_ptr<core::FlowFile> child, std::string detail, uint64_t processingDuration) {
ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::JOIN, child);
if (event) {
@@ -463,10 +429,7 @@ void ProvenanceReporter::join(
}
}
-void ProvenanceReporter::fork(
- std::vector<std::shared_ptr<core::FlowFile> > child,
- std::shared_ptr<core::FlowFile> parent, std::string detail,
- uint64_t processingDuration) {
+void ProvenanceReporter::fork(std::vector<std::shared_ptr<core::FlowFile> > child, std::shared_ptr<core::FlowFile> parent, std::string detail, uint64_t processingDuration) {
ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FORK, parent);
if (event) {
@@ -482,8 +445,7 @@ void ProvenanceReporter::fork(
}
}
-void ProvenanceReporter::expire(std::shared_ptr<core::FlowFile> flow,
- std::string detail) {
+void ProvenanceReporter::expire(std::shared_ptr<core::FlowFile> flow, std::string detail) {
ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::EXPIRE, flow);
if (event) {
@@ -492,8 +454,7 @@ void ProvenanceReporter::expire(std::shared_ptr<core::FlowFile> flow,
}
}
-void ProvenanceReporter::drop(std::shared_ptr<core::FlowFile> flow,
- std::string reason) {
+void ProvenanceReporter::drop(std::shared_ptr<core::FlowFile> flow, std::string reason) {
ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::DROP, flow);
if (event) {
@@ -503,9 +464,7 @@ void ProvenanceReporter::drop(std::shared_ptr<core::FlowFile> flow,
}
}
-void ProvenanceReporter::send(std::shared_ptr<core::FlowFile> flow,
- std::string transitUri, std::string detail,
- uint64_t processingDuration, bool force) {
+void ProvenanceReporter::send(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, uint64_t processingDuration, bool force) {
ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::SEND, flow);
if (event) {
@@ -522,11 +481,7 @@ void ProvenanceReporter::send(std::shared_ptr<core::FlowFile> flow,
}
}
-void ProvenanceReporter::receive(std::shared_ptr<core::FlowFile> flow,
- std::string transitUri,
- std::string sourceSystemFlowFileIdentifier,
- std::string detail,
- uint64_t processingDuration) {
+void ProvenanceReporter::receive(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string sourceSystemFlowFileIdentifier, std::string detail, uint64_t processingDuration) {
ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::RECEIVE, flow);
if (event) {
@@ -538,9 +493,7 @@ void ProvenanceReporter::receive(std::shared_ptr<core::FlowFile> flow,
}
}
-void ProvenanceReporter::fetch(std::shared_ptr<core::FlowFile> flow,
- std::string transitUri, std::string detail,
- uint64_t processingDuration) {
+void ProvenanceReporter::fetch(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, uint64_t processingDuration) {
ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FETCH, flow);
if (event) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/provenance/ProvenanceRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/ProvenanceRepository.cpp b/libminifi/src/provenance/ProvenanceRepository.cpp
index 77de5ba..e4a8ffa 100644
--- a/libminifi/src/provenance/ProvenanceRepository.cpp
+++ b/libminifi/src/provenance/ProvenanceRepository.cpp
@@ -40,14 +40,11 @@ void ProvenanceRepository::run() {
for (it->SeekToFirst(); it->Valid(); it->Next()) {
ProvenanceEventRecord eventRead;
std::string key = it->key().ToString();
- if (eventRead.DeSerialize(
- reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())),
- it->value().size())) {
+ if (eventRead.DeSerialize(reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())), it->value().size())) {
if ((curTime - eventRead.getEventTime()) > max_partition_millis_)
purgeList.push_back(key);
} else {
- logger_->log_debug("NiFi Provenance retrieve event %s fail",
- key.c_str());
+ logger_->log_debug("NiFi Provenance retrieve event %s fail", key.c_str());
purgeList.push_back(key);
}
}
@@ -56,8 +53,7 @@ void ProvenanceRepository::run() {
for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) {
std::string eventId = *itPurge;
- logger_->log_info("ProvenanceRepository Repo Purge %s",
- eventId.c_str());
+ logger_->log_info("ProvenanceRepository Repo Purge %s", eventId.c_str());
Delete(eventId);
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/test/Server.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/Server.cpp b/libminifi/test/Server.cpp
index bb3e682..39efb12 100644
--- a/libminifi/test/Server.cpp
+++ b/libminifi/test/Server.cpp
@@ -50,8 +50,7 @@ typedef enum {
} FlowControlMsgType;
// FlowControl Protocol Msg Type String
-static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = {
- "REGISTER_REQ", "REGISTER_RESP", "REPORT_REQ", "REPORT_RESP" };
+static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = { "REGISTER_REQ", "REGISTER_RESP", "REPORT_REQ", "REPORT_RESP" };
// Flow Control Msg Type to String
inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type) {
@@ -83,10 +82,8 @@ typedef enum {
} FlowControlMsgID;
// FlowControl Protocol Msg ID String
-static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = {
- "FLOW_SERIAL_NUMBER", "FLOW_YAML_NAME", "FLOW_YAML_CONTENT",
- "REPORT_INTERVAL", "PROCESSOR_NAME"
- "PROPERTY_NAME", "PROPERTY_VALUE", "REPORT_BLOB" };
+static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = { "FLOW_SERIAL_NUMBER", "FLOW_YAML_NAME", "FLOW_YAML_CONTENT", "REPORT_INTERVAL", "PROCESSOR_NAME",
+ "PROPERTY_NAME", "PROPERTY_VALUE", "REPORT_BLOB" };  // each name needs its own comma: adjacent string literals are concatenated into a single entry
#define TYPE_HDR_LEN 4 // Fix Hdr Type
#define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes
@@ -122,9 +119,7 @@ typedef enum {
} FlowControlRespCode;
// FlowControl Resp Code str
-static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = { "RESP_SUCCESS",
- "RESP_TRIGGER_REGISTER", "RESP_START_FLOW_CONTROLLER",
- "RESP_STOP_FLOW_CONTROLLER", "RESP_FAILURE" };
+static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = { "RESP_SUCCESS", "RESP_TRIGGER_REGISTER", "RESP_START_FLOW_CONTROLLER", "RESP_STOP_FLOW_CONTROLLER", "RESP_FAILURE" };
// Flow Control Resp Code to String
inline const char *FlowControlRespCodeToStr(FlowControlRespCode code) {
@@ -332,8 +327,7 @@ int main(int argc, char *argv[]) {
exit(1);
}
- if (signal(SIGINT, sigHandler) == SIG_ERR
- || signal(SIGTERM, sigHandler) == SIG_ERR) {
+ if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR) {
return -1;
}
@@ -360,13 +354,10 @@ int main(int argc, char *argv[]) {
FlowControlProtocolHeader hdr;
int status = readHdr(newsockfd, &hdr);
if (status > 0) {
- printf("Flow Control Protocol receive MsgType %s\n",
- FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+ printf("Flow Control Protocol receive MsgType %s\n", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
printf("Flow Control Protocol receive Seq Num %d\n", hdr.seqNumber);
- printf("Flow Control Protocol receive Resp Code %s\n",
- FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
- printf("Flow Control Protocol receive Payload len %d\n",
- hdr.payloadLen);
+ printf("Flow Control Protocol receive Resp Code %s\n", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+ printf("Flow Control Protocol receive Payload len %d\n", hdr.payloadLen);
if (((FlowControlMsgType) hdr.msgType) == REGISTER_REQ) {
printf("Flow Control Protocol Register Req receive\n");
uint8_t *payload = new uint8_t[hdr.payloadLen];
@@ -384,12 +375,10 @@ int main(int argc, char *argv[]) {
} else if (((FlowControlMsgID) msgID) == FLOW_YAML_NAME) {
uint32_t len;
payloadPtr = decode(payloadPtr, len);
- printf("Flow Control Protocol receive YAML name length %d\n",
- len);
+ printf("Flow Control Protocol receive YAML name length %d\n", len);
std::string flowName = (const char *) payloadPtr;
payloadPtr += len;
- printf("Flow Control Protocol receive YAML name %s\n",
- flowName.c_str());
+ printf("Flow Control Protocol receive YAML name %s\n", flowName.c_str());
} else {
break;
}
@@ -399,11 +388,9 @@ int main(int argc, char *argv[]) {
// Calculate the total payload msg size
char *ymlContent;
uint32_t yamlLen = readYAML(&ymlContent);
- uint32_t payloadSize = FlowControlMsgIDEncodingLen(REPORT_INTERVAL,
- 0);
+ uint32_t payloadSize = FlowControlMsgIDEncodingLen(REPORT_INTERVAL, 0);
if (yamlLen > 0)
- payloadSize += FlowControlMsgIDEncodingLen(FLOW_YAML_CONTENT,
- yamlLen);
+ payloadSize += FlowControlMsgIDEncodingLen(FLOW_YAML_CONTENT, yamlLen);
uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
uint8_t *data = new uint8_t[size];
@@ -444,12 +431,10 @@ int main(int argc, char *argv[]) {
if (((FlowControlMsgID) msgID) == FLOW_YAML_NAME) {
uint32_t len;
payloadPtr = decode(payloadPtr, len);
- printf("Flow Control Protocol receive YAML name length %d\n",
- len);
+ printf("Flow Control Protocol receive YAML name length %d\n", len);
std::string flowName = (const char *) payloadPtr;
payloadPtr += len;
- printf("Flow Control Protocol receive YAML name %s\n",
- flowName.c_str());
+ printf("Flow Control Protocol receive YAML name %s\n", flowName.c_str());
} else {
break;
}
@@ -475,16 +460,11 @@ int main(int argc, char *argv[]) {
propertyValue2 = "41";
flag = 0;
}
- uint32_t payloadSize = FlowControlMsgIDEncodingLen(
- PROCESSOR_NAME, processor.size() + 1);
- payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME,
- propertyName1.size() + 1);
- payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE,
- propertyValue1.size() + 1);
- payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME,
- propertyName2.size() + 1);
- payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE,
- propertyValue2.size() + 1);
+ uint32_t payloadSize = FlowControlMsgIDEncodingLen(PROCESSOR_NAME, processor.size() + 1);
+ payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName1.size() + 1);
+ payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue1.size() + 1);
+ payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName2.size() + 1);
+ payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue2.size() + 1);
uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
uint8_t *data = new uint8_t[size];
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/test/TestBase.h
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 7b1ac6b..e675043 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -36,40 +36,40 @@
class LogTestController {
public:
static LogTestController& getInstance() {
- static LogTestController instance;
- return instance;
+ static LogTestController instance;
+ return instance;
}
-
+
template<typename T>
void setTrace() {
setLevel<T>(spdlog::level::trace);
}
-
+
template<typename T>
void setDebug() {
setLevel<T>(spdlog::level::debug);
}
-
+
template<typename T>
void setInfo() {
setLevel<T>(spdlog::level::info);
}
-
+
template<typename T>
void setWarn() {
setLevel<T>(spdlog::level::warn);
}
-
+
template<typename T>
void setError() {
setLevel<T>(spdlog::level::err);
}
-
+
template<typename T>
void setOff() {
setLevel<T>(spdlog::level::off);
}
-
+
template<typename T>
void setLevel(spdlog::level::level_enum level) {
logging::LoggerFactory<T>::getLogger();
@@ -77,17 +77,17 @@ class LogTestController {
modified_loggers.push_back(name);
setLevel(name, level);
}
-
+
bool contains(const std::string &ending) {
- return contains(log_output, ending);
+ return contains(log_output, ending);
}
-
+
bool contains(const std::ostringstream &stream, const std::string &ending) {
std::string str = stream.str();
logger_->log_info("Looking for %s in %s.", ending, str);
return (ending.length() > 0 && str.find(ending) != std::string::npos);
}
-
+
void reset() {
for (auto const & name : modified_loggers) {
setLevel(name, spdlog::level::err);
@@ -95,35 +95,40 @@ class LogTestController {
modified_loggers = std::vector<std::string>();
resetStream(log_output);
}
-
+
inline void resetStream(std::ostringstream &stream) {
stream.str("");
stream.clear();
}
-
+
std::ostringstream log_output;
-
+
std::shared_ptr<logging::Logger> logger_;
private:
- class TestBootstrapLogger: public logging::Logger {
- public:
- TestBootstrapLogger(std::shared_ptr<spdlog::logger> logger):Logger(logger){};
- };
+ class TestBootstrapLogger : public logging::Logger {
+ public:
+ TestBootstrapLogger(std::shared_ptr<spdlog::logger> logger)
+ : Logger(logger) {
+ }
+ ;
+ };
LogTestController() {
- std::shared_ptr<logging::LoggerProperties> logger_properties = std::make_shared<logging::LoggerProperties>();
- logger_properties->set("logger.root", "ERROR,ostream");
- logger_properties->set("logger." + core::getClassName<LogTestController>(), "INFO");
- logger_properties->set("logger." + core::getClassName<logging::LoggerConfiguration>(), "DEBUG");
- std::shared_ptr<spdlog::sinks::dist_sink_mt> dist_sink = std::make_shared<spdlog::sinks::dist_sink_mt>();
- dist_sink->add_sink(std::make_shared<spdlog::sinks::ostream_sink_mt>(log_output, true));
- dist_sink->add_sink(spdlog::sinks::stderr_sink_mt::instance());
- logger_properties->add_sink("ostream", dist_sink);
- logging::LoggerConfiguration::getConfiguration().initialize(logger_properties);
- logger_ = logging::LoggerFactory<LogTestController>::getLogger();
+ std::shared_ptr<logging::LoggerProperties> logger_properties = std::make_shared<logging::LoggerProperties>();
+ logger_properties->set("logger.root", "ERROR,ostream");
+ logger_properties->set("logger." + core::getClassName<LogTestController>(), "INFO");
+ logger_properties->set("logger." + core::getClassName<logging::LoggerConfiguration>(), "DEBUG");
+ std::shared_ptr<spdlog::sinks::dist_sink_mt> dist_sink = std::make_shared<spdlog::sinks::dist_sink_mt>();
+ dist_sink->add_sink(std::make_shared<spdlog::sinks::ostream_sink_mt>(log_output, true));
+ dist_sink->add_sink(spdlog::sinks::stderr_sink_mt::instance());
+ logger_properties->add_sink("ostream", dist_sink);
+ logging::LoggerConfiguration::getConfiguration().initialize(logger_properties);
+ logger_ = logging::LoggerFactory<LogTestController>::getLogger();
}
LogTestController(LogTestController const&);
LogTestController& operator=(LogTestController const&);
- ~LogTestController() {};
+ ~LogTestController() {
+ }
+ ;
void setLevel(const std::string name, spdlog::level::level_enum level) {
logger_->log_info("Setting log level for %s to %s", name, spdlog::level::to_str(level));