Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:45 UTC

[17/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade to librdkafka 0.11.4

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_win32.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_win32.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_win32.c
deleted file mode 100644
index b81ff87..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_win32.c
+++ /dev/null
@@ -1,526 +0,0 @@
-/*
- * librdkafka - The Apache Kafka C/C++ library
- *
- * Copyright (c) 2016 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-/**
- * Implements SASL Kerberos GSSAPI authentication client
- * using the native Win32 SSPI.
- */
-
-#include "rdkafka_int.h"
-#include "rdkafka_transport.h"
-#include "rdkafka_transport_int.h"
-#include "rdkafka_sasl.h"
-#include "rdkafka_sasl_int.h"
-
-
-#include <stdio.h>
-#include <windows.h>
-#include <ntsecapi.h>
-
-#define SECURITY_WIN32
-#pragma comment(lib, "Secur32.lib")
-#include <Sspi.h>
-
-
-#define RD_KAFKA_SASL_SSPI_CTX_ATTRS \
- (ISC_REQ_CONFIDENTIALITY | ISC_REQ_REPLAY_DETECT | \
-  ISC_REQ_SEQUENCE_DETECT | ISC_REQ_CONNECTION)
-
-
- /* Default maximum kerberos token size for newer versions of Windows */
-#define RD_KAFKA_SSPI_MAX_TOKEN_SIZE 48000
-
-
-/**
- * @brief Per-connection SASL state
- */
-typedef struct rd_kafka_sasl_win32_state_s {
-        CredHandle *cred;
-        CtxtHandle *ctx;
-        wchar_t principal[512];
-} rd_kafka_sasl_win32_state_t;
-
-
-/**
- * @returns the string representation of a SECURITY_STATUS error code
- */
-static const char *rd_kafka_sasl_sspi_err2str (SECURITY_STATUS sr) {
-        switch (sr)
-        {
-                case SEC_E_INSUFFICIENT_MEMORY:
-                        return "Insufficient memory";
-                case SEC_E_INTERNAL_ERROR:
-                        return "Internal error";
-                case SEC_E_INVALID_HANDLE:
-                        return "Invalid handle";
-                case SEC_E_INVALID_TOKEN:
-                        return "Invalid token";
-                case SEC_E_LOGON_DENIED:
-                        return "Logon denied";
-                case SEC_E_NO_AUTHENTICATING_AUTHORITY:
-                        return "No authority could be contacted for authentication.";
-                case SEC_E_NO_CREDENTIALS:
-                        return "No credentials";
-                case SEC_E_TARGET_UNKNOWN:
-                        return "Target unknown";
-                case SEC_E_UNSUPPORTED_FUNCTION:
-                        return "Unsupported functionality";
-                case SEC_E_WRONG_CREDENTIAL_HANDLE:
-                        return "The principal that received the authentication "
-                                "request is not the same as the one passed "
-                                "into the pszTargetName parameter. "
-                                "This indicates a failure in mutual "
-                                "authentication.";
-                default:
-                        return "(no string representation)";
-        }
-}
-
-
-/**
- * @brief Create new CredHandle
- */
-static CredHandle *
-rd_kafka_sasl_sspi_cred_new (rd_kafka_transport_t *rktrans,
-                              char *errstr, size_t errstr_size) {
-        TimeStamp expiry = { 0, 0 };
-        SECURITY_STATUS sr;
-        CredHandle *cred = rd_calloc(1, sizeof(*cred));
-
-        sr = AcquireCredentialsHandle(
-                NULL, __TEXT("Kerberos"), SECPKG_CRED_OUTBOUND,
-                NULL, NULL, NULL, NULL, cred, &expiry);
-
-        if (sr != SEC_E_OK) {
-                rd_free(cred);
-                rd_snprintf(errstr, errstr_size,
-                            "Failed to acquire CredentialsHandle: "
-                            "error code %d", sr);
-                return NULL;
-        }
-
-        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
-                   "Acquired Kerberos credentials handle (expiry in %d.%ds)",
-                   expiry.u.HighPart, expiry.u.LowPart);
-
-        return cred;
-}
-
-
-/**
-  * @brief Start or continue SSPI-based authentication processing.
-  */
-static int rd_kafka_sasl_sspi_continue (rd_kafka_transport_t *rktrans,
-                                        const void *inbuf, size_t insize,
-                                        char *errstr, size_t errstr_size) {
-        rd_kafka_sasl_win32_state_t *state = rktrans->rktrans_sasl.state;
-        SecBufferDesc outbufdesc, inbufdesc;
-        SecBuffer outsecbuf, insecbuf;
-        BYTE outbuf[RD_KAFKA_SSPI_MAX_TOKEN_SIZE];
-        TimeStamp lifespan = { 0, 0 };
-        ULONG ret_ctxattrs;
-        CtxtHandle *ctx;
-        SECURITY_STATUS sr;
-
-        if (inbuf) {
-                if (insize > ULONG_MAX) {
-                        rd_snprintf(errstr, errstr_size,
-                                    "Input buffer length too large (%"PRIusz") "
-                                    "and would overflow", insize);
-                        return -1;
-                }
-
-                inbufdesc.ulVersion = SECBUFFER_VERSION;
-                inbufdesc.cBuffers = 1;
-                inbufdesc.pBuffers  = &insecbuf;
-
-                insecbuf.cbBuffer   = (unsigned long)insize;
-                insecbuf.BufferType = SECBUFFER_TOKEN;
-                insecbuf.pvBuffer   = (void *)inbuf;
-        }
-
-        outbufdesc.ulVersion = SECBUFFER_VERSION;
-        outbufdesc.cBuffers  = 1;
-        outbufdesc.pBuffers  = &outsecbuf;
-
-        outsecbuf.cbBuffer   = sizeof(outbuf);
-        outsecbuf.BufferType = SECBUFFER_TOKEN;
-        outsecbuf.pvBuffer   = outbuf;
-
-        if (!(ctx = state->ctx)) {
-                /* First time: allocate context handle
-                 * which will be filled in by Initialize..() */
-                ctx = rd_calloc(1, sizeof(*ctx));
-        }
-
-        sr = InitializeSecurityContext(
-                state->cred, state->ctx, state->principal,
-                RD_KAFKA_SASL_SSPI_CTX_ATTRS |
-                (state->ctx ? 0 : ISC_REQ_MUTUAL_AUTH | ISC_REQ_IDENTIFY),
-                0, SECURITY_NATIVE_DREP,
-                inbuf ? &inbufdesc : NULL,
-                0, ctx, &outbufdesc, &ret_ctxattrs, &lifespan);
-
-        if (!state->ctx)
-                state->ctx = ctx;
-
-        switch (sr)
-        {
-                case SEC_E_OK:
-                        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLAUTH",
-                                   "Initialized security context");
-
-                        rktrans->rktrans_sasl.complete = 1;
-                        break;
-                case SEC_I_CONTINUE_NEEDED:
-                        break;
-                case SEC_I_COMPLETE_NEEDED:
-                case SEC_I_COMPLETE_AND_CONTINUE:
-                        rd_snprintf(errstr, errstr_size,
-                                    "CompleteAuthToken (Digest auth, %d) "
-                                    "not implemented", sr);
-                        return -1;
-                case SEC_I_INCOMPLETE_CREDENTIALS:
-                        rd_snprintf(errstr, errstr_size,
-                                    "Incomplete credentials: "
-                                    "invalid or untrusted certificate");
-                        return -1;
-                default:
-                        rd_snprintf(errstr, errstr_size,
-                                    "InitializeSecurityContext "
-                                    "failed: %s (0x%x)",
-                                    rd_kafka_sasl_sspi_err2str(sr), sr);
-                        return -1;
-        }
-
-        if (rd_kafka_sasl_send(rktrans,
-                                outsecbuf.pvBuffer, outsecbuf.cbBuffer,
-                                errstr, errstr_size) == -1)
-                return -1;
-
-        return 0;
-}
-
-
-/**
-* @brief Sends the token response to the broker
-*/
-static int rd_kafka_sasl_win32_send_response (rd_kafka_transport_t *rktrans,
-                                              char *errstr,
-                                              size_t errstr_size,
-                                              SecBuffer *server_token) {
-        rd_kafka_sasl_win32_state_t *state = rktrans->rktrans_sasl.state;
-        SECURITY_STATUS sr;
-        SecBuffer in_buffer;
-        SecBuffer out_buffer;
-        SecBuffer buffers[4];
-        SecBufferDesc buffer_desc;
-        SecPkgContext_Sizes sizes;
-        SecPkgCredentials_NamesA names;
-        int send_response;
-        size_t namelen;
-
-        sr = QueryContextAttributes(state->ctx, SECPKG_ATTR_SIZES, &sizes);
-        if (sr != SEC_E_OK) {
-                rd_snprintf(errstr, errstr_size,
-                            "Send response failed: %s (0x%x)",
-                            rd_kafka_sasl_sspi_err2str(sr), sr);
-                return -1;
-        }
-
-        RD_MEMZERO(names);
-        sr = QueryCredentialsAttributesA(state->cred, SECPKG_CRED_ATTR_NAMES,
-                                         &names);
-
-        if (sr != SEC_E_OK) {
-                rd_snprintf(errstr, errstr_size,
-                            "Query credentials failed: %s (0x%x)",
-                            rd_kafka_sasl_sspi_err2str(sr), sr);
-                return -1;
-        }
-
-        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLAUTH",
-                   "Sending response message for user: %s", names.sUserName);
-
-        namelen = strlen(names.sUserName) + 1;
-        if (namelen > ULONG_MAX) {
-                rd_snprintf(errstr, errstr_size,
-                            "User name length too large (%"PRIusz") "
-                            "and would overflow", namelen);
-                return -1;
-        }
-
-        in_buffer.pvBuffer = (char *)names.sUserName;
-        in_buffer.cbBuffer = (unsigned long)namelen;
-
-        buffer_desc.cBuffers = 4;
-        buffer_desc.pBuffers = buffers;
-        buffer_desc.ulVersion = SECBUFFER_VERSION;
-
-        /* security trailer */
-        buffers[0].cbBuffer = sizes.cbSecurityTrailer;
-        buffers[0].BufferType = SECBUFFER_TOKEN;
-        buffers[0].pvBuffer = rd_calloc(1, sizes.cbSecurityTrailer);
-
-        /* protection level and buffer size received from the server */
-        buffers[1].cbBuffer = server_token->cbBuffer;
-        buffers[1].BufferType = SECBUFFER_DATA;
-        buffers[1].pvBuffer = rd_calloc(1, server_token->cbBuffer);
-        memcpy(buffers[1].pvBuffer, server_token->pvBuffer, server_token->cbBuffer);
-
-        /* user principal */
-        buffers[2].cbBuffer = in_buffer.cbBuffer;
-        buffers[2].BufferType = SECBUFFER_DATA;
-        buffers[2].pvBuffer = rd_calloc(1, buffers[2].cbBuffer);
-        memcpy(buffers[2].pvBuffer, in_buffer.pvBuffer, in_buffer.cbBuffer);
-
-        /* padding */
-        buffers[3].cbBuffer = sizes.cbBlockSize;
-        buffers[3].BufferType = SECBUFFER_PADDING;
-        buffers[3].pvBuffer = rd_calloc(1, buffers[3].cbBuffer);
-
-        sr = EncryptMessage(state->ctx, KERB_WRAP_NO_ENCRYPT, &buffer_desc, 0);
-        if (sr != SEC_E_OK) {
-                rd_snprintf(errstr, errstr_size,
-                            "Encrypt message failed: %s (0x%x)",
-                            rd_kafka_sasl_sspi_err2str(sr), sr);
-
-                FreeContextBuffer(in_buffer.pvBuffer);
-                rd_free(buffers[0].pvBuffer);
-                rd_free(buffers[1].pvBuffer);
-                rd_free(buffers[2].pvBuffer);
-                rd_free(buffers[3].pvBuffer);
-                return -1;
-        }
-
-        out_buffer.cbBuffer = buffers[0].cbBuffer +
-                              buffers[1].cbBuffer +
-                              buffers[2].cbBuffer +
-                              buffers[3].cbBuffer;
-
-        out_buffer.pvBuffer = rd_calloc(1, buffers[0].cbBuffer +
-                                        buffers[1].cbBuffer +
-                                        buffers[2].cbBuffer +
-                                        buffers[3].cbBuffer);
-
-        memcpy(out_buffer.pvBuffer, buffers[0].pvBuffer, buffers[0].cbBuffer);
-
-        memcpy((unsigned char *)out_buffer.pvBuffer + (int)buffers[0].cbBuffer,
-               buffers[1].pvBuffer, buffers[1].cbBuffer);
-
-        memcpy((unsigned char *)out_buffer.pvBuffer +
-                buffers[0].cbBuffer + buffers[1].cbBuffer,
-                buffers[2].pvBuffer, buffers[2].cbBuffer);
-
-        memcpy((unsigned char *)out_buffer.pvBuffer +
-                buffers[0].cbBuffer + buffers[1].cbBuffer + buffers[2].cbBuffer,
-                buffers[3].pvBuffer, buffers[3].cbBuffer);
-
-        send_response = rd_kafka_sasl_send(rktrans,
-                                           out_buffer.pvBuffer,
-                                           out_buffer.cbBuffer,
-                                           errstr, errstr_size);
-
-        FreeContextBuffer(in_buffer.pvBuffer);
-        rd_free(out_buffer.pvBuffer);
-        rd_free(buffers[0].pvBuffer);
-        rd_free(buffers[1].pvBuffer);
-        rd_free(buffers[2].pvBuffer);
-        rd_free(buffers[3].pvBuffer);
-
-        return send_response;
-}
-
-
-/**
-* @brief Unwrap and validate token response from broker.
-*/
-static int rd_kafka_sasl_win32_validate_token (rd_kafka_transport_t *rktrans,
-                                               const void *inbuf,
-                                               size_t insize,
-                                               char *errstr,
-                                               size_t errstr_size) {
-        rd_kafka_sasl_win32_state_t *state = rktrans->rktrans_sasl.state;
-        SecBuffer buffers[2];
-        SecBufferDesc buffer_desc;
-        SECURITY_STATUS sr;
-        char supported;
-
-        if (insize > ULONG_MAX) {
-                rd_snprintf(errstr, errstr_size,
-                            "Input buffer length too large (%"PRIusz") "
-                            "and would overflow", insize);
-                return -1;
-        }
-
-        buffer_desc.cBuffers = 2;
-        buffer_desc.pBuffers = buffers;
-        buffer_desc.ulVersion = SECBUFFER_VERSION;
-
-        buffers[0].cbBuffer = (unsigned long)insize;
-        buffers[0].BufferType = SECBUFFER_STREAM;
-        buffers[0].pvBuffer = (void *)inbuf;
-
-        buffers[1].cbBuffer = 0;
-        buffers[1].BufferType = SECBUFFER_DATA;
-        buffers[1].pvBuffer = NULL;
-
-        sr = DecryptMessage(state->ctx, &buffer_desc, 0, NULL);
-        if (sr != SEC_E_OK) {
-                rd_snprintf(errstr, errstr_size,
-                            "Decrypt message failed: %s (0x%x)",
-                            rd_kafka_sasl_sspi_err2str(sr), sr);
-                return -1;
-        }
-
-        if (buffers[1].cbBuffer < 4) {
-                rd_snprintf(errstr, errstr_size,
-                            "Validate token: "
-                            "invalid message");
-                return -1;
-        }
-
-        supported = ((char *)buffers[1].pvBuffer)[0];
-        if (!(supported & 1)) {
-                rd_snprintf(errstr, errstr_size,
-                            "Validate token: "
-                            "server does not support layer");
-                return -1;
-        }
-
-        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLAUTH",
-                   "Validated server token");
-
-        return rd_kafka_sasl_win32_send_response(rktrans, errstr,
-                                                 errstr_size, &buffers[1]);
-}
-
-
-/**
-* @brief Handle SASL frame received from broker.
-*/
-static int rd_kafka_sasl_win32_recv (struct rd_kafka_transport_s *rktrans,
-                                     const void *buf, size_t size,
-                                     char *errstr, size_t errstr_size) {
-        rd_kafka_sasl_win32_state_t *state = rktrans->rktrans_sasl.state;
-
-        if (rktrans->rktrans_sasl.complete) {
-                if (rd_kafka_sasl_win32_validate_token(
-                        rktrans, buf, size, errstr, errstr_size) == -1) {
-                        rktrans->rktrans_sasl.complete = 0;
-                        return -1;
-                }
-
-                /* Final ack from broker. */
-                rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLAUTH",
-                           "Authenticated");
-                rd_kafka_sasl_auth_done(rktrans);
-                return 0;
-        }
-
-        return rd_kafka_sasl_sspi_continue(rktrans, buf, size,
-                                           errstr, errstr_size);
-}
-
-
-/**
-* @brief Decommission SSPI state
-*/
-static void rd_kafka_sasl_win32_close (rd_kafka_transport_t *rktrans) {
-        rd_kafka_sasl_win32_state_t *state = rktrans->rktrans_sasl.state;
-
-        if (!state)
-                return;
-
-        if (state->ctx) {
-                DeleteSecurityContext(state->ctx);
-                rd_free(state->ctx);
-        }
-        if (state->cred) {
-                FreeCredentialsHandle(state->cred);
-                rd_free(state->cred);
-        }
-        rd_free(state);
-}
-
-
-static int rd_kafka_sasl_win32_client_new (rd_kafka_transport_t *rktrans,
-                                           const char *hostname,
-                                           char *errstr, size_t errstr_size) {
-        rd_kafka_t *rk = rktrans->rktrans_rkb->rkb_rk;
-        rd_kafka_sasl_win32_state_t *state;
-
-        if (strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI")) {
-                rd_snprintf(errstr, errstr_size,
-                            "SASL mechanism \"%s\" not supported on platform",
-                            rk->rk_conf.sasl.mechanisms);
-                return -1;
-        }
-
-        state = rd_calloc(1, sizeof(*state));
-        rktrans->rktrans_sasl.state = state;
-
-        _snwprintf(state->principal, RD_ARRAYSIZE(state->principal),
-                   L"%hs/%hs",
-                   rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.principal,
-                   hostname);
-
-        state->cred = rd_kafka_sasl_sspi_cred_new(rktrans, errstr,
-                                                  errstr_size);
-        if (!state->cred)
-                return -1;
-
-        if (rd_kafka_sasl_sspi_continue(rktrans, NULL, 0,
-                                        errstr, errstr_size) == -1)
-                return -1;
-
-        return 0;
-}
-
-/**
- * @brief Validate config
- */
-static int rd_kafka_sasl_win32_conf_validate (rd_kafka_t *rk,
-                                              char *errstr,
-                                              size_t errstr_size) {
-        if (!rk->rk_conf.sasl.principal) {
-                rd_snprintf(errstr, errstr_size,
-                            "sasl.kerberos.principal must be set");
-                return -1;
-        }
-
-        return 0;
-}
-
-const struct rd_kafka_sasl_provider rd_kafka_sasl_win32_provider = {
-        .name          = "Win32 SSPI",
-        .client_new    = rd_kafka_sasl_win32_client_new,
-        .recv          = rd_kafka_sasl_win32_recv,
-        .close         = rd_kafka_sasl_win32_close,
-        .conf_validate = rd_kafka_sasl_win32_conf_validate
-};
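
For context, the provider above is selected purely through client configuration. Below is a minimal sketch (not part of this commit) of how an application would reach this Win32 SSPI code path via librdkafka's public API; the broker address, principal and service name are illustrative placeholders:

#include <librdkafka/rdkafka.h>

/* Sketch only: configure a client so that sasl.mechanisms=GSSAPI selects
 * the Win32 SSPI provider on Windows builds. Error handling is abbreviated. */
static rd_kafka_t *new_gssapi_client (void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        rd_kafka_conf_set(conf, "bootstrap.servers", "broker1:9092",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "security.protocol", "sasl_plaintext",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "sasl.mechanisms", "GSSAPI",
                          errstr, sizeof(errstr));
        /* Required by rd_kafka_sasl_win32_conf_validate() above. */
        rd_kafka_conf_set(conf, "sasl.kerberos.principal", "kafkaclient",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "sasl.kerberos.service.name", "kafka",
                          errstr, sizeof(errstr));

        /* conf is owned by the handle on success. */
        return rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
}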

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_subscription.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_subscription.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_subscription.c
deleted file mode 100644
index 18a2458..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_subscription.c
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-
-/**
- * This is the high level consumer API which is mutually exclusive
- * with the old legacy simple consumer.
- * Only one of these interfaces may be used on a given rd_kafka_t handle.
- */
-
-#include "rdkafka_int.h"
-#include "rdkafka_subscription.h"
-
-
-rd_kafka_resp_err_t rd_kafka_unsubscribe (rd_kafka_t *rk) {
-        rd_kafka_cgrp_t *rkcg;
-
-        if (!(rkcg = rd_kafka_cgrp_get(rk)))
-                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
-        return rd_kafka_op_err_destroy(rd_kafka_op_req2(rkcg->rkcg_ops,
-                                                        RD_KAFKA_OP_SUBSCRIBE));
-}
-
-
-/** @returns 1 if the topic is invalid (bad regex, empty), else 0 if valid. */
-static size_t _invalid_topic_cb (const rd_kafka_topic_partition_t *rktpar,
-                                 void *opaque) {
-        rd_regex_t *re;
-        char errstr[1];
-
-        if (!*rktpar->topic)
-                return 1;
-
-        if (*rktpar->topic != '^')
-                return 0;
-
-        if (!(re = rd_regex_comp(rktpar->topic, errstr, sizeof(errstr))))
-                return 1;
-
-        rd_regex_destroy(re);
-
-        return 0;
-}
-
-
-rd_kafka_resp_err_t
-rd_kafka_subscribe (rd_kafka_t *rk,
-                    const rd_kafka_topic_partition_list_t *topics) {
-
-        rd_kafka_op_t *rko;
-        rd_kafka_cgrp_t *rkcg;
-
-        if (!(rkcg = rd_kafka_cgrp_get(rk)))
-                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
-        /* Validate topics */
-        if (topics->cnt == 0 ||
-            rd_kafka_topic_partition_list_sum(topics,
-                                              _invalid_topic_cb, NULL) > 0)
-                return RD_KAFKA_RESP_ERR__INVALID_ARG;
-
-        rko = rd_kafka_op_new(RD_KAFKA_OP_SUBSCRIBE);
-	rko->rko_u.subscribe.topics = rd_kafka_topic_partition_list_copy(topics);
-
-        return rd_kafka_op_err_destroy(
-                rd_kafka_op_req(rkcg->rkcg_ops, rko, RD_POLL_INFINITE));
-}
-
-
-rd_kafka_resp_err_t
-rd_kafka_assign (rd_kafka_t *rk,
-                 const rd_kafka_topic_partition_list_t *partitions) {
-        rd_kafka_op_t *rko;
-        rd_kafka_cgrp_t *rkcg;
-
-        if (!(rkcg = rd_kafka_cgrp_get(rk)))
-                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
-        rko = rd_kafka_op_new(RD_KAFKA_OP_ASSIGN);
-	if (partitions)
-		rko->rko_u.assign.partitions =
-                        rd_kafka_topic_partition_list_copy(partitions);
-
-        return rd_kafka_op_err_destroy(
-                rd_kafka_op_req(rkcg->rkcg_ops, rko, RD_POLL_INFINITE));
-}
-
-
-
-rd_kafka_resp_err_t
-rd_kafka_assignment (rd_kafka_t *rk,
-                     rd_kafka_topic_partition_list_t **partitions) {
-        rd_kafka_op_t *rko;
-        rd_kafka_resp_err_t err;
-        rd_kafka_cgrp_t *rkcg;
-
-        if (!(rkcg = rd_kafka_cgrp_get(rk)))
-                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
-        rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_GET_ASSIGNMENT);
-	if (!rko)
-		return RD_KAFKA_RESP_ERR__TIMED_OUT;
-
-        err = rko->rko_err;
-
-        *partitions = rko->rko_u.assign.partitions;
-	rko->rko_u.assign.partitions = NULL;
-        rd_kafka_op_destroy(rko);
-
-        if (!*partitions && !err) {
-                /* Create an empty list for convenience of the caller */
-                *partitions = rd_kafka_topic_partition_list_new(0);
-        }
-
-        return err;
-}
-
-rd_kafka_resp_err_t
-rd_kafka_subscription (rd_kafka_t *rk,
-                       rd_kafka_topic_partition_list_t **topics){
-	rd_kafka_op_t *rko;
-        rd_kafka_resp_err_t err;
-        rd_kafka_cgrp_t *rkcg;
-
-        if (!(rkcg = rd_kafka_cgrp_get(rk)))
-                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
-        rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_GET_SUBSCRIPTION);
-	if (!rko)
-		return RD_KAFKA_RESP_ERR__TIMED_OUT;
-
-        err = rko->rko_err;
-
-        *topics = rko->rko_u.subscribe.topics;
-	rko->rko_u.subscribe.topics = NULL;
-        rd_kafka_op_destroy(rko);
-
-        if (!*topics && !err) {
-                /* Create an empty list for convenience of the caller */
-                *topics = rd_kafka_topic_partition_list_new(0);
-        }
-
-        return err;
-}
-
-
-rd_kafka_resp_err_t
-rd_kafka_pause_partitions (rd_kafka_t *rk,
-			   rd_kafka_topic_partition_list_t *partitions) {
-	return rd_kafka_toppars_pause_resume(rk, 1, RD_KAFKA_TOPPAR_F_APP_PAUSE,
-					     partitions);
-}
-
-
-rd_kafka_resp_err_t
-rd_kafka_resume_partitions (rd_kafka_t *rk,
-			   rd_kafka_topic_partition_list_t *partitions) {
-	return rd_kafka_toppars_pause_resume(rk, 0, RD_KAFKA_TOPPAR_F_APP_PAUSE,
-					     partitions);
-}
-
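
For context, a minimal usage sketch (not part of this commit) of the high-level consumer API removed above, written against librdkafka's public interface; rk is assumed to be an existing consumer handle created with a group.id, and the topic names are placeholders:

#include <librdkafka/rdkafka.h>

/* Sketch: subscribe a high-level consumer to one literal topic and one
 * regex pattern, then fetch the resulting subscription back. */
static rd_kafka_resp_err_t subscribe_example (rd_kafka_t *rk) {
        rd_kafka_topic_partition_list_t *topics =
                rd_kafka_topic_partition_list_new(2);
        rd_kafka_resp_err_t err;

        rd_kafka_topic_partition_list_add(topics, "mytopic",
                                          RD_KAFKA_PARTITION_UA);
        /* Names starting with '^' are treated as regex patterns,
         * as validated by _invalid_topic_cb() above. */
        rd_kafka_topic_partition_list_add(topics, "^telemetry\\..*",
                                          RD_KAFKA_PARTITION_UA);

        err = rd_kafka_subscribe(rk, topics);
        rd_kafka_topic_partition_list_destroy(topics);

        if (!err) {
                rd_kafka_topic_partition_list_t *current;
                /* Returns an empty list rather than NULL when unset. */
                if (!rd_kafka_subscription(rk, &current))
                        rd_kafka_topic_partition_list_destroy(current);
        }
        return err;
}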

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_subscription.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_subscription.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_subscription.h
deleted file mode 100644
index 0c51712..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_subscription.h
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_timer.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_timer.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_timer.c
deleted file mode 100644
index 7947980..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_timer.c
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2013, Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "rdkafka_int.h"
-#include "rd.h"
-#include "rdtime.h"
-#include "rdsysqueue.h"
-
-
-static RD_INLINE void rd_kafka_timers_lock (rd_kafka_timers_t *rkts) {
-        mtx_lock(&rkts->rkts_lock);
-}
-
-static RD_INLINE void rd_kafka_timers_unlock (rd_kafka_timers_t *rkts) {
-        mtx_unlock(&rkts->rkts_lock);
-}
-
-
-static RD_INLINE int rd_kafka_timer_started (const rd_kafka_timer_t *rtmr) {
-	return rtmr->rtmr_interval ? 1 : 0;
-}
-
-
-static RD_INLINE int rd_kafka_timer_scheduled (const rd_kafka_timer_t *rtmr) {
-	return rtmr->rtmr_next ? 1 : 0;
-}
-
-
-static int rd_kafka_timer_cmp (const void *_a, const void *_b) {
-	const rd_kafka_timer_t *a = _a, *b = _b;
-	return (int)(a->rtmr_next - b->rtmr_next);
-}
-
-static void rd_kafka_timer_unschedule (rd_kafka_timers_t *rkts,
-                                       rd_kafka_timer_t *rtmr) {
-	TAILQ_REMOVE(&rkts->rkts_timers, rtmr, rtmr_link);
-	rtmr->rtmr_next = 0;
-}
-
-static void rd_kafka_timer_schedule (rd_kafka_timers_t *rkts,
-				     rd_kafka_timer_t *rtmr, int extra_us) {
-	rd_kafka_timer_t *first;
-
-	/* Timer has been stopped */
-	if (!rtmr->rtmr_interval)
-		return;
-
-        /* Timers framework is terminating */
-        if (unlikely(!rkts->rkts_enabled))
-                return;
-
-	rtmr->rtmr_next = rd_clock() + rtmr->rtmr_interval + extra_us;
-
-	if (!(first = TAILQ_FIRST(&rkts->rkts_timers)) ||
-	    first->rtmr_next > rtmr->rtmr_next) {
-		TAILQ_INSERT_HEAD(&rkts->rkts_timers, rtmr, rtmr_link);
-                cnd_signal(&rkts->rkts_cond);
-	} else
-		TAILQ_INSERT_SORTED(&rkts->rkts_timers, rtmr,
-                                    rd_kafka_timer_t *, rtmr_link,
-				    rd_kafka_timer_cmp);
-}
-
-/**
- * Stop a timer that may be started.
- * If called from inside a timer callback 'lock' must be 0, else 1.
- */
-void rd_kafka_timer_stop (rd_kafka_timers_t *rkts, rd_kafka_timer_t *rtmr,
-                          int lock) {
-	if (lock)
-		rd_kafka_timers_lock(rkts);
-
-	if (!rd_kafka_timer_started(rtmr)) {
-		if (lock)
-			rd_kafka_timers_unlock(rkts);
-		return;
-	}
-
-	if (rd_kafka_timer_scheduled(rtmr))
-		rd_kafka_timer_unschedule(rkts, rtmr);
-
-	rtmr->rtmr_interval = 0;
-
-	if (lock)
-		rd_kafka_timers_unlock(rkts);
-}
-
-
-/**
- * Start the provided timer with the given interval.
- * Upon expiration of the interval (us) the callback will be called in the
- * main rdkafka thread; after the callback returns, the timer is restarted.
- *
- * Use rd_kafka_timer_stop() to stop a timer.
- */
-void rd_kafka_timer_start (rd_kafka_timers_t *rkts,
-			   rd_kafka_timer_t *rtmr, rd_ts_t interval,
-			   void (*callback) (rd_kafka_timers_t *rkts, void *arg),
-			   void *arg) {
-	rd_kafka_timers_lock(rkts);
-
-	rd_kafka_timer_stop(rkts, rtmr, 0/*!lock*/);
-
-	rtmr->rtmr_interval = interval;
-	rtmr->rtmr_callback = callback;
-	rtmr->rtmr_arg      = arg;
-
-	rd_kafka_timer_schedule(rkts, rtmr, 0);
-
-	rd_kafka_timers_unlock(rkts);
-}
-
-
-/**
- * Delay the next timer invocation by 'backoff_us'
- */
-void rd_kafka_timer_backoff (rd_kafka_timers_t *rkts,
-			     rd_kafka_timer_t *rtmr, int backoff_us) {
-	rd_kafka_timers_lock(rkts);
-	if (rd_kafka_timer_scheduled(rtmr))
-		rd_kafka_timer_unschedule(rkts, rtmr);
-	rd_kafka_timer_schedule(rkts, rtmr, backoff_us);
-	rd_kafka_timers_unlock(rkts);
-}
-
-
-/**
- * @returns the delta time to the next time (>=0) this timer fires, or -1
- *          if timer is stopped.
- */
-rd_ts_t rd_kafka_timer_next (rd_kafka_timers_t *rkts, rd_kafka_timer_t *rtmr,
-                             int do_lock) {
-        rd_ts_t now = rd_clock();
-        rd_ts_t delta = -1;
-
-        if (do_lock)
-                rd_kafka_timers_lock(rkts);
-
-        if (rd_kafka_timer_scheduled(rtmr)) {
-                delta = rtmr->rtmr_next - now;
-                if (delta < 0)
-                        delta = 0;
-        }
-
-        if (do_lock)
-                rd_kafka_timers_unlock(rkts);
-
-        return delta;
-}
-
-
-/**
- * Interrupt rd_kafka_timers_run().
- * Used for termination.
- */
-void rd_kafka_timers_interrupt (rd_kafka_timers_t *rkts) {
-	rd_kafka_timers_lock(rkts);
-	cnd_signal(&rkts->rkts_cond);
-	rd_kafka_timers_unlock(rkts);
-}
-
-
-/**
- * Returns the delta time to the next timer to fire, capped by 'timeout_us'.
- */
-rd_ts_t rd_kafka_timers_next (rd_kafka_timers_t *rkts, int timeout_us,
-			      int do_lock) {
-	rd_ts_t now = rd_clock();
-	rd_ts_t sleeptime = 0;
-	rd_kafka_timer_t *rtmr;
-
-	if (do_lock)
-		rd_kafka_timers_lock(rkts);
-
-	if (likely((rtmr = TAILQ_FIRST(&rkts->rkts_timers)) != NULL)) {
-		sleeptime = rtmr->rtmr_next - now;
-		if (sleeptime < 0)
-			sleeptime = 0;
-		else if (sleeptime > (rd_ts_t)timeout_us)
-			sleeptime = (rd_ts_t)timeout_us;
-	} else
-		sleeptime = (rd_ts_t)timeout_us;
-
-	if (do_lock)
-		rd_kafka_timers_unlock(rkts);
-
-	return sleeptime;
-}
-
-
-/**
- * Dispatch timers.
- * Will block up to 'timeout' microseconds before returning.
- */
-void rd_kafka_timers_run (rd_kafka_timers_t *rkts, int timeout_us) {
-	rd_ts_t now = rd_clock();
-	rd_ts_t end = now + timeout_us;
-
-        rd_kafka_timers_lock(rkts);
-
-	while (!rd_atomic32_get(&rkts->rkts_rk->rk_terminate) && now <= end) {
-		int64_t sleeptime;
-		rd_kafka_timer_t *rtmr;
-
-		if (timeout_us != RD_POLL_NOWAIT) {
-			sleeptime = rd_kafka_timers_next(rkts,
-							 timeout_us,
-							 0/*no-lock*/);
-
-			if (sleeptime > 0) {
-				cnd_timedwait_ms(&rkts->rkts_cond,
-						 &rkts->rkts_lock,
-						 (int)(sleeptime / 1000));
-
-			}
-		}
-
-		now = rd_clock();
-
-		while ((rtmr = TAILQ_FIRST(&rkts->rkts_timers)) &&
-		       rtmr->rtmr_next <= now) {
-
-			rd_kafka_timer_unschedule(rkts, rtmr);
-                        rd_kafka_timers_unlock(rkts);
-
-			rtmr->rtmr_callback(rkts, rtmr->rtmr_arg);
-
-                        rd_kafka_timers_lock(rkts);
-			/* Restart timer, unless it has been stopped, or
-			 * already rescheduled (start()ed) from callback. */
-			if (rd_kafka_timer_started(rtmr) &&
-			    !rd_kafka_timer_scheduled(rtmr))
-				rd_kafka_timer_schedule(rkts, rtmr, 0);
-		}
-
-		if (timeout_us == RD_POLL_NOWAIT) {
-			/* Only iterate once, even if rd_clock doesn't change */
-			break;
-		}
-	}
-
-	rd_kafka_timers_unlock(rkts);
-}
-
-
-void rd_kafka_timers_destroy (rd_kafka_timers_t *rkts) {
-        rd_kafka_timer_t *rtmr;
-
-        rd_kafka_timers_lock(rkts);
-        rkts->rkts_enabled = 0;
-        while ((rtmr = TAILQ_FIRST(&rkts->rkts_timers)))
-                rd_kafka_timer_stop(rkts, rtmr, 0);
-        rd_kafka_assert(rkts->rkts_rk, TAILQ_EMPTY(&rkts->rkts_timers));
-        rd_kafka_timers_unlock(rkts);
-
-        cnd_destroy(&rkts->rkts_cond);
-        mtx_destroy(&rkts->rkts_lock);
-}
-
-void rd_kafka_timers_init (rd_kafka_timers_t *rkts, rd_kafka_t *rk) {
-        memset(rkts, 0, sizeof(*rkts));
-        rkts->rkts_rk = rk;
-        TAILQ_INIT(&rkts->rkts_timers);
-        mtx_init(&rkts->rkts_lock, mtx_plain);
-        cnd_init(&rkts->rkts_cond);
-        rkts->rkts_enabled = 1;
-}
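
For context, a minimal sketch (not part of this commit) showing how this internal timer engine is driven, using only the signatures defined in this file and in the header that follows; the 5-second interval, callback body and calling function are illustrative:

#include <string.h>
#include "rdkafka_int.h"
#include "rdkafka_timer.h"

/* Sketch: a periodic internal timer firing every 5 seconds until stopped. */
static void my_tick_cb (rd_kafka_timers_t *rkts, void *arg) {
        /* Invoked from whichever thread calls rd_kafka_timers_run();
         * the timer is rescheduled automatically after this returns. */
        (void)rkts;
        (void)arg;
}

static void timer_example (rd_kafka_t *rk) {
        rd_kafka_timers_t timers;
        rd_kafka_timer_t tick;

        memset(&tick, 0, sizeof(tick)); /* start() expects a stopped timer */

        rd_kafka_timers_init(&timers, rk);
        rd_kafka_timer_start(&timers, &tick, 5 * 1000 * 1000 /*us*/,
                             my_tick_cb, NULL);

        /* Dispatch due timers, blocking at most 100 ms. */
        rd_kafka_timers_run(&timers, 100 * 1000);

        rd_kafka_timer_stop(&timers, &tick, 1/*lock*/);
        rd_kafka_timers_destroy(&timers);
}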

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_timer.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_timer.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_timer.h
deleted file mode 100644
index de01795..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_timer.h
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2013, Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-#include "rd.h"
-
-/* A timer engine. */
-typedef struct rd_kafka_timers_s {
-
-        TAILQ_HEAD(, rd_kafka_timer_s) rkts_timers;
-
-        struct rd_kafka_s *rkts_rk;
-
-	mtx_t       rkts_lock;
-	cnd_t       rkts_cond;
-
-        int         rkts_enabled;
-} rd_kafka_timers_t;
-
-
-typedef struct rd_kafka_timer_s {
-	TAILQ_ENTRY(rd_kafka_timer_s)  rtmr_link;
-
-	rd_ts_t rtmr_next;
-	rd_ts_t rtmr_interval;   /* interval in microseconds */
-
-	void  (*rtmr_callback) (rd_kafka_timers_t *rkts, void *arg);
-	void   *rtmr_arg;
-} rd_kafka_timer_t;
-
-
-
-void rd_kafka_timer_stop (rd_kafka_timers_t *rkts,
-                          rd_kafka_timer_t *rtmr, int lock);
-void rd_kafka_timer_start (rd_kafka_timers_t *rkts,
-			   rd_kafka_timer_t *rtmr, rd_ts_t interval,
-			   void (*callback) (rd_kafka_timers_t *rkts,
-                                             void *arg),
-			   void *arg);
-
-void rd_kafka_timer_backoff (rd_kafka_timers_t *rkts,
-			     rd_kafka_timer_t *rtmr, int backoff_us);
-rd_ts_t rd_kafka_timer_next (rd_kafka_timers_t *rkts, rd_kafka_timer_t *rtmr,
-                             int do_lock);
-
-void rd_kafka_timers_interrupt (rd_kafka_timers_t *rkts);
-rd_ts_t rd_kafka_timers_next (rd_kafka_timers_t *rkts, int timeout_us,
-			      int do_lock);
-void rd_kafka_timers_run (rd_kafka_timers_t *rkts, int timeout_us);
-void rd_kafka_timers_destroy (rd_kafka_timers_t *rkts);
-void rd_kafka_timers_init (rd_kafka_timers_t *rkte, rd_kafka_t *rk);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_topic.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_topic.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_topic.c
deleted file mode 100644
index 3975d80..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_topic.c
+++ /dev/null
@@ -1,1306 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012,2013 Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "rd.h"
-#include "rdkafka_int.h"
-#include "rdkafka_msg.h"
-#include "rdkafka_topic.h"
-#include "rdkafka_partition.h"
-#include "rdkafka_broker.h"
-#include "rdkafka_cgrp.h"
-#include "rdkafka_metadata.h"
-#include "rdlog.h"
-#include "rdsysqueue.h"
-#include "rdtime.h"
-#include "rdregex.h"
-
-const char *rd_kafka_topic_state_names[] = {
-        "unknown",
-        "exists",
-        "notexists"
-};
-
-
-
-static int
-rd_kafka_topic_metadata_update (rd_kafka_itopic_t *rkt,
-                                const struct rd_kafka_metadata_topic *mdt,
-                                rd_ts_t ts_insert);
-
-
-/**
- * @brief Increases the app's topic reference count and returns the app pointer.
- *
- * The app refcounts are implemented separately from the librdkafka refcounts
- * and to play nicely with shptr we keep one single shptr for the application
- * and increase/decrease a separate rkt_app_refcnt to keep track of its use.
- *
- * This only covers topic_new() & topic_destroy().
- * The topic_t exposed in rd_kafka_message_t is NOT covered and is handled
- * like a standard shptr -> app pointer conversion (keep_a()).
- *
- * @returns a (new) rkt app reference.
- *
- * @remark \p rkt and \p s_rkt are mutually exclusive.
- */
-static rd_kafka_topic_t *rd_kafka_topic_keep_app (rd_kafka_itopic_t *rkt) {
-	rd_kafka_topic_t *app_rkt;
-
-        mtx_lock(&rkt->rkt_app_lock);
-	rkt->rkt_app_refcnt++;
-        if (!(app_rkt = rkt->rkt_app_rkt))
-                app_rkt = rkt->rkt_app_rkt = rd_kafka_topic_keep_a(rkt);
-        mtx_unlock(&rkt->rkt_app_lock);
-
-	return app_rkt;
-}
-
-/**
- * @brief drop rkt app reference
- */
-static void rd_kafka_topic_destroy_app (rd_kafka_topic_t *app_rkt) {
-	rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
-        shptr_rd_kafka_itopic_t *s_rkt = NULL;
-
-        mtx_lock(&rkt->rkt_app_lock);
-	rd_kafka_assert(NULL, rkt->rkt_app_refcnt > 0);
-	rkt->rkt_app_refcnt--;
-        if (unlikely(rkt->rkt_app_refcnt == 0)) {
-		rd_kafka_assert(NULL, rkt->rkt_app_rkt);
-		s_rkt = rd_kafka_topic_a2s(app_rkt);
-                rkt->rkt_app_rkt = NULL;
-	}
-        mtx_unlock(&rkt->rkt_app_lock);
-
-	if (s_rkt) /* final app reference lost, destroy the shared ptr. */
-		rd_kafka_topic_destroy0(s_rkt);
-}
-
-
-/**
- * Final destructor for topic. Refcnt must be 0.
- */
-void rd_kafka_topic_destroy_final (rd_kafka_itopic_t *rkt) {
-
-	rd_kafka_assert(rkt->rkt_rk, rd_refcnt_get(&rkt->rkt_refcnt) == 0);
-
-        rd_kafka_wrlock(rkt->rkt_rk);
-        TAILQ_REMOVE(&rkt->rkt_rk->rk_topics, rkt, rkt_link);
-        rkt->rkt_rk->rk_topic_cnt--;
-        rd_kafka_wrunlock(rkt->rkt_rk);
-
-        rd_kafka_assert(rkt->rkt_rk, rd_list_empty(&rkt->rkt_desp));
-        rd_list_destroy(&rkt->rkt_desp);
-
-	if (rkt->rkt_topic)
-		rd_kafkap_str_destroy(rkt->rkt_topic);
-
-	rd_kafka_anyconf_destroy(_RK_TOPIC, &rkt->rkt_conf);
-
-        mtx_destroy(&rkt->rkt_app_lock);
-	rwlock_destroy(&rkt->rkt_lock);
-        rd_refcnt_destroy(&rkt->rkt_refcnt);
-
-	rd_free(rkt);
-}
-
-/**
- * Application destroy
- */
-void rd_kafka_topic_destroy (rd_kafka_topic_t *app_rkt) {
-	rd_kafka_topic_destroy_app(app_rkt);
-}
-
-
-/**
- * Finds and returns a topic based on its name, or NULL if not found.
- * The 'rkt' refcount is increased by one and the caller must call
- * rd_kafka_topic_destroy() when it is done with the topic to decrease
- * the refcount.
- *
- * Locality: any thread
- */
-shptr_rd_kafka_itopic_t *rd_kafka_topic_find_fl (const char *func, int line,
-                                                rd_kafka_t *rk,
-                                                const char *topic, int do_lock){
-	rd_kafka_itopic_t *rkt;
-        shptr_rd_kafka_itopic_t *s_rkt = NULL;
-
-        if (do_lock)
-                rd_kafka_rdlock(rk);
-	TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
-		if (!rd_kafkap_str_cmp_str(rkt->rkt_topic, topic)) {
-                        s_rkt = rd_kafka_topic_keep(rkt);
-			break;
-		}
-	}
-        if (do_lock)
-                rd_kafka_rdunlock(rk);
-
-	return s_rkt;
-}
-
-/**
- * Same semantics as ..find() but takes a Kafka protocol string instead.
- */
-shptr_rd_kafka_itopic_t *rd_kafka_topic_find0_fl (const char *func, int line,
-                                                 rd_kafka_t *rk,
-                                                 const rd_kafkap_str_t *topic) {
-	rd_kafka_itopic_t *rkt;
-        shptr_rd_kafka_itopic_t *s_rkt = NULL;
-
-	rd_kafka_rdlock(rk);
-	TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
-		if (!rd_kafkap_str_cmp(rkt->rkt_topic, topic)) {
-                        s_rkt = rd_kafka_topic_keep(rkt);
-			break;
-		}
-	}
-	rd_kafka_rdunlock(rk);
-
-	return s_rkt;
-}
-
-
-/**
- * Compare shptr_rd_kafka_itopic_t for underlying itopic_t
- */
-int rd_kafka_topic_cmp_s_rkt (const void *_a, const void *_b) {
-        shptr_rd_kafka_itopic_t *a = (void *)_a, *b = (void *)_b;
-        rd_kafka_itopic_t *rkt_a = rd_kafka_topic_s2i(a);
-        rd_kafka_itopic_t *rkt_b = rd_kafka_topic_s2i(b);
-
-        if (rkt_a == rkt_b)
-                return 0;
-
-        return rd_kafkap_str_cmp(rkt_a->rkt_topic, rkt_b->rkt_topic);
-}
-
-
-/**
- * Create new topic handle. 
- *
- * Locality: any
- */
-shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk,
-                                              const char *topic,
-                                              rd_kafka_topic_conf_t *conf,
-                                              int *existing,
-                                              int do_lock) {
-	rd_kafka_itopic_t *rkt;
-        shptr_rd_kafka_itopic_t *s_rkt;
-        const struct rd_kafka_metadata_cache_entry *rkmce;
-
-	/* Verify configuration.
-	 * Maximum topic name size + headers must never exceed message.max.bytes
-	 * which is min-capped to 1000.
-	 * See rd_kafka_broker_produce_toppar() and rdkafka_conf.c */
-	if (!topic || strlen(topic) > 512) {
-		if (conf)
-			rd_kafka_topic_conf_destroy(conf);
-		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
-					EINVAL);
-		return NULL;
-	}
-
-	if (do_lock)
-                rd_kafka_wrlock(rk);
-	if ((s_rkt = rd_kafka_topic_find(rk, topic, 0/*no lock*/))) {
-                if (do_lock)
-                        rd_kafka_wrunlock(rk);
-		if (conf)
-			rd_kafka_topic_conf_destroy(conf);
-                if (existing)
-                        *existing = 1;
-		return s_rkt;
-        }
-
-        if (existing)
-                *existing = 0;
-
-	rkt = rd_calloc(1, sizeof(*rkt));
-
-	rkt->rkt_topic     = rd_kafkap_str_new(topic, -1);
-	rkt->rkt_rk        = rk;
-
-	if (!conf) {
-                if (rk->rk_conf.topic_conf)
-                        conf = rd_kafka_topic_conf_dup(rk->rk_conf.topic_conf);
-                else
-                        conf = rd_kafka_topic_conf_new();
-        }
-	rkt->rkt_conf = *conf;
-	rd_free(conf); /* explicitly not rd_kafka_topic_destroy()
-                        * since we don't want to rd_free internal members,
-                        * just the placeholder. The internal members
-                        * were copied on the line above. */
-
-	/* Default partitioner: consistent_random */
-	if (!rkt->rkt_conf.partitioner)
-		rkt->rkt_conf.partitioner = rd_kafka_msg_partitioner_consistent_random;
-
-	if (rkt->rkt_conf.compression_codec == RD_KAFKA_COMPRESSION_INHERIT)
-		rkt->rkt_conf.compression_codec = rk->rk_conf.compression_codec;
-
-	rd_kafka_dbg(rk, TOPIC, "TOPIC", "New local topic: %.*s",
-		     RD_KAFKAP_STR_PR(rkt->rkt_topic));
-
-        rd_list_init(&rkt->rkt_desp, 16, NULL);
-        rd_refcnt_init(&rkt->rkt_refcnt, 0);
-
-        s_rkt = rd_kafka_topic_keep(rkt);
-
-	rwlock_init(&rkt->rkt_lock);
-        mtx_init(&rkt->rkt_app_lock, mtx_plain);
-
-	/* Create unassigned partition */
-	rkt->rkt_ua = rd_kafka_toppar_new(rkt, RD_KAFKA_PARTITION_UA);
-
-	TAILQ_INSERT_TAIL(&rk->rk_topics, rkt, rkt_link);
-	rk->rk_topic_cnt++;
-
-        /* Populate from metadata cache. */
-        if ((rkmce = rd_kafka_metadata_cache_find(rk, topic, 1/*valid*/))) {
-                if (existing)
-                        *existing = 1;
-
-                rd_kafka_topic_metadata_update(rkt, &rkmce->rkmce_mtopic,
-                                               rkmce->rkmce_ts_insert);
-        }
-
-        if (do_lock)
-                rd_kafka_wrunlock(rk);
-
-	return s_rkt;
-}
-
-
-
-/**
- * Create new app topic handle.
- *
- * Locality: application thread
- */
-rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic,
-                                      rd_kafka_topic_conf_t *conf) {
-        shptr_rd_kafka_itopic_t *s_rkt;
-        rd_kafka_itopic_t *rkt;
-        rd_kafka_topic_t *app_rkt;
-        int existing;
-
-        s_rkt = rd_kafka_topic_new0(rk, topic, conf, &existing, 1/*lock*/);
-        if (!s_rkt)
-                return NULL;
-
-        rkt = rd_kafka_topic_s2i(s_rkt);
-
-        /* Save a shared pointer to be used in callbacks. */
-	app_rkt = rd_kafka_topic_keep_app(rkt);
-
-        /* Query for the topic leader (async) */
-        if (!existing)
-                rd_kafka_topic_leader_query(rk, rkt);
-
-        /* Drop our reference since there is already/now a rkt_app_rkt */
-        rd_kafka_topic_destroy0(s_rkt);
-
-        return app_rkt;
-}
-
-
-
-/**
- * Sets the state for topic.
- * NOTE: rd_kafka_topic_wrlock(rkt) MUST be held
- */
-static void rd_kafka_topic_set_state (rd_kafka_itopic_t *rkt, int state) {
-
-        if ((int)rkt->rkt_state == state)
-                return;
-
-        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "STATE",
-                     "Topic %s changed state %s -> %s",
-                     rkt->rkt_topic->str,
-                     rd_kafka_topic_state_names[rkt->rkt_state],
-                     rd_kafka_topic_state_names[state]);
-        rkt->rkt_state = state;
-}
-
-/**
- * Returns the name of a topic.
- * NOTE:
- *   The topic Kafka String representation is crafted with an extra byte
- *   at the end for the Nul that is not included in the length; this way
- *   we can use the topic's String directly.
- *   This is not true for Kafka Strings read from the network.
- */
-const char *rd_kafka_topic_name (const rd_kafka_topic_t *app_rkt) {
-        const rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
-	return rkt->rkt_topic->str;
-}
-
-
-
-
-
-/**
- * @brief Update the leader for a topic+partition.
- * @returns 1 if the leader was changed, else 0, or -1 if leader is unknown.
- *
- * @locks rd_kafka_topic_wrlock(rkt) and rd_kafka_toppar_lock(rktp)
- * @locality any
- */
-int rd_kafka_toppar_leader_update (rd_kafka_toppar_t *rktp,
-                                   int32_t leader_id, rd_kafka_broker_t *rkb) {
-
-        if (rktp->rktp_leader_id != leader_id) {
-                rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPICUPD",
-                             "Topic %s [%"PRId32"] migrated from "
-                             "leader %"PRId32" to %"PRId32,
-                             rktp->rktp_rkt->rkt_topic->str,
-                             rktp->rktp_partition,
-                             rktp->rktp_leader_id, leader_id);
-                rktp->rktp_leader_id = leader_id;
-        }
-
-	if (!rkb) {
-		int had_leader = rktp->rktp_leader ? 1 : 0;
-
-		rd_kafka_toppar_broker_delegate(rktp, NULL, 0);
-
-		return had_leader ? -1 : 0;
-	}
-
-
-	if (rktp->rktp_leader) {
-		if (rktp->rktp_leader == rkb) {
-			/* No change in broker */
-			return 0;
-		}
-
-		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPICUPD",
-			     "Topic %s [%"PRId32"] migrated from "
-			     "broker %"PRId32" to %"PRId32,
-			     rktp->rktp_rkt->rkt_topic->str,
-			     rktp->rktp_partition,
-			     rktp->rktp_leader->rkb_nodeid, rkb->rkb_nodeid);
-	}
-
-	rd_kafka_toppar_broker_delegate(rktp, rkb, 0);
-
-	return 1;
-}
-
-
-static int rd_kafka_toppar_leader_update2 (rd_kafka_itopic_t *rkt,
-					   int32_t partition,
-                                           int32_t leader_id,
-					   rd_kafka_broker_t *rkb) {
-	rd_kafka_toppar_t *rktp;
-        shptr_rd_kafka_toppar_t *s_rktp;
-	int r;
-
-	s_rktp = rd_kafka_toppar_get(rkt, partition, 0);
-        if (unlikely(!s_rktp)) {
-                /* Have only seen this in issue #132.
-                 * Probably caused by corrupt broker state. */
-                rd_kafka_log(rkt->rkt_rk, LOG_WARNING, "LEADER",
-                             "%s [%"PRId32"] is unknown "
-                             "(partition_cnt %i)",
-                             rkt->rkt_topic->str, partition,
-                             rkt->rkt_partition_cnt);
-                return -1;
-        }
-
-        rktp = rd_kafka_toppar_s2i(s_rktp);
-
-        rd_kafka_toppar_lock(rktp);
-        r = rd_kafka_toppar_leader_update(rktp, leader_id, rkb);
-        rd_kafka_toppar_unlock(rktp);
-
-	rd_kafka_toppar_destroy(s_rktp); /* from get() */
-
-	return r;
-}
-
-
-/**
- * Update the number of partitions for a topic and take the appropriate actions.
- * Returns 1 if the partition count changed, else 0.
- * NOTE: rd_kafka_topic_wrlock(rkt) MUST be held.
- */
-static int rd_kafka_topic_partition_cnt_update (rd_kafka_itopic_t *rkt,
-						int32_t partition_cnt) {
-	rd_kafka_t *rk = rkt->rkt_rk;
-	shptr_rd_kafka_toppar_t **rktps;
-	shptr_rd_kafka_toppar_t *rktp_ua;
-        shptr_rd_kafka_toppar_t *s_rktp;
-	rd_kafka_toppar_t *rktp;
-	rd_kafka_msgq_t tmpq = RD_KAFKA_MSGQ_INITIALIZER(tmpq);
-	int32_t i;
-
-	if (likely(rkt->rkt_partition_cnt == partition_cnt))
-		return 0; /* No change in partition count */
-
-        if (unlikely(rkt->rkt_partition_cnt != 0 &&
-                     !rd_kafka_terminating(rkt->rkt_rk)))
-                rd_kafka_log(rk, LOG_NOTICE, "PARTCNT",
-                             "Topic %s partition count changed "
-                             "from %"PRId32" to %"PRId32,
-                             rkt->rkt_topic->str,
-                             rkt->rkt_partition_cnt, partition_cnt);
-        else
-                rd_kafka_dbg(rk, TOPIC, "PARTCNT",
-                             "Topic %s partition count changed "
-                             "from %"PRId32" to %"PRId32,
-                             rkt->rkt_topic->str,
-                             rkt->rkt_partition_cnt, partition_cnt);
-
-
-	/* Create and assign new partition list */
-	if (partition_cnt > 0)
-		rktps = rd_calloc(partition_cnt, sizeof(*rktps));
-	else
-		rktps = NULL;
-
-	for (i = 0 ; i < partition_cnt ; i++) {
-		if (i >= rkt->rkt_partition_cnt) {
-			/* New partition. Check if it's in the list of
-			 * desired partitions first. */
-
-                        s_rktp = rd_kafka_toppar_desired_get(rkt, i);
-
-                        rktp = s_rktp ? rd_kafka_toppar_s2i(s_rktp) : NULL;
-                        if (rktp) {
-				rd_kafka_toppar_lock(rktp);
-                                rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_UNKNOWN;
-
-                                /* Remove from desp list since the
-                                 * partition is now known. */
-                                rd_kafka_toppar_desired_unlink(rktp);
-                                rd_kafka_toppar_unlock(rktp);
-			} else
-				s_rktp = rd_kafka_toppar_new(rkt, i);
-			rktps[i] = s_rktp;
-		} else {
-			/* Existing partition, grab our own reference. */
-			rktps[i] = rd_kafka_toppar_keep(
-				rd_kafka_toppar_s2i(rkt->rkt_p[i]));
-			/* Lose previous ref */
-			rd_kafka_toppar_destroy(rkt->rkt_p[i]);
-		}
-	}
-
-	rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0);
-
-        /* Propagate notexist errors for desired partitions */
-        RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) {
-                rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESIRED",
-                             "%s [%"PRId32"]: "
-                             "desired partition does not exist in cluster",
-                             rkt->rkt_topic->str,
-                             rd_kafka_toppar_s2i(s_rktp)->rktp_partition);
-                rd_kafka_toppar_enq_error(rd_kafka_toppar_s2i(s_rktp),
-                                          RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
-        }
-
-	/* Remove excessive partitions */
-	for (i = partition_cnt ; i < rkt->rkt_partition_cnt ; i++) {
-		s_rktp = rkt->rkt_p[i];
-                rktp = rd_kafka_toppar_s2i(s_rktp);
-
-		rd_kafka_dbg(rkt->rkt_rk, TOPIC, "REMOVE",
-			     "%s [%"PRId32"] no longer reported in metadata",
-			     rkt->rkt_topic->str, rktp->rktp_partition);
-
-		rd_kafka_toppar_lock(rktp);
-
-		if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED) {
-                        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESIRED",
-                                     "Topic %s [%"PRId32"] is desired "
-                                     "but no longer known: "
-                                     "moving back on desired list",
-                                     rkt->rkt_topic->str, rktp->rktp_partition);
-
-                        /* If this is a desired partition, move it back onto
-                         * the desired list since the partition is no longer
-                         * known. */
-			rd_kafka_assert(rkt->rkt_rk,
-                                        !(rktp->rktp_flags &
-                                          RD_KAFKA_TOPPAR_F_UNKNOWN));
-			rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN;
-                        rd_kafka_toppar_desired_link(rktp);
-
-                        if (!rd_kafka_terminating(rkt->rkt_rk))
-                                rd_kafka_toppar_enq_error(
-                                        rktp,
-                                        RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
-
-			rd_kafka_toppar_broker_delegate(rktp, NULL, 0);
-
-		} else {
-			/* Tell handling broker to let go of the toppar */
-			rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_REMOVE;
-			rd_kafka_toppar_broker_leave_for_remove(rktp);
-		}
-
-		rd_kafka_toppar_unlock(rktp);
-
-		rd_kafka_toppar_destroy(s_rktp);
-	}
-
-	if (likely(rktp_ua != NULL)) {
-		/* Move messages from removed partitions to UA for
-		 * further processing. */
-		rktp = rd_kafka_toppar_s2i(rktp_ua);
-
-		// FIXME: tmpq not used
-		if (rd_kafka_msgq_len(&tmpq) > 0) {
-			rd_kafka_dbg(rkt->rkt_rk, TOPIC, "TOPPARMOVE",
-				     "Moving %d messages (%zd bytes) from "
-				     "%d removed partitions to UA partition",
-				     rd_kafka_msgq_len(&tmpq),
-				     rd_kafka_msgq_size(&tmpq),
-				     i - partition_cnt);
-
-
-			rd_kafka_toppar_lock(rktp);
-			rd_kafka_msgq_concat(&rktp->rktp_msgq, &tmpq);
-			rd_kafka_toppar_unlock(rktp);
-		}
-
-		rd_kafka_toppar_destroy(rktp_ua); /* .._get() above */
-	} else {
-		/* No UA, fail messages from removed partitions. */
-		if (rd_kafka_msgq_len(&tmpq) > 0) {
-			rd_kafka_dbg(rkt->rkt_rk, TOPIC, "TOPPARMOVE",
-				     "Failing %d messages (%zd bytes) from "
-				     "%d removed partitions",
-				     rd_kafka_msgq_len(&tmpq),
-				     rd_kafka_msgq_size(&tmpq),
-				     i - partition_cnt);
-
-			rd_kafka_dr_msgq(rkt, &tmpq,
-					 RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
-		}
-	}
-
-	if (rkt->rkt_p)
-		rd_free(rkt->rkt_p);
-
-	rkt->rkt_p = rktps;
-
-	rkt->rkt_partition_cnt = partition_cnt;
-
-	return 1;
-}
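
A short sketch of the locking contract stated above, assuming the caller is a
metadata handler that has already resolved new_cnt from a Metadata response:

    int changed;

    rd_kafka_topic_wrlock(rkt);
    changed = rd_kafka_topic_partition_cnt_update(rkt, new_cnt);
    rd_kafka_topic_wrunlock(rkt);
    /* changed == 1: toppar list resized (new toppars created, removed
     * partitions delegated away); changed == 0: count was unchanged. */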
-
-
-
-/**
- * Topic 'rkt' does not exist: propagate to interested parties.
- * The topic's state must have been set to NOTEXISTS and
- * rd_kafka_topic_partition_cnt_update() must have been called prior to
- * calling this function.
- *
- * Locks: rd_kafka_topic_*lock() must be held.
- */
-static void rd_kafka_topic_propagate_notexists (rd_kafka_itopic_t *rkt,
-                                                rd_kafka_resp_err_t err) {
-        shptr_rd_kafka_toppar_t *s_rktp;
-        int i;
-
-        if (rkt->rkt_rk->rk_type != RD_KAFKA_CONSUMER)
-                return;
-
-
-        /* Notify consumers that the topic doesn't exist. */
-        RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i)
-                rd_kafka_toppar_enq_error(rd_kafka_toppar_s2i(s_rktp), err);
-}
-
-
-/**
- * Assign messages on the UA partition to available partitions.
- * Locks: rd_kafka_topic_*lock() must be held.
- */
-static void rd_kafka_topic_assign_uas (rd_kafka_itopic_t *rkt,
-                                       rd_kafka_resp_err_t err) {
-	rd_kafka_t *rk = rkt->rkt_rk;
-	shptr_rd_kafka_toppar_t *s_rktp_ua;
-        rd_kafka_toppar_t *rktp_ua;
-	rd_kafka_msg_t *rkm, *tmp;
-	rd_kafka_msgq_t uas = RD_KAFKA_MSGQ_INITIALIZER(uas);
-	rd_kafka_msgq_t failed = RD_KAFKA_MSGQ_INITIALIZER(failed);
-	int cnt;
-
-	if (rkt->rkt_rk->rk_type != RD_KAFKA_PRODUCER)
-		return;
-
-	s_rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0);
-	if (unlikely(!s_rktp_ua)) {
-		rd_kafka_dbg(rk, TOPIC, "ASSIGNUA",
-			     "No UnAssigned partition available for %s",
-			     rkt->rkt_topic->str);
-		return;
-	}
-
-        rktp_ua = rd_kafka_toppar_s2i(s_rktp_ua);
-
-	/* Assign all unassigned messages to new topics. */
-	rd_kafka_dbg(rk, TOPIC, "PARTCNT",
-		     "Partitioning %i unassigned messages in topic %.*s to "
-		     "%"PRId32" partitions",
-		     rd_atomic32_get(&rktp_ua->rktp_msgq.rkmq_msg_cnt),
-		     RD_KAFKAP_STR_PR(rkt->rkt_topic),
-		     rkt->rkt_partition_cnt);
-
-	rd_kafka_toppar_lock(rktp_ua);
-	rd_kafka_msgq_move(&uas, &rktp_ua->rktp_msgq);
-	cnt = rd_atomic32_get(&uas.rkmq_msg_cnt);
-	rd_kafka_toppar_unlock(rktp_ua);
-
-	TAILQ_FOREACH_SAFE(rkm, &uas.rkmq_msgs, rkm_link, tmp) {
-		/* Fast-path for failing messages with forced partition */
-		if (rkm->rkm_partition != RD_KAFKA_PARTITION_UA &&
-		    rkm->rkm_partition >= rkt->rkt_partition_cnt &&
-		    rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN) {
-			rd_kafka_msgq_enq(&failed, rkm);
-			continue;
-		}
-
-		if (unlikely(rd_kafka_msg_partitioner(rkt, rkm, 0) != 0)) {
-			/* Desired partition not available */
-			rd_kafka_msgq_enq(&failed, rkm);
-		}
-	}
-
-	rd_kafka_dbg(rk, TOPIC, "UAS",
-		     "%i/%i messages were partitioned in topic %s",
-		     cnt - rd_atomic32_get(&failed.rkmq_msg_cnt),
-		     cnt, rkt->rkt_topic->str);
-
-	if (rd_atomic32_get(&failed.rkmq_msg_cnt) > 0) {
-		/* Fail the messages */
-		rd_kafka_dbg(rk, TOPIC, "UAS",
-			     "%"PRId32"/%i messages failed partitioning "
-			     "in topic %s",
-			     rd_atomic32_get(&failed.rkmq_msg_cnt), cnt,
-			     rkt->rkt_topic->str);
-		rd_kafka_dr_msgq(rkt, &failed,
-				 rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS ?
-				 err :
-				 RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
-	}
-
-	rd_kafka_toppar_destroy(s_rktp_ua); /* from get() */
-}
-
-
-/**
- * Received metadata request contained no information about topic 'rkt'
- * and thus indicates the topic is not available in the cluster.
- */
-void rd_kafka_topic_metadata_none (rd_kafka_itopic_t *rkt) {
-	rd_kafka_topic_wrlock(rkt);
-
-	if (unlikely(rd_atomic32_get(&rkt->rkt_rk->rk_terminate))) {
-		/* Don't update metadata while terminating; this check is
-		 * done after acquiring the lock for proper synchronisation. */
-		rd_kafka_topic_wrunlock(rkt);
-		return;
-	}
-
-	rkt->rkt_ts_metadata = rd_clock();
-
-        rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_NOTEXISTS);
-
-        rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;
-
-	/* Update number of partitions */
-	rd_kafka_topic_partition_cnt_update(rkt, 0);
-
-        /* Purge messages with forced partition */
-        rd_kafka_topic_assign_uas(rkt, RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);
-
-        /* Propagate nonexistent topic info */
-        rd_kafka_topic_propagate_notexists(rkt,
-                                           RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);
-
-	rd_kafka_topic_wrunlock(rkt);
-}
-
-
-/**
- * @brief Update a topic from metadata.
- *
- * @param ts_age absolute age (timestamp) of metadata.
- * @returns 1 if the number of partitions changed, 0 if not, and -1 if the
- *          topic is unknown.
- *
- * @locks rd_kafka*lock()
- */
-static int
-rd_kafka_topic_metadata_update (rd_kafka_itopic_t *rkt,
-                                const struct rd_kafka_metadata_topic *mdt,
-                                rd_ts_t ts_age) {
-        rd_kafka_t *rk = rkt->rkt_rk;
-	int upd = 0;
-	int j;
-        rd_kafka_broker_t **partbrokers;
-        int leader_cnt = 0;
-        int old_state;
-
-	if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR)
-		rd_kafka_dbg(rk, TOPIC|RD_KAFKA_DBG_METADATA, "METADATA",
-			   "Error in metadata reply for "
-			   "topic %s (PartCnt %i): %s",
-			   rkt->rkt_topic->str, mdt->partition_cnt,
-			   rd_kafka_err2str(mdt->err));
-
-        if (unlikely(rd_kafka_terminating(rk))) {
-                /* Don't update metadata while terminating; this check is
-                 * done while holding the lock for proper synchronisation. */
-                return -1;
-        }
-
-        /* Look up brokers before acquiring rkt lock to preserve lock order */
-        partbrokers = rd_alloca(mdt->partition_cnt * sizeof(*partbrokers));
-
-	for (j = 0 ; j < mdt->partition_cnt ; j++) {
-		if (mdt->partitions[j].leader == -1) {
-                        partbrokers[j] = NULL;
-			continue;
-		}
-
-                partbrokers[j] =
-                        rd_kafka_broker_find_by_nodeid(rk,
-                                                       mdt->partitions[j].
-                                                       leader);
-	}
-
-
-	rd_kafka_topic_wrlock(rkt);
-
-        old_state = rkt->rkt_state;
-	rkt->rkt_ts_metadata = ts_age;
-
-	/* Set topic state */
-	if (mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART ||
-	    mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN/*auto.create.topics fails*/||
-            mdt->err == RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION/*invalid topic*/)
-                rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_NOTEXISTS);
-        else if (mdt->partition_cnt > 0)
-                rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_EXISTS);
-
-	/* Update number of partitions, but not if there are
-	 * (possibly intermittent) errors (e.g., "Leader not available"). */
-	if (mdt->err == RD_KAFKA_RESP_ERR_NO_ERROR)
-		upd += rd_kafka_topic_partition_cnt_update(rkt,
-							   mdt->partition_cnt);
-
-	/* Update leader for each partition */
-	for (j = 0 ; j < mdt->partition_cnt ; j++) {
-                int r;
-		rd_kafka_broker_t *leader;
-
-		rd_kafka_dbg(rk, TOPIC|RD_KAFKA_DBG_METADATA, "METADATA",
-			   "  Topic %s partition %i Leader %"PRId32,
-			   rkt->rkt_topic->str,
-			   mdt->partitions[j].id,
-			   mdt->partitions[j].leader);
-
-		leader = partbrokers[j];
-		partbrokers[j] = NULL;
-
-		/* Update leader for partition */
-		r = rd_kafka_toppar_leader_update2(rkt,
-						   mdt->partitions[j].id,
-                                                   mdt->partitions[j].leader,
-						   leader);
-
-                upd += (r != 0 ? 1 : 0);
-
-                if (leader) {
-                        if (r != -1)
-                                leader_cnt++;
-                        /* Drop reference to broker (from find()) */
-                        rd_kafka_broker_destroy(leader);
-                }
-        }
-
-        /* If all partitions have leaders we can turn off fast leader query. */
-        if (mdt->partition_cnt > 0 && leader_cnt == mdt->partition_cnt)
-                rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;
-
-	if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR && rkt->rkt_partition_cnt) {
-                /* (Possibly intermittent) topic-wide error:
-                 * remove leaders for partitions */
-
-		for (j = 0 ; j < rkt->rkt_partition_cnt ; j++) {
-                        rd_kafka_toppar_t *rktp;
-			if (!rkt->rkt_p[j])
-                                continue;
-
-                        rktp = rd_kafka_toppar_s2i(rkt->rkt_p[j]);
-                        rd_kafka_toppar_lock(rktp);
-                        rd_kafka_toppar_broker_delegate(rktp, NULL, 0);
-                        rd_kafka_toppar_unlock(rktp);
-                }
-        }
-
-	/* Try to assign unassigned messages to new partitions, or fail them */
-	if (upd > 0 || rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
-		rd_kafka_topic_assign_uas(rkt, mdt->err ?
-                                          mdt->err :
-                                          RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);
-
-        /* Trigger notexists propagation */
-        if (old_state != (int)rkt->rkt_state &&
-            rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
-                rd_kafka_topic_propagate_notexists(
-                        rkt,
-                        mdt->err ? mdt->err : RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);
-
-	rd_kafka_topic_wrunlock(rkt);
-
-	/* Lose broker references */
-	for (j = 0 ; j < mdt->partition_cnt ; j++)
-		if (partbrokers[j])
-			rd_kafka_broker_destroy(partbrokers[j]);
-
-
-	return upd;
-}
-
-/**
- * @brief Update topic by metadata, if topic is locally known.
- * @sa rd_kafka_topic_metadata_update()
- * @locks none
- */
-int
-rd_kafka_topic_metadata_update2 (rd_kafka_broker_t *rkb,
-                                 const struct rd_kafka_metadata_topic *mdt) {
-        rd_kafka_itopic_t *rkt;
-        shptr_rd_kafka_itopic_t *s_rkt;
-        int r;
-
-        rd_kafka_wrlock(rkb->rkb_rk);
-        if (!(s_rkt = rd_kafka_topic_find(rkb->rkb_rk,
-                                          mdt->topic, 0/*!lock*/))) {
-                rd_kafka_wrunlock(rkb->rkb_rk);
-                return -1; /* Ignore topics that we don't have locally. */
-        }
-
-        rkt = rd_kafka_topic_s2i(s_rkt);
-
-        r = rd_kafka_topic_metadata_update(rkt, mdt, rd_clock());
-
-        rd_kafka_wrunlock(rkb->rkb_rk);
-
-        rd_kafka_topic_destroy0(s_rkt); /* from find() */
-
-        return r;
-}
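
A hypothetical call-site sketch (not verbatim from this tree): a Metadata
response handler holding a parsed struct rd_kafka_metadata *md would feed each
topic to the updater above, which ignores topics that are not known locally:

    int i;

    for (i = 0 ; i < md->topic_cnt ; i++)
            rd_kafka_topic_metadata_update2(rkb, &md->topics[i]);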
-
-
-
-/**
- * @returns a list of all partitions (s_rktp's) for a topic.
- * @remark rd_kafka_topic_*lock() MUST be held.
- */
-static rd_list_t *rd_kafka_topic_get_all_partitions (rd_kafka_itopic_t *rkt) {
-	rd_list_t *list;
-	shptr_rd_kafka_toppar_t *s_rktp;
-	int i;
-
-        list = rd_list_new(rkt->rkt_partition_cnt +
-                           rd_list_cnt(&rkt->rkt_desp) + 1/*ua*/, NULL);
-
-	for (i = 0 ; i < rkt->rkt_partition_cnt ; i++)
-		rd_list_add(list, rd_kafka_toppar_keep(
-				    rd_kafka_toppar_s2i(rkt->rkt_p[i])));
-
-	RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i)
-		rd_list_add(list, rd_kafka_toppar_keep(
-				    rd_kafka_toppar_s2i(s_rktp)));
-
-	if (rkt->rkt_ua)
-		rd_list_add(list, rd_kafka_toppar_keep(
-				    rd_kafka_toppar_s2i(rkt->rkt_ua)));
-
-	return list;
-}
-
-
-
-
-/**
- * Remove all partitions from a topic, including the ua.
- * Must only be called during rd_kafka_t termination.
- *
- * Locality: main thread
- */
-void rd_kafka_topic_partitions_remove (rd_kafka_itopic_t *rkt) {
-        shptr_rd_kafka_toppar_t *s_rktp;
-        shptr_rd_kafka_itopic_t *s_rkt;
-	rd_list_t *partitions;
-	int i;
-
-	/* Purge messages for all partitions outside the topic_wrlock since
-	 * a message can hold a reference to the topic_t and purging it
-	 * under the lock would trigger a recursive-lock deadlock. */
-	rd_kafka_topic_rdlock(rkt);
-	partitions = rd_kafka_topic_get_all_partitions(rkt);
-	rd_kafka_topic_rdunlock(rkt);
-
-	RD_LIST_FOREACH(s_rktp, partitions, i) {
-		rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
-
-		rd_kafka_toppar_lock(rktp);
-		rd_kafka_msgq_purge(rkt->rkt_rk, &rktp->rktp_msgq);
-		rd_kafka_toppar_purge_queues(rktp);
-		rd_kafka_toppar_unlock(rktp);
-
-		rd_kafka_toppar_destroy(s_rktp);
-	}
-	rd_list_destroy(partitions);
-
-	s_rkt = rd_kafka_topic_keep(rkt);
-	rd_kafka_topic_wrlock(rkt);
-
-	/* Setting the partition count to 0 moves all partitions to
-	 * the desired list (rkt_desp). */
-        rd_kafka_topic_partition_cnt_update(rkt, 0);
-
-        /* Now clean out the desired partitions list.
-         * Use reverse traversal to avoid excessive memory shuffling
-         * in rd_list_remove() */
-        RD_LIST_FOREACH_REVERSE(s_rktp, &rkt->rkt_desp, i) {
-		rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
-		/* Our reference */
-		shptr_rd_kafka_toppar_t *s_rktp2 = rd_kafka_toppar_keep(rktp);
-                rd_kafka_toppar_lock(rktp);
-                rd_kafka_toppar_desired_del(rktp);
-                rd_kafka_toppar_unlock(rktp);
-                rd_kafka_toppar_destroy(s_rktp2);
-        }
-
-        rd_kafka_assert(rkt->rkt_rk, rkt->rkt_partition_cnt == 0);
-
-	if (rkt->rkt_p)
-		rd_free(rkt->rkt_p);
-
-	rkt->rkt_p = NULL;
-	rkt->rkt_partition_cnt = 0;
-
-        if ((s_rktp = rkt->rkt_ua)) {
-                rkt->rkt_ua = NULL;
-                rd_kafka_toppar_destroy(s_rktp);
-	}
-
-	rd_kafka_topic_wrunlock(rkt);
-
-	rd_kafka_topic_destroy0(s_rkt);
-}
-
-
-
-/**
- * Scan all topics and partitions for:
- *  - timed out messages.
- *  - topics that need to be created on the broker.
- *  - topics whose metadata is too old.
- */
-int rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now) {
-	rd_kafka_itopic_t *rkt;
-	rd_kafka_toppar_t *rktp;
-        shptr_rd_kafka_toppar_t *s_rktp;
-	int totcnt = 0;
-        rd_list_t query_topics;
-
-        rd_list_init(&query_topics, 0, rd_free);
-
-	rd_kafka_rdlock(rk);
-	TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
-		int p;
-                int cnt = 0, tpcnt = 0;
-                rd_kafka_msgq_t timedout;
-                int query_this = 0;
-
-                rd_kafka_msgq_init(&timedout);
-
-		rd_kafka_topic_wrlock(rkt);
-
-                /* Check if metadata information has timed out. */
-                if (rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN &&
-                    !rd_kafka_metadata_cache_topic_get(
-                            rk, rkt->rkt_topic->str, 1/*only valid*/)) {
-                        rd_kafka_dbg(rk, TOPIC, "NOINFO",
-                                     "Topic %s metadata information timed out "
-                                     "(%"PRId64"ms old)",
-                                     rkt->rkt_topic->str,
-                                     (rd_clock() - rkt->rkt_ts_metadata)/1000);
-                        rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_UNKNOWN);
-
-                        query_this = 1;
-                }
-
-                /* Just need a read-lock from here on. */
-                rd_kafka_topic_wrunlock(rkt);
-                rd_kafka_topic_rdlock(rkt);
-
-                if (rkt->rkt_partition_cnt == 0) {
-                        /* If this topic is unknown to the brokers, try
-                         * to create it by sending a topic-specific
-                         * metadata request.
-                         * This requires "auto.create.topics.enable=true"
-                         * on the brokers. */
-                        rd_kafka_dbg(rk, TOPIC, "NOINFO",
-                                     "Topic %s partition count is zero: "
-                                     "should refresh metadata",
-                                     rkt->rkt_topic->str);
-
-                        query_this = 1;
-                }
-
-		for (p = RD_KAFKA_PARTITION_UA ;
-		     p < rkt->rkt_partition_cnt ; p++) {
-			int did_tmout = 0;
-
-			if (!(s_rktp = rd_kafka_toppar_get(rkt, p, 0)))
-				continue;
-
-                        rktp = rd_kafka_toppar_s2i(s_rktp);
-			rd_kafka_toppar_lock(rktp);
-
-                        /* Check that partition has a leader that is up,
-                         * else add topic to query list. */
-                        if (p != RD_KAFKA_PARTITION_UA &&
-                            (!rktp->rktp_leader ||
-                             rktp->rktp_leader->rkb_source ==
-                             RD_KAFKA_INTERNAL ||
-                             rd_kafka_broker_get_state(rktp->rktp_leader) <
-                             RD_KAFKA_BROKER_STATE_UP)) {
-                                rd_kafka_dbg(rk, TOPIC, "QRYLEADER",
-                                             "Topic %s [%"PRId32"]: "
-                                             "leader is %s: re-query",
-                                             rkt->rkt_topic->str,
-                                             rktp->rktp_partition,
-                                             !rktp->rktp_leader ?
-                                             "unavailable" :
-                                             (rktp->rktp_leader->rkb_source ==
-                                              RD_KAFKA_INTERNAL ? "internal":
-                                              "down"));
-                                query_this = 1;
-                        }
-
-			/* Scan toppar's message queues for timeouts */
-			if (rd_kafka_msgq_age_scan(&rktp->rktp_xmit_msgq,
-						   &timedout, now) > 0)
-				did_tmout = 1;
-
-			if (rd_kafka_msgq_age_scan(&rktp->rktp_msgq,
-						   &timedout, now) > 0)
-				did_tmout = 1;
-
-			tpcnt += did_tmout;
-
-			rd_kafka_toppar_unlock(rktp);
-			rd_kafka_toppar_destroy(s_rktp);
-		}
-
-                rd_kafka_topic_rdunlock(rkt);
-
-                if ((cnt = rd_atomic32_get(&timedout.rkmq_msg_cnt)) > 0) {
-                        totcnt += cnt;
-                        rd_kafka_dbg(rk, MSG, "TIMEOUT",
-                                     "%s: %"PRId32" message(s) "
-                                     "from %i toppar(s) timed out",
-                                     rkt->rkt_topic->str, cnt, tpcnt);
-                        rd_kafka_dr_msgq(rkt, &timedout,
-                                         RD_KAFKA_RESP_ERR__MSG_TIMED_OUT);
-                }
-
-                /* Need to re-query this topic's leader. */
-                if (query_this &&
-                    !rd_list_find(&query_topics, rkt->rkt_topic->str,
-                                  (void *)strcmp))
-                        rd_list_add(&query_topics,
-                                    rd_strdup(rkt->rkt_topic->str));
-
-        }
-        rd_kafka_rdunlock(rk);
-
-        if (!rd_list_empty(&query_topics))
-                rd_kafka_metadata_refresh_topics(rk, NULL, &query_topics,
-                                                 1/*force even if cached
-                                                    * info exists*/,
-                                                 "refresh unavailable topics");
-        rd_list_destroy(&query_topics);
-
-        return totcnt;
-}
-
-
-/**
- * Locks: rd_kafka_topic_*lock() must be held.
- */
-int rd_kafka_topic_partition_available (const rd_kafka_topic_t *app_rkt,
-					int32_t partition) {
-	int avail;
-	shptr_rd_kafka_toppar_t *s_rktp;
-        rd_kafka_toppar_t *rktp;
-        rd_kafka_broker_t *rkb;
-
-	s_rktp = rd_kafka_toppar_get(rd_kafka_topic_a2i(app_rkt),
-                                     partition, 0/*no ua-on-miss*/);
-	if (unlikely(!s_rktp))
-		return 0;
-
-        rktp = rd_kafka_toppar_s2i(s_rktp);
-        rkb = rd_kafka_toppar_leader(rktp, 1/*proper broker*/);
-        avail = rkb ? 1 : 0;
-        if (rkb)
-                rd_kafka_broker_destroy(rkb);
-	rd_kafka_toppar_destroy(s_rktp);
-	return avail;
-}
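
rd_kafka_topic_partition_available() is meant to be called from within a
partitioner callback, where librdkafka holds the topic lock. A hypothetical
partitioner sketch (the modulo "hash" is a placeholder, not a recommendation):

    static int32_t example_partitioner (const rd_kafka_topic_t *rkt,
                                        const void *key, size_t keylen,
                                        int32_t partition_cnt,
                                        void *rkt_opaque, void *msg_opaque) {
            int32_t p = (int32_t)(keylen % (size_t)partition_cnt);

            if (rd_kafka_topic_partition_available(rkt, p))
                    return p;
            /* No usable leader for the chosen partition. */
            return RD_KAFKA_PARTITION_UA;
    }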
-
-
-void *rd_kafka_topic_opaque (const rd_kafka_topic_t *app_rkt) {
-        return rd_kafka_topic_a2i(app_rkt)->rkt_conf.opaque;
-}
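
The opaque returned above is whatever the application attached to the topic
configuration; a minimal sketch (my_ctx is an arbitrary application pointer):

    rd_kafka_topic_conf_t *tconf = rd_kafka_topic_conf_new();
    rd_kafka_topic_conf_set_opaque(tconf, my_ctx);
    rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "example", tconf);
    /* ... later, e.g. from a delivery report callback: */
    struct my_ctx_s *ctx = rd_kafka_topic_opaque(rkt);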
-
-int rd_kafka_topic_info_cmp (const void *_a, const void *_b) {
-	const rd_kafka_topic_info_t *a = _a, *b = _b;
-	int r;
-
-	if ((r = strcmp(a->topic, b->topic)))
-		return r;
-
-	return a->partition_cnt - b->partition_cnt;
-}
-
-
-/**
- * Allocate new topic_info.
- * \p topic is copied.
- */
-rd_kafka_topic_info_t *rd_kafka_topic_info_new (const char *topic,
-						int partition_cnt) {
-	rd_kafka_topic_info_t *ti;
-	size_t tlen = strlen(topic) + 1;
-
-	/* Allocate space for the topic along with the struct */
-	ti = rd_malloc(sizeof(*ti) + tlen);
-	ti->topic = (char *)(ti+1);
-	memcpy((char *)ti->topic, topic, tlen);
-	ti->partition_cnt = partition_cnt;
-
-	return ti;
-}
-
-/**
- * Destroy/free topic_info
- */
-void rd_kafka_topic_info_destroy (rd_kafka_topic_info_t *ti) {
-	rd_free(ti);
-}
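
A small sketch of the topic_info helpers above (names and counts are
placeholders):

    rd_kafka_topic_info_t *a = rd_kafka_topic_info_new("events", 12);
    rd_kafka_topic_info_t *b = rd_kafka_topic_info_new("events", 16);
    int r = rd_kafka_topic_info_cmp(a, b); /* < 0: same name, fewer partitions in 'a' */

    (void)r;
    rd_kafka_topic_info_destroy(a);
    rd_kafka_topic_info_destroy(b);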
-
-
-/**
- * @brief Match \p topic to \p pattern.
- *
- * If pattern begins with "^" it is considered a regexp,
- * otherwise a simple string comparison is performed.
- *
- * @returns 1 on match, else 0.
- */
-int rd_kafka_topic_match (rd_kafka_t *rk, const char *pattern,
-			  const char *topic) {
-	char errstr[128];
-
-	if (*pattern == '^') {
-		int r = rd_regex_match(pattern, topic, errstr, sizeof(errstr));
-		if (unlikely(r == -1))
-			rd_kafka_dbg(rk, TOPIC, "TOPICREGEX",
-				     "Topic \"%s\" regex \"%s\" "
-				     "matching failed: %s",
-				     topic, pattern, errstr);
-		return r == 1;
-	} else
-		return !strcmp(pattern, topic);
-}
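
Illustrative results of the matching rules above (rk only provides the debug
context; the pattern and topic strings are placeholders):

    rd_kafka_topic_match(rk, "^telemetry\\..*", "telemetry.cpu"); /* regex   -> 1 */
    rd_kafka_topic_match(rk, "telemetry.cpu",   "telemetry.cpu"); /* literal -> 1 */
    rd_kafka_topic_match(rk, "telemetry",       "telemetry.cpu"); /* literal -> 0 */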
-
-
-
-
-
-
-
-
-
-/**
- * Trigger a broker metadata query for the topic's leader.
- * 'rkt' must not be NULL (it is dereferenced unconditionally below).
- *
- * @locks none
- */
-void rd_kafka_topic_leader_query0 (rd_kafka_t *rk, rd_kafka_itopic_t *rkt,
-                                   int do_rk_lock) {
-        rd_list_t topics;
-
-        rd_list_init(&topics, 1, rd_free);
-        rd_list_add(&topics, rd_strdup(rkt->rkt_topic->str));
-
-        rd_kafka_metadata_refresh_topics(rk, NULL, &topics,
-                                         0/*dont force*/, "leader query");
-
-        rd_list_destroy(&topics);
-}
-
-
-
-/**
- * @brief Populate list \p topics with the topic names (strdupped char *) of
- *        all locally known topics.
- *
- * @remark \p rk lock MUST NOT be held
- */
-void rd_kafka_local_topics_to_list (rd_kafka_t *rk, rd_list_t *topics) {
-        rd_kafka_itopic_t *rkt;
-
-        rd_kafka_rdlock(rk);
-        rd_list_grow(topics, rk->rk_topic_cnt);
-        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link)
-                rd_list_add(topics, rd_strdup(rkt->rkt_topic->str));
-        rd_kafka_rdunlock(rk);
-}
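
A minimal sketch of using the helper above, with rd_list_init()/rd_free()/
rd_list_destroy() as used elsewhere in this file (rk is an existing handle):

    rd_list_t topics;

    rd_list_init(&topics, 0, rd_free);
    rd_kafka_local_topics_to_list(rk, &topics);
    /* topics now holds one strdup'd name per locally known topic. */
    rd_list_destroy(&topics);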