You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/04/20 16:42:19 UTC
[nifi-minifi-cpp] 05/08: MINIFICPP-1322 PublishKafka queue size and batch size properties should be in sync
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 091a4623365a7e60f28f11aa7a86b2ecd6aceff1
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Apr 20 18:01:01 2022 +0200
MINIFICPP-1322 PublishKafka queue size and batch size properties should be in sync
Closes #1304
Signed-off-by: Marton Szasz <sz...@apache.org>
@szaszm: I fixed small issue with the test after merging.
---
CMakeLists.txt | 2 +-
extensions/librdkafka/PublishKafka.cpp | 8 ++++-
.../librdkafka/tests}/CMakeLists.txt | 3 +-
extensions/librdkafka/tests/PublishKafkaTests.cpp | 38 ++++++++++++++++++++++
4 files changed, 47 insertions(+), 4 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b2f0352d5..ff9630eb6 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -502,7 +502,7 @@ endif()
if (ENABLE_ALL OR ENABLE_LIBRDKAFKA)
include(BundledLibRdKafka)
use_bundled_librdkafka(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
- createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PublishKafka" "extensions/librdkafka")
+ createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PublishKafka" "extensions/librdkafka" "extensions/librdkafka/tests")
endif()
## Scripting extensions
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index e5456c3d7..a45eba348 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -602,7 +602,13 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessCon
}
}
value = "";
- if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty()) {
+ uint32_t int_val;
+ if (context->getProperty(QueueBufferMaxMessage.getName(), int_val)) {
+ if (int_val < batch_size_) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid configuration: Batch Size cannot be larger than Queue Max Message");
+ }
+
+ value = std::to_string(int_val);
result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.messages", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: queue.buffering.max.messages [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
diff --git a/libminifi/test/kafka-tests/CMakeLists.txt b/extensions/librdkafka/tests/CMakeLists.txt
similarity index 86%
rename from libminifi/test/kafka-tests/CMakeLists.txt
rename to extensions/librdkafka/tests/CMakeLists.txt
index 1ef0e79b7..439fccd8d 100644
--- a/libminifi/test/kafka-tests/CMakeLists.txt
+++ b/extensions/librdkafka/tests/CMakeLists.txt
@@ -24,8 +24,7 @@ FOREACH(testfile ${KAFKA_INTEGRATION_TESTS})
get_filename_component(testfilename "${testfile}" NAME_WE)
add_executable("${testfilename}" "${testfile}")
target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/librdkafka")
- target_include_directories(${testfilename} SYSTEM BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/librdkafka-0.11.4/src")
- target_include_directories(${testfilename} SYSTEM BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/librdkafka-0.11.4/src-cpp")
+ target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test")
createTests("${testfilename}")
target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
target_link_libraries(${testfilename} minifi-rdkafka-extensions)
diff --git a/extensions/librdkafka/tests/PublishKafkaTests.cpp b/extensions/librdkafka/tests/PublishKafkaTests.cpp
new file mode 100644
index 000000000..b508ee02e
--- /dev/null
+++ b/extensions/librdkafka/tests/PublishKafkaTests.cpp
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TestBase.h"
+#include "Catch.h"
+#include "PublishKafka.h"
+#include "SingleProcessorTestController.h"
+
+namespace org::apache::nifi::minifi::test {
+
+TEST_CASE("Scheduling should fail when batch size is larger than the max queue message count", "[testPublishKafka]") {
+ LogTestController::getInstance().setTrace<TestPlan>();
+ LogTestController::getInstance().setTrace<processors::PublishKafka>();
+ const auto publish_kafka = std::make_shared<processors::PublishKafka>("PublishKafka");
+ SingleProcessorTestController test_controller(publish_kafka);
+ publish_kafka->setProperty(processors::PublishKafka::ClientName, "test_client");
+ publish_kafka->setProperty(processors::PublishKafka::SeedBrokers, "test_seedbroker");
+ publish_kafka->setProperty(processors::PublishKafka::QueueBufferMaxMessage, "1000");
+ publish_kafka->setProperty(processors::PublishKafka::BatchSize, "1500");
+ test_controller.enqueueFlowFile("");
+ REQUIRE_THROWS_WITH(test_controller.trigger(), "Process Schedule Operation: Invalid configuration: Batch Size cannot be larger than Queue Max Message");
+}
+
+} // namespace org::apache::nifi::minifi::test