You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:51 UTC

[23/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade to librdkafka 0.11.4

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_op.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_op.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_op.c
deleted file mode 100644
index a761e7a..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_op.c
+++ /dev/null
@@ -1,662 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include <stdarg.h>
-
-#include "rdkafka_int.h"
-#include "rdkafka_op.h"
-#include "rdkafka_topic.h"
-#include "rdkafka_partition.h"
-#include "rdkafka_offset.h"
-
-/* Current number of rd_kafka_op_t.
- * Only maintained when built with ENABLE_DEVEL: incremented in
- * rd_kafka_op_new0() and decremented in rd_kafka_op_destroy(). */
-rd_atomic32_t rd_kafka_op_cnt;
-
-
-/**
- * @brief Returns a human readable name for op type \p type.
- *
- * Every name in the table carries a "REPLY:" prefix; for non-reply ops the
- * prefix is skipped by offsetting the returned pointer, so no formatting
- * or allocation is needed.
- *
- * @returns a pointer to a static string, never NULL. Types outside the
- *          known range (or without a table entry) yield "UNKNOWN?"
- *          (prefixed with "REPLY:" if the reply flag is set).
- */
-const char *rd_kafka_op2str (rd_kafka_op_type_t type) {
-        int skiplen = 6; /* length of the "REPLY:" prefix */
-        static const char *names[RD_KAFKA_OP__END] = {
-                [RD_KAFKA_OP_NONE] = "REPLY:NONE",
-                [RD_KAFKA_OP_FETCH] = "REPLY:FETCH",
-                [RD_KAFKA_OP_ERR] = "REPLY:ERR",
-                [RD_KAFKA_OP_CONSUMER_ERR] = "REPLY:CONSUMER_ERR",
-                [RD_KAFKA_OP_DR] = "REPLY:DR",
-                [RD_KAFKA_OP_STATS] = "REPLY:STATS",
-                [RD_KAFKA_OP_OFFSET_COMMIT] = "REPLY:OFFSET_COMMIT",
-                [RD_KAFKA_OP_NODE_UPDATE] = "REPLY:NODE_UPDATE",
-                [RD_KAFKA_OP_XMIT_BUF] = "REPLY:XMIT_BUF",
-                [RD_KAFKA_OP_RECV_BUF] = "REPLY:RECV_BUF",
-                [RD_KAFKA_OP_XMIT_RETRY] = "REPLY:XMIT_RETRY",
-                [RD_KAFKA_OP_FETCH_START] = "REPLY:FETCH_START",
-                [RD_KAFKA_OP_FETCH_STOP] = "REPLY:FETCH_STOP",
-                [RD_KAFKA_OP_SEEK] = "REPLY:SEEK",
-                [RD_KAFKA_OP_PAUSE] = "REPLY:PAUSE",
-                [RD_KAFKA_OP_OFFSET_FETCH] = "REPLY:OFFSET_FETCH",
-                [RD_KAFKA_OP_PARTITION_JOIN] = "REPLY:PARTITION_JOIN",
-                [RD_KAFKA_OP_PARTITION_LEAVE] = "REPLY:PARTITION_LEAVE",
-                [RD_KAFKA_OP_REBALANCE] = "REPLY:REBALANCE",
-                [RD_KAFKA_OP_TERMINATE] = "REPLY:TERMINATE",
-                [RD_KAFKA_OP_COORD_QUERY] = "REPLY:COORD_QUERY",
-                [RD_KAFKA_OP_SUBSCRIBE] = "REPLY:SUBSCRIBE",
-                [RD_KAFKA_OP_ASSIGN] = "REPLY:ASSIGN",
-                [RD_KAFKA_OP_GET_SUBSCRIPTION] = "REPLY:GET_SUBSCRIPTION",
-                [RD_KAFKA_OP_GET_ASSIGNMENT] = "REPLY:GET_ASSIGNMENT",
-                [RD_KAFKA_OP_THROTTLE] = "REPLY:THROTTLE",
-                [RD_KAFKA_OP_NAME] = "REPLY:NAME",
-                [RD_KAFKA_OP_OFFSET_RESET] = "REPLY:OFFSET_RESET",
-                [RD_KAFKA_OP_METADATA] = "REPLY:METADATA",
-                [RD_KAFKA_OP_LOG] = "REPLY:LOG",
-                [RD_KAFKA_OP_WAKEUP] = "REPLY:WAKEUP",
-        };
-        static const char unknown[] = "REPLY:UNKNOWN?";
-        unsigned int idx = (unsigned int)type &
-                ~(unsigned int)RD_KAFKA_OP_FLAGMASK;
-        const char *name;
-
-        if (type & RD_KAFKA_OP_REPLY)
-                skiplen = 0;
-
-        /* Guard against corrupt or not-yet-named op types rather than
-         * reading past the end of the table or offsetting a NULL pointer. */
-        name = idx < (unsigned int)RD_KAFKA_OP__END ? names[idx] : NULL;
-        if (!name)
-                name = unknown;
-
-        return name+skiplen;
-}
-
-
-/**
- * @brief Print a human readable dump of \p rko to \p fp for debugging.
- *
- * @param fp      Output stream.
- * @param prefix  String emitted at the start of every output line.
- * @param rko     Op to dump; generic fields are printed first, followed by
- *                the type specific fields for the op types handled below.
- */
-void rd_kafka_op_print (FILE *fp, const char *prefix, rd_kafka_op_t *rko) {
-        /* %p requires a void pointer argument, hence the explicit casts. */
-        fprintf(fp,
-                "%s((rd_kafka_op_t*)%p)\n"
-                "%s Type: %s (0x%x), Version: %"PRId32"\n",
-                prefix, (void *)rko,
-                prefix, rd_kafka_op2str(rko->rko_type), rko->rko_type,
-                rko->rko_version);
-        if (rko->rko_err)
-                fprintf(fp, "%s Error: %s\n",
-                        prefix, rd_kafka_err2str(rko->rko_err));
-        if (rko->rko_replyq.q)
-                fprintf(fp, "%s Replyq %p v%d (%s)\n",
-                        prefix, (void *)rko->rko_replyq.q,
-                        rko->rko_replyq.version,
-#if ENABLE_DEVEL
-                        rko->rko_replyq._id
-#else
-                        ""
-#endif
-                        );
-        if (rko->rko_rktp) {
-                rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
-                fprintf(fp, "%s ((rd_kafka_toppar_t*)%p) "
-                        "%s [%"PRId32"] v%d (shptr %p)\n",
-                        prefix, (void *)rktp, rktp->rktp_rkt->rkt_topic->str,
-                        rktp->rktp_partition,
-                        rd_atomic32_get(&rktp->rktp_version),
-                        (void *)rko->rko_rktp);
-        }
-
-        switch (rko->rko_type & ~RD_KAFKA_OP_FLAGMASK)
-        {
-        case RD_KAFKA_OP_FETCH:
-                fprintf(fp,  "%s Offset: %"PRId64"\n",
-                        prefix, rko->rko_u.fetch.rkm.rkm_offset);
-                break;
-        case RD_KAFKA_OP_CONSUMER_ERR:
-                fprintf(fp,  "%s Offset: %"PRId64"\n",
-                        prefix, rko->rko_u.err.offset);
-                /* FALLTHRU */
-        case RD_KAFKA_OP_ERR:
-                /* errstr is optional (rd_kafka_op_destroy() only frees it
-                 * if set), and passing NULL to %s is undefined. */
-                fprintf(fp, "%s Reason: %s\n", prefix,
-                        rko->rko_u.err.errstr ?
-                        rko->rko_u.err.errstr : "(n/a)");
-                break;
-        case RD_KAFKA_OP_DR:
-                fprintf(fp, "%s %"PRId32" messages on %s\n", prefix,
-                        rd_atomic32_get(&rko->rko_u.dr.msgq.rkmq_msg_cnt),
-                        rko->rko_u.dr.s_rkt ?
-                        rd_kafka_topic_s2i(rko->rko_u.dr.s_rkt)->
-                        rkt_topic->str : "(n/a)");
-                break;
-        case RD_KAFKA_OP_OFFSET_COMMIT:
-                fprintf(fp, "%s Callback: %p (opaque %p)\n",
-                        prefix, rko->rko_u.offset_commit.cb,
-                        rko->rko_u.offset_commit.opaque);
-                fprintf(fp, "%s %d partitions\n",
-                        prefix,
-                        rko->rko_u.offset_commit.partitions ?
-                        rko->rko_u.offset_commit.partitions->cnt : 0);
-                break;
-
-        case RD_KAFKA_OP_LOG:
-                fprintf(fp, "%s Log: %%%d %s: %s\n",
-                        prefix, rko->rko_u.log.level,
-                        rko->rko_u.log.fac,
-                        rko->rko_u.log.str);
-                break;
-
-        default:
-                break;
-        }
-}
-
-
-/**
- * @brief Allocate a new, zero-initialized op of type \p type.
- *
- * Only the part of the rko_u union used by the op type is allocated,
- * as given by the per-type size table below.
- *
- * @param source  Origin of the op; only stored in ENABLE_DEVEL builds.
- * @param type    Op type, optionally OR:ed with RD_KAFKA_OP_CB/_REPLY.
- */
-rd_kafka_op_t *rd_kafka_op_new0 (const char *source, rd_kafka_op_type_t type) {
-        rd_kafka_op_t *rko;
-        /* Number of rko_u bytes required by each op type. */
-        static const size_t op2size[RD_KAFKA_OP__END] = {
-                [RD_KAFKA_OP_FETCH] = sizeof(rko->rko_u.fetch),
-                [RD_KAFKA_OP_ERR] = sizeof(rko->rko_u.err),
-                [RD_KAFKA_OP_CONSUMER_ERR] = sizeof(rko->rko_u.err),
-                [RD_KAFKA_OP_DR] = sizeof(rko->rko_u.dr),
-                [RD_KAFKA_OP_STATS] = sizeof(rko->rko_u.stats),
-                [RD_KAFKA_OP_OFFSET_COMMIT] = sizeof(rko->rko_u.offset_commit),
-                [RD_KAFKA_OP_NODE_UPDATE] = sizeof(rko->rko_u.node),
-                [RD_KAFKA_OP_XMIT_BUF] = sizeof(rko->rko_u.xbuf),
-                [RD_KAFKA_OP_RECV_BUF] = sizeof(rko->rko_u.xbuf),
-                [RD_KAFKA_OP_XMIT_RETRY] = sizeof(rko->rko_u.xbuf),
-                [RD_KAFKA_OP_FETCH_START] = sizeof(rko->rko_u.fetch_start),
-                [RD_KAFKA_OP_FETCH_STOP] = 0,
-                [RD_KAFKA_OP_SEEK] = sizeof(rko->rko_u.fetch_start),
-                [RD_KAFKA_OP_PAUSE] = sizeof(rko->rko_u.pause),
-                [RD_KAFKA_OP_OFFSET_FETCH] = sizeof(rko->rko_u.offset_fetch),
-                [RD_KAFKA_OP_PARTITION_JOIN] = 0,
-                [RD_KAFKA_OP_PARTITION_LEAVE] = 0,
-                [RD_KAFKA_OP_REBALANCE] = sizeof(rko->rko_u.rebalance),
-                [RD_KAFKA_OP_TERMINATE] = 0,
-                [RD_KAFKA_OP_COORD_QUERY] = 0,
-                [RD_KAFKA_OP_SUBSCRIBE] = sizeof(rko->rko_u.subscribe),
-                [RD_KAFKA_OP_ASSIGN] = sizeof(rko->rko_u.assign),
-                [RD_KAFKA_OP_GET_SUBSCRIPTION] = sizeof(rko->rko_u.subscribe),
-                [RD_KAFKA_OP_GET_ASSIGNMENT] = sizeof(rko->rko_u.assign),
-                [RD_KAFKA_OP_THROTTLE] = sizeof(rko->rko_u.throttle),
-                [RD_KAFKA_OP_NAME] = sizeof(rko->rko_u.name),
-                [RD_KAFKA_OP_OFFSET_RESET] = sizeof(rko->rko_u.offset_reset),
-                [RD_KAFKA_OP_METADATA] = sizeof(rko->rko_u.metadata),
-                [RD_KAFKA_OP_LOG] = sizeof(rko->rko_u.log),
-                [RD_KAFKA_OP_WAKEUP] = 0,
-        };
-        /* Everything in the op except the union, plus the per-type part. */
-        const size_t fixed_size = sizeof(*rko) - sizeof(rko->rko_u);
-        const size_t union_size = op2size[type & ~RD_KAFKA_OP_FLAGMASK];
-
-        rko = rd_calloc(1, fixed_size + union_size);
-        rko->rko_type = type;
-
-#if ENABLE_DEVEL
-        rko->rko_source = source;
-        rd_atomic32_add(&rd_kafka_op_cnt, 1);
-#endif
-
-        return rko;
-}
-
-
-/**
- * @brief Destroy \p rko.
- *
- * Releases the type specific payload, gives a pending op callback
- * (RD_KAFKA_OP_CB) the chance to clean up by calling it with rko_err set to
- * RD_KAFKA_RESP_ERR__DESTROY, drops the toppar and replyq references and
- * finally frees the op itself.
- */
-void rd_kafka_op_destroy (rd_kafka_op_t *rko) {
-
-        switch (rko->rko_type & ~RD_KAFKA_OP_FLAGMASK)
-        {
-        case RD_KAFKA_OP_FETCH:
-                rd_kafka_msg_destroy(NULL, &rko->rko_u.fetch.rkm);
-                /* Decrease refcount on rkbuf to eventually rd_free shared buf*/
-                if (rko->rko_u.fetch.rkbuf)
-                        rd_kafka_buf_handle_op(rko, RD_KAFKA_RESP_ERR__DESTROY);
-
-                break;
-
-        case RD_KAFKA_OP_OFFSET_FETCH:
-                if (rko->rko_u.offset_fetch.partitions &&
-                    rko->rko_u.offset_fetch.do_free)
-                        rd_kafka_topic_partition_list_destroy(
-                                rko->rko_u.offset_fetch.partitions);
-                break;
-
-        case RD_KAFKA_OP_OFFSET_COMMIT:
-                RD_IF_FREE(rko->rko_u.offset_commit.partitions,
-                           rd_kafka_topic_partition_list_destroy);
-                RD_IF_FREE(rko->rko_u.offset_commit.reason, rd_free);
-                break;
-
-        case RD_KAFKA_OP_SUBSCRIBE:
-        case RD_KAFKA_OP_GET_SUBSCRIPTION:
-                RD_IF_FREE(rko->rko_u.subscribe.topics,
-                           rd_kafka_topic_partition_list_destroy);
-                break;
-
-        case RD_KAFKA_OP_ASSIGN:
-        case RD_KAFKA_OP_GET_ASSIGNMENT:
-                RD_IF_FREE(rko->rko_u.assign.partitions,
-                           rd_kafka_topic_partition_list_destroy);
-                break;
-
-        case RD_KAFKA_OP_REBALANCE:
-                RD_IF_FREE(rko->rko_u.rebalance.partitions,
-                           rd_kafka_topic_partition_list_destroy);
-                break;
-
-        case RD_KAFKA_OP_NAME:
-                RD_IF_FREE(rko->rko_u.name.str, rd_free);
-                break;
-
-        case RD_KAFKA_OP_ERR:
-        case RD_KAFKA_OP_CONSUMER_ERR:
-                RD_IF_FREE(rko->rko_u.err.errstr, rd_free);
-                rd_kafka_msg_destroy(NULL, &rko->rko_u.err.rkm);
-                break;
-
-        case RD_KAFKA_OP_THROTTLE:
-                RD_IF_FREE(rko->rko_u.throttle.nodename, rd_free);
-                break;
-
-        case RD_KAFKA_OP_STATS:
-                RD_IF_FREE(rko->rko_u.stats.json, rd_free);
-                break;
-
-        case RD_KAFKA_OP_XMIT_RETRY:
-        case RD_KAFKA_OP_XMIT_BUF:
-        case RD_KAFKA_OP_RECV_BUF:
-                if (rko->rko_u.xbuf.rkbuf)
-                        rd_kafka_buf_handle_op(rko, RD_KAFKA_RESP_ERR__DESTROY);
-
-                RD_IF_FREE(rko->rko_u.xbuf.rkbuf, rd_kafka_buf_destroy);
-                break;
-
-        case RD_KAFKA_OP_DR:
-                rd_kafka_msgq_purge(rko->rko_rk, &rko->rko_u.dr.msgq);
-                if (rko->rko_u.dr.do_purge2)
-                        rd_kafka_msgq_purge(rko->rko_rk, &rko->rko_u.dr.msgq2);
-
-                if (rko->rko_u.dr.s_rkt)
-                        rd_kafka_topic_destroy0(rko->rko_u.dr.s_rkt);
-                break;
-
-        case RD_KAFKA_OP_OFFSET_RESET:
-                RD_IF_FREE(rko->rko_u.offset_reset.reason, rd_free);
-                break;
-
-        case RD_KAFKA_OP_METADATA:
-                RD_IF_FREE(rko->rko_u.metadata.md, rd_kafka_metadata_destroy);
-                break;
-
-        case RD_KAFKA_OP_LOG:
-                rd_free(rko->rko_u.log.str);
-                break;
-
-        default:
-                break;
-        }
-
-        if ((rko->rko_type & RD_KAFKA_OP_CB) && rko->rko_op_cb) {
-                rd_kafka_op_res_t res;
-                /* Let callback clean up */
-                rko->rko_err = RD_KAFKA_RESP_ERR__DESTROY;
-                res = rko->rko_op_cb(rko->rko_rk, NULL, rko);
-                assert(res != RD_KAFKA_OP_RES_YIELD);
-                (void)res; /* only read by the assert; unused with NDEBUG */
-        }
-
-        RD_IF_FREE(rko->rko_rktp, rd_kafka_toppar_destroy);
-
-        rd_kafka_replyq_destroy(&rko->rko_replyq);
-
-#if ENABLE_DEVEL
-        if (rd_atomic32_sub(&rd_kafka_op_cnt, 1) < 0)
-                rd_kafka_assert(NULL, !*"rd_kafka_op_cnt < 0");
-#endif
-
-        rd_free(rko);
-}
-
-
-
-
-
-
-
-
-
-
-
-/**
- * @brief Propagate an error event to the application on queue \p rkq.
- *
- * @param optype  RD_KAFKA_OP_ERR for generic errors,
- *                RD_KAFKA_OP_CONSUMER_ERR for consumer errors.
- * @param rktp    Optional toppar; a reference is kept on the op if set.
- * @param fmt     printf-style format for the error string, which is
- *                formatted into a 2048 byte buffer and copied to the op.
- */
-void rd_kafka_q_op_err (rd_kafka_q_t *rkq, rd_kafka_op_type_t optype,
-                        rd_kafka_resp_err_t err, int32_t version,
-                        rd_kafka_toppar_t *rktp, int64_t offset,
-                        const char *fmt, ...) {
-        char errstr[2048];
-        va_list args;
-        rd_kafka_op_t *errop;
-
-        /* Render the error string */
-        va_start(args, fmt);
-        rd_vsnprintf(errstr, sizeof(errstr), fmt, args);
-        va_end(args);
-
-        /* Set up the error op */
-        errop = rd_kafka_op_new(optype);
-        errop->rko_err          = err;
-        errop->rko_version      = version;
-        errop->rko_u.err.errstr = rd_strdup(errstr);
-        errop->rko_u.err.offset = offset;
-        if (rktp) {
-                errop->rko_rktp = rd_kafka_toppar_keep(rktp);
-        }
-
-        rd_kafka_q_enq(rkq, errop);
-}
-
-
-
-/**
- * @brief Create a reply op based on \p rko_orig.
- *
- * If \p rko_orig has rko_op_cb set the reply op type is the original
- * rko_type OR:ed with RD_KAFKA_OP_CB, else with RD_KAFKA_OP_REPLY.
- * The op callback, the reply version and (if any) a new reference to the
- * toppar are carried over to the reply; rko_err is set to \p err.
- */
-rd_kafka_op_t *rd_kafka_op_new_reply (rd_kafka_op_t *rko_orig,
-                                      rd_kafka_resp_err_t err) {
-        rd_kafka_op_t *reply =
-                rd_kafka_op_new(rko_orig->rko_type |
-                                (rko_orig->rko_op_cb ?
-                                 RD_KAFKA_OP_CB : RD_KAFKA_OP_REPLY));
-
-        rd_kafka_op_get_reply_version(reply, rko_orig);
-        reply->rko_err   = err;
-        reply->rko_op_cb = rko_orig->rko_op_cb;
-
-        if (rko_orig->rko_rktp) {
-                rd_kafka_toppar_t *rktp =
-                        rd_kafka_toppar_s2i(rko_orig->rko_rktp);
-                reply->rko_rktp = rd_kafka_toppar_keep(rktp);
-        }
-
-        return reply;
-}
-
-
-/**
- * @brief Create a new callback op: \p type OR:ed with RD_KAFKA_OP_CB,
- *        with \p cb as its op callback and \p rk as its client instance.
- */
-rd_kafka_op_t *rd_kafka_op_new_cb (rd_kafka_t *rk,
-                                   rd_kafka_op_type_t type,
-                                   rd_kafka_op_cb_t *cb) {
-        rd_kafka_op_t *cbop = rd_kafka_op_new(type | RD_KAFKA_OP_CB);
-
-        cbop->rko_rk    = rk;
-        cbop->rko_op_cb = cb;
-
-        return cbop;
-}
-
-
-
-/**
- * @brief Send \p rko itself back as the reply on its replyq, with
- *        rko_err set to \p err.
- *
- * The op type is OR:ed with RD_KAFKA_OP_CB if the op has an op callback,
- * else with RD_KAFKA_OP_REPLY.
- *
- * @returns 1 if op was enqueued, else 0 and rko is destroyed
- *          (which is also what happens when the op has no replyq).
- */
-int rd_kafka_op_reply (rd_kafka_op_t *rko, rd_kafka_resp_err_t err) {
-
-        if (rko->rko_replyq.q) {
-                if (rko->rko_op_cb)
-                        rko->rko_type |= RD_KAFKA_OP_CB;
-                else
-                        rko->rko_type |= RD_KAFKA_OP_REPLY;
-                rko->rko_err = err;
-
-                return rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0);
-        }
-
-        /* Nowhere to send the reply */
-        rd_kafka_op_destroy(rko);
-        return 0;
-}
-
-
-/**
- * @brief Enqueue request \p rko on \p destq and wait up to \p timeout_ms
- *        for the response to arrive on \p recvq.
- *
- * @returns the response op, or NULL if the request could not be enqueued
- *          on \p destq or no response was popped within the timeout.
- */
-rd_kafka_op_t *rd_kafka_op_req0 (rd_kafka_q_t *destq,
-                                 rd_kafka_q_t *recvq,
-                                 rd_kafka_op_t *rko,
-                                 int timeout_ms) {
-        /* Tell the destination where the reply should go. */
-        rd_kafka_op_set_replyq(rko, recvq, NULL);
-
-        /* Hand the request over to the destination queue. */
-        if (!rd_kafka_q_enq(destq, rko)) {
-                return NULL;
-        }
-
-        /* Block until the reply arrives; NULL on timeout. */
-        return rd_kafka_q_pop(recvq, timeout_ms, 0);
-}
-
-/**
- * @brief Send request \p rko to \p destq and wait for the response,
- *        using a temporary reply queue that is destroyed before returning.
- *
- * @returns whatever rd_kafka_op_req0() returned (may be NULL).
- */
-rd_kafka_op_t *rd_kafka_op_req (rd_kafka_q_t *destq,
-                                rd_kafka_op_t *rko,
-                                int timeout_ms) {
-        rd_kafka_q_t *tmpq = rd_kafka_q_new(destq->rkq_rk);
-        rd_kafka_op_t *response =
-                rd_kafka_op_req0(destq, tmpq, rko, timeout_ms);
-
-        rd_kafka_q_destroy(tmpq);
-
-        return response;
-}
-
-
-/**
- * @brief Send a payload-less request of op type \p type to \p destq and
- *        wait for the response without time limit (RD_POLL_INFINITE).
- */
-rd_kafka_op_t *rd_kafka_op_req2 (rd_kafka_q_t *destq, rd_kafka_op_type_t type) {
-        rd_kafka_op_t *request = rd_kafka_op_new(type);
-
-        return rd_kafka_op_req(destq, request, RD_POLL_INFINITE);
-}
-
-/**
- * @brief Destroy \p rko and return its error code.
- *
- * @returns rko_err of the op, or RD_KAFKA_RESP_ERR__TIMED_OUT
- *          if \p rko is NULL.
- */
-rd_kafka_resp_err_t rd_kafka_op_err_destroy (rd_kafka_op_t *rko) {
-        rd_kafka_resp_err_t op_err;
-
-        if (!rko)
-                return RD_KAFKA_RESP_ERR__TIMED_OUT;
-
-        op_err = rko->rko_err;
-        rd_kafka_op_destroy(rko);
-
-        return op_err;
-}
-
-
-/**
- * @brief Call the op callback of \p rko.
- *
- * @returns RD_KAFKA_OP_RES_YIELD if the callback yielded or
- *          rd_kafka_yield_thread is set, in which case the op is not
- *          touched further. Otherwise the op callback is cleared and
- *          the callback's result is returned.
- */
-rd_kafka_op_res_t rd_kafka_op_call (rd_kafka_t *rk, rd_kafka_q_t *rkq,
-                                    rd_kafka_op_t *rko) {
-        rd_kafka_op_res_t cb_res = rko->rko_op_cb(rk, rkq, rko);
-
-        if (unlikely(cb_res == RD_KAFKA_OP_RES_YIELD || rd_kafka_yield_thread))
-                cb_res = RD_KAFKA_OP_RES_YIELD;
-        else
-                rko->rko_op_cb = NULL;
-
-        return cb_res;
-}
-
-
-/**
- * @brief Create a new RD_KAFKA_OP_FETCH op whose embedded message is
- *        set up from the given parameters.
- *
- * The op keeps a reference to \p rktp and to \p rkbuf; key and payload
- * pointers are stored as-is (not copied).
- *
- * @param rkmp will be set to the embedded rkm in the rko (for convenience)
- * @param offset may be updated later if relative offset.
- */
-rd_kafka_op_t *
-rd_kafka_op_new_fetch_msg (rd_kafka_msg_t **rkmp,
-                           rd_kafka_toppar_t *rktp,
-                           int32_t version,
-                           rd_kafka_buf_t *rkbuf,
-                           int64_t offset,
-                           size_t key_len, const void *key,
-                           size_t val_len, const void *val) {
-        rd_kafka_op_t *fetch_op = rd_kafka_op_new(RD_KAFKA_OP_FETCH);
-        rd_kafka_msg_t *msg     = &fetch_op->rko_u.fetch.rkm;
-
-        fetch_op->rko_rktp    = rd_kafka_toppar_keep(rktp);
-        fetch_op->rko_version = version;
-        *rkmp = msg;
-
-        /* All ops created from the same response share its payload
-         * buffer: the rkbuf refcount ensures the rkbuf and its backing
-         * memory stay around until consume_cb() has been called for
-         * each of these ops. */
-        fetch_op->rko_u.fetch.rkbuf = rkbuf;
-        rd_kafka_buf_keep(rkbuf);
-
-        /* Fill in the embedded message */
-        msg->rkm_offset    = offset;
-        msg->rkm_partition = rktp->rktp_partition;
-        msg->rkm_key       = (void *)key;
-        msg->rkm_key_len   = key_len;
-        msg->rkm_payload   = (void *)val;
-        msg->rkm_len       = val_len;
-
-        fetch_op->rko_len  = (int32_t)msg->rkm_len;
-
-        return fetch_op;
-}
-
-
-/**
- * @brief Record \p throttle_time in the broker's throttle average and,
- *        if desired, enqueue a high priority RD_KAFKA_OP_THROTTLE op
- *        on \p rkq.
- *
- * A throttle op is only enqueued when a throttle_cb is configured and:
- *  - throttle_time > 0, or
- *  - throttle_time == 0 and the last throttle_time was > 0
- */
-void rd_kafka_op_throttle_time (rd_kafka_broker_t *rkb,
-                                rd_kafka_q_t *rkq,
-                                int throttle_time) {
-        rd_kafka_op_t *throttle_op;
-
-        rd_avg_add(&rkb->rkb_avg_throttle, throttle_time);
-
-        /* No one is listening for throttle events */
-        if (!rkb->rkb_rk->rk_conf.throttle_cb)
-                return;
-
-        /* Still not throttled: nothing new to report */
-        if (!throttle_time &&
-            !rd_atomic32_get(&rkb->rkb_rk->rk_last_throttle))
-                return;
-
-        rd_atomic32_set(&rkb->rkb_rk->rk_last_throttle, throttle_time);
-
-        throttle_op = rd_kafka_op_new(RD_KAFKA_OP_THROTTLE);
-        rd_kafka_op_set_prio(throttle_op, RD_KAFKA_PRIO_HIGH);
-        throttle_op->rko_u.throttle.throttle_time = throttle_time;
-        throttle_op->rko_u.throttle.nodeid        = rkb->rkb_nodeid;
-        throttle_op->rko_u.throttle.nodename      =
-                rd_strdup(rkb->rkb_nodename);
-
-        rd_kafka_q_enq(rkq, throttle_op);
-}
-
-
-/**
- * @brief Handle standard op types.
- *
- * The checks are evaluated in order, the first match decides:
- *  - FORCE_RETURN cb_type: PASS
- *  - callback op (unless cb_type is EVENT): result of rd_kafka_op_call()
- *  - RECV_BUF: response is handled, HANDLED
- *  - WAKEUP: nothing to do, HANDLED
- *  - reply op with ERR__DESTROY (unless cb_type is RETURN): HANDLED
- *  - anything else: PASS
- */
-rd_kafka_op_res_t
-rd_kafka_op_handle_std (rd_kafka_t *rk, rd_kafka_q_t *rkq,
-                        rd_kafka_op_t *rko, int cb_type) {
-        if (cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
-                return RD_KAFKA_OP_RES_PASS;
-
-        if (cb_type != RD_KAFKA_Q_CB_EVENT &&
-            rko->rko_type & RD_KAFKA_OP_CB)
-                return rd_kafka_op_call(rk, rkq, rko);
-
-        if (rko->rko_type == RD_KAFKA_OP_RECV_BUF) {
-                /* Handle Response */
-                rd_kafka_buf_handle_op(rko, rko->rko_err);
-                return RD_KAFKA_OP_RES_HANDLED;
-        }
-
-        if (rko->rko_type == RD_KAFKA_OP_WAKEUP) {
-                /* do nothing, wake up is a fact anyway */
-                return RD_KAFKA_OP_RES_HANDLED;
-        }
-
-        if (cb_type != RD_KAFKA_Q_CB_RETURN &&
-            rko->rko_type & RD_KAFKA_OP_REPLY &&
-            rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) {
-                /* dest queue was probably disabled. */
-                return RD_KAFKA_OP_RES_HANDLED;
-        }
-
-        return RD_KAFKA_OP_RES_PASS;
-}
-
-
-/**
- * @brief Attempt to handle op, first through op_handle_std() and,
- *        if that passes, through the op's own serve callback
- *        (rko_serve) or else the passed \p callback.
- *
- * @param rkq is \p rko's queue (which it was unlinked from) with rkq_lock
- *            being held. Callback may re-enqueue the op on this queue
- *            and return YIELD.
- *
- * @returns HANDLED if op was handled (and destroyed), PASS if not,
- *          or YIELD if op was handled (maybe destroyed or re-enqueued)
- *          and caller must propagate yield upwards (cancel and return).
- */
-rd_kafka_op_res_t
-rd_kafka_op_handle (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
-                    rd_kafka_q_cb_type_t cb_type, void *opaque,
-                    rd_kafka_q_serve_cb_t *callback) {
-        rd_kafka_op_res_t std_res =
-                rd_kafka_op_handle_std(rk, rkq, rko, cb_type);
-
-        if (std_res == RD_KAFKA_OP_RES_HANDLED) {
-                rd_kafka_op_destroy(rko);
-                return std_res;
-        }
-
-        if (unlikely(std_res == RD_KAFKA_OP_RES_YIELD))
-                return std_res;
-
-        /* The op's own serve callback takes precedence over the passed
-         * one and is consumed (cleared) by this call. */
-        if (rko->rko_serve) {
-                callback = rko->rko_serve;
-                opaque   = rko->rko_serve_opaque;
-                rko->rko_serve        = NULL;
-                rko->rko_serve_opaque = NULL;
-        }
-
-        if (!callback)
-                return std_res;
-
-        return callback(rk, rkq, rko, cb_type, opaque);
-}
-
-
-/**
- * @brief Store offset for fetched message.
- *
- * Does nothing unless \p rko is an error-free RD_KAFKA_OP_FETCH op.
- * With the toppar locked, the application offset is set to the message
- * offset + 1 and, if enable_auto_offset_store is configured, the same
- * offset is passed to rd_kafka_offset_store0().
- *
- * @param rk may be NULL, in which case the toppar's topic's rk is used.
- */
-void rd_kafka_op_offset_store (rd_kafka_t *rk, rd_kafka_op_t *rko,
-                               const rd_kafka_message_t *rkmessage) {
-        rd_kafka_toppar_t *toppar;
-
-        if (unlikely(rko->rko_type != RD_KAFKA_OP_FETCH))
-                return;
-        if (unlikely(rko->rko_err))
-                return;
-
-        toppar = rd_kafka_toppar_s2i(rko->rko_rktp);
-
-        if (unlikely(!rk)) {
-                rk = toppar->rktp_rkt->rkt_rk;
-        }
-
-        rd_kafka_toppar_lock(toppar);
-        toppar->rktp_app_offset = rkmessage->offset+1;
-        if (rk->rk_conf.enable_auto_offset_store) {
-                rd_kafka_offset_store0(toppar, rkmessage->offset+1,
-                                       0/*no lock*/);
-        }
-        rd_kafka_toppar_unlock(toppar);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_op.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_op.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_op.h
deleted file mode 100644
index f0af481..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_op.h
+++ /dev/null
@@ -1,400 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-#pragma once
-
-
-#include "rdkafka_msg.h"
-
-/* Forward declarations, allowing the types to be referenced
- * ahead of (or without) their full definitions. */
-typedef struct rd_kafka_q_s rd_kafka_q_t;
-typedef struct rd_kafka_toppar_s rd_kafka_toppar_t;
-typedef struct rd_kafka_op_s rd_kafka_op_t;
-
-/* One-off reply queue + reply version.
- * All APIs that take a rd_kafka_replyq_t make a copy of the
- * struct as-is and grab hold of the existing .q refcount.
- * Think of replyq as a (Q,VERSION) tuple. */
-typedef struct rd_kafka_replyq_s {
-	rd_kafka_q_t *q;       /* Queue to enqueue the reply on (refcounted) */
-	int32_t       version; /* Reply version */
-#if ENABLE_DEVEL
-	char *_id; /* Devel id used for debugging reference leaks.
-		    * Is a strdup() of the caller's function name,
-		    * which makes for easy debugging with valgrind. */
-#endif
-} rd_kafka_replyq_t;
-
-
-
-
-/**
- * Flags used by:
- *   - rd_kafka_op_t.rko_flags
- *   - rd_kafka_buf_t.rkbuf_flags
- *
- * Each flag is a distinct bit so that flags can be combined.
- */
-#define RD_KAFKA_OP_F_FREE        0x1  /* rd_free payload when done with it */
-#define RD_KAFKA_OP_F_FLASH       0x2  /* Internal: insert at head of queue */
-#define RD_KAFKA_OP_F_NO_RESPONSE 0x4  /* rkbuf: Not expecting a response */
-#define RD_KAFKA_OP_F_CRC         0x8  /* rkbuf: Perform CRC calculation */
-#define RD_KAFKA_OP_F_BLOCKING    0x10 /* rkbuf: blocking protocol request */
-#define RD_KAFKA_OP_F_REPROCESS   0x20 /* cgrp: Reprocess at a later time. */
-
-
-/**
- * @brief Internal op types.
- *
- * @remark The enumerator values are used as indexes into the per-type
- *         tables in rdkafka_op.c (op names in rd_kafka_op2str() and
- *         payload sizes in rd_kafka_op_new0()), the latter being sized
- *         by RD_KAFKA_OP__END which must therefore remain last.
- */
-typedef enum {
-        RD_KAFKA_OP_NONE,     /* No specific type, use OP_CB */
-	RD_KAFKA_OP_FETCH,    /* Kafka thread -> Application */
-	RD_KAFKA_OP_ERR,      /* Kafka thread -> Application */
-        RD_KAFKA_OP_CONSUMER_ERR, /* Kafka thread -> Application */
-	RD_KAFKA_OP_DR,       /* Kafka thread -> Application
-			       * Produce message delivery report */
-	RD_KAFKA_OP_STATS,    /* Kafka thread -> Application */
-
-        RD_KAFKA_OP_OFFSET_COMMIT, /* any -> toppar's Broker thread */
-        RD_KAFKA_OP_NODE_UPDATE,   /* any -> Broker thread: node update */
-
-        RD_KAFKA_OP_XMIT_BUF, /* transmit buffer: any -> broker thread */
-        RD_KAFKA_OP_RECV_BUF, /* received response buffer: broker thr -> any */
-        RD_KAFKA_OP_XMIT_RETRY, /* retry buffer xmit: any -> broker thread */
-        RD_KAFKA_OP_FETCH_START, /* Application -> toppar's handler thread */
-        RD_KAFKA_OP_FETCH_STOP,  /* Application -> toppar's handler thread */
-        RD_KAFKA_OP_SEEK,        /* Application -> toppar's handler thread */
-	RD_KAFKA_OP_PAUSE,       /* Application -> toppar's handler thread */
-        RD_KAFKA_OP_OFFSET_FETCH, /* Broker -> broker thread: fetch offsets
-                                   * for topic. */
-
-        RD_KAFKA_OP_PARTITION_JOIN,  /* * -> cgrp op:   add toppar to cgrp
-                                      * * -> broker op: add toppar to broker */
-        RD_KAFKA_OP_PARTITION_LEAVE, /* * -> cgrp op:   remove toppar from cgrp
-                                      * * -> broker op: remove toppar from rkb*/
-        RD_KAFKA_OP_REBALANCE,       /* broker thread -> app:
-                                      * group rebalance */
-        RD_KAFKA_OP_TERMINATE,       /* For generic use */
-        RD_KAFKA_OP_COORD_QUERY,     /* Query for coordinator */
-        RD_KAFKA_OP_SUBSCRIBE,       /* New subscription */
-        RD_KAFKA_OP_ASSIGN,          /* New assignment */
-        RD_KAFKA_OP_GET_SUBSCRIPTION,/* Get current subscription.
-				      * Reuses u.subscribe */
-        RD_KAFKA_OP_GET_ASSIGNMENT,  /* Get current assignment.
-				      * Reuses u.assign */
-	RD_KAFKA_OP_THROTTLE,        /* Throttle info */
-	RD_KAFKA_OP_NAME,            /* Request name */
-	RD_KAFKA_OP_OFFSET_RESET,    /* Offset reset */
-        RD_KAFKA_OP_METADATA,        /* Metadata response */
-        RD_KAFKA_OP_LOG,             /* Log */
-        RD_KAFKA_OP_WAKEUP,          /* Wake-up signaling */
-        RD_KAFKA_OP__END             /* Number of op types, not an op type */
-} rd_kafka_op_type_t;
-
-/* Flags used with op_type_t.
- * The flags are OR:ed into the op type and masked off again with
- * ~RD_KAFKA_OP_FLAGMASK to get at the plain rd_kafka_op_type_t value. */
-#define RD_KAFKA_OP_CB        (1 << 30)  /* Callback op. */
-/* Left-shifting a signed 1 into the sign bit is undefined behaviour:
- * shift as unsigned and convert back to keep the same int value. */
-#define RD_KAFKA_OP_REPLY     ((int)(1U << 31))  /* Reply op. */
-#define RD_KAFKA_OP_FLAGMASK  (RD_KAFKA_OP_CB | RD_KAFKA_OP_REPLY)
-
-
-/**
- * @brief Op/queue priority levels.
- * @remark Since priority levels alter the FIFO order, pay extra attention
- *         to preserve ordering as deemed necessary.
- * @remark Priority should only be set on ops destined for application
- *         facing queues (rk_rep, rkcg_q, etc).
- * @remark Levels are listed from lowest to highest priority:
- *         a higher value means higher priority.
- */
-typedef enum {
-        RD_KAFKA_PRIO_NORMAL = 0,   /* Normal bulk, messages, DRs, etc. */
-        RD_KAFKA_PRIO_MEDIUM,       /* Prioritize in front of bulk,
-                                     * still at some scale. e.g. logs, .. */
-        RD_KAFKA_PRIO_HIGH,         /* Small scale high priority */
-        RD_KAFKA_PRIO_FLASH         /* Micro scale, immediate delivery. */
-} rd_kafka_op_prio_t;
-
-
-/**
- * @brief Op handler result
- *
- * @remark When returning YIELD from a handler the handler will
- *         need to have made sure to either re-enqueue the op or destroy it
- *         since the caller will not touch the op anymore.
- */
-typedef enum {
-        RD_KAFKA_OP_RES_PASS,    /* Not handled, pass to caller */
-        RD_KAFKA_OP_RES_HANDLED, /* Op was handled (through callbacks) */
-        RD_KAFKA_OP_RES_YIELD    /* Callback called yield: the op must not
-                                  * be touched by the caller (see remark) */
-} rd_kafka_op_res_t;
-
-
-/**
- * @brief Queue serve callback call type
- */
-typedef enum {
-        RD_KAFKA_Q_CB_INVALID, /* don't use */
-        RD_KAFKA_Q_CB_CALLBACK,/* trigger callback based on op */
-        RD_KAFKA_Q_CB_RETURN,  /* return op rather than trigger callback
-                                * (if possible)*/
-        RD_KAFKA_Q_CB_FORCE_RETURN, /* return op, regardless of callback. */
-        RD_KAFKA_Q_CB_EVENT    /* like _Q_CB_RETURN but return event_t:ed op */
-} rd_kafka_q_cb_type_t;
-
-/**
- * @brief Queue serve callback
- *
- * Called with the client instance, the queue being served, the op to
- * serve, the call type and the opaque provided along with the callback.
- *
- * @remark See rd_kafka_op_res_t docs for return semantics.
- */
-typedef rd_kafka_op_res_t
-(rd_kafka_q_serve_cb_t) (rd_kafka_t *rk,
-                         struct rd_kafka_q_s *rkq,
-                         struct rd_kafka_op_s *rko,
-                         rd_kafka_q_cb_type_t cb_type, void *opaque)
-        RD_WARN_UNUSED_RESULT;
-
-/**
- * @brief Op callback type, stored in rd_kafka_op_t.rko_op_cb and used
- *        by ops flagged with RD_KAFKA_OP_CB.
- *
- * @remark The result must not be ignored (RD_WARN_UNUSED_RESULT),
- *         see rd_kafka_op_res_t docs for return semantics.
- */
-typedef rd_kafka_op_res_t (rd_kafka_op_cb_t) (rd_kafka_t *rk,
-                                              rd_kafka_q_t *rkq,
-                                              struct rd_kafka_op_s *rko)
-                RD_WARN_UNUSED_RESULT;
-
-
-/* Assert that \p rko has exactly the op type \p type (flags included).
- * The stringified type name is AND:ed into the asserted expression; being
- * a non-NULL string literal it does not change the outcome of the check. */
-#define RD_KAFKA_OP_TYPE_ASSERT(rko,type) \
-	rd_kafka_assert(NULL, (rko)->rko_type == (type) && # type)
-
-struct rd_kafka_op_s {
-	TAILQ_ENTRY(rd_kafka_op_s) rko_link;
-
-	rd_kafka_op_type_t    rko_type;   /* Internal op type */
-	rd_kafka_event_type_t rko_evtype;
-	int                   rko_flags;  /* See RD_KAFKA_OP_F_... above */
-	int32_t               rko_version;
-	rd_kafka_resp_err_t   rko_err;
-	int32_t               rko_len;    /* Depends on type, typically the
-					   * message length. */
-        rd_kafka_op_prio_t    rko_prio;   /* In-queue priority.
-                                           * Higher value means higher prio. */
-
-	shptr_rd_kafka_toppar_t *rko_rktp;
-
-        /*
-	 * Generic fields
-	 */
-
-	/* Indicates request: enqueue reply on rko_replyq.q with .version.
-	 * .q is refcounted. */
-	rd_kafka_replyq_t rko_replyq;
-
-        /* Original queue's op serve callback and opaque, if any.
-         * Mainly used for forwarded queues to use the original queue's
-         * serve function from the forwarded position. */
-        rd_kafka_q_serve_cb_t *rko_serve;
-        void *rko_serve_opaque;
-
-	rd_kafka_t     *rko_rk;
-
-#if ENABLE_DEVEL
-        const char *rko_source;  /**< Where op was created */
-#endif
-
-        /* RD_KAFKA_OP_CB */
-        rd_kafka_op_cb_t *rko_op_cb;
-
-	union {
-		struct {
-			rd_kafka_buf_t *rkbuf;
-			rd_kafka_msg_t  rkm;
-			int evidx;
-		} fetch;
-
-		struct {
-			rd_kafka_topic_partition_list_t *partitions;
-			int do_free; /* free .partitions on destroy() */
-		} offset_fetch;
-
-		struct {
-			rd_kafka_topic_partition_list_t *partitions;
-			void (*cb) (rd_kafka_t *rk,
-				    rd_kafka_resp_err_t err,
-				    rd_kafka_topic_partition_list_t *offsets,
-				    void *opaque);
-			void *opaque;
-			int silent_empty; /**< Fail silently if there are no
-					   *   offsets to commit. */
-                        rd_ts_t ts_timeout;
-                        char *reason;
-		} offset_commit;
-
-		struct {
-			rd_kafka_topic_partition_list_t *topics;
-		} subscribe; /* also used for GET_SUBSCRIPTION */
-
-		struct {
-			rd_kafka_topic_partition_list_t *partitions;
-		} assign; /* also used for GET_ASSIGNMENT */
-
-		struct {
-			rd_kafka_topic_partition_list_t *partitions;
-		} rebalance;
-
-		struct {
-			char *str;
-		} name;
-
-		struct {
-			int64_t offset;
-			char *errstr;
-			rd_kafka_msg_t rkm;
-		} err;  /* used for ERR and CONSUMER_ERR */
-
-		struct {
-			int throttle_time;
-			int32_t nodeid;
-			char *nodename;
-		} throttle;
-
-		struct {
-			char *json;
-			size_t json_len;
-		} stats;
-
-		struct {
-			rd_kafka_buf_t *rkbuf;
-		} xbuf; /* XMIT_BUF and RECV_BUF */
-
-                /* RD_KAFKA_OP_METADATA */
-                struct {
-                        rd_kafka_metadata_t *md;
-                        int force; /* force request regardless of outstanding
-                                    * metadata requests. */
-                } metadata;
-
-		struct {
-			shptr_rd_kafka_itopic_t *s_rkt;
-			rd_kafka_msgq_t msgq;
-			rd_kafka_msgq_t msgq2;
-			int do_purge2;
-		} dr;
-
-		struct {
-			int32_t nodeid;
-			char    nodename[RD_KAFKA_NODENAME_SIZE];
-		} node;
-
-		struct {
-			int64_t offset;
-			char *reason;
-		} offset_reset;
-
-		struct {
-			int64_t offset;
-			struct rd_kafka_cgrp_s *rkcg;
-		} fetch_start; /* reused for SEEK */
-
-		struct {
-			int pause;
-			int flag;
-		} pause;
-
-                struct {
-                        char fac[64];
-                        int  level;
-                        char *str;
-                } log;
-	} rko_u;
-};
-
-TAILQ_HEAD(rd_kafka_op_head_s, rd_kafka_op_s);
-
-
-
-
-const char *rd_kafka_op2str (rd_kafka_op_type_t type);
-void rd_kafka_op_destroy (rd_kafka_op_t *rko);
-rd_kafka_op_t *rd_kafka_op_new0 (const char *source, rd_kafka_op_type_t type);
-#if ENABLE_DEVEL
-#define _STRINGIFYX(A) #A
-#define _STRINGIFY(A) _STRINGIFYX(A)
-#define rd_kafka_op_new(type)                                   \
-        rd_kafka_op_new0(__FILE__ ":" _STRINGIFY(__LINE__), type)
-#else
-#define rd_kafka_op_new(type) rd_kafka_op_new0(NULL, type)
-#endif
-rd_kafka_op_t *rd_kafka_op_new_reply (rd_kafka_op_t *rko_orig,
-                                      rd_kafka_resp_err_t err);
-rd_kafka_op_t *rd_kafka_op_new_cb (rd_kafka_t *rk,
-                                   rd_kafka_op_type_t type,
-                                   rd_kafka_op_cb_t *cb);
-int rd_kafka_op_reply (rd_kafka_op_t *rko, rd_kafka_resp_err_t err);
-
-#define rd_kafka_op_set_prio(rko,prio) ((rko)->rko_prio = prio)
-
-
-#define rd_kafka_op_err(rk,err,...) do {				\
-		if (!(rk)->rk_conf.error_cb) {				\
-			rd_kafka_log(rk, LOG_ERR, "ERROR", __VA_ARGS__); \
-			break;						\
-		}							\
-		rd_kafka_q_op_err((rk)->rk_rep, RD_KAFKA_OP_ERR, err, 0, \
-				  NULL, 0, __VA_ARGS__);		\
-	} while (0)
-
-void rd_kafka_q_op_err (rd_kafka_q_t *rkq, rd_kafka_op_type_t optype,
-                        rd_kafka_resp_err_t err, int32_t version,
-                        rd_kafka_toppar_t *rktp, int64_t offset,
-			const char *fmt, ...);
-rd_kafka_op_t *rd_kafka_op_req (rd_kafka_q_t *destq,
-                                rd_kafka_op_t *rko,
-                                int timeout_ms);
-rd_kafka_op_t *rd_kafka_op_req2 (rd_kafka_q_t *destq, rd_kafka_op_type_t type);
-rd_kafka_resp_err_t rd_kafka_op_err_destroy (rd_kafka_op_t *rko);
-
-rd_kafka_op_res_t rd_kafka_op_call (rd_kafka_t *rk,
-                                    rd_kafka_q_t *rkq, rd_kafka_op_t *rko)
-        RD_WARN_UNUSED_RESULT;
-
-rd_kafka_op_t *
-rd_kafka_op_new_fetch_msg (rd_kafka_msg_t **rkmp,
-                           rd_kafka_toppar_t *rktp,
-                           int32_t version,
-                           rd_kafka_buf_t *rkbuf,
-                           int64_t offset,
-                           size_t key_len, const void *key,
-                           size_t val_len, const void *val);
-
-void rd_kafka_op_throttle_time (struct rd_kafka_broker_s *rkb,
-				rd_kafka_q_t *rkq,
-				int throttle_time);
-
-
-rd_kafka_op_res_t
-rd_kafka_op_handle (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
-                    rd_kafka_q_cb_type_t cb_type, void *opaque,
-                    rd_kafka_q_serve_cb_t *callback) RD_WARN_UNUSED_RESULT;
-
-
-extern rd_atomic32_t rd_kafka_op_cnt;
-
-void rd_kafka_op_print (FILE *fp, const char *prefix, rd_kafka_op_t *rko);
-
-void rd_kafka_op_offset_store (rd_kafka_t *rk, rd_kafka_op_t *rko,
-			       const rd_kafka_message_t *rkmessage);