You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:15:03 UTC
[35/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade
to librdkafka 0.11.4
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka.c b/thirdparty/librdkafka-0.11.1/src/rdkafka.c
deleted file mode 100644
index 6867a6c..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka.c
+++ /dev/null
@@ -1,3392 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2013, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-
-#define _GNU_SOURCE
-#include <errno.h>
-#include <string.h>
-#include <stdarg.h>
-#include <signal.h>
-#include <stdlib.h>
-#include <sys/stat.h>
-
-#include "rdkafka_int.h"
-#include "rdkafka_msg.h"
-#include "rdkafka_broker.h"
-#include "rdkafka_topic.h"
-#include "rdkafka_partition.h"
-#include "rdkafka_offset.h"
-#include "rdkafka_transport.h"
-#include "rdkafka_cgrp.h"
-#include "rdkafka_assignor.h"
-#include "rdkafka_request.h"
-#include "rdkafka_event.h"
-#include "rdkafka_sasl.h"
-#include "rdkafka_interceptor.h"
-
-#include "rdtime.h"
-#include "crc32c.h"
-#include "rdunittest.h"
-
-#ifdef _MSC_VER
-#include <sys/types.h>
-#include <sys/timeb.h>
-#endif
-
-
-
-static once_flag rd_kafka_global_init_once = ONCE_FLAG_INIT;
-
-/**
- * @brief Global counter+lock for all active librdkafka instances
- */
-mtx_t rd_kafka_global_lock;
-int rd_kafka_global_cnt;
-
-
-/**
- * Last API error code, per thread.
- * Shared among all rd_kafka_t instances.
- */
-rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;
-
-
-/**
- * Current number of threads created by rdkafka.
- * This is used in regression tests.
- */
-rd_atomic32_t rd_kafka_thread_cnt_curr;
-/**
- * @returns the current value of the rd_kafka_thread_cnt_curr counter.
- *
- * @remark When built with ENABLE_SHAREDPTR_DEBUG the shared pointer
- *         dump is triggered on each call.
- */
-int rd_kafka_thread_cnt (void) {
-        int cnt;
-
-#if ENABLE_SHAREDPTR_DEBUG
-        rd_shared_ptrs_dump();
-#endif
-
-        cnt = rd_atomic32_get(&rd_kafka_thread_cnt_curr);
-        return cnt;
-}
-
-/**
- * Current thread's name (TLS)
- */
-char RD_TLS rd_kafka_thread_name[64] = "app";
-
-
-
-/**
- * Initialize process-global state: the global instance lock, the CRC32C
- * module (crc32c_global_init()) and, when the corresponding build options
- * are enabled, the shared pointer debug list/mutex and the op counter.
- *
- * NOTE(review): presumably run exactly once through
- * rd_kafka_global_init_once; the call site is outside this part of the file.
- */
-static void rd_kafka_global_init (void) {
-#if ENABLE_SHAREDPTR_DEBUG
- LIST_INIT(&rd_shared_ptr_debug_list);
- mtx_init(&rd_shared_ptr_debug_mtx, mtx_plain);
- /* Dump the shared pointer debug state when the process exits. */
- atexit(rd_shared_ptrs_dump);
-#endif
- mtx_init(&rd_kafka_global_lock, mtx_plain);
-#if ENABLE_DEVEL
- rd_atomic32_init(&rd_kafka_op_cnt, 0);
-#endif
- crc32c_global_init();
-}
-
-/**
- * @brief Read the instance counter while holding the global lock.
- *
- * @returns the current number of active librdkafka instances
- */
-static int rd_kafka_global_cnt_get (void) {
-        int cnt;
-
-        mtx_lock(&rd_kafka_global_lock);
-        cnt = rd_kafka_global_cnt;
-        mtx_unlock(&rd_kafka_global_lock);
-
-        return cnt;
-}
-
-
-/**
- * @brief Register one more active librdkafka instance.
- *
- * When the counter goes from 0 to 1 the global constructors
- * (transport, SSL if built with WITH_SSL, SASL) are run, all while
- * holding the global lock.
- */
-static void rd_kafka_global_cnt_incr (void) {
-        int is_first;
-
-        mtx_lock(&rd_kafka_global_lock);
-
-        is_first = (++rd_kafka_global_cnt == 1);
-        if (is_first) {
-                rd_kafka_transport_init();
-#if WITH_SSL
-                rd_kafka_transport_ssl_init();
-#endif
-                rd_kafka_sasl_global_init();
-        }
-
-        mtx_unlock(&rd_kafka_global_lock);
-}
-
-/**
- * @brief Unregister one active librdkafka instance.
- *
- * Asserts that at least one instance is registered. When the counter
- * drops to 0 the global destructors (SASL, SSL if built with WITH_SSL)
- * are run, all while holding the global lock.
- */
-static void rd_kafka_global_cnt_decr (void) {
-        mtx_lock(&rd_kafka_global_lock);
-
-        rd_kafka_assert(NULL, rd_kafka_global_cnt > 0);
-
-        if (--rd_kafka_global_cnt == 0) {
-                rd_kafka_sasl_global_term();
-#if WITH_SSL
-                rd_kafka_transport_ssl_term();
-#endif
-        }
-
-        mtx_unlock(&rd_kafka_global_lock);
-}
-
-
-/**
- * Wait for all rd_kafka_t objects to be destroyed.
- *
- * Polls every 25ms until both the librdkafka thread count and the global
- * instance count have reached zero, or until the timeout expires.
- *
- * @param timeout_ms maximum time to wait, in milliseconds.
- *
- * @returns 0 if all kafka objects are now destroyed, or -1 if the
- *          timeout was reached, in which case the last error is set to
- *          RD_KAFKA_RESP_ERR__TIMED_OUT.
- */
-int rd_kafka_wait_destroyed (int timeout_ms) {
-        /* Scale to microseconds in rd_ts_t rather than int so that a
-         * large timeout_ms does not overflow the multiplication. */
-        rd_ts_t timeout = rd_clock() + ((rd_ts_t)timeout_ms * 1000);
-
-        while (rd_kafka_thread_cnt() > 0 ||
-               rd_kafka_global_cnt_get() > 0) {
-                if (rd_clock() >= timeout) {
-                        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
-                                                ETIMEDOUT);
-#if ENABLE_SHAREDPTR_DEBUG
-                        rd_shared_ptrs_dump();
-#endif
-                        return -1;
-                }
-                rd_usleep(25000, NULL); /* 25ms */
-        }
-
-        return 0;
-}
-
-/**
- * @brief Deliver an already formatted log line.
- *
- * Lines with a level above conf->log_level are dropped. Otherwise, if an
- * instance is given and conf->log_queue is set, the line is copied into a
- * RD_KAFKA_OP_LOG op and enqueued on rk->rk_logq (or dropped if that
- * queue is gone); else conf->log_cb, if set, is called directly.
- *
- * @param conf  configuration providing log_level, log_queue and log_cb.
- * @param rk    client instance, may be NULL.
- * @param level log level of the line.
- * @param fac   facility string.
- * @param buf   NUL-terminated log message.
- */
-static void rd_kafka_log_buf (const rd_kafka_conf_t *conf,
-                              const rd_kafka_t *rk, int level, const char *fac,
-                              const char *buf) {
-        if (level > conf->log_level)
-                return;
-        else if (rk && conf->log_queue) {
-                rd_kafka_op_t *rko;
-
-                if (!rk->rk_logq)
-                        return; /* Terminating */
-
-                rko = rd_kafka_op_new(RD_KAFKA_OP_LOG);
-                rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_MEDIUM);
-                rko->rko_u.log.level = level;
-                strncpy(rko->rko_u.log.fac, fac,
-                        sizeof(rko->rko_u.log.fac) - 1);
-                /* strncpy() does not terminate the destination when the
-                 * source is too long: terminate explicitly instead of
-                 * relying on how the op was allocated. */
-                rko->rko_u.log.fac[sizeof(rko->rko_u.log.fac) - 1] = '\0';
-                rko->rko_u.log.str = rd_strdup(buf);
-                rd_kafka_q_enq(rk->rk_logq, rko);
-
-        } else if (conf->log_cb) {
-                conf->log_cb(rk, level, fac, buf);
-        }
-}
-
-/**
- * @brief Logger
- *
- * Formats "[thrd:NAME]: " (only if conf->log_thread_name is set), then
- * "EXTRA: " (only if \p extra is non-NULL), then the printf-style message
- * into a 2048 byte stack buffer and passes it to rd_kafka_log_buf().
- * Nothing is formatted if \p level is above conf->log_level.
- *
- * @remark conf must be set, but rk may be NULL
- */
-void rd_kafka_log0 (const rd_kafka_conf_t *conf,
-                    const rd_kafka_t *rk,
-                    const char *extra, int level,
-                    const char *fac, const char *fmt, ...) {
-        char buf[2048];
-        unsigned int off = 0;   /* Write offset into buf */
-        va_list ap;
-
-        if (level > conf->log_level)
-                return;
-
-        if (conf->log_thread_name) {
-                unsigned int n = rd_snprintf(buf, sizeof(buf), "[thrd:%s]: ",
-                                             rd_kafka_thread_name);
-
-                /* Clamp on truncation */
-                if (unlikely(n >= sizeof(buf)))
-                        n = sizeof(buf);
-                off = n;
-        }
-
-        if (extra) {
-                unsigned int n = rd_snprintf(buf+off, sizeof(buf)-off,
-                                             "%s: ", extra);
-
-                /* Clamp on truncation */
-                if (unlikely(n >= sizeof(buf)-off))
-                        n = sizeof(buf)-off;
-                off += n;
-        }
-
-        va_start(ap, fmt);
-        rd_vsnprintf(buf+off, sizeof(buf)-off, fmt, ap);
-        va_end(ap);
-
-        rd_kafka_log_buf(conf, rk, level, fac, buf);
-}
-
-
-
-/**
- * @brief Log callback printing to stderr.
- *
- * Output format: "%LEVEL|SECS.MSECS|FAC|NAME| MESSAGE", where the
- * timestamp comes from rd_gettimeofday() and NAME is rk->rk_name or
- * the empty string if \p rk is NULL.
- *
- * @param rk    client instance, may be NULL.
- * @param level log level.
- * @param fac   facility string.
- * @param buf   log message.
- */
-void rd_kafka_log_print(const rd_kafka_t *rk, int level,
-                        const char *fac, const char *buf) {
-        /* Unsigned to match the %u conversions below. */
-        unsigned int secs, msecs;
-        struct timeval tv;
-
-        rd_gettimeofday(&tv, NULL);
-        secs = (unsigned int)tv.tv_sec;
-        msecs = (unsigned int)(tv.tv_usec / 1000);
-        fprintf(stderr, "%%%i|%u.%03u|%s|%s| %s\n",
-                level, secs, msecs,
-                fac, rk ? rk->rk_name : "", buf);
-}
-
-#ifndef _MSC_VER
-/**
- * @brief Log callback forwarding to syslog(3).
- *
- * The syslog connection is opened with ident "rdkafka" on the first call
- * only. Each line is logged as "FAC: NAME: MESSAGE" where NAME is
- * rk->rk_name or the empty string if \p rk is NULL.
- *
- * @param rk    client instance, may be NULL.
- * @param level passed to syslog() as the priority.
- * @param fac   facility string.
- * @param buf   log message.
- */
-void rd_kafka_log_syslog (const rd_kafka_t *rk, int level,
-                          const char *fac, const char *buf) {
-        static int initialized = 0;
-
-        if (!initialized) {
-                openlog("rdkafka", LOG_PID|LOG_CONS, LOG_USER);
-                /* Remember that the log has been opened so that
-                 * openlog() is not repeated for every line. */
-                initialized = 1;
-        }
-
-        syslog(level, "%s: %s: %s", fac, rk ? rk->rk_name : "", buf);
-}
-#endif
-
-/**
- * @brief Set the log callback of an existing instance.
- *
- * Overwrites rk->rk_conf.log_cb with \p func.
- */
-void rd_kafka_set_logger (rd_kafka_t *rk,
- void (*func) (const rd_kafka_t *rk, int level,
- const char *fac, const char *buf)) {
- rk->rk_conf.log_cb = func;
-}
-
-/**
- * @brief Set the maximum log level of an existing instance.
- *
- * Overwrites rk->rk_conf.log_level with \p level.
- */
-void rd_kafka_set_log_level (rd_kafka_t *rk, int level) {
- rk->rk_conf.log_level = level;
-}
-
-
-
-
-
-
-/**
- * @returns the lower-case name ("producer" or "consumer") of client
- *          type \p type.
- *
- * @remark \p type is used directly as a table index, no range check
- *         is performed.
- */
-static const char *rd_kafka_type2str (rd_kafka_type_t type) {
-        static const char *names[] = {
-                [RD_KAFKA_CONSUMER] = "consumer",
-                [RD_KAFKA_PRODUCER] = "producer",
-        };
-        const char *name = names[type];
-
-        return name;
-}
-
-/**
- * Designated initializer for one error descriptor: the entry is placed at
- * index (ENUM - RD_KAFKA_RESP_ERR__BEGIN) and holds the enum value, the
- * stringified enum name and the description DESC.
- * The "+ 18" skips the first 18 characters of the stringified name, i.e.
- * the "RD_KAFKA_RESP_ERR_" prefix.
- */
-#define _ERR_DESC(ENUM,DESC) \
- [ENUM - RD_KAFKA_RESP_ERR__BEGIN] = { ENUM, # ENUM + 18/*pfx*/, DESC }
-
-/**
- * Error descriptor table. Entries are positioned by _ERR_DESC(), i.e.
- * indexed by (error code - RD_KAFKA_RESP_ERR__BEGIN). The __BEGIN and
- * __END sentinels carry a NULL description.
- */
-static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
- _ERR_DESC(RD_KAFKA_RESP_ERR__BEGIN, NULL),
- _ERR_DESC(RD_KAFKA_RESP_ERR__BAD_MSG,
- "Local: Bad message format"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__BAD_COMPRESSION,
- "Local: Invalid compressed data"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__DESTROY,
- "Local: Broker handle destroyed"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__FAIL,
- "Local: Communication failure with broker"), //FIXME: too specific
- _ERR_DESC(RD_KAFKA_RESP_ERR__TRANSPORT,
- "Local: Broker transport failure"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
- "Local: Critical system resource failure"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__RESOLVE,
- "Local: Host resolution failure"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
- "Local: Message timed out"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__PARTITION_EOF,
- "Broker: No more messages"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
- "Local: Unknown partition"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__FS,
- "Local: File or filesystem error"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC,
- "Local: Unknown topic"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
- "Local: All broker connections are down"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_ARG,
- "Local: Invalid argument or configuration"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT,
- "Local: Timed out"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__QUEUE_FULL,
- "Local: Queue full"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__ISR_INSUFF,
- "Local: ISR count insufficient"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__NODE_UPDATE,
- "Local: Broker node update"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__SSL,
- "Local: SSL error"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_COORD,
- "Local: Waiting for coordinator"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP,
- "Local: Unknown group"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__IN_PROGRESS,
- "Local: Operation in progress"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS,
- "Local: Previous operation in progress"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION,
- "Local: Existing subscription"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
- "Local: Assign partitions"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
- "Local: Revoke partitions"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__CONFLICT,
- "Local: Conflicting use"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__STATE,
- "Local: Erroneous state"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL,
- "Local: Unknown protocol"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED,
- "Local: Not implemented"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__AUTHENTICATION,
- "Local: Authentication failure"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__NO_OFFSET,
- "Local: No offset stored"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__OUTDATED,
- "Local: Outdated"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,
- "Local: Timed out in queue"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
- "Local: Required feature not supported by broker"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_CACHE,
- "Local: Awaiting cache update"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__INTR,
- "Local: Operation interrupted"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_SERIALIZATION,
- "Local: Key serialization error"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION,
- "Local: Value serialization error"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION,
- "Local: Key deserialization error"),
- _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION,
- "Local: Value deserialization error"),
-
- _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN,
- "Unknown broker error"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_NO_ERROR,
- "Success"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE,
- "Broker: Offset out of range"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG,
- "Broker: Invalid message"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
- "Broker: Unknown topic or partition"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
- "Broker: Invalid message size"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE,
- "Broker: Leader not available"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
- "Broker: Not leader for partition"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
- "Broker: Request timed out"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE,
- "Broker: Broker not available"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE,
- "Broker: Replica not available"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE,
- "Broker: Message size too large"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH,
- "Broker: StaleControllerEpochCode"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,
- "Broker: Offset metadata string too large"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION,
- "Broker: Broker disconnected before response received"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS,
- "Broker: Group coordinator load in progress"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,
- "Broker: Group coordinator not available"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP,
- "Broker: Not coordinator for group"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION,
- "Broker: Invalid topic"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE,
- "Broker: Message batch larger than configured server "
- "segment size"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
- "Broker: Not enough in-sync replicas"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND,
- "Broker: Message(s) written to insufficient number of "
- "in-sync replicas"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS,
- "Broker: Invalid required acks value"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
- "Broker: Specified group generation id is not valid"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL,
- "Broker: Inconsistent group protocol"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_GROUP_ID,
- "Broker: Invalid group.id"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
- "Broker: Unknown member"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT,
- "Broker: Invalid session timeout"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
- "Broker: Group rebalance in progress"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,
- "Broker: Commit offset data size is not valid"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
- "Broker: Topic authorization failed"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
- "Broker: Group authorization failed"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED,
- "Broker: Cluster authorization failed"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP,
- "Broker: Invalid timestamp"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM,
- "Broker: Unsupported SASL mechanism"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE,
- "Broker: Request not valid in current SASL state"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION,
- "Broker: API version not supported"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS,
- "Broker: Topic already exists"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PARTITIONS,
- "Broker: Invalid number of partitions"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR,
- "Broker: Invalid replication factor"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT,
- "Broker: Invalid replica assignment"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_CONFIG,
- "Broker: Configuration is invalid"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_CONTROLLER,
- "Broker: Not controller for cluster"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUEST,
- "Broker: Invalid request"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT,
- "Broker: Message format on broker does not support request"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_POLICY_VIOLATION,
- "Broker: Isolation policy violation"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
- "Broker: Broker received an out of order sequence number"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER,
- "Broker: Broker received a duplicate sequence number"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH,
- "Broker: Producer attempted an operation with an old epoch"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TXN_STATE,
- "Broker: Producer attempted a transactional operation in "
- "an invalid state"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
- "Broker: Producer attempted to use a producer id which is "
- "not currently assigned to its transactional id"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT,
- "Broker: Transaction timeout is larger than the maximum "
- "value allowed by the broker's max.transaction.timeout.ms"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
- "Broker: Producer attempted to update a transaction while "
- "another concurrent operation on the same transaction was "
- "ongoing"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED,
- "Broker: Indicates that the transaction coordinator sending "
- "a WriteTxnMarker is no longer the current coordinator for "
- "a given producer"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
- "Broker: Transactional Id authorization failed"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_SECURITY_DISABLED,
- "Broker: Security features are disabled"),
- _ERR_DESC(RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED,
- "Broker: Operation not attempted"),
-
- _ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)
-};
-
-
-/**
- * @brief Expose the error descriptor table.
- *
- * @param errdescs set to point to the rd_kafka_err_descs table.
- * @param cntp     set to the number of elements in the table.
- */
-void rd_kafka_get_err_descs (const struct rd_kafka_err_desc **errdescs,
-                             size_t *cntp) {
-        *cntp = RD_ARRAYSIZE(rd_kafka_err_descs);
-        *errdescs = rd_kafka_err_descs;
-}
-
-
-/**
- * @returns the description string of \p err from the descriptor table.
- *          For codes outside the (__BEGIN, END_ALL) range, or codes
- *          without a description, a thread-local "Err-N?" string is
- *          returned instead (overwritten by the next such call on the
- *          same thread).
- */
-const char *rd_kafka_err2str (rd_kafka_resp_err_t err) {
-        static RD_TLS char ret[32];
-        const int idx = err - RD_KAFKA_RESP_ERR__BEGIN;
-        const int in_range = err > RD_KAFKA_RESP_ERR__BEGIN &&
-                err < RD_KAFKA_RESP_ERR_END_ALL;
-
-        /* The table is only indexed once the code is known to be in range */
-        if (in_range && rd_kafka_err_descs[idx].desc)
-                return rd_kafka_err_descs[idx].desc;
-
-        rd_snprintf(ret, sizeof(ret), "Err-%i?", err);
-        return ret;
-}
-
-
-/**
- * @returns the name of \p err from the descriptor table (the enum name
- *          without its "RD_KAFKA_RESP_ERR_" prefix).
- *          For codes outside the (__BEGIN, END_ALL) range, or codes
- *          without a description, a thread-local "ERR_N?" string is
- *          returned instead (overwritten by the next such call on the
- *          same thread).
- */
-const char *rd_kafka_err2name (rd_kafka_resp_err_t err) {
-        static RD_TLS char ret[32];
-        const int idx = err - RD_KAFKA_RESP_ERR__BEGIN;
-        const int in_range = err > RD_KAFKA_RESP_ERR__BEGIN &&
-                err < RD_KAFKA_RESP_ERR_END_ALL;
-
-        /* Known codes are those with a description, same as err2str */
-        if (in_range && rd_kafka_err_descs[idx].desc)
-                return rd_kafka_err_descs[idx].name;
-
-        rd_snprintf(ret, sizeof(ret), "ERR_%i?", err);
-        return ret;
-}
-
-
-/**
- * @returns the calling thread's last API error code
- *          (the thread-local rd_kafka_last_error_code).
- */
-rd_kafka_resp_err_t rd_kafka_last_error (void) {
- return rd_kafka_last_error_code;
-}
-
-
-/**
- * @brief Map an errno value to a librdkafka error code.
- *
- * @param errnox errno value to convert.
- *
- * @returns the matching error code, or RD_KAFKA_RESP_ERR__FAIL for
- *          errno values without a specific mapping.
- */
-rd_kafka_resp_err_t rd_kafka_errno2err (int errnox) {
-        rd_kafka_resp_err_t err;
-
-        switch (errnox)
-        {
-        case EINVAL:
-                err = RD_KAFKA_RESP_ERR__INVALID_ARG;
-                break;
-        case EBUSY:
-                err = RD_KAFKA_RESP_ERR__CONFLICT;
-                break;
-        case ENOENT:
-                err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
-                break;
-        case ESRCH:
-                err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
-                break;
-        case ETIMEDOUT:
-                err = RD_KAFKA_RESP_ERR__TIMED_OUT;
-                break;
-        case EMSGSIZE:
-                err = RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
-                break;
-        case ENOBUFS:
-                err = RD_KAFKA_RESP_ERR__QUEUE_FULL;
-                break;
-        default:
-                err = RD_KAFKA_RESP_ERR__FAIL;
-                break;
-        }
-
-        return err;
-}
-
-
-
-/**
- * @brief Final destructor for rd_kafka_t, must only be called with refcnt 0.
- *
- * Requires rk_terminate to be set (asserted). Releases the assignors,
- * metadata cache, timers, consumer group, op queues, SSL context (if
- * any), log queue, synchronization primitives, cached metadata, protocol
- * strings and configuration, then frees \p rk itself and decrements the
- * global instance counter.
- *
- * @locality application thread
- */
-void rd_kafka_destroy_final (rd_kafka_t *rk) {
-
- rd_kafka_assert(rk, rd_atomic32_get(&rk->rk_terminate) != 0);
-
- /* Synchronize state */
- rd_kafka_wrlock(rk);
- rd_kafka_wrunlock(rk);
-
- rd_kafka_assignors_term(rk);
-
- rd_kafka_metadata_cache_destroy(rk);
-
- rd_kafka_timers_destroy(&rk->rk_timers);
-
- rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying op queues");
-
- /* Destroy cgrp */
- if (rk->rk_cgrp) {
- rd_kafka_dbg(rk, GENERIC, "TERMINATE",
- "Destroying cgrp");
- /* Reset queue forwarding (rep -> cgrp) */
- rd_kafka_q_fwd_set(rk->rk_rep, NULL);
- rd_kafka_cgrp_destroy_final(rk->rk_cgrp);
- }
-
- /* Purge op-queues */
- rd_kafka_q_destroy(rk->rk_rep);
- rd_kafka_q_destroy(rk->rk_ops);
-
-#if WITH_SSL
- if (rk->rk_conf.ssl.ctx) {
- rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying SSL CTX");
- rd_kafka_transport_ssl_ctx_term(rk);
- }
-#endif
-
- /* It is not safe to log after this point. */
- rd_kafka_dbg(rk, GENERIC, "TERMINATE",
- "Termination done: freeing resources");
-
- /* Clear rk_logq after destroying it: rd_kafka_log_buf() treats a
- * NULL rk_logq as "terminating" and drops queued log lines. */
- if (rk->rk_logq) {
- rd_kafka_q_destroy(rk->rk_logq);
- rk->rk_logq = NULL;
- }
-
- /* The current-messages cnd/lock are only torn down for producers. */
- if (rk->rk_type == RD_KAFKA_PRODUCER) {
- cnd_destroy(&rk->rk_curr_msgs.cnd);
- mtx_destroy(&rk->rk_curr_msgs.lock);
- }
-
- cnd_destroy(&rk->rk_broker_state_change_cnd);
- mtx_destroy(&rk->rk_broker_state_change_lock);
-
- if (rk->rk_full_metadata)
- rd_kafka_metadata_destroy(rk->rk_full_metadata);
- rd_kafkap_str_destroy(rk->rk_client_id);
- rd_kafkap_str_destroy(rk->rk_group_id);
- rd_kafkap_str_destroy(rk->rk_eos.TransactionalId);
- rd_kafka_anyconf_destroy(_RK_GLOBAL, &rk->rk_conf);
- rd_list_destroy(&rk->rk_broker_by_id);
-
- rd_kafkap_bytes_destroy((rd_kafkap_bytes_t *)rk->rk_null_bytes);
- rwlock_destroy(&rk->rk_lock);
-
- /* rk is gone after this: only the global counter is touched below. */
- rd_free(rk);
- rd_kafka_global_cnt_decr();
-}
-
-
-/**
- * @brief Application-side termination of an rd_kafka_t instance.
- *
- * Closes the consumer group (if any), raises rk_terminate, interrupts the
- * timers, wakes up the main background thread with a TERMINATE op and,
- * on non-MSVC builds with a configured term_sig, signals that thread.
- *
- * @param rk       instance to terminate.
- * @param blocking if non-zero, join the main thread and call
- *                 rd_kafka_destroy_final(); if zero, return right after
- *                 termination has been requested.
- */
-static void rd_kafka_destroy_app (rd_kafka_t *rk, int blocking) {
- thrd_t thrd;
-#ifndef _MSC_VER
- /* Read term_sig up front rather than from rk after termination
- * has been requested. */
- int term_sig = rk->rk_conf.term_sig;
-#endif
- rd_kafka_dbg(rk, ALL, "DESTROY", "Terminating instance");
-
- /* The legacy/simple consumer lacks an API to close down the consumer*/
- if (rk->rk_cgrp) {
- rd_kafka_dbg(rk, GENERIC, "TERMINATE",
- "Closing consumer group");
- rd_kafka_consumer_close(rk);
- }
-
- rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Interrupting timers");
- /* Copy the thread handle and raise rk_terminate under the write lock */
- rd_kafka_wrlock(rk);
- thrd = rk->rk_thread;
- rd_atomic32_add(&rk->rk_terminate, 1);
- rd_kafka_timers_interrupt(&rk->rk_timers);
- rd_kafka_wrunlock(rk);
-
- rd_kafka_dbg(rk, GENERIC, "TERMINATE",
- "Sending TERMINATE to main background thread");
- /* Send op to trigger queue/io wake-up.
- * The op itself is (likely) ignored by the receiver. */
- rd_kafka_q_enq(rk->rk_ops, rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
-
- rd_kafka_brokers_broadcast_state_change(rk);
-
-#ifndef _MSC_VER
- /* Interrupt main kafka thread to speed up termination. */
- if (term_sig) {
- rd_kafka_dbg(rk, GENERIC, "TERMINATE",
- "Sending thread kill signal %d", term_sig);
- pthread_kill(thrd, term_sig);
- }
-#endif
-
- if (!blocking)
- return; /* FIXME: thread resource leak */
-
- rd_kafka_dbg(rk, GENERIC, "TERMINATE",
- "Joining main background thread");
-
- /* !*"..." evaluates to 0, so the assert always fires here and the
- * string literal shows up in the assertion expression. */
- if (thrd_join(thrd, NULL) != thrd_success)
- rd_kafka_assert(NULL, !*"failed to join main thread");
-
- rd_kafka_destroy_final(rk);
-}
-
-
-/**
- * @brief Destroy an rd_kafka_t instance, blocking until done.
- *
- * Calls rd_kafka_destroy_app() in blocking mode, which joins the main
- * background thread and then runs the final destructor.
- *
- * NOTE: Must only be called by application.
- * librdkafka itself must use rd_kafka_destroy0().
- */
-void rd_kafka_destroy (rd_kafka_t *rk) {
- rd_kafka_destroy_app(rk, 1);
-}
-
-
-/**
- * Main destructor for rd_kafka_t
- *
- * Runs the on_destroy interceptors, decommissions all topics and brokers
- * (including the internal broker), purges the reply queue and finally
- * joins all broker threads.
- *
- * Locality: rdkafka main thread or application thread during rd_kafka_new()
- */
-static void rd_kafka_destroy_internal (rd_kafka_t *rk) {
-        rd_kafka_itopic_t *rkt, *rkt_tmp;
-        rd_kafka_broker_t *rkb, *rkb_tmp;
-        rd_list_t wait_thrds;
-        thrd_t *thrd;
-        int i;
-
-        rd_kafka_dbg(rk, ALL, "DESTROY", "Destroy internal");
-
-        /* Call on_destroy() interceptors */
-        rd_kafka_interceptors_on_destroy(rk);
-
-        /* Brokers pick up on rk_terminate automatically. */
-
-        /* List of (broker) threads to join to synchronize termination */
-        rd_list_init(&wait_thrds, rd_atomic32_get(&rk->rk_broker_cnt), NULL);
-
-        rd_kafka_wrlock(rk);
-
-        rd_kafka_dbg(rk, ALL, "DESTROY", "Removing all topics");
-        /* Decommission all topics.
-         * The instance lock is released around each removal. */
-        TAILQ_FOREACH_SAFE(rkt, &rk->rk_topics, rkt_link, rkt_tmp) {
-                rd_kafka_wrunlock(rk);
-                rd_kafka_topic_partitions_remove(rkt);
-                rd_kafka_wrlock(rk);
-        }
-
-        /* Decommission brokers.
-         * Broker thread holds a refcount and detects when broker refcounts
-         * reaches 1 and then decommissions itself. */
-        TAILQ_FOREACH_SAFE(rkb, &rk->rk_brokers, rkb_link, rkb_tmp) {
-                /* Add broker's thread to wait_thrds list for later joining.
-                 * Allocated with the rd_malloc() wrapper used elsewhere in
-                 * this file instead of a raw, unchecked malloc().
-                 * NOTE(review): presumably rd_malloc() handles allocation
-                 * failure itself - confirm. */
-                thrd = rd_malloc(sizeof(*thrd));
-                *thrd = rkb->rkb_thread;
-                rd_list_add(&wait_thrds, thrd);
-                rd_kafka_wrunlock(rk);
-
-                /* Send op to trigger queue/io wake-up.
-                 * The op itself is (likely) ignored by the broker thread. */
-                rd_kafka_q_enq(rkb->rkb_ops,
-                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
-
-#ifndef _MSC_VER
-                /* Interrupt IO threads to speed up termination. */
-                if (rk->rk_conf.term_sig)
-                        pthread_kill(rkb->rkb_thread, rk->rk_conf.term_sig);
-#endif
-
-                rd_kafka_broker_destroy(rkb);
-
-                rd_kafka_wrlock(rk);
-        }
-
-        if (rk->rk_clusterid) {
-                rd_free(rk->rk_clusterid);
-                rk->rk_clusterid = NULL;
-        }
-
-        rd_kafka_wrunlock(rk);
-
-        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
-                     "Purging reply queue");
-
-        /* Purge op-queue */
-        rd_kafka_q_disable(rk->rk_rep);
-        rd_kafka_q_purge(rk->rk_rep);
-
-        /* Lose our special reference to the internal broker. */
-        mtx_lock(&rk->rk_internal_rkb_lock);
-        if ((rkb = rk->rk_internal_rkb)) {
-                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
-                             "Decommissioning internal broker");
-
-                /* Send op to trigger queue wake-up. */
-                rd_kafka_q_enq(rkb->rkb_ops,
-                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
-
-                rk->rk_internal_rkb = NULL;
-                thrd = rd_malloc(sizeof(*thrd));
-                *thrd = rkb->rkb_thread;
-                rd_list_add(&wait_thrds, thrd);
-        }
-        mtx_unlock(&rk->rk_internal_rkb_lock);
-        /* Drop the reference outside of the internal broker lock */
-        if (rkb)
-                rd_kafka_broker_destroy(rkb);
-
-
-        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
-                     "Join %d broker thread(s)", rd_list_cnt(&wait_thrds));
-
-        /* Join broker threads.
-         * A failed join is deliberately ignored (best effort). */
-        RD_LIST_FOREACH(thrd, &wait_thrds, i) {
-                if (thrd_join(*thrd, NULL) != thrd_success)
-                        ;
-                rd_free(thrd);
-        }
-
-        rd_list_destroy(&wait_thrds);
-}
-
-
-/**
- * Stats buffer printf.
- *
- * Appends printf-style formatted text at offset `of` of the heap buffer
- * `buf` whose capacity is `size`; all three must be modifiable variables
- * in the scope where the macro is used.
- * If the text does not fit, the buffer is reallocated (at least doubled
- * and always large enough for the pending output) and formatting is
- * retried until it fits. `of` is only advanced on a positive result.
- *
- * @remark The arguments are evaluated again on each retry, so they must
- *         not have side effects.
- */
-#define _st_printf(...) do {                                            \
-                ssize_t r;                                              \
-                ssize_t rem;                                            \
-                for (;;) {                                              \
-                        rem = size-of;                                  \
-                        r = rd_snprintf(buf+of, rem, __VA_ARGS__);      \
-                        if (r < rem)                                    \
-                                break;                                  \
-                        size *= 2;                                      \
-                        if (size < of + (size_t)r + 1)                  \
-                                size = of + (size_t)r + 1;              \
-                        buf = rd_realloc(buf, size);                    \
-                }                                                       \
-                if (r > 0)                                              \
-                        of += r;                                        \
-        } while (0)
-
-/**
- * Emit stats for toppar
- *
- * Appends one JSON object, keyed by the partition id, describing \p rktp
- * to the stats buffer. The toppar lock is held while its fields are read
- * and formatted.
- *
- * @param bufp  in/out: stats buffer (may be reallocated by _st_printf).
- * @param sizep in/out: allocated size of the stats buffer.
- * @param ofp   in/out: current write offset into the stats buffer.
- * @param rktp  toppar to emit stats for.
- * @param first non-zero if this is the first element emitted, in which
- *              case no ", " separator is written before it.
- */
-static RD_INLINE void rd_kafka_stats_emit_toppar (char **bufp, size_t *sizep,
- size_t *ofp,
- rd_kafka_toppar_t *rktp,
- int first) {
- /* _st_printf() operates on local variables named buf, size and of:
- * work on copies and write them back on return. */
- char *buf = *bufp;
- size_t size = *sizep;
- size_t of = *ofp;
- int64_t consumer_lag = -1;
- struct offset_stats offs;
- int32_t leader_nodeid = -1;
-
- rd_kafka_toppar_lock(rktp);
-
- /* leader_nodeid stays -1 if the toppar has no leader broker */
- if (rktp->rktp_leader) {
- rd_kafka_broker_lock(rktp->rktp_leader);
- leader_nodeid = rktp->rktp_leader->rkb_nodeid;
- rd_kafka_broker_unlock(rktp->rktp_leader);
- }
-
- /* Grab a copy of the latest finalized offset stats */
- offs = rktp->rktp_offsets_fin;
-
- /* consumer_lag is hi_offset - app_offset, clamped to 0, and stays
- * -1 when either offset is not available. */
- if (rktp->rktp_hi_offset != RD_KAFKA_OFFSET_INVALID &&
- rktp->rktp_app_offset >= 0) {
- if (unlikely(rktp->rktp_app_offset > rktp->rktp_hi_offset))
- consumer_lag = 0;
- else
- consumer_lag = rktp->rktp_hi_offset -
- rktp->rktp_app_offset;
- }
-
- /* The arguments below must stay in the same order as the
- * conversions in the format string. */
- _st_printf("%s\"%"PRId32"\": { "
- "\"partition\":%"PRId32", "
- "\"leader\":%"PRId32", "
- "\"desired\":%s, "
- "\"unknown\":%s, "
- "\"msgq_cnt\":%i, "
- "\"msgq_bytes\":%"PRIu64", "
- "\"xmit_msgq_cnt\":%i, "
- "\"xmit_msgq_bytes\":%"PRIu64", "
- "\"fetchq_cnt\":%i, "
- "\"fetchq_size\":%"PRIu64", "
- "\"fetch_state\":\"%s\", "
- "\"query_offset\":%"PRId64", "
- "\"next_offset\":%"PRId64", "
- "\"app_offset\":%"PRId64", "
- "\"stored_offset\":%"PRId64", "
- "\"commited_offset\":%"PRId64", " /*FIXME: issue #80 */
- "\"committed_offset\":%"PRId64", "
- "\"eof_offset\":%"PRId64", "
- "\"lo_offset\":%"PRId64", "
- "\"hi_offset\":%"PRId64", "
- "\"consumer_lag\":%"PRId64", "
- "\"txmsgs\":%"PRIu64", "
- "\"txbytes\":%"PRIu64", "
- "\"msgs\": %"PRIu64", "
- "\"rx_ver_drops\": %"PRIu64" "
- "} ",
- first ? "" : ", ",
- rktp->rktp_partition,
- rktp->rktp_partition,
- leader_nodeid,
- (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_DESIRED)?"true":"false",
- (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_UNKNOWN)?"true":"false",
- rd_atomic32_get(&rktp->rktp_msgq.rkmq_msg_cnt),
- rd_atomic64_get(&rktp->rktp_msgq.rkmq_msg_bytes),
- rd_atomic32_get(&rktp->rktp_xmit_msgq.rkmq_msg_cnt),
- rd_atomic64_get(&rktp->rktp_xmit_msgq.rkmq_msg_bytes),
- rd_kafka_q_len(rktp->rktp_fetchq),
- rd_kafka_q_size(rktp->rktp_fetchq),
- rd_kafka_fetch_states[rktp->rktp_fetch_state],
- rktp->rktp_query_offset,
- offs.fetch_offset,
- rktp->rktp_app_offset,
- rktp->rktp_stored_offset,
- rktp->rktp_committed_offset, /* FIXME: issue #80 */
- rktp->rktp_committed_offset,
- offs.eof_offset,
- rktp->rktp_lo_offset,
- rktp->rktp_hi_offset,
- consumer_lag,
- rd_atomic64_get(&rktp->rktp_c.tx_msgs),
- rd_atomic64_get(&rktp->rktp_c.tx_bytes),
- rd_atomic64_get(&rktp->rktp_c.msgs),
- rd_atomic64_get(&rktp->rktp_c.rx_ver_drops));
-
- rd_kafka_toppar_unlock(rktp);
-
- /* Hand the (possibly reallocated) buffer state back to the caller */
- *bufp = buf;
- *sizep = size;
- *ofp = of;
-}
-
-/**
- * Emit all statistics
- */
-static void rd_kafka_stats_emit_all (rd_kafka_t *rk) {
- char *buf;
- size_t size = 1024*10;
- size_t of = 0;
- rd_kafka_broker_t *rkb;
- rd_kafka_itopic_t *rkt;
- shptr_rd_kafka_toppar_t *s_rktp;
- rd_ts_t now;
- rd_kafka_op_t *rko;
- unsigned int tot_cnt;
- size_t tot_size;
-
- buf = rd_malloc(size);
-
-
- rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);
- rd_kafka_rdlock(rk);
-
- now = rd_clock();
- _st_printf("{ "
- "\"name\": \"%s\", "
- "\"type\": \"%s\", "
- "\"ts\":%"PRId64", "
- "\"time\":%lli, "
- "\"replyq\":%i, "
- "\"msg_cnt\":%u, "
- "\"msg_size\":%"PRIusz", "
- "\"msg_max\":%u, "
- "\"msg_size_max\":%"PRIusz", "
- "\"simple_cnt\":%i, "
- "\"metadata_cache_cnt\":%i, "
- "\"brokers\":{ "/*open brokers*/,
- rk->rk_name,
- rd_kafka_type2str(rk->rk_type),
- now,
- (signed long long)time(NULL),
- rd_kafka_q_len(rk->rk_rep),
- tot_cnt, tot_size,
- rk->rk_curr_msgs.max_cnt, rk->rk_curr_msgs.max_size,
- rd_atomic32_get(&rk->rk_simple_cnt),
- rk->rk_metadata_cache.rkmc_cnt);
-
-
- TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
- rd_avg_t rtt, throttle, int_latency;
- rd_kafka_toppar_t *rktp;
-
- rd_kafka_broker_lock(rkb);
- rd_avg_rollover(&int_latency, &rkb->rkb_avg_int_latency);
- rd_avg_rollover(&rtt, &rkb->rkb_avg_rtt);
- rd_avg_rollover(&throttle, &rkb->rkb_avg_throttle);
- _st_printf("%s\"%s\": { "/*open broker*/
- "\"name\":\"%s\", "
- "\"nodeid\":%"PRId32", "
- "\"state\":\"%s\", "
- "\"stateage\":%"PRId64", "
- "\"outbuf_cnt\":%i, "
- "\"outbuf_msg_cnt\":%i, "
- "\"waitresp_cnt\":%i, "
- "\"waitresp_msg_cnt\":%i, "
- "\"tx\":%"PRIu64", "
- "\"txbytes\":%"PRIu64", "
- "\"txerrs\":%"PRIu64", "
- "\"txretries\":%"PRIu64", "
- "\"req_timeouts\":%"PRIu64", "
- "\"rx\":%"PRIu64", "
- "\"rxbytes\":%"PRIu64", "
- "\"rxerrs\":%"PRIu64", "
- "\"rxcorriderrs\":%"PRIu64", "
- "\"rxpartial\":%"PRIu64", "
- "\"zbuf_grow\":%"PRIu64", "
- "\"buf_grow\":%"PRIu64", "
- "\"wakeups\":%"PRIu64", "
- "\"int_latency\": {"
- " \"min\":%"PRId64","
- " \"max\":%"PRId64","
- " \"avg\":%"PRId64","
- " \"sum\":%"PRId64","
- " \"cnt\":%i "
- "}, "
- "\"rtt\": {"
- " \"min\":%"PRId64","
- " \"max\":%"PRId64","
- " \"avg\":%"PRId64","
- " \"sum\":%"PRId64","
- " \"cnt\":%i "
- "}, "
- "\"throttle\": {"
- " \"min\":%"PRId64","
- " \"max\":%"PRId64","
- " \"avg\":%"PRId64","
- " \"sum\":%"PRId64","
- " \"cnt\":%i "
- "}, "
- "\"toppars\":{ "/*open toppars*/,
- rkb == TAILQ_FIRST(&rk->rk_brokers) ? "" : ", ",
- rkb->rkb_name,
- rkb->rkb_name,
- rkb->rkb_nodeid,
- rd_kafka_broker_state_names[rkb->rkb_state],
- rkb->rkb_ts_state ? now - rkb->rkb_ts_state : 0,
- rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt),
- rd_atomic32_get(&rkb->rkb_outbufs.rkbq_msg_cnt),
- rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt),
- rd_atomic32_get(&rkb->rkb_waitresps.rkbq_msg_cnt),
- rd_atomic64_get(&rkb->rkb_c.tx),
- rd_atomic64_get(&rkb->rkb_c.tx_bytes),
- rd_atomic64_get(&rkb->rkb_c.tx_err),
- rd_atomic64_get(&rkb->rkb_c.tx_retries),
- rd_atomic64_get(&rkb->rkb_c.req_timeouts),
- rd_atomic64_get(&rkb->rkb_c.rx),
- rd_atomic64_get(&rkb->rkb_c.rx_bytes),
- rd_atomic64_get(&rkb->rkb_c.rx_err),
- rd_atomic64_get(&rkb->rkb_c.rx_corrid_err),
- rd_atomic64_get(&rkb->rkb_c.rx_partial),
- rd_atomic64_get(&rkb->rkb_c.zbuf_grow),
- rd_atomic64_get(&rkb->rkb_c.buf_grow),
- rd_atomic64_get(&rkb->rkb_c.wakeups),
- int_latency.ra_v.minv,
- int_latency.ra_v.maxv,
- int_latency.ra_v.avg,
- int_latency.ra_v.sum,
- int_latency.ra_v.cnt,
- rtt.ra_v.minv,
- rtt.ra_v.maxv,
- rtt.ra_v.avg,
- rtt.ra_v.sum,
- rtt.ra_v.cnt,
- throttle.ra_v.minv,
- throttle.ra_v.maxv,
- throttle.ra_v.avg,
- throttle.ra_v.sum,
- throttle.ra_v.cnt);
-
- TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
- _st_printf("%s\"%.*s-%"PRId32"\": { "
- "\"topic\":\"%.*s\", "
- "\"partition\":%"PRId32"} ",
- rktp==TAILQ_FIRST(&rkb->rkb_toppars)?"":", ",
- RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
- rktp->rktp_partition,
- RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
- rktp->rktp_partition);
- }
-
- rd_kafka_broker_unlock(rkb);
-
- _st_printf("} "/*close toppars*/
- "} "/*close broker*/);
- }
-
-
- _st_printf("}, " /* close "brokers" array */
- "\"topics\":{ ");
-
- TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
- int i, j;
-
- rd_kafka_topic_rdlock(rkt);
- _st_printf("%s\"%.*s\": { "
- "\"topic\":\"%.*s\", "
- "\"metadata_age\":%"PRId64", "
- "\"partitions\":{ " /*open partitions*/,
- rkt==TAILQ_FIRST(&rk->rk_topics)?"":", ",
- RD_KAFKAP_STR_PR(rkt->rkt_topic),
- RD_KAFKAP_STR_PR(rkt->rkt_topic),
- rkt->rkt_ts_metadata ?
- (rd_clock() - rkt->rkt_ts_metadata)/1000 : 0);
-
- for (i = 0 ; i < rkt->rkt_partition_cnt ; i++)
- rd_kafka_stats_emit_toppar(&buf, &size, &of,
- rd_kafka_toppar_s2i(rkt->rkt_p[i]),
- i == 0);
-
- RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, j)
- rd_kafka_stats_emit_toppar(&buf, &size, &of,
- rd_kafka_toppar_s2i(s_rktp),
- i+j == 0);
-
- i += j;
-
- if (rkt->rkt_ua)
- rd_kafka_stats_emit_toppar(&buf, &size, &of,
- rd_kafka_toppar_s2i(rkt->rkt_ua),
- i++ == 0);
- rd_kafka_topic_rdunlock(rkt);
-
- _st_printf("} "/*close partitions*/
- "} "/*close topic*/);
-
- }
- _st_printf("} "/*close topics*/);
-
- if (rk->rk_cgrp) {
- rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
- _st_printf(", \"cgrp\": { "
- "\"rebalance_age\": %"PRId64", "
- "\"rebalance_cnt\": %d, "
- "\"assignment_size\": %d }",
- rkcg->rkcg_c.ts_rebalance ?
- (rd_clock() - rkcg->rkcg_c.ts_rebalance)/1000 : 0,
- rkcg->rkcg_c.rebalance_cnt,
- rkcg->rkcg_c.assignment_size);
- }
- rd_kafka_rdunlock(rk);
-
- _st_printf("}"/*close object*/);
-
-
- /* Enqueue op for application */
- rko = rd_kafka_op_new(RD_KAFKA_OP_STATS);
- rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
- rko->rko_u.stats.json = buf;
- rko->rko_u.stats.json_len = of;
- rd_kafka_q_enq(rk->rk_rep, rko);
-}
-
-
-
-/**
- * @brief Timer callback: scan all topics of the client instance owning
- *        the timer set, using the current clock reading.
- *
- * @param rkts Timer set that fired; rkts_rk identifies the client instance.
- * @param arg  Unused.
- */
-static void rd_kafka_topic_scan_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
-        rd_kafka_topic_scan_all(rkts->rkts_rk, rd_clock());
-}
-
-/**
- * @brief Timer callback: emit statistics for the client instance owning
- *        the timer set.
- *
- * @param rkts Timer set that fired; rkts_rk identifies the client instance.
- * @param arg  Unused.
- */
-static void rd_kafka_stats_emit_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
-        rd_kafka_stats_emit_all(rkts->rkts_rk);
-}
-
-
-/**
- * @brief Periodic metadata refresh callback
- *
- * Refreshes only the known topics, except for a consumer whose group has
- * the wildcard-subscription flag set: such a subscription must be matched
- * against all topics, so a full refresh is requested instead.
- *
- * @locality rdkafka main thread
- */
-static void rd_kafka_metadata_refresh_cb (rd_kafka_timers_t *rkts, void *arg) {
-        rd_kafka_t *rk = rkts->rkts_rk;
-        int wildcard_consumer =
-                rk->rk_type == RD_KAFKA_CONSUMER &&
-                rk->rk_cgrp &&
-                (rk->rk_cgrp->rkcg_flags &
-                 RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION);
-
-        if (wildcard_consumer) {
-                rd_kafka_metadata_refresh_all(rk, NULL, "periodic refresh");
-                return;
-        }
-
-        rd_kafka_metadata_refresh_known_topics(rk, NULL, 1/*force*/,
-                                               "periodic refresh");
-}
-
-
-/**
- * Main loop for Kafka handler thread.
- *
- * Registers the topic scan, stats emit and (only when
- * metadata_refresh_interval_ms > 0) metadata refresh timers, then keeps
- * serving rk_ops, the consumer group (if any) and the timers until the
- * handle is terminating and rk_ops is empty. On exit the ops queue is
- * disabled and purged, the timers are stopped and
- * rd_kafka_destroy_internal() is called.
- *
- * @param arg The rd_kafka_t handle served by this thread.
- * @returns 0 (always).
- */
-static int rd_kafka_thread_main (void *arg) {
- rd_kafka_t *rk = arg;
- rd_kafka_timer_t tmr_topic_scan = RD_ZERO_INIT;
- rd_kafka_timer_t tmr_stats_emit = RD_ZERO_INIT;
- rd_kafka_timer_t tmr_metadata_refresh = RD_ZERO_INIT;
-
- rd_snprintf(rd_kafka_thread_name, sizeof(rd_kafka_thread_name), "main");
-
- (void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1);
-
- /* Acquire lock (which was held by thread creator during creation)
- * to synchronise state. */
- rd_kafka_wrlock(rk);
- rd_kafka_wrunlock(rk);
- /* Timer intervals: the millisecond settings are scaled by 1000 below,
-  * which puts the fixed topic scan interval of 1000000 at one second. */
- rd_kafka_timer_start(&rk->rk_timers, &tmr_topic_scan, 1000000,
- rd_kafka_topic_scan_tmr_cb, NULL);
- rd_kafka_timer_start(&rk->rk_timers, &tmr_stats_emit,
- rk->rk_conf.stats_interval_ms * 1000ll,
- rd_kafka_stats_emit_tmr_cb, NULL);
- if (rk->rk_conf.metadata_refresh_interval_ms > 0)
- rd_kafka_timer_start(&rk->rk_timers, &tmr_metadata_refresh,
- rk->rk_conf.metadata_refresh_interval_ms *
- 1000ll,
- rd_kafka_metadata_refresh_cb, NULL);
- /* With a consumer group: forward its ops queue to rk_ops, presumably
-  * so that the loop below serves group ops as well. */
- if (rk->rk_cgrp) {
- rd_kafka_cgrp_reassign_broker(rk->rk_cgrp);
- rd_kafka_q_fwd_set(rk->rk_cgrp->rkcg_ops, rk->rk_ops);
- }
- /* Keep looping while not terminating, or while rk_ops still holds ops. */
- while (likely(!rd_kafka_terminating(rk) ||
- rd_kafka_q_len(rk->rk_ops))) {
- rd_ts_t sleeptime = rd_kafka_timers_next(
- &rk->rk_timers, 1000*1000/*1s*/, 1/*lock*/);
- rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0,
- RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
- if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
- rd_kafka_cgrp_serve(rk->rk_cgrp);
- rd_kafka_timers_run(&rk->rk_timers, RD_POLL_NOWAIT);
- }
- /* Shutdown: disable and purge the ops queue, then stop all timers. */
- rd_kafka_q_disable(rk->rk_ops);
- rd_kafka_q_purge(rk->rk_ops);
-
- rd_kafka_timer_stop(&rk->rk_timers, &tmr_topic_scan, 1);
- rd_kafka_timer_stop(&rk->rk_timers, &tmr_stats_emit, 1);
- rd_kafka_timer_stop(&rk->rk_timers, &tmr_metadata_refresh, 1);
-
- /* Synchronise state */
- rd_kafka_wrlock(rk);
- rd_kafka_wrunlock(rk);
-
- rd_kafka_destroy_internal(rk);
-
- rd_kafka_dbg(rk, GENERIC, "TERMINATE",
- "Main background thread exiting");
-
- rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1);
-
- return 0;
-}
-
-
-/**
- * @brief Handler installed by rd_kafka_new() for the configured term_sig.
- *        Intentionally empty.
- */
-static void rd_kafka_term_sig_handler (int sig) {
-        (void)sig; /* nop */
-}
-
-/**
- * @brief Create a new client handle of the given \p type.
- *
- * On success the returned handle has taken over \p app_conf: the struct is
- * copied into rk_conf and the base pointer is freed here. On failure NULL
- * is returned, \p app_conf is not freed, and the error code is recorded
- * with rd_kafka_set_last_error(); \p errstr is written either directly or
- * is handed to the init helper that failed.
- *
- * @param type        Client type, e.g. RD_KAFKA_PRODUCER or RD_KAFKA_CONSUMER.
- * @param app_conf    Optional configuration; a default one is created
- *                    when NULL.
- * @param errstr      Buffer for a human readable error string.
- * @param errstr_size Size of \p errstr.
- *
- * @returns the new handle, or NULL on failure.
- */
-rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
- char *errstr, size_t errstr_size) {
- rd_kafka_t *rk;
- static rd_atomic32_t rkid;
- rd_kafka_conf_t *conf;
- rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
- int ret_errno = 0;
-#ifndef _MSC_VER
- sigset_t newset, oldset;
-#endif
-
- call_once(&rd_kafka_global_init_once, rd_kafka_global_init);
-
- /* rd_kafka_new() takes ownership of the provided \p app_conf
- * object if rd_kafka_new() succeeds.
- * Since \p app_conf is optional we allocate a default configuration
- * object here if \p app_conf is NULL.
- * The configuration object itself is struct-copied later
- * leaving the default *conf pointer to be ready for freeing.
- * In case new() fails and app_conf was specified we will clear out
- * rk_conf to avoid double-freeing from destroy_internal() and the
- * user's eventual call to rd_kafka_conf_destroy().
- * This is all a bit tricky but that's the nature of
- * legacy interfaces. */
- if (!app_conf)
- conf = rd_kafka_conf_new();
- else
- conf = app_conf;
-
- /* Verify mandatory configuration */
- if (!conf->socket_cb) {
- rd_snprintf(errstr, errstr_size,
- "Mandatory config property 'socket_cb' not set");
- if (!app_conf)
- rd_kafka_conf_destroy(conf);
- rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
- return NULL;
- }
-
- if (!conf->open_cb) {
- rd_snprintf(errstr, errstr_size,
- "Mandatory config property 'open_cb' not set");
- if (!app_conf)
- rd_kafka_conf_destroy(conf);
- rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
- return NULL;
- }
- /* metadata_max_age_ms of -1: derive it as three times the refresh
-  * interval, or three times 5 minutes when no interval is set. */
- if (conf->metadata_max_age_ms == -1) {
- if (conf->metadata_refresh_interval_ms > 0)
- conf->metadata_max_age_ms =
- conf->metadata_refresh_interval_ms * 3;
- else /* use default value of refresh * 3 */
- conf->metadata_max_age_ms = 5*60*1000 * 3;
- }
-
- rd_kafka_global_cnt_incr();
-
- /*
- * Set up the handle.
- */
- rk = rd_calloc(1, sizeof(*rk));
-
- rk->rk_type = type;
-
- /* Struct-copy the config object. */
- rk->rk_conf = *conf;
- if (!app_conf)
- rd_free(conf); /* Free the base config struct only,
- * not its fields since they were copied to
- * rk_conf just above. Those fields are
- * freed from rd_kafka_destroy_internal()
- * as the rk itself is destroyed. */
-
- /* Call on_new() interceptors */
- rd_kafka_interceptors_on_new(rk, &rk->rk_conf);
-
- rwlock_init(&rk->rk_lock);
- mtx_init(&rk->rk_internal_rkb_lock, mtx_plain);
-
- cnd_init(&rk->rk_broker_state_change_cnd);
- mtx_init(&rk->rk_broker_state_change_lock, mtx_plain);
-
- rk->rk_rep = rd_kafka_q_new(rk);
- rk->rk_ops = rd_kafka_q_new(rk);
- rk->rk_ops->rkq_serve = rd_kafka_poll_cb;
- rk->rk_ops->rkq_opaque = rk;
-
- if (rk->rk_conf.log_queue) {
- rk->rk_logq = rd_kafka_q_new(rk);
- rk->rk_logq->rkq_serve = rd_kafka_poll_cb;
- rk->rk_logq->rkq_opaque = rk;
- }
-
- TAILQ_INIT(&rk->rk_brokers);
- TAILQ_INIT(&rk->rk_topics);
- rd_kafka_timers_init(&rk->rk_timers, rk);
- rd_kafka_metadata_cache_init(rk);
- /* A configured callback implicitly enables the matching event type. */
- if (rk->rk_conf.dr_cb || rk->rk_conf.dr_msg_cb)
- rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_DR;
- if (rk->rk_conf.rebalance_cb)
- rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_REBALANCE;
- if (rk->rk_conf.offset_commit_cb)
- rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_OFFSET_COMMIT;
-
- /* Convenience Kafka protocol null bytes */
- rk->rk_null_bytes = rd_kafkap_bytes_new(NULL, 0);
-
- if (rk->rk_conf.debug)
- rk->rk_conf.log_level = LOG_DEBUG;
-
- rd_snprintf(rk->rk_name, sizeof(rk->rk_name), "%s#%s-%i",
- rk->rk_conf.client_id_str, rd_kafka_type2str(rk->rk_type),
- rd_atomic32_add(&rkid, 1));
-
- /* Construct clientid kafka string */
- rk->rk_client_id = rd_kafkap_str_new(rk->rk_conf.client_id_str,-1);
-
- /* Convert group.id to kafka string (may be NULL) */
- rk->rk_group_id = rd_kafkap_str_new(rk->rk_conf.group_id_str,-1);
-
- /* Config fixups */
- rk->rk_conf.queued_max_msg_bytes =
- (int64_t)rk->rk_conf.queued_max_msg_kbytes * 1000ll;
-
- /* Enable api.version.request=true if fallback.broker.version
- * indicates a supporting broker. */
- if (rd_kafka_ApiVersion_is_queryable(rk->rk_conf.broker_version_fallback))
- rk->rk_conf.api_version_request = 1;
-
- if (rk->rk_type == RD_KAFKA_PRODUCER) {
- mtx_init(&rk->rk_curr_msgs.lock, mtx_plain);
- cnd_init(&rk->rk_curr_msgs.cnd);
- rk->rk_curr_msgs.max_cnt =
- rk->rk_conf.queue_buffering_max_msgs;
- if ((unsigned long long)rk->rk_conf.queue_buffering_max_kbytes * 1024 >
- (unsigned long long)SIZE_MAX)
- rk->rk_curr_msgs.max_size = SIZE_MAX;
- else
- rk->rk_curr_msgs.max_size =
- (size_t)rk->rk_conf.queue_buffering_max_kbytes * 1024;
- }
-
- if (rd_kafka_assignors_init(rk, errstr, errstr_size) == -1) {
- ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
- ret_errno = EINVAL;
- goto fail;
- }
-
- if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL ||
- rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_PLAINTEXT) {
- if (rd_kafka_sasl_select_provider(rk,
- errstr, errstr_size) == -1) {
- ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
- ret_errno = EINVAL;
- goto fail;
- }
- }
-
-#if WITH_SSL
- if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SSL ||
- rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL) {
- /* Create SSL context */
- if (rd_kafka_transport_ssl_ctx_init(rk, errstr,
- errstr_size) == -1) {
- ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
- ret_errno = EINVAL;
- goto fail;
- }
- }
-#endif
-
- /* Client group, eligible both in consumer and producer mode. */
- if (type == RD_KAFKA_CONSUMER &&
- RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0)
- rk->rk_cgrp = rd_kafka_cgrp_new(rk,
- rk->rk_group_id,
- rk->rk_client_id);
-
-
-
-#ifndef _MSC_VER
- /* Block all signals in newly created thread.
- * To avoid race condition we block all signals in the calling
- * thread, which the new thread will inherit its sigmask from,
- * and then restore the original sigmask of the calling thread when
- * we're done creating the thread. */
- sigemptyset(&oldset);
- sigfillset(&newset);
- if (rk->rk_conf.term_sig) {
- struct sigaction sa_term = {
- .sa_handler = rd_kafka_term_sig_handler
- };
- sigaction(rk->rk_conf.term_sig, &sa_term, NULL);
- }
- pthread_sigmask(SIG_SETMASK, &newset, &oldset);
-#endif
-
- /* Lock handle here to synchronise state, i.e., hold off
- * the thread until we've finalized the handle. */
- rd_kafka_wrlock(rk);
-
- /* Create handler thread */
- if ((thrd_create(&rk->rk_thread,
- rd_kafka_thread_main, rk)) != thrd_success) {
- ret_err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
- ret_errno = errno;
- if (errstr)
- rd_snprintf(errstr, errstr_size,
- "Failed to create thread: %s (%i)",
- rd_strerror(errno), errno);
- rd_kafka_wrunlock(rk);
-#ifndef _MSC_VER
- /* Restore sigmask of caller */
- pthread_sigmask(SIG_SETMASK, &oldset, NULL);
-#endif
- goto fail;
- }
-
- rd_kafka_wrunlock(rk);
- /* NOTE(review): the fields and brokers below are set up after the
-  * handle lock has been released to the main thread - confirm nothing
-  * on that thread depends on them this early. */
- rk->rk_eos.PID = -1;
- rk->rk_eos.TransactionalId = rd_kafkap_str_new(NULL, 0);
-
- mtx_lock(&rk->rk_internal_rkb_lock);
- rk->rk_internal_rkb = rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL,
- RD_KAFKA_PROTO_PLAINTEXT,
- "", 0, RD_KAFKA_NODEID_UA);
- mtx_unlock(&rk->rk_internal_rkb_lock);
-
- /* Add initial list of brokers from configuration */
- if (rk->rk_conf.brokerlist) {
- if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0)
- rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
- "No brokers configured");
- }
-
-#ifndef _MSC_VER
- /* Restore sigmask of caller */
- pthread_sigmask(SIG_SETMASK, &oldset, NULL);
-#endif
-
- /* Free user supplied conf's base pointer on success,
- * but not the actual allocated fields since the struct
- * will have been copied in its entirety above. */
- if (app_conf)
- rd_free(app_conf);
- rd_kafka_set_last_error(0, 0);
-
- rk->rk_initialized = 1;
-
- return rk;
-
-fail:
- /*
- * Error out and clean up
- */
-
- /* If on_new() interceptors have been called we also need
- * to allow interceptor clean-up by calling on_destroy() */
- rd_kafka_interceptors_on_destroy(rk);
-
- /* If rk_conf is a struct-copy of the application configuration
- * we need to avoid rk_conf fields from being freed from
- * rd_kafka_destroy_internal() since they belong to app_conf.
- * However, there are some internal fields, such as interceptors,
- * that belong to rk_conf and thus needs to be cleaned up.
- * Legacy APIs, sigh.. */
- if (app_conf) {
- rd_kafka_assignors_term(rk);
- rd_kafka_interceptors_destroy(&rk->rk_conf);
- memset(&rk->rk_conf, 0, sizeof(rk->rk_conf));
- }
-
- rd_atomic32_add(&rk->rk_terminate, 1);
- rd_kafka_destroy_internal(rk);
- rd_kafka_destroy_final(rk);
-
- rd_kafka_set_last_error(ret_err, ret_errno);
-
- return NULL;
-}
-
-
-
-
-
-/**
- * @brief Produce a single message.
- *
- * Thin public wrapper: converts the application topic handle to the
- * internal one and forwards all arguments to rd_kafka_msg_new().
- *
- * @locality any application thread
- * @returns the return value of rd_kafka_msg_new().
- */
-int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,
-                      int msgflags,
-                      void *payload, size_t len,
-                      const void *key, size_t keylen,
-                      void *msg_opaque) {
-        rd_kafka_itopic_t *irkt = rd_kafka_topic_a2i(rkt);
-
-        return rd_kafka_msg_new(irkt, partition, msgflags,
-                                payload, len,
-                                key, keylen, msg_opaque);
-}
-
-
-/**
- * @brief Register one more user of the legacy/simple consumer
- *        (rd_kafka_consume_start() and friends).
- *
- * The simple consumer has no API for stopping the cgrp, so usage is
- * counted here to let that be sorted out automatically in the background
- * once all consumption has stopped.
- *
- * A rd_kafka_t handle can never migrate from simple to high-level, or
- * vice versa, so no ..consumer_del() counterpart is needed.
- *
- * @returns 0 if a High level consumer is already instantiated (the
- *          counter is negative), which means a Simple consumer cannot
- *          co-operate with it; else the result of incrementing the counter.
- */
-int rd_kafka_simple_consumer_add (rd_kafka_t *rk) {
-        int r = 0;
-
-        if (rd_atomic32_get(&rk->rk_simple_cnt) >= 0)
-                r = (int)rd_atomic32_add(&rk->rk_simple_cnt, 1);
-
-        return r;
-}
-
-
-
-
-/**
- * rktp fetch is split up in these parts:
- * * application side:
- * * broker side (handled by current leader broker thread for rktp):
- * - the fetch state, initial offset, etc.
- * - fetching messages, updating fetched offset, etc.
- * - offset commits
- *
- * Communication between the two is:
- * app side -> rdkafka main side: rktp_ops
- * broker thread -> app side: rktp_fetchq
- *
- * There is no shared state between these threads, instead
- * state is communicated through the two op queues, and state synchronization
- * is performed by version barriers.
- *
- */
-
-/**
- * @brief Common implementation of the simple consumer start calls.
- *
- * Marks \p partition of \p rkt as desired, validates \p offset and posts
- * a fetch-start op for the partition.
- *
- * @param rkt       Internal topic handle.
- * @param partition Partition to consume; must be >= 0.
- * @param offset    Absolute offset (>= 0) or one of the logical offsets
- *                  (BEGINNING, END, TAIL, STORED).
- * @param rkq       Queue handed to the fetch-start op (may be NULL).
- *
- * @returns 0 on success, or -1 with the last error set on failure.
- */
-static RD_UNUSED
-int rd_kafka_consume_start0 (rd_kafka_itopic_t *rkt, int32_t partition,
-                             int64_t offset, rd_kafka_q_t *rkq) {
-        shptr_rd_kafka_toppar_t *s_rktp;
-        int bad_offset;
-
-        if (partition < 0) {
-                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
-                                        ESRCH);
-                return -1;
-        }
-
-        if (!rd_kafka_simple_consumer_add(rkt->rkt_rk)) {
-                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
-                return -1;
-        }
-
-        rd_kafka_topic_wrlock(rkt);
-        s_rktp = rd_kafka_toppar_desired_add(rkt, partition);
-        rd_kafka_topic_wrunlock(rkt);
-
-        /* Verify offset */
-        if (offset == RD_KAFKA_OFFSET_BEGINNING ||
-            offset == RD_KAFKA_OFFSET_END ||
-            offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
-                /* logical offsets are always accepted */
-                bad_offset = 0;
-        } else if (offset == RD_KAFKA_OFFSET_STORED) {
-                /* offset manager:
-                 * broker based offsets require a group id. */
-                bad_offset =
-                        rkt->rkt_conf.offset_store_method ==
-                        RD_KAFKA_OFFSET_METHOD_BROKER &&
-                        RD_KAFKAP_STR_IS_NULL(rkt->rkt_rk->rk_group_id);
-        } else {
-                /* any other negative offset is invalid */
-                bad_offset = offset < 0;
-        }
-
-        if (bad_offset) {
-                rd_kafka_toppar_destroy(s_rktp);
-                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
-                                        EINVAL);
-                return -1;
-        }
-
-        rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_s2i(s_rktp), offset,
-                                       rkq, RD_KAFKA_NO_REPLYQ);
-
-        rd_kafka_toppar_destroy(s_rktp);
-
-        rd_kafka_set_last_error(0, 0);
-        return 0;
-}
-
-
-
-
-/**
- * @brief Start consuming \p partition of \p app_rkt from \p offset,
- *        without an application supplied queue.
- *
- * @returns the result of rd_kafka_consume_start0().
- */
-int rd_kafka_consume_start (rd_kafka_topic_t *app_rkt, int32_t partition,
-                            int64_t offset) {
-        rd_kafka_itopic_t *rkt;
-
-        rkt = rd_kafka_topic_a2i(app_rkt);
-
-        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "START",
-                     "Start consuming partition %"PRId32, partition);
-
-        return rd_kafka_consume_start0(rkt, partition, offset, NULL);
-}
-
-/**
- * @brief Start consuming \p partition of \p app_rkt from \p offset,
- *        passing the internal queue of \p rkqu to the start call.
- *
- * @returns the result of rd_kafka_consume_start0().
- */
-int rd_kafka_consume_start_queue (rd_kafka_topic_t *app_rkt, int32_t partition,
-                                  int64_t offset, rd_kafka_queue_t *rkqu) {
-        return rd_kafka_consume_start0(rd_kafka_topic_a2i(app_rkt),
-                                       partition, offset, rkqu->rkqu_q);
-}
-
-
-
-
-/**
- * @brief Stop consuming \p rktp: drop it from the desired partitions,
- *        post a fetch-stop op and block until the reply arrives.
- *
- * @returns 0 on success, or -1 with the last error set (errno EINVAL)
- *          if the stop reply carried an error.
- */
-static RD_UNUSED int rd_kafka_consume_stop0 (rd_kafka_toppar_t *rktp) {
-        rd_kafka_resp_err_t err;
-        rd_kafka_q_t *replyq = NULL;
-
-        rd_kafka_topic_wrlock(rktp->rktp_rkt);
-        rd_kafka_toppar_lock(rktp);
-        rd_kafka_toppar_desired_del(rktp);
-        rd_kafka_toppar_unlock(rktp);
-        rd_kafka_topic_wrunlock(rktp->rktp_rkt);
-
-        replyq = rd_kafka_q_new(rktp->rktp_rkt->rkt_rk);
-        rd_kafka_toppar_op_fetch_stop(rktp, RD_KAFKA_REPLYQ(replyq, 0));
-
-        /* Synchronisation: Wait for stop reply from broker thread */
-        err = rd_kafka_q_wait_result(replyq, RD_POLL_INFINITE);
-        rd_kafka_q_destroy(replyq);
-
-        if (err) {
-                rd_kafka_set_last_error(err, EINVAL);
-                return -1;
-        }
-
-        rd_kafka_set_last_error(err, 0);
-        return 0;
-}
-
-
-/**
- * @brief Stop consuming \p partition of \p app_rkt.
- *
- * The partition is looked up among the topic's known partitions first
- * and among the desired partitions second.
- *
- * @returns the result of rd_kafka_consume_stop0(), or -1 with the last
- *          error set if \p partition is RD_KAFKA_PARTITION_UA or unknown.
- */
-int rd_kafka_consume_stop (rd_kafka_topic_t *app_rkt, int32_t partition) {
-        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
-        shptr_rd_kafka_toppar_t *s_rktp;
-        int res;
-
-        if (partition == RD_KAFKA_PARTITION_UA) {
-                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
-                return -1;
-        }
-
-        rd_kafka_topic_wrlock(rkt);
-        s_rktp = rd_kafka_toppar_get(rkt, partition, 0);
-        if (!s_rktp)
-                s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
-        rd_kafka_topic_wrunlock(rkt);
-
-        if (!s_rktp) {
-                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
-                                        ESRCH);
-                return -1;
-        }
-
-        /* set_last_error() called by stop0() */
-        res = rd_kafka_consume_stop0(rd_kafka_toppar_s2i(s_rktp));
-
-        rd_kafka_toppar_destroy(s_rktp);
-
-        return res;
-}
-
-
-
-/**
- * @brief Post a seek to \p offset for \p partition of \p app_rkt.
- *
- * With a non-zero \p timeout_ms a temporary reply queue is created and
- * the call waits on it for the seek result; with a zero timeout the op
- * is posted without a reply queue and the call returns immediately.
- *
- * @returns RD_KAFKA_RESP_ERR__INVALID_ARG for RD_KAFKA_PARTITION_UA,
- *          RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if the partition is not
- *          known, the error from posting the op, the awaited result, or
- *          RD_KAFKA_RESP_ERR_NO_ERROR when not waiting.
- */
-rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *app_rkt,
-                                   int32_t partition,
-                                   int64_t offset,
-                                   int timeout_ms) {
-        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
-        shptr_rd_kafka_toppar_t *s_rktp;
-        rd_kafka_toppar_t *rktp;
-        rd_kafka_q_t *replyq = NULL;
-        rd_kafka_resp_err_t err;
-
-        /* FIXME: simple consumer check */
-
-        if (partition == RD_KAFKA_PARTITION_UA)
-                return RD_KAFKA_RESP_ERR__INVALID_ARG;
-
-        rd_kafka_topic_rdlock(rkt);
-        s_rktp = rd_kafka_toppar_get(rkt, partition, 0);
-        if (!s_rktp)
-                s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
-        rd_kafka_topic_rdunlock(rkt);
-
-        if (!s_rktp)
-                return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
-
-        if (timeout_ms)
-                replyq = rd_kafka_q_new(rkt->rkt_rk);
-
-        rktp = rd_kafka_toppar_s2i(s_rktp);
-        err = rd_kafka_toppar_op_seek(rktp, offset,
-                                      RD_KAFKA_REPLYQ(replyq, 0));
-
-        if (!err) {
-                rd_kafka_toppar_destroy(s_rktp);
-
-                if (!replyq)
-                        return RD_KAFKA_RESP_ERR_NO_ERROR;
-
-                err = rd_kafka_q_wait_result(replyq, timeout_ms);
-                rd_kafka_q_destroy(replyq);
-                return err;
-        }
-
-        /* Posting the op failed: release the queue, then the toppar. */
-        if (replyq)
-                rd_kafka_q_destroy(replyq);
-        rd_kafka_toppar_destroy(s_rktp);
-
-        return err;
-}
-
-
-
-/**
- * @brief Fill the application's \p rkmessages array (at most
- *        \p rkmessages_size entries) by serving \p rkq.
- *
- * @returns the result of rd_kafka_q_serve_rkmessages().
- */
-static ssize_t rd_kafka_consume_batch0 (rd_kafka_q_t *rkq,
-                                        int timeout_ms,
-                                        rd_kafka_message_t **rkmessages,
-                                        size_t rkmessages_size) {
-        ssize_t msg_cnt;
-
-        msg_cnt = rd_kafka_q_serve_rkmessages(rkq, timeout_ms,
-                                              rkmessages, rkmessages_size);
-
-        return msg_cnt;
-}
-
-
-/**
- * @brief Consume a batch of messages from \p partition of \p app_rkt
- *        into the application's \p rkmessages array.
- *
- * @returns the result of rd_kafka_q_serve_rkmessages() on the
- *          partition's fetch queue, or -1 with the last error set to
- *          RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if the partition is
- *          neither known nor desired.
- */
-ssize_t rd_kafka_consume_batch (rd_kafka_topic_t *app_rkt, int32_t partition,
-                                int timeout_ms,
-                                rd_kafka_message_t **rkmessages,
-                                size_t rkmessages_size) {
-        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
-        shptr_rd_kafka_toppar_t *s_rktp;
-        rd_kafka_toppar_t *rktp;
-        ssize_t msg_cnt;
-
-        /* Look up the toppar: known partitions first, then desired ones. */
-        rd_kafka_topic_rdlock(rkt);
-        s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
-        if (unlikely(s_rktp == NULL))
-                s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
-        rd_kafka_topic_rdunlock(rkt);
-
-        if (unlikely(s_rktp == NULL)) {
-                /* No such toppar known */
-                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
-                                        ESRCH);
-                return -1;
-        }
-
-        rktp = rd_kafka_toppar_s2i(s_rktp);
-
-        /* Populate application's rkmessages array. */
-        msg_cnt = rd_kafka_q_serve_rkmessages(rktp->rktp_fetchq, timeout_ms,
-                                              rkmessages, rkmessages_size);
-
-        /* Drop the reference obtained by the lookup above. */
-        rd_kafka_toppar_destroy(s_rktp);
-
-        rd_kafka_set_last_error(0, 0);
-
-        return msg_cnt;
-}
-
-/**
- * @brief Consume a batch of messages from the application queue \p rkqu
- *        into the application's \p rkmessages array.
- *
- * @returns the result of rd_kafka_consume_batch0().
- */
-ssize_t rd_kafka_consume_batch_queue (rd_kafka_queue_t *rkqu,
-                                      int timeout_ms,
-                                      rd_kafka_message_t **rkmessages,
-                                      size_t rkmessages_size) {
-        rd_kafka_q_t *rkq = rkqu->rkqu_q;
-
-        return rd_kafka_consume_batch0(rkq, timeout_ms,
-                                       rkmessages, rkmessages_size);
-}
-
-/**
- * Context passed as the opaque to rd_kafka_consume_cb(): the application's
- * consume callback together with the application's own opaque.
- */
-struct consume_ctx {
- void (*consume_cb) (rd_kafka_message_t *rkmessage, void *opaque);
- void *opaque;
-};
-
-
-/**
- * @brief Trampoline for application's consume_cb()
- *
- * Outdated ops are dropped silently. For any other op the message is
- * extracted, its offset stored and the application callback found in
- * the consume_ctx \p opaque is invoked. The op is destroyed either way.
- *
- * @returns RD_KAFKA_OP_RES_HANDLED (always).
- */
-static rd_kafka_op_res_t
-rd_kafka_consume_cb (rd_kafka_t *rk,
-                     rd_kafka_q_t *rkq,
-                     rd_kafka_op_t *rko,
-                     rd_kafka_q_cb_type_t cb_type, void *opaque) {
-        struct consume_ctx *cctx = opaque;
-
-        if (likely(!rd_kafka_op_version_outdated(rko, 0))) {
-                rd_kafka_message_t *msg = rd_kafka_message_get(rko);
-
-                rd_kafka_op_offset_store(rk, rko, msg);
-
-                cctx->consume_cb(msg, cctx->opaque);
-        }
-
-        rd_kafka_op_destroy(rko);
-
-        return RD_KAFKA_OP_RES_HANDLED;
-}
-
-
-
-/**
- * @brief Serve \p rkq, handing each message to \p consume_cb through the
- *        rd_kafka_consume_cb() trampoline.
- *
- * @param rkq        Queue to serve.
- * @param timeout_ms Passed on to rd_kafka_q_serve().
- * @param max_cnt    Passed on to rd_kafka_q_serve().
- * @param consume_cb Application callback invoked per message.
- * @param opaque     Application opaque handed to \p consume_cb.
- *
- * @returns the result of rd_kafka_q_serve(). The return type is int rather
- *          than rd_kafka_op_res_t: the value is not an op result code and
- *          both public callers hand it to the application as an int, so
- *          routing it through an enum type risked an implementation-defined
- *          conversion.
- */
-static int
-rd_kafka_consume_callback0 (rd_kafka_q_t *rkq, int timeout_ms, int max_cnt,
-                            void (*consume_cb) (rd_kafka_message_t
-                                                *rkmessage,
-                                                void *opaque),
-                            void *opaque) {
-        struct consume_ctx ctx = { .consume_cb = consume_cb, .opaque = opaque };
-
-        return rd_kafka_q_serve(rkq, timeout_ms, max_cnt,
-                                RD_KAFKA_Q_CB_RETURN,
-                                rd_kafka_consume_cb, &ctx);
-}
-
-
-/**
- * @brief Serve the fetch queue of \p partition of \p app_rkt, calling
- *        \p consume_cb for each message, limited by the topic's
- *        consume_callback_max_msgs setting.
- *
- * @returns the result of rd_kafka_consume_callback0(), or -1 with the
- *          last error set to RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if the
- *          partition is neither known nor desired.
- */
-int rd_kafka_consume_callback (rd_kafka_topic_t *app_rkt, int32_t partition,
-                               int timeout_ms,
-                               void (*consume_cb) (rd_kafka_message_t
-                                                   *rkmessage,
-                                                   void *opaque),
-                               void *opaque) {
-        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
-        shptr_rd_kafka_toppar_t *s_rktp;
-        rd_kafka_toppar_t *rktp;
-        int served;
-
-        /* Look up the toppar: known partitions first, then desired ones. */
-        rd_kafka_topic_rdlock(rkt);
-        s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
-        if (unlikely(s_rktp == NULL))
-                s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
-        rd_kafka_topic_rdunlock(rkt);
-
-        if (unlikely(s_rktp == NULL)) {
-                /* No such toppar known */
-                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
-                                        ESRCH);
-                return -1;
-        }
-
-        rktp = rd_kafka_toppar_s2i(s_rktp);
-        served = rd_kafka_consume_callback0(
-                rktp->rktp_fetchq, timeout_ms,
-                rkt->rkt_conf.consume_callback_max_msgs,
-                consume_cb, opaque);
-
-        rd_kafka_toppar_destroy(s_rktp);
-
-        rd_kafka_set_last_error(0, 0);
-
-        return served;
-}
-
-
-
-/**
- * @brief Serve the application queue \p rkqu, calling \p consume_cb for
- *        each message. A max count of 0 is passed to the serve call.
- *
- * @returns the result of rd_kafka_consume_callback0().
- */
-int rd_kafka_consume_callback_queue (rd_kafka_queue_t *rkqu,
-                                     int timeout_ms,
-                                     void (*consume_cb) (rd_kafka_message_t
-                                                         *rkmessage,
-                                                         void *opaque),
-                                     void *opaque) {
-        rd_kafka_q_t *rkq = rkqu->rkqu_q;
-
-        return rd_kafka_consume_callback0(rkq, timeout_ms, 0,
-                                          consume_cb, opaque);
-}
-
-
-/**
- * Serve queue 'rkq' and return one message.
- * By serving the queue it will also call any registered callbacks
- * registered for matching events, this includes consumer_cb()
- * in which case no message will be returned.
- *
- * Ops are popped until rd_kafka_poll_cb() passes one back
- * (RD_KAFKA_OP_RES_PASS), a callback yields, or \p timeout_ms expires.
- *
- * @returns the message, or NULL with the last error set to
- *          RD_KAFKA_RESP_ERR__INTR (a callback yielded) or
- *          RD_KAFKA_RESP_ERR__TIMED_OUT (no op within the timeout).
- */
-static rd_kafka_message_t *rd_kafka_consume0 (rd_kafka_t *rk,
- rd_kafka_q_t *rkq,
- int timeout_ms) {
- rd_kafka_op_t *rko;
- rd_kafka_message_t *rkmessage = NULL;
- rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
-
- rd_kafka_yield_thread = 0;
- while ((rko = rd_kafka_q_pop(rkq,
- rd_timeout_remains(abs_timeout), 0))) {
- rd_kafka_op_res_t res;
-
- res = rd_kafka_poll_cb(rk, rkq, rko,
- RD_KAFKA_Q_CB_RETURN, NULL);
- /* PASS: the op was not consumed by a callback; leave the loop with
-  * rko set so that it is turned into the returned message below. */
- if (res == RD_KAFKA_OP_RES_PASS)
- break;
-
- if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
- rd_kafka_yield_thread)) {
- /* Callback called rd_kafka_yield(), we must
- * stop dispatching the queue and return. */
- rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR,
- EINTR);
- return NULL;
- }
-
- /* Message was handled by callback. */
- continue;
- }
-
- if (!rko) {
- /* Timeout reached with no op returned. */
- rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
- ETIMEDOUT);
- return NULL;
- }
-
- rd_kafka_assert(rk,
- rko->rko_type == RD_KAFKA_OP_FETCH ||
- rko->rko_type == RD_KAFKA_OP_CONSUMER_ERR);
-
- /* Get rkmessage from rko */
- rkmessage = rd_kafka_message_get(rko);
-
- /* Store offset */
- rd_kafka_op_offset_store(rk, rko, rkmessage);
-
- rd_kafka_set_last_error(0, 0);
-
- return rkmessage;
-}
-
-/**
- * @brief Consume a single message from \p partition of \p app_rkt.
- *
- * @returns the result of rd_kafka_consume0() on the partition's fetch
- *          queue, or NULL with the last error set to
- *          RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if the partition is
- *          neither known nor desired.
- */
-rd_kafka_message_t *rd_kafka_consume (rd_kafka_topic_t *app_rkt,
-                                      int32_t partition,
-                                      int timeout_ms) {
-        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
-        shptr_rd_kafka_toppar_t *s_rktp;
-        rd_kafka_toppar_t *rktp;
-        rd_kafka_message_t *msg;
-
-        /* Look up the toppar: known partitions first, then desired ones. */
-        rd_kafka_topic_rdlock(rkt);
-        s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
-        if (unlikely(s_rktp == NULL))
-                s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
-        rd_kafka_topic_rdunlock(rkt);
-
-        if (unlikely(s_rktp == NULL)) {
-                /* No such toppar known */
-                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
-                                        ESRCH);
-                return NULL;
-        }
-
-        rktp = rd_kafka_toppar_s2i(s_rktp);
-        msg = rd_kafka_consume0(rkt->rkt_rk, rktp->rktp_fetchq, timeout_ms);
-
-        /* Drop the reference obtained by the lookup above. */
-        rd_kafka_toppar_destroy(s_rktp);
-
-        return msg;
-}
-
-
-/**
- * @brief Consume a single message from the application queue \p rkqu.
- *
- * @returns the result of rd_kafka_consume0().
- */
-rd_kafka_message_t *rd_kafka_consume_queue (rd_kafka_queue_t *rkqu,
-                                            int timeout_ms) {
-        rd_kafka_message_t *msg;
-
-        msg = rd_kafka_consume0(rkqu->rkqu_rk, rkqu->rkqu_q, timeout_ms);
-
-        return msg;
-}
-
-
-
-
-/**
- * @brief Forward the handle's rk_rep queue to the consumer group queue.
- *
- * @returns RD_KAFKA_RESP_ERR__UNKNOWN_GROUP if \p rk has no consumer
- *          group, else RD_KAFKA_RESP_ERR_NO_ERROR.
- */
-rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk) {
-        rd_kafka_cgrp_t *rkcg = rd_kafka_cgrp_get(rk);
-
-        if (rkcg) {
-                rd_kafka_q_fwd_set(rk->rk_rep, rkcg->rkcg_q);
-                return RD_KAFKA_RESP_ERR_NO_ERROR;
-        }
-
-        return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-}
-
-
-
-
-/**
- * @brief Poll the consumer group queue for one message.
- *
- * @returns the result of rd_kafka_consume0() on the group queue. If
- *          \p rk has no consumer group, a newly created message with
- *          err set to RD_KAFKA_RESP_ERR__UNKNOWN_GROUP is returned.
- */
-rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,
-                                            int timeout_ms) {
-        rd_kafka_cgrp_t *rkcg = rd_kafka_cgrp_get(rk);
-        rd_kafka_message_t *errmsg;
-
-        if (likely(rkcg != NULL))
-                return rd_kafka_consume0(rk, rkcg->rkcg_q, timeout_ms);
-
-        errmsg = rd_kafka_message_new();
-        errmsg->err = RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
-        return errmsg;
-}
-
-/**
- * Close the consumer: ask the consumer group to terminate and serve its
- * queue, through a temporary queue, until the terminate reply arrives.
- *
- * Returns RD_KAFKA_RESP_ERR__UNKNOWN_GROUP if rk has no consumer group,
- * else the error code carried by the TERMINATE reply op
- * (RD_KAFKA_RESP_ERR__TIMED_OUT if the pop loop ends without one).
- */
-rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk) {
- rd_kafka_cgrp_t *rkcg;
- rd_kafka_op_t *rko;
- rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT;
- rd_kafka_q_t *rkq;
-
- if (!(rkcg = rd_kafka_cgrp_get(rk)))
- return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
- /* Redirect cgrp queue to our temporary queue to make sure
- * all posted ops (e.g., rebalance callbacks) are served by
- * this function. */
- rkq = rd_kafka_q_new(rk);
- rd_kafka_q_fwd_set(rkcg->rkcg_q, rkq);
-
- rd_kafka_cgrp_terminate(rkcg, RD_KAFKA_REPLYQ(rkq, 0)); /* async */
- /* Serve ops until the TERMINATE reply shows up; ops that the poll
-  * callback passes back unhandled are destroyed here. */
- while ((rko = rd_kafka_q_pop(rkq, RD_POLL_INFINITE, 0))) {
- rd_kafka_op_res_t res;
- if ((rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) ==
- RD_KAFKA_OP_TERMINATE) {
- err = rko->rko_err;
- rd_kafka_op_destroy(rko);
- break;
- }
- res = rd_kafka_poll_cb(rk, rkq, rko,
- RD_KAFKA_Q_CB_RETURN, NULL);
- if (res == RD_KAFKA_OP_RES_PASS)
- rd_kafka_op_destroy(rko);
- /* Ignore YIELD, we need to finish */
- }
- /* NOTE(review): rkq is destroyed while rkcg_q is still forwarded to
-  * it; the forward is only cleared afterwards - confirm the forward
-  * holds its own reference on rkq. */
- rd_kafka_q_destroy(rkq);
-
- rd_kafka_q_fwd_set(rkcg->rkcg_q, NULL);
-
- return err;
-}
-
-
-/**
- * Fetch the committed offsets for partitions through the consumer group.
- *
- * All offsets in partitions are first reset to RD_KAFKA_OFFSET_INVALID.
- * An OFFSET_FETCH op carrying a copy of the list is then enqueued on the
- * group's ops queue and the reply awaited; on success the reply's
- * partition list is used to update partitions. The request is re-issued
- * for as long as the outcome is __WAIT_COORD or __TRANSPORT.
- *
- * Returns RD_KAFKA_RESP_ERR__INVALID_ARG if partitions is NULL,
- * RD_KAFKA_RESP_ERR__UNKNOWN_GROUP if rk has no consumer group,
- * RD_KAFKA_RESP_ERR__DESTROY if the op could not be enqueued,
- * RD_KAFKA_RESP_ERR__TIMED_OUT if no reply arrived within timeout_ms (or
- * no broker state change was seen in time while retrying), else the
- * error code of the reply op.
- */
-rd_kafka_resp_err_t
-rd_kafka_committed (rd_kafka_t *rk,
- rd_kafka_topic_partition_list_t *partitions,
- int timeout_ms) {
- rd_kafka_q_t *rkq;
- rd_kafka_resp_err_t err;
- rd_kafka_cgrp_t *rkcg;
- rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
-
- if (!partitions)
- return RD_KAFKA_RESP_ERR__INVALID_ARG;
-
- if (!(rkcg = rd_kafka_cgrp_get(rk)))
- return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
- /* Set default offsets. */
- rd_kafka_topic_partition_list_reset_offsets(partitions,
- RD_KAFKA_OFFSET_INVALID);
-
- rkq = rd_kafka_q_new(rk);
- /* One request per iteration; the broker state version is sampled
-  * before sending so that a later state change can be waited for. */
- do {
- rd_kafka_op_t *rko;
- int state_version = rd_kafka_brokers_get_state_version(rk);
-
- rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
- rd_kafka_op_set_replyq(rko, rkq, NULL);
-
- /* Issue #827
- * Copy partition list to avoid use-after-free if we time out
- * here, the app frees the list, and then cgrp starts
- * processing the op. */
- rko->rko_u.offset_fetch.partitions =
- rd_kafka_topic_partition_list_copy(partitions);
- rko->rko_u.offset_fetch.do_free = 1;
-
- if (!rd_kafka_q_enq(rkcg->rkcg_ops, rko)) {
- err = RD_KAFKA_RESP_ERR__DESTROY;
- break;
- }
-
- rko = rd_kafka_q_pop(rkq, rd_timeout_remains(abs_timeout), 0);
- if (rko) {
- if (!(err = rko->rko_err))
- rd_kafka_topic_partition_list_update(
- partitions,
- rko->rko_u.offset_fetch.partitions);
- else if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
- err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
- !rd_kafka_brokers_wait_state_change(
- rk, state_version,
- rd_timeout_remains(abs_timeout)))
- err = RD_KAFKA_RESP_ERR__TIMED_OUT;
-
- rd_kafka_op_destroy(rko);
- } else
- err = RD_KAFKA_RESP_ERR__TIMED_OUT;
- } while (err == RD_KAFKA_RESP_ERR__TRANSPORT ||
- err == RD_KAFKA_RESP_ERR__WAIT_COORD);
-
- rd_kafka_q_destroy(rkq);
-
- return err;
-}
-
-
-
-/**
- * @brief Retrieve the current application offset (rktp_app_offset) for
- *        each partition in \p partitions.
- *
- * Every element is first reset to RD_KAFKA_OFFSET_INVALID. Elements whose
- * topic+partition cannot be found get their err field set to
- * RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION; all others get the partition's
- * app offset, read under the toppar lock, and a cleared err field.
- *
- * @param rk         Client instance.
- * @param partitions List to fill in; updated in place.
- *
- * @returns RD_KAFKA_RESP_ERR__INVALID_ARG if \p partitions is NULL, else
- *          RD_KAFKA_RESP_ERR_NO_ERROR (per-partition errors are reported
- *          in the list elements).
- */
-rd_kafka_resp_err_t
-rd_kafka_position (rd_kafka_t *rk,
-                   rd_kafka_topic_partition_list_t *partitions) {
-        int i;
-
-        /* Same argument contract as rd_kafka_committed(): a missing list
-         * is reported as an invalid argument instead of being
-         * dereferenced. */
-        if (!partitions)
-                return RD_KAFKA_RESP_ERR__INVALID_ARG;
-
-        /* Set default offsets. */
-        rd_kafka_topic_partition_list_reset_offsets(partitions,
-                                                    RD_KAFKA_OFFSET_INVALID);
-
-        for (i = 0 ; i < partitions->cnt ; i++) {
-                rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
-                shptr_rd_kafka_toppar_t *s_rktp;
-                rd_kafka_toppar_t *rktp;
-
-                if (!(s_rktp = rd_kafka_toppar_get2(rk, rktpar->topic,
-                                                    rktpar->partition, 0, 1))) {
-                        rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
-                        rktpar->offset = RD_KAFKA_OFFSET_INVALID;
-                        continue;
-                }
-
-                rktp = rd_kafka_toppar_s2i(s_rktp);
-                rd_kafka_toppar_lock(rktp);
-                rktpar->offset = rktp->rktp_app_offset;
-                rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
-                rd_kafka_toppar_unlock(rktp);
-                rd_kafka_toppar_destroy(s_rktp);
-        }
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * State shared between a watermark offsets query and
- * rd_kafka_query_wmark_offsets_resp_cb(), which receives it as its opaque.
- */
-struct _query_wmark_offsets_state {
- rd_kafka_resp_err_t err; /* Result; written by the response callback
-                           * on error or once both offsets are set */
- const char *topic; /* Topic to look up in the response */
- int32_t partition; /* Partition to look up in the response */
- int64_t offsets[2]; /* Offsets collected from the responses */
- int offidx; /* next offset to set from response */
- rd_ts_t ts_end; /* Timeout handed to rd_timeout_remains() */
- int state_version; /* Broker state version */
-};
-/**
- * Response callback for the watermark offsets requests.
- *
- * Parses the response with rd_kafka_handle_Offset() and stores the offset
- * found for state->topic / state->partition in the next free slot of
- * state->offsets. state->err is written when an error occurred or when
- * both offsets have been collected.
- *
- * Returns early, leaving the state untouched, while a retry is in
- * progress. The opaque argument is the struct _query_wmark_offsets_state
- * of the query.
- */
-static void rd_kafka_query_wmark_offsets_resp_cb (rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- void *opaque) {
- struct _query_wmark_offsets_state *state = opaque;
- rd_kafka_topic_partition_list_t *offsets;
- rd_kafka_topic_partition_t *rktpar;
-
- offsets = rd_kafka_topic_partition_list_new(1);
- err = rd_kafka_handle_Offset(rk, rkb, err, rkbuf, request, offsets);
- if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
- rd_kafka_topic_partition_list_destroy(offsets);
- return; /* Retrying */
- }
-
- /* Retry if no broker connection is available yet. */
- if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
- err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
- rkb &&
- rd_kafka_brokers_wait_state_change(
- rkb->rkb_rk, state->state_version,
- rd_timeout_remains(state->ts_end))) {
- /* Retry */
- state->state_version = rd_kafka_brokers_get_state_version(rk);
- request->rkbuf_retries = 0;
- if (rd_kafka_buf_retry(rkb, request)) {
- rd_kafka_topic_partition_list_destroy(offsets);
- return; /* Retry in progress */
- }
- /* FALLTHRU */
- }
-
- /* Partition not seen in response. */
- if (!(rktpar = rd_kafka_topic_partition_list_find(offsets,
- state->topic,
- state->partition)))
- err = RD_KAFKA_RESP_ERR__BAD_MSG;
- else if (rktpar->err)
- err = rktpar->err;
- else
- state->offsets[state->offidx] = rktpar->offset;
- /* offidx advances on every completed response, error or not. */
- state->offidx++;
-
- if (err || state->offidx == 2) /* Error or Done */
- state->err = err;
-
- rd_kafka_topic_partition_list_destroy(offsets);
-}
-
-
-/**
- * @brief Query the partition leader for the low and high watermark offsets
- *        of \p topic [\p partition].
- *
- * The leader is looked up first, then two OffsetRequests are sent (one per
- * wanted offset) and a temporary queue is served until the response
- * callback reports a final result.
- *
- * @param rk          Client instance.
- * @param topic       Topic name.
- * @param partition   Partition id.
- * @param low         Set to the lower of the two returned offsets.
- * @param high        Set to the higher of the two returned offsets.
- * @param timeout_ms  Timeout passed to the leader query and used to derive
- *                    the absolute timeout stored in the callback state.
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, in which case \p low and
- *          \p high are set; otherwise an error code and \p low / \p high
- *          are left untouched.
- */
-rd_kafka_resp_err_t
-rd_kafka_query_watermark_offsets (rd_kafka_t *rk, const char *topic,
-                                  int32_t partition,
-                                  int64_t *low, int64_t *high, int timeout_ms) {
-        rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
-        const int64_t wanted[2] = { RD_KAFKA_OFFSET_BEGINNING,
-                                    RD_KAFKA_OFFSET_END };
-        struct _query_wmark_offsets_state state;
-        rd_kafka_topic_partition_list_t *query;
-        rd_kafka_topic_partition_t *query_elem;
-        struct rd_kafka_partition_leader *leader;
-        rd_list_t leaders;
-        rd_kafka_q_t *replyq;
-        rd_kafka_resp_err_t err;
-        int64_t first, second;
-        int i;
-
-        /* Single-element list describing the partition to query. */
-        query      = rd_kafka_topic_partition_list_new(1);
-        query_elem = rd_kafka_topic_partition_list_add(query,
-                                                       topic, partition);
-
-        rd_list_init(&leaders, query->cnt,
-                     (void *)rd_kafka_partition_leader_destroy);
-
-        err = rd_kafka_topic_partition_list_query_leaders(rk, query,
-                                                          &leaders,
-                                                          timeout_ms);
-        if (err) {
-                rd_list_destroy(&leaders);
-                rd_kafka_topic_partition_list_destroy(query);
-                return err;
-        }
-
-        leader = rd_list_elem(&leaders, 0);
-
-        replyq = rd_kafka_q_new(rk);
-
-        /* Callback state: the offset slots start out holding the logical
-         * offsets that are being asked for. */
-        state.err           = RD_KAFKA_RESP_ERR__IN_PROGRESS;
-        state.topic         = topic;
-        state.partition     = partition;
-        state.offsets[0]    = RD_KAFKA_OFFSET_BEGINNING;
-        state.offsets[1]    = RD_KAFKA_OFFSET_END;
-        state.offidx        = 0;
-        state.ts_end        = abs_timeout;
-        state.state_version = rd_kafka_brokers_get_state_version(rk);
-
-        /* Due to KAFKA-1588 we need to send a request for each wanted
-         * offset: first the low watermark, then the high. */
-        for (i = 0 ; i < 2 ; i++) {
-                query_elem->offset = wanted[i];
-                rd_kafka_OffsetRequest(leader->rkb, query, 0,
-                                       RD_KAFKA_REPLYQ(replyq, 0),
-                                       rd_kafka_query_wmark_offsets_resp_cb,
-                                       &state);
-        }
-
-        rd_kafka_topic_partition_list_destroy(query);
-        rd_list_destroy(&leaders);
-
-        /* Serve the reply queue until the callback has stored a final
-         * result or a callback asked to yield. */
-        while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
-                if (rd_kafka_q_serve(replyq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
-                                     rd_kafka_poll_cb, NULL) ==
-                    RD_KAFKA_OP_RES_YIELD)
-                        break;
-        }
-
-        rd_kafka_q_destroy(replyq);
-
-        if (state.err)
-                return state.err;
-
-        if (state.offidx != 2)
-                return RD_KAFKA_RESP_ERR__FAIL;
-
-        /* We are not certain about the returned order:
-         * the smaller value is the low watermark. */
-        first  = state.offsets[0];
-        second = state.offsets[1];
-        *low   = first < second ? first : second;
-        *high  = first < second ? second : first;
-
-        /* If partition is empty only one offset (the last) will be
-         * returned. */
-        if (*low < 0 && *high >= 0)
-                *low = *high;
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * @brief Copy the low and high offsets currently stored on the toppar of
- *        \p topic [\p partition] into \p low and \p high.
- *
- * The visible code reads rktp_lo_offset and rktp_hi_offset while holding
- * the toppar lock.
- *
- * @returns RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if rd_kafka_toppar_get2()
- *          returns no toppar (\p low and \p high are then left untouched),
- *          else RD_KAFKA_RESP_ERR_NO_ERROR.
- */
-rd_kafka_resp_err_t
-rd_kafka_get_watermark_offsets (rd_kafka_t *rk, const char *topic,
-                                int32_t partition,
-                                int64_t *low, int64_t *high) {
-        shptr_rd_kafka_toppar_t *s_toppar =
-                rd_kafka_toppar_get2(rk, topic, partition, 0, 1);
-        rd_kafka_toppar_t *toppar;
-        int64_t lo, hi;
-
-        if (!s_toppar)
-                return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
-
-        toppar = rd_kafka_toppar_s2i(s_toppar);
-
-        /* Take a consistent snapshot of both values under the lock. */
-        rd_kafka_toppar_lock(toppar);
-        lo = toppar->rktp_lo_offset;
-        hi = toppar->rktp_hi_offset;
-        rd_kafka_toppar_unlock(toppar);
-
-        *low  = lo;
-        *high = hi;
-
-        rd_kafka_toppar_destroy(s_toppar);
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * @brief get_offsets_for_times() state
- *
- * Shared between rd_kafka_offsets_for_times() and its response callback
- * rd_kafka_get_offsets_for_times_resp_cb().
- */
-struct _get_offsets_for_times {
- rd_kafka_topic_partition_list_t *results; /* list the callback passes to
- * rd_kafka_handle_Offset() */
- rd_kafka_resp_err_t err; /* first error reported by a response */
- int wait_reply; /* incremented per request sent, decremented per
- * response handled */
- int state_version; /* read by the callback when waiting for a broker
- * state change before a retry */
- rd_ts_t ts_end; /* read by the callback via rd_timeout_remains() */
-};
-
-/**
- * @brief Handle OffsetRequest responses for rd_kafka_offsets_for_times().
- *
- * The parsed partitions are added to \c state->results. The first error seen
- * is kept in \c state->err and \c state->wait_reply is decremented once the
- * response has been fully handled (i.e. not while a retry is pending).
- *
- * @param opaque  The caller's struct _get_offsets_for_times.
- */
-static void rd_kafka_get_offsets_for_times_resp_cb (rd_kafka_t *rk,
-                                                    rd_kafka_broker_t *rkb,
-                                                    rd_kafka_resp_err_t err,
-                                                    rd_kafka_buf_t *rkbuf,
-                                                    rd_kafka_buf_t *request,
-                                                    void *opaque) {
-        struct _get_offsets_for_times *state = opaque;
-        int no_connection;
-
-        err = rd_kafka_handle_Offset(rk, rkb, err, rkbuf, request,
-                                     state->results);
-
-        /* The response handler is already retrying the request. */
-        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
-                return;
-
-        no_connection = (err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
-                         err == RD_KAFKA_RESP_ERR__TRANSPORT);
-
-        /* No broker connection is available yet: wait for the broker state
-         * to change and then send the request again. */
-        if (no_connection && rkb &&
-            rd_kafka_brokers_wait_state_change(
-                    rkb->rkb_rk, state->state_version,
-                    rd_timeout_remains(state->ts_end))) {
-                state->state_version = rd_kafka_brokers_get_state_version(rk);
-                request->rkbuf_retries = 0;
-
-                if (rd_kafka_buf_retry(rkb, request))
-                        return; /* Retry in progress */
-
-                /* Retry could not be scheduled: report the error below. */
-        }
-
-        /* Remember only the first error. */
-        if (!state->err && err)
-                state->err = err;
-
-        state->wait_reply--;
-}
-
-
-/**
- * @brief Look up offsets for the partitions in \p offsets by querying each
- *        partition's leader broker.
- *
- * One OffsetRequest is sent per leader returned by
- * rd_kafka_topic_partition_list_query_leaders(). Responses are collected in
- * a temporary result list which is merged into \p offsets only if every
- * request completed without error.
- *
- * @param rk          Client instance.
- * @param offsets     Partitions to query; updated in place on success.
- * @param timeout_ms  Timeout for the leader query and for waiting on replies.
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success,
- *          RD_KAFKA_RESP_ERR__INVALID_ARG if \p offsets is empty,
- *          RD_KAFKA_RESP_ERR__TIMED_OUT if not all replies arrived before
- *          the wait ended, or the first error reported by the leader query
- *          or by a response.
- */
-rd_kafka_resp_err_t
-rd_kafka_offsets_for_times (rd_kafka_t *rk,
-                            rd_kafka_topic_partition_list_t *offsets,
-                            int timeout_ms) {
-        rd_kafka_q_t *rkq;
-        struct _get_offsets_for_times state = RD_ZERO_INIT;
-        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
-        rd_list_t leaders;
-        int i;
-        rd_kafka_resp_err_t err;
-        struct rd_kafka_partition_leader *leader;
-
-        if (offsets->cnt == 0)
-                return RD_KAFKA_RESP_ERR__INVALID_ARG;
-
-        rd_list_init(&leaders, offsets->cnt,
-                     (void *)rd_kafka_partition_leader_destroy);
-
-        err = rd_kafka_topic_partition_list_query_leaders(rk, offsets, &leaders,
-                                                          timeout_ms);
-        if (err) {
-                rd_list_destroy(&leaders);
-                return err;
-        }
-
-
-        rkq = rd_kafka_q_new(rk);
-
-        state.wait_reply = 0;
-        state.results = rd_kafka_topic_partition_list_new(offsets->cnt);
-        /* The response callback reads both of these when deciding whether
-         * to wait for a broker state change and retry, so they must hold
-         * real values rather than the zero-initialised defaults
-         * (same setup as in rd_kafka_query_watermark_offsets()). */
-        state.ts_end = ts_end;
-        state.state_version = rd_kafka_brokers_get_state_version(rk);
-
-        /* For each leader send a request for its partitions */
-        RD_LIST_FOREACH(leader, &leaders, i) {
-                state.wait_reply++;
-                rd_kafka_OffsetRequest(leader->rkb, leader->partitions, 1,
-                                       RD_KAFKA_REPLYQ(rkq, 0),
-                                       rd_kafka_get_offsets_for_times_resp_cb,
-                                       &state);
-        }
-
-        rd_list_destroy(&leaders);
-
-        /* Wait for reply (or timeout) */
-        /* NOTE(review): this relies on rd_timeout_remains() returning a
-         * positive value while time is left; confirm how an infinite
-         * timeout_ms is represented. */
-        while (state.wait_reply > 0 && rd_timeout_remains(ts_end) > 0)
-                rd_kafka_q_serve(rkq, rd_timeout_remains(ts_end),
-                                 0, RD_KAFKA_Q_CB_CALLBACK,
-                                 rd_kafka_poll_cb, NULL);
-
-        rd_kafka_q_destroy(rkq);
-
-        /* Replies still outstanding means the wait ended early: report a
-         * timeout instead of returning success with partial results. */
-        if (state.wait_reply > 0 && !state.err)
-                state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
-
-        /* Then update the queried partitions. */
-        if (!state.err)
-                rd_kafka_topic_partition_list_update(offsets, state.results);
-
-        rd_kafka_topic_partition_list_destroy(state.results);
-
-        return state.err;
-}
-
-
-/**
- * rd_kafka_poll() (and similar) op callback handler.
- * Will either call registered callback depending on cb_type and op type
- * or return op to application, if applicable (e.g., fetch message).
- *
- * Returns 1 if op was handled, else 0.
- *
- * Locality: application thread
- */
-rd_kafka_op_res_t
-rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
- rd_kafka_q_cb_type_t cb_type, void *opaque) {
- rd_kafka_msg_t *rkm;
-
- /* Return-as-event requested, see if op can be converted to event,
- * otherwise fall through and trigger callbacks. */
- if (cb_type == RD_KAFKA_Q_CB_EVENT && rd_kafka_event_setup(rk, rko))
- return 0; /* Return as event */
-
- switch ((int)rko->rko_type)
- {
- case RD_KAFKA_OP_FETCH:
- if (!rk->rk_conf.consume_cb ||
- cb_type == RD_KAFKA_Q_CB_RETURN ||
- cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
- return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
- else {
- struct consume_ctx ctx = {
- .consume_cb = rk->rk_conf.consume_cb,
- .opaque = rk->rk_conf.opaque };
-
- return rd_kafka_consume_cb(rk, rkq, rko, cb_type, &ctx);
- }
- break;
-
- case RD_KAFKA_OP_REBALANCE:
- /* If EVENT_REBALANCE is enabled but rebalance_cb isnt
- * we need to perform a dummy assign for the application.
- * This might happen during termination with consumer_close() */
- if (rk->rk_conf.rebalance_cb)
- rk->rk_conf.rebalance_cb(
- rk, rko->rko_err,
- rko->rko_u.rebalance.partitions,
- rk->rk_conf.opaque);
- else {
- rd_kafka_dbg(rk, CGRP, "UNASSIGN",
- "Forcing unassign of %d partition(s)",
- rko->rko_u.rebalance.partitions ?
- rko->rko_u.rebalance.partitions->cnt : 0);
- rd_kafka_assign(rk, NULL);
- }
- break;
-
- case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY:
- if (!rko->rko_u.offset_commit.cb)
- return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
- rko->rko_u.offset_commit.cb(
- rk, rko->rko_err,
- rko->rko_u.offset_commit.partitions,
- rko->rko_u.offset_commit.opaque);
- break;
-
- case RD_KAFKA_OP_CONSUMER_ERR:
- /* rd_kafka_consumer_poll() (_Q_CB_CONSUMER):
- * Consumer errors are returned to the application
- * as rkmessages, not error callbacks.
- *
- * rd_kafka_poll() (_Q_CB_GLOBAL):
- * convert to ERR op (fallthru)
- */
- if (cb_type == RD_KAFKA_Q_CB_RETURN ||
- cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) {
- /* return as message_t to application */
- return RD_KAFKA_OP_RES_PASS;
- }
- /* FALLTHRU */
-
- case RD_KAFKA_OP_ERR:
- if (rk->rk_conf.error_cb)
- rk->rk_conf.error_cb(rk, rko->rko_err,
- rko->rko_u.err.errstr,
- rk->rk_conf.opaque);
- else
- rd_kafka_log(rk, LOG_ERR, "ERROR",
- "%s: %s: %s",
- rk->rk_name,
- rd_kafka_err2str(rko->rko_err),
- rko->rko_u.err.errstr);
- break;
-
- case RD_KAFKA_OP_DR:
- /* Delivery report:
- * call application DR callback for each message. */
- while ((rkm = TAILQ_FIRST(&rko->rko_u.dr.msgq.rkmq_msgs))) {
- rd_kafka_message_t *rkmessage;
-
- TAILQ_REMOVE(&rko->rko_u.dr.msgq.rkmq_msgs,
- rkm, rkm_link);
-
- rkmessage = rd_kafka_message_get_from_rkm(rko, rkm);
-
- if (rk->rk_conf.dr_msg_cb) {
- rk->rk_conf.dr_msg_cb(rk, rkmessage,
- rk->rk_conf.opaque);
-
- } else {
-
- rk->rk_conf.dr_cb(rk,
- rkmessage->payload,
- rkmessage->len,
- rkmessage->err,
- rk->rk_conf.opaque,
- rkmessage->_private);
- }
-
- rd_kafka_msg_destroy(rk, rkm);
-
- if (unlikely(rd_kafka_yield_thread)) {
- /* Callback called yield(),
- * re-enqueue the op (if there are any
- * remaining messages). */
- if (!TAILQ_EMPTY(&rko->rko_u.dr.msgq.
- rkmq_msgs))
- rd_kafka_q_reenq(rkq, rko);
- else
- rd_kafka_op_destroy(rko);
- return RD_KAFKA_OP_RES_YIELD;
- }
- }
-
- rd_kafka_msgq_init(&rko->rko_u.dr.msgq);
-
- break;
-
- case RD_KAFKA_OP_THROTTLE:
- if (rk->rk_conf.throttle_cb)
- rk->rk_conf.throt
<TRUNCATED>