You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by lo...@apache.org on 2022/01/17 15:09:28 UTC
[nifi-minifi-cpp] 03/03: MINIFICPP-1688: When storing time durations we should use std::chrono instead of simple integers
This is an automated email from the ASF dual-hosted git repository.
lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 59b204f9e44dc00106654a1e02156da67465672b
Author: Martin Zink <ma...@apache.org>
AuthorDate: Mon Dec 6 14:48:06 2021 +0100
MINIFICPP-1688: When storing time durations we should use std::chrono instead of simple integers
Co-authored-by: Márton Szász <sz...@gmail.com>
Signed-off-by: Gabor Gyimesi <ga...@gmail.com>
This closes #1225
---
extensions/aws/processors/ListS3.cpp | 7 +-
extensions/aws/processors/S3Processor.cpp | 7 +-
extensions/expression-language/Expression.cpp | 7 +-
extensions/http-curl/processors/InvokeHTTP.cpp | 14 +--
extensions/http-curl/tests/C2PauseResumeTest.cpp | 2 +-
extensions/http-curl/tests/C2UpdateTest.cpp | 4 +-
extensions/jni/jvm/JniFlowFile.cpp | 6 +-
extensions/libarchive/BinFiles.cpp | 20 ++--
extensions/libarchive/BinFiles.h | 17 ++--
extensions/librdkafka/ConsumeKafka.cpp | 11 +-
extensions/librdkafka/PublishKafka.cpp | 60 +++++------
.../controllerservice/MQTTControllerService.cpp | 4 +-
.../mqtt/controllerservice/MQTTControllerService.h | 12 +--
.../mqtt/processors/AbstractMQTTProcessor.cpp | 28 ++---
extensions/mqtt/processors/AbstractMQTTProcessor.h | 10 +-
.../SourceInitiatedSubscriptionListener.cpp | 41 +++-----
.../SourceInitiatedSubscriptionListener.h | 10 +-
extensions/rocksdb-repos/FlowFileRepository.cpp | 9 +-
extensions/rocksdb-repos/FlowFileRepository.h | 25 ++---
extensions/rocksdb-repos/ProvenanceRepository.h | 21 ++--
extensions/sftp/client/SFTPClient.cpp | 9 +-
extensions/sftp/client/SFTPClient.h | 6 +-
extensions/sftp/processors/ListSFTP.cpp | 59 +++++------
extensions/sftp/processors/ListSFTP.h | 8 +-
extensions/sftp/processors/SFTPProcessorBase.cpp | 20 ++--
extensions/sftp/processors/SFTPProcessorBase.h | 4 +-
extensions/splunk/QuerySplunkIndexingStatus.cpp | 8 +-
.../processors/DefragmentText.cpp | 24 ++---
.../processors/DefragmentText.h | 2 +-
.../processors/ExecuteProcess.cpp | 16 +--
.../processors/ExecuteProcess.h | 3 +-
.../standard-processors/processors/GetFile.cpp | 20 ++--
.../standard-processors/processors/GetFile.h | 11 +-
.../standard-processors/processors/GetTCP.cpp | 17 ++--
extensions/standard-processors/processors/GetTCP.h | 3 +-
.../processors/LogAttribute.cpp | 4 +-
.../tests/unit/GetFileTests.cpp | 7 +-
.../tests/unit/TailFileTests.cpp | 24 +++--
.../tests/unit/YamlConfigurationTests.cpp | 18 ++--
.../tests/unit/YamlConnectionParserTest.cpp | 8 +-
extensions/usb-camera/GetUSBCamera.cpp | 4 +-
extensions/usb-camera/GetUSBCamera.h | 2 +-
.../CollectorInitiatedSubscription.cpp | 4 +-
.../windows-event-log/ConsumeWindowsEventLog.cpp | 4 +-
libminifi/include/Connection.h | 18 ++--
libminifi/include/FlowControlProtocol.h | 4 +-
libminifi/include/RemoteProcessorGroupPort.h | 4 +-
libminifi/include/SchedulingAgent.h | 8 +-
libminifi/include/c2/C2Agent.h | 4 +-
.../AbstractAutoPersistingKeyValueStoreService.h | 2 +-
libminifi/include/core/ConfigurableComponent.h | 8 +-
libminifi/include/core/FlowFile.h | 18 ++--
libminifi/include/core/ProcessContext.h | 4 +-
libminifi/include/core/ProcessGroup.h | 18 ++--
libminifi/include/core/Processor.h | 65 ++++--------
libminifi/include/core/ProcessorConfig.h | 6 +-
libminifi/include/core/Property.h | 93 -----------------
libminifi/include/core/PropertyValidation.h | 5 +-
libminifi/include/core/Repository.h | 18 ++--
libminifi/include/core/TypedValues.h | 51 +++-------
.../core/repository/VolatileFlowFileRepository.h | 10 +-
.../core/repository/VolatileProvenanceRepository.h | 8 +-
.../include/core/repository/VolatileRepository.h | 12 ++-
.../include/core/state/nodes/SchedulingNodes.h | 6 +-
libminifi/include/core/yaml/YamlConnectionParser.h | 2 +-
libminifi/include/provenance/Provenance.h | 34 +++----
libminifi/include/sitetosite/Peer.h | 66 ++++++------
libminifi/include/sitetosite/RawSocketProtocol.h | 24 ++---
libminifi/include/sitetosite/SiteToSiteClient.h | 2 +-
libminifi/include/utils/StringUtils.h | 13 ---
libminifi/include/utils/ThreadPool.h | 4 +-
libminifi/include/utils/TimeUtil.h | 113 +++++++++++++++++++--
libminifi/include/utils/ValueParser.h | 59 -----------
libminifi/src/Connection.cpp | 40 +-------
libminifi/src/CronDrivenSchedulingAgent.cpp | 8 +-
libminifi/src/DiskSpaceWatchdog.cpp | 10 +-
libminifi/src/EventDrivenSchedulingAgent.cpp | 5 +-
libminifi/src/FlowFileRecord.cpp | 21 ++--
libminifi/src/RemoteProcessorGroupPort.cpp | 15 +--
libminifi/src/ThreadedSchedulingAgent.cpp | 11 +-
libminifi/src/TimerDrivenSchedulingAgent.cpp | 11 +-
libminifi/src/c2/C2Agent.cpp | 21 ++--
.../AbstractAutoPersistingKeyValueStoreService.cpp | 17 ++--
libminifi/src/core/FlowFile.cpp | 13 +--
libminifi/src/core/ProcessGroup.cpp | 6 +-
libminifi/src/core/ProcessSession.cpp | 33 +++---
libminifi/src/core/Processor.cpp | 33 +++++-
libminifi/src/core/Repository.cpp | 7 +-
libminifi/src/core/RepositoryFactory.cpp | 8 +-
.../SiteToSiteProvenanceReportingTask.cpp | 6 +-
.../core/repository/VolatileContentRepository.cpp | 4 +-
libminifi/src/core/yaml/YamlConfiguration.cpp | 65 +++++-------
libminifi/src/core/yaml/YamlConnectionParser.cpp | 16 +--
libminifi/src/provenance/Provenance.cpp | 53 ++++++----
libminifi/src/sitetosite/RawSocketProtocol.cpp | 8 +-
libminifi/src/sitetosite/SiteToSiteClient.cpp | 19 ++--
libminifi/src/utils/ProcessorConfigUtils.cpp | 9 +-
libminifi/test/TestBase.cpp | 8 +-
.../DeleteAzureDataLakeStorageTests.cpp | 2 +-
.../azure-tests/PutAzureDataLakeStorageTests.cpp | 2 +-
libminifi/test/flow-tests/FlowControllerTests.cpp | 4 +-
.../rocksdb-tests/DBProvenanceRepositoryTests.cpp | 12 +--
libminifi/test/rocksdb-tests/ProvenanceTests.cpp | 11 +-
libminifi/test/rocksdb-tests/RepoTests.cpp | 15 +--
libminifi/test/unit/ConnectionTests.cpp | 10 +-
libminifi/test/unit/CpuUsageTest.cpp | 45 ++++----
libminifi/test/unit/FileUtilsTests.cpp | 7 +-
libminifi/test/unit/PropertyTests.cpp | 68 -------------
libminifi/test/unit/PropertyValidationTests.cpp | 27 +++++
libminifi/test/unit/ProvenanceTestHelper.h | 6 +-
libminifi/test/unit/TimeUtilTests.cpp | 57 +++++++++++
libminifi/test/unit/tls/TLSStreamTests.cpp | 2 +-
nanofi/include/sitetosite/CRawSocketProtocol.h | 2 +-
113 files changed, 921 insertions(+), 1075 deletions(-)
diff --git a/extensions/aws/processors/ListS3.cpp b/extensions/aws/processors/ListS3.cpp
index b5cc1a7..0f958eb 100644
--- a/extensions/aws/processors/ListS3.cpp
+++ b/extensions/aws/processors/ListS3.cpp
@@ -115,11 +115,12 @@ void ListS3::onSchedule(const std::shared_ptr<core::ProcessContext> &context, co
context->getProperty(UseVersions.getName(), list_request_params_->use_versions);
logger_->log_debug("ListS3: UseVersions [%s]", list_request_params_->use_versions ? "true" : "false");
- std::string min_obj_age_str;
- if (!context->getProperty(MinimumObjectAge.getName(), min_obj_age_str) || min_obj_age_str.empty() || !core::Property::getTimeMSFromString(min_obj_age_str, list_request_params_->min_object_age)) {
+ if (auto minimum_object_age = context->getProperty<core::TimePeriodValue>(MinimumObjectAge)) {
+ list_request_params_->min_object_age = minimum_object_age->getMilliseconds().count();
+ } else {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Minimum Object Age missing or invalid");
}
- logger_->log_debug("S3Processor: Minimum Object Age [%llud]", min_obj_age_str, list_request_params_->min_object_age);
+ logger_->log_debug("S3Processor: Minimum Object Age [%llud]", list_request_params_->min_object_age);
context->getProperty(WriteObjectTags.getName(), write_object_tags_);
logger_->log_debug("ListS3: WriteObjectTags [%s]", write_object_tags_ ? "true" : "false");
diff --git a/extensions/aws/processors/S3Processor.cpp b/extensions/aws/processors/S3Processor.cpp
index 7ea8005..f143618 100644
--- a/extensions/aws/processors/S3Processor.cpp
+++ b/extensions/aws/processors/S3Processor.cpp
@@ -202,10 +202,9 @@ void S3Processor::onSchedule(const std::shared_ptr<core::ProcessContext>& contex
}
logger_->log_debug("S3Processor: Region [%s]", client_config_.region);
- uint64_t timeout_val;
- if (context->getProperty(CommunicationsTimeout.getName(), value) && !value.empty() && core::Property::getTimeMSFromString(value, timeout_val)) {
- logger_->log_debug("S3Processor: Communications Timeout [%llu]", timeout_val);
- client_config_.connectTimeoutMs = gsl::narrow<int64_t>(timeout_val);
+ if (auto communications_timeout = context->getProperty<core::TimePeriodValue>(CommunicationsTimeout)) {
+ logger_->log_debug("S3Processor: Communications Timeout %" PRId64 " ms", communications_timeout->getMilliseconds().count());
+ client_config_.connectTimeoutMs = gsl::narrow<int64_t>(communications_timeout->getMilliseconds().count());
} else {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Communications Timeout missing or invalid");
}
diff --git a/extensions/expression-language/Expression.cpp b/extensions/expression-language/Expression.cpp
index 85800f5..6598da6 100644
--- a/extensions/expression-language/Expression.cpp
+++ b/extensions/expression-language/Expression.cpp
@@ -613,7 +613,7 @@ Value expr_escapeCsv(const std::vector<Value> &args) {
Value expr_format(const std::vector<Value> &args) {
std::chrono::milliseconds dur(args[0].asUnsignedLong());
- std::chrono::time_point<std::chrono::system_clock> dt(dur);
+ std::chrono::system_clock::time_point dt(dur);
auto zone = date::current_zone();
if (args.size() > 2) {
zone = date::locate_zone(args[2].asString());
@@ -643,7 +643,7 @@ Value expr_toDate(const std::vector<Value> &args) {
Value expr_format(const std::vector<Value>& args) {
const std::chrono::milliseconds dur(args.at(0).asUnsignedLong());
- const std::chrono::time_point<std::chrono::system_clock> dt(dur);
+ const std::chrono::system_clock::time_point dt(dur);
const auto unix_time = std::chrono::system_clock::to_time_t(dt);
const auto zoned_time = [&args, unix_time] {
std::tm buf{};
@@ -686,8 +686,7 @@ Value expr_toDate(const std::vector<Value>&) {
#endif // EXPRESSION_LANGUAGE_USE_DATE
Value expr_now(const std::vector<Value>& /*args*/) {
- int64_t unix_time_ms{std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count()};
- return Value(unix_time_ms);
+ return Value(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count());
}
Value expr_unescapeCsv(const std::vector<Value> &args) {
diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp
index 79dc785..84b6615 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -182,11 +182,9 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
return;
}
- uint64_t valInt;
- std::string timeoutStr;
- if (context->getProperty(ConnectTimeout.getName(), timeoutStr)
- && core::Property::getTimeMSFromString(timeoutStr, valInt)) {
- connect_timeout_ms_ = std::chrono::milliseconds(valInt);
+
+ if (auto connect_timeout = context->getProperty<core::TimePeriodValue>(ConnectTimeout)) {
+ connect_timeout_ms_ = connect_timeout->getMilliseconds();
} else {
logger_->log_debug("%s attribute is missing, so default value of %s will be used", ConnectTimeout.getName(), ConnectTimeout.getValue());
return;
@@ -197,10 +195,8 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context
content_type_ = contentTypeStr;
}
- timeoutStr.clear();
- if (context->getProperty(ReadTimeout.getName(), timeoutStr)
- && core::Property::getTimeMSFromString(timeoutStr, valInt)) {
- read_timeout_ms_ = std::chrono::milliseconds(valInt);
+ if (auto read_timeout = context->getProperty<core::TimePeriodValue>(ReadTimeout)) {
+ read_timeout_ms_ = read_timeout->getMilliseconds();
} else {
logger_->log_debug("%s attribute is missing, so default value of %s will be used", ReadTimeout.getName(), ReadTimeout.getValue());
}
diff --git a/extensions/http-curl/tests/C2PauseResumeTest.cpp b/extensions/http-curl/tests/C2PauseResumeTest.cpp
index aee973b..fec823b 100644
--- a/extensions/http-curl/tests/C2PauseResumeTest.cpp
+++ b/extensions/http-curl/tests/C2PauseResumeTest.cpp
@@ -102,7 +102,7 @@ class PauseResumeHandler: public HeartbeatHandler {
};
std::atomic<uint32_t> get_invoke_count_{0};
- std::chrono::time_point<std::chrono::system_clock> pause_start_time_;
+ std::chrono::system_clock::time_point pause_start_time_;
std::atomic<FlowState> flow_state_{FlowState::STARTED};
std::atomic_bool& flow_resumed_successfully_;
};
diff --git a/extensions/http-curl/tests/C2UpdateTest.cpp b/extensions/http-curl/tests/C2UpdateTest.cpp
index 58f4f64..8cb17d9 100644
--- a/extensions/http-curl/tests/C2UpdateTest.cpp
+++ b/extensions/http-curl/tests/C2UpdateTest.cpp
@@ -32,11 +32,11 @@ int main(int argc, char **argv) {
harness.setUrl(args.url, &handler);
handler.setC2RestResponse(harness.getC2RestUrl(), "configuration");
- const auto start = std::chrono::system_clock::now();
+ const auto start = std::chrono::steady_clock::now();
harness.run(args.test_file);
- const auto then = std::chrono::system_clock::now();
+ const auto then = std::chrono::steady_clock::now();
const auto seconds = std::chrono::duration_cast<std::chrono::seconds>(then - start).count();
assert(handler.getCallCount() <= gsl::narrow<size_t>(seconds + 1));
return 0;
diff --git a/extensions/jni/jvm/JniFlowFile.cpp b/extensions/jni/jvm/JniFlowFile.cpp
index 7693025..523ec9f 100644
--- a/extensions/jni/jvm/JniFlowFile.cpp
+++ b/extensions/jni/jvm/JniFlowFile.cpp
@@ -53,7 +53,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_nifi_processor_JniFlowFile_getEntryDate(
auto ff = ptr->get();
THROW_IF_NULL(ff, env, NO_FF_OBJECT);
- jlong entryDate = ff->getEntryDate();
+ jlong entryDate = std::chrono::duration_cast<std::chrono::milliseconds>(ff->getEntryDate().time_since_epoch()).count();
return entryDate;
}
JNIEXPORT jlong JNICALL Java_org_apache_nifi_processor_JniFlowFile_getLineageStartDate(JNIEnv *env, jobject obj) {
@@ -61,7 +61,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_nifi_processor_JniFlowFile_getLineageSta
auto ff = ptr->get();
THROW_IF_NULL(ff, env, NO_FF_OBJECT);
- jlong val = ff->getlineageStartDate();
+ jlong val = std::chrono::duration_cast<std::chrono::milliseconds>(ff->getlineageStartDate().time_since_epoch()).count();
return val;
}
JNIEXPORT jlong JNICALL Java_org_apache_nifi_processor_JniFlowFile_getLineageStartIndex(JNIEnv *env, jobject obj) {
@@ -69,7 +69,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_nifi_processor_JniFlowFile_getLineageSta
auto ff = ptr->get();
THROW_IF_NULL(ff, env, NO_FF_OBJECT);
- jlong val = ff->getlineageStartDate();
+ jlong val = std::chrono::duration_cast<std::chrono::milliseconds>(ff->getlineageStartDate().time_since_epoch()).count();
return val;
}
JNIEXPORT jlong JNICALL Java_org_apache_nifi_processor_JniFlowFile_getLastQueueDatePrim(JNIEnv *env, jobject obj) {
diff --git a/extensions/libarchive/BinFiles.cpp b/extensions/libarchive/BinFiles.cpp
index a468159..38d617e 100644
--- a/extensions/libarchive/BinFiles.cpp
+++ b/extensions/libarchive/BinFiles.cpp
@@ -118,13 +118,9 @@ void BinFiles::onSchedule(core::ProcessContext *context, core::ProcessSessionFac
if (context->getProperty(MaxBinCount.getName(), maxBinCount_)) {
logger_->log_debug("BinFiles: MaxBinCount [%" PRIu32 "]", maxBinCount_);
}
- std::string maxBinAgeStr;
- if (context->getProperty(MaxBinAge.getName(), maxBinAgeStr)) {
- core::TimeUnit unit;
- if (core::Property::StringToTime(maxBinAgeStr, val64, unit) && core::Property::ConvertTimeUnitToMS(val64, unit, val64)) {
- this->binManager_.setBinAge(val64);
- logger_->log_debug("BinFiles: MaxBinAge [%" PRIu64 "]", val64);
- }
+ if (auto max_bin_age = context->getProperty<core::TimePeriodValue>(MaxBinAge)) {
+ this->binManager_.setBinAge(max_bin_age->getMilliseconds());
+ logger_->log_debug("BinFiles: MaxBinAge [%" PRId64 "] ms", int64_t{max_bin_age->getMilliseconds().count()});
}
if (context->getProperty(BatchSize.getName(), batchSize_)) {
logger_->log_debug("BinFiles: BatchSize [%" PRIu32 "]", batchSize_);
@@ -152,7 +148,7 @@ void BinManager::gatherReadyBins() {
std::unique_ptr < std::deque<std::unique_ptr<Bin>>>&queue = it->second;
while (!queue->empty()) {
std::unique_ptr<Bin> &bin = queue->front();
- if (bin->isReadyForMerge() || (binAge_ != ULLONG_MAX && bin->isOlderThan(binAge_))) {
+ if (bin->isReadyForMerge() || (binAge_ != std::chrono::milliseconds::max() && bin->isOlderThan(binAge_))) {
readyBin_.push_back(std::move(bin));
queue->pop_front();
binCount_--;
@@ -174,19 +170,19 @@ void BinManager::gatherReadyBins() {
void BinManager::removeOldestBin() {
std::lock_guard < std::mutex > lock(mutex_);
- uint64_t olddate = ULLONG_MAX;
+ std::chrono::system_clock::time_point olddate = std::chrono::system_clock::time_point::max();
std::unique_ptr < std::deque<std::unique_ptr<Bin>>>* oldqueue;
for (std::map<std::string, std::unique_ptr<std::deque<std::unique_ptr<Bin>>>>::iterator it=groupBinMap_.begin(); it !=groupBinMap_.end(); ++it) {
std::unique_ptr < std::deque<std::unique_ptr<Bin>>>&queue = it->second;
if (!queue->empty()) {
std::unique_ptr<Bin> &bin = queue->front();
- if (bin->getBinAge() < olddate) {
- olddate = bin->getBinAge();
+ if (bin->getCreationDate() < olddate) {
+ olddate = bin->getCreationDate();
oldqueue = &queue;
}
}
}
- if (olddate != ULLONG_MAX) {
+ if (olddate != std::chrono::system_clock::time_point::max()) {
std::unique_ptr<Bin> &remove = (*oldqueue)->front();
std::string group = remove->getGroupId();
readyBin_.push_back(std::move(remove));
diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h
index aa8df05..e23e622 100644
--- a/extensions/libarchive/BinFiles.h
+++ b/extensions/libarchive/BinFiles.h
@@ -57,7 +57,7 @@ class Bin {
fileCount_(fileCount),
groupId_(groupId) {
queued_data_size_ = 0;
- creation_dated_ = utils::timeutils::getTimeMillis();
+ creation_dated_ = std::chrono::system_clock::now();
uuid_ = utils::IdGenerator::getIdGenerator()->generate();
logger_->log_debug("Bin %s for group %s created", getUUIDStr(), groupId_);
}
@@ -73,8 +73,8 @@ class Bin {
return isFull() || (queued_data_size_ >= minSize_ && queue_.size() >= minEntries_);
}
// check whether the bin is older than the time specified in msec
- [[nodiscard]] bool isOlderThan(const uint64_t &duration) const {
- return utils::timeutils::getTimeMillis() > (creation_dated_ + duration);
+ [[nodiscard]] bool isOlderThan(const std::chrono::milliseconds duration) const {
+ return std::chrono::system_clock::now() > (creation_dated_ + duration);
}
std::deque<std::shared_ptr<core::FlowFile>>& getFlowFile() {
return queue_;
@@ -104,7 +104,7 @@ class Bin {
return true;
}
// getBinAge
- [[nodiscard]] uint64_t getBinAge() const {
+ [[nodiscard]] std::chrono::system_clock::time_point getCreationDate() const {
return creation_dated_;
}
[[nodiscard]] int getSize() const {
@@ -128,7 +128,7 @@ class Bin {
uint64_t queued_data_size_;
// Queue for the Flow File
std::deque<std::shared_ptr<core::FlowFile>> queue_;
- uint64_t creation_dated_;
+ std::chrono::system_clock::time_point creation_dated_;
std::string fileCount_;
std::string groupId_;
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<Bin>::getLogger();
@@ -154,8 +154,8 @@ class BinManager {
void setMinEntries(uint32_t entries) {
minEntries_ = {entries};
}
- void setBinAge(uint64_t age) {
- binAge_ = {age};
+ void setBinAge(std::chrono::milliseconds age) {
+ binAge_ = age;
}
[[nodiscard]] int getBinCount() const {
return binCount_;
@@ -184,8 +184,7 @@ class BinManager {
uint32_t maxEntries_{std::numeric_limits<decltype(maxEntries_)>::max()};
uint32_t minEntries_{1};
std::string fileCount_;
- // Bin Age in msec
- uint64_t binAge_{std::numeric_limits<decltype(binAge_)>::max()};
+ std::chrono::milliseconds binAge_{std::chrono::milliseconds::max()};
std::map<std::string, std::unique_ptr<std::deque<std::unique_ptr<Bin>>> >groupBinMap_;
std::deque<std::unique_ptr<Bin>> readyBin_;
int binCount_{0};
diff --git a/extensions/librdkafka/ConsumeKafka.cpp b/extensions/librdkafka/ConsumeKafka.cpp
index eef2fb4..608fa6f 100644
--- a/extensions/librdkafka/ConsumeKafka.cpp
+++ b/extensions/librdkafka/ConsumeKafka.cpp
@@ -25,6 +25,8 @@
#include "utils/ProcessorConfigUtils.h"
#include "utils/gsl.h"
+using namespace std::literals::chrono_literals;
+
namespace org {
namespace apache {
namespace nifi {
@@ -40,13 +42,10 @@ class ConsumeKafkaMaxPollTimeValidator : public TimePeriodValidator {
~ConsumeKafkaMaxPollTimeValidator() override = default;
ValidationResult validate(const std::string& subject, const std::string& input) const override {
- uint64_t value;
- TimeUnit timeUnit;
- uint64_t value_as_ms;
+ auto parsed_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(input);
return ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(
- core::TimePeriodValue::StringToTime(input, value, timeUnit) &&
- org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(value, timeUnit, value_as_ms) &&
- 0 < value_as_ms && value_as_ms <= 4000).build();
+ parsed_value.has_value() &&
+ 0ms < *parsed_value && *parsed_value <= 4s).build();
}
};
} // namespace core
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index f021bed..b82d22f 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -621,17 +621,14 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessCon
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
- value = "";
- if (context->getProperty(QueueBufferMaxTime.getName(), value) && !value.empty()) {
- core::TimeUnit unit;
- if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
- valueConf = std::to_string(valInt);
- result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.ms", valueConf.c_str(), errstr.data(), errstr.size());
- logger_->log_debug("PublishKafka: queue.buffering.max.ms [%s]", valueConf);
- if (result != RD_KAFKA_CONF_OK) {
- auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
- throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
- }
+
+ if (auto queue_buffer_max_time = context->getProperty<core::TimePeriodValue>(QueueBufferMaxTime)) {
+ valueConf = std::to_string(queue_buffer_max_time->getMilliseconds().count());
+ result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.ms", valueConf.c_str(), errstr.data(), errstr.size());
+ logger_->log_debug("PublishKafka: queue.buffering.max.ms [%s]", valueConf);
+ if (result != RD_KAFKA_CONF_OK) {
+ auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
@@ -703,7 +700,6 @@ bool PublishKafka::createNewTopic(const std::shared_ptr<core::ProcessContext> &c
rd_kafka_conf_res_t result;
std::string value;
std::array<char, 512U> errstr{};
- int64_t valInt;
std::string valueConf;
value = "";
@@ -730,32 +726,24 @@ bool PublishKafka::createNewTopic(const std::shared_ptr<core::ProcessContext> &c
return false;
}
}
- value = "";
- if (context->getProperty(RequestTimeOut.getName(), value) && !value.empty()) {
- core::TimeUnit unit;
- if (core::Property::StringToTime(value, valInt, unit) &&
- core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
- valueConf = std::to_string(valInt);
- result = rd_kafka_topic_conf_set(topic_conf_.get(), "request.timeout.ms", valueConf.c_str(), errstr.data(), errstr.size());
- logger_->log_debug("PublishKafka: request.timeout.ms [%s]", valueConf);
- if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure request.timeout.ms error result [%s]", errstr.data());
- return false;
- }
+
+ if (auto request_timeout = context->getProperty<core::TimePeriodValue>(RequestTimeOut)) {
+ valueConf = std::to_string(request_timeout->getMilliseconds().count());
+ result = rd_kafka_topic_conf_set(topic_conf_.get(), "request.timeout.ms", valueConf.c_str(), errstr.data(), errstr.size());
+ logger_->log_debug("PublishKafka: request.timeout.ms [%s]", valueConf);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure request.timeout.ms error result [%s]", errstr.data());
+ return false;
}
}
- value = "";
- if (context->getProperty(MessageTimeOut.getName(), value) && !value.empty()) {
- core::TimeUnit unit;
- if (core::Property::StringToTime(value, valInt, unit) &&
- core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
- valueConf = std::to_string(valInt);
- result = rd_kafka_topic_conf_set(topic_conf_.get(), "message.timeout.ms", valueConf.c_str(), errstr.data(), errstr.size());
- logger_->log_debug("PublishKafka: message.timeout.ms [%s]", valueConf);
- if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure message.timeout.ms error result [%s]", errstr.data());
- return false;
- }
+
+ if (auto message_timeout = context->getProperty<core::TimePeriodValue>(MessageTimeOut)) {
+ valueConf = std::to_string(message_timeout->getMilliseconds().count());
+ result = rd_kafka_topic_conf_set(topic_conf_.get(), "message.timeout.ms", valueConf.c_str(), errstr.data(), errstr.size());
+ logger_->log_debug("PublishKafka: message.timeout.ms [%s]", valueConf);
+ if (result != RD_KAFKA_CONF_OK) {
+ logger_->log_error("PublishKafka: configure message.timeout.ms error result [%s]", errstr.data());
+ return false;
}
}
diff --git a/extensions/mqtt/controllerservice/MQTTControllerService.cpp b/extensions/mqtt/controllerservice/MQTTControllerService.cpp
index 4a10266..27d5c68 100644
--- a/extensions/mqtt/controllerservice/MQTTControllerService.cpp
+++ b/extensions/mqtt/controllerservice/MQTTControllerService.cpp
@@ -39,7 +39,7 @@ core::Property MQTTControllerService::ClientID("Client ID", "MQTT client ID to u
core::Property MQTTControllerService::UserName("Username", "Username to use when connecting to the broker", "");
core::Property MQTTControllerService::Password("Password", "Password to use when connecting to the broker", "");
core::Property MQTTControllerService::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec");
-core::Property MQTTControllerService::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec");
+core::Property MQTTControllerService::ConnectionTimeout("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec");
core::Property MQTTControllerService::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0");
core::Property MQTTControllerService::Topic("Topic", "The topic to publish the message to", "");
core::Property MQTTControllerService::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", "");
@@ -86,7 +86,7 @@ void MQTTControllerService::initializeProperties() {
supportedProperties.insert(Password);
supportedProperties.insert(KeepLiveInterval);
- supportedProperties.insert(ConnectionTimeOut);
+ supportedProperties.insert(ConnectionTimeout);
supportedProperties.insert(Topic);
supportedProperties.insert(QOS);
supportedProperties.insert(SecurityProtocol);
diff --git a/extensions/mqtt/controllerservice/MQTTControllerService.h b/extensions/mqtt/controllerservice/MQTTControllerService.h
index 07b79b1..e8f30bc 100644
--- a/extensions/mqtt/controllerservice/MQTTControllerService.h
+++ b/extensions/mqtt/controllerservice/MQTTControllerService.h
@@ -74,8 +74,6 @@ class MQTTControllerService : public core::controller::ControllerService {
: ControllerService(name, uuid),
initialized_(false),
client_(nullptr),
- keepAliveInterval_(0),
- connectionTimeOut_(0),
qos_(2),
ssl_context_service_(nullptr) {
}
@@ -84,8 +82,6 @@ class MQTTControllerService : public core::controller::ControllerService {
: ControllerService(name),
initialized_(false),
client_(nullptr),
- keepAliveInterval_(0),
- connectionTimeOut_(0),
qos_(2),
ssl_context_service_(nullptr) {
setConfiguration(configuration);
@@ -98,7 +94,7 @@ class MQTTControllerService : public core::controller::ControllerService {
static core::Property Password;
static core::Property CleanSession;
static core::Property KeepLiveInterval;
- static core::Property ConnectionTimeOut;
+ static core::Property ConnectionTimeout;
static core::Property Topic;
static core::Property QOS;
static core::Property SecurityProtocol;
@@ -265,7 +261,7 @@ class MQTTControllerService : public core::controller::ControllerService {
if (MQTTClient_isConnected(client_))
return true;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
- conn_opts.keepAliveInterval = keepAliveInterval_;
+ conn_opts.keepAliveInterval = std::chrono::duration_cast<std::chrono::seconds>(keepAliveInterval_).count();
conn_opts.cleansession = 1;
if (!userName_.empty()) {
conn_opts.username = userName_.c_str();
@@ -293,8 +289,8 @@ class MQTTControllerService : public core::controller::ControllerService {
MQTTClient client_;
std::string uri_;
std::string topic_;
- int64_t keepAliveInterval_;
- int64_t connectionTimeOut_;
+ std::chrono::milliseconds keepAliveInterval_{0};
+ std::chrono::milliseconds connectionTimeout_{0};
int64_t qos_;
std::string clientID_;
std::string userName_;
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
index 1359e91..8b83834 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
@@ -41,7 +41,7 @@ core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to u
core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", "");
core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", "");
core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec");
-core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeout("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec");
core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0");
core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", "");
core::Property AbstractMQTTProcessor::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", "");
@@ -51,7 +51,7 @@ core::Property AbstractMQTTProcessor::SecurityPrivateKey("Security Private Key",
core::Property AbstractMQTTProcessor::SecurityPrivateKeyPassWord("Security Pass Phrase", "Private key passphrase", "");
const std::set<core::Property> AbstractMQTTProcessor::getSupportedProperties() {
- return {BrokerURL, CleanSession, ClientID, UserName, PassWord, KeepLiveInterval, ConnectionTimeOut, QOS, Topic};
+ return {BrokerURL, CleanSession, ClientID, UserName, PassWord, KeepLiveInterval, ConnectionTimeout, QOS, Topic};
}
void AbstractMQTTProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*factory*/) {
@@ -96,22 +96,16 @@ void AbstractMQTTProcessor::onSchedule(const std::shared_ptr<core::ProcessContex
logger_->log_debug("AbstractMQTTProcessor: CleanSession [%d]", cleanSession_);
}
- value = "";
- if (context->getProperty(KeepLiveInterval.getName(), value) && !value.empty()) {
- core::TimeUnit unit;
- if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
- keepAliveInterval_ = valInt/1000;
- logger_->log_debug("AbstractMQTTProcessor: KeepLiveInterval [%" PRId64 "]", keepAliveInterval_);
- }
+ if (auto keep_alive_interval = context->getProperty<core::TimePeriodValue>(KeepLiveInterval)) {
+ keepAliveInterval_ = keep_alive_interval->getMilliseconds();
+ logger_->log_debug("AbstractMQTTProcessor: KeepLiveInterval [%" PRId64 "] ms", int64_t{keepAliveInterval_.count()});
}
- value = "";
- if (context->getProperty(ConnectionTimeOut.getName(), value) && !value.empty()) {
- core::TimeUnit unit;
- if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
- connectionTimeOut_ = valInt/1000;
- logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeOut [%" PRId64 "]", connectionTimeOut_);
- }
+
+ if (auto connection_timeout = context->getProperty<core::TimePeriodValue>(ConnectionTimeout)) {
+ connectionTimeout_ = connection_timeout->getMilliseconds();
+ logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [%" PRId64 "] ms", int64_t{connectionTimeout_.count()});
}
+
value = "";
if (context->getProperty(QOS.getName(), value) && !value.empty() && (value == MQTT_QOS_0 || value == MQTT_QOS_1 || MQTT_QOS_2) &&
core::Property::StringToInt(value, valInt)) {
@@ -165,7 +159,7 @@ bool AbstractMQTTProcessor::reconnect() {
if (MQTTClient_isConnected(client_))
return true;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
- conn_opts.keepAliveInterval = keepAliveInterval_;
+ conn_opts.keepAliveInterval = std::chrono::duration_cast<std::chrono::seconds>(keepAliveInterval_).count();
conn_opts.cleansession = cleanSession_;
if (!userName_.empty()) {
conn_opts.username = userName_.c_str();
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.h b/extensions/mqtt/processors/AbstractMQTTProcessor.h
index 605eb0d..1b41f4c 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessor.h
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.h
@@ -54,8 +54,6 @@ class AbstractMQTTProcessor : public core::Processor {
: core::Processor(name, uuid) {
client_ = nullptr;
cleanSession_ = false;
- keepAliveInterval_ = 60;
- connectionTimeOut_ = 30;
qos_ = 0;
isSubscriber_ = false;
}
@@ -65,7 +63,7 @@ class AbstractMQTTProcessor : public core::Processor {
MQTTClient_unsubscribe(client_, topic_.c_str());
}
if (client_ && MQTTClient_isConnected(client_)) {
- MQTTClient_disconnect(client_, connectionTimeOut_);
+ MQTTClient_disconnect(client_, std::chrono::milliseconds{connectionTimeout_}.count());
}
if (client_)
MQTTClient_destroy(&client_);
@@ -79,7 +77,7 @@ class AbstractMQTTProcessor : public core::Processor {
static core::Property PassWord;
static core::Property CleanSession;
static core::Property KeepLiveInterval;
- static core::Property ConnectionTimeOut;
+ static core::Property ConnectionTimeout;
static core::Property Topic;
static core::Property QOS;
static core::Property SecurityProtocol;
@@ -130,8 +128,8 @@ class AbstractMQTTProcessor : public core::Processor {
MQTTClient_deliveryToken delivered_token_;
std::string uri_;
std::string topic_;
- int64_t keepAliveInterval_;
- int64_t connectionTimeOut_;
+ std::chrono::milliseconds keepAliveInterval_ = std::chrono::seconds(60);
+ std::chrono::milliseconds connectionTimeout_ = std::chrono::seconds(30);
int64_t qos_;
bool cleanSession_;
std::string clientID_;
diff --git a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
index 5d9d643..9ab293b 100644
--- a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
@@ -248,9 +248,9 @@ bool SourceInitiatedSubscriptionListener::loadState() {
return true;
}
-std::string SourceInitiatedSubscriptionListener::Handler::millisecondsToXsdDuration(int64_t milliseconds) {
+std::string SourceInitiatedSubscriptionListener::Handler::millisecondsToXsdDuration(std::chrono::milliseconds milliseconds) {
char buf[1024];
- snprintf(buf, sizeof(buf), "PT%" PRId64 ".%03" PRId64 "S", milliseconds / 1000, milliseconds % 1000);
+ snprintf(buf, sizeof(buf), "PT%" PRId64 ".%03" PRId64 "S", int64_t{milliseconds.count() / 1000}, int64_t{milliseconds.count() % 1000});
return buf;
}
@@ -814,43 +814,30 @@ void SourceInitiatedSubscriptionListener::onSchedule(const std::shared_ptr<core:
if (!context->getProperty(InitialExistingEventsStrategy.getName(), initial_existing_events_strategy_)) {
throw Exception(PROCESSOR_EXCEPTION, "Initial Existing Events Strategy attribute is missing or invalid");
}
- if (!context->getProperty(SubscriptionExpirationInterval.getName(), value)) {
- throw Exception(PROCESSOR_EXCEPTION, "Subscription Expiration Interval attribute is missing or invalid");
+ if (auto subscription_expiration_interval = context->getProperty<core::TimePeriodValue>(SubscriptionExpirationInterval)) {
+ subscription_expiration_interval_ = subscription_expiration_interval->getMilliseconds();
} else {
- core::TimeUnit unit;
- if (!core::Property::StringToTime(value, subscription_expiration_interval_, unit) ||
- !core::Property::ConvertTimeUnitToMS(subscription_expiration_interval_, unit, subscription_expiration_interval_)) {
- throw Exception(PROCESSOR_EXCEPTION, "Subscription Expiration Interval attribute is invalid");
- }
+ throw Exception(PROCESSOR_EXCEPTION, "Subscription Expiration Interval attribute is missing or invalid");
}
- if (!context->getProperty(HeartbeatInterval.getName(), value)) {
- throw Exception(PROCESSOR_EXCEPTION, "Heartbeat Interval attribute is missing or invalid");
+ if (auto heartbeat_interval = context->getProperty<core::TimePeriodValue>(HeartbeatInterval)) {
+ heartbeat_interval_ = heartbeat_interval->getMilliseconds();
} else {
- core::TimeUnit unit;
- if (!core::Property::StringToTime(value, heartbeat_interval_, unit) || !core::Property::ConvertTimeUnitToMS(heartbeat_interval_, unit, heartbeat_interval_)) {
- throw Exception(PROCESSOR_EXCEPTION, "Heartbeat Interval attribute is invalid");
- }
+ throw Exception(PROCESSOR_EXCEPTION, "Heartbeat Interval attribute is missing or invalid");
}
if (!context->getProperty(MaxElements.getName(), value)) {
throw Exception(PROCESSOR_EXCEPTION, "Max Elements attribute is missing or invalid");
} else if (!core::Property::StringToInt(value, max_elements_)) {
throw Exception(PROCESSOR_EXCEPTION, "Max Elements attribute is invalid");
}
- if (!context->getProperty(MaxLatency.getName(), value)) {
- throw Exception(PROCESSOR_EXCEPTION, "Max Latency attribute is missing or invalid");
+ if (auto max_latency = context->getProperty<core::TimePeriodValue>(MaxLatency)) {
+ max_latency_ = max_latency->getMilliseconds();
} else {
- core::TimeUnit unit;
- if (!core::Property::StringToTime(value, max_latency_, unit) || !core::Property::ConvertTimeUnitToMS(max_latency_, unit, max_latency_)) {
- throw Exception(PROCESSOR_EXCEPTION, "Max Latency attribute is invalid");
- }
+ throw Exception(PROCESSOR_EXCEPTION, "Max Latency attribute is missing or invalid");
}
- if (!context->getProperty(ConnectionRetryInterval.getName(), value)) {
- throw Exception(PROCESSOR_EXCEPTION, "Connection Retry Interval attribute is missing or invalid");
+ if (auto connection_retry_interval = context->getProperty<core::TimePeriodValue>(ConnectionRetryInterval)) {
+ connection_retry_interval_ = connection_retry_interval->getMilliseconds();
} else {
- core::TimeUnit unit;
- if (!core::Property::StringToTime(value, connection_retry_interval_, unit) || !core::Property::ConvertTimeUnitToMS(connection_retry_interval_, unit, connection_retry_interval_)) {
- throw Exception(PROCESSOR_EXCEPTION, "Connection Retry Interval attribute is invalid");
- }
+ throw Exception(PROCESSOR_EXCEPTION, "Connection Retry Interval attribute is missing or invalid");
}
if (!context->getProperty(ConnectionRetryCount.getName(), value)) {
throw Exception(PROCESSOR_EXCEPTION, "Connection Retry Count attribute is missing or invalid");
diff --git a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
index 976a8a1..47d3b71 100644
--- a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
@@ -109,7 +109,7 @@ class SourceInitiatedSubscriptionListener : public core::Processor {
bool isAckRequested(WsXmlDocH doc);
void sendResponse(struct mg_connection* conn, const std::string& machineId, const std::string& remoteIp, char* xml_buf, size_t xml_buf_size);
- static std::string millisecondsToXsdDuration(int64_t milliseconds);
+ static std::string millisecondsToXsdDuration(std::chrono::milliseconds milliseconds);
};
protected:
@@ -130,11 +130,11 @@ class SourceInitiatedSubscriptionListener : public core::Processor {
std::string ssl_ca_cert_thumbprint_;
std::string xpath_xml_query_;
std::string initial_existing_events_strategy_;
- int64_t subscription_expiration_interval_;
- int64_t heartbeat_interval_;
+ std::chrono::milliseconds subscription_expiration_interval_;
+ std::chrono::milliseconds heartbeat_interval_;
uint32_t max_elements_;
- int64_t max_latency_;
- int64_t connection_retry_interval_;
+ std::chrono::milliseconds max_latency_;
+ std::chrono::milliseconds connection_retry_interval_;
uint32_t connection_retry_count_;
std::unique_ptr<CivetServer> server_;
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 92cbef7..425f3bd 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -32,6 +32,8 @@
#include "utils/gsl.h"
#include "core/Resource.h"
+using namespace std::literals::chrono_literals;
+
namespace org {
namespace apache {
namespace nifi {
@@ -60,7 +62,6 @@ void FlowFileRepository::flush() {
keys.push_back(keystrings.back());
}
}
-
auto multistatus = opendb->MultiGet(options, keys, &values);
for (size_t i = 0; i < keys.size() && i < values.size() && i < multistatus.size(); ++i) {
@@ -120,7 +121,7 @@ void FlowFileRepository::run() {
prune_stored_flowfiles();
}
while (running_) {
- std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
+ std::this_thread::sleep_for(purge_period_);
flush();
auto now = std::chrono::steady_clock::now();
if ((now-last) > std::chrono::seconds(30)) {
@@ -199,7 +200,7 @@ void FlowFileRepository::prune_stored_flowfiles() {
}
bool FlowFileRepository::ExecuteWithRetry(std::function<rocksdb::Status()> operation) {
- int waitTime = 0;
+ std::chrono::milliseconds waitTime = 0ms;
for (int i=0; i < 3; ++i) {
auto status = operation();
if (status.ok()) {
@@ -208,7 +209,7 @@ bool FlowFileRepository::ExecuteWithRetry(std::function<rocksdb::Status()> opera
}
logger_->log_error("Rocksdb operation failed: %s", status.ToString());
waitTime += FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS;
- std::this_thread::sleep_for(std::chrono::milliseconds(waitTime));
+ std::this_thread::sleep_for(waitTime);
}
return false;
}
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h
index 7eb4567..157cac6 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -51,9 +51,9 @@ namespace repository {
#define FLOWFILE_CHECKPOINT_DIRECTORY "./flowfile_checkpoint"
#endif
#define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M
-#define MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute
-#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2000) // 2000 msec
-#define FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS (500) // msec
+constexpr auto MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME = std::chrono::minutes(10);
+constexpr auto FLOWFILE_REPOSITORY_PURGE_PERIOD = std::chrono::seconds(2);
+constexpr auto FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS = std::chrono::milliseconds(500);
/**
* Flow File repository
@@ -68,9 +68,12 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
: FlowFileRepository(name) {
}
- FlowFileRepository(const std::string repo_name = "", const std::string& checkpoint_dir = FLOWFILE_CHECKPOINT_DIRECTORY,
- std::string directory = FLOWFILE_REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
- int64_t maxPartitionBytes = MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, uint64_t purgePeriod = FLOWFILE_REPOSITORY_PURGE_PERIOD)
+ FlowFileRepository(const std::string repo_name = "",
+ const std::string& checkpoint_dir = FLOWFILE_CHECKPOINT_DIRECTORY,
+ std::string directory = FLOWFILE_REPOSITORY_DIRECTORY,
+ std::chrono::milliseconds maxPartitionMillis = MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+ int64_t maxPartitionBytes = MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE,
+ std::chrono::milliseconds purgePeriod = FLOWFILE_REPOSITORY_PURGE_PERIOD)
: core::SerializableComponent(repo_name),
Repository(repo_name.length() > 0 ? repo_name : core::getClassName<FlowFileRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod),
checkpoint_dir_(checkpoint_dir),
@@ -102,12 +105,10 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
}
logger_->log_debug("NiFi FlowFile Max Partition Bytes %d", max_partition_bytes_);
if (configure->get(Configure::nifi_flowfile_repository_max_storage_time, value)) {
- TimeUnit unit;
- if (Property::StringToTime(value, max_partition_millis_, unit)) {
- Property::ConvertTimeUnitToMS(max_partition_millis_, unit, max_partition_millis_);
- }
+ if (auto max_partition = utils::timeutils::StringToDuration<std::chrono::milliseconds>(value))
+ max_partition_millis_ = *max_partition;
}
- logger_->log_debug("NiFi FlowFile Max Storage Time: [%d] ms", max_partition_millis_);
+ logger_->log_debug("NiFi FlowFile Max Storage Time: [%" PRId64 "] ms", int64_t{max_partition_millis_.count()});
const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configure->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
logger_->log_info("Using %s FlowFileRepository", encrypted_env ? "encrypted" : "plaintext");
@@ -199,7 +200,7 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
virtual void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo);
void start() {
- if (this->purge_period_ <= 0) {
+ if (this->purge_period_ <= std::chrono::milliseconds(0)) {
return;
}
if (running_) {
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h b/extensions/rocksdb-repos/ProvenanceRepository.h
index f34c03b..2505b0e 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.h
+++ b/extensions/rocksdb-repos/ProvenanceRepository.h
@@ -16,6 +16,7 @@
*/
#pragma once
+#include <cinttypes>
#include <vector>
#include <string>
#include <memory>
@@ -38,8 +39,8 @@ namespace provenance {
#define PROVENANCE_DIRECTORY "./provenance_repository"
#define MAX_PROVENANCE_STORAGE_SIZE (10*1024*1024) // 10M
-#define MAX_PROVENANCE_ENTRY_LIFE_TIME (60000) // 1 minute
-#define PROVENANCE_PURGE_PERIOD (2500) // 2500 msec
+constexpr auto MAX_PROVENANCE_ENTRY_LIFE_TIME = std::chrono::minutes(1);
+constexpr auto PROVENANCE_PURGE_PERIOD = std::chrono::milliseconds(2500);
class ProvenanceRepository : public core::Repository, public std::enable_shared_from_this<ProvenanceRepository> {
public:
@@ -50,8 +51,8 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_
/*!
* Create a new provenance repository
*/
- explicit ProvenanceRepository(const std::string& repo_name = "", std::string directory = PROVENANCE_DIRECTORY, int64_t maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME,
- int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE, uint64_t purgePeriod = PROVENANCE_PURGE_PERIOD)
+ explicit ProvenanceRepository(const std::string& repo_name = "", std::string directory = PROVENANCE_DIRECTORY, std::chrono::milliseconds maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME,
+ int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE, std::chrono::milliseconds purgePeriod = PROVENANCE_PURGE_PERIOD)
: core::SerializableComponent(repo_name),
Repository(repo_name.length() > 0 ? repo_name : core::getClassName<ProvenanceRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod) {
db_ = nullptr;
@@ -83,12 +84,10 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_
}
logger_->log_debug("MiNiFi Provenance Max Partition Bytes %d", max_partition_bytes_);
if (config->get(Configure::nifi_provenance_repository_max_storage_time, value)) {
- core::TimeUnit unit;
- if (core::Property::StringToTime(value, max_partition_millis_, unit)) {
- core::Property::ConvertTimeUnitToMS(max_partition_millis_, unit, max_partition_millis_);
- }
+ if (auto max_partition = utils::timeutils::StringToDuration<std::chrono::milliseconds>(value))
+ max_partition_millis_ = *max_partition;
}
- logger_->log_debug("MiNiFi Provenance Max Storage Time: [%d] ms", max_partition_millis_);
+ logger_->log_debug("MiNiFi Provenance Max Storage Time: [%" PRId64 "] ms", int64_t{max_partition_millis_.count()});
rocksdb::Options options;
options.create_if_missing = true;
options.use_direct_io_for_flush_and_compaction = true;
@@ -102,8 +101,8 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_
options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleFIFO;
options.compaction_options_fifo = rocksdb::CompactionOptionsFIFO(max_partition_bytes_, false);
- if (max_partition_millis_ > 0) {
- options.ttl = max_partition_millis_ / 1000;
+ if (max_partition_millis_ > std::chrono::milliseconds(0)) {
+ options.ttl = std::chrono::duration_cast<std::chrono::seconds>(max_partition_millis_).count();
}
logger_->log_info("Write buffer: %llu", options.write_buffer_size);
diff --git a/extensions/sftp/client/SFTPClient.cpp b/extensions/sftp/client/SFTPClient.cpp
index 172a153..3554d30 100644
--- a/extensions/sftp/client/SFTPClient.cpp
+++ b/extensions/sftp/client/SFTPClient.cpp
@@ -231,13 +231,12 @@ bool SFTPClient::setProxy(ProxyType type, const utils::HTTPProxy& proxy) {
return true;
}
-bool SFTPClient::setConnectionTimeout(int64_t timeout) {
- return curl_easy_setopt(easy_, CURLOPT_CONNECTTIMEOUT_MS, timeout) == CURLE_OK;
+bool SFTPClient::setConnectionTimeout(std::chrono::milliseconds timeout) {
+ return curl_easy_setopt(easy_, CURLOPT_CONNECTTIMEOUT_MS, timeout.count()) == CURLE_OK;
}
-void SFTPClient::setDataTimeout(int64_t timeout) {
- data_timeout_ = timeout;
- libssh2_session_set_timeout(ssh_session_, timeout);
+void SFTPClient::setDataTimeout(std::chrono::milliseconds timeout) {
+ libssh2_session_set_timeout(ssh_session_, timeout.count());
}
void SFTPClient::setSendKeepAlive(bool send_keepalive) {
diff --git a/extensions/sftp/client/SFTPClient.h b/extensions/sftp/client/SFTPClient.h
index 6305380..e8ca876 100644
--- a/extensions/sftp/client/SFTPClient.h
+++ b/extensions/sftp/client/SFTPClient.h
@@ -102,9 +102,9 @@ class SFTPClient {
bool setProxy(ProxyType type, const utils::HTTPProxy& proxy);
- bool setConnectionTimeout(int64_t timeout);
+ bool setConnectionTimeout(std::chrono::milliseconds timeout);
- void setDataTimeout(int64_t timeout);
+ void setDataTimeout(std::chrono::milliseconds timeout);
void setSendKeepAlive(bool send_keepalive);
@@ -179,8 +179,6 @@ class SFTPClient {
std::string private_key_file_path_;
std::string private_key_passphrase_;
- int64_t data_timeout_ = 0;
-
bool send_keepalive_ = false;
std::vector<char> curl_errorbuffer_;
diff --git a/extensions/sftp/processors/ListSFTP.cpp b/extensions/sftp/processors/ListSFTP.cpp
index de482dc..01bb166 100644
--- a/extensions/sftp/processors/ListSFTP.cpp
+++ b/extensions/sftp/processors/ListSFTP.cpp
@@ -54,6 +54,8 @@
#include "rapidjson/istreamwrapper.h"
#include "rapidjson/writer.h"
+using namespace std::literals::chrono_literals;
+
namespace org {
namespace apache {
namespace nifi {
@@ -261,20 +263,19 @@ void ListSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
}
context->getProperty(TargetSystemTimestampPrecision.getName(), target_system_timestamp_precision_);
context->getProperty(EntityTrackingInitialListingTarget.getName(), entity_tracking_initial_listing_target_);
- if (!context->getProperty(MinimumFileAge.getName(), value)) {
- logger_->log_error("Minimum File Age attribute is missing or invalid");
+
+ if (auto minimum_file_age = context->getProperty<core::TimePeriodValue>(MinimumFileAge)) {
+ minimum_file_age_ = minimum_file_age->getMilliseconds();
} else {
- core::TimeUnit unit;
- if (!core::Property::StringToTime(value, minimum_file_age_, unit) || !core::Property::ConvertTimeUnitToMS(minimum_file_age_, unit, minimum_file_age_)) {
- logger_->log_error("Minimum File Age attribute is invalid");
- }
+ logger_->log_error("Minimum File Age attribute is missing or invalid");
}
- if (context->getProperty(MaximumFileAge.getName(), value)) {
- core::TimeUnit unit;
- if (!core::Property::StringToTime(value, maximum_file_age_, unit) || !core::Property::ConvertTimeUnitToMS(maximum_file_age_, unit, maximum_file_age_)) {
- logger_->log_error("Maximum File Age attribute is invalid");
- }
+
+ if (auto maximum_file_age = context->getProperty<core::TimePeriodValue>(MaximumFileAge)) {
+ maximum_file_age_ = maximum_file_age->getMilliseconds();
+ } else {
+ logger_->log_error("Maximum File Age attribute is missing or invalid");
}
+
if (!context->getProperty(MinimumFileSize.getName(), minimum_file_size_)) {
logger_->log_error("Minimum File Size attribute is invalid");
}
@@ -292,7 +293,7 @@ void ListSFTP::invalidateCache() {
already_loaded_from_cache_ = false;
- last_run_time_ = std::chrono::time_point<std::chrono::steady_clock>();
+ last_run_time_ = std::chrono::steady_clock::time_point();
last_listed_latest_entry_timestamp_ = 0U;
last_processed_latest_entry_timestamp_ = 0U;
latest_identifiers_processed_.clear();
@@ -358,22 +359,21 @@ bool ListSFTP::filterFile(const std::string& parent_path, const std::string& fil
}
/* Age */
- time_t now = time(nullptr);
- uint64_t file_age = (now - attrs.mtime) * 1000;
+ auto file_age = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - std::chrono::system_clock::from_time_t(attrs.mtime));
if (file_age < minimum_file_age_) {
logger_->log_debug("Ignoring \"%s/%s\" because it is younger than the Minimum File Age: %ld ms < %lu ms",
parent_path.c_str(),
filename.c_str(),
- file_age,
- minimum_file_age_);
+ file_age.count(),
+ minimum_file_age_.count());
return false;
}
- if (maximum_file_age_ != 0U && file_age > maximum_file_age_) {
- logger_->log_debug("Ignoring \"%s/%s\" because it is older than the Maximum File Age: %ld ms > %lu ms",
+ if (maximum_file_age_ != 0ms && file_age > maximum_file_age_) {
+ logger_->log_debug("Ignoring \"%s/%s\" because it is older than the Maximum File Age: %" PRId64 " ms > %" PRId64 " ms",
parent_path.c_str(),
filename.c_str(),
- file_age,
- maximum_file_age_);
+ int64_t{file_age.count()},
+ int64_t{maximum_file_age_.count()});
return false;
}
@@ -611,7 +611,7 @@ void ListSFTP::listByTrackingTimestamps(
already_loaded_from_cache_ = true;
}
- std::chrono::time_point<std::chrono::steady_clock> current_run_time = std::chrono::steady_clock::now();
+ std::chrono::steady_clock::time_point current_run_time = std::chrono::steady_clock::now();
time_t now = time(nullptr);
/* Order children by timestamp and try to detect timestamp precision if needed */
@@ -855,7 +855,7 @@ void ListSFTP::listByTrackingEntities(
uint16_t port,
const std::string& username,
const std::string& remote_path,
- uint64_t entity_tracking_time_window,
+ std::chrono::milliseconds entity_tracking_time_window,
std::vector<Child>&& files) {
/* Load state from cache file if needed */
if (!already_loaded_from_cache_) {
@@ -870,7 +870,7 @@ void ListSFTP::listByTrackingEntities(
time_t now = time(nullptr);
uint64_t min_timestamp_to_list = (!initial_listing_complete_ && entity_tracking_initial_listing_target_ == ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE)
- ? 0U : (now * 1000 - entity_tracking_time_window);
+ ? 0U : (now * 1000 - entity_tracking_time_window.count());
/* Skip files not in the tracking window */
for (auto it = files.begin(); it != files.end(); ) {
@@ -965,7 +965,7 @@ void ListSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
/* Parse processor-specific properties */
std::string remote_path;
- uint64_t entity_tracking_time_window = 0U;
+ std::chrono::milliseconds entity_tracking_time_window = 3h; /* The default is 3 hours */
std::string value;
context->getProperty(RemotePath.getName(), remote_path);
@@ -974,16 +974,11 @@ void ListSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
remote_path.resize(remote_path.size() - 1);
}
if (context->getProperty(EntityTrackingTimeWindow.getName(), value)) {
- core::TimeUnit unit;
- if (!core::Property::StringToTime(value, entity_tracking_time_window, unit) ||
- !core::Property::ConvertTimeUnitToMS(entity_tracking_time_window, unit, entity_tracking_time_window)) {
- /* The default is 3 hours */
- entity_tracking_time_window = 3 * 3600 * 1000;
+ if (auto parsed_entity_time_window = utils::timeutils::StringToDuration<std::chrono::milliseconds>(value)) {
+ entity_tracking_time_window = parsed_entity_time_window.value();
+ } else {
logger_->log_error("Entity Tracking Time Window attribute is invalid");
}
- } else {
- /* The default is 3 hours */
- entity_tracking_time_window = 3 * 3600 * 1000;
}
/* Check whether we need to invalidate the cache based on the new properties */
diff --git a/extensions/sftp/processors/ListSFTP.h b/extensions/sftp/processors/ListSFTP.h
index 9967b38..91e196b 100644
--- a/extensions/sftp/processors/ListSFTP.h
+++ b/extensions/sftp/processors/ListSFTP.h
@@ -119,8 +119,8 @@ class ListSFTP : public SFTPProcessorBase {
bool ignore_dotted_files_;
std::string target_system_timestamp_precision_;
std::string entity_tracking_initial_listing_target_;
- uint64_t minimum_file_age_;
- uint64_t maximum_file_age_;
+ std::chrono::milliseconds minimum_file_age_;
+ std::chrono::milliseconds maximum_file_age_;
uint64_t minimum_file_size_;
uint64_t maximum_file_size_;
@@ -142,7 +142,7 @@ class ListSFTP : public SFTPProcessorBase {
bool already_loaded_from_cache_;
- std::chrono::time_point<std::chrono::steady_clock> last_run_time_;
+ std::chrono::steady_clock::time_point last_run_time_;
uint64_t last_listed_latest_entry_timestamp_;
uint64_t last_processed_latest_entry_timestamp_;
std::set<std::string> latest_identifiers_processed_;
@@ -192,7 +192,7 @@ class ListSFTP : public SFTPProcessorBase {
uint16_t port,
const std::string& username,
const std::string& remote_path,
- uint64_t entity_tracking_time_window,
+ std::chrono::milliseconds entity_tracking_time_window,
std::vector<Child>&& files);
};
diff --git a/extensions/sftp/processors/SFTPProcessorBase.cpp b/extensions/sftp/processors/SFTPProcessorBase.cpp
index ec95831..3e95eb3 100644
--- a/extensions/sftp/processors/SFTPProcessorBase.cpp
+++ b/extensions/sftp/processors/SFTPProcessorBase.cpp
@@ -176,22 +176,18 @@ void SFTPProcessorBase::parseCommonPropertiesOnSchedule(const std::shared_ptr<co
strict_host_checking_ = utils::StringUtils::toBool(value).value_or(false);
}
context->getProperty(HostKeyFile.getName(), host_key_file_);
- if (!context->getProperty(ConnectionTimeout.getName(), value)) {
- logger_->log_error("Connection Timeout attribute is missing or invalid");
+ if (auto connection_timeout = context->getProperty<core::TimePeriodValue>(ConnectionTimeout)) {
+ connection_timeout_ = connection_timeout->getMilliseconds();
} else {
- core::TimeUnit unit;
- if (!core::Property::StringToTime(value, connection_timeout_, unit) || !core::Property::ConvertTimeUnitToMS(connection_timeout_, unit, connection_timeout_)) {
- logger_->log_error("Connection Timeout attribute is invalid");
- }
+ logger_->log_error("Connection Timeout attribute is missing or invalid");
}
- if (!context->getProperty(DataTimeout.getName(), value)) {
- logger_->log_error("Data Timeout attribute is missing or invalid");
+
+ if (auto data_timeout = context->getProperty<core::TimePeriodValue>(DataTimeout)) {
+ data_timeout_ = data_timeout->getMilliseconds();
} else {
- core::TimeUnit unit;
- if (!core::Property::StringToTime(value, data_timeout_, unit) || !core::Property::ConvertTimeUnitToMS(data_timeout_, unit, data_timeout_)) {
- logger_->log_error("Data Timeout attribute is invalid");
- }
+ logger_->log_error("Data Timeout attribute is missing or invalid");
}
+
if (!context->getProperty(SendKeepaliveOnTimeout.getName(), value)) {
logger_->log_error("Send Keep Alive On Timeout attribute is missing or invalid");
} else {
diff --git a/extensions/sftp/processors/SFTPProcessorBase.h b/extensions/sftp/processors/SFTPProcessorBase.h
index cb1a7a6..973efbd 100644
--- a/extensions/sftp/processors/SFTPProcessorBase.h
+++ b/extensions/sftp/processors/SFTPProcessorBase.h
@@ -73,8 +73,8 @@ class SFTPProcessorBase : public core::Processor {
protected:
std::shared_ptr<core::logging::Logger> logger_;
- int64_t connection_timeout_;
- int64_t data_timeout_;
+ std::chrono::milliseconds connection_timeout_;
+ std::chrono::milliseconds data_timeout_;
std::string host_key_file_;
bool strict_host_checking_;
bool use_keepalive_on_timeout_;
diff --git a/extensions/splunk/QuerySplunkIndexingStatus.cpp b/extensions/splunk/QuerySplunkIndexingStatus.cpp
index bd7e35a..8eebb0f 100644
--- a/extensions/splunk/QuerySplunkIndexingStatus.cpp
+++ b/extensions/splunk/QuerySplunkIndexingStatus.cpp
@@ -70,12 +70,8 @@ void QuerySplunkIndexingStatus::onSchedule(const std::shared_ptr<core::ProcessCo
gsl_Expects(context && sessionFactory);
SplunkHECProcessor::onSchedule(context, sessionFactory);
std::string max_wait_time_str;
- if (context->getProperty(MaximumWaitingTime.getName(), max_wait_time_str)) {
- core::TimeUnit unit;
- uint64_t max_wait_time;
- if (core::Property::StringToTime(max_wait_time_str, max_wait_time, unit) && core::Property::ConvertTimeUnitToMS(max_wait_time, unit, max_wait_time)) {
- max_age_ = std::chrono::milliseconds(max_wait_time);
- }
+ if (auto max_age = context->getProperty<core::TimePeriodValue>(MaximumWaitingTime)) {
+ max_age_ = max_age->getMilliseconds();
}
context->getProperty(MaxQuerySize.getName(), batch_size_);
diff --git a/extensions/standard-processors/processors/DefragmentText.cpp b/extensions/standard-processors/processors/DefragmentText.cpp
index d473445..53a15a4 100644
--- a/extensions/standard-processors/processors/DefragmentText.cpp
+++ b/extensions/standard-processors/processors/DefragmentText.cpp
@@ -63,24 +63,16 @@ void DefragmentText::initialize() {
void DefragmentText::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory*) {
gsl_Expects(context);
- std::string max_buffer_age_str;
- if (context->getProperty(MaxBufferAge.getName(), max_buffer_age_str)) {
- core::TimeUnit unit;
- uint64_t max_buffer_age;
- if (core::Property::StringToTime(max_buffer_age_str, max_buffer_age, unit) && core::Property::ConvertTimeUnitToMS(max_buffer_age, unit, max_buffer_age)) {
- buffer_.setMaxAge(std::chrono::milliseconds(max_buffer_age));
- setTriggerWhenEmpty(true);
- logger_->log_trace("The Buffer maximum age is configured to be %" PRIu64 " ms", max_buffer_age);
- }
+ if (auto max_buffer_age = context->getProperty<core::TimePeriodValue>(MaxBufferAge)) {
+ buffer_.setMaxAge(max_buffer_age->getMilliseconds());
+ setTriggerWhenEmpty(true);
+ logger_->log_trace("The Buffer maximum age is configured to be %" PRId64 " ms", int64_t{max_buffer_age->getMilliseconds().count()});
}
- std::string max_buffer_size_str;
- if (context->getProperty(MaxBufferSize.getName(), max_buffer_size_str)) {
- uint64_t max_buffer_size = core::DataSizeValue(max_buffer_size_str).getValue();
- if (max_buffer_size > 0) {
- buffer_.setMaxSize(max_buffer_size);
- logger_->log_trace("The Buffer maximum size is configured to be %" PRIu64 " B", max_buffer_size);
- }
+ auto max_buffer_size = context->getProperty<core::DataSizeValue>(MaxBufferSize);
+ if (max_buffer_size.has_value() && max_buffer_size->getValue() > 0) {
+ buffer_.setMaxSize(max_buffer_size->getValue());
+ logger_->log_trace("The Buffer maximum size is configured to be %" PRIu64 " B", max_buffer_size->getValue());
}
context->getProperty(PatternLoc.getName(), pattern_location_);
diff --git a/extensions/standard-processors/processors/DefragmentText.h b/extensions/standard-processors/processors/DefragmentText.h
index 1f52379..ee2b2e7 100644
--- a/extensions/standard-processors/processors/DefragmentText.h
+++ b/extensions/standard-processors/processors/DefragmentText.h
@@ -79,7 +79,7 @@ class DefragmentText : public core::Processor {
void store(core::ProcessSession* session, const std::shared_ptr<core::FlowFile>& new_buffered_flow_file);
std::shared_ptr<core::FlowFile> buffered_flow_file_;
- std::chrono::time_point<std::chrono::steady_clock> creation_time_;
+ std::chrono::steady_clock::time_point creation_time_;
std::optional<std::chrono::milliseconds> max_age_;
std::optional<size_t> max_size_;
};
diff --git a/extensions/standard-processors/processors/ExecuteProcess.cpp b/extensions/standard-processors/processors/ExecuteProcess.cpp
index 2dabc6e..29d444d 100644
--- a/extensions/standard-processors/processors/ExecuteProcess.cpp
+++ b/extensions/standard-processors/processors/ExecuteProcess.cpp
@@ -40,6 +40,8 @@
#pragma GCC diagnostic ignored "-Wunused-result"
#endif
+using namespace std::literals::chrono_literals;
+
namespace org {
namespace apache {
namespace nifi {
@@ -94,11 +96,9 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
if (context->getProperty(WorkingDir, value, flow_file)) {
this->_workingDir = value;
}
- if (context->getProperty(BatchDuration.getName(), value)) {
- core::TimeUnit unit;
- if (core::Property::StringToTime(value, _batchDuration, unit) && core::Property::ConvertTimeUnitToMS(_batchDuration, unit, _batchDuration)) {
- logger_->log_debug("Setting _batchDuration");
- }
+ if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
+ _batchDuration = batch_duration->getMilliseconds();
+ logger_->log_debug("Setting _batchDuration");
}
if (context->getProperty(RedirectErrorStream.getName(), value)) {
_redirectErrorStream = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
@@ -158,9 +158,9 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
default: // this is the code the parent runs
// the parent isn't going to write to the pipe
close(_pipefd[1]);
- if (_batchDuration > 0) {
- while (1) {
- std::this_thread::sleep_for(std::chrono::milliseconds(_batchDuration));
+ if (_batchDuration > 0ms) {
+ while (true) {
+ std::this_thread::sleep_for(_batchDuration);
char buffer[4096];
const auto numRead = read(_pipefd[0], buffer, sizeof(buffer));
if (numRead <= 0)
diff --git a/extensions/standard-processors/processors/ExecuteProcess.h b/extensions/standard-processors/processors/ExecuteProcess.h
index bdf14b5..9960c75 100644
--- a/extensions/standard-processors/processors/ExecuteProcess.h
+++ b/extensions/standard-processors/processors/ExecuteProcess.h
@@ -60,7 +60,6 @@ class ExecuteProcess : public core::Processor {
ExecuteProcess(const std::string& name, const utils::Identifier& uuid = {}) // NOLINT
: Processor(name, uuid) {
_redirectErrorStream = false;
- _batchDuration = 0;
_workingDir = ".";
_processRunning = false;
_pid = 0;
@@ -111,7 +110,7 @@ class ExecuteProcess : public core::Processor {
std::string _command;
std::string _commandArgument;
std::string _workingDir;
- int64_t _batchDuration;
+ std::chrono::milliseconds _batchDuration = std::chrono::milliseconds(0);
bool _redirectErrorStream;
// Full command
std::string _fullCommand;
diff --git a/extensions/standard-processors/processors/GetFile.cpp b/extensions/standard-processors/processors/GetFile.cpp
index 446dcec..6f19ef0 100644
--- a/extensions/standard-processors/processors/GetFile.cpp
+++ b/extensions/standard-processors/processors/GetFile.cpp
@@ -38,6 +38,8 @@
#include "core/TypedValues.h"
#include "utils/FileReaderCallback.h"
+using namespace std::literals::chrono_literals;
+
namespace org {
namespace apache {
namespace nifi {
@@ -119,8 +121,10 @@ void GetFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFact
request_.keepSourceFile = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
}
- context->getProperty(MaxAge.getName(), request_.maxAge);
- context->getProperty(MinAge.getName(), request_.minAge);
+ if (auto max_age = context->getProperty<core::TimePeriodValue>(MaxAge))
+ request_.maxAge = max_age->getMilliseconds();
+ if (auto min_age = context->getProperty<core::TimePeriodValue>(MinAge))
+ request_.minAge = min_age->getMilliseconds();
if (context->getProperty(MaxSize.getName(), value)) {
core::Property::StringToInt(value, request_.maxSize);
@@ -154,9 +158,9 @@ void GetFile::onTrigger(core::ProcessContext* /*context*/, core::ProcessSession*
const bool is_dir_empty_before_poll = isListingEmpty();
logger_->log_debug("Listing is %s before polling directory", is_dir_empty_before_poll ? "empty" : "not empty");
if (is_dir_empty_before_poll) {
- if (request_.pollInterval == 0 || (utils::timeutils::getTimeMillis() - last_listing_time_) > request_.pollInterval) {
+ if (request_.pollInterval == 0ms || (std::chrono::system_clock::now() - last_listing_time_.load()) > request_.pollInterval) {
performListing(request_);
- last_listing_time_.store(utils::timeutils::getTimeMillis());
+ last_listing_time_.store(std::chrono::system_clock::now());
}
}
@@ -242,7 +246,7 @@ bool GetFile::fileMatchesRequestCriteria(std::string fullName, std::string name,
}
#endif
uint64_t file_size = gsl::narrow<uint64_t>(statbuf.st_size);
- uint64_t modifiedTime = gsl::narrow<uint64_t>(statbuf.st_mtime) * 1000;
+ auto modifiedTime = std::chrono::system_clock::time_point() + std::chrono::seconds(gsl::narrow<uint64_t>(statbuf.st_mtime));
if (request.minSize > 0 && file_size < request.minSize)
return false;
@@ -250,10 +254,10 @@ bool GetFile::fileMatchesRequestCriteria(std::string fullName, std::string name,
if (request.maxSize > 0 && file_size > request.maxSize)
return false;
- uint64_t fileAge = utils::timeutils::getTimeMillis() - modifiedTime;
- if (request.minAge > 0 && fileAge < request.minAge)
+ auto fileAge = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - modifiedTime);
+ if (request.minAge > 0ms && fileAge < request.minAge)
return false;
- if (request.maxAge > 0 && fileAge > request.maxAge)
+ if (request.maxAge > 0ms && fileAge > request.maxAge)
return false;
if (request.ignoreHiddenFile && utils::file::FileUtils::is_hidden(fullName))
diff --git a/extensions/standard-processors/processors/GetFile.h b/extensions/standard-processors/processors/GetFile.h
index 71fc6ff..9912028 100644
--- a/extensions/standard-processors/processors/GetFile.h
+++ b/extensions/standard-processors/processors/GetFile.h
@@ -41,12 +41,12 @@ namespace processors {
struct GetFileRequest {
bool recursive = true;
bool keepSourceFile = false;
- uint64_t minAge = 0;
- uint64_t maxAge = 0;
+ std::chrono::milliseconds minAge{0};
+ std::chrono::milliseconds maxAge{0};
uint64_t minSize = 0;
uint64_t maxSize = 0;
bool ignoreHiddenFile = true;
- uint64_t pollInterval = 0;
+ std::chrono::milliseconds pollInterval{0};
uint64_t batchSize = 10;
std::string fileFilter = "[^\\.].*";
std::string inputDirectory;
@@ -113,8 +113,7 @@ class GetFile : public core::Processor, public state::response::MetricsNodeSourc
*/
explicit GetFile(const std::string& name, const utils::Identifier& uuid = {})
: Processor(name, uuid),
- metrics_(std::make_shared<GetFileMetrics>()),
- last_listing_time_(0) {
+ metrics_(std::make_shared<GetFileMetrics>()) {
}
// Destructor
~GetFile() override = default;
@@ -176,7 +175,7 @@ class GetFile : public core::Processor, public state::response::MetricsNodeSourc
GetFileRequest request_;
std::queue<std::string> directory_listing_;
mutable std::mutex directory_listing_mutex_;
- std::atomic<uint64_t> last_listing_time_;
+ std::atomic<std::chrono::time_point<std::chrono::system_clock>> last_listing_time_{};
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<GetFile>::getLogger();
};
diff --git a/extensions/standard-processors/processors/GetTCP.cpp b/extensions/standard-processors/processors/GetTCP.cpp
index 78e0339..1cf1781 100644
--- a/extensions/standard-processors/processors/GetTCP.cpp
+++ b/extensions/standard-processors/processors/GetTCP.cpp
@@ -149,12 +149,11 @@ void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, co
logger_->log_trace("EOM is defined as %i", endOfMessageByte);
- std::string reconnect_interval_str;
- if (context->getProperty(ReconnectInterval.getName(), reconnect_interval_str) &&
- core::Property::getTimeMSFromString(reconnect_interval_str, reconnect_interval_)) {
- logger_->log_debug("Reconnect interval is %llu ms", reconnect_interval_);
+ if (auto reconnect_interval = context->getProperty<core::TimePeriodValue>(ReconnectInterval)) {
+ reconnect_interval_ = reconnect_interval->getMilliseconds();
+ logger_->log_debug("Reconnect interval is %" PRId64 " ms", reconnect_interval_.count());
} else {
- logger_->log_debug("Reconnect interval using default value of %llu ms", reconnect_interval_);
+ logger_->log_debug("Reconnect interval using default value of %" PRId64 " ms", reconnect_interval_.count());
}
handler_ = std::unique_ptr<DataHandler>(new DataHandler(sessionFactory));
@@ -200,17 +199,17 @@ void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, co
socket_ptr->close();
return -1;
}
- logger_->log_info("Sleeping for %" PRIu64 " msec before attempting to reconnect", reconnect_interval_);
- std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_interval_));
+ logger_->log_info("Sleeping for %" PRId64 " msec before attempting to reconnect", int64_t{reconnect_interval_.count()});
+ std::this_thread::sleep_for(reconnect_interval_);
socket_ring_buffer_.enqueue(std::move(socket_ptr));
} else {
socket_ptr->close();
- std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_interval_));
+ std::this_thread::sleep_for(reconnect_interval_);
logger_->log_info("Read response returned a -1 from socket, exiting thread");
return -1;
}
} else {
- std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_interval_));
+ std::this_thread::sleep_for(reconnect_interval_);
logger_->log_info("Could not use socket, exiting thread");
return -1;
}
diff --git a/extensions/standard-processors/processors/GetTCP.h b/extensions/standard-processors/processors/GetTCP.h
index a962f29..3f8110f 100644
--- a/extensions/standard-processors/processors/GetTCP.h
+++ b/extensions/standard-processors/processors/GetTCP.h
@@ -179,7 +179,6 @@ class GetTCP : public core::Processor, public state::response::MetricsNodeSource
stay_connected_(true),
concurrent_handlers_(2),
endOfMessageByte(13),
- reconnect_interval_(5000),
receive_buffer_size_(16 * 1024 * 1024),
connection_attempt_limit_(3),
ssl_service_(nullptr) {
@@ -256,7 +255,7 @@ class GetTCP : public core::Processor, public state::response::MetricsNodeSource
int8_t endOfMessageByte;
- uint64_t reconnect_interval_;
+ std::chrono::milliseconds reconnect_interval_{5000};
uint64_t receive_buffer_size_;
diff --git a/extensions/standard-processors/processors/LogAttribute.cpp b/extensions/standard-processors/processors/LogAttribute.cpp
index 61f54c8..931df64 100644
--- a/extensions/standard-processors/processors/LogAttribute.cpp
+++ b/extensions/standard-processors/processors/LogAttribute.cpp
@@ -128,8 +128,8 @@ void LogAttribute::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
message << dashLine;
message << "\nStandard FlowFile Attributes";
message << "\n" << "UUID:" << flow->getUUIDStr();
- message << "\n" << "EntryDate:" << utils::timeutils::getTimeStr(flow->getEntryDate());
- message << "\n" << "lineageStartDate:" << utils::timeutils::getTimeStr(flow->getlineageStartDate());
+ message << "\n" << "EntryDate:" << utils::timeutils::getTimeStr(std::chrono::duration_cast<std::chrono::milliseconds>(flow->getEntryDate().time_since_epoch()).count());
+ message << "\n" << "lineageStartDate:" << utils::timeutils::getTimeStr(std::chrono::duration_cast<std::chrono::milliseconds>(flow->getlineageStartDate().time_since_epoch()).count());
message << "\n" << "Size:" << flow->getSize() << " Offset:" << flow->getOffset();
message << "\nFlowFile Attributes Map Content";
std::map<std::string, std::string> attrs = flow->getAttributes();
diff --git a/extensions/standard-processors/tests/unit/GetFileTests.cpp b/extensions/standard-processors/tests/unit/GetFileTests.cpp
index 6929dd2..d0bb06f 100644
--- a/extensions/standard-processors/tests/unit/GetFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/GetFileTests.cpp
@@ -19,6 +19,7 @@
#include <memory>
#include <string>
#include <fstream>
+#include <chrono>
#include "TestBase.h"
#include "LogAttribute.h"
@@ -31,7 +32,7 @@
#include <fileapi.h>
#endif
-using namespace std::chrono_literals; // NOLINT using namespace directive is required for literals
+using namespace std::literals::chrono_literals;
namespace {
@@ -252,12 +253,12 @@ TEST_CASE("Test if GetFile honors PollInterval property when triggered multiple
test_controller.setProperty(minifi::processors::GetFile::KeepSourceFile, "true");
test_controller.runSession();
- auto start_time = utils::timeutils::getTimeMillis();
+ auto start_time = std::chrono::steady_clock::now();
while (LogTestController::getInstance().countOccurrences("Logged 2 flow files") < 2) {
test_controller.test_plan_->reset();
test_controller.runSession();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
- REQUIRE(utils::timeutils::getTimeMillis() - start_time >= 100);
+ REQUIRE(std::chrono::steady_clock::now() - start_time >= 100ms);
}
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index ab99255..cd410f3 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -42,6 +42,8 @@
#include "LogAttribute.h"
#include "utils/TestUtils.h"
+using namespace std::literals::chrono_literals;
+
static const std::string NEWLINE_FILE = "" // NOLINT
"one,two,three\n"
"four,five,six, seven";
@@ -1157,7 +1159,7 @@ TEST_CASE("TailFile yields if no work is done", "[yield]") {
testController.runSession(plan, true);
- REQUIRE(tail_file->getYieldTime() > 0);
+ REQUIRE(tail_file->getYieldTime() > 0ms);
SECTION("No logging happened between onTrigger calls => yield") {
plan->reset();
@@ -1165,7 +1167,7 @@ TEST_CASE("TailFile yields if no work is done", "[yield]") {
testController.runSession(plan, true);
- REQUIRE(tail_file->getYieldTime() > 0);
+ REQUIRE(tail_file->getYieldTime() > 0ms);
}
SECTION("Some logging happened between onTrigger calls => don't yield") {
@@ -1176,7 +1178,7 @@ TEST_CASE("TailFile yields if no work is done", "[yield]") {
testController.runSession(plan, true);
- REQUIRE(tail_file->getYieldTime() == 0);
+ REQUIRE(tail_file->getYieldTime() == 0ms);
}
}
@@ -1185,7 +1187,7 @@ TEST_CASE("TailFile yields if no work is done", "[yield]") {
testController.runSession(plan, true);
- REQUIRE(tail_file->getYieldTime() == 0);
+ REQUIRE(tail_file->getYieldTime() == 0ms);
SECTION("No logging happened between onTrigger calls => yield") {
plan->reset();
@@ -1193,7 +1195,7 @@ TEST_CASE("TailFile yields if no work is done", "[yield]") {
testController.runSession(plan, true);
- REQUIRE(tail_file->getYieldTime() > 0);
+ REQUIRE(tail_file->getYieldTime() > 0ms);
}
SECTION("Some logging happened between onTrigger calls => don't yield") {
@@ -1204,7 +1206,7 @@ TEST_CASE("TailFile yields if no work is done", "[yield]") {
testController.runSession(plan, true);
- REQUIRE(tail_file->getYieldTime() == 0);
+ REQUIRE(tail_file->getYieldTime() == 0ms);
}
}
}
@@ -1236,7 +1238,7 @@ TEST_CASE("TailFile yields if no work is done on any files", "[yield][multiple_f
SECTION("No file changed => yield") {
testController.runSession(plan, true);
- REQUIRE(tail_file->getYieldTime() > 0);
+ REQUIRE(tail_file->getYieldTime() > 0ms);
}
SECTION("One file changed => don't yield") {
@@ -1246,7 +1248,7 @@ TEST_CASE("TailFile yields if no work is done on any files", "[yield][multiple_f
testController.runSession(plan, true);
- REQUIRE(tail_file->getYieldTime() == 0);
+ REQUIRE(tail_file->getYieldTime() == 0ms);
}
SECTION("More than one file changed => don't yield") {
@@ -1262,7 +1264,7 @@ TEST_CASE("TailFile yields if no work is done on any files", "[yield][multiple_f
testController.runSession(plan, true);
- REQUIRE(tail_file->getYieldTime() == 0);
+ REQUIRE(tail_file->getYieldTime() == 0ms);
}
}
@@ -1300,7 +1302,7 @@ TEST_CASE("TailFile doesn't yield if work was done on rotated files only", "[yie
testController.runSession(plan, true);
- REQUIRE(tail_file->getYieldTime() > 0);
+ REQUIRE(tail_file->getYieldTime() > 0ms);
}
SECTION("File rotated and new stuff is added => don't yield") {
@@ -1316,7 +1318,7 @@ TEST_CASE("TailFile doesn't yield if work was done on rotated files only", "[yie
testController.runSession(plan, true);
- REQUIRE(tail_file->getYieldTime() == 0);
+ REQUIRE(tail_file->getYieldTime() == 0ms);
}
}
diff --git a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
index 1e2ab4c..7bba6d4 100644
--- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
@@ -27,7 +27,7 @@
#include "TestBase.h"
#include "utils/TestUtils.h"
-using namespace std::chrono_literals; // NOLINT using namespace directive is required for literals
+using namespace std::literals::chrono_literals;
TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
TestController test_controller;
@@ -153,10 +153,10 @@ Provenance Reporting:
REQUIRE(!rootFlowConfig->findProcessorByName("TailFile")->getUUIDStr().empty());
REQUIRE(1 == rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks());
REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingStrategy());
- REQUIRE(1 * 1000 * 1000 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriodNano());
+ REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriodNano());
REQUIRE(30s == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod());
- REQUIRE(1 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriodMsec());
- REQUIRE(0 == rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano());
+ REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriodMsec());
+ REQUIRE(0s == rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano());
std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap;
rootFlowConfig->getConnections(connectionMap);
@@ -167,7 +167,7 @@ Provenance Reporting:
REQUIRE(!it.second->getUUIDStr().empty());
REQUIRE(it.second->getDestination());
REQUIRE(it.second->getSource());
- REQUIRE(60000 == it.second->getFlowExpirationDuration());
+ REQUIRE(60s == it.second->getFlowExpirationDuration());
}
}
@@ -478,10 +478,10 @@ NiFi Properties Overrides: {}
REQUIRE(1 == rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks());
REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingStrategy());
REQUIRE(1 == rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks());
- REQUIRE(1 * 1000 * 1000 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriodNano());
+ REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriodNano());
REQUIRE(30s == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod());
- REQUIRE(1 * 1000 == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriodMsec());
- REQUIRE(0 == rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano());
+ REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriodMsec());
+ REQUIRE(0s == rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano());
std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap;
rootFlowConfig->getConnections(connectionMap);
@@ -492,7 +492,7 @@ NiFi Properties Overrides: {}
REQUIRE(!it.second->getUUIDStr().empty());
REQUIRE(it.second->getDestination());
REQUIRE(it.second->getSource());
- REQUIRE(0 == it.second->getFlowExpirationDuration());
+ REQUIRE(0s == it.second->getFlowExpirationDuration());
}
}
diff --git a/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
index a0015e8..7c185f0 100644
--- a/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
@@ -23,6 +23,8 @@
#include "TestBase.h"
#include "utils/TestUtils.h"
+using namespace std::literals::chrono_literals;
+
namespace {
using org::apache::nifi::minifi::core::yaml::YamlConnectionParser;
@@ -94,7 +96,7 @@ TEST_CASE("Connections components are parsed from yaml", "[YamlConfiguration]")
YAML::Node connection_node = YAML::Load(std::string {
"flowfile expiration: 2 min\n" });
YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
- REQUIRE(120000 == yaml_connection_parser.getFlowFileExpirationFromYaml()); // 2 * 60 * 1000 ms
+ REQUIRE(2min == yaml_connection_parser.getFlowFileExpirationFromYaml());
}
SECTION("Drop empty value is read") {
SECTION("When config contains true value") {
@@ -165,7 +167,7 @@ TEST_CASE("Connections components are parsed from yaml", "[YamlConfiguration]")
YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
CHECK(0 == yaml_connection_parser.getWorkQueueSizeFromYaml());
CHECK(0 == yaml_connection_parser.getWorkQueueDataSizeFromYaml());
- CHECK(0 == yaml_connection_parser.getFlowFileExpirationFromYaml());
+ CHECK(0s == yaml_connection_parser.getFlowFileExpirationFromYaml());
CHECK(0 == yaml_connection_parser.getDropEmptyFromYaml());
}
}
@@ -189,7 +191,7 @@ TEST_CASE("Connections components are parsed from yaml", "[YamlConfiguration]")
"drop empty: NULL\n"});
YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
CHECK(2 == yaml_connection_parser.getWorkQueueDataSizeFromYaml());
- CHECK(0 == yaml_connection_parser.getFlowFileExpirationFromYaml());
+ CHECK(0s == yaml_connection_parser.getFlowFileExpirationFromYaml());
CHECK(0 == yaml_connection_parser.getDropEmptyFromYaml());
}
}
diff --git a/extensions/usb-camera/GetUSBCamera.cpp b/extensions/usb-camera/GetUSBCamera.cpp
index 2eaff3e..0d4399f 100644
--- a/extensions/usb-camera/GetUSBCamera.cpp
+++ b/extensions/usb-camera/GetUSBCamera.cpp
@@ -82,7 +82,7 @@ void GetUSBCamera::onFrame(uvc_frame_t *frame, void *ptr) {
return;
}
- auto now = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch());
+ auto now = std::chrono::steady_clock::now();
if (now - cb_data->last_frame_time < std::chrono::milliseconds(static_cast<int>(1000.0 / cb_data->target_fps))) {
return;
@@ -339,7 +339,7 @@ void GetUSBCamera::onSchedule(core::ProcessContext *context, core::ProcessSessio
cb_data_.device_height = height;
cb_data_.device_fps = fps;
cb_data_.target_fps = target_fps;
- cb_data_.last_frame_time = std::chrono::milliseconds(0);
+ cb_data_.last_frame_time = std::chrono::steady_clock::time_point();
res = uvc_start_streaming(devh_, &ctrl, onFrame, &cb_data_, 0);
diff --git a/extensions/usb-camera/GetUSBCamera.h b/extensions/usb-camera/GetUSBCamera.h
index 307b03c..49da8c6 100644
--- a/extensions/usb-camera/GetUSBCamera.h
+++ b/extensions/usb-camera/GetUSBCamera.h
@@ -87,7 +87,7 @@ class GetUSBCamera : public core::Processor {
uint16_t device_height;
uint32_t device_fps;
double target_fps;
- std::chrono::milliseconds last_frame_time;
+ std::chrono::steady_clock::time_point last_frame_time;
} CallbackData;
static void onFrame(uvc_frame_t *frame, void *ptr);
diff --git a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp b/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
index 3365f0a..cd10b04 100644
--- a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
+++ b/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
@@ -39,6 +39,8 @@
#pragma comment(lib, "wevtapi.lib")
#pragma comment(lib, "Wecapi.lib")
+using namespace std::literals::chrono_literals;
+
namespace org {
namespace apache {
namespace nifi {
@@ -648,7 +650,7 @@ int CollectorInitiatedSubscription::processQueue(const std::shared_ptr<core::Pro
session->write(flowFile, &wc);
}
session->putAttribute(flowFile, core::SpecialFlowAttribute::MIME_TYPE, "application/xml");
- session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0);
+ session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0ms);
session->transfer(flowFile, s_success);
flowFileCount++;
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index bbfec46..da1a93a 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -51,6 +51,8 @@
#pragma comment(lib, "wevtapi.lib")
#pragma comment(lib, "ole32.lib")
+using namespace std::chrono_literals; // NOLINT(build/namespaces)
+
namespace org {
namespace apache {
namespace nifi {
@@ -712,7 +714,7 @@ void ConsumeWindowsEventLog::putEventRenderFlowFileToSession(const EventRender&
session.putAttribute(flowFile, core::SpecialFlowAttribute::MIME_TYPE, mimeType);
session.putAttribute(flowFile, "Timezone name", timezone_name_);
session.putAttribute(flowFile, "Timezone offset", timezone_offset_);
- session.getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0);
+ session.getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0ms);
session.transfer(flowFile, Success);
};
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index 39a3ec4..e79a384 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -124,11 +124,11 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
return max_data_queue_size_;
}
// Set Flow expiration duration in millisecond
- void setFlowExpirationDuration(uint64_t duration) {
+ void setFlowExpirationDuration(std::chrono::milliseconds duration) {
expired_duration_ = duration;
}
// Get Flow expiration duration in millisecond
- uint64_t getFlowExpirationDuration() {
+ std::chrono::milliseconds getFlowExpirationDuration() {
return expired_duration_;
}
@@ -183,26 +183,26 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
// Relationship for this connection
std::set<core::Relationship> relationships_;
// Source Processor (ProcessNode/Port)
- std::shared_ptr<core::Connectable> source_connectable_;
+ std::shared_ptr<core::Connectable> source_connectable_ = nullptr;
// Destination Processor (ProcessNode/Port)
- std::shared_ptr<core::Connectable> dest_connectable_;
+ std::shared_ptr<core::Connectable> dest_connectable_ = nullptr;
// Max queue size to apply back pressure
- std::atomic<uint64_t> max_queue_size_;
+ std::atomic<uint64_t> max_queue_size_ = 0;
// Max queue data size to apply back pressure
- std::atomic<uint64_t> max_data_queue_size_;
+ std::atomic<uint64_t> max_data_queue_size_ = 0;
// Flow File Expiration Duration in= MilliSeconds
- std::atomic<uint64_t> expired_duration_;
+ std::atomic<std::chrono::milliseconds> expired_duration_ = std::chrono::milliseconds(0);
// flow file repository
std::shared_ptr<core::Repository> flow_repository_;
// content repository reference.
std::shared_ptr<core::ContentRepository> content_repo_;
private:
- bool drop_empty_;
+ bool drop_empty_ = false;
// Mutex for protection
mutable std::mutex mutex_;
// Queued data size
- std::atomic<uint64_t> queued_data_size_;
+ std::atomic<uint64_t> queued_data_size_ = 0;
// Queue for the Flow File
utils::FlowFileQueue queue_;
// flow repository
diff --git a/libminifi/include/FlowControlProtocol.h b/libminifi/include/FlowControlProtocol.h
index 9321371..7c5f3b8 100644
--- a/libminifi/include/FlowControlProtocol.h
+++ b/libminifi/include/FlowControlProtocol.h
@@ -172,8 +172,8 @@ class FlowControlProtocol {
logger_->log_info("NiFi Server Port: [%" PRIu16 "]", _serverPort);
}
if (configure->get(Configure::nifi_server_report_interval, value)) {
- core::TimeUnit unit;
- if (core::Property::StringToTime(value, _reportInterval, unit) && core::Property::ConvertTimeUnitToMS(_reportInterval, unit, _reportInterval)) {
+ if (auto parsed_time = utils::timeutils::StringToDuration<std::chrono::milliseconds>(value)) {
+ _reportInterval = parsed_time->count();
logger_->log_info("NiFi server report interval: [%" PRId64 "] ms", _reportInterval);
}
} else {
diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h
index fea3e7a..1401655 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -128,7 +128,7 @@ class RemoteProcessorGroupPort : public core::Processor {
this->setTriggerWhenEmpty(true);
}
// Set Timeout
- void setTimeOut(uint64_t timeout) {
+ void setTimeout(uint64_t timeout) {
timeout_ = timeout;
}
// SetTransmitting
@@ -208,7 +208,7 @@ class RemoteProcessorGroupPort : public core::Processor {
utils::Identifier protocol_uuid_;
- std::chrono::milliseconds idle_timeout_{15000};
+ std::chrono::milliseconds idle_timeout_ = std::chrono::seconds(15);
// rest API end point info
std::vector<struct RPG> nifi_instances_;
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index be95708..e89a19a 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -121,9 +121,9 @@ class SchedulingAgent {
// Whether it is running
std::atomic<bool> running_;
// AdministrativeYieldDuration
- int64_t admin_yield_duration_;
+ std::chrono::milliseconds admin_yield_duration_;
// BoredYieldDuration
- int64_t bored_yield_duration_;
+ std::chrono::milliseconds bored_yield_duration_;
std::shared_ptr<Configure> configure_;
@@ -139,9 +139,9 @@ class SchedulingAgent {
private:
struct SchedulingInfo {
- std::chrono::time_point<std::chrono::steady_clock> start_time_ = std::chrono::steady_clock::now();
+ std::chrono::steady_clock::time_point start_time_ = std::chrono::steady_clock::now();
// Mutable is required to be able to modify this while leaving in std::set
- mutable std::chrono::time_point<std::chrono::steady_clock> last_alert_time_ = std::chrono::steady_clock::now();
+ mutable std::chrono::steady_clock::time_point last_alert_time_ = std::chrono::steady_clock::now();
std::string name_;
std::string uuid_;
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index a7f1dc9..5d0b6b5 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -85,7 +85,7 @@ class C2Agent : public state::UpdateController {
*/
void performHeartBeat();
- int64_t getHeartBeatDelay() {
+ std::chrono::milliseconds getHeartBeatDelay() {
std::lock_guard<std::mutex> lock(heartbeat_mutex);
return heart_beat_period_;
}
@@ -185,7 +185,7 @@ class C2Agent : public state::UpdateController {
utils::ConcurrentQueue<C2Payload> requests;
// heart beat period.
- int64_t heart_beat_period_;
+ std::chrono::milliseconds heart_beat_period_;
// maximum number of queued messages to send to the c2 server
size_t max_c2_responses;
diff --git a/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h b/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
index a59b336..7940d14 100644
--- a/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
+++ b/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
@@ -52,7 +52,7 @@ class AbstractAutoPersistingKeyValueStoreService : virtual public PersistableKey
protected:
bool always_persist_;
- uint64_t auto_persistence_interval_;
+ std::chrono::milliseconds auto_persistence_interval_;
std::thread persisting_thread_;
bool running_;
diff --git a/libminifi/include/core/ConfigurableComponent.h b/libminifi/include/core/ConfigurableComponent.h
index 250ea4f..a25a46a 100644
--- a/libminifi/include/core/ConfigurableComponent.h
+++ b/libminifi/include/core/ConfigurableComponent.h
@@ -230,8 +230,12 @@ bool ConfigurableComponent::getProperty(const std::string name, T &value) const
return false;
}
logger_->log_debug("Component %s property name %s value %s", name, property.getName(), property.getValue().to_string());
- // cast throws if the value is invalid
- value = static_cast<T>(property.getValue());
+
+ if constexpr (std::is_base_of<TransformableValue, T>::value) {
+ value = T(property.getValue().operator std::string());
+ } else {
+ value = static_cast<T>(property.getValue()); // cast throws if the value is invalid
+ }
return true;
} else {
logger_->log_warn("Could not find property %s", name);
diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h
index 6a794f9..2e1899b 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -108,24 +108,24 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
* Get entry date for this record
* @return entry date uint64_t
*/
- [[nodiscard]] uint64_t getEntryDate() const;
+ [[nodiscard]] std::chrono::system_clock::time_point getEntryDate() const;
/**
* Gets the event time.
* @return event time.
*/
- [[nodiscard]] uint64_t getEventTime() const;
+ [[nodiscard]] std::chrono::system_clock::time_point getEventTime() const;
/**
* Get lineage start date
* @return lineage start date uint64_t
*/
- [[nodiscard]] uint64_t getlineageStartDate() const;
+ [[nodiscard]] std::chrono::system_clock::time_point getlineageStartDate() const;
/**
* Sets the lineage start date
* @param date new lineage start date
*/
- void setLineageStartDate(uint64_t date);
+ void setLineageStartDate(const std::chrono::system_clock::time_point date);
void setLineageIdentifiers(const std::vector<utils::Identifier>& lineage_Identifiers) {
lineage_Identifiers_ = lineage_Identifiers;
@@ -215,7 +215,7 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
to_be_processed_after_ = std::chrono::steady_clock::now() + duration;
}
- [[nodiscard]] std::chrono::time_point<std::chrono::steady_clock> getPenaltyExpiration() const {
+ [[nodiscard]] std::chrono::steady_clock::time_point getPenaltyExpiration() const {
return to_be_processed_after_;
}
@@ -257,11 +257,11 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
// Mark for deletion
bool marked_delete_;
// Date at which the flow file entered the flow
- uint64_t entry_date_;
+ std::chrono::system_clock::time_point entry_date_{};
// event time
- uint64_t event_time_;
+ std::chrono::system_clock::time_point event_time_{};
// Date at which the origin of this flow file entered the flow
- uint64_t lineage_start_date_;
+ std::chrono::system_clock::time_point lineage_start_date_{};
// Date at which the flow file was queued
uint64_t last_queue_date_;
// Size in bytes of the data corresponding to this flow file
@@ -272,7 +272,7 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
// Offset to the content
uint64_t offset_;
// Penalty expiration
- std::chrono::time_point<std::chrono::steady_clock> to_be_processed_after_;
+ std::chrono::steady_clock::time_point to_be_processed_after_;
// Attributes key/values pairs for the flow record
AttributeMap attributes_;
// Pointer to the associated content resource claim
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index 271f70c..40de4ff 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -132,9 +132,9 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
}
bool getDynamicProperty(const Property &property, std::string &value, const std::shared_ptr<FlowFile>& flow_file, const std::map<std::string, std::string>& variables) {
std::map<std::string, std::optional<std::string>> original_attributes;
- for (const auto& [variable, value] : variables) {
+ for (const auto& [variable, attr_value] : variables) {
original_attributes[variable] = flow_file->getAttribute(variable);
- flow_file->setAttribute(variable, value);
+ flow_file->setAttribute(variable, attr_value);
}
auto onExit = gsl::finally([&]{
for (const auto& attr : original_attributes) {
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index 2cd4b5e..8b595a1 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -94,12 +94,12 @@ class ProcessGroup : public CoreComponent {
bool getTransmitting() {
return transmitting_;
}
- // setTimeOut
- void setTimeOut(uint64_t time) {
- timeOut_ = time;
+ // setTimeout
+ void setTimeout(uint64_t time) {
+ timeout_ = time;
}
- uint64_t getTimeOut() {
- return timeOut_;
+ uint64_t getTimeout() {
+ return timeout_;
}
// setInterface
void setInterface(std::string &ifc) {
@@ -142,11 +142,11 @@ class ProcessGroup : public CoreComponent {
return proxy_;
}
// Set Processor yield period in MilliSecond
- void setYieldPeriodMsec(uint64_t period) {
+ void setYieldPeriodMsec(std::chrono::milliseconds period) {
yield_period_msec_ = period;
}
// Get Processor yield period in MilliSecond
- uint64_t getYieldPeriodMsec(void) {
+ std::chrono::milliseconds getYieldPeriodMsec(void) {
return (yield_period_msec_);
}
@@ -259,8 +259,8 @@ class ProcessGroup : public CoreComponent {
// Parent Process Group
ProcessGroup* parent_process_group_;
// Yield Period in Milliseconds
- std::atomic<uint64_t> yield_period_msec_;
- std::atomic<uint64_t> timeOut_;
+ std::atomic<std::chrono::milliseconds> yield_period_msec_;
+ std::atomic<uint64_t> timeout_;
std::atomic<int64_t> onschedule_retry_msec_;
// URL
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index 06c16a9..d5569db 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -57,7 +57,7 @@ namespace minifi {
namespace core {
// Minimum scheduling period in Nano Second
-#define MINIMUM_SCHEDULING_NANOS 30000
+constexpr std::chrono::nanoseconds MINIMUM_SCHEDULING_NANOS{30000};
// Default penalization period in second
@@ -95,14 +95,11 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
return loss_tolerant_;
}
// Set Processor Scheduling Period in Nano Second
- void setSchedulingPeriodNano(uint64_t period) {
- uint64_t minPeriod = MINIMUM_SCHEDULING_NANOS;
- // std::max has some variances on c++11-c++14 and then c++14 onward.
- // to avoid macro conditional checks we can use this simple conditional expr.
- scheduling_period_nano_ = period > minPeriod ? period : minPeriod;
+ void setSchedulingPeriodNano(std::chrono::nanoseconds period) {
+ scheduling_period_nano_ = std::max(MINIMUM_SCHEDULING_NANOS, period);
}
// Get Processor Scheduling Period in Nano Second
- uint64_t getSchedulingPeriodNano() const {
+ std::chrono::nanoseconds getSchedulingPeriodNano() const {
return scheduling_period_nano_;
}
@@ -123,20 +120,20 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
}
// Set Processor Run Duration in Nano Second
- void setRunDurationNano(uint64_t period) {
+ void setRunDurationNano(std::chrono::nanoseconds period) {
run_duration_nano_ = period;
}
// Get Processor Run Duration in Nano Second
- uint64_t getRunDurationNano() const {
+ std::chrono::nanoseconds getRunDurationNano() const {
return (run_duration_nano_);
}
// Set Processor yield period in MilliSecond
- void setYieldPeriodMsec(uint64_t period) {
+ void setYieldPeriodMsec(std::chrono::milliseconds period) {
yield_period_msec_ = period;
}
// Get Processor yield period in MilliSecond
- uint64_t getYieldPeriodMsec() const {
- return (yield_period_msec_);
+ std::chrono::milliseconds getYieldPeriodMsec() const {
+ return yield_period_msec_;
}
void setPenalizationPeriod(std::chrono::milliseconds period) {
@@ -176,33 +173,15 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
void clearActiveTask() {
active_tasks_ = 0;
}
- // Yield based on the yield period
- void yield() override {
- yield_expiration_ = (utils::timeutils::getTimeMillis() + yield_period_msec_);
- }
- // Yield based on the input time
- void yield(uint64_t time) {
- yield_expiration_ = (utils::timeutils::getTimeMillis() + time);
- }
- // whether need be to yield
- virtual bool isYield() {
- if (yield_expiration_ > 0)
- return (yield_expiration_ >= utils::timeutils::getTimeMillis());
- else
- return false;
- }
- // clear yield expiration
- void clearYield() {
- yield_expiration_ = 0;
- }
- // get yield time
- uint64_t getYieldTime() const {
- uint64_t curTime = utils::timeutils::getTimeMillis();
- if (yield_expiration_ > curTime)
- return (yield_expiration_ - curTime);
- else
- return 0;
- }
+ void yield() override;
+
+ void yield(std::chrono::milliseconds delta_time);
+
+ virtual bool isYield();
+
+ void clearYield();
+
+ std::chrono::milliseconds getYieldTime() const;
// Whether flow file queue full in any of the outgoing connection
bool flowFilesOutGoingFull() const;
@@ -269,11 +248,11 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
// lossTolerant
std::atomic<bool> loss_tolerant_;
// SchedulePeriod in Nano Seconds
- std::atomic<uint64_t> scheduling_period_nano_;
+ std::atomic<std::chrono::nanoseconds> scheduling_period_nano_;
// Run Duration in Nano Seconds
- std::atomic<uint64_t> run_duration_nano_;
+ std::atomic<std::chrono::nanoseconds> run_duration_nano_;
// Yield Period in Milliseconds
- std::atomic<uint64_t> yield_period_msec_;
+ std::atomic<std::chrono::milliseconds> yield_period_msec_;
// Active Tasks
std::atomic<uint8_t> active_tasks_;
@@ -286,7 +265,7 @@ class Processor : public Connectable, public ConfigurableComponent, public std::
// Mutex for protection
mutable std::mutex mutex_;
// Yield Expiration
- std::atomic<uint64_t> yield_expiration_;
+ std::atomic<std::chrono::time_point<std::chrono::system_clock>> yield_expiration_{};
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
diff --git a/libminifi/include/core/ProcessorConfig.h b/libminifi/include/core/ProcessorConfig.h
index 927f9f3..fee9bd7 100644
--- a/libminifi/include/core/ProcessorConfig.h
+++ b/libminifi/include/core/ProcessorConfig.h
@@ -32,10 +32,10 @@ namespace core {
#define DEFAULT_SCHEDULING_STRATEGY "TIMER_DRIVEN"
#define DEFAULT_SCHEDULING_PERIOD_STR "1 sec"
-#define DEFAULT_SCHEDULING_PERIOD_MILLIS 1000
-#define DEFAULT_RUN_DURATION 0
+constexpr std::chrono::milliseconds DEFAULT_SCHEDULING_PERIOD_MILLIS{1000};
+constexpr std::chrono::nanoseconds DEFAULT_RUN_DURATION{0};
#define DEFAULT_MAX_CONCURRENT_TASKS 1
-#define DEFAULT_YIELD_PERIOD_SECONDS 1
+constexpr std::chrono::seconds DEFAULT_YIELD_PERIOD_SECONDS{1};
constexpr std::chrono::seconds DEFAULT_PENALIZATION_PERIOD{30};
struct ProcessorConfig {
diff --git a/libminifi/include/core/Property.h b/libminifi/include/core/Property.h
index db1a536..1f6c27a 100644
--- a/libminifi/include/core/Property.h
+++ b/libminifi/include/core/Property.h
@@ -180,87 +180,6 @@ class Property {
// Compare
bool operator <(const Property & right) const;
- template<typename T>
- static bool ConvertTimeUnitToMS(int64_t input, TimeUnit unit, T &out) {
- if (unit == NANOSECOND) {
- out = input / 1000 / 1000;
- return true;
- } else if (unit == MICROSECOND) {
- out = input / 1000;
- return true;
- } else if (unit == MILLISECOND) {
- out = input;
- return true;
- } else if (unit == SECOND) {
- out = input * 1000;
- return true;
- } else if (unit == MINUTE) {
- out = input * 60 * 1000;
- return true;
- } else if (unit == HOUR) {
- out = input * 60 * 60 * 1000;
- return true;
- } else if (unit == DAY) {
- out = 24 * 60 * 60 * 1000;
- return true;
- } else {
- return false;
- }
- }
-
- static bool ConvertTimeUnitToMS(int64_t input, TimeUnit unit, int64_t &out) {
- return ConvertTimeUnitToMS<int64_t>(input, unit, out);
- }
-
- static bool ConvertTimeUnitToMS(int64_t input, TimeUnit unit, uint64_t &out) {
- return ConvertTimeUnitToMS<uint64_t>(input, unit, out);
- }
-
- template<typename T>
- static bool ConvertTimeUnitToNS(int64_t input, TimeUnit unit, T &out) {
- if (unit == NANOSECOND) {
- out = input;
- return true;
- } else if (unit == MICROSECOND) {
- out = input * 1000;
- return true;
- } else if (unit == MILLISECOND) {
- out = input * 1000 * 1000;
- return true;
- } else if (unit == SECOND) {
- out = input * 1000 * 1000 * 1000;
- return true;
- } else if (unit == MINUTE) {
- out = input * 60 * 1000 * 1000 * 1000;
- return true;
- } else if (unit == HOUR) {
- out = input * 60 * 60 * 1000 * 1000 * 1000;
- return true;
- } else if (unit == DAY) {
- out = input * 24 * 60 * 60 * 1000 * 1000 * 1000;
- return true;
- } else {
- return false;
- }
- }
-
- static bool ConvertTimeUnitToNS(int64_t input, TimeUnit unit, uint64_t &out) {
- return ConvertTimeUnitToNS<uint64_t>(input, unit, out);
- }
-
- static bool ConvertTimeUnitToNS(int64_t input, TimeUnit unit, int64_t &out) {
- return ConvertTimeUnitToNS<int64_t>(input, unit, out);
- }
-
-// Convert String
- static bool StringToTime(std::string input, uint64_t &output, TimeUnit &timeunit) {
- return utils::internal::StringToTime(input, output, timeunit);
- }
-
-// Convert String
- static bool StringToTime(const std::string& input, int64_t &output, TimeUnit &timeunit) {
- return utils::internal::StringToTime(input, output, timeunit);
- }
static bool StringToDateTime(const std::string& input, int64_t& output) {
int64_t temp = utils::timeutils::parseDateTimeStr(input);
@@ -311,18 +230,6 @@ class Property {
return true;
}
- static bool getTimeMSFromString(const std::string& str, uint64_t& valInt) {
- core::TimeUnit unit;
- return !str.empty() && StringToTime(str, valInt, unit)
- && ConvertTimeUnitToMS(valInt, unit, valInt);
- }
-
- static bool getTimeMSFromString(const std::string& str, int64_t& valInt) {
- core::TimeUnit unit;
- return !str.empty() && StringToTime(str, valInt, unit)
- && ConvertTimeUnitToMS(valInt, unit, valInt);
- }
-
// Convert String to Integer
template<typename T>
static bool StringToInt(std::string input, T &output) {
diff --git a/libminifi/include/core/PropertyValidation.h b/libminifi/include/core/PropertyValidation.h
index ed691ce..68b9eb6 100644
--- a/libminifi/include/core/PropertyValidation.h
+++ b/libminifi/include/core/PropertyValidation.h
@@ -321,9 +321,8 @@ class TimePeriodValidator : public PropertyValidator {
}
ValidationResult validate(const std::string &subject, const std::string &input) const override {
- uint64_t out;
- TimeUnit outTimeUnit;
- return ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(core::TimePeriodValue::StringToTime(input, out, outTimeUnit)).build();
+ auto parsed_time = utils::timeutils::StringToDuration<std::chrono::milliseconds>(input);
+ return ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(parsed_time.has_value()).build();
}
};
diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h
index dc6bbff..cf51bda 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -1,5 +1,5 @@
/**
- * @file Repository
+ * @file Repository
* Repository class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -56,17 +56,19 @@ namespace core {
#define REPOSITORY_DIRECTORY "./repo"
#define MAX_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M
-#define MAX_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute
-#define REPOSITORY_PURGE_PERIOD (2500) // 2500 msec
+constexpr auto MAX_REPOSITORY_ENTRY_LIFE_TIME = std::chrono::minutes(10);
+constexpr auto REPOSITORY_PURGE_PERIOD = std::chrono::milliseconds(2500);
class Repository : public virtual core::SerializableComponent, public core::TraceableResource {
public:
/*
* Constructor for the repository
*/
- Repository(std::string repo_name = "Repository", std::string directory = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
- MAX_REPOSITORY_STORAGE_SIZE,
- uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD)
+ Repository(std::string repo_name = "Repository",
+ std::string directory = REPOSITORY_DIRECTORY,
+ std::chrono::milliseconds maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME,
+ int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE,
+ std::chrono::milliseconds purgePeriod = REPOSITORY_PURGE_PERIOD)
: core::SerializableComponent(repo_name),
thread_(),
repo_size_(0),
@@ -247,11 +249,11 @@ class Repository : public virtual core::SerializableComponent, public core::Trac
// repository directory
std::string directory_;
// max db entry life time
- int64_t max_partition_millis_;
+ std::chrono::milliseconds max_partition_millis_;
// max db size
int64_t max_partition_bytes_;
// purge period
- uint64_t purge_period_;
+ std::chrono::milliseconds purge_period_;
// thread
std::thread thread_;
// whether the monitoring thread is running for the repo while it was enabled
diff --git a/libminifi/include/core/TypedValues.h b/libminifi/include/core/TypedValues.h
index e25802e..a309a44 100644
--- a/libminifi/include/core/TypedValues.h
+++ b/libminifi/include/core/TypedValues.h
@@ -31,6 +31,7 @@
#include "utils/PropertyErrors.h"
#include "utils/Literals.h"
#include "utils/Export.h"
+#include "utils/TimeUtil.h"
namespace org {
namespace apache {
@@ -55,22 +56,24 @@ class TimePeriodValue : public TransformableValue, public state::response::UInt6
explicit TimePeriodValue(const std::string &timeString)
: state::response::UInt64Value(0) {
- TimeUnit units{};
- if (!StringToTime(timeString, value, units)) {
+ auto parsed_time = utils::timeutils::StringToDuration<std::chrono::milliseconds>(timeString);
+ if (!parsed_time) {
throw utils::internal::ParseException("Couldn't parse TimePeriodValue");
}
string_value = timeString;
- if (!ConvertTimeUnitToMS<uint64_t>(value, units, value)) {
- throw utils::internal::ConversionException("Couldn't convert TimePeriodValue to milliseconds");
- }
+ value = parsed_time->count();
}
explicit TimePeriodValue(uint64_t value)
: state::response::UInt64Value(value) {
}
- uint64_t getMilliseconds() const {
- return getValue();
+ TimePeriodValue()
+ : state::response::UInt64Value(0) {
+ }
+
+ std::chrono::milliseconds getMilliseconds() const {
+ return std::chrono::milliseconds(getValue());
}
static std::optional<TimePeriodValue> fromString(const std::string& str) {
@@ -80,36 +83,6 @@ class TimePeriodValue : public TransformableValue, public state::response::UInt6
return std::nullopt;
}
}
-
- // Convert TimeUnit to MilliSecond
- template<typename T>
- static bool ConvertTimeUnitToMS(T input, TimeUnit unit, T &out) {
- if (unit == MILLISECOND) {
- out = input;
- return true;
- } else if (unit == SECOND) {
- out = input * 1000;
- return true;
- } else if (unit == MINUTE) {
- out = input * 60 * 1000;
- return true;
- } else if (unit == HOUR) {
- out = input * 60 * 60 * 1000;
- return true;
- } else if (unit == DAY) {
- out = 24 * 60 * 60 * 1000;
- return true;
- } else if (unit == NANOSECOND) {
- out = input / 1000 / 1000;
- return true;
- } else {
- return false;
- }
- }
-
- static bool StringToTime(std::string input, uint64_t &output, TimeUnit &timeunit) {
- return utils::internal::StringToTime(input, output, timeunit);
- }
};
/**
@@ -133,6 +106,10 @@ class DataSizeValue : public TransformableValue, public state::response::UInt64V
: state::response::UInt64Value(value) {
}
+ DataSizeValue()
+ : state::response::UInt64Value(0) {
+ }
+
// Convert String to Integer
template<typename T, typename std::enable_if<
diff --git a/libminifi/include/core/repository/VolatileFlowFileRepository.h b/libminifi/include/core/repository/VolatileFlowFileRepository.h
index 73df967..5e99ac7 100644
--- a/libminifi/include/core/repository/VolatileFlowFileRepository.h
+++ b/libminifi/include/core/repository/VolatileFlowFileRepository.h
@@ -40,9 +40,11 @@ class VolatileFlowFileRepository : public VolatileRepository<std::string>, publi
using utils::EnableSharedFromThis<VolatileFlowFileRepository>::sharedFromThis;
public:
- explicit VolatileFlowFileRepository(std::string repo_name = "", std::string /*dir*/ = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
- MAX_REPOSITORY_STORAGE_SIZE,
- uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD)
+ explicit VolatileFlowFileRepository(std::string repo_name = "",
+ std::string /*dir*/ = REPOSITORY_DIRECTORY,
+ std::chrono::milliseconds maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME,
+ int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE,
+ std::chrono::milliseconds purgePeriod = REPOSITORY_PURGE_PERIOD)
: core::SerializableComponent(repo_name),
VolatileRepository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod) {
purge_required_ = true;
@@ -52,7 +54,7 @@ class VolatileFlowFileRepository : public VolatileRepository<std::string>, publi
virtual void run() {
repo_full_ = false;
while (running_) {
- std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
+ std::this_thread::sleep_for(purge_period_);
flush();
}
flush();
diff --git a/libminifi/include/core/repository/VolatileProvenanceRepository.h b/libminifi/include/core/repository/VolatileProvenanceRepository.h
index 78f24ad..14ab1c0 100644
--- a/libminifi/include/core/repository/VolatileProvenanceRepository.h
+++ b/libminifi/include/core/repository/VolatileProvenanceRepository.h
@@ -34,9 +34,11 @@ namespace repository {
*/
class VolatileProvenanceRepository : public VolatileRepository<std::string> {
public:
- explicit VolatileProvenanceRepository(std::string repo_name = "", std::string /*dir*/ = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
- MAX_REPOSITORY_STORAGE_SIZE,
- uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD)
+ explicit VolatileProvenanceRepository(std::string repo_name = "",
+ std::string /*dir*/ = REPOSITORY_DIRECTORY,
+ std::chrono::milliseconds maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME,
+ int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE,
+ std::chrono::milliseconds purgePeriod = REPOSITORY_PURGE_PERIOD)
: core::SerializableComponent(repo_name), VolatileRepository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod) { // NOLINT
purge_required_ = false;
}
diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h
index 9190d0a..0f28508 100644
--- a/libminifi/include/core/repository/VolatileRepository.h
+++ b/libminifi/include/core/repository/VolatileRepository.h
@@ -59,9 +59,11 @@ class VolatileRepository : public core::Repository, public utils::EnableSharedFr
static const char *volatile_repo_max_bytes;
// Constructor
- explicit VolatileRepository(std::string repo_name = "", std::string /*dir*/ = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
- MAX_REPOSITORY_STORAGE_SIZE,
- uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD)
+ explicit VolatileRepository(std::string repo_name = "",
+ std::string /*dir*/ = REPOSITORY_DIRECTORY,
+ std::chrono::milliseconds maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME,
+ int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE,
+ std::chrono::milliseconds purgePeriod = REPOSITORY_PURGE_PERIOD)
: core::SerializableComponent(repo_name),
Repository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod),
current_size_(0),
@@ -90,7 +92,7 @@ class VolatileRepository : public core::Repository, public utils::EnableSharedFr
/**
* Places a new object into the volatile memory area
* @param key key to add to the repository
- * @param buf buffer
+ * @param buf buffer
**/
virtual bool Put(T key, const uint8_t *buf, size_t bufLen);
@@ -401,7 +403,7 @@ void VolatileRepository<T>::setConnectionMap(std::map<std::string, std::shared_p
template<typename T>
void VolatileRepository<T>::start() {
- if (this->purge_period_ <= 0)
+ if (this->purge_period_ <= std::chrono::milliseconds(0))
return;
if (running_)
return;
diff --git a/libminifi/include/core/state/nodes/SchedulingNodes.h b/libminifi/include/core/state/nodes/SchedulingNodes.h
index bf81426..45dfdbe 100644
--- a/libminifi/include/core/state/nodes/SchedulingNodes.h
+++ b/libminifi/include/core/state/nodes/SchedulingNodes.h
@@ -61,13 +61,13 @@ class SchedulingDefaults : public DeviceInformation {
SerializedResponseNode defaultSchedulingPeriod;
defaultSchedulingPeriod.name = "defaultSchedulingPeriodMillis";
- defaultSchedulingPeriod.value = DEFAULT_SCHEDULING_PERIOD_MILLIS;
+ defaultSchedulingPeriod.value = core::DEFAULT_SCHEDULING_PERIOD_MILLIS.count();
schedulingDefaults.children.push_back(defaultSchedulingPeriod);
SerializedResponseNode defaultRunDuration;
defaultRunDuration.name = "defaultRunDurationNanos";
- defaultRunDuration.value = DEFAULT_RUN_DURATION;
+ defaultRunDuration.value = core::DEFAULT_RUN_DURATION.count();
schedulingDefaults.children.push_back(defaultRunDuration);
@@ -79,7 +79,7 @@ class SchedulingDefaults : public DeviceInformation {
SerializedResponseNode yieldDuration;
yieldDuration.name = "yieldDurationMillis";
- yieldDuration.value = DEFAULT_YIELD_PERIOD_SECONDS*1000;
+ yieldDuration.value = std::chrono::milliseconds(core::DEFAULT_YIELD_PERIOD_SECONDS).count();
schedulingDefaults.children.push_back(yieldDuration);
diff --git a/libminifi/include/core/yaml/YamlConnectionParser.h b/libminifi/include/core/yaml/YamlConnectionParser.h
index 1633f6b..831bca2 100644
--- a/libminifi/include/core/yaml/YamlConnectionParser.h
+++ b/libminifi/include/core/yaml/YamlConnectionParser.h
@@ -49,7 +49,7 @@ class YamlConnectionParser {
[[nodiscard]] uint64_t getWorkQueueDataSizeFromYaml() const;
[[nodiscard]] utils::Identifier getSourceUUIDFromYaml() const;
[[nodiscard]] utils::Identifier getDestinationUUIDFromYaml() const;
- [[nodiscard]] uint64_t getFlowFileExpirationFromYaml() const;
+ [[nodiscard]] std::chrono::milliseconds getFlowFileExpirationFromYaml() const;
[[nodiscard]] bool getDropEmptyFromYaml() const;
private:
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index 2c4a597..0202986 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -166,7 +166,7 @@ class ProvenanceEventRecord : public core::SerializableComponent {
ProvenanceEventRecord()
: core::SerializableComponent(core::getClassName<ProvenanceEventRecord>()) {
- _eventTime = utils::timeutils::getTimeMillis();
+ _eventTime = std::chrono::system_clock::now();
}
// Destructor
@@ -192,23 +192,23 @@ class ProvenanceEventRecord : public core::SerializableComponent {
return _offset;
}
// ! Get Entry Date
- uint64_t getFlowFileEntryDate() {
+ std::chrono::system_clock::time_point getFlowFileEntryDate() {
return _entryDate;
}
// ! Get Lineage Start Date
- uint64_t getlineageStartDate() {
+ std::chrono::system_clock::time_point getlineageStartDate() {
return _lineageStartDate;
}
// ! Get Event Time
- uint64_t getEventTime() {
+ std::chrono::system_clock::time_point getEventTime() {
return _eventTime;
}
// ! Get Event Duration
- uint64_t getEventDuration() {
+ std::chrono::milliseconds getEventDuration() {
return _eventDuration;
}
// Set Event Duration
- void setEventDuration(uint64_t duration) {
+ void setEventDuration(std::chrono::milliseconds duration) {
_eventDuration = duration;
}
// ! Get Event Type
@@ -390,13 +390,13 @@ class ProvenanceEventRecord : public core::SerializableComponent {
// Event type
ProvenanceEventType _eventType;
// Date at which the event was created
- uint64_t _eventTime;
+ std::chrono::system_clock::time_point _eventTime{};
// Date at which the flow file entered the flow
- uint64_t _entryDate;
+ std::chrono::system_clock::time_point _entryDate{};
// Date at which the origin of this flow file entered the flow
- uint64_t _lineageStartDate;
+ std::chrono::system_clock::time_point _lineageStartDate{};
// Event Duration
- uint64_t _eventDuration;
+ std::chrono::milliseconds _eventDuration{};
// Component ID
std::string _componentId;
// Component Type
@@ -481,27 +481,27 @@ class ProvenanceReporter {
// create
void create(std::shared_ptr<core::FlowFile> flow, std::string detail);
// route
- void route(std::shared_ptr<core::FlowFile> flow, core::Relationship relation, std::string detail, uint64_t processingDuration);
+ void route(std::shared_ptr<core::FlowFile> flow, core::Relationship relation, std::string detail, std::chrono::milliseconds processingDuration);
// modifyAttributes
void modifyAttributes(std::shared_ptr<core::FlowFile> flow, std::string detail);
// modifyContent
- void modifyContent(std::shared_ptr<core::FlowFile> flow, std::string detail, uint64_t processingDuration);
+ void modifyContent(std::shared_ptr<core::FlowFile> flow, std::string detail, std::chrono::milliseconds processingDuration);
// clone
void clone(std::shared_ptr<core::FlowFile> parent, std::shared_ptr<core::FlowFile> child);
// join
- void join(std::vector<std::shared_ptr<core::FlowFile> > parents, std::shared_ptr<core::FlowFile> child, std::string detail, uint64_t processingDuration);
+ void join(std::vector<std::shared_ptr<core::FlowFile> > parents, std::shared_ptr<core::FlowFile> child, std::string detail, std::chrono::milliseconds processingDuration);
// fork
- void fork(std::vector<std::shared_ptr<core::FlowFile> > child, std::shared_ptr<core::FlowFile> parent, std::string detail, uint64_t processingDuration);
+ void fork(std::vector<std::shared_ptr<core::FlowFile> > child, std::shared_ptr<core::FlowFile> parent, std::string detail, std::chrono::milliseconds processingDuration);
// expire
void expire(std::shared_ptr<core::FlowFile> flow, std::string detail);
// drop
void drop(std::shared_ptr<core::FlowFile> flow, std::string reason);
// send
- void send(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, uint64_t processingDuration, bool force);
+ void send(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, std::chrono::milliseconds processingDuration, bool force);
// fetch
- void fetch(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, uint64_t processingDuration);
+ void fetch(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, std::chrono::milliseconds processingDuration);
// receive
- void receive(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string sourceSystemFlowFileIdentifier, std::string detail, uint64_t processingDuration);
+ void receive(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string sourceSystemFlowFileIdentifier, std::string detail, std::chrono::milliseconds processingDuration);
protected:
// allocate
diff --git a/libminifi/include/sitetosite/Peer.h b/libminifi/include/sitetosite/Peer.h
index 195ab81..dfa6a75 100644
--- a/libminifi/include/sitetosite/Peer.h
+++ b/libminifi/include/sitetosite/Peer.h
@@ -146,7 +146,10 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
/*
* Create a new site2site peer
*/
- explicit SiteToSitePeer(std::unique_ptr<org::apache::nifi::minifi::io::BaseStream> injected_socket, const std::string host, uint16_t port, const std::string &ifc)
+ explicit SiteToSitePeer(std::unique_ptr<org::apache::nifi::minifi::io::BaseStream> injected_socket,
+ const std::string host,
+ uint16_t port,
+ const std::string &ifc)
: SiteToSitePeer(host, port, ifc) {
stream_ = std::move(injected_socket);
}
@@ -155,12 +158,11 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
: stream_(nullptr),
host_(host),
port_(port),
- timeout_(30000),
- yield_expiration_(0),
+ timeout_(std::chrono::seconds(30)),
logger_(core::logging::LoggerFactory<SiteToSitePeer>::getLogger()) {
url_ = "nifi://" + host_ + ":" + std::to_string(port_);
- yield_expiration_ = 0;
- timeout_ = 30000; // 30 seconds
+ yield_expiration_ = std::chrono::system_clock::time_point();
+ timeout_ = std::chrono::seconds(30);
local_network_interface_ = io::NetworkInterface(ifc, nullptr);
}
@@ -180,7 +182,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
Close();
}
// Set Processor yield period in MilliSecond
- void setYieldPeriodMsec(uint64_t period) {
+ void setYieldPeriodMsec(std::chrono::milliseconds period) {
yield_period_msec_ = period;
}
// get URL
@@ -195,12 +197,12 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
return local_network_interface_.getInterface();
}
// Get Processor yield period in MilliSecond
- uint64_t getYieldPeriodMsec(void) {
+ std::chrono::milliseconds getYieldPeriodMsec(void) {
return (yield_period_msec_);
}
// Yield based on the yield period
void yield() {
- yield_expiration_ = (utils::timeutils::getTimeMillis() + yield_period_msec_);
+ yield_expiration_ = std::chrono::system_clock::now() + yield_period_msec_.load();
}
// setHostName
void setHostName(std::string host_) {
@@ -221,39 +223,33 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
return port_;
}
// Yield based on the input time
- void yield(uint64_t time) {
- yield_expiration_ = (utils::timeutils::getTimeMillis() + time);
+ void yield(std::chrono::milliseconds time) {
+ yield_expiration_ = (std::chrono::system_clock::now() + time);
}
// whether need be to yield
bool isYield() {
- if (yield_expiration_ > 0)
- return (yield_expiration_ >= utils::timeutils::getTimeMillis());
- else
- return false;
+ return yield_expiration_.load() >= std::chrono::system_clock::now();
}
// clear yield expiration
void clearYield() {
- yield_expiration_ = 0;
+ yield_expiration_ = std::chrono::system_clock::time_point();
}
// Yield based on the yield period
void yield(std::string portId) {
std::lock_guard<std::mutex> lock(mutex_);
- uint64_t yieldExpiration = (utils::timeutils::getTimeMillis() + yield_period_msec_);
- yield_expiration_PortIdMap[portId] = yieldExpiration;
+ yield_expiration_PortIdMap[portId] = std::chrono::system_clock::now() + yield_period_msec_.load();
}
// Yield based on the input time
- void yield(std::string portId, uint64_t time) {
- std::lock_guard<std::mutex> lock(mutex_);
- uint64_t yieldExpiration = (utils::timeutils::getTimeMillis() + time);
- yield_expiration_PortIdMap[portId] = yieldExpiration;
+ void yield(std::string portId, std::chrono::milliseconds time) {
+ yield_expiration_PortIdMap[portId] = std::chrono::system_clock::now() + time;
}
// whether need be to yield
bool isYield(std::string portId) {
std::lock_guard<std::mutex> lock(mutex_);
- std::map<std::string, uint64_t>::iterator it = this->yield_expiration_PortIdMap.find(portId);
+ auto it = this->yield_expiration_PortIdMap.find(portId);
if (it != yield_expiration_PortIdMap.end()) {
- uint64_t yieldExpiration = it->second;
- return (yieldExpiration >= utils::timeutils::getTimeMillis());
+ auto yieldExpiration = it->second;
+ return (yieldExpiration >= std::chrono::system_clock::now());
} else {
return false;
}
@@ -261,17 +257,17 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
// clear yield expiration
void clearYield(std::string portId) {
std::lock_guard<std::mutex> lock(mutex_);
- std::map<std::string, uint64_t>::iterator it = this->yield_expiration_PortIdMap.find(portId);
+ auto it = this->yield_expiration_PortIdMap.find(portId);
if (it != yield_expiration_PortIdMap.end()) {
yield_expiration_PortIdMap.erase(portId);
}
}
- // setTimeOut
- void setTimeOut(uint64_t time) {
+ // setTimeout
+ void setTimeout(std::chrono::milliseconds time) {
timeout_ = time;
}
- // getTimeOut
- uint64_t getTimeOut() {
+ // getTimeout
+ std::chrono::milliseconds getTimeout() {
return timeout_;
}
void setHTTPProxy(const utils::HTTPProxy &proxy) {
@@ -318,8 +314,8 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
host_ = std::move(other.host_);
port_ = std::move(other.port_);
local_network_interface_ = std::move(other.local_network_interface_);
- yield_expiration_ = 0;
- timeout_ = 30000; // 30 seconds
+ yield_expiration_ = std::chrono::system_clock::time_point();
+ timeout_ = std::chrono::seconds(30);
url_ = "nifi://" + host_ + ":" + std::to_string(port_);
return *this;
@@ -344,13 +340,13 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
// URL
std::string url_;
// socket timeout;
- std::atomic<uint64_t> timeout_;
+ std::atomic<std::chrono::milliseconds> timeout_{};
// Yield Period in Milliseconds
- std::atomic<uint64_t> yield_period_msec_;
+ std::atomic<std::chrono::milliseconds> yield_period_msec_;
// Yield Expiration
- std::atomic<uint64_t> yield_expiration_;
+ std::atomic<std::chrono::time_point<std::chrono::system_clock>> yield_expiration_{};
// Yield Expiration per destination PortID
- std::map<std::string, uint64_t> yield_expiration_PortIdMap;
+ std::map<std::string, std::chrono::time_point<std::chrono::system_clock>> yield_expiration_PortIdMap;
// Logger
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<SiteToSitePeer>::getLogger();
};
diff --git a/libminifi/include/sitetosite/RawSocketProtocol.h b/libminifi/include/sitetosite/RawSocketProtocol.h
index f588aca..89f2b76 100644
--- a/libminifi/include/sitetosite/RawSocketProtocol.h
+++ b/libminifi/include/sitetosite/RawSocketProtocol.h
@@ -75,9 +75,9 @@ class RawSiteToSiteClient : public sitetosite::SiteToSiteClient {
peer_ = std::move(peer);
_batchSize = 0;
_batchCount = 0;
- _batchDuration = 0;
- _batchSendNanos = 5000000000; // 5 seconds
- _timeOut = 30000; // 30 seconds
+ _batchDuration = std::chrono::seconds(0);
+ _batchSendNanos = std::chrono::seconds(5);
+ _timeout = std::chrono::seconds(30);
_supportedVersion[0] = 5;
_supportedVersion[1] = 4;
_supportedVersion[2] = 3;
@@ -104,22 +104,22 @@ class RawSiteToSiteClient : public sitetosite::SiteToSiteClient {
_batchCount = count;
}
// setBatchDuration
- void setBatchDuration(uint64_t duration) {
+ void setBatchDuration(std::chrono::milliseconds duration) {
_batchDuration = duration;
}
- // setTimeOut
- void setTimeOut(uint64_t time) {
- _timeOut = time;
+ // setTimeout
+ void setTimeout(std::chrono::milliseconds time) {
+ _timeout = time;
if (peer_)
- peer_->setTimeOut(time);
+ peer_->setTimeout(time);
}
/**
* Provides a reference to the time out
* @returns timeout
*/
- uint64_t getTimeOut() const {
- return _timeOut;
+ std::chrono::milliseconds getTimeout() const {
+ return _timeout;
}
// getResourceName
@@ -180,9 +180,9 @@ class RawSiteToSiteClient : public sitetosite::SiteToSiteClient {
// Batch Size
std::atomic<uint64_t> _batchSize;
// Batch Duration in msec
- std::atomic<uint64_t> _batchDuration;
+ std::atomic<std::chrono::milliseconds> _batchDuration;
// Timeout in msec
- std::atomic<uint64_t> _timeOut;
+ std::atomic<std::chrono::milliseconds> _timeout;
// commsIdentifier
utils::Identifier _commsIdentifier;
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index 72db758..61ab94f 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -247,7 +247,7 @@ class SiteToSiteClient : public core::Connectable {
std::map<utils::Identifier, std::shared_ptr<Transaction>> known_transactions_;
// BATCH_SEND_NANOS
- uint64_t _batchSendNanos{5000000000};
+ std::chrono::nanoseconds _batchSendNanos = std::chrono::seconds(5);
/***
* versioning
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index 7976097..39f5925 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -584,19 +584,6 @@ class StringUtils {
};
} // namespace utils
-
-namespace core {
-enum TimeUnit {
- DAY,
- HOUR,
- MINUTE,
- SECOND,
- MILLISECOND,
- MICROSECOND,
- NANOSECOND
-};
-
-} // namespace core
} // namespace org::apache::nifi::minifi
#endif // LIBMINIFI_INCLUDE_UTILS_STRINGUTILS_H_
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index aac1c31..744a39b 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -106,7 +106,7 @@ class Worker {
identifier_ = identifier;
}
- virtual std::chrono::time_point<std::chrono::steady_clock> getNextExecutionTime() const {
+ virtual std::chrono::steady_clock::time_point getNextExecutionTime() const {
return next_exec_time_;
}
@@ -123,7 +123,7 @@ class Worker {
protected:
TaskId identifier_;
- std::chrono::time_point<std::chrono::steady_clock> next_exec_time_;
+ std::chrono::steady_clock::time_point next_exec_time_;
std::function<T()> task;
std::unique_ptr<AfterExecute<T>> run_determinant_;
std::shared_ptr<std::promise<T>> promise;
diff --git a/libminifi/include/utils/TimeUtil.h b/libminifi/include/utils/TimeUtil.h
index 44d3af9..0f83e19 100644
--- a/libminifi/include/utils/TimeUtil.h
+++ b/libminifi/include/utils/TimeUtil.h
@@ -27,6 +27,9 @@
#include <limits>
#include <sstream>
#include <string>
+#include <optional>
+#include <functional>
+#include <algorithm>
#define TIME_FORMAT "%Y-%m-%d %H:%M:%S"
@@ -38,18 +41,11 @@ namespace utils {
namespace timeutils {
/**
- * Gets the current time in milliseconds
- * @returns milliseconds since epoch
- */
-inline uint64_t getTimeMillis() {
- return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
-}
-
-/**
* Gets the current time in nanoseconds
* @returns nanoseconds since epoch
*/
inline uint64_t getTimeNano() {
+ // The precision is platform dependent (1 ns on libstdc++, 0.1 us on msvc and 1 us on libc++)
return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
}
@@ -197,6 +193,106 @@ inline bool getDateTimeStr(int64_t unix_timestamp, std::string& date_time_str) {
return true;
}
+namespace details {
+
+template<class Duration>
+bool unit_matches(const std::string&) {
+ return false;
+}
+
+template<>
+inline bool unit_matches<std::chrono::nanoseconds>(const std::string& unit) {
+ return unit == "ns" || unit == "nano" || unit == "nanos" || unit == "nanoseconds" || unit == "nanosecond";
+}
+
+template<>
+inline bool unit_matches<std::chrono::microseconds>(const std::string& unit) {
+ return unit == "us" || unit == "micro" || unit == "micros" || unit == "microseconds" || unit == "microsecond";
+}
+
+template<>
+inline bool unit_matches<std::chrono::milliseconds>(const std::string& unit) {
+ return unit == "msec" || unit == "ms" || unit == "millisecond" || unit == "milliseconds" || unit == "msecs" || unit == "millis" || unit == "milli";
+}
+
+template<>
+inline bool unit_matches<std::chrono::seconds>(const std::string& unit) {
+ return unit == "sec" || unit == "s" || unit == "second" || unit == "seconds" || unit == "secs";
+}
+
+template<>
+inline bool unit_matches<std::chrono::minutes>(const std::string& unit) {
+ return unit == "min" || unit == "m" || unit == "mins" || unit == "minute" || unit == "minutes";
+}
+
+template<>
+inline bool unit_matches<std::chrono::hours>(const std::string& unit) {
+ return unit == "h" || unit == "hr" || unit == "hour" || unit == "hrs" || unit == "hours";
+}
+
+template<>
+inline bool unit_matches<std::chrono::days>(const std::string& unit) {
+ return unit == "d" || unit == "day" || unit == "days";
+}
+
+
+template<class TargetDuration, class SourceDuration>
+std::optional<TargetDuration> cast_if_unit_matches(const std::string& unit, const int64_t value) {
+ if (unit_matches<SourceDuration>(unit)) {
+ return std::chrono::duration_cast<TargetDuration>(SourceDuration(value));
+ } else {
+ return std::nullopt;
+ }
+}
+
+template<class TargetDuration, typename... T>
+std::optional<TargetDuration> cast_to_matching_unit(std::string& unit, const int64_t value) {
+ std::optional<TargetDuration> result;
+ ((result = cast_if_unit_matches<TargetDuration, T>(unit, value)) || ...);
+ return result;
+}
+
+inline bool get_unit_and_value(const std::string& input, std::string& unit, int64_t& value) {
+ const char* begin = input.c_str();
+ char *end;
+ errno = 0;
+ value = std::strtoll(begin, &end, 0);
+ if (end == begin || errno == ERANGE) {
+ return false;
+ }
+
+ if (end[0] == '\0') {
+ return false;
+ }
+
+ while (*end == ' ') {
+ // Skip the spaces
+ end++;
+ }
+ unit = std::string(end);
+ std::transform(unit.begin(), unit.end(), unit.begin(), ::tolower);
+ return true;
+}
+
+} // namespace details
+
+template<class TargetDuration>
+std::optional<TargetDuration> StringToDuration(const std::string& input) {
+ std::string unit;
+ int64_t value;
+ if (!details::get_unit_and_value(input, unit, value))
+ return std::nullopt;
+
+ return details::cast_to_matching_unit<TargetDuration,
+ std::chrono::nanoseconds,
+ std::chrono::microseconds,
+ std::chrono::milliseconds,
+ std::chrono::seconds,
+ std::chrono::minutes,
+ std::chrono::hours,
+ std::chrono::days>(unit, value);
+}
+
} /* namespace timeutils */
} /* namespace utils */
} /* namespace minifi */
@@ -205,7 +301,6 @@ inline bool getDateTimeStr(int64_t unix_timestamp, std::string& date_time_str) {
} /* namespace org */
// for backwards compatibility, to be removed after 0.8
-using org::apache::nifi::minifi::utils::timeutils::getTimeMillis;
using org::apache::nifi::minifi::utils::timeutils::getTimeNano;
using org::apache::nifi::minifi::utils::timeutils::getTimeStr;
using org::apache::nifi::minifi::utils::timeutils::parseDateTimeStr;
diff --git a/libminifi/include/utils/ValueParser.h b/libminifi/include/utils/ValueParser.h
index 8910096..5d6cec9 100644
--- a/libminifi/include/utils/ValueParser.h
+++ b/libminifi/include/utils/ValueParser.h
@@ -191,65 +191,6 @@ class ValueParser {
const std::string& str;
std::size_t offset;
};
-
-template<typename Out>
-bool StringToTime(const std::string& input, Out& output, core::TimeUnit& timeunit) {
- if (input.size() == 0) {
- return false;
- }
-
- const char* begin = input.c_str();
- char *end;
- errno = 0;
- auto ival = std::strtoll(begin, &end, 0);
- if (end == begin || errno == ERANGE) {
- return false;
- }
-
- if (end[0] == '\0') {
- return false;
- }
-
- while (*end == ' ') {
- // Skip the space
- end++;
- }
-
- std::string unit(end);
- std::transform(unit.begin(), unit.end(), unit.begin(), ::tolower);
-
- if (unit == "ns" || unit == "nano" || unit == "nanos" || unit == "nanoseconds") {
- timeunit = core::TimeUnit::NANOSECOND;
- output = ival;
- return true;
- } else if (unit == "us" || unit == "micro" || unit == "micros" || unit == "microseconds" || unit == "microsecond") {
- timeunit = core::TimeUnit::MICROSECOND;
- output = ival;
- return true;
- } else if (unit == "msec" || unit == "ms" || unit == "millisecond" || unit == "milliseconds" || unit == "msecs" || unit == "millis" || unit == "milli") {
- timeunit = core::TimeUnit::MILLISECOND;
- output = ival;
- return true;
- } else if (unit == "sec" || unit == "s" || unit == "second" || unit == "seconds" || unit == "secs") {
- timeunit = core::TimeUnit::SECOND;
- output = ival;
- return true;
- } else if (unit == "min" || unit == "m" || unit == "mins" || unit == "minute" || unit == "minutes") {
- timeunit = core::TimeUnit::MINUTE;
- output = ival;
- return true;
- } else if (unit == "h" || unit == "hr" || unit == "hour" || unit == "hrs" || unit == "hours") {
- timeunit = core::TimeUnit::HOUR;
- output = ival;
- return true;
- } else if (unit == "d" || unit == "day" || unit == "days") {
- timeunit = core::TimeUnit::DAY;
- output = ival;
- return true;
- } else {
- return false;
- }
-}
} /* namespace internal */
} /* namespace utils */
} /* namespace minifi */
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index e2bc4f9..5dd1d0b 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -33,6 +33,8 @@
#include "core/Processor.h"
#include "core/logging/LoggerConfiguration.h"
+using namespace std::literals::chrono_literals;
+
namespace org {
namespace apache {
namespace nifi {
@@ -42,14 +44,6 @@ Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository,
: core::Connectable(name),
flow_repository_(flow_repository),
content_repo_(content_repo) {
- source_connectable_ = nullptr;
- dest_connectable_ = nullptr;
- max_queue_size_ = 0;
- max_data_queue_size_ = 0;
- expired_duration_ = 0;
- queued_data_size_ = 0;
- drop_empty_ = false;
-
logger_->log_debug("Connection %s created", name_);
}
@@ -57,14 +51,6 @@ Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository,
: core::Connectable(name, uuid),
flow_repository_(flow_repository),
content_repo_(content_repo) {
- source_connectable_ = nullptr;
- dest_connectable_ = nullptr;
- max_queue_size_ = 0;
- max_data_queue_size_ = 0;
- expired_duration_ = 0;
- queued_data_size_ = 0;
- drop_empty_ = false;
-
logger_->log_debug("Connection %s created", name_);
}
@@ -74,15 +60,6 @@ Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository,
flow_repository_(flow_repository),
content_repo_(content_repo) {
src_uuid_ = srcUUID;
-
- source_connectable_ = nullptr;
- dest_connectable_ = nullptr;
- max_queue_size_ = 0;
- max_data_queue_size_ = 0;
- expired_duration_ = 0;
- queued_data_size_ = 0;
- drop_empty_ = false;
-
logger_->log_debug("Connection %s created", name_);
}
@@ -94,15 +71,6 @@ Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository,
src_uuid_ = srcUUID;
dest_uuid_ = destUUID;
-
- source_connectable_ = nullptr;
- dest_connectable_ = nullptr;
- max_queue_size_ = 0;
- max_data_queue_size_ = 0;
- expired_duration_ = 0;
- queued_data_size_ = 0;
- drop_empty_ = false;
-
logger_->log_debug("Connection %s created", name_);
}
@@ -180,9 +148,9 @@ std::shared_ptr<core::FlowFile> Connection::poll(std::set<std::shared_ptr<core::
std::shared_ptr<core::FlowFile> item = queue_.pop();
queued_data_size_ -= item->getSize();
- if (expired_duration_ > 0) {
+ if (expired_duration_.load() > 0ms) {
// We need to check for flow expiration
- if (utils::timeutils::getTimeMillis() > (item->getEntryDate() + expired_duration_)) {
+ if (std::chrono::system_clock::now() > (item->getEntryDate() + expired_duration_.load())) {
// Flow record expired
expiredFlowRecords.insert(item);
logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
diff --git a/libminifi/src/CronDrivenSchedulingAgent.cpp b/libminifi/src/CronDrivenSchedulingAgent.cpp
index 21c071a..101db10 100644
--- a/libminifi/src/CronDrivenSchedulingAgent.cpp
+++ b/libminifi/src/CronDrivenSchedulingAgent.cpp
@@ -27,6 +27,8 @@
#include "core/ProcessSessionFactory.h"
#include "core/Property.h"
+using namespace std::literals::chrono_literals;
+
namespace org {
namespace apache {
namespace nifi {
@@ -66,10 +68,10 @@ utils::TaskRescheduleInfo CronDrivenSchedulingAgent::run(const std::shared_ptr<c
if (processor->isYield()) {
// Honor the yield
- return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(processor->getYieldTime()));
- } else if (shouldYield && this->bored_yield_duration_ > 0) {
+ return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
+ } else if (shouldYield && this->bored_yield_duration_ > 0ms) {
// No work to do or need to apply back pressure
- return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(this->bored_yield_duration_));
+ return utils::TaskRescheduleInfo::RetryIn(this->bored_yield_duration_);
}
}
return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(result - from));
diff --git a/libminifi/src/DiskSpaceWatchdog.cpp b/libminifi/src/DiskSpaceWatchdog.cpp
index 49b79f2..fc930b5 100644
--- a/libminifi/src/DiskSpaceWatchdog.cpp
+++ b/libminifi/src/DiskSpaceWatchdog.cpp
@@ -21,6 +21,7 @@
#include "core/logging/Logger.h"
#include "properties/Configure.h"
#include "utils/file/PathUtils.h"
+#include "utils/TimeUtil.h"
namespace org {
namespace apache {
@@ -30,13 +31,6 @@ namespace minifi {
namespace {
namespace chr = std::chrono;
-std::optional<chr::milliseconds> time_string_to_milliseconds(const std::string& str) {
- uint64_t millisec_value{};
- const bool success = core::Property::getTimeMSFromString(str, millisec_value);
- if (!success) return std::nullopt;
- return chr::milliseconds{millisec_value};
-}
-
template<typename T, typename = std::enable_if_t<std::is_integral<T>::value>>
std::optional<T> data_size_string_to_int(const std::string& str) {
T result{};
@@ -51,7 +45,7 @@ std::optional<T> data_size_string_to_int(const std::string& str) {
namespace disk_space_watchdog {
Config read_config(const Configure& conf) {
- const auto interval_ms = conf.get(Configure::minifi_disk_space_watchdog_interval) | utils::flatMap(time_string_to_milliseconds);
+ const auto interval_ms = conf.get(Configure::minifi_disk_space_watchdog_interval) | utils::flatMap(utils::timeutils::StringToDuration<chr::milliseconds>);
const auto stop_bytes = conf.get(Configure::minifi_disk_space_watchdog_stop_threshold) | utils::flatMap(data_size_string_to_int<std::uintmax_t>);
const auto restart_bytes = conf.get(Configure::minifi_disk_space_watchdog_restart_threshold) | utils::flatMap(data_size_string_to_int<std::uintmax_t>);
if (restart_bytes < stop_bytes) { throw std::runtime_error{"disk space watchdog stop threshold must be <= restart threshold"}; }
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp
index 1f210ea..e9823e9 100644
--- a/libminifi/src/EventDrivenSchedulingAgent.cpp
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -24,6 +24,8 @@
#include "core/ProcessSessionFactory.h"
#include "core/Property.h"
+using namespace std::literals::chrono_literals;
+
namespace org {
namespace apache {
namespace nifi {
@@ -48,8 +50,7 @@ utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(const std::shared_ptr<
return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(processor->getYieldTime()));
} else if (shouldYield) {
// No work to do or need to apply back pressure
- return utils::TaskRescheduleInfo::RetryIn(
- std::chrono::milliseconds((this->bored_yield_duration_ > 0) ? this->bored_yield_duration_ : 10)); // No work left to do, stand by
+ return utils::TaskRescheduleInfo::RetryIn(this->bored_yield_duration_ > 0ms ? this->bored_yield_duration_ : 10ms); // No work left to do, stand by
}
}
return utils::TaskRescheduleInfo::RetryImmediately(); // Let's continue work as soon as a thread is available
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 52c9e56..0694fc5 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -71,19 +71,22 @@ std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(const std::string& k
bool FlowFileRecord::Serialize(io::OutputStream &outStream) {
{
- const auto ret = outStream.write(event_time_);
+ uint64_t event_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(event_time_.time_since_epoch()).count();
+ const auto ret = outStream.write(event_time_ms);
if (ret != 8) {
return false;
}
}
{
- const auto ret = outStream.write(entry_date_);
+ uint64_t entry_date_ms = std::chrono::duration_cast<std::chrono::milliseconds>(entry_date_.time_since_epoch()).count();
+ const auto ret = outStream.write(entry_date_ms);
if (ret != 8) {
return false;
}
}
{
- const auto ret = outStream.write(lineage_start_date_);
+ uint64_t lineage_start_date_ms = std::chrono::duration_cast<std::chrono::milliseconds>(lineage_start_date_.time_since_epoch()).count();
+ const auto ret = outStream.write(lineage_start_date_ms);
if (ret != 8) {
return false;
}
@@ -177,24 +180,30 @@ std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inS
auto file = std::make_shared<FlowFileRecord>();
{
- const auto ret = inStream.read(file->event_time_);
+ uint64_t event_time_in_ms;
+ const auto ret = inStream.read(event_time_in_ms);
if (ret != 8) {
return {};
}
+ file->event_time_ = std::chrono::system_clock::time_point() + std::chrono::milliseconds(event_time_in_ms);
}
{
- const auto ret = inStream.read(file->entry_date_);
+ uint64_t entry_date_in_ms;
+ const auto ret = inStream.read(entry_date_in_ms);
if (ret != 8) {
return {};
}
+ file->entry_date_ = std::chrono::system_clock::time_point() + std::chrono::milliseconds(entry_date_in_ms);
}
{
- const auto ret = inStream.read(file->lineage_start_date_);
+ uint64_t lineage_start_date_in_ms;
+ const auto ret = inStream.read(lineage_start_date_in_ms);
if (ret != 8) {
return {};
}
+ file->lineage_start_date_ = std::chrono::system_clock::time_point() + std::chrono::milliseconds(lineage_start_date_in_ms);
}
{
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp
index 28603a3..45a980f 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -167,17 +167,12 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon
}
}
{
- uint64_t idleTimeoutVal = 15000;
- std::string idleTimeoutStr;
- if (!context->getProperty(idleTimeout.getName(), idleTimeoutStr)
- || !core::Property::getTimeMSFromString(idleTimeoutStr, idleTimeoutVal)) {
- logger_->log_debug("%s attribute is invalid, so default value of %s will be used", idleTimeout.getName(),
- idleTimeout.getValue());
- if (!core::Property::getTimeMSFromString(idleTimeout.getValue(), idleTimeoutVal)) {
- assert(false); // Couldn't parse our default value
- }
+ if (auto idle_timeout = context->getProperty<core::TimePeriodValue>(idleTimeout)) {
+ idle_timeout_ = idle_timeout->getMilliseconds();
+ } else {
+ logger_->log_debug("%s attribute is invalid, so default value of %s will be used", idleTimeout.getName(), idleTimeout.getDefaultValue());
+ idle_timeout_ = core::TimePeriodValue(idleTimeout.getDefaultValue().to_string()).getMilliseconds();
}
- idle_timeout_ = std::chrono::milliseconds(idleTimeoutVal);
}
std::lock_guard<std::mutex> lock(peer_mutex_);
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp
index 5717b7b..b045a08 100644
--- a/libminifi/src/ThreadedSchedulingAgent.cpp
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -38,6 +38,9 @@
#include "core/ProcessSessionFactory.h"
#include "utils/ValueParser.h"
+using namespace std::literals::chrono_literals;
+
+
namespace org {
namespace apache {
namespace nifi {
@@ -46,23 +49,23 @@ namespace minifi {
void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processor) {
std::lock_guard<std::mutex> lock(mutex_);
- admin_yield_duration_ = 100; // We should prevent burning CPU in case of rollbacks
+ admin_yield_duration_ = 100ms; // We should prevent burning CPU in case of rollbacks
std::string yieldValue;
if (configure_->get(Configure::nifi_administrative_yield_duration, yieldValue)) {
std::optional<core::TimePeriodValue> value = core::TimePeriodValue::fromString(yieldValue);
if (value) {
admin_yield_duration_ = value->getMilliseconds();
- logger_->log_debug("nifi_administrative_yield_duration: [%" PRId64 "] ms", admin_yield_duration_);
+ logger_->log_debug("nifi_administrative_yield_duration: [%" PRId64 "] ms", int64_t{admin_yield_duration_.count()});
}
}
- bored_yield_duration_ = 0;
+ bored_yield_duration_ = 0ms;
if (configure_->get(Configure::nifi_bored_yield_duration, yieldValue)) {
std::optional<core::TimePeriodValue> value = core::TimePeriodValue::fromString(yieldValue);
if (value) {
bored_yield_duration_ = value->getMilliseconds();
- logger_->log_debug("nifi_bored_yield_duration: [%" PRId64 "] ms", bored_yield_duration_);
+ logger_->log_debug("nifi_bored_yield_duration: [%" PRId64 "] ms", int64_t{bored_yield_duration_.count()});
}
}
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index 1b6b7f6..f82482e 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -24,6 +24,8 @@
#include <iostream>
#include "core/Property.h"
+using namespace std::literals::chrono_literals;
+
namespace org {
namespace apache {
namespace nifi {
@@ -35,13 +37,12 @@ utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(const std::shared_ptr<
bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
if (processor->isYield()) {
// Honor the yield
- return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(processor->getYieldTime()));
- } else if (shouldYield && this->bored_yield_duration_ > 0) {
+ return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
+ } else if (shouldYield && this->bored_yield_duration_ > 0ms) {
// No work to do or need to apply back pressure
- return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(this->bored_yield_duration_));
+ return utils::TaskRescheduleInfo::RetryIn(this->bored_yield_duration_);
}
- return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::nanoseconds(processor->getSchedulingPeriodNano())));
+ return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(processor->getSchedulingPeriodNano()));
}
return utils::TaskRescheduleInfo::Done();
}
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 58cc853..f802f7b 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -43,6 +43,8 @@
#include "utils/Monitors.h"
#include "utils/StringUtils.h"
+using namespace std::literals::chrono_literals;
+
namespace org {
namespace apache {
namespace nifi {
@@ -54,7 +56,7 @@ C2Agent::C2Agent(core::controller::ControllerServiceProvider *controller,
const std::shared_ptr<state::StateMonitor> &updateSink,
const std::shared_ptr<Configure> &configuration,
const std::shared_ptr<utils::file::FileSystem> &filesystem)
- : heart_beat_period_(3000),
+ : heart_beat_period_(3s),
max_c2_responses(5),
update_sink_(updateSink),
update_service_(nullptr),
@@ -161,22 +163,19 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf
}
if (configure->get("nifi.c2.agent.heartbeat.period", "c2.agent.heartbeat.period", heartbeat_period)) {
- core::TimeUnit unit;
-
try {
- int64_t schedulingPeriod = 0;
- if (core::Property::StringToTime(heartbeat_period, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToMS(schedulingPeriod, unit, schedulingPeriod)) {
- heart_beat_period_ = schedulingPeriod;
- logger_->log_debug("Using %u ms as the heartbeat period", heart_beat_period_);
+ if (auto heartbeat_period_ms = utils::timeutils::StringToDuration<std::chrono::milliseconds>(heartbeat_period)) {
+ heart_beat_period_ = *heartbeat_period_ms;
+ logger_->log_debug("Using %u ms as the heartbeat period", heart_beat_period_.count());
} else {
- heart_beat_period_ = std::stoi(heartbeat_period);
+ heart_beat_period_ = std::chrono::milliseconds(std::stoi(heartbeat_period));
}
} catch (const std::invalid_argument &) {
- heart_beat_period_ = 3000;
+ heart_beat_period_ = 3s;
}
} else {
if (!reconfigure)
- heart_beat_period_ = 3000;
+ heart_beat_period_ = 3s;
}
std::string heartbeat_reporters;
@@ -640,7 +639,7 @@ utils::TaskRescheduleInfo C2Agent::produce() {
checkTriggers();
- return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
+ return utils::TaskRescheduleInfo::RetryIn(heart_beat_period_);
}
utils::TaskRescheduleInfo C2Agent::consume() {
diff --git a/libminifi/src/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.cpp b/libminifi/src/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.cpp
index 2163344..ad9b074 100644
--- a/libminifi/src/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.cpp
+++ b/libminifi/src/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.cpp
@@ -16,6 +16,9 @@
*/
#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+#include <cinttypes>
+
+using namespace std::literals::chrono_literals;
namespace org {
namespace apache {
@@ -72,16 +75,14 @@ void AbstractAutoPersistingKeyValueStoreService::onEnable() {
} else {
always_persist_ = utils::StringUtils::toBool(value).value_or(false);
}
- if (!getProperty(AutoPersistenceInterval.getName(), value)) {
+ core::TimePeriodValue auto_persistence_interval;
+ if (!getProperty(AutoPersistenceInterval.getName(), auto_persistence_interval)) {
logger_->log_error("Auto Persistence Interval attribute is missing or invalid");
} else {
- core::TimeUnit unit;
- if (!core::Property::StringToTime(value, auto_persistence_interval_, unit) || !core::Property::ConvertTimeUnitToMS(auto_persistence_interval_, unit, auto_persistence_interval_)) {
- logger_->log_error("Auto Persistence Interval attribute is invalid");
- }
+ auto_persistence_interval_ = auto_persistence_interval.getMilliseconds();
}
- if (!always_persist_ && auto_persistence_interval_ != 0U) {
+ if (!always_persist_ && auto_persistence_interval_ != 0s) {
if (!persisting_thread_.joinable()) {
logger_->log_trace("Starting auto persistence thread");
running_ = true;
@@ -100,8 +101,8 @@ void AbstractAutoPersistingKeyValueStoreService::persistingThreadFunc() {
std::unique_lock<std::mutex> lock(persisting_mutex_);
while (true) {
- logger_->log_trace("Persisting thread is going to sleep for %d ms", auto_persistence_interval_);
- persisting_cv_.wait_for(lock, std::chrono::milliseconds(auto_persistence_interval_), [this] {
+ logger_->log_trace("Persisting thread is going to sleep for %" PRId64 " ms", int64_t{auto_persistence_interval_.count()});
+ persisting_cv_.wait_for(lock, auto_persistence_interval_, [this] {
return !running_;
});
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index 05b589c..22a75f2 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -40,9 +40,6 @@ FlowFile::FlowFile()
: CoreComponent("FlowFile"),
stored(false),
marked_delete_(false),
- entry_date_(0),
- event_time_(0),
- lineage_start_date_(0),
last_queue_date_(0),
size_(0),
id_(0),
@@ -50,7 +47,7 @@ FlowFile::FlowFile()
to_be_processed_after_(std::chrono::steady_clock::now()),
claim_(nullptr) {
id_ = numeric_id_generator_->generateId();
- entry_date_ = utils::timeutils::getTimeMillis();
+ entry_date_ = std::chrono::system_clock::now();
event_time_ = entry_date_;
lineage_start_date_ = entry_date_;
}
@@ -130,14 +127,14 @@ bool FlowFile::hasStashClaim(const std::string& key) {
}
// ! Get Entry Date
+// Returns the entry_date_ member as a system_clock time point.
+// The FlowFile constructor sets entry_date_ from std::chrono::system_clock::now().
-uint64_t FlowFile::getEntryDate() const {
+std::chrono::system_clock::time_point FlowFile::getEntryDate() const {
return entry_date_;
}
+// Returns the event_time_ member as a system_clock time point.
+// The FlowFile constructor initializes event_time_ to the same value as entry_date_.
-uint64_t FlowFile::getEventTime() const {
+std::chrono::system_clock::time_point FlowFile::getEventTime() const {
return event_time_;
}
// ! Get Lineage Start Date
+// Returns the lineage_start_date_ member as a system_clock time point.
+// The FlowFile constructor initializes it to entry_date_; setLineageStartDate() overwrites it.
-uint64_t FlowFile::getlineageStartDate() const {
+std::chrono::system_clock::time_point FlowFile::getlineageStartDate() const {
return lineage_start_date_;
}
@@ -202,7 +199,7 @@ bool FlowFile::addAttribute(const std::string& key, const std::string& value) {
}
}
+// Overwrites lineage_start_date_ with the given system_clock time point.
-void FlowFile::setLineageStartDate(const uint64_t date) {
+void FlowFile::setLineageStartDate(const std::chrono::system_clock::time_point date) {
lineage_start_date_ = date;
}
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index aa5ff43..bfe4d4d 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -30,6 +30,8 @@
#include "core/Processor.h"
#include "core/logging/LoggerConfiguration.h"
+using namespace std::literals::chrono_literals;
+
namespace org {
namespace apache {
namespace nifi {
@@ -52,7 +54,7 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, const std::string& name, const
type_(type),
parent_process_group_(parent),
logger_(logging::LoggerFactory<ProcessGroup>::getLogger()) {
- yield_period_msec_ = 0;
+ yield_period_msec_ = 0ms;
if (parent_process_group_ != 0) {
onschedule_retry_msec_ = parent_process_group_->getOnScheduleRetryPeriod();
@@ -71,7 +73,7 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, const std::string& name)
type_(type),
parent_process_group_(0),
logger_(logging::LoggerFactory<ProcessGroup>::getLogger()) {
- yield_period_msec_ = 0;
+ yield_period_msec_ = 0ms;
onschedule_retry_msec_ = ONSCHEDULE_RETRY_INTERVAL;
transmitting_ = false;
transport_protocol_ = "RAW";
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index f4afffd..411a71a 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -241,7 +241,7 @@ void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, OutputSt
std::shared_ptr<ResourceClaim> claim = content_session_->create();
try {
- uint64_t startTime = utils::timeutils::getTimeMillis();
+ auto start_time = std::chrono::steady_clock::now();
std::shared_ptr<io::BaseStream> stream = content_session_->write(claim);
// Call the callback to write the content
if (nullptr == stream) {
@@ -257,8 +257,8 @@ void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, OutputSt
stream->close();
std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flow->getUUIDStr();
- uint64_t endTime = utils::timeutils::getTimeMillis();
- provenance_report_->modifyContent(flow, details, endTime - startTime);
+ auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
+ provenance_report_->modifyContent(flow, details, duration);
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
throw;
@@ -293,7 +293,7 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputS
}
try {
- uint64_t startTime = utils::timeutils::getTimeMillis();
+ auto start_time = std::chrono::steady_clock::now();
std::shared_ptr<io::BaseStream> stream = content_session_->write(claim, ContentSession::WriteMode::APPEND);
if (nullptr == stream) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for append");
@@ -312,8 +312,8 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputS
std::stringstream details;
details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
- uint64_t endTime = utils::timeutils::getTimeMillis();
- provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
+ auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
+ provenance_report_->modifyContent(flow, details.str(), duration);
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
throw;
@@ -445,7 +445,7 @@ void ProcessSession::importFrom(io::InputStream &stream, const std::shared_ptr<c
std::vector<uint8_t> charBuffer(max_read);
try {
- auto startTime = utils::timeutils::getTimeMillis();
+ auto start_time = std::chrono::steady_clock::now();
std::shared_ptr<io::BaseStream> content_stream = content_session_->write(claim);
if (nullptr == content_stream) {
@@ -472,8 +472,8 @@ void ProcessSession::importFrom(io::InputStream &stream, const std::shared_ptr<c
content_stream->close();
std::stringstream details;
details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
- auto endTime = utils::timeutils::getTimeMillis();
- provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
+ auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
+ provenance_report_->modifyContent(flow, details.str(), duration);
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
throw;
@@ -489,7 +489,7 @@ void ProcessSession::import(std::string source, const std::shared_ptr<FlowFile>
std::vector<uint8_t> charBuffer(size);
try {
- auto startTime = utils::timeutils::getTimeMillis();
+ auto start_time = std::chrono::steady_clock::now();
std::ifstream input;
input.open(source.c_str(), std::fstream::in | std::fstream::binary);
std::shared_ptr<io::BaseStream> stream = content_session_->write(claim);
@@ -536,8 +536,8 @@ void ProcessSession::import(std::string source, const std::shared_ptr<FlowFile>
std::remove(source.c_str());
std::stringstream details;
details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
- auto endTime = utils::timeutils::getTimeMillis();
- provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
+ auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
+ provenance_report_->modifyContent(flow, details.str(), duration);
} else {
stream->close();
input.close();
@@ -574,7 +574,6 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
}
}
- uint64_t startTime = 0U;
while (input.good()) {
input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
std::streamsize read = input.gcount();
@@ -590,7 +589,7 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
uint8_t* begin = buffer.data();
uint8_t* end = begin + read;
while (true) {
- startTime = utils::timeutils::getTimeMillis();
+ auto start_time = std::chrono::steady_clock::now();
uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
const auto len = gsl::narrow<size_t>(delimiterPos - begin);
@@ -606,7 +605,7 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
/* Create claim and stream if needed and append data */
if (claim == nullptr) {
- startTime = utils::timeutils::getTimeMillis();
+ start_time = std::chrono::steady_clock::now();
claim = content_session_->create();
}
if (stream == nullptr) {
@@ -634,8 +633,8 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p
<< ", FlowFile UUID " << flowFile->getUUIDStr();
stream->close();
std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
- uint64_t endTime = utils::timeutils::getTimeMillis();
- provenance_report_->modifyContent(flowFile, details, endTime - startTime);
+ auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
+ provenance_report_->modifyContent(flowFile, details, duration);
flows.push_back(flowFile);
/* Reset these to start processing the next FlowFile with a clean slate */
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 744e1a3..f31f7c0 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -35,6 +35,8 @@
#include "io/StreamFactory.h"
#include "utils/gsl.h"
+using namespace std::literals::chrono_literals;
+
namespace org {
namespace apache {
namespace nifi {
@@ -53,11 +55,10 @@ Processor::Processor(const std::string& name)
_triggerWhenEmpty = false;
scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS;
run_duration_nano_ = DEFAULT_RUN_DURATION;
- yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
+ yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS;
penalization_period_ = DEFAULT_PENALIZATION_PERIOD;
max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS;
active_tasks_ = 0;
- yield_expiration_ = 0;
incoming_connections_Iter = this->_incomingConnections.begin();
logger_->log_debug("Processor %s created UUID %s", name_, getUUIDStr());
}
@@ -74,11 +75,10 @@ Processor::Processor(const std::string& name, const utils::Identifier& uuid)
_triggerWhenEmpty = false;
scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS;
run_duration_nano_ = DEFAULT_RUN_DURATION;
- yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
+ yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS;
penalization_period_ = DEFAULT_PENALIZATION_PERIOD;
max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS;
active_tasks_ = 0;
- yield_expiration_ = 0;
incoming_connections_Iter = this->_incomingConnections.begin();
logger_->log_debug("Processor %s created with uuid %s", name_, getUUIDStr());
}
@@ -421,6 +421,31 @@ void Processor::setMaxConcurrentTasks(const uint8_t tasks) {
max_concurrent_tasks_ = tasks;
}
+// Yields the processor for its configured yield period: the expiration is set to
+// the current system-clock time plus yield_period_msec_.
+void Processor::yield() {
+  const auto configured_period = yield_period_msec_.load();
+  yield_expiration_ = std::chrono::system_clock::now() + configured_period;
+}
+
+// Yields the processor for a caller-supplied duration: the expiration is set to
+// the current system-clock time plus delta_time.
+void Processor::yield(std::chrono::milliseconds delta_time) {
+  const auto new_expiration = std::chrono::system_clock::now() + delta_time;
+  yield_expiration_ = new_expiration;
+}
+
+// Reports whether the processor is still yielded, i.e. the stored expiration is
+// at or after the current system-clock time.
+bool Processor::isYield() {
+  const auto expiration = yield_expiration_.load();
+  return !(expiration < std::chrono::system_clock::now());
+}
+
+// Cancels any pending yield by resetting the expiration to a default-constructed
+// system_clock time point.
+void Processor::clearYield() {
+  yield_expiration_ = std::chrono::system_clock::time_point{};
+}
+
+/**
+ * Returns how long the processor still has to stay yielded.
+ *
+ * @return 0ms when the stored expiration is not in the future; otherwise the
+ *         remaining time rounded UP to whole milliseconds.
+ */
+std::chrono::milliseconds Processor::getYieldTime() const {
+  const auto yield_expiration = yield_expiration_.load();
+  const auto current_time = std::chrono::system_clock::now();
+  if (yield_expiration <= current_time)
+    return 0ms;
+  // Round up instead of truncating: a sub-millisecond remainder would otherwise
+  // become 0ms while isYield() is still true, and a caller that retries after the
+  // returned delay would be rescheduled immediately until the expiration passes.
+  return std::chrono::ceil<std::chrono::milliseconds>(yield_expiration - current_time);
+}
+
} // namespace core
} // namespace minifi
} // namespace nifi
diff --git a/libminifi/src/core/Repository.cpp b/libminifi/src/core/Repository.cpp
index bb0eee0..a670309 100644
--- a/libminifi/src/core/Repository.cpp
+++ b/libminifi/src/core/Repository.cpp
@@ -17,14 +17,13 @@
*/
#include "core/Repository.h"
#include <cstdint>
-#include <vector>
#include "io/BufferStream.h"
-#include "core/Relationship.h"
#include "core/logging/Logger.h"
-#include "FlowController.h"
#include "provenance/Provenance.h"
+using namespace std::literals::chrono_literals;
+
namespace org {
namespace apache {
namespace nifi {
@@ -32,7 +31,7 @@ namespace minifi {
namespace core {
void Repository::start() {
- if (this->purge_period_ <= 0)
+ if (this->purge_period_ <= 0ms)
return;
if (running_)
return;
diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp
index 2f82b8e..1ce4c66 100644
--- a/libminifi/src/core/RepositoryFactory.cpp
+++ b/libminifi/src/core/RepositoryFactory.cpp
@@ -27,6 +27,8 @@
#include "core/repository/VolatileFlowFileRepository.h"
#include "core/repository/VolatileProvenanceRepository.h"
+using namespace std::literals::chrono_literals;
+
namespace org {
namespace apache {
namespace nifi {
@@ -34,7 +36,6 @@ namespace minifi {
namespace core {
std::shared_ptr<core::Repository> createRepository(const std::string configuration_class_name, bool fail_safe, const std::string repo_name) {
- std::shared_ptr<core::Repository> return_obj = nullptr;
std::string class_name_lc = configuration_class_name;
std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower);
try {
@@ -61,13 +62,13 @@ std::shared_ptr<core::Repository> createRepository(const std::string configurati
return return_obj;
}
if (fail_safe) {
- return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, 1);
+ return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1ms, 1, 1ms);
} else {
throw std::runtime_error("Support for the provided configuration class could not be found");
}
} catch (const std::runtime_error &) {
if (fail_safe) {
- return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, 1);
+ return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1ms, 1, 1ms);
}
}
@@ -75,7 +76,6 @@ std::shared_ptr<core::Repository> createRepository(const std::string configurati
}
std::shared_ptr<core::ContentRepository> createContentRepository(const std::string configuration_class_name, bool fail_safe, const std::string repo_name) {
- std::shared_ptr<core::ContentRepository> return_obj = nullptr;
std::string class_name_lc = configuration_class_name;
std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower);
try {
diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
index 2e50036..fea406a 100644
--- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
+++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
@@ -113,9 +113,9 @@ void SiteToSiteProvenanceReportingTask::getJsonReport(const std::shared_ptr<core
rapidjson::Value parentUuidJson(rapidjson::kArrayType);
rapidjson::Value childUuidJson(rapidjson::kArrayType);
- recordJson.AddMember("timestampMillis", record->getEventTime(), alloc);
- recordJson.AddMember("durationMillis", record->getEventDuration(), alloc);
- recordJson.AddMember("lineageStart", record->getlineageStartDate(), alloc);
+ recordJson.AddMember("timestampMillis", std::chrono::duration_cast<std::chrono::milliseconds>(record->getEventTime().time_since_epoch()).count(), alloc);
+ recordJson.AddMember("durationMillis", record->getEventDuration().count(), alloc);
+ recordJson.AddMember("lineageStart", std::chrono::duration_cast<std::chrono::milliseconds>(record->getlineageStartDate().time_since_epoch()).count(), alloc);
recordJson.AddMember("entitySize", record->getFileSize(), alloc);
recordJson.AddMember("entityOffset", record->getFileOffset(), alloc);
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
index 71b4a02..e44dfed 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -27,6 +27,8 @@
#include "io/FileStream.h"
#include "utils/StringUtils.h"
+using namespace std::literals::chrono_literals;
+
namespace org {
namespace apache {
namespace nifi {
@@ -66,7 +68,7 @@ void VolatileContentRepository::run() {
}
void VolatileContentRepository::start() {
- if (this->purge_period_ <= 0)
+ if (this->purge_period_ <= 0ms)
return;
if (running_)
return;
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index a251b0f..79f6ab2 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -26,6 +26,7 @@
#include "core/yaml/YamlConnectionParser.h"
#include "core/state/Value.h"
#include "Defaults.h"
+#include "utils/TimeUtil.h"
#ifdef YAML_CONFIGURATION_USE_REGEX
#include <regex>
@@ -78,17 +79,13 @@ std::unique_ptr<core::ProcessGroup> YamlConfiguration::createProcessGroup(const
}
if (yamlNode["onschedule retry interval"]) {
- int64_t onScheduleRetryPeriodValue = -1;
std::string onScheduleRetryPeriod = yamlNode["onschedule retry interval"].as<std::string>();
logger_->log_debug("parseRootProcessGroup: onschedule retry period => [%s]", onScheduleRetryPeriod);
- core::TimeUnit unit;
-
- if (core::Property::StringToTime(onScheduleRetryPeriod, onScheduleRetryPeriodValue, unit)
- && core::Property::ConvertTimeUnitToMS(onScheduleRetryPeriodValue, unit, onScheduleRetryPeriodValue)
- && group) {
- logger_->log_debug("parseRootProcessGroup: onschedule retry => [%" PRId64 "] ms", onScheduleRetryPeriodValue);
- group->setOnScheduleRetryPeriod(onScheduleRetryPeriodValue);
+ auto on_schedule_retry_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(onScheduleRetryPeriod);
+ if (on_schedule_retry_period_value.has_value() && group) {
+ logger_->log_debug("parseRootProcessGroup: onschedule retry => [%" PRId64 "] ms", on_schedule_retry_period_value->count());
+ group->setOnScheduleRetryPeriod(on_schedule_retry_period_value->count());
}
}
@@ -145,9 +142,6 @@ std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(const YAML::N
}
void YamlConfiguration::parseProcessorNodeYaml(const YAML::Node& processorsNode, core::ProcessGroup* parentGroup) {
- int64_t schedulingPeriod = -1;
- int64_t penalizationPeriod = -1;
- int64_t yieldPeriod = -1;
int64_t runDurationNanos = -1;
utils::Identifier uuid;
std::shared_ptr<core::Processor> processor = nullptr;
@@ -251,25 +245,23 @@ void YamlConfiguration::parseProcessorNodeYaml(const YAML::Node& processorsNode,
// Take care of scheduling
- core::TimeUnit unit;
-
if (procCfg.schedulingStrategy == "TIMER_DRIVEN" || procCfg.schedulingStrategy == "EVENT_DRIVEN") {
- if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
- logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%" PRId64 "] ns", schedulingPeriod);
- processor->setSchedulingPeriodNano(schedulingPeriod);
+ if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(procCfg.schedulingPeriod)) {
+ logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%" PRId64 "] ns", scheduling_period->count());
+ processor->setSchedulingPeriodNano(*scheduling_period);
}
} else {
processor->setCronPeriod(procCfg.schedulingPeriod);
}
- if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
- logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%" PRId64 "] ms", penalizationPeriod);
- processor->setPenalizationPeriod(std::chrono::milliseconds{penalizationPeriod});
+ if (auto penalization_period = utils::timeutils::StringToDuration<std::chrono::milliseconds>(procCfg.penalizationPeriod)) {
+ logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%" PRId64 "] ms", penalization_period->count());
+ processor->setPenalizationPeriod(penalization_period.value());
}
- if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
- logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%" PRId64 "] ms", yieldPeriod);
- processor->setYieldPeriodMsec(yieldPeriod);
+ if (auto yield_period = utils::timeutils::StringToDuration<std::chrono::milliseconds>(procCfg.yieldPeriod)) {
+ logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%" PRId64 "] ms", yield_period->count());
+ processor->setYieldPeriodMsec(yield_period.value());
}
// Default to running
@@ -294,7 +286,7 @@ void YamlConfiguration::parseProcessorNodeYaml(const YAML::Node& processorsNode,
if (core::Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) {
logger_->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos);
- processor->setRunDurationNano((uint64_t) runDurationNanos);
+ processor->setRunDurationNano(std::chrono::nanoseconds(runDurationNanos));
}
std::set<core::Relationship> autoTerminatedRelationships;
@@ -337,9 +329,6 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(const YAML::Node& rpgNode, c
std::string url = urlNode.as<std::string>();
logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url);
- core::TimeUnit unit;
- int64_t timeoutValue = -1;
- int64_t yieldPeriodValue = -1;
uuid = id;
auto group = this->createRemoteProcessGroup(name, uuid);
group->setParent(parentGroup);
@@ -348,9 +337,10 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(const YAML::Node& rpgNode, c
std::string yieldPeriod = currRpgNode["yield period"].as<std::string>();
logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod);
- if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) {
- logger_->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%" PRId64 "] ms", yieldPeriodValue);
- group->setYieldPeriodMsec(yieldPeriodValue);
+ auto yield_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(yieldPeriod);
+ if (yield_period_value.has_value() && group) {
+ logger_->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%" PRId64 "] ms", yield_period_value->count());
+ group->setYieldPeriodMsec(*yield_period_value);
}
}
@@ -358,9 +348,10 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(const YAML::Node& rpgNode, c
std::string timeout = currRpgNode["timeout"].as<std::string>();
logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout);
- if (core::Property::StringToTime(timeout, timeoutValue, unit) && core::Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) {
- logger_->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%" PRId64 "] ms", timeoutValue);
- group->setTimeOut(timeoutValue);
+ auto timeout_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(timeout);
+ if (timeout_value.has_value() && group) {
+ logger_->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%" PRId64 "] ms", timeout_value->count());
+ group->setTimeout(timeout_value->count());
}
}
@@ -435,7 +426,6 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(const YAML::Node& rpgNode, c
void YamlConfiguration::parseProvenanceReportingYaml(const YAML::Node& reportNode, core::ProcessGroup* parentGroup) {
utils::Identifier port_uuid;
- int64_t schedulingPeriod = -1;
if (!parentGroup) {
logger_->log_error("parseProvenanceReportingYaml: no parent group exists");
@@ -458,10 +448,9 @@ void YamlConfiguration::parseProvenanceReportingYaml(const YAML::Node& reportNod
yaml::checkRequiredField(node, "scheduling period", CONFIG_YAML_PROVENANCE_REPORT_KEY);
auto schedulingPeriodStr = node["scheduling period"].as<std::string>();
- core::TimeUnit unit;
- if (core::Property::StringToTime(schedulingPeriodStr, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
- logger_->log_debug("ProvenanceReportingTask schedulingPeriod %" PRId64 " ns", schedulingPeriod);
- processor->setSchedulingPeriodNano(schedulingPeriod);
+ if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(schedulingPeriodStr)) {
+ logger_->log_debug("ProvenanceReportingTask schedulingPeriod %" PRId64 " ns", scheduling_period->count());
+ processor->setSchedulingPeriodNano(*scheduling_period);
}
if (schedulingStrategyStr == "TIMER_DRIVEN") {
@@ -626,7 +615,7 @@ void YamlConfiguration::parsePortYaml(const YAML::Node& portNode, core::ProcessG
processor = std::static_pointer_cast<core::Processor>(port);
port->setDirection(direction);
- port->setTimeOut(parent->getTimeOut());
+ port->setTimeout(parent->getTimeout());
port->setTransmitting(true);
processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
processor->initialize();
diff --git a/libminifi/src/core/yaml/YamlConnectionParser.cpp b/libminifi/src/core/yaml/YamlConnectionParser.cpp
index b434d9d..bfb1041 100644
--- a/libminifi/src/core/yaml/YamlConnectionParser.cpp
+++ b/libminifi/src/core/yaml/YamlConnectionParser.cpp
@@ -167,16 +167,15 @@ utils::Identifier YamlConnectionParser::getDestinationUUIDFromYaml() const {
throw std::invalid_argument(error_msg);
}
+// Reads the optional "flowfile expiration" field of the connection node.
+// Returns 0ms when the field is absent or cannot be parsed as a duration,
+// otherwise the parsed duration in milliseconds.
-uint64_t YamlConnectionParser::getFlowFileExpirationFromYaml() const {
+std::chrono::milliseconds YamlConnectionParser::getFlowFileExpirationFromYaml() const {
+ using namespace std::literals::chrono_literals;
const YAML::Node expiration_node = connectionNode_["flowfile expiration"];
if (!expiration_node) {
logger_->log_debug("parseConnection: flowfile expiration is not set, assuming 0 (never expire)");
- return 0;
+ return 0ms;
}
- uint64_t expirationDuration = 0;
- TimeUnit unit;
- const std::string flowfile_expiration_str = expiration_node.as<std::string>();
- if (!core::Property::StringToTime(flowfile_expiration_str, expirationDuration, unit) || !core::Property::ConvertTimeUnitToMS(expirationDuration, unit, expirationDuration)) {
+ auto expiration_duration = utils::timeutils::StringToDuration<std::chrono::milliseconds>(expiration_node.as<std::string>());
+ if (!expiration_duration.has_value()) {
// We should throw here, but we do not.
// The reason is that our parser only accepts time formats that consists of a number and
// a unit, but users might use this field populated with a "0" (and no units).
@@ -184,9 +183,10 @@ uint64_t YamlConnectionParser::getFlowFileExpirationFromYaml() const {
// all already-supported configuration files.
// This has the side-effect of allowing values like "20 minuites" and silently defaulting to 0.
logger_->log_debug("Parsing failure for flowfile expiration duration");
+ expiration_duration = 0ms;
}
- logger_->log_debug("parseConnection: flowfile expiration => [%d]", expirationDuration);
- return expirationDuration;
+ // count() is a 64-bit value; cast to long long so it matches the %lld conversion.
+ logger_->log_debug("parseConnection: flowfile expiration => [%lld]", static_cast<long long>(expiration_duration->count()));
+ return *expiration_duration;
}
bool YamlConnectionParser::getDropEmptyFromYaml() const {
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index f0b40c4..e982c1c 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -44,14 +44,11 @@ const char *ProvenanceEventRecord::ProvenanceEventTypeStr[REPLAY + 1] = { "CREAT
"ATTRIBUTES_MODIFIED", "ROUTE", "ADDINFO", "REPLAY" };
+// Builds a provenance record for the given event type, component id and component type,
+// and stamps _eventTime with the current system-clock time.
+// NOTE(review): _entryDate, _lineageStartDate and _eventDuration are no longer set in the
+// initializer list; presumably default member initializers in the header cover them — confirm.
ProvenanceEventRecord::ProvenanceEventRecord(ProvenanceEventRecord::ProvenanceEventType event, std::string componentId, std::string componentType)
- : core::SerializableComponent(core::getClassName<ProvenanceEventRecord>()),
- _entryDate(0),
- _lineageStartDate(0),
- _eventDuration(0) {
+ : core::SerializableComponent(core::getClassName<ProvenanceEventRecord>()) {
_eventType = event;
_componentId = componentId;
_componentType = componentType;
- _eventTime = utils::timeutils::getTimeMillis();
+ _eventTime = std::chrono::system_clock::now();
}
// DeSerialize
@@ -102,25 +99,29 @@ bool ProvenanceEventRecord::Serialize(org::apache::nifi::minifi::io::BufferStrea
}
}
{
- const auto ret = outStream.write(this->_eventTime);
+ uint64_t event_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(_eventTime.time_since_epoch()).count();
+ const auto ret = outStream.write(event_time_ms);
if (ret != 8) {
return false;
}
}
{
- const auto ret = outStream.write(this->_entryDate);
+ uint64_t entry_date_ms = std::chrono::duration_cast<std::chrono::milliseconds>(_entryDate.time_since_epoch()).count();
+ const auto ret = outStream.write(entry_date_ms);
if (ret != 8) {
return false;
}
}
{
- const auto ret = outStream.write(this->_eventDuration);
+ uint64_t event_duration_ms = this->_eventDuration.count();
+ const auto ret = outStream.write(event_duration_ms);
if (ret != 8) {
return false;
}
}
{
- const auto ret = outStream.write(this->_lineageStartDate);
+ uint64_t lineage_start_date_ms = std::chrono::duration_cast<std::chrono::milliseconds>(_lineageStartDate.time_since_epoch()).count();
+ const auto ret = outStream.write(lineage_start_date_ms);
if (ret != 8) {
return false;
}
@@ -278,31 +279,39 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType;
{
- const auto ret = outStream.read(this->_eventTime);
+ uint64_t event_time_in_ms;
+ const auto ret = outStream.read(event_time_in_ms);
if (ret != 8) {
return false;
}
+ _eventTime = std::chrono::system_clock::time_point() + std::chrono::milliseconds(event_time_in_ms);
}
{
- const auto ret = outStream.read(this->_entryDate);
+ uint64_t entry_date_in_ms;
+ const auto ret = outStream.read(entry_date_in_ms);
if (ret != 8) {
return false;
}
+ _entryDate = std::chrono::system_clock::time_point() + std::chrono::milliseconds(entry_date_in_ms);
}
{
- const auto ret = outStream.read(this->_eventDuration);
+ uint64_t event_duration_ms;
+ const auto ret = outStream.read(event_duration_ms);
if (ret != 8) {
return false;
}
+ _eventDuration = std::chrono::milliseconds(event_duration_ms);
}
{
- const auto ret = outStream.read(this->_lineageStartDate);
+ uint64_t lineage_start_date_in_ms;
+ const auto ret = outStream.read(lineage_start_date_in_ms);
if (ret != 8) {
return false;
}
+ _lineageStartDate = std::chrono::system_clock::time_point() + std::chrono::milliseconds(lineage_start_date_in_ms);
}
{
@@ -480,7 +489,7 @@ void ProvenanceReporter::create(std::shared_ptr<core::FlowFile> flow, std::strin
}
}
-void ProvenanceReporter::route(std::shared_ptr<core::FlowFile> flow, core::Relationship relation, std::string detail, uint64_t processingDuration) {
+void ProvenanceReporter::route(std::shared_ptr<core::FlowFile> flow, core::Relationship relation, std::string detail, std::chrono::milliseconds processingDuration) {
auto event = allocate(ProvenanceEventRecord::ROUTE, flow);
if (event) {
@@ -500,7 +509,7 @@ void ProvenanceReporter::modifyAttributes(std::shared_ptr<core::FlowFile> flow,
}
}
-void ProvenanceReporter::modifyContent(std::shared_ptr<core::FlowFile> flow, std::string detail, uint64_t processingDuration) {
+void ProvenanceReporter::modifyContent(std::shared_ptr<core::FlowFile> flow, std::string detail, std::chrono::milliseconds processingDuration) {
auto event = allocate(ProvenanceEventRecord::CONTENT_MODIFIED, flow);
if (event) {
@@ -520,7 +529,7 @@ void ProvenanceReporter::clone(std::shared_ptr<core::FlowFile> parent, std::shar
}
}
-void ProvenanceReporter::join(std::vector<std::shared_ptr<core::FlowFile> > parents, std::shared_ptr<core::FlowFile> child, std::string detail, uint64_t processingDuration) {
+void ProvenanceReporter::join(std::vector<std::shared_ptr<core::FlowFile> > parents, std::shared_ptr<core::FlowFile> child, std::string detail, std::chrono::milliseconds processingDuration) {
auto event = allocate(ProvenanceEventRecord::JOIN, child);
if (event) {
@@ -536,7 +545,7 @@ void ProvenanceReporter::join(std::vector<std::shared_ptr<core::FlowFile> > pare
}
}
-void ProvenanceReporter::fork(std::vector<std::shared_ptr<core::FlowFile> > child, std::shared_ptr<core::FlowFile> parent, std::string detail, uint64_t processingDuration) {
+void ProvenanceReporter::fork(std::vector<std::shared_ptr<core::FlowFile> > child, std::shared_ptr<core::FlowFile> parent, std::string detail, std::chrono::milliseconds processingDuration) {
auto event = allocate(ProvenanceEventRecord::FORK, parent);
if (event) {
@@ -571,7 +580,7 @@ void ProvenanceReporter::drop(std::shared_ptr<core::FlowFile> flow, std::string
}
}
-void ProvenanceReporter::send(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, uint64_t processingDuration, bool force) {
+void ProvenanceReporter::send(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, std::chrono::milliseconds processingDuration, bool force) {
auto event = allocate(ProvenanceEventRecord::SEND, flow);
if (event) {
@@ -587,7 +596,11 @@ void ProvenanceReporter::send(std::shared_ptr<core::FlowFile> flow, std::string
}
}
-void ProvenanceReporter::receive(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string sourceSystemFlowFileIdentifier, std::string detail, uint64_t processingDuration) {
+void ProvenanceReporter::receive(std::shared_ptr<core::FlowFile> flow,
+ std::string transitUri,
+ std::string sourceSystemFlowFileIdentifier,
+ std::string detail,
+ std::chrono::milliseconds processingDuration) {
auto event = allocate(ProvenanceEventRecord::RECEIVE, flow);
if (event) {
@@ -599,7 +612,7 @@ void ProvenanceReporter::receive(std::shared_ptr<core::FlowFile> flow, std::stri
}
}
-void ProvenanceReporter::fetch(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, uint64_t processingDuration) {
+void ProvenanceReporter::fetch(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, std::chrono::milliseconds processingDuration) {
auto event = allocate(ProvenanceEventRecord::FETCH, flow);
if (event) {
diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp
index 7f854d3..28f037b 100644
--- a/libminifi/src/sitetosite/RawSocketProtocol.cpp
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -33,6 +33,8 @@
#include "sitetosite/Peer.h"
#include "utils/gsl.h"
+using namespace std::literals::chrono_literals;
+
namespace org {
namespace apache {
namespace nifi {
@@ -256,14 +258,14 @@ bool RawSiteToSiteClient::handShake() {
std::map<std::string, std::string> properties;
properties[HandShakePropertyStr[GZIP]] = "false";
properties[HandShakePropertyStr[PORT_IDENTIFIER]] = port_id_.to_string();
- properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(_timeOut);
+ properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(_timeout.load().count());
if (_currentVersion >= 5) {
if (_batchCount > 0)
properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string(_batchCount);
if (_batchSize > 0)
properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string(_batchSize);
- if (_batchDuration > 0)
- properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string(_batchDuration);
+ if (_batchDuration.load() > 0ms)
+ properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string(_batchDuration.load().count());
}
if (_currentVersion >= 3) {
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index b0418fd..968ea39 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -131,11 +131,11 @@ bool SiteToSiteClient::transferFlowFiles(const std::shared_ptr<core::ProcessCont
utils::Identifier transactionID = transaction->getUUID();
bool continueTransaction = true;
- uint64_t startSendingNanos = utils::timeutils::getTimeNano();
+ std::chrono::high_resolution_clock::time_point transaction_started_at = std::chrono::high_resolution_clock::now();
try {
while (continueTransaction) {
- uint64_t startTime = utils::timeutils::getTimeMillis();
+ auto start_time = std::chrono::steady_clock::now();
std::string payload;
DataPacket packet(getLogger(), transaction, flow->getAttributes(), payload);
@@ -146,15 +146,15 @@ bool SiteToSiteClient::transferFlowFiles(const std::shared_ptr<core::ProcessCont
logger_->log_debug("Site2Site transaction %s send flow record %s", transactionID.to_string(), flow->getUUIDStr());
if (resp == 0) {
- uint64_t endTime = utils::timeutils::getTimeMillis();
+ auto end_time = std::chrono::steady_clock::now();
std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr();
std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" + peer_->getHostName();
- session->getProvenanceReporter()->send(flow, transitUri, details, endTime - startTime, false);
+ session->getProvenanceReporter()->send(flow, transitUri, details, std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time), false);
}
session->remove(flow);
- uint64_t transferNanos = utils::timeutils::getTimeNano() - startSendingNanos;
- if (transferNanos > _batchSendNanos)
+ std::chrono::nanoseconds transfer_duration = std::chrono::high_resolution_clock::now() - transaction_started_at;
+ if (transfer_duration > _batchSendNanos)
break;
flow = session->get();
@@ -381,7 +381,6 @@ bool SiteToSiteClient::complete(const utils::Identifier& transactionID) {
} else {
RespondCode code;
std::string message;
- int ret;
ret = readResponse(transaction, code, message);
@@ -669,7 +668,7 @@ bool SiteToSiteClient::receiveFlowFiles(const std::shared_ptr<core::ProcessConte
try {
while (true) {
std::map<std::string, std::string> empty;
- uint64_t startTime = utils::timeutils::getTimeMillis();
+ auto start_time = std::chrono::steady_clock::now();
std::string payload;
DataPacket packet(getLogger(), transaction, empty, payload);
bool eof = false;
@@ -706,10 +705,10 @@ bool SiteToSiteClient::receiveFlowFiles(const std::shared_ptr<core::ProcessConte
}
}
core::Relationship relation; // undefined relationship
- uint64_t endTime = utils::timeutils::getTimeMillis();
+ auto end_time = std::chrono::steady_clock::now();
std::string transitUri = peer_->getURL() + "/" + sourceIdentifier;
std::string details = "urn:nifi:" + sourceIdentifier + "Remote Host=" + peer_->getHostName();
- session->getProvenanceReporter()->receive(flowFile, transitUri, sourceIdentifier, details, endTime - startTime);
+ session->getProvenanceReporter()->receive(flowFile, transitUri, sourceIdentifier, details, std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time));
session->transfer(flowFile, relation);
// receive the transfer for the flow record
bytes += packet._size;
diff --git a/libminifi/src/utils/ProcessorConfigUtils.cpp b/libminifi/src/utils/ProcessorConfigUtils.cpp
index 93b4ec8..4bd515a 100644
--- a/libminifi/src/utils/ProcessorConfigUtils.cpp
+++ b/libminifi/src/utils/ProcessorConfigUtils.cpp
@@ -48,13 +48,8 @@ bool parseBooleanPropertyOrThrow(const core::ProcessContext& context, const std:
}
std::chrono::milliseconds parseTimePropertyMSOrThrow(const core::ProcessContext& context, const std::string& property_name) {
- core::TimeUnit unit;
- uint64_t time_value_ms;
- const std::string value_str = getRequiredPropertyOrThrow(context, property_name);
- if (!core::Property::StringToTime(value_str, time_value_ms, unit) || !core::Property::ConvertTimeUnitToMS(time_value_ms, unit, time_value_ms)) {
- throw std::runtime_error(property_name + " property is invalid: value is " + value_str);
- }
- return std::chrono::milliseconds(time_value_ms);
+ const core::TimePeriodValue time_property = getRequiredPropertyOrThrow<core::TimePeriodValue>(context, property_name);
+ return time_property.getMilliseconds();
}
std::optional<uint64_t> getOptionalUintProperty(const core::ProcessContext& context, const std::string& property_name) {
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 941e508..6dcc798 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -96,13 +96,13 @@ bool LogTestController::contains(const std::ostringstream& stream, const std::st
if (ending.length() == 0) {
return false;
}
- auto start = std::chrono::system_clock::now();
+ auto start = std::chrono::steady_clock::now();
bool found = false;
bool timed_out = false;
do {
std::string str = stream.str();
found = (str.find(ending) != std::string::npos);
- auto now = std::chrono::system_clock::now();
+ auto now = std::chrono::steady_clock::now();
timed_out = (now - start > timeout);
if (!found && !timed_out) {
std::this_thread::sleep_for(sleep_interval);
@@ -117,7 +117,7 @@ std::optional<std::smatch> LogTestController::matchesRegex(const std::string& re
if (regex_str.length() == 0) {
return std::nullopt;
}
- auto start = std::chrono::system_clock::now();
+ auto start = std::chrono::steady_clock::now();
bool found = false;
bool timed_out = false;
std::regex matcher_regex(regex_str);
@@ -125,7 +125,7 @@ std::optional<std::smatch> LogTestController::matchesRegex(const std::string& re
do {
std::string str = log_output.str();
found = std::regex_search(str, match, matcher_regex);
- auto now = std::chrono::system_clock::now();
+ auto now = std::chrono::steady_clock::now();
timed_out = (now - start > timeout);
if (!found && !timed_out) {
std::this_thread::sleep_for(sleep_interval);
diff --git a/libminifi/test/azure-tests/DeleteAzureDataLakeStorageTests.cpp b/libminifi/test/azure-tests/DeleteAzureDataLakeStorageTests.cpp
index ea485f5..7946adb 100644
--- a/libminifi/test/azure-tests/DeleteAzureDataLakeStorageTests.cpp
+++ b/libminifi/test/azure-tests/DeleteAzureDataLakeStorageTests.cpp
@@ -22,7 +22,7 @@
namespace {
-using namespace std::chrono_literals;
+using namespace std::literals::chrono_literals;
using DeleteAzureDataLakeStorageTestsFixture = AzureDataLakeStorageTestsFixture<minifi::azure::processors::DeleteAzureDataLakeStorage>;
diff --git a/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp b/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp
index 39463c4..3f18290 100644
--- a/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp
+++ b/libminifi/test/azure-tests/PutAzureDataLakeStorageTests.cpp
@@ -22,7 +22,7 @@
namespace {
-using namespace std::chrono_literals;
+using namespace std::literals::chrono_literals;
using PutAzureDataLakeStorageTestsFixture = AzureDataLakeStorageTestsFixture<minifi::azure::processors::PutAzureDataLakeStorage>;
diff --git a/libminifi/test/flow-tests/FlowControllerTests.cpp b/libminifi/test/flow-tests/FlowControllerTests.cpp
index bcaef04..f222897 100644
--- a/libminifi/test/flow-tests/FlowControllerTests.cpp
+++ b/libminifi/test/flow-tests/FlowControllerTests.cpp
@@ -35,6 +35,8 @@
#include "CustomProcessors.h"
#include "TestControllerWithFlow.h"
+using namespace std::literals::chrono_literals;
+
const char* yamlConfig =
R"(
Flow Controller:
@@ -109,7 +111,7 @@ TEST_CASE("Flow shutdown drains connections", "[TestFlow1]") {
auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessorByName("TestProcessor"));
// prevent execution of the consumer processor
- sinkProc->yield(10000);
+ sinkProc->yield(10s);
std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap;
diff --git a/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp
index bd9f09f..89dbe2b 100644
--- a/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBProvenanceRepositoryTests.cpp
@@ -27,7 +27,7 @@
#define TEST_PROVENANCE_STORAGE_SIZE (1024*100) // 100 KB
#define TEST_MAX_PROVENANCE_STORAGE_SIZE (100*1024*1024) // 100 MB
-#define TEST_PROVENANCE_ENTRY_LIFE_TIME (1000) // 1 sec
+using namespace std::literals::chrono_literals;
void generateData(std::vector<char>& data) {
std::random_device rd;
@@ -50,7 +50,7 @@ void verifyMaxKeyCount(const minifi::provenance::ProvenanceRepository& repo, uin
uint64_t k = keyCount;
for (int i = 0; i < 50; ++i) {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ std::this_thread::sleep_for(100ms);
k = std::min(k, repo.getKeyCount());
if (k < keyCount) {
break;
@@ -66,8 +66,7 @@ TEST_CASE("Test size limit", "[sizeLimitTest]") {
REQUIRE(!temp_dir.empty());
// 60 sec, 100 KB - going to exceed the size limit
- minifi::provenance::ProvenanceRepository provdb("TestProvRepo", temp_dir,
- MAX_PROVENANCE_ENTRY_LIFE_TIME, TEST_PROVENANCE_STORAGE_SIZE, 1000);
+ minifi::provenance::ProvenanceRepository provdb("TestProvRepo", temp_dir, 1min, TEST_PROVENANCE_STORAGE_SIZE, 1s);
auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, temp_dir);
@@ -87,8 +86,7 @@ TEST_CASE("Test time limit", "[timeLimitTest]") {
REQUIRE(!temp_dir.empty());
// 1 sec, 100 MB - going to exceed TTL
- minifi::provenance::ProvenanceRepository provdb("TestProvRepo", temp_dir,
- TEST_PROVENANCE_ENTRY_LIFE_TIME, TEST_MAX_PROVENANCE_STORAGE_SIZE, 1000);
+ minifi::provenance::ProvenanceRepository provdb("TestProvRepo", temp_dir, 1s, TEST_MAX_PROVENANCE_STORAGE_SIZE, 1s);
auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, temp_dir);
@@ -109,7 +107,7 @@ TEST_CASE("Test time limit", "[timeLimitTest]") {
* When the 2nd 50 MB is written the records of the 1st serialization are dropped -> around 160 of them
* That's why the final check verifies keyCount to be below 400
*/
- std::this_thread::sleep_for(std::chrono::milliseconds(2000));
+ std::this_thread::sleep_for(2s);
provisionRepo(provdb, keyCount /2, 102400);
diff --git a/libminifi/test/rocksdb-tests/ProvenanceTests.cpp b/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
index 91d8df9..85a49e9 100644
--- a/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
+++ b/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
@@ -31,6 +31,7 @@
#include "../TestBase.h"
namespace provenance = minifi::provenance;
+using namespace std::literals::chrono_literals;
TEST_CASE("Test Provenance record create", "[Testprovenance::ProvenanceEventRecord]") {
provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "blah", "blahblah");
@@ -46,7 +47,7 @@ TEST_CASE("Test Provenance record serialization", "[Testprovenance::ProvenanceEv
std::string smileyface = ":)";
record1.setDetails(smileyface);
- uint64_t sample = 65555;
+ auto sample = 65555ms;
std::shared_ptr<core::Repository> testRepository = std::make_shared<TestRepository>();
record1.setEventDuration(sample);
@@ -71,7 +72,7 @@ TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") {
record1.addChildFlowFile(ffr1);
- uint64_t sample = 65555;
+ auto sample = 65555ms;
std::shared_ptr<core::Repository> testRepository = std::make_shared<TestRepository>();
record1.setEventDuration(sample);
@@ -95,7 +96,7 @@ TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::Pro
std::string smileyface = ":)";
record1.setDetails(smileyface);
- uint64_t sample = 65555;
+ auto sample = 65555ms;
std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileProvenanceRepository>();
testRepository->initialize(0);
@@ -122,7 +123,7 @@ TEST_CASE("Test Flowfile record added to provenance using Volatile Repo", "[Test
record1.addChildFlowFile(ffr1);
- uint64_t sample = 65555;
+ auto sample = 65555ms;
std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileProvenanceRepository>();
testRepository->initialize(0);
record1.setEventDuration(sample);
@@ -147,7 +148,7 @@ TEST_CASE("Test Provenance record serialization NoOp", "[Testprovenance::Provena
std::string smileyface = ":)";
record1.setDetails(smileyface);
- uint64_t sample = 65555;
+ auto sample = 65555ms;
std::shared_ptr<core::Repository> testRepository = std::make_shared<core::Repository>();
testRepository->initialize(0);
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index c6d75c4..c57b4d9 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -34,6 +34,8 @@
#include "utils/gsl.h"
#include "utils/IntegrationTestUtils.h"
+using namespace std::literals::chrono_literals;
+
namespace {
#ifdef WIN32
@@ -56,7 +58,7 @@ TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") {
LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
TestController testController;
auto dir = testController.createTempDirectory();
- std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir, 0, 0, 1);
+ std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir, 0ms, 0, 1ms);
repository->initialize(std::make_shared<minifi::Configure>());
@@ -78,7 +80,7 @@ TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") {
LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
TestController testController;
auto dir = testController.createTempDirectory();
- std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir, 0, 0, 1);
+ std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir, 0ms, 0, 1ms);
repository->initialize(std::make_shared<minifi::Configure>());
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
@@ -101,7 +103,7 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") {
LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
TestController testController;
auto dir = testController.createTempDirectory();
- std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir, 0, 0, 1);
+ std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir, 0ms, 0, 1ms);
repository->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
@@ -151,7 +153,7 @@ TEST_CASE("Test Delete Content ", "[TestFFR4]") {
auto dir = testController.createTempDirectory();
- std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir, 0, 0, 1);
+ std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir, 0ms, 0, 1ms);
std::fstream file;
std::stringstream ss;
@@ -205,7 +207,7 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
auto dir = testController.createTempDirectory();
- std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir, 0, 0, 1);
+ std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir, 0ms, 0, 1ms);
std::fstream file;
std::stringstream ss;
@@ -349,8 +351,7 @@ TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
public:
explicit TestFlowFileRepository(const std::string& name)
: core::SerializableComponent(name),
- FlowFileRepository(name, REPOTEST_FLOWFILE_CHECKPOINT_DIR, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
- MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+ FlowFileRepository(name, REPOTEST_FLOWFILE_CHECKPOINT_DIR, FLOWFILE_REPOSITORY_DIRECTORY, 10min, MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1ms) {}
void flush() override {
FlowFileRepository::flush();
if (onFlush_) {
diff --git a/libminifi/test/unit/ConnectionTests.cpp b/libminifi/test/unit/ConnectionTests.cpp
index 6921800..ffeab28 100644
--- a/libminifi/test/unit/ConnectionTests.cpp
+++ b/libminifi/test/unit/ConnectionTests.cpp
@@ -34,14 +34,14 @@ TEST_CASE("Connection::poll() works correctly", "[poll]") {
SECTION("when called on an empty Connection, poll() returns nullptr") {
SECTION("without expiration duration") {}
- SECTION("with expiration duration") { connection->setFlowExpirationDuration(1000); }
+ SECTION("with expiration duration") { connection->setFlowExpirationDuration(1s); }
REQUIRE(nullptr == connection->poll(expired_flow_files));
}
SECTION("when called on a connection with a single flow file, poll() returns the flow file") {
SECTION("without expiration duration") {}
- SECTION("with expiration duration") { connection->setFlowExpirationDuration(1000); }
+ SECTION("with expiration duration") { connection->setFlowExpirationDuration(1s); }
const auto flow_file = std::make_shared<core::FlowFile>();
connection->put(flow_file);
@@ -51,7 +51,7 @@ TEST_CASE("Connection::poll() works correctly", "[poll]") {
SECTION("when called on a connection with a single penalized flow file, poll() returns nullptr") {
SECTION("without expiration duration") {}
- SECTION("with expiration duration") { connection->setFlowExpirationDuration(1000); }
+ SECTION("with expiration duration") { connection->setFlowExpirationDuration(1s); }
const auto flow_file = std::make_shared<core::FlowFile>();
flow_file->penalize(std::chrono::seconds{10});
@@ -61,7 +61,7 @@ TEST_CASE("Connection::poll() works correctly", "[poll]") {
SECTION("when called on a connection with a single expired flow file, poll() returns nullptr and returns the expired flow file in the out parameter") {
const auto flow_file = std::make_shared<core::FlowFile>();
- connection->setFlowExpirationDuration(1); // 1 millisecond
+ connection->setFlowExpirationDuration(1ms);
connection->put(flow_file);
std::this_thread::sleep_for(std::chrono::milliseconds{2});
REQUIRE(nullptr == connection->poll(expired_flow_files));
@@ -70,7 +70,7 @@ TEST_CASE("Connection::poll() works correctly", "[poll]") {
SECTION("when there is a non-penalized flow file followed by a penalized flow file, poll() returns the non-penalized flow file") {
SECTION("without expiration duration") {}
- SECTION("with expiration duration") { connection->setFlowExpirationDuration(1000); }
+ SECTION("with expiration duration") { connection->setFlowExpirationDuration(1s); }
const auto penalized_flow_file = std::make_shared<core::FlowFile>();
penalized_flow_file->penalize(std::chrono::seconds{10});
diff --git a/libminifi/test/unit/CpuUsageTest.cpp b/libminifi/test/unit/CpuUsageTest.cpp
index c8ac357..9203416 100644
--- a/libminifi/test/unit/CpuUsageTest.cpp
+++ b/libminifi/test/unit/CpuUsageTest.cpp
@@ -23,39 +23,40 @@
#include "utils/ProcessCpuUsageTracker.h"
#include "../TestBase.h"
-void busySleep(int duration_ms, std::chrono::milliseconds& start_ms, std::chrono::milliseconds& end_ms, const std::chrono::system_clock::time_point& origin) {
- start_ms = std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now() - origin);
- end_ms = std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now() - origin);
- while (end_ms-start_ms < std::chrono::milliseconds(duration_ms)) {
- end_ms = std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now() - origin);
+using namespace std::literals::chrono_literals;
+using steady_clock = std::chrono::steady_clock;
+using milliseconds = std::chrono::milliseconds;
+
+steady_clock::duration busySleep(const milliseconds duration) {
+ auto start_time = steady_clock::now();
+ while (steady_clock::now()-start_time < duration) {
+ // noop
}
+ return steady_clock::now() - start_time;
}
-void idleSleep(int duration_ms, std::chrono::milliseconds& start_ms, std::chrono::milliseconds& end_ms, const std::chrono::system_clock::time_point& origin) {
- start_ms = std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now() - origin);
- std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
- end_ms = std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now() - origin);
+steady_clock::duration idleSleep(const milliseconds duration) {
+ auto start_time = steady_clock::now();
+ std::this_thread::sleep_for(duration);
+ return steady_clock::now() - start_time;
}
-void printCpuUtilization(const std::string& target, const std::string& sleep_type, uint64_t start, uint64_t end, double utilizationPercent) {
- std::cout << target << " CPU Utilization during "<< sleep_type << " between " << start << "ms and " << end << "ms : " << utilizationPercent << std::endl;
+void printCpuUtilization(const std::string& target, const std::string& sleep_type, const steady_clock::duration& sleep_duration, double utilizationPercent) {
+ std::cout << target << " CPU Utilization during "<< sleep_type << " lasting " << duration_cast<milliseconds>(sleep_duration).count() << "ms : " << utilizationPercent << std::endl;
}
TEST_CASE("Test System CPU Utilization", "[testcpuusage]") {
constexpr int number_of_rounds = 3;
- constexpr int sleep_duration_ms = 1000;
+ constexpr milliseconds sleep_duration = 1s;
constexpr bool cout_enabled = true;
org::apache::nifi::minifi::utils::SystemCpuUsageTracker hostTracker;
org::apache::nifi::minifi::utils::ProcessCpuUsageTracker processTracker;
auto vCores = (std::max)(uint32_t{1}, std::thread::hardware_concurrency());
- auto test_start = std::chrono::system_clock::now();
for (int i = 0; i < number_of_rounds; ++i) {
{
- std::chrono::milliseconds idle_sleep_start;
- std::chrono::milliseconds idle_sleep_end;
- idleSleep(sleep_duration_ms, idle_sleep_start, idle_sleep_end, test_start);
+ auto idle_sleep_duration = idleSleep(sleep_duration);
double system_cpu_usage_during_idle_sleep = hostTracker.getCpuUsageAndRestartCollection();
double process_cpu_usage_during_idle_sleep = processTracker.getCpuUsageAndRestartCollection();
@@ -64,15 +65,13 @@ TEST_CASE("Test System CPU Utilization", "[testcpuusage]") {
REQUIRE(system_cpu_usage_during_idle_sleep <= 1);
REQUIRE(process_cpu_usage_during_idle_sleep < 0.1);
if (cout_enabled) {
- printCpuUtilization("System", "idle sleep", idle_sleep_start.count(), idle_sleep_end.count(), system_cpu_usage_during_idle_sleep);
- printCpuUtilization("Process", "idle sleep", idle_sleep_start.count(), idle_sleep_end.count(), process_cpu_usage_during_idle_sleep);
+ printCpuUtilization("System", "idle sleep", idle_sleep_duration, system_cpu_usage_during_idle_sleep);
+ printCpuUtilization("Process", "idle sleep", idle_sleep_duration, process_cpu_usage_during_idle_sleep);
std::cout << std::endl;
}
}
{
- std::chrono::milliseconds busy_sleep_start;
- std::chrono::milliseconds busy_sleep_end;
- busySleep(sleep_duration_ms, busy_sleep_start, busy_sleep_end, test_start);
+ auto busy_sleep_duration = busySleep(sleep_duration);
double system_cpu_usage_during_busy_sleep = hostTracker.getCpuUsageAndRestartCollection();
double process_cpu_usage_during_busy_sleep = processTracker.getCpuUsageAndRestartCollection();
@@ -81,8 +80,8 @@ TEST_CASE("Test System CPU Utilization", "[testcpuusage]") {
REQUIRE(process_cpu_usage_during_busy_sleep >= 0);
REQUIRE(process_cpu_usage_during_busy_sleep <= 1);
if (cout_enabled) {
- printCpuUtilization("System", "busy sleep", busy_sleep_start.count(), busy_sleep_end.count(), system_cpu_usage_during_busy_sleep);
- printCpuUtilization("Process", "busy sleep", busy_sleep_start.count(), busy_sleep_end.count(), process_cpu_usage_during_busy_sleep);
+ printCpuUtilization("System", "busy sleep", busy_sleep_duration, system_cpu_usage_during_busy_sleep);
+ printCpuUtilization("Process", "busy sleep", busy_sleep_duration, process_cpu_usage_during_busy_sleep);
std::cout << std::endl;
}
}
diff --git a/libminifi/test/unit/FileUtilsTests.cpp b/libminifi/test/unit/FileUtilsTests.cpp
index b615edc..cf02a23 100644
--- a/libminifi/test/unit/FileUtilsTests.cpp
+++ b/libminifi/test/unit/FileUtilsTests.cpp
@@ -200,7 +200,8 @@ TEST_CASE("TestFileUtils::getFullPath", "[TestGetFullPath]") {
TEST_CASE("FileUtils::last_write_time and last_write_time_point work", "[last_write_time][last_write_time_point]") {
using namespace std::chrono;
- uint64_t time_before_write = utils::timeutils::getTimeMillis() / 1000;
+ // TODO(MINIFICPP-1636) this should use std::chrono::file_clock
+ uint64_t time_before_write = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
time_point<system_clock, seconds> time_point_before_write = time_point_cast<seconds>(system_clock::now());
TestController testController;
@@ -216,7 +217,7 @@ TEST_CASE("FileUtils::last_write_time and last_write_time_point work", "[last_wr
test_file_stream << "foo\n";
test_file_stream.flush();
- uint64_t time_after_first_write = utils::timeutils::getTimeMillis() / 1000;
+ uint64_t time_after_first_write = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
time_point<system_clock, seconds> time_point_after_first_write = time_point_cast<seconds>(system_clock::now());
uint64_t first_mtime = FileUtils::last_write_time(test_file);
@@ -231,7 +232,7 @@ TEST_CASE("FileUtils::last_write_time and last_write_time_point work", "[last_wr
test_file_stream << "bar\n";
test_file_stream.flush();
- uint64_t time_after_second_write = utils::timeutils::getTimeMillis() / 1000;
+ uint64_t time_after_second_write = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
time_point<system_clock, seconds> time_point_after_second_write = time_point_cast<seconds>(system_clock::now());
uint64_t second_mtime = FileUtils::last_write_time(test_file);
diff --git a/libminifi/test/unit/PropertyTests.cpp b/libminifi/test/unit/PropertyTests.cpp
index 72409b0..ef65004 100644
--- a/libminifi/test/unit/PropertyTests.cpp
+++ b/libminifi/test/unit/PropertyTests.cpp
@@ -23,78 +23,10 @@
#include "utils/StringUtils.h"
#include "../TestBase.h"
namespace {
-enum class ParsingStatus { ParsingFail , ParsingSuccessful , ValuesMatch };
enum class ConversionTestTarget { MS, NS };
namespace core = minifi::core;
-ParsingStatus checkTimeValue(const std::string &input, int64_t t1, core::TimeUnit t2) {
- int64_t TimeVal = 0;
- core::TimeUnit unit;
- bool parsing_succeeded = org::apache::nifi::minifi::core::Property::StringToTime(input, TimeVal, unit);
- if (parsing_succeeded) {
- if (TimeVal == t1 && unit == t2) {
- return ParsingStatus::ValuesMatch;
- } else {
- return ParsingStatus::ParsingSuccessful;
- }
- } else {
- return ParsingStatus::ParsingFail;
- }
-}
-
-bool conversionTest(uint64_t number, core::TimeUnit unit, uint64_t check, ConversionTestTarget conversionUnit) {
- uint64_t out = 0;
- bool returnStatus = false;
- if (conversionUnit == ConversionTestTarget::NS) {
- returnStatus = org::apache::nifi::minifi::core::Property::ConvertTimeUnitToNS(number, unit, out);
- } else if (conversionUnit == ConversionTestTarget::MS) {
- returnStatus = org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(number, unit, out);
- }
- return returnStatus && out == check;
-}
-
-TEST_CASE("Test Time Conversion", "[testConversion]") {
- uint64_t out;
- REQUIRE(true == conversionTest(2000000, core::TimeUnit::NANOSECOND, 2, ConversionTestTarget::MS));
- REQUIRE(true == conversionTest(5000, core::TimeUnit::MICROSECOND, 5, ConversionTestTarget::MS));
- REQUIRE(true == conversionTest(3, core::TimeUnit::MILLISECOND, 3, ConversionTestTarget::MS));
- REQUIRE(true == conversionTest(5, core::TimeUnit::SECOND, 5000, ConversionTestTarget::MS));
- REQUIRE(true == conversionTest(2, core::TimeUnit::MINUTE, 120000, ConversionTestTarget::MS));
- REQUIRE(true == conversionTest(1, core::TimeUnit::HOUR, 3600000, ConversionTestTarget::MS));
- REQUIRE(true == conversionTest(1, core::TimeUnit::DAY, 86400000, ConversionTestTarget::MS));
-
- REQUIRE(true == conversionTest(5000, core::TimeUnit::NANOSECOND, 5000, ConversionTestTarget::NS));
- REQUIRE(true == conversionTest(2, core::TimeUnit::MICROSECOND, 2000, ConversionTestTarget::NS));
- REQUIRE(true == conversionTest(3, core::TimeUnit::MILLISECOND, 3000000, ConversionTestTarget::NS));
- REQUIRE(true == conversionTest(7, core::TimeUnit::SECOND, 7000000000, ConversionTestTarget::NS));
- REQUIRE(true == conversionTest(1, core::TimeUnit::MINUTE, 60000000000, ConversionTestTarget::NS));
- REQUIRE(true == conversionTest(1, core::TimeUnit::HOUR, 3600000000000, ConversionTestTarget::NS));
- REQUIRE(true == conversionTest(1, core::TimeUnit::DAY, 86400000000000, ConversionTestTarget::NS));
-
- REQUIRE(false == org::apache::nifi::minifi::core::Property::ConvertTimeUnitToNS(23, static_cast<core::TimeUnit>(-1), out));
- REQUIRE(false == org::apache::nifi::minifi::core::Property::ConvertTimeUnitToMS(23, static_cast<core::TimeUnit>(-1), out));
-}
-
-TEST_CASE("Test Is it Time", "[testTime]") {
- REQUIRE(ParsingStatus::ValuesMatch == checkTimeValue("1 SEC", 1, core::TimeUnit::SECOND));
- REQUIRE(ParsingStatus::ValuesMatch == checkTimeValue("1d", 1, core::TimeUnit::DAY));
- REQUIRE(ParsingStatus::ValuesMatch == checkTimeValue("10 days", 10, core::TimeUnit::DAY));
- REQUIRE(ParsingStatus::ValuesMatch == checkTimeValue("100ms", 100, core::TimeUnit::MILLISECOND));
- REQUIRE(ParsingStatus::ValuesMatch == checkTimeValue("20 us", 20, core::TimeUnit::MICROSECOND));
- REQUIRE(ParsingStatus::ValuesMatch == checkTimeValue("1ns", 1, core::TimeUnit::NANOSECOND));
- REQUIRE(ParsingStatus::ValuesMatch == checkTimeValue("1min", 1, core::TimeUnit::MINUTE));
- REQUIRE(ParsingStatus::ValuesMatch == checkTimeValue("1 hour", 1, core::TimeUnit::HOUR));
-
- REQUIRE(ParsingStatus::ParsingSuccessful == checkTimeValue("100 SEC", 100, core::TimeUnit::MICROSECOND));
- REQUIRE(ParsingStatus::ParsingSuccessful == checkTimeValue("10 ms", 1, core::TimeUnit::HOUR));
- REQUIRE(ParsingStatus::ParsingSuccessful == checkTimeValue("100us", 100, core::TimeUnit::HOUR));
- REQUIRE(ParsingStatus::ParsingSuccessful == checkTimeValue("100 ns", 100, core::TimeUnit::MILLISECOND));
- REQUIRE(ParsingStatus::ParsingSuccessful == checkTimeValue("1 minute", 10, core::TimeUnit::MINUTE));
-
- REQUIRE(ParsingStatus::ParsingFail == checkTimeValue("5 apples", 1, core::TimeUnit::HOUR));
- REQUIRE(ParsingStatus::ParsingFail == checkTimeValue("1 year", 1, core::TimeUnit::DAY));
-}
TEST_CASE("Test Trimmer Right", "[testTrims]") {
std::string test = "a quick brown fox jumped over the road\t\n";
diff --git a/libminifi/test/unit/PropertyValidationTests.cpp b/libminifi/test/unit/PropertyValidationTests.cpp
index 4963cd5..f67b33c 100644
--- a/libminifi/test/unit/PropertyValidationTests.cpp
+++ b/libminifi/test/unit/PropertyValidationTests.cpp
@@ -232,6 +232,33 @@ TEST_CASE("Correctly Typed Property With Invalid Validation") {
REQUIRE(callbackCount == 1);
}
+TEST_CASE("TimePeriodValue Property") {  // A property with a typed TimePeriodValue default reads back as a duration and rejects "20".
+ using namespace std::literals::chrono_literals;
+ auto prop = PropertyBuilder::createProperty("prop")
+ ->withDefaultValue<TimePeriodValue>("10 minutes")  // typed default value
+ ->build();
+ TestConfigurableComponent component;
+ component.setSupportedProperties({prop});
+ TimePeriodValue time_period_value;
+ REQUIRE(component.getProperty(prop.getName(), time_period_value));  // the default must be retrievable as a TimePeriodValue
+ CHECK(time_period_value.getMilliseconds() == 10min);
+ REQUIRE_THROWS_AS(component.setProperty(prop.getName(), "20"), ParseException);  // "20" carries no time unit; setting it is expected to throw
+}
+
+TEST_CASE("TimePeriodValue Property without validator") {  // With an untyped default, "20" is accepted on set and only fails when read as a TimePeriodValue.
+ using namespace std::literals::chrono_literals;
+ auto prop = PropertyBuilder::createProperty("prop")
+ ->withDefaultValue("60 minutes")  // untyped default value (no TimePeriodValue template argument)
+ ->build();
+ TestConfigurableComponent component;
+ component.setSupportedProperties({prop});
+ TimePeriodValue time_period_value;
+ REQUIRE(component.getProperty(prop.getName(), time_period_value));  // the default must still be readable as a TimePeriodValue
+ CHECK(time_period_value.getMilliseconds() == 1h);
+ REQUIRE_NOTHROW(component.setProperty(prop.getName(), "20"));  // setting the unit-less value is expected to succeed here
+ REQUIRE_THROWS_AS(component.getProperty(prop.getName(), time_period_value), ValueException);  // ...but reading it back as a TimePeriodValue is expected to throw
+}
+
} /* namespace core */
} /* namespace minifi */
} /* namespace nifi */
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index 2fc63e8..1ac2f7c 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -42,6 +42,8 @@
#pragma GCC diagnostic ignored "-Woverloaded-virtual"
#endif
+using namespace std::literals::chrono_literals;
+
/**
* Test repository
*/
@@ -49,7 +51,7 @@ class TestRepository : public org::apache::nifi::minifi::core::Repository {
public:
TestRepository()
: org::apache::nifi::minifi::core::SerializableComponent("repo_name"),
- Repository("repo_name", "./dir", 1000, 100, 0) {
+ Repository("repo_name", "./dir", 1s, 100, 0ms) {
}
bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &) override {
@@ -174,7 +176,7 @@ class TestFlowRepository : public org::apache::nifi::minifi::core::Repository {
public:
TestFlowRepository()
: org::apache::nifi::minifi::core::SerializableComponent("ff"),
- org::apache::nifi::minifi::core::Repository("ff", "./dir", 1000, 100, 0) {
+ org::apache::nifi::minifi::core::Repository("ff", "./dir", 1s, 100, 0ms) {
}
bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &) override {
diff --git a/libminifi/test/unit/TimeUtilTests.cpp b/libminifi/test/unit/TimeUtilTests.cpp
index 7c5974d..2b764a2 100644
--- a/libminifi/test/unit/TimeUtilTests.cpp
+++ b/libminifi/test/unit/TimeUtilTests.cpp
@@ -18,6 +18,8 @@
#include "utils/TimeUtil.h"
#include "../TestBase.h"
+using namespace std::literals::chrono_literals;
+
namespace {
constexpr int ONE_HOUR = 60 * 60;
constexpr int ONE_DAY = 24 * ONE_HOUR;
@@ -92,3 +94,58 @@ TEST_CASE("Test time conversion", "[testtimeconversion]") {
using org::apache::nifi::minifi::utils::timeutils::getTimeStr;
REQUIRE("2017-02-16 20:14:56.196" == getTimeStr(1487276096196, true));
}
+
+TEST_CASE("Test system_clock epoch", "[systemclockepoch]") {  // Checks that system_clock counts from 1970-01-01 00:00:00.
+ using namespace std::chrono;
+ time_point<system_clock> epoch;  // default-constructed; asserted below to be zero time since the clock's epoch
+ time_point<system_clock> unix_epoch_plus_3e9_sec = sys_days(January / 24 / 2065) + 5h + 20min;  // 2065-01-24 05:20:00 built from calendar arithmetic
+ REQUIRE(epoch.time_since_epoch() == 0s);
+ REQUIRE(unix_epoch_plus_3e9_sec.time_since_epoch() == 3000000000s);  // holds only if the epoch is 1970-01-01 00:00:00
+}
+
+TEST_CASE("Test clock resolutions", "[clockresolutiontests]") {  // Checks that each clock's duration type can be constructed from the stated unit.
+ using namespace std::chrono;
+ CHECK(std::is_constructible<system_clock::duration, std::chrono::microseconds>::value); // The resolution of the system_clock is at least microseconds
+ CHECK(std::is_constructible<steady_clock::duration, std::chrono::microseconds>::value); // The resolution of the steady_clock is at least microseconds
+ CHECK(std::is_constructible<high_resolution_clock::duration, std::chrono::nanoseconds>::value); // The resolution of the high_resolution_clock is at least nanoseconds
+}
+
+TEST_CASE("Test string to duration conversion", "[timedurationtests]") {  // Exercises StringToDuration for several target duration types and unit spellings.
+ using org::apache::nifi::minifi::utils::timeutils::StringToDuration;
+ auto one_hour = StringToDuration<std::chrono::milliseconds>("1h");
+ REQUIRE(one_hour);  // the result must hold a value before .value() is read
+ CHECK(one_hour.value() == 1h);
+ CHECK(one_hour.value() == 3600s);
+
+ REQUIRE(StringToDuration<std::chrono::milliseconds>("1 hour"));  // only checks that parsing succeeds
+ REQUIRE(StringToDuration<std::chrono::seconds>("102 hours") == 102h);
+ REQUIRE(StringToDuration<std::chrono::days>("102 hours") == std::chrono::days(4));  // 102 h = 4 days 6 h; the expected value is whole days only
+ REQUIRE(StringToDuration<std::chrono::milliseconds>("5 ns") == 0ms);  // input finer than the target unit is expected to yield zero
+
+ REQUIRE(StringToDuration<std::chrono::seconds>("1d") == std::chrono::days(1));  // target type: seconds; sub-second inputs below are expected to yield zero
+ REQUIRE(StringToDuration<std::chrono::seconds>("10 days") == std::chrono::days(10));
+ REQUIRE(StringToDuration<std::chrono::seconds>("100ms") == 0ms);
+ REQUIRE(StringToDuration<std::chrono::seconds>("20 us") == 0s);
+ REQUIRE(StringToDuration<std::chrono::seconds>("1ns") == 0ns);
+ REQUIRE(StringToDuration<std::chrono::seconds>("1min") == 1min);
+ REQUIRE(StringToDuration<std::chrono::seconds>("1 hour") == 1h);
+ REQUIRE(StringToDuration<std::chrono::seconds>("100 SEC") == 100s);  // upper-case unit spelling
+ REQUIRE(StringToDuration<std::chrono::seconds>("10 ms") == 0ms);
+ REQUIRE(StringToDuration<std::chrono::seconds>("100 ns") == 0ns);
+ REQUIRE(StringToDuration<std::chrono::seconds>("1 minute") == 1min);
+
+ REQUIRE(StringToDuration<std::chrono::nanoseconds>("1d") == std::chrono::days(1));  // target type: nanoseconds; the same inputs are expected to be preserved exactly
+ REQUIRE(StringToDuration<std::chrono::nanoseconds>("10 days") == std::chrono::days(10));
+ REQUIRE(StringToDuration<std::chrono::nanoseconds>("100ms") == 100ms);
+ REQUIRE(StringToDuration<std::chrono::nanoseconds>("20 us") == 20us);
+ REQUIRE(StringToDuration<std::chrono::nanoseconds>("1ns") == 1ns);
+ REQUIRE(StringToDuration<std::chrono::nanoseconds>("1min") == 1min);
+ REQUIRE(StringToDuration<std::chrono::nanoseconds>("1 hour") == 1h);
+ REQUIRE(StringToDuration<std::chrono::nanoseconds>("100 SEC") == 100s);
+ REQUIRE(StringToDuration<std::chrono::nanoseconds>("10 ms") == 10ms);
+ REQUIRE(StringToDuration<std::chrono::nanoseconds>("100 ns") == 100ns);
+ REQUIRE(StringToDuration<std::chrono::nanoseconds>("1 minute") == 1min);
+
+ REQUIRE_FALSE(StringToDuration<std::chrono::seconds>("5 apples") == 1s);  // NOTE(review): presumably passes because the result is empty; it would also pass for any value other than 1s - confirm intent
+ REQUIRE_FALSE(StringToDuration<std::chrono::seconds>("1 year") == 1s);  // NOTE(review): same caveat as above; "year" is presumably an unsupported unit - confirm
+}
diff --git a/libminifi/test/unit/tls/TLSStreamTests.cpp b/libminifi/test/unit/tls/TLSStreamTests.cpp
index 9fc5939..6b616ac 100644
--- a/libminifi/test/unit/tls/TLSStreamTests.cpp
+++ b/libminifi/test/unit/tls/TLSStreamTests.cpp
@@ -27,7 +27,7 @@
#include "../../SimpleSSLTestServer.h"
#include "../utils/IntegrationTestUtils.h"
-using namespace std::chrono_literals;
+using namespace std::literals::chrono_literals;
static std::shared_ptr<minifi::io::TLSContext> createContext(const std::filesystem::path& key_dir) {
auto configuration = std::make_shared<minifi::Configure>();
diff --git a/nanofi/include/sitetosite/CRawSocketProtocol.h b/nanofi/include/sitetosite/CRawSocketProtocol.h
index 27d144b..f97d4ed 100644
--- a/nanofi/include/sitetosite/CRawSocketProtocol.h
+++ b/nanofi/include/sitetosite/CRawSocketProtocol.h
@@ -156,7 +156,7 @@ static inline void setBatchDuration(struct CRawSiteToSiteClient *client, uint64_
client->_batch_duration = duration;
}
-static inline uint64_t getTimeOut(const struct CRawSiteToSiteClient *client) {
+static inline uint64_t getTimeout(const struct CRawSiteToSiteClient *client) {
return client->_timeout;
}