You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:45 UTC
[17/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade
to librdkafka 0.11.4
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_win32.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_win32.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_win32.c
deleted file mode 100644
index b81ff87..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_win32.c
+++ /dev/null
@@ -1,526 +0,0 @@
-/*
- * librdkafka - The Apache Kafka C/C++ library
- *
- * Copyright (c) 2016 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-/**
- * Impelements SASL Kerberos GSSAPI authentication client
- * using the native Win32 SSPI.
- */
-
-#include "rdkafka_int.h"
-#include "rdkafka_transport.h"
-#include "rdkafka_transport_int.h"
-#include "rdkafka_sasl.h"
-#include "rdkafka_sasl_int.h"
-
-
-#include <stdio.h>
-#include <windows.h>
-#include <ntsecapi.h>
-
-#define SECURITY_WIN32
-#pragma comment(lib, "Secur32.lib")
-#include <Sspi.h>
-
-
-#define RD_KAFKA_SASL_SSPI_CTX_ATTRS \
- (ISC_REQ_CONFIDENTIALITY | ISC_REQ_REPLAY_DETECT | \
- ISC_REQ_SEQUENCE_DETECT | ISC_REQ_CONNECTION)
-
-
- /* Default maximum kerberos token size for newer versions of Windows */
-#define RD_KAFKA_SSPI_MAX_TOKEN_SIZE 48000
-
-
-/**
- * @brief Per-connection SASL state
- */
-typedef struct rd_kafka_sasl_win32_state_s {
- CredHandle *cred;
- CtxtHandle *ctx;
- wchar_t principal[512];
-} rd_kafka_sasl_win32_state_t;
-
-
-/**
- * @returns the string representation of a SECURITY_STATUS error code
- */
-static const char *rd_kafka_sasl_sspi_err2str (SECURITY_STATUS sr) {
- switch (sr)
- {
- case SEC_E_INSUFFICIENT_MEMORY:
- return "Insufficient memory";
- case SEC_E_INTERNAL_ERROR:
- return "Internal error";
- case SEC_E_INVALID_HANDLE:
- return "Invalid handle";
- case SEC_E_INVALID_TOKEN:
- return "Invalid token";
- case SEC_E_LOGON_DENIED:
- return "Logon denied";
- case SEC_E_NO_AUTHENTICATING_AUTHORITY:
- return "No authority could be contacted for authentication.";
- case SEC_E_NO_CREDENTIALS:
- return "No credentials";
- case SEC_E_TARGET_UNKNOWN:
- return "Target unknown";
- case SEC_E_UNSUPPORTED_FUNCTION:
- return "Unsupported functionality";
- case SEC_E_WRONG_CREDENTIAL_HANDLE:
- return "The principal that received the authentication "
- "request is not the same as the one passed "
- "into the pszTargetName parameter. "
- "This indicates a failure in mutual "
- "authentication.";
- default:
- return "(no string representation)";
- }
-}
-
-
-/**
- * @brief Create new CredHandle
- */
-static CredHandle *
-rd_kafka_sasl_sspi_cred_new (rd_kafka_transport_t *rktrans,
- char *errstr, size_t errstr_size) {
- TimeStamp expiry = { 0, 0 };
- SECURITY_STATUS sr;
- CredHandle *cred = rd_calloc(1, sizeof(*cred));
-
- sr = AcquireCredentialsHandle(
- NULL, __TEXT("Kerberos"), SECPKG_CRED_OUTBOUND,
- NULL, NULL, NULL, NULL, cred, &expiry);
-
- if (sr != SEC_E_OK) {
- rd_free(cred);
- rd_snprintf(errstr, errstr_size,
- "Failed to acquire CredentialsHandle: "
- "error code %d", sr);
- return NULL;
- }
-
- rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
- "Acquired Kerberos credentials handle (expiry in %d.%ds)",
- expiry.u.HighPart, expiry.u.LowPart);
-
- return cred;
-}
-
-
-/**
- * @brief Start or continue SSPI-based authentication processing.
- */
-static int rd_kafka_sasl_sspi_continue (rd_kafka_transport_t *rktrans,
- const void *inbuf, size_t insize,
- char *errstr, size_t errstr_size) {
- rd_kafka_sasl_win32_state_t *state = rktrans->rktrans_sasl.state;
- SecBufferDesc outbufdesc, inbufdesc;
- SecBuffer outsecbuf, insecbuf;
- BYTE outbuf[RD_KAFKA_SSPI_MAX_TOKEN_SIZE];
- TimeStamp lifespan = { 0, 0 };
- ULONG ret_ctxattrs;
- CtxtHandle *ctx;
- SECURITY_STATUS sr;
-
- if (inbuf) {
- if (insize > ULONG_MAX) {
- rd_snprintf(errstr, errstr_size,
- "Input buffer length too large (%"PRIusz") "
- "and would overflow", insize);
- return -1;
- }
-
- inbufdesc.ulVersion = SECBUFFER_VERSION;
- inbufdesc.cBuffers = 1;
- inbufdesc.pBuffers = &insecbuf;
-
- insecbuf.cbBuffer = (unsigned long)insize;
- insecbuf.BufferType = SECBUFFER_TOKEN;
- insecbuf.pvBuffer = (void *)inbuf;
- }
-
- outbufdesc.ulVersion = SECBUFFER_VERSION;
- outbufdesc.cBuffers = 1;
- outbufdesc.pBuffers = &outsecbuf;
-
- outsecbuf.cbBuffer = sizeof(outbuf);
- outsecbuf.BufferType = SECBUFFER_TOKEN;
- outsecbuf.pvBuffer = outbuf;
-
- if (!(ctx = state->ctx)) {
- /* First time: allocate context handle
- * which will be filled in by Initialize..() */
- ctx = rd_calloc(1, sizeof(*ctx));
- }
-
- sr = InitializeSecurityContext(
- state->cred, state->ctx, state->principal,
- RD_KAFKA_SASL_SSPI_CTX_ATTRS |
- (state->ctx ? 0 : ISC_REQ_MUTUAL_AUTH | ISC_REQ_IDENTIFY),
- 0, SECURITY_NATIVE_DREP,
- inbuf ? &inbufdesc : NULL,
- 0, ctx, &outbufdesc, &ret_ctxattrs, &lifespan);
-
- if (!state->ctx)
- state->ctx = ctx;
-
- switch (sr)
- {
- case SEC_E_OK:
- rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLAUTH",
- "Initialized security context");
-
- rktrans->rktrans_sasl.complete = 1;
- break;
- case SEC_I_CONTINUE_NEEDED:
- break;
- case SEC_I_COMPLETE_NEEDED:
- case SEC_I_COMPLETE_AND_CONTINUE:
- rd_snprintf(errstr, errstr_size,
- "CompleteAuthToken (Digest auth, %d) "
- "not implemented", sr);
- return -1;
- case SEC_I_INCOMPLETE_CREDENTIALS:
- rd_snprintf(errstr, errstr_size,
- "Incomplete credentials: "
- "invalid or untrusted certificate");
- return -1;
- default:
- rd_snprintf(errstr, errstr_size,
- "InitializeSecurityContext "
- "failed: %s (0x%x)",
- rd_kafka_sasl_sspi_err2str(sr), sr);
- return -1;
- }
-
- if (rd_kafka_sasl_send(rktrans,
- outsecbuf.pvBuffer, outsecbuf.cbBuffer,
- errstr, errstr_size) == -1)
- return -1;
-
- return 0;
-}
-
-
-/**
-* @brief Sends the token response to the broker
-*/
-static int rd_kafka_sasl_win32_send_response (rd_kafka_transport_t *rktrans,
- char *errstr,
- size_t errstr_size,
- SecBuffer *server_token) {
- rd_kafka_sasl_win32_state_t *state = rktrans->rktrans_sasl.state;
- SECURITY_STATUS sr;
- SecBuffer in_buffer;
- SecBuffer out_buffer;
- SecBuffer buffers[4];
- SecBufferDesc buffer_desc;
- SecPkgContext_Sizes sizes;
- SecPkgCredentials_NamesA names;
- int send_response;
- size_t namelen;
-
- sr = QueryContextAttributes(state->ctx, SECPKG_ATTR_SIZES, &sizes);
- if (sr != SEC_E_OK) {
- rd_snprintf(errstr, errstr_size,
- "Send response failed: %s (0x%x)",
- rd_kafka_sasl_sspi_err2str(sr), sr);
- return -1;
- }
-
- RD_MEMZERO(names);
- sr = QueryCredentialsAttributesA(state->cred, SECPKG_CRED_ATTR_NAMES,
- &names);
-
- if (sr != SEC_E_OK) {
- rd_snprintf(errstr, errstr_size,
- "Query credentials failed: %s (0x%x)",
- rd_kafka_sasl_sspi_err2str(sr), sr);
- return -1;
- }
-
- rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLAUTH",
- "Sending response message for user: %s", names.sUserName);
-
- namelen = strlen(names.sUserName) + 1;
- if (namelen > ULONG_MAX) {
- rd_snprintf(errstr, errstr_size,
- "User name length too large (%"PRIusz") "
- "and would overflow");
- return -1;
- }
-
- in_buffer.pvBuffer = (char *)names.sUserName;
- in_buffer.cbBuffer = (unsigned long)namelen;
-
- buffer_desc.cBuffers = 4;
- buffer_desc.pBuffers = buffers;
- buffer_desc.ulVersion = SECBUFFER_VERSION;
-
- /* security trailer */
- buffers[0].cbBuffer = sizes.cbSecurityTrailer;
- buffers[0].BufferType = SECBUFFER_TOKEN;
- buffers[0].pvBuffer = rd_calloc(1, sizes.cbSecurityTrailer);
-
- /* protection level and buffer size received from the server */
- buffers[1].cbBuffer = server_token->cbBuffer;
- buffers[1].BufferType = SECBUFFER_DATA;
- buffers[1].pvBuffer = rd_calloc(1, server_token->cbBuffer);
- memcpy(buffers[1].pvBuffer, server_token->pvBuffer, server_token->cbBuffer);
-
- /* user principal */
- buffers[2].cbBuffer = in_buffer.cbBuffer;
- buffers[2].BufferType = SECBUFFER_DATA;
- buffers[2].pvBuffer = rd_calloc(1, buffers[2].cbBuffer);
- memcpy(buffers[2].pvBuffer, in_buffer.pvBuffer, in_buffer.cbBuffer);
-
- /* padding */
- buffers[3].cbBuffer = sizes.cbBlockSize;
- buffers[3].BufferType = SECBUFFER_PADDING;
- buffers[3].pvBuffer = rd_calloc(1, buffers[2].cbBuffer);
-
- sr = EncryptMessage(state->ctx, KERB_WRAP_NO_ENCRYPT, &buffer_desc, 0);
- if (sr != SEC_E_OK) {
- rd_snprintf(errstr, errstr_size,
- "Encrypt message failed: %s (0x%x)",
- rd_kafka_sasl_sspi_err2str(sr), sr);
-
- FreeContextBuffer(in_buffer.pvBuffer);
- rd_free(buffers[0].pvBuffer);
- rd_free(buffers[1].pvBuffer);
- rd_free(buffers[2].pvBuffer);
- rd_free(buffers[3].pvBuffer);
- return -1;
- }
-
- out_buffer.cbBuffer = buffers[0].cbBuffer +
- buffers[1].cbBuffer +
- buffers[2].cbBuffer +
- buffers[3].cbBuffer;
-
- out_buffer.pvBuffer = rd_calloc(1, buffers[0].cbBuffer +
- buffers[1].cbBuffer +
- buffers[2].cbBuffer +
- buffers[3].cbBuffer);
-
- memcpy(out_buffer.pvBuffer, buffers[0].pvBuffer, buffers[0].cbBuffer);
-
- memcpy((unsigned char *)out_buffer.pvBuffer + (int)buffers[0].cbBuffer,
- buffers[1].pvBuffer, buffers[1].cbBuffer);
-
- memcpy((unsigned char *)out_buffer.pvBuffer +
- buffers[0].cbBuffer + buffers[1].cbBuffer,
- buffers[2].pvBuffer, buffers[2].cbBuffer);
-
- memcpy((unsigned char *)out_buffer.pvBuffer +
- buffers[0].cbBuffer + buffers[1].cbBuffer + buffers[2].cbBuffer,
- buffers[3].pvBuffer, buffers[3].cbBuffer);
-
- send_response = rd_kafka_sasl_send(rktrans,
- out_buffer.pvBuffer,
- out_buffer.cbBuffer,
- errstr, errstr_size);
-
- FreeContextBuffer(in_buffer.pvBuffer);
- rd_free(out_buffer.pvBuffer);
- rd_free(buffers[0].pvBuffer);
- rd_free(buffers[1].pvBuffer);
- rd_free(buffers[2].pvBuffer);
- rd_free(buffers[3].pvBuffer);
-
- return send_response;
-}
-
-
-/**
-* @brief Unwrap and validate token response from broker.
-*/
-static int rd_kafka_sasl_win32_validate_token (rd_kafka_transport_t *rktrans,
- const void *inbuf,
- size_t insize,
- char *errstr,
- size_t errstr_size) {
- rd_kafka_sasl_win32_state_t *state = rktrans->rktrans_sasl.state;
- SecBuffer buffers[2];
- SecBufferDesc buffer_desc;
- SECURITY_STATUS sr;
- char supported;
-
- if (insize > ULONG_MAX) {
- rd_snprintf(errstr, errstr_size,
- "Input buffer length too large (%"PRIusz") "
- "and would overflow");
- return -1;
- }
-
- buffer_desc.cBuffers = 2;
- buffer_desc.pBuffers = buffers;
- buffer_desc.ulVersion = SECBUFFER_VERSION;
-
- buffers[0].cbBuffer = (unsigned long)insize;
- buffers[0].BufferType = SECBUFFER_STREAM;
- buffers[0].pvBuffer = (void *)inbuf;
-
- buffers[1].cbBuffer = 0;
- buffers[1].BufferType = SECBUFFER_DATA;
- buffers[1].pvBuffer = NULL;
-
- sr = DecryptMessage(state->ctx, &buffer_desc, 0, NULL);
- if (sr != SEC_E_OK) {
- rd_snprintf(errstr, errstr_size,
- "Decrypt message failed: %s (0x%x)",
- rd_kafka_sasl_sspi_err2str(sr), sr);
- return -1;
- }
-
- if (buffers[1].cbBuffer < 4) {
- rd_snprintf(errstr, errstr_size,
- "Validate token: "
- "invalid message");
- return -1;
- }
-
- supported = ((char *)buffers[1].pvBuffer)[0];
- if (!(supported & 1)) {
- rd_snprintf(errstr, errstr_size,
- "Validate token: "
- "server does not support layer");
- return -1;
- }
-
- rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLAUTH",
- "Validated server token");
-
- return rd_kafka_sasl_win32_send_response(rktrans, errstr,
- errstr_size, &buffers[1]);
-}
-
-
-/**
-* @brief Handle SASL frame received from broker.
-*/
-static int rd_kafka_sasl_win32_recv (struct rd_kafka_transport_s *rktrans,
- const void *buf, size_t size,
- char *errstr, size_t errstr_size) {
- rd_kafka_sasl_win32_state_t *state = rktrans->rktrans_sasl.state;
-
- if (rktrans->rktrans_sasl.complete) {
- if (rd_kafka_sasl_win32_validate_token(
- rktrans, buf, size, errstr, errstr_size) == -1) {
- rktrans->rktrans_sasl.complete = 0;
- return -1;
- }
-
- /* Final ack from broker. */
- rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLAUTH",
- "Authenticated");
- rd_kafka_sasl_auth_done(rktrans);
- return 0;
- }
-
- return rd_kafka_sasl_sspi_continue(rktrans, buf, size,
- errstr, errstr_size);
-}
-
-
-/**
-* @brief Decommission SSPI state
-*/
-static void rd_kafka_sasl_win32_close (rd_kafka_transport_t *rktrans) {
- rd_kafka_sasl_win32_state_t *state = rktrans->rktrans_sasl.state;
-
- if (!state)
- return;
-
- if (state->ctx) {
- DeleteSecurityContext(state->ctx);
- rd_free(state->ctx);
- }
- if (state->cred) {
- FreeCredentialsHandle(state->cred);
- rd_free(state->cred);
- }
- rd_free(state);
-}
-
-
-static int rd_kafka_sasl_win32_client_new (rd_kafka_transport_t *rktrans,
- const char *hostname,
- char *errstr, size_t errstr_size) {
- rd_kafka_t *rk = rktrans->rktrans_rkb->rkb_rk;
- rd_kafka_sasl_win32_state_t *state;
-
- if (strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI")) {
- rd_snprintf(errstr, errstr_size,
- "SASL mechanism \"%s\" not supported on platform",
- rk->rk_conf.sasl.mechanisms);
- return -1;
- }
-
- state = rd_calloc(1, sizeof(*state));
- rktrans->rktrans_sasl.state = state;
-
- _snwprintf(state->principal, RD_ARRAYSIZE(state->principal),
- L"%hs/%hs",
- rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.principal,
- hostname);
-
- state->cred = rd_kafka_sasl_sspi_cred_new(rktrans, errstr,
- errstr_size);
- if (!state->cred)
- return -1;
-
- if (rd_kafka_sasl_sspi_continue(rktrans, NULL, 0,
- errstr, errstr_size) == -1)
- return -1;
-
- return 0;
-}
-
-/**
- * @brief Validate config
- */
-static int rd_kafka_sasl_win32_conf_validate (rd_kafka_t *rk,
- char *errstr,
- size_t errstr_size) {
- if (!rk->rk_conf.sasl.principal) {
- rd_snprintf(errstr, errstr_size,
- "sasl.kerberos.principal must be set");
- return -1;
- }
-
- return 0;
-}
-
-const struct rd_kafka_sasl_provider rd_kafka_sasl_win32_provider = {
- .name = "Win32 SSPI",
- .client_new = rd_kafka_sasl_win32_client_new,
- .recv = rd_kafka_sasl_win32_recv,
- .close = rd_kafka_sasl_win32_close,
- .conf_validate = rd_kafka_sasl_win32_conf_validate
-};
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_subscription.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_subscription.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_subscription.c
deleted file mode 100644
index 18a2458..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_subscription.c
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-
-/**
- * This is the high level consumer API which is mutually exclusive
- * with the old legacy simple consumer.
- * Only one of these interfaces may be used on a given rd_kafka_t handle.
- */
-
-#include "rdkafka_int.h"
-#include "rdkafka_subscription.h"
-
-
-rd_kafka_resp_err_t rd_kafka_unsubscribe (rd_kafka_t *rk) {
- rd_kafka_cgrp_t *rkcg;
-
- if (!(rkcg = rd_kafka_cgrp_get(rk)))
- return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
- return rd_kafka_op_err_destroy(rd_kafka_op_req2(rkcg->rkcg_ops,
- RD_KAFKA_OP_SUBSCRIBE));
-}
-
-
-/** @returns 1 if the topic is invalid (bad regex, empty), else 0 if valid. */
-static size_t _invalid_topic_cb (const rd_kafka_topic_partition_t *rktpar,
- void *opaque) {
- rd_regex_t *re;
- char errstr[1];
-
- if (!*rktpar->topic)
- return 1;
-
- if (*rktpar->topic != '^')
- return 0;
-
- if (!(re = rd_regex_comp(rktpar->topic, errstr, sizeof(errstr))))
- return 1;
-
- rd_regex_destroy(re);
-
- return 0;
-}
-
-
-rd_kafka_resp_err_t
-rd_kafka_subscribe (rd_kafka_t *rk,
- const rd_kafka_topic_partition_list_t *topics) {
-
- rd_kafka_op_t *rko;
- rd_kafka_cgrp_t *rkcg;
-
- if (!(rkcg = rd_kafka_cgrp_get(rk)))
- return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
- /* Validate topics */
- if (topics->cnt == 0 ||
- rd_kafka_topic_partition_list_sum(topics,
- _invalid_topic_cb, NULL) > 0)
- return RD_KAFKA_RESP_ERR__INVALID_ARG;
-
- rko = rd_kafka_op_new(RD_KAFKA_OP_SUBSCRIBE);
- rko->rko_u.subscribe.topics = rd_kafka_topic_partition_list_copy(topics);
-
- return rd_kafka_op_err_destroy(
- rd_kafka_op_req(rkcg->rkcg_ops, rko, RD_POLL_INFINITE));
-}
-
-
-rd_kafka_resp_err_t
-rd_kafka_assign (rd_kafka_t *rk,
- const rd_kafka_topic_partition_list_t *partitions) {
- rd_kafka_op_t *rko;
- rd_kafka_cgrp_t *rkcg;
-
- if (!(rkcg = rd_kafka_cgrp_get(rk)))
- return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
- rko = rd_kafka_op_new(RD_KAFKA_OP_ASSIGN);
- if (partitions)
- rko->rko_u.assign.partitions =
- rd_kafka_topic_partition_list_copy(partitions);
-
- return rd_kafka_op_err_destroy(
- rd_kafka_op_req(rkcg->rkcg_ops, rko, RD_POLL_INFINITE));
-}
-
-
-
-rd_kafka_resp_err_t
-rd_kafka_assignment (rd_kafka_t *rk,
- rd_kafka_topic_partition_list_t **partitions) {
- rd_kafka_op_t *rko;
- rd_kafka_resp_err_t err;
- rd_kafka_cgrp_t *rkcg;
-
- if (!(rkcg = rd_kafka_cgrp_get(rk)))
- return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
- rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_GET_ASSIGNMENT);
- if (!rko)
- return RD_KAFKA_RESP_ERR__TIMED_OUT;
-
- err = rko->rko_err;
-
- *partitions = rko->rko_u.assign.partitions;
- rko->rko_u.assign.partitions = NULL;
- rd_kafka_op_destroy(rko);
-
- if (!*partitions && !err) {
- /* Create an empty list for convenience of the caller */
- *partitions = rd_kafka_topic_partition_list_new(0);
- }
-
- return err;
-}
-
-rd_kafka_resp_err_t
-rd_kafka_subscription (rd_kafka_t *rk,
- rd_kafka_topic_partition_list_t **topics){
- rd_kafka_op_t *rko;
- rd_kafka_resp_err_t err;
- rd_kafka_cgrp_t *rkcg;
-
- if (!(rkcg = rd_kafka_cgrp_get(rk)))
- return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
- rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_GET_SUBSCRIPTION);
- if (!rko)
- return RD_KAFKA_RESP_ERR__TIMED_OUT;
-
- err = rko->rko_err;
-
- *topics = rko->rko_u.subscribe.topics;
- rko->rko_u.subscribe.topics = NULL;
- rd_kafka_op_destroy(rko);
-
- if (!*topics && !err) {
- /* Create an empty list for convenience of the caller */
- *topics = rd_kafka_topic_partition_list_new(0);
- }
-
- return err;
-}
-
-
-rd_kafka_resp_err_t
-rd_kafka_pause_partitions (rd_kafka_t *rk,
- rd_kafka_topic_partition_list_t *partitions) {
- return rd_kafka_toppars_pause_resume(rk, 1, RD_KAFKA_TOPPAR_F_APP_PAUSE,
- partitions);
-}
-
-
-rd_kafka_resp_err_t
-rd_kafka_resume_partitions (rd_kafka_t *rk,
- rd_kafka_topic_partition_list_t *partitions) {
- return rd_kafka_toppars_pause_resume(rk, 0, RD_KAFKA_TOPPAR_F_APP_PAUSE,
- partitions);
-}
-
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_subscription.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_subscription.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_subscription.h
deleted file mode 100644
index 0c51712..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_subscription.h
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_timer.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_timer.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_timer.c
deleted file mode 100644
index 7947980..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_timer.c
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2013, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "rdkafka_int.h"
-#include "rd.h"
-#include "rdtime.h"
-#include "rdsysqueue.h"
-
-
-static RD_INLINE void rd_kafka_timers_lock (rd_kafka_timers_t *rkts) {
- mtx_lock(&rkts->rkts_lock);
-}
-
-static RD_INLINE void rd_kafka_timers_unlock (rd_kafka_timers_t *rkts) {
- mtx_unlock(&rkts->rkts_lock);
-}
-
-
-static RD_INLINE int rd_kafka_timer_started (const rd_kafka_timer_t *rtmr) {
- return rtmr->rtmr_interval ? 1 : 0;
-}
-
-
-static RD_INLINE int rd_kafka_timer_scheduled (const rd_kafka_timer_t *rtmr) {
- return rtmr->rtmr_next ? 1 : 0;
-}
-
-
-static int rd_kafka_timer_cmp (const void *_a, const void *_b) {
- const rd_kafka_timer_t *a = _a, *b = _b;
- return (int)(a->rtmr_next - b->rtmr_next);
-}
-
-static void rd_kafka_timer_unschedule (rd_kafka_timers_t *rkts,
- rd_kafka_timer_t *rtmr) {
- TAILQ_REMOVE(&rkts->rkts_timers, rtmr, rtmr_link);
- rtmr->rtmr_next = 0;
-}
-
-static void rd_kafka_timer_schedule (rd_kafka_timers_t *rkts,
- rd_kafka_timer_t *rtmr, int extra_us) {
- rd_kafka_timer_t *first;
-
- /* Timer has been stopped */
- if (!rtmr->rtmr_interval)
- return;
-
- /* Timers framework is terminating */
- if (unlikely(!rkts->rkts_enabled))
- return;
-
- rtmr->rtmr_next = rd_clock() + rtmr->rtmr_interval + extra_us;
-
- if (!(first = TAILQ_FIRST(&rkts->rkts_timers)) ||
- first->rtmr_next > rtmr->rtmr_next) {
- TAILQ_INSERT_HEAD(&rkts->rkts_timers, rtmr, rtmr_link);
- cnd_signal(&rkts->rkts_cond);
- } else
- TAILQ_INSERT_SORTED(&rkts->rkts_timers, rtmr,
- rd_kafka_timer_t *, rtmr_link,
- rd_kafka_timer_cmp);
-}
-
-/**
- * Stop a timer that may be started.
- * If called from inside a timer callback 'lock' must be 0, else 1.
- */
-void rd_kafka_timer_stop (rd_kafka_timers_t *rkts, rd_kafka_timer_t *rtmr,
- int lock) {
- if (lock)
- rd_kafka_timers_lock(rkts);
-
- if (!rd_kafka_timer_started(rtmr)) {
- if (lock)
- rd_kafka_timers_unlock(rkts);
- return;
- }
-
- if (rd_kafka_timer_scheduled(rtmr))
- rd_kafka_timer_unschedule(rkts, rtmr);
-
- rtmr->rtmr_interval = 0;
-
- if (lock)
- rd_kafka_timers_unlock(rkts);
-}
-
-
-/**
- * Start the provided timer with the given interval.
- * Upon expiration of the interval (us) the callback will be called in the
- * main rdkafka thread, after callback return the timer will be restarted.
- *
- * Use rd_kafka_timer_stop() to stop a timer.
- */
-void rd_kafka_timer_start (rd_kafka_timers_t *rkts,
- rd_kafka_timer_t *rtmr, rd_ts_t interval,
- void (*callback) (rd_kafka_timers_t *rkts, void *arg),
- void *arg) {
- rd_kafka_timers_lock(rkts);
-
- rd_kafka_timer_stop(rkts, rtmr, 0/*!lock*/);
-
- rtmr->rtmr_interval = interval;
- rtmr->rtmr_callback = callback;
- rtmr->rtmr_arg = arg;
-
- rd_kafka_timer_schedule(rkts, rtmr, 0);
-
- rd_kafka_timers_unlock(rkts);
-}
-
-
-/**
- * Delay the next timer invocation by 'backoff_us'
- */
-void rd_kafka_timer_backoff (rd_kafka_timers_t *rkts,
- rd_kafka_timer_t *rtmr, int backoff_us) {
- rd_kafka_timers_lock(rkts);
- if (rd_kafka_timer_scheduled(rtmr))
- rd_kafka_timer_unschedule(rkts, rtmr);
- rd_kafka_timer_schedule(rkts, rtmr, backoff_us);
- rd_kafka_timers_unlock(rkts);
-}
-
-
-/**
- * @returns the delta time to the next time (>=0) this timer fires, or -1
- * if timer is stopped.
- */
-rd_ts_t rd_kafka_timer_next (rd_kafka_timers_t *rkts, rd_kafka_timer_t *rtmr,
- int do_lock) {
- rd_ts_t now = rd_clock();
- rd_ts_t delta = -1;
-
- if (do_lock)
- rd_kafka_timers_lock(rkts);
-
- if (rd_kafka_timer_scheduled(rtmr)) {
- delta = rtmr->rtmr_next - now;
- if (delta < 0)
- delta = 0;
- }
-
- if (do_lock)
- rd_kafka_timers_unlock(rkts);
-
- return delta;
-}
-
-
-/**
- * Interrupt rd_kafka_timers_run().
- * Used for termination.
- */
-void rd_kafka_timers_interrupt (rd_kafka_timers_t *rkts) {
- rd_kafka_timers_lock(rkts);
- cnd_signal(&rkts->rkts_cond);
- rd_kafka_timers_unlock(rkts);
-}
-
-
-/**
- * Returns the delta time to the next timer to fire, capped by 'timeout_ms'.
- */
-rd_ts_t rd_kafka_timers_next (rd_kafka_timers_t *rkts, int timeout_us,
- int do_lock) {
- rd_ts_t now = rd_clock();
- rd_ts_t sleeptime = 0;
- rd_kafka_timer_t *rtmr;
-
- if (do_lock)
- rd_kafka_timers_lock(rkts);
-
- if (likely((rtmr = TAILQ_FIRST(&rkts->rkts_timers)) != NULL)) {
- sleeptime = rtmr->rtmr_next - now;
- if (sleeptime < 0)
- sleeptime = 0;
- else if (sleeptime > (rd_ts_t)timeout_us)
- sleeptime = (rd_ts_t)timeout_us;
- } else
- sleeptime = (rd_ts_t)timeout_us;
-
- if (do_lock)
- rd_kafka_timers_unlock(rkts);
-
- return sleeptime;
-}
-
-
-/**
- * Dispatch timers.
- * Will block up to 'timeout' microseconds before returning.
- */
-void rd_kafka_timers_run (rd_kafka_timers_t *rkts, int timeout_us) {
- rd_ts_t now = rd_clock();
- rd_ts_t end = now + timeout_us;
-
- rd_kafka_timers_lock(rkts);
-
- while (!rd_atomic32_get(&rkts->rkts_rk->rk_terminate) && now <= end) {
- int64_t sleeptime;
- rd_kafka_timer_t *rtmr;
-
- if (timeout_us != RD_POLL_NOWAIT) {
- sleeptime = rd_kafka_timers_next(rkts,
- timeout_us,
- 0/*no-lock*/);
-
- if (sleeptime > 0) {
- cnd_timedwait_ms(&rkts->rkts_cond,
- &rkts->rkts_lock,
- (int)(sleeptime / 1000));
-
- }
- }
-
- now = rd_clock();
-
- while ((rtmr = TAILQ_FIRST(&rkts->rkts_timers)) &&
- rtmr->rtmr_next <= now) {
-
- rd_kafka_timer_unschedule(rkts, rtmr);
- rd_kafka_timers_unlock(rkts);
-
- rtmr->rtmr_callback(rkts, rtmr->rtmr_arg);
-
- rd_kafka_timers_lock(rkts);
- /* Restart timer, unless it has been stopped, or
- * already reschedueld (start()ed) from callback. */
- if (rd_kafka_timer_started(rtmr) &&
- !rd_kafka_timer_scheduled(rtmr))
- rd_kafka_timer_schedule(rkts, rtmr, 0);
- }
-
- if (timeout_us == RD_POLL_NOWAIT) {
- /* Only iterate once, even if rd_clock doesn't change */
- break;
- }
- }
-
- rd_kafka_timers_unlock(rkts);
-}
-
-
-void rd_kafka_timers_destroy (rd_kafka_timers_t *rkts) {
- rd_kafka_timer_t *rtmr;
-
- rd_kafka_timers_lock(rkts);
- rkts->rkts_enabled = 0;
- while ((rtmr = TAILQ_FIRST(&rkts->rkts_timers)))
- rd_kafka_timer_stop(rkts, rtmr, 0);
- rd_kafka_assert(rkts->rkts_rk, TAILQ_EMPTY(&rkts->rkts_timers));
- rd_kafka_timers_unlock(rkts);
-
- cnd_destroy(&rkts->rkts_cond);
- mtx_destroy(&rkts->rkts_lock);
-}
-
-void rd_kafka_timers_init (rd_kafka_timers_t *rkts, rd_kafka_t *rk) {
- memset(rkts, 0, sizeof(*rkts));
- rkts->rkts_rk = rk;
- TAILQ_INIT(&rkts->rkts_timers);
- mtx_init(&rkts->rkts_lock, mtx_plain);
- cnd_init(&rkts->rkts_cond);
- rkts->rkts_enabled = 1;
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_timer.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_timer.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_timer.h
deleted file mode 100644
index de01795..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_timer.h
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2013, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-#include "rd.h"
-
-/* A timer engine. */
-typedef struct rd_kafka_timers_s {
-
- TAILQ_HEAD(, rd_kafka_timer_s) rkts_timers;
-
- struct rd_kafka_s *rkts_rk;
-
- mtx_t rkts_lock;
- cnd_t rkts_cond;
-
- int rkts_enabled;
-} rd_kafka_timers_t;
-
-
-typedef struct rd_kafka_timer_s {
- TAILQ_ENTRY(rd_kafka_timer_s) rtmr_link;
-
- rd_ts_t rtmr_next;
- rd_ts_t rtmr_interval; /* interval in microseconds */
-
- void (*rtmr_callback) (rd_kafka_timers_t *rkts, void *arg);
- void *rtmr_arg;
-} rd_kafka_timer_t;
-
-
-
-void rd_kafka_timer_stop (rd_kafka_timers_t *rkts,
- rd_kafka_timer_t *rtmr, int lock);
-void rd_kafka_timer_start (rd_kafka_timers_t *rkts,
- rd_kafka_timer_t *rtmr, rd_ts_t interval,
- void (*callback) (rd_kafka_timers_t *rkts,
- void *arg),
- void *arg);
-
-void rd_kafka_timer_backoff (rd_kafka_timers_t *rkts,
- rd_kafka_timer_t *rtmr, int backoff_us);
-rd_ts_t rd_kafka_timer_next (rd_kafka_timers_t *rkts, rd_kafka_timer_t *rtmr,
- int do_lock);
-
-void rd_kafka_timers_interrupt (rd_kafka_timers_t *rkts);
-rd_ts_t rd_kafka_timers_next (rd_kafka_timers_t *rkts, int timeout_ms,
- int do_lock);
-void rd_kafka_timers_run (rd_kafka_timers_t *rkts, int timeout_us);
-void rd_kafka_timers_destroy (rd_kafka_timers_t *rkts);
-void rd_kafka_timers_init (rd_kafka_timers_t *rkte, rd_kafka_t *rk);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_topic.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_topic.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_topic.c
deleted file mode 100644
index 3975d80..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_topic.c
+++ /dev/null
@@ -1,1306 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012,2013 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "rd.h"
-#include "rdkafka_int.h"
-#include "rdkafka_msg.h"
-#include "rdkafka_topic.h"
-#include "rdkafka_partition.h"
-#include "rdkafka_broker.h"
-#include "rdkafka_cgrp.h"
-#include "rdkafka_metadata.h"
-#include "rdlog.h"
-#include "rdsysqueue.h"
-#include "rdtime.h"
-#include "rdregex.h"
-
-const char *rd_kafka_topic_state_names[] = {
- "unknown",
- "exists",
- "notexists"
-};
-
-
-
-static int
-rd_kafka_topic_metadata_update (rd_kafka_itopic_t *rkt,
- const struct rd_kafka_metadata_topic *mdt,
- rd_ts_t ts_insert);
-
-
-/**
- * @brief Increases the app's topic reference count and returns the app pointer.
- *
- * The app refcounts are implemented separately from the librdkafka refcounts
- * and to play nicely with shptr we keep one single shptr for the application
- * and increase/decrease a separate rkt_app_refcnt to keep track of its use.
- *
- * This only covers topic_new() & topic_destroy().
- * The topic_t exposed in rd_kafka_message_t is NOT covered and is handled
- * like a standard shptr -> app pointer conversion (keep_a()).
- *
- * @returns a (new) rkt app reference.
- *
- * @remark \p rkt and \p s_rkt are mutually exclusive.
- */
-static rd_kafka_topic_t *rd_kafka_topic_keep_app (rd_kafka_itopic_t *rkt) {
- rd_kafka_topic_t *app_rkt;
-
- mtx_lock(&rkt->rkt_app_lock);
- rkt->rkt_app_refcnt++;
- if (!(app_rkt = rkt->rkt_app_rkt))
- app_rkt = rkt->rkt_app_rkt = rd_kafka_topic_keep_a(rkt);
- mtx_unlock(&rkt->rkt_app_lock);
-
- return app_rkt;
-}
-
-/**
- * @brief drop rkt app reference
- */
-static void rd_kafka_topic_destroy_app (rd_kafka_topic_t *app_rkt) {
- rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
- shptr_rd_kafka_itopic_t *s_rkt = NULL;
-
- mtx_lock(&rkt->rkt_app_lock);
- rd_kafka_assert(NULL, rkt->rkt_app_refcnt > 0);
- rkt->rkt_app_refcnt--;
- if (unlikely(rkt->rkt_app_refcnt == 0)) {
- rd_kafka_assert(NULL, rkt->rkt_app_rkt);
- s_rkt = rd_kafka_topic_a2s(app_rkt);
- rkt->rkt_app_rkt = NULL;
- }
- mtx_unlock(&rkt->rkt_app_lock);
-
- if (s_rkt) /* final app reference lost, destroy the shared ptr. */
- rd_kafka_topic_destroy0(s_rkt);
-}
-
-
-/**
- * Final destructor for topic. Refcnt must be 0.
- */
-void rd_kafka_topic_destroy_final (rd_kafka_itopic_t *rkt) {
-
- rd_kafka_assert(rkt->rkt_rk, rd_refcnt_get(&rkt->rkt_refcnt) == 0);
-
- rd_kafka_wrlock(rkt->rkt_rk);
- TAILQ_REMOVE(&rkt->rkt_rk->rk_topics, rkt, rkt_link);
- rkt->rkt_rk->rk_topic_cnt--;
- rd_kafka_wrunlock(rkt->rkt_rk);
-
- rd_kafka_assert(rkt->rkt_rk, rd_list_empty(&rkt->rkt_desp));
- rd_list_destroy(&rkt->rkt_desp);
-
- if (rkt->rkt_topic)
- rd_kafkap_str_destroy(rkt->rkt_topic);
-
- rd_kafka_anyconf_destroy(_RK_TOPIC, &rkt->rkt_conf);
-
- mtx_destroy(&rkt->rkt_app_lock);
- rwlock_destroy(&rkt->rkt_lock);
- rd_refcnt_destroy(&rkt->rkt_refcnt);
-
- rd_free(rkt);
-}
-
-/**
- * Application destroy
- */
-void rd_kafka_topic_destroy (rd_kafka_topic_t *app_rkt) {
- rd_kafka_topic_destroy_app(app_rkt);
-}
-
-
-/**
- * Finds and returns a topic based on its name, or NULL if not found.
- * The 'rkt' refcount is increased by one and the caller must call
- * rd_kafka_topic_destroy() when it is done with the topic to decrease
- * the refcount.
- *
- * Locality: any thread
- */
-shptr_rd_kafka_itopic_t *rd_kafka_topic_find_fl (const char *func, int line,
- rd_kafka_t *rk,
- const char *topic, int do_lock){
- rd_kafka_itopic_t *rkt;
- shptr_rd_kafka_itopic_t *s_rkt = NULL;
-
- if (do_lock)
- rd_kafka_rdlock(rk);
- TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
- if (!rd_kafkap_str_cmp_str(rkt->rkt_topic, topic)) {
- s_rkt = rd_kafka_topic_keep(rkt);
- break;
- }
- }
- if (do_lock)
- rd_kafka_rdunlock(rk);
-
- return s_rkt;
-}
-
-/**
- * Same semantics as ..find() but takes a Kafka protocol string instead.
- */
-shptr_rd_kafka_itopic_t *rd_kafka_topic_find0_fl (const char *func, int line,
- rd_kafka_t *rk,
- const rd_kafkap_str_t *topic) {
- rd_kafka_itopic_t *rkt;
- shptr_rd_kafka_itopic_t *s_rkt = NULL;
-
- rd_kafka_rdlock(rk);
- TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
- if (!rd_kafkap_str_cmp(rkt->rkt_topic, topic)) {
- s_rkt = rd_kafka_topic_keep(rkt);
- break;
- }
- }
- rd_kafka_rdunlock(rk);
-
- return s_rkt;
-}
-
-
-/**
- * Compare shptr_rd_kafka_itopic_t for underlying itopic_t
- */
-int rd_kafka_topic_cmp_s_rkt (const void *_a, const void *_b) {
- shptr_rd_kafka_itopic_t *a = (void *)_a, *b = (void *)_b;
- rd_kafka_itopic_t *rkt_a = rd_kafka_topic_s2i(a);
- rd_kafka_itopic_t *rkt_b = rd_kafka_topic_s2i(b);
-
- if (rkt_a == rkt_b)
- return 0;
-
- return rd_kafkap_str_cmp(rkt_a->rkt_topic, rkt_b->rkt_topic);
-}
-
-
-/**
- * Create new topic handle.
- *
- * Locality: any
- */
-shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk,
- const char *topic,
- rd_kafka_topic_conf_t *conf,
- int *existing,
- int do_lock) {
- rd_kafka_itopic_t *rkt;
- shptr_rd_kafka_itopic_t *s_rkt;
- const struct rd_kafka_metadata_cache_entry *rkmce;
-
- /* Verify configuration.
- * Maximum topic name size + headers must never exceed message.max.bytes
- * which is min-capped to 1000.
- * See rd_kafka_broker_produce_toppar() and rdkafka_conf.c */
- if (!topic || strlen(topic) > 512) {
- if (conf)
- rd_kafka_topic_conf_destroy(conf);
- rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
- EINVAL);
- return NULL;
- }
-
- if (do_lock)
- rd_kafka_wrlock(rk);
- if ((s_rkt = rd_kafka_topic_find(rk, topic, 0/*no lock*/))) {
- if (do_lock)
- rd_kafka_wrunlock(rk);
- if (conf)
- rd_kafka_topic_conf_destroy(conf);
- if (existing)
- *existing = 1;
- return s_rkt;
- }
-
- if (existing)
- *existing = 0;
-
- rkt = rd_calloc(1, sizeof(*rkt));
-
- rkt->rkt_topic = rd_kafkap_str_new(topic, -1);
- rkt->rkt_rk = rk;
-
- if (!conf) {
- if (rk->rk_conf.topic_conf)
- conf = rd_kafka_topic_conf_dup(rk->rk_conf.topic_conf);
- else
- conf = rd_kafka_topic_conf_new();
- }
- rkt->rkt_conf = *conf;
- rd_free(conf); /* explicitly not rd_kafka_topic_destroy()
- * since we dont want to rd_free internal members,
- * just the placeholder. The internal members
- * were copied on the line above. */
-
- /* Default partitioner: consistent_random */
- if (!rkt->rkt_conf.partitioner)
- rkt->rkt_conf.partitioner = rd_kafka_msg_partitioner_consistent_random;
-
- if (rkt->rkt_conf.compression_codec == RD_KAFKA_COMPRESSION_INHERIT)
- rkt->rkt_conf.compression_codec = rk->rk_conf.compression_codec;
-
- rd_kafka_dbg(rk, TOPIC, "TOPIC", "New local topic: %.*s",
- RD_KAFKAP_STR_PR(rkt->rkt_topic));
-
- rd_list_init(&rkt->rkt_desp, 16, NULL);
- rd_refcnt_init(&rkt->rkt_refcnt, 0);
-
- s_rkt = rd_kafka_topic_keep(rkt);
-
- rwlock_init(&rkt->rkt_lock);
- mtx_init(&rkt->rkt_app_lock, mtx_plain);
-
- /* Create unassigned partition */
- rkt->rkt_ua = rd_kafka_toppar_new(rkt, RD_KAFKA_PARTITION_UA);
-
- TAILQ_INSERT_TAIL(&rk->rk_topics, rkt, rkt_link);
- rk->rk_topic_cnt++;
-
- /* Populate from metadata cache. */
- if ((rkmce = rd_kafka_metadata_cache_find(rk, topic, 1/*valid*/))) {
- if (existing)
- *existing = 1;
-
- rd_kafka_topic_metadata_update(rkt, &rkmce->rkmce_mtopic,
- rkmce->rkmce_ts_insert);
- }
-
- if (do_lock)
- rd_kafka_wrunlock(rk);
-
- return s_rkt;
-}
-
-
-
-/**
- * Create new app topic handle.
- *
- * Locality: application thread
- */
-rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic,
- rd_kafka_topic_conf_t *conf) {
- shptr_rd_kafka_itopic_t *s_rkt;
- rd_kafka_itopic_t *rkt;
- rd_kafka_topic_t *app_rkt;
- int existing;
-
- s_rkt = rd_kafka_topic_new0(rk, topic, conf, &existing, 1/*lock*/);
- if (!s_rkt)
- return NULL;
-
- rkt = rd_kafka_topic_s2i(s_rkt);
-
- /* Save a shared pointer to be used in callbacks. */
- app_rkt = rd_kafka_topic_keep_app(rkt);
-
- /* Query for the topic leader (async) */
- if (!existing)
- rd_kafka_topic_leader_query(rk, rkt);
-
- /* Drop our reference since there is already/now a rkt_app_rkt */
- rd_kafka_topic_destroy0(s_rkt);
-
- return app_rkt;
-}
-
-
-
-/**
- * Sets the state for topic.
- * NOTE: rd_kafka_topic_wrlock(rkt) MUST be held
- */
-static void rd_kafka_topic_set_state (rd_kafka_itopic_t *rkt, int state) {
-
- if ((int)rkt->rkt_state == state)
- return;
-
- rd_kafka_dbg(rkt->rkt_rk, TOPIC, "STATE",
- "Topic %s changed state %s -> %s",
- rkt->rkt_topic->str,
- rd_kafka_topic_state_names[rkt->rkt_state],
- rd_kafka_topic_state_names[state]);
- rkt->rkt_state = state;
-}
-
-/**
- * Returns the name of a topic.
- * NOTE:
- * The topic Kafka String representation is crafted with an extra byte
- * at the end for the Nul that is not included in the length, this way
- * we can use the topic's String directly.
- * This is not true for Kafka Strings read from the network.
- */
-const char *rd_kafka_topic_name (const rd_kafka_topic_t *app_rkt) {
- const rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
- return rkt->rkt_topic->str;
-}
-
-
-
-
-
-/**
- * @brief Update the leader for a topic+partition.
- * @returns 1 if the leader was changed, else 0, or -1 if leader is unknown.
- *
- * @locks rd_kafka_topic_wrlock(rkt) and rd_kafka_toppar_lock(rktp)
- * @locality any
- */
-int rd_kafka_toppar_leader_update (rd_kafka_toppar_t *rktp,
- int32_t leader_id, rd_kafka_broker_t *rkb) {
-
- rktp->rktp_leader_id = leader_id;
- if (rktp->rktp_leader_id != leader_id) {
- rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPICUPD",
- "Topic %s [%"PRId32"] migrated from "
- "leader %"PRId32" to %"PRId32,
- rktp->rktp_rkt->rkt_topic->str,
- rktp->rktp_partition,
- rktp->rktp_leader_id, leader_id);
- rktp->rktp_leader_id = leader_id;
- }
-
- if (!rkb) {
- int had_leader = rktp->rktp_leader ? 1 : 0;
-
- rd_kafka_toppar_broker_delegate(rktp, NULL, 0);
-
- return had_leader ? -1 : 0;
- }
-
-
- if (rktp->rktp_leader) {
- if (rktp->rktp_leader == rkb) {
- /* No change in broker */
- return 0;
- }
-
- rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPICUPD",
- "Topic %s [%"PRId32"] migrated from "
- "broker %"PRId32" to %"PRId32,
- rktp->rktp_rkt->rkt_topic->str,
- rktp->rktp_partition,
- rktp->rktp_leader->rkb_nodeid, rkb->rkb_nodeid);
- }
-
- rd_kafka_toppar_broker_delegate(rktp, rkb, 0);
-
- return 1;
-}
-
-
-static int rd_kafka_toppar_leader_update2 (rd_kafka_itopic_t *rkt,
- int32_t partition,
- int32_t leader_id,
- rd_kafka_broker_t *rkb) {
- rd_kafka_toppar_t *rktp;
- shptr_rd_kafka_toppar_t *s_rktp;
- int r;
-
- s_rktp = rd_kafka_toppar_get(rkt, partition, 0);
- if (unlikely(!s_rktp)) {
- /* Have only seen this in issue #132.
- * Probably caused by corrupt broker state. */
- rd_kafka_log(rkt->rkt_rk, LOG_WARNING, "LEADER",
- "%s [%"PRId32"] is unknown "
- "(partition_cnt %i)",
- rkt->rkt_topic->str, partition,
- rkt->rkt_partition_cnt);
- return -1;
- }
-
- rktp = rd_kafka_toppar_s2i(s_rktp);
-
- rd_kafka_toppar_lock(rktp);
- r = rd_kafka_toppar_leader_update(rktp, leader_id, rkb);
- rd_kafka_toppar_unlock(rktp);
-
- rd_kafka_toppar_destroy(s_rktp); /* from get() */
-
- return r;
-}
-
-
-/**
- * Update the number of partitions for a topic and takes according actions.
- * Returns 1 if the partition count changed, else 0.
- * NOTE: rd_kafka_topic_wrlock(rkt) MUST be held.
- */
-static int rd_kafka_topic_partition_cnt_update (rd_kafka_itopic_t *rkt,
- int32_t partition_cnt) {
- rd_kafka_t *rk = rkt->rkt_rk;
- shptr_rd_kafka_toppar_t **rktps;
- shptr_rd_kafka_toppar_t *rktp_ua;
- shptr_rd_kafka_toppar_t *s_rktp;
- rd_kafka_toppar_t *rktp;
- rd_kafka_msgq_t tmpq = RD_KAFKA_MSGQ_INITIALIZER(tmpq);
- int32_t i;
-
- if (likely(rkt->rkt_partition_cnt == partition_cnt))
- return 0; /* No change in partition count */
-
- if (unlikely(rkt->rkt_partition_cnt != 0 &&
- !rd_kafka_terminating(rkt->rkt_rk)))
- rd_kafka_log(rk, LOG_NOTICE, "PARTCNT",
- "Topic %s partition count changed "
- "from %"PRId32" to %"PRId32,
- rkt->rkt_topic->str,
- rkt->rkt_partition_cnt, partition_cnt);
- else
- rd_kafka_dbg(rk, TOPIC, "PARTCNT",
- "Topic %s partition count changed "
- "from %"PRId32" to %"PRId32,
- rkt->rkt_topic->str,
- rkt->rkt_partition_cnt, partition_cnt);
-
-
- /* Create and assign new partition list */
- if (partition_cnt > 0)
- rktps = rd_calloc(partition_cnt, sizeof(*rktps));
- else
- rktps = NULL;
-
- for (i = 0 ; i < partition_cnt ; i++) {
- if (i >= rkt->rkt_partition_cnt) {
- /* New partition. Check if its in the list of
- * desired partitions first. */
-
- s_rktp = rd_kafka_toppar_desired_get(rkt, i);
-
- rktp = s_rktp ? rd_kafka_toppar_s2i(s_rktp) : NULL;
- if (rktp) {
- rd_kafka_toppar_lock(rktp);
- rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_UNKNOWN;
-
- /* Remove from desp list since the
- * partition is now known. */
- rd_kafka_toppar_desired_unlink(rktp);
- rd_kafka_toppar_unlock(rktp);
- } else
- s_rktp = rd_kafka_toppar_new(rkt, i);
- rktps[i] = s_rktp;
- } else {
- /* Existing partition, grab our own reference. */
- rktps[i] = rd_kafka_toppar_keep(
- rd_kafka_toppar_s2i(rkt->rkt_p[i]));
- /* Loose previous ref */
- rd_kafka_toppar_destroy(rkt->rkt_p[i]);
- }
- }
-
- rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0);
-
- /* Propagate notexist errors for desired partitions */
- RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) {
- rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESIRED",
- "%s [%"PRId32"]: "
- "desired partition does not exist in cluster",
- rkt->rkt_topic->str,
- rd_kafka_toppar_s2i(s_rktp)->rktp_partition);
- rd_kafka_toppar_enq_error(rd_kafka_toppar_s2i(s_rktp),
- RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
- }
-
- /* Remove excessive partitions */
- for (i = partition_cnt ; i < rkt->rkt_partition_cnt ; i++) {
- s_rktp = rkt->rkt_p[i];
- rktp = rd_kafka_toppar_s2i(s_rktp);
-
- rd_kafka_dbg(rkt->rkt_rk, TOPIC, "REMOVE",
- "%s [%"PRId32"] no longer reported in metadata",
- rkt->rkt_topic->str, rktp->rktp_partition);
-
- rd_kafka_toppar_lock(rktp);
-
- if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED) {
- rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESIRED",
- "Topic %s [%"PRId32"] is desired "
- "but no longer known: "
- "moving back on desired list",
- rkt->rkt_topic->str, rktp->rktp_partition);
-
- /* If this is a desired partition move it back on to
- * the desired list since partition is no longer known*/
- rd_kafka_assert(rkt->rkt_rk,
- !(rktp->rktp_flags &
- RD_KAFKA_TOPPAR_F_UNKNOWN));
- rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN;
- rd_kafka_toppar_desired_link(rktp);
-
- if (!rd_kafka_terminating(rkt->rkt_rk))
- rd_kafka_toppar_enq_error(
- rktp,
- RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
-
- rd_kafka_toppar_broker_delegate(rktp, NULL, 0);
-
- } else {
- /* Tell handling broker to let go of the toppar */
- rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_REMOVE;
- rd_kafka_toppar_broker_leave_for_remove(rktp);
- }
-
- rd_kafka_toppar_unlock(rktp);
-
- rd_kafka_toppar_destroy(s_rktp);
- }
-
- if (likely(rktp_ua != NULL)) {
- /* Move messages from removed partitions to UA for
- * further processing. */
- rktp = rd_kafka_toppar_s2i(rktp_ua);
-
- // FIXME: tmpq not used
- if (rd_kafka_msgq_len(&tmpq) > 0) {
- rd_kafka_dbg(rkt->rkt_rk, TOPIC, "TOPPARMOVE",
- "Moving %d messages (%zd bytes) from "
- "%d removed partitions to UA partition",
- rd_kafka_msgq_len(&tmpq),
- rd_kafka_msgq_size(&tmpq),
- i - partition_cnt);
-
-
- rd_kafka_toppar_lock(rktp);
- rd_kafka_msgq_concat(&rktp->rktp_msgq, &tmpq);
- rd_kafka_toppar_unlock(rktp);
- }
-
- rd_kafka_toppar_destroy(rktp_ua); /* .._get() above */
- } else {
- /* No UA, fail messages from removed partitions. */
- if (rd_kafka_msgq_len(&tmpq) > 0) {
- rd_kafka_dbg(rkt->rkt_rk, TOPIC, "TOPPARMOVE",
- "Failing %d messages (%zd bytes) from "
- "%d removed partitions",
- rd_kafka_msgq_len(&tmpq),
- rd_kafka_msgq_size(&tmpq),
- i - partition_cnt);
-
- rd_kafka_dr_msgq(rkt, &tmpq,
- RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
- }
- }
-
- if (rkt->rkt_p)
- rd_free(rkt->rkt_p);
-
- rkt->rkt_p = rktps;
-
- rkt->rkt_partition_cnt = partition_cnt;
-
- return 1;
-}
-
-
-
-/**
- * Topic 'rkt' does not exist: propagate to interested parties.
- * The topic's state must have been set to NOTEXISTS and
- * rd_kafka_topic_partition_cnt_update() must have been called prior to
- * calling this function.
- *
- * Locks: rd_kafka_topic_*lock() must be held.
- */
-static void rd_kafka_topic_propagate_notexists (rd_kafka_itopic_t *rkt,
- rd_kafka_resp_err_t err) {
- shptr_rd_kafka_toppar_t *s_rktp;
- int i;
-
- if (rkt->rkt_rk->rk_type != RD_KAFKA_CONSUMER)
- return;
-
-
- /* Notify consumers that the topic doesn't exist. */
- RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i)
- rd_kafka_toppar_enq_error(rd_kafka_toppar_s2i(s_rktp), err);
-}
-
-
-/**
- * Assign messages on the UA partition to available partitions.
- * Locks: rd_kafka_topic_*lock() must be held.
- */
-static void rd_kafka_topic_assign_uas (rd_kafka_itopic_t *rkt,
- rd_kafka_resp_err_t err) {
- rd_kafka_t *rk = rkt->rkt_rk;
- shptr_rd_kafka_toppar_t *s_rktp_ua;
- rd_kafka_toppar_t *rktp_ua;
- rd_kafka_msg_t *rkm, *tmp;
- rd_kafka_msgq_t uas = RD_KAFKA_MSGQ_INITIALIZER(uas);
- rd_kafka_msgq_t failed = RD_KAFKA_MSGQ_INITIALIZER(failed);
- int cnt;
-
- if (rkt->rkt_rk->rk_type != RD_KAFKA_PRODUCER)
- return;
-
- s_rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0);
- if (unlikely(!s_rktp_ua)) {
- rd_kafka_dbg(rk, TOPIC, "ASSIGNUA",
- "No UnAssigned partition available for %s",
- rkt->rkt_topic->str);
- return;
- }
-
- rktp_ua = rd_kafka_toppar_s2i(s_rktp_ua);
-
- /* Assign all unassigned messages to new topics. */
- rd_kafka_dbg(rk, TOPIC, "PARTCNT",
- "Partitioning %i unassigned messages in topic %.*s to "
- "%"PRId32" partitions",
- rd_atomic32_get(&rktp_ua->rktp_msgq.rkmq_msg_cnt),
- RD_KAFKAP_STR_PR(rkt->rkt_topic),
- rkt->rkt_partition_cnt);
-
- rd_kafka_toppar_lock(rktp_ua);
- rd_kafka_msgq_move(&uas, &rktp_ua->rktp_msgq);
- cnt = rd_atomic32_get(&uas.rkmq_msg_cnt);
- rd_kafka_toppar_unlock(rktp_ua);
-
- TAILQ_FOREACH_SAFE(rkm, &uas.rkmq_msgs, rkm_link, tmp) {
- /* Fast-path for failing messages with forced partition */
- if (rkm->rkm_partition != RD_KAFKA_PARTITION_UA &&
- rkm->rkm_partition >= rkt->rkt_partition_cnt &&
- rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN) {
- rd_kafka_msgq_enq(&failed, rkm);
- continue;
- }
-
- if (unlikely(rd_kafka_msg_partitioner(rkt, rkm, 0) != 0)) {
- /* Desired partition not available */
- rd_kafka_msgq_enq(&failed, rkm);
- }
- }
-
- rd_kafka_dbg(rk, TOPIC, "UAS",
- "%i/%i messages were partitioned in topic %s",
- cnt - rd_atomic32_get(&failed.rkmq_msg_cnt),
- cnt, rkt->rkt_topic->str);
-
- if (rd_atomic32_get(&failed.rkmq_msg_cnt) > 0) {
- /* Fail the messages */
- rd_kafka_dbg(rk, TOPIC, "UAS",
- "%"PRId32"/%i messages failed partitioning "
- "in topic %s",
- rd_atomic32_get(&uas.rkmq_msg_cnt), cnt,
- rkt->rkt_topic->str);
- rd_kafka_dr_msgq(rkt, &failed,
- rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS ?
- err :
- RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
- }
-
- rd_kafka_toppar_destroy(s_rktp_ua); /* from get() */
-}
-
-
-/**
- * Received metadata request contained no information about topic 'rkt'
- * and thus indicates the topic is not available in the cluster.
- */
-void rd_kafka_topic_metadata_none (rd_kafka_itopic_t *rkt) {
- rd_kafka_topic_wrlock(rkt);
-
- if (unlikely(rd_atomic32_get(&rkt->rkt_rk->rk_terminate))) {
- /* Dont update metadata while terminating, do this
- * after acquiring lock for proper synchronisation */
- rd_kafka_topic_wrunlock(rkt);
- return;
- }
-
- rkt->rkt_ts_metadata = rd_clock();
-
- rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_NOTEXISTS);
-
- rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;
-
- /* Update number of partitions */
- rd_kafka_topic_partition_cnt_update(rkt, 0);
-
- /* Purge messages with forced partition */
- rd_kafka_topic_assign_uas(rkt, RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);
-
- /* Propagate nonexistent topic info */
- rd_kafka_topic_propagate_notexists(rkt,
- RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);
-
- rd_kafka_topic_wrunlock(rkt);
-}
-
-
-/**
- * @brief Update a topic from metadata.
- *
- * @param ts_age absolute age (timestamp) of metadata.
- * @returns 1 if the number of partitions changed, 0 if not, and -1 if the
- * topic is unknown.
-
- *
- * @locks rd_kafka*lock()
- */
-static int
-rd_kafka_topic_metadata_update (rd_kafka_itopic_t *rkt,
- const struct rd_kafka_metadata_topic *mdt,
- rd_ts_t ts_age) {
- rd_kafka_t *rk = rkt->rkt_rk;
- int upd = 0;
- int j;
- rd_kafka_broker_t **partbrokers;
- int leader_cnt = 0;
- int old_state;
-
- if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR)
- rd_kafka_dbg(rk, TOPIC|RD_KAFKA_DBG_METADATA, "METADATA",
- "Error in metadata reply for "
- "topic %s (PartCnt %i): %s",
- rkt->rkt_topic->str, mdt->partition_cnt,
- rd_kafka_err2str(mdt->err));
-
- if (unlikely(rd_kafka_terminating(rk))) {
- /* Dont update metadata while terminating, do this
- * after acquiring lock for proper synchronisation */
- return -1;
- }
-
- /* Look up brokers before acquiring rkt lock to preserve lock order */
- partbrokers = rd_alloca(mdt->partition_cnt * sizeof(*partbrokers));
-
- for (j = 0 ; j < mdt->partition_cnt ; j++) {
- if (mdt->partitions[j].leader == -1) {
- partbrokers[j] = NULL;
- continue;
- }
-
- partbrokers[j] =
- rd_kafka_broker_find_by_nodeid(rk,
- mdt->partitions[j].
- leader);
- }
-
-
- rd_kafka_topic_wrlock(rkt);
-
- old_state = rkt->rkt_state;
- rkt->rkt_ts_metadata = ts_age;
-
- /* Set topic state */
- if (mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART ||
- mdt->err == RD_KAFKA_RESP_ERR_UNKNOWN/*auto.create.topics fails*/||
- mdt->err == RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION/*invalid topic*/)
- rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_NOTEXISTS);
- else if (mdt->partition_cnt > 0)
- rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_EXISTS);
-
- /* Update number of partitions, but not if there are
- * (possibly intermittent) errors (e.g., "Leader not available"). */
- if (mdt->err == RD_KAFKA_RESP_ERR_NO_ERROR)
- upd += rd_kafka_topic_partition_cnt_update(rkt,
- mdt->partition_cnt);
-
- /* Update leader for each partition */
- for (j = 0 ; j < mdt->partition_cnt ; j++) {
- int r;
- rd_kafka_broker_t *leader;
-
- rd_kafka_dbg(rk, TOPIC|RD_KAFKA_DBG_METADATA, "METADATA",
- " Topic %s partition %i Leader %"PRId32,
- rkt->rkt_topic->str,
- mdt->partitions[j].id,
- mdt->partitions[j].leader);
-
- leader = partbrokers[j];
- partbrokers[j] = NULL;
-
- /* Update leader for partition */
- r = rd_kafka_toppar_leader_update2(rkt,
- mdt->partitions[j].id,
- mdt->partitions[j].leader,
- leader);
-
- upd += (r != 0 ? 1 : 0);
-
- if (leader) {
- if (r != -1)
- leader_cnt++;
- /* Drop reference to broker (from find()) */
- rd_kafka_broker_destroy(leader);
- }
- }
-
- /* If all partitions have leaders we can turn off fast leader query. */
- if (mdt->partition_cnt > 0 && leader_cnt == mdt->partition_cnt)
- rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;
-
- if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR && rkt->rkt_partition_cnt) {
- /* (Possibly intermittent) topic-wide error:
- * remove leaders for partitions */
-
- for (j = 0 ; j < rkt->rkt_partition_cnt ; j++) {
- rd_kafka_toppar_t *rktp;
- if (!rkt->rkt_p[j])
- continue;
-
- rktp = rd_kafka_toppar_s2i(rkt->rkt_p[j]);
- rd_kafka_toppar_lock(rktp);
- rd_kafka_toppar_broker_delegate(rktp, NULL, 0);
- rd_kafka_toppar_unlock(rktp);
- }
- }
-
- /* Try to assign unassigned messages to new partitions, or fail them */
- if (upd > 0 || rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
- rd_kafka_topic_assign_uas(rkt, mdt->err ?
- mdt->err :
- RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);
-
- /* Trigger notexists propagation */
- if (old_state != (int)rkt->rkt_state &&
- rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
- rd_kafka_topic_propagate_notexists(
- rkt,
- mdt->err ? mdt->err : RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC);
-
- rd_kafka_topic_wrunlock(rkt);
-
- /* Loose broker references */
- for (j = 0 ; j < mdt->partition_cnt ; j++)
- if (partbrokers[j])
- rd_kafka_broker_destroy(partbrokers[j]);
-
-
- return upd;
-}
-
-/**
- * @brief Update topic by metadata, if topic is locally known.
- * @sa rd_kafka_topic_metadata_update()
- * @locks none
- */
-int
-rd_kafka_topic_metadata_update2 (rd_kafka_broker_t *rkb,
- const struct rd_kafka_metadata_topic *mdt) {
- rd_kafka_itopic_t *rkt;
- shptr_rd_kafka_itopic_t *s_rkt;
- int r;
-
- rd_kafka_wrlock(rkb->rkb_rk);
- if (!(s_rkt = rd_kafka_topic_find(rkb->rkb_rk,
- mdt->topic, 0/*!lock*/))) {
- rd_kafka_wrunlock(rkb->rkb_rk);
- return -1; /* Ignore topics that we dont have locally. */
- }
-
- rkt = rd_kafka_topic_s2i(s_rkt);
-
- r = rd_kafka_topic_metadata_update(rkt, mdt, rd_clock());
-
- rd_kafka_wrunlock(rkb->rkb_rk);
-
- rd_kafka_topic_destroy0(s_rkt); /* from find() */
-
- return r;
-}
-
-
-
-/**
- * @returns a list of all partitions (s_rktp's) for a topic.
- * @remark rd_kafka_topic_*lock() MUST be held.
- */
-static rd_list_t *rd_kafka_topic_get_all_partitions (rd_kafka_itopic_t *rkt) {
- rd_list_t *list;
- shptr_rd_kafka_toppar_t *s_rktp;
- int i;
-
- list = rd_list_new(rkt->rkt_partition_cnt +
- rd_list_cnt(&rkt->rkt_desp) + 1/*ua*/, NULL);
-
- for (i = 0 ; i < rkt->rkt_partition_cnt ; i++)
- rd_list_add(list, rd_kafka_toppar_keep(
- rd_kafka_toppar_s2i(rkt->rkt_p[i])));
-
- RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i)
- rd_list_add(list, rd_kafka_toppar_keep(
- rd_kafka_toppar_s2i(s_rktp)));
-
- if (rkt->rkt_ua)
- rd_list_add(list, rd_kafka_toppar_keep(
- rd_kafka_toppar_s2i(rkt->rkt_ua)));
-
- return list;
-}
-
-
-
-
-/**
- * Remove all partitions from a topic, including the ua.
- * Must only be called during rd_kafka_t termination.
- *
- * Locality: main thread
- */
-void rd_kafka_topic_partitions_remove (rd_kafka_itopic_t *rkt) {
- shptr_rd_kafka_toppar_t *s_rktp;
- shptr_rd_kafka_itopic_t *s_rkt;
- rd_list_t *partitions;
- int i;
-
- /* Purge messages for all partitions outside the topic_wrlock since
- * a message can hold a reference to the topic_t and thus
- * would trigger a recursive lock dead-lock. */
- rd_kafka_topic_rdlock(rkt);
- partitions = rd_kafka_topic_get_all_partitions(rkt);
- rd_kafka_topic_rdunlock(rkt);
-
- RD_LIST_FOREACH(s_rktp, partitions, i) {
- rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
-
- rd_kafka_toppar_lock(rktp);
- rd_kafka_msgq_purge(rkt->rkt_rk, &rktp->rktp_msgq);
- rd_kafka_toppar_purge_queues(rktp);
- rd_kafka_toppar_unlock(rktp);
-
- rd_kafka_toppar_destroy(s_rktp);
- }
- rd_list_destroy(partitions);
-
- s_rkt = rd_kafka_topic_keep(rkt);
- rd_kafka_topic_wrlock(rkt);
-
- /* Setting the partition count to 0 moves all partitions to
- * the desired list (rktp_desp). */
- rd_kafka_topic_partition_cnt_update(rkt, 0);
-
- /* Now clean out the desired partitions list.
- * Use reverse traversal to avoid excessive memory shuffling
- * in rd_list_remove() */
- RD_LIST_FOREACH_REVERSE(s_rktp, &rkt->rkt_desp, i) {
- rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
- /* Our reference */
- shptr_rd_kafka_toppar_t *s_rktp2 = rd_kafka_toppar_keep(rktp);
- rd_kafka_toppar_lock(rktp);
- rd_kafka_toppar_desired_del(rktp);
- rd_kafka_toppar_unlock(rktp);
- rd_kafka_toppar_destroy(s_rktp2);
- }
-
- rd_kafka_assert(rkt->rkt_rk, rkt->rkt_partition_cnt == 0);
-
- if (rkt->rkt_p)
- rd_free(rkt->rkt_p);
-
- rkt->rkt_p = NULL;
- rkt->rkt_partition_cnt = 0;
-
- if ((s_rktp = rkt->rkt_ua)) {
- rkt->rkt_ua = NULL;
- rd_kafka_toppar_destroy(s_rktp);
- }
-
- rd_kafka_topic_wrunlock(rkt);
-
- rd_kafka_topic_destroy0(s_rkt);
-}
-
-
-
-/**
- * Scan all topics and partitions for:
- * - timed out messages.
- * - topics that needs to be created on the broker.
- * - topics who's metadata is too old.
- */
-int rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now) {
- rd_kafka_itopic_t *rkt;
- rd_kafka_toppar_t *rktp;
- shptr_rd_kafka_toppar_t *s_rktp;
- int totcnt = 0;
- rd_list_t query_topics;
-
- rd_list_init(&query_topics, 0, rd_free);
-
- rd_kafka_rdlock(rk);
- TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
- int p;
- int cnt = 0, tpcnt = 0;
- rd_kafka_msgq_t timedout;
- int query_this = 0;
-
- rd_kafka_msgq_init(&timedout);
-
- rd_kafka_topic_wrlock(rkt);
-
- /* Check if metadata information has timed out. */
- if (rkt->rkt_state != RD_KAFKA_TOPIC_S_UNKNOWN &&
- !rd_kafka_metadata_cache_topic_get(
- rk, rkt->rkt_topic->str, 1/*only valid*/)) {
- rd_kafka_dbg(rk, TOPIC, "NOINFO",
- "Topic %s metadata information timed out "
- "(%"PRId64"ms old)",
- rkt->rkt_topic->str,
- (rd_clock() - rkt->rkt_ts_metadata)/1000);
- rd_kafka_topic_set_state(rkt, RD_KAFKA_TOPIC_S_UNKNOWN);
-
- query_this = 1;
- }
-
- /* Just need a read-lock from here on. */
- rd_kafka_topic_wrunlock(rkt);
- rd_kafka_topic_rdlock(rkt);
-
- if (rkt->rkt_partition_cnt == 0) {
- /* If this partition is unknown by brokers try
- * to create it by sending a topic-specific
- * metadata request.
- * This requires "auto.create.topics.enable=true"
- * on the brokers. */
- rd_kafka_dbg(rk, TOPIC, "NOINFO",
- "Topic %s partition count is zero: "
- "should refresh metadata",
- rkt->rkt_topic->str);
-
- query_this = 1;
- }
-
- for (p = RD_KAFKA_PARTITION_UA ;
- p < rkt->rkt_partition_cnt ; p++) {
- int did_tmout = 0;
-
- if (!(s_rktp = rd_kafka_toppar_get(rkt, p, 0)))
- continue;
-
- rktp = rd_kafka_toppar_s2i(s_rktp);
- rd_kafka_toppar_lock(rktp);
-
- /* Check that partition has a leader that is up,
- * else add topic to query list. */
- if (p != RD_KAFKA_PARTITION_UA &&
- (!rktp->rktp_leader ||
- rktp->rktp_leader->rkb_source ==
- RD_KAFKA_INTERNAL ||
- rd_kafka_broker_get_state(rktp->rktp_leader) <
- RD_KAFKA_BROKER_STATE_UP)) {
- rd_kafka_dbg(rk, TOPIC, "QRYLEADER",
- "Topic %s [%"PRId32"]: "
- "leader is %s: re-query",
- rkt->rkt_topic->str,
- rktp->rktp_partition,
- !rktp->rktp_leader ?
- "unavailable" :
- (rktp->rktp_leader->rkb_source ==
- RD_KAFKA_INTERNAL ? "internal":
- "down"));
- query_this = 1;
- }
-
- /* Scan toppar's message queues for timeouts */
- if (rd_kafka_msgq_age_scan(&rktp->rktp_xmit_msgq,
- &timedout, now) > 0)
- did_tmout = 1;
-
- if (rd_kafka_msgq_age_scan(&rktp->rktp_msgq,
- &timedout, now) > 0)
- did_tmout = 1;
-
- tpcnt += did_tmout;
-
- rd_kafka_toppar_unlock(rktp);
- rd_kafka_toppar_destroy(s_rktp);
- }
-
- rd_kafka_topic_rdunlock(rkt);
-
- if ((cnt = rd_atomic32_get(&timedout.rkmq_msg_cnt)) > 0) {
- totcnt += cnt;
- rd_kafka_dbg(rk, MSG, "TIMEOUT",
- "%s: %"PRId32" message(s) "
- "from %i toppar(s) timed out",
- rkt->rkt_topic->str, cnt, tpcnt);
- rd_kafka_dr_msgq(rkt, &timedout,
- RD_KAFKA_RESP_ERR__MSG_TIMED_OUT);
- }
-
- /* Need to re-query this topic's leader. */
- if (query_this &&
- !rd_list_find(&query_topics, rkt->rkt_topic->str,
- (void *)strcmp))
- rd_list_add(&query_topics,
- rd_strdup(rkt->rkt_topic->str));
-
- }
- rd_kafka_rdunlock(rk);
-
- if (!rd_list_empty(&query_topics))
- rd_kafka_metadata_refresh_topics(rk, NULL, &query_topics,
- 1/*force even if cached
- * info exists*/,
- "refresh unavailable topics");
- rd_list_destroy(&query_topics);
-
- return totcnt;
-}
-
-
-/**
- * Locks: rd_kafka_topic_*lock() must be held.
- */
-int rd_kafka_topic_partition_available (const rd_kafka_topic_t *app_rkt,
- int32_t partition) {
- int avail;
- shptr_rd_kafka_toppar_t *s_rktp;
- rd_kafka_toppar_t *rktp;
- rd_kafka_broker_t *rkb;
-
- s_rktp = rd_kafka_toppar_get(rd_kafka_topic_a2i(app_rkt),
- partition, 0/*no ua-on-miss*/);
- if (unlikely(!s_rktp))
- return 0;
-
- rktp = rd_kafka_toppar_s2i(s_rktp);
- rkb = rd_kafka_toppar_leader(rktp, 1/*proper broker*/);
- avail = rkb ? 1 : 0;
- if (rkb)
- rd_kafka_broker_destroy(rkb);
- rd_kafka_toppar_destroy(s_rktp);
- return avail;
-}
-
-
-void *rd_kafka_topic_opaque (const rd_kafka_topic_t *app_rkt) {
- return rd_kafka_topic_a2i(app_rkt)->rkt_conf.opaque;
-}
-
-int rd_kafka_topic_info_cmp (const void *_a, const void *_b) {
- const rd_kafka_topic_info_t *a = _a, *b = _b;
- int r;
-
- if ((r = strcmp(a->topic, b->topic)))
- return r;
-
- return a->partition_cnt - b->partition_cnt;
-}
-
-
-/**
- * Allocate new topic_info.
- * \p topic is copied.
- */
-rd_kafka_topic_info_t *rd_kafka_topic_info_new (const char *topic,
- int partition_cnt) {
- rd_kafka_topic_info_t *ti;
- size_t tlen = strlen(topic) + 1;
-
- /* Allocate space for the topic along with the struct */
- ti = rd_malloc(sizeof(*ti) + tlen);
- ti->topic = (char *)(ti+1);
- memcpy((char *)ti->topic, topic, tlen);
- ti->partition_cnt = partition_cnt;
-
- return ti;
-}
-
-/**
- * Destroy/free topic_info
- */
-void rd_kafka_topic_info_destroy (rd_kafka_topic_info_t *ti) {
- rd_free(ti);
-}
-
-
-/**
- * @brief Match \p topic to \p pattern.
- *
- * If pattern begins with "^" it is considered a regexp,
- * otherwise a simple string comparison is performed.
- *
- * @returns 1 on match, else 0.
- */
-int rd_kafka_topic_match (rd_kafka_t *rk, const char *pattern,
- const char *topic) {
- char errstr[128];
-
- if (*pattern == '^') {
- int r = rd_regex_match(pattern, topic, errstr, sizeof(errstr));
- if (unlikely(r == -1))
- rd_kafka_dbg(rk, TOPIC, "TOPICREGEX",
- "Topic \"%s\" regex \"%s\" "
- "matching failed: %s",
- topic, pattern, errstr);
- return r == 1;
- } else
- return !strcmp(pattern, topic);
-}
-
-
-
-
-
-
-
-
-
-/**
- * Trigger broker metadata query for topic leader.
- * 'rkt' may be NULL to query for all topics.
- *
- * @locks none
- */
-void rd_kafka_topic_leader_query0 (rd_kafka_t *rk, rd_kafka_itopic_t *rkt,
- int do_rk_lock) {
- rd_list_t topics;
-
- rd_list_init(&topics, 1, rd_free);
- rd_list_add(&topics, rd_strdup(rkt->rkt_topic->str));
-
- rd_kafka_metadata_refresh_topics(rk, NULL, &topics,
- 0/*dont force*/, "leader query");
-
- if (rkt)
- rd_list_destroy(&topics);
-}
-
-
-
-/**
- * @brief Populate list \p topics with the topic names (strdupped char *) of
- * all locally known topics.
- *
- * @remark \p rk lock MUST NOT be held
- */
-void rd_kafka_local_topics_to_list (rd_kafka_t *rk, rd_list_t *topics) {
- rd_kafka_itopic_t *rkt;
-
- rd_kafka_rdlock(rk);
- rd_list_grow(topics, rk->rk_topic_cnt);
- TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link)
- rd_list_add(topics, rd_strdup(rkt->rkt_topic->str));
- rd_kafka_rdunlock(rk);
-}