You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/04/20 16:42:19 UTC

[nifi-minifi-cpp] 05/08: MINIFICPP-1322 PublishKafka queue size and batch size properties should be in sync

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 091a4623365a7e60f28f11aa7a86b2ecd6aceff1
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Apr 20 18:01:01 2022 +0200

    MINIFICPP-1322 PublishKafka queue size and batch size properties should be in sync
    
    Closes #1304
    Signed-off-by: Marton Szasz <sz...@apache.org>
    
    @szaszm: I fixed a small issue with the test after merging.
---
 CMakeLists.txt                                     |  2 +-
 extensions/librdkafka/PublishKafka.cpp             |  8 ++++-
 .../librdkafka/tests}/CMakeLists.txt               |  3 +-
 extensions/librdkafka/tests/PublishKafkaTests.cpp  | 38 ++++++++++++++++++++++
 4 files changed, 47 insertions(+), 4 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index b2f0352d5..ff9630eb6 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -502,7 +502,7 @@ endif()
 if (ENABLE_ALL OR ENABLE_LIBRDKAFKA)
 	include(BundledLibRdKafka)
 	use_bundled_librdkafka(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
-	createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PublishKafka" "extensions/librdkafka")
+	createExtension(RDKAFKA-EXTENSIONS "RDKAFKA EXTENSIONS" "This Enables librdkafka functionality including PublishKafka" "extensions/librdkafka" "extensions/librdkafka/tests")
 endif()
 
 ## Scripting extensions
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index e5456c3d7..a45eba348 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -602,7 +602,13 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessCon
     }
   }
   value = "";
-  if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty()) {
+  uint32_t int_val;
+  if (context->getProperty(QueueBufferMaxMessage.getName(), int_val)) {
+    if (int_val < batch_size_) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid configuration: Batch Size cannot be larger than Queue Max Message");
+    }
+
+    value = std::to_string(int_val);
     result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.messages", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: queue.buffering.max.messages [%s]", value);
     if (result != RD_KAFKA_CONF_OK) {
diff --git a/libminifi/test/kafka-tests/CMakeLists.txt b/extensions/librdkafka/tests/CMakeLists.txt
similarity index 86%
rename from libminifi/test/kafka-tests/CMakeLists.txt
rename to extensions/librdkafka/tests/CMakeLists.txt
index 1ef0e79b7..439fccd8d 100644
--- a/libminifi/test/kafka-tests/CMakeLists.txt
+++ b/extensions/librdkafka/tests/CMakeLists.txt
@@ -24,8 +24,7 @@ FOREACH(testfile ${KAFKA_INTEGRATION_TESTS})
 	get_filename_component(testfilename "${testfile}" NAME_WE)
 	add_executable("${testfilename}" "${testfile}")
 	target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/librdkafka")
-	target_include_directories(${testfilename} SYSTEM BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/librdkafka-0.11.4/src")
-	target_include_directories(${testfilename} SYSTEM BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/librdkafka-0.11.4/src-cpp")
+	target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test")
 	createTests("${testfilename}")
 	target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
 	target_link_libraries(${testfilename} minifi-rdkafka-extensions)
diff --git a/extensions/librdkafka/tests/PublishKafkaTests.cpp b/extensions/librdkafka/tests/PublishKafkaTests.cpp
new file mode 100644
index 000000000..b508ee02e
--- /dev/null
+++ b/extensions/librdkafka/tests/PublishKafkaTests.cpp
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TestBase.h"
+#include "Catch.h"
+#include "PublishKafka.h"
+#include "SingleProcessorTestController.h"
+
+namespace org::apache::nifi::minifi::test {
+
+TEST_CASE("Scheduling should fail when batch size is larger than the max queue message count", "[testPublishKafka]") {  // checks that a Batch Size above Queue Max Message is rejected with an exception
+  LogTestController::getInstance().setTrace<TestPlan>();  // trace-level logging for the test plan
+  LogTestController::getInstance().setTrace<processors::PublishKafka>();  // trace-level logging for the processor under test
+  const auto publish_kafka = std::make_shared<processors::PublishKafka>("PublishKafka");
+  SingleProcessorTestController test_controller(publish_kafka);
+  publish_kafka->setProperty(processors::PublishKafka::ClientName, "test_client");
+  publish_kafka->setProperty(processors::PublishKafka::SeedBrokers, "test_seedbroker");  // placeholder broker name; NOTE(review): presumably no real broker is needed because the failure is expected first - confirm
+  publish_kafka->setProperty(processors::PublishKafka::QueueBufferMaxMessage, "1000");  // queue limit deliberately smaller than the batch size set on the next line
+  publish_kafka->setProperty(processors::PublishKafka::BatchSize, "1500");  // 1500 > 1000: the invalid combination this test targets
+  test_controller.enqueueFlowFile("");  // enqueue a flow file with empty content before triggering
+  REQUIRE_THROWS_WITH(test_controller.trigger(), "Process Schedule Operation: Invalid configuration: Batch Size cannot be larger than Queue Max Message");  // trigger() is expected to throw with this message
+}
+
+}  // namespace org::apache::nifi::minifi::test