Posted to commits@nifi.apache.org by ad...@apache.org on 2021/10/18 10:12:47 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1530 Put Kafka tests in order

This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 312a18e  MINIFICPP-1530 Put Kafka tests in order
312a18e is described below

commit 312a18ed9a08ef8ba9efaeb6187843ed5fc9f204
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Mon Oct 18 12:11:39 2021 +0200

    MINIFICPP-1530 Put Kafka tests in order
    
    Signed-off-by: Adam Debreceni <ad...@apache.org>
    
    This closes #1180
---
 CMakeLists.txt                                     |   2 +-
 .../integration/MiNiFi_integration_test_driver.py  |   8 +
 docker/test/integration/features/kafka.feature     |  34 +++-
 .../integration/minifi/core/DockerTestCluster.py   |  16 +-
 docker/test/integration/steps/steps.py             |   5 +
 extensions/librdkafka/docker_tests/CMakeLists.txt  |  36 ----
 extensions/librdkafka/tests/CMakeLists.txt         |  41 ----
 .../tests/PublishKafkaOnScheduleTests.cpp          |  77 --------
 extensions/librdkafka/tests/PublishKafkaTests.cpp  | 208 ---------------------
 9 files changed, 60 insertions(+), 367 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3c67568..2c715e1 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -486,7 +486,7 @@ endif()
 if (ENABLE_ALL OR ENABLE_LIBRDKAFKA)
 	include(BundledLibRdKafka)
 	use_bundled_librdkafka(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
-	createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PublishKafka" "extensions/librdkafka" "extensions/librdkafka/tests" "extensions/librdkafka/docker-tests")
+	createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PublishKafka" "extensions/librdkafka")
 endif()
 
 ## Scripting extensions
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index fc857ed..98956f5 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -194,5 +194,13 @@ class MiNiFi_integration_test():
                     return
         assert False
 
+    def check_minifi_log_matches_regex(self, regex, timeout_seconds=60):
+        for container in self.cluster.containers.values():
+            if container.get_engine() == "minifi-cpp":
+                line_found = self.cluster.wait_for_app_logs_regex(container.get_name(), regex, timeout_seconds)
+                if line_found:
+                    return
+        assert False
+
     def check_query_results(self, postgresql_container_name, query, number_of_rows, timeout_seconds):
         assert self.cluster.check_query_results(postgresql_container_name, query, number_of_rows, timeout_seconds)
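The new check_minifi_log_matches_regex helper mirrors the existing substring-based check_minifi_log_contents: it walks the cluster's containers, picks out the minifi-cpp ones, and passes if any of them produces a matching log line within the timeout, asserting False otherwise. A minimal usage sketch in Python (the `test` variable is illustrative; the UUID pattern is the one added to kafka.feature below):

    # Hypothetical call site; `test` stands in for a MiNiFi_integration_test
    # instance as set up by the docker integration test environment.
    uuid_key_regex = r"PublishKafka: Message Key \[[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}\]"
    test.check_minifi_log_matches_regex(uuid_key_regex, timeout_seconds=10)
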
diff --git a/docker/test/integration/features/kafka.feature b/docker/test/integration/features/kafka.feature
index fd5adcd..ab4c34e 100644
--- a/docker/test/integration/features/kafka.feature
+++ b/docker/test/integration/features/kafka.feature
@@ -9,15 +9,41 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
   Scenario: A MiNiFi instance transfers data to a kafka broker
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a file with the content "test" is present in "/tmp/input"
+    And a UpdateAttribute processor
+    And these processor properties are set:
+      | processor name  | property name          | property value                        |
+      | UpdateAttribute | kafka_require_num_acks | 1                                     |
+      | UpdateAttribute | kafka_message_key      | unique_message_key_123                |
     And a PublishKafka processor set up to communicate with a kafka broker instance
+    And these processor properties are set:
+      | processor name | property name          | property value                         |
+      | PublishKafka   | Topic Name             | test                                   |
+      | PublishKafka   | Delivery Guarantee     | ${kafka_require_num_acks}              |
+      | PublishKafka   | Request Timeout        | 12 s                                   |
+      | PublishKafka   | Message Timeout        | 13 s                                   |
+      | PublishKafka   | Known Brokers          | kafka-broker:${literal(9000):plus(92)} |
+      | PublishKafka   | Client Name            | client_no_${literal(6):multiply(7)}    |
+      | PublishKafka   | Kafka Key              | ${kafka_message_key}                   |
+      | PublishKafka   | retry.backoff.ms       | ${literal(2):multiply(25):multiply(3)} |
+      | PublishKafka   | Message Key Field      | kafka.key                              |
     And a PutFile processor with the "Directory" property set to "/tmp/output"
-    And the "success" relationship of the GetFile processor is connected to the PublishKafka
+    And the "success" relationship of the GetFile processor is connected to the UpdateAttribute
+    And the "success" relationship of the UpdateAttribute processor is connected to the PublishKafka
     And the "success" relationship of the PublishKafka processor is connected to the PutFile
 
     And a kafka broker is set up in correspondence with the PublishKafka
 
     When both instances start up
     Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+    And the Minifi logs contain the following message: " is 'test'" in less than 60 seconds
+    And the Minifi logs contain the following message: "PublishKafka: request.required.acks [1]" in less than 10 seconds
+    And the Minifi logs contain the following message: "PublishKafka: request.timeout.ms [12000]" in less than 10 seconds
+    And the Minifi logs contain the following message: "PublishKafka: message.timeout.ms [13000]" in less than 10 seconds
+    And the Minifi logs contain the following message: "PublishKafka: bootstrap.servers [kafka-broker:9092]" in less than 10 seconds
+    And the Minifi logs contain the following message: "PublishKafka: client.id [client_no_42]" in less than 10 seconds
+    And the Minifi logs contain the following message: "PublishKafka: Message Key [unique_message_key_123]" in less than 10 seconds
+    And the Minifi logs contain the following message: "PublishKafka: DynamicProperty: [retry.backoff.ms] -> [150]" in less than 10 seconds
+    And the Minifi logs contain the following message: "The Message Key Field property is set. This property is DEPRECATED and has no effect; please use Kafka Key instead." in less than 10 seconds
 
   Scenario: PublishKafka sends flowfiles to failure when the broker is not available
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
@@ -58,6 +84,9 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
     When both instances start up
     Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
 
+    # We fallback to the flowfile's uuid as message key if the Kafka Key property is not set
+    And the Minifi logs match the following regex: "PublishKafka: Message Key \[[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}\]" in less than 10 seconds
+
   Scenario: PublishKafka sends can use SSL connect with SSL Context Service
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a file with the content "test" is present in "/tmp/input"
@@ -83,6 +112,9 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
     When both instances start up
     Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
 
+    # We fallback to the flowfile's uuid as message key if the Kafka Key property is not set
+    And the Minifi logs match the following regex: "PublishKafka: Message Key \[[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}\]" in less than 10 seconds
+
   Scenario: MiNiFi consumes data from a kafka topic
     Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
     And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
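The new assertions double as a smoke test for expression-language support in PublishKafka properties: Known Brokers, Client Name, Kafka Key and the retry.backoff.ms dynamic property are computed at runtime, and the expected log lines pin down the results. The arithmetic, restated as plain Python for illustration:

    # Expected evaluation of the expression-language snippets in the scenario above.
    assert 9000 + 92 == 9092    # ${literal(9000):plus(92)}              -> kafka-broker:9092
    assert 6 * 7 == 42          # ${literal(6):multiply(7)}              -> client_no_42
    assert 2 * 25 * 3 == 150    # ${literal(2):multiply(25):multiply(3)} -> retry.backoff.ms [150]
    assert 12 * 1000 == 12000   # "12 s" Request Timeout                 -> request.timeout.ms [12000]
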
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py b/docker/test/integration/minifi/core/DockerTestCluster.py
index 206ef14..4f465b9 100644
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -4,6 +4,7 @@ import subprocess
 import sys
 import time
 import os
+import re
 
 from .SingleNodeDockerCluster import SingleNodeDockerCluster
 from .utils import retry_check
@@ -54,18 +55,27 @@ class DockerTestCluster(SingleNodeDockerCluster):
 
         return container.status, None
 
-    def wait_for_app_logs(self, container_name, log_entry, timeout_seconds, count=1):
+    def __wait_for_app_logs_impl(self, container_name, log_entry, timeout_seconds, count, use_regex):
         wait_start_time = time.perf_counter()
         while (time.perf_counter() - wait_start_time) < timeout_seconds:
             logging.info('Waiting for app-logs `%s` in container `%s`', log_entry, container_name)
             status, logs = self.get_app_log(container_name)
-            if logs is not None and count <= logs.decode("utf-8").count(log_entry):
-                return True
+            if logs is not None:
+                if not use_regex and logs.decode("utf-8").count(log_entry) >= count:
+                    return True
+                elif use_regex and len(re.findall(log_entry, logs.decode("utf-8"))) >= count:
+                    return True
             elif status == 'exited':
                 return False
             time.sleep(1)
         return False
 
+    def wait_for_app_logs_regex(self, container_name, log_entry, timeout_seconds, count=1):
+        return self.__wait_for_app_logs_impl(container_name, log_entry, timeout_seconds, count, True)
+
+    def wait_for_app_logs(self, container_name, log_entry, timeout_seconds, count=1):
+        return self.__wait_for_app_logs_impl(container_name, log_entry, timeout_seconds, count, False)
+
     def wait_for_startup_log(self, container_name, timeout_seconds):
         return self.wait_for_app_logs(container_name, self.containers[container_name].get_startup_finished_log_entry(), timeout_seconds, 1)
 
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 3af539d..110ca60 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -546,3 +546,8 @@ def step_impl(context, query, number_of_rows, timeout_seconds):
 @then("the Minifi logs contain the following message: \"{log_message}\" in less than {duration}")
 def step_impl(context, log_message, duration):
     context.test.check_minifi_log_contents(log_message, timeparse(duration))
+
+
+@then("the Minifi logs match the following regex: \"{regex}\" in less than {duration}")
+def step_impl(context, regex, duration):
+    context.test.check_minifi_log_matches_regex(regex, timeparse(duration))
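The new step is a thin wrapper: behave captures the quoted regex and the human-readable duration, the duration is converted to seconds with timeparse (which steps.py already uses for the substring step above), and the driver method does the waiting. A small illustrative check, assuming pytimeparse's usual import path:

    from pytimeparse.timeparse import timeparse

    # "10 seconds", as written in kafka.feature, becomes a plain number of seconds.
    assert timeparse("10 seconds") == 10
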
diff --git a/extensions/librdkafka/docker_tests/CMakeLists.txt b/extensions/librdkafka/docker_tests/CMakeLists.txt
deleted file mode 100644
index 6fe8a0a..0000000
--- a/extensions/librdkafka/docker_tests/CMakeLists.txt
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-file(GLOB KAFKA_TESTS  "*.cpp")
-
-SET(KAFKA_TEST_COUNT 0)
-
-FOREACH(testfile ${KAFKA_TESTS})
-    get_filename_component(testfilename "${testfile}" NAME_WE)
-    add_executable("${testfilename}" "${testfile}")
-    target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/librdkafka/")
-    target_link_libraries(${testfilename} minifi-rdkafka-extensions)
-    createTests("${testfilename}")
-    MATH(EXPR KAFKA_TEST_COUNT "${KAFKA_TEST_COUNT}+1")
-    # The line below handles integration test
-    add_test(NAME "${testfilename}" COMMAND "${testfilename}")
-    target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
-ENDFOREACH()
-
-message("-- Finished building ${KAFKA_TEST_COUNT} Kafka related test file(s)...")
diff --git a/extensions/librdkafka/tests/CMakeLists.txt b/extensions/librdkafka/tests/CMakeLists.txt
deleted file mode 100644
index 8c6193c..0000000
--- a/extensions/librdkafka/tests/CMakeLists.txt
+++ /dev/null
@@ -1,41 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-file(GLOB KAFKA_TESTS  "*.cpp")
-
-SET(KAFKA_TEST_COUNT 0)
-
-FOREACH(testfile ${KAFKA_TESTS})
-    get_filename_component(testfilename "${testfile}" NAME_WE)
-    add_executable("${testfilename}" "${testfile}")
-    target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/librdkafka/")
-    target_link_libraries(${testfilename} minifi-rdkafka-extensions)
-    createTests("${testfilename}")
-    MATH(EXPR KAFKA_TEST_COUNT "${KAFKA_TEST_COUNT}+1")
-    target_include_directories(${testfilename} BEFORE PRIVATE "../..")
-    target_include_directories(${testfilename} BEFORE PRIVATE "../../standard-processors/processors")
-    target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/catch")
-    target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test")
-    target_link_libraries(${testfilename} minifi-standard-processors)
-    target_link_libraries(${testfilename} minifi-expression-language-extensions)
-    target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
-    add_test(NAME "${testfilename}" COMMAND "${testfilename}" "${TEST_RESOURCES}/TestKafkaOnSchedule.yml"  "${TEST_RESOURCES}/")
-ENDFOREACH()
-
-message("-- Finished building ${KAFKA_TEST_COUNT} Kafka related test file(s)...")
diff --git a/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp b/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp
deleted file mode 100644
index 85d26c6..0000000
--- a/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#undef NDEBUG
-
-#include <cassert>
-#include "../../../libminifi/test/integration/IntegrationBase.h"
-#include "core/logging/Logger.h"
-#include "../../../libminifi/test/TestBase.h"
-#include "../PublishKafka.h"
-#include "utils/StringUtils.h"
-#include "utils/IntegrationTestUtils.h"
-
-class PublishKafkaOnScheduleTests : public IntegrationBase {
- public:
-    void runAssertions() override {
-      using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
-      assert(verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] {
-        const std::string logs = LogTestController::getInstance().log_output.str();
-        const auto result = utils::StringUtils::countOccurrences(logs, "value 1 is outside allowed range 1000..1000000000");
-        const int occurrences = result.second;
-        return 1 < occurrences;
-      }, std::chrono::milliseconds{10}));
-      flowController_->updatePropertyValue("kafka", minifi::processors::PublishKafka::MaxMessageSize.getName(), "1999");
-      const std::vector<std::string> must_appear_byorder_msgs = {"notifyStop called",
-          "Successfully configured PublishKafka",
-          "PublishKafka onTrigger"};
-
-      const bool test_success = verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] {
-        const std::string logs = LogTestController::getInstance().log_output.str();
-        const auto result = utils::StringUtils::countOccurrences(logs, "value 1 is outside allowed range 1000..1000000000");
-        size_t last_pos = result.first;
-        for (const std::string& msg : must_appear_byorder_msgs) {
-          last_pos = logs.find(msg, last_pos);
-          if (last_pos == std::string::npos)  {
-            return false;
-          }
-        }
-        return true;
-      });
-      assert(test_success);
-    }
-
-    void testSetup() override {
-      LogTestController::getInstance().setDebug<core::ProcessGroup>();
-      LogTestController::getInstance().setDebug<core::Processor>();
-      LogTestController::getInstance().setDebug<core::ProcessSession>();
-      LogTestController::getInstance().setDebug<minifi::processors::PublishKafka>();
-    }
-};
-
-int main(int argc, char **argv) {
-  std::string test_file_location, url;
-  if (argc > 1) {
-    test_file_location = argv[1];
-  }
-
-  PublishKafkaOnScheduleTests harness;
-
-  harness.run(test_file_location);
-
-  return 0;
-}
diff --git a/extensions/librdkafka/tests/PublishKafkaTests.cpp b/extensions/librdkafka/tests/PublishKafkaTests.cpp
deleted file mode 100644
index db44eb3..0000000
--- a/extensions/librdkafka/tests/PublishKafkaTests.cpp
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "librdkafka/PublishKafka.h"
-#include "TestBase.h"
-
-namespace {
-
-class PublishKafkaTestRunner {
- public:
-  PublishKafkaTestRunner();
-  void setPublishKafkaProperty(const core::Property& property, const std::string& value);
-  void setPublishKafkaDynamicProperty(const std::string& property, const std::string& value);
-  std::shared_ptr<core::FlowFile> createFlowFile() const;
-  void enqueueFlowFile(const std::shared_ptr<core::FlowFile>& flow_file);
-  void runProcessor();
-
- private:
-  TestController test_controller_;
-  std::shared_ptr<TestPlan> test_plan_;
-  std::shared_ptr<core::Processor> upstream_processor_;
-  std::shared_ptr<core::Processor> publish_kafka_processor_;
-  std::shared_ptr<minifi::Connection> incoming_connection_;
-};
-
-const core::Relationship Success{"success", ""};
-const core::Relationship Failure{"failure", ""};
-
-PublishKafkaTestRunner::PublishKafkaTestRunner() : test_plan_(test_controller_.createPlan()) {
-  LogTestController::getInstance().reset();
-  LogTestController::getInstance().setDebug<processors::PublishKafka>();
-
-  upstream_processor_ = test_plan_->addProcessor("GenerateFlowFile", "generate_flow_file");
-  publish_kafka_processor_ = test_plan_->addProcessor("PublishKafka", "publish_kafka");
-  publish_kafka_processor_->setAutoTerminatedRelationships({Success, Failure});
-  incoming_connection_ = test_plan_->addConnection(upstream_processor_, Success, publish_kafka_processor_);
-
-  test_plan_->setProperty(publish_kafka_processor_, processors::PublishKafka::SeedBrokers.getName(), "localhost:9092");
-  test_plan_->setProperty(publish_kafka_processor_, processors::PublishKafka::ClientName.getName(), "Default_client_name");
-  test_plan_->setProperty(publish_kafka_processor_, processors::PublishKafka::Topic.getName(), "Default_topic");
-
-  test_plan_->increment_location();  // current processor is generate_flow_file
-  test_plan_->increment_location();  // current processor is publish_kafka
-}
-
-void PublishKafkaTestRunner::setPublishKafkaProperty(const core::Property& property, const std::string& value) {
-  test_plan_->setProperty(publish_kafka_processor_, property.getName(), value);
-}
-
-void PublishKafkaTestRunner::setPublishKafkaDynamicProperty(const std::string& property, const std::string& value) {
-  test_plan_->setProperty(publish_kafka_processor_, property, value, true);
-}
-
-std::shared_ptr<core::FlowFile> PublishKafkaTestRunner::createFlowFile() const {
-  const auto context = test_plan_->getProcessContextForProcessor(upstream_processor_);
-  const auto session_factory = std::make_shared<core::ProcessSessionFactory>(context);
-  const auto session = session_factory->createSession();
-  auto flow_file = session->create();
-
-  const std::string data = "flow file content";
-  minifi::io::BufferStream stream{reinterpret_cast<const uint8_t*>(data.c_str()), static_cast<unsigned int>(data.length())};
-  session->importFrom(stream, flow_file);
-  session->transfer(flow_file, Success);
-  session->commit();
-
-  return flow_file;
-}
-
-void PublishKafkaTestRunner::enqueueFlowFile(const std::shared_ptr<core::FlowFile>& flow_file) {
-  incoming_connection_->put(flow_file);
-}
-
-void PublishKafkaTestRunner::runProcessor() {
-  test_plan_->runCurrentProcessor();
-}
-
-}  // namespace
-
-TEST_CASE("PublishKafka can set the Topic from an attribute", "[PublishKafka][properties][expression_language]") {
-  PublishKafkaTestRunner test_runner;
-  test_runner.setPublishKafkaProperty(processors::PublishKafka::Topic, "${kafka_topic}-111");
-
-  const auto flow_file = test_runner.createFlowFile();
-  flow_file->setAttribute("kafka_topic", "Kafka_topic_for_unit_test");
-  test_runner.enqueueFlowFile(flow_file);
-
-  test_runner.runProcessor();
-
-  REQUIRE(LogTestController::getInstance().contains("PublishKafka: topic for flow file " + flow_file->getUUIDStr() + " is 'Kafka_topic_for_unit_test-111'"));
-}
-
-TEST_CASE("PublishKafka can set DeliveryGuarantee from an attribute", "[PublishKafka][properties][expression_language]") {
-  PublishKafkaTestRunner test_runner;
-  test_runner.setPublishKafkaProperty(processors::PublishKafka::DeliveryGuarantee, "${kafka_require_num_acks}");
-
-  const auto flow_file = test_runner.createFlowFile();
-  flow_file->setAttribute("kafka_require_num_acks", "7");
-  test_runner.enqueueFlowFile(flow_file);
-
-  test_runner.runProcessor();
-
-  REQUIRE(LogTestController::getInstance().contains("PublishKafka: request.required.acks [7]"));
-}
-
-TEST_CASE("PublishKafka can set RequestTimeOut", "[PublishKafka][properties]") {
-  PublishKafkaTestRunner test_runner;
-  test_runner.setPublishKafkaProperty(processors::PublishKafka::RequestTimeOut, "123 s");
-
-  const auto flow_file = test_runner.createFlowFile();
-  test_runner.enqueueFlowFile(flow_file);
-
-  test_runner.runProcessor();
-
-  REQUIRE(LogTestController::getInstance().contains("PublishKafka: request.timeout.ms [123000]"));
-}
-
-TEST_CASE("PublishKafka can set MessageTimeOut", "[PublishKafka][properties]") {
-  PublishKafkaTestRunner test_runner;
-  test_runner.setPublishKafkaProperty(processors::PublishKafka::MessageTimeOut, "5 min");
-
-  const auto flow_file = test_runner.createFlowFile();
-  test_runner.enqueueFlowFile(flow_file);
-
-  test_runner.runProcessor();
-
-  REQUIRE(LogTestController::getInstance().contains("PublishKafka: message.timeout.ms [300000]"));
-}
-
-TEST_CASE("PublishKafka can set SeedBrokers with expression language", "[PublishKafka][properties][expression_language]") {
-  PublishKafkaTestRunner test_runner;
-  test_runner.setPublishKafkaProperty(processors::PublishKafka::SeedBrokers, "localhost:${literal(9000):plus(123)}");
-
-  test_runner.runProcessor();
-
-  REQUIRE(LogTestController::getInstance().contains("PublishKafka: bootstrap.servers [localhost:9123]"));
-}
-
-TEST_CASE("PublishKafka can set ClientName with expression language", "[PublishKafka][properties][expression_language]") {
-  PublishKafkaTestRunner test_runner;
-  test_runner.setPublishKafkaProperty(processors::PublishKafka::ClientName, "client_no_${literal(6):multiply(7)}");
-
-  test_runner.runProcessor();
-
-  REQUIRE(LogTestController::getInstance().contains("PublishKafka: client.id [client_no_42]"));
-}
-
-TEST_CASE("If KafkaKey is not set, then PublishKafka sets the message key to the flow file ID", "[PublishKafka][properties]") {
-  PublishKafkaTestRunner test_runner;
-
-  const auto flow_file = test_runner.createFlowFile();
-  test_runner.enqueueFlowFile(flow_file);
-
-  test_runner.runProcessor();
-
-  REQUIRE(LogTestController::getInstance().contains("PublishKafka: Message Key [" + flow_file->getUUIDStr() + "]"));
-}
-
-TEST_CASE("PublishKafka can set KafkaKey from an attribute", "[PublishKafka][properties][expression_language]") {
-  PublishKafkaTestRunner test_runner;
-  test_runner.setPublishKafkaProperty(processors::PublishKafka::KafkaKey, "${kafka_message_key}");
-
-  const auto flow_file = test_runner.createFlowFile();
-  flow_file->setAttribute("kafka_message_key", "unique_message_key_123");
-  test_runner.enqueueFlowFile(flow_file);
-
-  test_runner.runProcessor();
-
-  REQUIRE(LogTestController::getInstance().contains("PublishKafka: Message Key [unique_message_key_123]"));
-}
-
-TEST_CASE("PublishKafka dynamic properties can use expression language", "[PublishKafka][properties][expression_language]") {
-  PublishKafkaTestRunner test_runner;
-  test_runner.setPublishKafkaDynamicProperty("retry.backoff.ms", "${literal(3):multiply(60):multiply(1000)}");
-
-  test_runner.runProcessor();
-
-  REQUIRE(LogTestController::getInstance().contains("PublishKafka: DynamicProperty: [retry.backoff.ms] -> [180000]"));
-}
-
-TEST_CASE("PublishKafka complains if Message Key Field is set, but only if it is set", "[PublishKafka][properties]") {
-  PublishKafkaTestRunner test_runner;
-
-  SECTION("Message Key Field is not set, so there is no error") {
-    test_runner.runProcessor();
-    REQUIRE_FALSE(LogTestController::getInstance().contains("error"));
-  }
-
-  SECTION("Message Key Field is set, so there is an error log") {
-    test_runner.setPublishKafkaProperty(processors::PublishKafka::MessageKeyField, "kafka.key");
-    test_runner.runProcessor();
-    REQUIRE(LogTestController::getInstance().contains("The " + processors::PublishKafka::MessageKeyField.getName() +
-        " property is set. This property is DEPRECATED and has no effect"));
-  }
-}