You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2021/02/23 15:21:56 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1506 - Apply patch #3236 from librdkafka repository Upgrade librdkafka to 1.6.0

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new a975d23  MINIFICPP-1506 - Apply patch #3236 from librdkafka repository Upgrade librdkafka to 1.6.0
a975d23 is described below

commit a975d232fa1e755b960383b7447581a36b34c3fa
Author: Adam Debreceni <ad...@protonmail.com>
AuthorDate: Mon Feb 22 14:38:43 2021 +0100

    MINIFICPP-1506 - Apply patch #3236 from librdkafka repository
    Upgrade librdkafka to 1.6.0
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #1014
---
 cmake/BundledLibRdKafka.cmake        |   7 +-
 thirdparty/librdkafka/high-cpu.patch | 185 +++++++++++++++++++++++++++++++++++
 2 files changed, 190 insertions(+), 2 deletions(-)

diff --git a/cmake/BundledLibRdKafka.cmake b/cmake/BundledLibRdKafka.cmake
index fe49890..bbaaf96 100644
--- a/cmake/BundledLibRdKafka.cmake
+++ b/cmake/BundledLibRdKafka.cmake
@@ -16,6 +16,8 @@
 # under the License.
 
 function(use_bundled_librdkafka SOURCE_DIR BINARY_DIR)
+    set(PC "${Patch_EXECUTABLE}" -p1 -i "${SOURCE_DIR}/thirdparty/librdkafka/high-cpu.patch")
+
     # Define byproducts
     if(WIN32)
         set(BYPRODUCT "lib/rdkafka.lib")
@@ -41,10 +43,11 @@ function(use_bundled_librdkafka SOURCE_DIR BINARY_DIR)
     # Build project
     ExternalProject_Add(
             kafka-external
-            URL "https://github.com/edenhill/librdkafka/archive/v1.5.0.tar.gz"
-            URL_HASH "SHA256=f7fee59fdbf1286ec23ef0b35b2dfb41031c8727c90ced6435b8cf576f23a656"
+            URL "https://github.com/edenhill/librdkafka/archive/v1.6.0.tar.gz"
+            URL_HASH "SHA256=3130cbd391ef683dc9acf9f83fe82ff93b8730a1a34d0518e93c250929be9f6b"
             LIST_SEPARATOR % # This is needed for passing semicolon-separated lists
             CMAKE_ARGS ${LIBRDKAFKA_CMAKE_ARGS}
+            PATCH_COMMAND ${PC}
             BUILD_BYPRODUCTS "${BINARY_DIR}/thirdparty/librdkafka-install/${BYPRODUCT}"
             EXCLUDE_FROM_ALL TRUE
     )
