Posted to commits@nifi.apache.org by ab...@apache.org on 2020/03/03 12:23:32 UTC

[nifi-minifi-cpp] 01/01: MINIFICPP - 1110, 1111 - PublishKafka, OPC processors should config and throw in onSchedule

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch MINIFICPP-1158
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit e4a1732ab2239ce917f255939bad20274a83323a
Author: Nghia Le <mi...@gmail.com>
AuthorDate: Wed Jan 15 13:39:18 2020 +0100

    MINIFICPP - 1110,1111 - PublishKafka, OPC processors should config and
    throw in onSchedule
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #710
---
 CMakeLists.txt                                     |   2 +-
 extensions/librdkafka/PublishKafka.cpp             | 211 +++++++++------------
 extensions/librdkafka/PublishKafka.h               |   6 +
 extensions/librdkafka/tests/CMakeLists.txt         |  36 ++++
 .../tests/PublishKafkaOnScheduleTests.cpp          |  74 ++++++++
 extensions/opc/include/opcbase.h                   |   2 -
 extensions/opc/src/fetchopc.cpp                    |  25 +--
 extensions/opc/src/opcbase.cpp                     |  28 +--
 extensions/opc/src/putopc.cpp                      |  26 +--
 .../processors/LogAttribute.cpp                    |  22 +--
 .../standard-processors/processors/LogAttribute.h  |   6 +-
 .../standard-processors/tests/unit/GetTCPTests.cpp |   3 +
 libminifi/include/c2/PayloadParser.h               |   7 +
 libminifi/include/core/ConfigurableComponent.h     |   3 +-
 libminifi/include/core/PropertyValidation.h        |  31 +++
 libminifi/include/core/PropertyValue.h             |  12 +-
 libminifi/include/core/state/Value.h               | 136 +++++++++----
 libminifi/src/c2/protocols/RESTProtocol.cpp        |   4 +
 libminifi/src/core/PropertyValidation.cpp          |   1 +
 libminifi/src/core/state/Value.cpp                 |   1 +
 libminifi/test/integration/IntegrationBase.h       |  11 ++
 .../integration/OnScheduleErrorHandlingTests.cpp   |  36 ++--
 libminifi/test/resources/TestKafkaOnSchedule.yml   |  46 +++++
 23 files changed, 474 insertions(+), 255 deletions(-)
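
The gist of the change: property validation moves out of onTrigger and into
onSchedule, which now throws on invalid configuration so the framework can
retry scheduling instead of yielding on every trigger. A minimal sketch of
the pattern, using only calls visible in the diff below (the surrounding
processor class is assumed):

    void MyProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
                                 const std::shared_ptr<core::ProcessSessionFactory> & /*factory*/) {
      std::string brokers;
      // getProperty() returns false when the property is missing or invalid;
      // throwing here lets the scheduler retry onSchedule after the configured
      // "onschedule retry interval" instead of failing on every onTrigger
      if (!context->getProperty(SeedBrokers.getName(), brokers)) {
        throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Known Brokers property missing or invalid");
      }
      // parse and cache everything else up front, so onTrigger can assume a
      // fully configured processor
    }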

diff --git a/CMakeLists.txt b/CMakeLists.txt
index c3850f8..a57e325 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -357,7 +357,7 @@ option(ENABLE_LIBRDKAFKA "Enables the librdkafka extension." OFF)
 if (ENABLE_ALL OR ENABLE_LIBRDKAFKA)
 	include(BundledLibRdKafka)
 	use_bundled_librdkafka(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
-	createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PublishKafka" "extensions/librdkafka" "${TEST_DIR}/kafka-tests")
+	createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PublishKafka" "extensions/librdkafka" "extensions/librdkafka/tests")
 endif()
 
 ## Scripting extensions
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index 69fc1a6..f14dd6c 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -149,6 +149,46 @@ void PublishKafka::initialize() {
 
 void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   interrupted_ = false;
+
+  // Try to get a KafkaConnection
+  std::string client_id, brokers;
+  if (!context->getProperty(ClientName.getName(), client_id)) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client Name property missing or invalid");
+  }
+  if (!context->getProperty(SeedBrokers.getName(), brokers)) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Known Brokers property missing or invalid");
+  }
+
+  // Get some properties not (only) used directly to set up librdkafka
+
+  // Batch Size
+  context->getProperty(BatchSize.getName(), batch_size_);
+  logger_->log_debug("PublishKafka: Batch Size [%lu]", batch_size_);
+
+  // Target Batch Payload Size
+  context->getProperty(TargetBatchPayloadSize.getName(), target_batch_payload_size_);
+  logger_->log_debug("PublishKafka: Target Batch Payload Size [%llu]", target_batch_payload_size_);
+
+  // Max Flow Segment Size
+  context->getProperty(MaxFlowSegSize.getName(), max_flow_seg_size_);
+  logger_->log_debug("PublishKafka: Max Flow Segment Size [%llu]", max_flow_seg_size_);
+
+  // Attributes to Send as Headers
+  std::string value;
+  if (context->getProperty(AttributeNameRegex.getName(), value) && !value.empty()) {
+    attributeNameRegex_ = utils::Regex(value);
+    logger_->log_debug("PublishKafka: AttributeNameRegex [%s]", value);
+  }
+
+  // Future Improvement: Get rid of key since we only need to store one connection with the current design.
+  key_.brokers_ = brokers;
+  key_.client_id_ = client_id;
+
+  std::unique_ptr<KafkaLease> lease = connection_pool_.getOrCreateConnection(key_);
+  std::shared_ptr<KafkaConnection> conn = lease->getConn();
+  configureNewConnection(conn, context);
+
+  logger_->log_debug("Successfully configured PublishKafka");
 }
 
 void PublishKafka::notifyStop() {
@@ -181,11 +221,11 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
   std::string valueConf;
   std::array<char, 512U> errstr{};
   rd_kafka_conf_res_t result;
+  const std::string PREFIX_ERROR_MSG = "PublishKafka: configure error result: ";
 
   rd_kafka_conf_t* conf_ = rd_kafka_conf_new();
   if (conf_ == nullptr) {
-    logger_->log_error("Failed to create rd_kafka_conf_t object");
-    return false;
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object");
   }
   utils::ScopeGuard confGuard([conf_](){
     rd_kafka_conf_destroy(conf_);
@@ -194,25 +234,23 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
   auto key = conn->getKey();
 
   if (key->brokers_.empty()) {
-    logger_->log_error("There are no brokers");
-    return false;
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers");
   }
   result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr.data(), errstr.size());
   logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_);
   if (result != RD_KAFKA_CONF_OK) {
-    logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-    return false;
+    auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
   }
 
   if (key->client_id_.empty()) {
-    logger_->log_error("Client id is empty");
-    return false;
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client id is empty");
   }
   result = rd_kafka_conf_set(conf_, "client.id", key->client_id_.c_str(), errstr.data(), errstr.size());
   logger_->log_debug("PublishKafka: client.id [%s]", key->client_id_);
   if (result != RD_KAFKA_CONF_OK) {
-    logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-    return false;
+    auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
   }
 
   value = "";
@@ -220,8 +258,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
     result = rd_kafka_conf_set(conf_, "debug", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: debug [%s]", value);
     if (result != RD_KAFKA_CONF_OK) {
-      logger_->log_error("PublishKafka: configure debug error result [%s]", errstr.data());
-      return false;
+      auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
   value = "";
@@ -229,8 +267,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
     result = rd_kafka_conf_set(conf_, "sasl.kerberos.service.name", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: sasl.kerberos.service.name [%s]", value);
     if (result != RD_KAFKA_CONF_OK) {
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-      return false;
+      auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
   value = "";
@@ -238,8 +276,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
     result = rd_kafka_conf_set(conf_, "sasl.kerberos.principal", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: sasl.kerberos.principal [%s]", value);
     if (result != RD_KAFKA_CONF_OK) {
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-      return false;
+      auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
   value = "";
@@ -247,19 +285,17 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
     result = rd_kafka_conf_set(conf_, "sasl.kerberos.keytab", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: sasl.kerberos.keytab [%s]", value);
     if (result != RD_KAFKA_CONF_OK) {
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-      return false;
+      auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
-
   value = "";
-  if (context->getProperty(MaxMessageSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
-    valueConf = std::to_string(valInt);
-    result = rd_kafka_conf_set(conf_, "message.max.bytes", valueConf.c_str(), errstr.data(), errstr.size());
-    logger_->log_debug("PublishKafka: message.max.bytes [%s]", valueConf);
+  if (context->getProperty(MaxMessageSize.getName(), value) && !value.empty()) {
+    result = rd_kafka_conf_set(conf_, "message.max.bytes", value.c_str(), errstr.data(), errstr.size());
+    logger_->log_debug("PublishKafka: message.max.bytes [%s]", value);
     if (result != RD_KAFKA_CONF_OK) {
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-      return false;
+      auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
   value = "";
@@ -267,8 +303,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
     result = rd_kafka_conf_set(conf_, "queue.buffering.max.messages", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: queue.buffering.max.messages [%s]", value);
     if (result != RD_KAFKA_CONF_OK) {
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-      return false;
+      auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
   value = "";
@@ -278,8 +314,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
     result = rd_kafka_conf_set(conf_, "queue.buffering.max.kbytes", valueConf.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: queue.buffering.max.kbytes [%s]", valueConf);
     if (result != RD_KAFKA_CONF_OK) {
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-      return false;
+      auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
   value = "";
@@ -290,8 +326,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
       result = rd_kafka_conf_set(conf_, "queue.buffering.max.ms", valueConf.c_str(), errstr.data(), errstr.size());
       logger_->log_debug("PublishKafka: queue.buffering.max.ms [%s]", valueConf);
       if (result != RD_KAFKA_CONF_OK) {
-        logger_->log_error("PublishKafka: configure queue buffer error result [%s]", errstr.data());
-        return false;
+        auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
       }
     }
   }
@@ -300,8 +336,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
     result = rd_kafka_conf_set(conf_, "batch.num.messages", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: batch.num.messages [%s]", value);
     if (result != RD_KAFKA_CONF_OK) {
-      logger_->log_error("PublishKafka: configure batch size error result [%s]", errstr.data());
-      return false;
+      auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
   value = "";
@@ -309,8 +345,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
     result = rd_kafka_conf_set(conf_, "compression.codec", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: compression.codec [%s]", value);
     if (result != RD_KAFKA_CONF_OK) {
-      logger_->log_error("PublishKafka: configure compression codec error result [%s]", errstr.data());
-      return false;
+      auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
   value = "";
@@ -319,16 +355,16 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
       result = rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr.data(), errstr.size());
       logger_->log_debug("PublishKafka: security.protocol [%s]", value);
       if (result != RD_KAFKA_CONF_OK) {
-        logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-        return false;
+        auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
       }
       value = "";
       if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
         result = rd_kafka_conf_set(conf_, "ssl.ca.location", value.c_str(), errstr.data(), errstr.size());
         logger_->log_debug("PublishKafka: ssl.ca.location [%s]", value);
         if (result != RD_KAFKA_CONF_OK) {
-          logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-          return false;
+          auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+          throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
         }
       }
       value = "";
@@ -336,8 +372,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
         result = rd_kafka_conf_set(conf_, "ssl.certificate.location", value.c_str(), errstr.data(), errstr.size());
         logger_->log_debug("PublishKafka: ssl.certificate.location [%s]", value);
         if (result != RD_KAFKA_CONF_OK) {
-          logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-          return false;
+          auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+          throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
         }
       }
       value = "";
@@ -345,8 +381,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
         result = rd_kafka_conf_set(conf_, "ssl.key.location", value.c_str(), errstr.data(), errstr.size());
         logger_->log_debug("PublishKafka: ssl.key.location [%s]", value);
         if (result != RD_KAFKA_CONF_OK) {
-          logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-          return false;
+          auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+          throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
         }
       }
       value = "";
@@ -354,13 +390,13 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
         result = rd_kafka_conf_set(conf_, "ssl.key.password", value.c_str(), errstr.data(), errstr.size());
         logger_->log_debug("PublishKafka: ssl.key.password [%s]", value);
         if (result != RD_KAFKA_CONF_OK) {
-          logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-          return false;
+          auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+          throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
         }
       }
     } else {
-      logger_->log_error("PublishKafka: unknown Security Protocol: %s", value);
-      return false;
+      auto error_msg = utils::StringUtils::join_pack("PublishKafka: unknown Security Protocol: ", value);
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
 
@@ -374,8 +410,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
       logger_->log_debug("PublishKafka: DynamicProperty: [%s] -> [%s]", prop_key, value);
       result = rd_kafka_conf_set(conf_, prop_key.c_str(), value.c_str(), errstr.data(), errstr.size());
       if (result != RD_KAFKA_CONF_OK) {
-        logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
-        return false;
+        auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
       }
     } else {
       logger_->log_warn("PublishKafka Dynamic Property '%s' is empty and therefore will not be configured", prop_key);
@@ -391,8 +427,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
   rd_kafka_t* producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr.data(), errstr.size());
 
   if (producer == nullptr) {
-    logger_->log_error("Failed to create Kafka producer %s", errstr.data());
-    return false;
+    auto error_msg = utils::StringUtils::join_pack("Failed to create Kafka producer ", errstr.data());
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
   }
 
   // The producer took ownership of the configuration, we must not free it
@@ -497,24 +533,9 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     return;
   }
 
-  // Try to get a KafkaConnection
-  std::string client_id, brokers;
-  if (!context->getProperty(ClientName.getName(), client_id)) {
-    logger_->log_error("Client Name property missing or invalid");
-    context->yield();
-    return;
-  }
-  if (!context->getProperty(SeedBrokers.getName(), brokers)) {
-    logger_->log_error("Knowb Brokers property missing or invalid");
-    context->yield();
-    return;
-  }
-
-  KafkaConnectionKey key;
-  key.brokers_ = brokers;
-  key.client_id_ = client_id;
+  logger_->log_debug("PublishKafka onTrigger");
 
-  std::unique_ptr<KafkaLease> lease = connection_pool_.getOrCreateConnection(key);
+  std::unique_ptr<KafkaLease> lease = connection_pool_.getOrCreateConnection(key_);
   if (lease == nullptr) {
     logger_->log_info("This connection is used by another thread.");
     context->yield();
@@ -522,64 +543,18 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
   }
 
   std::shared_ptr<KafkaConnection> conn = lease->getConn();
-  if (!conn->initialized()) {
-    logger_->log_trace("Connection not initialized to %s, %s", client_id, brokers);
-    if (!configureNewConnection(conn, context)) {
-      logger_->log_error("Could not configure Kafka Connection");
-      context->yield();
-      return;
-    }
-  }
-
-  // Get some properties not (only) used directly to set up librdkafka
-  std::string value;
-
-  // Batch Size
-  uint32_t batch_size;
-  value = "";
-  if (context->getProperty(BatchSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, batch_size)) {
-    logger_->log_debug("PublishKafka: Batch Size [%lu]", batch_size);
-  } else {
-    batch_size = 10;
-  }
-
-  // Target Batch Payload Size
-  uint64_t target_batch_payload_size;
-  value = "";
-  if (context->getProperty(TargetBatchPayloadSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, target_batch_payload_size)) {
-    logger_->log_debug("PublishKafka: Target Batch Payload Size [%llu]", target_batch_payload_size);
-  } else {
-    target_batch_payload_size = 512 * 1024U;
-  }
-
-  // Max Flow Segment Size
-  uint64_t max_flow_seg_size;
-  value = "";
-  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, max_flow_seg_size)) {
-    logger_->log_debug("PublishKafka: Max Flow Segment Size [%llu]", max_flow_seg_size);
-  } else {
-    max_flow_seg_size = 0U;
-  }
-
-  // Attributes to Send as Headers
-  utils::Regex attributeNameRegex;
-  value = "";
-  if (context->getProperty(AttributeNameRegex.getName(), value) && !value.empty()) {
-    attributeNameRegex = utils::Regex(value);
-    logger_->log_debug("PublishKafka: AttributeNameRegex [%s]", value);
-  }
 
   // Collect FlowFiles to process
   uint64_t actual_bytes = 0U;
   std::vector<std::shared_ptr<core::FlowFile>> flowFiles;
-  for (uint32_t i = 0; i < batch_size; i++) {
+  for (uint32_t i = 0; i < batch_size_; i++) {
     std::shared_ptr<core::FlowFile> flowFile = session->get();
     if (flowFile == nullptr) {
       break;
     }
     actual_bytes += flowFile->getSize();
     flowFiles.emplace_back(std::move(flowFile));
-    if (target_batch_payload_size != 0U && actual_bytes >= target_batch_payload_size) {
+    if (target_batch_payload_size_ != 0U && actual_bytes >= target_batch_payload_size_) {
       break;
     }
   }
@@ -646,8 +621,8 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     bool failEmptyFlowFiles = true;
     context->getProperty(FailEmptyFlowFiles.getName(), failEmptyFlowFiles);
 
-    PublishKafka::ReadCallback callback(max_flow_seg_size, kafkaKey, thisTopic->getTopic(), conn->getConnection(), *flowFile,
-                                        attributeNameRegex, messages, flow_file_index, failEmptyFlowFiles);
+    PublishKafka::ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn->getConnection(), *flowFile,
+                                        attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles);
     session->read(flowFile, &callback);
 
     if (!callback.called_) {
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 9ee3296..c6ed19e 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -353,6 +353,12 @@ class PublishKafka : public core::Processor {
   std::shared_ptr<logging::Logger> logger_;
 
   KafkaPool connection_pool_;
+  KafkaConnectionKey key_;
+
+  uint32_t batch_size_;
+  uint64_t target_batch_payload_size_;
+  uint64_t max_flow_seg_size_;
+  utils::Regex attributeNameRegex_;
 
   std::atomic<bool> interrupted_;
   std::mutex messages_mutex_;
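
These members cache what onSchedule parsed, so the onTrigger hot path reads
them directly (see the batch_size_ and target_batch_payload_size_ uses in the
.cpp diff above). The general parse-once/cache pattern, as a self-contained
sketch with a hypothetical Worker class (not the MiNiFi API; the default of
10 mirrors the fallback removed from onTrigger):

    #include <cstdint>
    #include <string>

    class Worker {
     public:
      // schedule(): parse and validate once; std::stoul throws on bad input,
      // analogous to onSchedule throwing PROCESS_SCHEDULE_EXCEPTION
      void schedule(const std::string &batch_size_str) {
        batch_size_ = static_cast<uint32_t>(std::stoul(batch_size_str));
      }
      // trigger(): the hot path only reads the cached value, no parsing
      uint32_t trigger() const { return batch_size_; }
     private:
      uint32_t batch_size_ = 10;
    };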
diff --git a/extensions/librdkafka/tests/CMakeLists.txt b/extensions/librdkafka/tests/CMakeLists.txt
new file mode 100644
index 0000000..056c7f1
--- /dev/null
+++ b/extensions/librdkafka/tests/CMakeLists.txt
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB KAFKA_TESTS  "*.cpp")
+
+SET(KAFKA_TEST_COUNT 0)
+
+FOREACH(testfile ${KAFKA_TESTS})
+    get_filename_component(testfilename "${testfile}" NAME_WE)
+    add_executable("${testfilename}" "${testfile}")
+    target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/librdkafka/")
+    target_wholearchive_library(${testfilename} minifi-rdkafka-extensions)
+    createTests("${testfilename}")
+    MATH(EXPR KAFKA_TEST_COUNT "${KAFKA_TEST_COUNT}+1")
+    # The line below handles the integration test
+    add_test(NAME "${testfilename}" COMMAND "${testfilename}" "${TEST_RESOURCES}/TestKafkaOnSchedule.yml"  "${TEST_RESOURCES}/")
+    target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
+ENDFOREACH()
+
+message("-- Finished building ${KAFKA_TEST_COUNT} Kafka related test file(s)...")
diff --git a/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp b/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp
new file mode 100644
index 0000000..8a835fa
--- /dev/null
+++ b/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+
+#include <cassert>
+#include "../../../libminifi/test/integration/IntegrationBase.h"
+#include "core/logging/Logger.h"
+#include "../../../libminifi/test/TestBase.h"
+#include "../PublishKafka.h"
+
+class PublishKafkaOnScheduleTests : public IntegrationBase {
+ public:
+    virtual void runAssertions() {
+      std::string logs = LogTestController::getInstance().log_output.str();
+
+      auto result = countPatInStr(logs, "value 1 is outside allowed range 1000..1000000000");
+      size_t last_pos = result.first;
+      int occurrences = result.second;
+
+      assert(occurrences > 1);  // Verify retry of onSchedule and onUnSchedule calls
+
+      std::vector<std::string> must_appear_byorder_msgs = {"notifyStop called",
+                                                           "Successfully configured PublishKafka",
+                                                           "PublishKafka onTrigger"};
+
+      for (const auto &msg : must_appear_byorder_msgs) {
+        last_pos = logs.find(msg, last_pos);
+        assert(last_pos != std::string::npos);
+      }
+    }
+
+    virtual void testSetup() {
+      LogTestController::getInstance().setDebug<core::ProcessGroup>();
+      LogTestController::getInstance().setDebug<core::Processor>();
+      LogTestController::getInstance().setDebug<core::ProcessSession>();
+      LogTestController::getInstance().setDebug<minifi::processors::PublishKafka>();
+    }
+
+    virtual void waitToVerifyProcessor() {
+      std::this_thread::sleep_for(std::chrono::seconds(3));
+      flowController_->updatePropertyValue("kafka", minifi::processors::PublishKafka::MaxMessageSize.getName(), "1999");
+      std::this_thread::sleep_for(std::chrono::seconds(3));
+    }
+
+    virtual void cleanup() {}
+};
+
+int main(int argc, char **argv) {
+  std::string test_file_location;
+  if (argc > 1) {
+    test_file_location = argv[1];
+  }
+
+  PublishKafkaOnScheduleTests harness;
+
+  harness.run(test_file_location);
+
+  return 0;
+}
diff --git a/extensions/opc/include/opcbase.h b/extensions/opc/include/opcbase.h
index 2ad4b03..897f75c 100644
--- a/extensions/opc/include/opcbase.h
+++ b/extensions/opc/include/opcbase.h
@@ -71,8 +71,6 @@ class BaseOPCProcessor : public core::Processor {
   std::vector<char> keyBuffer_;
   std::vector<std::vector<char>> trustBuffers_;
 
-  bool configOK_;
-
   virtual std::set<core::Property> getSupportedProperties() const {return {OPCServerEndPoint, ApplicationURI, Username, Password, CertificatePath, KeyPath, TrustedPath};}
 };
 
diff --git a/extensions/opc/src/fetchopc.cpp b/extensions/opc/src/fetchopc.cpp
index ce29fd7..866b57e 100644
--- a/extensions/opc/src/fetchopc.cpp
+++ b/extensions/opc/src/fetchopc.cpp
@@ -95,12 +95,6 @@ namespace processors {
 
     BaseOPCProcessor::onSchedule(context, factory);
 
-    if(!configOK_) {
-      return;
-    }
-
-    configOK_ = false;
-
     std::string value;
     context->getProperty(NodeID.getName(), nodeID_);
     context->getProperty(NodeIDType.getName(), value);
@@ -116,37 +110,30 @@ namespace processors {
       idType_ = opc::OPCNodeIDType::Path;
     } else {
       // Where have our validators gone?
-      logger_->log_error("%s is not a valid node ID type!", value.c_str());
+      auto error_msg = utils::StringUtils::join_pack(value, " is not a valid node ID type!");
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
 
     if(idType_ == opc::OPCNodeIDType::Int) {
       try {
         int t = std::stoi(nodeID_);
       } catch(...) {
-        logger_->log_error("%s cannot be used as an int type node ID", nodeID_.c_str());
-        return;
+        auto error_msg = utils::StringUtils::join_pack(nodeID_, " cannot be used as an int type node ID");
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
       }
     }
     if(idType_ != opc::OPCNodeIDType::Path) {
       if(!context->getProperty(NameSpaceIndex.getName(), nameSpaceIdx_)) {
-        logger_->log_error("%s is mandatory in case %s is not Path", NameSpaceIndex.getName().c_str(), NodeIDType.getName().c_str());
-        return;
+        auto error_msg = utils::StringUtils::join_pack(NameSpaceIndex.getName(), " is mandatory in case ", NodeIDType.getName(), " is not Path");
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
       }
     }
 
     context->getProperty(Lazy.getName(), value);
     lazy_mode_ = value == "On" ? true : false;
-
-    configOK_ = true;
   }
 
   void FetchOPCProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session){
-    if(!configOK_) {
-      logger_->log_error("This processor was not configured properly, yielding. Please check for previous errors in the logs!");
-      yield();
-      return;
-    }
-
     logger_->log_trace("FetchOPCProcessor::onTrigger");
 
     std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
diff --git a/extensions/opc/src/opcbase.cpp b/extensions/opc/src/opcbase.cpp
index 4fb42b3..e2fe67d 100644
--- a/extensions/opc/src/opcbase.cpp
+++ b/extensions/opc/src/opcbase.cpp
@@ -75,33 +75,27 @@ namespace processors {
     username_.clear();
     trustBuffers_.clear();
 
-    configOK_ = false;
-
     context->getProperty(OPCServerEndPoint.getName(), endPointURL_);
     context->getProperty(ApplicationURI.getName(), applicationURI_);
 
     if (context->getProperty(Username.getName(), username_) != context->getProperty(Password.getName(), password_)) {
-      logger_->log_error("Both or neither of Username and Password should be provided!");
-      return;
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Both or neither of Username and Password should be provided!");
     }
 
     auto certificatePathRes = context->getProperty(CertificatePath.getName(), certpath_);
     auto keyPathRes = context->getProperty(KeyPath.getName(), keypath_);
     auto trustedPathRes = context->getProperty(TrustedPath.getName(), trustpath_);
     if (certificatePathRes != keyPathRes || keyPathRes != trustedPathRes) {
-      logger_->log_error("All or none of Certificate path, Key path and Trusted server certificate path should be provided!");
-      return;
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "All or none of Certificate path, Key path and Trusted server certificate path should be provided!");
     }
 
     if (!password_.empty() && (certpath_.empty() || keypath_.empty() || trustpath_.empty() || applicationURI_.empty())) {
-      logger_->log_error("Certificate path, Key path, Trusted server certificate path and Application URI must be provided in case Password is provided!");
-      return;
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Certificate path, Key path, Trusted server certificate path and Application URI must be provided in case Password is provided!");
     }
 
     if (!certpath_.empty()) {
       if (applicationURI_.empty()) {
-        logger_->log_error("Application URI must be provided if Certificate path is provided!");
-        return;
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Application URI must be provided if Certificate path is provided!");
       }
 
       std::ifstream input_cert(certpath_, std::ios::binary);
@@ -120,21 +114,19 @@ namespace processors {
       }
 
       if (certBuffer_.empty()) {
-        logger_->log_error("Failed to load cert from path: %s", certpath_);
-        return;
+        auto error_msg = utils::StringUtils::join_pack("Failed to load cert from path: ", certpath_);
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
       }
       if (keyBuffer_.empty()) {
-        logger_->log_error("Failed to load key from path: %s", keypath_);
-        return;
+        auto error_msg = utils::StringUtils::join_pack("Failed to load key from path: ", keypath_);
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
       }
 
       if (trustBuffers_[0].empty()) {
-        logger_->log_error("Failed to load trusted server certs from path: %s", trustpath_);
-        return;
+        auto error_msg = utils::StringUtils::join_pack("Failed to load trusted server certs from path: ", trustpath_);
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
       }
     }
-
-    configOK_ = true;
   }
 
   bool BaseOPCProcessor::reconnect() {
diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp
index c5acf3f..acc1a63 100644
--- a/extensions/opc/src/putopc.cpp
+++ b/extensions/opc/src/putopc.cpp
@@ -116,13 +116,6 @@ namespace processors {
 
     BaseOPCProcessor::onSchedule(context, factory);
 
-    if(!configOK_) {
-      return;
-    }
-
-    configOK_ = false;
-
-    context->getProperty(OPCServerEndPoint.getName(), endPointURL_);
     std::string value;
     context->getProperty(ParentNodeID.getName(), nodeID_);
     context->getProperty(ParentNodeIDType.getName(), value);
@@ -135,21 +128,22 @@ namespace processors {
       idType_ = opc::OPCNodeIDType::Path;
     } else {
       // Where have our validators gone?
-      logger_->log_error("%s is not a valid node ID type!", value.c_str());
+      auto error_msg = utils::StringUtils::join_pack(value, " is not a valid node ID type!");
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
 
     if(idType_ == opc::OPCNodeIDType::Int) {
       try {
         int t = std::stoi(nodeID_);
       } catch(...) {
-        logger_->log_error("%s cannot be used as an int type node ID", nodeID_.c_str());
-        return;
+        auto error_msg = utils::StringUtils::join_pack(nodeID_, " cannot be used as an int type node ID");
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
       }
     }
     if(idType_ != opc::OPCNodeIDType::Path) {
       if(!context->getProperty(ParentNameSpaceIndex.getName(), nameSpaceIdx_)) {
-        logger_->log_error("%s is mandatory in case %s is not Path", ParentNameSpaceIndex.getName().c_str(), ParentNodeIDType.getName().c_str());
-        return;
+        auto error_msg = utils::StringUtils::join_pack(ParentNameSpaceIndex.getName(), " is mandatory in case ", ParentNodeIDType.getName(), " is not Path");
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
       }
     }
 
@@ -157,17 +151,9 @@ namespace processors {
     context->getProperty(ValueType.getName(), typestr);
     nodeDataType_ = opc::StringToOPCDataTypeMap.at(typestr);  // This throws, but allowed values are generated based on this map -> that's a really unexpected error
 
-    configOK_ = true;
   }
 
   void PutOPCProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-    if (!configOK_) {
-      logger_->log_error(
-          "This processor was not configured properly, yielding. Please check for previous errors in the logs!");
-      yield();
-      return;
-    }
-
     logger_->log_trace("PutOPCProcessor::onTrigger");
 
     std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
diff --git a/extensions/standard-processors/processors/LogAttribute.cpp b/extensions/standard-processors/processors/LogAttribute.cpp
index 458b6e5..0021877 100644
--- a/extensions/standard-processors/processors/LogAttribute.cpp
+++ b/extensions/standard-processors/processors/LogAttribute.cpp
@@ -88,23 +88,13 @@ void LogAttribute::initialize() {
 }
 
 void LogAttribute::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
-  core::Property flowsToLog = FlowFilesToLog;
-
-  if (getProperty(FlowFilesToLog.getName(), flowsToLog)) {
-    // we are going this route since to avoid breaking backwards compatibility the get property function doesn't perform validation ( That's done
-    // in configuration. In future releases we can add that exception handling there.
-    if (!flowsToLog.getValue().validate("Validating FlowFilesToLog").valid())
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid value for flowfiles to log: " + flowsToLog.getValue().to_string());
-    flowfiles_to_log_ = flowsToLog.getValue();
-  }
+  context->getProperty(FlowFilesToLog.getName(), flowfiles_to_log_);
+  logger_->log_debug("FlowFiles To Log: %llu", flowfiles_to_log_);
 
-  std::string value;
-  if (context->getProperty(HexencodePayload.getName(), value)) {
-    utils::StringUtils::StringToBool(value, hexencode_);
-  }
-  if (context->getProperty(MaxPayloadLineLength.getName(), value)) {
-    core::Property::StringToInt(value, max_line_length_);
-  }
+  context->getProperty(HexencodePayload.getName(), hexencode_);
+
+  context->getProperty(MaxPayloadLineLength.getName(), max_line_length_);
+  logger_->log_debug("Maximum Payload Line Length: %u", max_line_length_);
 }
 // OnTrigger method, implemented by NiFi LogAttribute
 void LogAttribute::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
diff --git a/extensions/standard-processors/processors/LogAttribute.h b/extensions/standard-processors/processors/LogAttribute.h
index 99f53d4..36627a3 100644
--- a/extensions/standard-processors/processors/LogAttribute.h
+++ b/extensions/standard-processors/processors/LogAttribute.h
@@ -40,8 +40,8 @@ class LogAttribute : public core::Processor {
   /*!
    * Create a new processor
    */
-  LogAttribute(std::string name, utils::Identifier uuid = utils::Identifier())
-      : Processor(name, uuid),
+  explicit LogAttribute(std::string name, utils::Identifier uuid = utils::Identifier())
+      : Processor(std::move(name), uuid),
         flowfiles_to_log_(1),
         hexencode_(false),
         max_line_length_(80U),
@@ -71,7 +71,7 @@ class LogAttribute : public core::Processor {
     LogAttrLevelError
   };
   // Convert log level from string to enum
-  bool logLevelStringToEnum(std::string logStr, LogAttrLevel &level) {
+  bool logLevelStringToEnum(const std::string &logStr, LogAttrLevel &level) {
     if (logStr == "trace") {
       level = LogAttrLevelTrace;
       return true;
diff --git a/extensions/standard-processors/tests/unit/GetTCPTests.cpp b/extensions/standard-processors/tests/unit/GetTCPTests.cpp
index 86a8221..10d5f77 100644
--- a/extensions/standard-processors/tests/unit/GetTCPTests.cpp
+++ b/extensions/standard-processors/tests/unit/GetTCPTests.cpp
@@ -114,6 +114,7 @@ TEST_CASE("GetTCPWithoutEOM", "[GetTCP1]") {
   server.writeData(buffer, buffer.size());
   std::this_thread::sleep_for(std::chrono::seconds(2));
 
+  logAttribute->initialize();
   logAttribute->incrementActiveTasks();
   logAttribute->setScheduledState(core::ScheduledState::RUNNING);
   std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
@@ -227,6 +228,7 @@ TEST_CASE("GetTCPWithOEM", "[GetTCP2]") {
   server.writeData(buffer, buffer.size());
   std::this_thread::sleep_for(std::chrono::seconds(2));
 
+  logAttribute->initialize();
   logAttribute->incrementActiveTasks();
   logAttribute->setScheduledState(core::ScheduledState::RUNNING);
   std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
@@ -349,6 +351,7 @@ TEST_CASE("GetTCPWithOnlyOEM", "[GetTCP3]") {
   server.writeData(buffer, buffer.size());
   std::this_thread::sleep_for(std::chrono::seconds(2));
 
+  logAttribute->initialize();
   logAttribute->incrementActiveTasks();
   logAttribute->setScheduledState(core::ScheduledState::RUNNING);
   std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
diff --git a/libminifi/include/c2/PayloadParser.h b/libminifi/include/c2/PayloadParser.h
index a64eb09..9689c40 100644
--- a/libminifi/include/c2/PayloadParser.h
+++ b/libminifi/include/c2/PayloadParser.h
@@ -88,6 +88,13 @@ struct convert_if<int64_t> : public convert_if_base<int64_t, state::response::In
 };
 
 template<>
+struct convert_if<uint32_t > : public convert_if_base<uint32_t, state::response::UInt32Value> {
+  explicit convert_if(const std::shared_ptr<state::response::Value> &node)
+      : convert_if_base(node) {
+  }
+};
+
+template<>
 struct convert_if<int> : public convert_if_base<int, state::response::IntValue> {
   explicit convert_if(const std::shared_ptr<state::response::Value> &node)
       : convert_if_base(node) {
diff --git a/libminifi/include/core/ConfigurableComponent.h b/libminifi/include/core/ConfigurableComponent.h
index de0f047..4905001 100644
--- a/libminifi/include/core/ConfigurableComponent.h
+++ b/libminifi/include/core/ConfigurableComponent.h
@@ -221,10 +221,11 @@ bool ConfigurableComponent::getProperty(const std::string name, T &value) const{
        return true;
      }
      else{
-       logger_->log_debug("Component %s property name %s, empty value", name, item.getName());
+       logger_->log_warn("Component %s property name %s, empty value", name, item.getName());
        return false;
      }
    } else {
+     logger_->log_warn("Could not find property %s", name);
      return false;
    }
 }
diff --git a/libminifi/include/core/PropertyValidation.h b/libminifi/include/core/PropertyValidation.h
index b0e59e9..f221b48 100644
--- a/libminifi/include/core/PropertyValidation.h
+++ b/libminifi/include/core/PropertyValidation.h
@@ -180,6 +180,34 @@ class IntegerValidator : public PropertyValidator {
   }
 };
 
+class UnsignedIntValidator : public PropertyValidator {
+ public:
+  explicit UnsignedIntValidator(const std::string &name)
+      : PropertyValidator(name) {
+  }
+  virtual ~UnsignedIntValidator() {
+
+  }
+  ValidationResult validate(const std::string &subject, const std::shared_ptr<minifi::state::response::Value> &input) const {
+    return PropertyValidator::_validate_internal<minifi::state::response::UInt32Value>(subject, input);
+  }
+
+  ValidationResult validate(const std::string &subject, const std::string &input) const {
+    try {
+      auto negative = input.find_first_of('-') != std::string::npos;
+      if (negative) {
+        throw std::out_of_range("non-negative value expected");
+      }
+      std::stoul(input);
+      return ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(true).build();
+    } catch (...) {
+
+    }
+    return ValidationResult::Builder::createBuilder().withSubject(subject).withInput(input).isValid(false).build();
+  }
+
+};
+
 class LongValidator : public PropertyValidator {
  public:
   explicit LongValidator(const std::string &name, int64_t min = (std::numeric_limits<int64_t>::min)(), int64_t max = (std::numeric_limits<int64_t>::max)())
@@ -327,6 +355,8 @@ class StandardValidators {
       return init.BOOLEAN_VALIDATOR;
     } else if (std::dynamic_pointer_cast<minifi::state::response::IntValue>(input) != nullptr) {
       return init.INTEGER_VALIDATOR;
+    } else if (std::dynamic_pointer_cast<minifi::state::response::UInt32Value>(input) != nullptr) {
+      return init.UNSIGNED_INT_VALIDATOR;
     } else if (std::dynamic_pointer_cast<minifi::state::response::Int64Value>(input) != nullptr) {
       return init.LONG_VALIDATOR;
     } else if (std::dynamic_pointer_cast<minifi::state::response::UInt64Value>(input) != nullptr) {
@@ -349,6 +379,7 @@ class StandardValidators {
  private:
   std::shared_ptr<PropertyValidator> INVALID;
   std::shared_ptr<PropertyValidator> INTEGER_VALIDATOR;
+  std::shared_ptr<PropertyValidator> UNSIGNED_INT_VALIDATOR;
   std::shared_ptr<PropertyValidator> LONG_VALIDATOR;
   std::shared_ptr<PropertyValidator> UNSIGNED_LONG_VALIDATOR;
   std::shared_ptr<PropertyValidator> BOOLEAN_VALIDATOR;
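
Why the explicit '-' check before std::stoul above: stoul (via strtoul)
accepts a leading minus sign and wraps the result around the unsigned range
rather than throwing, so "-1" parses "successfully". A standalone
illustration of that pitfall:

    #include <cassert>
    #include <limits>
    #include <string>

    int main() {
      // std::stoul does not reject negative input; "-1" wraps to ULONG_MAX,
      // which is why the validator screens for '-' before converting
      unsigned long parsed = std::stoul("-1");
      assert(parsed == std::numeric_limits<unsigned long>::max());
      return 0;
    }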
diff --git a/libminifi/include/core/PropertyValue.h b/libminifi/include/core/PropertyValue.h
index 3c32c86..be62226 100644
--- a/libminifi/include/core/PropertyValue.h
+++ b/libminifi/include/core/PropertyValue.h
@@ -40,6 +40,8 @@ static inline std::shared_ptr<state::response::Value> convert(const std::shared_
     }
   } else if (prior->getTypeIndex() == state::response::Value::INT64_TYPE) {
     return std::make_shared<state::response::Int64Value>(ref);
+  } else if (prior->getTypeIndex() == state::response::Value::UINT32_TYPE) {
+    return std::make_shared<state::response::UInt32Value>(ref);
   } else if (prior->getTypeIndex() == state::response::Value::INT_TYPE) {
     return std::make_shared<state::response::IntValue>(ref);
   } else if (prior->getTypeIndex() == state::response::Value::BOOL_TYPE) {
@@ -67,7 +69,7 @@ class PropertyValue : public state::response::ValueNode {
         validator_(o.validator_),
         state::response::ValueNode(o) {
   }
-  PropertyValue(PropertyValue &&o)
+  PropertyValue(PropertyValue &&o) noexcept
       : type_id(o.type_id),
         validator_(std::move(o.validator_)),
         state::response::ValueNode(std::move(o)) {
@@ -106,6 +108,14 @@ class PropertyValue : public state::response::ValueNode {
     throw std::runtime_error("Invalid conversion to int64_t");
   }
 
+  operator uint32_t() const {
+    uint32_t res;
+    if (value_->convertValue(res)) {
+      return res;
+    }
+    throw std::runtime_error("Invalid conversion to uint32_t for " + value_->getStringValue());
+  }
+
   operator int() const {
     int res;
     if (value_->convertValue(res)) {
diff --git a/libminifi/include/core/state/Value.h b/libminifi/include/core/state/Value.h
index 17bbdae..6e3a644 100644
--- a/libminifi/include/core/state/Value.h
+++ b/libminifi/include/core/state/Value.h
@@ -71,6 +71,7 @@ class Value {
 
   static const std::type_index UINT64_TYPE;
   static const std::type_index INT64_TYPE;
+  static const std::type_index UINT32_TYPE;
   static const std::type_index INT_TYPE;
   static const std::type_index BOOL_TYPE;
   static const std::type_index STRING_TYPE;
@@ -87,6 +88,15 @@ class Value {
     type_id = std::type_index(typeid(T));
   }
 
+  virtual bool getValue(uint32_t &ref) {
+    const auto negative = string_value.find_first_of('-') != std::string::npos;
+    if (negative) {
+      return false;
+    }
+    ref = std::stoul(string_value);
+    return true;
+  }
+
   virtual bool getValue(int &ref) {
     ref = std::stol(string_value);
     return true;
@@ -115,6 +125,64 @@ class Value {
   std::type_index type_id;
 };
 
+class UInt32Value : public Value {
+ public:
+  explicit UInt32Value(uint32_t value)
+      : Value(std::to_string(value)),
+        value(value) {
+    setTypeId<uint32_t>();
+  }
+
+  explicit UInt32Value(const std::string &strvalue)
+      : Value(strvalue),
+        value(std::stoul(strvalue)) {
+    /**
+     * This is a fundamental change in that we would be changing where this error occurs.
+     * We should be prudent about breaking backwards compatibility, but since UInt32Value
+     * is only created with a validator and type, we **should** be okay.
+     */
+    const auto negative = strvalue.find_first_of('-') != std::string::npos;
+    if (negative) {
+      throw std::out_of_range("negative value detected");
+    }
+    setTypeId<uint32_t>();
+  }
+
+  uint32_t getValue() const {
+    return value;
+  }
+ protected:
+
+  virtual bool getValue(uint32_t &ref) {
+    ref = value;
+    return true;
+  }
+
+  virtual bool getValue(int &ref) {
+    if (value <= (std::numeric_limits<int>::max)()) {
+      ref = value;
+      return true;
+    }
+    return false;
+  }
+
+  virtual bool getValue(int64_t &ref) {
+    ref = value;
+    return true;
+  }
+
+  virtual bool getValue(uint64_t &ref) {
+    ref = value;
+    return true;
+  }
+
+  virtual bool getValue(bool &ref) {
+    return false;
+  }
+
+  uint32_t value;
+};
+
 class IntValue : public Value {
  public:
   explicit IntValue(int value)
@@ -139,6 +207,14 @@ class IntValue : public Value {
     return true;
   }
 
+  virtual bool getValue(uint32_t &ref) {
+    if (value >= 0) {
+      ref = value;
+      return true;
+    }
+    return false;
+  }
+
   virtual bool getValue(int64_t &ref) {
     ref = value;
     return true;
@@ -177,39 +253,19 @@ class BoolValue : public Value {
  protected:
 
   virtual bool getValue(int &ref) {
-    if (ref == 1) {
-      ref = true;
-      return true;
-    } else if (ref == 0) {
-      ref = false;
-      return true;
-    } else {
-      return false;
-    }
+    return PreventSwearingInFutureRefactor(ref);
+  }
+
+  virtual bool getValue(uint32_t &ref) {
+    return PreventSwearingInFutureRefactor(ref);
   }
 
   virtual bool getValue(int64_t &ref) {
-    if (ref == 1) {
-      ref = true;
-      return true;
-    } else if (ref == 0) {
-      ref = false;
-      return true;
-    } else {
-      return false;
-    }
+    return PreventSwearingInFutureRefactor(ref);
   }
 
   virtual bool getValue(uint64_t &ref) {
-    if (ref == 1) {
-      ref = true;
-      return true;
-    } else if (ref == 0) {
-      ref = false;
-      return true;
-    } else {
-      return false;
-    }
+    return PreventSwearingInFutureRefactor(ref);
   }
 
   virtual bool getValue(bool &ref) {
@@ -218,6 +274,16 @@ class BoolValue : public Value {
   }
 
   bool value;
+
+ private:
+  template<typename T>
+  bool PreventSwearingInFutureRefactor(T &ref) {
+    if (value != 0 && value != 1) {
+      return false;
+    }
+    ref = value != 0;
+    return true;
+  }
 };
 
 class UInt64Value : public Value {
@@ -252,6 +318,10 @@ class UInt64Value : public Value {
     return false;
   }
 
+  virtual bool getValue(uint32_t &ref) {
+    return false;
+  }
+
   virtual bool getValue(int64_t &ref) {
     if (value <= (std::numeric_limits<int64_t>::max)()) {
       ref = value;
@@ -294,6 +364,10 @@ class Int64Value : public Value {
     return false;
   }
 
+  virtual bool getValue(uint32_t &ref) {
+    return false;
+  }
+
   virtual bool getValue(int64_t &ref) {
     ref = value;
     return true;
@@ -331,7 +405,7 @@ static inline std::shared_ptr<Value> createValue(const std::string &object) {
 }
 
 static inline std::shared_ptr<Value> createValue(const uint32_t &object) {
-  return std::make_shared<UInt64Value>(object);
+  return std::make_shared<UInt32Value>(object);
 }
 #if ( defined(__APPLE__) || defined(__MACH__) || defined(DARWIN) )
 static inline std::shared_ptr<Value> createValue(const size_t &object) {
@@ -370,6 +444,7 @@ class ValueNode {
   auto operator=(const T ref) -> typename std::enable_if<std::is_same<T, int >::value ||
   std::is_same<T, uint32_t >::value ||
   std::is_same<T, size_t >::value ||
+  std::is_same<T, int64_t>::value ||
   std::is_same<T, uint64_t >::value ||
   std::is_same<T, bool >::value ||
   std::is_same<T, char* >::value ||
@@ -379,10 +454,7 @@ class ValueNode {
     return *this;
   }
 
-  ValueNode &operator=(const ValueNode &ref) {
-    value_ = ref.value_;
-    return *this;
-  }
+  ValueNode &operator=(const ValueNode &ref) = default;
 
   inline bool operator==(const ValueNode &rhs) const {
     return to_string() == rhs.to_string();
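
The new getValue overloads convert between integer widths only when the value
is representable: IntValue fills a uint32_t only for non-negative values,
while UInt64Value and Int64Value simply refuse the narrowing. The same rule
in isolation, as an illustrative sketch with an explicit range check (the
diff's 64-bit variants just return false outright):

    #include <cstdint>
    #include <limits>

    // A conversion succeeds only when the source value is representable
    // in the target type; otherwise it reports failure instead of
    // narrowing silently.
    bool toUint32(int64_t in, uint32_t &out) {
      if (in < 0 || in > static_cast<int64_t>((std::numeric_limits<uint32_t>::max)())) {
        return false;
      }
      out = static_cast<uint32_t>(in);
      return true;
    }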
diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp b/libminifi/src/c2/protocols/RESTProtocol.cpp
index 37a53e6..1cdec27 100644
--- a/libminifi/src/c2/protocols/RESTProtocol.cpp
+++ b/libminifi/src/c2/protocols/RESTProtocol.cpp
@@ -165,6 +165,10 @@ void setJsonStr(const std::string& key, const state::response::ValueNode& value,
       int value = 0;
       base_type->convertValue(value);
       valueVal.SetInt(value);
+    } else if (type_index == state::response::Value::UINT32_TYPE) {
+      uint32_t value = 0;
+      base_type->convertValue(value);
+      valueVal.SetUint(value);
     } else if (type_index == state::response::Value::INT64_TYPE) {
       int64_t value = 0;
       base_type->convertValue(value);
diff --git a/libminifi/src/core/PropertyValidation.cpp b/libminifi/src/core/PropertyValidation.cpp
index ab1f53a..d476416 100644
--- a/libminifi/src/core/PropertyValidation.cpp
+++ b/libminifi/src/core/PropertyValidation.cpp
@@ -27,6 +27,7 @@ std::shared_ptr<PropertyValidator> StandardValidators::VALID = std::make_shared<
 StandardValidators::StandardValidators() {
   INVALID = std::make_shared<AlwaysValid>(false, "INVALID");
   INTEGER_VALIDATOR = std::make_shared<IntegerValidator>("INTEGER_VALIDATOR");
+  UNSIGNED_INT_VALIDATOR = std::make_shared<UnsignedIntValidator>("NON_NEGATIVE_INTEGER_VALIDATOR");
   LONG_VALIDATOR = std::make_shared<LongValidator>("LONG_VALIDATOR");
   // name is used by java nifi validators, so we should keep this LONG and not change to reflect
   // its internal use
diff --git a/libminifi/src/core/state/Value.cpp b/libminifi/src/core/state/Value.cpp
index 4c43c3e..ae1bae3 100644
--- a/libminifi/src/core/state/Value.cpp
+++ b/libminifi/src/core/state/Value.cpp
@@ -29,6 +29,7 @@ namespace response {
 
 const std::type_index Value::UINT64_TYPE = std::type_index(typeid(uint64_t));
 const std::type_index Value::INT64_TYPE = std::type_index(typeid(int64_t));
+const std::type_index Value::UINT32_TYPE = std::type_index(typeid(uint32_t));
 const std::type_index Value::INT_TYPE = std::type_index(typeid(int));
 const std::type_index Value::BOOL_TYPE = std::type_index(typeid(bool));
 const std::type_index Value::STRING_TYPE = std::type_index(typeid(std::string));
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index c51dd2c..0cc4cc6 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -44,6 +44,17 @@ class IntegrationBase {
     configureSecurity();
   }
 
+  // Return the last position and number of occurrences.
+  std::pair<size_t, int> countPatInStr(const std::string &str, const std::string &pattern) {
+    size_t last_pos = 0;
+    int occurrences = 0;
+    for (size_t pos = str.find(pattern); pos != std::string::npos; pos = str.find(pattern, pos + pattern.size())) {
+      last_pos = pos;
+      occurrences++;
+    }
+    return {last_pos, occurrences};
+  }
+
   virtual void testSetup() = 0;
 
   virtual void shutdownBeforeFlowController() {
diff --git a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
index 18b5803..4f5412e 100644
--- a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
+++ b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
@@ -1,6 +1,4 @@
 /**
- * @file GenerateFlowFile.h
- * GenerateFlowFile class declaration
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -29,32 +27,22 @@ class OnScheduleErrorHandlingTests : public IntegrationBase {
  public:
   virtual void runAssertions() {
     std::string logs = LogTestController::getInstance().log_output.str();
-    size_t pos = 0;
-    size_t last_pos = 0;
-    unsigned int occurances = 0;
-    do {
-      pos = logs.find(minifi::processors::KamikazeProcessor::OnScheduleExceptionStr, pos);
-      if (pos != std::string::npos) {
-        last_pos = pos;
-        pos = logs.find(minifi::processors::KamikazeProcessor::OnUnScheduleLogStr, pos);
-        if (pos != std::string::npos) {
-          last_pos = pos;
-          occurances++;
-        }
-      }
-    } while (pos != std::string::npos);
 
-    assert(occurances > 1);  // Verify retry of onSchedule and onUnSchedule calls
+    auto result = countPatInStr(logs, minifi::processors::KamikazeProcessor::OnScheduleExceptionStr);
+    size_t last_pos = result.first;
+    int occurrences = result.second;
 
-    // Make sure onSchedule succeeded after property was set
-    assert(logs.find(minifi::processors::KamikazeProcessor::OnScheduleLogStr, last_pos) != std::string::npos);
+    assert(occurrences > 1);  // Verify retry of onSchedule and onUnSchedule calls
 
-    // Make sure onTrigger was called after onshedule succeeded
-    pos = logs.find(minifi::processors::KamikazeProcessor::OnTriggerExceptionStr);
-    assert(pos != std::string::npos && pos > last_pos);
+    std::vector<std::string> must_appear_byorder_msgs = {minifi::processors::KamikazeProcessor::OnUnScheduleLogStr,
+                                                         minifi::processors::KamikazeProcessor::OnScheduleLogStr,
+                                                         minifi::processors::KamikazeProcessor::OnTriggerExceptionStr,
+                                                         "[warning] ProcessSession rollback for kamikaze executed"};
 
-    pos = logs.find("[warning] ProcessSession rollback for kamikaze executed");  // Check rollback
-    assert(pos != std::string::npos && pos > last_pos);
+    for (const auto &msg : must_appear_byorder_msgs) {
+      last_pos = logs.find(msg, last_pos);
+      assert(last_pos != std::string::npos);
+    }
 
     assert(logs.find(minifi::processors::KamikazeProcessor::OnTriggerLogStr) == std::string::npos);
   }
diff --git a/libminifi/test/resources/TestKafkaOnSchedule.yml b/libminifi/test/resources/TestKafkaOnSchedule.yml
new file mode 100644
index 0000000..eabb0e8
--- /dev/null
+++ b/libminifi/test/resources/TestKafkaOnSchedule.yml
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+Flow Controller:
+  name: MiNiFi Flow
+  onschedule retry interval: 1000 ms
+Processors:
+  - Properties:
+#      Batch Size: 1234
+      Client Name: lmn
+#      Compress Codec: none
+#      Delivery Guarantee: '1'
+      Known Brokers: localhost:9092
+      Max Request Size: '1'
+#      Message Timeout: 5 sec
+#      Request Timeout: 10 sec
+      Topic Name: test
+    class: org.apache.nifi.processors.standard.PublishKafka
+    id: 3744352b-6eb1-4677-98a6-353417a90496
+    name: kafka
+    max concurrent tasks: 1
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 100 sec
+    penalization period: 30 sec
+    yield period: 1 sec
+    run duration nanos: 0
+    auto-terminated relationships list:
+Connections:
+Remote Processing Groups:
+