You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:52 UTC
[24/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade
to librdkafka 0.11.4
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_msgset_writer.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_msgset_writer.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_msgset_writer.c
deleted file mode 100644
index 5faad84..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_msgset_writer.c
+++ /dev/null
@@ -1,1161 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2017 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "rd.h"
-#include "rdkafka_int.h"
-#include "rdkafka_msg.h"
-#include "rdkafka_msgset.h"
-#include "rdkafka_topic.h"
-#include "rdkafka_partition.h"
-#include "rdkafka_lz4.h"
-
-#include "snappy.h"
-#include "rdvarint.h"
-#include "crc32c.h"
-
-
-typedef struct rd_kafka_msgset_writer_s { /* State for writing one MessageSet */
- rd_kafka_buf_t *msetw_rkbuf; /* Backing store buffer (refcounted):
- * the Produce request being built. */
-
- int16_t msetw_ApiVersion; /* ProduceRequest ApiVersion */
- int msetw_MsgVersion; /* MsgVersion to construct */
- int msetw_features; /* Protocol features to use */
- int msetw_msgcntmax; /* Max number of messages to send
- * in a batch. */
- size_t msetw_messages_len; /* Total size of Messages, without
- * MessageSet header.
- * Set when the messageset is finalized,
- * after any compression. */
-
- size_t msetw_MessageSetSize; /* Current MessageSetSize value */
- size_t msetw_of_MessageSetSize; /* offset of MessageSetSize */
- size_t msetw_of_start; /* offset of MessageSet
- * (only set by the v2 header writer) */
-
- int msetw_relative_offsets; /* Bool: use relative offsets */
-
- /* For MessageSet v2 */
- int msetw_Attributes; /* MessageSet Attributes */
- int64_t msetw_MaxTimestamp; /* Maximum timestamp in batch */
- size_t msetw_of_CRC; /* offset of MessageSet.CRC */
-
- /* First message information */
- struct {
- size_t of; /* rkbuf's first message position */
- int64_t timestamp; /* First message's timestamp, used as
- * the base for timestamp deltas and as
- * the v2 header's BaseTimestamp. */
- } msetw_firstmsg;
-
- rd_kafka_broker_t *msetw_rkb; /* @warning Not a refcounted
- * reference! */
- rd_kafka_toppar_t *msetw_rktp; /* @warning Not a refcounted
- * reference! */
-} rd_kafka_msgset_writer_t;
-
-
-
-/**
- * @brief Pick the ProduceRequest ApiVersion and the message format
- *        version (MsgVersion) from the feature flags of the broker,
- *        preferring the newest message format the broker advertises.
- *
- *        The feature flag that decided the selection (if any) is also
- *        added to msetw_features.
- *
- * @locality broker thread
- */
-static RD_INLINE void
-rd_kafka_msgset_writer_select_MsgVersion (rd_kafka_msgset_writer_t *msetw) {
-        rd_kafka_broker_t *rkb = msetw->msetw_rkb;
-        int matched;
-
-        /* MsgVersion 2 is paired with ProduceRequest v3 */
-        matched = rkb->rkb_features & RD_KAFKA_FEATURE_MSGVER2;
-        if (matched) {
-                msetw->msetw_features  |= matched;
-                msetw->msetw_ApiVersion = 3;
-                msetw->msetw_MsgVersion = 2;
-                return;
-        }
-
-        /* MsgVersion 1 is paired with ProduceRequest v2 */
-        matched = rkb->rkb_features & RD_KAFKA_FEATURE_MSGVER1;
-        if (matched) {
-                msetw->msetw_features  |= matched;
-                msetw->msetw_ApiVersion = 2;
-                msetw->msetw_MsgVersion = 1;
-                return;
-        }
-
-        /* Neither newer format is available: fall back to MsgVersion 0.
-         * The ApiVersion is 1 if the broker has the ThrottleTime
-         * feature, else 0. */
-        msetw->msetw_MsgVersion = 0;
-
-        matched = rkb->rkb_features & RD_KAFKA_FEATURE_THROTTLETIME;
-        if (!matched) {
-                msetw->msetw_ApiVersion = 0;
-                return;
-        }
-
-        msetw->msetw_features  |= matched;
-        msetw->msetw_ApiVersion = 1;
-}
-
-
-/**
- * @brief Allocate the request buffer for a messageset writer whose
- *        ApiVersion, MsgVersion and message count limit are already set.
- *
- *        The requested size is an estimate made up of the ProduceRequest
- *        header, the MessageSet header, a worst-case per-message overhead
- *        and, if payload copying is enabled, room for copied payloads.
- *        It is capped at message.max.bytes.
- *
- *        The estimate isn't critical: it avoids unnecessary extra
- *        allocations, and the buffer will grow as needed if it is wrong.
- */
-static void
-rd_kafka_msgset_writer_alloc_buf (rd_kafka_msgset_writer_t *msetw) {
-        rd_kafka_t *rk = msetw->msetw_rkb->rkb_rk;
-        size_t per_msg_overhead = 0;
-        size_t req_hdr_size = 0;
-        size_t mset_hdr_size = 0;
-        size_t alloc_size;
-
-        rd_kafka_assert(NULL, !msetw->msetw_rkbuf);
-
-        /*
-         * ProduceRequest header size.
-         *
-         * v0..2: RequiredAcks + Timeout +
-         *        [Topic + [Partition + MessageSetSize]]
-         * v3:    TransactionalId + the above.
-         */
-        switch (msetw->msetw_ApiVersion)
-        {
-        case 3:
-                /* TransactionalId */
-                req_hdr_size += RD_KAFKAP_STR_SIZE(rk->rk_eos.TransactionalId);
-                /* FALLTHRU */
-        case 0:
-        case 1:
-        case 2:
-                /* RequiredAcks + Timeout + TopicCnt */
-                req_hdr_size += 2 + 4 + 4;
-                /* Topic */
-                req_hdr_size += RD_KAFKAP_STR_SIZE(msetw->msetw_rktp->
-                                                   rktp_rkt->rkt_topic);
-                /* PartitionCnt + Partition + MessageSetSize */
-                req_hdr_size += 4 + 4 + 4;
-
-                /* MessageSetSize */
-                mset_hdr_size += 4;
-                break;
-
-        default:
-                RD_NOTREACHED();
-        }
-
-        /*
-         * Per-message (worst-case) overhead and, for MsgVersion 2,
-         * the MessageSet header fields.
-         */
-        switch (msetw->msetw_MsgVersion)
-        {
-        case 0:
-                per_msg_overhead = RD_KAFKAP_MESSAGE_V0_OVERHEAD;
-                break;
-
-        case 1:
-                per_msg_overhead = RD_KAFKAP_MESSAGE_V1_OVERHEAD;
-                break;
-
-        case 2:
-                /* MsgVer2 uses varints: account for the worst case. */
-                per_msg_overhead += RD_KAFKAP_MESSAGE_V2_OVERHEAD;
-
-                mset_hdr_size +=
-                        8 /* BaseOffset */ +
-                        4 /* Length */ +
-                        4 /* PartitionLeaderEpoch */ +
-                        1 /* Magic (MsgVersion) */ +
-                        4 /* CRC (CRC32C) */ +
-                        2 /* Attributes */ +
-                        4 /* LastOffsetDelta */ +
-                        8 /* BaseTimestamp */ +
-                        8 /* MaxTimestamp */ +
-                        8 /* ProducerId */ +
-                        2 /* ProducerEpoch */ +
-                        4 /* BaseSequence */ +
-                        4 /* RecordCount */;
-                break;
-
-        default:
-                RD_NOTREACHED();
-        }
-
-        /* Start out with the fixed headers */
-        alloc_size = req_hdr_size + mset_hdr_size;
-
-        /* With payload copying enabled, reserve room for the payloads
-         * that may be copied: never more than what is queued, and never
-         * more than the copy limit for each message in the batch. */
-        if (rk->rk_conf.msg_copy_max_size > 0) {
-                size_t queued_bytes =
-                        rd_kafka_msgq_size(&msetw->msetw_rktp->rktp_xmit_msgq);
-                size_t copy_budget =
-                        (size_t)rk->rk_conf.msg_copy_max_size *
-                        msetw->msetw_msgcntmax;
-
-                alloc_size += RD_MIN(queued_bytes, copy_budget);
-        }
-
-        /* Estimated per-message overhead for the whole batch */
-        alloc_size += per_msg_overhead * msetw->msetw_msgcntmax;
-
-        /* Never ask for more than message.max.bytes */
-        if (alloc_size > (size_t)rk->rk_conf.max_msg_size)
-                alloc_size = (size_t)rk->rk_conf.max_msg_size;
-
-        /* Create the Produce request buffer with the estimated size and
-         * a segment count derived from the message count limit, to hold
-         * headers and messages. */
-        msetw->msetw_rkbuf =
-                rd_kafka_buf_new_request(msetw->msetw_rkb, RD_KAFKAP_Produce,
-                                         msetw->msetw_msgcntmax/2 + 10,
-                                         alloc_size);
-
-        rd_kafka_buf_ApiVersion_set(msetw->msetw_rkbuf,
-                                    msetw->msetw_ApiVersion,
-                                    msetw->msetw_features);
-}
-
-
-/**
- * @brief Write the MessageSet v2 header fields to the writer's buffer.
- *
- * Fields whose final value is not yet known (Length, CRC, Attributes,
- * LastOffsetDelta, BaseTimestamp, MaxTimestamp and RecordCount) are
- * written as zero placeholders. The buffer offsets of the header start
- * (msetw_of_start) and of the CRC field (msetw_of_CRC) are saved in
- * \p msetw so that the placeholders can be updated later.
- *
- * @remark Must only be called for MsgVersion 2
- */
-static void
-rd_kafka_msgset_writer_write_MessageSet_v2_header (
- rd_kafka_msgset_writer_t *msetw) {
- rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
- rd_kafka_t *rk = msetw->msetw_rkb->rkb_rk;
-
- rd_kafka_assert(NULL, msetw->msetw_ApiVersion >= 3);
- rd_kafka_assert(NULL, msetw->msetw_MsgVersion == 2);
-
- /* BaseOffset (also store the offset to the start of
- * the messageset header fields) */
- msetw->msetw_of_start = rd_kafka_buf_write_i64(rkbuf, 0);
-
- /* Length: updated later */
- rd_kafka_buf_write_i32(rkbuf, 0);
-
- /* PartitionLeaderEpoch (KIP-101) */
- rd_kafka_buf_write_i32(rkbuf, 0);
-
- /* Magic (MsgVersion) */
- rd_kafka_buf_write_i8(rkbuf, msetw->msetw_MsgVersion);
-
- /* CRC (CRC32C): updated later.
- * CRC needs to be done after the entire messageset+messages has
- * been constructed and the following header fields updated. :(
- * Save the offset for this position so it can be updated later. */
- msetw->msetw_of_CRC = rd_kafka_buf_write_i32(rkbuf, 0);
-
- /* Attributes: updated later */
- rd_kafka_buf_write_i16(rkbuf, 0);
-
- /* LastOffsetDelta: updated later */
- rd_kafka_buf_write_i32(rkbuf, 0);
-
- /* BaseTimestamp: updated later */
- rd_kafka_buf_write_i64(rkbuf, 0);
-
- /* MaxTimestamp: updated later */
- rd_kafka_buf_write_i64(rkbuf, 0);
-
- /* ProducerId */
- rd_kafka_buf_write_i64(rkbuf, rk->rk_eos.PID);
-
- /* ProducerEpoch */
- rd_kafka_buf_write_i16(rkbuf, rk->rk_eos.ProducerEpoch);
-
- /* BaseSequence: always written as -1 here */
- rd_kafka_buf_write_i32(rkbuf, -1);
-
- /* RecordCount: updated later */
- rd_kafka_buf_write_i32(rkbuf, 0);
-
-}
-
-
-/**
- * @brief Write ProduceRequest headers.
- * When this function returns the msgset is ready for
- * writing individual messages.
- * msetw_MessageSetSize will have been set to the size of the
- * messageset header.
- *
- * The request is written with exactly one topic and one partition
- * (the writer's toppar). MessageSetSize is written as a zero
- * placeholder and its buffer offset is saved in
- * msetw_of_MessageSetSize so it can be finalized later.
- */
-static void
-rd_kafka_msgset_writer_write_Produce_header (rd_kafka_msgset_writer_t *msetw) {
-
- rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
- rd_kafka_t *rk = msetw->msetw_rkb->rkb_rk;
- rd_kafka_itopic_t *rkt = msetw->msetw_rktp->rktp_rkt;
-
- /* V3: TransactionalId */
- if (msetw->msetw_ApiVersion == 3)
- rd_kafka_buf_write_kstr(rkbuf, rk->rk_eos.TransactionalId);
-
- /* RequiredAcks */
- rd_kafka_buf_write_i16(rkbuf, rkt->rkt_conf.required_acks);
-
- /* Timeout */
- rd_kafka_buf_write_i32(rkbuf, rkt->rkt_conf.request_timeout_ms);
-
- /* TopicArrayCnt */
- rd_kafka_buf_write_i32(rkbuf, 1);
-
- /* Insert topic */
- rd_kafka_buf_write_kstr(rkbuf, rkt->rkt_topic);
-
- /* PartitionArrayCnt */
- rd_kafka_buf_write_i32(rkbuf, 1);
-
- /* Partition */
- rd_kafka_buf_write_i32(rkbuf, msetw->msetw_rktp->rktp_partition);
-
- /* MessageSetSize: Will be finalized later */
- msetw->msetw_of_MessageSetSize = rd_kafka_buf_write_i32(rkbuf, 0);
-
- if (msetw->msetw_MsgVersion == 2) {
- /* MessageSet v2 header */
- rd_kafka_msgset_writer_write_MessageSet_v2_header(msetw);
- msetw->msetw_MessageSetSize = RD_KAFKAP_MSGSET_V2_SIZE;
- } else {
- /* Older MessageSet: no header fields are written here */
- msetw->msetw_MessageSetSize = RD_KAFKAP_MSGSET_V0_SIZE;
- }
-}
-
-
-/**
- * @brief Set up a ProduceRequest MessageSet writer for the given
- *        broker and partition.
- *
- *        Unless the partition's transmit queue is empty, \p msetw is
- *        cleared, the protocol versions are selected, a new buffer is
- *        allocated to fit the pending messages and the request and
- *        MessageSet headers are written to it.
- *
- * @returns the maximum number of messages to write to the messageset,
- *          or 0 if the transmit queue is empty (in which case \p msetw
- *          is left untouched).
- *
- * @remark This currently constructs the entire ProduceRequest, containing
- *         a single outer MessageSet for a single partition.
- */
-static int rd_kafka_msgset_writer_init (rd_kafka_msgset_writer_t *msetw,
-                                        rd_kafka_broker_t *rkb,
-                                        rd_kafka_toppar_t *rktp) {
-        const int queued = rd_atomic32_get(&rktp->rktp_xmit_msgq.rkmq_msg_cnt);
-
-        /* Nothing to send */
-        if (!queued)
-                return 0;
-
-        memset(msetw, 0, sizeof(*msetw));
-
-        msetw->msetw_rkb  = rkb;
-        msetw->msetw_rktp = rktp;
-
-        /* The batch holds at most the configured batch size, and no more
-         * than what is currently queued. */
-        msetw->msetw_msgcntmax = RD_MIN(queued,
-                                        rkb->rkb_rk->rk_conf.
-                                        batch_num_messages);
-        rd_dassert(msetw->msetw_msgcntmax > 0);
-
-        /* Decide on ApiVersion and MsgVersion */
-        rd_kafka_msgset_writer_select_MsgVersion(msetw);
-
-        /* Relative offsets are always used by MsgVersion 2 (OffsetDelta),
-         * and by MsgVersion 1 when a compression codec is configured. */
-        if (msetw->msetw_MsgVersion == 2)
-                msetw->msetw_relative_offsets = 1;
-        else if (msetw->msetw_MsgVersion == 1 &&
-                 rktp->rktp_rkt->rkt_conf.compression_codec)
-                msetw->msetw_relative_offsets = 1;
-
-        /* Backing buffer, followed by the Produce header and the
-         * MessageSet header. */
-        rd_kafka_msgset_writer_alloc_buf(msetw);
-        rd_kafka_msgset_writer_write_Produce_header(msetw);
-
-        /* The write position is now at the first message: remember it
-         * so that the buffer can be rewound to this point later if the
-         * messages are compressed. */
-        msetw->msetw_firstmsg.of =
-                rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf);
-
-        return msetw->msetw_msgcntmax;
-}
-
-
-
-/**
- * @brief Add the message payload to the buffer, either by copying it
- *        or by pushing a reference to the payload memory.
- *
- *        The payload is copied only if it is within the configured copy
- *        limit and the buffer has more room left than the payload needs.
- *        Otherwise the memory is referenced, and \p free_cb is passed
- *        along with it.
- */
-static RD_INLINE void
-rd_kafka_msgset_writer_write_msg_payload (rd_kafka_msgset_writer_t *msetw,
-                                          const rd_kafka_msg_t *rkm,
-                                          void (*free_cb)(void *)) {
-        rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
-        const size_t copy_limit =
-                (size_t)msetw->msetw_rkb->rkb_rk->rk_conf.msg_copy_max_size;
-
-        /* Too large to copy, or not enough room left: reference it */
-        if (rkm->rkm_len > copy_limit ||
-            rd_buf_write_remains(&rkbuf->rkbuf_buf) <= rkm->rkm_len) {
-                rd_kafka_buf_push(rkbuf, rkm->rkm_payload, rkm->rkm_len,
-                                  free_cb);
-                return;
-        }
-
-        rd_kafka_buf_write(rkbuf, rkm->rkm_payload, rkm->rkm_len);
-}
-
-
-/**
- * @brief Write message to messageset buffer with MsgVersion 0 or 1.
- *
- * Writes the per-message header (Offset + MessageSize) followed by the
- * Message itself: Crc, MagicByte, Attributes, Timestamp (MsgVersion 1
- * only), Key and Value. The Crc is written as a placeholder and
- * updated once the rest of the message has been written.
- *
- * @param Offset value written to the Offset field.
- * @param MsgAttributes value written to the Attributes field.
- * @param free_cb passed on to the payload writer.
- *
- * @returns the number of bytes written.
- */
-static size_t
-rd_kafka_msgset_writer_write_msg_v0_1 (rd_kafka_msgset_writer_t *msetw,
- rd_kafka_msg_t *rkm,
- int64_t Offset,
- int8_t MsgAttributes,
- void (*free_cb)(void *)) {
- rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
- size_t MessageSize;
- size_t of_Crc;
-
- /*
- * MessageSet's (v0 and v1) per-Message header.
- */
-
- /* Offset (only relevant for compressed messages on MsgVersion v1) */
- rd_kafka_buf_write_i64(rkbuf, Offset);
-
- /* MessageSize: size of everything following this field */
- MessageSize =
- 4 + 1 + 1 + /* Crc+MagicByte+Attributes */
- 4 /* KeyLength */ + rkm->rkm_key_len +
- 4 /* ValueLength */ + rkm->rkm_len;
-
- if (msetw->msetw_MsgVersion == 1)
- MessageSize += 8; /* Timestamp i64 */
-
- rd_kafka_buf_write_i32(rkbuf, (int32_t)MessageSize);
-
- /*
- * Message
- */
- /* Crc: will be updated later */
- of_Crc = rd_kafka_buf_write_i32(rkbuf, 0);
-
- /* Start Crc calculation of all buf writes. */
- rd_kafka_buf_crc_init(rkbuf);
-
- /* MagicByte */
- rd_kafka_buf_write_i8(rkbuf, msetw->msetw_MsgVersion);
-
- /* Attributes */
- rd_kafka_buf_write_i8(rkbuf, MsgAttributes);
-
- /* V1: Timestamp */
- if (msetw->msetw_MsgVersion == 1)
- rd_kafka_buf_write_i64(rkbuf, rkm->rkm_timestamp);
-
- /* Message Key */
- rd_kafka_buf_write_bytes(rkbuf, rkm->rkm_key, rkm->rkm_key_len);
-
- /* Write or copy Value/payload.
- * A message without payload gets the null length marker. */
- if (rkm->rkm_payload) {
- rd_kafka_buf_write_i32(rkbuf, (int32_t)rkm->rkm_len);
- rd_kafka_msgset_writer_write_msg_payload(msetw, rkm, free_cb);
- } else
- rd_kafka_buf_write_i32(rkbuf, RD_KAFKAP_BYTES_LEN_NULL);
-
- /* Finalize Crc and store it at the placeholder position */
- rd_kafka_buf_update_u32(rkbuf, of_Crc,
- rd_kafka_buf_crc_finalize(rkbuf));
-
-
- /* Return written message size */
- return 8/*Offset*/ + 4/*MessageSize*/ + MessageSize;
-}
-
-/**
- * @brief Write message to messageset buffer with MsgVersion 2.
- *
- * The message is written as: Length, Attributes, TimestampDelta,
- * OffsetDelta, KeyLen, Key, ValueLen, Value and HeaderCount, where all
- * fields but Attributes, Key and Value are varint encoded.
- *
- * @param Offset value written to the OffsetDelta field.
- * @param MsgAttributes not used by this function: the Attributes
- * field is always written as 0.
- * @param free_cb passed on to the payload writer.
- *
- * @returns the number of bytes written.
- */
-static size_t
-rd_kafka_msgset_writer_write_msg_v2 (rd_kafka_msgset_writer_t *msetw,
- rd_kafka_msg_t *rkm,
- int64_t Offset,
- int8_t MsgAttributes,
- void (*free_cb)(void *)) {
- rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
- size_t MessageSize = 0;
- char varint_Length[RD_UVARINT_ENC_SIZEOF(int32_t)];
- char varint_TimestampDelta[RD_UVARINT_ENC_SIZEOF(int64_t)];
- char varint_OffsetDelta[RD_UVARINT_ENC_SIZEOF(int64_t)];
- char varint_KeyLen[RD_UVARINT_ENC_SIZEOF(int32_t)];
- char varint_ValueLen[RD_UVARINT_ENC_SIZEOF(int32_t)];
- char varint_HeaderCount[RD_UVARINT_ENC_SIZEOF(int32_t)];
- size_t sz_Length;
- size_t sz_TimestampDelta;
- size_t sz_OffsetDelta;
- size_t sz_KeyLen;
- size_t sz_ValueLen;
- size_t sz_HeaderCount;
-
- /* All varints, except for Length, need to be pre-built
- * so that the Length field can be set correctly and thus have
- * correct varint encoded width. */
-
- sz_TimestampDelta = rd_uvarint_enc_i64(
- varint_TimestampDelta, sizeof(varint_TimestampDelta),
- rkm->rkm_timestamp - msetw->msetw_firstmsg.timestamp);
- sz_OffsetDelta = rd_uvarint_enc_i64(
- varint_OffsetDelta, sizeof(varint_OffsetDelta), Offset);
- sz_KeyLen = rd_uvarint_enc_i32(
- varint_KeyLen, sizeof(varint_KeyLen),
- rkm->rkm_key ? (int32_t)rkm->rkm_key_len :
- (int32_t)RD_KAFKAP_BYTES_LEN_NULL);
- sz_ValueLen = rd_uvarint_enc_i32(
- varint_ValueLen, sizeof(varint_ValueLen),
- rkm->rkm_payload ? (int32_t)rkm->rkm_len :
- (int32_t)RD_KAFKAP_BYTES_LEN_NULL);
- sz_HeaderCount = rd_uvarint_enc_i32(
- varint_HeaderCount, sizeof(varint_HeaderCount), 0);
-
- /* Calculate MessageSize without length of Length (added later)
- * to store it in Length. */
- MessageSize =
- 1 /* MsgAttributes */ +
- sz_TimestampDelta +
- sz_OffsetDelta +
- sz_KeyLen +
- rkm->rkm_key_len +
- sz_ValueLen +
- rkm->rkm_len +
- sz_HeaderCount;
-
- /* Length */
- sz_Length = rd_uvarint_enc_i64(varint_Length, sizeof(varint_Length),
- MessageSize);
- rd_kafka_buf_write(rkbuf, varint_Length, sz_Length);
- MessageSize += sz_Length;
-
- /* Attributes: The MsgAttributes argument is loosely based on MsgVer0
- * which doesn't apply for MsgVer2 */
- rd_kafka_buf_write_i8(rkbuf, 0);
-
- /* TimestampDelta */
- rd_kafka_buf_write(rkbuf, varint_TimestampDelta, sz_TimestampDelta);
-
- /* OffsetDelta */
- rd_kafka_buf_write(rkbuf, varint_OffsetDelta, sz_OffsetDelta);
-
- /* KeyLen */
- rd_kafka_buf_write(rkbuf, varint_KeyLen, sz_KeyLen);
-
- /* Key (if any) */
- if (rkm->rkm_key)
- rd_kafka_buf_write(rkbuf, rkm->rkm_key, rkm->rkm_key_len);
-
- /* ValueLen */
- rd_kafka_buf_write(rkbuf, varint_ValueLen, sz_ValueLen);
-
- /* Write or copy Value/payload */
- if (rkm->rkm_payload)
- rd_kafka_msgset_writer_write_msg_payload(msetw, rkm, free_cb);
-
- /* HeaderCount (headers currently not implemented) */
- rd_kafka_buf_write(rkbuf, varint_HeaderCount, sz_HeaderCount);
-
- /* Return written message size (now including the Length field) */
- return MessageSize;
-}
-
-
-/**
- * @brief Write a single message to the messageset buffer using the
- *        writer that matches the messageset's MsgVersion.
- *
- *        The size reported by the version specific writer is verified
- *        against the message's calculated wire size and against the
- *        number of bytes the buffer's write position actually moved.
- *
- * @returns the number of bytes written.
- */
-static size_t
-rd_kafka_msgset_writer_write_msg (rd_kafka_msgset_writer_t *msetw,
-                                  rd_kafka_msg_t *rkm,
-                                  int64_t Offset, int8_t MsgAttributes,
-                                  void (*free_cb)(void *)) {
-        rd_buf_t *rbuf = &msetw->msetw_rkbuf->rkbuf_buf;
-        size_t pre_pos;
-        size_t outlen;
-        size_t actual_written;
-
-        if (likely(rkm->rkm_timestamp))
-                MsgAttributes |= RD_KAFKA_MSG_ATTR_CREATE_TIME;
-
-        pre_pos = rd_buf_write_pos(rbuf);
-
-        switch (msetw->msetw_MsgVersion)
-        {
-        case 2:
-                outlen = rd_kafka_msgset_writer_write_msg_v2(
-                        msetw, rkm, Offset, MsgAttributes, free_cb);
-                break;
-
-        default:
-                /* MsgVersion 0 and 1 share the same writer */
-                outlen = rd_kafka_msgset_writer_write_msg_v0_1(
-                        msetw, rkm, Offset, MsgAttributes, free_cb);
-                break;
-        }
-
-        actual_written = rd_buf_write_pos(rbuf) - pre_pos;
-
-        rd_assert(outlen <=
-                  rd_kafka_msg_wire_size(rkm, msetw->msetw_MsgVersion));
-        rd_assert(outlen == actual_written);
-
-        return outlen;
-}
-
-/**
- * @brief Move messages from the given message queue to the messageset,
- *        in queue order, until the queue is empty, the message count
- *        limit is reached or the next message would make the buffer
- *        exceed the maximum message size.
- *
- *        Each written message is moved from \p rkmq to the buffer's own
- *        message queue. The first message's timestamp and the highest
- *        timestamp seen are recorded in \p msetw.
- *
- * @remark \p rkmq must not be empty.
- */
-static void
-rd_kafka_msgset_writer_write_msgq (rd_kafka_msgset_writer_t *msetw,
-                                   rd_kafka_msgq_t *rkmq) {
-        rd_kafka_broker_t *rkb = msetw->msetw_rkb;
-        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
-        rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
-        size_t max_msg_size = (size_t)rkb->rkb_rk->rk_conf.max_msg_size;
-        size_t len = rd_buf_len(&rkbuf->rkbuf_buf);
-        rd_ts_t MaxTimestamp = 0;
-        rd_ts_t int_latency_base;
-        rd_kafka_msg_t *rkm;
-        int msgcnt = 0;
-
-        /* Base for the internal latency calculation: rkm_ts_timeout is
-         * the enqueue time + timeout, so add the timeout to "now". */
-        int_latency_base = rd_clock() +
-                (rktp->rktp_rkt->rkt_conf.message_timeout_ms * 1000);
-
-        /* The first message provides the BaseTimestamp. */
-        rkm = TAILQ_FIRST(&rkmq->rkmq_msgs);
-        rd_kafka_assert(NULL, rkm);
-        msetw->msetw_firstmsg.timestamp = rkm->rkm_timestamp;
-
-        for ( ; rkm ; rkm = TAILQ_FIRST(&rkmq->rkmq_msgs)) {
-
-                /* Stop when the batch is full, either by count or
-                 * by size. */
-                if (unlikely(msgcnt == msetw->msetw_msgcntmax ||
-                             len + rd_kafka_msg_wire_size(
-                                     rkm, msetw->msetw_MsgVersion) >
-                             max_msg_size)) {
-                        rd_rkb_dbg(rkb, MSG, "PRODUCE",
-                                   "No more space in current MessageSet "
-                                   "(%i message(s), %"PRIusz" bytes)",
-                                   msgcnt, len);
-                        break;
-                }
-
-                /* The message now belongs to the buffer's queue */
-                rd_kafka_msgq_deq(rkmq, rkm, 1);
-                rd_kafka_msgq_enq(&rkbuf->rkbuf_msgq, rkm);
-
-                /* Internal latency metrics */
-                rd_avg_add(&rkb->rkb_avg_int_latency,
-                           int_latency_base - rkm->rkm_ts_timeout);
-
-                /* Track MessageSet v2's .MaxTimestamp field */
-                if (unlikely(MaxTimestamp < rkm->rkm_timestamp))
-                        MaxTimestamp = rkm->rkm_timestamp;
-
-                /* The message's index in the batch is used as its
-                 * offset. */
-                len += rd_kafka_msgset_writer_write_msg(msetw, rkm, msgcnt, 0,
-                                                        NULL);
-
-                rd_dassert(len <= max_msg_size);
-                msgcnt++;
-        }
-
-        msetw->msetw_MaxTimestamp = MaxTimestamp;
-}
-
-
-#if WITH_ZLIB
-/**
- * @brief Compress messageset using gzip/zlib
- *
- * @param msetw messageset writer (used for logging context).
- * @param slice the uncompressed data to compress, read segment by
- *              segment.
- * @param ciov  on success: set to a newly allocated buffer holding the
- *              compressed data and its length. The caller owns (and
- *              must free) the buffer.
- *              On failure no buffer is left allocated.
- *
- * @returns 0 on success or -1 on failure (an error is logged).
- */
-static int
-rd_kafka_msgset_writer_compress_gzip (rd_kafka_msgset_writer_t *msetw,
-                                      rd_slice_t *slice,
-                                      struct iovec *ciov) {
-
-        rd_kafka_broker_t *rkb = msetw->msetw_rkb;
-        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
-        z_stream strm;
-        size_t len = rd_slice_remains(slice);
-        const void *p;
-        size_t rlen;
-        int r;
-
-        memset(&strm, 0, sizeof(strm));
-        /* windowBits 15+16: maximum window size with gzip framing */
-        r = deflateInit2(&strm, Z_DEFAULT_COMPRESSION,
-                         Z_DEFLATED, 15+16,
-                         8, Z_DEFAULT_STRATEGY);
-        if (r != Z_OK) {
-                rd_rkb_log(rkb, LOG_ERR, "GZIP",
-                           "Failed to initialize gzip for "
-                           "compressing %"PRIusz" bytes in "
-                           "topic %.*s [%"PRId32"]: %s (%i): "
-                           "sending uncompressed",
-                           len,
-                           RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
-                           rktp->rktp_partition,
-                           strm.msg ? strm.msg : "", r);
-                return -1;
-        }
-
-        /* Calculate maximum compressed size and
-         * allocate an output buffer accordingly. */
-        ciov->iov_len  = deflateBound(&strm, (uLong)rd_slice_remains(slice));
-        ciov->iov_base = rd_malloc(ciov->iov_len);
-
-        strm.next_out  = (void *)ciov->iov_base;
-        strm.avail_out = (uInt)ciov->iov_len;
-
-        /* Iterate through each segment and compress it. */
-        while ((rlen = rd_slice_reader(slice, &p))) {
-
-                strm.next_in  = (void *)p;
-                strm.avail_in = (uInt)rlen;
-
-                /* Compress segment.
-                 * The assignment must be parenthesized on its own so
-                 * that \c r holds deflate()'s return code, rather than
-                 * the result of the comparison, when it is logged. */
-                if ((r = deflate(&strm, Z_NO_FLUSH)) != Z_OK) {
-                        rd_rkb_log(rkb, LOG_ERR, "GZIP",
-                                   "Failed to gzip-compress "
-                                   "%"PRIusz" bytes (%"PRIusz" total) for "
-                                   "topic %.*s [%"PRId32"]: "
-                                   "%s (%i): "
-                                   "sending uncompressed",
-                                   rlen, len,
-                                   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
-                                   rktp->rktp_partition,
-                                   strm.msg ? strm.msg : "", r);
-                        deflateEnd(&strm);
-                        rd_free(ciov->iov_base);
-                        return -1;
-                }
-
-                /* The output buffer is sized by deflateBound() so all
-                 * input is expected to have been consumed. */
-                rd_kafka_assert(rkb->rkb_rk, strm.avail_in == 0);
-        }
-
-        /* Finish the compression */
-        if ((r = deflate(&strm, Z_FINISH)) != Z_STREAM_END) {
-                rd_rkb_log(rkb, LOG_ERR, "GZIP",
-                           "Failed to finish gzip compression "
-                           " of %"PRIusz" bytes for "
-                           "topic %.*s [%"PRId32"]: "
-                           "%s (%i): "
-                           "sending uncompressed",
-                           len,
-                           RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
-                           rktp->rktp_partition,
-                           strm.msg ? strm.msg : "", r);
-                deflateEnd(&strm);
-                rd_free(ciov->iov_base);
-                return -1;
-        }
-
-        /* Actual compressed size */
-        ciov->iov_len = strm.total_out;
-
-        /* Deinitialize compression */
-        deflateEnd(&strm);
-
-        return 0;
-}
-#endif
-
-
-#if WITH_SNAPPY
-/**
- * @brief Compress messageset using Snappy
- *
- * @param msetw messageset writer (used for logging context).
- * @param slice the uncompressed data to compress.
- * @param ciov  on success: set to a newly allocated buffer holding the
- *              compressed data. The caller owns (and must free) the
- *              buffer.
- *              On failure no buffer is left allocated.
- *
- * @returns 0 on success or -1 on failure (an error is logged).
- */
-static int
-rd_kafka_msgset_writer_compress_snappy (rd_kafka_msgset_writer_t *msetw,
-                                        rd_slice_t *slice, struct iovec *ciov) {
-        rd_kafka_broker_t *rkb = msetw->msetw_rkb;
-        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
-        struct iovec *iov;
-        size_t iov_max, iov_cnt;
-        struct snappy_env senv;
-        size_t len = rd_slice_remains(slice);
-        int r;
-
-        /* Initialize snappy compression environment */
-        rd_kafka_snappy_init_env_sg(&senv, 1/*iov enable*/);
-
-        /* Calculate maximum compressed size and
-         * allocate an output buffer accordingly. */
-        ciov->iov_len  = rd_kafka_snappy_max_compressed_length(len);
-        ciov->iov_base = rd_malloc(ciov->iov_len);
-
-        /* Map the slice's segments to an iovec array, one entry per
-         * buffer segment at most. */
-        iov_max = slice->buf->rbuf_segment_cnt;
-        iov = rd_alloca(sizeof(*iov) * iov_max);
-
-        rd_slice_get_iov(slice, iov, &iov_cnt, iov_max, len);
-
-        /* Compress all segments */
-        r = rd_kafka_snappy_compress_iov(&senv, iov, iov_cnt, len, ciov);
-
-        /* The snappy environment is not needed past this point: free it
-         * regardless of outcome so that it is not leaked on failure. */
-        rd_kafka_snappy_free_env(&senv);
-
-        if (r != 0) {
-                rd_rkb_log(rkb, LOG_ERR, "SNAPPY",
-                           "Failed to snappy-compress "
-                           "%"PRIusz" bytes for "
-                           "topic %.*s [%"PRId32"]: %s: "
-                           "sending uncompressed",
-                           len, RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
-                           rktp->rktp_partition,
-                           rd_strerror(-r));
-                rd_free(ciov->iov_base);
-                return -1;
-        }
-
-        return 0;
-}
-#endif
-
-/**
- * @brief Compress messageset using LZ4F
- *
- *        On success the compressed buffer and its size are stored
- *        in \p ciov.
- *
- * @returns 0 on success or -1 if the LZ4 compressor returned an error.
- */
-static int
-rd_kafka_msgset_writer_compress_lz4 (rd_kafka_msgset_writer_t *msetw,
-                                     rd_slice_t *slice, struct iovec *ciov) {
-        /* Correct (1) or incorrect (0) HC, selected by MsgVersion */
-        const int proper_hc = msetw->msetw_MsgVersion >= 1;
-        rd_kafka_resp_err_t err;
-
-        err = rd_kafka_lz4_compress(msetw->msetw_rkb, proper_hc, slice,
-                                    &ciov->iov_base, &ciov->iov_len);
-        if (err)
-                return -1;
-
-        return 0;
-}
-
-
-
-/**
- * @brief Compress the message set.
- *
- * The written messages (from the first message position and onwards)
- * are compressed with the topic's configured compression codec.
- * On success the buffer is rewound to the first message position and
- * the messages are replaced by the compressed data.
- *
- * @param outlenp in: total uncompressed messages size,
- * out (on success): size of what replaced the messages:
- * the compressed data for MsgVersion 2, or the wrapping
- * Message for older MsgVersions.
- * Left untouched on failure.
- * @returns 0 on success or -1 if compression failed.
- * @remark Compression failures are not critical, we'll just send
- * the messageset uncompressed.
- */
-static int
-rd_kafka_msgset_writer_compress (rd_kafka_msgset_writer_t *msetw,
- size_t *outlenp) {
- rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
- rd_buf_t *rbuf = &msetw->msetw_rkbuf->rkbuf_buf;
- rd_slice_t slice;
- size_t len = *outlenp;
- struct iovec ciov = RD_ZERO_INIT; /* Compressed output buffer */
- int r = -1;
- size_t outlen;
-
- rd_assert(rd_buf_len(rbuf) >= msetw->msetw_firstmsg.of + len);
-
- /* Create buffer slice from firstmsg and onwards */
- r = rd_slice_init(&slice, rbuf, msetw->msetw_firstmsg.of, len);
- rd_assert(r == 0 || !*"invalid firstmsg position");
-
- switch (rktp->rktp_rkt->rkt_conf.compression_codec)
- {
-#if WITH_ZLIB
- case RD_KAFKA_COMPRESSION_GZIP:
- r = rd_kafka_msgset_writer_compress_gzip(msetw, &slice, &ciov);
- break;
-#endif
-
-#if WITH_SNAPPY
- case RD_KAFKA_COMPRESSION_SNAPPY:
- r = rd_kafka_msgset_writer_compress_snappy(msetw, &slice,
- &ciov);
- break;
-#endif
-
- case RD_KAFKA_COMPRESSION_LZ4:
- /* Skip LZ4 compression if broker doesn't support it. */
- if (!(msetw->msetw_rkb->rkb_features & RD_KAFKA_FEATURE_LZ4))
- return -1;
-
- r = rd_kafka_msgset_writer_compress_lz4(msetw, &slice, &ciov);
- break;
-
-
- default:
- rd_kafka_assert(NULL,
- !*"notreached: unsupported compression.codec");
- break;
- }
-
- if (r == -1) /* Compression failed, send uncompressed */
- return -1;
-
-
- if (unlikely(ciov.iov_len > len)) {
- /* If the compressed data is larger than the uncompressed size
- * then throw it away and send as uncompressed. */
- rd_free(ciov.iov_base);
- return -1;
- }
-
- /* Set compression codec in MessageSet.Attributes */
- msetw->msetw_Attributes |= rktp->rktp_rkt->rkt_conf.compression_codec;
-
- /* Rewind rkbuf to the pre-message checkpoint (firstmsg)
- * and replace the original message(s) with the compressed payload,
- * possibly with version dependent enveloping. */
- rd_buf_write_seek(rbuf, msetw->msetw_firstmsg.of);
-
- rd_kafka_assert(msetw->msetw_rkb->rkb_rk, ciov.iov_len < INT32_MAX);
-
- if (msetw->msetw_MsgVersion == 2) {
- /* MsgVersion 2 has no inner MessageSet header or wrapping
- * for compressed messages, just the messages back-to-back,
- * so we can push the compressed memory directly to the
- * buffer without wrapping it.
- * rd_free is passed along as the free callback for the
- * compressed memory. */
- rd_buf_push(rbuf, ciov.iov_base, ciov.iov_len, rd_free);
- outlen = ciov.iov_len;
-
- } else {
- /* Older MessageSets envelope/wrap the compressed MessageSet
- * in an outer Message, which carries the compression codec
- * in its attributes and the first message's timestamp. */
- rd_kafka_msg_t rkm = {
- .rkm_len = ciov.iov_len,
- .rkm_payload = ciov.iov_base,
- .rkm_timestamp = msetw->msetw_firstmsg.timestamp
- };
- outlen = rd_kafka_msgset_writer_write_msg(
- msetw, &rkm, 0,
- rktp->rktp_rkt->rkt_conf.compression_codec,
- rd_free/*free for ciov.iov_base*/);
- }
-
- *outlenp = outlen;
-
- return 0;
-}
-
-
-
-
-/**
- * @brief Calculate the MessageSet v2 CRC (CRC32C) once the messageset is
- *        complete and store it in the header's CRC field.
- *
- *        The checksum covers everything from the byte following the
- *        4 byte CRC field up to the buffer's current write position.
- */
-static void
-rd_kafka_msgset_writer_calc_crc_v2 (rd_kafka_msgset_writer_t *msetw) {
-        rd_buf_t *rbuf = &msetw->msetw_rkbuf->rkbuf_buf;
-        const size_t of_data = msetw->msetw_of_CRC + 4;
-        rd_slice_t slice;
-        int32_t crc;
-        int r;
-
-        r = rd_slice_init(&slice, rbuf, of_data,
-                          rd_buf_write_pos(rbuf) - of_data);
-        rd_assert(!r && *"slice_init failed");
-
-        crc = rd_slice_crc32c(&slice);
-
-        /* Store the checksum at the MessageSet v2 CRC offset */
-        rd_kafka_buf_update_i32(msetw->msetw_rkbuf, msetw->msetw_of_CRC, crc);
-}
-
-/**
- * @brief Finalize MessageSet v2 header fields.
- *
- * Computes msetw_MessageSetSize from msetw_messages_len and updates
- * the header placeholders: Length, Attributes, LastOffsetDelta,
- * BaseTimestamp, MaxTimestamp and RecordCount.
- * The CRC is calculated last since it depends on the other fields
- * having their final values.
- *
- * @remark msetw_messages_len must be set prior to calling this function.
- */
-static void
-rd_kafka_msgset_writer_finalize_MessageSet_v2_header (
- rd_kafka_msgset_writer_t *msetw) {
- rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
- int msgcnt = rd_kafka_msgq_len(&rkbuf->rkbuf_msgq);
-
- rd_kafka_assert(NULL, msgcnt > 0);
- rd_kafka_assert(NULL, msetw->msetw_ApiVersion >= 3);
-
- msetw->msetw_MessageSetSize = RD_KAFKAP_MSGSET_V2_SIZE +
- msetw->msetw_messages_len;
-
- /* MessageSet.Length is the same as
- * MessageSetSize minus field widths for FirstOffset+Length */
- rd_kafka_buf_update_i32(rkbuf, msetw->msetw_of_start +
- RD_KAFKAP_MSGSET_V2_OF_Length,
- (int32_t)msetw->msetw_MessageSetSize - (8+4));
-
- msetw->msetw_Attributes |= RD_KAFKA_MSG_ATTR_CREATE_TIME;
-
- rd_kafka_buf_update_i16(rkbuf, msetw->msetw_of_start +
- RD_KAFKAP_MSGSET_V2_OF_Attributes,
- msetw->msetw_Attributes);
-
- rd_kafka_buf_update_i32(rkbuf, msetw->msetw_of_start +
- RD_KAFKAP_MSGSET_V2_OF_LastOffsetDelta,
- msgcnt-1);
-
- rd_kafka_buf_update_i64(rkbuf, msetw->msetw_of_start +
- RD_KAFKAP_MSGSET_V2_OF_BaseTimestamp,
- msetw->msetw_firstmsg.timestamp);
-
- rd_kafka_buf_update_i64(rkbuf, msetw->msetw_of_start +
- RD_KAFKAP_MSGSET_V2_OF_MaxTimestamp,
- msetw->msetw_MaxTimestamp);
-
- rd_kafka_buf_update_i32(rkbuf, msetw->msetw_of_start +
- RD_KAFKAP_MSGSET_V2_OF_RecordCount, msgcnt);
-
- rd_kafka_msgset_writer_calc_crc_v2(msetw);
-}
-
-
-
-
-/**
- * @brief Finalize the MessageSet: compute the final MessageSetSize,
- *        update the MessageSet v2 header fields where applicable, and
- *        store the MessageSetSize in its placeholder in the request.
- *
- * @remark msetw_messages_len must be set prior to calling this function.
- */
-static void
-rd_kafka_msgset_writer_finalize_MessageSet (rd_kafka_msgset_writer_t *msetw) {
-        rd_dassert(msetw->msetw_messages_len > 0);
-
-        if (msetw->msetw_MsgVersion != 2) {
-                /* Older MessageSets have no header fields to update */
-                msetw->msetw_MessageSetSize = RD_KAFKAP_MSGSET_V0_SIZE +
-                        msetw->msetw_messages_len;
-        } else {
-                /* Also sets msetw_MessageSetSize */
-                rd_kafka_msgset_writer_finalize_MessageSet_v2_header(msetw);
-        }
-
-        /* Fill in the MessageSetSize placeholder */
-        rd_kafka_buf_update_i32(msetw->msetw_rkbuf,
-                                msetw->msetw_of_MessageSetSize,
-                                (int32_t)msetw->msetw_MessageSetSize);
-}
-
-
-/**
- * @brief Finalize the messageset - call when no more messages are to be
- *        added to the messageset.
- *
- *        Compresses the messages (if a compression codec is configured
- *        for the topic) and updates the final sizes, header fields and
- *        CRC.
- *
- *        A failed compression is ignored: the messages are then sent
- *        uncompressed.
- *
- *        The buffer is returned, ready to be transmitted.
- *        If no messages were written the buffer is destroyed instead.
- *
- * @param MessageSetSizep will be set to the finalized MessageSetSize
- *                        (not set when NULL is returned).
- *
- * @returns the buffer to transmit or NULL if there were no messages
- *          in messageset.
- */
-static rd_kafka_buf_t *
-rd_kafka_msgset_writer_finalize (rd_kafka_msgset_writer_t *msetw,
-                                 size_t *MessageSetSizep) {
-        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
-        rd_kafka_buf_t *request = msetw->msetw_rkbuf;
-        int msgcnt = rd_kafka_msgq_len(&request->rkbuf_msgq);
-        size_t len;
-
-        /* Nothing was written: there is nothing to send. */
-        if (unlikely(msgcnt == 0)) {
-                rd_kafka_buf_destroy(request);
-                return NULL;
-        }
-
-        /* Everything from the first message up to the current write
-         * position makes up the messages. */
-        len = rd_buf_write_pos(&request->rkbuf_buf) -
-                msetw->msetw_firstmsg.of;
-        rd_assert(len > 0);
-        rd_assert(len <= (size_t)rktp->rktp_rkt->rkt_rk->rk_conf.max_msg_size);
-
-        /* Compress the messages.
-         * On success len is updated to the new size of the messages,
-         * on failure it is left as is. */
-        if (rktp->rktp_rkt->rkt_conf.compression_codec)
-                rd_kafka_msgset_writer_compress(msetw, &len);
-
-        msetw->msetw_messages_len = len;
-
-        /* Header fields can be finalized now that the size is known */
-        rd_kafka_msgset_writer_finalize_MessageSet(msetw);
-
-        *MessageSetSizep = msetw->msetw_MessageSetSize;
-
-        rd_rkb_dbg(msetw->msetw_rkb, MSG, "PRODUCE",
-                   "%s [%"PRId32"]: "
-                   "Produce MessageSet with %i message(s) (%"PRIusz" bytes, "
-                   "ApiVersion %d, MsgVersion %d)",
-                   rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
-                   msgcnt, msetw->msetw_MessageSetSize,
-                   msetw->msetw_ApiVersion, msetw->msetw_MsgVersion);
-
-        return request;
-}
-
-
-/**
- * @brief Build a ProduceRequest holding as many messages from the
- *        toppar's transmit queue as configuration and size limits allow.
- *
- * @param rkb broker to create buffer for
- * @param rktp toppar to transmit messages for
- * @param MessageSetSizep will be set to the final MessageSetSize
- *                        (not set when NULL is returned)
- *
- * @returns the buffer to transmit or NULL if there were no messages
- *          in messageset.
- */
-rd_kafka_buf_t *
-rd_kafka_msgset_create_ProduceRequest (rd_kafka_broker_t *rkb,
-                                       rd_kafka_toppar_t *rktp,
-                                       size_t *MessageSetSizep) {
-        rd_kafka_msgset_writer_t writer;
-        rd_kafka_buf_t *request;
-
-        /* An empty transmit queue yields no request */
-        if (!rd_kafka_msgset_writer_init(&writer, rkb, rktp))
-                return NULL;
-
-        rd_kafka_msgset_writer_write_msgq(&writer, &rktp->rktp_xmit_msgq);
-
-        request = rd_kafka_msgset_writer_finalize(&writer, MessageSetSizep);
-
-        return request;
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_offset.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_offset.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_offset.c
deleted file mode 100644
index b586988..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_offset.c
+++ /dev/null
@@ -1,1139 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012,2013 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-// FIXME: Revise this documentation:
-/**
- * This file implements the consumer offset storage.
- * It currently supports local file storage and broker OffsetCommit storage,
- * not zookeeper.
- *
- * Regardless of commit method (file, broker, ..) this is how it works:
- * - When rdkafka, or the application, depending on if auto.offset.commit
- * is enabled or not, calls rd_kafka_offset_store() with an offset to store,
- * all it does is set rktp->rktp_stored_offset to this value.
- * This can happen from any thread and is locked by the rktp lock.
- * - The actual commit/write of the offset to its backing store (filesystem)
- * is performed by the main rdkafka thread and scheduled at the configured
- * auto.commit.interval.ms interval.
- * - The write is performed in the main rdkafka thread (in a blocking manner
- * for file based offsets) and once the write has
- * succeeded rktp->rktp_committed_offset is updated to the new value.
- * - If offset.store.sync.interval.ms is configured the main rdkafka thread
- * will also make sure to fsync() each offset file accordingly. (file)
- */
-
-
-#include "rdkafka_int.h"
-#include "rdkafka_topic.h"
-#include "rdkafka_partition.h"
-#include "rdkafka_offset.h"
-#include "rdkafka_broker.h"
-
-#include <stdio.h>
-#include <sys/types.h>
-#include <fcntl.h>
-
-#ifdef _MSC_VER
-#include <io.h>
-#include <share.h>
-#include <sys/stat.h>
-#include <Shlwapi.h>
-typedef int mode_t;
-#else
-#include <unistd.h>
-#endif
-
-
-/**
- * Render an absolute or logical offset as a human readable string.
- *
- * Non-negative offsets are printed as numbers, the logical offsets
- * BEGINNING, END, STORED and INVALID are returned as string literals, and
- * offsets at or below RD_KAFKA_OFFSET_TAIL_BASE are printed as "TAIL(n)".
- * Any other negative value is printed as the number followed by '?'.
- *
- * Formatted results live in a static RD_TLS ring of 16 buffers, so a
- * returned string is overwritten after 16 further calls.
- */
-const char *rd_kafka_offset2str (int64_t offset) {
-        static RD_TLS char ret[16][32];
-        static RD_TLS int i = 0;
-        char *slot;
-
-        /* The ring index advances on every call, even when a literal
-         * is returned below. */
-        i = (i + 1) % 16;
-        slot = ret[i];
-
-        if (offset >= 0) {
-                rd_snprintf(slot, sizeof(ret[i]), "%"PRId64, offset);
-                return slot;
-        }
-
-        if (offset == RD_KAFKA_OFFSET_BEGINNING)
-                return "BEGINNING";
-        if (offset == RD_KAFKA_OFFSET_END)
-                return "END";
-        if (offset == RD_KAFKA_OFFSET_STORED)
-                return "STORED";
-        if (offset == RD_KAFKA_OFFSET_INVALID)
-                return "INVALID";
-
-        if (offset <= RD_KAFKA_OFFSET_TAIL_BASE)
-                rd_snprintf(slot, sizeof(ret[i]), "TAIL(%lld)",
-                            llabs(offset - RD_KAFKA_OFFSET_TAIL_BASE));
-        else
-                rd_snprintf(slot, sizeof(ret[i]), "%"PRId64"?", offset);
-
-        return slot;
-}
-
-/**
- * Close the toppar's offset file, if one is open, and clear the handle.
- */
-static void rd_kafka_offset_file_close (rd_kafka_toppar_t *rktp) {
-        if (rktp->rktp_offset_fp) {
-                fclose(rktp->rktp_offset_fp);
-                rktp->rktp_offset_fp = NULL;
-        }
-}
-
-
-#ifndef _MSC_VER
-/**
- * open callback that passes O_CLOEXEC directly to open() so the flag is
- * set as part of the open call itself.
- * When O_CLOEXEC is not defined this defers to rd_kafka_open_cb_generic().
- */
-int rd_kafka_open_cb_linux (const char *pathname, int flags, mode_t mode,
-                            void *opaque) {
-#ifndef O_CLOEXEC
-        return rd_kafka_open_cb_generic(pathname, flags, mode, opaque);
-#else
-        return open(pathname, flags|O_CLOEXEC, mode);
-#endif
-}
-#endif
-
-/**
- * Fallback version of open_cb NOT providing racefree CLOEXEC:
- * the file is opened first and FD_CLOEXEC is set afterwards
- * (only if FD_CLOEXEC is defined).
- *
- * @param pathname file to open
- * @param flags    open(2) flags
- * @param mode     creation mode
- * @param opaque   unused
- *
- * @returns the file descriptor, or -1 if the file could not be opened.
- */
-int rd_kafka_open_cb_generic (const char *pathname, int flags, mode_t mode,
-                              void *opaque) {
-#ifndef _MSC_VER
-        int fd = open(pathname, flags, mode);
-
-        if (fd == -1)
-                return -1;
-#ifdef FD_CLOEXEC
-        /* F_SETFD takes exactly one int argument (the new fd flags).
-         * Failure to set the flag is deliberately ignored (best effort). */
-        (void)fcntl(fd, F_SETFD, FD_CLOEXEC);
-#endif
-        return fd;
-#else
-        int fd;
-
-        if (_sopen_s(&fd, pathname, flags, _SH_DENYNO, mode) != 0)
-                return -1;
-        return fd;
-#endif
-}
-
-
-/**
- * Open (creating if needed) the toppar's offset file through the configured
- * open_cb and wrap the descriptor in a read/write FILE stream stored in
- * rktp->rktp_offset_fp.
- *
- * An error op is emitted on failure.
- *
- * @returns 0 on success or -1 if the file could not be opened or the
- *          stream could not be created (rktp_offset_fp is then NULL).
- */
-static int rd_kafka_offset_file_open (rd_kafka_toppar_t *rktp) {
-        rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
-        int fd;
-
-#ifndef _MSC_VER
-        mode_t mode = 0644;
-#else
-        mode_t mode = _S_IREAD|_S_IWRITE;
-#endif
-        if ((fd = rk->rk_conf.open_cb(rktp->rktp_offset_path,
-                                      O_CREAT|O_RDWR, mode,
-                                      rk->rk_conf.opaque)) == -1) {
-                rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
-                                RD_KAFKA_RESP_ERR__FS,
-                                "%s [%"PRId32"]: "
-                                "Failed to open offset file %s: %s",
-                                rktp->rktp_rkt->rkt_topic->str,
-                                rktp->rktp_partition,
-                                rktp->rktp_offset_path, rd_strerror(errno));
-                return -1;
-        }
-
-        rktp->rktp_offset_fp =
-#ifndef _MSC_VER
-                fdopen(fd, "r+");
-#else
-                _fdopen(fd, "r+");
-#endif
-
-        if (!rktp->rktp_offset_fp) {
-                /* Report first: closing the fd may overwrite errno. */
-                rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
-                                RD_KAFKA_RESP_ERR__FS,
-                                "%s [%"PRId32"]: "
-                                "Failed to open offset file %s: %s",
-                                rktp->rktp_rkt->rkt_topic->str,
-                                rktp->rktp_partition,
-                                rktp->rktp_offset_path, rd_strerror(errno));
-                /* The descriptor is still ours when fdopen() fails:
-                 * close it to avoid leaking it. */
-#ifndef _MSC_VER
-                close(fd);
-#else
-                _close(fd);
-#endif
-                return -1;
-        }
-
-        return 0;
-}
-
-
-/**
- * Read the offset stored at the start of the toppar's offset file.
- *
- * The stream is rewound, up to sizeof(buf)-1 bytes are read and parsed as
- * a signed base-10 integer.
- *
- * @returns the parsed offset, or RD_KAFKA_OFFSET_INVALID if the seek
- *          failed (the file is then also closed), the file is empty, or
- *          the content is not a number representable as int64.
- */
-static int64_t rd_kafka_offset_file_read (rd_kafka_toppar_t *rktp) {
-        char buf[22];
-        char *end;
-        int64_t offset;
-        size_t r;
-
-        if (fseek(rktp->rktp_offset_fp, 0, SEEK_SET) == -1) {
-                rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
-                                RD_KAFKA_RESP_ERR__FS,
-                                "%s [%"PRId32"]: "
-                                "Seek (for read) failed on offset file %s: %s",
-                                rktp->rktp_rkt->rkt_topic->str,
-                                rktp->rktp_partition,
-                                rktp->rktp_offset_path,
-                                rd_strerror(errno));
-                rd_kafka_offset_file_close(rktp);
-                return RD_KAFKA_OFFSET_INVALID;
-        }
-
-        r = fread(buf, 1, sizeof(buf) - 1, rktp->rktp_offset_fp);
-        if (r == 0) {
-                rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
-                             "%s [%"PRId32"]: offset file (%s) is empty",
-                             rktp->rktp_rkt->rkt_topic->str,
-                             rktp->rktp_partition,
-                             rktp->rktp_offset_path);
-                return RD_KAFKA_OFFSET_INVALID;
-        }
-
-        buf[r] = '\0';
-
-        /* Parse as signed: the destination is an int64_t. An out-of-range
-         * number must not wrap into a negative (logical) offset, so
-         * ERANGE is treated as a parse failure. */
-        errno = 0;
-        offset = strtoll(buf, &end, 10);
-        if (buf == end || errno == ERANGE) {
-                rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
-                                RD_KAFKA_RESP_ERR__FS,
-                                "%s [%"PRId32"]: "
-                                "Unable to parse offset in %s",
-                                rktp->rktp_rkt->rkt_topic->str,
-                                rktp->rktp_partition,
-                                rktp->rktp_offset_path);
-                return RD_KAFKA_OFFSET_INVALID;
-        }
-
-
-        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
-                     "%s [%"PRId32"]: Read offset %"PRId64" from offset "
-                     "file (%s)",
-                     rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
-                     offset, rktp->rktp_offset_path);
-
-        return offset;
-}
-
-
-/**
- * Flush the offset file stream and sync it to disk.
- * Does nothing if no offset file is open.
- *
- * @returns 0 always; flush/sync failures are not reported.
- */
-static int rd_kafka_offset_file_sync (rd_kafka_toppar_t *rktp) {
-        if (rktp->rktp_offset_fp) {
-                rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "SYNC",
-                             "%s [%"PRId32"]: offset file sync",
-                             rktp->rktp_rkt->rkt_topic->str,
-                             rktp->rktp_partition);
-
-#ifndef _MSC_VER
-                (void)fflush(rktp->rktp_offset_fp);
-                (void)fsync(fileno(rktp->rktp_offset_fp)); // FIXME
-#else
-                // FIXME: no sync is performed in this build, the call
-                //        below is disabled.
-                // FlushFileBuffers(_get_osfhandle(fileno(rktp->rktp_offset_fp)));
-#endif
-        }
-
-        return 0;
-}
-
-
-/**
- * Write the toppar's stored offset to its offset file.
- *
- * Up to two attempts are made: on any failure (open, seek, write) the file
- * is closed (if open) so the next attempt reopens it. On success the file
- * is truncated to the written length, rktp_committed_offset is updated and
- * the file is synced right away if offset_store_sync_interval_ms is 0.
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, or RD_KAFKA_RESP_ERR__FS
- *          if both attempts failed.
- *
- * Locality: toppar's broker thread
- */
-static rd_kafka_resp_err_t
-rd_kafka_offset_file_commit (rd_kafka_toppar_t *rktp) {
-        rd_kafka_itopic_t *rkt = rktp->rktp_rkt;
-        int attempt;
-        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
-        int64_t offset = rktp->rktp_stored_offset;
-
-        for (attempt = 0 ; attempt < 2 ; attempt++) {
-                char buf[22];
-                int len;
-
-                if (!rktp->rktp_offset_fp &&
-                    rd_kafka_offset_file_open(rktp) == -1) {
-                        /* Record the failure: without this two failed
-                         * opens would be reported as success. */
-                        err = RD_KAFKA_RESP_ERR__FS;
-                        continue;
-                }
-
-                if (fseek(rktp->rktp_offset_fp, 0, SEEK_SET) == -1) {
-                        rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
-                                        RD_KAFKA_RESP_ERR__FS,
-                                        "%s [%"PRId32"]: "
-                                        "Seek failed on offset file %s: %s",
-                                        rktp->rktp_rkt->rkt_topic->str,
-                                        rktp->rktp_partition,
-                                        rktp->rktp_offset_path,
-                                        rd_strerror(errno));
-                        err = RD_KAFKA_RESP_ERR__FS;
-                        rd_kafka_offset_file_close(rktp);
-                        continue;
-                }
-
-                len = rd_snprintf(buf, sizeof(buf), "%"PRId64"\n", offset);
-
-                /* A short write leaves a truncated number in the file:
-                 * treat anything but a complete write as failure. */
-                if (fwrite(buf, 1, len, rktp->rktp_offset_fp) !=
-                    (size_t)len) {
-                        rd_kafka_op_err(rktp->rktp_rkt->rkt_rk,
-                                        RD_KAFKA_RESP_ERR__FS,
-                                        "%s [%"PRId32"]: "
-                                        "Failed to write offset %"PRId64" to "
-                                        "offset file %s: %s",
-                                        rktp->rktp_rkt->rkt_topic->str,
-                                        rktp->rktp_partition,
-                                        offset,
-                                        rktp->rktp_offset_path,
-                                        rd_strerror(errno));
-                        err = RD_KAFKA_RESP_ERR__FS;
-                        rd_kafka_offset_file_close(rktp);
-                        continue;
-                }
-
-                /* Need to flush before truncate to preserve write ordering */
-                (void)fflush(rktp->rktp_offset_fp);
-
-                /* Truncate file */
-#ifdef _MSC_VER
-                if (_chsize_s(_fileno(rktp->rktp_offset_fp), len) == -1)
-                        ; /* Ignore truncate failures */
-#else
-                if (ftruncate(fileno(rktp->rktp_offset_fp), len) == -1)
-                        ; /* Ignore truncate failures */
-#endif
-                rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
-                             "%s [%"PRId32"]: wrote offset %"PRId64" to "
-                             "file %s",
-                             rktp->rktp_rkt->rkt_topic->str,
-                             rktp->rktp_partition, offset,
-                             rktp->rktp_offset_path);
-
-                rktp->rktp_committed_offset = offset;
-
-                /* If sync interval is set to immediate we sync right away. */
-                if (rkt->rkt_conf.offset_store_sync_interval_ms == 0)
-                        rd_kafka_offset_file_sync(rktp);
-
-
-                return RD_KAFKA_RESP_ERR_NO_ERROR;
-        }
-
-
-        return err;
-}
-
-
-/**
- * Enqueue an OFFSET_COMMIT reply op on rk->rk_rep carrying \p err, the
- * configured offset_commit_cb and a copy of \p offsets (if non-NULL).
- *
- * Nothing is enqueued unless RD_KAFKA_EVENT_OFFSET_COMMIT is among the
- * enabled events.
- */
-void rd_kafka_offset_commit_cb_op (rd_kafka_t *rk,
-                                   rd_kafka_resp_err_t err,
-                                   const rd_kafka_topic_partition_list_t *offsets) {
-        rd_kafka_op_t *reply;
-
-        if ((rk->rk_conf.enabled_events & RD_KAFKA_EVENT_OFFSET_COMMIT) == 0)
-                return;
-
-        reply = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT|RD_KAFKA_OP_REPLY);
-        rd_kafka_op_set_prio(reply, RD_KAFKA_PRIO_HIGH);
-
-        reply->rko_err = err;
-        /* The callback may be NULL. */
-        reply->rko_u.offset_commit.cb = rk->rk_conf.offset_commit_cb;
-        reply->rko_u.offset_commit.opaque = rk->rk_conf.opaque;
-
-        if (offsets != NULL)
-                reply->rko_u.offset_commit.partitions =
-                        rd_kafka_topic_partition_list_copy(offsets);
-
-        rd_kafka_q_enq(rk->rk_rep, reply);
-}
-
-
-
-
-/**
- * Request an asynchronous commit of a list of offsets by enqueuing an
- * OFFSET_COMMIT op on the consumer group's op queue.
- * The op carries \p replyq, the optional \p cb and its \p opaque, a copy
- * of \p reason, and a kept reference to \p rktp if one is given.
- *
- * Makes a copy of \p offsets (may be NULL for current assignment)
- *
- * @returns RD_KAFKA_RESP_ERR__UNKNOWN_GROUP if there is no consumer group,
- *          else RD_KAFKA_RESP_ERR_NO_ERROR.
- */
-static rd_kafka_resp_err_t
-rd_kafka_commit0 (rd_kafka_t *rk,
-                  const rd_kafka_topic_partition_list_t *offsets,
-                  rd_kafka_toppar_t *rktp,
-                  rd_kafka_replyq_t replyq,
-                  void (*cb) (rd_kafka_t *rk,
-                              rd_kafka_resp_err_t err,
-                              rd_kafka_topic_partition_list_t *offsets,
-                              void *opaque),
-                  void *opaque,
-                  const char *reason) {
-        rd_kafka_cgrp_t *group = rd_kafka_cgrp_get(rk);
-        rd_kafka_op_t *request;
-
-        if (group == NULL)
-                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
-        request = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT);
-        request->rko_u.offset_commit.reason = rd_strdup(reason);
-        request->rko_replyq = replyq;
-        request->rko_u.offset_commit.cb = cb;
-        request->rko_u.offset_commit.opaque = opaque;
-
-        if (rktp != NULL)
-                request->rko_rktp = rd_kafka_toppar_keep(rktp);
-
-        if (offsets != NULL)
-                request->rko_u.offset_commit.partitions =
-                        rd_kafka_topic_partition_list_copy(offsets);
-
-        rd_kafka_q_enq(group->rkcg_ops, request);
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-
-
-/**
- * Commit \p offsets, either asynchronously or waiting for the result.
- *
- * In the synchronous case (\p async == 0) a temporary reply queue is
- * created, the call blocks on it for the commit result, and the queue is
- * destroyed before returning, whether or not the request was enqueued.
- *
- * NOTE: 'offsets' may be NULL, see official documentation.
- *
- * @returns RD_KAFKA_RESP_ERR__UNKNOWN_GROUP if there is no consumer group,
- *          the error from enqueuing the request, or (sync only) the
- *          commit result.
- */
-rd_kafka_resp_err_t
-rd_kafka_commit (rd_kafka_t *rk,
-                 const rd_kafka_topic_partition_list_t *offsets, int async) {
-        rd_kafka_resp_err_t err;
-        rd_kafka_q_t *repq = NULL;
-        rd_kafka_replyq_t rq = RD_KAFKA_NO_REPLYQ;
-
-        if (!rd_kafka_cgrp_get(rk))
-                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
-        if (!async) {
-                repq = rd_kafka_q_new(rk);
-                rq = RD_KAFKA_REPLYQ(repq, 0);
-        }
-
-        err = rd_kafka_commit0(rk, offsets, NULL, rq, NULL, NULL, "manual");
-
-        if (!async) {
-                if (!err)
-                        err = rd_kafka_q_wait_result(repq, RD_POLL_INFINITE);
-                /* Always release the temporary queue, also when the
-                 * request could not be enqueued. */
-                rd_kafka_q_destroy(repq);
-        }
-
-        return err;
-}
-
-
-/**
- * Commit the offset following \p rkmessage (message offset + 1) for the
- * message's topic and partition.
- *
- * @returns RD_KAFKA_RESP_ERR__INVALID_ARG if the message carries an error,
- *          else the result of rd_kafka_commit().
- */
-rd_kafka_resp_err_t
-rd_kafka_commit_message (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
-                         int async) {
-        rd_kafka_topic_partition_list_t *single;
-        rd_kafka_topic_partition_t *entry;
-        rd_kafka_resp_err_t result;
-
-        if (rkmessage->err)
-                return RD_KAFKA_RESP_ERR__INVALID_ARG;
-
-        /* One-element list holding the next offset to consume. */
-        single = rd_kafka_topic_partition_list_new(1);
-        entry = rd_kafka_topic_partition_list_add(
-                single, rd_kafka_topic_name(rkmessage->rkt),
-                rkmessage->partition);
-        entry->offset = rkmessage->offset+1;
-
-        result = rd_kafka_commit(rk, single, async);
-        rd_kafka_topic_partition_list_destroy(single);
-
-        return result;
-}
-
-
-
-/**
- * Commit \p offsets with the result delivered on a queue.
- *
- * If \p rkqu is given the reply is directed to that queue and this call
- * returns right after enqueuing the request. Otherwise a temporary queue
- * is created, the call blocks for the reply op, invokes \p cb (if set)
- * with the result and destroys the temporary queue.
- *
- * @returns RD_KAFKA_RESP_ERR__UNKNOWN_GROUP if there is no consumer group,
- *          the enqueue result when \p rkqu is given, else the commit
- *          result (or RD_KAFKA_RESP_ERR__TIMED_OUT if no op was returned).
- */
-rd_kafka_resp_err_t
-rd_kafka_commit_queue (rd_kafka_t *rk,
-                       const rd_kafka_topic_partition_list_t *offsets,
-                       rd_kafka_queue_t *rkqu,
-                       void (*cb) (rd_kafka_t *rk,
-                                   rd_kafka_resp_err_t err,
-                                   rd_kafka_topic_partition_list_t *offsets,
-                                   void *opaque),
-                       void *opaque) {
-        rd_kafka_q_t *rkq;
-        rd_kafka_resp_err_t err;
-        rd_kafka_op_t *reply;
-
-        if (!rd_kafka_cgrp_get(rk))
-                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
-
-        rkq = rkqu ? rkqu->rkqu_q : rd_kafka_q_new(rk);
-
-        err = rd_kafka_commit0(rk, offsets, NULL,
-                               RD_KAFKA_REPLYQ(rkq, 0),
-                               cb, opaque, "manual");
-
-        /* Caller-provided queue: the reply is served by the caller. */
-        if (rkqu)
-                return err;
-
-        /* Temporary queue: wait for the reply here. */
-        reply = rd_kafka_q_pop_serve(rkq, RD_POLL_INFINITE,
-                                     0, RD_KAFKA_Q_CB_FORCE_RETURN,
-                                     NULL, NULL);
-        if (reply) {
-                if (cb)
-                        cb(rk, reply->rko_err,
-                           reply->rko_u.offset_commit.partitions,
-                           opaque);
-                err = reply->rko_err;
-                rd_kafka_op_destroy(reply);
-        } else {
-                err = RD_KAFKA_RESP_ERR__TIMED_OUT;
-        }
-
-        rd_kafka_q_destroy(rkq);
-
-        return err;
-}
-
-
-
-
-/**
- * Called when a broker commit is done.
- *
- * Only the first element of \p offsets is inspected. The commit result is
- * passed to the toppar, its committing offset is cleared, and if the
- * offset store is in the process of stopping its termination is finished.
- *
- * Locality: toppar handler thread
- * Locks: none
- */
-static void
-rd_kafka_offset_broker_commit_cb (rd_kafka_t *rk,
-                                  rd_kafka_resp_err_t err,
-                                  rd_kafka_topic_partition_list_t *offsets,
-                                  void *opaque) {
-        shptr_rd_kafka_toppar_t *s_rktp;
-        rd_kafka_toppar_t *rktp;
-        rd_kafka_topic_partition_t *first;
-
-        if (offsets->cnt == 0) {
-                rd_kafka_dbg(rk, TOPIC, "OFFSETCOMMIT",
-                             "No offsets to commit (commit_cb)");
-                return;
-        }
-
-        first = &offsets->elems[0];
-
-        s_rktp = rd_kafka_topic_partition_list_get_toppar(rk, first);
-        if (!s_rktp) {
-                rd_kafka_dbg(rk, TOPIC, "OFFSETCOMMIT",
-                             "No local partition found for %s [%"PRId32"] "
-                             "while parsing OffsetCommit response "
-                             "(offset %"PRId64", error \"%s\")",
-                             first->topic,
-                             first->partition,
-                             first->offset,
-                             rd_kafka_err2str(first->err));
-                return;
-        }
-
-        rktp = rd_kafka_toppar_s2i(s_rktp);
-
-        /* Fall back on the per-partition error if the request as a
-         * whole succeeded. */
-        if (!err)
-                err = first->err;
-
-        rd_kafka_toppar_offset_commit_result(rktp, err, offsets);
-
-        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
-                     "%s [%"PRId32"]: offset %"PRId64" committed: %s",
-                     rktp->rktp_rkt->rkt_topic->str,
-                     rktp->rktp_partition, first->offset,
-                     rd_kafka_err2str(err));
-
-        rktp->rktp_committing_offset = 0;
-
-        rd_kafka_toppar_lock(rktp);
-        if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING)
-                rd_kafka_offset_store_term(rktp, err);
-        rd_kafka_toppar_unlock(rktp);
-
-        /* Drop the reference obtained by the toppar lookup above. */
-        rd_kafka_toppar_destroy(s_rktp);
-}
-
-
-/**
- * Start a broker commit of the toppar's stored offset.
- *
- * The stored offset is recorded as the committing offset and sent as a
- * one-element list through rd_kafka_commit0(), with the reply directed to
- * the toppar's op queue and rd_kafka_offset_broker_commit_cb() as callback.
- *
- * @returns RD_KAFKA_RESP_ERR__IN_PROGRESS always.
- */
-static rd_kafka_resp_err_t
-rd_kafka_offset_broker_commit (rd_kafka_toppar_t *rktp, const char *reason) {
-        rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
-        rd_kafka_topic_partition_list_t *single;
-        rd_kafka_topic_partition_t *entry;
-
-        rd_kafka_assert(rk, rktp->rktp_cgrp != NULL);
-        rd_kafka_assert(rk,
-                        rktp->rktp_flags & RD_KAFKA_TOPPAR_F_OFFSET_STORE);
-
-        rktp->rktp_committing_offset = rktp->rktp_stored_offset;
-
-        single = rd_kafka_topic_partition_list_new(1);
-        entry = rd_kafka_topic_partition_list_add(
-                single, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
-        entry->offset = rktp->rktp_committing_offset;
-
-        rd_kafka_dbg(rk, TOPIC, "OFFSETCMT",
-                     "%.*s [%"PRId32"]: committing offset %"PRId64": %s",
-                     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
-                     rktp->rktp_partition, rktp->rktp_committing_offset,
-                     reason);
-
-        rd_kafka_commit0(rk, single, rktp,
-                         RD_KAFKA_REPLYQ(rktp->rktp_ops, 0),
-                         rd_kafka_offset_broker_commit_cb, NULL,
-                         reason);
-
-        /* rd_kafka_commit0() made its own copy of the list. */
-        rd_kafka_topic_partition_list_destroy(single);
-
-        return RD_KAFKA_RESP_ERR__IN_PROGRESS;
-}
-
-
-
-
-/**
- * Commit the stored offset to the configured backing store.
- * This might be an async operation.
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the stored offset is not newer
- *          than the committed offset, RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS
- *          if it is not newer than the offset currently being committed,
- *          else the result of the file or broker commit.
- *
- * Locality: toppar handler thread
- */
-static
-rd_kafka_resp_err_t rd_kafka_offset_commit (rd_kafka_toppar_t *rktp,
-                                            const char *reason) {
-        // FIXME: this debug line is logged unconditionally
-        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
-                     "%s [%"PRId32"]: commit: "
-                     "stored offset %"PRId64" > committed offset %"PRId64"?",
-                     rktp->rktp_rkt->rkt_topic->str,
-                     rktp->rktp_partition,
-                     rktp->rktp_stored_offset, rktp->rktp_committed_offset);
-
-        /* Already committed */
-        if (rktp->rktp_stored_offset <= rktp->rktp_committed_offset)
-                return RD_KAFKA_RESP_ERR_NO_ERROR;
-
-        /* Already committing (for async ops) */
-        if (rktp->rktp_stored_offset <= rktp->rktp_committing_offset)
-                return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
-
-        if (rktp->rktp_rkt->rkt_conf.offset_store_method ==
-            RD_KAFKA_OFFSET_METHOD_FILE)
-                return rd_kafka_offset_file_commit(rktp);
-
-        if (rktp->rktp_rkt->rkt_conf.offset_store_method ==
-            RD_KAFKA_OFFSET_METHOD_BROKER)
-                return rd_kafka_offset_broker_commit(rktp, reason);
-
-        /* UNREACHABLE */
-        return RD_KAFKA_RESP_ERR__INVALID_ARG;
-}
-
-
-
-
-
-/**
- * Sync offset backing store. This is only used for METHOD_FILE.
- *
- * @returns the result of the file sync, or RD_KAFKA_RESP_ERR__INVALID_ARG
- *          for any other offset store method.
- *
- * Locality: rktp's broker thread.
- */
-rd_kafka_resp_err_t rd_kafka_offset_sync (rd_kafka_toppar_t *rktp) {
-        if (rktp->rktp_rkt->rkt_conf.offset_store_method !=
-            RD_KAFKA_OFFSET_METHOD_FILE)
-                return RD_KAFKA_RESP_ERR__INVALID_ARG;
-
-        return rd_kafka_offset_file_sync(rktp);
-}
-
-
-/**
- * Store offset + 1 for the given topic partition.
- * Typically called from application code.
- *
- * @returns RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if the partition is not
- *          known locally, else RD_KAFKA_RESP_ERR_NO_ERROR.
- *
- * NOTE: No locks must be held.
- */
-rd_kafka_resp_err_t rd_kafka_offset_store (rd_kafka_topic_t *app_rkt,
-                                           int32_t partition, int64_t offset) {
-        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
-        shptr_rd_kafka_toppar_t *s_rktp;
-
-        /* Look up the toppar under the topic read lock. */
-        rd_kafka_topic_rdlock(rkt);
-        s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*!ua_on_miss*/);
-        rd_kafka_topic_rdunlock(rkt);
-
-        if (!s_rktp)
-                return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
-
-        rd_kafka_offset_store0(rd_kafka_toppar_s2i(s_rktp), offset+1,
-                               1/*lock*/);
-
-        rd_kafka_toppar_destroy(s_rktp);
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * Store the offset of each element in \p offsets (as is, without
- * adjustment) on the matching toppar.
- *
- * Each element's .err is set to RD_KAFKA_RESP_ERR_NO_ERROR on success or
- * RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if no toppar was found for it.
- *
- * @returns RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if at least one element
- *          could not be stored, else RD_KAFKA_RESP_ERR_NO_ERROR.
- */
-rd_kafka_resp_err_t
-rd_kafka_offsets_store (rd_kafka_t *rk,
-                        rd_kafka_topic_partition_list_t *offsets) {
-        int idx;
-        int missing_cnt = 0;
-
-        for (idx = 0 ; idx < offsets->cnt ; idx++) {
-                rd_kafka_topic_partition_t *rktpar = &offsets->elems[idx];
-                shptr_rd_kafka_toppar_t *s_rktp =
-                        rd_kafka_topic_partition_get_toppar(rk, rktpar);
-
-                if (s_rktp) {
-                        rd_kafka_offset_store0(rd_kafka_toppar_s2i(s_rktp),
-                                               rktpar->offset, 1/*lock*/);
-                        rd_kafka_toppar_destroy(s_rktp);
-                        rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
-                } else {
-                        rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
-                        missing_cnt++;
-                }
-        }
-
-        if (missing_cnt > 0)
-                return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-
-
-
-
-/**
- * Decommissions the use of an offset file for a toppar: performs a final
- * sync and stops the sync timer (when an interval > 0 is configured),
- * closes the file and frees the stored path.
- * The file content will not be touched and the file will not be removed.
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR always.
- */
-static rd_kafka_resp_err_t rd_kafka_offset_file_term (rd_kafka_toppar_t *rktp) {
-        rd_kafka_itopic_t *rkt = rktp->rktp_rkt;
-
-        /* Sync offset file if the sync is intervalled (> 0) */
-        if (rkt->rkt_conf.offset_store_sync_interval_ms > 0) {
-                rd_kafka_offset_file_sync(rktp);
-                rd_kafka_timer_stop(&rkt->rkt_rk->rk_timers,
-                                    &rktp->rktp_offset_sync_tmr, 1/*lock*/);
-        }
-
-        rd_kafka_offset_file_close(rktp);
-
-        rd_free(rktp->rktp_offset_path);
-        rktp->rktp_offset_path = NULL;
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-/**
- * Op callback for OFFSET_RESET ops: runs rd_kafka_offset_reset() for the
- * op's toppar, with the toppar lock held, using the offset, error and
- * reason carried by the op.
- */
-static rd_kafka_op_res_t
-rd_kafka_offset_reset_op_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq,
-                             rd_kafka_op_t *rko) {
-        rd_kafka_toppar_t *target = rd_kafka_toppar_s2i(rko->rko_rktp);
-
-        rd_kafka_toppar_lock(target);
-        rd_kafka_offset_reset(target,
-                              rko->rko_u.offset_reset.offset,
-                              rko->rko_err,
-                              rko->rko_u.offset_reset.reason);
-        rd_kafka_toppar_unlock(target);
-
-        return RD_KAFKA_OP_RES_HANDLED;
-}
-
-/**
- * Take action when the offset for a toppar becomes unusable.
- *
- * When called from a thread other than rk_thread the request is forwarded
- * as an op on the toppar's op queue and nothing else is done here.
- *
- * Otherwise the new offset is the configured auto_offset_reset when
- * \p err_offset is RD_KAFKA_OFFSET_INVALID or \p err is set, else
- * \p err_offset itself. If the resulting offset is INVALID a consumer
- * error is enqueued on the fetch queue and fetching is stopped, else an
- * offset query for it is started.
- *
- * Locality: toppar handler thread
- * Locks: toppar_lock() MUST be held
- */
-void rd_kafka_offset_reset (rd_kafka_toppar_t *rktp, int64_t err_offset,
-                            rd_kafka_resp_err_t err, const char *reason) {
-        int64_t offset;
-        rd_kafka_op_t *rko;
-
-        /* Enqueue op for toppar handler thread if we're on the wrong thread. */
-        if (!thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread)) {
-                rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_RESET |
-                                      RD_KAFKA_OP_CB);
-                rko->rko_op_cb = rd_kafka_offset_reset_op_cb;
-                rko->rko_err = err;
-                rko->rko_rktp = rd_kafka_toppar_keep(rktp);
-                rko->rko_u.offset_reset.offset = err_offset;
-                rko->rko_u.offset_reset.reason = rd_strdup(reason);
-                rd_kafka_q_enq(rktp->rktp_ops, rko);
-                return;
-        }
-
-        offset = (err_offset == RD_KAFKA_OFFSET_INVALID || err) ?
-                rktp->rktp_rkt->rkt_conf.auto_offset_reset :
-                err_offset;
-
-        if (offset != RD_KAFKA_OFFSET_INVALID) {
-                /* Query logical offset */
-                rktp->rktp_query_offset = offset;
-                rd_kafka_toppar_set_fetch_state(
-                        rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
-        } else {
-                /* Error, auto.offset.reset tells us to error out. */
-                rko = rd_kafka_op_new(RD_KAFKA_OP_CONSUMER_ERR);
-
-                rko->rko_err = err;
-                rko->rko_u.err.offset = err_offset;
-                rko->rko_u.err.errstr = rd_strdup(reason);
-                rko->rko_rktp = rd_kafka_toppar_keep(rktp);
-
-                rd_kafka_q_enq(rktp->rktp_fetchq, rko);
-                rd_kafka_toppar_set_fetch_state(
-                        rktp, RD_KAFKA_TOPPAR_FETCH_NONE);
-        }
-
-        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
-                     "%s [%"PRId32"]: offset reset (at offset %s) "
-                     "to %s: %s: %s",
-                     rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
-                     rd_kafka_offset2str(err_offset),
-                     rd_kafka_offset2str(offset),
-                     reason, rd_kafka_err2str(err));
-
-        if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
-                rd_kafka_toppar_offset_request(rktp, rktp->rktp_query_offset, 0);
-}
-
-
-/**
- * Escape any special characters in filename 'in' and write the escaped,
- * NUL-terminated string to 'out' (of max size out_size).
- *
- * '/', ':' and '\\' are replaced by "%2F", "%3A" and "%5C" respectively;
- * all other characters are copied unchanged. When the output buffer runs
- * out of space the result is truncated; an escape sequence is never
- * written partially.
- *
- * @param in       NUL-terminated input filename
- * @param out      output buffer
- * @param out_size size of 'out' in bytes; if 0 nothing is written
- *
- * @returns 'out'
- */
-static char *mk_esc_filename (const char *in, char *out, size_t out_size) {
-        const char *s;
-        char *o = out;
-
-        /* No room even for the terminating NUL: do not touch 'out'. */
-        if (out_size == 0)
-                return out;
-
-        for (s = in ; *s ; s++) {
-                const char *esc;
-                size_t esclen;
-
-                switch (*s)
-                {
-                case '/': /* linux */
-                        esc = "%2F";
-                        esclen = 3;
-                        break;
-                case ':': /* osx, windows */
-                        esc = "%3A";
-                        esclen = 3;
-                        break;
-                case '\\': /* windows */
-                        esc = "%5C";
-                        esclen = 3;
-                        break;
-                default:
-                        esc = s;
-                        esclen = 1;
-                        break;
-                }
-
-                if ((size_t)(o - out) + esclen + 1 >= out_size) {
-                        /* No more space in output string, truncate. */
-                        break;
-                }
-
-                memcpy(o, esc, esclen);
-                o += esclen;
-        }
-
-        *o = '\0';
-        return out;
-}
-
-
-/**
- * Offset sync timer callback: syncs the offset store of the toppar
- * passed as \p arg.
- */
-static void rd_kafka_offset_sync_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
-        rd_kafka_offset_sync((rd_kafka_toppar_t *)arg);
-}
-
-
-/**
- * Prepare a toppar for using an offset file.
- *
- * If the configured offset_store_path is a directory the file name is
- * derived from topic, partition and (if configured) group.id, escaped and
- * appended to the directory. The sync timer is started when a sync
- * interval > 0 is configured. The file is then opened and read: a usable
- * offset becomes the stored, committed and next fetch offset, otherwise
- * the offset reset logic is run.
- *
- * Locality: rdkafka main thread
- * Locks: toppar_lock(rktp) must be held
- */
-static void rd_kafka_offset_file_init (rd_kafka_toppar_t *rktp) {
-        rd_kafka_itopic_t *rkt = rktp->rktp_rkt;
-        const char *path = rkt->rkt_conf.offset_store_path;
-        char spath[4096];
-        int64_t offset;
-
-        if (rd_kafka_path_is_dir(path)) {
-                char tmpfile[1024];
-                char escfile[4096];
-                const char *sep;
-
-                /* Include group.id in filename if configured. */
-                if (RD_KAFKAP_STR_IS_NULL(rkt->rkt_rk->rk_group_id))
-                        rd_snprintf(tmpfile, sizeof(tmpfile),
-                                    "%s-%"PRId32".offset",
-                                    rkt->rkt_topic->str,
-                                    rktp->rktp_partition);
-                else
-                        rd_snprintf(tmpfile, sizeof(tmpfile),
-                                    "%s-%"PRId32"-%.*s.offset",
-                                    rkt->rkt_topic->str,
-                                    rktp->rktp_partition,
-                                    RD_KAFKAP_STR_PR(rkt->rkt_rk->
-                                                     rk_group_id));
-
-                /* Escape filename to make it safe. */
-                mk_esc_filename(tmpfile, escfile, sizeof(escfile));
-
-                /* Only add a separator if the directory lacks one. */
-                sep = path[strlen(path)-1] == '/' ? "" : "/";
-                rd_snprintf(spath, sizeof(spath), "%s%s%s",
-                            path, sep, escfile);
-
-                path = spath;
-        }
-
-        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "OFFSET",
-                     "%s [%"PRId32"]: using offset file %s",
-                     rkt->rkt_topic->str,
-                     rktp->rktp_partition,
-                     path);
-        rktp->rktp_offset_path = rd_strdup(path);
-
-
-        /* Set up the offset file sync interval. */
-        if (rkt->rkt_conf.offset_store_sync_interval_ms > 0)
-                rd_kafka_timer_start(&rkt->rkt_rk->rk_timers,
-                                     &rktp->rktp_offset_sync_tmr,
-                                     rkt->rkt_conf.
-                                     offset_store_sync_interval_ms * 1000ll,
-                                     rd_kafka_offset_sync_tmr_cb, rktp);
-
-        /* Read offset from offset file, if it can be opened. */
-        if (rd_kafka_offset_file_open(rktp) != -1)
-                offset = rd_kafka_offset_file_read(rktp);
-        else
-                offset = RD_KAFKA_OFFSET_INVALID;
-
-        if (offset == RD_KAFKA_OFFSET_INVALID) {
-                /* Offset was not usable: perform offset reset logic */
-                rktp->rktp_committed_offset = RD_KAFKA_OFFSET_INVALID;
-                rd_kafka_offset_reset(rktp, RD_KAFKA_OFFSET_INVALID,
-                                      RD_KAFKA_RESP_ERR__FS,
-                                      "non-readable offset file");
-                return;
-        }
-
-        /* Start fetching from offset */
-        rktp->rktp_stored_offset = offset;
-        rktp->rktp_committed_offset = offset;
-        rd_kafka_toppar_next_offset_handle(rktp, offset);
-}
-
-
-
-/**
- * Terminate broker offset store.
- * There is nothing to tear down: this always succeeds.
- */
-static rd_kafka_resp_err_t rd_kafka_offset_broker_term (rd_kafka_toppar_t *rktp){
-        (void)rktp;
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * Prepare a toppar for using broker offset commit (broker 0.8.2 or later).
- * For the simple consumer this triggers an offset reset to
- * RD_KAFKA_OFFSET_STORED to query the broker for offsets.
- * When using KafkaConsumer (high-level consumer) this functionality is
- * disabled in favour of the cgrp commits for the entire set of subscriptions.
- */
-static void rd_kafka_offset_broker_init (rd_kafka_toppar_t *rktp) {
-        if (rd_kafka_is_simple_consumer(rktp->rktp_rkt->rkt_rk))
-                rd_kafka_offset_reset(rktp, RD_KAFKA_OFFSET_STORED, 0,
-                                      "query broker for offsets");
-}
-
-
-/**
- * Terminates toppar's offset store, this is the finalizing step after
- * offset_store_stop().
- *
- * Clears the STOPPING flag, stops the auto commit timer, runs the
- * method-specific termination and finally signals that fetching has
- * stopped, passing on \p err or, if that is not set, the termination
- * error.
- *
- * Locks: rd_kafka_toppar_lock() MUST be held.
- */
-void rd_kafka_offset_store_term (rd_kafka_toppar_t *rktp,
-                                 rd_kafka_resp_err_t err) {
-        /* Initialized so that an unexpected store method can not leave
-         * it indeterminate when read below. */
-        rd_kafka_resp_err_t err2 = RD_KAFKA_RESP_ERR_NO_ERROR;
-
-        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "STORETERM",
-                     "%s [%"PRId32"]: offset store terminating",
-                     rktp->rktp_rkt->rkt_topic->str,
-                     rktp->rktp_partition);
-
-        rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING;
-
-        rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
-                            &rktp->rktp_offset_commit_tmr, 1/*lock*/);
-
-        switch (rktp->rktp_rkt->rkt_conf.offset_store_method)
-        {
-        case RD_KAFKA_OFFSET_METHOD_FILE:
-                err2 = rd_kafka_offset_file_term(rktp);
-                break;
-        case RD_KAFKA_OFFSET_METHOD_BROKER:
-                err2 = rd_kafka_offset_broker_term(rktp);
-                break;
-        case RD_KAFKA_OFFSET_METHOD_NONE:
-        default:
-                /* Nothing to terminate. */
-                break;
-        }
-
-        /* Prioritize the input error (probably from commit), fall
-         * back on termination error. */
-        if (!err)
-                err = err2;
-
-        rd_kafka_toppar_fetch_stopped(rktp, err);
-
-}
-
-
-/**
- * Stop toppar's offset store, committing the final offsets, etc.
- *
- * Returns RD_KAFKA_RESP_ERR_NO_ERROR when the stop completed here
- * (rd_kafka_offset_store_term() has then already been called, with any
- * commit error passed to it), or RD_KAFKA_RESP_ERR__IN_PROGRESS if the
- * stop triggered an async operation (e.g., broker offset commit).
- *
- * In the async case the offset layer will call
- * rd_kafka_offset_store_term() when the offset management has been fully
- * stopped for this partition.
- *
- * Locks: rd_kafka_toppar_lock() MUST be held.
- */
-rd_kafka_resp_err_t rd_kafka_offset_store_stop (rd_kafka_toppar_t *rktp) {
-        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
-
-        if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_OFFSET_STORE) {
-                rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_OFFSET_STORE_STOPPING;
-
-                rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
-                             "%s [%"PRId32"]: stopping offset store "
-                             "(stored offset %"PRId64
-                             ", committed offset %"PRId64", EOF offset %"PRId64")",
-                             rktp->rktp_rkt->rkt_topic->str,
-                             rktp->rktp_partition,
-                             rktp->rktp_stored_offset,
-                             rktp->rktp_committed_offset,
-                             rktp->rktp_offsets_fin.eof_offset);
-
-                /* Store end offset for empty partitions */
-                if (rktp->rktp_rkt->rkt_rk->rk_conf.enable_auto_offset_store &&
-                    rktp->rktp_stored_offset == RD_KAFKA_OFFSET_INVALID &&
-                    rktp->rktp_offsets_fin.eof_offset > 0)
-                        rd_kafka_offset_store0(
-                                rktp, rktp->rktp_offsets_fin.eof_offset,
-                                0/*no lock*/);
-
-                /* Commit offset to backing store.
-                 * This might be an async operation. */
-                if (rd_kafka_is_simple_consumer(rktp->rktp_rkt->rkt_rk) &&
-                    rktp->rktp_stored_offset > rktp->rktp_committed_offset)
-                        err = rd_kafka_offset_commit(rktp,
-                                                     "offset store stop");
-
-                /* If stop is in progress (async commit), return now. */
-                if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
-                        return err;
-        }
-
-        /* Stop is done */
-        rd_kafka_offset_store_term(rktp, err);
-
-        return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * Auto commit timer callback: commits the stored offset of the toppar
- * passed as \p arg.
- */
-static void rd_kafka_offset_auto_commit_tmr_cb (rd_kafka_timers_t *rkts,
-                                                void *arg) {
-        rd_kafka_offset_commit((rd_kafka_toppar_t *)arg, "auto commit timer");
-}
-
-/**
- * Offset query timer callback: with the toppar lock held, issues an
- * offset request for the query offset of the toppar passed as \p arg.
- */
-void rd_kafka_offset_query_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
-        rd_kafka_toppar_t *target = arg;
-
-        rd_kafka_toppar_lock(target);
-
-        rd_kafka_dbg(target->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
-                     "Topic %s [%"PRId32"]: timed offset query for %s in "
-                     "state %s",
-                     target->rktp_rkt->rkt_topic->str,
-                     target->rktp_partition,
-                     rd_kafka_offset2str(target->rktp_query_offset),
-                     rd_kafka_fetch_states[target->rktp_fetch_state]);
-
-        rd_kafka_toppar_offset_request(target, target->rktp_query_offset, 0);
-
-        rd_kafka_toppar_unlock(target);
-}
-
-
-/**
- * Initialize toppar's offset store.
- *
- * Marks the committed offset as unknown, starts the auto commit timer for
- * the simple consumer (when auto_commit_interval_ms > 0), runs the
- * method-specific initialization and flags the toppar as having an offset
- * store. The flag is not set for an unrecognized store method.
- *
- * Locality: toppar handler thread
- */
-void rd_kafka_offset_store_init (rd_kafka_toppar_t *rktp) {
-        static const char *store_names[] = { "none", "file", "broker" };
-        rd_kafka_itopic_t *rkt = rktp->rktp_rkt;
-
-        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "OFFSET",
-                     "%s [%"PRId32"]: using offset store method: %s",
-                     rkt->rkt_topic->str,
-                     rktp->rktp_partition,
-                     store_names[rkt->rkt_conf.offset_store_method]);
-
-        /* The committed offset is unknown at this point. */
-        rktp->rktp_committed_offset = RD_KAFKA_OFFSET_INVALID;
-
-        /* Set up the commit interval (for simple consumer). */
-        if (rd_kafka_is_simple_consumer(rkt->rkt_rk) &&
-            rkt->rkt_conf.auto_commit_interval_ms > 0)
-                rd_kafka_timer_start(&rkt->rkt_rk->rk_timers,
-                                     &rktp->rktp_offset_commit_tmr,
-                                     rkt->rkt_conf.
-                                     auto_commit_interval_ms * 1000ll,
-                                     rd_kafka_offset_auto_commit_tmr_cb,
-                                     rktp);
-
-        if (rkt->rkt_conf.offset_store_method ==
-            RD_KAFKA_OFFSET_METHOD_FILE)
-                rd_kafka_offset_file_init(rktp);
-        else if (rkt->rkt_conf.offset_store_method ==
-                 RD_KAFKA_OFFSET_METHOD_BROKER)
-                rd_kafka_offset_broker_init(rktp);
-        else if (rkt->rkt_conf.offset_store_method !=
-                 RD_KAFKA_OFFSET_METHOD_NONE)
-                return; /* NOTREACHED */
-
-        rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_OFFSET_STORE;
-}
-
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/rdkafka_offset.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_offset.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_offset.h
deleted file mode 100644
index a9e8655..0000000
--- a/thirdparty/librdkafka-0.11.1/src/rdkafka_offset.h
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012,2013 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-#include "rdkafka_partition.h"
-
-
-const char *rd_kafka_offset2str (int64_t offset);
-
-
-/**
- * Stores the offset for the toppar 'rktp'.
- * The actual commit of the offset to backing store is usually
- * performed at a later time (time or threshold based).
- *
- * See head of rdkafka_offset.c for more information.
- */
-static RD_INLINE RD_UNUSED
-void rd_kafka_offset_store0 (rd_kafka_toppar_t *rktp, int64_t offset,
- int lock) {
- if (lock)
- rd_kafka_toppar_lock(rktp);
- rktp->rktp_stored_offset = offset;
- if (lock)
- rd_kafka_toppar_unlock(rktp);
-}
-
-rd_kafka_resp_err_t rd_kafka_offset_store (rd_kafka_topic_t *rkt,
- int32_t partition, int64_t offset);
-
-rd_kafka_resp_err_t rd_kafka_offset_sync (rd_kafka_toppar_t *rktp);
-
-void rd_kafka_offset_store_term (rd_kafka_toppar_t *rktp,
- rd_kafka_resp_err_t err);
-rd_kafka_resp_err_t rd_kafka_offset_store_stop (rd_kafka_toppar_t *rktp);
-void rd_kafka_offset_store_init (rd_kafka_toppar_t *rktp);
-
-void rd_kafka_offset_reset (rd_kafka_toppar_t *rktp, int64_t err_offset,
- rd_kafka_resp_err_t err, const char *reason);
-
-void rd_kafka_offset_query_tmr_cb (rd_kafka_timers_t *rkts, void *arg);
-
-void rd_kafka_offset_commit_cb_op (rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- const rd_kafka_topic_partition_list_t *offsets);
-