You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/01/20 04:39:22 UTC
[3/3] nifi-minifi-cpp git commit: MINIFICPP-365 Adjusting log levels.
MINIFICPP-365 Adjusting log levels.
Cleaning up c_str calls, adding logging for session events and adjusting Site to Site logs.
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/233d1d44
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/233d1d44
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/233d1d44
Branch: refs/heads/master
Commit: 233d1d44cbe92c4bf19648b8c3fb684f1792c03f
Parents: 8207e95
Author: Aldrin Piri <al...@apache.org>
Authored: Wed Jan 17 15:11:45 2018 -0500
Committer: Aldrin Piri <al...@apache.org>
Committed: Fri Jan 19 23:38:27 2018 -0500
----------------------------------------------------------------------
bootstrap.sh | 2 +-
extensions/civetweb/processors/ListenHTTP.cpp | 30 ++++-----
.../expression-language/ProcessContextExpr.cpp | 2 +-
extensions/gps/GetGPS.cpp | 6 +-
extensions/http-curl/client/HTTPClient.cpp | 10 +--
extensions/http-curl/processors/InvokeHTTP.cpp | 48 ++++++-------
extensions/http-curl/protocols/RESTSender.cpp | 2 +-
.../http-curl/sitetosite/HTTPProtocol.cpp | 18 ++---
extensions/libarchive/BinFiles.cpp | 32 ++++-----
extensions/libarchive/BinFiles.h | 6 +-
extensions/libarchive/FocusArchiveEntry.cpp | 8 +--
extensions/libarchive/MergeContent.cpp | 4 +-
extensions/libarchive/MergeContent.h | 2 +-
extensions/libarchive/UnfocusArchiveEntry.cpp | 8 +--
extensions/librdkafka/PublishKafka.cpp | 36 +++++-----
extensions/mqtt/AbstractMQTTProcessor.cpp | 18 ++---
extensions/mqtt/ConsumeMQTT.cpp | 2 +-
extensions/mqtt/PublishMQTT.cpp | 4 +-
.../rocksdb-repos/DatabaseContentRepository.cpp | 4 +-
extensions/rocksdb-repos/FlowFileRepository.cpp | 10 +--
extensions/rocksdb-repos/FlowFileRepository.h | 14 ++--
.../rocksdb-repos/ProvenanceRepository.cpp | 6 +-
extensions/rocksdb-repos/ProvenanceRepository.h | 14 ++--
extensions/usb-camera/GetUSBCamera.cpp | 18 ++---
libminifi/include/FlowControlProtocol.h | 2 +-
.../StandardControllerServiceProvider.h | 2 +-
.../core/repository/VolatileRepository.h | 2 +-
libminifi/include/processors/ListenSyslog.h | 2 +-
libminifi/src/Connection.cpp | 12 ++--
libminifi/src/FlowControlProtocol.cpp | 46 ++++++-------
libminifi/src/FlowController.cpp | 12 ++--
libminifi/src/FlowFileRecord.cpp | 20 +++---
libminifi/src/RemoteProcessorGroupPort.cpp | 28 ++++----
libminifi/src/ThreadedSchedulingAgent.cpp | 10 +--
libminifi/src/capi/Plan.cpp | 4 +-
libminifi/src/controllers/SSLContextService.cpp | 2 +-
libminifi/src/core/ConfigurableComponent.cpp | 6 +-
libminifi/src/core/Connectable.cpp | 8 +--
libminifi/src/core/FlowConfiguration.cpp | 2 +-
libminifi/src/core/ProcessGroup.cpp | 24 +++----
libminifi/src/core/ProcessSession.cpp | 43 ++++++------
.../src/core/ProcessSessionReadCallback.cpp | 8 +--
libminifi/src/core/Processor.cpp | 16 ++---
libminifi/src/core/Repository.cpp | 4 +-
.../repository/VolatileContentRepository.cpp | 2 +-
libminifi/src/core/yaml/YamlConfiguration.cpp | 9 ++-
libminifi/src/io/FileStream.cpp | 4 +-
libminifi/src/io/tls/TLSSocket.cpp | 8 +--
libminifi/src/processors/ExecuteProcess.cpp | 28 ++++----
libminifi/src/processors/GetFile.cpp | 9 ++-
libminifi/src/processors/GetTCP.cpp | 10 +--
libminifi/src/processors/ListenSyslog.cpp | 6 +-
libminifi/src/processors/LogAttribute.cpp | 12 ++--
libminifi/src/processors/PutFile.cpp | 28 ++++----
libminifi/src/processors/TailFile.cpp | 8 +--
libminifi/src/sitetosite/RawSocketProtocol.cpp | 58 ++++++++--------
libminifi/src/sitetosite/SiteToSiteClient.cpp | 71 +++++++++++---------
main/MiNiFiMain.cpp | 2 +-
58 files changed, 407 insertions(+), 405 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/bootstrap.sh
----------------------------------------------------------------------
diff --git a/bootstrap.sh b/bootstrap.sh
index bc8b1de..0bef12b 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -355,7 +355,7 @@ show_supported_features() {
echo "B. Lib Curl Features ...........$(print_feature_status HTTP_CURL_ENABLED)"
echo "C. Lib Archive Features ........$(print_feature_status LIBARCHIVE_ENABLED)"
echo "D. Execute Script support ......$(print_feature_status EXECUTE_SCRIPT_ENABLED)"
- echo "E. Expression Langauge support .$(print_feature_status EXPRESSION_LANGAUGE_ENABLED)"
+ echo "E. Expression Language support .$(print_feature_status EXPRESSION_LANGAUGE_ENABLED)"
echo "F. Kafka support ...............$(print_feature_status KAFKA_ENABLED)"
echo "G. PCAP support ................$(print_feature_status PCAP_ENABLED)"
echo "H. USB Camera support ..........$(print_feature_status USB_ENABLED)"
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/civetweb/processors/ListenHTTP.cpp
----------------------------------------------------------------------
diff --git a/extensions/civetweb/processors/ListenHTTP.cpp b/extensions/civetweb/processors/ListenHTTP.cpp
index 73ade40..d94df9b 100644
--- a/extensions/civetweb/processors/ListenHTTP.cpp
+++ b/extensions/civetweb/processors/ListenHTTP.cpp
@@ -58,7 +58,7 @@ core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as
core::Relationship ListenHTTP::Success("success", "All files are routed to success");
void ListenHTTP::initialize() {
- logger_->log_info("Initializing ListenHTTP");
+ logger_->log_trace("Initializing ListenHTTP");
// Set the supported properties
std::set<core::Property> properties;
@@ -81,7 +81,7 @@ void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionF
std::string basePath;
if (!context->getProperty(BasePath.getName(), basePath)) {
- logger_->log_info("%s attribute is missing, so default value of %s will be used", BasePath.getName().c_str(), BasePath.getValue().c_str());
+ logger_->log_info("%s attribute is missing, so default value of %s will be used", BasePath.getName(), BasePath.getValue());
basePath = BasePath.getValue();
}
@@ -90,20 +90,20 @@ void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionF
std::string listeningPort;
if (!context->getProperty(Port.getName(), listeningPort)) {
- logger_->log_error("%s attribute is missing or invalid", Port.getName().c_str());
+ logger_->log_error("%s attribute is missing or invalid", Port.getName());
return;
}
std::string authDNPattern;
if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern) && !authDNPattern.empty()) {
- logger_->log_info("ListenHTTP using %s: %s", AuthorizedDNPattern.getName().c_str(), authDNPattern.c_str());
+ logger_->log_debug("ListenHTTP using %s: %s", AuthorizedDNPattern.getName(), authDNPattern);
}
std::string sslCertFile;
if (context->getProperty(SSLCertificate.getName(), sslCertFile) && !sslCertFile.empty()) {
- logger_->log_info("ListenHTTP using %s: %s", SSLCertificate.getName().c_str(), sslCertFile.c_str());
+ logger_->log_debug("ListenHTTP using %s: %s", SSLCertificate.getName(), sslCertFile);
}
// Read further TLS/SSL options only if TLS/SSL usage is implied by virtue of certificate value being set
@@ -113,33 +113,33 @@ void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionF
if (!sslCertFile.empty()) {
if (context->getProperty(SSLCertificateAuthority.getName(), sslCertAuthorityFile) && !sslCertAuthorityFile.empty()) {
- logger_->log_info("ListenHTTP using %s: %s", SSLCertificateAuthority.getName().c_str(), sslCertAuthorityFile.c_str());
+ logger_->log_debug("ListenHTTP using %s: %s", SSLCertificateAuthority.getName(), sslCertAuthorityFile);
}
if (context->getProperty(SSLVerifyPeer.getName(), sslVerifyPeer)) {
if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) {
- logger_->log_info("ListenHTTP will not verify peers");
+ logger_->log_debug("ListenHTTP will not verify peers");
} else {
- logger_->log_info("ListenHTTP will verify peers");
+ logger_->log_debug("ListenHTTP will verify peers");
}
} else {
- logger_->log_info("ListenHTTP will not verify peers");
+ logger_->log_debug("ListenHTTP will not verify peers");
}
if (context->getProperty(SSLMinimumVersion.getName(), sslMinVer)) {
- logger_->log_info("ListenHTTP using %s: %s", SSLMinimumVersion.getName().c_str(), sslMinVer.c_str());
+ logger_->log_debug("ListenHTTP using %s: %s", SSLMinimumVersion.getName(), sslMinVer);
}
}
std::string headersAsAttributesPattern;
if (context->getProperty(HeadersAsAttributesRegex.getName(), headersAsAttributesPattern) && !headersAsAttributesPattern.empty()) {
- logger_->log_info("ListenHTTP using %s: %s", HeadersAsAttributesRegex.getName().c_str(), headersAsAttributesPattern.c_str());
+ logger_->log_debug("ListenHTTP using %s: %s", HeadersAsAttributesRegex.getName(), headersAsAttributesPattern);
}
auto numThreads = getMaxConcurrentTasks();
- logger_->log_info("ListenHTTP starting HTTP server on port %s and path %s with %d threads", listeningPort.c_str(), basePath.c_str(), numThreads);
+ logger_->log_info("ListenHTTP starting HTTP server on port %s and path %s with %d threads", listeningPort, basePath, numThreads);
// Initialize web server
std::vector<std::string> options;
@@ -225,7 +225,7 @@ void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn) {
bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *conn) {
auto req_info = mg_get_request_info(conn);
- logger_->log_info("ListenHTTP handling POST request of length %ll", req_info->content_length);
+ logger_->log_debug("ListenHTTP handling POST request of length %ll", req_info->content_length);
// If this is a two-way TLS connection, authorize the peer against the configured pattern
if (req_info->is_ssl && req_info->client_cert != nullptr) {
@@ -271,12 +271,12 @@ bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *
session->transfer(flowFile, Success);
session->commit();
} catch (std::exception &exception) {
- logger_->log_debug("ListenHTTP Caught Exception %s", exception.what());
+ logger_->log_error("ListenHTTP Caught Exception %s", exception.what());
sendErrorResponse(conn);
session->rollback();
throw;
} catch (...) {
- logger_->log_debug("ListenHTTP Caught Exception Processor::onTrigger");
+ logger_->log_error("ListenHTTP Caught Exception Processor::onTrigger");
sendErrorResponse(conn);
session->rollback();
throw;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/expression-language/ProcessContextExpr.cpp
----------------------------------------------------------------------
diff --git a/extensions/expression-language/ProcessContextExpr.cpp b/extensions/expression-language/ProcessContextExpr.cpp
index d442978..0748e2d 100644
--- a/extensions/expression-language/ProcessContextExpr.cpp
+++ b/extensions/expression-language/ProcessContextExpr.cpp
@@ -28,7 +28,7 @@ bool ProcessContext::getProperty(const std::string &name, std::string &value,
if (expressions_.find(name) == expressions_.end()) {
std::string expression_str;
getProperty(name, expression_str);
- logger_->log_info("Compiling expression for %s/%s: %s", getProcessorNode()->getName(), name, expression_str);
+ logger_->log_debug("Compiling expression for %s/%s: %s", getProcessorNode()->getName(), name, expression_str);
expressions_.emplace(name, expression::compile(expression_str));
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/gps/GetGPS.cpp
----------------------------------------------------------------------
diff --git a/extensions/gps/GetGPS.cpp b/extensions/gps/GetGPS.cpp
index 64e8618..08a5d16 100644
--- a/extensions/gps/GetGPS.cpp
+++ b/extensions/gps/GetGPS.cpp
@@ -80,7 +80,7 @@ void GetGPS::onSchedule(const std::shared_ptr<core::ProcessContext> &context, co
if (context->getProperty(GPSDWaitTime.getName(), value)) {
core::Property::StringToInt(value, gpsdWaitTime_);
}
- logger_->log_info("GPSD client scheduled");
+ logger_->log_trace("GPSD client scheduled");
}
void GetGPS::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
@@ -106,7 +106,7 @@ void GetGPS::onTrigger(const std::shared_ptr<core::ProcessContext> &context, con
if (gpsdata->status > 0) {
if (gpsdata->fix.longitude != gpsdata->fix.longitude || gpsdata->fix.altitude != gpsdata->fix.altitude) {
- logger_->log_info("No GPS fix.\n");
+ logger_->log_info("No GPS fix.");
continue;
}
@@ -146,7 +146,7 @@ void GetGPS::onTrigger(const std::shared_ptr<core::ProcessContext> &context, con
}
} catch (std::exception &exception) {
- logger_->log_debug("GetGPS Caught Exception %s", exception.what());
+ logger_->log_error("GetGPS Caught Exception %s", exception.what());
throw;
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/http-curl/client/HTTPClient.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
index 707d301..31214cc 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -98,7 +98,7 @@ HTTPClient::~HTTPClient() {
curl_easy_cleanup(http_session_);
http_session_ = nullptr;
}
- logger_->log_info("Closing HTTPClient for %s", url_);
+ logger_->log_trace("Closing HTTPClient for %s", url_);
}
void HTTPClient::forceClose() {
@@ -132,7 +132,7 @@ void HTTPClient::initialize(const std::string &method, const std::string url, co
}
void HTTPClient::setDisablePeerVerification() {
- logger_->log_info("Disabling peer verification");
+ logger_->log_debug("Disabling peer verification");
curl_easy_setopt(http_session_, CURLOPT_SSL_VERIFYPEER, 0L);
}
@@ -152,7 +152,7 @@ void HTTPClient::setReadCallback(HTTPReadCallback *callbackObj) {
}
void HTTPClient::setUploadCallback(HTTPUploadCallback *callbackObj) {
- logger_->log_info("Setting callback for %s", url_);
+ logger_->log_debug("Setting callback for %s", url_);
write_callback_ = callbackObj;
if (method_ == "put" || method_ == "PUT") {
curl_easy_setopt(http_session_, CURLOPT_INFILESIZE_LARGE, (curl_off_t ) callbackObj->ptr->getBufferSize());
@@ -218,7 +218,7 @@ bool HTTPClient::submit() {
}
curl_easy_setopt(http_session_, CURLOPT_URL, url_.c_str());
- logger_->log_info("Submitting to %s", url_);
+ logger_->log_debug("Submitting to %s", url_);
if (callback == nullptr) {
content_.ptr = &read_callback_;
curl_easy_setopt(http_session_, CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::recieve_write);
@@ -240,7 +240,7 @@ bool HTTPClient::submit() {
return false;
}
- logger_->log_info("Finished with %s", url_);
+ logger_->log_debug("Finished with %s", url_);
std::string key = "";
for (auto header_line : header_response_.header_tokens_) {
unsigned int i = 0;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/http-curl/processors/InvokeHTTP.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp
index a8bcbab..6b501a8 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -112,7 +112,7 @@ core::Relationship InvokeHTTP::RelFailure("failure", "The original FlowFile will
"timeout or general exception. It will have new attributes detailing the request.");
void InvokeHTTP::initialize() {
- logger_->log_info("Initializing InvokeHTTP");
+ logger_->log_trace("Initializing InvokeHTTP");
// Set the supported properties
std::set<core::Property> properties;
@@ -142,12 +142,12 @@ void InvokeHTTP::initialize() {
void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
if (!context->getProperty(Method.getName(), method_)) {
- logger_->log_info("%s attribute is missing, so default value of %s will be used", Method.getName().c_str(), Method.getValue().c_str());
+ logger_->log_debug("%s attribute is missing, so default value of %s will be used", Method.getName(), Method.getValue());
return;
}
if (!context->getProperty(URL.getName(), url_)) {
- logger_->log_info("%s attribute is missing, so default value of %s will be used", URL.getName().c_str(), URL.getValue().c_str());
+ logger_->log_debug("%s attribute is missing, so default value of %s will be used", URL.getName(), URL.getValue());
return;
}
@@ -158,7 +158,7 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
// set the timeout in curl options.
} else {
- logger_->log_info("%s attribute is missing, so default value of %s will be used", ConnectTimeout.getName().c_str(), ConnectTimeout.getValue().c_str());
+ logger_->log_debug("%s attribute is missing, so default value of %s will be used", ConnectTimeout.getName(), ConnectTimeout.getValue());
return;
}
@@ -172,34 +172,34 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
core::Property::StringToInt(timeoutStr, read_timeout_);
} else {
- logger_->log_info("%s attribute is missing, so default value of %s will be used", ReadTimeout.getName().c_str(), ReadTimeout.getValue().c_str());
+ logger_->log_debug("%s attribute is missing, so default value of %s will be used", ReadTimeout.getName(), ReadTimeout.getValue());
}
std::string dateHeaderStr;
if (!context->getProperty(DateHeader.getName(), dateHeaderStr)) {
- logger_->log_info("%s attribute is missing, so default value of %s will be used", DateHeader.getName().c_str(), DateHeader.getValue().c_str());
+ logger_->log_debug("%s attribute is missing, so default value of %s will be used", DateHeader.getName(), DateHeader.getValue());
}
date_header_include_ = utils::StringUtils::StringToBool(dateHeaderStr, date_header_include_);
if (!context->getProperty(PropPutOutputAttributes.getName(), put_attribute_name_)) {
- logger_->log_info("%s attribute is missing, so default value of %s will be used", PropPutOutputAttributes.getName().c_str(), PropPutOutputAttributes.getValue().c_str());
+ logger_->log_debug("%s attribute is missing, so default value of %s will be used", PropPutOutputAttributes.getName(), PropPutOutputAttributes.getValue());
}
if (!context->getProperty(AttributesToSend.getName(), attribute_to_send_regex_)) {
- logger_->log_info("%s attribute is missing, so default value of %s will be used", AttributesToSend.getName().c_str(), AttributesToSend.getValue().c_str());
+ logger_->log_debug("%s attribute is missing, so default value of %s will be used", AttributesToSend.getName(), AttributesToSend.getValue());
}
std::string always_output_response = "false";
if (!context->getProperty(AlwaysOutputResponse.getName(), always_output_response)) {
- logger_->log_info("%s attribute is missing, so default value of %s will be used", AlwaysOutputResponse.getName().c_str(), AlwaysOutputResponse.getValue().c_str());
+ logger_->log_debug("%s attribute is missing, so default value of %s will be used", AlwaysOutputResponse.getName(), AlwaysOutputResponse.getValue());
}
utils::StringUtils::StringToBool(always_output_response, always_output_response_);
std::string penalize_no_retry = "false";
if (!context->getProperty(PenalizeOnNoRetry.getName(), penalize_no_retry)) {
- logger_->log_info("%s attribute is missing, so default value of %s will be used", PenalizeOnNoRetry.getName().c_str(), PenalizeOnNoRetry.getValue().c_str());
+ logger_->log_debug("%s attribute is missing, so default value of %s will be used", PenalizeOnNoRetry.getName(), PenalizeOnNoRetry.getValue());
}
utils::StringUtils::StringToBool(penalize_no_retry, penalize_no_retry_);
@@ -214,7 +214,7 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
std::string useChunkedEncoding = "false";
if (!context->getProperty(UseChunkedEncoding.getName(), useChunkedEncoding)) {
- logger_->log_info("%s attribute is missing, so default value of %s will be used", UseChunkedEncoding.getName().c_str(), UseChunkedEncoding.getValue().c_str());
+ logger_->log_debug("%s attribute is missing, so default value of %s will be used", UseChunkedEncoding.getName(), UseChunkedEncoding.getValue());
}
utils::StringUtils::StringToBool(useChunkedEncoding, use_chunked_encoding_);
@@ -247,18 +247,18 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
if (flowFile == nullptr) {
if (!emitFlowFile(method_)) {
- logger_->log_info("InvokeHTTP -- create flow file with %s", method_.c_str());
+ logger_->log_debug("InvokeHTTP -- create flow file with %s", method_);
flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
} else {
- logger_->log_info("exiting because method is %s", method_.c_str());
+ logger_->log_debug("exiting because method is %s", method_);
return;
}
} else {
context->getProperty(URL.getName(), url, flowFile);
- logger_->log_info("InvokeHTTP -- Received flowfile");
+ logger_->log_debug("InvokeHTTP -- Received flowfile");
}
- logger_->log_info("onTrigger InvokeHTTP with %s to %s", method_, url);
+ logger_->log_debug("onTrigger InvokeHTTP with %s to %s", method_, url);
// create a transaction id
std::string tx_id = generateId();
@@ -285,7 +285,7 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
std::unique_ptr<utils::ByteInputCallBack> callback = nullptr;
std::unique_ptr<utils::HTTPUploadCallback> callbackObj = nullptr;
if (emitFlowFile(method_)) {
- logger_->log_info("InvokeHTTP -- reading flowfile");
+ logger_->log_trace("InvokeHTTP -- reading flowfile");
std::shared_ptr<ResourceClaim> claim = flowFile->getResourceClaim();
if (claim) {
callback = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack());
@@ -293,7 +293,7 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
callbackObj = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback);
callbackObj->ptr = callback.get();
callbackObj->pos = 0;
- logger_->log_info("InvokeHTTP -- Setting callback, size is %d", callback->getBufferSize());
+ logger_->log_trace("InvokeHTTP -- Setting callback, size is %d", callback->getBufferSize());
if (!use_chunked_encoding_) {
client.appendHeader("Content-Length", std::to_string(flowFile->getSize()));
}
@@ -303,15 +303,15 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
}
} else {
- logger_->log_info("InvokeHTTP -- Not emitting flowfile to HTTP Server");
+ logger_->log_trace("InvokeHTTP -- Not emitting flowfile to HTTP Server");
}
// append all headers
client.build_header_list(attribute_to_send_regex_, flowFile->getAttributes());
- logger_->log_info("InvokeHTTP -- curl performed");
+ logger_->log_trace("InvokeHTTP -- curl performed");
if (client.submit()) {
- logger_->log_info("InvokeHTTP -- curl successful");
+ logger_->log_trace("InvokeHTTP -- curl successful");
bool putToAttribute = !IsNullOrEmpty(put_attribute_name_);
@@ -329,7 +329,7 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
bool isSuccess = ((int32_t) (http_code / 100)) == 2;
bool output_body_to_content = isSuccess && !putToAttribute;
- logger_->log_info("isSuccess: %d, response code %d", isSuccess, http_code);
+ logger_->log_debug("isSuccess: %d, response code %d", isSuccess, http_code);
std::shared_ptr<FlowFileRecord> response_flow = nullptr;
if (output_body_to_content) {
@@ -350,7 +350,7 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
// need an import from the data stream.
session->importFrom(stream, response_flow);
} else {
- logger_->log_info("Cannot output body to content");
+ logger_->log_warn("Cannot output body to content");
response_flow = std::static_pointer_cast<FlowFileRecord>(session->create());
}
route(flowFile, response_flow, session, context, isSuccess, http_code);
@@ -371,7 +371,7 @@ void InvokeHTTP::route(std::shared_ptr<FlowFileRecord> &request,
// If the property to output the response flowfile regardless of status code is set then transfer it
bool responseSent = false;
if (always_output_response_ && response != nullptr) {
- logger_->log_info("Outputting success and response");
+ logger_->log_debug("Outputting success and response");
session->transfer(response, Success);
responseSent = true;
}
@@ -384,7 +384,7 @@ void InvokeHTTP::route(std::shared_ptr<FlowFileRecord> &request,
session->transfer(request, Success);
}
if (response != nullptr && !responseSent) {
- logger_->log_info("Outputting success and response");
+ logger_->log_debug("Outputting success and response");
session->transfer(response, Success);
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/http-curl/protocols/RESTSender.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index ebf532a..5f81005 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -43,7 +43,7 @@ void RESTSender::initialize(const std::shared_ptr<core::controller::ControllerSe
configure->get("c2.rest.url", rest_uri_);
configure->get("c2.rest.url.ack", ack_uri_);
}
- logger_->log_info("Submitting to %s", rest_uri_);
+ logger_->log_debug("Submitting to %s", rest_uri_);
}
C2Payload RESTSender::consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) {
std::string operation_request_str = getOperation(payload);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/http-curl/sitetosite/HTTPProtocol.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/sitetosite/HTTPProtocol.cpp b/extensions/http-curl/sitetosite/HTTPProtocol.cpp
index d70ab45..e83c71d 100644
--- a/extensions/http-curl/sitetosite/HTTPProtocol.cpp
+++ b/extensions/http-curl/sitetosite/HTTPProtocol.cpp
@@ -67,7 +67,7 @@ std::shared_ptr<Transaction> HttpSiteToSiteClient::createTransaction(std::string
client->setPostFields("");
client->submit();
if (peer_->getStream() != nullptr)
- logger_->log_info("Closing %s",((io::HttpStream*)peer_->getStream())->getClientRef()->getURL());
+ logger_->log_debug("Closing %s",((io::HttpStream*)peer_->getStream())->getClientRef()->getURL());
if (client->getResponseCode() == 201) {
// parse the headers
auto headers = client->getParsedHeaders();
@@ -135,7 +135,7 @@ int HttpSiteToSiteClient::readResponse(const std::shared_ptr<Transaction> &trans
if (current_code == CONFIRM_TRANSACTION && transaction->getState() == DATA_EXCHANGED) {
auto stream = dynamic_cast<io::HttpStream*>(peer_->getStream());
if (!stream->isFinished()) {
- logger_->log_info("confirm read for %s, but not finished ", transaction->getUUIDStr());
+ logger_->log_debug("confirm read for %s, but not finished ", transaction->getUUIDStr());
if (stream->waitForDataAvailable()) {
code = CONTINUE_TRANSACTION;
}
@@ -146,15 +146,15 @@ int HttpSiteToSiteClient::readResponse(const std::shared_ptr<Transaction> &trans
} else {
auto stream = dynamic_cast<io::HttpStream*>(peer_->getStream());
if (stream->isFinished()) {
- logger_->log_info("Finished %s ", transaction->getUUIDStr());
+ logger_->log_debug("Finished %s ", transaction->getUUIDStr());
code = FINISH_TRANSACTION;
current_code = FINISH_TRANSACTION;
} else {
if (stream->waitForDataAvailable()) {
- logger_->log_info("data is available, so continuing transaction %s ", transaction->getUUIDStr());
+ logger_->log_debug("data is available, so continuing transaction %s ", transaction->getUUIDStr());
code = CONTINUE_TRANSACTION;
} else {
- logger_->log_info("No data available for transaction %s ", transaction->getUUIDStr());
+ logger_->log_debug("No data available for transaction %s ", transaction->getUUIDStr());
code = FINISH_TRANSACTION;
current_code = FINISH_TRANSACTION;
}
@@ -183,7 +183,7 @@ int HttpSiteToSiteClient::writeResponse(const std::shared_ptr<Transaction> &tran
return 1;
} else if (code == CONTINUE_TRANSACTION) {
- logger_->log_info("Continuing HTTP transaction");
+ logger_->log_debug("Continuing HTTP transaction");
return 1;
}
return SiteToSiteClient::writeResponse(transaction, code, message);
@@ -233,7 +233,7 @@ bool HttpSiteToSiteClient::transmitPayload(const std::shared_ptr<core::ProcessCo
void HttpSiteToSiteClient::tearDown() {
if (peer_state_ >= ESTABLISHED) {
- logger_->log_info("Site2Site Protocol tearDown");
+ logger_->log_debug("Site2Site Protocol tearDown");
}
known_transactions_.clear();
peer_->Close();
@@ -256,7 +256,7 @@ void HttpSiteToSiteClient::closeTransaction(const std::string &transactionID) {
}
std::string append_str;
- logger_->log_info("Site2Site close transaction %s", transaction->getUUIDStr().c_str());
+ logger_->log_info("Site to Site closed transaction %s", transaction->getUUIDStr());
int code = UNRECOGNIZED_RESPONSE_CODE;
if (transaction->getState() == TRANSACTION_CONFIRMED) {
@@ -314,7 +314,7 @@ void HttpSiteToSiteClient::deleteTransaction(std::string transactionID) {
}
std::string append_str;
- logger_->log_info("Site2Site delete transaction %s", transaction->getUUIDStr().c_str());
+ logger_->log_debug("Site2Site delete transaction %s", transaction->getUUIDStr());
closeTransaction(transactionID);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/libarchive/BinFiles.cpp
----------------------------------------------------------------------
diff --git a/extensions/libarchive/BinFiles.cpp b/extensions/libarchive/BinFiles.cpp
index 34afcf0..035ed29 100644
--- a/extensions/libarchive/BinFiles.cpp
+++ b/extensions/libarchive/BinFiles.cpp
@@ -77,34 +77,34 @@ void BinFiles::onSchedule(core::ProcessContext *context, core::ProcessSessionFac
int64_t valInt;
if (context->getProperty(MinSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
this->binManager_.setMinSize(valInt);
- logger_->log_info("BinFiles: MinSize [%d]", valInt);
+ logger_->log_debug("BinFiles: MinSize [%d]", valInt);
}
value = "";
if (context->getProperty(MaxSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
this->binManager_.setMaxSize(valInt);
- logger_->log_info("BinFiles: MaxSize [%d]", valInt);
+ logger_->log_debug("BinFiles: MaxSize [%d]", valInt);
}
value = "";
if (context->getProperty(MinEntries.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
this->binManager_.setMinEntries(valInt);
- logger_->log_info("BinFiles: MinEntries [%d]", valInt);
+ logger_->log_debug("BinFiles: MinEntries [%d]", valInt);
}
value = "";
if (context->getProperty(MaxEntries.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
this->binManager_.setMaxEntries(valInt);
- logger_->log_info("BinFiles: MaxEntries [%d]", valInt);
+ logger_->log_debug("BinFiles: MaxEntries [%d]", valInt);
}
value = "";
if (context->getProperty(MaxBinCount.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
maxBinCount_ = static_cast<int> (valInt);
- logger_->log_info("BinFiles: MaxBinCount [%d]", valInt);
+ logger_->log_debug("BinFiles: MaxBinCount [%d]", valInt);
}
value = "";
if (context->getProperty(MaxBinAge.getName(), value) && !value.empty()) {
core::TimeUnit unit;
if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
this->binManager_.setBinAge(valInt);
- logger_->log_info("BinFiles: MaxBinAge [%d]", valInt);
+ logger_->log_debug("BinFiles: MaxBinAge [%d]", valInt);
}
}
}
@@ -134,7 +134,7 @@ void BinManager::gatherReadyBins() {
readyBin_.push_back(std::move(bin));
queue->pop_front();
binCount_--;
- logger_->log_info("BinManager move bin %s to ready bins for group %s", readyBin_.back()->getUUIDStr(), readyBin_.back()->getGroupId());
+ logger_->log_debug("BinManager move bin %s to ready bins for group %s", readyBin_.back()->getUUIDStr(), readyBin_.back()->getGroupId());
} else {
break;
}
@@ -147,7 +147,7 @@ void BinManager::gatherReadyBins() {
// erase from the map if the queue is empty for the group
groupBinMap_.erase(group);
}
- logger_->log_info("BinManager groupBinMap size %d", groupBinMap_.size());
+ logger_->log_debug("BinManager groupBinMap size %d", groupBinMap_.size());
}
void BinManager::removeOldestBin() {
@@ -170,12 +170,12 @@ void BinManager::removeOldestBin() {
readyBin_.push_back(std::move(remove));
(*oldqueue)->pop_front();
binCount_--;
- logger_->log_info("BinManager move bin %s to ready bins for group %s", readyBin_.back()->getUUIDStr(), readyBin_.back()->getGroupId());
+ logger_->log_debug("BinManager move bin %s to ready bins for group %s", readyBin_.back()->getUUIDStr(), readyBin_.back()->getGroupId());
if ((*oldqueue)->empty()) {
groupBinMap_.erase(group);
}
}
- logger_->log_info("BinManager groupBinMap size %d", groupBinMap_.size());
+ logger_->log_debug("BinManager groupBinMap size %d", groupBinMap_.size());
}
void BinManager::getReadyBin(std::deque<std::unique_ptr<Bin>> &retBins) {
@@ -195,7 +195,7 @@ bool BinManager::offer(const std::string &group, std::shared_ptr<core::FlowFile>
if (!bin->offer(flow))
return false;
readyBin_.push_back(std::move(bin));
- logger_->log_info("BinManager move bin %s to ready bins for group %s", readyBin_.back()->getUUIDStr(), group);
+ logger_->log_debug("BinManager move bin %s to ready bins for group %s", readyBin_.back()->getUUIDStr(), group);
return true;
}
auto search = groupBinMap_.find(group);
@@ -209,7 +209,7 @@ bool BinManager::offer(const std::string &group, std::shared_ptr<core::FlowFile>
if (!bin->offer(flow))
return false;
queue->push_back(std::move(bin));
- logger_->log_info("BinManager add bin %s to group %s", queue->back()->getUUIDStr(), group);
+ logger_->log_debug("BinManager add bin %s to group %s", queue->back()->getUUIDStr(), group);
binCount_++;
}
} else {
@@ -218,7 +218,7 @@ bool BinManager::offer(const std::string &group, std::shared_ptr<core::FlowFile>
return false;
queue->push_back(std::move(bin));
binCount_++;
- logger_->log_info("BinManager add bin %s to group %s", queue->back()->getUUIDStr(), group);
+ logger_->log_debug("BinManager add bin %s to group %s", queue->back()->getUUIDStr(), group);
}
} else {
std::unique_ptr<std::deque<std::unique_ptr<Bin>>> queue = std::unique_ptr<std::deque<std::unique_ptr<Bin>>> (new std::deque<std::unique_ptr<Bin>>());
@@ -226,7 +226,7 @@ bool BinManager::offer(const std::string &group, std::shared_ptr<core::FlowFile>
if (!bin->offer(flow))
return false;
queue->push_back(std::move(bin));
- logger_->log_info("BinManager add bin %s to group %s", queue->back()->getUUIDStr(), group);
+ logger_->log_debug("BinManager add bin %s to group %s", queue->back()->getUUIDStr(), group);
groupBinMap_.insert(std::make_pair(group, std::move(queue)));
binCount_++;
}
@@ -257,7 +257,7 @@ void BinFiles::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
if (this->binManager_.getBinCount() > maxBinCount_) {
// bin count reach max allowed
context->yield();
- logger_->log_info("BinFiles reach max bin count %d", this->binManager_.getBinCount());
+ logger_->log_debug("BinFiles reach max bin count %d", this->binManager_.getBinCount());
this->binManager_.removeOldestBin();
}
@@ -274,7 +274,7 @@ void BinFiles::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
readyBins.pop_front();
// add bin's flows to the session
this->addFlowsToSession(context.get(), &mergeSession, bin);
- logger_->log_info("BinFiles start to process bin %s for group %s", bin->getUUIDStr(), bin->getGroupId());
+ logger_->log_debug("BinFiles start to process bin %s for group %s", bin->getUUIDStr(), bin->getGroupId());
if (!this->processBin(context.get(), &mergeSession, bin))
this->transferFlowsToFail(context.get(), &mergeSession, bin);
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/libarchive/BinFiles.h
----------------------------------------------------------------------
diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h
index db8a6b8..9a463b2 100644
--- a/extensions/libarchive/BinFiles.h
+++ b/extensions/libarchive/BinFiles.h
@@ -59,10 +59,10 @@ class Bin {
id_generator->generate(uuid_);
uuid_unparse_lower(uuid_, uuidStr);
uuid_str_ = uuidStr;
- logger_->log_info("Bin %s for group %s created", uuid_str_, groupId_);
+ logger_->log_debug("Bin %s for group %s created", uuid_str_, groupId_);
}
virtual ~Bin() {
- logger_->log_info("Bin %s for group %s destroyed", uuid_str_, groupId_);
+ logger_->log_debug("Bin %s for group %s destroyed", uuid_str_, groupId_);
}
// check whether the bin is full
bool isFull() {
@@ -107,7 +107,7 @@ class Bin {
queue_.push_back(flow);
queued_data_size_ += flow->getSize();
- logger_->log_info("Bin %s for group %s offer size %d byte %d min_entry %d max_entry %d", uuid_str_, groupId_, queue_.size(), queued_data_size_, minEntries_, maxEntries_);
+ logger_->log_debug("Bin %s for group %s offer size %d byte %d min_entry %d max_entry %d", uuid_str_, groupId_, queue_.size(), queued_data_size_, minEntries_, maxEntries_);
return true;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/libarchive/FocusArchiveEntry.cpp
----------------------------------------------------------------------
diff --git a/extensions/libarchive/FocusArchiveEntry.cpp b/extensions/libarchive/FocusArchiveEntry.cpp
index afcef8b..36b252c 100644
--- a/extensions/libarchive/FocusArchiveEntry.cpp
+++ b/extensions/libarchive/FocusArchiveEntry.cpp
@@ -90,13 +90,13 @@ void FocusArchiveEntry::onTrigger(core::ProcessContext *context, core::ProcessSe
for (auto &entryMetadata : archiveMetadata.entryMetadata) {
if (entryMetadata.entryType == AE_IFREG) {
- logger_->log_info("FocusArchiveEntry importing %s from %s", entryMetadata.entryName.c_str(), entryMetadata.tmpFileName.c_str());
+ logger_->log_info("FocusArchiveEntry importing %s from %s", entryMetadata.entryName, entryMetadata.tmpFileName);
session->import(entryMetadata.tmpFileName, flowFile, false, 0);
char stashKey[37];
uuid_t stashKeyUuid;
id_generator_->generate(stashKeyUuid);
uuid_unparse_lower(stashKeyUuid, stashKey);
- logger_->log_debug("FocusArchiveEntry generated stash key %s for entry %s", stashKey, entryMetadata.entryName.c_str());
+ logger_->log_debug("FocusArchiveEntry generated stash key %s for entry %s", stashKey, entryMetadata.entryName);
entryMetadata.stashKey.assign(stashKey);
if (entryMetadata.entryName == targetEntry) {
@@ -112,7 +112,7 @@ void FocusArchiveEntry::onTrigger(core::ProcessContext *context, core::ProcessSe
if (targetEntryStashKey != "") {
session->restore(targetEntryStashKey, flowFile);
} else {
- logger_->log_warn("FocusArchiveEntry failed to locate target entry: %s", targetEntry.c_str());
+ logger_->log_warn("FocusArchiveEntry failed to locate target entry: %s", targetEntry);
}
// Set new/updated lens stack to attribute
@@ -276,7 +276,7 @@ int64_t FocusArchiveEntry::ReadCallback::process(std::shared_ptr<io::BaseStream>
auto tmpFileName = file_man_->unique_file(true);
metadata.tmpFileName = tmpFileName;
metadata.entryType = entryType;
- logger_->log_info("FocusArchiveEntry extracting %s to: %s", entryName, tmpFileName.c_str());
+ logger_->log_info("FocusArchiveEntry extracting %s to: %s", entryName, tmpFileName);
auto fd = fopen(tmpFileName.c_str(), "w");
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/libarchive/MergeContent.cpp
----------------------------------------------------------------------
diff --git a/extensions/libarchive/MergeContent.cpp b/extensions/libarchive/MergeContent.cpp
index 91f542b..8bd3f79 100644
--- a/extensions/libarchive/MergeContent.cpp
+++ b/extensions/libarchive/MergeContent.cpp
@@ -128,8 +128,8 @@ void MergeContent::onSchedule(core::ProcessContext *context, core::ProcessSessio
if (mergeStratgey_ == MERGE_STRATEGY_DEFRAGMENT) {
binManager_.setFileCount(FRAGMENT_COUNT_ATTRIBUTE);
}
- logger_->log_info("Merge Content: Strategy [%s] Format [%s] Correlation Attribute [%s] Delimiter [%s]", mergeStratgey_, mergeFormat_, correlationAttributeName_, delimiterStratgey_);
- logger_->log_info("Merge Content: Footer [%s] Header [%s] Demarcator [%s] KeepPath [%d]", footer_, header_, demarcator_, keepPath_);
+ logger_->log_debug("Merge Content: Strategy [%s] Format [%s] Correlation Attribute [%s] Delimiter [%s]", mergeStratgey_, mergeFormat_, correlationAttributeName_, delimiterStratgey_);
+ logger_->log_debug("Merge Content: Footer [%s] Header [%s] Demarcator [%s] KeepPath [%d]", footer_, header_, demarcator_, keepPath_);
if (delimiterStratgey_ == DELIMITER_STRATEGY_FILENAME) {
if (!header_.empty()) {
this->headerContent_ = readContent(header_);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/libarchive/MergeContent.h
----------------------------------------------------------------------
diff --git a/extensions/libarchive/MergeContent.h b/extensions/libarchive/MergeContent.h
index d73eb71..cb6b469 100644
--- a/extensions/libarchive/MergeContent.h
+++ b/extensions/libarchive/MergeContent.h
@@ -220,7 +220,7 @@ public:
if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, perm)) {
try {
permInt = std::stoi(perm);
- logger_->log_info("Merge Tar File %s permission %s", fileName, perm);
+ logger_->log_debug("Merge Tar File %s permission %s", fileName, perm);
archive_entry_set_perm(entry, (mode_t) permInt);
} catch (...) {
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/libarchive/UnfocusArchiveEntry.cpp
----------------------------------------------------------------------
diff --git a/extensions/libarchive/UnfocusArchiveEntry.cpp b/extensions/libarchive/UnfocusArchiveEntry.cpp
index 840ad89..e904946 100644
--- a/extensions/libarchive/UnfocusArchiveEntry.cpp
+++ b/extensions/libarchive/UnfocusArchiveEntry.cpp
@@ -212,7 +212,7 @@ int64_t UnfocusArchiveEntry::WriteCallback::process(std::shared_ptr<io::BaseStre
for (const auto &entryMetadata : _archiveMetadata->entryMetadata) {
entry = archive_entry_new();
- logger_->log_info("UnfocusArchiveEntry writing entry %s", entryMetadata.entryName.c_str());
+ logger_->log_info("UnfocusArchiveEntry writing entry %s", entryMetadata.entryName);
if (entryMetadata.entryType == AE_IFREG && entryMetadata.entrySize > 0) {
size_t stat_ok = stat(entryMetadata.tmpFileName.c_str(), &st);
@@ -230,7 +230,7 @@ int64_t UnfocusArchiveEntry::WriteCallback::process(std::shared_ptr<io::BaseStre
archive_entry_set_gid(entry, entryMetadata.entryGID);
archive_entry_set_mtime(entry, entryMetadata.entryMTime, entryMetadata.entryMTimeNsec);
- logger_->log_info("Writing %s with type %d, perms %d, size %d, uid %d, gid %d, mtime %d,%d", entryMetadata.entryName.c_str(), entryMetadata.entryType, entryMetadata.entryPerm,
+ logger_->log_info("Writing %s with type %d, perms %d, size %d, uid %d, gid %d, mtime %d,%d", entryMetadata.entryName, entryMetadata.entryType, entryMetadata.entryPerm,
entryMetadata.entrySize, entryMetadata.entryUID, entryMetadata.entryGID, entryMetadata.entryMTime, entryMetadata.entryMTimeNsec);
archive_write_header(outputArchive, entry);
@@ -239,7 +239,7 @@ int64_t UnfocusArchiveEntry::WriteCallback::process(std::shared_ptr<io::BaseStre
if (entryMetadata.entryType == AE_IFREG && entryMetadata.entrySize > 0) {
logger_->log_info("UnfocusArchiveEntry writing %d bytes of "
"data from tmp file %s to archive entry %s",
- st.st_size, entryMetadata.tmpFileName.c_str(), entryMetadata.entryName.c_str());
+ st.st_size, entryMetadata.tmpFileName, entryMetadata.entryName);
std::ifstream ifs(entryMetadata.tmpFileName, std::ifstream::in | std::ios::binary);
while (ifs.good()) {
@@ -249,7 +249,7 @@ int64_t UnfocusArchiveEntry::WriteCallback::process(std::shared_ptr<io::BaseStre
if (written < 0) {
logger_->log_error("UnfocusArchiveEntry failed to write data to "
"archive entry %s due to error: %s",
- entryMetadata.entryName.c_str(), archive_error_string(outputArchive));
+ entryMetadata.entryName, archive_error_string(outputArchive));
} else {
nlen += written;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/librdkafka/PublishKafka.cpp
----------------------------------------------------------------------
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index dc99a80..5f39e03 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -95,7 +95,7 @@ void PublishKafka::onSchedule(core::ProcessContext *context, core::ProcessSessio
if (context->getProperty(SeedBrokers.getName(), value) && !value.empty()) {
result = rd_kafka_conf_set(conf_, "bootstrap.servers", value.c_str(), errstr, sizeof(errstr));
- logger_->log_info("PublishKafka: bootstrap.servers [%s]", value);
+ logger_->log_debug("PublishKafka: bootstrap.servers [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
@@ -103,21 +103,21 @@ void PublishKafka::onSchedule(core::ProcessContext *context, core::ProcessSessio
if (context->getProperty(MaxMessageSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
valueConf = std::to_string(valInt);
result = rd_kafka_conf_set(conf_, "message.max.bytes", valueConf.c_str(), errstr, sizeof(errstr));
- logger_->log_info("PublishKafka: message.max.bytes [%s]", valueConf);
+ logger_->log_debug("PublishKafka: message.max.bytes [%s]", valueConf);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
value = "";
if (context->getProperty(ClientName.getName(), value) && !value.empty()) {
rd_kafka_conf_set(conf_, "client.id", value.c_str(), errstr, sizeof(errstr));
- logger_->log_info("PublishKafka: client.id [%s]", value);
+ logger_->log_debug("PublishKafka: client.id [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
value = "";
if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty()) {
rd_kafka_conf_set(conf_, "queue.buffering.max.messages", value.c_str(), errstr, sizeof(errstr));
- logger_->log_info("PublishKafka: queue.buffering.max.messages [%s]", value);
+ logger_->log_debug("PublishKafka: queue.buffering.max.messages [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
@@ -126,7 +126,7 @@ void PublishKafka::onSchedule(core::ProcessContext *context, core::ProcessSessio
valInt = valInt/1024;
valueConf = std::to_string(valInt);
rd_kafka_conf_set(conf_, "queue.buffering.max.kbytes", valueConf.c_str(), errstr, sizeof(errstr));
- logger_->log_info("PublishKafka: queue.buffering.max.kbytes [%s]", valueConf);
+ logger_->log_debug("PublishKafka: queue.buffering.max.kbytes [%s]", valueConf);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
@@ -134,7 +134,7 @@ void PublishKafka::onSchedule(core::ProcessContext *context, core::ProcessSessio
max_seg_size_ = ULLONG_MAX;
if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
max_seg_size_ = valInt;
- logger_->log_info("PublishKafka: max flow segment size [%d]", max_seg_size_);
+ logger_->log_debug("PublishKafka: max flow segment size [%d]", max_seg_size_);
}
value = "";
if (context->getProperty(QueueBufferMaxTime.getName(), value) && !value.empty()) {
@@ -142,7 +142,7 @@ void PublishKafka::onSchedule(core::ProcessContext *context, core::ProcessSessio
if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
valueConf = std::to_string(valInt);
rd_kafka_conf_set(conf_, "queue.buffering.max.ms", valueConf.c_str(), errstr, sizeof(errstr));
- logger_->log_info("PublishKafka: queue.buffering.max.ms [%s]", valueConf);
+ logger_->log_debug("PublishKafka: queue.buffering.max.ms [%s]", valueConf);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
@@ -150,21 +150,21 @@ void PublishKafka::onSchedule(core::ProcessContext *context, core::ProcessSessio
value = "";
if (context->getProperty(BatchSize.getName(), value) && !value.empty()) {
rd_kafka_conf_set(conf_, "batch.num.messages", value.c_str(), errstr, sizeof(errstr));
- logger_->log_info("PublishKafka: batch.num.messages [%s]", value);
+ logger_->log_debug("PublishKafka: batch.num.messages [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
value = "";
if (context->getProperty(CompressCodec.getName(), value) && !value.empty()) {
rd_kafka_conf_set(conf_, "compression.codec", value.c_str(), errstr, sizeof(errstr));
- logger_->log_info("PublishKafka: compression.codec [%s]", value);
+ logger_->log_debug("PublishKafka: compression.codec [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
value = "";
if (context->getProperty(DeliveryGuarantee.getName(), value) && !value.empty()) {
rd_kafka_topic_conf_set(topic_conf_, "request.required.acks", value.c_str(), errstr, sizeof(errstr));
- logger_->log_info("PublishKafka: request.required.acks [%s]", value);
+ logger_->log_debug("PublishKafka: request.required.acks [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
@@ -174,7 +174,7 @@ void PublishKafka::onSchedule(core::ProcessContext *context, core::ProcessSessio
if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
valueConf = std::to_string(valInt);
rd_kafka_topic_conf_set(topic_conf_, "request.timeout.ms", valueConf.c_str(), errstr, sizeof(errstr));
- logger_->log_info("PublishKafka: request.timeout.ms [%s]", valueConf);
+ logger_->log_debug("PublishKafka: request.timeout.ms [%s]", valueConf);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
@@ -183,35 +183,35 @@ void PublishKafka::onSchedule(core::ProcessContext *context, core::ProcessSessio
if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
if (value == SECURITY_PROTOCOL_SSL) {
rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr, sizeof(errstr));
- logger_->log_info("PublishKafka: security.protocol [%s]", value);
+ logger_->log_debug("PublishKafka: security.protocol [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
} else {
value = "";
if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
rd_kafka_conf_set(conf_, "ssl.ca.location", value.c_str(), errstr, sizeof(errstr));
- logger_->log_info("PublishKafka: ssl.ca.location [%s]", value);
+ logger_->log_debug("PublishKafka: ssl.ca.location [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
value = "";
if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) {
rd_kafka_conf_set(conf_, "ssl.certificate.location", value.c_str(), errstr, sizeof(errstr));
- logger_->log_info("PublishKafka: ssl.certificate.location [%s]", value);
+ logger_->log_debug("PublishKafka: ssl.certificate.location [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
value = "";
if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) {
rd_kafka_conf_set(conf_, "ssl.key.location", value.c_str(), errstr, sizeof(errstr));
- logger_->log_info("PublishKafka: ssl.key.location [%s]", value);
+ logger_->log_debug("PublishKafka: ssl.key.location [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
value = "";
if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) {
rd_kafka_conf_set(conf_, "ssl.key.password", value.c_str(), errstr, sizeof(errstr));
- logger_->log_info("PublishKafka: ssl.key.password [%s]", value);
+ logger_->log_debug("PublishKafka: ssl.key.password [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
@@ -221,9 +221,9 @@ void PublishKafka::onSchedule(core::ProcessContext *context, core::ProcessSessio
value = "";
if (context->getProperty(Topic.getName(), value) && !value.empty()) {
topic_ = value;
- logger_->log_info("PublishKafka: topic [%s]", topic_);
+ logger_->log_debug("PublishKafka: topic [%s]", topic_);
} else {
- logger_->log_info("PublishKafka: topic not configured");
+ logger_->log_warn("PublishKafka: topic not configured");
return;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/mqtt/AbstractMQTTProcessor.cpp
----------------------------------------------------------------------
diff --git a/extensions/mqtt/AbstractMQTTProcessor.cpp b/extensions/mqtt/AbstractMQTTProcessor.cpp
index 9ce052d..409b69f 100644
--- a/extensions/mqtt/AbstractMQTTProcessor.cpp
+++ b/extensions/mqtt/AbstractMQTTProcessor.cpp
@@ -70,39 +70,39 @@ void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::Proc
value = "";
if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
uri_ = value;
- logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+ logger_->log_debug("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
}
value = "";
if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
clientID_ = value;
- logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+ logger_->log_debug("AbstractMQTTProcessor: ClientID [%s]", clientID_);
}
value = "";
if (context->getProperty(Topic.getName(), value) && !value.empty()) {
topic_ = value;
- logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
+ logger_->log_debug("AbstractMQTTProcessor: Topic [%s]", topic_);
}
value = "";
if (context->getProperty(UserName.getName(), value) && !value.empty()) {
userName_ = value;
- logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_);
+ logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", userName_);
}
value = "";
if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
passWord_ = value;
- logger_->log_info("AbstractMQTTProcessor: PassWord [%s]", passWord_);
+ logger_->log_debug("AbstractMQTTProcessor: PassWord [%s]", passWord_);
}
value = "";
if (context->getProperty(CleanSession.getName(), value) && !value.empty() &&
org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, cleanSession_)) {
- logger_->log_info("AbstractMQTTProcessor: CleanSession [%d]", cleanSession_);
+ logger_->log_debug("AbstractMQTTProcessor: CleanSession [%d]", cleanSession_);
}
value = "";
if (context->getProperty(KeepLiveInterval.getName(), value) && !value.empty()) {
core::TimeUnit unit;
if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
keepAliveInterval_ = valInt/1000;
- logger_->log_info("AbstractMQTTProcessor: KeepLiveInterval [%ll]", keepAliveInterval_);
+ logger_->log_debug("AbstractMQTTProcessor: KeepLiveInterval [%ll]", keepAliveInterval_);
}
}
value = "";
@@ -110,14 +110,14 @@ void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::Proc
core::TimeUnit unit;
if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
connectionTimeOut_ = valInt/1000;
- logger_->log_info("AbstractMQTTProcessor: ConnectionTimeOut [%ll]", connectionTimeOut_);
+ logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeOut [%ll]", connectionTimeOut_);
}
}
value = "";
if (context->getProperty(QOS.getName(), value) && !value.empty() && (value == MQTT_QOS_0 || value == MQTT_QOS_1 || MQTT_QOS_2) &&
core::Property::StringToInt(value, valInt)) {
qos_ = valInt;
- logger_->log_info("AbstractMQTTProcessor: QOS [%ll]", qos_);
+ logger_->log_debug("AbstractMQTTProcessor: QOS [%ll]", qos_);
}
if (!client_) {
MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/mqtt/ConsumeMQTT.cpp
----------------------------------------------------------------------
diff --git a/extensions/mqtt/ConsumeMQTT.cpp b/extensions/mqtt/ConsumeMQTT.cpp
index 12de6bf..21cb79d 100644
--- a/extensions/mqtt/ConsumeMQTT.cpp
+++ b/extensions/mqtt/ConsumeMQTT.cpp
@@ -75,7 +75,7 @@ void ConsumeMQTT::onSchedule(core::ProcessContext *context, core::ProcessSession
value = "";
if (context->getProperty(MaxQueueSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
maxQueueSize_ = valInt;
- logger_->log_info("ConsumeMQTT: max queue size [%ll]", maxQueueSize_);
+ logger_->log_debug("ConsumeMQTT: max queue size [%ll]", maxQueueSize_);
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/mqtt/PublishMQTT.cpp
----------------------------------------------------------------------
diff --git a/extensions/mqtt/PublishMQTT.cpp b/extensions/mqtt/PublishMQTT.cpp
index 44ce298..411cc2d 100644
--- a/extensions/mqtt/PublishMQTT.cpp
+++ b/extensions/mqtt/PublishMQTT.cpp
@@ -67,11 +67,11 @@ void PublishMQTT::onSchedule(core::ProcessContext *context, core::ProcessSession
value = "";
if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
max_seg_size_ = valInt;
- logger_->log_info("PublishMQTT: max flow segment size [%ll]", max_seg_size_);
+ logger_->log_debug("PublishMQTT: max flow segment size [%ll]", max_seg_size_);
}
value = "";
if (context->getProperty(Retain.getName(), value) && !value.empty() && org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, retain_)) {
- logger_->log_info("PublishMQTT: Retain [%d]", retain_);
+ logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/rocksdb-repos/DatabaseContentRepository.cpp
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 50f007f..40b2124 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -45,10 +45,10 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu
options.max_successive_merges = 0;
rocksdb::Status status = rocksdb::DB::Open(options, directory_.c_str(), &db_);
if (status.ok()) {
- logger_->log_debug("NiFi Content DB Repository database open %s success", directory_.c_str());
+ logger_->log_debug("NiFi Content DB Repository database open %s success", directory_);
is_valid_ = true;
} else {
- logger_->log_error("NiFi Content DB Repository database open %s fail", directory_.c_str());
+ logger_->log_error("NiFi Content DB Repository database open %s fail", directory_);
is_valid_ = false;
}
return is_valid_;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/rocksdb-repos/FlowFileRepository.cpp
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index d5b403c..0d055ea 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -47,12 +47,12 @@ void FlowFileRepository::flush() {
if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(value.data()), value.size())) {
purgeList.push_back(eventRead);
}
- logger_->log_info("Issuing batch delete, including %s, Content path %s", eventRead->getUUIDStr(), eventRead->getContentFullPath());
+ logger_->log_debug("Issuing batch delete, including %s, Content path %s", eventRead->getUUIDStr(), eventRead->getContentFullPath());
batch.Delete(key);
}
}
if (db_->Write(rocksdb::WriteOptions(), &batch).ok()) {
- logger_->log_info("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load());
+ logger_->log_trace("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load());
if (decrement_total > repo_size_.load()) {
repo_size_ = 0;
} else {
@@ -98,7 +98,7 @@ void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentReposi
std::string key = it->key().ToString();
repo_size_ += it->value().size();
if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) {
- logger_->log_info("Found connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
+ logger_->log_debug("Found connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
auto search = connectionMap.find(eventRead->getConnectionUuid());
if (search != connectionMap.end()) {
// we find the connection for the persistent flowfile, create the flowfile and enqueue that
@@ -106,7 +106,7 @@ void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentReposi
eventRead->setStoredToRepository(true);
search->second->put(eventRead);
} else {
- logger_->log_info("Could not find connectinon for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
+ logger_->log_warn("Could not find connectinon for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
if (eventRead->getContentFullPath().length() > 0) {
if (nullptr != eventRead->getResourceClaim()) {
content_repo_->remove(eventRead->getResourceClaim());
@@ -121,7 +121,7 @@ void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentReposi
delete it;
for (auto eventId : purgeList) {
- logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.first.c_str());
+ logger_->log_debug("Repository Repo %s Purge %s", name_, eventId.first);
if (Delete(eventId.first)) {
repo_size_ -= eventId.second;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/rocksdb-repos/FlowFileRepository.h
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h
index a826bad..1bdd440 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -75,26 +75,26 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
if (configure->get(Configure::nifi_flowfile_repository_directory_default, value)) {
directory_ = value;
}
- logger_->log_info("NiFi FlowFile Repository Directory %s", directory_.c_str());
+ logger_->log_debug("NiFi FlowFile Repository Directory %s", directory_);
if (configure->get(Configure::nifi_flowfile_repository_max_storage_size, value)) {
Property::StringToInt(value, max_partition_bytes_);
}
- logger_->log_info("NiFi FlowFile Max Partition Bytes %d", max_partition_bytes_);
+ logger_->log_debug("NiFi FlowFile Max Partition Bytes %d", max_partition_bytes_);
if (configure->get(Configure::nifi_flowfile_repository_max_storage_time, value)) {
TimeUnit unit;
if (Property::StringToTime(value, max_partition_millis_, unit) && Property::ConvertTimeUnitToMS(max_partition_millis_, unit, max_partition_millis_)) {
}
}
- logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", max_partition_millis_);
+ logger_->log_debug("NiFi FlowFile Max Storage Time: [%d] ms", max_partition_millis_);
rocksdb::Options options;
options.create_if_missing = true;
options.use_direct_io_for_flush_and_compaction = true;
options.use_direct_reads = true;
- rocksdb::Status status = rocksdb::DB::Open(options, directory_.c_str(), &db_);
+ rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db_);
if (status.ok()) {
- logger_->log_info("NiFi FlowFile Repository database open %s success", directory_.c_str());
+ logger_->log_debug("NiFi FlowFile Repository database open %s success", directory_);
} else {
- logger_->log_error("NiFi FlowFile Repository database open %s fail", directory_.c_str());
+ logger_->log_error("NiFi FlowFile Repository database open %s fail", directory_);
return false;
}
return true;
@@ -148,7 +148,7 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
}
running_ = true;
thread_ = std::thread(&FlowFileRepository::run, shared_from_this());
- logger_->log_info("%s Repository Monitor Thread Start", getName());
+ logger_->log_debug("%s Repository Monitor Thread Start", getName());
}
private:
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/rocksdb-repos/ProvenanceRepository.cpp
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp b/extensions/rocksdb-repos/ProvenanceRepository.cpp
index 6ff4056..bbd58fd 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.cpp
+++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp
@@ -39,11 +39,11 @@ void ProvenanceRepository::flush() {
db_->Get(options, key, &value);
decrement_total += value.size();
batch.Delete(key);
- logger_->log_info("Removing %s", key);
+ logger_->log_debug("Removing %s", key);
}
}
if (db_->Write(rocksdb::WriteOptions(), &batch).ok()) {
- logger_->log_info("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load());
+ logger_->log_debug("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load());
if (decrement_total > repo_size_.load()) {
repo_size_ = 0;
} else {
@@ -71,7 +71,7 @@ void ProvenanceRepository::run() {
if ((curTime - eventTime) > (uint64_t)max_partition_millis_)
Delete(key);
} else {
- logger_->log_debug("NiFi Provenance retrieve event %s fail", key.c_str());
+ logger_->log_debug("NiFi Provenance retrieve event %s fail", key);
Delete(key);
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/rocksdb-repos/ProvenanceRepository.h
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h b/extensions/rocksdb-repos/ProvenanceRepository.h
index 38d63e0..d325c24 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.h
+++ b/extensions/rocksdb-repos/ProvenanceRepository.h
@@ -71,7 +71,7 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_
return;
running_ = true;
thread_ = std::thread(&ProvenanceRepository::run, shared_from_this());
- logger_->log_info("%s Repository Monitor Thread Start", name_);
+ logger_->log_debug("%s Repository Monitor Thread Start", name_);
}
// initialize
@@ -80,26 +80,26 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_
if (config->get(Configure::nifi_provenance_repository_directory_default, value)) {
directory_ = value;
}
- logger_->log_info("NiFi Provenance Repository Directory %s", directory_.c_str());
+ logger_->log_debug("NiFi Provenance Repository Directory %s", directory_);
if (config->get(Configure::nifi_provenance_repository_max_storage_size, value)) {
core::Property::StringToInt(value, max_partition_bytes_);
}
- logger_->log_info("NiFi Provenance Max Partition Bytes %d", max_partition_bytes_);
+ logger_->log_debug("NiFi Provenance Max Partition Bytes %d", max_partition_bytes_);
if (config->get(Configure::nifi_provenance_repository_max_storage_time, value)) {
core::TimeUnit unit;
if (core::Property::StringToTime(value, max_partition_millis_, unit) && core::Property::ConvertTimeUnitToMS(max_partition_millis_, unit, max_partition_millis_)) {
}
}
- logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", max_partition_millis_);
+ logger_->log_debug("NiFi Provenance Max Storage Time: [%d] ms", max_partition_millis_);
rocksdb::Options options;
options.create_if_missing = true;
options.use_direct_io_for_flush_and_compaction = true;
options.use_direct_reads = true;
- rocksdb::Status status = rocksdb::DB::Open(options, directory_.c_str(), &db_);
+ rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db_);
if (status.ok()) {
- logger_->log_info("NiFi Provenance Repository database open %s success", directory_.c_str());
+ logger_->log_debug("NiFi Provenance Repository database open %s success", directory_);
} else {
- logger_->log_error("NiFi Provenance Repository database open %s fail", directory_.c_str());
+ logger_->log_error("NiFi Provenance Repository database open %s fail", directory_);
return false;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/extensions/usb-camera/GetUSBCamera.cpp
----------------------------------------------------------------------
diff --git a/extensions/usb-camera/GetUSBCamera.cpp b/extensions/usb-camera/GetUSBCamera.cpp
index 761431f..09699bd 100644
--- a/extensions/usb-camera/GetUSBCamera.cpp
+++ b/extensions/usb-camera/GetUSBCamera.cpp
@@ -77,7 +77,7 @@ void GetUSBCamera::onFrame(uvc_frame_t *frame, void *ptr) {
try {
uvc_error_t ret;
- cb_data->logger->log_info("Got frame");
+ cb_data->logger->log_debug("Got frame");
ret = uvc_any2rgb(frame, cb_data->frame_buffer);
@@ -170,7 +170,7 @@ void GetUSBCamera::onSchedule(core::ProcessContext *context, core::ProcessSessio
return;
}
- logger_->log_info("UVC initialized");
+ logger_->log_debug("UVC initialized");
// Locate device
res = uvc_find_device(ctx_, &dev_, usb_vendor_id, usb_product_id, usb_serial_no);
@@ -215,7 +215,7 @@ void GetUSBCamera::onSchedule(core::ProcessContext *context, core::ProcessSessio
logger_->log_info("Skipping MJPEG frame formats");
default:
- logger_->log_info("Found unknown format");
+ logger_->log_warn("Found unknown format");
}
}
@@ -277,30 +277,30 @@ void GetUSBCamera::cleanupUvc() {
std::lock_guard<std::recursive_mutex> lock(*dev_access_mtx_);
if (frame_buffer_ != nullptr) {
- logger_->log_info("Deallocating frame buffer");
+ logger_->log_debug("Deallocating frame buffer");
uvc_free_frame(frame_buffer_);
}
if (devh_ != nullptr) {
- logger_->log_info("Stopping UVC streaming");
+ logger_->log_debug("Stopping UVC streaming");
uvc_stop_streaming(devh_);
- logger_->log_info("Closing UVC device handle");
+ logger_->log_debug("Closing UVC device handle");
uvc_close(devh_);
}
if (dev_ != nullptr) {
- logger_->log_info("Closing UVC device descriptor");
+ logger_->log_debug("Closing UVC device descriptor");
uvc_unref_device(dev_);
}
if (ctx_ != nullptr) {
- logger_->log_info("Closing UVC context");
+ logger_->log_debug("Closing UVC context");
uvc_exit(ctx_);
}
if (camera_thread_ != nullptr) {
camera_thread_->join();
- logger_->log_info("UVC thread ended");
+ logger_->log_debug("UVC thread ended");
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/include/FlowControlProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowControlProtocol.h b/libminifi/include/FlowControlProtocol.h
index 56b1f62..72da2d3 100644
--- a/libminifi/include/FlowControlProtocol.h
+++ b/libminifi/include/FlowControlProtocol.h
@@ -169,7 +169,7 @@ class FlowControlProtocol {
if (configure->get(Configure::nifi_server_name, value)) {
_serverName = value;
- logger_->log_info("NiFi Server Name %s", _serverName.c_str());
+ logger_->log_info("NiFi Server Name %s", _serverName);
}
if (configure->get(Configure::nifi_server_port, value) && core::Property::StringToInt(value, _serverPort)) {
logger_->log_info("NiFi Server Port: [%ll]", _serverPort);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/include/core/controller/StandardControllerServiceProvider.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h
index a6ca684..4d70757 100644
--- a/libminifi/include/core/controller/StandardControllerServiceProvider.h
+++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h
@@ -117,7 +117,7 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
logger_->log_info("Enabling %s", service->getName());
agent_->enableControllerService(service);
} else {
- logger_->log_info("Could not enable %s", service->getName());
+ logger_->log_warn("Could not enable %s", service->getName());
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/include/core/repository/VolatileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h
index 33fcf83..be23a0b 100644
--- a/libminifi/include/core/repository/VolatileRepository.h
+++ b/libminifi/include/core/repository/VolatileRepository.h
@@ -378,7 +378,7 @@ void VolatileRepository<T>::start() {
return;
running_ = true;
thread_ = std::thread(&VolatileRepository<T>::run, std::enable_shared_from_this<VolatileRepository<T>>::shared_from_this());
- logger_->log_info("%s Repository Monitor Thread Start", name_);
+ logger_->log_debug("%s Repository Monitor Thread Start", name_);
}
#if defined(__clang__)
#pragma clang diagnostic pop
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/include/processors/ListenSyslog.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ListenSyslog.h b/libminifi/include/processors/ListenSyslog.h
index 25acac9..f9dc678 100644
--- a/libminifi/include/processors/ListenSyslog.h
+++ b/libminifi/include/processors/ListenSyslog.h
@@ -91,7 +91,7 @@ class ListenSyslog : public core::Processor {
}
_clientSockets.clear();
if (_serverSocket > 0) {
- logger_->log_info("ListenSysLog Server socket %d close", _serverSocket);
+ logger_->log_debug("ListenSysLog Server socket %d close", _serverSocket);
close(_serverSocket);
_serverSocket = 0;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index 5b9187f..9513f68 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -58,7 +58,7 @@ Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository,
expired_duration_ = 0;
queued_data_size_ = 0;
- logger_->log_info("Connection %s created", name_.c_str());
+ logger_->log_debug("Connection %s created", name_);
}
bool Connection::isEmpty() {
@@ -122,7 +122,7 @@ std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::
if (getTimeMillis() > (item->getEntryDate() + expired_duration_)) {
// Flow record expired
expiredFlowRecords.insert(item);
- logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr().c_str(), name_.c_str());
+ logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
if (flow_repository_->Delete(item->getUUIDStr())) {
item->setStoredToRepository(false);
}
@@ -136,7 +136,7 @@ std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::
}
std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
item->setOriginalConnection(connectable);
- logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr().c_str(), name_.c_str());
+ logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr(), name_);
return item;
}
} else {
@@ -149,7 +149,7 @@ std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::
}
std::shared_ptr<Connectable> connectable = std::static_pointer_cast<Connectable>(shared_from_this());
item->setOriginalConnection(connectable);
- logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr().c_str(), name_.c_str());
+ logger_->log_debug("Dequeue flow file UUID %s from connection %s", item->getUUIDStr(), name_);
return item;
}
}
@@ -163,13 +163,13 @@ void Connection::drain() {
while (!queue_.empty()) {
std::shared_ptr<core::FlowFile> item = queue_.front();
queue_.pop();
- logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr().c_str(), name_.c_str());
+ logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
if (flow_repository_->Delete(item->getUUIDStr())) {
item->setStoredToRepository(false);
}
}
queued_data_size_ = 0;
- logger_->log_debug("Drain connection %s", name_.c_str());
+ logger_->log_debug("Drain connection %s", name_);
}
} /* namespace minifi */