You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:15:10 UTC
[42/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade
to librdkafka 0.11.4
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src-cpp/rdkafkacpp_int.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src-cpp/rdkafkacpp_int.h b/thirdparty/librdkafka-0.11.1/src-cpp/rdkafkacpp_int.h
deleted file mode 100644
index d231d20..0000000
--- a/thirdparty/librdkafka-0.11.1/src-cpp/rdkafkacpp_int.h
+++ /dev/null
@@ -1,897 +0,0 @@
-/*
- * librdkafka - Apache Kafka C/C++ library
- *
- * Copyright (c) 2014 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-#include <string>
-#include <iostream>
-#include <cstring>
-#include <stdlib.h>
-
-#include "rdkafkacpp.h"
-
-extern "C" {
-#include "../src/rdkafka.h"
-}
-
-#ifdef _MSC_VER
-typedef int mode_t;
-#pragma warning(disable : 4250)
-#endif
-
-
-namespace RdKafka {
-
-
-void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque);
-void log_cb_trampoline (const rd_kafka_t *rk, int level,
- const char *fac, const char *buf);
-void error_cb_trampoline (rd_kafka_t *rk, int err, const char *reason,
- void *opaque);
-void throttle_cb_trampoline (rd_kafka_t *rk, const char *broker_name,
- int32_t broker_id, int throttle_time_ms,
- void *opaque);
-int stats_cb_trampoline (rd_kafka_t *rk, char *json, size_t json_len,
- void *opaque);
-int socket_cb_trampoline (int domain, int type, int protocol, void *opaque);
-int open_cb_trampoline (const char *pathname, int flags, mode_t mode,
- void *opaque);
-void rebalance_cb_trampoline (rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- rd_kafka_topic_partition_list_t *c_partitions,
- void *opaque);
-void offset_commit_cb_trampoline0 (
- rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- rd_kafka_topic_partition_list_t *c_offsets, void *opaque);
-
-rd_kafka_topic_partition_list_t *
- partitions_to_c_parts (const std::vector<TopicPartition*> &partitions);
-
-/**
- * @brief Update the application provided 'partitions' with info from 'c_parts'
- */
-void update_partitions_from_c_parts (std::vector<TopicPartition*> &partitions,
- const rd_kafka_topic_partition_list_t *c_parts);
-
-
-class EventImpl : public Event {
- public:
- ~EventImpl () {};
-
- EventImpl (Type type, ErrorCode err, Severity severity,
- const char *fac, const char *str):
- type_(type), err_(err), severity_(severity), fac_(fac ? fac : ""),
- str_(str), id_(0), throttle_time_(0) {};
-
- EventImpl (Type type):
- type_(type), err_(ERR_NO_ERROR), severity_(EVENT_SEVERITY_EMERG),
- fac_(""), str_(""), id_(0), throttle_time_(0) {};
-
- Type type () const { return type_; }
- ErrorCode err () const { return err_; }
- Severity severity () const { return severity_; }
- std::string fac () const { return fac_; }
- std::string str () const { return str_; }
- std::string broker_name () const {
- if (type_ == EVENT_THROTTLE)
- return str_;
- else
- return std::string("");
- }
- int broker_id () const { return id_; }
- int throttle_time () const { return throttle_time_; }
-
- Type type_;
- ErrorCode err_;
- Severity severity_;
- std::string fac_;
- std::string str_; /* reused for THROTTLE broker_name */
- int id_;
- int throttle_time_;
-};
-
-
-class MessageImpl : public Message {
- public:
- ~MessageImpl () {
- if (free_rkmessage_)
- rd_kafka_message_destroy(const_cast<rd_kafka_message_t *>(rkmessage_));
- if (key_)
- delete key_;
- };
-
- MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage):
- topic_(topic), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL) {}
-
- MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage,
- bool dofree):
- topic_(topic), rkmessage_(rkmessage), free_rkmessage_(dofree), key_(NULL) { }
-
- MessageImpl (RdKafka::Topic *topic, const rd_kafka_message_t *rkmessage):
- topic_(topic), rkmessage_(rkmessage), free_rkmessage_(false), key_(NULL) { }
-
- MessageImpl (rd_kafka_message_t *rkmessage):
- topic_(NULL), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL) {
- if (rkmessage->rkt) {
- /* Possibly NULL */
- topic_ = static_cast<Topic *>(rd_kafka_topic_opaque(rkmessage->rkt));
- }
- }
-
- /* Create errored message */
- MessageImpl (RdKafka::Topic *topic, RdKafka::ErrorCode err):
- topic_(topic), free_rkmessage_(false), key_(NULL) {
- rkmessage_ = &rkmessage_err_;
- memset(&rkmessage_err_, 0, sizeof(rkmessage_err_));
- rkmessage_err_.err = static_cast<rd_kafka_resp_err_t>(err);
- }
-
- std::string errstr() const {
- /* FIXME: If there is an error string in payload (for consume_cb)
- * it wont be shown since 'payload' is reused for errstr
- * and we cant distinguish between consumer and producer.
- * For the producer case the payload needs to be the original
- * payload pointer. */
- const char *es = rd_kafka_err2str(rkmessage_->err);
- return std::string(es ? es : "");
- }
-
- ErrorCode err () const {
- return static_cast<RdKafka::ErrorCode>(rkmessage_->err);
- }
-
- Topic *topic () const { return topic_; }
- std::string topic_name () const {
- if (rkmessage_->rkt)
- return rd_kafka_topic_name(rkmessage_->rkt);
- else
- return "";
- }
- int32_t partition () const { return rkmessage_->partition; }
- void *payload () const { return rkmessage_->payload; }
- size_t len () const { return rkmessage_->len; }
- const std::string *key () const {
- if (key_) {
- return key_;
- } else if (rkmessage_->key) {
- key_ = new std::string(static_cast<char const*>(rkmessage_->key), rkmessage_->key_len);
- return key_;
- }
- return NULL;
- }
- const void *key_pointer () const { return rkmessage_->key; }
- size_t key_len () const { return rkmessage_->key_len; }
-
- int64_t offset () const { return rkmessage_->offset; }
-
- MessageTimestamp timestamp () const {
- MessageTimestamp ts;
- rd_kafka_timestamp_type_t tstype;
- ts.timestamp = rd_kafka_message_timestamp(rkmessage_, &tstype);
- ts.type = static_cast<MessageTimestamp::MessageTimestampType>(tstype);
- return ts;
- }
-
- void *msg_opaque () const { return rkmessage_->_private; };
-
- int64_t latency () const {
- return rd_kafka_message_latency(rkmessage_);
- }
-
- RdKafka::Topic *topic_;
- const rd_kafka_message_t *rkmessage_;
- bool free_rkmessage_;
- /* For error signalling by the C++ layer the .._err_ message is
- * used as a place holder and rkmessage_ is set to point to it. */
- rd_kafka_message_t rkmessage_err_;
- mutable std::string *key_; /* mutable because it's a cached value */
-
-private:
- /* "delete" copy ctor + copy assignment, for safety of key_ */
- MessageImpl(MessageImpl const&) /*= delete*/;
- MessageImpl& operator=(MessageImpl const&) /*= delete*/;
-};
-
-
-class ConfImpl : public Conf {
- public:
- ConfImpl()
- :consume_cb_(NULL),
- dr_cb_(NULL),
- event_cb_(NULL),
- socket_cb_(NULL),
- open_cb_(NULL),
- partitioner_cb_(NULL),
- partitioner_kp_cb_(NULL),
- rebalance_cb_(NULL),
- offset_commit_cb_(NULL),
- rk_conf_(NULL),
- rkt_conf_(NULL){}
- ~ConfImpl () {
- if (rk_conf_)
- rd_kafka_conf_destroy(rk_conf_);
- else if (rkt_conf_)
- rd_kafka_topic_conf_destroy(rkt_conf_);
- }
-
- Conf::ConfResult set(const std::string &name,
- const std::string &value,
- std::string &errstr);
-
- Conf::ConfResult set (const std::string &name, DeliveryReportCb *dr_cb,
- std::string &errstr) {
- if (name != "dr_cb") {
- errstr = "Invalid value type, expected RdKafka::DeliveryReportCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- dr_cb_ = dr_cb;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult set (const std::string &name, EventCb *event_cb,
- std::string &errstr) {
- if (name != "event_cb") {
- errstr = "Invalid value type, expected RdKafka::EventCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- event_cb_ = event_cb;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult set (const std::string &name, const Conf *topic_conf,
- std::string &errstr) {
- const ConfImpl *tconf_impl =
- dynamic_cast<const RdKafka::ConfImpl *>(topic_conf);
- if (name != "default_topic_conf" || !tconf_impl->rkt_conf_) {
- errstr = "Invalid value type, expected RdKafka::Conf";
- return Conf::CONF_INVALID;
- }
-
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- rd_kafka_conf_set_default_topic_conf(rk_conf_,
- rd_kafka_topic_conf_dup(tconf_impl->
- rkt_conf_));
-
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult set (const std::string &name, PartitionerCb *partitioner_cb,
- std::string &errstr) {
- if (name != "partitioner_cb") {
- errstr = "Invalid value type, expected RdKafka::PartitionerCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rkt_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_TOPIC object";
- return Conf::CONF_INVALID;
- }
-
- partitioner_cb_ = partitioner_cb;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult set (const std::string &name,
- PartitionerKeyPointerCb *partitioner_kp_cb,
- std::string &errstr) {
- if (name != "partitioner_key_pointer_cb") {
- errstr = "Invalid value type, expected RdKafka::PartitionerKeyPointerCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rkt_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_TOPIC object";
- return Conf::CONF_INVALID;
- }
-
- partitioner_kp_cb_ = partitioner_kp_cb;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult set (const std::string &name, SocketCb *socket_cb,
- std::string &errstr) {
- if (name != "socket_cb") {
- errstr = "Invalid value type, expected RdKafka::SocketCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- socket_cb_ = socket_cb;
- return Conf::CONF_OK;
- }
-
-
- Conf::ConfResult set (const std::string &name, OpenCb *open_cb,
- std::string &errstr) {
- if (name != "open_cb") {
- errstr = "Invalid value type, expected RdKafka::OpenCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- open_cb_ = open_cb;
- return Conf::CONF_OK;
- }
-
-
-
-
- Conf::ConfResult set (const std::string &name, RebalanceCb *rebalance_cb,
- std::string &errstr) {
- if (name != "rebalance_cb") {
- errstr = "Invalid value type, expected RdKafka::RebalanceCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- rebalance_cb_ = rebalance_cb;
- return Conf::CONF_OK;
- }
-
-
- Conf::ConfResult set (const std::string &name,
- OffsetCommitCb *offset_commit_cb,
- std::string &errstr) {
- if (name != "offset_commit_cb") {
- errstr = "Invalid value type, expected RdKafka::OffsetCommitCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- offset_commit_cb_ = offset_commit_cb;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult get(const std::string &name, std::string &value) const {
- if (name.compare("dr_cb") == 0 ||
- name.compare("event_cb") == 0 ||
- name.compare("partitioner_cb") == 0 ||
- name.compare("partitioner_key_pointer_cb") == 0 ||
- name.compare("socket_cb") == 0 ||
- name.compare("open_cb") == 0 ||
- name.compare("rebalance_cb") == 0 ||
- name.compare("offset_commit_cb") == 0 ) {
- return Conf::CONF_INVALID;
- }
- rd_kafka_conf_res_t res = RD_KAFKA_CONF_INVALID;
-
- /* Get size of property */
- size_t size;
- if (rk_conf_)
- res = rd_kafka_conf_get(rk_conf_,
- name.c_str(), NULL, &size);
- else if (rkt_conf_)
- res = rd_kafka_topic_conf_get(rkt_conf_,
- name.c_str(), NULL, &size);
- if (res != RD_KAFKA_CONF_OK)
- return static_cast<Conf::ConfResult>(res);
-
- char *tmpValue = new char[size];
-
- if (rk_conf_)
- res = rd_kafka_conf_get(rk_conf_, name.c_str(),
- tmpValue, &size);
- else if (rkt_conf_)
- res = rd_kafka_topic_conf_get(rkt_conf_,
- name.c_str(), NULL, &size);
-
- if (res == RD_KAFKA_CONF_OK)
- value.assign(tmpValue);
- delete[] tmpValue;
-
- return static_cast<Conf::ConfResult>(res);
- }
-
- Conf::ConfResult get(DeliveryReportCb *&dr_cb) const {
- if (!rk_conf_)
- return Conf::CONF_INVALID;
- dr_cb = this->dr_cb_;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult get(EventCb *&event_cb) const {
- if (!rk_conf_)
- return Conf::CONF_INVALID;
- event_cb = this->event_cb_;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult get(PartitionerCb *&partitioner_cb) const {
- if (!rkt_conf_)
- return Conf::CONF_INVALID;
- partitioner_cb = this->partitioner_cb_;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const {
- if (!rkt_conf_)
- return Conf::CONF_INVALID;
- partitioner_kp_cb = this->partitioner_kp_cb_;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult get(SocketCb *&socket_cb) const {
- if (!rk_conf_)
- return Conf::CONF_INVALID;
- socket_cb = this->socket_cb_;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult get(OpenCb *&open_cb) const {
- if (!rk_conf_)
- return Conf::CONF_INVALID;
- open_cb = this->open_cb_;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult get(RebalanceCb *&rebalance_cb) const {
- if (!rk_conf_)
- return Conf::CONF_INVALID;
- rebalance_cb = this->rebalance_cb_;
- return Conf::CONF_OK;
- }
-
- Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const {
- if (!rk_conf_)
- return Conf::CONF_INVALID;
- offset_commit_cb = this->offset_commit_cb_;
- return Conf::CONF_OK;
- }
-
-
-
- std::list<std::string> *dump ();
-
-
- Conf::ConfResult set (const std::string &name, ConsumeCb *consume_cb,
- std::string &errstr) {
- if (name != "consume_cb") {
- errstr = "Invalid value type, expected RdKafka::ConsumeCb";
- return Conf::CONF_INVALID;
- }
-
- if (!rk_conf_) {
- errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
- return Conf::CONF_INVALID;
- }
-
- consume_cb_ = consume_cb;
- return Conf::CONF_OK;
- }
-
-
- ConsumeCb *consume_cb_;
- DeliveryReportCb *dr_cb_;
- EventCb *event_cb_;
- SocketCb *socket_cb_;
- OpenCb *open_cb_;
- PartitionerCb *partitioner_cb_;
- PartitionerKeyPointerCb *partitioner_kp_cb_;
- RebalanceCb *rebalance_cb_;
- OffsetCommitCb *offset_commit_cb_;
- ConfType conf_type_;
- rd_kafka_conf_t *rk_conf_;
- rd_kafka_topic_conf_t *rkt_conf_;
-};
-
-
-class HandleImpl : virtual public Handle {
- public:
- ~HandleImpl() {};
- HandleImpl () {};
- const std::string name () const { return std::string(rd_kafka_name(rk_)); };
- const std::string memberid () const {
- char *str = rd_kafka_memberid(rk_);
- std::string memberid = str ? str : "";
- if (str)
- rd_kafka_mem_free(rk_, str);
- return memberid;
- }
- int poll (int timeout_ms) { return rd_kafka_poll(rk_, timeout_ms); };
- int outq_len () { return rd_kafka_outq_len(rk_); };
-
- void set_common_config (RdKafka::ConfImpl *confimpl);
-
- RdKafka::ErrorCode metadata (bool all_topics,const Topic *only_rkt,
- Metadata **metadatap, int timeout_ms);
-
- ErrorCode pause (std::vector<TopicPartition*> &partitions);
- ErrorCode resume (std::vector<TopicPartition*> &partitions);
-
- ErrorCode query_watermark_offsets (const std::string &topic,
- int32_t partition,
- int64_t *low, int64_t *high,
- int timeout_ms) {
- return static_cast<RdKafka::ErrorCode>(
- rd_kafka_query_watermark_offsets(
- rk_, topic.c_str(), partition,
- low, high, timeout_ms));
- }
-
- ErrorCode get_watermark_offsets (const std::string &topic,
- int32_t partition,
- int64_t *low, int64_t *high) {
- return static_cast<RdKafka::ErrorCode>(
- rd_kafka_get_watermark_offsets(
- rk_, topic.c_str(), partition,
- low, high));
- }
-
- Queue *get_partition_queue (const TopicPartition *partition);
-
- ErrorCode offsetsForTimes (std::vector<TopicPartition*> &offsets,
- int timeout_ms) {
- rd_kafka_topic_partition_list_t *c_offsets = partitions_to_c_parts(offsets);
- ErrorCode err = static_cast<ErrorCode>(
- rd_kafka_offsets_for_times(rk_, c_offsets, timeout_ms));
- update_partitions_from_c_parts(offsets, c_offsets);
- rd_kafka_topic_partition_list_destroy(c_offsets);
- return err;
- }
-
- ErrorCode set_log_queue (Queue *queue);
-
- void yield () {
- rd_kafka_yield(rk_);
- }
-
- const std::string clusterid (int timeout_ms) {
- char *str = rd_kafka_clusterid(rk_, timeout_ms);
- std::string clusterid = str ? str : "";
- if (str)
- rd_kafka_mem_free(rk_, str);
- return clusterid;
- }
-
- rd_kafka_t *rk_;
- /* All Producer and Consumer callbacks must reside in HandleImpl and
- * the opaque provided to rdkafka must be a pointer to HandleImpl, since
- * ProducerImpl and ConsumerImpl classes cannot be safely directly cast to
- * HandleImpl due to the skewed diamond inheritance. */
- ConsumeCb *consume_cb_;
- EventCb *event_cb_;
- SocketCb *socket_cb_;
- OpenCb *open_cb_;
- DeliveryReportCb *dr_cb_;
- PartitionerCb *partitioner_cb_;
- PartitionerKeyPointerCb *partitioner_kp_cb_;
- RebalanceCb *rebalance_cb_;
- OffsetCommitCb *offset_commit_cb_;
-};
-
-
-class TopicImpl : public Topic {
- public:
- ~TopicImpl () {
- rd_kafka_topic_destroy(rkt_);
- }
-
- const std::string name () const {
- return rd_kafka_topic_name(rkt_);
- }
-
- bool partition_available (int32_t partition) const {
- return !!rd_kafka_topic_partition_available(rkt_, partition);
- }
-
- ErrorCode offset_store (int32_t partition, int64_t offset) {
- return static_cast<RdKafka::ErrorCode>(
- rd_kafka_offset_store(rkt_, partition, offset));
- }
-
- static Topic *create (Handle &base, const std::string &topic,
- Conf *conf);
-
- rd_kafka_topic_t *rkt_;
- PartitionerCb *partitioner_cb_;
- PartitionerKeyPointerCb *partitioner_kp_cb_;
-};
-
-
-/**
- * Topic and Partition
- */
-class TopicPartitionImpl : public TopicPartition {
-public:
- ~TopicPartitionImpl() {};
-
- static TopicPartition *create (const std::string &topic, int partition);
-
- TopicPartitionImpl (const std::string &topic, int partition):
- topic_(topic), partition_(partition), offset_(RdKafka::Topic::OFFSET_INVALID),
- err_(ERR_NO_ERROR) {}
-
- TopicPartitionImpl (const std::string &topic, int partition, int64_t offset):
- topic_(topic), partition_(partition), offset_(offset),
- err_(ERR_NO_ERROR) {}
-
- TopicPartitionImpl (const rd_kafka_topic_partition_t *c_part) {
- topic_ = std::string(c_part->topic);
- partition_ = c_part->partition;
- offset_ = c_part->offset;
- err_ = static_cast<ErrorCode>(c_part->err);
- // FIXME: metadata
- }
-
- static void destroy (std::vector<TopicPartition*> &partitions);
-
- int partition () const { return partition_; }
- const std::string &topic () const { return topic_ ; }
-
- int64_t offset () const { return offset_; }
-
- ErrorCode err () const { return err_; }
-
- void set_offset (int64_t offset) { offset_ = offset; }
-
- std::ostream& operator<<(std::ostream &ostrm) const {
- return ostrm << topic_ << " [" << partition_ << "]";
- }
-
- std::string topic_;
- int partition_;
- int64_t offset_;
- ErrorCode err_;
-};
-
-
-
-class KafkaConsumerImpl : virtual public KafkaConsumer, virtual public HandleImpl {
-public:
- ~KafkaConsumerImpl () {
-
- }
-
- static KafkaConsumer *create (Conf *conf, std::string &errstr);
-
- ErrorCode assignment (std::vector<TopicPartition*> &partitions);
- ErrorCode subscription (std::vector<std::string> &topics);
- ErrorCode subscribe (const std::vector<std::string> &topics);
- ErrorCode unsubscribe ();
- ErrorCode assign (const std::vector<TopicPartition*> &partitions);
- ErrorCode unassign ();
-
- Message *consume (int timeout_ms);
- ErrorCode commitSync () {
- return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 0/*sync*/));
- }
- ErrorCode commitAsync () {
- return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 1/*async*/));
- }
- ErrorCode commitSync (Message *message) {
- MessageImpl *msgimpl = dynamic_cast<MessageImpl*>(message);
- return static_cast<ErrorCode>(
- rd_kafka_commit_message(rk_, msgimpl->rkmessage_, 0/*sync*/));
- }
- ErrorCode commitAsync (Message *message) {
- MessageImpl *msgimpl = dynamic_cast<MessageImpl*>(message);
- return static_cast<ErrorCode>(
- rd_kafka_commit_message(rk_, msgimpl->rkmessage_,1/*async*/));
- }
-
- ErrorCode commitSync (std::vector<TopicPartition*> &offsets) {
- rd_kafka_topic_partition_list_t *c_parts =
- partitions_to_c_parts(offsets);
- rd_kafka_resp_err_t err =
- rd_kafka_commit(rk_, c_parts, 0);
- if (!err)
- update_partitions_from_c_parts(offsets, c_parts);
- rd_kafka_topic_partition_list_destroy(c_parts);
- return static_cast<ErrorCode>(err);
- }
-
- ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) {
- rd_kafka_topic_partition_list_t *c_parts =
- partitions_to_c_parts(offsets);
- rd_kafka_resp_err_t err =
- rd_kafka_commit(rk_, c_parts, 1);
- rd_kafka_topic_partition_list_destroy(c_parts);
- return static_cast<ErrorCode>(err);
- }
-
- ErrorCode commitSync (OffsetCommitCb *offset_commit_cb) {
- return static_cast<ErrorCode>(
- rd_kafka_commit_queue(rk_, NULL, NULL,
- RdKafka::offset_commit_cb_trampoline0,
- offset_commit_cb));
- }
-
- ErrorCode commitSync (std::vector<TopicPartition*> &offsets,
- OffsetCommitCb *offset_commit_cb) {
- rd_kafka_topic_partition_list_t *c_parts =
- partitions_to_c_parts(offsets);
- rd_kafka_resp_err_t err =
- rd_kafka_commit_queue(rk_, c_parts, NULL,
- RdKafka::offset_commit_cb_trampoline0,
- offset_commit_cb);
- rd_kafka_topic_partition_list_destroy(c_parts);
- return static_cast<ErrorCode>(err);
- }
-
- ErrorCode committed (std::vector<TopicPartition*> &partitions, int timeout_ms);
- ErrorCode position (std::vector<TopicPartition*> &partitions);
-
- ErrorCode close ();
-
- ErrorCode seek (const TopicPartition &partition, int timeout_ms);
-
- ErrorCode offsets_store (std::vector<TopicPartition*> &offsets) {
- rd_kafka_topic_partition_list_t *c_parts =
- partitions_to_c_parts(offsets);
- rd_kafka_resp_err_t err =
- rd_kafka_offsets_store(rk_, c_parts);
- update_partitions_from_c_parts(offsets, c_parts);
- rd_kafka_topic_partition_list_destroy(c_parts);
- return static_cast<ErrorCode>(err);
- }
-
-};
-
-
-class MetadataImpl : public Metadata {
- public:
- MetadataImpl(const rd_kafka_metadata_t *metadata);
- ~MetadataImpl();
-
- const std::vector<const BrokerMetadata *> *brokers() const {
- return &brokers_;
- }
-
- const std::vector<const TopicMetadata *> *topics() const {
- return &topics_;
- }
-
- const std::string orig_broker_name() const {
- return std::string(metadata_->orig_broker_name);
- }
-
- int32_t orig_broker_id() const {
- return metadata_->orig_broker_id;
- }
-
-private:
- const rd_kafka_metadata_t *metadata_;
- std::vector<const BrokerMetadata *> brokers_;
- std::vector<const TopicMetadata *> topics_;
- std::string orig_broker_name_;
-};
-
-
-class QueueImpl : virtual public Queue {
- public:
- ~QueueImpl () {
- rd_kafka_queue_destroy(queue_);
- }
- static Queue *create (Handle *base);
- ErrorCode forward (Queue *queue);
- Message *consume (int timeout_ms);
- int poll (int timeout_ms);
-
- rd_kafka_queue_t *queue_;
-};
-
-
-
-
-
-class ConsumerImpl : virtual public Consumer, virtual public HandleImpl {
- public:
- ~ConsumerImpl () {
- rd_kafka_destroy(rk_); };
- static Consumer *create (Conf *conf, std::string &errstr);
-
- ErrorCode start (Topic *topic, int32_t partition, int64_t offset);
- ErrorCode start (Topic *topic, int32_t partition, int64_t offset,
- Queue *queue);
- ErrorCode stop (Topic *topic, int32_t partition);
- ErrorCode seek (Topic *topic, int32_t partition, int64_t offset,
- int timeout_ms);
- Message *consume (Topic *topic, int32_t partition, int timeout_ms);
- Message *consume (Queue *queue, int timeout_ms);
- int consume_callback (Topic *topic, int32_t partition, int timeout_ms,
- ConsumeCb *cb, void *opaque);
- int consume_callback (Queue *queue, int timeout_ms,
- RdKafka::ConsumeCb *consume_cb, void *opaque);
-};
-
-
-
-class ProducerImpl : virtual public Producer, virtual public HandleImpl {
-
- public:
- ~ProducerImpl () { if (rk_) rd_kafka_destroy(rk_); };
-
- ErrorCode produce (Topic *topic, int32_t partition,
- int msgflags,
- void *payload, size_t len,
- const std::string *key,
- void *msg_opaque);
-
- ErrorCode produce (Topic *topic, int32_t partition,
- int msgflags,
- void *payload, size_t len,
- const void *key, size_t key_len,
- void *msg_opaque);
-
- ErrorCode produce (Topic *topic, int32_t partition,
- const std::vector<char> *payload,
- const std::vector<char> *key,
- void *msg_opaque);
-
- ErrorCode produce (const std::string topic_name, int32_t partition,
- int msgflags,
- void *payload, size_t len,
- const void *key, size_t key_len,
- int64_t timestamp,
- void *msg_opaque);
-
- ErrorCode flush (int timeout_ms) {
- return static_cast<RdKafka::ErrorCode>(rd_kafka_flush(rk_,
- timeout_ms));
- }
-
- static Producer *create (Conf *conf, std::string &errstr);
-
-};
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/CMakeLists.txt b/thirdparty/librdkafka-0.11.1/src/CMakeLists.txt
deleted file mode 100644
index 17bfb23..0000000
--- a/thirdparty/librdkafka-0.11.1/src/CMakeLists.txt
+++ /dev/null
@@ -1,128 +0,0 @@
-set(
- sources
- crc32c.c
- rdaddr.c
- rdavl.c
- rdbuf.c
- rdcrc32.c
- rdkafka.c
- rdkafka_assignor.c
- rdkafka_broker.c
- rdkafka_buf.c
- rdkafka_cgrp.c
- rdkafka_conf.c
- rdkafka_event.c
- rdkafka_feature.c
- rdkafka_lz4.c
- rdkafka_metadata.c
- rdkafka_metadata_cache.c
- rdkafka_msg.c
- rdkafka_msgset_reader.c
- rdkafka_msgset_writer.c
- rdkafka_offset.c
- rdkafka_op.c
- rdkafka_partition.c
- rdkafka_pattern.c
- rdkafka_queue.c
- rdkafka_range_assignor.c
- rdkafka_request.c
- rdkafka_roundrobin_assignor.c
- rdkafka_sasl.c
- rdkafka_sasl_plain.c
- rdkafka_subscription.c
- rdkafka_timer.c
- rdkafka_topic.c
- rdkafka_transport.c
- rdkafka_interceptor.c
- rdlist.c
- rdlog.c
- rdports.c
- rdrand.c
- rdregex.c
- rdstring.c
- rdunittest.c
- rdvarint.c
- snappy.c
- tinycthread.c
- xxhash.c
- lz4.c
- lz4frame.c
- lz4hc.c
-)
-
-if(WITH_LIBDL)
- list(APPEND sources rddl.c)
-endif()
-
-if(WITH_PLUGINS)
- list(APPEND sources rdkafka_plugin.c)
-endif()
-
-if(WITH_SASL_SCRAM)
- list(APPEND sources rdkafka_sasl_win32.c)
-elseif(WITH_SASL_CYRUS)
- list(APPEND sources rdkafka_sasl_cyrus.c)
-endif()
-
-if(WITH_ZLIB)
- list(APPEND sources rdgz.c)
-endif()
-
-if(NOT HAVE_REGEX)
- list(APPEND sources regexp.c)
-endif()
-
-add_library(rdkafka ${sources})
-
-# Support '#include <rdkafka.h>'
-target_include_directories(rdkafka PUBLIC "$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}>")
-
-# We need 'dummy' directory to support `#include "../config.h"` path
-set(dummy "${GENERATED_DIR}/dummy")
-file(MAKE_DIRECTORY "${dummy}")
-target_include_directories(rdkafka PUBLIC "$<BUILD_INTERFACE:${dummy}>")
-
-if(WITH_ZLIB)
- find_package(ZLIB REQUIRED)
- target_link_libraries(rdkafka PUBLIC ZLIB::ZLIB)
-endif()
-
-if(WITH_SSL)
- if(WITH_BUNDLED_SSL) # option from 'h2o' parent project
- if(NOT TARGET bundled-ssl)
- message(FATAL_ERROR "bundled-ssl target not exist")
- endif()
- target_include_directories(rdkafka BEFORE PUBLIC ${BUNDLED_SSL_INCLUDE_DIR})
- target_link_libraries(rdkafka PUBLIC ${BUNDLED_SSL_LIBRARIES})
- add_dependencies(rdkafka bundled-ssl)
- else()
- find_package(OpenSSL REQUIRED)
- target_link_libraries(rdkafka PUBLIC OpenSSL::SSL OpenSSL::Crypto)
- endif()
-endif()
-
-if(LINK_ATOMIC)
- target_link_libraries(rdkafka PUBLIC "-latomic")
-endif()
-
-find_package(Threads REQUIRED)
-target_link_libraries(rdkafka PUBLIC Threads::Threads)
-
-if(WITH_SASL_CYRUS)
- pkg_check_modules(SASL REQUIRED libsasl2)
- target_link_libraries(rdkafka PUBLIC ${SASL_LIBRARIES})
-endif()
-
-install(
- TARGETS rdkafka
- EXPORT "${targets_export_name}"
- LIBRARY DESTINATION "${CMAKE_INSTALL_LIBDIR}"
- ARCHIVE DESTINATION "${CMAKE_INSTALL_LIBDIR}"
- RUNTIME DESTINATION "${CMAKE_INSTALL_BINDIR}"
- INCLUDES DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}"
-)
-
-install(
- FILES "rdkafka.h"
- DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/librdkafka"
-)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/Makefile
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/Makefile b/thirdparty/librdkafka-0.11.1/src/Makefile
deleted file mode 100644
index 780edaa..0000000
--- a/thirdparty/librdkafka-0.11.1/src/Makefile
+++ /dev/null
@@ -1,81 +0,0 @@
-PKGNAME= librdkafka
-LIBNAME= librdkafka
-LIBVER= 1
-
--include ../Makefile.config
-
-ifneq ($(wildcard ../.git),)
-# Add librdkafka version string from git tag if this is a git checkout
-CPPFLAGS += -DLIBRDKAFKA_GIT_VERSION="\"$(shell git describe --abbrev=6 --dirty --tags 2>/dev/null)\""
-endif
-
-SRCS_$(WITH_SASL_CYRUS) += rdkafka_sasl_cyrus.c
-SRCS_$(WITH_SASL_SCRAM) += rdkafka_sasl_scram.c
-SRCS_$(WITH_SNAPPY) += snappy.c
-SRCS_$(WITH_ZLIB) += rdgz.c
-
-SRCS_LZ4 = xxhash.c
-ifneq ($(WITH_LZ4_EXT), y)
-# Use built-in liblz4
-SRCS_LZ4 += lz4.c lz4frame.c lz4hc.c
-endif
-SRCS_y += rdkafka_lz4.c $(SRCS_LZ4)
-
-SRCS_$(WITH_LIBDL) += rddl.c
-SRCS_$(WITH_PLUGINS) += rdkafka_plugin.c
-
-ifeq ($(HAVE_REGEX), n)
-SRCS_y += regexp.c
-endif
-
-SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
- rdkafka_conf.c rdkafka_timer.c rdkafka_offset.c \
- rdkafka_transport.c rdkafka_buf.c rdkafka_queue.c rdkafka_op.c \
- rdkafka_request.c rdkafka_cgrp.c rdkafka_pattern.c \
- rdkafka_partition.c rdkafka_subscription.c \
- rdkafka_assignor.c rdkafka_range_assignor.c \
- rdkafka_roundrobin_assignor.c rdkafka_feature.c \
- rdcrc32.c crc32c.c rdaddr.c rdrand.c rdlist.c tinycthread.c \
- rdlog.c rdstring.c rdkafka_event.c rdkafka_metadata.c \
- rdregex.c rdports.c rdkafka_metadata_cache.c rdavl.c \
- rdkafka_sasl.c rdkafka_sasl_plain.c rdkafka_interceptor.c \
- rdkafka_msgset_writer.c rdkafka_msgset_reader.c \
- rdvarint.c rdbuf.c rdunittest.c \
- $(SRCS_y)
-
-HDRS= rdkafka.h
-
-OBJS= $(SRCS:.c=.o)
-
-
-all: lib check
-
-include ../mklove/Makefile.base
-
-CHECK_FILES+= $(LIBFILENAME) $(LIBNAME).a
-
-file-check: lib
-check: file-check
- @(printf "%-30s " "Symbol visibility" ; \
- (($(SYMDUMPER) $(LIBFILENAME) | grep rd_kafka_new >/dev/null) && \
- ($(SYMDUMPER) $(LIBFILENAME) | grep -v rd_kafka_destroy >/dev/null) && \
- printf "$(MKL_GREEN)OK$(MKL_CLR_RESET)\n") || \
- printf "$(MKL_RED)FAILED$(MKL_CLR_RESET)\n")
-
-install: lib-install
-
-clean: lib-clean
-
-# Compile LZ4 with -O3
-$(SRCS_LZ4:.c=.o): CFLAGS:=$(CFLAGS) -O3
-
-ifeq ($(WITH_LDS),y)
-# Enable linker script if supported by platform
-LIB_LDFLAGS+= $(LDFLAG_LINKERSCRIPT)$(LIBNAME).lds
-endif
-
-$(LIBNAME).lds: $(HDRS)
- @(printf "$(MKL_YELLOW)Generating linker script $@ from $(HDRS)$(MKL_CLR_RESET)\n" ; \
- cat $(HDRS) | ../lds-gen.py > $@)
-
--include $(DEPS)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/crc32c.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/crc32c.c b/thirdparty/librdkafka-0.11.1/src/crc32c.c
deleted file mode 100644
index cd58147..0000000
--- a/thirdparty/librdkafka-0.11.1/src/crc32c.c
+++ /dev/null
@@ -1,427 +0,0 @@
-/* Copied from http://stackoverflow.com/a/17646775/1821055
- * with the following modifications:
- * * remove test code
- * * global hw/sw initialization to be called once per process
- * * HW support is determined by configure's WITH_CRC32C_HW
- * * Windows porting (no hardware support on Windows yet)
- *
- * FIXME:
- * * Hardware support on Windows (MSVC assembler)
- * * Hardware support on ARM
- */
-
-/* crc32c.c -- compute CRC-32C using the Intel crc32 instruction
- * Copyright (C) 2013 Mark Adler
- * Version 1.1 1 Aug 2013 Mark Adler
- */
-
-/*
- This software is provided 'as-is', without any express or implied
- warranty. In no event will the author be held liable for any damages
- arising from the use of this software.
-
- Permission is granted to anyone to use this software for any purpose,
- including commercial applications, and to alter it and redistribute it
- freely, subject to the following restrictions:
-
- 1. The origin of this software must not be misrepresented; you must not
- claim that you wrote the original software. If you use this software
- in a product, an acknowledgment in the product documentation would be
- appreciated but is not required.
- 2. Altered source versions must be plainly marked as such, and must not be
- misrepresented as being the original software.
- 3. This notice may not be removed or altered from any source distribution.
-
- Mark Adler
- madler@alumni.caltech.edu
- */
-
-/* Use hardware CRC instruction on Intel SSE 4.2 processors. This computes a
- CRC-32C, *not* the CRC-32 used by Ethernet and zip, gzip, etc. A software
- version is provided as a fall-back, as well as for speed comparisons. */
-
-/* Version history:
- 1.0 10 Feb 2013 First version
- 1.1 1 Aug 2013 Correct comments on why three crc instructions in parallel
- */
-
-#include "rd.h"
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <stdint.h>
-#ifndef _MSC_VER
-#include <unistd.h>
-#endif
-
-#include "rdunittest.h"
-
-#include "crc32c.h"
-
-/* CRC-32C (iSCSI) polynomial in reversed bit order. */
-#define POLY 0x82f63b78
-
-/* Table for a quadword-at-a-time software crc. */
-static uint32_t crc32c_table[8][256];
-
-/* Construct table for software CRC-32C calculation. */
-static void crc32c_init_sw(void)
-{
- uint32_t n, crc, k;
-
- for (n = 0; n < 256; n++) {
- crc = n;
- crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1;
- crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1;
- crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1;
- crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1;
- crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1;
- crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1;
- crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1;
- crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1;
- crc32c_table[0][n] = crc;
- }
- for (n = 0; n < 256; n++) {
- crc = crc32c_table[0][n];
- for (k = 1; k < 8; k++) {
- crc = crc32c_table[0][crc & 0xff] ^ (crc >> 8);
- crc32c_table[k][n] = crc;
- }
- }
-}
-
-/* Table-driven software version as a fall-back. This is about 15 times slower
- than using the hardware instructions. This assumes little-endian integers,
- as is the case on Intel processors that the assembler code here is for. */
-static uint32_t crc32c_sw(uint32_t crci, const void *buf, size_t len)
-{
- const unsigned char *next = buf;
- uint64_t crc;
-
- crc = crci ^ 0xffffffff;
- while (len && ((uintptr_t)next & 7) != 0) {
- crc = crc32c_table[0][(crc ^ *next++) & 0xff] ^ (crc >> 8);
- len--;
- }
- while (len >= 8) {
-#if defined(__sparc) || defined(__sparc__) || defined(__APPLE__)
- /* Alignment-safe alternative.
- * This is also needed on Apple to avoid compilation warnings for
- * non-appearant alignment reasons. */
- uint64_t ncopy;
- memcpy(&ncopy, next, sizeof(ncopy));
- crc ^= ncopy;
-#else
- crc ^= *(uint64_t *)next;
-#endif
- crc = crc32c_table[7][crc & 0xff] ^
- crc32c_table[6][(crc >> 8) & 0xff] ^
- crc32c_table[5][(crc >> 16) & 0xff] ^
- crc32c_table[4][(crc >> 24) & 0xff] ^
- crc32c_table[3][(crc >> 32) & 0xff] ^
- crc32c_table[2][(crc >> 40) & 0xff] ^
- crc32c_table[1][(crc >> 48) & 0xff] ^
- crc32c_table[0][crc >> 56];
- next += 8;
- len -= 8;
- }
- while (len) {
- crc = crc32c_table[0][(crc ^ *next++) & 0xff] ^ (crc >> 8);
- len--;
- }
- return (uint32_t)crc ^ 0xffffffff;
-}
-
-
-#if WITH_CRC32C_HW
-static int sse42; /* Cached SSE42 support */
-
-/* Multiply a matrix times a vector over the Galois field of two elements,
- GF(2). Each element is a bit in an unsigned integer. mat must have at
- least as many entries as the power of two for most significant one bit in
- vec. */
-static RD_INLINE uint32_t gf2_matrix_times(uint32_t *mat, uint32_t vec)
-{
- uint32_t sum;
-
- sum = 0;
- while (vec) {
- if (vec & 1)
- sum ^= *mat;
- vec >>= 1;
- mat++;
- }
- return sum;
-}
-
-/* Multiply a matrix by itself over GF(2). Both mat and square must have 32
- rows. */
-static RD_INLINE void gf2_matrix_square(uint32_t *square, uint32_t *mat)
-{
- int n;
-
- for (n = 0; n < 32; n++)
- square[n] = gf2_matrix_times(mat, mat[n]);
-}
-
-/* Construct an operator to apply len zeros to a crc. len must be a power of
- two. If len is not a power of two, then the result is the same as for the
- largest power of two less than len. The result for len == 0 is the same as
- for len == 1. A version of this routine could be easily written for any
- len, but that is not needed for this application. */
-static void crc32c_zeros_op(uint32_t *even, size_t len)
-{
- int n;
- uint32_t row;
- uint32_t odd[32]; /* odd-power-of-two zeros operator */
-
- /* put operator for one zero bit in odd */
- odd[0] = POLY; /* CRC-32C polynomial */
- row = 1;
- for (n = 1; n < 32; n++) {
- odd[n] = row;
- row <<= 1;
- }
-
- /* put operator for two zero bits in even */
- gf2_matrix_square(even, odd);
-
- /* put operator for four zero bits in odd */
- gf2_matrix_square(odd, even);
-
- /* first square will put the operator for one zero byte (eight zero bits),
- in even -- next square puts operator for two zero bytes in odd, and so
- on, until len has been rotated down to zero */
- do {
- gf2_matrix_square(even, odd);
- len >>= 1;
- if (len == 0)
- return;
- gf2_matrix_square(odd, even);
- len >>= 1;
- } while (len);
-
- /* answer ended up in odd -- copy to even */
- for (n = 0; n < 32; n++)
- even[n] = odd[n];
-}
-
-/* Take a length and build four lookup tables for applying the zeros operator
- for that length, byte-by-byte on the operand. */
-static void crc32c_zeros(uint32_t zeros[][256], size_t len)
-{
- uint32_t n;
- uint32_t op[32];
-
- crc32c_zeros_op(op, len);
- for (n = 0; n < 256; n++) {
- zeros[0][n] = gf2_matrix_times(op, n);
- zeros[1][n] = gf2_matrix_times(op, n << 8);
- zeros[2][n] = gf2_matrix_times(op, n << 16);
- zeros[3][n] = gf2_matrix_times(op, n << 24);
- }
-}
-
-/* Apply the zeros operator table to crc. */
-static RD_INLINE uint32_t crc32c_shift(uint32_t zeros[][256], uint32_t crc)
-{
- return zeros[0][crc & 0xff] ^ zeros[1][(crc >> 8) & 0xff] ^
- zeros[2][(crc >> 16) & 0xff] ^ zeros[3][crc >> 24];
-}
-
-/* Block sizes for three-way parallel crc computation. LONG and SHORT must
- both be powers of two. The associated string constants must be set
- accordingly, for use in constructing the assembler instructions. */
-#define LONG 8192
-#define LONGx1 "8192"
-#define LONGx2 "16384"
-#define SHORT 256
-#define SHORTx1 "256"
-#define SHORTx2 "512"
-
-/* Tables for hardware crc that shift a crc by LONG and SHORT zeros. */
-static uint32_t crc32c_long[4][256];
-static uint32_t crc32c_short[4][256];
-
-/* Initialize tables for shifting crcs. */
-static void crc32c_init_hw(void)
-{
- crc32c_zeros(crc32c_long, LONG);
- crc32c_zeros(crc32c_short, SHORT);
-}
-
-/* Compute CRC-32C using the Intel hardware instruction. */
-static uint32_t crc32c_hw(uint32_t crc, const void *buf, size_t len)
-{
- const unsigned char *next = buf;
- const unsigned char *end;
- uint64_t crc0, crc1, crc2; /* need to be 64 bits for crc32q */
-
- /* pre-process the crc */
- crc0 = crc ^ 0xffffffff;
-
- /* compute the crc for up to seven leading bytes to bring the data pointer
- to an eight-byte boundary */
- while (len && ((uintptr_t)next & 7) != 0) {
- __asm__("crc32b\t" "(%1), %0"
- : "=r"(crc0)
- : "r"(next), "0"(crc0));
- next++;
- len--;
- }
-
- /* compute the crc on sets of LONG*3 bytes, executing three independent crc
- instructions, each on LONG bytes -- this is optimized for the Nehalem,
- Westmere, Sandy Bridge, and Ivy Bridge architectures, which have a
- throughput of one crc per cycle, but a latency of three cycles */
- while (len >= LONG*3) {
- crc1 = 0;
- crc2 = 0;
- end = next + LONG;
- do {
- __asm__("crc32q\t" "(%3), %0\n\t"
- "crc32q\t" LONGx1 "(%3), %1\n\t"
- "crc32q\t" LONGx2 "(%3), %2"
- : "=r"(crc0), "=r"(crc1), "=r"(crc2)
- : "r"(next), "0"(crc0), "1"(crc1), "2"(crc2));
- next += 8;
- } while (next < end);
- crc0 = crc32c_shift(crc32c_long, crc0) ^ crc1;
- crc0 = crc32c_shift(crc32c_long, crc0) ^ crc2;
- next += LONG*2;
- len -= LONG*3;
- }
-
- /* do the same thing, but now on SHORT*3 blocks for the remaining data less
- than a LONG*3 block */
- while (len >= SHORT*3) {
- crc1 = 0;
- crc2 = 0;
- end = next + SHORT;
- do {
- __asm__("crc32q\t" "(%3), %0\n\t"
- "crc32q\t" SHORTx1 "(%3), %1\n\t"
- "crc32q\t" SHORTx2 "(%3), %2"
- : "=r"(crc0), "=r"(crc1), "=r"(crc2)
- : "r"(next), "0"(crc0), "1"(crc1), "2"(crc2));
- next += 8;
- } while (next < end);
- crc0 = crc32c_shift(crc32c_short, crc0) ^ crc1;
- crc0 = crc32c_shift(crc32c_short, crc0) ^ crc2;
- next += SHORT*2;
- len -= SHORT*3;
- }
-
- /* compute the crc on the remaining eight-byte units less than a SHORT*3
- block */
- end = next + (len - (len & 7));
- while (next < end) {
- __asm__("crc32q\t" "(%1), %0"
- : "=r"(crc0)
- : "r"(next), "0"(crc0));
- next += 8;
- }
- len &= 7;
-
- /* compute the crc for up to seven trailing bytes */
- while (len) {
- __asm__("crc32b\t" "(%1), %0"
- : "=r"(crc0)
- : "r"(next), "0"(crc0));
- next++;
- len--;
- }
-
- /* return a post-processed crc */
- return (uint32_t)crc0 ^ 0xffffffff;
-}
-
-/* Check for SSE 4.2. SSE 4.2 was first supported in Nehalem processors
- introduced in November, 2008. This does not check for the existence of the
- cpuid instruction itself, which was introduced on the 486SL in 1992, so this
- will fail on earlier x86 processors. cpuid works on all Pentium and later
- processors. */
-#define SSE42(have) \
- do { \
- uint32_t eax, ecx; \
- eax = 1; \
- __asm__("cpuid" \
- : "=c"(ecx) \
- : "a"(eax) \
- : "%ebx", "%edx"); \
- (have) = (ecx >> 20) & 1; \
- } while (0)
-
-#endif /* WITH_CRC32C_HW */
-
-/* Compute a CRC-32C. If the crc32 instruction is available, use the hardware
- version. Otherwise, use the software version. */
-uint32_t crc32c(uint32_t crc, const void *buf, size_t len)
-{
-#if WITH_CRC32C_HW
- if (sse42)
- return crc32c_hw(crc, buf, len);
- else
-#endif
- return crc32c_sw(crc, buf, len);
-}
-
-
-
-
-
-
-/**
- * @brief Populate shift tables once
- */
-void crc32c_global_init (void) {
-#if WITH_CRC32C_HW
- SSE42(sse42);
- if (sse42)
- crc32c_init_hw();
- else
-#endif
- crc32c_init_sw();
-}
-
-int unittest_crc32c (void) {
- const char *buf =
-" This software is provided 'as-is', without any express or implied\n"
-" warranty. In no event will the author be held liable for any damages\n"
-" arising from the use of this software.\n"
-"\n"
-" Permission is granted to anyone to use this software for any purpose,\n"
-" including commercial applications, and to alter it and redistribute it\n"
-" freely, subject to the following restrictions:\n"
-"\n"
-" 1. The origin of this software must not be misrepresented; you must not\n"
-" claim that you wrote the original software. If you use this software\n"
-" in a product, an acknowledgment in the product documentation would be\n"
-" appreciated but is not required.\n"
-" 2. Altered source versions must be plainly marked as such, and must not be\n"
-" misrepresented as being the original software.\n"
-" 3. This notice may not be removed or altered from any source distribution.";
- const uint32_t expected_crc = 0x7dcde113;
- uint32_t crc;
- const char *how;
-
- crc32c_global_init();
-
-#if WITH_CRC32C_HW
- if (sse42)
- how = "hardware (SSE42)";
- else
- how = "software (SE42 supported in build but not at runtime)";
-#else
- how = "software";
-#endif
- RD_UT_SAY("Calculate CRC32C using %s", how);
-
- crc = crc32c(0, buf, strlen(buf));
- RD_UT_ASSERT(crc == expected_crc,
- "Calculated CRC 0x%"PRIx32
- " not matching expected CRC 0x%"PRIx32,
- crc, expected_crc);
-
- RD_UT_PASS();
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/crc32c.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/crc32c.h b/thirdparty/librdkafka-0.11.1/src/crc32c.h
deleted file mode 100644
index 6abe33e..0000000
--- a/thirdparty/librdkafka-0.11.1/src/crc32c.h
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2017 Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#pragma once
-
-uint32_t crc32c(uint32_t crc, const void *buf, size_t len);
-
-void crc32c_global_init (void);
-
-int unittest_crc32c (void);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.1/src/librdkafka_cgrp_synch.png
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/librdkafka_cgrp_synch.png b/thirdparty/librdkafka-0.11.1/src/librdkafka_cgrp_synch.png
deleted file mode 100644
index 8df1eda..0000000
Binary files a/thirdparty/librdkafka-0.11.1/src/librdkafka_cgrp_synch.png and /dev/null differ