diff --git a/thirdparty/librdkafka/high-cpu.patch b/thirdparty/librdkafka/high-cpu.patch
new file mode 100644
index 0000000..78099ac
--- /dev/null
+++ b/thirdparty/librdkafka/high-cpu.patch
@@ -0,0 +1,185 @@
+diff -rupN orig/src/rdkafka_broker.h patched/src/rdkafka_broker.h
+--- orig/src/rdkafka_broker.h	2021-01-25 23:25:19.000000000 +0100
++++ patched/src/rdkafka_broker.h	2021-02-22 16:44:21.665943600 +0100
+@@ -317,8 +317,17 @@ struct rd_kafka_broker_s { /* rd_kafka_b
+                 rd_kafka_resp_err_t err; /**< Last error code */
+                 int  cnt;                /**< Number of identical errors */
+         } rkb_last_err;
++
++        /** Recovery actions that need to be performed */
++        uint32_t rkb_recovery_actions;
+ };
+ 
++/*
++ * Recovery action bit flags
++ */
++#define RKB_RECOVERY_ACTIONS_NONE 0x0000
++#define RKB_RECOVERY_ACTIONS_REINITIALIZE_WAKEUP_FD 0x0001
++
+ #define rd_kafka_broker_keep(rkb)   rd_refcnt_add(&(rkb)->rkb_refcnt)
+ #define rd_kafka_broker_keep_fl(FUNC,LINE,RKB)  \
+         rd_refcnt_add_fl(FUNC, LINE, &(RKB)->rkb_refcnt)
+@@ -450,6 +459,7 @@ rd_kafka_broker_controller_async (rd_kaf
+ 
+ int rd_kafka_brokers_add0 (rd_kafka_t *rk, const char *brokerlist);
+ void rd_kafka_broker_set_state (rd_kafka_broker_t *rkb, int state);
++void rd_kafka_broker_set_recovery_action (rd_kafka_broker_t* rkbs, uint32_t action);
+ 
+ void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
+                            int level, rd_kafka_resp_err_t err,
+diff -rupN orig/src/rdkafka_broker.c patched/src/rdkafka_broker.c
+--- orig/src/rdkafka_broker.c	2021-01-25 23:25:19.000000000 +0100
++++ patched/src/rdkafka_broker.c	2021-02-22 16:43:33.391455200 +0100
+@@ -272,6 +272,74 @@ int16_t rd_kafka_broker_ApiVersion_suppo
+                 return maxver;
+ }
+ 
++/**
++ * @brief Set up the wake-up fds for IO events
++ *
++ * @locality broker creation or reconnection
++ * @locks none
++ */
++static void rd_kafka_broker_setup_queue_wakeup_fd(rd_kafka_broker_t* rkb) {
++    int r;
++
++    /*
++     * Fd-based queue wake-ups using a non-blocking pipe.
++     * Writes are best effort, if the socket queue is full
++     * the write fails (silently) but this has no effect on latency
++     * since the POLLIN flag will already have been raised for fd.
++     */
++    rkb->rkb_wakeup_fd[0] = -1;
++    rkb->rkb_wakeup_fd[1] = -1;
++    if ((r = rd_pipe_nonblocking(rkb->rkb_wakeup_fd)) == -1) {
++        rd_rkb_log(rkb, LOG_ERR, "WAKEUPFD",
++            "Failed to setup broker queue wake-up fds: "
++            "%s: disabling low-latency mode",
++            rd_strerror(r));
++
++    }
++    else if (rkb->rkb_source == RD_KAFKA_INTERNAL) {
++        /* nop: internal broker has no IO transport. */
++
++    }
++    else {
++        char onebyte = 1;
++
++        rd_rkb_dbg(rkb, QUEUE, "WAKEUPFD",
++            "Enabled low-latency ops queue wake-ups");
++        rd_kafka_q_io_event_enable(rkb->rkb_ops, rkb->rkb_wakeup_fd[1],
++            &onebyte, sizeof(onebyte));
++    }
++}
++
++/**
++ * @brief Set broker recovery action
++ *
++ * @locality any
++ * @locks none
++ */
++void rd_kafka_broker_set_recovery_action(rd_kafka_broker_t* rkbs, uint32_t action) {
++    rkbs->rkb_recovery_actions |= action;
++}
++
++/**
++ * @brief Reinitialize the queue wake-up fds
++ *
++ * @locality any
++ * @locks none
++ */
++static void rd_kafka_broker_reinitialize_wake_up_fd(rd_kafka_broker_t* rkb) {
++    if ((rkb->rkb_recovery_actions & RKB_RECOVERY_ACTIONS_REINITIALIZE_WAKEUP_FD) != 0) {
++        rd_rkb_log(rkb, LOG_WARNING, "WAKEUPFD",
++            "Reinitializing the wakeup fd's");
++
++        if (rkb->rkb_wakeup_fd[0] != -1)
++            rd_close(rkb->rkb_wakeup_fd[0]);
++        if (rkb->rkb_wakeup_fd[1] != -1)
++            rd_close(rkb->rkb_wakeup_fd[1]);
++
++        rd_kafka_broker_setup_queue_wakeup_fd(rkb);
++        rkb->rkb_recovery_actions &= ~RKB_RECOVERY_ACTIONS_REINITIALIZE_WAKEUP_FD;
++    }
++}
+ 
+ /**
+  * @brief Set broker state.
+@@ -5174,6 +5242,8 @@ static int rd_kafka_broker_thread_main (
+                                 continue;
+                         }
+ 
++                        rd_kafka_broker_reinitialize_wake_up_fd(rkb);
++
+ 			/* Initiate asynchronous connection attempt.
+ 			 * Only the host lookup is blocking here. */
+                         r = rd_kafka_broker_connect(rkb);
+@@ -5412,6 +5482,7 @@ rd_kafka_broker_t *rd_kafka_broker_add (
+ 	rkb->rkb_proto = proto;
+         rkb->rkb_port = port;
+         rkb->rkb_origname = rd_strdup(name);
++        rkb->rkb_recovery_actions = RKB_RECOVERY_ACTIONS_NONE;
+ 
+ 	mtx_init(&rkb->rkb_lock, mtx_plain);
+         mtx_init(&rkb->rkb_logname_lock, mtx_plain);
+@@ -5467,33 +5538,7 @@ rd_kafka_broker_t *rd_kafka_broker_add (
+         pthread_sigmask(SIG_SETMASK, &newset, &oldset);
+ #endif
+ 
+-        /*
+-         * Fd-based queue wake-ups using a non-blocking pipe.
+-         * Writes are best effort, if the socket queue is full
+-         * the write fails (silently) but this has no effect on latency
+-         * since the POLLIN flag will already have been raised for fd.
+-         */
+-        rkb->rkb_wakeup_fd[0]     = -1;
+-        rkb->rkb_wakeup_fd[1]     = -1;
+-        rkb->rkb_toppar_wakeup_fd = -1;
+-
+-        if ((r = rd_pipe_nonblocking(rkb->rkb_wakeup_fd)) == -1) {
+-                rd_rkb_log(rkb, LOG_ERR, "WAKEUPFD",
+-                           "Failed to setup broker queue wake-up fds: "
+-                           "%s: disabling low-latency mode",
+-                           rd_strerror(r));
+-
+-        } else if (source == RD_KAFKA_INTERNAL) {
+-                /* nop: internal broker has no IO transport. */
+-
+-        } else {
+-                char onebyte = 1;
+-
+-                rd_rkb_dbg(rkb, QUEUE, "WAKEUPFD",
+-                           "Enabled low-latency ops queue wake-ups");
+-                rd_kafka_q_io_event_enable(rkb->rkb_ops, rkb->rkb_wakeup_fd[1],
+-                                           &onebyte, sizeof(onebyte));
+-        }
++        rd_kafka_broker_setup_queue_wakeup_fd(rkb);
+ 
+         /* Lock broker's lock here to synchronise state, i.e., hold off
+ 	 * the broker thread until we've finalized the rkb. */
+diff -rupN orig/src/rdkafka_transport.c patched/src/rdkafka_transport.c
+--- orig/src/rdkafka_transport.c	2021-01-25 23:25:19.000000000 +0100
++++ patched/src/rdkafka_transport.c	2021-02-22 16:45:02.884392000 +0100
+@@ -987,6 +987,22 @@ int rd_kafka_transport_poll(rd_kafka_tra
+ 			return 0;
+ 	} else if (r == RD_SOCKET_ERROR)
+ 		return -1;
++
++        /* In rare cases the local socket used for wake-up on IO can be
++        * disconnected, which makes WSAPoll return immediately and
++        * causes high CPU usage. To fix this, set a broker recovery action flag
++        * to reinitialize the local IO socket while also rebuilding the transport.
++        * Issue #3139 */
++        if (rktrans->rktrans_pfd[1].revents & POLLERR || rktrans->rktrans_pfd[1].revents & POLLHUP) {
++                char errstr[512];
++                rd_snprintf(errstr, sizeof(errstr),
++                        "Internal IO event socket disconnected for broker: %s, revents %d",
++                        rktrans->rktrans_rkb->rkb_name, rktrans->rktrans_pfd[1].revents);
++                rd_kafka_broker_set_recovery_action(rktrans->rktrans_rkb,
++                        RKB_RECOVERY_ACTIONS_REINITIALIZE_WAKEUP_FD);
++                rd_kafka_transport_connect_done(rktrans, errstr);
++                return -1;
++        }
+ #endif
+         rd_atomic64_add(&rktrans->rktrans_rkb->rkb_c.wakeups, 1);
+