You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2017/11/16 21:15:55 UTC
[25/42] nifi-minifi-cpp git commit: MINIFICPP-274: PutKafka Processor
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka.c b/thirdparty/librdkafka-0.11.1/src/rdkafka.c
new file mode 100644
index 0000000..6867a6c
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka.c
@@ -0,0 +1,3392 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2013, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#define _GNU_SOURCE
+#include <errno.h>
+#include <string.h>
+#include <stdarg.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <sys/stat.h>
+
+#include "rdkafka_int.h"
+#include "rdkafka_msg.h"
+#include "rdkafka_broker.h"
+#include "rdkafka_topic.h"
+#include "rdkafka_partition.h"
+#include "rdkafka_offset.h"
+#include "rdkafka_transport.h"
+#include "rdkafka_cgrp.h"
+#include "rdkafka_assignor.h"
+#include "rdkafka_request.h"
+#include "rdkafka_event.h"
+#include "rdkafka_sasl.h"
+#include "rdkafka_interceptor.h"
+
+#include "rdtime.h"
+#include "crc32c.h"
+#include "rdunittest.h"
+
+#ifdef _MSC_VER
+#include <sys/types.h>
+#include <sys/timeb.h>
+#endif
+
+
+
+static once_flag rd_kafka_global_init_once = ONCE_FLAG_INIT;
+
+/**
+ * @brief Global counter+lock for all active librdkafka instances
+ */
+mtx_t rd_kafka_global_lock;
+int rd_kafka_global_cnt;
+
+
+/**
+ * Last API error code, per thread.
+ * Shared among all rd_kafka_t instances.
+ */
+rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;
+
+
+/**
+ * Current number of threads created by rdkafka.
+ * This is used in regression tests.
+ */
+rd_atomic32_t rd_kafka_thread_cnt_curr;
+int rd_kafka_thread_cnt (void) {
+#if ENABLE_SHAREDPTR_DEBUG
+ rd_shared_ptrs_dump();
+#endif
+
+ return rd_atomic32_get(&rd_kafka_thread_cnt_curr);
+}
+
+/**
+ * Current thread's name (TLS)
+ */
+char RD_TLS rd_kafka_thread_name[64] = "app";
+
+
+
+static void rd_kafka_global_init (void) {
+#if ENABLE_SHAREDPTR_DEBUG
+ LIST_INIT(&rd_shared_ptr_debug_list);
+ mtx_init(&rd_shared_ptr_debug_mtx, mtx_plain);
+ atexit(rd_shared_ptrs_dump);
+#endif
+ mtx_init(&rd_kafka_global_lock, mtx_plain);
+#if ENABLE_DEVEL
+ rd_atomic32_init(&rd_kafka_op_cnt, 0);
+#endif
+ crc32c_global_init();
+}
+
+/**
+ * @returns the current number of active librdkafka instances
+ */
+static int rd_kafka_global_cnt_get (void) {
+ int r;
+ mtx_lock(&rd_kafka_global_lock);
+ r = rd_kafka_global_cnt;
+ mtx_unlock(&rd_kafka_global_lock);
+ return r;
+}
+
+
+/**
+ * @brief Increase counter for active librdkafka instances.
+ * If this is the first instance the global constructors will be called, if any.
+ */
+static void rd_kafka_global_cnt_incr (void) {
+ mtx_lock(&rd_kafka_global_lock);
+ rd_kafka_global_cnt++;
+ if (rd_kafka_global_cnt == 1) {
+ rd_kafka_transport_init();
+#if WITH_SSL
+ rd_kafka_transport_ssl_init();
+#endif
+ rd_kafka_sasl_global_init();
+ }
+ mtx_unlock(&rd_kafka_global_lock);
+}
+
+/**
+ * @brief Decrease counter for active librdkafka instances.
+ * If this counter reaches 0 the global destructors will be called, if any.
+ */
+static void rd_kafka_global_cnt_decr (void) {
+ mtx_lock(&rd_kafka_global_lock);
+ rd_kafka_assert(NULL, rd_kafka_global_cnt > 0);
+ rd_kafka_global_cnt--;
+ if (rd_kafka_global_cnt == 0) {
+ rd_kafka_sasl_global_term();
+#if WITH_SSL
+ rd_kafka_transport_ssl_term();
+#endif
+ }
+ mtx_unlock(&rd_kafka_global_lock);
+}
+
+
+/**
+ * Wait for all rd_kafka_t objects to be destroyed.
+ * Returns 0 if all kafka objects are now destroyed, or -1 if the
+ * timeout was reached.
+ */
+int rd_kafka_wait_destroyed (int timeout_ms) {
+ rd_ts_t timeout = rd_clock() + (timeout_ms * 1000);
+
+ while (rd_kafka_thread_cnt() > 0 ||
+ rd_kafka_global_cnt_get() > 0) {
+ if (rd_clock() >= timeout) {
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
+ ETIMEDOUT);
+#if ENABLE_SHAREDPTR_DEBUG
+ rd_shared_ptrs_dump();
+#endif
+ return -1;
+ }
+ rd_usleep(25000, NULL); /* 25ms */
+ }
+
+ return 0;
+}
+
+static void rd_kafka_log_buf (const rd_kafka_conf_t *conf,
+ const rd_kafka_t *rk, int level, const char *fac,
+ const char *buf) {
+ if (level > conf->log_level)
+ return;
+ else if (rk && conf->log_queue) {
+ rd_kafka_op_t *rko;
+
+ if (!rk->rk_logq)
+ return; /* Terminating */
+
+ rko = rd_kafka_op_new(RD_KAFKA_OP_LOG);
+ rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_MEDIUM);
+ rko->rko_u.log.level = level;
+ strncpy(rko->rko_u.log.fac, fac,
+ sizeof(rko->rko_u.log.fac) - 1);
+ rko->rko_u.log.str = rd_strdup(buf);
+ rd_kafka_q_enq(rk->rk_logq, rko);
+
+ } else if (conf->log_cb) {
+ conf->log_cb(rk, level, fac, buf);
+ }
+}
+
+/**
+ * @brief Logger
+ *
+ * @remark conf must be set, but rk may be NULL
+ */
+void rd_kafka_log0 (const rd_kafka_conf_t *conf,
+ const rd_kafka_t *rk,
+ const char *extra, int level,
+ const char *fac, const char *fmt, ...) {
+ char buf[2048];
+ va_list ap;
+ unsigned int elen = 0;
+ unsigned int of = 0;
+
+ if (level > conf->log_level)
+ return;
+
+ if (conf->log_thread_name) {
+ elen = rd_snprintf(buf, sizeof(buf), "[thrd:%s]: ",
+ rd_kafka_thread_name);
+ if (unlikely(elen >= sizeof(buf)))
+ elen = sizeof(buf);
+ of = elen;
+ }
+
+ if (extra) {
+ elen = rd_snprintf(buf+of, sizeof(buf)-of, "%s: ", extra);
+ if (unlikely(elen >= sizeof(buf)-of))
+ elen = sizeof(buf)-of;
+ of += elen;
+ }
+
+ va_start(ap, fmt);
+ rd_vsnprintf(buf+of, sizeof(buf)-of, fmt, ap);
+ va_end(ap);
+
+ rd_kafka_log_buf(conf, rk, level, fac, buf);
+}
+
+
+
+void rd_kafka_log_print(const rd_kafka_t *rk, int level,
+ const char *fac, const char *buf) {
+ int secs, msecs;
+ struct timeval tv;
+ rd_gettimeofday(&tv, NULL);
+ secs = (int)tv.tv_sec;
+ msecs = (int)(tv.tv_usec / 1000);
+ fprintf(stderr, "%%%i|%u.%03u|%s|%s| %s\n",
+ level, secs, msecs,
+ fac, rk ? rk->rk_name : "", buf);
+}
+
+#ifndef _MSC_VER
+void rd_kafka_log_syslog (const rd_kafka_t *rk, int level,
+ const char *fac, const char *buf) {
+ static int initialized = 0;
+
+ if (!initialized)
+ openlog("rdkafka", LOG_PID|LOG_CONS, LOG_USER);
+
+ syslog(level, "%s: %s: %s", fac, rk ? rk->rk_name : "", buf);
+}
+#endif
+
+void rd_kafka_set_logger (rd_kafka_t *rk,
+ void (*func) (const rd_kafka_t *rk, int level,
+ const char *fac, const char *buf)) {
+ rk->rk_conf.log_cb = func;
+}
+
+void rd_kafka_set_log_level (rd_kafka_t *rk, int level) {
+ rk->rk_conf.log_level = level;
+}
+
+
+
+
+
+
+static const char *rd_kafka_type2str (rd_kafka_type_t type) {
+ static const char *types[] = {
+ [RD_KAFKA_PRODUCER] = "producer",
+ [RD_KAFKA_CONSUMER] = "consumer",
+ };
+ return types[type];
+}
+
+#define _ERR_DESC(ENUM,DESC) \
+ [ENUM - RD_KAFKA_RESP_ERR__BEGIN] = { ENUM, # ENUM + 18/*pfx*/, DESC }
+
+static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
+ _ERR_DESC(RD_KAFKA_RESP_ERR__BEGIN, NULL),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__BAD_MSG,
+ "Local: Bad message format"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__BAD_COMPRESSION,
+ "Local: Invalid compressed data"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__DESTROY,
+ "Local: Broker handle destroyed"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__FAIL,
+ "Local: Communication failure with broker"), //FIXME: too specific
+ _ERR_DESC(RD_KAFKA_RESP_ERR__TRANSPORT,
+ "Local: Broker transport failure"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
+ "Local: Critical system resource failure"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__RESOLVE,
+ "Local: Host resolution failure"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
+ "Local: Message timed out"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__PARTITION_EOF,
+ "Broker: No more messages"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
+ "Local: Unknown partition"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__FS,
+ "Local: File or filesystem error"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC,
+ "Local: Unknown topic"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
+ "Local: All broker connections are down"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "Local: Invalid argument or configuration"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT,
+ "Local: Timed out"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__QUEUE_FULL,
+ "Local: Queue full"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__ISR_INSUFF,
+ "Local: ISR count insufficient"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__NODE_UPDATE,
+ "Local: Broker node update"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__SSL,
+ "Local: SSL error"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_COORD,
+ "Local: Waiting for coordinator"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP,
+ "Local: Unknown group"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__IN_PROGRESS,
+ "Local: Operation in progress"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS,
+ "Local: Previous operation in progress"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION,
+ "Local: Existing subscription"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+ "Local: Assign partitions"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
+ "Local: Revoke partitions"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__CONFLICT,
+ "Local: Conflicting use"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__STATE,
+ "Local: Erroneous state"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL,
+ "Local: Unknown protocol"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED,
+ "Local: Not implemented"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__AUTHENTICATION,
+ "Local: Authentication failure"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__NO_OFFSET,
+ "Local: No offset stored"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__OUTDATED,
+ "Local: Outdated"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,
+ "Local: Timed out in queue"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
+ "Local: Required feature not supported by broker"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_CACHE,
+ "Local: Awaiting cache update"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__INTR,
+ "Local: Operation interrupted"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_SERIALIZATION,
+ "Local: Key serialization error"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION,
+ "Local: Value serialization error"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION,
+ "Local: Key deserialization error"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION,
+ "Local: Value deserialization error"),
+
+ _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN,
+ "Unknown broker error"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_NO_ERROR,
+ "Success"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE,
+ "Broker: Offset out of range"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG,
+ "Broker: Invalid message"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
+ "Broker: Unknown topic or partition"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
+ "Broker: Invalid message size"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE,
+ "Broker: Leader not available"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
+ "Broker: Not leader for partition"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
+ "Broker: Request timed out"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE,
+ "Broker: Broker not available"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE,
+ "Broker: Replica not available"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE,
+ "Broker: Message size too large"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH,
+ "Broker: StaleControllerEpochCode"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,
+ "Broker: Offset metadata string too large"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION,
+ "Broker: Broker disconnected before response received"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS,
+ "Broker: Group coordinator load in progress"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,
+ "Broker: Group coordinator not available"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP,
+ "Broker: Not coordinator for group"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION,
+ "Broker: Invalid topic"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE,
+ "Broker: Message batch larger than configured server "
+ "segment size"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
+ "Broker: Not enough in-sync replicas"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND,
+ "Broker: Message(s) written to insufficient number of "
+ "in-sync replicas"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS,
+ "Broker: Invalid required acks value"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+ "Broker: Specified group generation id is not valid"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL,
+ "Broker: Inconsistent group protocol"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_GROUP_ID,
+ "Broker: Invalid group.id"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
+ "Broker: Unknown member"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT,
+ "Broker: Invalid session timeout"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
+ "Broker: Group rebalance in progress"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,
+ "Broker: Commit offset data size is not valid"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
+ "Broker: Topic authorization failed"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
+ "Broker: Group authorization failed"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED,
+ "Broker: Cluster authorization failed"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP,
+ "Broker: Invalid timestamp"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM,
+ "Broker: Unsupported SASL mechanism"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE,
+ "Broker: Request not valid in current SASL state"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION,
+ "Broker: API version not supported"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS,
+ "Broker: Topic already exists"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PARTITIONS,
+ "Broker: Invalid number of partitions"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR,
+ "Broker: Invalid replication factor"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT,
+ "Broker: Invalid replica assignment"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_CONFIG,
+ "Broker: Configuration is invalid"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_CONTROLLER,
+ "Broker: Not controller for cluster"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUEST,
+ "Broker: Invalid request"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT,
+ "Broker: Message format on broker does not support request"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_POLICY_VIOLATION,
+ "Broker: Isolation policy volation"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
+ "Broker: Broker received an out of order sequence number"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER,
+ "Broker: Broker received a duplicate sequence number"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH,
+ "Broker: Producer attempted an operation with an old epoch"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TXN_STATE,
+ "Broker: Producer attempted a transactional operation in "
+ "an invalid state"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
+ "Broker: Producer attempted to use a producer id which is "
+ "not currently assigned to its transactional id"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT,
+ "Broker: Transaction timeout is larger than the maximum "
+ "value allowed by the broker's max.transaction.timeout.ms"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
+ "Broker: Producer attempted to update a transaction while "
+ "another concurrent operation on the same transaction was "
+ "ongoing"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED,
+ "Broker: Indicates that the transaction coordinator sending "
+ "a WriteTxnMarker is no longer the current coordinator for "
+ "a given producer"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
+ "Broker: Transactional Id authorization failed"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_SECURITY_DISABLED,
+ "Broker: Security features are disabled"),
+ _ERR_DESC(RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED,
+ "Broker: Operation not attempted"),
+
+ _ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)
+};
+
+
+void rd_kafka_get_err_descs (const struct rd_kafka_err_desc **errdescs,
+ size_t *cntp) {
+ *errdescs = rd_kafka_err_descs;
+ *cntp = RD_ARRAYSIZE(rd_kafka_err_descs);
+}
+
+
+const char *rd_kafka_err2str (rd_kafka_resp_err_t err) {
+ static RD_TLS char ret[32];
+ int idx = err - RD_KAFKA_RESP_ERR__BEGIN;
+
+ if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN ||
+ err >= RD_KAFKA_RESP_ERR_END_ALL ||
+ !rd_kafka_err_descs[idx].desc)) {
+ rd_snprintf(ret, sizeof(ret), "Err-%i?", err);
+ return ret;
+ }
+
+ return rd_kafka_err_descs[idx].desc;
+}
+
+
+const char *rd_kafka_err2name (rd_kafka_resp_err_t err) {
+ static RD_TLS char ret[32];
+ int idx = err - RD_KAFKA_RESP_ERR__BEGIN;
+
+ if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN ||
+ err >= RD_KAFKA_RESP_ERR_END_ALL ||
+ !rd_kafka_err_descs[idx].desc)) {
+ rd_snprintf(ret, sizeof(ret), "ERR_%i?", err);
+ return ret;
+ }
+
+ return rd_kafka_err_descs[idx].name;
+}
+
+
+rd_kafka_resp_err_t rd_kafka_last_error (void) {
+ return rd_kafka_last_error_code;
+}
+
+
+rd_kafka_resp_err_t rd_kafka_errno2err (int errnox) {
+ switch (errnox)
+ {
+ case EINVAL:
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+ case EBUSY:
+ return RD_KAFKA_RESP_ERR__CONFLICT;
+
+ case ENOENT:
+ return RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
+
+ case ESRCH:
+ return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
+
+ case ETIMEDOUT:
+ return RD_KAFKA_RESP_ERR__TIMED_OUT;
+
+ case EMSGSIZE:
+ return RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
+
+ case ENOBUFS:
+ return RD_KAFKA_RESP_ERR__QUEUE_FULL;
+
+ default:
+ return RD_KAFKA_RESP_ERR__FAIL;
+ }
+}
+
+
+
+/**
+ * @brief Final destructor for rd_kafka_t, must only be called with refcnt 0.
+ *
+ * @locality application thread
+ */
+void rd_kafka_destroy_final (rd_kafka_t *rk) {
+
+ rd_kafka_assert(rk, rd_atomic32_get(&rk->rk_terminate) != 0);
+
+ /* Synchronize state */
+ rd_kafka_wrlock(rk);
+ rd_kafka_wrunlock(rk);
+
+ rd_kafka_assignors_term(rk);
+
+ rd_kafka_metadata_cache_destroy(rk);
+
+ rd_kafka_timers_destroy(&rk->rk_timers);
+
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying op queues");
+
+ /* Destroy cgrp */
+ if (rk->rk_cgrp) {
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+ "Destroying cgrp");
+ /* Reset queue forwarding (rep -> cgrp) */
+ rd_kafka_q_fwd_set(rk->rk_rep, NULL);
+ rd_kafka_cgrp_destroy_final(rk->rk_cgrp);
+ }
+
+ /* Purge op-queues */
+ rd_kafka_q_destroy(rk->rk_rep);
+ rd_kafka_q_destroy(rk->rk_ops);
+
+#if WITH_SSL
+ if (rk->rk_conf.ssl.ctx) {
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying SSL CTX");
+ rd_kafka_transport_ssl_ctx_term(rk);
+ }
+#endif
+
+ /* It is not safe to log after this point. */
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+ "Termination done: freeing resources");
+
+ if (rk->rk_logq) {
+ rd_kafka_q_destroy(rk->rk_logq);
+ rk->rk_logq = NULL;
+ }
+
+ if (rk->rk_type == RD_KAFKA_PRODUCER) {
+ cnd_destroy(&rk->rk_curr_msgs.cnd);
+ mtx_destroy(&rk->rk_curr_msgs.lock);
+ }
+
+ cnd_destroy(&rk->rk_broker_state_change_cnd);
+ mtx_destroy(&rk->rk_broker_state_change_lock);
+
+ if (rk->rk_full_metadata)
+ rd_kafka_metadata_destroy(rk->rk_full_metadata);
+ rd_kafkap_str_destroy(rk->rk_client_id);
+ rd_kafkap_str_destroy(rk->rk_group_id);
+ rd_kafkap_str_destroy(rk->rk_eos.TransactionalId);
+ rd_kafka_anyconf_destroy(_RK_GLOBAL, &rk->rk_conf);
+ rd_list_destroy(&rk->rk_broker_by_id);
+
+ rd_kafkap_bytes_destroy((rd_kafkap_bytes_t *)rk->rk_null_bytes);
+ rwlock_destroy(&rk->rk_lock);
+
+ rd_free(rk);
+ rd_kafka_global_cnt_decr();
+}
+
+
+static void rd_kafka_destroy_app (rd_kafka_t *rk, int blocking) {
+ thrd_t thrd;
+#ifndef _MSC_VER
+ int term_sig = rk->rk_conf.term_sig;
+#endif
+ rd_kafka_dbg(rk, ALL, "DESTROY", "Terminating instance");
+
+ /* The legacy/simple consumer lacks an API to close down the consumer*/
+ if (rk->rk_cgrp) {
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+ "Closing consumer group");
+ rd_kafka_consumer_close(rk);
+ }
+
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Interrupting timers");
+ rd_kafka_wrlock(rk);
+ thrd = rk->rk_thread;
+ rd_atomic32_add(&rk->rk_terminate, 1);
+ rd_kafka_timers_interrupt(&rk->rk_timers);
+ rd_kafka_wrunlock(rk);
+
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+ "Sending TERMINATE to main background thread");
+ /* Send op to trigger queue/io wake-up.
+ * The op itself is (likely) ignored by the receiver. */
+ rd_kafka_q_enq(rk->rk_ops, rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
+
+ rd_kafka_brokers_broadcast_state_change(rk);
+
+#ifndef _MSC_VER
+ /* Interrupt main kafka thread to speed up termination. */
+ if (term_sig) {
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+ "Sending thread kill signal %d", term_sig);
+ pthread_kill(thrd, term_sig);
+ }
+#endif
+
+ if (!blocking)
+ return; /* FIXME: thread resource leak */
+
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+ "Joining main background thread");
+
+ if (thrd_join(thrd, NULL) != thrd_success)
+ rd_kafka_assert(NULL, !*"failed to join main thread");
+
+ rd_kafka_destroy_final(rk);
+}
+
+
+/* NOTE: Must only be called by application.
+ * librdkafka itself must use rd_kafka_destroy0(). */
+void rd_kafka_destroy (rd_kafka_t *rk) {
+ rd_kafka_destroy_app(rk, 1);
+}
+
+
+/**
+ * Main destructor for rd_kafka_t
+ *
+ * Locality: rdkafka main thread or application thread during rd_kafka_new()
+ */
+static void rd_kafka_destroy_internal (rd_kafka_t *rk) {
+ rd_kafka_itopic_t *rkt, *rkt_tmp;
+ rd_kafka_broker_t *rkb, *rkb_tmp;
+ rd_list_t wait_thrds;
+ thrd_t *thrd;
+ int i;
+
+ rd_kafka_dbg(rk, ALL, "DESTROY", "Destroy internal");
+
+ /* Call on_destroy() interceptors */
+ rd_kafka_interceptors_on_destroy(rk);
+
+ /* Brokers pick up on rk_terminate automatically. */
+
+ /* List of (broker) threads to join to synchronize termination */
+ rd_list_init(&wait_thrds, rd_atomic32_get(&rk->rk_broker_cnt), NULL);
+
+ rd_kafka_wrlock(rk);
+
+ rd_kafka_dbg(rk, ALL, "DESTROY", "Removing all topics");
+ /* Decommission all topics */
+ TAILQ_FOREACH_SAFE(rkt, &rk->rk_topics, rkt_link, rkt_tmp) {
+ rd_kafka_wrunlock(rk);
+ rd_kafka_topic_partitions_remove(rkt);
+ rd_kafka_wrlock(rk);
+ }
+
+ /* Decommission brokers.
+ * Broker thread holds a refcount and detects when broker refcounts
+ * reaches 1 and then decommissions itself. */
+ TAILQ_FOREACH_SAFE(rkb, &rk->rk_brokers, rkb_link, rkb_tmp) {
+ /* Add broker's thread to wait_thrds list for later joining */
+ thrd = malloc(sizeof(*thrd));
+ *thrd = rkb->rkb_thread;
+ rd_list_add(&wait_thrds, thrd);
+ rd_kafka_wrunlock(rk);
+
+ /* Send op to trigger queue/io wake-up.
+ * The op itself is (likely) ignored by the broker thread. */
+ rd_kafka_q_enq(rkb->rkb_ops,
+ rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
+
+#ifndef _MSC_VER
+ /* Interrupt IO threads to speed up termination. */
+ if (rk->rk_conf.term_sig)
+ pthread_kill(rkb->rkb_thread, rk->rk_conf.term_sig);
+#endif
+
+ rd_kafka_broker_destroy(rkb);
+
+ rd_kafka_wrlock(rk);
+ }
+
+ if (rk->rk_clusterid) {
+ rd_free(rk->rk_clusterid);
+ rk->rk_clusterid = NULL;
+ }
+
+ rd_kafka_wrunlock(rk);
+
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+ "Purging reply queue");
+
+ /* Purge op-queue */
+ rd_kafka_q_disable(rk->rk_rep);
+ rd_kafka_q_purge(rk->rk_rep);
+
+ /* Loose our special reference to the internal broker. */
+ mtx_lock(&rk->rk_internal_rkb_lock);
+ if ((rkb = rk->rk_internal_rkb)) {
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+ "Decommissioning internal broker");
+
+ /* Send op to trigger queue wake-up. */
+ rd_kafka_q_enq(rkb->rkb_ops,
+ rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));
+
+ rk->rk_internal_rkb = NULL;
+ thrd = malloc(sizeof(*thrd));
+ *thrd = rkb->rkb_thread;
+ rd_list_add(&wait_thrds, thrd);
+ }
+ mtx_unlock(&rk->rk_internal_rkb_lock);
+ if (rkb)
+ rd_kafka_broker_destroy(rkb);
+
+
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+ "Join %d broker thread(s)", rd_list_cnt(&wait_thrds));
+
+ /* Join broker threads */
+ RD_LIST_FOREACH(thrd, &wait_thrds, i) {
+ if (thrd_join(*thrd, NULL) != thrd_success)
+ ;
+ free(thrd);
+ }
+
+ rd_list_destroy(&wait_thrds);
+}
+
+
+/* Stats buffer printf */
+#define _st_printf(...) do { \
+ ssize_t r; \
+ ssize_t rem = size-of; \
+ r = rd_snprintf(buf+of, rem, __VA_ARGS__); \
+ if (r >= rem) { \
+ size *= 2; \
+ rem = size-of; \
+ buf = rd_realloc(buf, size); \
+ r = rd_snprintf(buf+of, rem, __VA_ARGS__); \
+ } \
+ of += r; \
+ } while (0)
+
+/**
+ * Emit stats for toppar
+ */
+static RD_INLINE void rd_kafka_stats_emit_toppar (char **bufp, size_t *sizep,
+ size_t *ofp,
+ rd_kafka_toppar_t *rktp,
+ int first) {
+ char *buf = *bufp;
+ size_t size = *sizep;
+ size_t of = *ofp;
+ int64_t consumer_lag = -1;
+ struct offset_stats offs;
+ int32_t leader_nodeid = -1;
+
+ rd_kafka_toppar_lock(rktp);
+
+ if (rktp->rktp_leader) {
+ rd_kafka_broker_lock(rktp->rktp_leader);
+ leader_nodeid = rktp->rktp_leader->rkb_nodeid;
+ rd_kafka_broker_unlock(rktp->rktp_leader);
+ }
+
+ /* Grab a copy of the latest finalized offset stats */
+ offs = rktp->rktp_offsets_fin;
+
+ if (rktp->rktp_hi_offset != RD_KAFKA_OFFSET_INVALID &&
+ rktp->rktp_app_offset >= 0) {
+ if (unlikely(rktp->rktp_app_offset > rktp->rktp_hi_offset))
+ consumer_lag = 0;
+ else
+ consumer_lag = rktp->rktp_hi_offset -
+ rktp->rktp_app_offset;
+ }
+
+ _st_printf("%s\"%"PRId32"\": { "
+ "\"partition\":%"PRId32", "
+ "\"leader\":%"PRId32", "
+ "\"desired\":%s, "
+ "\"unknown\":%s, "
+ "\"msgq_cnt\":%i, "
+ "\"msgq_bytes\":%"PRIu64", "
+ "\"xmit_msgq_cnt\":%i, "
+ "\"xmit_msgq_bytes\":%"PRIu64", "
+ "\"fetchq_cnt\":%i, "
+ "\"fetchq_size\":%"PRIu64", "
+ "\"fetch_state\":\"%s\", "
+ "\"query_offset\":%"PRId64", "
+ "\"next_offset\":%"PRId64", "
+ "\"app_offset\":%"PRId64", "
+ "\"stored_offset\":%"PRId64", "
+ "\"commited_offset\":%"PRId64", " /*FIXME: issue #80 */
+ "\"committed_offset\":%"PRId64", "
+ "\"eof_offset\":%"PRId64", "
+ "\"lo_offset\":%"PRId64", "
+ "\"hi_offset\":%"PRId64", "
+ "\"consumer_lag\":%"PRId64", "
+ "\"txmsgs\":%"PRIu64", "
+ "\"txbytes\":%"PRIu64", "
+ "\"msgs\": %"PRIu64", "
+ "\"rx_ver_drops\": %"PRIu64" "
+ "} ",
+ first ? "" : ", ",
+ rktp->rktp_partition,
+ rktp->rktp_partition,
+ leader_nodeid,
+ (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_DESIRED)?"true":"false",
+ (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_UNKNOWN)?"true":"false",
+ rd_atomic32_get(&rktp->rktp_msgq.rkmq_msg_cnt),
+ rd_atomic64_get(&rktp->rktp_msgq.rkmq_msg_bytes),
+ rd_atomic32_get(&rktp->rktp_xmit_msgq.rkmq_msg_cnt),
+ rd_atomic64_get(&rktp->rktp_xmit_msgq.rkmq_msg_bytes),
+ rd_kafka_q_len(rktp->rktp_fetchq),
+ rd_kafka_q_size(rktp->rktp_fetchq),
+ rd_kafka_fetch_states[rktp->rktp_fetch_state],
+ rktp->rktp_query_offset,
+ offs.fetch_offset,
+ rktp->rktp_app_offset,
+ rktp->rktp_stored_offset,
+ rktp->rktp_committed_offset, /* FIXME: issue #80 */
+ rktp->rktp_committed_offset,
+ offs.eof_offset,
+ rktp->rktp_lo_offset,
+ rktp->rktp_hi_offset,
+ consumer_lag,
+ rd_atomic64_get(&rktp->rktp_c.tx_msgs),
+ rd_atomic64_get(&rktp->rktp_c.tx_bytes),
+ rd_atomic64_get(&rktp->rktp_c.msgs),
+ rd_atomic64_get(&rktp->rktp_c.rx_ver_drops));
+
+ rd_kafka_toppar_unlock(rktp);
+
+ *bufp = buf;
+ *sizep = size;
+ *ofp = of;
+}
+
+/**
+ * Emit all statistics
+ */
+static void rd_kafka_stats_emit_all (rd_kafka_t *rk) {
+ char *buf;
+ size_t size = 1024*10;
+ size_t of = 0;
+ rd_kafka_broker_t *rkb;
+ rd_kafka_itopic_t *rkt;
+ shptr_rd_kafka_toppar_t *s_rktp;
+ rd_ts_t now;
+ rd_kafka_op_t *rko;
+ unsigned int tot_cnt;
+ size_t tot_size;
+
+ buf = rd_malloc(size);
+
+
+ rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);
+ rd_kafka_rdlock(rk);
+
+ now = rd_clock();
+ _st_printf("{ "
+ "\"name\": \"%s\", "
+ "\"type\": \"%s\", "
+ "\"ts\":%"PRId64", "
+ "\"time\":%lli, "
+ "\"replyq\":%i, "
+ "\"msg_cnt\":%u, "
+ "\"msg_size\":%"PRIusz", "
+ "\"msg_max\":%u, "
+ "\"msg_size_max\":%"PRIusz", "
+ "\"simple_cnt\":%i, "
+ "\"metadata_cache_cnt\":%i, "
+ "\"brokers\":{ "/*open brokers*/,
+ rk->rk_name,
+ rd_kafka_type2str(rk->rk_type),
+ now,
+ (signed long long)time(NULL),
+ rd_kafka_q_len(rk->rk_rep),
+ tot_cnt, tot_size,
+ rk->rk_curr_msgs.max_cnt, rk->rk_curr_msgs.max_size,
+ rd_atomic32_get(&rk->rk_simple_cnt),
+ rk->rk_metadata_cache.rkmc_cnt);
+
+
+ TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
+ rd_avg_t rtt, throttle, int_latency;
+ rd_kafka_toppar_t *rktp;
+
+ rd_kafka_broker_lock(rkb);
+ rd_avg_rollover(&int_latency, &rkb->rkb_avg_int_latency);
+ rd_avg_rollover(&rtt, &rkb->rkb_avg_rtt);
+ rd_avg_rollover(&throttle, &rkb->rkb_avg_throttle);
+ _st_printf("%s\"%s\": { "/*open broker*/
+ "\"name\":\"%s\", "
+ "\"nodeid\":%"PRId32", "
+ "\"state\":\"%s\", "
+ "\"stateage\":%"PRId64", "
+ "\"outbuf_cnt\":%i, "
+ "\"outbuf_msg_cnt\":%i, "
+ "\"waitresp_cnt\":%i, "
+ "\"waitresp_msg_cnt\":%i, "
+ "\"tx\":%"PRIu64", "
+ "\"txbytes\":%"PRIu64", "
+ "\"txerrs\":%"PRIu64", "
+ "\"txretries\":%"PRIu64", "
+ "\"req_timeouts\":%"PRIu64", "
+ "\"rx\":%"PRIu64", "
+ "\"rxbytes\":%"PRIu64", "
+ "\"rxerrs\":%"PRIu64", "
+ "\"rxcorriderrs\":%"PRIu64", "
+ "\"rxpartial\":%"PRIu64", "
+ "\"zbuf_grow\":%"PRIu64", "
+ "\"buf_grow\":%"PRIu64", "
+ "\"wakeups\":%"PRIu64", "
+ "\"int_latency\": {"
+ " \"min\":%"PRId64","
+ " \"max\":%"PRId64","
+ " \"avg\":%"PRId64","
+ " \"sum\":%"PRId64","
+ " \"cnt\":%i "
+ "}, "
+ "\"rtt\": {"
+ " \"min\":%"PRId64","
+ " \"max\":%"PRId64","
+ " \"avg\":%"PRId64","
+ " \"sum\":%"PRId64","
+ " \"cnt\":%i "
+ "}, "
+ "\"throttle\": {"
+ " \"min\":%"PRId64","
+ " \"max\":%"PRId64","
+ " \"avg\":%"PRId64","
+ " \"sum\":%"PRId64","
+ " \"cnt\":%i "
+ "}, "
+ "\"toppars\":{ "/*open toppars*/,
+ rkb == TAILQ_FIRST(&rk->rk_brokers) ? "" : ", ",
+ rkb->rkb_name,
+ rkb->rkb_name,
+ rkb->rkb_nodeid,
+ rd_kafka_broker_state_names[rkb->rkb_state],
+ rkb->rkb_ts_state ? now - rkb->rkb_ts_state : 0,
+ rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt),
+ rd_atomic32_get(&rkb->rkb_outbufs.rkbq_msg_cnt),
+ rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt),
+ rd_atomic32_get(&rkb->rkb_waitresps.rkbq_msg_cnt),
+ rd_atomic64_get(&rkb->rkb_c.tx),
+ rd_atomic64_get(&rkb->rkb_c.tx_bytes),
+ rd_atomic64_get(&rkb->rkb_c.tx_err),
+ rd_atomic64_get(&rkb->rkb_c.tx_retries),
+ rd_atomic64_get(&rkb->rkb_c.req_timeouts),
+ rd_atomic64_get(&rkb->rkb_c.rx),
+ rd_atomic64_get(&rkb->rkb_c.rx_bytes),
+ rd_atomic64_get(&rkb->rkb_c.rx_err),
+ rd_atomic64_get(&rkb->rkb_c.rx_corrid_err),
+ rd_atomic64_get(&rkb->rkb_c.rx_partial),
+ rd_atomic64_get(&rkb->rkb_c.zbuf_grow),
+ rd_atomic64_get(&rkb->rkb_c.buf_grow),
+ rd_atomic64_get(&rkb->rkb_c.wakeups),
+ int_latency.ra_v.minv,
+ int_latency.ra_v.maxv,
+ int_latency.ra_v.avg,
+ int_latency.ra_v.sum,
+ int_latency.ra_v.cnt,
+ rtt.ra_v.minv,
+ rtt.ra_v.maxv,
+ rtt.ra_v.avg,
+ rtt.ra_v.sum,
+ rtt.ra_v.cnt,
+ throttle.ra_v.minv,
+ throttle.ra_v.maxv,
+ throttle.ra_v.avg,
+ throttle.ra_v.sum,
+ throttle.ra_v.cnt);
+
+ TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
+ _st_printf("%s\"%.*s-%"PRId32"\": { "
+ "\"topic\":\"%.*s\", "
+ "\"partition\":%"PRId32"} ",
+ rktp==TAILQ_FIRST(&rkb->rkb_toppars)?"":", ",
+ RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
+ rktp->rktp_partition,
+ RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
+ rktp->rktp_partition);
+ }
+
+ rd_kafka_broker_unlock(rkb);
+
+ _st_printf("} "/*close toppars*/
+ "} "/*close broker*/);
+ }
+
+
+ _st_printf("}, " /* close "brokers" array */
+ "\"topics\":{ ");
+
+ TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
+ int i, j;
+
+ rd_kafka_topic_rdlock(rkt);
+ _st_printf("%s\"%.*s\": { "
+ "\"topic\":\"%.*s\", "
+ "\"metadata_age\":%"PRId64", "
+ "\"partitions\":{ " /*open partitions*/,
+ rkt==TAILQ_FIRST(&rk->rk_topics)?"":", ",
+ RD_KAFKAP_STR_PR(rkt->rkt_topic),
+ RD_KAFKAP_STR_PR(rkt->rkt_topic),
+ rkt->rkt_ts_metadata ?
+ (rd_clock() - rkt->rkt_ts_metadata)/1000 : 0);
+
+ for (i = 0 ; i < rkt->rkt_partition_cnt ; i++)
+ rd_kafka_stats_emit_toppar(&buf, &size, &of,
+ rd_kafka_toppar_s2i(rkt->rkt_p[i]),
+ i == 0);
+
+ RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, j)
+ rd_kafka_stats_emit_toppar(&buf, &size, &of,
+ rd_kafka_toppar_s2i(s_rktp),
+ i+j == 0);
+
+ i += j;
+
+ if (rkt->rkt_ua)
+ rd_kafka_stats_emit_toppar(&buf, &size, &of,
+ rd_kafka_toppar_s2i(rkt->rkt_ua),
+ i++ == 0);
+ rd_kafka_topic_rdunlock(rkt);
+
+ _st_printf("} "/*close partitions*/
+ "} "/*close topic*/);
+
+ }
+ _st_printf("} "/*close topics*/);
+
+ if (rk->rk_cgrp) {
+ rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
+ _st_printf(", \"cgrp\": { "
+ "\"rebalance_age\": %"PRId64", "
+ "\"rebalance_cnt\": %d, "
+ "\"assignment_size\": %d }",
+ rkcg->rkcg_c.ts_rebalance ?
+ (rd_clock() - rkcg->rkcg_c.ts_rebalance)/1000 : 0,
+ rkcg->rkcg_c.rebalance_cnt,
+ rkcg->rkcg_c.assignment_size);
+ }
+ rd_kafka_rdunlock(rk);
+
+ _st_printf("}"/*close object*/);
+
+
+ /* Enqueue op for application */
+ rko = rd_kafka_op_new(RD_KAFKA_OP_STATS);
+ rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
+ rko->rko_u.stats.json = buf;
+ rko->rko_u.stats.json_len = of;
+ rd_kafka_q_enq(rk->rk_rep, rko);
+}
+
+
+
+static void rd_kafka_topic_scan_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
+ rd_kafka_t *rk = rkts->rkts_rk;
+ rd_kafka_topic_scan_all(rk, rd_clock());
+}
+
+static void rd_kafka_stats_emit_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
+ rd_kafka_t *rk = rkts->rkts_rk;
+ rd_kafka_stats_emit_all(rk);
+}
+
+
+/**
+ * @brief Periodic metadata refresh callback
+ *
+ * @locality rdkafka main thread
+ */
+static void rd_kafka_metadata_refresh_cb (rd_kafka_timers_t *rkts, void *arg) {
+ rd_kafka_t *rk = rkts->rkts_rk;
+ int sparse = 1;
+
+ /* Dont do sparse requests if there is a consumer group with an
+ * active subscription since subscriptions need to be able to match
+ * on all topics. */
+ if (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_cgrp &&
+ rk->rk_cgrp->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
+ sparse = 0;
+
+ if (sparse)
+ rd_kafka_metadata_refresh_known_topics(rk, NULL, 1/*force*/,
+ "periodic refresh");
+ else
+ rd_kafka_metadata_refresh_all(rk, NULL, "periodic refresh");
+}
+
+
+/**
+ * Main loop for Kafka handler thread.
+ */
+static int rd_kafka_thread_main (void *arg) {
+ rd_kafka_t *rk = arg;
+ rd_kafka_timer_t tmr_topic_scan = RD_ZERO_INIT;
+ rd_kafka_timer_t tmr_stats_emit = RD_ZERO_INIT;
+ rd_kafka_timer_t tmr_metadata_refresh = RD_ZERO_INIT;
+
+ rd_snprintf(rd_kafka_thread_name, sizeof(rd_kafka_thread_name), "main");
+
+ (void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1);
+
+ /* Acquire lock (which was held by thread creator during creation)
+ * to synchronise state. */
+ rd_kafka_wrlock(rk);
+ rd_kafka_wrunlock(rk);
+
+ rd_kafka_timer_start(&rk->rk_timers, &tmr_topic_scan, 1000000,
+ rd_kafka_topic_scan_tmr_cb, NULL);
+ rd_kafka_timer_start(&rk->rk_timers, &tmr_stats_emit,
+ rk->rk_conf.stats_interval_ms * 1000ll,
+ rd_kafka_stats_emit_tmr_cb, NULL);
+ if (rk->rk_conf.metadata_refresh_interval_ms > 0)
+ rd_kafka_timer_start(&rk->rk_timers, &tmr_metadata_refresh,
+ rk->rk_conf.metadata_refresh_interval_ms *
+ 1000ll,
+ rd_kafka_metadata_refresh_cb, NULL);
+
+ if (rk->rk_cgrp) {
+ rd_kafka_cgrp_reassign_broker(rk->rk_cgrp);
+ rd_kafka_q_fwd_set(rk->rk_cgrp->rkcg_ops, rk->rk_ops);
+ }
+
+ while (likely(!rd_kafka_terminating(rk) ||
+ rd_kafka_q_len(rk->rk_ops))) {
+ rd_ts_t sleeptime = rd_kafka_timers_next(
+ &rk->rk_timers, 1000*1000/*1s*/, 1/*lock*/);
+ rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0,
+ RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
+ if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
+ rd_kafka_cgrp_serve(rk->rk_cgrp);
+ rd_kafka_timers_run(&rk->rk_timers, RD_POLL_NOWAIT);
+ }
+
+ rd_kafka_q_disable(rk->rk_ops);
+ rd_kafka_q_purge(rk->rk_ops);
+
+ rd_kafka_timer_stop(&rk->rk_timers, &tmr_topic_scan, 1);
+ rd_kafka_timer_stop(&rk->rk_timers, &tmr_stats_emit, 1);
+ rd_kafka_timer_stop(&rk->rk_timers, &tmr_metadata_refresh, 1);
+
+ /* Synchronise state */
+ rd_kafka_wrlock(rk);
+ rd_kafka_wrunlock(rk);
+
+ rd_kafka_destroy_internal(rk);
+
+ rd_kafka_dbg(rk, GENERIC, "TERMINATE",
+ "Main background thread exiting");
+
+ rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1);
+
+ return 0;
+}
+
+
+static void rd_kafka_term_sig_handler (int sig) {
+ /* nop */
+}
+
+
+rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
+ char *errstr, size_t errstr_size) {
+ rd_kafka_t *rk;
+ static rd_atomic32_t rkid;
+ rd_kafka_conf_t *conf;
+ rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ int ret_errno = 0;
+#ifndef _MSC_VER
+ sigset_t newset, oldset;
+#endif
+
+ call_once(&rd_kafka_global_init_once, rd_kafka_global_init);
+
+ /* rd_kafka_new() takes ownership of the provided \p app_conf
+ * object if rd_kafka_new() succeeds.
+ * Since \p app_conf is optional we allocate a default configuration
+ * object here if \p app_conf is NULL.
+ * The configuration object itself is struct-copied later
+ * leaving the default *conf pointer to be ready for freeing.
+ * In case new() fails and app_conf was specified we will clear out
+ * rk_conf to avoid double-freeing from destroy_internal() and the
+ * user's eventual call to rd_kafka_conf_destroy().
+ * This is all a bit tricky but that's the nature of
+ * legacy interfaces. */
+ if (!app_conf)
+ conf = rd_kafka_conf_new();
+ else
+ conf = app_conf;
+
+ /* Verify mandatory configuration */
+ if (!conf->socket_cb) {
+ rd_snprintf(errstr, errstr_size,
+ "Mandatory config property 'socket_cb' not set");
+ if (!app_conf)
+ rd_kafka_conf_destroy(conf);
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
+ return NULL;
+ }
+
+ if (!conf->open_cb) {
+ rd_snprintf(errstr, errstr_size,
+ "Mandatory config property 'open_cb' not set");
+ if (!app_conf)
+ rd_kafka_conf_destroy(conf);
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
+ return NULL;
+ }
+
+ if (conf->metadata_max_age_ms == -1) {
+ if (conf->metadata_refresh_interval_ms > 0)
+ conf->metadata_max_age_ms =
+ conf->metadata_refresh_interval_ms * 3;
+ else /* use default value of refresh * 3 */
+ conf->metadata_max_age_ms = 5*60*1000 * 3;
+ }
+
+ rd_kafka_global_cnt_incr();
+
+ /*
+ * Set up the handle.
+ */
+ rk = rd_calloc(1, sizeof(*rk));
+
+ rk->rk_type = type;
+
+ /* Struct-copy the config object. */
+ rk->rk_conf = *conf;
+ if (!app_conf)
+ rd_free(conf); /* Free the base config struct only,
+ * not its fields since they were copied to
+ * rk_conf just above. Those fields are
+ * freed from rd_kafka_destroy_internal()
+ * as the rk itself is destroyed. */
+
+ /* Call on_new() interceptors */
+ rd_kafka_interceptors_on_new(rk, &rk->rk_conf);
+
+ rwlock_init(&rk->rk_lock);
+ mtx_init(&rk->rk_internal_rkb_lock, mtx_plain);
+
+ cnd_init(&rk->rk_broker_state_change_cnd);
+ mtx_init(&rk->rk_broker_state_change_lock, mtx_plain);
+
+ rk->rk_rep = rd_kafka_q_new(rk);
+ rk->rk_ops = rd_kafka_q_new(rk);
+ rk->rk_ops->rkq_serve = rd_kafka_poll_cb;
+ rk->rk_ops->rkq_opaque = rk;
+
+ if (rk->rk_conf.log_queue) {
+ rk->rk_logq = rd_kafka_q_new(rk);
+ rk->rk_logq->rkq_serve = rd_kafka_poll_cb;
+ rk->rk_logq->rkq_opaque = rk;
+ }
+
+ TAILQ_INIT(&rk->rk_brokers);
+ TAILQ_INIT(&rk->rk_topics);
+ rd_kafka_timers_init(&rk->rk_timers, rk);
+ rd_kafka_metadata_cache_init(rk);
+
+ if (rk->rk_conf.dr_cb || rk->rk_conf.dr_msg_cb)
+ rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_DR;
+ if (rk->rk_conf.rebalance_cb)
+ rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_REBALANCE;
+ if (rk->rk_conf.offset_commit_cb)
+ rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_OFFSET_COMMIT;
+
+ /* Convenience Kafka protocol null bytes */
+ rk->rk_null_bytes = rd_kafkap_bytes_new(NULL, 0);
+
+ if (rk->rk_conf.debug)
+ rk->rk_conf.log_level = LOG_DEBUG;
+
+ rd_snprintf(rk->rk_name, sizeof(rk->rk_name), "%s#%s-%i",
+ rk->rk_conf.client_id_str, rd_kafka_type2str(rk->rk_type),
+ rd_atomic32_add(&rkid, 1));
+
+ /* Construct clientid kafka string */
+ rk->rk_client_id = rd_kafkap_str_new(rk->rk_conf.client_id_str,-1);
+
+ /* Convert group.id to kafka string (may be NULL) */
+ rk->rk_group_id = rd_kafkap_str_new(rk->rk_conf.group_id_str,-1);
+
+ /* Config fixups */
+ rk->rk_conf.queued_max_msg_bytes =
+ (int64_t)rk->rk_conf.queued_max_msg_kbytes * 1000ll;
+
+ /* Enable api.version.request=true if fallback.broker.version
+ * indicates a supporting broker. */
+ if (rd_kafka_ApiVersion_is_queryable(rk->rk_conf.broker_version_fallback))
+ rk->rk_conf.api_version_request = 1;
+
+ if (rk->rk_type == RD_KAFKA_PRODUCER) {
+ mtx_init(&rk->rk_curr_msgs.lock, mtx_plain);
+ cnd_init(&rk->rk_curr_msgs.cnd);
+ rk->rk_curr_msgs.max_cnt =
+ rk->rk_conf.queue_buffering_max_msgs;
+ if ((unsigned long long)rk->rk_conf.queue_buffering_max_kbytes * 1024 >
+ (unsigned long long)SIZE_MAX)
+ rk->rk_curr_msgs.max_size = SIZE_MAX;
+ else
+ rk->rk_curr_msgs.max_size =
+ (size_t)rk->rk_conf.queue_buffering_max_kbytes * 1024;
+ }
+
+ if (rd_kafka_assignors_init(rk, errstr, errstr_size) == -1) {
+ ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
+ ret_errno = EINVAL;
+ goto fail;
+ }
+
+ if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL ||
+ rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_PLAINTEXT) {
+ if (rd_kafka_sasl_select_provider(rk,
+ errstr, errstr_size) == -1) {
+ ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
+ ret_errno = EINVAL;
+ goto fail;
+ }
+ }
+
+#if WITH_SSL
+ if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SSL ||
+ rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL) {
+ /* Create SSL context */
+ if (rd_kafka_transport_ssl_ctx_init(rk, errstr,
+ errstr_size) == -1) {
+ ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
+ ret_errno = EINVAL;
+ goto fail;
+ }
+ }
+#endif
+
+ /* Client group, eligible both in consumer and producer mode. */
+ if (type == RD_KAFKA_CONSUMER &&
+ RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0)
+ rk->rk_cgrp = rd_kafka_cgrp_new(rk,
+ rk->rk_group_id,
+ rk->rk_client_id);
+
+
+
+#ifndef _MSC_VER
+ /* Block all signals in newly created thread.
+ * To avoid race condition we block all signals in the calling
+ * thread, which the new thread will inherit its sigmask from,
+ * and then restore the original sigmask of the calling thread when
+ * we're done creating the thread. */
+ sigemptyset(&oldset);
+ sigfillset(&newset);
+ if (rk->rk_conf.term_sig) {
+ struct sigaction sa_term = {
+ .sa_handler = rd_kafka_term_sig_handler
+ };
+ sigaction(rk->rk_conf.term_sig, &sa_term, NULL);
+ }
+ pthread_sigmask(SIG_SETMASK, &newset, &oldset);
+#endif
+
+ /* Lock handle here to synchronise state, i.e., hold off
+ * the thread until we've finalized the handle. */
+ rd_kafka_wrlock(rk);
+
+ /* Create handler thread */
+ if ((thrd_create(&rk->rk_thread,
+ rd_kafka_thread_main, rk)) != thrd_success) {
+ ret_err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
+ ret_errno = errno;
+ if (errstr)
+ rd_snprintf(errstr, errstr_size,
+ "Failed to create thread: %s (%i)",
+ rd_strerror(errno), errno);
+ rd_kafka_wrunlock(rk);
+#ifndef _MSC_VER
+ /* Restore sigmask of caller */
+ pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+#endif
+ goto fail;
+ }
+
+ rd_kafka_wrunlock(rk);
+
+ rk->rk_eos.PID = -1;
+ rk->rk_eos.TransactionalId = rd_kafkap_str_new(NULL, 0);
+
+ mtx_lock(&rk->rk_internal_rkb_lock);
+ rk->rk_internal_rkb = rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL,
+ RD_KAFKA_PROTO_PLAINTEXT,
+ "", 0, RD_KAFKA_NODEID_UA);
+ mtx_unlock(&rk->rk_internal_rkb_lock);
+
+ /* Add initial list of brokers from configuration */
+ if (rk->rk_conf.brokerlist) {
+ if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0)
+ rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
+ "No brokers configured");
+ }
+
+#ifndef _MSC_VER
+ /* Restore sigmask of caller */
+ pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+#endif
+
+ /* Free user supplied conf's base pointer on success,
+ * but not the actual allocated fields since the struct
+ * will have been copied in its entirety above. */
+ if (app_conf)
+ rd_free(app_conf);
+ rd_kafka_set_last_error(0, 0);
+
+ rk->rk_initialized = 1;
+
+ return rk;
+
+fail:
+ /*
+ * Error out and clean up
+ */
+
+ /* If on_new() interceptors have been called we also need
+ * to allow interceptor clean-up by calling on_destroy() */
+ rd_kafka_interceptors_on_destroy(rk);
+
+ /* If rk_conf is a struct-copy of the application configuration
+ * we need to avoid rk_conf fields from being freed from
+ * rd_kafka_destroy_internal() since they belong to app_conf.
+ * However, there are some internal fields, such as interceptors,
+ * that belong to rk_conf and thus needs to be cleaned up.
+ * Legacy APIs, sigh.. */
+ if (app_conf) {
+ rd_kafka_assignors_term(rk);
+ rd_kafka_interceptors_destroy(&rk->rk_conf);
+ memset(&rk->rk_conf, 0, sizeof(rk->rk_conf));
+ }
+
+ rd_atomic32_add(&rk->rk_terminate, 1);
+ rd_kafka_destroy_internal(rk);
+ rd_kafka_destroy_final(rk);
+
+ rd_kafka_set_last_error(ret_err, ret_errno);
+
+ return NULL;
+}
+
+
+
+
+
+/**
+ * Produce a single message.
+ * Locality: any application thread
+ */
+int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,
+ int msgflags,
+ void *payload, size_t len,
+ const void *key, size_t keylen,
+ void *msg_opaque) {
+ return rd_kafka_msg_new(rd_kafka_topic_a2i(rkt), partition,
+ msgflags, payload, len,
+ key, keylen, msg_opaque);
+}
+
+
+/**
+ * Counts usage of the legacy/simple consumer (rd_kafka_consume_start() with
+ * friends) since it does not have an API for stopping the cgrp we will need to
+ * sort that out automatically in the background when all consumption
+ * has stopped.
+ *
+ * Returns 0 if a High level consumer is already instantiated
+ * which means a Simple consumer cannot co-operate with it, else 1.
+ *
+ * A rd_kafka_t handle can never migrate from simple to high-level, or
+ * vice versa, so we dont need a ..consumer_del().
+ */
+int rd_kafka_simple_consumer_add (rd_kafka_t *rk) {
+ if (rd_atomic32_get(&rk->rk_simple_cnt) < 0)
+ return 0;
+
+ return (int)rd_atomic32_add(&rk->rk_simple_cnt, 1);
+}
+
+
+
+
+/**
+ * rktp fetch is split up in these parts:
+ * * application side:
+ * * broker side (handled by current leader broker thread for rktp):
+ * - the fetch state, initial offset, etc.
+ * - fetching messages, updating fetched offset, etc.
+ * - offset commits
+ *
+ * Communication between the two are:
+ * app side -> rdkafka main side: rktp_ops
+ * broker thread -> app side: rktp_fetchq
+ *
+ * There is no shared state between these threads, instead
+ * state is communicated through the two op queues, and state synchronization
+ * is performed by version barriers.
+ *
+ */
+
+static RD_UNUSED
+int rd_kafka_consume_start0 (rd_kafka_itopic_t *rkt, int32_t partition,
+ int64_t offset, rd_kafka_q_t *rkq) {
+ shptr_rd_kafka_toppar_t *s_rktp;
+
+ if (partition < 0) {
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
+ ESRCH);
+ return -1;
+ }
+
+ if (!rd_kafka_simple_consumer_add(rkt->rkt_rk)) {
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
+ return -1;
+ }
+
+ rd_kafka_topic_wrlock(rkt);
+ s_rktp = rd_kafka_toppar_desired_add(rkt, partition);
+ rd_kafka_topic_wrunlock(rkt);
+
+ /* Verify offset */
+ if (offset == RD_KAFKA_OFFSET_BEGINNING ||
+ offset == RD_KAFKA_OFFSET_END ||
+ offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
+ /* logical offsets */
+
+ } else if (offset == RD_KAFKA_OFFSET_STORED) {
+ /* offset manager */
+
+ if (rkt->rkt_conf.offset_store_method ==
+ RD_KAFKA_OFFSET_METHOD_BROKER &&
+ RD_KAFKAP_STR_IS_NULL(rkt->rkt_rk->rk_group_id)) {
+ /* Broker based offsets require a group id. */
+ rd_kafka_toppar_destroy(s_rktp);
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
+ EINVAL);
+ return -1;
+ }
+
+ } else if (offset < 0) {
+ rd_kafka_toppar_destroy(s_rktp);
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
+ EINVAL);
+ return -1;
+
+ }
+
+ rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_s2i(s_rktp), offset,
+ rkq, RD_KAFKA_NO_REPLYQ);
+
+ rd_kafka_toppar_destroy(s_rktp);
+
+ rd_kafka_set_last_error(0, 0);
+ return 0;
+}
+
+
+
+
+int rd_kafka_consume_start (rd_kafka_topic_t *app_rkt, int32_t partition,
+ int64_t offset) {
+ rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
+ rd_kafka_dbg(rkt->rkt_rk, TOPIC, "START",
+ "Start consuming partition %"PRId32,partition);
+ return rd_kafka_consume_start0(rkt, partition, offset, NULL);
+}
+
+int rd_kafka_consume_start_queue (rd_kafka_topic_t *app_rkt, int32_t partition,
+ int64_t offset, rd_kafka_queue_t *rkqu) {
+ rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
+
+ return rd_kafka_consume_start0(rkt, partition, offset, rkqu->rkqu_q);
+}
+
+
+
+
+static RD_UNUSED int rd_kafka_consume_stop0 (rd_kafka_toppar_t *rktp) {
+ rd_kafka_q_t *tmpq = NULL;
+ rd_kafka_resp_err_t err;
+
+ rd_kafka_topic_wrlock(rktp->rktp_rkt);
+ rd_kafka_toppar_lock(rktp);
+ rd_kafka_toppar_desired_del(rktp);
+ rd_kafka_toppar_unlock(rktp);
+ rd_kafka_topic_wrunlock(rktp->rktp_rkt);
+
+ tmpq = rd_kafka_q_new(rktp->rktp_rkt->rkt_rk);
+
+ rd_kafka_toppar_op_fetch_stop(rktp, RD_KAFKA_REPLYQ(tmpq, 0));
+
+ /* Synchronisation: Wait for stop reply from broker thread */
+ err = rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);
+ rd_kafka_q_destroy(tmpq);
+
+ rd_kafka_set_last_error(err, err ? EINVAL : 0);
+
+ return err ? -1 : 0;
+}
+
+
+int rd_kafka_consume_stop (rd_kafka_topic_t *app_rkt, int32_t partition) {
+ rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
+ shptr_rd_kafka_toppar_t *s_rktp;
+ int r;
+
+ if (partition == RD_KAFKA_PARTITION_UA) {
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
+ return -1;
+ }
+
+ rd_kafka_topic_wrlock(rkt);
+ if (!(s_rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
+ !(s_rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
+ rd_kafka_topic_wrunlock(rkt);
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
+ ESRCH);
+ return -1;
+ }
+ rd_kafka_topic_wrunlock(rkt);
+
+ r = rd_kafka_consume_stop0(rd_kafka_toppar_s2i(s_rktp));
+ /* set_last_error() called by stop0() */
+
+ rd_kafka_toppar_destroy(s_rktp);
+
+ return r;
+}
+
+
+
+rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *app_rkt,
+ int32_t partition,
+ int64_t offset,
+ int timeout_ms) {
+ rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
+ shptr_rd_kafka_toppar_t *s_rktp;
+ rd_kafka_toppar_t *rktp;
+ rd_kafka_q_t *tmpq = NULL;
+ rd_kafka_resp_err_t err;
+
+ /* FIXME: simple consumer check */
+
+ if (partition == RD_KAFKA_PARTITION_UA)
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+ rd_kafka_topic_rdlock(rkt);
+ if (!(s_rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
+ !(s_rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
+ rd_kafka_topic_rdunlock(rkt);
+ return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
+ }
+ rd_kafka_topic_rdunlock(rkt);
+
+ if (timeout_ms)
+ tmpq = rd_kafka_q_new(rkt->rkt_rk);
+
+ rktp = rd_kafka_toppar_s2i(s_rktp);
+ if ((err = rd_kafka_toppar_op_seek(rktp, offset,
+ RD_KAFKA_REPLYQ(tmpq, 0)))) {
+ if (tmpq)
+ rd_kafka_q_destroy(tmpq);
+ rd_kafka_toppar_destroy(s_rktp);
+ return err;
+ }
+
+ rd_kafka_toppar_destroy(s_rktp);
+
+ if (tmpq) {
+ err = rd_kafka_q_wait_result(tmpq, timeout_ms);
+ rd_kafka_q_destroy(tmpq);
+ return err;
+ }
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+
+static ssize_t rd_kafka_consume_batch0 (rd_kafka_q_t *rkq,
+ int timeout_ms,
+ rd_kafka_message_t **rkmessages,
+ size_t rkmessages_size) {
+ /* Populate application's rkmessages array. */
+ return rd_kafka_q_serve_rkmessages(rkq, timeout_ms,
+ rkmessages, rkmessages_size);
+}
+
+
+ssize_t rd_kafka_consume_batch (rd_kafka_topic_t *app_rkt, int32_t partition,
+ int timeout_ms,
+ rd_kafka_message_t **rkmessages,
+ size_t rkmessages_size) {
+ rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
+ shptr_rd_kafka_toppar_t *s_rktp;
+ rd_kafka_toppar_t *rktp;
+ ssize_t cnt;
+
+ /* Get toppar */
+ rd_kafka_topic_rdlock(rkt);
+ s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
+ if (unlikely(!s_rktp))
+ s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
+ rd_kafka_topic_rdunlock(rkt);
+
+ if (unlikely(!s_rktp)) {
+ /* No such toppar known */
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
+ ESRCH);
+ return -1;
+ }
+
+ rktp = rd_kafka_toppar_s2i(s_rktp);
+
+ /* Populate application's rkmessages array. */
+ cnt = rd_kafka_q_serve_rkmessages(rktp->rktp_fetchq, timeout_ms,
+ rkmessages, rkmessages_size);
+
+ rd_kafka_toppar_destroy(s_rktp); /* refcnt from .._get() */
+
+ rd_kafka_set_last_error(0, 0);
+
+ return cnt;
+}
+
+ssize_t rd_kafka_consume_batch_queue (rd_kafka_queue_t *rkqu,
+ int timeout_ms,
+ rd_kafka_message_t **rkmessages,
+ size_t rkmessages_size) {
+ /* Populate application's rkmessages array. */
+ return rd_kafka_consume_batch0(rkqu->rkqu_q, timeout_ms,
+ rkmessages, rkmessages_size);
+}
+
+
+struct consume_ctx {
+ void (*consume_cb) (rd_kafka_message_t *rkmessage, void *opaque);
+ void *opaque;
+};
+
+
+/**
+ * Trampoline for application's consume_cb()
+ */
+static rd_kafka_op_res_t
+rd_kafka_consume_cb (rd_kafka_t *rk,
+ rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko,
+ rd_kafka_q_cb_type_t cb_type, void *opaque) {
+ struct consume_ctx *ctx = opaque;
+ rd_kafka_message_t *rkmessage;
+
+ if (unlikely(rd_kafka_op_version_outdated(rko, 0))) {
+ rd_kafka_op_destroy(rko);
+ return RD_KAFKA_OP_RES_HANDLED;
+ }
+
+ rkmessage = rd_kafka_message_get(rko);
+
+ rd_kafka_op_offset_store(rk, rko, rkmessage);
+
+ ctx->consume_cb(rkmessage, ctx->opaque);
+
+ rd_kafka_op_destroy(rko);
+
+ return RD_KAFKA_OP_RES_HANDLED;
+}
+
+
+
+static rd_kafka_op_res_t
+rd_kafka_consume_callback0 (rd_kafka_q_t *rkq, int timeout_ms, int max_cnt,
+ void (*consume_cb) (rd_kafka_message_t
+ *rkmessage,
+ void *opaque),
+ void *opaque) {
+ struct consume_ctx ctx = { .consume_cb = consume_cb, .opaque = opaque };
+ return rd_kafka_q_serve(rkq, timeout_ms, max_cnt,
+ RD_KAFKA_Q_CB_RETURN,
+ rd_kafka_consume_cb, &ctx);
+
+}
+
+
+int rd_kafka_consume_callback (rd_kafka_topic_t *app_rkt, int32_t partition,
+ int timeout_ms,
+ void (*consume_cb) (rd_kafka_message_t
+ *rkmessage,
+ void *opaque),
+ void *opaque) {
+ rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
+ shptr_rd_kafka_toppar_t *s_rktp;
+ rd_kafka_toppar_t *rktp;
+ int r;
+
+ /* Get toppar */
+ rd_kafka_topic_rdlock(rkt);
+ s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
+ if (unlikely(!s_rktp))
+ s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
+ rd_kafka_topic_rdunlock(rkt);
+
+ if (unlikely(!s_rktp)) {
+ /* No such toppar known */
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
+ ESRCH);
+ return -1;
+ }
+
+ rktp = rd_kafka_toppar_s2i(s_rktp);
+ r = rd_kafka_consume_callback0(rktp->rktp_fetchq, timeout_ms,
+ rkt->rkt_conf.consume_callback_max_msgs,
+ consume_cb, opaque);
+
+ rd_kafka_toppar_destroy(s_rktp);
+
+ rd_kafka_set_last_error(0, 0);
+
+ return r;
+}
+
+
+
+int rd_kafka_consume_callback_queue (rd_kafka_queue_t *rkqu,
+ int timeout_ms,
+ void (*consume_cb) (rd_kafka_message_t
+ *rkmessage,
+ void *opaque),
+ void *opaque) {
+ return rd_kafka_consume_callback0(rkqu->rkqu_q, timeout_ms, 0,
+ consume_cb, opaque);
+}
+
+
+/**
+ * Serve queue 'rkq' and return one message.
+ * By serving the queue it will also call any registered callbacks
+ * registered for matching events, this includes consumer_cb()
+ * in which case no message will be returned.
+ */
+static rd_kafka_message_t *rd_kafka_consume0 (rd_kafka_t *rk,
+ rd_kafka_q_t *rkq,
+ int timeout_ms) {
+ rd_kafka_op_t *rko;
+ rd_kafka_message_t *rkmessage = NULL;
+ rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
+
+ rd_kafka_yield_thread = 0;
+ while ((rko = rd_kafka_q_pop(rkq,
+ rd_timeout_remains(abs_timeout), 0))) {
+ rd_kafka_op_res_t res;
+
+ res = rd_kafka_poll_cb(rk, rkq, rko,
+ RD_KAFKA_Q_CB_RETURN, NULL);
+
+ if (res == RD_KAFKA_OP_RES_PASS)
+ break;
+
+ if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
+ rd_kafka_yield_thread)) {
+ /* Callback called rd_kafka_yield(), we must
+ * stop dispatching the queue and return. */
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR,
+ EINTR);
+ return NULL;
+ }
+
+ /* Message was handled by callback. */
+ continue;
+ }
+
+ if (!rko) {
+ /* Timeout reached with no op returned. */
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
+ ETIMEDOUT);
+ return NULL;
+ }
+
+ rd_kafka_assert(rk,
+ rko->rko_type == RD_KAFKA_OP_FETCH ||
+ rko->rko_type == RD_KAFKA_OP_CONSUMER_ERR);
+
+ /* Get rkmessage from rko */
+ rkmessage = rd_kafka_message_get(rko);
+
+ /* Store offset */
+ rd_kafka_op_offset_store(rk, rko, rkmessage);
+
+ rd_kafka_set_last_error(0, 0);
+
+ return rkmessage;
+}
+
+rd_kafka_message_t *rd_kafka_consume (rd_kafka_topic_t *app_rkt,
+ int32_t partition,
+ int timeout_ms) {
+ rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
+ shptr_rd_kafka_toppar_t *s_rktp;
+ rd_kafka_toppar_t *rktp;
+ rd_kafka_message_t *rkmessage;
+
+ rd_kafka_topic_rdlock(rkt);
+ s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
+ if (unlikely(!s_rktp))
+ s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
+ rd_kafka_topic_rdunlock(rkt);
+
+ if (unlikely(!s_rktp)) {
+ /* No such toppar known */
+ rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
+ ESRCH);
+ return NULL;
+ }
+
+ rktp = rd_kafka_toppar_s2i(s_rktp);
+ rkmessage = rd_kafka_consume0(rkt->rkt_rk,
+ rktp->rktp_fetchq, timeout_ms);
+
+ rd_kafka_toppar_destroy(s_rktp); /* refcnt from .._get() */
+
+ return rkmessage;
+}
+
+
+rd_kafka_message_t *rd_kafka_consume_queue (rd_kafka_queue_t *rkqu,
+ int timeout_ms) {
+ return rd_kafka_consume0(rkqu->rkqu_rk, rkqu->rkqu_q, timeout_ms);
+}
+
+
+
+
+rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk) {
+ rd_kafka_cgrp_t *rkcg;
+
+ if (!(rkcg = rd_kafka_cgrp_get(rk)))
+ return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
+
+ rd_kafka_q_fwd_set(rk->rk_rep, rkcg->rkcg_q);
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+
+
+rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,
+ int timeout_ms) {
+ rd_kafka_cgrp_t *rkcg;
+
+ if (unlikely(!(rkcg = rd_kafka_cgrp_get(rk)))) {
+ rd_kafka_message_t *rkmessage = rd_kafka_message_new();
+ rkmessage->err = RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
+ return rkmessage;
+ }
+
+ return rd_kafka_consume0(rk, rkcg->rkcg_q, timeout_ms);
+}
+
+
+rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk) {
+ rd_kafka_cgrp_t *rkcg;
+ rd_kafka_op_t *rko;
+ rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT;
+ rd_kafka_q_t *rkq;
+
+ if (!(rkcg = rd_kafka_cgrp_get(rk)))
+ return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
+
+ /* Redirect cgrp queue to our temporary queue to make sure
+ * all posted ops (e.g., rebalance callbacks) are served by
+ * this function. */
+ rkq = rd_kafka_q_new(rk);
+ rd_kafka_q_fwd_set(rkcg->rkcg_q, rkq);
+
+ rd_kafka_cgrp_terminate(rkcg, RD_KAFKA_REPLYQ(rkq, 0)); /* async */
+
+ while ((rko = rd_kafka_q_pop(rkq, RD_POLL_INFINITE, 0))) {
+ rd_kafka_op_res_t res;
+ if ((rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) ==
+ RD_KAFKA_OP_TERMINATE) {
+ err = rko->rko_err;
+ rd_kafka_op_destroy(rko);
+ break;
+ }
+ res = rd_kafka_poll_cb(rk, rkq, rko,
+ RD_KAFKA_Q_CB_RETURN, NULL);
+ if (res == RD_KAFKA_OP_RES_PASS)
+ rd_kafka_op_destroy(rko);
+ /* Ignore YIELD, we need to finish */
+ }
+
+ rd_kafka_q_destroy(rkq);
+
+ rd_kafka_q_fwd_set(rkcg->rkcg_q, NULL);
+
+ return err;
+}
+
+
+
+rd_kafka_resp_err_t
+rd_kafka_committed (rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *partitions,
+ int timeout_ms) {
+ rd_kafka_q_t *rkq;
+ rd_kafka_resp_err_t err;
+ rd_kafka_cgrp_t *rkcg;
+ rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
+
+ if (!partitions)
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+ if (!(rkcg = rd_kafka_cgrp_get(rk)))
+ return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
+
+ /* Set default offsets. */
+ rd_kafka_topic_partition_list_reset_offsets(partitions,
+ RD_KAFKA_OFFSET_INVALID);
+
+ rkq = rd_kafka_q_new(rk);
+
+ do {
+ rd_kafka_op_t *rko;
+ int state_version = rd_kafka_brokers_get_state_version(rk);
+
+ rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
+ rd_kafka_op_set_replyq(rko, rkq, NULL);
+
+ /* Issue #827
+ * Copy partition list to avoid use-after-free if we time out
+ * here, the app frees the list, and then cgrp starts
+ * processing the op. */
+ rko->rko_u.offset_fetch.partitions =
+ rd_kafka_topic_partition_list_copy(partitions);
+ rko->rko_u.offset_fetch.do_free = 1;
+
+ if (!rd_kafka_q_enq(rkcg->rkcg_ops, rko)) {
+ err = RD_KAFKA_RESP_ERR__DESTROY;
+ break;
+ }
+
+ rko = rd_kafka_q_pop(rkq, rd_timeout_remains(abs_timeout), 0);
+ if (rko) {
+ if (!(err = rko->rko_err))
+ rd_kafka_topic_partition_list_update(
+ partitions,
+ rko->rko_u.offset_fetch.partitions);
+ else if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
+ err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
+ !rd_kafka_brokers_wait_state_change(
+ rk, state_version,
+ rd_timeout_remains(abs_timeout)))
+ err = RD_KAFKA_RESP_ERR__TIMED_OUT;
+
+ rd_kafka_op_destroy(rko);
+ } else
+ err = RD_KAFKA_RESP_ERR__TIMED_OUT;
+ } while (err == RD_KAFKA_RESP_ERR__TRANSPORT ||
+ err == RD_KAFKA_RESP_ERR__WAIT_COORD);
+
+ rd_kafka_q_destroy(rkq);
+
+ return err;
+}
+
+
+
+rd_kafka_resp_err_t
+rd_kafka_position (rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *partitions) {
+ int i;
+
+ /* Set default offsets. */
+ rd_kafka_topic_partition_list_reset_offsets(partitions,
+ RD_KAFKA_OFFSET_INVALID);
+
+ for (i = 0 ; i < partitions->cnt ; i++) {
+ rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
+ shptr_rd_kafka_toppar_t *s_rktp;
+ rd_kafka_toppar_t *rktp;
+
+ if (!(s_rktp = rd_kafka_toppar_get2(rk, rktpar->topic,
+ rktpar->partition, 0, 1))) {
+ rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
+ rktpar->offset = RD_KAFKA_OFFSET_INVALID;
+ continue;
+ }
+
+ rktp = rd_kafka_toppar_s2i(s_rktp);
+ rd_kafka_toppar_lock(rktp);
+ rktpar->offset = rktp->rktp_app_offset;
+ rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ rd_kafka_toppar_unlock(rktp);
+ rd_kafka_toppar_destroy(s_rktp);
+ }
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+
+struct _query_wmark_offsets_state {
+ rd_kafka_resp_err_t err;
+ const char *topic;
+ int32_t partition;
+ int64_t offsets[2];
+ int offidx; /* next offset to set from response */
+ rd_ts_t ts_end;
+ int state_version; /* Broker state version */
+};
+
+static void rd_kafka_query_wmark_offsets_resp_cb (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ struct _query_wmark_offsets_state *state = opaque;
+ rd_kafka_topic_partition_list_t *offsets;
+ rd_kafka_topic_partition_t *rktpar;
+
+ offsets = rd_kafka_topic_partition_list_new(1);
+ err = rd_kafka_handle_Offset(rk, rkb, err, rkbuf, request, offsets);
+ if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
+ rd_kafka_topic_partition_list_destroy(offsets);
+ return; /* Retrying */
+ }
+
+ /* Retry if no broker connection is available yet. */
+ if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
+ err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
+ rkb &&
+ rd_kafka_brokers_wait_state_change(
+ rkb->rkb_rk, state->state_version,
+ rd_timeout_remains(state->ts_end))) {
+ /* Retry */
+ state->state_version = rd_kafka_brokers_get_state_version(rk);
+ request->rkbuf_retries = 0;
+ if (rd_kafka_buf_retry(rkb, request)) {
+ rd_kafka_topic_partition_list_destroy(offsets);
+ return; /* Retry in progress */
+ }
+ /* FALLTHRU */
+ }
+
+ /* Partition not seen in response. */
+ if (!(rktpar = rd_kafka_topic_partition_list_find(offsets,
+ state->topic,
+ state->partition)))
+ err = RD_KAFKA_RESP_ERR__BAD_MSG;
+ else if (rktpar->err)
+ err = rktpar->err;
+ else
+ state->offsets[state->offidx] = rktpar->offset;
+
+ state->offidx++;
+
+ if (err || state->offidx == 2) /* Error or Done */
+ state->err = err;
+
+ rd_kafka_topic_partition_list_destroy(offsets);
+}
+
+
+rd_kafka_resp_err_t
+rd_kafka_query_watermark_offsets (rd_kafka_t *rk, const char *topic,
+ int32_t partition,
+ int64_t *low, int64_t *high, int timeout_ms) {
+ rd_kafka_q_t *rkq;
+ struct _query_wmark_offsets_state state;
+ rd_ts_t ts_end = rd_timeout_init(timeout_ms);
+ rd_kafka_topic_partition_list_t *partitions;
+ rd_kafka_topic_partition_t *rktpar;
+ struct rd_kafka_partition_leader *leader;
+ rd_list_t leaders;
+ rd_kafka_resp_err_t err;
+
+ partitions = rd_kafka_topic_partition_list_new(1);
+ rktpar = rd_kafka_topic_partition_list_add(partitions,
+ topic, partition);
+
+ rd_list_init(&leaders, partitions->cnt,
+ (void *)rd_kafka_partition_leader_destroy);
+
+ err = rd_kafka_topic_partition_list_query_leaders(rk, partitions,
+ &leaders, timeout_ms);
+ if (err) {
+ rd_list_destroy(&leaders);
+ rd_kafka_topic_partition_list_destroy(partitions);
+ return err;
+ }
+
+ leader = rd_list_elem(&leaders, 0);
+
+ rkq = rd_kafka_q_new(rk);
+
+ /* Due to KAFKA-1588 we need to send a request for each wanted offset,
+ * in this case one for the low watermark and one for the high. */
+ state.topic = topic;
+ state.partition = partition;
+ state.offsets[0] = RD_KAFKA_OFFSET_BEGINNING;
+ state.offsets[1] = RD_KAFKA_OFFSET_END;
+ state.offidx = 0;
+ state.err = RD_KAFKA_RESP_ERR__IN_PROGRESS;
+ state.ts_end = ts_end;
+ state.state_version = rd_kafka_brokers_get_state_version(rk);
+
+
+ rktpar->offset = RD_KAFKA_OFFSET_BEGINNING;
+ rd_kafka_OffsetRequest(leader->rkb, partitions, 0,
+ RD_KAFKA_REPLYQ(rkq, 0),
+ rd_kafka_query_wmark_offsets_resp_cb,
+ &state);
+
+ rktpar->offset = RD_KAFKA_OFFSET_END;
+ rd_kafka_OffsetRequest(leader->rkb, partitions, 0,
+ RD_KAFKA_REPLYQ(rkq, 0),
+ rd_kafka_query_wmark_offsets_resp_cb,
+ &state);
+
+ rd_kafka_topic_partition_list_destroy(partitions);
+ rd_list_destroy(&leaders);
+
+ /* Wait for reply (or timeout) */
+ while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS &&
+ rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
+ rd_kafka_poll_cb, NULL) !=
+ RD_KAFKA_OP_RES_YIELD)
+ ;
+
+ rd_kafka_q_destroy(rkq);
+
+ if (state.err)
+ return state.err;
+ else if (state.offidx != 2)
+ return RD_KAFKA_RESP_ERR__FAIL;
+
+ /* We are not certain about the returned order. */
+ if (state.offsets[0] < state.offsets[1]) {
+ *low = state.offsets[0];
+ *high = state.offsets[1];
+ } else {
+ *low = state.offsets[1];
+ *high = state.offsets[0];
+ }
+
+ /* If partition is empty only one offset (the last) will be returned. */
+ if (*low < 0 && *high >= 0)
+ *low = *high;
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+rd_kafka_resp_err_t
+rd_kafka_get_watermark_offsets (rd_kafka_t *rk, const char *topic,
+ int32_t partition,
+ int64_t *low, int64_t *high) {
+ shptr_rd_kafka_toppar_t *s_rktp;
+ rd_kafka_toppar_t *rktp;
+
+ s_rktp = rd_kafka_toppar_get2(rk, topic, partition, 0, 1);
+ if (!s_rktp)
+ return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
+ rktp = rd_kafka_toppar_s2i(s_rktp);
+
+ rd_kafka_toppar_lock(rktp);
+ *low = rktp->rktp_lo_offset;
+ *high = rktp->rktp_hi_offset;
+ rd_kafka_toppar_unlock(rktp);
+
+ rd_kafka_toppar_destroy(s_rktp);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * @brief get_offsets_for_times() state
+ */
+struct _get_offsets_for_times {
+ rd_kafka_topic_partition_list_t *results;
+ rd_kafka_resp_err_t err;
+ int wait_reply;
+ int state_version;
+ rd_ts_t ts_end;
+};
+
+/**
+ * @brief Handle OffsetRequest responses
+ */
+static void rd_kafka_get_offsets_for_times_resp_cb (rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ struct _get_offsets_for_times *state = opaque;
+
+ err = rd_kafka_handle_Offset(rk, rkb, err, rkbuf, request,
+ state->results);
+ if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
+ return; /* Retrying */
+
+ /* Retry if no broker connection is available yet. */
+ if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
+ err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
+ rkb &&
+ rd_kafka_brokers_wait_state_change(
+ rkb->rkb_rk, state->state_version,
+ rd_timeout_remains(state->ts_end))) {
+ /* Retry */
+ state->state_version = rd_kafka_brokers_get_state_version(rk);
+ request->rkbuf_retries = 0;
+ if (rd_kafka_buf_retry(rkb, request))
+ return; /* Retry in progress */
+ /* FALLTHRU */
+ }
+
+ if (err && !state->err)
+ state->err = err;
+
+ state->wait_reply--;
+}
+
+
+rd_kafka_resp_err_t
+rd_kafka_offsets_for_times (rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *offsets,
+ int timeout_ms) {
+ rd_kafka_q_t *rkq;
+ struct _get_offsets_for_times state = RD_ZERO_INIT;
+ rd_ts_t ts_end = rd_timeout_init(timeout_ms);
+ rd_list_t leaders;
+ int i;
+ rd_kafka_resp_err_t err;
+ struct rd_kafka_partition_leader *leader;
+
+ if (offsets->cnt == 0)
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+ rd_list_init(&leaders, offsets->cnt,
+ (void *)rd_kafka_partition_leader_destroy);
+
+ err = rd_kafka_topic_partition_list_query_leaders(rk, offsets, &leaders,
+ timeout_ms);
+ if (err) {
+ rd_list_destroy(&leaders);
+ return err;
+ }
+
+
+ rkq = rd_kafka_q_new(rk);
+
+ state.wait_reply = 0;
+ state.results = rd_kafka_topic_partition_list_new(offsets->cnt);
+
+ /* For each leader send a request for its partitions */
+ RD_LIST_FOREACH(leader, &leaders, i) {
+ state.wait_reply++;
+ rd_kafka_OffsetRequest(leader->rkb, leader->partitions, 1,
+ RD_KAFKA_REPLYQ(rkq, 0),
+ rd_kafka_get_offsets_for_times_resp_cb,
+ &state);
+ }
+
+ rd_list_destroy(&leaders);
+
+ /* Wait for reply (or timeout) */
+ while (state.wait_reply > 0 && rd_timeout_remains(ts_end) > 0)
+ rd_kafka_q_serve(rkq, rd_timeout_remains(ts_end),
+ 0, RD_KAFKA_Q_CB_CALLBACK,
+ rd_kafka_poll_cb, NULL);
+
+ rd_kafka_q_destroy(rkq);
+
+ /* Then update the queried partitions. */
+ if (!state.err)
+ rd_kafka_topic_partition_list_update(offsets, state.results);
+
+ rd_kafka_topic_partition_list_destroy(state.results);
+
+ return state.err;
+}
+
+
+/**
+ * rd_kafka_poll() (and similar) op callback handler.
+ * Will either call registered callback depending on cb_type and op type
+ * or return op to application, if applicable (e.g., fetch message).
+ *
+ * Returns 1 if op was handled, else 0.
+ *
+ * Locality: application thread
+ */
+rd_kafka_op_res_t
+rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
+ rd_kafka_q_cb_type_t cb_type, void *opaque) {
+ rd_kafka_msg_t *rkm;
+
+ /* Return-as-event requested, see if op can be converted to event,
+ * otherwise fall through and trigger callbacks. */
+ if (cb_type == RD_KAFKA_Q_CB_EVENT && rd_kafka_event_setup(rk, rko))
+ return 0; /* Return as event */
+
+ switch ((int)rko->rko_type)
+ {
+ case RD_KAFKA_OP_FETCH:
+ if (!rk->rk_conf.consume_cb ||
+ cb_type == RD_KAFKA_Q_CB_RETURN ||
+ cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
+ return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
+ else {
+ struct consume_ctx ctx = {
+ .consume_cb = rk->rk_conf.consume_cb,
+ .opaque = rk->rk_conf.opaque };
+
+ return rd_kafka_consume_cb(rk, rkq, rko, cb_type, &ctx);
+ }
+ break;
+
+ case RD_KAFKA_OP_REBALANCE:
+ /* If EVENT_REBALANCE is enabled but rebalance_cb isnt
+ * we need to perform a dummy assign for the application.
+ * This might happen during termination with consumer_close() */
+ if (rk->rk_conf.rebalance_cb)
+ rk->rk_conf.rebalance_cb(
+ rk, rko->rko_err,
+ rko->rko_u.rebalance.partitions,
+ rk->rk_conf.opaque);
+ else {
+ rd_kafka_dbg(rk, CGRP, "UNASSIGN",
+ "Forcing unassign of %d partition(s)",
+ rko->rko_u.rebalance.partitions ?
+ rko->rko_u.rebalance.partitions->cnt : 0);
+ rd_kafka_assign(rk, NULL);
+ }
+ break;
+
+ case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY:
+ if (!rko->rko_u.offset_commit.cb)
+ return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
+ rko->rko_u.offset_commit.cb(
+ rk, rko->rko_err,
+ rko->rko_u.offset_commit.partitions,
+ rko->rko_u.offset_commit.opaque);
+ break;
+
+ case RD_KAFKA_OP_CONSUMER_ERR:
+ /* rd_kafka_consumer_poll() (_Q_CB_CONSUMER):
+ * Consumer errors are returned to the application
+ * as rkmessages, not error callbacks.
+ *
+ * rd_kafka_poll() (_Q_CB_GLOBAL):
+ * convert to ERR op (fallthru)
+ */
+ if (cb_type == RD_KAFKA_Q_CB_RETURN ||
+ cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) {
+ /* return as message_t to application */
+ return RD_KAFKA_OP_RES_PASS;
+ }
+ /* FALLTHRU */
+
+ case RD_KAFKA_OP_ERR:
+ if (rk->rk_conf.error_cb)
+ rk->rk_conf.error_cb(rk, rko->rko_err,
+ rko->rko_u.err.errstr,
+ rk->rk_conf.opaque);
+ else
+ rd_kafka_log(rk, LOG_ERR, "ERROR",
+ "%s: %s: %s",
+ rk->rk_name,
+ rd_kafka_err2str(rko->rko_err),
+ rko->rko_u.err.errstr);
+ break;
+
+ case RD_KAFKA_OP_DR:
+ /* Delivery report:
+ * call application DR callback for each message. */
+ while ((rkm = TAILQ_FIRST(&rko->rko_u.dr.msgq.rkmq_msgs))) {
+ rd_kafka_message_t *rkmessage;
+
+ TAILQ_REMOVE(&rko->rko_u.dr.msgq.rkmq_msgs,
+ rkm, rkm_link);
+
+ rkmessage = rd_kafka_message_get_from_rkm(rko, rkm);
+
+ if (rk->rk_conf.dr_msg_cb) {
+ rk->rk_conf.dr_msg_cb(rk, rkmessage,
+ rk->rk_conf.opaque);
+
+ } else {
+
+ rk->rk_conf.dr_cb(rk,
+ rkmessage->payload,
+ rkmessage->len,
+ rkmessage->err,
+ rk->rk_conf.opaque,
+ rkmessage->_private);
+ }
+
+ rd_kafka_msg_destroy(rk, rkm);
+
+ if (unlikely(rd_kafka_yield_thread)) {
+ /* Callback called yield(),
+ * re-enqueue the op (if there are any
+ * remaining messages). */
+ if (!TAILQ_EMPTY(&rko->rko_u.dr.msgq.
+ rkmq_msgs))
+ rd_kafka_q_reenq(rkq, rko);
+ else
+ rd_kafka_op_destroy(rko);
+ return RD_KAFKA_OP_RES_YIELD;
+ }
+ }
+
+ rd_kafka_msgq_init(&rko->rko_u.dr.msgq);
+
+ break;
+
+ case RD_KAFKA_OP_THROTTLE:
+ if (rk->rk_conf.throttle_cb)
+ rk->rk_conf.throttle_
<TRUNCATED>