You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/01/20 04:39:21 UTC
[2/3] nifi-minifi-cpp git commit: MINIFICPP-365 Adjusting log levels.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/FlowControlProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp
index b5fc928..60122b0 100644
--- a/libminifi/src/FlowControlProtocol.cpp
+++ b/libminifi/src/FlowControlProtocol.cpp
@@ -107,7 +107,7 @@ int FlowControlProtocol::connectServer(const char *host, uint16_t port) {
return 0;
}
- logger_->log_info("Flow Control Protocol socket %ll connect to server %s port %ll success", sock, host, port);
+ logger_->log_debug("Flow Control Protocol socket %ll connect to server %s port %ll success", sock, host, port);
return sock;
}
@@ -207,7 +207,7 @@ void FlowControlProtocol::start() {
if (running_)
return;
running_ = true;
- logger_->log_info("FlowControl Protocol Start");
+ logger_->log_trace("FlowControl Protocol Start");
_thread = new std::thread(run, this);
_thread->detach();
}
@@ -234,7 +234,7 @@ void FlowControlProtocol::run(FlowControlProtocol *protocol) {
int FlowControlProtocol::sendRegisterReq() {
if (_registered) {
- logger_->log_info("Already registered");
+ logger_->log_debug("Already registered");
return -1;
}
@@ -291,21 +291,21 @@ int FlowControlProtocol::sendRegisterReq() {
logger_->log_error("Flow Control Protocol Read Register Resp header failed");
return -1;
}
- logger_->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
- logger_->log_info("Flow Control Protocol receive Seq Num %ll", hdr.seqNumber);
- logger_->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
- logger_->log_info("Flow Control Protocol receive Payload len %ll", hdr.payloadLen);
+ logger_->log_debug("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+ logger_->log_debug("Flow Control Protocol receive Seq Num %ll", hdr.seqNumber);
+ logger_->log_debug("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+ logger_->log_debug("Flow Control Protocol receive Payload len %ll", hdr.payloadLen);
if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) {
this->_registered = true;
this->_seqNumber++;
- logger_->log_info("Flow Control Protocol Register success");
+ logger_->log_trace("Flow Control Protocol Register success");
uint8_t *payload = new uint8_t[hdr.payloadLen];
uint8_t *payloadPtr = payload;
status = readData(payload, hdr.payloadLen);
if (status <= 0) {
delete[] payload;
- logger_->log_info("Flow Control Protocol Register Read Payload fail");
+ logger_->log_warn("Flow Control Protocol Register Read Payload fail");
close(_socket);
_socket = 0;
return -1;
@@ -317,7 +317,7 @@ int FlowControlProtocol::sendRegisterReq() {
// Fixed 4 bytes
uint32_t reportInterval;
payloadPtr = this->decode(payloadPtr, reportInterval);
- logger_->log_info("Flow Control Protocol receive report interval %ll ms", reportInterval);
+ logger_->log_debug("Flow Control Protocol receive report interval %ll ms", reportInterval);
this->_reportInterval = reportInterval;
} else {
break;
@@ -328,7 +328,7 @@ int FlowControlProtocol::sendRegisterReq() {
_socket = 0;
return 0;
} else {
- logger_->log_info("Flow Control Protocol Register fail");
+ logger_->log_warn("Flow Control Protocol Register fail");
close(_socket);
_socket = 0;
return -1;
@@ -385,10 +385,10 @@ int FlowControlProtocol::sendReportReq() {
logger_->log_error("Flow Control Protocol Read Report Resp header failed");
return -1;
}
- logger_->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
- logger_->log_info("Flow Control Protocol receive Seq Num %ll", hdr.seqNumber);
- logger_->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
- logger_->log_info("Flow Control Protocol receive Payload len %ll", hdr.payloadLen);
+ logger_->log_debug("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+ logger_->log_debug("Flow Control Protocol receive Seq Num %ll", hdr.seqNumber);
+ logger_->log_debug("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+ logger_->log_debug("Flow Control Protocol receive Payload len %ll", hdr.payloadLen);
if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) {
this->_seqNumber++;
@@ -397,7 +397,7 @@ int FlowControlProtocol::sendReportReq() {
status = readData(payload, hdr.payloadLen);
if (status <= 0) {
delete[] payload;
- logger_->log_info("Flow Control Protocol Report Resp Read Payload fail");
+ logger_->log_warn("Flow Control Protocol Report Resp Read Payload fail");
close(_socket);
_socket = 0;
return -1;
@@ -413,19 +413,19 @@ int FlowControlProtocol::sendReportReq() {
payloadPtr = this->decode(payloadPtr, len);
processor = (const char *) payloadPtr;
payloadPtr += len;
- logger_->log_info("Flow Control Protocol receive report resp processor %s", processor.c_str());
+ logger_->log_debug("Flow Control Protocol receive report resp processor %s", processor);
} else if (((FlowControlMsgID) msgID) == PROPERTY_NAME) {
uint32_t len;
payloadPtr = this->decode(payloadPtr, len);
propertyName = (const char *) payloadPtr;
payloadPtr += len;
- logger_->log_info("Flow Control Protocol receive report resp property name %s", propertyName.c_str());
+ logger_->log_debug("Flow Control Protocol receive report resp property name %s", propertyName);
} else if (((FlowControlMsgID) msgID) == PROPERTY_VALUE) {
uint32_t len;
payloadPtr = this->decode(payloadPtr, len);
propertyValue = (const char *) payloadPtr;
payloadPtr += len;
- logger_->log_info("Flow Control Protocol receive report resp property value %s", propertyValue.c_str());
+ logger_->log_debug("Flow Control Protocol receive report resp property value %s", propertyValue);
this->_controller->updatePropertyValue(processor, propertyName, propertyValue);
} else {
break;
@@ -436,28 +436,28 @@ int FlowControlProtocol::sendReportReq() {
_socket = 0;
return 0;
} else if (hdr.status == RESP_TRIGGER_REGISTER && hdr.seqNumber == this->_seqNumber) {
- logger_->log_info("Flow Control Protocol trigger reregister");
+ logger_->log_trace("Flow Control Protocol trigger reregister");
this->_registered = false;
this->_seqNumber++;
close(_socket);
_socket = 0;
return 0;
} else if (hdr.status == RESP_STOP_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber) {
- logger_->log_info("Flow Control Protocol stop flow controller");
+ logger_->log_trace("Flow Control Protocol stop flow controller");
this->_controller->stop(true);
this->_seqNumber++;
close(_socket);
_socket = 0;
return 0;
} else if (hdr.status == RESP_START_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber) {
- logger_->log_info("Flow Control Protocol start flow controller");
+ logger_->log_trace("Flow Control Protocol start flow controller");
this->_controller->start();
this->_seqNumber++;
close(_socket);
_socket = 0;
return 0;
} else {
- logger_->log_info("Flow Control Protocol Report fail");
+ logger_->log_trace("Flow Control Protocol Report fail");
close(_socket);
_socket = 0;
return -1;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 4d2e1b7..e6118e6 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -135,7 +135,7 @@ void FlowController::initializePaths(const std::string &adjustedFilename) {
}
std::string pathString(path);
configuration_filename_ = pathString;
- logger_->log_info("FlowController NiFi Configuration file %s", pathString.c_str());
+ logger_->log_info("FlowController NiFi Configuration file %s", pathString);
// Create the content repo directory if needed
struct stat contentDirStat;
@@ -181,7 +181,7 @@ bool FlowController::applyConfiguration(const std::string &configurePayload) {
if (newRoot == nullptr)
return false;
- logger_->log_info("Starting to reload Flow Controller with flow control name %s, version %d", newRoot->getName().c_str(), newRoot->getVersion());
+ logger_->log_info("Starting to reload Flow Controller with flow control name %s, version %d", newRoot->getName(), newRoot->getVersion());
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
stop(true);
@@ -287,7 +287,7 @@ void FlowController::load() {
void FlowController::reload(std::string yamlFile) {
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
- logger_->log_info("Starting to reload Flow Controller with yaml %s", yamlFile.c_str());
+ logger_->log_info("Starting to reload Flow Controller with yaml %s", yamlFile);
stop(true);
unload();
std::string oldYamlFile = this->configuration_filename_;
@@ -296,7 +296,7 @@ void FlowController::reload(std::string yamlFile) {
start();
if (this->root_ != nullptr) {
this->configuration_filename_ = oldYamlFile;
- logger_->log_info("Rollback Flow Controller to YAML %s", oldYamlFile.c_str());
+ logger_->log_info("Rollback Flow Controller to YAML %s", oldYamlFile);
stop(true);
unload();
load();
@@ -405,7 +405,7 @@ void FlowController::initializeC2() {
auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz);
if (nullptr == ptr) {
- logger_->log_error("No metric defined for %s", clazz.c_str());
+ logger_->log_error("No metric defined for %s", clazz);
continue;
}
@@ -454,7 +454,7 @@ void FlowController::initializeC2() {
ret = metrics_[clazz];
}
if (nullptr == ret) {
- logger_->log_error("No metric defined for %s", clazz.c_str());
+ logger_->log_error("No metric defined for %s", clazz);
continue;
}
component_metrics_by_id_[id].push_back(ret);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 3816485..8775de7 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -98,11 +98,11 @@ FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository
}
FlowFileRecord::~FlowFileRecord() {
- logger_->log_debug("Destroying flow file record, UUID %s", uuidStr_.c_str());
+ logger_->log_debug("Destroying flow file record, UUID %s", uuidStr_);
if (!snapshot_)
- logger_->log_debug("Delete FlowFile UUID %s", uuidStr_.c_str());
+ logger_->log_debug("Delete FlowFile UUID %s", uuidStr_);
else
- logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuidStr_.c_str());
+ logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuidStr_);
if (claim_) {
releaseClaim(claim_);
} else {
@@ -119,11 +119,11 @@ void FlowFileRecord::releaseClaim(std::shared_ptr<ResourceClaim> claim) {
// Decrease the flow file record owned count for the resource claim
claim_->decreaseFlowFileRecordOwnedCount();
std::string value;
- logger_->log_debug("Delete Resource Claim %s, %s, attempt %llu", getUUIDStr(), claim_->getContentFullPath().c_str(), claim_->getFlowFileRecordOwnedCount());
+ logger_->log_debug("Delete Resource Claim %s, %s, attempt %llu", getUUIDStr(), claim_->getContentFullPath(), claim_->getFlowFileRecordOwnedCount());
if (claim_->getFlowFileRecordOwnedCount() <= 0) {
// we cannot rely on the stored variable here since we aren't guaranteed atomicity
if (flow_repository_ != nullptr && !flow_repository_->Get(uuidStr_, value)) {
- logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath().c_str());
+ logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath());
content_repo_->remove(claim_);
}
}
@@ -184,7 +184,7 @@ bool FlowFileRecord::DeSerialize(std::string key) {
ret = flow_repository_->Get(key, value);
if (!ret) {
- logger_->log_error("NiFi FlowFile Store event %s can not found", key.c_str());
+ logger_->log_error("NiFi FlowFile Store event %s can not found", key);
return false;
}
io::DataStream stream((const uint8_t*) value.data(), value.length());
@@ -192,9 +192,9 @@ bool FlowFileRecord::DeSerialize(std::string key) {
ret = DeSerialize(stream);
if (ret) {
- logger_->log_debug("NiFi FlowFile retrieve uuid %s size %llu connection %s success", uuidStr_.c_str(), stream.getSize(), uuid_connection_.c_str());
+ logger_->log_debug("NiFi FlowFile retrieve uuid %s size %llu connection %s success", uuidStr_, stream.getSize(), uuid_connection_);
} else {
- logger_->log_debug("NiFi FlowFile retrieve uuid %s size %llu connection %s fail", uuidStr_.c_str(), stream.getSize(), uuid_connection_.c_str());
+ logger_->log_debug("NiFi FlowFile retrieve uuid %s size %llu connection %s fail", uuidStr_, stream.getSize(), uuid_connection_);
}
return ret;
@@ -263,10 +263,10 @@ bool FlowFileRecord::Serialize() {
}
if (flow_repository_->Put(uuidStr_, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
- logger_->log_debug("NiFi FlowFile Store event %s size %llu success", uuidStr_.c_str(), outStream.getSize());
+ logger_->log_debug("NiFi FlowFile Store event %s size %llu success", uuidStr_, outStream.getSize());
return true;
} else {
- logger_->log_error("NiFi FlowFile Store event %s size %llu fail", uuidStr_.c_str(), outStream.getSize());
+ logger_->log_error("NiFi FlowFile Store event %s size %llu fail", uuidStr_, outStream.getSize());
return false;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index caddc32..31b6c89 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -71,7 +71,7 @@ std::unique_ptr<sitetosite::SiteToSiteClient> RemoteProcessorGroupPort::getNextP
nextProtocol = sitetosite::createClient(config);
} else if (peer_index_ >= 0) {
std::lock_guard<std::mutex> lock(peer_mutex_);
- logger_->log_info("Creating client from peer %ll", peer_index_.load());
+ logger_->log_debug("Creating client from peer %ll", peer_index_.load());
sitetosite::SiteToSiteClientConfiguration config(stream_factory_, peers_[this->peer_index_].getPeer(), client_type_);
peer_index_++;
@@ -81,12 +81,12 @@ std::unique_ptr<sitetosite::SiteToSiteClient> RemoteProcessorGroupPort::getNextP
nextProtocol = sitetosite::createClient(config);
} else {
- logger_->log_info("Refreshing the peer list since there are none configured.");
+ logger_->log_debug("Refreshing the peer list since there are none configured.");
refreshPeerList();
}
}
}
- logger_->log_info("Obtained protocol from available_protocols_");
+ logger_->log_debug("Obtained protocol from available_protocols_");
return nextProtocol;
}
@@ -95,11 +95,11 @@ void RemoteProcessorGroupPort::returnProtocol(std::unique_ptr<sitetosite::SiteTo
if (max_concurrent_tasks_ > count)
count = max_concurrent_tasks_;
if (available_protocols_.size_approx() >= count) {
- logger_->log_info("not enqueueing protocol %s", getUUIDStr());
+ logger_->log_debug("not enqueueing protocol %s", getUUIDStr());
// let the memory be freed
return;
}
- logger_->log_info("enqueueing protocol %s, have a total of %lu", getUUIDStr(), available_protocols_.size_approx());
+ logger_->log_debug("enqueueing protocol %s, have a total of %lu", getUUIDStr(), available_protocols_.size_approx());
available_protocols_.enqueue(std::move(return_protocol));
}
@@ -142,13 +142,13 @@ void RemoteProcessorGroupPort::initialize() {
if (peer_index_ >= static_cast<int>(peers_.size())) {
peer_index_ = 0;
}
- logger_->log_info("Creating client");
+ logger_->log_trace("Creating client");
nextProtocol = sitetosite::createClient(config);
- logger_->log_info("Created client, moving into available protocols");
+ logger_->log_trace("Created client, moving into available protocols");
returnProtocol(std::move(nextProtocol));
}
}
- logger_->log_info("Finished initialization");
+ logger_->log_trace("Finished initialization");
}
void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
@@ -168,14 +168,14 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon
}
void RemoteProcessorGroupPort::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
- logger_->log_info("On trigger %s", getUUIDStr());
+ logger_->log_trace("On trigger %s", getUUIDStr());
if (!transmitting_) {
return;
}
std::string value;
- logger_->log_info("On trigger %s", getUUIDStr());
+ logger_->log_trace("On trigger %s", getUUIDStr());
if (url_.empty()) {
if (context->getProperty(hostName.getName(), value) && !value.empty()) {
host_ = value;
@@ -193,7 +193,7 @@ void RemoteProcessorGroupPort::onTrigger(const std::shared_ptr<core::ProcessCont
std::unique_ptr<sitetosite::SiteToSiteClient> protocol_ = nullptr;
try {
- logger_->log_info("get protocol in on trigger");
+ logger_->log_trace("get protocol in on trigger");
protocol_ = getNextProtocol();
if (!protocol_) {
@@ -203,7 +203,7 @@ void RemoteProcessorGroupPort::onTrigger(const std::shared_ptr<core::ProcessCont
}
if (!protocol_->transfer(direction_, context, session)) {
- logger_->log_info("protocol transmission failed, yielding");
+ logger_->log_warn("protocol transmission failed, yielding");
context->yield();
}
@@ -264,7 +264,7 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
const std::vector<char> &response_body = client->getResponseBody();
if (!response_body.empty()) {
std::string controller = std::string(response_body.begin(), response_body.end());
- logger_->log_debug("controller config %s", controller.c_str());
+ logger_->log_debug("controller config %s", controller);
Json::Value value;
Json::Reader reader;
bool parsingSuccessful = reader.parse(controller, value);
@@ -280,7 +280,7 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {
if (!secure.empty())
this->site2site_secure_ = secure.asBool();
}
- logger_->log_info("process group remote site2site port %d, is secure %d", site2site_port_, site2site_secure_);
+ logger_->log_debug("process group remote site2site port %d, is secure %d", site2site_port_, site2site_secure_);
}
} else {
logger_->log_error("Cannot output body to content for ProcessGroup::refreshRemoteSite2SiteInfo: received HTTP code %ll from %s", client->getResponseCode(), fullUrl);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/ThreadedSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp
index 8864ba0..61dd3fe 100644
--- a/libminifi/src/ThreadedSchedulingAgent.cpp
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -58,12 +58,12 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo
}
if (processor->getScheduledState() != core::RUNNING) {
- logger_->log_info("Can not schedule threads for processor %s because it is not running", processor->getName().c_str());
+ logger_->log_debug("Can not schedule threads for processor %s because it is not running", processor->getName());
return;
}
if (thread_pool_.isRunning(processor->getUUIDStr())) {
- logger_->log_info("Can not schedule threads for processor %s because there are existing threads running");
+ logger_->log_warn("Can not schedule threads for processor %s because there are existing threads running");
return;
}
@@ -93,7 +93,7 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processo
std::future<uint64_t> future;
thread_pool_.execute(std::move(functor), future);
}
- logger_->log_info("Scheduled thread %d concurrent workers for for process %s", processor->getMaxConcurrentTasks(), processor->getName().c_str());
+ logger_->log_debug("Scheduled thread %d concurrent workers for for process %s", processor->getMaxConcurrentTasks(), processor->getName());
return;
}
@@ -104,10 +104,10 @@ void ThreadedSchedulingAgent::stop() {
void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> processor) {
std::lock_guard<std::mutex> lock(mutex_);
- logger_->log_info("Shutting down threads for processor %s/%s", processor->getName().c_str(), processor->getUUIDStr().c_str());
+ logger_->log_debug("Shutting down threads for processor %s/%s", processor->getName(), processor->getUUIDStr());
if (processor->getScheduledState() != core::RUNNING) {
- logger_->log_info("Cannot unschedule threads for processor %s because it is not running", processor->getName().c_str());
+ logger_->log_warn("Cannot unschedule threads for processor %s because it is not running", processor->getName());
return;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/capi/Plan.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp
index 53133d1..9775c80 100644
--- a/libminifi/src/capi/Plan.cpp
+++ b/libminifi/src/capi/Plan.cpp
@@ -115,7 +115,7 @@ bool linkToPrevious) {
bool ExecutionPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) {
uint32_t i = 0;
- logger_->log_info("Attempting to set property %s %s for %s", prop, value, proc->getName());
+ logger_->log_debug("Attempting to set property %s %s for %s", prop, value, proc->getName());
for (i = 0; i < processor_queue_.size(); i++) {
if (processor_queue_.at(i) == proc) {
break;
@@ -162,7 +162,7 @@ bool ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<co
if (verify != nullptr) {
verify(context, current_session);
} else {
- logger_->log_info("Running %s", processor->getName());
+ logger_->log_debug("Running %s", processor->getName());
processor->onTrigger(context, current_session);
}
current_session->commit();
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/controllers/SSLContextService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/SSLContextService.cpp b/libminifi/src/controllers/SSLContextService.cpp
index e8dc520..1c3fd71 100644
--- a/libminifi/src/controllers/SSLContextService.cpp
+++ b/libminifi/src/controllers/SSLContextService.cpp
@@ -134,7 +134,7 @@ void SSLContextService::onEnable() {
certificate = test_cert;
logger_->log_debug("%s now good", certificate);
} else {
- logger_->log_debug("%s still not good", test_cert);
+ logger_->log_warn("%s still not good", test_cert);
valid_ = false;
}
cert_file_test.close();
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/core/ConfigurableComponent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp
index 0273aa2..a8a0b0d 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -88,7 +88,7 @@ bool ConfigurableComponent::setProperty(const std::string name, std::string valu
Property item = it->second;
item.setValue(value);
properties_[item.getName()] = item;
- logger_->log_debug("Component %s property name %s value %s", name.c_str(), item.getName().c_str(), value.c_str());
+ logger_->log_debug("Component %s property name %s value %s", name, item.getName(), value);
return true;
} else {
return false;
@@ -109,7 +109,7 @@ bool ConfigurableComponent::updateProperty(const std::string &name, const std::s
Property item = it->second;
item.addValue(value);
properties_[item.getName()] = item;
- logger_->log_debug("Component %s property name %s value %s", name.c_str(), item.getName().c_str(), value.c_str());
+ logger_->log_debug("Component %s property name %s value %s", name, item.getName(), value);
return true;
} else {
return false;
@@ -130,7 +130,7 @@ bool ConfigurableComponent::setProperty(Property &prop, std::string value) {
Property item = it->second;
item.setValue(value);
properties_[item.getName()] = item;
- logger_->log_debug("property name %s value %s", prop.getName().c_str(), item.getName().c_str(), value.c_str());
+ logger_->log_debug("property name %s value %s", prop.getName(), item.getName(), value);
return true;
} else {
Property newProp(prop);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/core/Connectable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Connectable.cpp b/libminifi/src/core/Connectable.cpp
index e2f033e..29ee411 100644
--- a/libminifi/src/core/Connectable.cpp
+++ b/libminifi/src/core/Connectable.cpp
@@ -49,7 +49,7 @@ Connectable::~Connectable() {
bool Connectable::setSupportedRelationships(std::set<core::Relationship> relationships) {
if (isRunning()) {
- logger_->log_info("Can not set processor supported relationship while the process %s is running", name_.c_str());
+ logger_->log_warn("Can not set processor supported relationship while the process %s is running", name_);
return false;
}
@@ -58,7 +58,7 @@ bool Connectable::setSupportedRelationships(std::set<core::Relationship> relatio
relationships_.clear();
for (auto item : relationships) {
relationships_[item.getName()] = item;
- logger_->log_info("Processor %s supported relationship name %s", name_.c_str(), item.getName().c_str());
+ logger_->log_debug("Processor %s supported relationship name %s", name_, item.getName());
}
return true;
}
@@ -81,7 +81,7 @@ bool Connectable::isSupportedRelationship(core::Relationship relationship) {
bool Connectable::setAutoTerminatedRelationships(std::set<Relationship> relationships) {
if (isRunning()) {
- logger_->log_info("Can not set processor auto terminated relationship while the process %s is running", name_.c_str());
+ logger_->log_warn("Can not set processor auto terminated relationship while the process %s is running", name_);
return false;
}
@@ -90,7 +90,7 @@ bool Connectable::setAutoTerminatedRelationships(std::set<Relationship> relation
auto_terminated_relationships_.clear();
for (auto item : relationships) {
auto_terminated_relationships_[item.getName()] = item;
- logger_->log_info("Processor %s auto terminated relationship name %s", name_.c_str(), item.getName().c_str());
+ logger_->log_debug("Processor %s auto terminated relationship name %s", name_, item.getName());
}
return true;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index 22d737a..b082dce 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -37,7 +37,7 @@ FlowConfiguration::~FlowConfiguration() {
std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string name, uuid_t uuid) {
auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(name, uuid);
if (nullptr == ptr) {
- logger_->log_error("No Processor defined for %s", name.c_str());
+ logger_->log_error("No Processor defined for %s", name);
return nullptr;
}
std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/core/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 5915089..023ca9d 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -54,7 +54,7 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid,
yield_period_msec_ = 0;
transmitting_ = false;
- logger_->log_info("ProcessGroup %s created", name_);
+ logger_->log_debug("ProcessGroup %s created", name_);
}
ProcessGroup::~ProcessGroup() {
@@ -79,7 +79,7 @@ void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) {
if (processors_.find(processor) == processors_.end()) {
// We do not have the same processor in this process group yet
processors_.insert(processor);
- logger_->log_info("Add processor %s into process group %s", processor->getName().c_str(), name_.c_str());
+ logger_->log_debug("Add processor %s into process group %s", processor->getName(), name_);
}
}
@@ -89,7 +89,7 @@ void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) {
if (processors_.find(processor) != processors_.end()) {
// We do have the same processor in this process group yet
processors_.erase(processor);
- logger_->log_info("Remove processor %s from process group %s", processor->getName().c_str(), name_.c_str());
+ logger_->log_debug("Remove processor %s from process group %s", processor->getName(), name_);
}
}
@@ -99,7 +99,7 @@ void ProcessGroup::addProcessGroup(ProcessGroup *child) {
if (child_process_groups_.find(child) == child_process_groups_.end()) {
// We do not have the same child process group in this process group yet
child_process_groups_.insert(child);
- logger_->log_info("Add child process group %s into process group %s", child->getName().c_str(), name_.c_str());
+ logger_->log_debug("Add child process group %s into process group %s", child->getName(), name_);
}
}
@@ -109,7 +109,7 @@ void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
if (child_process_groups_.find(child) != child_process_groups_.end()) {
// We do have the same child process group in this process group yet
child_process_groups_.erase(child);
- logger_->log_info("Remove child process group %s from process group %s", child->getName().c_str(), name_.c_str());
+ logger_->log_debug("Remove child process group %s from process group %s", child->getName(), name_);
}
}
@@ -119,7 +119,7 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, Ev
try {
// Start all the processor node, input and output ports
for (auto processor : processors_) {
- logger_->log_debug("Starting %s", processor->getName().c_str());
+ logger_->log_debug("Starting %s", processor->getName());
if (!processor->isRunning() && processor->getScheduledState() != DISABLED) {
if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
@@ -171,7 +171,7 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
std::shared_ptr<Processor> ret = NULL;
for (auto processor : processors_) {
- logger_->log_info("find processor %s", processor->getName().c_str());
+ logger_->log_debug("find processor %s", processor->getName());
uuid_t processorUUID;
if (processor->getUUID(processorUUID)) {
@@ -186,7 +186,7 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) {
}
}
for (auto processGroup : child_process_groups_) {
- logger_->log_info("find processor child %s", processGroup->getName().c_str());
+ logger_->log_debug("find processor child %s", processGroup->getName());
std::shared_ptr<Processor> processor = processGroup->findProcessor(uuid);
if (processor)
return processor;
@@ -212,7 +212,7 @@ void ProcessGroup::getAllProcessors(std::vector<std::shared_ptr<Processor>> &pro
std::shared_ptr<Processor> ret = NULL;
for (auto processor : processors_) {
- logger_->log_debug("Current processor is %s", processor->getName().c_str());
+ logger_->log_debug("Current processor is %s", processor->getName());
processor_vec.push_back(processor);
}
for (auto processGroup : child_process_groups_) {
@@ -224,7 +224,7 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(const std::string &proces
std::lock_guard<std::recursive_mutex> lock(mutex_);
std::shared_ptr<Processor> ret = NULL;
for (auto processor : processors_) {
- logger_->log_debug("Current processor is %s", processor->getName().c_str());
+ logger_->log_debug("Current processor is %s", processor->getName());
if (processor->getName() == processorName)
return processor;
}
@@ -275,7 +275,7 @@ void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
if (connections_.find(connection) == connections_.end()) {
// We do not have the same connection in this process group yet
connections_.insert(connection);
- logger_->log_info("Add connection %s into process group %s", connection->getName().c_str(), name_.c_str());
+ logger_->log_debug("Add connection %s into process group %s", connection->getName(), name_);
uuid_t sourceUUID;
std::shared_ptr<Processor> source = NULL;
connection->getSourceUUID(sourceUUID);
@@ -297,7 +297,7 @@ void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) {
if (connections_.find(connection) != connections_.end()) {
// We do not have the same connection in this process group yet
connections_.erase(connection);
- logger_->log_info("Remove connection %s into process group %s", connection->getName().c_str(), name_.c_str());
+ logger_->log_debug("Remove connection %s into process group %s", connection->getName(), name_);
uuid_t sourceUUID;
std::shared_ptr<Processor> source = NULL;
connection->getSourceUUID(sourceUUID);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index bdfddc5..bd98db6 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -46,7 +46,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::create() {
std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty);
_addedFlowFiles[record->getUUIDStr()] = record;
- logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str());
+ logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr());
std::stringstream details;
details << process_context_->getProcessorNode()->getName() << " creates flow record " << record->getUUIDStr();
provenance_report_->create(record, details.str());
@@ -64,7 +64,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::create(const std::shared_ptr<cor
if (record) {
_addedFlowFiles[record->getUUIDStr()] = record;
- logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str());
+ logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr());
}
if (record) {
@@ -106,7 +106,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer(std::shared_
if (record) {
this->_clonedFlowFiles[record->getUUIDStr()] = record;
- logger_->log_debug("Clone FlowFile with UUID %s during transfer", record->getUUIDStr().c_str());
+ logger_->log_debug("Clone FlowFile with UUID %s during transfer", record->getUUIDStr());
// Copy attributes
std::map<std::string, std::string> parentAttributes = parent->getAttributes();
std::map<std::string, std::string>::iterator it;
@@ -191,10 +191,13 @@ void ProcessSession::removeAttribute(const std::shared_ptr<core::FlowFile> &flow
}
void ProcessSession::penalize(const std::shared_ptr<core::FlowFile> &flow) {
- flow->setPenaltyExpiration(getTimeMillis() + process_context_->getProcessorNode()->getPenalizationPeriodMsec());
+ uint64_t penalization_period = process_context_->getProcessorNode()->getPenalizationPeriodMsec();
+ logging::LOG_INFO(logger_) << "Penalizing " << flow->getUUIDStr() << " for " << penalization_period << "ms at " << process_context_->getProcessorNode()->getName();
+ flow->setPenaltyExpiration(getTimeMillis() + penalization_period);
}
void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
+ logging::LOG_INFO(logger_) << "Transferring " << flow->getUUIDStr() << " from " << process_context_->getProcessorNode()->getName() << " to relationship " << relationship.getName();
_transferRelationship[flow->getUUIDStr()] = relationship;
}
@@ -371,8 +374,8 @@ void ProcessSession::importFrom(io::DataStream &stream, const std::shared_ptr<co
}
flow->setResourceClaim(claim);
- logger_->log_debug("Import offset %llu length %llu into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(),
- flow->getUUIDStr().c_str());
+ logger_->log_debug("Import offset %llu length %llu into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath(),
+ flow->getUUIDStr());
content_stream->closeStream();
std::stringstream details;
@@ -443,8 +446,8 @@ void ProcessSession::import(std::string source, const std::shared_ptr<core::Flow
}
flow->setResourceClaim(claim);
- logger_->log_debug("Import offset %llu length %llu into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(),
- flow->getUUIDStr().c_str());
+ logger_->log_debug("Import offset %llu length %llu into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath(),
+ flow->getUUIDStr());
stream->closeStream();
input.close();
@@ -531,7 +534,7 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
flowFile->setResourceClaim(claim);
claim->increaseFlowFileRecordOwnedCount();
logger_->log_debug("Import offset %llu length %llu into content %s for FlowFile UUID %s", flowFile->getOffset(), flowFile->getSize(),
- flowFile->getResourceClaim()->getContentFullPath().c_str(), flowFile->getUUIDStr().c_str());
+ flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr());
stream->closeStream();
std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
uint64_t endTime = getTimeMillis();
@@ -568,18 +571,18 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow
}
bool ProcessSession::exportContent(const std::string &destination, const std::string &tmpFile, const std::shared_ptr<core::FlowFile> &flow, bool keepContent) {
- logger_->log_info("Exporting content of %s to %s", flow->getUUIDStr().c_str(), destination.c_str());
+ logger_->log_debug("Exporting content of %s to %s", flow->getUUIDStr(), destination);
ProcessSessionReadCallback cb(tmpFile, destination, logger_);
read(flow, &cb);
- logger_->log_info("Committing %s", destination.c_str());
+ logger_->log_info("Committing %s", destination);
bool commit_ok = cb.commit();
if (commit_ok) {
logger_->log_info("Commit OK.");
} else {
- logger_->log_error("Commit of %s to %s failed!", flow->getUUIDStr().c_str(), destination.c_str());
+ logger_->log_error("Commit of %s to %s failed!", flow->getUUIDStr(), destination);
}
return commit_ok;
}
@@ -597,12 +600,12 @@ bool ProcessSession::exportContent(const std::string &destination, const std::sh
}
void ProcessSession::stash(const std::string &key, const std::shared_ptr<core::FlowFile> &flow) {
- logger_->log_info("Stashing content from %s to key %s", flow->getUUIDStr().c_str(), key.c_str());
+ logger_->log_debug("Stashing content from %s to key %s", flow->getUUIDStr(), key);
if (!flow->getResourceClaim()) {
logger_->log_warn("Attempted to stash content of record %s when "
"there is no resource claim",
- flow->getUUIDStr().c_str());
+ flow->getUUIDStr());
return;
}
@@ -615,11 +618,11 @@ void ProcessSession::stash(const std::string &key, const std::shared_ptr<core::F
}
void ProcessSession::restore(const std::string &key, const std::shared_ptr<core::FlowFile> &flow) {
- logger_->log_info("Restoring content to %s from key %s", flow->getUUIDStr().c_str(), key.c_str());
+ logger_->log_info("Restoring content to %s from key %s", flow->getUUIDStr(), key);
// Restore the claim
if (!flow->hasStashClaim(key)) {
- logger_->log_warn("Requested restore to record %s from unknown key %s", flow->getUUIDStr().c_str(), key.c_str());
+ logger_->log_warn("Requested restore to record %s from unknown key %s", flow->getUUIDStr(), key);
return;
}
@@ -627,7 +630,7 @@ void ProcessSession::restore(const std::string &key, const std::shared_ptr<core:
if (flow->getResourceClaim()) {
logger_->log_warn("Restoring stashed content of record %s from key %s when there is "
"existing content; existing content will be overwritten",
- flow->getUUIDStr().c_str(), key.c_str());
+ flow->getUUIDStr(), key);
flow->releaseClaim(flow->getResourceClaim());
}
@@ -768,7 +771,7 @@ void ProcessSession::commit() {
_originalFlowFiles.clear();
// persistent the provenance report
this->provenance_report_->commit();
- logger_->log_trace("ProcessSession committed for %s", process_context_->getProcessorNode()->getName().c_str());
+ logger_->log_trace("ProcessSession committed for %s", process_context_->getProcessorNode()->getName());
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
throw;
@@ -797,7 +800,7 @@ void ProcessSession::rollback() {
_addedFlowFiles.clear();
_updatedFlowFiles.clear();
_deletedFlowFiles.clear();
- logger_->log_debug("ProcessSession rollback for %s", process_context_->getProcessorNode()->getName().c_str());
+ logger_->log_debug("ProcessSession rollback for %s", process_context_->getProcessorNode()->getName());
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
throw;
@@ -835,7 +838,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() {
_updatedFlowFiles[ret->getUUIDStr()] = ret;
std::map<std::string, std::string> empty;
std::shared_ptr<core::FlowFile> snapshot = std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), process_context_->getContentRepository(), empty);
- logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr().c_str());
+ logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr());
snapshot = ret;
// save a snapshot
_originalFlowFiles[snapshot->getUUIDStr()] = snapshot;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/core/ProcessSessionReadCallback.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSessionReadCallback.cpp b/libminifi/src/core/ProcessSessionReadCallback.cpp
index bc72e9f..65fd26a 100644
--- a/libminifi/src/core/ProcessSessionReadCallback.cpp
+++ b/libminifi/src/core/ProcessSessionReadCallback.cpp
@@ -64,19 +64,19 @@ int64_t ProcessSessionReadCallback::process(std::shared_ptr<io::BaseStream> stre
bool ProcessSessionReadCallback::commit() {
bool success = false;
- logger_->log_info("committing export operation to %s", _destFile.c_str());
+ logger_->log_debug("committing export operation to %s", _destFile);
if (_writeSucceeded) {
_tmpFileOs.close();
if (rename(_tmpFile.c_str(), _destFile.c_str())) {
- logger_->log_info("commit export operation to %s failed because rename() call failed", _destFile.c_str());
+ logger_->log_warn("commit export operation to %s failed because rename() call failed", _destFile);
} else {
success = true;
- logger_->log_info("commit export operation to %s succeeded", _destFile.c_str());
+ logger_->log_debug("commit export operation to %s succeeded", _destFile);
}
} else {
- logger_->log_error("commit export operation to %s failed because write failed", _destFile.c_str());
+ logger_->log_error("commit export operation to %s failed because write failed", _destFile);
}
return success;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/core/Processor.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 2bceac5..7adf718 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -62,7 +62,7 @@ Processor::Processor(std::string name, uuid_t uuid)
active_tasks_ = 0;
yield_expiration_ = 0;
incoming_connections_Iter = this->_incomingConnections.begin();
- logger_->log_info("Processor %s created UUID %s", name_, uuidStr_);
+ logger_->log_debug("Processor %s created UUID %s", name_, uuidStr_);
}
bool Processor::isRunning() {
@@ -80,7 +80,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
bool ret = false;
if (isRunning()) {
- logger_->log_info("Can not add connection while the process %s is running", name_.c_str());
+ logger_->log_warn("Can not add connection while the process %s is running", name_);
return false;
}
std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
@@ -102,7 +102,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
if (_incomingConnections.find(connection) == _incomingConnections.end()) {
_incomingConnections.insert(connection);
connection->setDestination(shared_from_this());
- logger_->log_info("Add connection %s into Processor %s incoming connection", connection->getName().c_str(), name_.c_str());
+ logger_->log_debug("Add connection %s into Processor %s incoming connection", connection->getName(), name_);
incoming_connections_Iter = this->_incomingConnections.begin();
ret = true;
}
@@ -121,7 +121,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
existedConnection.insert(connection);
connection->setSource(shared_from_this());
out_going_connections_[relationship] = existedConnection;
- logger_->log_info("Add connection %s into Processor %s outgoing connection for relationship %s", connection->getName().c_str(), name_.c_str(), relationship.c_str());
+ logger_->log_debug("Add connection %s into Processor %s outgoing connection for relationship %s", connection->getName(), name_, relationship);
ret = true;
}
} else {
@@ -130,7 +130,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
newConnection.insert(connection);
connection->setSource(shared_from_this());
out_going_connections_[relationship] = newConnection;
- logger_->log_info("Add connection %s into Processor %s outgoing connection for relationship %s", connection->getName().c_str(), name_.c_str(), relationship.c_str());
+ logger_->log_debug("Add connection %s into Processor %s outgoing connection for relationship %s", connection->getName(), name_, relationship);
ret = true;
}
}
@@ -140,7 +140,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) {
void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
if (isRunning()) {
- logger_->log_info("Can not remove connection while the process %s is running", name_.c_str());
+ logger_->log_warn("Can not remove connection while the process %s is running", name_);
return;
}
@@ -159,7 +159,7 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
if (_incomingConnections.find(connection) != _incomingConnections.end()) {
_incomingConnections.erase(connection);
connection->setDestination(NULL);
- logger_->log_info("Remove connection %s into Processor %s incoming connection", connection->getName().c_str(), name_.c_str());
+ logger_->log_debug("Remove connection %s into Processor %s incoming connection", connection->getName(), name_);
incoming_connections_Iter = this->_incomingConnections.begin();
}
}
@@ -174,7 +174,7 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) {
if (out_going_connections_[relationship].find(connection) != out_going_connections_[relationship].end()) {
out_going_connections_[relationship].erase(connection);
connection->setSource(NULL);
- logger_->log_info("Remove connection %s into Processor %s outgoing connection for relationship %s", connection->getName().c_str(), name_.c_str(), relationship.c_str());
+ logger_->log_debug("Remove connection %s into Processor %s outgoing connection for relationship %s", connection->getName(), name_, relationship);
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/core/Repository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Repository.cpp b/libminifi/src/core/Repository.cpp
index 575f694..1432d9e 100644
--- a/libminifi/src/core/Repository.cpp
+++ b/libminifi/src/core/Repository.cpp
@@ -40,7 +40,7 @@ void Repository::start() {
return;
running_ = true;
thread_ = std::thread(&Repository::threadExecutor, this);
- logger_->log_info("%s Repository Monitor Thread Start", name_.c_str());
+ logger_->log_debug("%s Repository Monitor Thread Start", name_);
}
void Repository::stop() {
@@ -49,7 +49,7 @@ void Repository::stop() {
running_ = false;
if (thread_.joinable())
thread_.join();
- logger_->log_info("%s Repository Monitor Thread Stop", name_.c_str());
+ logger_->log_debug("%s Repository Monitor Thread Stop", name_);
}
// repoSize
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/core/repository/VolatileContentRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
index 7c9aad9..60f538d 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -83,7 +83,7 @@ void VolatileContentRepository::start() {
thread_ = std::thread(&VolatileContentRepository::run, shared_from_parent<VolatileContentRepository>());
thread_.detach();
running_ = true;
- logger_->log_info("%s Repository Monitor Thread Start", getName());
+ logger_->log_debug("%s Repository Monitor Thread Start", getName());
}
std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 5b7e2ff..8b93c51 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -309,8 +309,6 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
if (inputPorts && inputPorts.IsSequence()) {
for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
- logger_->log_debug("Got a current port, iterating...");
-
YAML::Node currPort = portIter->as<YAML::Node>();
this->parsePortYaml(&currPort, group, sitetosite::SEND);
@@ -688,7 +686,7 @@ void YamlConfiguration::parsePropertiesNodeYaml(YAML::Node *propertiesNode,
YAML::Node propertiesNode = nodeVal["value"];
// must insert the sequence in differently.
std::string rawValueString = propertiesNode.as<std::string>();
- logger_->log_info("Found %s=%s", propertyName, rawValueString);
+ logger_->log_debug("Found %s=%s", propertyName, rawValueString);
if (!processor->updateProperty(propertyName, rawValueString)) {
std::shared_ptr<core::Connectable> proc = std::dynamic_pointer_cast<core::Connectable>(processor);
if (proc != 0) {
@@ -757,7 +755,8 @@ void YamlConfiguration::checkRequiredField(YAML::Node *yamlNode,
errMsg += " [in '" + yamlSection + "' section of configuration file]";
}
}
- logger_->log_error(errMsg.c_str());
+ logging::LOG_ERROR(logger_) << errMsg;
+
throw std::invalid_argument(errMsg);
}
}
@@ -783,7 +782,7 @@ YAML::Node YamlConfiguration::getOptionalField(YAML::Node *yamlNode,
infoMessage += defaultValue.as<std::string>();
}
- logger_->log_info(infoMessage.c_str());
+ logging::LOG_ERROR(logger_) << infoMessage;
result = defaultValue;
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/io/FileStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index e903d95..78916d6 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -142,9 +142,7 @@ int FileStream::readData(uint8_t *buf, int buflen) {
size_t ret = len - offset_;
offset_ = len;
length_ = len;
- std::stringstream str;
- str << path_ << " eof bit, ended at " << offset_;
- logger_->log_info(str.str().c_str());
+ logging::LOG_DEBUG(logger_) << path_ << " eof bit, ended at " << offset_;
return ret;
} else {
offset_ += buflen;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/io/tls/TLSSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp
index 40cf1e9..d51fa76 100644
--- a/libminifi/src/io/tls/TLSSocket.cpp
+++ b/libminifi/src/io/tls/TLSSocket.cpp
@@ -94,7 +94,7 @@ int16_t TLSContext::initialize() {
int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(), SSL_FILETYPE_PEM);
if (retp != 1) {
- logger_->log_error("Could not create load private key,%i on %s error : %s", retp, privatekey.c_str(), std::strerror(errno));
+ logger_->log_error("Could not create load private key,%i on %s error : %s", retp, privatekey, std::strerror(errno));
error_value = TLS_ERROR_KEY_ERROR;
return error_value;
}
@@ -114,7 +114,7 @@ int16_t TLSContext::initialize() {
}
}
- logger_->log_info("Load/Verify Client Certificate OK.");
+ logger_->log_debug("Load/Verify Client Certificate OK.");
}
return 0;
}
@@ -159,13 +159,13 @@ int16_t TLSSocket::initialize() {
ssl = SSL_new(context_->getContext());
SSL_set_fd(ssl, socket_file_descriptor_);
if (SSL_connect(ssl) == -1) {
- logger_->log_error("SSL socket connect failed to %s %d", requested_hostname_.c_str(), port_);
+ logger_->log_error("SSL socket connect failed to %s %d", requested_hostname_, port_);
SSL_free(ssl);
ssl = NULL;
close(socket_file_descriptor_);
return -1;
} else {
- logger_->log_info("SSL socket connect success to %s %d", requested_hostname_.c_str(), port_);
+ logger_->log_debug("SSL socket connect success to %s %d", requested_hostname_, port_);
return 0;
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/processors/ExecuteProcess.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ExecuteProcess.cpp b/libminifi/src/processors/ExecuteProcess.cpp
index 13d52c8..f88f5c7 100644
--- a/libminifi/src/processors/ExecuteProcess.cpp
+++ b/libminifi/src/processors/ExecuteProcess.cpp
@@ -85,7 +85,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
if (context->getProperty(BatchDuration.getName(), value)) {
core::TimeUnit unit;
if (core::Property::StringToTime(value, _batchDuration, unit) && core::Property::ConvertTimeUnitToMS(_batchDuration, unit, _batchDuration)) {
- logger_->log_info("Setting _batchDuration");
+ logger_->log_debug("Setting _batchDuration");
}
}
if (context->getProperty(RedirectErrorStream.getName(), value)) {
@@ -99,12 +99,12 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
if (_workingDir.length() > 0 && _workingDir != ".") {
// change to working directory
if (chdir(_workingDir.c_str()) != 0) {
- logger_->log_error("Execute Command can not chdir %s", _workingDir.c_str());
+ logger_->log_error("Execute Command can not chdir %s", _workingDir);
yield();
return;
}
}
- logger_->log_info("Execute Command %s", _fullCommand.c_str());
+ logger_->log_info("Execute Command %s", _fullCommand);
// split the command into array
char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
int argc = 0;
@@ -153,13 +153,13 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
int numRead = read(_pipefd[0], buffer, sizeof(buffer));
if (numRead <= 0)
break;
- logger_->log_info("Execute Command Respond %d", numRead);
+ logger_->log_debug("Execute Command Respond %d", numRead);
ExecuteProcess::WriteCallback callback(buffer, numRead);
std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
if (!flowFile)
continue;
- flowFile->addAttribute("command", _command.c_str());
- flowFile->addAttribute("command.arguments", _commandArgument.c_str());
+ flowFile->addAttribute("command", _command);
+ flowFile->addAttribute("command.arguments", _commandArgument);
session->write(flowFile, &callback);
session->transfer(flowFile, Success);
session->commit();
@@ -173,15 +173,15 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
int numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
if (numRead <= 0) {
if (totalRead > 0) {
- logger_->log_info("Execute Command Respond %d", totalRead);
+ logger_->log_debug("Execute Command Respond %d", totalRead);
// child exits and close the pipe
ExecuteProcess::WriteCallback callback(buffer, totalRead);
if (!flowFile) {
flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
if (!flowFile)
break;
- flowFile->addAttribute("command", _command.c_str());
- flowFile->addAttribute("command.arguments", _commandArgument.c_str());
+ flowFile->addAttribute("command", _command);
+ flowFile->addAttribute("command.arguments", _commandArgument);
session->write(flowFile, &callback);
} else {
session->append(flowFile, &callback);
@@ -192,14 +192,14 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
} else {
if (numRead == static_cast<int>((sizeof(buffer) - totalRead))) {
// we reach the max buffer size
- logger_->log_info("Execute Command Max Respond %d", sizeof(buffer));
+ logger_->log_debug("Execute Command Max Respond %d", sizeof(buffer));
ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer));
if (!flowFile) {
flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
if (!flowFile)
continue;
- flowFile->addAttribute("command", _command.c_str());
- flowFile->addAttribute("command.arguments", _commandArgument.c_str());
+ flowFile->addAttribute("command", _command);
+ flowFile->addAttribute("command.arguments", _commandArgument);
session->write(flowFile, &callback);
} else {
session->append(flowFile, &callback);
@@ -217,9 +217,9 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
wait(&status);
if (WIFEXITED(status)) {
- logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WEXITSTATUS(status), _pid);
+ logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WEXITSTATUS(status), _pid);
} else {
- logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WTERMSIG(status), _pid);
+ logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WTERMSIG(status), _pid);
}
close(_pipefd[0]);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/processors/GetFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp
index 0472c55..5d80b42 100644
--- a/libminifi/src/processors/GetFile.cpp
+++ b/libminifi/src/processors/GetFile.cpp
@@ -136,14 +136,14 @@ void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses
metrics_->iterations_++;
- logger_->log_info("Is listing empty %i", isListingEmpty());
+ logger_->log_debug("Is listing empty %i", isListingEmpty());
if (isListingEmpty()) {
if (request_.pollInterval == 0 || (getTimeMillis() - last_listing_time_) > request_.pollInterval) {
performListing(request_.directory, request_);
last_listing_time_.store(getTimeMillis());
}
}
- logger_->log_info("Is listing empty %i", isListingEmpty());
+ logger_->log_debug("Is listing empty %i", isListingEmpty());
if (!isListingEmpty()) {
try {
@@ -152,7 +152,7 @@ void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses
while (!list.empty()) {
std::string fileName = list.front();
list.pop();
- logger_->log_info("GetFile process %s", fileName.c_str());
+ logger_->log_info("GetFile process %s", fileName);
std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
if (flowFile == nullptr)
return;
@@ -241,13 +241,12 @@ bool GetFile::acceptFile(std::string fullName, std::string name, const GetFileRe
}
void GetFile::performListing(std::string dir, const GetFileRequest &request) {
- logger_->log_info("Performing file listing against %s", dir.c_str());
DIR *d;
d = opendir(dir.c_str());
if (!d)
return;
// only perform a listing while we are not empty
- logger_->log_info("Performing file listing against %s", dir.c_str());
+ logger_->log_debug("Performing file listing against %s", dir);
while (isRunning()) {
struct dirent *entry;
entry = readdir(d);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/processors/GetTCP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GetTCP.cpp b/libminifi/src/processors/GetTCP.cpp
index bfb9a3c..3bf15b2 100644
--- a/libminifi/src/processors/GetTCP.cpp
+++ b/libminifi/src/processors/GetTCP.cpp
@@ -168,12 +168,12 @@ void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, co
}
}
if (startLoc > 0) {
- logger_->log_info("Starting at %i, ending at %i", startLoc, size_read);
+ logger_->log_trace("Starting at %i, ending at %i", startLoc, size_read);
if (size_read-startLoc > 0) {
handler_->handle(socket_ptr->getHostname(), buffer.data()+startLoc, (size_read-startLoc), true);
}
} else {
- logger_->log_info("Handling at %i, ending at %i", startLoc, size_read);
+ logger_->log_trace("Handling at %i, ending at %i", startLoc, size_read);
if (size_read > 0) {
handler_->handle(socket_ptr->getHostname(), buffer.data(), size_read, false);
}
@@ -202,7 +202,7 @@ void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, co
return -1;
}
}while (running_);
- logger_->log_info("Ending private thread");
+ logger_->log_debug("Ending private thread");
return 0;
};
@@ -268,12 +268,12 @@ void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, con
live_clients_[endpoint] = future;
}
} else {
- logger_->log_info("Thread still running for %s", endPointFuture->first);
+ logger_->log_debug("Thread still running for %s", endPointFuture->first);
// we have a thread corresponding to this.
}
}
}
- logger_->log_info("Updating endpoint");
+ logger_->log_debug("Updating endpoint");
context->yield();
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/processors/ListenSyslog.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ListenSyslog.cpp b/libminifi/src/processors/ListenSyslog.cpp
index a71b69f..a3236a6 100644
--- a/libminifi/src/processors/ListenSyslog.cpp
+++ b/libminifi/src/processors/ListenSyslog.cpp
@@ -71,7 +71,7 @@ void ListenSyslog::startSocketThread() {
if (_thread != NULL)
return;
- logger_->log_info("ListenSysLog Socket Thread Start");
+ logger_->log_trace("ListenSysLog Socket Thread Start");
_serverTheadRunning = true;
_thread = new std::thread(run, this);
_thread->detach();
@@ -107,7 +107,7 @@ void ListenSyslog::runThread() {
else
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0) {
- logger_->log_info("ListenSysLog Server socket creation failed");
+ logger_->log_error("ListenSysLog Server socket creation failed");
break;
}
bzero(reinterpret_cast<char *>(&serv_addr), sizeof(serv_addr));
@@ -180,7 +180,7 @@ void ListenSyslog::runThread() {
int recvlen = readline(clientSocket, _buffer, sizeof(_buffer));
if (recvlen <= 0) {
close(clientSocket);
- logger_->log_info("ListenSysLog client socket %d close", clientSocket);
+ logger_->log_debug("ListenSysLog client socket %d close", clientSocket);
it = _clientSockets.erase(it);
} else {
if ((uint64_t)(recvlen + getEventQueueByteSize()) <= _recvBufSize) {
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/processors/LogAttribute.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/LogAttribute.cpp b/libminifi/src/processors/LogAttribute.cpp
index 0c2f64e..65f45c6 100644
--- a/libminifi/src/processors/LogAttribute.cpp
+++ b/libminifi/src/processors/LogAttribute.cpp
@@ -64,7 +64,7 @@ void LogAttribute::initialize() {
}
void LogAttribute::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
- logger_->log_info("enter log attribute");
+ logger_->log_trace("enter log attribute");
std::string dashLine = "--------------------------------------------------";
LogAttrLevel level = LogAttrLevelInfo;
bool logPayload = false;
@@ -123,19 +123,19 @@ void LogAttribute::onTrigger(core::ProcessContext *context, core::ProcessSession
switch (level) {
case LogAttrLevelInfo:
- logger_->log_info("%s", output.c_str());
+ logger_->log_info("%s", output);
break;
case LogAttrLevelDebug:
- logger_->log_debug("%s", output.c_str());
+ logger_->log_debug("%s", output);
break;
case LogAttrLevelError:
- logger_->log_error("%s", output.c_str());
+ logger_->log_error("%s", output);
break;
case LogAttrLevelTrace:
- logger_->log_trace("%s", output.c_str());
+ logger_->log_trace("%s", output);
break;
case LogAttrLevelWarn:
- logger_->log_warn("%s", output.c_str());
+ logger_->log_warn("%s", output);
break;
default:
break;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/processors/PutFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/PutFile.cpp b/libminifi/src/processors/PutFile.cpp
index 9ffbbd7..f073025 100644
--- a/libminifi/src/processors/PutFile.cpp
+++ b/libminifi/src/processors/PutFile.cpp
@@ -123,14 +123,14 @@ void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses
flowFile->getKeyedAttribute(FILENAME, filename);
std::string tmpFile = tmpWritePath(filename, directory);
- logger_->log_info("PutFile using temporary file %s", tmpFile.c_str());
+ logger_->log_debug("PutFile using temporary file %s", tmpFile);
// Determine dest full file paths
std::stringstream destFileSs;
destFileSs << directory << "/" << filename;
std::string destFile = destFileSs.str();
- logger_->log_info("PutFile writing file %s into directory %s", filename.c_str(), directory.c_str());
+ logger_->log_debug("PutFile writing file %s into directory %s", filename, directory);
// If file exists, apply conflict resolution strategy
struct stat statResult;
@@ -141,7 +141,7 @@ void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses
// it's a directory, count the files
DIR *myDir = opendir(directory.c_str());
if (!myDir) {
- logger_->log_warn("Could not open %s", directory.c_str());
+ logger_->log_warn("Could not open %s", directory);
session->transfer(flowFile, Failure);
return;
}
@@ -153,7 +153,7 @@ void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses
ct++;
if (ct >= max_dest_files_) {
logger_->log_warn("Routing to failure because the output directory %s has at least %u files, which exceeds the "
- "configured max number of files", directory.c_str(), max_dest_files_);
+ "configured max number of files", directory, max_dest_files_);
session->transfer(flowFile, Failure);
closedir(myDir);
return;
@@ -165,9 +165,9 @@ void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses
}
if (stat(destFile.c_str(), &statResult) == 0) {
- logger_->log_info("Destination file %s exists; applying Conflict Resolution Strategy: %s",
- destFile.c_str(),
- conflict_resolution_.c_str());
+ logger_->log_warn("Destination file %s exists; applying Conflict Resolution Strategy: %s",
+ destFile,
+ conflict_resolution_);
if (conflict_resolution_ == CONFLICT_RESOLUTION_STRATEGY_REPLACE) {
putFile(session, flowFile, tmpFile, destFile, directory);
@@ -215,7 +215,7 @@ bool PutFile::putFile(core::ProcessSession *session,
// Attempt to create directories in file's path
std::stringstream dir_path_stream;
- logger_->log_warn("Destination directory does not exist; will attempt to create: ", destDir);
+ logger_->log_debug("Destination directory does not exist; will attempt to create: ", destDir);
size_t i = 0;
auto pos = destFile.find('/');
@@ -225,7 +225,7 @@ bool PutFile::putFile(core::ProcessSession *session,
auto dir_path = dir_path_stream.str();
if (!dir_path_component.empty()) {
- logger_->log_info("Attempting to create directory if it does not already exist: %s", dir_path);
+ logger_->log_debug("Attempting to create directory if it does not already exist: %s", dir_path);
mkdir(dir_path.c_str(), 0700);
dir_path_stream << '/';
} else if (pos == 0) {
@@ -241,7 +241,7 @@ bool PutFile::putFile(core::ProcessSession *session,
ReadCallback cb(tmpFile, destFile);
session->read(flowFile, &cb);
- logger_->log_info("Committing %s", destFile);
+ logger_->log_debug("Committing %s", destFile);
if (cb.commit()) {
session->transfer(flowFile, Success);
return true;
@@ -296,18 +296,18 @@ int64_t PutFile::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
bool PutFile::ReadCallback::commit() {
bool success = false;
- logger_->log_info("PutFile committing put file operation to %s", dest_file_.c_str());
+ logger_->log_info("PutFile committing put file operation to %s", dest_file_);
if (write_succeeded_) {
if (rename(tmp_file_.c_str(), dest_file_.c_str())) {
logger_->log_info("PutFile commit put file operation to %s failed because rename() call failed",
- dest_file_.c_str());
+ dest_file_);
} else {
success = true;
- logger_->log_info("PutFile commit put file operation to %s succeeded", dest_file_.c_str());
+ logger_->log_info("PutFile commit put file operation to %s succeeded", dest_file_);
}
} else {
- logger_->log_error("PutFile commit put file operation to %s failed because write failed", dest_file_.c_str());
+ logger_->log_error("PutFile commit put file operation to %s failed because write failed", dest_file_);
}
return success;
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/processors/TailFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp
index 490efae..61aa86b 100644
--- a/libminifi/src/processors/TailFile.cpp
+++ b/libminifi/src/processors/TailFile.cpp
@@ -135,7 +135,7 @@ void TailFile::parseStateFileLine(char *buf) {
void TailFile::recoverState() {
std::ifstream file(_stateFile.c_str(), std::ifstream::in);
if (!file.good()) {
- logger_->log_error("load state file failed %s", _stateFile.c_str());
+ logger_->log_error("load state file failed %s", _stateFile);
return;
}
char buf[BUFFER_SIZE];
@@ -147,7 +147,7 @@ void TailFile::recoverState() {
void TailFile::storeState() {
std::ofstream file(_stateFile.c_str());
if (!file.is_open()) {
- logger_->log_error("store state file failed %s", _stateFile.c_str());
+ logger_->log_error("store state file failed %s", _stateFile);
return;
}
file << "FILENAME=" << this->_currentTailFileName << "\n";
@@ -206,7 +206,7 @@ void TailFile::checkRollOver(const std::string &fileLocation, const std::string
++it;
if (it != matchedFiles.end()) {
TailMatchedFileItem nextItem = *it;
- logger_->log_info("TailFile File Roll Over from %s to %s", _currentTailFileName.c_str(), nextItem.fileName.c_str());
+ logger_->log_info("TailFile File Roll Over from %s to %s", _currentTailFileName, nextItem.fileName);
_currentTailFileName = nextItem.fileName;
_currentTailFilePosition = 0;
storeState();
@@ -286,7 +286,7 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se
}
} else {
- logger_->log_warn("Unable to stat file %s", fullPath.c_str());
+ logger_->log_warn("Unable to stat file %s", fullPath);
}
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/233d1d44/libminifi/src/sitetosite/RawSocketProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp
index df063ff..2bccb0b 100644
--- a/libminifi/src/sitetosite/RawSocketProtocol.cpp
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -99,7 +99,7 @@ bool RawSiteToSiteClient::establish() {
return false;
}
- logger_->log_info("Site2Site socket established");
+ logger_->log_debug("Site2Site socket established");
peer_state_ = ESTABLISHED;
return true;
@@ -112,11 +112,11 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() {
return false;
}
- logger_->log_info("Negotiate protocol version with destination port %s current version %d", port_id_str_, _currentVersion);
+ logger_->log_debug("Negotiate protocol version with destination port %s current version %d", port_id_str_, _currentVersion);
int ret = peer_->writeUTF(getResourceName());
- logger_->log_info("result of writing resource name is %i", ret);
+ logger_->log_trace("result of writing resource name is %i", ret);
if (ret <= 0) {
logger_->log_debug("result of writing resource name is %i", ret);
// tearDown();
@@ -126,7 +126,7 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() {
ret = peer_->write(_currentVersion);
if (ret <= 0) {
- logger_->log_info("result of writing version is %i", ret);
+ logger_->log_debug("result of writing version is %i", ret);
return false;
}
@@ -134,13 +134,13 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() {
ret = peer_->read(statusCode);
if (ret <= 0) {
- logger_->log_info("result of writing version status code %i", ret);
+ logger_->log_debug("result of writing version status code %i", ret);
return false;
}
- logger_->log_info("status code is %i", statusCode);
+ logger_->log_debug("status code is %i", statusCode);
switch (statusCode) {
case RESOURCE_OK:
- logger_->log_info("Site2Site Protocol Negotiate protocol version OK");
+ logger_->log_debug("Site2Site Protocol Negotiate protocol version OK");
return true;
case DIFFERENT_RESOURCE_VERSION:
uint32_t serverVersion;
@@ -161,11 +161,11 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() {
ret = -1;
return false;
case NEGOTIATED_ABORT:
- logger_->log_info("Site2Site Negotiate protocol response ABORT");
+ logger_->log_warn("Site2Site Negotiate protocol response ABORT");
ret = -1;
return false;
default:
- logger_->log_info("Negotiate protocol response unknown code %d", statusCode);
+ logger_->log_warn("Negotiate protocol response unknown code %d", statusCode);
return true;
}
@@ -179,7 +179,7 @@ bool RawSiteToSiteClient::initiateCodecResourceNegotiation() {
return false;
}
- logger_->log_info("Negotiate Codec version with destination port %s current version %d", port_id_str_, _currentCodecVersion);
+ logger_->log_trace("Negotiate Codec version with destination port %s current version %d", port_id_str_, _currentCodecVersion);
int ret = peer_->writeUTF(getCodecResourceName());
@@ -204,7 +204,7 @@ bool RawSiteToSiteClient::initiateCodecResourceNegotiation() {
switch (statusCode) {
case RESOURCE_OK:
- logger_->log_info("Site2Site Codec Negotiate version OK");
+ logger_->log_trace("Site2Site Codec Negotiate version OK");
return true;
case DIFFERENT_RESOURCE_VERSION:
uint32_t serverVersion;
@@ -224,11 +224,11 @@ bool RawSiteToSiteClient::initiateCodecResourceNegotiation() {
ret = -1;
return false;
case NEGOTIATED_ABORT:
- logger_->log_info("Site2Site Codec Negotiate response ABORT");
+ logger_->log_error("Site2Site Codec Negotiate response ABORT");
ret = -1;
return false;
default:
- logger_->log_info("Negotiate Codec response unknown code %d", statusCode);
+ logger_->log_error("Negotiate Codec response unknown code %d", statusCode);
return true;
}
@@ -240,7 +240,7 @@ bool RawSiteToSiteClient::handShake() {
logger_->log_error("Site2Site peer state is not established while handshake");
return false;
}
- logger_->log_info("Site2Site Protocol Perform hand shake with destination port %s", port_id_str_);
+ logger_->log_debug("Site2Site Protocol Perform hand shake with destination port %s", port_id_str_);
uuid_t uuid;
// Generate the global UUID for the com identify
id_generator_->generate(uuid);
@@ -290,7 +290,7 @@ bool RawSiteToSiteClient::handShake() {
if (ret <= 0) {
return false;
}
- logger_->log_info("Site2Site Protocol Send handshake properties %s %s", it->first.c_str(), it->second.c_str());
+ logger_->log_debug("Site2Site Protocol Send handshake properties %s %s", it->first, it->second);
}
RespondCode code;
@@ -304,7 +304,7 @@ bool RawSiteToSiteClient::handShake() {
switch (code) {
case PROPERTIES_OK:
- logger_->log_info("Site2Site HandShake Completed");
+ logger_->log_debug("Site2Site HandShake Completed");
peer_state_ = HANDSHAKED;
return true;
case PORT_NOT_IN_VALID_STATE:
@@ -314,7 +314,7 @@ bool RawSiteToSiteClient::handShake() {
ret = -1;
return false;
default:
- logger_->log_info("HandShake Failed because of unknown respond code %d", code);
+ logger_->log_error("HandShake Failed because of unknown respond code %d", code);
ret = -1;
return false;
}
@@ -324,7 +324,7 @@ bool RawSiteToSiteClient::handShake() {
void RawSiteToSiteClient::tearDown() {
if (peer_state_ >= ESTABLISHED) {
- logger_->log_info("Site2Site Protocol tearDown");
+ logger_->log_trace("Site2Site Protocol tearDown");
// need to write shutdown request
writeRequestType(SHUTDOWN);
}
@@ -378,9 +378,7 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
}
PeerStatus status(std::make_shared<Peer>(port_id_, host, port, secure), count, true);
peers.push_back(std::move(status));
- std::stringstream str;
- str << "Site2Site Peer host " << host << " port " << port << " Secure " << secure;
- logger_->log_info(str.str().c_str());
+ logging::LOG_TRACE(logger_) << "Site2Site Peer host " << host << " port " << port << " Secure " << secure;
}
tearDown();
@@ -490,7 +488,7 @@ bool RawSiteToSiteClient::negotiateCodec() {
return false;
}
- logger_->log_info("Site2Site Protocol Negotiate Codec with destination port %s", port_id_str_);
+ logger_->log_trace("Site2Site Protocol Negotiate Codec with destination port %s", port_id_str_);
int status = writeRequestType(NEGOTIATE_FLOWFILE_CODEC);
@@ -506,7 +504,7 @@ bool RawSiteToSiteClient::negotiateCodec() {
return false;
}
- logger_->log_info("Site2Site Codec Completed and move to READY state for data transfer");
+ logger_->log_trace("Site2Site Codec Completed and move to READY state for data transfer");
peer_state_ = READY;
return true;
@@ -519,7 +517,7 @@ bool RawSiteToSiteClient::bootstrap() {
tearDown();
if (establish() && handShake() && negotiateCodec()) {
- logger_->log_info("Site2Site Ready For data transaction");
+ logger_->log_debug("Site to Site ready for data transaction");
return true;
} else {
peer_->yield();
@@ -561,24 +559,24 @@ std::shared_ptr<Transaction> RawSiteToSiteClient::createTransaction(std::string
switch (code) {
case MORE_DATA:
dataAvailable = true;
- logger_->log_info("Site2Site peer indicates that data is available");
+ logger_->log_trace("Site2Site peer indicates that data is available");
transaction = std::make_shared<Transaction>(direction, crcstream);
known_transactions_[transaction->getUUIDStr()] = transaction;
transactionID = transaction->getUUIDStr();
transaction->setDataAvailable(dataAvailable);
- logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str());
+ logger_->log_trace("Site2Site create transaction %s", transaction->getUUIDStr());
return transaction;
case NO_MORE_DATA:
dataAvailable = false;
- logger_->log_info("Site2Site peer indicates that no data is available");
+ logger_->log_trace("Site2Site peer indicates that no data is available");
transaction = std::make_shared<Transaction>(direction, crcstream);
known_transactions_[transaction->getUUIDStr()] = transaction;
transactionID = transaction->getUUIDStr();
transaction->setDataAvailable(dataAvailable);
- logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str());
+ logger_->log_trace("Site2Site create transaction %s", transaction->getUUIDStr());
return transaction;
default:
- logger_->log_info("Site2Site got unexpected response %d when asking for data", code);
+ logger_->log_warn("Site2Site got unexpected response %d when asking for data", code);
return NULL;
}
} else {
@@ -591,7 +589,7 @@ std::shared_ptr<Transaction> RawSiteToSiteClient::createTransaction(std::string
transaction = std::make_shared<Transaction>(direction, crcstream);
known_transactions_[transaction->getUUIDStr()] = transaction;
transactionID = transaction->getUUIDStr();
- logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str());
+ logger_->log_trace("Site2Site create transaction %s", transaction->getUUIDStr());
return transaction;
}
}