You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:56 UTC
[28/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade
to librdkafka 0.11.4
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_conf.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_conf.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_conf.h
deleted file mode 100644
index 0da8015..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_conf.h
+++ /dev/null
@@ -1,338 +0,0 @@
-#pragma once
-
-#include "rdlist.h"
-
-
-/**
- * Forward declarations
- */
-struct rd_kafka_transport_s;
-
-
-/**
- * MessageSet compression codecs
- */
-typedef enum {
- RD_KAFKA_COMPRESSION_NONE,
- RD_KAFKA_COMPRESSION_GZIP = RD_KAFKA_MSG_ATTR_GZIP,
- RD_KAFKA_COMPRESSION_SNAPPY = RD_KAFKA_MSG_ATTR_SNAPPY,
- RD_KAFKA_COMPRESSION_LZ4 = RD_KAFKA_MSG_ATTR_LZ4,
- RD_KAFKA_COMPRESSION_INHERIT /* Inherit setting from global conf */
-} rd_kafka_compression_t;
-
-
-typedef enum {
- RD_KAFKA_PROTO_PLAINTEXT,
- RD_KAFKA_PROTO_SSL,
- RD_KAFKA_PROTO_SASL_PLAINTEXT,
- RD_KAFKA_PROTO_SASL_SSL,
- RD_KAFKA_PROTO_NUM,
-} rd_kafka_secproto_t;
-
-
-typedef enum {
- RD_KAFKA_CONFIGURED,
- RD_KAFKA_LEARNED,
- RD_KAFKA_INTERNAL,
-} rd_kafka_confsource_t;
-
-typedef enum {
- _RK_GLOBAL = 0x1,
- _RK_PRODUCER = 0x2,
- _RK_CONSUMER = 0x4,
- _RK_TOPIC = 0x8,
- _RK_CGRP = 0x10
-} rd_kafka_conf_scope_t;
-
-typedef enum {
- _RK_CONF_PROP_SET_REPLACE, /* Replace current value (default) */
- _RK_CONF_PROP_SET_ADD, /* Add value (S2F) */
- _RK_CONF_PROP_SET_DEL /* Remove value (S2F) */
-} rd_kafka_conf_set_mode_t;
-
-
-
-typedef enum {
- RD_KAFKA_OFFSET_METHOD_NONE,
- RD_KAFKA_OFFSET_METHOD_FILE,
- RD_KAFKA_OFFSET_METHOD_BROKER
-} rd_kafka_offset_method_t;
-
-
-
-
-/**
- * Optional configuration struct passed to rd_kafka_new*().
- *
- * The struct is populated ted through string properties
- * by calling rd_kafka_conf_set().
- *
- */
-struct rd_kafka_conf_s {
- /*
- * Generic configuration
- */
- int enabled_events;
- int max_msg_size;
- int msg_copy_max_size;
- int recv_max_msg_size;
- int max_inflight;
- int metadata_request_timeout_ms;
- int metadata_refresh_interval_ms;
- int metadata_refresh_fast_cnt;
- int metadata_refresh_fast_interval_ms;
- int metadata_refresh_sparse;
- int metadata_max_age_ms;
- int debug;
- int broker_addr_ttl;
- int broker_addr_family;
- int socket_timeout_ms;
- int socket_blocking_max_ms;
- int socket_sndbuf_size;
- int socket_rcvbuf_size;
- int socket_keepalive;
- int socket_nagle_disable;
- int socket_max_fails;
- char *client_id_str;
- char *brokerlist;
- int stats_interval_ms;
- int term_sig;
- int reconnect_jitter_ms;
- int api_version_request;
- int api_version_request_timeout_ms;
- int api_version_fallback_ms;
- char *broker_version_fallback;
- rd_kafka_secproto_t security_protocol;
-
-#if WITH_SSL
- struct {
- SSL_CTX *ctx;
- char *cipher_suites;
- char *key_location;
- char *key_password;
- char *cert_location;
- char *ca_location;
- char *crl_location;
- } ssl;
-#endif
-
- struct {
- const struct rd_kafka_sasl_provider *provider;
- char *principal;
- char *mechanisms;
- char *service_name;
- char *kinit_cmd;
- char *keytab;
- int relogin_min_time;
- char *username;
- char *password;
-#if WITH_SASL_SCRAM
- /* SCRAM EVP-wrapped hash function
- * (return value from EVP_shaX()) */
- const void/*EVP_MD*/ *scram_evp;
- /* SCRAM direct hash function (e.g., SHA256()) */
- unsigned char *(*scram_H) (const unsigned char *d, size_t n,
- unsigned char *md);
- /* Hash size */
- size_t scram_H_size;
-#endif
- } sasl;
-
-#if WITH_PLUGINS
- char *plugin_paths;
- rd_list_t plugins;
-#endif
-
- /* Interceptors */
- struct {
- /* rd_kafka_interceptor_method_t lists */
- rd_list_t on_conf_set; /* on_conf_set interceptors
- * (not copied on conf_dup()) */
- rd_list_t on_conf_dup; /* .. (not copied) */
- rd_list_t on_conf_destroy; /* .. (not copied) */
- rd_list_t on_new; /* .. (copied) */
- rd_list_t on_destroy; /* .. (copied) */
- rd_list_t on_send; /* .. (copied) */
- rd_list_t on_acknowledgement; /* .. (copied) */
- rd_list_t on_consume; /* .. (copied) */
- rd_list_t on_commit; /* .. (copied) */
-
- /* rd_strtup_t list */
- rd_list_t config; /* Configuration name=val's
- * handled by interceptors. */
- } interceptors;
-
- /* Client group configuration */
- int coord_query_intvl_ms;
-
- int builtin_features;
- /*
- * Consumer configuration
- */
- int check_crcs;
- int queued_min_msgs;
- int queued_max_msg_kbytes;
- int64_t queued_max_msg_bytes;
- int fetch_wait_max_ms;
- int fetch_msg_max_bytes;
- int fetch_min_bytes;
- int fetch_error_backoff_ms;
- char *group_id_str;
-
- rd_kafka_pattern_list_t *topic_blacklist;
- struct rd_kafka_topic_conf_s *topic_conf; /* Default topic config
- * for automatically
- * subscribed topics. */
- int enable_auto_commit;
- int enable_auto_offset_store;
- int auto_commit_interval_ms;
- int group_session_timeout_ms;
- int group_heartbeat_intvl_ms;
- rd_kafkap_str_t *group_protocol_type;
- char *partition_assignment_strategy;
- rd_list_t partition_assignors;
- int enabled_assignor_cnt;
- struct rd_kafka_assignor_s *assignor;
-
- void (*rebalance_cb) (rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- rd_kafka_topic_partition_list_t *partitions,
- void *opaque);
-
- void (*offset_commit_cb) (rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- rd_kafka_topic_partition_list_t *offsets,
- void *opaque);
-
- rd_kafka_offset_method_t offset_store_method;
- int enable_partition_eof;
-
- /*
- * Producer configuration
- */
- int queue_buffering_max_msgs;
- int queue_buffering_max_kbytes;
- int buffering_max_ms;
- int max_retries;
- int retry_backoff_ms;
- int batch_num_messages;
- rd_kafka_compression_t compression_codec;
- int dr_err_only;
-
- /* Message delivery report callback.
- * Called once for each produced message, either on
- * successful and acknowledged delivery to the broker in which
- * case 'err' is 0, or if the message could not be delivered
- * in which case 'err' is non-zero (use rd_kafka_err2str()
- * to obtain a human-readable error reason).
- *
- * If the message was produced with neither RD_KAFKA_MSG_F_FREE
- * or RD_KAFKA_MSG_F_COPY set then 'payload' is the original
- * pointer provided to rd_kafka_produce().
- * rdkafka will not perform any further actions on 'payload'
- * at this point and the application may rd_free the payload data
- * at this point.
- *
- * 'opaque' is 'conf.opaque', while 'msg_opaque' is
- * the opaque pointer provided in the rd_kafka_produce() call.
- */
- void (*dr_cb) (rd_kafka_t *rk,
- void *payload, size_t len,
- rd_kafka_resp_err_t err,
- void *opaque, void *msg_opaque);
-
- void (*dr_msg_cb) (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
- void *opaque);
-
- /* Consume callback */
- void (*consume_cb) (rd_kafka_message_t *rkmessage, void *opaque);
-
- /* Log callback */
- void (*log_cb) (const rd_kafka_t *rk, int level,
- const char *fac, const char *buf);
- int log_level;
- int log_queue;
- int log_thread_name;
- int log_connection_close;
-
- /* Error callback */
- void (*error_cb) (rd_kafka_t *rk, int err,
- const char *reason, void *opaque);
-
- /* Throttle callback */
- void (*throttle_cb) (rd_kafka_t *rk, const char *broker_name,
- int32_t broker_id, int throttle_time_ms,
- void *opaque);
-
- /* Stats callback */
- int (*stats_cb) (rd_kafka_t *rk,
- char *json,
- size_t json_len,
- void *opaque);
-
- /* Socket creation callback */
- int (*socket_cb) (int domain, int type, int protocol, void *opaque);
-
- /* Connect callback */
- int (*connect_cb) (int sockfd,
- const struct sockaddr *addr,
- int addrlen,
- const char *id,
- void *opaque);
-
- /* Close socket callback */
- int (*closesocket_cb) (int sockfd, void *opaque);
-
- /* File open callback */
- int (*open_cb) (const char *pathname, int flags, mode_t mode,
- void *opaque);
-
- /* Opaque passed to callbacks. */
- void *opaque;
-
- /* For use with value-less properties. */
- int dummy;
-};
-
-int rd_kafka_socket_cb_linux (int domain, int type, int protocol, void *opaque);
-int rd_kafka_socket_cb_generic (int domain, int type, int protocol,
- void *opaque);
-#ifndef _MSC_VER
-int rd_kafka_open_cb_linux (const char *pathname, int flags, mode_t mode,
- void *opaque);
-#endif
-int rd_kafka_open_cb_generic (const char *pathname, int flags, mode_t mode,
- void *opaque);
-
-
-
-struct rd_kafka_topic_conf_s {
- int required_acks;
- int32_t request_timeout_ms;
- int message_timeout_ms;
-
- int32_t (*partitioner) (const rd_kafka_topic_t *rkt,
- const void *keydata, size_t keylen,
- int32_t partition_cnt,
- void *rkt_opaque,
- void *msg_opaque);
-
- rd_kafka_compression_t compression_codec;
- int produce_offset_report;
-
- int consume_callback_max_msgs;
- int auto_commit;
- int auto_commit_interval_ms;
- int auto_offset_reset;
- char *offset_store_path;
- int offset_store_sync_interval_ms;
-
- rd_kafka_offset_method_t offset_store_method;
-
- /* Application provided opaque pointer (this is rkt_opaque) */
- void *opaque;
-};
-
-
-
-void rd_kafka_anyconf_destroy (int scope, void *conf);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_event.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_event.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_event.c
deleted file mode 100644
index 5fe783d..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_event.c
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2016 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "rdkafka_int.h"
-#include "rdkafka_event.h"
-#include "rd.h"
-
-rd_kafka_event_type_t rd_kafka_event_type (const rd_kafka_event_t *rkev) {
- return rkev ? rkev->rko_evtype : RD_KAFKA_EVENT_NONE;
-}
-
-const char *rd_kafka_event_name (const rd_kafka_event_t *rkev) {
- switch (rkev ? rkev->rko_evtype : RD_KAFKA_EVENT_NONE)
- {
- case RD_KAFKA_EVENT_NONE:
- return "(NONE)";
- case RD_KAFKA_EVENT_DR:
- return "DeliveryReport";
- case RD_KAFKA_EVENT_FETCH:
- return "Fetch";
- case RD_KAFKA_EVENT_LOG:
- return "Log";
- case RD_KAFKA_EVENT_ERROR:
- return "Error";
- case RD_KAFKA_EVENT_REBALANCE:
- return "Rebalance";
- case RD_KAFKA_EVENT_OFFSET_COMMIT:
- return "OffsetCommit";
- case RD_KAFKA_EVENT_STATS:
- return "Stats";
- default:
- return "?unknown?";
- }
-}
-
-
-
-
-void rd_kafka_event_destroy (rd_kafka_event_t *rkev) {
- if (unlikely(!rkev))
- return;
- rd_kafka_op_destroy(rkev);
-}
-
-
-/**
- * @returns the next message from the event's message queue.
- * @remark messages will be freed automatically when event is destroyed,
- * application MUST NOT call rd_kafka_message_destroy()
- */
-const rd_kafka_message_t *
-rd_kafka_event_message_next (rd_kafka_event_t *rkev) {
- rd_kafka_op_t *rko = rkev;
- rd_kafka_msg_t *rkm;
- rd_kafka_msgq_t *rkmq, *rkmq2;
- rd_kafka_message_t *rkmessage;
-
- switch (rkev->rko_type)
- {
- case RD_KAFKA_OP_DR:
- rkmq = &rko->rko_u.dr.msgq;
- rkmq2 = &rko->rko_u.dr.msgq2;
- break;
-
- case RD_KAFKA_OP_FETCH:
- /* Just one message */
- if (rko->rko_u.fetch.evidx++ > 0)
- return NULL;
-
- rkmessage = rd_kafka_message_get(rko);
- if (unlikely(!rkmessage))
- return NULL;
-
- /* Store offset */
- rd_kafka_op_offset_store(NULL, rko, rkmessage);
-
- return rkmessage;
-
-
- default:
- return NULL;
- }
-
- if (unlikely(!(rkm = TAILQ_FIRST(&rkmq->rkmq_msgs))))
- return NULL;
-
- rd_kafka_msgq_deq(rkmq, rkm, 1);
-
- /* Put rkm on secondary message queue which will be purged later. */
- rd_kafka_msgq_enq(rkmq2, rkm);
-
- return rd_kafka_message_get_from_rkm(rko, rkm);
-}
-
-
-size_t rd_kafka_event_message_array (rd_kafka_event_t *rkev,
- const rd_kafka_message_t **rkmessages, size_t size) {
- size_t cnt = 0;
- const rd_kafka_message_t *rkmessage;
-
- while ((rkmessage = rd_kafka_event_message_next(rkev)))
- rkmessages[cnt++] = rkmessage;
-
- return cnt;
-}
-
-
-size_t rd_kafka_event_message_count (rd_kafka_event_t *rkev) {
- switch (rkev->rko_evtype)
- {
- case RD_KAFKA_EVENT_DR:
- return rd_atomic32_get(&rkev->rko_u.dr.msgq.rkmq_msg_cnt);
- case RD_KAFKA_EVENT_FETCH:
- return 1;
- default:
- return 0;
- }
-}
-
-
-rd_kafka_resp_err_t rd_kafka_event_error (rd_kafka_event_t *rkev) {
- return rkev->rko_err;
-}
-
-const char *rd_kafka_event_error_string (rd_kafka_event_t *rkev) {
- switch (rkev->rko_type)
- {
- case RD_KAFKA_OP_ERR:
- case RD_KAFKA_OP_CONSUMER_ERR:
- if (rkev->rko_u.err.errstr)
- return rkev->rko_u.err.errstr;
- /* FALLTHRU */
- default:
- return rd_kafka_err2str(rkev->rko_err);
- }
-}
-
-
-void *rd_kafka_event_opaque (rd_kafka_event_t *rkev) {
- switch (rkev->rko_type & ~RD_KAFKA_OP_FLAGMASK)
- {
- case RD_KAFKA_OP_OFFSET_COMMIT:
- return rkev->rko_u.offset_commit.opaque;
- default:
- return NULL;
- }
-}
-
-
-int rd_kafka_event_log (rd_kafka_event_t *rkev, const char **fac,
- const char **str, int *level) {
- if (unlikely(rkev->rko_evtype != RD_KAFKA_EVENT_LOG))
- return -1;
-
- if (likely(fac != NULL))
- *fac = rkev->rko_u.log.fac;
- if (likely(str != NULL))
- *str = rkev->rko_u.log.str;
- if (likely(level != NULL))
- *level = rkev->rko_u.log.level;
-
- return 0;
-}
-
-const char *rd_kafka_event_stats (rd_kafka_event_t *rkev) {
- return rkev->rko_u.stats.json;
-}
-
-rd_kafka_topic_partition_list_t *
-rd_kafka_event_topic_partition_list (rd_kafka_event_t *rkev) {
- switch (rkev->rko_evtype)
- {
- case RD_KAFKA_EVENT_REBALANCE:
- return rkev->rko_u.rebalance.partitions;
- case RD_KAFKA_EVENT_OFFSET_COMMIT:
- return rkev->rko_u.offset_commit.partitions;
- default:
- return NULL;
- }
-}
-
-
-rd_kafka_topic_partition_t *
-rd_kafka_event_topic_partition (rd_kafka_event_t *rkev) {
- rd_kafka_topic_partition_t *rktpar;
-
- if (unlikely(!rkev->rko_rktp))
- return NULL;
-
- rktpar = rd_kafka_topic_partition_new_from_rktp(
- rd_kafka_toppar_s2i(rkev->rko_rktp));
-
- switch (rkev->rko_type)
- {
- case RD_KAFKA_OP_ERR:
- case RD_KAFKA_OP_CONSUMER_ERR:
- rktpar->offset = rkev->rko_u.err.offset;
- break;
- default:
- break;
- }
-
- rktpar->err = rkev->rko_err;
-
- return rktpar;
-
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_event.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_event.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_event.h
deleted file mode 100644
index 0e8f8a1..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_event.h
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2016 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-
-/**
- * @brief Converts op type to event type.
- * @returns the event type, or 0 if the op cannot be mapped to an event.
- */
-static RD_UNUSED RD_INLINE
-rd_kafka_event_type_t rd_kafka_op2event (rd_kafka_op_type_t optype) {
- static const rd_kafka_event_type_t map[RD_KAFKA_OP__END] = {
- [RD_KAFKA_OP_DR] = RD_KAFKA_EVENT_DR,
- [RD_KAFKA_OP_FETCH] = RD_KAFKA_EVENT_FETCH,
- [RD_KAFKA_OP_ERR] = RD_KAFKA_EVENT_ERROR,
- [RD_KAFKA_OP_CONSUMER_ERR] = RD_KAFKA_EVENT_ERROR,
- [RD_KAFKA_OP_REBALANCE] = RD_KAFKA_EVENT_REBALANCE,
- [RD_KAFKA_OP_OFFSET_COMMIT] = RD_KAFKA_EVENT_OFFSET_COMMIT,
- [RD_KAFKA_OP_LOG] = RD_KAFKA_EVENT_LOG,
- [RD_KAFKA_OP_STATS] = RD_KAFKA_EVENT_STATS
- };
-
- return map[(int)optype & ~RD_KAFKA_OP_FLAGMASK];
-}
-
-
-/**
- * @brief Attempt to set up an event based on rko.
- * @returns 1 if op is event:able and set up, else 0.
- */
-static RD_UNUSED RD_INLINE
-int rd_kafka_event_setup (rd_kafka_t *rk, rd_kafka_op_t *rko) {
- rko->rko_evtype = rd_kafka_op2event(rko->rko_type);
- switch (rko->rko_evtype)
- {
- case RD_KAFKA_EVENT_NONE:
- return 0;
-
- case RD_KAFKA_EVENT_DR:
- rko->rko_rk = rk;
- rd_dassert(!rko->rko_u.dr.do_purge2);
- rd_kafka_msgq_init(&rko->rko_u.dr.msgq2);
- rko->rko_u.dr.do_purge2 = 1;
- return 1;
-
- case RD_KAFKA_EVENT_REBALANCE:
- case RD_KAFKA_EVENT_ERROR:
- case RD_KAFKA_EVENT_LOG:
- case RD_KAFKA_EVENT_OFFSET_COMMIT:
- case RD_KAFKA_EVENT_STATS:
- return 1;
-
- default:
- return 0;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_feature.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_feature.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_feature.c
deleted file mode 100644
index 8a9ab24..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_feature.c
+++ /dev/null
@@ -1,444 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2016, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-
-#include "rdkafka_int.h"
-#include "rdkafka_feature.h"
-
-#include <stdlib.h>
-
-static const char *rd_kafka_feature_names[] = {
- "MsgVer1",
- "ApiVersion",
- "BrokerBalancedConsumer",
- "ThrottleTime",
- "Sasl",
- "SaslHandshake",
- "BrokerGroupCoordinator",
- "LZ4",
- "OffsetTime",
- "MsgVer2",
- NULL
-};
-
-
-static const struct rd_kafka_feature_map {
- /* RD_KAFKA_FEATURE_... */
- int feature;
-
- /* Depends on the following ApiVersions overlapping with
- * what the broker supports: */
- struct rd_kafka_ApiVersion depends[RD_KAFKAP__NUM];
-
-} rd_kafka_feature_map[] = {
- /**
- * @brief List of features and the ApiVersions they depend on.
- *
- * The dependency list consists of the ApiKey followed by this
- * client's supported minimum and maximum API versions.
- * As long as this list and its versions overlaps with the
- * broker supported API versions the feature will be enabled.
- */
- {
-
- /* @brief >=0.10.0: Message.MagicByte version 1:
- * Relative offsets (KIP-31) and message timestamps (KIP-32). */
- .feature = RD_KAFKA_FEATURE_MSGVER1,
- .depends = {
- { RD_KAFKAP_Produce, 2, 2 },
- { RD_KAFKAP_Fetch, 2, 2 },
- { -1 },
- },
- },
- {
- /* @brief >=0.11.0: Message.MagicByte version 2 */
- .feature = RD_KAFKA_FEATURE_MSGVER2,
- .depends = {
- { RD_KAFKAP_Produce, 3, 3 },
- { RD_KAFKAP_Fetch, 4, 4 },
- { -1 },
- },
- },
- {
-
- /* @brief >=0.10.0: ApiVersionQuery support.
- * @remark This is a bit of chicken-and-egg problem but needs to be
- * set by feature_check() to avoid the feature being cleared
- * even when broker supports it. */
- .feature = RD_KAFKA_FEATURE_APIVERSION,
- .depends = {
- { RD_KAFKAP_ApiVersion, 0, 0 },
- { -1 },
- },
- },
- {
- /* @brief >=0.8.2.0: Broker-based Group coordinator */
- .feature = RD_KAFKA_FEATURE_BROKER_GROUP_COORD,
- .depends = {
- { RD_KAFKAP_GroupCoordinator, 0, 0 },
- { -1 },
- },
- },
- {
- /* @brief >=0.9.0: Broker-based balanced consumer groups. */
- .feature = RD_KAFKA_FEATURE_BROKER_BALANCED_CONSUMER,
- .depends = {
- { RD_KAFKAP_GroupCoordinator, 0, 0 },
- { RD_KAFKAP_OffsetCommit, 1, 2 },
- { RD_KAFKAP_OffsetFetch, 1, 1 },
- { RD_KAFKAP_JoinGroup, 0, 0 },
- { RD_KAFKAP_SyncGroup, 0, 0 },
- { RD_KAFKAP_Heartbeat, 0, 0 },
- { RD_KAFKAP_LeaveGroup, 0, 0 },
- { -1 },
- },
- },
- {
- /* @brief >=0.9.0: ThrottleTime */
- .feature = RD_KAFKA_FEATURE_THROTTLETIME,
- .depends = {
- { RD_KAFKAP_Produce, 1, 2 },
- { RD_KAFKAP_Fetch, 1, 2 },
- { -1 },
- },
-
- },
- {
- /* @brief >=0.9.0: SASL (GSSAPI) authentication.
- * Since SASL is not using the Kafka protocol
- * we must use something else to map us to the
- * proper broker version support:
- * JoinGroup was released along with SASL in 0.9.0. */
- .feature = RD_KAFKA_FEATURE_SASL_GSSAPI,
- .depends = {
- { RD_KAFKAP_JoinGroup, 0, 0 },
- { -1 },
- },
- },
- {
- /* @brief >=0.10.0: SASL mechanism handshake (KIP-43)
- * to automatically support other mechanisms
- * than GSSAPI, such as PLAIN. */
- .feature = RD_KAFKA_FEATURE_SASL_HANDSHAKE,
- .depends = {
- { RD_KAFKAP_SaslHandshake, 0, 0 },
- { -1 },
- },
- },
- {
- /* @brief >=0.8.2: LZ4 compression.
- * Since LZ4 initially did not rely on a specific API
- * type or version (it does in >=0.10.0)
- * we must use something else to map us to the
- * proper broker version support:
- * GrooupCoordinator was released in 0.8.2 */
- .feature = RD_KAFKA_FEATURE_LZ4,
- .depends = {
- { RD_KAFKAP_GroupCoordinator, 0, 0 },
- { -1 },
- },
- },
- {
- /* @brief >=0.10.1.0: Offset v1 (KIP-79)
- * Time-based offset requests */
- .feature = RD_KAFKA_FEATURE_OFFSET_TIME,
- .depends = {
- { RD_KAFKAP_Offset, 1, 1 },
- { -1 },
- }
- },
- { .feature = 0 }, /* sentinel */
-};
-
-
-
-/**
- * @brief In absence of KIP-35 support in earlier broker versions we provide hardcoded
- * lists that corresponds to older broker versions.
- */
-
-/* >= 0.10.0.0: dummy for all future versions that support ApiVersionRequest */
-static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_Queryable[] = {
- { RD_KAFKAP_ApiVersion, 0, 0 }
-};
-
-
-/* =~ 0.9.0 */
-static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_9_0[] = {
- { RD_KAFKAP_Produce, 0, 1 },
- { RD_KAFKAP_Fetch, 0, 1 },
- { RD_KAFKAP_Offset, 0, 0 },
- { RD_KAFKAP_Metadata, 0, 0 },
- { RD_KAFKAP_OffsetCommit, 0, 2 },
- { RD_KAFKAP_OffsetFetch, 0, 1 },
- { RD_KAFKAP_GroupCoordinator, 0, 0 },
- { RD_KAFKAP_JoinGroup, 0, 0 },
- { RD_KAFKAP_Heartbeat, 0, 0 },
- { RD_KAFKAP_LeaveGroup, 0, 0 },
- { RD_KAFKAP_SyncGroup, 0, 0 },
- { RD_KAFKAP_DescribeGroups, 0, 0 },
- { RD_KAFKAP_ListGroups, 0, 0 }
-};
-
-/* =~ 0.8.2 */
-static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_2[] = {
- { RD_KAFKAP_Produce, 0, 0 },
- { RD_KAFKAP_Fetch, 0, 0 },
- { RD_KAFKAP_Offset, 0, 0 },
- { RD_KAFKAP_Metadata, 0, 0 },
- { RD_KAFKAP_OffsetCommit, 0, 1 },
- { RD_KAFKAP_OffsetFetch, 0, 1 },
- { RD_KAFKAP_GroupCoordinator, 0, 0 }
-};
-
-/* =~ 0.8.1 */
-static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_1[] = {
- { RD_KAFKAP_Produce, 0, 0 },
- { RD_KAFKAP_Fetch, 0, 0 },
- { RD_KAFKAP_Offset, 0, 0 },
- { RD_KAFKAP_Metadata, 0, 0 },
- { RD_KAFKAP_OffsetCommit, 0, 1 },
- { RD_KAFKAP_OffsetFetch, 0, 0 }
-};
-
-/* =~ 0.8.0 */
-static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_0[] = {
- { RD_KAFKAP_Produce, 0, 0 },
- { RD_KAFKAP_Fetch, 0, 0 },
- { RD_KAFKAP_Offset, 0, 0 },
- { RD_KAFKAP_Metadata, 0, 0 }
-};
-
-
-/**
- * @brief Returns the ApiVersion list for legacy broker versions that do not
- * support the ApiVersionQuery request. E.g., brokers <0.10.0.
- *
- * @param broker_version Broker version to match (longest prefix matching).
- * @param use_default If no match is found return the default APIs (but return 0).
- *
- * @returns 1 if \p broker_version was recognized: \p *apisp will point to
- * the ApiVersion list and *api_cntp will be set to its element count.
- * 0 if \p broker_version was not recognized: \p *apisp remains unchanged.
- *
- */
-int rd_kafka_get_legacy_ApiVersions (const char *broker_version,
- struct rd_kafka_ApiVersion **apisp,
- size_t *api_cntp, const char *fallback) {
- static const struct {
- const char *pfx;
- struct rd_kafka_ApiVersion *apis;
- size_t api_cnt;
- } vermap[] = {
-#define _VERMAP(PFX,APIS) { PFX, APIS, RD_ARRAYSIZE(APIS) }
- _VERMAP("0.9.0", rd_kafka_ApiVersion_0_9_0),
- _VERMAP("0.8.2", rd_kafka_ApiVersion_0_8_2),
- _VERMAP("0.8.1", rd_kafka_ApiVersion_0_8_1),
- _VERMAP("0.8.0", rd_kafka_ApiVersion_0_8_0),
- { "0.7.", NULL }, /* Unsupported */
- { "0.6.", NULL }, /* Unsupported */
- _VERMAP("", rd_kafka_ApiVersion_Queryable),
- { NULL }
- };
- int i;
- int fallback_i = -1;
- int ret = 0;
-
- *apisp = NULL;
- *api_cntp = 0;
-
- for (i = 0 ; vermap[i].pfx ; i++) {
- if (!strncmp(vermap[i].pfx, broker_version, strlen(vermap[i].pfx))) {
- if (!vermap[i].apis)
- return 0;
- *apisp = vermap[i].apis;
- *api_cntp = vermap[i].api_cnt;
- ret = 1;
- break;
- } else if (fallback && !strcmp(vermap[i].pfx, fallback))
- fallback_i = i;
- }
-
- if (!*apisp && fallback) {
- rd_kafka_assert(NULL, fallback_i != -1);
- *apisp = vermap[fallback_i].apis;
- *api_cntp = vermap[fallback_i].api_cnt;
- }
-
- return ret;
-}
-
-
-/**
- * @returns 1 if the provided broker version (probably)
- * supports api.version.request.
- */
-int rd_kafka_ApiVersion_is_queryable (const char *broker_version) {
- struct rd_kafka_ApiVersion *apis;
- size_t api_cnt;
-
-
- if (!rd_kafka_get_legacy_ApiVersions(broker_version,
- &apis, &api_cnt, 0))
- return 0;
-
- return apis == rd_kafka_ApiVersion_Queryable;
-}
-
-
-
-
-
-/**
- * @brief Check if match's versions overlaps with \p apis.
- *
- * @returns 1 if true, else 0.
- * @remark \p apis must be sorted using rd_kafka_ApiVersion_key_cmp()
- */
-static RD_INLINE int
-rd_kafka_ApiVersion_check (const struct rd_kafka_ApiVersion *apis, size_t api_cnt,
- const struct rd_kafka_ApiVersion *match) {
- const struct rd_kafka_ApiVersion *api;
-
- api = bsearch(match, apis, api_cnt, sizeof(*apis),
- rd_kafka_ApiVersion_key_cmp);
- if (unlikely(!api))
- return 0;
-
- return match->MinVer <= api->MaxVer && api->MinVer <= match->MaxVer;
-}
-
-
-/**
- * @brief Compare broker's supported API versions to our feature request map
- * and enable/disable features accordingly.
- *
- * @param broker_apis Broker's supported APIs. If NULL the
- * \p broker.version.fallback configuration property will specify a
- * default legacy version to use.
- * @param broker_api_cnt Number of elements in \p broker_apis
- *
- * @returns the supported features (bitmask) to enable.
- */
-int rd_kafka_features_check (rd_kafka_broker_t *rkb,
- struct rd_kafka_ApiVersion *broker_apis,
- size_t broker_api_cnt) {
- int features = 0;
- int i;
-
- /* Scan through features. */
- for (i = 0 ; rd_kafka_feature_map[i].feature != 0 ; i++) {
- const struct rd_kafka_ApiVersion *match;
- int fails = 0;
-
- /* For each feature check that all its API dependencies
- * can be fullfilled. */
-
- for (match = &rd_kafka_feature_map[i].depends[0] ;
- match->ApiKey != -1 ; match++) {
- int r;
-
- r = rd_kafka_ApiVersion_check(broker_apis, broker_api_cnt,
- match);
-
- rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
- " Feature %s: %s (%hd..%hd) "
- "%ssupported by broker",
- rd_kafka_features2str(rd_kafka_feature_map[i].
- feature),
- rd_kafka_ApiKey2str(match->ApiKey),
- match->MinVer, match->MaxVer,
- r ? "" : "NOT ");
-
- fails += !r;
- }
-
- rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
- "%s feature %s",
- fails ? "Disabling" : "Enabling",
- rd_kafka_features2str(rd_kafka_feature_map[i].feature));
-
-
- if (!fails)
- features |= rd_kafka_feature_map[i].feature;
- }
-
- return features;
-}
-
-
-
-/**
- * @brief Make an allocated and sorted copy of \p src.
- */
-void
-rd_kafka_ApiVersions_copy (const struct rd_kafka_ApiVersion *src,
- size_t src_cnt,
- struct rd_kafka_ApiVersion **dstp,
- size_t *dst_cntp) {
- *dstp = rd_memdup(src, sizeof(*src) * src_cnt);
- *dst_cntp = src_cnt;
- qsort(*dstp, *dst_cntp, sizeof(**dstp), rd_kafka_ApiVersion_key_cmp);
-}
-
-
-
-
-
-
-/**
- * @returns a human-readable feature flag string.
- */
-const char *rd_kafka_features2str (int features) {
- static RD_TLS char ret[4][128];
- size_t of = 0;
- static RD_TLS int reti = 0;
- int i;
-
- reti = (reti + 1) % 4;
-
- *ret[reti] = '\0';
- for (i = 0 ; rd_kafka_feature_names[i] ; i++) {
- int r;
- if (!(features & (1 << i)))
- continue;
-
- r = rd_snprintf(ret[reti]+of, sizeof(ret[reti])-of, "%s%s",
- of == 0 ? "" : ",",
- rd_kafka_feature_names[i]);
- if ((size_t)r > sizeof(ret[reti])-of) {
- /* Out of space */
- memcpy(&ret[reti][sizeof(ret[reti])-3], "..", 3);
- break;
- }
-
- of += r;
- }
-
- return ret[reti];
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_feature.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_feature.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_feature.h
deleted file mode 100644
index 1e40664..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_feature.h
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2016, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-#pragma once
-
-
-/**
- * @brief Kafka protocol features
- */
-
-/* Message version 1 (MagicByte=1):
- * + relative offsets (KIP-31)
- * + timestamps (KIP-32) */
-#define RD_KAFKA_FEATURE_MSGVER1 0x1
-
-/* ApiVersionQuery support (KIP-35) */
-#define RD_KAFKA_FEATURE_APIVERSION 0x2
-
- /* >= 0.9: Broker-based Balanced Consumer */
-#define RD_KAFKA_FEATURE_BROKER_BALANCED_CONSUMER 0x4
-
-/* >= 0.9: Produce/Fetch ThrottleTime reporting */
-#define RD_KAFKA_FEATURE_THROTTLETIME 0x8
-
-/* >= 0.9: SASL GSSAPI support */
-#define RD_KAFKA_FEATURE_SASL_GSSAPI 0x10
-
-/* >= 0.10: SaslMechanismRequest (KIP-43) */
-#define RD_KAFKA_FEATURE_SASL_HANDSHAKE 0x20
-
-/* >= 0.8.2.0: Broker-based Group coordinator */
-#define RD_KAFKA_FEATURE_BROKER_GROUP_COORD 0x40
-
-/* >= 0.8.2.0: LZ4 compression (with bad and proper HC checksums) */
-#define RD_KAFKA_FEATURE_LZ4 0x80
-
-/* >= 0.10.1.0: Time-based Offset fetch (KIP-79) */
-#define RD_KAFKA_FEATURE_OFFSET_TIME 0x100
-
-/* >= 0.11.0.0: Message version 2 (MagicByte=2):
- * + EOS message format KIP-98 */
-#define RD_KAFKA_FEATURE_MSGVER2 0x200
-
-
-int rd_kafka_get_legacy_ApiVersions (const char *broker_version,
- struct rd_kafka_ApiVersion **apisp,
- size_t *api_cntp, const char *fallback);
-int rd_kafka_ApiVersion_is_queryable (const char *broker_version);
-void rd_kafka_ApiVersions_copy (const struct rd_kafka_ApiVersion *src, size_t src_cnt,
- struct rd_kafka_ApiVersion **dstp, size_t *dst_cntp);
-int rd_kafka_features_check (rd_kafka_broker_t *rkb,
- struct rd_kafka_ApiVersion *broker_apis,
- size_t broker_api_cnt);
-
-const char *rd_kafka_features2str (int features);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_int.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_int.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_int.h
deleted file mode 100644
index 792c1a5..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_int.h
+++ /dev/null
@@ -1,435 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2013, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-
-#ifndef _MSC_VER
-#define _GNU_SOURCE /* for strndup() */
-#include <syslog.h>
-#else
-typedef int mode_t;
-#endif
-#include <fcntl.h>
-
-
-#include "rdsysqueue.h"
-
-#include "rdkafka.h"
-#include "rd.h"
-#include "rdlog.h"
-#include "rdtime.h"
-#include "rdaddr.h"
-#include "rdinterval.h"
-#include "rdavg.h"
-#include "rdlist.h"
-
-#if WITH_SSL
-#include <openssl/ssl.h>
-#endif
-
-
-
-
-typedef struct rd_kafka_itopic_s rd_kafka_itopic_t;
-typedef struct rd_ikafka_s rd_ikafka_t;
-
-
-#define rd_kafka_assert(rk, cond) do { \
- if (unlikely(!(cond))) \
- rd_kafka_crash(__FILE__,__LINE__, __FUNCTION__, \
- (rk), "assert: " # cond); \
- } while (0)
-
-
-void
-RD_NORETURN
-rd_kafka_crash (const char *file, int line, const char *function,
- rd_kafka_t *rk, const char *reason);
-
-
-/* Forward declarations */
-struct rd_kafka_s;
-struct rd_kafka_itopic_s;
-struct rd_kafka_msg_s;
-struct rd_kafka_broker_s;
-
-typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_toppar_s) shptr_rd_kafka_toppar_t;
-typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_itopic_s) shptr_rd_kafka_itopic_t;
-
-
-
-#include "rdkafka_op.h"
-#include "rdkafka_queue.h"
-#include "rdkafka_msg.h"
-#include "rdkafka_proto.h"
-#include "rdkafka_buf.h"
-#include "rdkafka_pattern.h"
-#include "rdkafka_conf.h"
-#include "rdkafka_transport.h"
-#include "rdkafka_timer.h"
-#include "rdkafka_assignor.h"
-#include "rdkafka_metadata.h"
-
-
-/**
- * Protocol level sanity
- */
-#define RD_KAFKAP_BROKERS_MAX 1000
-#define RD_KAFKAP_TOPICS_MAX 1000000
-#define RD_KAFKAP_PARTITIONS_MAX 10000
-
-
-#define RD_KAFKA_OFFSET_IS_LOGICAL(OFF) ((OFF) < 0)
-
-
-
-
-
-
-
-/**
- * Kafka handle, internal representation of the application's rd_kafka_t.
- */
-
-typedef RD_SHARED_PTR_TYPE(shptr_rd_ikafka_s, rd_ikafka_t) shptr_rd_ikafka_t;
-
-struct rd_kafka_s {
- rd_kafka_q_t *rk_rep; /* kafka -> application reply queue */
- rd_kafka_q_t *rk_ops; /* any -> rdkafka main thread ops */
-
- TAILQ_HEAD(, rd_kafka_broker_s) rk_brokers;
- rd_list_t rk_broker_by_id; /* Fast id lookups. */
- rd_atomic32_t rk_broker_cnt;
- rd_atomic32_t rk_broker_down_cnt;
- mtx_t rk_internal_rkb_lock;
- rd_kafka_broker_t *rk_internal_rkb;
-
- /* Broadcasting of broker state changes to wake up
- * functions waiting for a state change. */
- cnd_t rk_broker_state_change_cnd;
- mtx_t rk_broker_state_change_lock;
- int rk_broker_state_change_version;
-
-
- TAILQ_HEAD(, rd_kafka_itopic_s) rk_topics;
- int rk_topic_cnt;
-
- struct rd_kafka_cgrp_s *rk_cgrp;
-
- rd_kafka_conf_t rk_conf;
- rd_kafka_q_t *rk_logq; /* Log queue if `log.queue` set */
- char rk_name[128];
- rd_kafkap_str_t *rk_client_id;
- rd_kafkap_str_t *rk_group_id; /* Consumer group id */
-
- int rk_flags;
- rd_atomic32_t rk_terminate;
- rwlock_t rk_lock;
- rd_kafka_type_t rk_type;
- struct timeval rk_tv_state_change;
-
- rd_atomic32_t rk_last_throttle; /* Last throttle_time_ms value
- * from broker. */
-
- /* Locks: rd_kafka_*lock() */
- rd_ts_t rk_ts_metadata; /* Timestamp of most recent
- * metadata. */
-
- struct rd_kafka_metadata *rk_full_metadata; /* Last full metadata. */
- rd_ts_t rk_ts_full_metadata; /* Timesstamp of .. */
- struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */
-
- char *rk_clusterid; /* ClusterId from metadata */
-
- /* Simple consumer count:
- * >0: Running in legacy / Simple Consumer mode,
- * 0: No consumers running
- * <0: Running in High level consumer mode */
- rd_atomic32_t rk_simple_cnt;
-
- /**
- * Exactly Once Semantics
- */
- struct {
- rd_kafkap_str_t *TransactionalId;
- int64_t PID;
- int16_t ProducerEpoch;
- } rk_eos;
-
- const rd_kafkap_bytes_t *rk_null_bytes;
-
- struct {
- mtx_t lock; /* Protects acces to this struct */
- cnd_t cnd; /* For waking up blocking injectors */
- unsigned int cnt; /* Current message count */
- size_t size; /* Current message size sum */
- unsigned int max_cnt; /* Max limit */
- size_t max_size; /* Max limit */
- } rk_curr_msgs;
-
- rd_kafka_timers_t rk_timers;
- thrd_t rk_thread;
-
- int rk_initialized;
-};
-
-#define rd_kafka_wrlock(rk) rwlock_wrlock(&(rk)->rk_lock)
-#define rd_kafka_rdlock(rk) rwlock_rdlock(&(rk)->rk_lock)
-#define rd_kafka_rdunlock(rk) rwlock_rdunlock(&(rk)->rk_lock)
-#define rd_kafka_wrunlock(rk) rwlock_wrunlock(&(rk)->rk_lock)
-
-/**
- * @brief Add \p cnt messages and of total size \p size bytes to the
- * internal bookkeeping of current message counts.
- * If the total message count or size after add would exceed the
- * configured limits \c queue.buffering.max.messages and
- * \c queue.buffering.max.kbytes then depending on the value of
- * \p block the function either blocks until enough space is available
- * if \p block is 1, else immediately returns
- * RD_KAFKA_RESP_ERR__QUEUE_FULL.
- */
-static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
-rd_kafka_curr_msgs_add (rd_kafka_t *rk, unsigned int cnt, size_t size,
- int block) {
-
- if (rk->rk_type != RD_KAFKA_PRODUCER)
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-
- mtx_lock(&rk->rk_curr_msgs.lock);
- while (unlikely(rk->rk_curr_msgs.cnt + cnt >
- rk->rk_curr_msgs.max_cnt ||
- (unsigned long long)(rk->rk_curr_msgs.size + size) >
- (unsigned long long)rk->rk_curr_msgs.max_size)) {
- if (!block) {
- mtx_unlock(&rk->rk_curr_msgs.lock);
- return RD_KAFKA_RESP_ERR__QUEUE_FULL;
- }
-
- cnd_wait(&rk->rk_curr_msgs.cnd, &rk->rk_curr_msgs.lock);
- }
-
- rk->rk_curr_msgs.cnt += cnt;
- rk->rk_curr_msgs.size += size;
- mtx_unlock(&rk->rk_curr_msgs.lock);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * @brief Subtract \p cnt messages of total size \p size from the
- * current bookkeeping and broadcast a wakeup on the condvar
- * for any waiting & blocking threads.
- */
-static RD_INLINE RD_UNUSED void
-rd_kafka_curr_msgs_sub (rd_kafka_t *rk, unsigned int cnt, size_t size) {
- int broadcast = 0;
-
- if (rk->rk_type != RD_KAFKA_PRODUCER)
- return;
-
- mtx_lock(&rk->rk_curr_msgs.lock);
- rd_kafka_assert(NULL,
- rk->rk_curr_msgs.cnt >= cnt &&
- rk->rk_curr_msgs.size >= size);
-
- /* If the subtraction would pass one of the thresholds
- * broadcast a wake-up to any waiting listeners. */
- if ((rk->rk_curr_msgs.cnt >= rk->rk_curr_msgs.max_cnt &&
- rk->rk_curr_msgs.cnt - cnt < rk->rk_curr_msgs.max_cnt) ||
- (rk->rk_curr_msgs.size >= rk->rk_curr_msgs.max_size &&
- rk->rk_curr_msgs.size - size < rk->rk_curr_msgs.max_size))
- broadcast = 1;
-
- rk->rk_curr_msgs.cnt -= cnt;
- rk->rk_curr_msgs.size -= size;
-
- if (unlikely(broadcast))
- cnd_broadcast(&rk->rk_curr_msgs.cnd);
-
- mtx_unlock(&rk->rk_curr_msgs.lock);
-}
-
-static RD_INLINE RD_UNUSED void
-rd_kafka_curr_msgs_get (rd_kafka_t *rk, unsigned int *cntp, size_t *sizep) {
- if (rk->rk_type != RD_KAFKA_PRODUCER) {
- *cntp = 0;
- *sizep = 0;
- return;
- }
-
- mtx_lock(&rk->rk_curr_msgs.lock);
- *cntp = rk->rk_curr_msgs.cnt;
- *sizep = rk->rk_curr_msgs.size;
- mtx_unlock(&rk->rk_curr_msgs.lock);
-}
-
-static RD_INLINE RD_UNUSED int
-rd_kafka_curr_msgs_cnt (rd_kafka_t *rk) {
- int cnt;
- if (rk->rk_type != RD_KAFKA_PRODUCER)
- return 0;
-
- mtx_lock(&rk->rk_curr_msgs.lock);
- cnt = rk->rk_curr_msgs.cnt;
- mtx_unlock(&rk->rk_curr_msgs.lock);
-
- return cnt;
-}
-
-
-void rd_kafka_destroy_final (rd_kafka_t *rk);
-
-
-/**
- * Returns true if 'rk' handle is terminating.
- */
-#define rd_kafka_terminating(rk) (rd_atomic32_get(&(rk)->rk_terminate))
-
-#define rd_kafka_is_simple_consumer(rk) \
- (rd_atomic32_get(&(rk)->rk_simple_cnt) > 0)
-int rd_kafka_simple_consumer_add (rd_kafka_t *rk);
-
-
-#include "rdkafka_topic.h"
-#include "rdkafka_partition.h"
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-/**
- * Debug contexts
- */
-#define RD_KAFKA_DBG_GENERIC 0x1
-#define RD_KAFKA_DBG_BROKER 0x2
-#define RD_KAFKA_DBG_TOPIC 0x4
-#define RD_KAFKA_DBG_METADATA 0x8
-#define RD_KAFKA_DBG_FEATURE 0x10
-#define RD_KAFKA_DBG_QUEUE 0x20
-#define RD_KAFKA_DBG_MSG 0x40
-#define RD_KAFKA_DBG_PROTOCOL 0x80
-#define RD_KAFKA_DBG_CGRP 0x100
-#define RD_KAFKA_DBG_SECURITY 0x200
-#define RD_KAFKA_DBG_FETCH 0x400
-#define RD_KAFKA_DBG_INTERCEPTOR 0x800
-#define RD_KAFKA_DBG_PLUGIN 0x1000
-#define RD_KAFKA_DBG_ALL 0xffff
-
-
-void rd_kafka_log0(const rd_kafka_conf_t *conf,
- const rd_kafka_t *rk, const char *extra, int level,
- const char *fac, const char *fmt, ...) RD_FORMAT(printf,
- 6, 7);
-
-#define rd_kafka_log(rk,level,fac,...) \
- rd_kafka_log0(&rk->rk_conf, rk, NULL, level, fac, __VA_ARGS__)
-#define rd_kafka_dbg(rk,ctx,fac,...) do { \
- if (unlikely((rk)->rk_conf.debug & (RD_KAFKA_DBG_ ## ctx))) \
- rd_kafka_log0(&rk->rk_conf,rk,NULL, \
- LOG_DEBUG,fac,__VA_ARGS__); \
- } while (0)
-
-/* dbg() not requiring an rk, just the conf object, for early logging */
-#define rd_kafka_dbg0(conf,ctx,fac,...) do { \
- if (unlikely((conf)->debug & (RD_KAFKA_DBG_ ## ctx))) \
- rd_kafka_log0(conf,NULL,NULL, \
- LOG_DEBUG,fac,__VA_ARGS__); \
- } while (0)
-
-/* NOTE: The local copy of _logname is needed due rkb_logname_lock lock-ordering
- * when logging another broker's name in the message. */
-#define rd_rkb_log(rkb,level,fac,...) do { \
- char _logname[RD_KAFKA_NODENAME_SIZE]; \
- mtx_lock(&(rkb)->rkb_logname_lock); \
- strncpy(_logname, rkb->rkb_logname, sizeof(_logname)-1); \
- _logname[RD_KAFKA_NODENAME_SIZE-1] = '\0'; \
- mtx_unlock(&(rkb)->rkb_logname_lock); \
- rd_kafka_log0(&(rkb)->rkb_rk->rk_conf, \
- (rkb)->rkb_rk, _logname, \
- level, fac, __VA_ARGS__); \
- } while (0)
-
-#define rd_rkb_dbg(rkb,ctx,fac,...) do { \
- if (unlikely((rkb)->rkb_rk->rk_conf.debug & \
- (RD_KAFKA_DBG_ ## ctx))) { \
- rd_rkb_log(rkb, LOG_DEBUG, fac, __VA_ARGS__); \
- } \
- } while (0)
-
-
-
-extern rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;
-
-static RD_UNUSED RD_INLINE
-rd_kafka_resp_err_t rd_kafka_set_last_error (rd_kafka_resp_err_t err,
- int errnox) {
- if (errnox) {
-#ifdef _MSC_VER
- /* This is the correct way to set errno on Windows,
- * but it is still pointless due to different errnos in
- * in different runtimes:
- * https://social.msdn.microsoft.com/Forums/vstudio/en-US/b4500c0d-1b69-40c7-9ef5-08da1025b5bf/setting-errno-from-within-a-dll?forum=vclanguage/
- * errno is thus highly deprecated, and buggy, on Windows
- * when using librdkafka as a dynamically loaded DLL. */
- _set_errno(errnox);
-#else
- errno = errnox;
-#endif
- }
- rd_kafka_last_error_code = err;
- return err;
-}
-
-
-extern rd_atomic32_t rd_kafka_thread_cnt_curr;
-
-extern char RD_TLS rd_kafka_thread_name[64];
-
-
-
-
-
-int rd_kafka_path_is_dir (const char *path);
-
-rd_kafka_op_res_t
-rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
- rd_kafka_q_cb_type_t cb_type, void *opaque);
-
-rd_kafka_resp_err_t rd_kafka_subscribe_rkt (rd_kafka_itopic_t *rkt);
-
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_interceptor.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_interceptor.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_interceptor.c
deleted file mode 100644
index 91f7761..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_interceptor.c
+++ /dev/null
@@ -1,624 +0,0 @@
-/*
- * librdkafka - The Apache Kafka C/C++ library
- *
- * Copyright (c) 2017 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "rdkafka_int.h"
-#include "rdkafka_interceptor.h"
-#include "rdstring.h"
-
-/**
- * @brief Interceptor methodtion/method reference
- */
-typedef struct rd_kafka_interceptor_method_s {
- union {
- rd_kafka_interceptor_f_on_conf_set_t *on_conf_set;
- rd_kafka_interceptor_f_on_conf_dup_t *on_conf_dup;
- rd_kafka_interceptor_f_on_conf_destroy_t *on_conf_destroy;
- rd_kafka_interceptor_f_on_new_t *on_new;
- rd_kafka_interceptor_f_on_destroy_t *on_destroy;
- rd_kafka_interceptor_f_on_send_t *on_send;
- rd_kafka_interceptor_f_on_acknowledgement_t *on_acknowledgement;
- rd_kafka_interceptor_f_on_consume_t *on_consume;
- rd_kafka_interceptor_f_on_commit_t *on_commit;
- void *generic; /* For easy assignment */
-
- } u;
- char *ic_name;
- void *ic_opaque;
-} rd_kafka_interceptor_method_t;
-
-/**
- * @brief Destroy interceptor methodtion reference
- */
-static void
-rd_kafka_interceptor_method_destroy (void *ptr) {
- rd_kafka_interceptor_method_t *method = ptr;
- rd_free(method->ic_name);
- rd_free(method);
-}
-
-
-
-
-
-/**
- * @brief Handle an interceptor on_... methodtion call failures.
- */
-static RD_INLINE void
-rd_kafka_interceptor_failed (rd_kafka_t *rk,
- const rd_kafka_interceptor_method_t *method,
- const char *method_name, rd_kafka_resp_err_t err,
- const rd_kafka_message_t *rkmessage,
- const char *errstr) {
-
- /* FIXME: Suppress log messages, eventually */
- if (rkmessage)
- rd_kafka_log(rk, LOG_WARNING, "ICFAIL",
- "Interceptor %s failed %s for "
- "message on %s [%"PRId32"] @ %"PRId64
- ": %s%s%s",
- method->ic_name, method_name,
- rd_kafka_topic_a2i(rkmessage->rkt)->rkt_topic->str,
- rkmessage->partition,
- rkmessage->offset,
- rd_kafka_err2str(err),
- errstr ? ": " : "",
- errstr ? errstr : "");
- else
- rd_kafka_log(rk, LOG_WARNING, "ICFAIL",
- "Interceptor %s failed %s: %s%s%s",
- method->ic_name, method_name,
- rd_kafka_err2str(err),
- errstr ? ": " : "",
- errstr ? errstr : "");
-
-}
-
-
-
-/**
- * @brief Create interceptor method reference.
- * Duplicates are rejected
- */
-static rd_kafka_interceptor_method_t *
-rd_kafka_interceptor_method_new (const char *ic_name,
- void *func, void *ic_opaque) {
- rd_kafka_interceptor_method_t *method;
-
- method = rd_calloc(1, sizeof(*method));
- method->ic_name = rd_strdup(ic_name);
- method->ic_opaque = ic_opaque;
- method->u.generic = func;
-
- return method;
-}
-
-
-/**
- * @brief Method comparator to be used for finding, not sorting.
- */
-static int rd_kafka_interceptor_method_cmp (const void *_a, const void *_b) {
- const rd_kafka_interceptor_method_t *a = _a, *b = _b;
-
- if (a->u.generic != b->u.generic)
- return -1;
-
- return strcmp(a->ic_name, b->ic_name);
-}
-
-/**
- * @brief Add interceptor method reference
- */
-static rd_kafka_resp_err_t
-rd_kafka_interceptor_method_add (rd_list_t *list, const char *ic_name,
- void *func, void *ic_opaque) {
- rd_kafka_interceptor_method_t *method;
- const rd_kafka_interceptor_method_t skel = {
- .ic_name = (char *)ic_name,
- .u = { .generic = func }
- };
-
- /* Reject same method from same interceptor.
- * This is needed to avoid duplicate interceptors when configuration
- * objects are duplicated.
- * An exception is made for lists with _F_UNIQUE, which is currently
- * only on_conf_destroy() to allow interceptor cleanup. */
- if ((list->rl_flags & RD_LIST_F_UNIQUE) &&
- rd_list_find(list, &skel, rd_kafka_interceptor_method_cmp))
- return RD_KAFKA_RESP_ERR__CONFLICT;
-
- method = rd_kafka_interceptor_method_new(ic_name, func, ic_opaque);
- rd_list_add(list, method);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-/**
- * @brief Destroy all interceptors
- * @locality application thread calling rd_kafka_conf_destroy() or
- * rd_kafka_destroy()
- */
-void rd_kafka_interceptors_destroy (rd_kafka_conf_t *conf) {
- rd_list_destroy(&conf->interceptors.on_conf_set);
- rd_list_destroy(&conf->interceptors.on_conf_dup);
- rd_list_destroy(&conf->interceptors.on_conf_destroy);
- rd_list_destroy(&conf->interceptors.on_new);
- rd_list_destroy(&conf->interceptors.on_destroy);
- rd_list_destroy(&conf->interceptors.on_send);
- rd_list_destroy(&conf->interceptors.on_acknowledgement);
- rd_list_destroy(&conf->interceptors.on_consume);
- rd_list_destroy(&conf->interceptors.on_commit);
-
- /* Interceptor config */
- rd_list_destroy(&conf->interceptors.config);
-}
-
-
-/**
- * @brief Initialize interceptor sub-system for config object.
- * @locality application thread
- */
-static void
-rd_kafka_interceptors_init (rd_kafka_conf_t *conf) {
- rd_list_init(&conf->interceptors.on_conf_set, 0,
- rd_kafka_interceptor_method_destroy)
- ->rl_flags |= RD_LIST_F_UNIQUE;
- rd_list_init(&conf->interceptors.on_conf_dup, 0,
- rd_kafka_interceptor_method_destroy)
- ->rl_flags |= RD_LIST_F_UNIQUE;
- /* conf_destroy() allows duplicates entries. */
- rd_list_init(&conf->interceptors.on_conf_destroy, 0,
- rd_kafka_interceptor_method_destroy);
- rd_list_init(&conf->interceptors.on_new, 0,
- rd_kafka_interceptor_method_destroy)
- ->rl_flags |= RD_LIST_F_UNIQUE;
- rd_list_init(&conf->interceptors.on_destroy, 0,
- rd_kafka_interceptor_method_destroy)
- ->rl_flags |= RD_LIST_F_UNIQUE;
- rd_list_init(&conf->interceptors.on_send, 0,
- rd_kafka_interceptor_method_destroy)
- ->rl_flags |= RD_LIST_F_UNIQUE;
- rd_list_init(&conf->interceptors.on_acknowledgement, 0,
- rd_kafka_interceptor_method_destroy)
- ->rl_flags |= RD_LIST_F_UNIQUE;
- rd_list_init(&conf->interceptors.on_consume, 0,
- rd_kafka_interceptor_method_destroy)
- ->rl_flags |= RD_LIST_F_UNIQUE;
- rd_list_init(&conf->interceptors.on_commit, 0,
- rd_kafka_interceptor_method_destroy)
- ->rl_flags |= RD_LIST_F_UNIQUE;
-
- /* Interceptor config */
- rd_list_init(&conf->interceptors.config, 0,
- (void (*)(void *))rd_strtup_destroy);
-}
-
-
-
-
-/**
- * @name Configuration backend
- */
-
-
-/**
- * @brief Constructor called when configuration object is created.
- */
-void rd_kafka_conf_interceptor_ctor (int scope, void *pconf) {
- rd_kafka_conf_t *conf = pconf;
- assert(scope == _RK_GLOBAL);
- rd_kafka_interceptors_init(conf);
-}
-
-/**
- * @brief Destructor called when configuration object is destroyed.
- */
-void rd_kafka_conf_interceptor_dtor (int scope, void *pconf) {
- rd_kafka_conf_t *conf = pconf;
- assert(scope == _RK_GLOBAL);
- rd_kafka_interceptors_destroy(conf);
-}
-
-/**
- * @brief Copy-constructor called when configuration object \p psrcp is
- * duplicated to \p dstp.
- * @remark Interceptors are NOT copied, but interceptor config is.
- *
- */
-void rd_kafka_conf_interceptor_copy (int scope, void *pdst, const void *psrc,
- void *dstptr, const void *srcptr,
- size_t filter_cnt, const char **filter) {
- rd_kafka_conf_t *dconf = pdst;
- const rd_kafka_conf_t *sconf = psrc;
- int i;
- const rd_strtup_t *confval;
-
- assert(scope == _RK_GLOBAL);
-
- /* Apply interceptor configuration values.
- * on_conf_dup() has already been called for dconf so
- * on_conf_set() interceptors are already in place and we can
- * apply the configuration through the standard conf_set() API. */
- RD_LIST_FOREACH(confval, &sconf->interceptors.config, i) {
- size_t fi;
- size_t nlen = strlen(confval->name);
-
- /* Apply filter */
- for (fi = 0 ; fi < filter_cnt ; fi++) {
- size_t flen = strlen(filter[fi]);
- if (nlen >= flen && !strncmp(filter[fi], confval->name,
- flen))
- break;
- }
-
- if (fi < filter_cnt)
- continue; /* Filter matched: ignore property. */
-
- /* Ignore errors for now */
- rd_kafka_conf_set(dconf, confval->name, confval->value,
- NULL, 0);
- }
-}
-
-
-
-
-/**
- * @brief Call interceptor on_conf_set methods.
- * @locality application thread calling rd_kafka_conf_set() and
- * rd_kafka_conf_dup()
- */
-rd_kafka_conf_res_t
-rd_kafka_interceptors_on_conf_set (rd_kafka_conf_t *conf,
- const char *name, const char *val,
- char *errstr, size_t errstr_size) {
- rd_kafka_interceptor_method_t *method;
- int i;
-
- RD_LIST_FOREACH(method, &conf->interceptors.on_conf_set, i) {
- rd_kafka_conf_res_t res;
-
- res = method->u.on_conf_set(conf,
- name, val, errstr, errstr_size,
- method->ic_opaque);
- if (res == RD_KAFKA_CONF_UNKNOWN)
- continue;
-
- /* Add successfully handled properties to list of
- * interceptor config properties so conf_t objects
- * can be copied. */
- if (res == RD_KAFKA_CONF_OK)
- rd_list_add(&conf->interceptors.config,
- rd_strtup_new(name, val));
- return res;
- }
-
- return RD_KAFKA_CONF_UNKNOWN;
-}
-
-/**
- * @brief Call interceptor on_conf_dup methods.
- * @locality application thread calling rd_kafka_conf_dup()
- */
-void
-rd_kafka_interceptors_on_conf_dup (rd_kafka_conf_t *new_conf,
- const rd_kafka_conf_t *old_conf,
- size_t filter_cnt, const char **filter) {
- rd_kafka_interceptor_method_t *method;
- int i;
-
- RD_LIST_FOREACH(method, &old_conf->interceptors.on_conf_dup, i) {
- /* FIXME: Ignore error for now */
- method->u.on_conf_dup(new_conf, old_conf,
- filter_cnt, filter, method->ic_opaque);
- }
-}
-
-
-/**
- * @brief Call interceptor on_conf_destroy methods.
- * @locality application thread calling rd_kafka_conf_destroy(), rd_kafka_new(),
- * rd_kafka_destroy()
- */
-void
-rd_kafka_interceptors_on_conf_destroy (rd_kafka_conf_t *conf) {
- rd_kafka_interceptor_method_t *method;
- int i;
-
- RD_LIST_FOREACH(method, &conf->interceptors.on_conf_destroy, i) {
- /* FIXME: Ignore error for now */
- method->u.on_conf_destroy(method->ic_opaque);
- }
-}
-
-
-/**
- * @brief Call interceptor on_new methods.
- * @locality application thread calling rd_kafka_new()
- */
-void
-rd_kafka_interceptors_on_new (rd_kafka_t *rk, const rd_kafka_conf_t *conf) {
- rd_kafka_interceptor_method_t *method;
- int i;
- char errstr[512];
-
- RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_new, i) {
- rd_kafka_resp_err_t err;
-
- err = method->u.on_new(rk, conf, method->ic_opaque,
- errstr, sizeof(errstr));
- if (unlikely(err))
- rd_kafka_interceptor_failed(rk, method, "on_new", err,
- NULL, errstr);
- }
-}
-
-
-
-/**
- * @brief Call interceptor on_destroy methods.
- * @locality application thread calling rd_kafka_new() or rd_kafka_destroy()
- */
-void
-rd_kafka_interceptors_on_destroy (rd_kafka_t *rk) {
- rd_kafka_interceptor_method_t *method;
- int i;
-
- RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_destroy, i) {
- rd_kafka_resp_err_t err;
-
- err = method->u.on_destroy(rk, method->ic_opaque);
- if (unlikely(err))
- rd_kafka_interceptor_failed(rk, method, "on_destroy",
- err, NULL, NULL);
- }
-}
-
-
-
-/**
- * @brief Call interceptor on_send methods.
- * @locality application thread calling produce()
- */
-void
-rd_kafka_interceptors_on_send (rd_kafka_t *rk, rd_kafka_message_t *rkmessage) {
- rd_kafka_interceptor_method_t *method;
- int i;
-
- RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_send, i) {
- rd_kafka_resp_err_t err;
-
- err = method->u.on_send(rk, rkmessage, method->ic_opaque);
- if (unlikely(err))
- rd_kafka_interceptor_failed(rk, method, "on_send", err,
- rkmessage, NULL);
- }
-}
-
-
-
-/**
- * @brief Call interceptor on_acknowledgement methods.
- * @locality application thread calling poll(), or the broker thread if
- * if dr callback has been set.
- */
-void
-rd_kafka_interceptors_on_acknowledgement (rd_kafka_t *rk,
- rd_kafka_message_t *rkmessage) {
- rd_kafka_interceptor_method_t *method;
- int i;
-
- RD_LIST_FOREACH(method,
- &rk->rk_conf.interceptors.on_acknowledgement, i) {
- rd_kafka_resp_err_t err;
-
- err = method->u.on_acknowledgement(rk, rkmessage,
- method->ic_opaque);
- if (unlikely(err))
- rd_kafka_interceptor_failed(rk, method,
- "on_acknowledgement", err,
- rkmessage, NULL);
- }
-}
-
-
-/**
- * @brief Call on_acknowledgement methods for all messages in queue.
- * @locality broker thread
- */
-void
-rd_kafka_interceptors_on_acknowledgement_queue (rd_kafka_t *rk,
- rd_kafka_msgq_t *rkmq) {
- rd_kafka_msg_t *rkm;
-
- RD_KAFKA_MSGQ_FOREACH(rkm, rkmq) {
- rd_kafka_interceptors_on_acknowledgement(rk,
- &rkm->rkm_rkmessage);
- }
-}
-
-
-/**
- * @brief Call interceptor on_consume methods.
- * @locality application thread calling poll(), consume() or similar prior to
- * passing the message to the application.
- */
-void
-rd_kafka_interceptors_on_consume (rd_kafka_t *rk,
- rd_kafka_message_t *rkmessage) {
- rd_kafka_interceptor_method_t *method;
- int i;
-
- RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_consume, i) {
- rd_kafka_resp_err_t err;
-
- err = method->u.on_consume(rk, rkmessage,
- method->ic_opaque);
- if (unlikely(err))
- rd_kafka_interceptor_failed(rk, method,
- "on_consume", err,
- rkmessage, NULL);
- }
-}
-
-
-/**
- * @brief Call interceptor on_commit methods.
- * @locality application thread calling poll(), consume() or similar,
- * or rdkafka main thread if no commit_cb or handler registered.
- */
-void
-rd_kafka_interceptors_on_commit (rd_kafka_t *rk,
- const rd_kafka_topic_partition_list_t *offsets,
- rd_kafka_resp_err_t err) {
- rd_kafka_interceptor_method_t *method;
- int i;
-
- RD_LIST_FOREACH(method, &rk->rk_conf.interceptors.on_commit, i) {
- rd_kafka_resp_err_t ic_err;
-
- ic_err = method->u.on_commit(rk, offsets, err,
- method->ic_opaque);
- if (unlikely(ic_err))
- rd_kafka_interceptor_failed(rk, method,
- "on_commit", ic_err, NULL,
- NULL);
- }
-}
-
-
-
-
-/**
- * @name Public API (backend)
- * @{
- */
-
-
-rd_kafka_resp_err_t
-rd_kafka_conf_interceptor_add_on_conf_set (
- rd_kafka_conf_t *conf, const char *ic_name,
- rd_kafka_interceptor_f_on_conf_set_t *on_conf_set,
- void *ic_opaque) {
- return rd_kafka_interceptor_method_add(&conf->interceptors.on_conf_set,
- ic_name, (void *)on_conf_set,
- ic_opaque);
-}
-
-rd_kafka_resp_err_t
-rd_kafka_conf_interceptor_add_on_conf_dup (
- rd_kafka_conf_t *conf, const char *ic_name,
- rd_kafka_interceptor_f_on_conf_dup_t *on_conf_dup,
- void *ic_opaque) {
- return rd_kafka_interceptor_method_add(&conf->interceptors.on_conf_dup,
- ic_name, (void *)on_conf_dup,
- ic_opaque);
-}
-
-rd_kafka_resp_err_t
-rd_kafka_conf_interceptor_add_on_conf_destroy (
- rd_kafka_conf_t *conf, const char *ic_name,
- rd_kafka_interceptor_f_on_conf_destroy_t *on_conf_destroy,
- void *ic_opaque) {
- return rd_kafka_interceptor_method_add(&conf->interceptors.on_conf_destroy,
- ic_name, (void *)on_conf_destroy,
- ic_opaque);
-}
-
-
-
-rd_kafka_resp_err_t
-rd_kafka_conf_interceptor_add_on_new (
- rd_kafka_conf_t *conf, const char *ic_name,
- rd_kafka_interceptor_f_on_new_t *on_new,
- void *ic_opaque) {
- return rd_kafka_interceptor_method_add(&conf->interceptors.on_new,
- ic_name, (void *)on_new,
- ic_opaque);
-}
-
-
-rd_kafka_resp_err_t
-rd_kafka_interceptor_add_on_destroy (
- rd_kafka_t *rk, const char *ic_name,
- rd_kafka_interceptor_f_on_destroy_t *on_destroy,
- void *ic_opaque) {
- assert(!rk->rk_initialized);
- return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.on_destroy,
- ic_name, (void *)on_destroy,
- ic_opaque);
-}
-
-rd_kafka_resp_err_t
-rd_kafka_interceptor_add_on_send (
- rd_kafka_t *rk, const char *ic_name,
- rd_kafka_interceptor_f_on_send_t *on_send,
- void *ic_opaque) {
- assert(!rk->rk_initialized);
- return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.on_send,
- ic_name, (void *)on_send,
- ic_opaque);
-}
-
-rd_kafka_resp_err_t
-rd_kafka_interceptor_add_on_acknowledgement (
- rd_kafka_t *rk, const char *ic_name,
- rd_kafka_interceptor_f_on_acknowledgement_t *on_acknowledgement,
- void *ic_opaque) {
- assert(!rk->rk_initialized);
- return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
- on_acknowledgement,
- ic_name,
- (void *)on_acknowledgement,
- ic_opaque);
-}
-
-
-rd_kafka_resp_err_t
-rd_kafka_interceptor_add_on_consume (
- rd_kafka_t *rk, const char *ic_name,
- rd_kafka_interceptor_f_on_consume_t *on_consume,
- void *ic_opaque) {
- assert(!rk->rk_initialized);
- return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
- on_consume,
- ic_name, (void *)on_consume,
- ic_opaque);
-}
-
-
-rd_kafka_resp_err_t
-rd_kafka_interceptor_add_on_commit (
- rd_kafka_t *rk, const char *ic_name,
- rd_kafka_interceptor_f_on_commit_t *on_commit,
- void *ic_opaque) {
- assert(!rk->rk_initialized);
- return rd_kafka_interceptor_method_add(&rk->rk_conf.interceptors.
- on_commit,
- ic_name, (void *)on_commit,
- ic_opaque);
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_interceptor.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_interceptor.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_interceptor.h
deleted file mode 100644
index 6be4e86..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_interceptor.h
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * librdkafka - The Apache Kafka C/C++ library
- *
- * Copyright (c) 2017 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#ifndef _RDKAFKA_INTERCEPTOR_H
-#define _RDKAFKA_INTERCEPTOR_H
-
-rd_kafka_conf_res_t
-rd_kafka_interceptors_on_conf_set (rd_kafka_conf_t *conf,
- const char *name, const char *val,
- char *errstr, size_t errstr_size);
-void
-rd_kafka_interceptors_on_conf_dup (rd_kafka_conf_t *new_conf,
- const rd_kafka_conf_t *old_conf,
- size_t filter_cnt, const char **filter);
-void
-rd_kafka_interceptors_on_conf_destroy (rd_kafka_conf_t *conf) ;
-void
-rd_kafka_interceptors_on_new (rd_kafka_t *rk, const rd_kafka_conf_t *conf);
-void
-rd_kafka_interceptors_on_destroy (rd_kafka_t *rk);
-void
-rd_kafka_interceptors_on_send (rd_kafka_t *rk, rd_kafka_message_t *rkmessage);
-void
-rd_kafka_interceptors_on_acknowledgement (rd_kafka_t *rk,
- rd_kafka_message_t *rkmessage);
-void
-rd_kafka_interceptors_on_acknowledgement_queue (rd_kafka_t *rk,
- rd_kafka_msgq_t *rkmq);
-
-void rd_kafka_interceptors_on_consume (rd_kafka_t *rk,
- rd_kafka_message_t *rkmessage);
-void
-rd_kafka_interceptors_on_commit (rd_kafka_t *rk,
- const rd_kafka_topic_partition_list_t *offsets,
- rd_kafka_resp_err_t err);
-
-
-void rd_kafka_conf_interceptor_ctor (int scope, void *pconf);
-void rd_kafka_conf_interceptor_dtor (int scope, void *pconf);
-void rd_kafka_conf_interceptor_copy (int scope, void *pdst, const void *psrc,
- void *dstptr, const void *srcptr,
- size_t filter_cnt, const char **filter);
-
-void rd_kafka_interceptors_destroy (rd_kafka_conf_t *conf);
-
-#endif /* _RDKAFKA_INTERCEPTOR_H */