You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2021/02/23 15:21:56 UTC
[nifi-minifi-cpp] branch main updated: MINIFICPP-1506 - Apply patch
#3236 from librdkafka repository Upgrade librdkafka to 1.6.0
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new a975d23 MINIFICPP-1506 - Apply patch #3236 from librdkafka repository Upgrade librdkafka to 1.6.0
a975d23 is described below
commit a975d232fa1e755b960383b7447581a36b34c3fa
Author: Adam Debreceni <ad...@protonmail.com>
AuthorDate: Mon Feb 22 14:38:43 2021 +0100
MINIFICPP-1506 - Apply patch #3236 from librdkafka repository
Upgrade librdkafka to 1.6.0
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #1014
---
cmake/BundledLibRdKafka.cmake | 7 +-
thirdparty/librdkafka/high-cpu.patch | 185 +++++++++++++++++++++++++++++++++++
2 files changed, 190 insertions(+), 2 deletions(-)
diff --git a/cmake/BundledLibRdKafka.cmake b/cmake/BundledLibRdKafka.cmake
index fe49890..bbaaf96 100644
--- a/cmake/BundledLibRdKafka.cmake
+++ b/cmake/BundledLibRdKafka.cmake
@@ -16,6 +16,8 @@
# under the License.
function(use_bundled_librdkafka SOURCE_DIR BINARY_DIR)
+ set(PC "${Patch_EXECUTABLE}" -p1 -i "${SOURCE_DIR}/thirdparty/librdkafka/high-cpu.patch")
+
# Define byproducts
if(WIN32)
set(BYPRODUCT "lib/rdkafka.lib")
@@ -41,10 +43,11 @@ function(use_bundled_librdkafka SOURCE_DIR BINARY_DIR)
# Build project
ExternalProject_Add(
kafka-external
- URL "https://github.com/edenhill/librdkafka/archive/v1.5.0.tar.gz"
- URL_HASH "SHA256=f7fee59fdbf1286ec23ef0b35b2dfb41031c8727c90ced6435b8cf576f23a656"
+ URL "https://github.com/edenhill/librdkafka/archive/v1.6.0.tar.gz"
+ URL_HASH "SHA256=3130cbd391ef683dc9acf9f83fe82ff93b8730a1a34d0518e93c250929be9f6b"
LIST_SEPARATOR % # This is needed for passing semicolon-separated lists
CMAKE_ARGS ${LIBRDKAFKA_CMAKE_ARGS}
+ PATCH_COMMAND ${PC}
BUILD_BYPRODUCTS "${BINARY_DIR}/thirdparty/librdkafka-install/${BYPRODUCT}"
EXCLUDE_FROM_ALL TRUE
)
diff --git a/thirdparty/librdkafka/high-cpu.patch b/thirdparty/librdkafka/high-cpu.patch
new file mode 100644
index 0000000..78099ac
--- /dev/null
+++ b/thirdparty/librdkafka/high-cpu.patch
@@ -0,0 +1,185 @@
+diff -rupN orig/src/rdkafka_broker.h patched/src/rdkafka_broker.h
+--- orig/src/rdkafka_broker.h 2021-01-25 23:25:19.000000000 +0100
++++ patched/src/rdkafka_broker.h 2021-02-22 16:44:21.665943600 +0100
+@@ -317,8 +317,17 @@ struct rd_kafka_broker_s { /* rd_kafka_b
+ rd_kafka_resp_err_t err; /**< Last error code */
+ int cnt; /**< Number of identical errors */
+ } rkb_last_err;
++
++ /** Pending recovery actions (RKB_RECOVERY_ACTIONS_* bit flags) */
++ uint32_t rkb_recovery_actions;
+ };
+
++/*
++ * Recovery action bit flags
++ */
++#define RKB_RECOVERY_ACTIONS_NONE 0x0000
++#define RKB_RECOVERY_ACTIONS_REINITIALIZE_WAKEUP_FD 0x0001
++
+ #define rd_kafka_broker_keep(rkb) rd_refcnt_add(&(rkb)->rkb_refcnt)
+ #define rd_kafka_broker_keep_fl(FUNC,LINE,RKB) \
+ rd_refcnt_add_fl(FUNC, LINE, &(RKB)->rkb_refcnt)
+@@ -450,6 +459,7 @@ rd_kafka_broker_controller_async (rd_kaf
+
+ int rd_kafka_brokers_add0 (rd_kafka_t *rk, const char *brokerlist);
+ void rd_kafka_broker_set_state (rd_kafka_broker_t *rkb, int state);
++void rd_kafka_broker_set_recovery_action (rd_kafka_broker_t* rkbs, uint32_t action);
+
+ void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
+ int level, rd_kafka_resp_err_t err,
+diff -rupN orig/src/rdkafka_broker.c patched/src/rdkafka_broker.c
+--- orig/src/rdkafka_broker.c 2021-01-25 23:25:19.000000000 +0100
++++ patched/src/rdkafka_broker.c 2021-02-22 16:43:33.391455200 +0100
+@@ -272,6 +272,74 @@ int16_t rd_kafka_broker_ApiVersion_suppo
+ return maxver;
+ }
+
++/**
++ * @brief Set up the queue wake-up fds for IO events
++ *
++ * @locality broker creation or reconnection
++ * @locks none
++ */
++static void rd_kafka_broker_setup_queue_wakeup_fd(rd_kafka_broker_t* rkb) {
++ int r;
++
++ /*
++ * Fd-based queue wake-ups using a non-blocking pipe.
++ * Writes are best effort, if the socket queue is full
++ * the write fails (silently) but this has no effect on latency
++ * since the POLLIN flag will already have been raised for fd.
++ */
++ rkb->rkb_wakeup_fd[0] = -1;
++ rkb->rkb_wakeup_fd[1] = -1;
++ if ((r = rd_pipe_nonblocking(rkb->rkb_wakeup_fd)) == -1) {
++ rd_rkb_log(rkb, LOG_ERR, "WAKEUPFD",
++ "Failed to setup broker queue wake-up fds: "
++ "%s: disabling low-latency mode",
++ rd_strerror(r));
++
++ }
++ else if (rkb->rkb_source == RD_KAFKA_INTERNAL) {
++ /* nop: internal broker has no IO transport. */
++
++ }
++ else {
++ char onebyte = 1;
++
++ rd_rkb_dbg(rkb, QUEUE, "WAKEUPFD",
++ "Enabled low-latency ops queue wake-ups");
++ rd_kafka_q_io_event_enable(rkb->rkb_ops, rkb->rkb_wakeup_fd[1],
++ &onebyte, sizeof(onebyte));
++ }
++}
++
++/**
++ * @brief Set broker recovery action
++ *
++ * @locality any
++ * @locks none
++ */
++void rd_kafka_broker_set_recovery_action(rd_kafka_broker_t* rkbs, uint32_t action) {
++ rkbs->rkb_recovery_actions |= action;
++}
++
++/**
++ * @brief Reinitialize the queue wake-up fds if the recovery action is set
++ *
++ * @locality broker thread
++ * @locks none
++ */
++static void rd_kafka_broker_reinitialize_wake_up_fd(rd_kafka_broker_t* rkb) {
++ if ((rkb->rkb_recovery_actions & RKB_RECOVERY_ACTIONS_REINITIALIZE_WAKEUP_FD) != 0) {
++ rd_rkb_log(rkb, LOG_WARNING, "WAKEUPFD",
++ "Reinitializing the wakeup fd's");
++
++ if (rkb->rkb_wakeup_fd[0] != -1)
++ rd_close(rkb->rkb_wakeup_fd[0]);
++ if (rkb->rkb_wakeup_fd[1] != -1)
++ rd_close(rkb->rkb_wakeup_fd[1]);
++
++ rd_kafka_broker_setup_queue_wakeup_fd(rkb);
++ rkb->rkb_recovery_actions &= ~RKB_RECOVERY_ACTIONS_REINITIALIZE_WAKEUP_FD;
++ }
++}
+
+ /**
+ * @brief Set broker state.
+@@ -5174,6 +5242,8 @@ static int rd_kafka_broker_thread_main (
+ continue;
+ }
+
++ rd_kafka_broker_reinitialize_wake_up_fd(rkb);
++
+ /* Initiate asynchronous connection attempt.
+ * Only the host lookup is blocking here. */
+ r = rd_kafka_broker_connect(rkb);
+@@ -5412,6 +5482,7 @@ rd_kafka_broker_t *rd_kafka_broker_add (
+ rkb->rkb_proto = proto;
+ rkb->rkb_port = port;
+ rkb->rkb_origname = rd_strdup(name);
++ rkb->rkb_recovery_actions = RKB_RECOVERY_ACTIONS_NONE;
+
+ mtx_init(&rkb->rkb_lock, mtx_plain);
+ mtx_init(&rkb->rkb_logname_lock, mtx_plain);
+@@ -5467,33 +5538,7 @@ rd_kafka_broker_t *rd_kafka_broker_add (
+ pthread_sigmask(SIG_SETMASK, &newset, &oldset);
+ #endif
+
+- /*
+- * Fd-based queue wake-ups using a non-blocking pipe.
+- * Writes are best effort, if the socket queue is full
+- * the write fails (silently) but this has no effect on latency
+- * since the POLLIN flag will already have been raised for fd.
+- */
+- rkb->rkb_wakeup_fd[0] = -1;
+- rkb->rkb_wakeup_fd[1] = -1;
+- rkb->rkb_toppar_wakeup_fd = -1;
+-
+- if ((r = rd_pipe_nonblocking(rkb->rkb_wakeup_fd)) == -1) {
+- rd_rkb_log(rkb, LOG_ERR, "WAKEUPFD",
+- "Failed to setup broker queue wake-up fds: "
+- "%s: disabling low-latency mode",
+- rd_strerror(r));
+-
+- } else if (source == RD_KAFKA_INTERNAL) {
+- /* nop: internal broker has no IO transport. */
+-
+- } else {
+- char onebyte = 1;
+-
+- rd_rkb_dbg(rkb, QUEUE, "WAKEUPFD",
+- "Enabled low-latency ops queue wake-ups");
+- rd_kafka_q_io_event_enable(rkb->rkb_ops, rkb->rkb_wakeup_fd[1],
+- &onebyte, sizeof(onebyte));
+- }
++ rd_kafka_broker_setup_queue_wakeup_fd(rkb);
+
+ /* Lock broker's lock here to synchronise state, i.e., hold off
+ * the broker thread until we've finalized the rkb. */
+diff -rupN orig/src/rdkafka_transport.c patched/src/rdkafka_transport.c
+--- orig/src/rdkafka_transport.c 2021-01-25 23:25:19.000000000 +0100
++++ patched/src/rdkafka_transport.c 2021-02-22 16:45:02.884392000 +0100
+@@ -987,6 +987,22 @@ int rd_kafka_transport_poll(rd_kafka_tra
+ return 0;
+ } else if (r == RD_SOCKET_ERROR)
+ return -1;
++
++ /* In rare cases the local socket used for wake-ups on IO events can be
++ * disconnected, which makes WSAPoll return immediately and causes
++ * high CPU usage. To fix this, set a broker recovery action flag
++ * to reinitialize the local IO socket while also rebuilding the transport.
++ * Issue #3139 */
++ if (rktrans->rktrans_pfd[1].revents & POLLERR || rktrans->rktrans_pfd[1].revents & POLLHUP) {
++ char errstr[512];
++ rd_snprintf(errstr, sizeof(errstr),
++ "Internal IO event socket disconnected for broker: %s, revents %d",
++ rktrans->rktrans_rkb->rkb_name, rktrans->rktrans_pfd[1].revents);
++ rd_kafka_broker_set_recovery_action(rktrans->rktrans_rkb,
++ RKB_RECOVERY_ACTIONS_REINITIALIZE_WAKEUP_FD);
++ rd_kafka_transport_connect_done(rktrans, errstr);
++ return -1;
++ }
+ #endif
+ rd_atomic64_add(&rktrans->rktrans_rkb->rkb_c.wakeups, 1);
+