You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:52 UTC

[24/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade to librdkafka 0.11.4

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_msgset_writer.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_msgset_writer.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_msgset_writer.c
deleted file mode 100644
index 5faad84..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_msgset_writer.c
+++ /dev/null
@@ -1,1161 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2017 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "rd.h"
-#include "rdkafka_int.h"
-#include "rdkafka_msg.h"
-#include "rdkafka_msgset.h"
-#include "rdkafka_topic.h"
-#include "rdkafka_partition.h"
-#include "rdkafka_lz4.h"
-
-#include "snappy.h"
-#include "rdvarint.h"
-#include "crc32c.h"
-
-
-/**
- * @brief State for constructing a single ProduceRequest MessageSet.
- *
- * Holds the backing request buffer, the selected protocol versions and
- * the buffer offsets of header fields that are first written as
- * placeholders and filled in later.
- */
-typedef struct rd_kafka_msgset_writer_s {
-        rd_kafka_buf_t *msetw_rkbuf;     /* Backing store buffer (refcounted)*/
-
-        int16_t msetw_ApiVersion;        /* ProduceRequest ApiVersion */
-        int     msetw_MsgVersion;        /* MsgVersion to construct */
-        int     msetw_features;          /* Protocol features to use */
-        int     msetw_msgcntmax;         /* Max number of messages to send
-                                          * in a batch. */
-        size_t  msetw_messages_len;      /* Total size of Messages, without
-                                          * MessageSet header */
-
-        size_t  msetw_MessageSetSize;    /* Current MessageSetSize value */
-        size_t  msetw_of_MessageSetSize; /* offset of MessageSetSize */
-        size_t  msetw_of_start;          /* offset of MessageSet */
-
-        int     msetw_relative_offsets;  /* Bool: use relative offsets */
-
-        /* For MessageSet v2 */
-        int     msetw_Attributes;        /* MessageSet Attributes */
-        int64_t msetw_MaxTimestamp;      /* Maximum timestamp in batch */
-        size_t  msetw_of_CRC;            /* offset of MessageSet.CRC */
-
-        /* First message information */
-        struct {
-                size_t     of;  /* rkbuf's first message position */
-                int64_t    timestamp; /* Timestamp of the first message,
-                                       * the base for v2 TimestampDelta */
-        } msetw_firstmsg;
-
-        rd_kafka_broker_t *msetw_rkb;    /* @warning Not a refcounted
-                                          *          reference! */
-        rd_kafka_toppar_t *msetw_rktp;   /* @warning Not a refcounted
-                                          *          reference! */
-} rd_kafka_msgset_writer_t;
-
-
-
-/**
- * @brief Pick the ProduceRequest ApiVersion and the message format version
- *        (MsgVersion) from the broker's feature flags (rkb_features).
- *
- * The newest supported combination wins:
- *  - MSGVER2      -> ApiVersion 3, MsgVersion 2
- *  - MSGVER1      -> ApiVersion 2, MsgVersion 1
- *  - THROTTLETIME -> ApiVersion 1, MsgVersion 0
- *  - none         -> ApiVersion 0, MsgVersion 0
- *
- * The feature bit that decided the selection (if any) is OR:ed into
- * msetw_features.
- *
- * @locality broker thread
- */
-static RD_INLINE void
-rd_kafka_msgset_writer_select_MsgVersion (rd_kafka_msgset_writer_t *msetw) {
-        rd_kafka_broker_t *broker = msetw->msetw_rkb;
-        int matched;
-
-        /* Message format v2 */
-        matched = broker->rkb_features & RD_KAFKA_FEATURE_MSGVER2;
-        if (matched) {
-                msetw->msetw_ApiVersion = 3;
-                msetw->msetw_MsgVersion = 2;
-                msetw->msetw_features |= matched;
-                return;
-        }
-
-        /* Message format v1 */
-        matched = broker->rkb_features & RD_KAFKA_FEATURE_MSGVER1;
-        if (matched) {
-                msetw->msetw_ApiVersion = 2;
-                msetw->msetw_MsgVersion = 1;
-                msetw->msetw_features |= matched;
-                return;
-        }
-
-        /* Message format v0: ApiVersion depends on ThrottleTime support */
-        matched = broker->rkb_features & RD_KAFKA_FEATURE_THROTTLETIME;
-        if (matched) {
-                msetw->msetw_ApiVersion = 1;
-                msetw->msetw_features |= matched;
-        } else {
-                msetw->msetw_ApiVersion = 0;
-        }
-
-        msetw->msetw_MsgVersion = 0;
-}
-
-
-/**
- * @brief Allocate the request buffer for a messageset writer that has
- *        already had its versions and message count limit set up.
- *
- * The initial size is an estimate made up of the ProduceRequest header,
- * the MessageSet header, the worst-case per-message overhead and, if
- * payload copying is enabled, room for the copied payloads.
- * The estimate is capped at message.max.bytes (rk_conf.max_msg_size).
- * Getting it wrong is not critical: it only avoids unnecessary extra
- * allocations, the buffer will grow as needed.
- *
- * The new buffer is stored in msetw_rkbuf and tagged with the selected
- * ApiVersion and feature flags.
- */
-static void
-rd_kafka_msgset_writer_alloc_buf (rd_kafka_msgset_writer_t *msetw) {
-        rd_kafka_t *rk = msetw->msetw_rkb->rkb_rk;
-        size_t per_msg_overhead = 0;
-        size_t req_hdr_size = 0;
-        size_t mset_hdr_size = 0;
-        size_t total;
-
-        rd_kafka_assert(NULL, !msetw->msetw_rkbuf);
-
-        /*
-         * ProduceRequest header, all of which goes in one iovec:
-         *  ProduceRequest v0..2:
-         *    RequiredAcks + Timeout +
-         *    [Topic + [Partition + MessageSetSize]]
-         *
-         *  ProduceRequest v3:
-         *    TransactionalId + RequiredAcks + Timeout +
-         *    [Topic + [Partition + MessageSetSize + MessageSet]]
-         */
-        switch (msetw->msetw_ApiVersion)
-        {
-        case 0:
-        case 1:
-        case 2:
-        case 3:
-                /* Only v3 carries the TransactionalId */
-                if (msetw->msetw_ApiVersion == 3)
-                        req_hdr_size += RD_KAFKAP_STR_SIZE(
-                                rk->rk_eos.TransactionalId);
-
-                /* RequiredAcks + Timeout + TopicCnt */
-                req_hdr_size += 2 + 4 + 4;
-                /* Topic */
-                req_hdr_size += RD_KAFKAP_STR_SIZE(msetw->msetw_rktp->
-                                                   rktp_rkt->rkt_topic);
-                /* PartitionCnt + Partition + MessageSetSize */
-                req_hdr_size += 4 + 4 + 4;
-
-                mset_hdr_size += 4; /* MessageSetSize */
-                break;
-
-        default:
-                RD_NOTREACHED();
-        }
-
-        /*
-         * MsgVersion specific sizes:
-         *  - worst-case per-message overhead (message fields)
-         *  - MessageSet header size
-         */
-        switch (msetw->msetw_MsgVersion)
-        {
-        case 0:
-                per_msg_overhead = RD_KAFKAP_MESSAGE_V0_OVERHEAD;
-                break;
-
-        case 1:
-                per_msg_overhead = RD_KAFKAP_MESSAGE_V1_OVERHEAD;
-                break;
-
-        case 2:
-                /* MsgVer2 uses varints: account for the worst case. */
-                per_msg_overhead += RD_KAFKAP_MESSAGE_V2_OVERHEAD;
-
-                /* MessageSet v2 header fields */
-                mset_hdr_size +=
-                        8 /* BaseOffset */ +
-                        4 /* Length */ +
-                        4 /* PartitionLeaderEpoch */ +
-                        1 /* Magic (MsgVersion) */ +
-                        4 /* CRC (CRC32C) */ +
-                        2 /* Attributes */ +
-                        4 /* LastOffsetDelta */ +
-                        8 /* BaseTimestamp */ +
-                        8 /* MaxTimestamp */ +
-                        8 /* ProducerId */ +
-                        2 /* ProducerEpoch */ +
-                        4 /* BaseSequence */ +
-                        4 /* RecordCount */;
-                break;
-
-        default:
-                RD_NOTREACHED();
-        }
-
-        /* Headers first.. */
-        total = req_hdr_size + mset_hdr_size;
-
-        /* ..then, if copying of small payloads is enabled, enough room for
-         * each message to be copied, bounded by what is actually queued. */
-        if (rk->rk_conf.msg_copy_max_size > 0) {
-                size_t queued_bytes = rd_kafka_msgq_size(&msetw->msetw_rktp->
-                                                         rktp_xmit_msgq);
-                size_t copy_limit = (size_t)rk->rk_conf.msg_copy_max_size *
-                        msetw->msetw_msgcntmax;
-
-                total += RD_MIN(queued_bytes, copy_limit);
-        }
-
-        /* ..and the estimated per-message overhead. */
-        total += per_msg_overhead * msetw->msetw_msgcntmax;
-
-        /* Never allocate more than message.max.bytes up front. */
-        total = RD_MIN(total, (size_t)rk->rk_conf.max_msg_size);
-
-        /*
-         * Allocate iovecs to hold all headers and messages,
-         * and auxiliary space for message headers, etc.
-         */
-        msetw->msetw_rkbuf =
-                rd_kafka_buf_new_request(msetw->msetw_rkb, RD_KAFKAP_Produce,
-                                         msetw->msetw_msgcntmax/2 + 10,
-                                         total);
-
-        rd_kafka_buf_ApiVersion_set(msetw->msetw_rkbuf,
-                                    msetw->msetw_ApiVersion,
-                                    msetw->msetw_features);
-}
-
-
-/**
- * @brief Write the MessageSet v2 header to the writer's buffer.
- *
- * Fields whose final value is not yet known are written as zero
- * placeholders. The buffer offset of the header start is saved in
- * msetw_of_start and the offset of the CRC field in msetw_of_CRC.
- *
- * @remark Must only be called for MsgVersion 2 (ApiVersion >= 3).
- */
-static void
-rd_kafka_msgset_writer_write_MessageSet_v2_header (
-        rd_kafka_msgset_writer_t *msetw) {
-        rd_kafka_t *handle = msetw->msetw_rkb->rkb_rk;
-        rd_kafka_buf_t *buf = msetw->msetw_rkbuf;
-
-        rd_kafka_assert(NULL, msetw->msetw_ApiVersion >= 3);
-        rd_kafka_assert(NULL, msetw->msetw_MsgVersion == 2);
-
-        /* BaseOffset: its position is also the start of the
-         * messageset header fields. */
-        msetw->msetw_of_start = rd_kafka_buf_write_i64(buf, 0);
-
-        rd_kafka_buf_write_i32(buf, 0);  /* Length: updated later */
-        rd_kafka_buf_write_i32(buf, 0);  /* PartitionLeaderEpoch (KIP-101) */
-        rd_kafka_buf_write_i8(buf, msetw->msetw_MsgVersion);  /* Magic */
-
-        /* CRC (CRC32C): can only be calculated once the entire
-         * messageset and its messages have been constructed and the
-         * header fields following it have been updated, so remember
-         * where it lives and fill it in later. */
-        msetw->msetw_of_CRC = rd_kafka_buf_write_i32(buf, 0);
-
-        rd_kafka_buf_write_i16(buf, 0);  /* Attributes: updated later */
-        rd_kafka_buf_write_i32(buf, 0);  /* LastOffsetDelta: updated later */
-        rd_kafka_buf_write_i64(buf, 0);  /* BaseTimestamp: updated later */
-        rd_kafka_buf_write_i64(buf, 0);  /* MaxTimestamp: updated later */
-
-        rd_kafka_buf_write_i64(buf, handle->rk_eos.PID);  /* ProducerId */
-        rd_kafka_buf_write_i16(buf,
-                               handle->rk_eos.ProducerEpoch); /* ProducerEpoch */
-        rd_kafka_buf_write_i32(buf, -1); /* BaseSequence */
-
-        rd_kafka_buf_write_i32(buf, 0);  /* RecordCount: updated later */
-}
-
-
-/**
- * @brief Write the ProduceRequest header for one topic and one partition.
- *
- *        The MessageSetSize field is written as a placeholder and its
- *        buffer offset is saved in msetw_of_MessageSetSize.
- *        For MsgVersion 2 the MessageSet v2 header is written as well.
- *
- *        When this function returns the msgset is ready for writing
- *        individual messages and msetw_MessageSetSize has been set to the
- *        size of the messageset header.
- */
-static void
-rd_kafka_msgset_writer_write_Produce_header (rd_kafka_msgset_writer_t *msetw) {
-        rd_kafka_itopic_t *topic = msetw->msetw_rktp->rktp_rkt;
-        rd_kafka_t *handle = msetw->msetw_rkb->rkb_rk;
-        rd_kafka_buf_t *buf = msetw->msetw_rkbuf;
-
-        /* TransactionalId: only present in ProduceRequest v3 */
-        if (msetw->msetw_ApiVersion == 3)
-                rd_kafka_buf_write_kstr(buf, handle->rk_eos.TransactionalId);
-
-        /* RequiredAcks and Timeout come from the topic configuration */
-        rd_kafka_buf_write_i16(buf, topic->rkt_conf.required_acks);
-        rd_kafka_buf_write_i32(buf, topic->rkt_conf.request_timeout_ms);
-
-        /* Topic array: exactly one topic.. */
-        rd_kafka_buf_write_i32(buf, 1);
-        rd_kafka_buf_write_kstr(buf, topic->rkt_topic);
-
-        /* ..with a partition array of exactly one partition. */
-        rd_kafka_buf_write_i32(buf, 1);
-        rd_kafka_buf_write_i32(buf, msetw->msetw_rktp->rktp_partition);
-
-        /* MessageSetSize: will be finalized later */
-        msetw->msetw_of_MessageSetSize = rd_kafka_buf_write_i32(buf, 0);
-
-        if (msetw->msetw_MsgVersion != 2) {
-                /* Older MessageSet: no header fields to write */
-                msetw->msetw_MessageSetSize = RD_KAFKAP_MSGSET_V0_SIZE;
-                return;
-        }
-
-        /* MessageSet v2 header */
-        rd_kafka_msgset_writer_write_MessageSet_v2_header(msetw);
-        msetw->msetw_MessageSetSize = RD_KAFKAP_MSGSET_V2_SIZE;
-}
-
-
-/**
- * @brief Set up a ProduceRequest MessageSet writer for the given broker
- *        and partition.
- *
- *        Unless the partition's transmit queue is empty, \p msetw is
- *        cleared, the protocol versions are selected, a new buffer is
- *        allocated and the Produce header (plus the MessageSet header for
- *        MsgVersion 2) is written to it.
- *
- * @returns the maximum number of messages to write to this MessageSet, or
- *          0 if the transmit queue is empty (in which case \p msetw is
- *          left untouched).
- *
- * @remark This currently constructs the entire ProduceRequest, containing
- *         a single outer MessageSet for a single partition.
- */
-static int rd_kafka_msgset_writer_init (rd_kafka_msgset_writer_t *msetw,
-                                         rd_kafka_broker_t *rkb,
-                                         rd_kafka_toppar_t *rktp) {
-        int queued = rd_atomic32_get(&rktp->rktp_xmit_msgq.rkmq_msg_cnt);
-
-        /* Nothing to send */
-        if (queued == 0)
-                return 0;
-
-        memset(msetw, 0, sizeof(*msetw));
-
-        msetw->msetw_rkb = rkb;
-        msetw->msetw_rktp = rktp;
-
-        /* A batch holds at most the configured batch size or whatever is
-         * currently queued, whichever is lower. */
-        msetw->msetw_msgcntmax = RD_MIN(queued,
-                                        rkb->rkb_rk->rk_conf.
-                                        batch_num_messages);
-        rd_dassert(msetw->msetw_msgcntmax > 0);
-
-        /* Select MsgVersion to use */
-        rd_kafka_msgset_writer_select_MsgVersion(msetw);
-
-        /* Relative offsets are always used by MsgVersion 2 (OffsetDelta),
-         * and by MsgVersion 1 when a compression codec is configured. */
-        if (msetw->msetw_MsgVersion == 2)
-                msetw->msetw_relative_offsets = 1;
-        else if (msetw->msetw_MsgVersion == 1 &&
-                 rktp->rktp_rkt->rkt_conf.compression_codec)
-                msetw->msetw_relative_offsets = 1;
-
-        /* Allocate backing buffer */
-        rd_kafka_msgset_writer_alloc_buf(msetw);
-
-        /* Construct first part of Produce header + MessageSet header */
-        rd_kafka_msgset_writer_write_Produce_header(msetw);
-
-        /* The buffer's write position is now where the first message will
-         * be located: remember it so that the buffer can be rewound later
-         * in case of compression. */
-        msetw->msetw_firstmsg.of = rd_buf_write_pos(&msetw->msetw_rkbuf->
-                                                    rkbuf_buf);
-
-        return msetw->msetw_msgcntmax;
-}
-
-
-
-/**
- * @brief Add the payload of \p rkm to the buffer, either by copying it or
- *        by pushing a reference to the payload memory.
- *
- * The payload is copied when it does not exceed msg_copy_max_size and
- * the buffer has more than rkm_len bytes of write space remaining,
- * otherwise a reference is pushed together with \p free_cb.
- */
-static RD_INLINE void
-rd_kafka_msgset_writer_write_msg_payload (rd_kafka_msgset_writer_t *msetw,
-                                          const rd_kafka_msg_t *rkm,
-                                          void (*free_cb)(void *)) {
-        rd_kafka_buf_t *buf = msetw->msetw_rkbuf;
-        const rd_kafka_t *handle = msetw->msetw_rkb->rkb_rk;
-
-        /* Too large to copy, or no room left: reference the memory. */
-        if (rkm->rkm_len > (size_t)handle->rk_conf.msg_copy_max_size ||
-            rd_buf_write_remains(&buf->rkbuf_buf) <= rkm->rkm_len) {
-                rd_kafka_buf_push(buf, rkm->rkm_payload, rkm->rkm_len,
-                                  free_cb);
-                return;
-        }
-
-        /* Small enough and there is room: copy into the buffer. */
-        rd_kafka_buf_write(buf, rkm->rkm_payload, rkm->rkm_len);
-}
-
-
-/**
- * @brief Serialize \p rkm to the messageset buffer using the MsgVersion 0
- *        or 1 format, as selected by msetw_MsgVersion.
- *
- * Wire layout: Offset, MessageSize, Crc, MagicByte, Attributes,
- *              [Timestamp (MsgVersion 1 only)], Key, Value.
- * The Crc is written as a placeholder and updated once the fields
- * following it have been written.
- *
- * @returns the number of bytes written.
- */
-static size_t
-rd_kafka_msgset_writer_write_msg_v0_1 (rd_kafka_msgset_writer_t *msetw,
-                                       rd_kafka_msg_t *rkm,
-                                       int64_t Offset,
-                                       int8_t MsgAttributes,
-                                       void (*free_cb)(void *)) {
-        rd_kafka_buf_t *buf = msetw->msetw_rkbuf;
-        const int with_timestamp = (msetw->msetw_MsgVersion == 1);
-        size_t crc_pos;
-        size_t msg_size;
-
-        /* Size of the Message, i.e., everything following MessageSize */
-        msg_size =
-                4 + 1 + 1 + /* Crc+MagicByte+Attributes */
-                4 /* KeyLength */ + rkm->rkm_key_len +
-                4 /* ValueLength */ + rkm->rkm_len;
-        if (with_timestamp)
-                msg_size += 8; /* Timestamp i64 */
-
-        /*
-         * Per-Message header of the (v0 and v1) MessageSet
-         */
-
-        /* Offset (only relevant for compressed messages on MsgVersion v1) */
-        rd_kafka_buf_write_i64(buf, Offset);
-
-        /* MessageSize */
-        rd_kafka_buf_write_i32(buf, (int32_t)msg_size);
-
-        /*
-         * The Message itself
-         */
-
-        /* Crc placeholder, the checksum covers all writes from here on */
-        crc_pos = rd_kafka_buf_write_i32(buf, 0);
-        rd_kafka_buf_crc_init(buf);
-
-        /* MagicByte */
-        rd_kafka_buf_write_i8(buf, msetw->msetw_MsgVersion);
-
-        /* Attributes */
-        rd_kafka_buf_write_i8(buf, MsgAttributes);
-
-        /* Timestamp (MsgVersion 1 only) */
-        if (with_timestamp)
-                rd_kafka_buf_write_i64(buf, rkm->rkm_timestamp);
-
-        /* Key */
-        rd_kafka_buf_write_bytes(buf, rkm->rkm_key, rkm->rkm_key_len);
-
-        /* Value: a NULL length if there is no payload, otherwise the
-         * length followed by the copied or referenced payload. */
-        if (!rkm->rkm_payload) {
-                rd_kafka_buf_write_i32(buf, RD_KAFKAP_BYTES_LEN_NULL);
-        } else {
-                rd_kafka_buf_write_i32(buf, (int32_t)rkm->rkm_len);
-                rd_kafka_msgset_writer_write_msg_payload(msetw, rkm, free_cb);
-        }
-
-        /* Fill in the final Crc */
-        rd_kafka_buf_update_u32(buf, crc_pos,
-                                rd_kafka_buf_crc_finalize(buf));
-
-        /* Written size: Offset + MessageSize + Message */
-        return 8/*Offset*/ + 4/*MessageSize*/ + msg_size;
-}
-
-/**
- * @brief Serialize \p rkm to the messageset buffer using the MsgVersion 2
- *        format.
- *
- * Wire layout: Length, Attributes, TimestampDelta, OffsetDelta, KeyLen,
- *              [Key], ValueLen, [Value], HeaderCount.
- * TimestampDelta is relative to the first message's timestamp
- * (msetw_firstmsg.timestamp) and \p Offset is written as the OffsetDelta.
- * \p MsgAttributes is not used: the Attributes byte is always written
- * as 0. Headers are not implemented: HeaderCount is always 0.
- *
- * @returns the number of bytes written.
- */
-static size_t
-rd_kafka_msgset_writer_write_msg_v2 (rd_kafka_msgset_writer_t *msetw,
-                                     rd_kafka_msg_t *rkm,
-                                     int64_t Offset,
-                                     int8_t MsgAttributes,
-                                     void (*free_cb)(void *)) {
-        rd_kafka_buf_t *buf = msetw->msetw_rkbuf;
-        char enc_Length[RD_UVARINT_ENC_SIZEOF(int32_t)];
-        char enc_TimestampDelta[RD_UVARINT_ENC_SIZEOF(int64_t)];
-        char enc_OffsetDelta[RD_UVARINT_ENC_SIZEOF(int64_t)];
-        char enc_KeyLen[RD_UVARINT_ENC_SIZEOF(int32_t)];
-        char enc_ValueLen[RD_UVARINT_ENC_SIZEOF(int32_t)];
-        char enc_HeaderCount[RD_UVARINT_ENC_SIZEOF(int32_t)];
-        size_t n_Length;
-        size_t n_TimestampDelta;
-        size_t n_OffsetDelta;
-        size_t n_KeyLen;
-        size_t n_ValueLen;
-        size_t n_HeaderCount;
-        size_t total = 0;
-
-        /* The Length field is itself a varint whose width depends on its
-         * value, so every other varint is encoded up front to learn
-         * the exact record size before Length is encoded. */
-
-        n_TimestampDelta = rd_uvarint_enc_i64(
-                enc_TimestampDelta, sizeof(enc_TimestampDelta),
-                rkm->rkm_timestamp - msetw->msetw_firstmsg.timestamp);
-
-        n_OffsetDelta = rd_uvarint_enc_i64(
-                enc_OffsetDelta, sizeof(enc_OffsetDelta), Offset);
-
-        n_KeyLen = rd_uvarint_enc_i32(
-                enc_KeyLen, sizeof(enc_KeyLen),
-                rkm->rkm_key ? (int32_t)rkm->rkm_key_len :
-                (int32_t)RD_KAFKAP_BYTES_LEN_NULL);
-
-        n_ValueLen = rd_uvarint_enc_i32(
-                enc_ValueLen, sizeof(enc_ValueLen),
-                rkm->rkm_payload ? (int32_t)rkm->rkm_len :
-                (int32_t)RD_KAFKAP_BYTES_LEN_NULL);
-
-        n_HeaderCount = rd_uvarint_enc_i32(
-                enc_HeaderCount, sizeof(enc_HeaderCount), 0);
-
-        /* Record size excluding the Length field itself: this is the
-         * value stored in Length. */
-        total =
-                1 /* MsgAttributes */ +
-                n_TimestampDelta +
-                n_OffsetDelta +
-                n_KeyLen +
-                rkm->rkm_key_len +
-                n_ValueLen +
-                rkm->rkm_len +
-                n_HeaderCount;
-
-        /* Length */
-        n_Length = rd_uvarint_enc_i64(enc_Length, sizeof(enc_Length),
-                                      total);
-        rd_kafka_buf_write(buf, enc_Length, n_Length);
-        total += n_Length;
-
-        /* Attributes: the MsgAttributes argument is loosely based on
-         *             MsgVer0 which doesn't apply to MsgVer2. */
-        rd_kafka_buf_write_i8(buf, 0);
-
-        /* TimestampDelta, OffsetDelta */
-        rd_kafka_buf_write(buf, enc_TimestampDelta, n_TimestampDelta);
-        rd_kafka_buf_write(buf, enc_OffsetDelta, n_OffsetDelta);
-
-        /* KeyLen, followed by the Key (if any) */
-        rd_kafka_buf_write(buf, enc_KeyLen, n_KeyLen);
-        if (rkm->rkm_key)
-                rd_kafka_buf_write(buf, rkm->rkm_key, rkm->rkm_key_len);
-
-        /* ValueLen, followed by the copied or referenced Value (if any) */
-        rd_kafka_buf_write(buf, enc_ValueLen, n_ValueLen);
-        if (rkm->rkm_payload)
-                rd_kafka_msgset_writer_write_msg_payload(msetw, rkm, free_cb);
-
-        /* HeaderCount (headers currently not implemented) */
-        rd_kafka_buf_write(buf, enc_HeaderCount, n_HeaderCount);
-
-        /* Total written size, including the Length field */
-        return total;
-}
-
-
-/**
- * @brief Write a single message to the messageset buffer using the
- *        writer's MsgVersion (expected to be 0, 1 or 2).
- *
- * RD_KAFKA_MSG_ATTR_CREATE_TIME is added to \p MsgAttributes when the
- * message has a non-zero timestamp.
- *
- * Asserts that the size reported by the version specific writer matches
- * the number of bytes the buffer's write position advanced, and that it
- * does not exceed rd_kafka_msg_wire_size().
- *
- * @returns the number of bytes written.
- */
-static size_t
-rd_kafka_msgset_writer_write_msg (rd_kafka_msgset_writer_t *msetw,
-                                  rd_kafka_msg_t *rkm,
-                                  int64_t Offset, int8_t MsgAttributes,
-                                  void (*free_cb)(void *)) {
-        size_t (*write_fn) (rd_kafka_msgset_writer_t *,
-                            rd_kafka_msg_t *, int64_t, int8_t,
-                            void (*)(void *));
-        size_t start_pos;
-        size_t advanced;
-        size_t reported;
-
-        /* MsgVersion 0 and 1 share a writer, MsgVersion 2 has its own. */
-        if (msetw->msetw_MsgVersion == 2)
-                write_fn = rd_kafka_msgset_writer_write_msg_v2;
-        else
-                write_fn = rd_kafka_msgset_writer_write_msg_v0_1;
-
-        if (likely(rkm->rkm_timestamp))
-                MsgAttributes |= RD_KAFKA_MSG_ATTR_CREATE_TIME;
-
-        start_pos = rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf);
-
-        reported = write_fn(msetw, rkm, Offset, MsgAttributes, free_cb);
-
-        advanced = rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf) -
-                start_pos;
-
-        /* Sanity checks of the size accounting */
-        rd_assert(reported <=
-                   rd_kafka_msg_wire_size(rkm, msetw->msetw_MsgVersion));
-        rd_assert(reported == advanced);
-
-        return reported;
-}
-
-/**
- * @brief Move messages from \p rkmq to the messageset, in queue order,
- *        until the queue is empty, msetw_msgcntmax messages have been
- *        written, or the next message would make the buffer exceed
- *        max_msg_size.
- *
- * Each written message is dequeued from \p rkmq and enqueued on the
- * buffer's message queue. The first message's timestamp is stored in
- * msetw_firstmsg.timestamp and the highest timestamp seen in
- * msetw_MaxTimestamp.
- *
- * @remark \p rkmq must not be empty.
- */
-static void
-rd_kafka_msgset_writer_write_msgq (rd_kafka_msgset_writer_t *msetw,
-                                   rd_kafka_msgq_t *rkmq) {
-        rd_kafka_broker_t *rkb = msetw->msetw_rkb;
-        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
-        rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
-        size_t max_msg_size = (size_t)rkb->rkb_rk->rk_conf.max_msg_size;
-        size_t len = rd_buf_len(&rkbuf->rkbuf_buf);
-        rd_ts_t int_latency_base;
-        rd_ts_t MaxTimestamp = 0;
-        rd_kafka_msg_t *rkm;
-        int msgcnt = 0;
-
-        /* Internal latency calculation base.
-         * Uses rkm_ts_timeout which is enqueue time + timeout */
-        int_latency_base = rd_clock() +
-                (rktp->rktp_rkt->rkt_conf.message_timeout_ms * 1000);
-
-        /* The first message provides the BaseTimestamp. */
-        rkm = TAILQ_FIRST(&rkmq->rkmq_msgs);
-        rd_kafka_assert(NULL, rkm);
-        msetw->msetw_firstmsg.timestamp = rkm->rkm_timestamp;
-
-        /*
-         * Keep writing messages until the queue is drained, the buffer
-         * is full or the message count limit is reached.
-         */
-        for ( ; rkm ; rkm = TAILQ_FIRST(&rkmq->rkmq_msgs)) {
-                if (unlikely(msgcnt == msetw->msetw_msgcntmax ||
-                             len + rd_kafka_msg_wire_size(rkm, msetw->
-                                                          msetw_MsgVersion) >
-                             max_msg_size)) {
-                        rd_rkb_dbg(rkb, MSG, "PRODUCE",
-                                   "No more space in current MessageSet "
-                                   "(%i message(s), %"PRIusz" bytes)",
-                                   msgcnt, len);
-                        break;
-                }
-
-                /* Hand the message over to the buffer's queue */
-                rd_kafka_msgq_deq(rkmq, rkm, 1);
-                rd_kafka_msgq_enq(&rkbuf->rkbuf_msgq, rkm);
-
-                /* Internal latency metrics */
-                rd_avg_add(&rkb->rkb_avg_int_latency,
-                           int_latency_base - rkm->rkm_ts_timeout);
-
-                /* Track MessageSet v2's .MaxTimestamp field */
-                if (unlikely(MaxTimestamp < rkm->rkm_timestamp))
-                        MaxTimestamp = rkm->rkm_timestamp;
-
-                /* Serialize the message, its index in the batch is
-                 * passed as the offset. */
-                len += rd_kafka_msgset_writer_write_msg(msetw, rkm, msgcnt, 0,
-                                                        NULL);
-
-                rd_dassert(len <= max_msg_size);
-                msgcnt++;
-        }
-
-        msetw->msetw_MaxTimestamp = MaxTimestamp;
-}
-
-
-#if WITH_ZLIB
-/**
- * @brief Compress the messageset data in \p slice using gzip (zlib).
- *
- * On success \p ciov describes the compressed data: iov_base is a buffer
- * allocated with rd_malloc() (not freed here) and iov_len is the
- * compressed length.
- *
- * @returns 0 on success, or -1 on failure in which case the error is
- *          logged and the output buffer, if it was allocated, has been
- *          freed.
- */
-static int
-rd_kafka_msgset_writer_compress_gzip (rd_kafka_msgset_writer_t *msetw,
-                                      rd_slice_t *slice,
-                                      struct iovec *ciov) {
-
-        rd_kafka_broker_t *rkb = msetw->msetw_rkb;
-        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
-        z_stream strm;
-        size_t len = rd_slice_remains(slice);
-        const void *p;
-        size_t rlen;
-        int r;
-
-        memset(&strm, 0, sizeof(strm));
-        /* windowBits 15+16: maximum window size with a gzip wrapper */
-        r = deflateInit2(&strm, Z_DEFAULT_COMPRESSION,
-                         Z_DEFLATED, 15+16,
-                         8, Z_DEFAULT_STRATEGY);
-        if (r != Z_OK) {
-                rd_rkb_log(rkb, LOG_ERR, "GZIP",
-                           "Failed to initialize gzip for "
-                           "compressing %"PRIusz" bytes in "
-                           "topic %.*s [%"PRId32"]: %s (%i): "
-                           "sending uncompressed",
-                           len,
-                           RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
-                           rktp->rktp_partition,
-                           strm.msg ? strm.msg : "", r);
-                /* Nothing has been allocated yet: no cleanup needed */
-                return -1;
-        }
-
-        /* Calculate maximum compressed size and
-         * allocate an output buffer accordingly, being
-         * prefixed with the Message header. */
-        ciov->iov_len = deflateBound(&strm, (uLong)rd_slice_remains(slice));
-        ciov->iov_base = rd_malloc(ciov->iov_len);
-
-        strm.next_out  = (void *)ciov->iov_base;
-        strm.avail_out =   (uInt)ciov->iov_len;
-
-        /* Iterate through each segment and compress it. */
-        while ((rlen = rd_slice_reader(slice, &p))) {
-
-                strm.next_in  = (void *)p;
-                strm.avail_in =   (uInt)rlen;
-
-                /* Compress segment.
-                 * The assignment is parenthesized so that r holds
-                 * deflate()'s return code (which is what is logged below)
-                 * rather than the result of the comparison. */
-                if ((r = deflate(&strm, Z_NO_FLUSH)) != Z_OK) {
-                        rd_rkb_log(rkb, LOG_ERR, "GZIP",
-                                   "Failed to gzip-compress "
-                                   "%"PRIusz" bytes (%"PRIusz" total) for "
-                                   "topic %.*s [%"PRId32"]: "
-                                   "%s (%i): "
-                                   "sending uncompressed",
-                                   rlen, len,
-                                   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
-                                   rktp->rktp_partition,
-                                   strm.msg ? strm.msg : "", r);
-                        goto fail;
-                }
-
-                rd_kafka_assert(rkb->rkb_rk, strm.avail_in == 0);
-        }
-
-        /* Finish the compression */
-        if ((r = deflate(&strm, Z_FINISH)) != Z_STREAM_END) {
-                rd_rkb_log(rkb, LOG_ERR, "GZIP",
-                           "Failed to finish gzip compression "
-                           " of %"PRIusz" bytes for "
-                           "topic %.*s [%"PRId32"]: "
-                           "%s (%i): "
-                           "sending uncompressed",
-                           len,
-                           RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
-                           rktp->rktp_partition,
-                           strm.msg ? strm.msg : "", r);
-                goto fail;
-        }
-
-        ciov->iov_len = strm.total_out;
-
-        /* Deinitialize compression */
-        deflateEnd(&strm);
-
-        return 0;
-
- fail:
-        /* Release the zlib state and the output buffer */
-        deflateEnd(&strm);
-        rd_free(ciov->iov_base);
-        return -1;
-}
-#endif
-
-
-#if WITH_SNAPPY
-/**
- * @brief Compress the messageset data in \p slice using Snappy.
- *
- * On success \p ciov describes the compressed data; iov_base is a buffer
- * allocated with rd_malloc() (not freed here).
- *
- * @returns 0 on success, or -1 on failure in which case the error is
- *          logged and the output buffer has been freed.
- */
-static int
-rd_kafka_msgset_writer_compress_snappy (rd_kafka_msgset_writer_t *msetw,
-                                        rd_slice_t *slice, struct iovec *ciov) {
-        rd_kafka_broker_t *rkb = msetw->msetw_rkb;
-        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
-        struct iovec *iov;
-        size_t iov_max, iov_cnt;
-        struct snappy_env senv;
-        size_t len = rd_slice_remains(slice);
-        int r;
-
-        /* Initialize snappy compression environment */
-        rd_kafka_snappy_init_env_sg(&senv, 1/*iov enable*/);
-
-        /* Calculate maximum compressed size and
-         * allocate an output buffer accordingly. */
-        ciov->iov_len = rd_kafka_snappy_max_compressed_length(len);
-        ciov->iov_base = rd_malloc(ciov->iov_len);
-
-        /* Map the slice's segments to an iovec array (on the stack) */
-        iov_max = slice->buf->rbuf_segment_cnt;
-        iov = rd_alloca(sizeof(*iov) * iov_max);
-
-        rd_slice_get_iov(slice, iov, &iov_cnt, iov_max, len);
-
-        /* Compress each message */
-        r = rd_kafka_snappy_compress_iov(&senv, iov, iov_cnt, len, ciov);
-
-        /* The snappy environment is no longer needed regardless of the
-         * outcome: free it before the error check so that it is not
-         * leaked on the failure path. */
-        rd_kafka_snappy_free_env(&senv);
-
-        if (r != 0) {
-                rd_rkb_log(rkb, LOG_ERR, "SNAPPY",
-                           "Failed to snappy-compress "
-                           "%"PRIusz" bytes for "
-                           "topic %.*s [%"PRId32"]: %s: "
-                           "sending uncompressed",
-                           len, RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
-                           rktp->rktp_partition,
-                           rd_strerror(-r));
-                rd_free(ciov->iov_base);
-                return -1;
-        }
-
-        return 0;
-}
-#endif
-
-/**
- * @brief Compress messageset using LZ4F
- */
-static int
-rd_kafka_msgset_writer_compress_lz4 (rd_kafka_msgset_writer_t *msetw,
-                                     rd_slice_t *slice, struct iovec *ciov) {
-        rd_kafka_resp_err_t err;
-        err = rd_kafka_lz4_compress(msetw->msetw_rkb,
-                                    /* Correct or incorrect HC */
-                                    msetw->msetw_MsgVersion >= 1 ? 1 : 0,
-                                    slice, &ciov->iov_base, &ciov->iov_len);
-        return (err ? -1 : 0);
-}
-
-
-
-/**
- * @brief Compress the message set.
- * @param outlenp in: total uncompressed messages size,
- *                out (on success): returns the compressed buffer size.
- * @returns 0 on success or if -1 if compression failed.
- * @remark Compression failures are not critical, we'll just send the
- *         the messageset uncompressed.
- */
-static int
-rd_kafka_msgset_writer_compress (rd_kafka_msgset_writer_t *msetw,
-                                 size_t *outlenp) {
-        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
-        rd_buf_t *rbuf = &msetw->msetw_rkbuf->rkbuf_buf;
-        rd_slice_t slice;
-        size_t len = *outlenp;
-        struct iovec ciov = RD_ZERO_INIT; /* Compressed output buffer */
-        int r = -1;
-        size_t outlen;
-
-        rd_assert(rd_buf_len(rbuf) >= msetw->msetw_firstmsg.of + len);
-
-        /* Create buffer slice from firstmsg and onwards */
-        r = rd_slice_init(&slice, rbuf, msetw->msetw_firstmsg.of, len);
-        rd_assert(r == 0 || !*"invalid firstmsg position");
-
-        switch (rktp->rktp_rkt->rkt_conf.compression_codec)
-        {
-#if WITH_ZLIB
-        case RD_KAFKA_COMPRESSION_GZIP:
-                r = rd_kafka_msgset_writer_compress_gzip(msetw, &slice, &ciov);
-                break;
-#endif
-
-#if WITH_SNAPPY
-        case RD_KAFKA_COMPRESSION_SNAPPY:
-                r = rd_kafka_msgset_writer_compress_snappy(msetw, &slice,
-                                                           &ciov);
-                break;
-#endif
-
-        case RD_KAFKA_COMPRESSION_LZ4:
-                /* Skip LZ4 compression if broker doesn't support it. */
-                if (!(msetw->msetw_rkb->rkb_features & RD_KAFKA_FEATURE_LZ4))
-                        return -1;
-
-                r = rd_kafka_msgset_writer_compress_lz4(msetw, &slice, &ciov);
-                break;
-
-
-        default:
-                rd_kafka_assert(NULL,
-                                !*"notreached: unsupported compression.codec");
-                break;
-        }
-
-        if (r == -1) /* Compression failed, send uncompressed */
-                return -1;
-
-
-        if (unlikely(ciov.iov_len > len)) {
-                /* If the compressed data is larger than the uncompressed size
-                 * then throw it away and send as uncompressed. */
-                rd_free(ciov.iov_base);
-                return -1;
-        }
-
-        /* Set compression codec in MessageSet.Attributes */
-        msetw->msetw_Attributes |= rktp->rktp_rkt->rkt_conf.compression_codec;
-
-        /* Rewind rkbuf to the pre-message checkpoint (firstmsg)
-         * and replace the original message(s) with the compressed payload,
-         * possibly with version dependent enveloping. */
-        rd_buf_write_seek(rbuf, msetw->msetw_firstmsg.of);
-
-        rd_kafka_assert(msetw->msetw_rkb->rkb_rk, ciov.iov_len < INT32_MAX);
-
-        if (msetw->msetw_MsgVersion == 2) {
-                /* MsgVersion 2 has no inner MessageSet header or wrapping
-                 * for compressed messages, just the messages back-to-back,
-                 * so we can push the compressed memory directly to the
-                 * buffer without wrapping it. */
-                rd_buf_push(rbuf, ciov.iov_base, ciov.iov_len, rd_free);
-                outlen = ciov.iov_len;
-
-        } else {
-                /* Older MessageSets envelope/wrap the compressed MessageSet
-                 * in an outer Message. */
-                rd_kafka_msg_t rkm = {
-                        .rkm_len       = ciov.iov_len,
-                        .rkm_payload   = ciov.iov_base,
-                        .rkm_timestamp = msetw->msetw_firstmsg.timestamp
-                };
-                outlen = rd_kafka_msgset_writer_write_msg(
-                        msetw, &rkm, 0,
-                        rktp->rktp_rkt->rkt_conf.compression_codec,
-                        rd_free/*free for ciov.iov_base*/);
-        }
-
-        *outlenp = outlen;
-
-        return 0;
-}
-
-
-
-
-/**
- * @brief Calculate MessageSet v2 CRC (CRC32C) when messageset is complete.
- */
-static void
-rd_kafka_msgset_writer_calc_crc_v2 (rd_kafka_msgset_writer_t *msetw) {
-        int32_t crc;
-        rd_slice_t slice;
-        int r;
-
-        r = rd_slice_init(&slice, &msetw->msetw_rkbuf->rkbuf_buf,
-                          msetw->msetw_of_CRC+4,
-                          rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf) -
-                          msetw->msetw_of_CRC-4);
-       rd_assert(!r && *"slice_init failed");
-
-       /* CRC32C calculation */
-        crc = rd_slice_crc32c(&slice);
-
-        /* Update CRC at MessageSet v2 CRC offset */
-        rd_kafka_buf_update_i32(msetw->msetw_rkbuf, msetw->msetw_of_CRC, crc);
-}
-
-/**
- * @brief Finalize MessageSet v2 header fields.
- */
-static void
-rd_kafka_msgset_writer_finalize_MessageSet_v2_header (
-        rd_kafka_msgset_writer_t *msetw) {
-        rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
-        int msgcnt = rd_kafka_msgq_len(&rkbuf->rkbuf_msgq);
-
-        rd_kafka_assert(NULL, msgcnt > 0);
-        rd_kafka_assert(NULL, msetw->msetw_ApiVersion >= 3);
-
-        msetw->msetw_MessageSetSize = RD_KAFKAP_MSGSET_V2_SIZE +
-                msetw->msetw_messages_len;
-
-        /* MessageSet.Length is the same as
-         * MessageSetSize minus field widths for FirstOffset+Length */
-        rd_kafka_buf_update_i32(rkbuf, msetw->msetw_of_start +
-                                RD_KAFKAP_MSGSET_V2_OF_Length,
-                                (int32_t)msetw->msetw_MessageSetSize - (8+4));
-
-        msetw->msetw_Attributes |= RD_KAFKA_MSG_ATTR_CREATE_TIME;
-
-        rd_kafka_buf_update_i16(rkbuf, msetw->msetw_of_start +
-                                RD_KAFKAP_MSGSET_V2_OF_Attributes,
-                                msetw->msetw_Attributes);
-
-        rd_kafka_buf_update_i32(rkbuf, msetw->msetw_of_start +
-                                RD_KAFKAP_MSGSET_V2_OF_LastOffsetDelta,
-                                msgcnt-1);
-
-        rd_kafka_buf_update_i64(rkbuf, msetw->msetw_of_start +
-                                RD_KAFKAP_MSGSET_V2_OF_BaseTimestamp,
-                                msetw->msetw_firstmsg.timestamp);
-
-        rd_kafka_buf_update_i64(rkbuf, msetw->msetw_of_start +
-                                RD_KAFKAP_MSGSET_V2_OF_MaxTimestamp,
-                                msetw->msetw_MaxTimestamp);
-
-        rd_kafka_buf_update_i32(rkbuf, msetw->msetw_of_start +
-                                RD_KAFKAP_MSGSET_V2_OF_RecordCount, msgcnt);
-
-        rd_kafka_msgset_writer_calc_crc_v2(msetw);
-}
-
-
-
-
-/**
- * @brief Finalize the MessageSet header, if applicable.
- */
-static void
-rd_kafka_msgset_writer_finalize_MessageSet (rd_kafka_msgset_writer_t *msetw) {
-        rd_dassert(msetw->msetw_messages_len > 0);
-
-        if (msetw->msetw_MsgVersion == 2)
-                rd_kafka_msgset_writer_finalize_MessageSet_v2_header(msetw);
-        else
-                msetw->msetw_MessageSetSize = RD_KAFKAP_MSGSET_V0_SIZE +
-                        msetw->msetw_messages_len;
-
-        /* Update MessageSetSize */
-        rd_kafka_buf_update_i32(msetw->msetw_rkbuf,
-                                msetw->msetw_of_MessageSetSize,
-                                (int32_t)msetw->msetw_MessageSetSize);
-
-}
-
-
-/**
- * @brief Finalize the messageset - call when no more messages are to be
- *        added to the messageset.
- *
- *        Will compress, update final values, CRCs, etc.
- *
- *        The messageset writer is destroyed and the buffer is returned
- *        and ready to be transmitted.
- *
- * @param MessagetSetSizep will be set to the finalized MessageSetSize
- *
- * @returns the buffer to transmit or NULL if there were no messages
- *          in messageset.
- */
-static rd_kafka_buf_t *
-rd_kafka_msgset_writer_finalize (rd_kafka_msgset_writer_t *msetw,
-                                 size_t *MessageSetSizep) {
-        rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
-        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
-        size_t len;
-        int cnt;
-
-        /* No messages added, bail out early. */
-        if (unlikely((cnt = rd_kafka_msgq_len(&rkbuf->rkbuf_msgq)) == 0)) {
-                rd_kafka_buf_destroy(rkbuf);
-                return NULL;
-        }
-
-        /* Total size of messages */
-        len = rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf) -
-                msetw->msetw_firstmsg.of;
-        rd_assert(len > 0);
-        rd_assert(len <= (size_t)rktp->rktp_rkt->rkt_rk->rk_conf.max_msg_size);
-
-        /* Compress the message set */
-        if (rktp->rktp_rkt->rkt_conf.compression_codec)
-                rd_kafka_msgset_writer_compress(msetw, &len);
-
-        msetw->msetw_messages_len = len;
-
-        /* Finalize MessageSet header fields */
-        rd_kafka_msgset_writer_finalize_MessageSet(msetw);
-
-        /* Return final MessageSetSize */
-        *MessageSetSizep = msetw->msetw_MessageSetSize;
-
-        rd_rkb_dbg(msetw->msetw_rkb, MSG, "PRODUCE",
-                   "%s [%"PRId32"]: "
-                   "Produce MessageSet with %i message(s) (%"PRIusz" bytes, "
-                   "ApiVersion %d, MsgVersion %d)",
-                   rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
-                   cnt, msetw->msetw_MessageSetSize,
-                   msetw->msetw_ApiVersion, msetw->msetw_MsgVersion);
-
-
-        return rkbuf;
-}
-
-
-/**
- * @brief Create ProduceRequest containing as many messages from
- *        the toppar's transmit queue as possible, limited by configuration,
- *        size, etc.
- *
- * @param rkb broker to create buffer for
- * @param rktp toppar to transmit messages for
- * @param MessagetSetSizep will be set to the final MessageSetSize
- *
- * @returns the buffer to transmit or NULL if there were no messages
- *          in messageset.
- */
-rd_kafka_buf_t *
-rd_kafka_msgset_create_ProduceRequest (rd_kafka_broker_t *rkb,
-                                       rd_kafka_toppar_t *rktp,
-                                       size_t *MessageSetSizep) {
-
-        rd_kafka_msgset_writer_t msetw;
-
-        if (rd_kafka_msgset_writer_init(&msetw, rkb, rktp) == 0)
-                return NULL;
-
-        rd_kafka_msgset_writer_write_msgq(&msetw, &rktp->rktp_xmit_msgq);
-
-        return rd_kafka_msgset_writer_finalize(&msetw, MessageSetSizep);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_offset.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_offset.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_offset.c
deleted file mode 100644
index b586988..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_offset.c
+++ /dev/null
@@ -1,1139 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012,2013 Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-// FIXME: Revise this documentation:
-/**
- * This file implements the consumer offset storage.
- * It currently supports local file storage and broker OffsetCommit storage,
- * not zookeeper.
- *
- * Regardless of commit method (file, broker, ..) this is how it works:
- *  - When rdkafka, or the application, depending on if auto.offset.commit
- *    is enabled or not, calls rd_kafka_offset_store() with an offset to store,
- *    all it does is set rktp->rktp_stored_offset to this value.
- *    This can happen from any thread and is locked by the rktp lock.
- *  - The actual commit/write of the offset to its backing store (filesystem)
- *    is performed by the main rdkafka thread and scheduled at the configured
- *    auto.commit.interval.ms interval.
- *  - The write is performed in the main rdkafka thread (in a blocking manner
- *    for file based offsets) and once the write has
- *    succeeded rktp->rktp_committed_offset is updated to the new value.
- *  - If offset.store.sync.interval.ms is configured the main rdkafka thread
- *    will also make sure to fsync() each offset file accordingly. (file)
- */
-
-
-#include "rdkafka_int.h"
-#include "rdkafka_topic.h"
-#include "rdkafka_partition.h"
-#include "rdkafka_offset.h"
-#include "rdkafka_broker.h"
-
-#include <stdio.h>
-#include <sys/types.h>
-#include <fcntl.h>
-
-#ifdef _MSC_VER
-#include <io.h>
-#include <share.h>
-#include <sys/stat.h>
-#include <Shlwapi.h>
-typedef int mode_t;
-#endif
-
-
-/**
- * Convert an absolute or logical offset to string.
- */
-const char *rd_kafka_offset2str (int64_t offset) {
-        static RD_TLS char ret[16][32];
-        static RD_TLS int i = 0;
-
-        i = (i + 1) % 16;
-
-        if (offset >= 0)
-                rd_snprintf(ret[i], sizeof(ret[i]), "%"PRId64, offset);
-        else if (offset == RD_KAFKA_OFFSET_BEGINNING)
-                return "BEGINNING";
-        else if (offset == RD_KAFKA_OFFSET_END)
-                return "END";
-        else if (offset == RD_KAFKA_OFFSET_STORED)
-                return "STORED";
-        else if (offset == RD_KAFKA_OFFSET_INVALID)
-                return "INVALID";
-        else if (offset <= RD_KAFKA_OFFSET_TAIL_BASE)
-                rd_snprintf(ret[i], sizeof(ret[i]), "TAIL(%lld)",
-			    llabs(offset - RD_KAFKA_OFFSET_TAIL_BASE));
-        else
-                rd_snprintf(ret[i], sizeof(ret[i]), "%"PRId64"?", offset);
-
-        return ret[i];
-}
-
-static void rd_kafka_offset_file_close (rd_kafka_toppar_t *rktp) {
-	if (!rktp->rktp_offset_fp)
-		return;
-
-	fclose(rktp->rktp_offset_fp);
-	rktp->rktp_offset_fp = NULL;
-}
-
-
-#ifndef _MSC_VER
-/**
- * Linux version of open callback providing racefree CLOEXEC.
- */
-int rd_kafka_open_cb_linux (const char *pathname, int flags, mode_t mode,
-                            void *opaque) {
-#ifdef O_CLOEXEC
-        return open(pathname, flags|O_CLOEXEC, mode);
-#else
-        return rd_kafka_open_cb_generic(pathname, flags, mode, opaque);
-#endif
-}
-#endif
-
-/**
- * Fallback version of open_cb NOT providing racefree CLOEXEC,
- * but setting CLOEXEC after file open (if FD_CLOEXEC is defined).
- */
-int rd_kafka_open_cb_generic (const char *pathname, int flags, mode_t mode,
-                              void *opaque) {
-#ifndef _MSC_VER
-	int fd;
-        int on = 1;
-        fd = open(pathname, flags, mode);
-        if (fd == -1)
-                return -1;
-#ifdef FD_CLOEXEC
-        fcntl(fd, F_SETFD, FD_CLOEXEC, &on);
-#endif
-        return fd;
-#else
-	int fd;
-	if (_sopen_s(&fd, pathname, flags, _SH_DENYNO, mode) != 0)
-		return -1;
-	return fd;
-#endif
-}
-
-
-static int rd_kafka_offset_file_open (rd_kafka_toppar_t *rktp) {
-        rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
-        int fd;
-
-#ifndef _MSC_VER
-	mode_t mode = 0644;
-#else
-	mode_t mode = _S_IREAD|_S_IWRITE;
-#endif
-	if ((fd = rk->rk_conf.open_cb(rktp->rktp_offset_path,
-                                      O_CREAT|O_RDWR, mode,
-                                      rk->rk_conf.opaque)) == -1) {
-		rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
-				RD_KAFKA_RESP_ERR__FS,
-				"%s [%"PRId32"]: "
-				"Failed to open offset file %s: %s",
-				rktp->rktp_rkt->rkt_topic->str,
-				rktp->rktp_partition,
-				rktp->rktp_offset_path, rd_strerror(errno));
-		return -1;
-	}
-
-	rktp->rktp_offset_fp =
-#ifndef _MSC_VER
-		fdopen(fd, "r+");
-#else
-		_fdopen(fd, "r+");
-#endif
-
-	return 0;
-}
-
-
-static int64_t rd_kafka_offset_file_read (rd_kafka_toppar_t *rktp) {
-	char buf[22];
-	char *end;
-	int64_t offset;
-	size_t r;
-
-	if (fseek(rktp->rktp_offset_fp, 0, SEEK_SET) == -1) {
-		rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
-				RD_KAFKA_RESP_ERR__FS,
-				"%s [%"PRId32"]: "
-				"Seek (for read) failed on offset file %s: %s",
-				rktp->rktp_rkt->rkt_topic->str,
-				rktp->rktp_partition,
-				rktp->rktp_offset_path,
-				rd_strerror(errno));
-		rd_kafka_offset_file_close(rktp);
-		return RD_KAFKA_OFFSET_INVALID;
-	}
-
-	r = fread(buf, 1, sizeof(buf) - 1, rktp->rktp_offset_fp);
-	if (r == 0) {
-		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
-			     "%s [%"PRId32"]: offset file (%s) is empty",
-			     rktp->rktp_rkt->rkt_topic->str,
-			     rktp->rktp_partition,
-			     rktp->rktp_offset_path);
-		return RD_KAFKA_OFFSET_INVALID;
-	}
-
-	buf[r] = '\0';
-
-	offset = strtoull(buf, &end, 10);
-	if (buf == end) {
-		rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
-				RD_KAFKA_RESP_ERR__FS,
-				"%s [%"PRId32"]: "
-				"Unable to parse offset in %s",
-				rktp->rktp_rkt->rkt_topic->str,
-				rktp->rktp_partition,
-				rktp->rktp_offset_path);
-		return RD_KAFKA_OFFSET_INVALID;
-	}
-
-
-	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
-		     "%s [%"PRId32"]: Read offset %"PRId64" from offset "
-		     "file (%s)",
-		     rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
-		     offset, rktp->rktp_offset_path);
-
-	return offset;
-}
-
-
-/**
- * Sync/flush offset file.
- */
-static int rd_kafka_offset_file_sync (rd_kafka_toppar_t *rktp) {
-        if (!rktp->rktp_offset_fp)
-                return 0;
-
-        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "SYNC",
-                     "%s [%"PRId32"]: offset file sync",
-                     rktp->rktp_rkt->rkt_topic->str,
-                     rktp->rktp_partition);
-
-#ifndef _MSC_VER
-	(void)fflush(rktp->rktp_offset_fp);
-	(void)fsync(fileno(rktp->rktp_offset_fp)); // FIXME
-#else
-	// FIXME
-	// FlushFileBuffers(_get_osfhandle(fileno(rktp->rktp_offset_fp)));
-#endif
-	return 0;
-}
-
-
-/**
- * Write offset to offset file.
- *
- * Locality: toppar's broker thread
- */
-static rd_kafka_resp_err_t
-rd_kafka_offset_file_commit (rd_kafka_toppar_t *rktp) {
-	rd_kafka_itopic_t *rkt = rktp->rktp_rkt;
-	int attempt;
-        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
-        int64_t offset = rktp->rktp_stored_offset;
-
-	for (attempt = 0 ; attempt < 2 ; attempt++) {
-		char buf[22];
-		int len;
-
-		if (!rktp->rktp_offset_fp)
-			if (rd_kafka_offset_file_open(rktp) == -1)
-				continue;
-
-		if (fseek(rktp->rktp_offset_fp, 0, SEEK_SET) == -1) {
-			rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
-					RD_KAFKA_RESP_ERR__FS,
-					"%s [%"PRId32"]: "
-					"Seek failed on offset file %s: %s",
-					rktp->rktp_rkt->rkt_topic->str,
-					rktp->rktp_partition,
-					rktp->rktp_offset_path,
-					rd_strerror(errno));
-                        err = RD_KAFKA_RESP_ERR__FS;
-			rd_kafka_offset_file_close(rktp);
-			continue;
-		}
-
-		len = rd_snprintf(buf, sizeof(buf), "%"PRId64"\n", offset);
-
-		if (fwrite(buf, 1, len, rktp->rktp_offset_fp) < 1) {
-			rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
-					RD_KAFKA_RESP_ERR__FS,
-					"%s [%"PRId32"]: "
-					"Failed to write offset %"PRId64" to "
-					"offset file %s: %s",
-					rktp->rktp_rkt->rkt_topic->str,
-					rktp->rktp_partition,
-					offset,
-					rktp->rktp_offset_path,
-					rd_strerror(errno));
-                        err = RD_KAFKA_RESP_ERR__FS;
-			rd_kafka_offset_file_close(rktp);
-			continue;
-		}
-
-                /* Need to flush before truncate to preserve write ordering */
-                (void)fflush(rktp->rktp_offset_fp);
-
-		/* Truncate file */
-#ifdef _MSC_VER
-		if (_chsize_s(_fileno(rktp->rktp_offset_fp), len) == -1)
-			; /* Ignore truncate failures */
-#else
-		if (ftruncate(fileno(rktp->rktp_offset_fp), len) == -1)
-			; /* Ignore truncate failures */
-#endif
-		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
-			     "%s [%"PRId32"]: wrote offset %"PRId64" to "
-			     "file %s",
-			     rktp->rktp_rkt->rkt_topic->str,
-			     rktp->rktp_partition, offset,
-			     rktp->rktp_offset_path);
-
-		rktp->rktp_committed_offset = offset;
-
-		/* If sync interval is set to immediate we sync right away. */
-		if (rkt->rkt_conf.offset_store_sync_interval_ms == 0)
-			rd_kafka_offset_file_sync(rktp);
-
-
-		return RD_KAFKA_RESP_ERR_NO_ERROR;
-	}
-
-
-	return err;
-}
-
-
-/**
- * Enqueue offset_commit_cb op, if configured.
- *
- */
-void rd_kafka_offset_commit_cb_op (rd_kafka_t *rk,
-				   rd_kafka_resp_err_t err,
-				   const rd_kafka_topic_partition_list_t *offsets) {
-	rd_kafka_op_t *rko;
-
-        if (!(rk->rk_conf.enabled_events & RD_KAFKA_EVENT_OFFSET_COMMIT))
-		return;
-
-	rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT|RD_KAFKA_OP_REPLY);
-        rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
-	rko->rko_err = err;
-	rko->rko_u.offset_commit.cb = rk->rk_conf.offset_commit_cb;/*maybe NULL*/
-	rko->rko_u.offset_commit.opaque = rk->rk_conf.opaque;
-	if (offsets)
-		rko->rko_u.offset_commit.partitions =
-			rd_kafka_topic_partition_list_copy(offsets);
-	rd_kafka_q_enq(rk->rk_rep, rko);
-}
-
-
-
-
-/**
- * Commit a list of offsets asynchronously. Response will be queued on 'replyq'.
- * Optional \p cb will be set on requesting op.
- *
- * Makes a copy of \p offsets (may be NULL for current assignment)
- */
-static rd_kafka_resp_err_t
-rd_kafka_commit0 (rd_kafka_t *rk,
-                  const rd_kafka_topic_partition_list_t *offsets,
-		  rd_kafka_toppar_t *rktp,
-                  rd_kafka_replyq_t replyq,
-		  void (*cb) (rd_kafka_t *rk,
-			      rd_kafka_resp_err_t err,
-			      rd_kafka_topic_partition_list_t *offsets,
-			      void *opaque),
-		  void *opaque,
-                  const char *reason) {
-        rd_kafka_cgrp_t *rkcg;
-        rd_kafka_op_t *rko;
-
-        if (!(rkcg = rd_kafka_cgrp_get(rk)))
-                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
-        rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT);
-        rko->rko_u.offset_commit.reason = rd_strdup(reason);
-	rko->rko_replyq = replyq;
-	rko->rko_u.offset_commit.cb = cb;
-	rko->rko_u.offset_commit.opaque = opaque;
-	if (rktp)
-		rko->rko_rktp = rd_kafka_toppar_keep(rktp);
-
-        if (offsets)
-		rko->rko_u.offset_commit.partitions =
-                        rd_kafka_topic_partition_list_copy(offsets);
-
-        rd_kafka_q_enq(rkcg->rkcg_ops, rko);
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-
-
-/**
- * NOTE: 'offsets' may be NULL, see official documentation.
- */
-rd_kafka_resp_err_t
-rd_kafka_commit (rd_kafka_t *rk,
-                 const rd_kafka_topic_partition_list_t *offsets, int async) {
-        rd_kafka_cgrp_t *rkcg;
-	rd_kafka_resp_err_t err;
-	rd_kafka_q_t *repq = NULL;
-	rd_kafka_replyq_t rq = RD_KAFKA_NO_REPLYQ;
-
-        if (!(rkcg = rd_kafka_cgrp_get(rk)))
-                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
-        if (!async)
-                repq = rd_kafka_q_new(rk);
-
-        if (!async) 
-		rq = RD_KAFKA_REPLYQ(repq, 0);
- 
-        err = rd_kafka_commit0(rk, offsets, NULL, rq, NULL, NULL, "manual");
-
-        if (!err && !async) {
-		err = rd_kafka_q_wait_result(repq, RD_POLL_INFINITE);
-		rd_kafka_q_destroy(repq);
-        }
-
-	return err;
-}
-
-
-rd_kafka_resp_err_t
-rd_kafka_commit_message (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
-                         int async) {
-        rd_kafka_topic_partition_list_t *offsets;
-        rd_kafka_topic_partition_t *rktpar;
-        rd_kafka_resp_err_t err;
-
-        if (rkmessage->err)
-                return RD_KAFKA_RESP_ERR__INVALID_ARG;
-
-        offsets = rd_kafka_topic_partition_list_new(1);
-        rktpar = rd_kafka_topic_partition_list_add(
-                offsets, rd_kafka_topic_name(rkmessage->rkt),
-                rkmessage->partition);
-        rktpar->offset = rkmessage->offset+1;
-
-        err = rd_kafka_commit(rk, offsets, async);
-
-        rd_kafka_topic_partition_list_destroy(offsets);
-
-        return err;
-}
-
-
-
-rd_kafka_resp_err_t
-rd_kafka_commit_queue (rd_kafka_t *rk,
-		       const rd_kafka_topic_partition_list_t *offsets,
-		       rd_kafka_queue_t *rkqu,
-		       void (*cb) (rd_kafka_t *rk,
-				   rd_kafka_resp_err_t err,
-				   rd_kafka_topic_partition_list_t *offsets,
-				   void *opaque),
-		       void *opaque) {
-	rd_kafka_q_t *rkq;
-	rd_kafka_resp_err_t err;
-
-        if (!rd_kafka_cgrp_get(rk))
-                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
-	if (rkqu)
-		rkq = rkqu->rkqu_q;
-	else
-		rkq = rd_kafka_q_new(rk);
-
-	err = rd_kafka_commit0(rk, offsets, NULL,
-			       RD_KAFKA_REPLYQ(rkq, 0),
-			       cb, opaque, "manual");
-
-	if (!rkqu) {
-                rd_kafka_op_t *rko =
-                        rd_kafka_q_pop_serve(rkq, RD_POLL_INFINITE,
-                                             0, RD_KAFKA_Q_CB_FORCE_RETURN,
-                                             NULL, NULL);
-		if (!rko)
-			err = RD_KAFKA_RESP_ERR__TIMED_OUT;
-		else {
-                        if (cb)
-                                cb(rk, rko->rko_err,
-                                   rko->rko_u.offset_commit.partitions,
-                                   opaque);
-                        err = rko->rko_err;
-                        rd_kafka_op_destroy(rko);
-                }
-
-                rd_kafka_q_destroy(rkq);
-	}
-
-	return err;
-}
-
-
-
-
-/**
- * Called when a broker commit is done.
- *
- * Locality: toppar handler thread
- * Locks: none
- */
-static void
-rd_kafka_offset_broker_commit_cb (rd_kafka_t *rk,
-				  rd_kafka_resp_err_t err,
-				  rd_kafka_topic_partition_list_t *offsets,
-				  void *opaque) {
-        shptr_rd_kafka_toppar_t *s_rktp;
-        rd_kafka_toppar_t *rktp;
-        rd_kafka_topic_partition_t *rktpar;
-
-        if (offsets->cnt == 0) {
-                rd_kafka_dbg(rk, TOPIC, "OFFSETCOMMIT",
-                             "No offsets to commit (commit_cb)");
-                return;
-        }
-
-        rktpar = &offsets->elems[0];
-
-        if (!(s_rktp = rd_kafka_topic_partition_list_get_toppar(rk, rktpar))) {
-		rd_kafka_dbg(rk, TOPIC, "OFFSETCOMMIT",
-			     "No local partition found for %s [%"PRId32"] "
-			     "while parsing OffsetCommit response "
-			     "(offset %"PRId64", error \"%s\")",
-			     rktpar->topic,
-			     rktpar->partition,
-			     rktpar->offset,
-			     rd_kafka_err2str(rktpar->err));
-                return;
-        }
-
-        rktp = rd_kafka_toppar_s2i(s_rktp);
-
-        if (!err)
-                err = rktpar->err;
-
-	rd_kafka_toppar_offset_commit_result(rktp, err, offsets);
-
-        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
-                     "%s [%"PRId32"]: offset %"PRId64" committed: %s",
-                     rktp->rktp_rkt->rkt_topic->str,
-                     rktp->rktp_partition, rktpar->offset,
-                     rd_kafka_err2str(err));
-
-        rktp->rktp_committing_offset = 0;
-
-        rd_kafka_toppar_lock(rktp);
-        if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING)
-                rd_kafka_offset_store_term(rktp, err);
-        rd_kafka_toppar_unlock(rktp);
-
-        rd_kafka_toppar_destroy(s_rktp);
-}
-
-
-static rd_kafka_resp_err_t
-rd_kafka_offset_broker_commit (rd_kafka_toppar_t *rktp, const char *reason) {
-        rd_kafka_topic_partition_list_t *offsets;
-        rd_kafka_topic_partition_t *rktpar;
-
-        rd_kafka_assert(rktp->rktp_rkt->rkt_rk, rktp->rktp_cgrp != NULL);
-        rd_kafka_assert(rktp->rktp_rkt->rkt_rk,
-                        rktp->rktp_flags & RD_KAFKA_TOPPAR_F_OFFSET_STORE);
-
-        rktp->rktp_committing_offset = rktp->rktp_stored_offset;
-
-        offsets = rd_kafka_topic_partition_list_new(1);
-        rktpar = rd_kafka_topic_partition_list_add(
-                offsets, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
-        rktpar->offset = rktp->rktp_committing_offset;
-
-        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSETCMT",
-                     "%.*s [%"PRId32"]: committing offset %"PRId64": %s",
-                     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
-                     rktp->rktp_partition, rktp->rktp_committing_offset,
-                     reason);
-
-        rd_kafka_commit0(rktp->rktp_rkt->rkt_rk, offsets, rktp,
-			 RD_KAFKA_REPLYQ(rktp->rktp_ops, 0),
-			 rd_kafka_offset_broker_commit_cb, NULL,
-                         reason);
-
-        rd_kafka_topic_partition_list_destroy(offsets);
-
-        return RD_KAFKA_RESP_ERR__IN_PROGRESS;
-}
-
-
-
-
-/**
- * Commit offset to backing store.
- * This might be an async operation.
- *
- * Locality: toppar handler thread
- */
-static
-rd_kafka_resp_err_t rd_kafka_offset_commit (rd_kafka_toppar_t *rktp,
-                                            const char *reason) {
-        if (1)  // FIXME
-        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
-		     "%s [%"PRId32"]: commit: "
-		     "stored offset %"PRId64" > committed offset %"PRId64"?",
-		     rktp->rktp_rkt->rkt_topic->str,
-		     rktp->rktp_partition,
-		     rktp->rktp_stored_offset, rktp->rktp_committed_offset);
-
-        /* Already committed */
-        if (rktp->rktp_stored_offset <= rktp->rktp_committed_offset)
-                return RD_KAFKA_RESP_ERR_NO_ERROR;
-
-        /* Already committing (for async ops) */
-        if (rktp->rktp_stored_offset <= rktp->rktp_committing_offset)
-                return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
-
-        switch (rktp->rktp_rkt->rkt_conf.offset_store_method)
-        {
-        case RD_KAFKA_OFFSET_METHOD_FILE:
-                return rd_kafka_offset_file_commit(rktp);
-        case RD_KAFKA_OFFSET_METHOD_BROKER:
-                return rd_kafka_offset_broker_commit(rktp, reason);
-        default:
-                /* UNREACHABLE */
-                return RD_KAFKA_RESP_ERR__INVALID_ARG;
-        }
-}
-
-
-
-
-
-/**
- * Sync offset backing store. This is only used for METHOD_FILE.
- *
- * Locality: rktp's broker thread.
- */
-rd_kafka_resp_err_t rd_kafka_offset_sync (rd_kafka_toppar_t *rktp) {
-        switch (rktp->rktp_rkt->rkt_conf.offset_store_method)
-        {
-        case RD_KAFKA_OFFSET_METHOD_FILE:
-                return rd_kafka_offset_file_sync(rktp);
-        default:
-                return RD_KAFKA_RESP_ERR__INVALID_ARG;
-        }
-}
-
-
-/**
- * Store offset.
- * Typically called from application code.
- *
- * NOTE: No locks must be held.
- */
-rd_kafka_resp_err_t rd_kafka_offset_store (rd_kafka_topic_t *app_rkt,
-					   int32_t partition, int64_t offset) {
-        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
-	shptr_rd_kafka_toppar_t *s_rktp;
-
-	/* Find toppar */
-	rd_kafka_topic_rdlock(rkt);
-	if (!(s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*!ua_on_miss*/))) {
-		rd_kafka_topic_rdunlock(rkt);
-		return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
-	}
-	rd_kafka_topic_rdunlock(rkt);
-
-	rd_kafka_offset_store0(rd_kafka_toppar_s2i(s_rktp), offset+1,
-                               1/*lock*/);
-
-	rd_kafka_toppar_destroy(s_rktp);
-
-	return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-rd_kafka_resp_err_t
-rd_kafka_offsets_store (rd_kafka_t *rk,
-                        rd_kafka_topic_partition_list_t *offsets) {
-        int i;
-        int ok_cnt = 0;
-
-        for (i = 0 ; i < offsets->cnt ; i++) {
-                rd_kafka_topic_partition_t *rktpar = &offsets->elems[i];
-                shptr_rd_kafka_toppar_t *s_rktp;
-
-                s_rktp = rd_kafka_topic_partition_get_toppar(rk, rktpar);
-                if (!s_rktp) {
-                        rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
-                        continue;
-                }
-
-                rd_kafka_offset_store0(rd_kafka_toppar_s2i(s_rktp),
-                                       rktpar->offset, 1/*lock*/);
-                rd_kafka_toppar_destroy(s_rktp);
-
-                rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
-                ok_cnt++;
-        }
-
-        return offsets->cnt > 0 && ok_cnt < offsets->cnt ?
-                RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION :
-                RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-
-
-
-
-/**
- * Decommissions the use of an offset file for a toppar.
- * The file content will not be touched and the file will not be removed.
- */
-static rd_kafka_resp_err_t rd_kafka_offset_file_term (rd_kafka_toppar_t *rktp) {
-        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
-
-        /* Sync offset file if the sync is intervalled (> 0) */
-        if (rktp->rktp_rkt->rkt_conf.offset_store_sync_interval_ms > 0) {
-                rd_kafka_offset_file_sync(rktp);
-		rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
-				    &rktp->rktp_offset_sync_tmr, 1/*lock*/);
-	}
-
-
-	rd_kafka_offset_file_close(rktp);
-
-	rd_free(rktp->rktp_offset_path);
-	rktp->rktp_offset_path = NULL;
-
-        return err;
-}
-
-static rd_kafka_op_res_t
-rd_kafka_offset_reset_op_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq,
-                             rd_kafka_op_t *rko) {
-	rd_kafka_toppar_t *rktp =
-		rd_kafka_toppar_s2i(rko->rko_rktp);
-	rd_kafka_toppar_lock(rktp);
-        rd_kafka_offset_reset(rktp,
-                              rko->rko_u.offset_reset.offset,
-                              rko->rko_err, rko->rko_u.offset_reset.reason);
-	rd_kafka_toppar_unlock(rktp);
-        return RD_KAFKA_OP_RES_HANDLED;
-}
-
-/**
- * Take action when the offset for a toppar becomes unusable.
- *
- * Locality: toppar handler thread
- * Locks: toppar_lock() MUST be held
- */
-void rd_kafka_offset_reset (rd_kafka_toppar_t *rktp, int64_t err_offset,
-			    rd_kafka_resp_err_t err, const char *reason) {
-	int64_t offset = RD_KAFKA_OFFSET_INVALID;
-	rd_kafka_op_t *rko;
-
-        /* Enqueue op for toppar handler thread if we're on the wrong thread. */
-        if (!thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread)) {
-                rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_RESET |
-						     RD_KAFKA_OP_CB);
-                rko->rko_op_cb = rd_kafka_offset_reset_op_cb;
-                rko->rko_err = err;
-                rko->rko_rktp = rd_kafka_toppar_keep(rktp);
-                rko->rko_u.offset_reset.offset = err_offset;
-		rko->rko_u.offset_reset.reason = rd_strdup(reason);
-                rd_kafka_q_enq(rktp->rktp_ops, rko);
-                return;
-        }
-
-	if (err_offset == RD_KAFKA_OFFSET_INVALID || err)
-		offset = rktp->rktp_rkt->rkt_conf.auto_offset_reset;
-	else
-		offset = err_offset;
-
-	if (offset == RD_KAFKA_OFFSET_INVALID) {
-		/* Error, auto.offset.reset tells us to error out. */
-		rko = rd_kafka_op_new(RD_KAFKA_OP_CONSUMER_ERR);
-
-		rko->rko_err               = err;
-		rko->rko_u.err.offset      = err_offset;
-		rko->rko_u.err.errstr      = rd_strdup(reason);
-                rko->rko_rktp        = rd_kafka_toppar_keep(rktp);
-
-		rd_kafka_q_enq(rktp->rktp_fetchq, rko);
-                rd_kafka_toppar_set_fetch_state(
-			rktp, RD_KAFKA_TOPPAR_FETCH_NONE);
-
-	} else {
-		/* Query logical offset */
-		rktp->rktp_query_offset = offset;
-                rd_kafka_toppar_set_fetch_state(
-			rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
-	}
-
-	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
-		     "%s [%"PRId32"]: offset reset (at offset %s) "
-		     "to %s: %s: %s",
-		     rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
-		     rd_kafka_offset2str(err_offset),
-                     rd_kafka_offset2str(offset),
-                     reason, rd_kafka_err2str(err));
-
-	if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
-		rd_kafka_toppar_offset_request(rktp, rktp->rktp_query_offset, 0);
-}
-
-
-/**
- * Escape any special characters in filename 'in' and write escaped
- * string to 'out' (of max size out_size).
- */
-static char *mk_esc_filename (const char *in, char *out, size_t out_size) {
-        const char *s = in;
-        char *o = out;
-
-        while (*s) {
-                const char *esc;
-                size_t esclen;
-
-                switch (*s)
-                {
-                case '/': /* linux */
-                        esc = "%2F";
-                        esclen = strlen(esc);
-                        break;
-                case ':': /* osx, windows */
-                        esc = "%3A";
-                        esclen = strlen(esc);
-                        break;
-                case '\\': /* windows */
-                        esc = "%5C";
-                        esclen = strlen(esc);
-                        break;
-                default:
-                        esc = s;
-                        esclen = 1;
-                        break;
-                }
-
-                if ((size_t)((o + esclen + 1) - out) >= out_size) {
-                        /* No more space in output string, truncate. */
-                        break;
-                }
-
-                while (esclen-- > 0)
-                        *(o++) = *(esc++);
-
-                s++;
-        }
-
-        *o = '\0';
-        return out;
-}
-
-
-static void rd_kafka_offset_sync_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
-	rd_kafka_toppar_t *rktp = arg;
-	rd_kafka_offset_sync(rktp);
-}
-
-
-/**
- * Prepare a toppar for using an offset file.
- *
- * Locality: rdkafka main thread
- * Locks: toppar_lock(rktp) must be held
- */
-static void rd_kafka_offset_file_init (rd_kafka_toppar_t *rktp) {
-	char spath[4096];
-	const char *path = rktp->rktp_rkt->rkt_conf.offset_store_path;
-	int64_t offset = RD_KAFKA_OFFSET_INVALID;
-
-	if (rd_kafka_path_is_dir(path)) {
-                char tmpfile[1024];
-                char escfile[4096];
-
-                /* Include group.id in filename if configured. */
-                if (!RD_KAFKAP_STR_IS_NULL(rktp->rktp_rkt->rkt_rk->rk_group_id))
-                        rd_snprintf(tmpfile, sizeof(tmpfile),
-                                 "%s-%"PRId32"-%.*s.offset",
-                                 rktp->rktp_rkt->rkt_topic->str,
-                                 rktp->rktp_partition,
-                                 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_rk->
-                                                  rk_group_id));
-                else
-                        rd_snprintf(tmpfile, sizeof(tmpfile),
-                                 "%s-%"PRId32".offset",
-                                 rktp->rktp_rkt->rkt_topic->str,
-                                 rktp->rktp_partition);
-
-                /* Escape filename to make it safe. */
-                mk_esc_filename(tmpfile, escfile, sizeof(escfile));
-
-                rd_snprintf(spath, sizeof(spath), "%s%s%s",
-                         path, path[strlen(path)-1] == '/' ? "" : "/", escfile);
-
-		path = spath;
-	}
-
-	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
-		     "%s [%"PRId32"]: using offset file %s",
-		     rktp->rktp_rkt->rkt_topic->str,
-		     rktp->rktp_partition,
-		     path);
-	rktp->rktp_offset_path = rd_strdup(path);
-
-
-        /* Set up the offset file sync interval. */
- 	if (rktp->rktp_rkt->rkt_conf.offset_store_sync_interval_ms > 0)
-		rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
-				     &rktp->rktp_offset_sync_tmr,
-				     rktp->rktp_rkt->rkt_conf.
-				     offset_store_sync_interval_ms * 1000ll,
-				     rd_kafka_offset_sync_tmr_cb, rktp);
-
-	if (rd_kafka_offset_file_open(rktp) != -1) {
-		/* Read offset from offset file. */
-		offset = rd_kafka_offset_file_read(rktp);
-	}
-
-	if (offset != RD_KAFKA_OFFSET_INVALID) {
-		/* Start fetching from offset */
-		rktp->rktp_stored_offset = offset;
-		rktp->rktp_committed_offset = offset;
-                rd_kafka_toppar_next_offset_handle(rktp, offset);
-
-	} else {
-		/* Offset was not usable: perform offset reset logic */
-		rktp->rktp_committed_offset = RD_KAFKA_OFFSET_INVALID;
-		rd_kafka_offset_reset(rktp, RD_KAFKA_OFFSET_INVALID,
-				      RD_KAFKA_RESP_ERR__FS,
-				      "non-readable offset file");
-	}
-}
-
-
-
-/**
- * Terminate broker offset store
- */
-static rd_kafka_resp_err_t rd_kafka_offset_broker_term (rd_kafka_toppar_t *rktp){
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * Prepare a toppar for using broker offset commit (broker 0.8.2 or later).
- * When using KafkaConsumer (high-level consumer) this functionality is
- * disabled in favour of the cgrp commits for the entire set of subscriptions.
- */
-static void rd_kafka_offset_broker_init (rd_kafka_toppar_t *rktp) {
-        if (!rd_kafka_is_simple_consumer(rktp->rktp_rkt->rkt_rk))
-                return;
-        rd_kafka_offset_reset(rktp, RD_KAFKA_OFFSET_STORED, 0,
-                              "query broker for offsets");
-}
-
-
-/**
- * Terminates toppar's offset store, this is the finalizing step after
- * offset_store_stop().
- *
- * Locks: rd_kafka_toppar_lock() MUST be held.
- */
-void rd_kafka_offset_store_term (rd_kafka_toppar_t *rktp,
-                                 rd_kafka_resp_err_t err) {
-        rd_kafka_resp_err_t err2;
-
-	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "STORETERM",
-		     "%s [%"PRId32"]: offset store terminating",
-                     rktp->rktp_rkt->rkt_topic->str,
-		     rktp->rktp_partition);
-
-        rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING;
-
-	rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
-			    &rktp->rktp_offset_commit_tmr, 1/*lock*/);
-
-        switch (rktp->rktp_rkt->rkt_conf.offset_store_method)
-        {
-        case RD_KAFKA_OFFSET_METHOD_FILE:
-                err2 = rd_kafka_offset_file_term(rktp);
-                break;
-        case RD_KAFKA_OFFSET_METHOD_BROKER:
-                err2 = rd_kafka_offset_broker_term(rktp);
-                break;
-        case RD_KAFKA_OFFSET_METHOD_NONE:
-                err2 = RD_KAFKA_RESP_ERR_NO_ERROR;
-                break;
-        }
-
-        /* Prioritize the input error (probably from commit), fall
-         * back on termination error. */
-        if (!err)
-                err = err2;
-
-        rd_kafka_toppar_fetch_stopped(rktp, err);
-
-}
-
-
-/**
- * Stop toppar's offset store, committing the final offsets, etc.
- *
- * Returns RD_KAFKA_RESP_ERR_NO_ERROR on success,
- * RD_KAFKA_RESP_ERR__IN_PROGRESS if the term triggered an
- * async operation (e.g., broker offset commit), or
- * any other error in case of immediate failure.
- *
- * The offset layer will call rd_kafka_offset_store_term() when
- * the offset management has been fully stopped for this partition.
- *
- * Locks: rd_kafka_toppar_lock() MUST be held.
- */
-rd_kafka_resp_err_t rd_kafka_offset_store_stop (rd_kafka_toppar_t *rktp) {
-        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
-
-        if (!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_OFFSET_STORE))
-                goto done;
-
-        rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING;
-
-        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
-                     "%s [%"PRId32"]: stopping offset store "
-                     "(stored offset %"PRId64
-                     ", committed offset %"PRId64", EOF offset %"PRId64")",
-                     rktp->rktp_rkt->rkt_topic->str,
-		     rktp->rktp_partition,
-		     rktp->rktp_stored_offset, rktp->rktp_committed_offset,
-                     rktp->rktp_offsets_fin.eof_offset);
-
-        /* Store end offset for empty partitions */
-        if (rktp->rktp_rkt->rkt_rk->rk_conf.enable_auto_offset_store &&
-            rktp->rktp_stored_offset == RD_KAFKA_OFFSET_INVALID &&
-            rktp->rktp_offsets_fin.eof_offset > 0)
-                rd_kafka_offset_store0(rktp, rktp->rktp_offsets_fin.eof_offset,
-                                       0/*no lock*/);
-
-        /* Commit offset to backing store.
-         * This might be an async operation. */
-        if (rd_kafka_is_simple_consumer(rktp->rktp_rkt->rkt_rk) &&
-            rktp->rktp_stored_offset > rktp->rktp_committed_offset)
-                err = rd_kafka_offset_commit(rktp, "offset store stop");
-
-        /* If stop is in progress (async commit), return now. */
-        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
-                return err;
-
-done:
-        /* Stop is done */
-        rd_kafka_offset_store_term(rktp, err);
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-static void rd_kafka_offset_auto_commit_tmr_cb (rd_kafka_timers_t *rkts,
-						 void *arg) {
-	rd_kafka_toppar_t *rktp = arg;
-	rd_kafka_offset_commit(rktp, "auto commit timer");
-}
-
-void rd_kafka_offset_query_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
-	rd_kafka_toppar_t *rktp = arg;
-	rd_kafka_toppar_lock(rktp);
-	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
-		     "Topic %s [%"PRId32"]: timed offset query for %s in "
-		     "state %s",
-		     rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
-		     rd_kafka_offset2str(rktp->rktp_query_offset),
-		     rd_kafka_fetch_states[rktp->rktp_fetch_state]);
-	rd_kafka_toppar_offset_request(rktp, rktp->rktp_query_offset, 0);
-	rd_kafka_toppar_unlock(rktp);
-}
-
-
-/**
- * Initialize toppar's offset store.
- *
- * Locality: toppar handler thread
- */
-void rd_kafka_offset_store_init (rd_kafka_toppar_t *rktp) {
-        static const char *store_names[] = { "none", "file", "broker" };
-
-        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
-                     "%s [%"PRId32"]: using offset store method: %s",
-                     rktp->rktp_rkt->rkt_topic->str,
-                     rktp->rktp_partition,
-                     store_names[rktp->rktp_rkt->rkt_conf.offset_store_method]);
-
-        /* The committed offset is unknown at this point. */
-        rktp->rktp_committed_offset = RD_KAFKA_OFFSET_INVALID;
-
-        /* Set up the commit interval (for simple consumer). */
-        if (rd_kafka_is_simple_consumer(rktp->rktp_rkt->rkt_rk) &&
-            rktp->rktp_rkt->rkt_conf.auto_commit_interval_ms > 0)
-		rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
-				     &rktp->rktp_offset_commit_tmr,
-				     rktp->rktp_rkt->rkt_conf.
-				     auto_commit_interval_ms * 1000ll,
-				     rd_kafka_offset_auto_commit_tmr_cb,
-				     rktp);
-
-        switch (rktp->rktp_rkt->rkt_conf.offset_store_method)
-        {
-        case RD_KAFKA_OFFSET_METHOD_FILE:
-                rd_kafka_offset_file_init(rktp);
-                break;
-        case RD_KAFKA_OFFSET_METHOD_BROKER:
-                rd_kafka_offset_broker_init(rktp);
-                break;
-        case RD_KAFKA_OFFSET_METHOD_NONE:
-                break;
-        default:
-                /* NOTREACHED */
-                return;
-        }
-
-        rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_OFFSET_STORE;
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_offset.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_offset.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_offset.h
deleted file mode 100644
index a9e8655..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_offset.h
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012,2013 Magnus Edenhill
- * All rights reserved.
- * 
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-#include "rdkafka_partition.h"
-
-
-const char *rd_kafka_offset2str (int64_t offset);
-
-
-/**
- * Stores the offset for the toppar 'rktp'.
- * The actual commit of the offset to backing store is usually
- * performed at a later time (time or threshold based).
- *
- * See head of rdkafka_offset.c for more information.
- */
-static RD_INLINE RD_UNUSED
-void rd_kafka_offset_store0 (rd_kafka_toppar_t *rktp, int64_t offset,
-			     int lock) {
-	if (lock)
-		rd_kafka_toppar_lock(rktp);
-	rktp->rktp_stored_offset = offset;
-	if (lock)
-		rd_kafka_toppar_unlock(rktp);
-}
-
-rd_kafka_resp_err_t rd_kafka_offset_store (rd_kafka_topic_t *rkt,
-					   int32_t partition, int64_t offset);
-
-rd_kafka_resp_err_t rd_kafka_offset_sync (rd_kafka_toppar_t *rktp);
-
-void rd_kafka_offset_store_term (rd_kafka_toppar_t *rktp,
-                                 rd_kafka_resp_err_t err);
-rd_kafka_resp_err_t rd_kafka_offset_store_stop (rd_kafka_toppar_t *rktp);
-void rd_kafka_offset_store_init (rd_kafka_toppar_t *rktp);
-
-void rd_kafka_offset_reset (rd_kafka_toppar_t *rktp, int64_t err_offset,
-			    rd_kafka_resp_err_t err, const char *reason);
-
-void rd_kafka_offset_query_tmr_cb (rd_kafka_timers_t *rkts, void *arg);
-
-void rd_kafka_offset_commit_cb_op (rd_kafka_t *rk,
-				   rd_kafka_resp_err_t err,
-				   const rd_kafka_topic_partition_list_t *offsets);
-