You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ad...@apache.org on 2021/10/18 10:12:47 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1530 Put Kafka tests in order
This is an automated email from the ASF dual-hosted git repository.
adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 312a18e MINIFICPP-1530 Put Kafka tests in order
312a18e is described below
commit 312a18ed9a08ef8ba9efaeb6187843ed5fc9f204
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Mon Oct 18 12:11:39 2021 +0200
MINIFICPP-1530 Put Kafka tests in order
Signed-off-by: Adam Debreceni <ad...@apache.org>
This closes #1180
---
CMakeLists.txt | 2 +-
.../integration/MiNiFi_integration_test_driver.py | 8 +
docker/test/integration/features/kafka.feature | 34 +++-
.../integration/minifi/core/DockerTestCluster.py | 16 +-
docker/test/integration/steps/steps.py | 5 +
extensions/librdkafka/docker_tests/CMakeLists.txt | 36 ----
extensions/librdkafka/tests/CMakeLists.txt | 41 ----
.../tests/PublishKafkaOnScheduleTests.cpp | 77 --------
extensions/librdkafka/tests/PublishKafkaTests.cpp | 208 ---------------------
9 files changed, 60 insertions(+), 367 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3c67568..2c715e1 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -486,7 +486,7 @@ endif()
if (ENABLE_ALL OR ENABLE_LIBRDKAFKA)
include(BundledLibRdKafka)
use_bundled_librdkafka(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
- createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PublishKafka" "extensions/librdkafka" "extensions/librdkafka/tests" "extensions/librdkafka/docker-tests")
+ createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PublishKafka" "extensions/librdkafka")
endif()
## Scripting extensions
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index fc857ed..98956f5 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -194,5 +194,13 @@ class MiNiFi_integration_test():
return
assert False
+ def check_minifi_log_matches_regex(self, regex, timeout_seconds=60):
+ for container in self.cluster.containers.values():
+ if container.get_engine() == "minifi-cpp":
+ line_found = self.cluster.wait_for_app_logs_regex(container.get_name(), regex, timeout_seconds)
+ if line_found:
+ return
+ assert False
+
def check_query_results(self, postgresql_container_name, query, number_of_rows, timeout_seconds):
assert self.cluster.check_query_results(postgresql_container_name, query, number_of_rows, timeout_seconds)
diff --git a/docker/test/integration/features/kafka.feature b/docker/test/integration/features/kafka.feature
index fd5adcd..ab4c34e 100644
--- a/docker/test/integration/features/kafka.feature
+++ b/docker/test/integration/features/kafka.feature
@@ -9,15 +9,41 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
Scenario: A MiNiFi instance transfers data to a kafka broker
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with the content "test" is present in "/tmp/input"
+ And a UpdateAttribute processor
+ And these processor properties are set:
+ | processor name | property name | property value |
+ | UpdateAttribute | kafka_require_num_acks | 1 |
+ | UpdateAttribute | kafka_message_key | unique_message_key_123 |
And a PublishKafka processor set up to communicate with a kafka broker instance
+ And these processor properties are set:
+ | processor name | property name | property value |
+ | PublishKafka | Topic Name | test |
+ | PublishKafka | Delivery Guarantee | ${kafka_require_num_acks} |
+ | PublishKafka | Request Timeout | 12 s |
+ | PublishKafka | Message Timeout | 13 s |
+ | PublishKafka | Known Brokers | kafka-broker:${literal(9000):plus(92)} |
+ | PublishKafka | Client Name | client_no_${literal(6):multiply(7)} |
+ | PublishKafka | Kafka Key | ${kafka_message_key} |
+ | PublishKafka | retry.backoff.ms | ${literal(2):multiply(25):multiply(3)} |
+ | PublishKafka | Message Key Field | kafka.key |
And a PutFile processor with the "Directory" property set to "/tmp/output"
- And the "success" relationship of the GetFile processor is connected to the PublishKafka
+ And the "success" relationship of the GetFile processor is connected to the UpdateAttribute
+ And the "success" relationship of the UpdateAttribute processor is connected to the PublishKafka
And the "success" relationship of the PublishKafka processor is connected to the PutFile
And a kafka broker is set up in correspondence with the PublishKafka
When both instances start up
Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+ And the Minifi logs contain the following message: " is 'test'" in less than 60 seconds
+ And the Minifi logs contain the following message: "PublishKafka: request.required.acks [1]" in less than 10 seconds
+ And the Minifi logs contain the following message: "PublishKafka: request.timeout.ms [12000]" in less than 10 seconds
+ And the Minifi logs contain the following message: "PublishKafka: message.timeout.ms [13000]" in less than 10 seconds
+ And the Minifi logs contain the following message: "PublishKafka: bootstrap.servers [kafka-broker:9092]" in less than 10 seconds
+ And the Minifi logs contain the following message: "PublishKafka: client.id [client_no_42]" in less than 10 seconds
+ And the Minifi logs contain the following message: "PublishKafka: Message Key [unique_message_key_123]" in less than 10 seconds
+ And the Minifi logs contain the following message: "PublishKafka: DynamicProperty: [retry.backoff.ms] -> [150]" in less than 10 seconds
+ And the Minifi logs contain the following message: "The Message Key Field property is set. This property is DEPRECATED and has no effect; please use Kafka Key instead." in less than 10 seconds
Scenario: PublishKafka sends flowfiles to failure when the broker is not available
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
@@ -58,6 +84,9 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
When both instances start up
Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+ # We fallback to the flowfile's uuid as message key if the Kafka Key property is not set
+ And the Minifi logs match the following regex: "PublishKafka: Message Key \[[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}\]" in less than 10 seconds
+
Scenario: PublishKafka sends can use SSL connect with SSL Context Service
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with the content "test" is present in "/tmp/input"
@@ -83,6 +112,9 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
When both instances start up
Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+ # We fallback to the flowfile's uuid as message key if the Kafka Key property is not set
+ And the Minifi logs match the following regex: "PublishKafka: Message Key \[[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}\]" in less than 10 seconds
+
Scenario: MiNiFi consumes data from a kafka topic
Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py b/docker/test/integration/minifi/core/DockerTestCluster.py
index 206ef14..4f465b9 100644
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -4,6 +4,7 @@ import subprocess
import sys
import time
import os
+import re
from .SingleNodeDockerCluster import SingleNodeDockerCluster
from .utils import retry_check
@@ -54,18 +55,27 @@ class DockerTestCluster(SingleNodeDockerCluster):
return container.status, None
- def wait_for_app_logs(self, container_name, log_entry, timeout_seconds, count=1):
+ def __wait_for_app_logs_impl(self, container_name, log_entry, timeout_seconds, count, use_regex):
wait_start_time = time.perf_counter()
while (time.perf_counter() - wait_start_time) < timeout_seconds:
logging.info('Waiting for app-logs `%s` in container `%s`', log_entry, container_name)
status, logs = self.get_app_log(container_name)
- if logs is not None and count <= logs.decode("utf-8").count(log_entry):
- return True
+ if logs is not None:
+ if not use_regex and logs.decode("utf-8").count(log_entry) >= count:
+ return True
+ elif use_regex and len(re.findall(log_entry, logs.decode("utf-8"))) >= count:
+ return True
elif status == 'exited':
return False
time.sleep(1)
return False
+ def wait_for_app_logs_regex(self, container_name, log_entry, timeout_seconds, count=1):
+ return self.__wait_for_app_logs_impl(container_name, log_entry, timeout_seconds, count, True)
+
+ def wait_for_app_logs(self, container_name, log_entry, timeout_seconds, count=1):
+ return self.__wait_for_app_logs_impl(container_name, log_entry, timeout_seconds, count, False)
+
def wait_for_startup_log(self, container_name, timeout_seconds):
return self.wait_for_app_logs(container_name, self.containers[container_name].get_startup_finished_log_entry(), timeout_seconds, 1)
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 3af539d..110ca60 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -546,3 +546,8 @@ def step_impl(context, query, number_of_rows, timeout_seconds):
@then("the Minifi logs contain the following message: \"{log_message}\" in less than {duration}")
def step_impl(context, log_message, duration):
context.test.check_minifi_log_contents(log_message, timeparse(duration))
+
+
+@then("the Minifi logs match the following regex: \"{regex}\" in less than {duration}")
+def step_impl(context, regex, duration):
+ context.test.check_minifi_log_matches_regex(regex, timeparse(duration))
diff --git a/extensions/librdkafka/docker_tests/CMakeLists.txt b/extensions/librdkafka/docker_tests/CMakeLists.txt
deleted file mode 100644
index 6fe8a0a..0000000
--- a/extensions/librdkafka/docker_tests/CMakeLists.txt
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-file(GLOB KAFKA_TESTS "*.cpp")
-
-SET(KAFKA_TEST_COUNT 0)
-
-FOREACH(testfile ${KAFKA_TESTS})
- get_filename_component(testfilename "${testfile}" NAME_WE)
- add_executable("${testfilename}" "${testfile}")
- target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/librdkafka/")
- target_link_libraries(${testfilename} minifi-rdkafka-extensions)
- createTests("${testfilename}")
- MATH(EXPR KAFKA_TEST_COUNT "${KAFKA_TEST_COUNT}+1")
- # The line below handles integration test
- add_test(NAME "${testfilename}" COMMAND "${testfilename}")
- target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
-ENDFOREACH()
-
-message("-- Finished building ${KAFKA_TEST_COUNT} Kafka related test file(s)...")
diff --git a/extensions/librdkafka/tests/CMakeLists.txt b/extensions/librdkafka/tests/CMakeLists.txt
deleted file mode 100644
index 8c6193c..0000000
--- a/extensions/librdkafka/tests/CMakeLists.txt
+++ /dev/null
@@ -1,41 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-file(GLOB KAFKA_TESTS "*.cpp")
-
-SET(KAFKA_TEST_COUNT 0)
-
-FOREACH(testfile ${KAFKA_TESTS})
- get_filename_component(testfilename "${testfile}" NAME_WE)
- add_executable("${testfilename}" "${testfile}")
- target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/librdkafka/")
- target_link_libraries(${testfilename} minifi-rdkafka-extensions)
- createTests("${testfilename}")
- MATH(EXPR KAFKA_TEST_COUNT "${KAFKA_TEST_COUNT}+1")
- target_include_directories(${testfilename} BEFORE PRIVATE "../..")
- target_include_directories(${testfilename} BEFORE PRIVATE "../../standard-processors/processors")
- target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/catch")
- target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test")
- target_link_libraries(${testfilename} minifi-standard-processors)
- target_link_libraries(${testfilename} minifi-expression-language-extensions)
- target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
- add_test(NAME "${testfilename}" COMMAND "${testfilename}" "${TEST_RESOURCES}/TestKafkaOnSchedule.yml" "${TEST_RESOURCES}/")
-ENDFOREACH()
-
-message("-- Finished building ${KAFKA_TEST_COUNT} Kafka related test file(s)...")
diff --git a/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp b/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp
deleted file mode 100644
index 85d26c6..0000000
--- a/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#undef NDEBUG
-
-#include <cassert>
-#include "../../../libminifi/test/integration/IntegrationBase.h"
-#include "core/logging/Logger.h"
-#include "../../../libminifi/test/TestBase.h"
-#include "../PublishKafka.h"
-#include "utils/StringUtils.h"
-#include "utils/IntegrationTestUtils.h"
-
-class PublishKafkaOnScheduleTests : public IntegrationBase {
- public:
- void runAssertions() override {
- using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
- assert(verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] {
- const std::string logs = LogTestController::getInstance().log_output.str();
- const auto result = utils::StringUtils::countOccurrences(logs, "value 1 is outside allowed range 1000..1000000000");
- const int occurrences = result.second;
- return 1 < occurrences;
- }, std::chrono::milliseconds{10}));
- flowController_->updatePropertyValue("kafka", minifi::processors::PublishKafka::MaxMessageSize.getName(), "1999");
- const std::vector<std::string> must_appear_byorder_msgs = {"notifyStop called",
- "Successfully configured PublishKafka",
- "PublishKafka onTrigger"};
-
- const bool test_success = verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] {
- const std::string logs = LogTestController::getInstance().log_output.str();
- const auto result = utils::StringUtils::countOccurrences(logs, "value 1 is outside allowed range 1000..1000000000");
- size_t last_pos = result.first;
- for (const std::string& msg : must_appear_byorder_msgs) {
- last_pos = logs.find(msg, last_pos);
- if (last_pos == std::string::npos) {
- return false;
- }
- }
- return true;
- });
- assert(test_success);
- }
-
- void testSetup() override {
- LogTestController::getInstance().setDebug<core::ProcessGroup>();
- LogTestController::getInstance().setDebug<core::Processor>();
- LogTestController::getInstance().setDebug<core::ProcessSession>();
- LogTestController::getInstance().setDebug<minifi::processors::PublishKafka>();
- }
-};
-
-int main(int argc, char **argv) {
- std::string test_file_location, url;
- if (argc > 1) {
- test_file_location = argv[1];
- }
-
- PublishKafkaOnScheduleTests harness;
-
- harness.run(test_file_location);
-
- return 0;
-}
diff --git a/extensions/librdkafka/tests/PublishKafkaTests.cpp b/extensions/librdkafka/tests/PublishKafkaTests.cpp
deleted file mode 100644
index db44eb3..0000000
--- a/extensions/librdkafka/tests/PublishKafkaTests.cpp
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "librdkafka/PublishKafka.h"
-#include "TestBase.h"
-
-namespace {
-
-class PublishKafkaTestRunner {
- public:
- PublishKafkaTestRunner();
- void setPublishKafkaProperty(const core::Property& property, const std::string& value);
- void setPublishKafkaDynamicProperty(const std::string& property, const std::string& value);
- std::shared_ptr<core::FlowFile> createFlowFile() const;
- void enqueueFlowFile(const std::shared_ptr<core::FlowFile>& flow_file);
- void runProcessor();
-
- private:
- TestController test_controller_;
- std::shared_ptr<TestPlan> test_plan_;
- std::shared_ptr<core::Processor> upstream_processor_;
- std::shared_ptr<core::Processor> publish_kafka_processor_;
- std::shared_ptr<minifi::Connection> incoming_connection_;
-};
-
-const core::Relationship Success{"success", ""};
-const core::Relationship Failure{"failure", ""};
-
-PublishKafkaTestRunner::PublishKafkaTestRunner() : test_plan_(test_controller_.createPlan()) {
- LogTestController::getInstance().reset();
- LogTestController::getInstance().setDebug<processors::PublishKafka>();
-
- upstream_processor_ = test_plan_->addProcessor("GenerateFlowFile", "generate_flow_file");
- publish_kafka_processor_ = test_plan_->addProcessor("PublishKafka", "publish_kafka");
- publish_kafka_processor_->setAutoTerminatedRelationships({Success, Failure});
- incoming_connection_ = test_plan_->addConnection(upstream_processor_, Success, publish_kafka_processor_);
-
- test_plan_->setProperty(publish_kafka_processor_, processors::PublishKafka::SeedBrokers.getName(), "localhost:9092");
- test_plan_->setProperty(publish_kafka_processor_, processors::PublishKafka::ClientName.getName(), "Default_client_name");
- test_plan_->setProperty(publish_kafka_processor_, processors::PublishKafka::Topic.getName(), "Default_topic");
-
- test_plan_->increment_location(); // current processor is generate_flow_file
- test_plan_->increment_location(); // current processor is publish_kafka
-}
-
-void PublishKafkaTestRunner::setPublishKafkaProperty(const core::Property& property, const std::string& value) {
- test_plan_->setProperty(publish_kafka_processor_, property.getName(), value);
-}
-
-void PublishKafkaTestRunner::setPublishKafkaDynamicProperty(const std::string& property, const std::string& value) {
- test_plan_->setProperty(publish_kafka_processor_, property, value, true);
-}
-
-std::shared_ptr<core::FlowFile> PublishKafkaTestRunner::createFlowFile() const {
- const auto context = test_plan_->getProcessContextForProcessor(upstream_processor_);
- const auto session_factory = std::make_shared<core::ProcessSessionFactory>(context);
- const auto session = session_factory->createSession();
- auto flow_file = session->create();
-
- const std::string data = "flow file content";
- minifi::io::BufferStream stream{reinterpret_cast<const uint8_t*>(data.c_str()), static_cast<unsigned int>(data.length())};
- session->importFrom(stream, flow_file);
- session->transfer(flow_file, Success);
- session->commit();
-
- return flow_file;
-}
-
-void PublishKafkaTestRunner::enqueueFlowFile(const std::shared_ptr<core::FlowFile>& flow_file) {
- incoming_connection_->put(flow_file);
-}
-
-void PublishKafkaTestRunner::runProcessor() {
- test_plan_->runCurrentProcessor();
-}
-
-} // namespace
-
-TEST_CASE("PublishKafka can set the Topic from an attribute", "[PublishKafka][properties][expression_language]") {
- PublishKafkaTestRunner test_runner;
- test_runner.setPublishKafkaProperty(processors::PublishKafka::Topic, "${kafka_topic}-111");
-
- const auto flow_file = test_runner.createFlowFile();
- flow_file->setAttribute("kafka_topic", "Kafka_topic_for_unit_test");
- test_runner.enqueueFlowFile(flow_file);
-
- test_runner.runProcessor();
-
- REQUIRE(LogTestController::getInstance().contains("PublishKafka: topic for flow file " + flow_file->getUUIDStr() + " is 'Kafka_topic_for_unit_test-111'"));
-}
-
-TEST_CASE("PublishKafka can set DeliveryGuarantee from an attribute", "[PublishKafka][properties][expression_language]") {
- PublishKafkaTestRunner test_runner;
- test_runner.setPublishKafkaProperty(processors::PublishKafka::DeliveryGuarantee, "${kafka_require_num_acks}");
-
- const auto flow_file = test_runner.createFlowFile();
- flow_file->setAttribute("kafka_require_num_acks", "7");
- test_runner.enqueueFlowFile(flow_file);
-
- test_runner.runProcessor();
-
- REQUIRE(LogTestController::getInstance().contains("PublishKafka: request.required.acks [7]"));
-}
-
-TEST_CASE("PublishKafka can set RequestTimeOut", "[PublishKafka][properties]") {
- PublishKafkaTestRunner test_runner;
- test_runner.setPublishKafkaProperty(processors::PublishKafka::RequestTimeOut, "123 s");
-
- const auto flow_file = test_runner.createFlowFile();
- test_runner.enqueueFlowFile(flow_file);
-
- test_runner.runProcessor();
-
- REQUIRE(LogTestController::getInstance().contains("PublishKafka: request.timeout.ms [123000]"));
-}
-
-TEST_CASE("PublishKafka can set MessageTimeOut", "[PublishKafka][properties]") {
- PublishKafkaTestRunner test_runner;
- test_runner.setPublishKafkaProperty(processors::PublishKafka::MessageTimeOut, "5 min");
-
- const auto flow_file = test_runner.createFlowFile();
- test_runner.enqueueFlowFile(flow_file);
-
- test_runner.runProcessor();
-
- REQUIRE(LogTestController::getInstance().contains("PublishKafka: message.timeout.ms [300000]"));
-}
-
-TEST_CASE("PublishKafka can set SeedBrokers with expression language", "[PublishKafka][properties][expression_language]") {
- PublishKafkaTestRunner test_runner;
- test_runner.setPublishKafkaProperty(processors::PublishKafka::SeedBrokers, "localhost:${literal(9000):plus(123)}");
-
- test_runner.runProcessor();
-
- REQUIRE(LogTestController::getInstance().contains("PublishKafka: bootstrap.servers [localhost:9123]"));
-}
-
-TEST_CASE("PublishKafka can set ClientName with expression language", "[PublishKafka][properties][expression_language]") {
- PublishKafkaTestRunner test_runner;
- test_runner.setPublishKafkaProperty(processors::PublishKafka::ClientName, "client_no_${literal(6):multiply(7)}");
-
- test_runner.runProcessor();
-
- REQUIRE(LogTestController::getInstance().contains("PublishKafka: client.id [client_no_42]"));
-}
-
-TEST_CASE("If KafkaKey is not set, then PublishKafka sets the message key to the flow file ID", "[PublishKafka][properties]") {
- PublishKafkaTestRunner test_runner;
-
- const auto flow_file = test_runner.createFlowFile();
- test_runner.enqueueFlowFile(flow_file);
-
- test_runner.runProcessor();
-
- REQUIRE(LogTestController::getInstance().contains("PublishKafka: Message Key [" + flow_file->getUUIDStr() + "]"));
-}
-
-TEST_CASE("PublishKafka can set KafkaKey from an attribute", "[PublishKafka][properties][expression_language]") {
- PublishKafkaTestRunner test_runner;
- test_runner.setPublishKafkaProperty(processors::PublishKafka::KafkaKey, "${kafka_message_key}");
-
- const auto flow_file = test_runner.createFlowFile();
- flow_file->setAttribute("kafka_message_key", "unique_message_key_123");
- test_runner.enqueueFlowFile(flow_file);
-
- test_runner.runProcessor();
-
- REQUIRE(LogTestController::getInstance().contains("PublishKafka: Message Key [unique_message_key_123]"));
-}
-
-TEST_CASE("PublishKafka dynamic properties can use expression language", "[PublishKafka][properties][expression_language]") {
- PublishKafkaTestRunner test_runner;
- test_runner.setPublishKafkaDynamicProperty("retry.backoff.ms", "${literal(3):multiply(60):multiply(1000)}");
-
- test_runner.runProcessor();
-
- REQUIRE(LogTestController::getInstance().contains("PublishKafka: DynamicProperty: [retry.backoff.ms] -> [180000]"));
-}
-
-TEST_CASE("PublishKafka complains if Message Key Field is set, but only if it is set", "[PublishKafka][properties]") {
- PublishKafkaTestRunner test_runner;
-
- SECTION("Message Key Field is not set, so there is no error") {
- test_runner.runProcessor();
- REQUIRE_FALSE(LogTestController::getInstance().contains("error"));
- }
-
- SECTION("Message Key Field is set, so there is an error log") {
- test_runner.setPublishKafkaProperty(processors::PublishKafka::MessageKeyField, "kafka.key");
- test_runner.runProcessor();
- REQUIRE(LogTestController::getInstance().contains("The " + processors::PublishKafka::MessageKeyField.getName() +
- " property is set. This property is DEPRECATED and has no effect"));
- }
-}