You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:57 UTC
[29/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade
to librdkafka 0.11.4
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_cgrp.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_cgrp.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_cgrp.h
deleted file mode 100644
index 0424b5d..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_cgrp.h
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-#pragma once
-
-#include "rdinterval.h"
-
-#include "rdkafka_assignor.h"
-
-/**
- * Client groups implementation
- *
- * Client groups handling for a single cgrp is assigned to a single
- * rd_kafka_broker_t object at any given time.
- * The main thread will call cgrp_serve() to serve its cgrps.
- *
- * This means that the cgrp itself does not need to be locked since it
- * is only ever used from the main thread.
- *
- */
-
-
-extern const char *rd_kafka_cgrp_join_state_names[];
-
-/**
- * Client group
- *
- * Per-group state object: the group/member/client identity strings,
- * the coordinator state (rkcg_state) and the join state
- * (rkcg_join_state), the op queues, the current subscription and
- * assignment, and the intervals/timers declared below.
- */
-typedef struct rd_kafka_cgrp_s {
- TAILQ_ENTRY(rd_kafka_cgrp_s) rkcg_rkb_link; /* rkb_cgrps */
- const rd_kafkap_str_t *rkcg_group_id;
- rd_kafkap_str_t *rkcg_member_id; /* Last assigned MemberId */
- const rd_kafkap_str_t *rkcg_client_id;
-
- enum {
- /* Init state */
- RD_KAFKA_CGRP_STATE_INIT,
-
- /* Cgrp has been stopped. This is a final state */
- RD_KAFKA_CGRP_STATE_TERM,
-
- /* Query for group coordinator */
- RD_KAFKA_CGRP_STATE_QUERY_COORD,
-
- /* Outstanding query, awaiting response */
- RD_KAFKA_CGRP_STATE_WAIT_COORD,
-
- /* Wait ack from assigned cgrp manager broker thread */
- RD_KAFKA_CGRP_STATE_WAIT_BROKER,
-
- /* Wait for manager broker thread to connect to broker */
- RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT,
-
- /* Coordinator is up and manager is assigned. */
- RD_KAFKA_CGRP_STATE_UP,
- } rkcg_state;
- rd_ts_t rkcg_ts_statechange; /* Timestamp of last
- * state change. */
-
-
- enum {
- RD_KAFKA_CGRP_JOIN_STATE_INIT,
-
- /* all: JoinGroupRequest sent, awaiting response. */
- RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN,
-
- /* Leader: MetadataRequest sent, awaiting response. */
- RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA,
-
- /* Follower: SyncGroupRequest sent, awaiting response. */
- RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC,
-
- /* all: waiting for previous assignment to decommission */
- RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN,
-
- /* all: waiting for application's rebalance_cb to assign() */
- RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_REBALANCE_CB,
-
- /* all: waiting for application's rebalance_cb to revoke */
- RD_KAFKA_CGRP_JOIN_STATE_WAIT_REVOKE_REBALANCE_CB,
-
- /* all: synchronized and assigned
- * may be an empty assignment. */
- RD_KAFKA_CGRP_JOIN_STATE_ASSIGNED,
-
- /* all: fetchers are started and operational */
- RD_KAFKA_CGRP_JOIN_STATE_STARTED
- } rkcg_join_state;
-
- /* State when group leader */
- struct {
- char *protocol;
- rd_kafka_group_member_t *members;
- int member_cnt;
- } rkcg_group_leader;
-
- rd_kafka_q_t *rkcg_q; /* Application poll queue */
- rd_kafka_q_t *rkcg_ops; /* Manager ops queue */
- rd_kafka_q_t *rkcg_wait_coord_q; /* Ops awaiting coord */
- int32_t rkcg_version; /* Ops queue version barrier
- * Increased by:
- * Rebalance delegation
- * Assign/Unassign
- */
- mtx_t rkcg_lock; /* Taken/released by rd_kafka_cgrp_lock()/unlock() */
-
- int rkcg_flags; /* Bitmask of the RD_KAFKA_CGRP_F_.. flags below */
-#define RD_KAFKA_CGRP_F_TERMINATE 0x1 /* Terminate cgrp (async) */
-#define RD_KAFKA_CGRP_F_WAIT_UNASSIGN 0x4 /* Waiting for unassign
- * to complete */
-#define RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN 0x8 /* Send LeaveGroup when
- * unassign is done */
-#define RD_KAFKA_CGRP_F_SUBSCRIPTION 0x10 /* If set:
- * subscription
- * else:
- * static assignment */
-#define RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT 0x20 /* A Heartbeat request
- * is in transit, don't
- * send a new one. */
-#define RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION 0x40 /* Subscription contains
- * wildcards. */
-
- rd_interval_t rkcg_coord_query_intvl; /* Coordinator query intvl*/
- rd_interval_t rkcg_heartbeat_intvl; /* Heartbeat intvl */
- rd_interval_t rkcg_join_intvl; /* JoinGroup interval */
- rd_interval_t rkcg_timeout_scan_intvl; /* Timeout scanner */
-
- TAILQ_HEAD(, rd_kafka_topic_s) rkcg_topics;/* Topics subscribed to */
-
- rd_list_t rkcg_toppars; /* Toppars subscribed to*/
-
- int rkcg_assigned_cnt; /* Assigned partitions */
-
- int32_t rkcg_coord_id; /* Current coordinator id */
-
- int32_t rkcg_generation_id; /* Current generation id */
-
- rd_kafka_assignor_t *rkcg_assignor; /* Selected partition
- * assignor strategy. */
-
- rd_kafka_broker_t *rkcg_rkb; /* Current handling broker,
- * if the coordinator broker
- * is not available this
- * will be another broker
- * that will handle the
- * querying of coordinator
- * etc.
- * Broker in this sense
- * is a broker_t object,
- * not necessarily a
- * real broker. */
-
- /* Current subscription */
- rd_kafka_topic_partition_list_t *rkcg_subscription;
- /* The actual topics subscribed (after metadata+wildcard matching) */
- rd_list_t *rkcg_subscribed_topics; /**< (rd_kafka_topic_info_t *) */
-
- /* Current assignment */
- rd_kafka_topic_partition_list_t *rkcg_assignment;
-
- int rkcg_wait_unassign_cnt; /* Waiting for this number
- * of partitions to be
- * unassigned and
- * decommissioned before
- * transitioning to the
- * next state. */
-
- int rkcg_wait_commit_cnt; /* Waiting for this number
- * of commits to finish. */
-
- rd_kafka_resp_err_t rkcg_last_err; /* Last error propagated to
- * application.
- * This is for silencing
- * same errors. */
-
- rd_kafka_timer_t rkcg_offset_commit_tmr; /* Offset commit timer */
-
- rd_kafka_t *rkcg_rk;
-
- rd_kafka_op_t *rkcg_reply_rko; /* Send reply for op
- * (OP_TERMINATE)
- * to this rko's queue. */
-
- rd_ts_t rkcg_ts_terminate; /* Timestamp of when
- * cgrp termination was
- * initiated. */
-
- /* Protected by rd_kafka_*lock() */
- struct {
- rd_ts_t ts_rebalance; /* Timestamp of
- * last rebalance */
- int rebalance_cnt; /* Number of
- rebalances */
- int assignment_size; /* Partition count
- * of last rebalance
- * assignment */
- } rkcg_c;
-
-} rd_kafka_cgrp_t;
-
-
-
-
-#define rd_kafka_cgrp_lock(rkcg) mtx_lock(&(rkcg)->rkcg_lock) /* Lock the cgrp's rkcg_lock mutex */
-#define rd_kafka_cgrp_unlock(rkcg) mtx_unlock(&(rkcg)->rkcg_lock) /* Unlock the cgrp's rkcg_lock mutex */
-
-/* Check if broker is the coordinator:
- * true when the cgrp has a coordinator id set (rkcg_coord_id != -1)
- * and that id equals the broker's rkb_nodeid.
- * NOTE: \p rkcg is evaluated twice, avoid arguments with side effects. */
-#define RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg,rkb) \
- ((rkcg)->rkcg_coord_id != -1 && \
- (rkcg)->rkcg_coord_id == (rkb)->rkb_nodeid)
-
-extern const char *rd_kafka_cgrp_state_names[];
-
-void rd_kafka_cgrp_destroy_final (rd_kafka_cgrp_t *rkcg);
-rd_kafka_cgrp_t *rd_kafka_cgrp_new (rd_kafka_t *rk,
- const rd_kafkap_str_t *group_id,
- const rd_kafkap_str_t *client_id);
-void rd_kafka_cgrp_serve (rd_kafka_cgrp_t *rkcg);
-
-void rd_kafka_cgrp_op (rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_t *rktp,
- rd_kafka_replyq_t replyq, rd_kafka_op_type_t type,
- rd_kafka_resp_err_t err);
-void rd_kafka_cgrp_terminate0 (rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko);
-void rd_kafka_cgrp_terminate (rd_kafka_cgrp_t *rkcg, rd_kafka_replyq_t replyq);
-
-
-rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_del (rd_kafka_cgrp_t *rkcg,
- const char *pattern);
-rd_kafka_resp_err_t rd_kafka_cgrp_topic_pattern_add (rd_kafka_cgrp_t *rkcg,
- const char *pattern);
-
-int rd_kafka_cgrp_topic_check (rd_kafka_cgrp_t *rkcg, const char *topic);
-
-void rd_kafka_cgrp_set_member_id (rd_kafka_cgrp_t *rkcg, const char *member_id);
-
-void rd_kafka_cgrp_handle_heartbeat_error (rd_kafka_cgrp_t *rkcg,
- rd_kafka_resp_err_t err);
-
-void rd_kafka_cgrp_handle_SyncGroup (rd_kafka_cgrp_t *rkcg,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- const rd_kafkap_bytes_t *member_state);
-void rd_kafka_cgrp_set_join_state (rd_kafka_cgrp_t *rkcg, int join_state);
-
-int rd_kafka_cgrp_reassign_broker (rd_kafka_cgrp_t *rkcg);
-
-void rd_kafka_cgrp_coord_query (rd_kafka_cgrp_t *rkcg,
- const char *reason);
-void rd_kafka_cgrp_coord_dead (rd_kafka_cgrp_t *rkcg, rd_kafka_resp_err_t err,
- const char *reason);
-void rd_kafka_cgrp_metadata_update_check (rd_kafka_cgrp_t *rkcg, int do_join);
-#define rd_kafka_cgrp_get(rk) ((rk)->rk_cgrp)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_conf.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_conf.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_conf.c
deleted file mode 100644
index a0e4d9f..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_conf.c
+++ /dev/null
@@ -1,2151 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012,2013 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "rdkafka_int.h"
-#include "rd.h"
-
-#include <stdlib.h>
-#include <ctype.h>
-#include <stddef.h>
-
-#include "rdkafka_int.h"
-#include "rdkafka_feature.h"
-#include "rdkafka_interceptor.h"
-#if WITH_PLUGINS
-#include "rdkafka_plugin.h"
-#endif
-
-struct rd_kafka_property { /* One configuration property definition:
- * scope, name, value type, storage offset,
- * description, limits/defaults and optional
- * validate/ctor/dtor/copy/set hooks. */
- rd_kafka_conf_scope_t scope; /* Scope flags, e.g. _RK_GLOBAL, may be OR:ed */
- const char *name; /* Property name, e.g. "client.id" */
- enum {
- _RK_C_STR,
- _RK_C_INT,
- _RK_C_S2I, /* String to Integer mapping.
- * Supports limited canonical str->int mappings
- * using s2i[] */
- _RK_C_S2F, /* CSV String to Integer flag mapping (OR:ed) */
- _RK_C_BOOL,
- _RK_C_PTR, /* Only settable through special set functions */
- _RK_C_PATLIST, /* Pattern list */
- _RK_C_KSTR, /* Kafka string */
- _RK_C_ALIAS, /* Alias: points to other property through .sdef */
- _RK_C_INTERNAL, /* Internal, don't expose to application */
- _RK_C_INVALID, /* Invalid property, used to catch known
- * but unsupported Java properties. */
- } type;
- int offset; /* Field offset in the conf struct, see _RK() / _RKT() */
- const char *desc; /* Description text */
- int vmin; /* Lower value bound (int), presumably inclusive - TODO confirm */
- int vmax; /* Upper value bound (int), presumably inclusive - TODO confirm */
- int vdef; /* Default value (int) */
- const char *sdef; /* Default value (string) */
- void *pdef; /* Default value (pointer) */
- struct {
- int val;
- const char *str;
- } s2i[16]; /* _RK_C_S2I and _RK_C_S2F */
-
- /* Value validator (STR)
- * NOTE(review): return value convention is decided by the caller,
- * which is not visible here - confirm before adding validators. */
- int (*validate) (const struct rd_kafka_property *prop,
- const char *val, int ival);
-
- /* Configuration object constructors and destructor for use when
- * the property value itself is not used, or needs extra care. */
- void (*ctor) (int scope, void *pconf);
- void (*dtor) (int scope, void *pconf);
- void (*copy) (int scope, void *pdst, const void *psrc,
- void *dstptr, const void *srcptr,
- size_t filter_cnt, const char **filter);
-
- rd_kafka_conf_res_t (*set) (int scope, void *pconf,
- const char *name, const char *value,
- void *dstptr,
- rd_kafka_conf_set_mode_t set_mode,
- char *errstr, size_t errstr_size);
-};
-
-
-#define _RK(field) offsetof(rd_kafka_conf_t, field) /* Byte offset of \p field in rd_kafka_conf_t */
-#define _RKT(field) offsetof(rd_kafka_topic_conf_t, field) /* Byte offset of \p field in rd_kafka_topic_conf_t */
-
-
-static rd_kafka_conf_res_t
-rd_kafka_anyconf_get0 (const void *conf, const struct rd_kafka_property *prop,
- char *dest, size_t *dest_size);
-
-
-/**
- * @brief Validate \p broker.version.fallback property.
- *
- * Passes \p val to rd_kafka_get_legacy_ApiVersions() and returns that
- * call's result unchanged. The looked-up ApiVersion list (apis/api_cnt)
- * is only needed as output arguments and is discarded.
- * \p prop and \p ival are unused.
- *
- * NOTE(review): the meaning of the return value follows
- * rd_kafka_get_legacy_ApiVersions(), which is defined elsewhere - confirm.
- */
-static int
-rd_kafka_conf_validate_broker_version (const struct rd_kafka_property *prop,
- const char *val, int ival) {
- struct rd_kafka_ApiVersion *apis;
- size_t api_cnt;
- return rd_kafka_get_legacy_ApiVersions(val, &apis, &api_cnt, NULL);
-}
-
-/**
- * @brief Validate that string is a single item, without delimiters (, space).
- *
- * @returns 1 if \p val contains neither ',' nor ' ', else 0.
- *
- * \p prop and \p ival are unused. \p val is passed straight to strchr()
- * and must therefore not be NULL.
- */
-static RD_UNUSED int
-rd_kafka_conf_validate_single (const struct rd_kafka_property *prop,
- const char *val, int ival) {
- return !strchr(val, ',') && !strchr(val, ' ');
-}
-
-
-/**
- * librdkafka configuration property definitions.
- */
-static const struct rd_kafka_property rd_kafka_properties[] = {
- /* Global properties */
- { _RK_GLOBAL, "builtin.features", _RK_C_S2F, _RK(builtin_features),
- "Indicates the builtin features for this build of librdkafka. "
- "An application can either query this value or attempt to set it "
- "with its list of required features to check for library support.",
- 0, 0x7fffffff, 0xffff,
- .s2i = {
-#if WITH_ZLIB
- { 0x1, "gzip" },
-#endif
-#if WITH_SNAPPY
- { 0x2, "snappy" },
-#endif
-#if WITH_SSL
- { 0x4, "ssl" },
-#endif
- { 0x8, "sasl" },
- { 0x10, "regex" },
- { 0x20, "lz4" },
-#if defined(_MSC_VER) || WITH_SASL_CYRUS
- { 0x40, "sasl_gssapi" },
-#endif
- { 0x80, "sasl_plain" },
-#if WITH_SASL_SCRAM
- { 0x100, "sasl_scram" },
-#endif
-#if WITH_PLUGINS
- { 0x200, "plugins" },
-#endif
- { 0, NULL }
- }
- },
- { _RK_GLOBAL, "client.id", _RK_C_STR, _RK(client_id_str),
- "Client identifier.",
- .sdef = "rdkafka" },
- { _RK_GLOBAL, "metadata.broker.list", _RK_C_STR, _RK(brokerlist),
- "Initial list of brokers as a CSV list of broker host or host:port. "
- "The application may also use `rd_kafka_brokers_add()` to add "
- "brokers during runtime." },
- { _RK_GLOBAL, "bootstrap.servers", _RK_C_ALIAS, 0,
- "See metadata.broker.list",
- .sdef = "metadata.broker.list" },
- { _RK_GLOBAL, "message.max.bytes", _RK_C_INT, _RK(max_msg_size),
- "Maximum Kafka protocol request message size.",
- 1000, 1000000000, 1000000 },
- { _RK_GLOBAL, "message.copy.max.bytes", _RK_C_INT,
- _RK(msg_copy_max_size),
- "Maximum size for message to be copied to buffer. "
- "Messages larger than this will be passed by reference (zero-copy) "
- "at the expense of larger iovecs.",
- 0, 1000000000, 0xffff },
- { _RK_GLOBAL, "receive.message.max.bytes", _RK_C_INT,
- _RK(recv_max_msg_size),
- "Maximum Kafka protocol response message size. "
- "This is a safety precaution to avoid memory exhaustion in case of "
- "protocol hickups. "
- "The value should be at least fetch.message.max.bytes * number of "
- "partitions consumed from + messaging overhead (e.g. 200000 bytes).",
- 1000, 1000000000, 100000000 },
- { _RK_GLOBAL, "max.in.flight.requests.per.connection", _RK_C_INT,
- _RK(max_inflight),
- "Maximum number of in-flight requests per broker connection. "
- "This is a generic property applied to all broker communication, "
- "however it is primarily relevant to produce requests. "
- "In particular, note that other mechanisms limit the number "
- "of outstanding consumer fetch request per broker to one.",
- 1, 1000000, 1000000 },
- { _RK_GLOBAL, "max.in.flight", _RK_C_ALIAS,
- .sdef = "max.in.flight.requests.per.connection" },
- { _RK_GLOBAL, "metadata.request.timeout.ms", _RK_C_INT,
- _RK(metadata_request_timeout_ms),
- "Non-topic request timeout in milliseconds. "
- "This is for metadata requests, etc.",
- 10, 900*1000, 60*1000},
- { _RK_GLOBAL, "topic.metadata.refresh.interval.ms", _RK_C_INT,
- _RK(metadata_refresh_interval_ms),
- "Topic metadata refresh interval in milliseconds. "
- "The metadata is automatically refreshed on error and connect. "
- "Use -1 to disable the intervalled refresh.",
- -1, 3600*1000, 5*60*1000 },
- { _RK_GLOBAL, "metadata.max.age.ms", _RK_C_INT,
- _RK(metadata_max_age_ms),
- "Metadata cache max age. "
- "Defaults to metadata.refresh.interval.ms * 3",
- 1, 24*3600*1000, -1 },
- { _RK_GLOBAL, "topic.metadata.refresh.fast.interval.ms", _RK_C_INT,
- _RK(metadata_refresh_fast_interval_ms),
- "When a topic loses its leader a new metadata request will be "
- "enqueued with this initial interval, exponentially increasing "
- "until the topic metadata has been refreshed. "
- "This is used to recover quickly from transitioning leader brokers.",
- 1, 60*1000, 250 },
- { _RK_GLOBAL, "topic.metadata.refresh.fast.cnt", _RK_C_INT,
- _RK(metadata_refresh_fast_cnt),
- "*Deprecated: No longer used.*",
- 0, 1000, 10 },
- { _RK_GLOBAL, "topic.metadata.refresh.sparse", _RK_C_BOOL,
- _RK(metadata_refresh_sparse),
- "Sparse metadata requests (consumes less network bandwidth)",
- 0, 1, 1 },
- { _RK_GLOBAL, "topic.blacklist", _RK_C_PATLIST,
- _RK(topic_blacklist),
- "Topic blacklist, a comma-separated list of regular expressions "
- "for matching topic names that should be ignored in "
- "broker metadata information as if the topics did not exist." },
- { _RK_GLOBAL, "debug", _RK_C_S2F, _RK(debug),
- "A comma-separated list of debug contexts to enable. "
- "Debugging the Producer: broker,topic,msg. Consumer: cgrp,topic,fetch",
- .s2i = {
- { RD_KAFKA_DBG_GENERIC, "generic" },
- { RD_KAFKA_DBG_BROKER, "broker" },
- { RD_KAFKA_DBG_TOPIC, "topic" },
- { RD_KAFKA_DBG_METADATA, "metadata" },
- { RD_KAFKA_DBG_QUEUE, "queue" },
- { RD_KAFKA_DBG_MSG, "msg" },
- { RD_KAFKA_DBG_PROTOCOL, "protocol" },
- { RD_KAFKA_DBG_CGRP, "cgrp" },
- { RD_KAFKA_DBG_SECURITY, "security" },
- { RD_KAFKA_DBG_FETCH, "fetch" },
- { RD_KAFKA_DBG_FEATURE, "feature" },
- { RD_KAFKA_DBG_INTERCEPTOR, "interceptor" },
- { RD_KAFKA_DBG_PLUGIN, "plugin" },
- { RD_KAFKA_DBG_ALL, "all" },
- } },
- { _RK_GLOBAL, "socket.timeout.ms", _RK_C_INT, _RK(socket_timeout_ms),
- "Timeout for network requests.",
- 10, 300*1000, 60*1000 },
- { _RK_GLOBAL, "socket.blocking.max.ms", _RK_C_INT,
- _RK(socket_blocking_max_ms),
- "Maximum time a broker socket operation may block. "
- "A lower value improves responsiveness at the expense of "
- "slightly higher CPU usage. **Deprecated**",
- 1, 60*1000, 1000 },
- { _RK_GLOBAL, "socket.send.buffer.bytes", _RK_C_INT,
- _RK(socket_sndbuf_size),
- "Broker socket send buffer size. System default is used if 0.",
- 0, 100000000, 0 },
- { _RK_GLOBAL, "socket.receive.buffer.bytes", _RK_C_INT,
- _RK(socket_rcvbuf_size),
- "Broker socket receive buffer size. System default is used if 0.",
- 0, 100000000, 0 },
- { _RK_GLOBAL, "socket.keepalive.enable", _RK_C_BOOL,
- _RK(socket_keepalive),
- "Enable TCP keep-alives (SO_KEEPALIVE) on broker sockets",
- 0, 1, 0 },
- { _RK_GLOBAL, "socket.nagle.disable", _RK_C_BOOL,
- _RK(socket_nagle_disable),
- "Disable the Nagle algorithm (TCP_NODELAY).",
- 0, 1, 0 },
- { _RK_GLOBAL, "socket.max.fails", _RK_C_INT,
- _RK(socket_max_fails),
- "Disconnect from broker when this number of send failures "
- "(e.g., timed out requests) is reached. Disable with 0. "
- "NOTE: The connection is automatically re-established.",
- 0, 1000000, 3 },
- { _RK_GLOBAL, "broker.address.ttl", _RK_C_INT,
- _RK(broker_addr_ttl),
- "How long to cache the broker address resolving "
- "results (milliseconds).",
- 0, 86400*1000, 1*1000 },
- { _RK_GLOBAL, "broker.address.family", _RK_C_S2I,
- _RK(broker_addr_family),
- "Allowed broker IP address families: any, v4, v6",
- .vdef = AF_UNSPEC,
- .s2i = {
- { AF_UNSPEC, "any" },
- { AF_INET, "v4" },
- { AF_INET6, "v6" },
- } },
- { _RK_GLOBAL, "reconnect.backoff.jitter.ms", _RK_C_INT,
- _RK(reconnect_jitter_ms),
- "Throttle broker reconnection attempts by this value +-50%.",
- 0, 60*60*1000, 500 },
- { _RK_GLOBAL, "statistics.interval.ms", _RK_C_INT,
- _RK(stats_interval_ms),
- "librdkafka statistics emit interval. The application also needs to "
- "register a stats callback using `rd_kafka_conf_set_stats_cb()`. "
- "The granularity is 1000ms. A value of 0 disables statistics.",
- 0, 86400*1000, 0 },
- { _RK_GLOBAL, "enabled_events", _RK_C_INT,
- _RK(enabled_events),
- "See `rd_kafka_conf_set_events()`",
- 0, 0x7fffffff, 0 },
- { _RK_GLOBAL, "error_cb", _RK_C_PTR,
- _RK(error_cb),
- "Error callback (set with rd_kafka_conf_set_error_cb())" },
- { _RK_GLOBAL, "throttle_cb", _RK_C_PTR,
- _RK(throttle_cb),
- "Throttle callback (set with rd_kafka_conf_set_throttle_cb())" },
- { _RK_GLOBAL, "stats_cb", _RK_C_PTR,
- _RK(stats_cb),
- "Statistics callback (set with rd_kafka_conf_set_stats_cb())" },
- { _RK_GLOBAL, "log_cb", _RK_C_PTR,
- _RK(log_cb),
- "Log callback (set with rd_kafka_conf_set_log_cb())",
- .pdef = rd_kafka_log_print },
- { _RK_GLOBAL, "log_level", _RK_C_INT,
- _RK(log_level),
- "Logging level (syslog(3) levels)",
- 0, 7, 6 },
- { _RK_GLOBAL, "log.queue", _RK_C_BOOL, _RK(log_queue),
- "Disable spontaneous log_cb from internal librdkafka "
- "threads, instead enqueue log messages on queue set with "
- "`rd_kafka_set_log_queue()` and serve log callbacks or "
- "events through the standard poll APIs. "
- "**NOTE**: Log messages will linger in a temporary queue "
- "until the log queue has been set.",
- 0, 1, 0 },
- { _RK_GLOBAL, "log.thread.name", _RK_C_BOOL,
- _RK(log_thread_name),
- "Print internal thread name in log messages "
- "(useful for debugging librdkafka internals)",
- 0, 1, 1 },
- { _RK_GLOBAL, "log.connection.close", _RK_C_BOOL,
- _RK(log_connection_close),
- "Log broker disconnects. "
- "It might be useful to turn this off when interacting with "
- "0.9 brokers with an aggressive `connection.max.idle.ms` value.",
- 0, 1, 1 },
- { _RK_GLOBAL, "socket_cb", _RK_C_PTR,
- _RK(socket_cb),
- "Socket creation callback to provide race-free CLOEXEC",
- .pdef =
-#ifdef __linux__
- rd_kafka_socket_cb_linux
-#else
- rd_kafka_socket_cb_generic
-#endif
- },
- { _RK_GLOBAL, "connect_cb", _RK_C_PTR,
- _RK(connect_cb),
- "Socket connect callback",
- },
- { _RK_GLOBAL, "closesocket_cb", _RK_C_PTR,
- _RK(closesocket_cb),
- "Socket close callback",
- },
- { _RK_GLOBAL, "open_cb", _RK_C_PTR,
- _RK(open_cb),
- "File open callback to provide race-free CLOEXEC",
- .pdef =
-#ifdef __linux__
- rd_kafka_open_cb_linux
-#else
- rd_kafka_open_cb_generic
-#endif
- },
- { _RK_GLOBAL, "opaque", _RK_C_PTR,
- _RK(opaque),
- "Application opaque (set with rd_kafka_conf_set_opaque())" },
- { _RK_GLOBAL, "default_topic_conf", _RK_C_PTR,
- _RK(topic_conf),
- "Default topic configuration for automatically subscribed topics" },
- { _RK_GLOBAL, "internal.termination.signal", _RK_C_INT,
- _RK(term_sig),
- "Signal that librdkafka will use to quickly terminate on "
- "rd_kafka_destroy(). If this signal is not set then there will be a "
- "delay before rd_kafka_wait_destroyed() returns true "
- "as internal threads are timing out their system calls. "
- "If this signal is set however the delay will be minimal. "
- "The application should mask this signal as an internal "
- "signal handler is installed.",
- 0, 128, 0 },
- { _RK_GLOBAL, "api.version.request", _RK_C_BOOL,
- _RK(api_version_request),
- "Request broker's supported API versions to adjust functionality to "
- "available protocol features. If set to false, or the "
- "ApiVersionRequest fails, the fallback version "
- "`broker.version.fallback` will be used. "
- "**NOTE**: Depends on broker version >=0.10.0. If the request is not "
- "supported by (an older) broker the `broker.version.fallback` fallback is used.",
- 0, 1, 1 },
- { _RK_GLOBAL, "api.version.request.timeout.ms", _RK_C_INT,
- _RK(api_version_request_timeout_ms),
- "Timeout for broker API version requests.",
- 1, 5*60*1000, 10*1000 },
- { _RK_GLOBAL, "api.version.fallback.ms", _RK_C_INT,
- _RK(api_version_fallback_ms),
- "Dictates how long the `broker.version.fallback` fallback is used "
- "in the case the ApiVersionRequest fails. "
- "**NOTE**: The ApiVersionRequest is only issued when a new connection "
- "to the broker is made (such as after an upgrade).",
- 0, 86400*7*1000, 20*60*1000 /* longer than default Idle timeout (10m)*/ },
-
- { _RK_GLOBAL, "broker.version.fallback", _RK_C_STR,
- _RK(broker_version_fallback),
- "Older broker versions (<0.10.0) provides no way for a client to query "
- "for supported protocol features "
- "(ApiVersionRequest, see `api.version.request`) making it impossible "
- "for the client to know what features it may use. "
- "As a workaround a user may set this property to the expected broker "
- "version and the client will automatically adjust its feature set "
- "accordingly if the ApiVersionRequest fails (or is disabled). "
- "The fallback broker version will be used for `api.version.fallback.ms`. "
- "Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. Any other value, "
- "such as 0.10.2.1, enables ApiVersionRequests.",
- .sdef = "0.9.0",
- .validate = rd_kafka_conf_validate_broker_version },
-
- /* Security related global properties */
- { _RK_GLOBAL, "security.protocol", _RK_C_S2I,
- _RK(security_protocol),
- "Protocol used to communicate with brokers.",
- .vdef = RD_KAFKA_PROTO_PLAINTEXT,
- .s2i = {
- { RD_KAFKA_PROTO_PLAINTEXT, "plaintext" },
-#if WITH_SSL
- { RD_KAFKA_PROTO_SSL, "ssl" },
-#endif
- { RD_KAFKA_PROTO_SASL_PLAINTEXT, "sasl_plaintext" },
-#if WITH_SSL
- { RD_KAFKA_PROTO_SASL_SSL, "sasl_ssl" },
-#endif
- { 0, NULL }
- } },
-
-#if WITH_SSL
- { _RK_GLOBAL, "ssl.cipher.suites", _RK_C_STR,
- _RK(ssl.cipher_suites),
- "A cipher suite is a named combination of authentication, "
- "encryption, MAC and key exchange algorithm used to negotiate the "
- "security settings for a network connection using TLS or SSL network "
- "protocol. See manual page for `ciphers(1)` and "
- "`SSL_CTX_set_cipher_list(3)."
- },
- { _RK_GLOBAL, "ssl.key.location", _RK_C_STR,
- _RK(ssl.key_location),
- "Path to client's private key (PEM) used for authentication."
- },
- { _RK_GLOBAL, "ssl.key.password", _RK_C_STR,
- _RK(ssl.key_password),
- "Private key passphrase"
- },
- { _RK_GLOBAL, "ssl.certificate.location", _RK_C_STR,
- _RK(ssl.cert_location),
- "Path to client's public key (PEM) used for authentication."
- },
- { _RK_GLOBAL, "ssl.ca.location", _RK_C_STR,
- _RK(ssl.ca_location),
- "File or directory path to CA certificate(s) for verifying "
- "the broker's key."
- },
- { _RK_GLOBAL, "ssl.crl.location", _RK_C_STR,
- _RK(ssl.crl_location),
- "Path to CRL for verifying broker's certificate validity."
- },
-#endif /* WITH_SSL */
-
- /* Point user in the right direction if they try to apply
- * Java client SSL / JAAS properties. */
- { _RK_GLOBAL, "ssl.keystore.location", _RK_C_INVALID,
- _RK(dummy),
- "Java KeyStores are not supported, use `ssl.key.location` and "
- "a private key (PEM) file instead. "
- "See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka for more information."
- },
- { _RK_GLOBAL, "ssl.truststore.location", _RK_C_INVALID,
- _RK(dummy),
- "Java TrustStores are not supported, use `ssl.ca.location` "
- "and a certificate file instead. "
- "See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka for more information."
- },
- { _RK_GLOBAL, "sasl.jaas.config", _RK_C_INVALID,
- _RK(dummy),
- "Java JAAS configuration is not supported, see "
- "https://github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka "
- "for more information."
- },
-
- {_RK_GLOBAL,"sasl.mechanisms", _RK_C_STR,
- _RK(sasl.mechanisms),
- "SASL mechanism to use for authentication. "
- "Supported: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. "
- "**NOTE**: Despite the name only one mechanism must be configured.",
- .sdef = "GSSAPI",
- .validate = rd_kafka_conf_validate_single },
- { _RK_GLOBAL, "sasl.kerberos.service.name", _RK_C_STR,
- _RK(sasl.service_name),
- "Kerberos principal name that Kafka runs as.",
- .sdef = "kafka" },
- { _RK_GLOBAL, "sasl.kerberos.principal", _RK_C_STR,
- _RK(sasl.principal),
- "This client's Kerberos principal name.",
- .sdef = "kafkaclient" },
-#ifndef _MSC_VER
- { _RK_GLOBAL, "sasl.kerberos.kinit.cmd", _RK_C_STR,
- _RK(sasl.kinit_cmd),
- "Full kerberos kinit command string, %{config.prop.name} is replaced "
- "by corresponding config object value, %{broker.name} returns the "
- "broker's hostname.",
- .sdef = "kinit -S \"%{sasl.kerberos.service.name}/%{broker.name}\" "
- "-k -t \"%{sasl.kerberos.keytab}\" %{sasl.kerberos.principal}" },
- { _RK_GLOBAL, "sasl.kerberos.keytab", _RK_C_STR,
- _RK(sasl.keytab),
- "Path to Kerberos keytab file. Uses system default if not set."
- "**NOTE**: This is not automatically used but must be added to the "
- "template in sasl.kerberos.kinit.cmd as "
- "` ... -t %{sasl.kerberos.keytab}`." },
- { _RK_GLOBAL, "sasl.kerberos.min.time.before.relogin", _RK_C_INT,
- _RK(sasl.relogin_min_time),
- "Minimum time in milliseconds between key refresh attempts.",
- 1, 86400*1000, 60*1000 },
-#endif
- { _RK_GLOBAL, "sasl.username", _RK_C_STR,
- _RK(sasl.username),
- "SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms" },
- { _RK_GLOBAL, "sasl.password", _RK_C_STR,
- _RK(sasl.password),
- "SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism" },
-
-#if WITH_PLUGINS
- /* Plugins */
- { _RK_GLOBAL, "plugin.library.paths", _RK_C_STR,
- _RK(plugin_paths),
- "List of plugin libaries to load (; separated). "
- "The library search path is platform dependent (see dlopen(3) for Unix and LoadLibrary() for Windows). If no filename extension is specified the "
- "platform-specific extension (such as .dll or .so) will be appended automatically.",
- .set = rd_kafka_plugins_conf_set },
-#endif
-
- /* Interceptors are added through specific API and not exposed
- * as configuration properties.
- * The interceptor property must be defined after plugin.library.paths
- * so that the plugin libraries are properly loaded before
- * interceptors are configured when duplicating configuration objects.*/
- { _RK_GLOBAL, "interceptors", _RK_C_INTERNAL,
- _RK(interceptors),
- "Interceptors added through rd_kafka_conf_interceptor_add_..() "
- "and any configuration handled by interceptors.",
- .ctor = rd_kafka_conf_interceptor_ctor,
- .dtor = rd_kafka_conf_interceptor_dtor,
- .copy = rd_kafka_conf_interceptor_copy },
-
- /* Global client group properties */
- { _RK_GLOBAL|_RK_CGRP, "group.id", _RK_C_STR,
- _RK(group_id_str),
- "Client group id string. All clients sharing the same group.id "
- "belong to the same group." },
- { _RK_GLOBAL|_RK_CGRP, "partition.assignment.strategy", _RK_C_STR,
- _RK(partition_assignment_strategy),
- "Name of partition assignment strategy to use when elected "
- "group leader assigns partitions to group members.",
- .sdef = "range,roundrobin" },
- { _RK_GLOBAL|_RK_CGRP, "session.timeout.ms", _RK_C_INT,
- _RK(group_session_timeout_ms),
- "Client group session and failure detection timeout.",
- 1, 3600*1000, 30*1000 },
- { _RK_GLOBAL|_RK_CGRP, "heartbeat.interval.ms", _RK_C_INT,
- _RK(group_heartbeat_intvl_ms),
- "Group session keepalive heartbeat interval.",
- 1, 3600*1000, 1*1000 },
- { _RK_GLOBAL|_RK_CGRP, "group.protocol.type", _RK_C_KSTR,
- _RK(group_protocol_type),
- "Group protocol type",
- .sdef = "consumer" },
- { _RK_GLOBAL|_RK_CGRP, "coordinator.query.interval.ms", _RK_C_INT,
- _RK(coord_query_intvl_ms),
- "How often to query for the current client group coordinator. "
- "If the currently assigned coordinator is down the configured "
- "query interval will be divided by ten to more quickly recover "
- "in case of coordinator reassignment.",
- 1, 3600*1000, 10*60*1000 },
-
- /* Global consumer properties */
- { _RK_GLOBAL|_RK_CONSUMER, "enable.auto.commit", _RK_C_BOOL,
- _RK(enable_auto_commit),
- "Automatically and periodically commit offsets in the background.",
- 0, 1, 1 },
- { _RK_GLOBAL|_RK_CONSUMER, "auto.commit.interval.ms", _RK_C_INT,
- _RK(auto_commit_interval_ms),
- "The frequency in milliseconds that the consumer offsets "
- "are committed (written) to offset storage. (0 = disable). "
- "This setting is used by the high-level consumer.",
- 0, 86400*1000, 5*1000 },
- { _RK_GLOBAL|_RK_CONSUMER, "enable.auto.offset.store", _RK_C_BOOL,
- _RK(enable_auto_offset_store),
- "Automatically store offset of last message provided to "
- "application.",
- 0, 1, 1 },
- { _RK_GLOBAL|_RK_CONSUMER, "queued.min.messages", _RK_C_INT,
- _RK(queued_min_msgs),
- "Minimum number of messages per topic+partition "
- "librdkafka tries to maintain in the local consumer queue.",
- 1, 10000000, 100000 },
- { _RK_GLOBAL|_RK_CONSUMER, "queued.max.messages.kbytes", _RK_C_INT,
- _RK(queued_max_msg_kbytes),
- "Maximum number of kilobytes per topic+partition in the "
- "local consumer queue. "
- "This value may be overshot by fetch.message.max.bytes. "
- "This property has higher priority than queued.min.messages.",
- 1, 1000000000, 1000000 /* 1 Gig */ },
- { _RK_GLOBAL|_RK_CONSUMER, "fetch.wait.max.ms", _RK_C_INT,
- _RK(fetch_wait_max_ms),
- "Maximum time the broker may wait to fill the response "
- "with fetch.min.bytes.",
- 0, 300*1000, 100 },
- { _RK_GLOBAL|_RK_CONSUMER, "fetch.message.max.bytes", _RK_C_INT,
- _RK(fetch_msg_max_bytes),
- "Initial maximum number of bytes per topic+partition to request when "
- "fetching messages from the broker. "
- "If the client encounters a message larger than this value "
- "it will gradually try to increase it until the "
- "entire message can be fetched.",
- 1, 1000000000, 1024*1024 },
- { _RK_GLOBAL|_RK_CONSUMER, "max.partition.fetch.bytes", _RK_C_ALIAS,
- .sdef = "fetch.message.max.bytes" },
- { _RK_GLOBAL|_RK_CONSUMER, "fetch.min.bytes", _RK_C_INT,
- _RK(fetch_min_bytes),
- "Minimum number of bytes the broker responds with. "
- "If fetch.wait.max.ms expires the accumulated data will "
- "be sent to the client regardless of this setting.",
- 1, 100000000, 1 },
- { _RK_GLOBAL|_RK_CONSUMER, "fetch.error.backoff.ms", _RK_C_INT,
- _RK(fetch_error_backoff_ms),
- "How long to postpone the next fetch request for a "
- "topic+partition in case of a fetch error.",
- 0, 300*1000, 500 },
- { _RK_GLOBAL|_RK_CONSUMER, "offset.store.method", _RK_C_S2I,
- _RK(offset_store_method),
- "Offset commit store method: "
- "'file' - local file store (offset.store.path, et.al), "
- "'broker' - broker commit store "
- "(requires Apache Kafka 0.8.2 or later on the broker).",
- .vdef = RD_KAFKA_OFFSET_METHOD_BROKER,
- .s2i = {
- { RD_KAFKA_OFFSET_METHOD_NONE, "none" },
- { RD_KAFKA_OFFSET_METHOD_FILE, "file" },
- { RD_KAFKA_OFFSET_METHOD_BROKER, "broker" }
- }
- },
- { _RK_GLOBAL|_RK_CONSUMER, "consume_cb", _RK_C_PTR,
- _RK(consume_cb),
- "Message consume callback (set with rd_kafka_conf_set_consume_cb())"},
- { _RK_GLOBAL|_RK_CONSUMER, "rebalance_cb", _RK_C_PTR,
- _RK(rebalance_cb),
- "Called after consumer group has been rebalanced "
- "(set with rd_kafka_conf_set_rebalance_cb())" },
- { _RK_GLOBAL|_RK_CONSUMER, "offset_commit_cb", _RK_C_PTR,
- _RK(offset_commit_cb),
- "Offset commit result propagation callback. "
- "(set with rd_kafka_conf_set_offset_commit_cb())" },
- { _RK_GLOBAL|_RK_CONSUMER, "enable.partition.eof", _RK_C_BOOL,
- _RK(enable_partition_eof),
- "Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the "
- "consumer reaches the end of a partition.",
- 0, 1, 1 },
- { _RK_GLOBAL|_RK_CONSUMER, "check.crcs", _RK_C_BOOL,
- _RK(check_crcs),
- "Verify CRC32 of consumed messages, ensuring no on-the-wire or "
- "on-disk corruption to the messages occurred. This check comes "
- "at slightly increased CPU usage.",
- 0, 1, 0 },
- /* Global producer properties */
- { _RK_GLOBAL|_RK_PRODUCER, "queue.buffering.max.messages", _RK_C_INT,
- _RK(queue_buffering_max_msgs),
- "Maximum number of messages allowed on the producer queue.",
- 1, 10000000, 100000 },
- { _RK_GLOBAL|_RK_PRODUCER, "queue.buffering.max.kbytes", _RK_C_INT,
- _RK(queue_buffering_max_kbytes),
- "Maximum total message size sum allowed on the producer queue. "
- "This property has higher priority than queue.buffering.max.messages.",
- 1, INT_MAX/1024, 4000000 },
- { _RK_GLOBAL|_RK_PRODUCER, "queue.buffering.max.ms", _RK_C_INT,
- _RK(buffering_max_ms),
- "Delay in milliseconds to wait for messages in the producer queue "
- "to accumulate before constructing message batches (MessageSets) to "
- "transmit to brokers. "
- "A higher value allows larger and more effective "
- "(less overhead, improved compression) batches of messages to "
- "accumulate at the expense of increased message delivery latency.",
- 0, 900*1000, 0 },
- { _RK_GLOBAL|_RK_PRODUCER, "linger.ms", _RK_C_ALIAS,
- .sdef = "queue.buffering.max.ms" },
- { _RK_GLOBAL|_RK_PRODUCER, "message.send.max.retries", _RK_C_INT,
- _RK(max_retries),
- "How many times to retry sending a failing MessageSet. "
- "**Note:** retrying may cause reordering.",
- 0, 10000000, 2 },
- { _RK_GLOBAL | _RK_PRODUCER, "retries", _RK_C_ALIAS,
- .sdef = "message.send.max.retries" },
- { _RK_GLOBAL|_RK_PRODUCER, "retry.backoff.ms", _RK_C_INT,
- _RK(retry_backoff_ms),
- "The backoff time in milliseconds before retrying a message send.",
- 1, 300*1000, 100 },
- { _RK_GLOBAL|_RK_PRODUCER, "compression.codec", _RK_C_S2I,
- _RK(compression_codec),
- "compression codec to use for compressing message sets. "
- "This is the default value for all topics, may be overriden by "
- "the topic configuration property `compression.codec`. ",
- .vdef = RD_KAFKA_COMPRESSION_NONE,
- .s2i = {
- { RD_KAFKA_COMPRESSION_NONE, "none" },
-#if WITH_ZLIB
- { RD_KAFKA_COMPRESSION_GZIP, "gzip" },
-#endif
-#if WITH_SNAPPY
- { RD_KAFKA_COMPRESSION_SNAPPY, "snappy" },
-#endif
- { RD_KAFKA_COMPRESSION_LZ4, "lz4" },
- { 0 }
- } },
- { _RK_GLOBAL|_RK_PRODUCER, "batch.num.messages", _RK_C_INT,
- _RK(batch_num_messages),
- "Maximum number of messages batched in one MessageSet. "
- "The total MessageSet size is also limited by message.max.bytes.",
- 1, 1000000, 10000 },
- { _RK_GLOBAL|_RK_PRODUCER, "delivery.report.only.error", _RK_C_BOOL,
- _RK(dr_err_only),
- "Only provide delivery reports for failed messages.",
- 0, 1, 0 },
- { _RK_GLOBAL|_RK_PRODUCER, "dr_cb", _RK_C_PTR,
- _RK(dr_cb),
- "Delivery report callback (set with rd_kafka_conf_set_dr_cb())" },
- { _RK_GLOBAL|_RK_PRODUCER, "dr_msg_cb", _RK_C_PTR,
- _RK(dr_msg_cb),
- "Delivery report callback (set with rd_kafka_conf_set_dr_msg_cb())" },
-
-
- /*
- * Topic properties
- */
-
- /* Topic producer properties */
- { _RK_TOPIC|_RK_PRODUCER, "request.required.acks", _RK_C_INT,
- _RKT(required_acks),
- "This field indicates how many acknowledgements the leader broker "
- "must receive from ISR brokers before responding to the request: "
- "*0*=Broker does not send any response/ack to client, "
- "*1*=Only the leader broker will need to ack the message, "
- "*-1* or *all*=broker will block until message is committed by all "
- "in sync replicas (ISRs) or broker's `in.sync.replicas` "
- "setting before sending response. ",
- -1, 1000, 1,
- .s2i = {
- { -1, "all" },
- }
- },
- { _RK_TOPIC | _RK_PRODUCER, "acks", _RK_C_ALIAS,
- .sdef = "request.required.acks" },
-
- { _RK_TOPIC|_RK_PRODUCER, "request.timeout.ms", _RK_C_INT,
- _RKT(request_timeout_ms),
- "The ack timeout of the producer request in milliseconds. "
- "This value is only enforced by the broker and relies "
- "on `request.required.acks` being != 0.",
- 1, 900*1000, 5*1000 },
- { _RK_TOPIC|_RK_PRODUCER, "message.timeout.ms", _RK_C_INT,
- _RKT(message_timeout_ms),
- "Local message timeout. "
- "This value is only enforced locally and limits the time a "
- "produced message waits for successful delivery. "
- "A time of 0 is infinite.",
- 0, 900*1000, 300*1000 },
- { _RK_TOPIC|_RK_PRODUCER, "produce.offset.report", _RK_C_BOOL,
- _RKT(produce_offset_report),
- "Report offset of produced message back to application. "
- "The application must be use the `dr_msg_cb` to retrieve the offset "
- "from `rd_kafka_message_t.offset`.",
- 0, 1, 0 },
- { _RK_TOPIC|_RK_PRODUCER, "partitioner_cb", _RK_C_PTR,
- _RKT(partitioner),
- "Partitioner callback "
- "(set with rd_kafka_topic_conf_set_partitioner_cb())" },
- { _RK_TOPIC, "opaque", _RK_C_PTR,
- _RKT(opaque),
- "Application opaque (set with rd_kafka_topic_conf_set_opaque())" },
- { _RK_TOPIC | _RK_PRODUCER, "compression.codec", _RK_C_S2I,
- _RKT(compression_codec),
- "Compression codec to use for compressing message sets. ",
- .vdef = RD_KAFKA_COMPRESSION_INHERIT,
- .s2i = {
- { RD_KAFKA_COMPRESSION_NONE, "none" },
-#if WITH_ZLIB
- { RD_KAFKA_COMPRESSION_GZIP, "gzip" },
-#endif
-#if WITH_SNAPPY
- { RD_KAFKA_COMPRESSION_SNAPPY, "snappy" },
-#endif
- { RD_KAFKA_COMPRESSION_LZ4, "lz4" },
- { RD_KAFKA_COMPRESSION_INHERIT, "inherit" },
- { 0 }
- } },
-
-
- /* Topic consumer properties */
- { _RK_TOPIC|_RK_CONSUMER, "auto.commit.enable", _RK_C_BOOL,
- _RKT(auto_commit),
- "If true, periodically commit offset of the last message handed "
- "to the application. This committed offset will be used when the "
- "process restarts to pick up where it left off. "
- "If false, the application will have to call "
- "`rd_kafka_offset_store()` to store an offset (optional). "
- "**NOTE:** This property should only be used with the simple "
- "legacy consumer, when using the high-level KafkaConsumer the global "
- "`enable.auto.commit` property must be used instead. "
- "**NOTE:** There is currently no zookeeper integration, offsets "
- "will be written to broker or local file according to "
- "offset.store.method.",
- 0, 1, 1 },
- { _RK_TOPIC|_RK_CONSUMER, "enable.auto.commit", _RK_C_ALIAS,
- .sdef = "auto.commit.enable" },
- { _RK_TOPIC|_RK_CONSUMER, "auto.commit.interval.ms", _RK_C_INT,
- _RKT(auto_commit_interval_ms),
- "The frequency in milliseconds that the consumer offsets "
- "are committed (written) to offset storage. "
- "This setting is used by the low-level legacy consumer.",
- 10, 86400*1000, 60*1000 },
- { _RK_TOPIC|_RK_CONSUMER, "auto.offset.reset", _RK_C_S2I,
- _RKT(auto_offset_reset),
- "Action to take when there is no initial offset in offset store "
- "or the desired offset is out of range: "
- "'smallest','earliest' - automatically reset the offset to the smallest offset, "
- "'largest','latest' - automatically reset the offset to the largest offset, "
- "'error' - trigger an error which is retrieved by consuming messages "
- "and checking 'message->err'.",
- .vdef = RD_KAFKA_OFFSET_END,
- .s2i = {
- { RD_KAFKA_OFFSET_BEGINNING, "smallest" },
- { RD_KAFKA_OFFSET_BEGINNING, "earliest" },
- { RD_KAFKA_OFFSET_BEGINNING, "beginning" },
- { RD_KAFKA_OFFSET_END, "largest" },
- { RD_KAFKA_OFFSET_END, "latest" },
- { RD_KAFKA_OFFSET_END, "end" },
- { RD_KAFKA_OFFSET_INVALID, "error" },
- }
- },
- { _RK_TOPIC|_RK_CONSUMER, "offset.store.path", _RK_C_STR,
- _RKT(offset_store_path),
- "Path to local file for storing offsets. If the path is a directory "
- "a filename will be automatically generated in that directory based "
- "on the topic and partition.",
- .sdef = "." },
-
- { _RK_TOPIC|_RK_CONSUMER, "offset.store.sync.interval.ms", _RK_C_INT,
- _RKT(offset_store_sync_interval_ms),
- "fsync() interval for the offset file, in milliseconds. "
- "Use -1 to disable syncing, and 0 for immediate sync after "
- "each write.",
- -1, 86400*1000, -1 },
-
- { _RK_TOPIC|_RK_CONSUMER, "offset.store.method", _RK_C_S2I,
- _RKT(offset_store_method),
- "Offset commit store method: "
- "'file' - local file store (offset.store.path, et.al), "
- "'broker' - broker commit store "
- "(requires \"group.id\" to be configured and "
- "Apache Kafka 0.8.2 or later on the broker.).",
- .vdef = RD_KAFKA_OFFSET_METHOD_BROKER,
- .s2i = {
- { RD_KAFKA_OFFSET_METHOD_FILE, "file" },
- { RD_KAFKA_OFFSET_METHOD_BROKER, "broker" }
- }
- },
-
- { _RK_TOPIC|_RK_CONSUMER, "consume.callback.max.messages", _RK_C_INT,
- _RKT(consume_callback_max_msgs),
- "Maximum number of messages to dispatch in "
- "one `rd_kafka_consume_callback*()` call (0 = unlimited)",
- 0, 1000000, 0 },
-
- { 0, /* End */ }
-};
-
-
-static rd_kafka_conf_res_t
-rd_kafka_anyconf_set_prop0 (int scope, void *conf,
- const struct rd_kafka_property *prop,
- const char *istr, int ival, rd_kafka_conf_set_mode_t set_mode,
- char *errstr, size_t errstr_size) {
- rd_kafka_conf_res_t res;
-
-#define _RK_PTR(TYPE,BASE,OFFSET) (TYPE)(void *)(((char *)(BASE))+(OFFSET))
-
- /* Try interceptors first (only for GLOBAL config) */
- if (scope & _RK_GLOBAL) {
- if (prop->type == _RK_C_PTR || prop->type == _RK_C_INTERNAL)
- res = RD_KAFKA_CONF_UNKNOWN;
- else
- res = rd_kafka_interceptors_on_conf_set(conf,
- prop->name,
- istr,
- errstr,
- errstr_size);
- if (res != RD_KAFKA_CONF_UNKNOWN)
- return res;
- }
-
-
- if (prop->set) {
- /* Custom setter */
- rd_kafka_conf_res_t res;
-
- res = prop->set(scope, conf, prop->name, istr,
- _RK_PTR(void *, conf, prop->offset),
- set_mode, errstr, errstr_size);
-
- if (res != RD_KAFKA_CONF_OK)
- return res;
-
- /* FALLTHRU so that property value is set. */
- }
-
- switch (prop->type)
- {
- case _RK_C_STR:
- {
- char **str = _RK_PTR(char **, conf, prop->offset);
- if (*str)
- rd_free(*str);
- if (istr)
- *str = rd_strdup(istr);
- else
- *str = prop->sdef ? rd_strdup(prop->sdef) : NULL;
- return RD_KAFKA_CONF_OK;
- }
- case _RK_C_KSTR:
- {
- rd_kafkap_str_t **kstr = _RK_PTR(rd_kafkap_str_t **, conf,
- prop->offset);
- if (*kstr)
- rd_kafkap_str_destroy(*kstr);
- if (istr)
- *kstr = rd_kafkap_str_new(istr, -1);
- else
- *kstr = prop->sdef ?
- rd_kafkap_str_new(prop->sdef, -1) : NULL;
- return RD_KAFKA_CONF_OK;
- }
- case _RK_C_PTR:
- *_RK_PTR(const void **, conf, prop->offset) = istr;
- return RD_KAFKA_CONF_OK;
- case _RK_C_BOOL:
- case _RK_C_INT:
- case _RK_C_S2I:
- case _RK_C_S2F:
- {
- int *val = _RK_PTR(int *, conf, prop->offset);
-
- if (prop->type == _RK_C_S2F) {
- switch (set_mode)
- {
- case _RK_CONF_PROP_SET_REPLACE:
- *val = ival;
- break;
- case _RK_CONF_PROP_SET_ADD:
- *val |= ival;
- break;
- case _RK_CONF_PROP_SET_DEL:
- *val &= ~ival;
- break;
- }
- } else {
- /* Single assignment */
- *val = ival;
-
- }
- return RD_KAFKA_CONF_OK;
- }
- case _RK_C_PATLIST:
- {
- /* Split comma-separated list into individual regex expressions
- * that are verified and then append to the provided list. */
- rd_kafka_pattern_list_t **plist;
-
- plist = _RK_PTR(rd_kafka_pattern_list_t **, conf, prop->offset);
-
- if (*plist)
- rd_kafka_pattern_list_destroy(*plist);
-
- if (istr) {
- if (!(*plist =
- rd_kafka_pattern_list_new(istr,
- errstr,
- (int)errstr_size)))
- return RD_KAFKA_CONF_INVALID;
- } else
- *plist = NULL;
-
- return RD_KAFKA_CONF_OK;
- }
-
- case _RK_C_INTERNAL:
- /* Probably handled by setter */
- return RD_KAFKA_CONF_OK;
-
- default:
- rd_kafka_assert(NULL, !*"unknown conf type");
- }
-
- /* unreachable */
- return RD_KAFKA_CONF_INVALID;
-}
-
-
-/**
- * @brief Find s2i (string-to-int mapping) entry and return its array index,
- * or -1 on miss.
- */
-static int rd_kafka_conf_s2i_find (const struct rd_kafka_property *prop,
- const char *value) {
- int j;
-
- for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
- if (prop->s2i[j].str &&
- !rd_strcasecmp(prop->s2i[j].str, value))
- return j;
- }
-
- return -1;
-}
-
-
-static rd_kafka_conf_res_t
-rd_kafka_anyconf_set_prop (int scope, void *conf,
- const struct rd_kafka_property *prop,
- const char *value,
- char *errstr, size_t errstr_size) {
- int ival;
-
- switch (prop->type)
- {
- case _RK_C_STR:
- case _RK_C_KSTR:
- if (prop->s2i[0].str) {
- int match;
-
- if (!value ||
- (match = rd_kafka_conf_s2i_find(prop, value)) == -1){
- rd_snprintf(errstr, errstr_size,
- "Invalid value for "
- "configuration property \"%s\": "
- "%s",
- prop->name, value);
- return RD_KAFKA_CONF_INVALID;
- }
-
- /* Replace value string with canonical form */
- value = prop->s2i[match].str;
- }
- /* FALLTHRU */
- case _RK_C_PATLIST:
- if (prop->validate &&
- (!value || !prop->validate(prop, value, -1))) {
- rd_snprintf(errstr, errstr_size,
- "Invalid value for "
- "configuration property \"%s\": %s",
- prop->name, value);
- return RD_KAFKA_CONF_INVALID;
- }
-
- return rd_kafka_anyconf_set_prop0(scope, conf, prop, value, 0,
- _RK_CONF_PROP_SET_REPLACE,
- errstr, errstr_size);
-
- case _RK_C_PTR:
- rd_snprintf(errstr, errstr_size,
- "Property \"%s\" must be set through dedicated "
- ".._set_..() function", prop->name);
- return RD_KAFKA_CONF_INVALID;
-
- case _RK_C_BOOL:
- if (!value) {
- rd_snprintf(errstr, errstr_size,
- "Bool configuration property \"%s\" cannot "
- "be set to empty value", prop->name);
- return RD_KAFKA_CONF_INVALID;
- }
-
-
- if (!rd_strcasecmp(value, "true") ||
- !rd_strcasecmp(value, "t") ||
- !strcmp(value, "1"))
- ival = 1;
- else if (!rd_strcasecmp(value, "false") ||
- !rd_strcasecmp(value, "f") ||
- !strcmp(value, "0"))
- ival = 0;
- else {
- rd_snprintf(errstr, errstr_size,
- "Expected bool value for \"%s\": "
- "true or false", prop->name);
- return RD_KAFKA_CONF_INVALID;
- }
-
- rd_kafka_anyconf_set_prop0(scope, conf, prop, value, ival,
- _RK_CONF_PROP_SET_REPLACE,
- errstr, errstr_size);
- return RD_KAFKA_CONF_OK;
-
- case _RK_C_INT:
- {
- const char *end;
-
- if (!value) {
- rd_snprintf(errstr, errstr_size,
- "Integer configuration "
- "property \"%s\" cannot be set "
- "to empty value", prop->name);
- return RD_KAFKA_CONF_INVALID;
- }
-
- ival = (int)strtol(value, (char **)&end, 0);
- if (end == value) {
- /* Non numeric, check s2i for string mapping */
- int match = rd_kafka_conf_s2i_find(prop, value);
-
- if (match == -1) {
- rd_snprintf(errstr, errstr_size,
- "Invalid value for "
- "configuration property \"%s\"",
- prop->name);
- return RD_KAFKA_CONF_INVALID;
- }
-
- ival = prop->s2i[match].val;
- }
-
- if (ival < prop->vmin ||
- ival > prop->vmax) {
- rd_snprintf(errstr, errstr_size,
- "Configuration property \"%s\" value "
- "%i is outside allowed range %i..%i\n",
- prop->name, ival,
- prop->vmin,
- prop->vmax);
- return RD_KAFKA_CONF_INVALID;
- }
-
- rd_kafka_anyconf_set_prop0(scope, conf, prop, value, ival,
- _RK_CONF_PROP_SET_REPLACE,
- errstr, errstr_size);
- return RD_KAFKA_CONF_OK;
- }
-
- case _RK_C_S2I:
- case _RK_C_S2F:
- {
- int j;
- const char *next;
-
- if (!value) {
- rd_snprintf(errstr, errstr_size,
- "Configuration "
- "property \"%s\" cannot be set "
- "to empty value", prop->name);
- return RD_KAFKA_CONF_INVALID;
- }
-
- next = value;
- while (next && *next) {
- const char *s, *t;
- rd_kafka_conf_set_mode_t set_mode = _RK_CONF_PROP_SET_ADD; /* S2F */
-
- s = next;
-
- if (prop->type == _RK_C_S2F &&
- (t = strchr(s, ','))) {
- /* CSV flag field */
- next = t+1;
- } else {
- /* Single string */
- t = s+strlen(s);
- next = NULL;
- }
-
-
- /* Left trim */
- while (s < t && isspace((int)*s))
- s++;
-
- /* Right trim */
- while (t > s && isspace((int)*t))
- t--;
-
- /* S2F: +/- prefix */
- if (prop->type == _RK_C_S2F) {
- if (*s == '+') {
- set_mode = _RK_CONF_PROP_SET_ADD;
- s++;
- } else if (*s == '-') {
- set_mode = _RK_CONF_PROP_SET_DEL;
- s++;
- }
- }
-
- /* Empty string? */
- if (s == t)
- continue;
-
- /* Match string to s2i table entry */
- for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
- int new_val;
-
- if (!prop->s2i[j].str)
- continue;
-
- if (strlen(prop->s2i[j].str) == (size_t)(t-s) &&
- !rd_strncasecmp(prop->s2i[j].str, s,
- (int)(t-s)))
- new_val = prop->s2i[j].val;
- else
- continue;
-
- rd_kafka_anyconf_set_prop0(scope, conf, prop,
- value, new_val,
- set_mode,
- errstr, errstr_size);
-
- if (prop->type == _RK_C_S2F) {
- /* Flags: OR it in: do next */
- break;
- } else {
- /* Single assignment */
- return RD_KAFKA_CONF_OK;
- }
- }
-
- /* S2F: Good match: continue with next */
- if (j < (int)RD_ARRAYSIZE(prop->s2i))
- continue;
-
- /* No match */
- rd_snprintf(errstr, errstr_size,
- "Invalid value for "
- "configuration property \"%s\"", prop->name);
- return RD_KAFKA_CONF_INVALID;
-
- }
- return RD_KAFKA_CONF_OK;
- }
-
- case _RK_C_INTERNAL:
- rd_snprintf(errstr, errstr_size,
- "Internal property \"%s\" not settable",
- prop->name);
- return RD_KAFKA_CONF_INVALID;
-
- case _RK_C_INVALID:
- rd_snprintf(errstr, errstr_size, "%s", prop->desc);
- return RD_KAFKA_CONF_INVALID;
-
- default:
- rd_kafka_assert(NULL, !*"unknown conf type");
- }
-
- /* not reachable */
- return RD_KAFKA_CONF_INVALID;
-}
-
-
-
-static void rd_kafka_defaultconf_set (int scope, void *conf) {
- const struct rd_kafka_property *prop;
-
- for (prop = rd_kafka_properties ; prop->name ; prop++) {
- if (!(prop->scope & scope))
- continue;
-
- if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID)
- continue;
-
- if (prop->ctor)
- prop->ctor(scope, conf);
-
- if (prop->sdef || prop->vdef || prop->pdef)
- rd_kafka_anyconf_set_prop0(scope, conf, prop,
- prop->sdef ?
- prop->sdef : prop->pdef,
- prop->vdef,
- _RK_CONF_PROP_SET_REPLACE,
- NULL, 0);
- }
-}
-
-rd_kafka_conf_t *rd_kafka_conf_new (void) {
- rd_kafka_conf_t *conf = rd_calloc(1, sizeof(*conf));
- rd_kafka_defaultconf_set(_RK_GLOBAL, conf);
- return conf;
-}
-
-rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void) {
- rd_kafka_topic_conf_t *tconf = rd_calloc(1, sizeof(*tconf));
- rd_kafka_defaultconf_set(_RK_TOPIC, tconf);
- return tconf;
-}
-
-
-
-static int rd_kafka_anyconf_set (int scope, void *conf,
- const char *name, const char *value,
- char *errstr, size_t errstr_size) {
- char estmp[1];
- const struct rd_kafka_property *prop;
- rd_kafka_conf_res_t res;
-
- if (!errstr) {
- errstr = estmp;
- errstr_size = 0;
- }
-
- if (value && !*value)
- value = NULL;
-
- /* Try interceptors first (only for GLOBAL config for now) */
- if (scope & _RK_GLOBAL) {
- res = rd_kafka_interceptors_on_conf_set(
- (rd_kafka_conf_t *)conf, name, value,
- errstr, errstr_size);
- /* Handled (successfully or not) by interceptor. */
- if (res != RD_KAFKA_CONF_UNKNOWN)
- return res;
- }
-
- /* Then global config */
-
-
- for (prop = rd_kafka_properties ; prop->name ; prop++) {
-
- if (!(prop->scope & scope))
- continue;
-
- if (strcmp(prop->name, name))
- continue;
-
- if (prop->type == _RK_C_ALIAS)
- return rd_kafka_anyconf_set(scope, conf,
- prop->sdef, value,
- errstr, errstr_size);
-
- return rd_kafka_anyconf_set_prop(scope, conf, prop, value,
- errstr, errstr_size);
- }
-
- rd_snprintf(errstr, errstr_size,
- "No such configuration property: \"%s\"", name);
-
- return RD_KAFKA_CONF_UNKNOWN;
-}
-
-
-rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,
- const char *name,
- const char *value,
- char *errstr, size_t errstr_size) {
- rd_kafka_conf_res_t res;
-
- res = rd_kafka_anyconf_set(_RK_GLOBAL, conf, name, value,
- errstr, errstr_size);
- if (res != RD_KAFKA_CONF_UNKNOWN)
- return res;
-
- /* Fallthru:
- * If the global property was unknown, try setting it on the
- * default topic config. */
- if (!conf->topic_conf) {
- /* Create topic config, might be over-written by application
- * later. */
- conf->topic_conf = rd_kafka_topic_conf_new();
- }
-
- return rd_kafka_topic_conf_set(conf->topic_conf, name, value,
- errstr, errstr_size);
-}
-
-
-rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf,
- const char *name,
- const char *value,
- char *errstr, size_t errstr_size) {
- if (!strncmp(name, "topic.", strlen("topic.")))
- name += strlen("topic.");
-
- return rd_kafka_anyconf_set(_RK_TOPIC, conf, name, value,
- errstr, errstr_size);
-}
-
-
-static void rd_kafka_anyconf_clear (int scope, void *conf,
- const struct rd_kafka_property *prop) {
- switch (prop->type)
- {
- case _RK_C_STR:
- {
- char **str = _RK_PTR(char **, conf, prop->offset);
-
- if (*str) {
- if (prop->set) {
- prop->set(scope, conf, prop->name, NULL, *str,
- _RK_CONF_PROP_SET_DEL, NULL, 0);
- /* FALLTHRU */
- }
- rd_free(*str);
- *str = NULL;
- }
- }
- break;
-
- case _RK_C_KSTR:
- {
- rd_kafkap_str_t **kstr = _RK_PTR(rd_kafkap_str_t **, conf,
- prop->offset);
- if (*kstr) {
- rd_kafkap_str_destroy(*kstr);
- *kstr = NULL;
- }
- }
- break;
-
- case _RK_C_PATLIST:
- {
- rd_kafka_pattern_list_t **plist;
- plist = _RK_PTR(rd_kafka_pattern_list_t **, conf, prop->offset);
- if (*plist) {
- rd_kafka_pattern_list_destroy(*plist);
- *plist = NULL;
- }
- }
- break;
-
- case _RK_C_PTR:
- if (_RK_PTR(void *, conf, prop->offset) != NULL) {
- if (!strcmp(prop->name, "default_topic_conf")) {
- rd_kafka_topic_conf_t **tconf;
-
- tconf = _RK_PTR(rd_kafka_topic_conf_t **,
- conf, prop->offset);
- if (*tconf) {
- rd_kafka_topic_conf_destroy(*tconf);
- *tconf = NULL;
- }
- }
- }
- break;
-
- default:
- break;
- }
-
- if (prop->dtor)
- prop->dtor(scope, conf);
-
-}
-
-void rd_kafka_anyconf_destroy (int scope, void *conf) {
- const struct rd_kafka_property *prop;
-
- /* Call on_conf_destroy() interceptors */
- if (scope == _RK_GLOBAL)
- rd_kafka_interceptors_on_conf_destroy(conf);
-
- for (prop = rd_kafka_properties; prop->name ; prop++) {
- if (!(prop->scope & scope))
- continue;
-
- rd_kafka_anyconf_clear(scope, conf, prop);
- }
-}
-
-
-void rd_kafka_conf_destroy (rd_kafka_conf_t *conf) {
- rd_kafka_anyconf_destroy(_RK_GLOBAL, conf);
- //FIXME: partition_assignors
- rd_free(conf);
-}
-
-void rd_kafka_topic_conf_destroy (rd_kafka_topic_conf_t *topic_conf) {
- rd_kafka_anyconf_destroy(_RK_TOPIC, topic_conf);
- rd_free(topic_conf);
-}
-
-
-
-static void rd_kafka_anyconf_copy (int scope, void *dst, const void *src,
- size_t filter_cnt, const char **filter) {
- const struct rd_kafka_property *prop;
-
- for (prop = rd_kafka_properties ; prop->name ; prop++) {
- const char *val = NULL;
- int ival = 0;
- char *valstr;
- size_t valsz;
- size_t fi;
- size_t nlen;
-
- if (!(prop->scope & scope))
- continue;
-
- if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID)
- continue;
-
- /* Apply filter, if any. */
- nlen = strlen(prop->name);
- for (fi = 0 ; fi < filter_cnt ; fi++) {
- size_t flen = strlen(filter[fi]);
- if (nlen >= flen &&
- !strncmp(filter[fi], prop->name, flen))
- break;
- }
- if (fi < filter_cnt)
- continue; /* Filter matched */
-
- switch (prop->type)
- {
- case _RK_C_STR:
- case _RK_C_PTR:
- val = *_RK_PTR(const char **, src, prop->offset);
-
- if (!strcmp(prop->name, "default_topic_conf") && val)
- val = (void *)rd_kafka_topic_conf_dup(
- (const rd_kafka_topic_conf_t *)
- (void *)val);
- break;
- case _RK_C_KSTR:
- {
- rd_kafkap_str_t **kstr = _RK_PTR(rd_kafkap_str_t **,
- src, prop->offset);
- if (*kstr)
- val = (*kstr)->str;
- break;
- }
-
- case _RK_C_BOOL:
- case _RK_C_INT:
- case _RK_C_S2I:
- case _RK_C_S2F:
- ival = *_RK_PTR(const int *, src, prop->offset);
-
- /* Get string representation of configuration value. */
- valsz = 0;
- rd_kafka_anyconf_get0(src, prop, NULL, &valsz);
- valstr = rd_alloca(valsz);
- rd_kafka_anyconf_get0(src, prop, valstr, &valsz);
- val = valstr;
- break;
- case _RK_C_PATLIST:
- {
- const rd_kafka_pattern_list_t **plist;
- plist = _RK_PTR(const rd_kafka_pattern_list_t **,
- src, prop->offset);
- if (*plist)
- val = (*plist)->rkpl_orig;
- break;
- }
- case _RK_C_INTERNAL:
- /* Handled by ->copy() below. */
- break;
- default:
- continue;
- }
-
- if (prop->copy)
- prop->copy(scope, dst, src,
- _RK_PTR(void *, dst, prop->offset),
- _RK_PTR(const void *, src, prop->offset),
- filter_cnt, filter);
-
- rd_kafka_anyconf_set_prop0(scope, dst, prop, val, ival,
- _RK_CONF_PROP_SET_REPLACE, NULL, 0);
- }
-}
-
-
-rd_kafka_conf_t *rd_kafka_conf_dup (const rd_kafka_conf_t *conf) {
- rd_kafka_conf_t *new = rd_kafka_conf_new();
-
- rd_kafka_interceptors_on_conf_dup(new, conf, 0, NULL);
-
- rd_kafka_anyconf_copy(_RK_GLOBAL, new, conf, 0, NULL);
-
- return new;
-}
-
-rd_kafka_conf_t *rd_kafka_conf_dup_filter (const rd_kafka_conf_t *conf,
- size_t filter_cnt,
- const char **filter) {
- rd_kafka_conf_t *new = rd_kafka_conf_new();
-
- rd_kafka_interceptors_on_conf_dup(new, conf, filter_cnt, filter);
-
- rd_kafka_anyconf_copy(_RK_GLOBAL, new, conf, filter_cnt, filter);
-
- return new;
-}
-
-
-rd_kafka_topic_conf_t *rd_kafka_topic_conf_dup (const rd_kafka_topic_conf_t
- *conf) {
- rd_kafka_topic_conf_t *new = rd_kafka_topic_conf_new();
-
- rd_kafka_anyconf_copy(_RK_TOPIC, new, conf, 0, NULL);
-
- return new;
-}
-
-
-void rd_kafka_conf_set_events (rd_kafka_conf_t *conf, int events) {
- conf->enabled_events = events;
-}
-
-
-void rd_kafka_conf_set_dr_cb (rd_kafka_conf_t *conf,
- void (*dr_cb) (rd_kafka_t *rk,
- void *payload, size_t len,
- rd_kafka_resp_err_t err,
- void *opaque, void *msg_opaque)) {
- conf->dr_cb = dr_cb;
-}
-
-
-void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf,
- void (*dr_msg_cb) (rd_kafka_t *rk,
- const rd_kafka_message_t *
- rkmessage,
- void *opaque)) {
- conf->dr_msg_cb = dr_msg_cb;
-}
-
-
-void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf,
- void (*consume_cb) (rd_kafka_message_t *
- rkmessage,
- void *opaque)) {
- conf->consume_cb = consume_cb;
-}
-
-void rd_kafka_conf_set_rebalance_cb (
- rd_kafka_conf_t *conf,
- void (*rebalance_cb) (rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- rd_kafka_topic_partition_list_t *partitions,
- void *opaque)) {
- conf->rebalance_cb = rebalance_cb;
-}
-
-void rd_kafka_conf_set_offset_commit_cb (
- rd_kafka_conf_t *conf,
- void (*offset_commit_cb) (rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- rd_kafka_topic_partition_list_t *offsets,
- void *opaque)) {
- conf->offset_commit_cb = offset_commit_cb;
-}
-
-
-
-void rd_kafka_conf_set_error_cb (rd_kafka_conf_t *conf,
- void (*error_cb) (rd_kafka_t *rk, int err,
- const char *reason,
- void *opaque)) {
- conf->error_cb = error_cb;
-}
-
-
-void rd_kafka_conf_set_throttle_cb (rd_kafka_conf_t *conf,
- void (*throttle_cb) (
- rd_kafka_t *rk,
- const char *broker_name,
- int32_t broker_id,
- int throttle_time_ms,
- void *opaque)) {
- conf->throttle_cb = throttle_cb;
-}
-
-
-void rd_kafka_conf_set_log_cb (rd_kafka_conf_t *conf,
- void (*log_cb) (const rd_kafka_t *rk, int level,
- const char *fac, const char *buf)) {
- conf->log_cb = log_cb;
-}
-
-
-void rd_kafka_conf_set_stats_cb (rd_kafka_conf_t *conf,
- int (*stats_cb) (rd_kafka_t *rk,
- char *json,
- size_t json_len,
- void *opaque)) {
- conf->stats_cb = stats_cb;
-}
-
-void rd_kafka_conf_set_socket_cb (rd_kafka_conf_t *conf,
- int (*socket_cb) (int domain, int type,
- int protocol,
- void *opaque)) {
- conf->socket_cb = socket_cb;
-}
-
-void
-rd_kafka_conf_set_connect_cb (rd_kafka_conf_t *conf,
- int (*connect_cb) (int sockfd,
- const struct sockaddr *addr,
- int addrlen,
- const char *id,
- void *opaque)) {
- conf->connect_cb = connect_cb;
-}
-
-void
-rd_kafka_conf_set_closesocket_cb (rd_kafka_conf_t *conf,
- int (*closesocket_cb) (int sockfd,
- void *opaque)) {
- conf->closesocket_cb = closesocket_cb;
-}
-
-
-
-#ifndef _MSC_VER
-void rd_kafka_conf_set_open_cb (rd_kafka_conf_t *conf,
- int (*open_cb) (const char *pathname,
- int flags, mode_t mode,
- void *opaque)) {
- conf->open_cb = open_cb;
-}
-#endif
-
-/**
- * @brief Store the application pointer \p opaque in \c conf->opaque.
- */
-void rd_kafka_conf_set_opaque (rd_kafka_conf_t *conf,
-                               void *opaque) {
-        conf->opaque = opaque;
-}
-
-
-/**
- * @brief Install \p tconf as the default topic configuration of \p conf.
- *
- * A previously installed default topic configuration is destroyed with
- * rd_kafka_topic_conf_destroy() before \p tconf is stored in
- * \c conf->topic_conf.
- *
- * Installing the object that is already stored is a no-op: destroying it
- * first would leave \c conf->topic_conf referring to a destroyed object.
- */
-void rd_kafka_conf_set_default_topic_conf (rd_kafka_conf_t *conf,
-                                           rd_kafka_topic_conf_t *tconf) {
-        if (conf->topic_conf == tconf)
-                return;
-
-        if (conf->topic_conf)
-                rd_kafka_topic_conf_destroy(conf->topic_conf);
-
-        conf->topic_conf = tconf;
-}
-
-
-/**
- * @brief Register \p partitioner on \p topic_conf.
- *
- * The pointer is stored as-is in \c topic_conf->partitioner.
- */
-void rd_kafka_topic_conf_set_partitioner_cb (
-        rd_kafka_topic_conf_t *topic_conf,
-        int32_t (*partitioner) (const rd_kafka_topic_t *rkt,
-                                const void *keydata, size_t keylen,
-                                int32_t partition_cnt,
-                                void *rkt_opaque, void *msg_opaque)) {
-        topic_conf->partitioner = partitioner;
-}
-
-/**
- * @brief Store the application pointer \p opaque in \c topic_conf->opaque.
- */
-void rd_kafka_topic_conf_set_opaque (
-        rd_kafka_topic_conf_t *topic_conf, void *opaque) {
-        topic_conf->opaque = opaque;
-}
-
-
-
-
-/**
- * @brief Convert flags \p ival to a \p delim separated string using the
- *        string-to-value table of property \p prop.
- *
- * This function has two modes: size query and write.
- * To query for needed size call with dest==NULL,
- * to write to buffer of size dest_size call with dest!=NULL.
- *
- * For \c _RK_C_S2F properties a table entry is emitted when all of its bits
- * are set in \p ival, for \c _RK_C_S2I properties when its value equals
- * \p ival. For other property types every entry is emitted.
- * An \p ival of -1 means all.
- *
- * In write mode the output is truncated to fit \p dest_size.
- *
- * @returns the number of bytes written to \p dest (if not NULL), else the
- *          total number of bytes needed. Both counts include the
- *          terminating nul.
- *
- */
-size_t rd_kafka_conf_flags2str (char *dest, size_t dest_size, const char *delim,
-                                const struct rd_kafka_property *prop,
-                                int ival) {
-        size_t of = 0;
-        int j;
-
-        if (dest && dest_size > 0)
-                *dest = '\0';
-
-        /* Walk the table up to its NULL-str terminator, but never past
-         * the end of the array. */
-        for (j = 0 ;
-             j < (int)RD_ARRAYSIZE(prop->s2i) && prop->s2i[j].str ; j++) {
-                if (prop->type == _RK_C_S2F && ival != -1 &&
-                    (ival & prop->s2i[j].val) != prop->s2i[j].val)
-                        continue;
-                else if (prop->type == _RK_C_S2I &&
-                         ival != -1 && prop->s2i[j].val != ival)
-                        continue;
-
-                if (!dest) {
-                        /* Size query: count the whole delimiter, which
-                         * may be longer than one character. */
-                        of += strlen(prop->s2i[j].str) +
-                                (of > 0 ? strlen(delim) : 0);
-                } else {
-                        const size_t remaining = dest_size - of;
-                        int r;
-
-                        r = rd_snprintf(dest+of, remaining, "%s%s",
-                                        of > 0 ? delim:"",
-                                        prop->s2i[j].str);
-                        if (r < 0 || (size_t)r >= remaining) {
-                                /* Error, or output did not fit: a return
-                                 * value equal to the remaining size already
-                                 * means the last character was dropped.
-                                 * On truncation the buffer is full up to
-                                 * and including its final nul. */
-                                if (r >= 0 && dest_size > 0)
-                                        of = dest_size - 1;
-                                break;
-                        }
-                        of += (size_t)r;
-                }
-        }
-
-        return of+1/*nul*/;
-}
-
-
-/**
- * @brief Return the "original" (re-created) configuration value string of
- *        property \p prop stored in \p conf.
- *
- * Two modes, selected by \p dest:
- *  - \p dest == NULL: size query, only \p *dest_size is written.
- *  - \p dest != NULL: the value is written to \p dest, truncated to
- *    \p *dest_size bytes including the terminating nul.
- *    A buffer of size 0 cannot hold the nul and is handled as a size query.
- *
- * On success \p *dest_size is updated with the size of the value including
- * the terminating nul. For \c _RK_C_S2F properties a size query reports one
- * spare byte, and write mode reports the size of the string actually written.
- *
- * @returns RD_KAFKA_CONF_OK on success, or RD_KAFKA_CONF_INVALID when no
- *          value string could be produced (unset string/pointer/pattern
- *          list, no matching enum entry, or unhandled property type).
- */
-static rd_kafka_conf_res_t
-rd_kafka_anyconf_get0 (const void *conf, const struct rd_kafka_property *prop,
-                       char *dest, size_t *dest_size) {
-        char tmp[22];
-        const char *val = NULL;
-        size_t val_len = 0;
-        int j;
-
-        /* A zero-sized destination must never be written to, and
-         * (*dest_size)-1 below would wrap around: degrade to size query. */
-        if (dest && *dest_size == 0)
-                dest = NULL;
-
-        switch (prop->type)
-        {
-        case _RK_C_STR:
-                val = *_RK_PTR(const char **, conf, prop->offset);
-                break;
-
-        case _RK_C_KSTR:
-        {
-                const rd_kafkap_str_t **kstr = _RK_PTR(const rd_kafkap_str_t **,
-                                                       conf, prop->offset);
-                if (*kstr)
-                        val = (*kstr)->str;
-                break;
-        }
-
-        case _RK_C_PTR:
-                val = *_RK_PTR(const void **, conf, prop->offset);
-                if (val) {
-                        /* Pointers are rendered as their address */
-                        rd_snprintf(tmp, sizeof(tmp), "%p", (void *)val);
-                        val = tmp;
-                }
-                break;
-
-        case _RK_C_BOOL:
-                val = (*_RK_PTR(int *, conf, prop->offset) ? "true" : "false");
-                break;
-
-        case _RK_C_INT:
-                rd_snprintf(tmp, sizeof(tmp), "%i",
-                            *_RK_PTR(int *, conf, prop->offset));
-                val = tmp;
-                break;
-
-        case _RK_C_S2I:
-                for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
-                        if (prop->s2i[j].val ==
-                            *_RK_PTR(int *, conf, prop->offset)) {
-                                val = prop->s2i[j].str;
-                                break;
-                        }
-                }
-                break;
-
-        case _RK_C_S2F:
-        {
-                const int ival = *_RK_PTR(const int *, conf, prop->offset);
-
-                /* *dest_size is an input only in write mode; in a size
-                 * query the caller may not have initialized it. */
-                val_len = rd_kafka_conf_flags2str(dest,
-                                                  dest ? *dest_size : 0,
-                                                  ",", prop, ival);
-                if (dest) {
-                        /* Value was rendered directly into dest:
-                         * skip the copy below but still report its size. */
-                        val_len = 0;
-                        val = dest;
-                        dest = NULL;
-                }
-                break;
-        }
-
-        case _RK_C_PATLIST:
-        {
-                const rd_kafka_pattern_list_t **plist;
-                plist = _RK_PTR(const rd_kafka_pattern_list_t **,
-                                conf, prop->offset);
-                if (*plist)
-                        val = (*plist)->rkpl_orig;
-                break;
-        }
-
-        default:
-                break;
-        }
-
-        /* Only set by the S2F size query */
-        if (val_len) {
-                *dest_size = val_len+1;
-                return RD_KAFKA_CONF_OK;
-        }
-
-        if (!val)
-                return RD_KAFKA_CONF_INVALID;
-
-        val_len = strlen(val);
-
-        if (dest) {
-                /* *dest_size >= 1 is guaranteed by the check on entry */
-                size_t use_len = RD_MIN(val_len, (*dest_size)-1);
-                memcpy(dest, val, use_len);
-                dest[use_len] = '\0';
-        }
-
-        /* Return needed size */
-        *dest_size = val_len+1;
-
-        return RD_KAFKA_CONF_OK;
-}
-
-
-/**
- * @brief Look up property \p name within \p scope and retrieve its value
- *        from \p conf through rd_kafka_anyconf_get0().
- *
- * Alias properties are resolved by looking up the name held in their
- * \c sdef field instead.
- *
- * @returns RD_KAFKA_CONF_OK if a matching property produced a value,
- *          else RD_KAFKA_CONF_UNKNOWN. The result of an alias lookup is
- *          returned unchanged.
- */
-static rd_kafka_conf_res_t rd_kafka_anyconf_get (int scope, const void *conf,
-                                                 const char *name,
-                                                 char *dest, size_t *dest_size){
-        const struct rd_kafka_property *p = rd_kafka_properties;
-
-        while (p->name) {
-                /* Name is only compared for properties in scope */
-                if ((p->scope & scope) && !strcmp(p->name, name)) {
-                        if (p->type == _RK_C_ALIAS)
-                                return rd_kafka_anyconf_get(scope, conf,
-                                                            p->sdef,
-                                                            dest, dest_size);
-
-                        if (rd_kafka_anyconf_get0(conf, p, dest,
-                                                  dest_size) ==
-                            RD_KAFKA_CONF_OK)
-                                return RD_KAFKA_CONF_OK;
-                }
-
-                p++;
-        }
-
-        return RD_KAFKA_CONF_UNKNOWN;
-}
-
-/**
- * @brief Retrieve the value of topic-scope property \p name from \p conf.
- *
- * Thin wrapper around rd_kafka_anyconf_get() with the \c _RK_TOPIC scope.
- */
-rd_kafka_conf_res_t
-rd_kafka_topic_conf_get (const rd_kafka_topic_conf_t *conf, const char *name,
-                         char *dest, size_t *dest_size) {
-        const rd_kafka_conf_res_t res =
-                rd_kafka_anyconf_get(_RK_TOPIC, conf, name, dest, dest_size);
-
-        return res;
-}
-
-/**
- * @brief Retrieve the value of property \p name from \p conf.
- *
- * The global scope is searched first. If the name is unknown there and
- * \p conf has a default topic configuration, that one is searched too.
- */
-rd_kafka_conf_res_t
-rd_kafka_conf_get (const rd_kafka_conf_t *conf, const char *name,
-                   char *dest, size_t *dest_size) {
-        const rd_kafka_conf_res_t global_res =
-                rd_kafka_anyconf_get(_RK_GLOBAL, conf, name, dest, dest_size);
-
-        /* Unknown as a global property: try the default topic config,
-         * if any. */
-        if (global_res == RD_KAFKA_CONF_UNKNOWN && conf->topic_conf)
-                return rd_kafka_topic_conf_get(conf->topic_conf, name,
-                                               dest, dest_size);
-
-        return global_res;
-}
-
-
-/**
- * @brief Dump all properties of \p scope that have a value in \p conf.
- *
- * The returned array holds name,value string pairs: element 2*i is a
- * property name and element 2*i+1 its value. Aliases, invalid properties
- * and properties without a retrievable value are skipped.
- *
- * @param cntp is set to the number of array elements (names plus values).
- *
- * @returns the allocated array. Array and strings are allocated with the
- *          rd_* allocator wrappers so that rd_kafka_conf_dump_free(),
- *          which releases them with rd_free(), matches.
- */
-static const char **rd_kafka_anyconf_dump (int scope, const void *conf,
-                                           size_t *cntp) {
-        const struct rd_kafka_property *prop;
-        char **arr;
-        size_t cnt = 0;
-
-        /* Room for a name and a value per known property */
-        arr = rd_calloc(sizeof(char *), RD_ARRAYSIZE(rd_kafka_properties)*2);
-
-        for (prop = rd_kafka_properties; prop->name ; prop++) {
-                char *val = NULL;
-                /* Initialized since the callee may read it even in
-                 * size query mode. */
-                size_t val_size = 0;
-
-                if (!(prop->scope & scope))
-                        continue;
-
-                /* Skip aliases, show original property instead.
-                 * Skip invalids. */
-                if (prop->type == _RK_C_ALIAS || prop->type == _RK_C_INVALID)
-                        continue;
-
-                /* Query value size */
-                if (rd_kafka_anyconf_get0(conf, prop, NULL, &val_size) !=
-                    RD_KAFKA_CONF_OK)
-                        continue;
-
-                /* Get value.
-                 * Zero-filled so the buffer is always a valid string. */
-                val = rd_calloc(1, val_size);
-                rd_kafka_anyconf_get0(conf, prop, val, &val_size);
-
-                arr[cnt++] = rd_strdup(prop->name);
-                arr[cnt++] = val;
-        }
-
-        *cntp = cnt;
-
-        return (const char **)arr;
-}
-
-
-/**
- * @brief Dump the global-scope properties of \p conf.
- *
- * Thin wrapper around rd_kafka_anyconf_dump() with the \c _RK_GLOBAL scope;
- * \p cntp receives the number of returned array elements.
- */
-const char **rd_kafka_conf_dump (rd_kafka_conf_t *conf, size_t *cntp) {
-        const char **dump = rd_kafka_anyconf_dump(_RK_GLOBAL, conf, cntp);
-
-        return dump;
-}
-
-/**
- * @brief Dump the topic-scope properties of \p conf.
- *
- * Thin wrapper around rd_kafka_anyconf_dump() with the \c _RK_TOPIC scope;
- * \p cntp receives the number of returned array elements.
- */
-const char **
-rd_kafka_topic_conf_dump (rd_kafka_topic_conf_t *conf, size_t *cntp) {
-        const char **dump = rd_kafka_anyconf_dump(_RK_TOPIC, conf, cntp);
-
-        return dump;
-}
-
-/**
- * @brief Free a dump array of \p cnt elements: every non-NULL element is
- *        released with rd_free(), then the array itself.
- */
-void rd_kafka_conf_dump_free (const char **arr, size_t cnt) {
-        char **_arr = (char **)arr;
-        size_t i; /* same type as cnt, so the index can't wrap before it */
-
-        for (i = 0 ; i < cnt ; i++)
-                if (_arr[i])
-                        rd_free(_arr[i]);
-
-        rd_free(_arr);
-}
-
-/**
- * @brief Print a table of all configuration properties to \p fp.
- *
- * A section header ("Global"/"Topic" configuration properties) followed by
- * the column header is printed whenever the scope changes. Each property
- * is printed as one row with the columns: property name, C/P applicability,
- * range, default value and description with type. Invalid properties are
- * skipped and alias properties refer to the aliased name.
- * A C/P legend line is printed last.
- */
-void rd_kafka_conf_properties_show (FILE *fp) {
-        const struct rd_kafka_property *prop;
-        int last = 0;
-        int j;
-        char tmp[512];
-        const char *dash80 = "----------------------------------------"
-                "----------------------------------------";
-
-        for (prop = rd_kafka_properties; prop->name ; prop++) {
-                const char *typeinfo = "";
-
-                /* Skip invalid properties. */
-                if (prop->type == _RK_C_INVALID)
-                        continue;
-
-                /* Scope changed: start a new section */
-                if (!(prop->scope & last)) {
-                        /* scope is a bit mask which may carry more bits
-                         * than the section bit: test the bit, not equality. */
-                        fprintf(fp,
-                                "%s## %s configuration properties\n\n",
-                                last ? "\n\n":"",
-                                (prop->scope & _RK_GLOBAL) ?
-                                "Global": "Topic");
-
-                        fprintf(fp,
-                                "%-40s | %3s | %-15s | %13s | %-25s\n"
-                                "%.*s-|-%.*s-|-%.*s-|-%.*s:|-%.*s\n",
-                                "Property", "C/P", "Range",
-                                "Default", "Description",
-                                40, dash80, 3, dash80, 15, dash80,
-                                13, dash80, 25, dash80);
-
-                        last = prop->scope & (_RK_GLOBAL|_RK_TOPIC);
-
-                }
-
-                /* "*" when both or neither of producer/consumer are set */
-                fprintf(fp, "%-40s | %3s | ", prop->name,
-                        (!(prop->scope & _RK_PRODUCER) ==
-                         !(prop->scope & _RK_CONSUMER) ? " * " :
-                         ((prop->scope & _RK_PRODUCER) ? " P " :
-                          (prop->scope & _RK_CONSUMER) ? " C " : "")));
-
-                switch (prop->type)
-                {
-                case _RK_C_STR:
-                case _RK_C_KSTR:
-                        typeinfo = "string";
-                        /* FALLTHRU */
-                case _RK_C_PATLIST:
-                        if (prop->type == _RK_C_PATLIST)
-                                typeinfo = "pattern list";
-                        if (prop->s2i[0].str) {
-                                rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", ",
-                                                        prop, -1);
-                                fprintf(fp, "%-15s | %13s",
-                                        tmp, prop->sdef ? prop->sdef : "");
-                        } else {
-                                fprintf(fp, "%-15s | %13s",
-                                        "", prop->sdef ? prop->sdef : "");
-                        }
-                        break;
-                case _RK_C_BOOL:
-                        typeinfo = "boolean";
-                        fprintf(fp, "%-15s | %13s", "true, false",
-                                prop->vdef ? "true" : "false");
-                        break;
-                case _RK_C_INT:
-                        typeinfo = "integer";
-                        rd_snprintf(tmp, sizeof(tmp),
-                                    "%d .. %d", prop->vmin, prop->vmax);
-                        fprintf(fp, "%-15s | %13i", tmp, prop->vdef);
-                        break;
-                case _RK_C_S2I:
-                {
-                        /* Printed when no named entry matches the default */
-                        const char *defstr = " ";
-
-                        typeinfo = "enum value";
-                        rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", ",
-                                                prop, -1);
-                        fprintf(fp, "%-15s | ", tmp);
-
-                        for (j = 0 ; j < (int)RD_ARRAYSIZE(prop->s2i); j++) {
-                                /* Entries without a name must not be
-                                 * passed to %s. */
-                                if (prop->s2i[j].str &&
-                                    prop->s2i[j].val == prop->vdef) {
-                                        defstr = prop->s2i[j].str;
-                                        break;
-                                }
-                        }
-                        fprintf(fp, "%13s", defstr);
-                        break;
-                }
-
-                case _RK_C_S2F:
-                        typeinfo = "CSV flags";
-                        /* Don't duplicate builtin.features value in
-                         * both Range and Default */
-                        if (!strcmp(prop->name, "builtin.features"))
-                                *tmp = '\0';
-                        else
-                                rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", ",
-                                                        prop, -1);
-                        fprintf(fp, "%-15s | ", tmp);
-                        rd_kafka_conf_flags2str(tmp, sizeof(tmp), ", ",
-                                                prop, prop->vdef);
-                        fprintf(fp, "%13s", tmp);
-
-                        break;
-                case _RK_C_PTR:
-                        typeinfo = "pointer";
-                        /* FALLTHRU */
-                default:
-                        fprintf(fp, "%-15s | %-13s", "", " ");
-                        break;
-                }
-
-                if (prop->type == _RK_C_ALIAS)
-                        fprintf(fp, " | Alias for `%s`\n", prop->sdef);
-                else
-                        fprintf(fp, " | %s <br>*Type: %s*\n", prop->desc,
-                                typeinfo);
-        }
-        fprintf(fp, "\n");
-        fprintf(fp, "### C/P legend: C = Consumer, P = Producer, * = both\n");
-}