You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/03/03 12:23:32 UTC
[nifi-minifi-cpp] 01/01: MINIFICPP-1110, MINIFICPP-1111 - PublishKafka and
OPC processors should configure and throw in onSchedule
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch MINIFICPP-1158
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit e4a1732ab2239ce917f255939bad20274a83323a
Author: Nghia Le <mi...@gmail.com>
AuthorDate: Wed Jan 15 13:39:18 2020 +0100
MINIFICPP-1110, MINIFICPP-1111 - PublishKafka and OPC processors should configure and
throw in onSchedule
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #710
---
CMakeLists.txt | 2 +-
extensions/librdkafka/PublishKafka.cpp | 211 +++++++++------------
extensions/librdkafka/PublishKafka.h | 6 +
extensions/librdkafka/tests/CMakeLists.txt | 36 ++++
.../tests/PublishKafkaOnScheduleTests.cpp | 74 ++++++++
extensions/opc/include/opcbase.h | 2 -
extensions/opc/src/fetchopc.cpp | 25 +--
extensions/opc/src/opcbase.cpp | 28 +--
extensions/opc/src/putopc.cpp | 26 +--
.../processors/LogAttribute.cpp | 22 +--
.../standard-processors/processors/LogAttribute.h | 6 +-
.../standard-processors/tests/unit/GetTCPTests.cpp | 3 +
libminifi/include/c2/PayloadParser.h | 7 +
libminifi/include/core/ConfigurableComponent.h | 3 +-
libminifi/include/core/PropertyValidation.h | 31 +++
libminifi/include/core/PropertyValue.h | 12 +-
libminifi/include/core/state/Value.h | 136 +++++++++----
libminifi/src/c2/protocols/RESTProtocol.cpp | 4 +
libminifi/src/core/PropertyValidation.cpp | 1 +
libminifi/src/core/state/Value.cpp | 1 +
libminifi/test/integration/IntegrationBase.h | 11 ++
.../integration/OnScheduleErrorHandlingTests.cpp | 36 ++--
libminifi/test/resources/TestKafkaOnSchedule.yml | 46 +++++
23 files changed, 474 insertions(+), 255 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index c3850f8..a57e325 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -357,7 +357,7 @@ option(ENABLE_LIBRDKAFKA "Enables the librdkafka extension." OFF)
if (ENABLE_ALL OR ENABLE_LIBRDKAFKA)
include(BundledLibRdKafka)
use_bundled_librdkafka(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
- createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PublishKafka" "extensions/librdkafka" "${TEST_DIR}/kafka-tests")
+ createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PublishKafka" "extensions/librdkafka" "extensions/librdkafka/tests")
endif()
## Scripting extensions
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index 69fc1a6..f14dd6c 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -149,6 +149,46 @@ void PublishKafka::initialize() {
void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
interrupted_ = false;
+ // Read and validate all configuration up front: any problem throws PROCESS_SCHEDULE_EXCEPTION at schedule time.
+ // Try to get a KafkaConnection
+ std::string client_id, brokers;
+ if (!context->getProperty(ClientName.getName(), client_id)) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client Name property missing or invalid");
+ }
+ if (!context->getProperty(SeedBrokers.getName(), brokers)) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Known Brokers property missing or invalid");
+ }
+
+ // Get some properties not (only) used directly to set up librdkafka
+
+ // Batch Size
+ context->getProperty(BatchSize.getName(), batch_size_);
+ logger_->log_debug("PublishKafka: Batch Size [%u]", batch_size_);  // %u: batch_size_ is uint32_t (%lu was a varargs mismatch)
+
+ // Target Batch Payload Size
+ context->getProperty(TargetBatchPayloadSize.getName(), target_batch_payload_size_);
+ logger_->log_debug("PublishKafka: Target Batch Payload Size [%llu]", target_batch_payload_size_);
+
+ // Max Flow Segment Size
+ context->getProperty(MaxFlowSegSize.getName(), max_flow_seg_size_);
+ logger_->log_debug("PublishKafka: Max Flow Segment Size [%llu]", max_flow_seg_size_);
+
+ // Attributes to Send as Headers
+ std::string value;
+ if (context->getProperty(AttributeNameRegex.getName(), value) && !value.empty()) {
+ attributeNameRegex_ = utils::Regex(value);
+ logger_->log_debug("PublishKafka: AttributeNameRegex [%s]", value);
+ } else { attributeNameRegex_ = utils::Regex(); }  // clear a regex left over from a previous schedule
+
+ // Future Improvement: Get rid of key since we only need to store one connection with current design.
+ key_.brokers_ = brokers;
+ key_.client_id_ = client_id;
+
+ std::unique_ptr<KafkaLease> lease = connection_pool_.getOrCreateConnection(key_);
+ if (lease == nullptr) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "PublishKafka: Kafka connection is used by another thread"); }  // a null lease means another thread holds the connection
+ configureNewConnection(lease->getConn(), context);
+ // NOTE(review): this reconfigures even if the leased connection is already initialized; confirm that is safe on reschedule.
+ logger_->log_debug("Successfully configured PublishKafka");
}
void PublishKafka::notifyStop() {
@@ -181,11 +221,11 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
std::string valueConf;
std::array<char, 512U> errstr{};
rd_kafka_conf_res_t result;
+ const std::string PREFIX_ERROR_MSG = "PublishKafka: configure error result: ";
rd_kafka_conf_t* conf_ = rd_kafka_conf_new();
if (conf_ == nullptr) {
- logger_->log_error("Failed to create rd_kafka_conf_t object");
- return false;
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object");
}
utils::ScopeGuard confGuard([conf_](){
rd_kafka_conf_destroy(conf_);
@@ -194,25 +234,23 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
auto key = conn->getKey();
if (key->brokers_.empty()) {
- logger_->log_error("There are no brokers");
- return false;
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers");
}
result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_);
if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
- return false;
+ auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
if (key->client_id_.empty()) {
- logger_->log_error("Client id is empty");
- return false;
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client id is empty");
}
result = rd_kafka_conf_set(conf_, "client.id", key->client_id_.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: client.id [%s]", key->client_id_);
if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
- return false;
+ auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
value = "";
@@ -220,8 +258,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
result = rd_kafka_conf_set(conf_, "debug", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: debug [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure debug error result [%s]", errstr.data());
- return false;
+ auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
@@ -229,8 +267,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
result = rd_kafka_conf_set(conf_, "sasl.kerberos.service.name", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: sasl.kerberos.service.name [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
- return false;
+ auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
@@ -238,8 +276,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
result = rd_kafka_conf_set(conf_, "sasl.kerberos.principal", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: sasl.kerberos.principal [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
- return false;
+ auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
@@ -247,19 +285,17 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
result = rd_kafka_conf_set(conf_, "sasl.kerberos.keytab", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: sasl.kerberos.keytab [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
- return false;
+ auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
-
value = "";
- if (context->getProperty(MaxMessageSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
- valueConf = std::to_string(valInt);
- result = rd_kafka_conf_set(conf_, "message.max.bytes", valueConf.c_str(), errstr.data(), errstr.size());
- logger_->log_debug("PublishKafka: message.max.bytes [%s]", valueConf);
+ if (context->getProperty(MaxMessageSize.getName(), value) && !value.empty()) {
+ result = rd_kafka_conf_set(conf_, "message.max.bytes", value.c_str(), errstr.data(), errstr.size());
+ logger_->log_debug("PublishKafka: message.max.bytes [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
- return false;
+ auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
@@ -267,8 +303,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
result = rd_kafka_conf_set(conf_, "queue.buffering.max.messages", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: queue.buffering.max.messages [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
- return false;
+ auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
@@ -278,8 +314,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
result = rd_kafka_conf_set(conf_, "queue.buffering.max.kbytes", valueConf.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: queue.buffering.max.kbytes [%s]", valueConf);
if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
- return false;
+ auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
@@ -290,8 +326,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
result = rd_kafka_conf_set(conf_, "queue.buffering.max.ms", valueConf.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: queue.buffering.max.ms [%s]", valueConf);
if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure queue buffer error result [%s]", errstr.data());
- return false;
+ auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
}
@@ -300,8 +336,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
result = rd_kafka_conf_set(conf_, "batch.num.messages", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: batch.num.messages [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure batch size error result [%s]", errstr.data());
- return false;
+ auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
@@ -309,8 +345,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
result = rd_kafka_conf_set(conf_, "compression.codec", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: compression.codec [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure compression codec error result [%s]", errstr.data());
- return false;
+ auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
@@ -319,16 +355,16 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
result = rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: security.protocol [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
- return false;
+ auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
value = "";
if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
result = rd_kafka_conf_set(conf_, "ssl.ca.location", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: ssl.ca.location [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
- return false;
+ auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
@@ -336,8 +372,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
result = rd_kafka_conf_set(conf_, "ssl.certificate.location", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: ssl.certificate.location [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
- return false;
+ auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
@@ -345,8 +381,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
result = rd_kafka_conf_set(conf_, "ssl.key.location", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: ssl.key.location [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
- return false;
+ auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
@@ -354,13 +390,13 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
result = rd_kafka_conf_set(conf_, "ssl.key.password", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: ssl.key.password [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
- return false;
+ auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
} else {
- logger_->log_error("PublishKafka: unknown Security Protocol: %s", value);
- return false;
+ auto error_msg = utils::StringUtils::join_pack("PublishKafka: unknown Security Protocol: ", value);
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
@@ -374,8 +410,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
logger_->log_debug("PublishKafka: DynamicProperty: [%s] -> [%s]", prop_key, value);
result = rd_kafka_conf_set(conf_, prop_key.c_str(), value.c_str(), errstr.data(), errstr.size());
if (result != RD_KAFKA_CONF_OK) {
- logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
- return false;
+ auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
} else {
logger_->log_warn("PublishKafka Dynamic Property '%s' is empty and therefore will not be configured", prop_key);
@@ -391,8 +427,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
rd_kafka_t* producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr.data(), errstr.size());
if (producer == nullptr) {
- logger_->log_error("Failed to create Kafka producer %s", errstr.data());
- return false;
+ auto error_msg = utils::StringUtils::join_pack("Failed to create Kafka producer ", errstr.data());
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
// The producer took ownership of the configuration, we must not free it
@@ -497,24 +533,9 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
return;
}
- // Try to get a KafkaConnection
- std::string client_id, brokers;
- if (!context->getProperty(ClientName.getName(), client_id)) {
- logger_->log_error("Client Name property missing or invalid");
- context->yield();
- return;
- }
- if (!context->getProperty(SeedBrokers.getName(), brokers)) {
- logger_->log_error("Knowb Brokers property missing or invalid");
- context->yield();
- return;
- }
-
- KafkaConnectionKey key;
- key.brokers_ = brokers;
- key.client_id_ = client_id;
+ logger_->log_debug("PublishKafka onTrigger");
- std::unique_ptr<KafkaLease> lease = connection_pool_.getOrCreateConnection(key);
+ std::unique_ptr<KafkaLease> lease = connection_pool_.getOrCreateConnection(key_);
if (lease == nullptr) {
logger_->log_info("This connection is used by another thread.");
context->yield();
@@ -522,64 +543,18 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
}
std::shared_ptr<KafkaConnection> conn = lease->getConn();
- if (!conn->initialized()) {
- logger_->log_trace("Connection not initialized to %s, %s", client_id, brokers);
- if (!configureNewConnection(conn, context)) {
- logger_->log_error("Could not configure Kafka Connection");
- context->yield();
- return;
- }
- }
-
- // Get some properties not (only) used directly to set up librdkafka
- std::string value;
-
- // Batch Size
- uint32_t batch_size;
- value = "";
- if (context->getProperty(BatchSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, batch_size)) {
- logger_->log_debug("PublishKafka: Batch Size [%lu]", batch_size);
- } else {
- batch_size = 10;
- }
-
- // Target Batch Payload Size
- uint64_t target_batch_payload_size;
- value = "";
- if (context->getProperty(TargetBatchPayloadSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, target_batch_payload_size)) {
- logger_->log_debug("PublishKafka: Target Batch Payload Size [%llu]", target_batch_payload_size);
- } else {
- target_batch_payload_size = 512 * 1024U;
- }
-
- // Max Flow Segment Size
- uint64_t max_flow_seg_size;
- value = "";
- if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, max_flow_seg_size)) {
- logger_->log_debug("PublishKafka: Max Flow Segment Size [%llu]", max_flow_seg_size);
- } else {
- max_flow_seg_size = 0U;
- }
-
- // Attributes to Send as Headers
- utils::Regex attributeNameRegex;
- value = "";
- if (context->getProperty(AttributeNameRegex.getName(), value) && !value.empty()) {
- attributeNameRegex = utils::Regex(value);
- logger_->log_debug("PublishKafka: AttributeNameRegex [%s]", value);
- }
// Collect FlowFiles to process
uint64_t actual_bytes = 0U;
std::vector<std::shared_ptr<core::FlowFile>> flowFiles;
- for (uint32_t i = 0; i < batch_size; i++) {
+ for (uint32_t i = 0; i < batch_size_; i++) {
std::shared_ptr<core::FlowFile> flowFile = session->get();
if (flowFile == nullptr) {
break;
}
actual_bytes += flowFile->getSize();
flowFiles.emplace_back(std::move(flowFile));
- if (target_batch_payload_size != 0U && actual_bytes >= target_batch_payload_size) {
+ if (target_batch_payload_size_ != 0U && actual_bytes >= target_batch_payload_size_) {
break;
}
}
@@ -646,8 +621,8 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
bool failEmptyFlowFiles = true;
context->getProperty(FailEmptyFlowFiles.getName(), failEmptyFlowFiles);
- PublishKafka::ReadCallback callback(max_flow_seg_size, kafkaKey, thisTopic->getTopic(), conn->getConnection(), *flowFile,
- attributeNameRegex, messages, flow_file_index, failEmptyFlowFiles);
+ PublishKafka::ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn->getConnection(), *flowFile,
+ attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles);
session->read(flowFile, &callback);
if (!callback.called_) {
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 9ee3296..c6ed19e 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -353,6 +353,12 @@ class PublishKafka : public core::Processor {
std::shared_ptr<logging::Logger> logger_;
KafkaPool connection_pool_;
+ KafkaConnectionKey key_;
+
+ uint32_t batch_size_ = 10;  // max flow files pulled per onTrigger; same fallback the old per-trigger code used
+ uint64_t target_batch_payload_size_ = 512 * 1024U;  // stop collecting once this many bytes are batched; 0 disables the limit
+ uint64_t max_flow_seg_size_ = 0U;  // forwarded to ReadCallback as the max flow segment size
+ utils::Regex attributeNameRegex_;
std::atomic<bool> interrupted_;
std::mutex messages_mutex_;
diff --git a/extensions/librdkafka/tests/CMakeLists.txt b/extensions/librdkafka/tests/CMakeLists.txt
new file mode 100644
index 0000000..056c7f1
--- /dev/null
+++ b/extensions/librdkafka/tests/CMakeLists.txt
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Build one test executable per *.cpp in this directory, link it against the rdkafka extension, and register it with CTest.
+file(GLOB KAFKA_TESTS "*.cpp")
+
+SET(KAFKA_TEST_COUNT 0)
+
+FOREACH(testfile ${KAFKA_TESTS})
+ get_filename_component(testfilename "${testfile}" NAME_WE)
+ add_executable("${testfilename}" "${testfile}")
+ target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/librdkafka/")
+ target_wholearchive_library(${testfilename} minifi-rdkafka-extensions)
+ createTests("${testfilename}")
+ MATH(EXPR KAFKA_TEST_COUNT "${KAFKA_TEST_COUNT}+1")
+ # Register with CTest: argv[1] = flow config YAML, argv[2] = test resources dir. NOTE(review): every test here gets TestKafkaOnSchedule.yml.
+ add_test(NAME "${testfilename}" COMMAND "${testfilename}" "${TEST_RESOURCES}/TestKafkaOnSchedule.yml" "${TEST_RESOURCES}/")
+ target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
+ENDFOREACH()
+
+message("-- Finished building ${KAFKA_TEST_COUNT} Kafka related test file(s)...")
diff --git a/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp b/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp
new file mode 100644
index 0000000..8a835fa
--- /dev/null
+++ b/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include <cassert>
+#include "../../../libminifi/test/integration/IntegrationBase.h"
+#include "core/logging/Logger.h"
+#include "../../../libminifi/test/TestBase.h"
+#include "../PublishKafka.h"
+
+class PublishKafkaOnScheduleTests : public IntegrationBase {  // onSchedule must keep failing on a bad config, then succeed once it is fixed
+ public:
+ virtual void runAssertions() {
+ std::string logs = LogTestController::getInstance().log_output.str();
+
+ auto result = countPatInStr(logs, "value 1 is outside allowed range 1000..1000000000");  // rejection of the initial message.max.bytes value
+ size_t last_pos = result.first;
+ int occurrences = result.second;
+
+ assert(occurrences > 1); // Verify retry of onSchedule and onUnSchedule calls
+
+ std::vector<std::string> must_appear_byorder_msgs = {"notifyStop called",
+ "Successfully configured PublishKafka",
+ "PublishKafka onTrigger"};
+
+ for (const auto &msg : must_appear_byorder_msgs) {
+ last_pos = logs.find(msg, last_pos);
+ assert(last_pos != std::string::npos);
+ }
+ }
+
+ virtual void testSetup() {
+ LogTestController::getInstance().setDebug<core::ProcessGroup>();
+ LogTestController::getInstance().setDebug<core::Processor>();
+ LogTestController::getInstance().setDebug<core::ProcessSession>();
+ LogTestController::getInstance().setDebug<minifi::processors::PublishKafka>();
+ }
+
+ virtual void waitToVerifyProcessor() {  // let scheduling fail for a while, then set a valid MaxMessageSize
+ std::this_thread::sleep_for(std::chrono::seconds(3));
+ flowController_->updatePropertyValue("kafka", minifi::processors::PublishKafka::MaxMessageSize.getName(), "1999");
+ std::this_thread::sleep_for(std::chrono::seconds(3));
+ }
+
+ virtual void cleanup() {}
+};
+
+int main(int argc, char **argv) {  // argv[1]: path of the flow configuration YAML
+ std::string test_file_location;
+ if (argc > 1) {
+ test_file_location = argv[1];
+ }
+
+ PublishKafkaOnScheduleTests harness;
+
+ harness.run(test_file_location);
+
+ return 0;
+}
diff --git a/extensions/opc/include/opcbase.h b/extensions/opc/include/opcbase.h
index 2ad4b03..897f75c 100644
--- a/extensions/opc/include/opcbase.h
+++ b/extensions/opc/include/opcbase.h
@@ -71,8 +71,6 @@ class BaseOPCProcessor : public core::Processor {
std::vector<char> keyBuffer_;
std::vector<std::vector<char>> trustBuffers_;
- bool configOK_;
-
virtual std::set<core::Property> getSupportedProperties() const {return {OPCServerEndPoint, ApplicationURI, Username, Password, CertificatePath, KeyPath, TrustedPath};}
};
diff --git a/extensions/opc/src/fetchopc.cpp b/extensions/opc/src/fetchopc.cpp
index ce29fd7..866b57e 100644
--- a/extensions/opc/src/fetchopc.cpp
+++ b/extensions/opc/src/fetchopc.cpp
@@ -95,12 +95,6 @@ namespace processors {
BaseOPCProcessor::onSchedule(context, factory);
- if(!configOK_) {
- return;
- }
-
- configOK_ = false;
-
std::string value;
context->getProperty(NodeID.getName(), nodeID_);
context->getProperty(NodeIDType.getName(), value);
@@ -116,37 +110,30 @@ namespace processors {
idType_ = opc::OPCNodeIDType::Path;
} else {
// Where have our validators gone?
- logger_->log_error("%s is not a valid node ID type!", value.c_str());
+ auto error_msg = utils::StringUtils::join_pack(value, " is not a valid node ID type!");
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
if(idType_ == opc::OPCNodeIDType::Int) {
try {
int t = std::stoi(nodeID_);
} catch(...) {
- logger_->log_error("%s cannot be used as an int type node ID", nodeID_.c_str());
- return;
+ auto error_msg = utils::StringUtils::join_pack(nodeID_, " cannot be used as an int type node ID");
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
if(idType_ != opc::OPCNodeIDType::Path) {
if(!context->getProperty(NameSpaceIndex.getName(), nameSpaceIdx_)) {
- logger_->log_error("%s is mandatory in case %s is not Path", NameSpaceIndex.getName().c_str(), NodeIDType.getName().c_str());
- return;
+ auto error_msg = utils::StringUtils::join_pack(NameSpaceIndex.getName(), " is mandatory in case ", NodeIDType.getName(), " is not Path");
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
context->getProperty(Lazy.getName(), value);
lazy_mode_ = value == "On" ? true : false;
-
- configOK_ = true;
}
void FetchOPCProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session){
- if(!configOK_) {
- logger_->log_error("This processor was not configured properly, yielding. Please check for previous errors in the logs!");
- yield();
- return;
- }
-
logger_->log_trace("FetchOPCProcessor::onTrigger");
std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
diff --git a/extensions/opc/src/opcbase.cpp b/extensions/opc/src/opcbase.cpp
index 4fb42b3..e2fe67d 100644
--- a/extensions/opc/src/opcbase.cpp
+++ b/extensions/opc/src/opcbase.cpp
@@ -75,33 +75,27 @@ namespace processors {
username_.clear();
trustBuffers_.clear();
- configOK_ = false;
-
context->getProperty(OPCServerEndPoint.getName(), endPointURL_);
context->getProperty(ApplicationURI.getName(), applicationURI_);
if (context->getProperty(Username.getName(), username_) != context->getProperty(Password.getName(), password_)) {
- logger_->log_error("Both or neither of Username and Password should be provided!");
- return;
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Both or neither of Username and Password should be provided!");
}
auto certificatePathRes = context->getProperty(CertificatePath.getName(), certpath_);
auto keyPathRes = context->getProperty(KeyPath.getName(), keypath_);
auto trustedPathRes = context->getProperty(TrustedPath.getName(), trustpath_);
if (certificatePathRes != keyPathRes || keyPathRes != trustedPathRes) {
- logger_->log_error("All or none of Certificate path, Key path and Trusted server certificate path should be provided!");
- return;
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "All or none of Certificate path, Key path and Trusted server certificate path should be provided!");
}
if (!password_.empty() && (certpath_.empty() || keypath_.empty() || trustpath_.empty() || applicationURI_.empty())) {
- logger_->log_error("Certificate path, Key path, Trusted server certificate path and Application URI must be provided in case Password is provided!");
- return;
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Certificate path, Key path, Trusted server certificate path and Application URI must be provided in case Password is provided!");
}
if (!certpath_.empty()) {
if (applicationURI_.empty()) {
- logger_->log_error("Application URI must be provided if Certificate path is provided!");
- return;
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Application URI must be provided if Certificate path is provided!");
}
std::ifstream input_cert(certpath_, std::ios::binary);
@@ -120,21 +114,19 @@ namespace processors {
}
if (certBuffer_.empty()) {
- logger_->log_error("Failed to load cert from path: %s", certpath_);
- return;
+ auto error_msg = utils::StringUtils::join_pack("Failed to load cert from path: ", certpath_);
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
if (keyBuffer_.empty()) {
- logger_->log_error("Failed to load key from path: %s", keypath_);
- return;
+ auto error_msg = utils::StringUtils::join_pack("Failed to load key from path: ", keypath_);
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
if (trustBuffers_[0].empty()) {
- logger_->log_error("Failed to load trusted server certs from path: %s", trustpath_);
- return;
+ auto error_msg = utils::StringUtils::join_pack("Failed to load trusted server certs from path: ", trustpath_);
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
-
- configOK_ = true;
}
bool BaseOPCProcessor::reconnect() {
diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp
index c5acf3f..acc1a63 100644
--- a/extensions/opc/src/putopc.cpp
+++ b/extensions/opc/src/putopc.cpp
@@ -116,13 +116,6 @@ namespace processors {
BaseOPCProcessor::onSchedule(context, factory);
- if(!configOK_) {
- return;
- }
-
- configOK_ = false;
-
- context->getProperty(OPCServerEndPoint.getName(), endPointURL_);
std::string value;
context->getProperty(ParentNodeID.getName(), nodeID_);
context->getProperty(ParentNodeIDType.getName(), value);
@@ -135,21 +128,22 @@ namespace processors {
idType_ = opc::OPCNodeIDType::Path;
} else {
// Where have our validators gone?
- logger_->log_error("%s is not a valid node ID type!", value.c_str());
+ auto error_msg = utils::StringUtils::join_pack(value, " is not a valid node ID type!");
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
if(idType_ == opc::OPCNodeIDType::Int) {
try {
int t = std::stoi(nodeID_);
} catch(...) {
- logger_->log_error("%s cannot be used as an int type node ID", nodeID_.c_str());
- return;
+ auto error_msg = utils::StringUtils::join_pack(nodeID_, " cannot be used as an int type node ID");
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
if(idType_ != opc::OPCNodeIDType::Path) {
if(!context->getProperty(ParentNameSpaceIndex.getName(), nameSpaceIdx_)) {
- logger_->log_error("%s is mandatory in case %s is not Path", ParentNameSpaceIndex.getName().c_str(), ParentNodeIDType.getName().c_str());
- return;
+ auto error_msg = utils::StringUtils::join_pack(ParentNameSpaceIndex.getName(), " is mandatory in case ", ParentNodeIDType.getName(), " is not Path");
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
@@ -157,17 +151,9 @@ namespace processors {
context->getProperty(ValueType.getName(), typestr);
nodeDataType_ = opc::StringToOPCDataTypeMap.at(typestr); // This throws, but allowed values are generated based on this map -> that's a really unexpected error
- configOK_ = true;
}
void PutOPCProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
- if (!configOK_) {
- logger_->log_error(
- "This processor was not configured properly, yielding. Please check for previous errors in the logs!");
- yield();
- return;
- }
-
logger_->log_trace("PutOPCProcessor::onTrigger");
std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
diff --git a/extensions/standard-processors/processors/LogAttribute.cpp b/extensions/standard-processors/processors/LogAttribute.cpp
index 458b6e5..0021877 100644
--- a/extensions/standard-processors/processors/LogAttribute.cpp
+++ b/extensions/standard-processors/processors/LogAttribute.cpp
@@ -88,23 +88,13 @@ void LogAttribute::initialize() {
}
void LogAttribute::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
- core::Property flowsToLog = FlowFilesToLog;
-
- if (getProperty(FlowFilesToLog.getName(), flowsToLog)) {
- // we are going this route since to avoid breaking backwards compatibility the get property function doesn't perform validation ( That's done
- // in configuration. In future releases we can add that exception handling there.
- if (!flowsToLog.getValue().validate("Validating FlowFilesToLog").valid())
- throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid value for flowfiles to log: " + flowsToLog.getValue().to_string());
- flowfiles_to_log_ = flowsToLog.getValue();
- }
+ // Read each setting straight into its typed member via the typed getProperty overloads.
+ // The boolean results are ignored. NOTE(review): presumably a missing/empty property leaves the
+ // member at its constructor default (flowfiles_to_log_(1), hexencode_(false), max_line_length_(80U)) -- confirm.
+ // NOTE(review): the %llu / %u specifiers below assume flowfiles_to_log_ is a 64-bit unsigned and
+ // max_line_length_ a 32-bit unsigned -- confirm against the member declarations in LogAttribute.h.
+ context->getProperty(FlowFilesToLog.getName(), flowfiles_to_log_);
+ logger_->log_debug("FlowFiles To Log: %llu", flowfiles_to_log_);
- std::string value;
- if (context->getProperty(HexencodePayload.getName(), value)) {
- utils::StringUtils::StringToBool(value, hexencode_);
- }
- if (context->getProperty(MaxPayloadLineLength.getName(), value)) {
- core::Property::StringToInt(value, max_line_length_);
- }
+ context->getProperty(HexencodePayload.getName(), hexencode_);
+
+ context->getProperty(MaxPayloadLineLength.getName(), max_line_length_);
+ logger_->log_debug("Maximum Payload Line Length: %u", max_line_length_);
}
// OnTrigger method, implemented by NiFi LogAttribute
void LogAttribute::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
diff --git a/extensions/standard-processors/processors/LogAttribute.h b/extensions/standard-processors/processors/LogAttribute.h
index 99f53d4..36627a3 100644
--- a/extensions/standard-processors/processors/LogAttribute.h
+++ b/extensions/standard-processors/processors/LogAttribute.h
@@ -40,8 +40,8 @@ class LogAttribute : public core::Processor {
/*!
* Create a new processor
*/
- LogAttribute(std::string name, utils::Identifier uuid = utils::Identifier())
- : Processor(name, uuid),
+ explicit LogAttribute(std::string name, utils::Identifier uuid = utils::Identifier())
+ : Processor(std::move(name), uuid),
flowfiles_to_log_(1),
hexencode_(false),
max_line_length_(80U),
@@ -71,7 +71,7 @@ class LogAttribute : public core::Processor {
LogAttrLevelError
};
// Convert log level from string to enum
- bool logLevelStringToEnum(std::string logStr, LogAttrLevel &level) {
+ bool logLevelStringToEnum(const std::string &logStr, LogAttrLevel &level) {
if (logStr == "trace") {
level = LogAttrLevelTrace;
return true;
diff --git a/extensions/standard-processors/tests/unit/GetTCPTests.cpp b/extensions/standard-processors/tests/unit/GetTCPTests.cpp
index 86a8221..10d5f77 100644
--- a/extensions/standard-processors/tests/unit/GetTCPTests.cpp
+++ b/extensions/standard-processors/tests/unit/GetTCPTests.cpp
@@ -114,6 +114,7 @@ TEST_CASE("GetTCPWithoutEOM", "[GetTCP1]") {
server.writeData(buffer, buffer.size());
std::this_thread::sleep_for(std::chrono::seconds(2));
+ logAttribute->initialize();
logAttribute->incrementActiveTasks();
logAttribute->setScheduledState(core::ScheduledState::RUNNING);
std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
@@ -227,6 +228,7 @@ TEST_CASE("GetTCPWithOEM", "[GetTCP2]") {
server.writeData(buffer, buffer.size());
std::this_thread::sleep_for(std::chrono::seconds(2));
+ logAttribute->initialize();
logAttribute->incrementActiveTasks();
logAttribute->setScheduledState(core::ScheduledState::RUNNING);
std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
@@ -349,6 +351,7 @@ TEST_CASE("GetTCPWithOnlyOEM", "[GetTCP3]") {
server.writeData(buffer, buffer.size());
std::this_thread::sleep_for(std::chrono::seconds(2));
+ logAttribute->initialize();
logAttribute->incrementActiveTasks();
logAttribute->setScheduledState(core::ScheduledState::RUNNING);
std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
diff --git a/libminifi/include/c2/PayloadParser.h b/libminifi/include/c2/PayloadParser.h
index a64eb09..9689c40 100644
--- a/libminifi/include/c2/PayloadParser.h
+++ b/libminifi/include/c2/PayloadParser.h
@@ -88,6 +88,13 @@ struct convert_if<int64_t> : public convert_if_base<int64_t, state::response::In
};
template<>
+// uint32_t specialization of convert_if, backed by state::response::UInt32Value
+// (parallel to the int and int64_t specializations around it).
+struct convert_if<uint32_t> : public convert_if_base<uint32_t, state::response::UInt32Value> {
+  explicit convert_if(const std::shared_ptr<state::response::Value> &node)
+      : convert_if_base<uint32_t, state::response::UInt32Value>(node) {}
+};
+
+template<>
struct convert_if<int> : public convert_if_base<int, state::response::IntValue> {
explicit convert_if(const std::shared_ptr<state::response::Value> &node)
: convert_if_base(node) {
diff --git a/libminifi/include/core/ConfigurableComponent.h b/libminifi/include/core/ConfigurableComponent.h
index de0f047..4905001 100644
--- a/libminifi/include/core/ConfigurableComponent.h
+++ b/libminifi/include/core/ConfigurableComponent.h
@@ -221,10 +221,11 @@ bool ConfigurableComponent::getProperty(const std::string name, T &value) const{
return true;
}
else{
- logger_->log_debug("Component %s property name %s, empty value", name, item.getName());
+ logger_->log_warn("Component %s property name %s, empty value", name, item.getName());
return false;
}
} else {
+ logger_->log_warn("Could not find property %s", name);
return false;
}
}
diff --git a/libminifi/include/core/PropertyValidation.h b/libminifi/include/core/PropertyValidation.h
index b0e59e9..f221b48 100644
--- a/libminifi/include/core/PropertyValidation.h
+++ b/libminifi/include/core/PropertyValidation.h
@@ -180,6 +180,34 @@ class IntegerValidator : public PropertyValidator {
}
};
+/**
+ * Validates that a property holds a non-negative integer that fits into uint32_t.
+ */
+class UnsignedIntValidator : public PropertyValidator {
+ public:
+  explicit UnsignedIntValidator(const std::string &name)
+      : PropertyValidator(name) {
+  }
+
+  virtual ~UnsignedIntValidator() = default;
+
+  ValidationResult validate(const std::string &subject, const std::shared_ptr<minifi::state::response::Value> &input) const {
+    return PropertyValidator::_validate_internal<minifi::state::response::UInt32Value>(subject, input);
+  }
+
+  /**
+   * Accepts input only if it parses as an unsigned integer in [0, UINT32_MAX].
+   * std::stoul/stoull wrap a leading '-' ("-1" -> max value) instead of throwing, so
+   * negatives are rejected textually. Where unsigned long is 64-bit, std::stoul also
+   * accepts values above UINT32_MAX, so the range is checked explicitly.
+   */
+  ValidationResult validate(const std::string &subject, const std::string &input) const {
+    bool valid = false;
+    if (input.find('-') == std::string::npos) {
+      try {
+        valid = std::stoull(input) <= (std::numeric_limits<uint32_t>::max)();
+      } catch (const std::exception &) {
+        // std::invalid_argument / std::out_of_range: not an unsigned number -> invalid.
+      }
+    }
+    return ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(valid).build();
+  }
+};
+
class LongValidator : public PropertyValidator {
public:
explicit LongValidator(const std::string &name, int64_t min = (std::numeric_limits<int64_t>::min)(), int64_t max = (std::numeric_limits<int64_t>::max)())
@@ -327,6 +355,8 @@ class StandardValidators {
return init.BOOLEAN_VALIDATOR;
} else if (std::dynamic_pointer_cast<minifi::state::response::IntValue>(input) != nullptr) {
return init.INTEGER_VALIDATOR;
+ } else if (std::dynamic_pointer_cast<minifi::state::response::UInt32Value>(input) != nullptr) {
+ return init.UNSIGNED_INT_VALIDATOR;;
} else if (std::dynamic_pointer_cast<minifi::state::response::Int64Value>(input) != nullptr) {
return init.LONG_VALIDATOR;
} else if (std::dynamic_pointer_cast<minifi::state::response::UInt64Value>(input) != nullptr) {
@@ -349,6 +379,7 @@ class StandardValidators {
private:
std::shared_ptr<PropertyValidator> INVALID;
std::shared_ptr<PropertyValidator> INTEGER_VALIDATOR;
+ std::shared_ptr<PropertyValidator> UNSIGNED_INT_VALIDATOR;
std::shared_ptr<PropertyValidator> LONG_VALIDATOR;
std::shared_ptr<PropertyValidator> UNSIGNED_LONG_VALIDATOR;
std::shared_ptr<PropertyValidator> BOOLEAN_VALIDATOR;
diff --git a/libminifi/include/core/PropertyValue.h b/libminifi/include/core/PropertyValue.h
index 3c32c86..be62226 100644
--- a/libminifi/include/core/PropertyValue.h
+++ b/libminifi/include/core/PropertyValue.h
@@ -40,6 +40,8 @@ static inline std::shared_ptr<state::response::Value> convert(const std::shared_
}
} else if (prior->getTypeIndex() == state::response::Value::INT64_TYPE) {
return std::make_shared<state::response::Int64Value>(ref);
+ } else if (prior->getTypeIndex() == state::response::Value::UINT32_TYPE) {
+ return std::make_shared<state::response::UInt32Value>(ref);
} else if (prior->getTypeIndex() == state::response::Value::INT_TYPE) {
return std::make_shared<state::response::IntValue>(ref);
} else if (prior->getTypeIndex() == state::response::Value::BOOL_TYPE) {
@@ -67,7 +69,7 @@ class PropertyValue : public state::response::ValueNode {
validator_(o.validator_),
state::response::ValueNode(o) {
}
- PropertyValue(PropertyValue &&o)
+ PropertyValue(PropertyValue &&o) noexcept
: type_id(o.type_id),
validator_(std::move(o.validator_)),
state::response::ValueNode(std::move(o)) {
@@ -106,6 +108,14 @@ class PropertyValue : public state::response::ValueNode {
throw std::runtime_error("Invalid conversion to int64_t");
}
+  /**
+   * Converts the held value to uint32_t.
+   * @throws std::runtime_error when value_->convertValue reports that the conversion failed
+   */
+  operator uint32_t() const {
+    uint32_t res = 0;
+    if (value_->convertValue(res)) {
+      return res;
+    }
+    throw std::runtime_error("Invalid conversion to uint32_t for " + value_->getStringValue());
+  }
+
operator int() const {
int res;
if (value_->convertValue(res)) {
diff --git a/libminifi/include/core/state/Value.h b/libminifi/include/core/state/Value.h
index 17bbdae..6e3a644 100644
--- a/libminifi/include/core/state/Value.h
+++ b/libminifi/include/core/state/Value.h
@@ -71,6 +71,7 @@ class Value {
static const std::type_index UINT64_TYPE;
static const std::type_index INT64_TYPE;
+ static const std::type_index UINT32_TYPE;
static const std::type_index INT_TYPE;
static const std::type_index BOOL_TYPE;
static const std::type_index STRING_TYPE;
@@ -87,6 +88,15 @@ class Value {
type_id = std::type_index(typeid(T));
}
+  /**
+   * Parses the stored string as a uint32_t.
+   * Returns false for negative input and for values above UINT32_MAX instead of silently
+   * truncating them. Non-numeric text still makes std::stoull throw std::invalid_argument,
+   * like the std::stol-based getValue(int &) overload.
+   */
+  virtual bool getValue(uint32_t &ref) {
+    if (string_value.find('-') != std::string::npos) {
+      return false;
+    }
+    const auto parsed = std::stoull(string_value);
+    if (parsed > (std::numeric_limits<uint32_t>::max)()) {
+      return false;
+    }
+    ref = static_cast<uint32_t>(parsed);
+    return true;
+  }
+
virtual bool getValue(int &ref) {
ref = std::stol(string_value);
return true;
@@ -115,6 +125,64 @@ class Value {
std::type_index type_id;
};
+/**
+ * Value holding an unsigned 32-bit integer; its type id is uint32_t.
+ */
+class UInt32Value : public Value {
+ public:
+  explicit UInt32Value(uint32_t value)
+      : Value(std::to_string(value)),
+        value(value) {
+    setTypeId<uint32_t>();
+  }
+
+  /**
+   * Parses strvalue as a uint32_t.
+   * @throws std::invalid_argument if strvalue is not numeric
+   * @throws std::out_of_range if strvalue is negative or larger than UINT32_MAX
+   */
+  explicit UInt32Value(const std::string &strvalue)
+      : Value(strvalue),
+        value(parseUInt32(strvalue)) {
+    /**
+     * This is a fundamental change in that we would be changing where this error occurs.
+     * We should be prudent about breaking backwards compatibility, but since Uint32Value
+     * is only created with a validator and type, we **should** be okay.
+     */
+    setTypeId<uint32_t>();
+  }
+
+  uint32_t getValue() const {
+    return value;
+  }
+
+ protected:
+  virtual bool getValue(uint32_t &ref) {
+    ref = value;
+    return true;
+  }
+
+  // Narrowing: only succeeds when the value fits into a signed int.
+  virtual bool getValue(int &ref) {
+    if (value <= static_cast<uint32_t>((std::numeric_limits<int>::max)())) {
+      ref = static_cast<int>(value);
+      return true;
+    }
+    return false;
+  }
+
+  virtual bool getValue(int64_t &ref) {
+    ref = value;
+    return true;
+  }
+
+  virtual bool getValue(uint64_t &ref) {
+    ref = value;
+    return true;
+  }
+
+  virtual bool getValue(bool &ref) {
+    return false;
+  }
+
+  uint32_t value;
+
+ private:
+  // Parse first so non-numeric text keeps throwing std::invalid_argument. std::stoull wraps "-1"
+  // to its maximum instead of throwing, so negatives are detected textually. Anything above
+  // UINT32_MAX is rejected rather than truncated.
+  static uint32_t parseUInt32(const std::string &strvalue) {
+    const auto parsed = std::stoull(strvalue);
+    if (strvalue.find('-') != std::string::npos) {
+      throw std::out_of_range("negative value detected");
+    }
+    if (parsed > (std::numeric_limits<uint32_t>::max)()) {
+      throw std::out_of_range("value exceeds uint32_t range");
+    }
+    return static_cast<uint32_t>(parsed);
+  }
+};
+
class IntValue : public Value {
public:
explicit IntValue(int value)
@@ -139,6 +207,14 @@ class IntValue : public Value {
return true;
}
+  // A negative int has no uint32_t representation: report failure instead of wrapping.
+  virtual bool getValue(uint32_t &ref) {
+    if (value < 0) {
+      return false;
+    }
+    ref = static_cast<uint32_t>(value);
+    return true;
+  }
+
virtual bool getValue(int64_t &ref) {
ref = value;
return true;
@@ -177,39 +253,19 @@ class BoolValue : public Value {
protected:
virtual bool getValue(int &ref) {
- if (ref == 1) {
- ref = true;
- return true;
- } else if (ref == 0) {
- ref = false;
- return true;
- } else {
- return false;
- }
+ // Every integral overload delegates to one private helper, which stores the bool as 0 or 1.
+ return PreventSwearingInFutureRefactor(ref);
+ }
+
+ virtual bool getValue(uint32_t &ref) {
+ return PreventSwearingInFutureRefactor(ref);
}
virtual bool getValue(int64_t &ref) {
- if (ref == 1) {
- ref = true;
- return true;
- } else if (ref == 0) {
- ref = false;
- return true;
- } else {
- return false;
- }
+ return PreventSwearingInFutureRefactor(ref);
}
virtual bool getValue(uint64_t &ref) {
- if (ref == 1) {
- ref = true;
- return true;
- } else if (ref == 0) {
- ref = false;
- return true;
- } else {
- return false;
- }
+ return PreventSwearingInFutureRefactor(ref);
}
virtual bool getValue(bool &ref) {
@@ -218,6 +274,16 @@ class BoolValue : public Value {
}
bool value;
+
+ private:
+  /**
+   * Stores the held bool into an integral reference as 1 (true) or 0 (false).
+   * A bool is always 0 or 1, so the conversion cannot fail and always returns true.
+   */
+  template<typename T>
+  bool PreventSwearingInFutureRefactor(T &ref) {
+    ref = static_cast<T>(value ? 1 : 0);
+    return true;
+  }
};
class UInt64Value : public Value {
@@ -252,6 +318,10 @@ class UInt64Value : public Value {
return false;
}
+  // Narrowing: succeeds only when the value fits into uint32_t, mirroring the
+  // range check done by getValue(int64_t &).
+  virtual bool getValue(uint32_t &ref) {
+    if (value <= (std::numeric_limits<uint32_t>::max)()) {
+      ref = static_cast<uint32_t>(value);
+      return true;
+    }
+    return false;
+  }
+
virtual bool getValue(int64_t &ref) {
if (value <= (std::numeric_limits<int64_t>::max)()) {
ref = value;
@@ -294,6 +364,10 @@ class Int64Value : public Value {
return false;
}
+  // Narrowing: succeeds only for values in [0, UINT32_MAX].
+  virtual bool getValue(uint32_t &ref) {
+    if (value >= 0 && static_cast<uint64_t>(value) <= (std::numeric_limits<uint32_t>::max)()) {
+      ref = static_cast<uint32_t>(value);
+      return true;
+    }
+    return false;
+  }
+
virtual bool getValue(int64_t &ref) {
ref = value;
return true;
@@ -331,7 +405,7 @@ static inline std::shared_ptr<Value> createValue(const std::string &object) {
}
static inline std::shared_ptr<Value> createValue(const uint32_t &object) {
- return std::make_shared<UInt64Value>(object);
+ // Keep uint32_t as a UInt32Value (type id uint32_t) instead of widening it to UInt64Value.
+ return std::make_shared<UInt32Value>(object);
}
#if ( defined(__APPLE__) || defined(__MACH__) || defined(DARWIN) )
static inline std::shared_ptr<Value> createValue(const size_t &object) {
@@ -370,6 +444,7 @@ class ValueNode {
auto operator=(const T ref) -> typename std::enable_if<std::is_same<T, int >::value ||
std::is_same<T, uint32_t >::value ||
std::is_same<T, size_t >::value ||
+ std::is_same<T, int64_t>::value ||
std::is_same<T, uint64_t >::value ||
std::is_same<T, bool >::value ||
std::is_same<T, char* >::value ||
@@ -379,10 +454,7 @@ class ValueNode {
return *this;
}
- ValueNode &operator=(const ValueNode &ref) {
- value_ = ref.value_;
- return *this;
- }
+ // Defaulted memberwise copy. NOTE(review): equivalent to the removed version only if value_ is
+ // ValueNode's sole data member -- confirm. Being user-declared, it still suppresses the implicit
+ // move constructor/assignment, exactly as the removed hand-written operator did.
+ ValueNode &operator=(const ValueNode &ref) = default;
inline bool operator==(const ValueNode &rhs) const {
return to_string() == rhs.to_string();
diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp b/libminifi/src/c2/protocols/RESTProtocol.cpp
index 37a53e6..1cdec27 100644
--- a/libminifi/src/c2/protocols/RESTProtocol.cpp
+++ b/libminifi/src/c2/protocols/RESTProtocol.cpp
@@ -165,6 +165,10 @@ void setJsonStr(const std::string& key, const state::response::ValueNode& value,
int value = 0;
base_type->convertValue(value);
valueVal.SetInt(value);
+ } else if (type_index == state::response::Value::UINT32_TYPE) {
+ uint32_t value = 0;
+ base_type->convertValue(value);
+ valueVal.SetUint(value);
} else if (type_index == state::response::Value::INT64_TYPE) {
int64_t value = 0;
base_type->convertValue(value);
diff --git a/libminifi/src/core/PropertyValidation.cpp b/libminifi/src/core/PropertyValidation.cpp
index ab1f53a..d476416 100644
--- a/libminifi/src/core/PropertyValidation.cpp
+++ b/libminifi/src/core/PropertyValidation.cpp
@@ -27,6 +27,7 @@ std::shared_ptr<PropertyValidator> StandardValidators::VALID = std::make_shared<
StandardValidators::StandardValidators() {
INVALID = std::make_shared<AlwaysValid>(false, "INVALID");
INTEGER_VALIDATOR = std::make_shared<IntegerValidator>("INTEGER_VALIDATOR");
+ UNSIGNED_INT_VALIDATOR = std::make_shared<UnsignedIntValidator>("NON_NEGATIVE_INTEGER_VALIDATOR");
LONG_VALIDATOR = std::make_shared<LongValidator>("LONG_VALIDATOR");
// name is used by java nifi validators, so we should keep this LONG and not change to reflect
// its internal use
diff --git a/libminifi/src/core/state/Value.cpp b/libminifi/src/core/state/Value.cpp
index 4c43c3e..ae1bae3 100644
--- a/libminifi/src/core/state/Value.cpp
+++ b/libminifi/src/core/state/Value.cpp
@@ -29,6 +29,7 @@ namespace response {
const std::type_index Value::UINT64_TYPE = std::type_index(typeid(uint64_t));
const std::type_index Value::INT64_TYPE = std::type_index(typeid(int64_t));
+// Matches the type id that UInt32Value assigns through setTypeId<uint32_t>().
+const std::type_index Value::UINT32_TYPE = std::type_index(typeid(uint32_t));
const std::type_index Value::INT_TYPE = std::type_index(typeid(int));
const std::type_index Value::BOOL_TYPE = std::type_index(typeid(bool));
const std::type_index Value::STRING_TYPE = std::type_index(typeid(std::string));
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index c51dd2c..0cc4cc6 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -44,6 +44,17 @@ class IntegrationBase {
configureSecurity();
}
+  /**
+   * Counts non-overlapping occurrences of pattern in str.
+   * @return {offset of the last occurrence (0 if there is none), number of occurrences}
+   * An empty pattern yields {0, 0}: str.find("", pos) keeps returning pos, so without
+   * this guard the loop below would never terminate.
+   */
+  std::pair<size_t, int> countPatInStr(const std::string &str, const std::string &pattern) {
+    size_t last_pos = 0;
+    int occurrences = 0;
+    if (pattern.empty()) {
+      return {last_pos, occurrences};
+    }
+    for (size_t pos = str.find(pattern); pos != std::string::npos; pos = str.find(pattern, pos + pattern.size())) {
+      last_pos = pos;
+      ++occurrences;
+    }
+    return {last_pos, occurrences};
+  }
+
virtual void testSetup() = 0;
virtual void shutdownBeforeFlowController() {
diff --git a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
index 18b5803..4f5412e 100644
--- a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
+++ b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
@@ -1,6 +1,4 @@
/**
- * @file GenerateFlowFile.h
- * GenerateFlowFile class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -29,32 +27,22 @@ class OnScheduleErrorHandlingTests : public IntegrationBase {
public:
virtual void runAssertions() {
std::string logs = LogTestController::getInstance().log_output.str();
- size_t pos = 0;
- size_t last_pos = 0;
- unsigned int occurances = 0;
- do {
- pos = logs.find(minifi::processors::KamikazeProcessor::OnScheduleExceptionStr, pos);
- if (pos != std::string::npos) {
- last_pos = pos;
- pos = logs.find(minifi::processors::KamikazeProcessor::OnUnScheduleLogStr, pos);
- if (pos != std::string::npos) {
- last_pos = pos;
- occurances++;
- }
- }
- } while (pos != std::string::npos);
- assert(occurances > 1); // Verify retry of onSchedule and onUnSchedule calls
+ // NOTE(review): all checks here use assert(), which is compiled out when NDEBUG is defined.
+ // Count how often onSchedule threw; last_pos is the offset of the final failed attempt.
+ auto result = countPatInStr(logs, minifi::processors::KamikazeProcessor::OnScheduleExceptionStr);
+ size_t last_pos = result.first;
+ int occurrences = result.second;
- // Make sure onSchedule succeeded after property was set
- assert(logs.find(minifi::processors::KamikazeProcessor::OnScheduleLogStr, last_pos) != std::string::npos);
+ assert(occurrences > 1); // onSchedule must have thrown more than once, i.e. it was retried
- // Make sure onTrigger was called after onshedule succeeded
- pos = logs.find(minifi::processors::KamikazeProcessor::OnTriggerExceptionStr);
- assert(pos != std::string::npos && pos > last_pos);
+ // After the last failed onSchedule these must appear in this order: onUnSchedule, the successful
+ // onSchedule, the onTrigger exception and the resulting session rollback. Each search starts at
+ // the offset where the previous message was found.
+ std::vector<std::string> must_appear_byorder_msgs = {minifi::processors::KamikazeProcessor::OnUnScheduleLogStr,
+ minifi::processors::KamikazeProcessor::OnScheduleLogStr,
+ minifi::processors::KamikazeProcessor::OnTriggerExceptionStr,
+ "[warning] ProcessSession rollback for kamikaze executed"};
- pos = logs.find("[warning] ProcessSession rollback for kamikaze executed"); // Check rollback
- assert(pos != std::string::npos && pos > last_pos);
+ for (const auto &msg : must_appear_byorder_msgs) {
+ last_pos = logs.find(msg, last_pos);
+ assert(last_pos != std::string::npos);
+ }
+ // The log line of a completed onTrigger must never appear anywhere in the output.
assert(logs.find(minifi::processors::KamikazeProcessor::OnTriggerLogStr) == std::string::npos);
}
diff --git a/libminifi/test/resources/TestKafkaOnSchedule.yml b/libminifi/test/resources/TestKafkaOnSchedule.yml
new file mode 100644
index 0000000..eabb0e8
--- /dev/null
+++ b/libminifi/test/resources/TestKafkaOnSchedule.yml
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+Flow Controller:
+ name: MiNiFi Flow
+ onschedule retry interval: 1000 ms
+Processors:
+ - Properties:
+# Batch Size: 1234
+ Client Name: lmn
+# Compress Codec: none
+# Delivery Guarantee: '1'
+ Known Brokers: localhost:9092
+ Max Request Size: '1'
+# Message Timeout: 5 sec
+# Request Timeout: 10 sec
+ Topic Name: test
+ class: org.apache.nifi.processors.standard.PublishKafka
+ id: 3744352b-6eb1-4677-98a6-353417a90496
+ name: kafka
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 100 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list:
+Connections:
+Remote Processing Groups:
+