You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:31 UTC
[03/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade
to librdkafka 0.11.4
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/CMakeLists.txt b/thirdparty/librdkafka-0.11.4/src-cpp/CMakeLists.txt
new file mode 100644
index 0000000..db71516
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/CMakeLists.txt
@@ -0,0 +1,35 @@
+# Build the C++ wrapper library (rdkafka++) from the sources in this directory.
+add_library(
+ rdkafka++
+ ConfImpl.cpp
+ ConsumerImpl.cpp
+ HandleImpl.cpp
+ KafkaConsumerImpl.cpp
+ MessageImpl.cpp
+ MetadataImpl.cpp
+ ProducerImpl.cpp
+ QueueImpl.cpp
+ RdKafka.cpp
+ TopicImpl.cpp
+ TopicPartitionImpl.cpp
+)
+
+# The wrapper links against the rdkafka target; PUBLIC also applies the
+# dependency to targets that link rdkafka++.
+target_link_libraries(rdkafka++ PUBLIC rdkafka)
+
+# Support '#include <rdkafkacpp.h>'
+target_include_directories(rdkafka++ PUBLIC "$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}>")
+# Unless RDKAFKA_BUILD_STATIC is set, compile the library itself (PRIVATE) with
+# LIBRDKAFKACPP_EXPORTS defined.
+if(NOT RDKAFKA_BUILD_STATIC)
+ target_compile_definitions(rdkafka++ PRIVATE LIBRDKAFKACPP_EXPORTS)
+endif()
+# Install the library artifacts and add them to the export set named by
+# ${targets_export_name}.
+install(
+ TARGETS rdkafka++
+ EXPORT "${targets_export_name}"
+ LIBRARY DESTINATION "${CMAKE_INSTALL_LIBDIR}"
+ ARCHIVE DESTINATION "${CMAKE_INSTALL_LIBDIR}"
+ RUNTIME DESTINATION "${CMAKE_INSTALL_BINDIR}"
+ INCLUDES DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}"
+)
+
+# Install the public C++ header under <includedir>/librdkafka.
+install(
+ FILES "rdkafkacpp.h"
+ DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/librdkafka"
+)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/ConfImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/ConfImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/ConfImpl.cpp
new file mode 100644
index 0000000..709c728
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/ConfImpl.cpp
@@ -0,0 +1,89 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include <string>
+#include <list>
+
+#include "rdkafkacpp_int.h"
+
+
+
+/**
+ * @brief Set configuration property \p name to \p value.
+ *
+ * The call is forwarded to rd_kafka_conf_set() when conf_type_ is CONF_GLOBAL
+ * and to rd_kafka_topic_conf_set() otherwise.
+ *
+ * @param errstr receives the C library's error text for any result other
+ *               than RD_KAFKA_CONF_OK; it is left untouched on success.
+ * @returns the C result code converted to Conf::ConfResult.
+ */
+RdKafka::ConfImpl::ConfResult RdKafka::ConfImpl::set(const std::string &name,
+                                                     const std::string &value,
+                                                     std::string &errstr) {
+  char errmsg[512];
+  const char *key = name.c_str();
+  const char *val = value.c_str();
+
+  const rd_kafka_conf_res_t result =
+    (this->conf_type_ == CONF_GLOBAL) ?
+    rd_kafka_conf_set(this->rk_conf_, key, val, errmsg, sizeof(errmsg)) :
+    rd_kafka_topic_conf_set(this->rkt_conf_, key, val, errmsg, sizeof(errmsg));
+
+  if (result != RD_KAFKA_CONF_OK)
+    errstr = errmsg;
+
+  return static_cast<Conf::ConfResult>(result);
+}
+
+
+/**
+ * @brief Dump the configuration as a newly allocated list of strings.
+ *
+ * The entries are copied in order from the array returned by
+ * rd_kafka_conf_dump() (when rk_conf_ is set) or rd_kafka_topic_conf_dump()
+ * (otherwise). The C array is released before returning.
+ *
+ * @returns a list allocated with new.
+ */
+std::list<std::string> *RdKafka::ConfImpl::dump () {
+  size_t entry_cnt;
+  const char **entries = rk_conf_ ?
+    rd_kafka_conf_dump(rk_conf_, &entry_cnt) :
+    rd_kafka_topic_conf_dump(rkt_conf_, &entry_cnt);
+
+  std::list<std::string> *result = new std::list<std::string>();
+  const int total = static_cast<int>(entry_cnt);
+  for (int idx = 0 ; idx < total ; idx++)
+    result->push_back(std::string(entries[idx]));
+
+  rd_kafka_conf_dump_free(entries, entry_cnt);
+  return result;
+}
+
+/**
+ * @brief Allocate a configuration object of the given \p type.
+ *
+ * CONF_GLOBAL objects are backed by a fresh rd_kafka_conf_t, every other
+ * type by a fresh rd_kafka_topic_conf_t.
+ */
+RdKafka::Conf *RdKafka::Conf::create (ConfType type) {
+  ConfImpl *impl = new ConfImpl();
+  impl->conf_type_ = type;
+
+  switch (type) {
+  case CONF_GLOBAL:
+    impl->rk_conf_ = rd_kafka_conf_new();
+    break;
+  default:
+    impl->rkt_conf_ = rd_kafka_topic_conf_new();
+    break;
+  }
+
+  return impl;
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/ConsumerImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/ConsumerImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/ConsumerImpl.cpp
new file mode 100644
index 0000000..bb46877
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/ConsumerImpl.cpp
@@ -0,0 +1,233 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include <string>
+#include <list>
+#include <cerrno>
+
+#include "rdkafkacpp_int.h"
+
+/** Out-of-line definition of the Consumer destructor; it has nothing to do. */
+RdKafka::Consumer::~Consumer () {}
+
+/**
+ * @brief Create a legacy (simple) consumer handle.
+ *
+ * @param conf   optional configuration. If it is NULL or not a ConfImpl, no
+ *               configuration is passed to rd_kafka_new(). A ConfImpl must
+ *               hold a global configuration (rk_conf_ set).
+ * @param errstr receives a human readable error string on failure.
+ *
+ * @returns the new consumer, or NULL on failure.
+ */
+RdKafka::Consumer *RdKafka::Consumer::create (RdKafka::Conf *conf,
+                                              std::string &errstr) {
+  char errbuf[512];
+  RdKafka::ConfImpl *confimpl = dynamic_cast<RdKafka::ConfImpl *>(conf);
+  RdKafka::ConsumerImpl *rkc = new RdKafka::ConsumerImpl();
+  rd_kafka_conf_t *rk_conf = NULL;
+
+  if (confimpl) {
+    if (!confimpl->rk_conf_) {
+      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
+      delete rkc;
+      return NULL;
+    }
+
+    /* Must run before the dup below so the duplicate carries the
+     * opaque and trampolines set on the application's conf. */
+    rkc->set_common_config(confimpl);
+
+    rk_conf = rd_kafka_conf_dup(confimpl->rk_conf_);
+  }
+
+  rd_kafka_t *rk;
+  if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, rk_conf,
+                          errbuf, sizeof(errbuf)))) {
+    errstr = errbuf;
+    /* rd_kafka_new() only takes ownership of the conf on success:
+     * release our duplicate here so it is not leaked. */
+    if (rk_conf)
+      rd_kafka_conf_destroy(rk_conf);
+    delete rkc;
+    return NULL;
+  }
+
+  rkc->rk_ = rk;
+
+  return rkc;
+}
+
+/** @brief C++ front-end for the C macro RD_KAFKA_OFFSET_TAIL(). */
+int64_t RdKafka::Consumer::OffsetTail (int64_t offset) {
+  const int64_t tail_offset = RD_KAFKA_OFFSET_TAIL(offset);
+  return tail_offset;
+}
+
+/**
+ * @brief Start consuming \p partition of \p topic at \p offset.
+ *
+ * \p topic must be a TopicImpl (the cast result is used unchecked).
+ *
+ * @returns ERR_NO_ERROR, or rd_kafka_last_error() when
+ *          rd_kafka_consume_start() returns -1.
+ */
+RdKafka::ErrorCode RdKafka::ConsumerImpl::start (Topic *topic,
+                                                 int32_t partition,
+                                                 int64_t offset) {
+  RdKafka::TopicImpl *impl = dynamic_cast<RdKafka::TopicImpl *>(topic);
+
+  const int rc = rd_kafka_consume_start(impl->rkt_, partition, offset);
+  if (rc != -1)
+    return RdKafka::ERR_NO_ERROR;
+
+  return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
+}
+
+
+/**
+ * @brief Start consuming \p partition of \p topic at \p offset, routing
+ *        messages to \p queue.
+ *
+ * \p topic must be a TopicImpl and \p queue a QueueImpl (the cast results
+ * are used unchecked).
+ *
+ * @returns ERR_NO_ERROR, or rd_kafka_last_error() when
+ *          rd_kafka_consume_start_queue() returns -1.
+ */
+RdKafka::ErrorCode RdKafka::ConsumerImpl::start (Topic *topic,
+                                                 int32_t partition,
+                                                 int64_t offset,
+                                                 Queue *queue) {
+  RdKafka::TopicImpl *t_impl = dynamic_cast<RdKafka::TopicImpl *>(topic);
+  RdKafka::QueueImpl *q_impl = dynamic_cast<RdKafka::QueueImpl *>(queue);
+
+  const int rc = rd_kafka_consume_start_queue(t_impl->rkt_, partition, offset,
+                                              q_impl->queue_);
+  if (rc != -1)
+    return RdKafka::ERR_NO_ERROR;
+
+  return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
+}
+
+
+/**
+ * @brief Stop consuming \p partition of \p topic.
+ *
+ * \p topic must be a TopicImpl (the cast result is used unchecked).
+ *
+ * @returns ERR_NO_ERROR, or rd_kafka_last_error() when
+ *          rd_kafka_consume_stop() returns -1.
+ */
+RdKafka::ErrorCode RdKafka::ConsumerImpl::stop (Topic *topic,
+                                                int32_t partition) {
+  RdKafka::TopicImpl *impl = dynamic_cast<RdKafka::TopicImpl *>(topic);
+
+  const int rc = rd_kafka_consume_stop(impl->rkt_, partition);
+  if (rc != -1)
+    return RdKafka::ERR_NO_ERROR;
+
+  return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
+}
+
+/**
+ * @brief Seek the consumer for \p partition of \p topic to \p offset.
+ *
+ * \p topic must be a TopicImpl (the cast result is used unchecked).
+ *
+ * @returns the error code reported by rd_kafka_seek() (ERR_NO_ERROR on
+ *          success).
+ */
+RdKafka::ErrorCode RdKafka::ConsumerImpl::seek (Topic *topic,
+                                                int32_t partition,
+                                                int64_t offset,
+                                                int timeout_ms) {
+  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);
+
+  /* Unlike the rd_kafka_consume_*() calls, rd_kafka_seek() returns the
+   * error code itself rather than -1, so it is propagated directly
+   * (same handling as KafkaConsumerImpl::seek()). */
+  return static_cast<RdKafka::ErrorCode>(
+    rd_kafka_seek(topicimpl->rkt_, partition, offset, timeout_ms));
+}
+
+/**
+ * @brief Consume a single message from \p partition of \p topic.
+ *
+ * \p topic must be a TopicImpl (the cast result is used unchecked).
+ *
+ * @returns a newly allocated Message. When rd_kafka_consume() yields no
+ *          message the returned object carries rd_kafka_last_error().
+ */
+RdKafka::Message *RdKafka::ConsumerImpl::consume (Topic *topic,
+                                                  int32_t partition,
+                                                  int timeout_ms) {
+  RdKafka::TopicImpl *impl = dynamic_cast<RdKafka::TopicImpl *>(topic);
+  rd_kafka_message_t *c_msg =
+    rd_kafka_consume(impl->rkt_, partition, timeout_ms);
+
+  if (c_msg)
+    return new RdKafka::MessageImpl(topic, c_msg);
+
+  const RdKafka::ErrorCode err =
+    static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
+  return new RdKafka::MessageImpl(topic, err);
+}
+
+namespace {
+  /* Per-call context for the topic variant of `consume_callback'.
+   * Bundles the Topic, the C++ callback object and the application's
+   * `opaque' so the C-level trampoline can reach all three through the
+   * single opaque pointer handed to `rd_kafka_consume_callback'.
+   */
+  struct ConsumerImplCallback {
+    RdKafka::Topic *topic;
+    RdKafka::ConsumeCb *cb_cls;
+    void *cb_data;
+
+    ConsumerImplCallback(RdKafka::Topic* t, RdKafka::ConsumeCb* cb, void* data)
+      : topic(t), cb_cls(cb), cb_data(data) {
+    }
+
+    /* C callback passed to `rd_kafka_consume_callback'. `opaque' is the
+     * ConsumerImplCallback instance; the C message is wrapped without
+     * taking ownership and forwarded to the C++ callback with `cb_data'.
+     */
+    static void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque) {
+      ConsumerImplCallback *ctx = static_cast<ConsumerImplCallback*>(opaque);
+      RdKafka::MessageImpl wrapped(ctx->topic, msg, false /*don't free*/);
+      ctx->cb_cls->consume_cb(wrapped, ctx->cb_data);
+    }
+  };
+}
+
+/**
+ * @brief Consume from \p partition of \p topic, delivering each message to
+ *        \p consume_cb together with \p opaque.
+ *
+ * The callback context lives on this function's stack for the duration of
+ * the rd_kafka_consume_callback() call.
+ *
+ * @returns the value returned by rd_kafka_consume_callback().
+ */
+int RdKafka::ConsumerImpl::consume_callback (RdKafka::Topic* topic,
+                                             int32_t partition,
+                                             int timeout_ms,
+                                             RdKafka::ConsumeCb *consume_cb,
+                                             void *opaque) {
+  ConsumerImplCallback ctx(topic, consume_cb, opaque);
+  rd_kafka_topic_t *c_topic = static_cast<RdKafka::TopicImpl *>(topic)->rkt_;
+
+  return rd_kafka_consume_callback(c_topic, partition, timeout_ms,
+                                   &ConsumerImplCallback::consume_cb_trampoline,
+                                   &ctx);
+}
+
+
+/**
+ * @brief Consume a single message from \p queue.
+ *
+ * \p queue must be a QueueImpl (the cast result is used unchecked).
+ *
+ * @returns a newly allocated Message. When rd_kafka_consume_queue() yields
+ *          no message the returned object has no topic and carries
+ *          rd_kafka_last_error().
+ */
+RdKafka::Message *RdKafka::ConsumerImpl::consume (Queue *queue,
+                                                  int timeout_ms) {
+  RdKafka::QueueImpl *q_impl = dynamic_cast<RdKafka::QueueImpl *>(queue);
+  rd_kafka_message_t *c_msg =
+    rd_kafka_consume_queue(q_impl->queue_, timeout_ms);
+
+  if (!c_msg) {
+    const RdKafka::ErrorCode err =
+      static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
+    return new RdKafka::MessageImpl(NULL, err);
+  }
+
+  /*
+   * The topic's opaque field holds our Topic *, stored there by
+   * RdKafka::Topic::create() so it can be recovered from a C message.
+   */
+  Topic *owner = static_cast<Topic *>(rd_kafka_topic_opaque(c_msg->rkt));
+
+  return new RdKafka::MessageImpl(owner, c_msg);
+}
+
+namespace {
+  /* Per-call context for the Queue variant of `consume_callback'.
+   * Bundles the C++ callback object and the application's `opaque' so the
+   * C-level trampoline can reach both through the single opaque pointer
+   * handed to `rd_kafka_consume_callback_queue'.
+   */
+  struct ConsumerImplQueueCallback {
+    RdKafka::ConsumeCb *cb_cls;
+    void *cb_data;
+
+    ConsumerImplQueueCallback(RdKafka::ConsumeCb *cb, void *data)
+      : cb_cls(cb), cb_data(data) {
+    }
+
+    /* C callback passed to `rd_kafka_consume_callback_queue'. `opaque' is
+     * the ConsumerImplQueueCallback instance.
+     */
+    static void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque) {
+      ConsumerImplQueueCallback *ctx =
+        static_cast<ConsumerImplQueueCallback *>(opaque);
+      /*
+       * The topic's opaque field holds our Topic *, stored there by
+       * RdKafka::Topic::create() so it can be recovered from a C message.
+       */
+      RdKafka::Topic *owner =
+        static_cast<RdKafka::Topic *>(rd_kafka_topic_opaque(msg->rkt));
+      RdKafka::MessageImpl wrapped(owner, msg, false /*don't free*/);
+      ctx->cb_cls->consume_cb(wrapped, ctx->cb_data);
+    }
+  };
+}
+
+/**
+ * @brief Consume from \p queue, delivering each message to \p consume_cb
+ *        together with \p opaque.
+ *
+ * \p queue must be a QueueImpl (the cast result is used unchecked).
+ *
+ * @returns the value returned by rd_kafka_consume_callback_queue().
+ */
+int RdKafka::ConsumerImpl::consume_callback (Queue *queue,
+                                             int timeout_ms,
+                                             RdKafka::ConsumeCb *consume_cb,
+                                             void *opaque) {
+  ConsumerImplQueueCallback ctx(consume_cb, opaque);
+  RdKafka::QueueImpl *q_impl = dynamic_cast<RdKafka::QueueImpl *>(queue);
+
+  return rd_kafka_consume_callback_queue(
+    q_impl->queue_, timeout_ms,
+    &ConsumerImplQueueCallback::consume_cb_trampoline, &ctx);
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/HandleImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/HandleImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/HandleImpl.cpp
new file mode 100644
index 0000000..3bdccbf
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/HandleImpl.cpp
@@ -0,0 +1,365 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include <string>
+#include <list>
+
+#include "rdkafkacpp_int.h"
+
+/**
+ * @brief C consume callback: wraps \p msg (without taking ownership) and
+ *        forwards it to the handle's C++ consume_cb_.
+ *
+ * \p opaque is interpreted as the HandleImpl and is also passed on to the
+ * C++ callback as its opaque argument. The Topic is read back from the
+ * message topic's opaque field.
+ */
+void RdKafka::consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque) {
+  RdKafka::Topic *owner = static_cast<Topic *>(rd_kafka_topic_opaque(msg->rkt));
+  RdKafka::MessageImpl wrapped(owner, msg, false /*don't free*/);
+
+  RdKafka::HandleImpl *self = static_cast<RdKafka::HandleImpl *>(opaque);
+  self->consume_cb_->consume_cb(wrapped, opaque);
+}
+
+/**
+ * @brief C log callback: delivers the log line as an EVENT_LOG event to the
+ *        handle's event_cb_.
+ *
+ * Falls back to rd_kafka_log_print() when \p rk is NULL or the handle has
+ * no event callback.
+ */
+void RdKafka::log_cb_trampoline (const rd_kafka_t *rk, int level,
+                                 const char *fac, const char *buf) {
+  if (rk) {
+    RdKafka::HandleImpl *self =
+      static_cast<RdKafka::HandleImpl *>(rd_kafka_opaque(rk));
+
+    if (self->event_cb_) {
+      RdKafka::EventImpl log_event(RdKafka::Event::EVENT_LOG,
+                                   RdKafka::ERR_NO_ERROR,
+                                   static_cast<RdKafka::Event::Severity>(level),
+                                   fac, buf);
+      self->event_cb_->event_cb(log_event);
+      return;
+    }
+  }
+
+  rd_kafka_log_print(rk, level, fac, buf);
+}
+
+
+/**
+ * @brief C error callback: delivers \p err / \p reason as an EVENT_ERROR
+ *        event to the event_cb_ of the HandleImpl passed in \p opaque.
+ */
+void RdKafka::error_cb_trampoline (rd_kafka_t *rk, int err,
+                                   const char *reason, void *opaque) {
+  const RdKafka::ErrorCode code = static_cast<RdKafka::ErrorCode>(err);
+  RdKafka::EventImpl error_event(RdKafka::Event::EVENT_ERROR, code,
+                                 RdKafka::Event::EVENT_SEVERITY_ERROR,
+                                 NULL, reason);
+
+  RdKafka::HandleImpl *self = static_cast<RdKafka::HandleImpl *>(opaque);
+  self->event_cb_->event_cb(error_event);
+}
+
+
+/**
+ * @brief C throttle callback: delivers an EVENT_THROTTLE event carrying the
+ *        broker name, broker id and throttle time to the event_cb_ of the
+ *        HandleImpl passed in \p opaque.
+ */
+void RdKafka::throttle_cb_trampoline (rd_kafka_t *rk, const char *broker_name,
+                                      int32_t broker_id,
+                                      int throttle_time_ms,
+                                      void *opaque) {
+  RdKafka::EventImpl throttle_event(RdKafka::Event::EVENT_THROTTLE);
+  throttle_event.throttle_time_ = throttle_time_ms;
+  throttle_event.id_            = broker_id;
+  throttle_event.str_           = broker_name;
+
+  RdKafka::HandleImpl *self = static_cast<RdKafka::HandleImpl *>(opaque);
+  self->event_cb_->event_cb(throttle_event);
+}
+
+
+/**
+ * @brief C statistics callback: delivers \p json as an EVENT_STATS event to
+ *        the event_cb_ of the HandleImpl passed in \p opaque.
+ *
+ * \p json_len is not used.
+ *
+ * @returns 0 always.
+ */
+int RdKafka::stats_cb_trampoline (rd_kafka_t *rk, char *json, size_t json_len,
+                                  void *opaque) {
+  RdKafka::EventImpl stats_event(RdKafka::Event::EVENT_STATS,
+                                 RdKafka::ERR_NO_ERROR,
+                                 RdKafka::Event::EVENT_SEVERITY_INFO,
+                                 NULL, json);
+
+  RdKafka::HandleImpl *self = static_cast<RdKafka::HandleImpl *>(opaque);
+  self->event_cb_->event_cb(stats_event);
+
+  return 0;
+}
+
+
+/**
+ * @brief C socket callback: forwards to the socket_cb_ of the HandleImpl
+ *        passed in \p opaque and returns its result.
+ */
+int RdKafka::socket_cb_trampoline (int domain, int type, int protocol,
+                                   void *opaque) {
+  RdKafka::HandleImpl *self = static_cast<RdKafka::HandleImpl *>(opaque);
+  const int fd = self->socket_cb_->socket_cb(domain, type, protocol);
+
+  return fd;
+}
+
+/**
+ * @brief C open callback: forwards to the open_cb_ of the HandleImpl passed
+ *        in \p opaque (with \p mode converted to int) and returns its result.
+ */
+int RdKafka::open_cb_trampoline (const char *pathname, int flags, mode_t mode,
+                                 void *opaque) {
+  RdKafka::HandleImpl *self = static_cast<RdKafka::HandleImpl *>(opaque);
+  const int fd =
+    self->open_cb_->open_cb(pathname, flags, static_cast<int>(mode));
+
+  return fd;
+}
+
+/**
+ * @brief Request metadata through rd_kafka_metadata().
+ *
+ * @param only_rkt  optional topic to restrict the request to; when non-NULL
+ *                  it is treated as a TopicImpl.
+ * @param metadatap receives a newly allocated Metadata object on success
+ *                  and NULL on error.
+ *
+ * @returns the error code from rd_kafka_metadata().
+ */
+RdKafka::ErrorCode RdKafka::HandleImpl::metadata (bool all_topics,
+                                                  const Topic *only_rkt,
+                                                  Metadata **metadatap,
+                                                  int timeout_ms) {
+  rd_kafka_topic_t *c_topic = NULL;
+  if (only_rkt)
+    c_topic = static_cast<const TopicImpl *>(only_rkt)->rkt_;
+
+  const rd_kafka_metadata_t *c_md = NULL;
+  const rd_kafka_resp_err_t err = rd_kafka_metadata(rk_, all_topics, c_topic,
+                                                    &c_md, timeout_ms);
+
+  if (err == RD_KAFKA_RESP_ERR_NO_ERROR)
+    *metadatap = new RdKafka::MetadataImpl(c_md);
+  else
+    *metadatap = NULL;
+
+  return static_cast<RdKafka::ErrorCode>(err);
+}
+
+/**
+ * @brief Fill \p partitions with one newly allocated TopicPartitionImpl per
+ *        element of the C list \p c_parts.
+ *
+ * \p partitions is resized to the C list's element count and every slot is
+ * overwritten.
+ */
+static void c_parts_to_partitions (const rd_kafka_topic_partition_list_t
+                                   *c_parts,
+                                   std::vector<RdKafka::TopicPartition*>
+                                   &partitions) {
+  const int total = c_parts->cnt;
+
+  partitions.resize(total);
+  for (int idx = 0 ; idx < total ; ++idx) {
+    rd_kafka_topic_partition_t *c_part = &c_parts->elems[idx];
+    partitions[idx] = new RdKafka::TopicPartitionImpl(c_part);
+  }
+}
+
+/**
+ * @brief Delete every TopicPartition held in \p v, then empty the vector.
+ */
+static void free_partition_vector (std::vector<RdKafka::TopicPartition*> &v) {
+  typedef std::vector<RdKafka::TopicPartition*>::iterator PartIter;
+
+  for (PartIter it = v.begin() ; it != v.end() ; ++it)
+    delete *it;
+  v.clear();
+}
+
+/**
+ * @brief C rebalance callback: converts \p c_partitions to C++ objects and
+ *        invokes the rebalance_cb_ of the HandleImpl passed in \p opaque.
+ *
+ * The temporary C++ partition objects are deleted after the callback
+ * returns.
+ */
+void
+RdKafka::rebalance_cb_trampoline (rd_kafka_t *rk,
+                                  rd_kafka_resp_err_t err,
+                                  rd_kafka_topic_partition_list_t *c_partitions,
+                                  void *opaque) {
+  std::vector<RdKafka::TopicPartition*> cpp_parts;
+  c_parts_to_partitions(c_partitions, cpp_parts);
+
+  RdKafka::HandleImpl *self = static_cast<RdKafka::HandleImpl *>(opaque);
+  RdKafka::KafkaConsumer *consumer =
+    dynamic_cast<RdKafka::KafkaConsumer*>(self);
+
+  self->rebalance_cb_->rebalance_cb(consumer,
+                                    static_cast<RdKafka::ErrorCode>(err),
+                                    cpp_parts);
+
+  free_partition_vector(cpp_parts);
+}
+
+
+/**
+ * @brief Invoke the OffsetCommitCb passed in \p opaque with \p err and the
+ *        C++ form of \p c_offsets.
+ *
+ * A NULL \p c_offsets results in an empty vector. The temporary C++
+ * partition objects are deleted after the callback returns.
+ */
+void
+RdKafka::offset_commit_cb_trampoline0 (
+    rd_kafka_t *rk,
+    rd_kafka_resp_err_t err,
+    rd_kafka_topic_partition_list_t *c_offsets, void *opaque) {
+  std::vector<RdKafka::TopicPartition*> cpp_offsets;
+  if (c_offsets != NULL)
+    c_parts_to_partitions(c_offsets, cpp_offsets);
+
+  OffsetCommitCb *commit_cb = static_cast<RdKafka::OffsetCommitCb *>(opaque);
+  commit_cb->offset_commit_cb(static_cast<RdKafka::ErrorCode>(err),
+                              cpp_offsets);
+
+  free_partition_vector(cpp_offsets);
+}
+
+/**
+ * @brief C offset commit callback installed by set_common_config():
+ *        \p opaque is the HandleImpl, whose offset_commit_cb_ is passed on
+ *        to offset_commit_cb_trampoline0() as the callback object.
+ */
+static void
+offset_commit_cb_trampoline (
+    rd_kafka_t *rk,
+    rd_kafka_resp_err_t err,
+    rd_kafka_topic_partition_list_t *c_offsets, void *opaque) {
+  RdKafka::HandleImpl *self = static_cast<RdKafka::HandleImpl *>(opaque);
+  void *commit_cb = self->offset_commit_cb_;
+
+  RdKafka::offset_commit_cb_trampoline0(rk, err, c_offsets, commit_cb);
+}
+
+
+/**
+ * @brief Wire this handle into the global configuration held by \p confimpl.
+ *
+ * Sets this handle as the configuration's opaque and, for every callback
+ * object present on \p confimpl, installs the matching C trampoline on the
+ * C configuration and copies the callback pointer into this handle.
+ * The open callback is skipped entirely when building with _MSC_VER.
+ */
+void RdKafka::HandleImpl::set_common_config (RdKafka::ConfImpl *confimpl) {
+  rd_kafka_conf_t *c_conf = confimpl->rk_conf_;
+
+  rd_kafka_conf_set_opaque(c_conf, this);
+
+  /* One event callback serves logs, errors, throttling and statistics. */
+  if (confimpl->event_cb_) {
+    rd_kafka_conf_set_log_cb(c_conf, RdKafka::log_cb_trampoline);
+    rd_kafka_conf_set_error_cb(c_conf, RdKafka::error_cb_trampoline);
+    rd_kafka_conf_set_throttle_cb(c_conf, RdKafka::throttle_cb_trampoline);
+    rd_kafka_conf_set_stats_cb(c_conf, RdKafka::stats_cb_trampoline);
+    event_cb_ = confimpl->event_cb_;
+  }
+
+  if (confimpl->socket_cb_) {
+    rd_kafka_conf_set_socket_cb(c_conf, RdKafka::socket_cb_trampoline);
+    socket_cb_ = confimpl->socket_cb_;
+  }
+
+  if (confimpl->open_cb_) {
+#ifndef _MSC_VER
+    rd_kafka_conf_set_open_cb(c_conf, RdKafka::open_cb_trampoline);
+    open_cb_ = confimpl->open_cb_;
+#endif
+  }
+
+  if (confimpl->rebalance_cb_) {
+    rd_kafka_conf_set_rebalance_cb(c_conf, RdKafka::rebalance_cb_trampoline);
+    rebalance_cb_ = confimpl->rebalance_cb_;
+  }
+
+  if (confimpl->offset_commit_cb_) {
+    rd_kafka_conf_set_offset_commit_cb(c_conf, offset_commit_cb_trampoline);
+    offset_commit_cb_ = confimpl->offset_commit_cb_;
+  }
+
+  if (confimpl->consume_cb_) {
+    rd_kafka_conf_set_consume_cb(c_conf, RdKafka::consume_cb_trampoline);
+    consume_cb_ = confimpl->consume_cb_;
+  }
+}
+
+
+/**
+ * @brief Pause the given \p partitions via rd_kafka_pause_partitions().
+ *
+ * On success the offset and error of each entry in \p partitions are
+ * refreshed from the C list.
+ *
+ * @returns the error code from rd_kafka_pause_partitions().
+ */
+RdKafka::ErrorCode
+RdKafka::HandleImpl::pause (std::vector<RdKafka::TopicPartition*> &partitions) {
+  rd_kafka_topic_partition_list_t *c_list = partitions_to_c_parts(partitions);
+
+  const rd_kafka_resp_err_t rc = rd_kafka_pause_partitions(rk_, c_list);
+  if (rc == RD_KAFKA_RESP_ERR_NO_ERROR)
+    update_partitions_from_c_parts(partitions, c_list);
+
+  rd_kafka_topic_partition_list_destroy(c_list);
+
+  return static_cast<RdKafka::ErrorCode>(rc);
+}
+
+
+/**
+ * @brief Resume the given \p partitions via rd_kafka_resume_partitions().
+ *
+ * On success the offset and error of each entry in \p partitions are
+ * refreshed from the C list.
+ *
+ * @returns the error code from rd_kafka_resume_partitions().
+ */
+RdKafka::ErrorCode
+RdKafka::HandleImpl::resume (std::vector<RdKafka::TopicPartition*> &partitions) {
+  rd_kafka_topic_partition_list_t *c_list = partitions_to_c_parts(partitions);
+
+  const rd_kafka_resp_err_t rc = rd_kafka_resume_partitions(rk_, c_list);
+  if (rc == RD_KAFKA_RESP_ERR_NO_ERROR)
+    update_partitions_from_c_parts(partitions, c_list);
+
+  rd_kafka_topic_partition_list_destroy(c_list);
+
+  return static_cast<RdKafka::ErrorCode>(rc);
+}
+
+/**
+ * @brief Look up the queue of the topic+partition named by \p part.
+ *
+ * @returns a newly allocated Queue wrapping the C queue, or NULL when
+ *          rd_kafka_queue_get_partition() returns NULL.
+ */
+RdKafka::Queue *
+RdKafka::HandleImpl::get_partition_queue (const TopicPartition *part) {
+  rd_kafka_queue_t *c_queue =
+    rd_kafka_queue_get_partition(rk_,
+                                 part->topic().c_str(),
+                                 part->partition());
+  if (!c_queue)
+    return NULL;
+
+  RdKafka::QueueImpl *wrapper = new RdKafka::QueueImpl;
+  wrapper->queue_ = c_queue;
+
+  return wrapper;
+}
+
+/**
+ * @brief Forward logs to \p queue via rd_kafka_set_log_queue().
+ *
+ * A NULL \p queue is passed through as a NULL C queue; a non-NULL one must
+ * be a QueueImpl (the cast result is used unchecked).
+ *
+ * @returns the error code from rd_kafka_set_log_queue().
+ */
+RdKafka::ErrorCode
+RdKafka::HandleImpl::set_log_queue (RdKafka::Queue *queue) {
+  rd_kafka_queue_t *c_queue =
+    queue ? dynamic_cast<QueueImpl *>(queue)->queue_ : NULL;
+
+  const rd_kafka_resp_err_t rc = rd_kafka_set_log_queue(rk_, c_queue);
+
+  return static_cast<RdKafka::ErrorCode>(rc);
+}
+
+namespace RdKafka {
+
+/**
+ * @brief Build a C partition list from the C++ \p partitions, copying the
+ *        topic, partition and offset of every entry.
+ *
+ * Every entry must be a TopicPartitionImpl (the cast result is used
+ * unchecked).
+ *
+ * @returns a list created with rd_kafka_topic_partition_list_new().
+ */
+rd_kafka_topic_partition_list_t *
+partitions_to_c_parts (const std::vector<RdKafka::TopicPartition*> &partitions){
+  rd_kafka_topic_partition_list_t *c_list =
+    rd_kafka_topic_partition_list_new(static_cast<int>(partitions.size()));
+
+  typedef std::vector<RdKafka::TopicPartition*>::const_iterator PartIter;
+  for (PartIter it = partitions.begin() ; it != partitions.end() ; ++it) {
+    const RdKafka::TopicPartitionImpl *src =
+      dynamic_cast<const RdKafka::TopicPartitionImpl*>(*it);
+    rd_kafka_topic_partition_t *dst =
+      rd_kafka_topic_partition_list_add(c_list,
+                                        src->topic_.c_str(), src->partition_);
+    dst->offset = src->offset_;
+  }
+
+  return c_list;
+}
+
+
+/**
+ * @brief Update the application provided 'partitions' with info from 'c_parts'
+ *
+ * For each C element, every C++ entry with the same topic name and partition
+ * receives that element's offset and error.
+ */
+void
+update_partitions_from_c_parts (std::vector<RdKafka::TopicPartition*> &partitions,
+                                const rd_kafka_topic_partition_list_t *c_parts) {
+  const size_t cpp_cnt = partitions.size();
+
+  for (int ci = 0 ; ci < c_parts->cnt ; ci++) {
+    rd_kafka_topic_partition_t *c_part = &c_parts->elems[ci];
+
+    /* Scan all C++ entries for ones matching this C element */
+    for (size_t pi = 0 ; pi < cpp_cnt ; pi++) {
+      RdKafka::TopicPartitionImpl *cpp_part =
+        dynamic_cast<RdKafka::TopicPartitionImpl*>(partitions[pi]);
+
+      if (strcmp(c_part->topic, cpp_part->topic_.c_str()) != 0 ||
+          c_part->partition != cpp_part->partition_)
+        continue;
+
+      cpp_part->offset_ = c_part->offset;
+      cpp_part->err_ = static_cast<RdKafka::ErrorCode>(c_part->err);
+    }
+  }
+}
+
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/KafkaConsumerImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/KafkaConsumerImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/KafkaConsumerImpl.cpp
new file mode 100644
index 0000000..f4e79d3
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/KafkaConsumerImpl.cpp
@@ -0,0 +1,257 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <string>
+#include <vector>
+
+#include "rdkafkacpp_int.h"
+
+/** Out-of-line definition of the KafkaConsumer destructor; it has nothing to do. */
+RdKafka::KafkaConsumer::~KafkaConsumer () {}
+
+/**
+ * @brief Create a high-level (balanced) consumer handle.
+ *
+ * @param conf   must be a ConfImpl holding a global configuration
+ *               (rk_conf_ set) with a non-empty "group.id".
+ * @param errstr receives a human readable error string on failure.
+ *
+ * @returns the new consumer, or NULL on failure.
+ */
+RdKafka::KafkaConsumer *RdKafka::KafkaConsumer::create (RdKafka::Conf *conf,
+                                                        std::string &errstr) {
+  char errbuf[512];
+  RdKafka::ConfImpl *confimpl = dynamic_cast<RdKafka::ConfImpl *>(conf);
+  RdKafka::KafkaConsumerImpl *rkc = new RdKafka::KafkaConsumerImpl();
+  rd_kafka_conf_t *rk_conf = NULL;
+  size_t grlen;
+
+  /* confimpl is NULL when conf is NULL or not a ConfImpl: reject it
+   * instead of dereferencing a NULL pointer. */
+  if (!confimpl || !confimpl->rk_conf_) {
+    errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
+    delete rkc;
+    return NULL;
+  }
+
+  if (rd_kafka_conf_get(confimpl->rk_conf_, "group.id",
+                        NULL, &grlen) != RD_KAFKA_CONF_OK ||
+      grlen <= 1 /* terminating null only */) {
+    errstr = "\"group.id\" must be configured";
+    delete rkc;
+    return NULL;
+  }
+
+  /* Must run before the dup below so the duplicate carries the
+   * opaque and trampolines set on the application's conf. */
+  rkc->set_common_config(confimpl);
+
+  rk_conf = rd_kafka_conf_dup(confimpl->rk_conf_);
+
+  rd_kafka_t *rk;
+  if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, rk_conf,
+                          errbuf, sizeof(errbuf)))) {
+    errstr = errbuf;
+    /* rd_kafka_new() only takes ownership of the conf on success:
+     * release our duplicate here so it is not leaked. */
+    rd_kafka_conf_destroy(rk_conf);
+    delete rkc;
+    return NULL;
+  }
+
+  rkc->rk_ = rk;
+
+  /* Redirect handle queue to cgrp's queue to provide a single queue point */
+  rd_kafka_poll_set_consumer(rk);
+
+  return rkc;
+}
+
+
+
+
+
+
+
+/**
+ * @brief Subscribe to \p topics.
+ *
+ * Each name is added to a temporary C list with RD_KAFKA_PARTITION_UA as
+ * the partition; the list is destroyed after rd_kafka_subscribe() returns.
+ *
+ * @returns the error code from rd_kafka_subscribe().
+ */
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::subscribe (const std::vector<std::string> &topics) {
+  rd_kafka_topic_partition_list_t *c_list =
+    rd_kafka_topic_partition_list_new(static_cast<int>(topics.size()));
+
+  typedef std::vector<std::string>::const_iterator NameIter;
+  for (NameIter it = topics.begin() ; it != topics.end() ; ++it)
+    rd_kafka_topic_partition_list_add(c_list, it->c_str(),
+                                      RD_KAFKA_PARTITION_UA);
+
+  const rd_kafka_resp_err_t rc = rd_kafka_subscribe(rk_, c_list);
+
+  rd_kafka_topic_partition_list_destroy(c_list);
+
+  return static_cast<RdKafka::ErrorCode>(rc);
+}
+
+
+
+/**
+ * @brief Drop the current subscription.
+ * @returns the error code from rd_kafka_unsubscribe().
+ */
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::unsubscribe () {
+  const rd_kafka_resp_err_t rc = rd_kafka_unsubscribe(this->rk_);
+  return static_cast<RdKafka::ErrorCode>(rc);
+}
+
+/**
+ * @brief Poll the consumer for a message or event.
+ *
+ * @returns a newly allocated Message. When rd_kafka_consumer_poll() yields
+ *          nothing the returned object has no topic and carries
+ *          ERR__TIMED_OUT.
+ */
+RdKafka::Message *RdKafka::KafkaConsumerImpl::consume (int timeout_ms) {
+  rd_kafka_message_t *c_msg = rd_kafka_consumer_poll(this->rk_, timeout_ms);
+
+  if (c_msg)
+    return new RdKafka::MessageImpl(c_msg);
+
+  return new RdKafka::MessageImpl(NULL, RdKafka::ERR__TIMED_OUT);
+}
+
+
+
+/**
+ * @brief Fetch the current partition assignment.
+ *
+ * On success \p partitions is resized to the assignment size and filled
+ * with newly allocated TopicPartitionImpl objects; on error it is left
+ * untouched.
+ *
+ * @returns the error code from rd_kafka_assignment(), else ERR_NO_ERROR.
+ */
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::assignment (std::vector<RdKafka::TopicPartition*> &partitions) {
+  rd_kafka_topic_partition_list_t *c_list;
+
+  const rd_kafka_resp_err_t rc = rd_kafka_assignment(rk_, &c_list);
+  if (rc != RD_KAFKA_RESP_ERR_NO_ERROR)
+    return static_cast<RdKafka::ErrorCode>(rc);
+
+  const int total = c_list->cnt;
+  partitions.resize(total);
+  for (int idx = 0 ; idx < total ; ++idx)
+    partitions[idx] = new RdKafka::TopicPartitionImpl(&c_list->elems[idx]);
+
+  rd_kafka_topic_partition_list_destroy(c_list);
+
+  return RdKafka::ERR_NO_ERROR;
+}
+
+
+/**
+ * @brief Fetch the names of the currently subscribed topics.
+ *
+ * On success \p topics is resized to the subscription size and filled with
+ * the topic names; on error it is left untouched.
+ *
+ * @returns the error code from rd_kafka_subscription(), else ERR_NO_ERROR.
+ */
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::subscription (std::vector<std::string> &topics) {
+  rd_kafka_topic_partition_list_t *c_list;
+
+  const rd_kafka_resp_err_t rc = rd_kafka_subscription(rk_, &c_list);
+  if (rc != RD_KAFKA_RESP_ERR_NO_ERROR)
+    return static_cast<RdKafka::ErrorCode>(rc);
+
+  const int total = c_list->cnt;
+  topics.resize(total);
+  for (int idx = 0 ; idx < total ; ++idx)
+    topics[idx] = std::string(c_list->elems[idx].topic);
+
+  rd_kafka_topic_partition_list_destroy(c_list);
+
+  return RdKafka::ERR_NO_ERROR;
+}
+
+
+/**
+ * @brief Assign \p partitions to this consumer.
+ *
+ * The partitions are converted to a temporary C list that is destroyed
+ * after rd_kafka_assign() returns.
+ *
+ * @returns the error code from rd_kafka_assign().
+ */
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::assign (const std::vector<TopicPartition*> &partitions) {
+  rd_kafka_topic_partition_list_t *c_list = partitions_to_c_parts(partitions);
+
+  const rd_kafka_resp_err_t rc = rd_kafka_assign(rk_, c_list);
+
+  rd_kafka_topic_partition_list_destroy(c_list);
+  return static_cast<RdKafka::ErrorCode>(rc);
+}
+
+
+/**
+ * @brief Clear the assignment by calling rd_kafka_assign() with a NULL list.
+ * @returns the error code from rd_kafka_assign().
+ */
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::unassign () {
+  const rd_kafka_resp_err_t rc = rd_kafka_assign(rk_, NULL);
+  return static_cast<RdKafka::ErrorCode>(rc);
+}
+
+
+/**
+ * @brief Query committed offsets for \p partitions via rd_kafka_committed().
+ *
+ * On success the offset and error of each entry in \p partitions are
+ * refreshed from the C list.
+ *
+ * @returns the error code from rd_kafka_committed().
+ */
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::committed (std::vector<RdKafka::TopicPartition*> &partitions, int timeout_ms) {
+  rd_kafka_topic_partition_list_t *c_list = partitions_to_c_parts(partitions);
+
+  const rd_kafka_resp_err_t rc = rd_kafka_committed(rk_, c_list, timeout_ms);
+  if (rc == RD_KAFKA_RESP_ERR_NO_ERROR)
+    update_partitions_from_c_parts(partitions, c_list);
+
+  rd_kafka_topic_partition_list_destroy(c_list);
+
+  return static_cast<RdKafka::ErrorCode>(rc);
+}
+
+
+/**
+ * @brief Query current positions for \p partitions via rd_kafka_position().
+ *
+ * On success the offset and error of each entry in \p partitions are
+ * refreshed from the C list.
+ *
+ * @returns the error code from rd_kafka_position().
+ */
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::position (std::vector<RdKafka::TopicPartition*> &partitions) {
+  rd_kafka_topic_partition_list_t *c_list = partitions_to_c_parts(partitions);
+
+  const rd_kafka_resp_err_t rc = rd_kafka_position(rk_, c_list);
+  if (rc == RD_KAFKA_RESP_ERR_NO_ERROR)
+    update_partitions_from_c_parts(partitions, c_list);
+
+  rd_kafka_topic_partition_list_destroy(c_list);
+
+  return static_cast<RdKafka::ErrorCode>(rc);
+}
+
+
+/**
+ * @brief Seek to the topic, partition and offset held in \p partition.
+ *
+ * \p partition must be a TopicPartitionImpl (the cast result is used
+ * unchecked). A temporary topic handle is created for the call and
+ * destroyed afterwards.
+ *
+ * @returns rd_kafka_last_error() if the topic handle cannot be created,
+ *          otherwise the error code from rd_kafka_seek().
+ */
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::seek (const RdKafka::TopicPartition &partition,
+                                  int timeout_ms) {
+  const RdKafka::TopicPartitionImpl *target =
+    dynamic_cast<const RdKafka::TopicPartitionImpl*>(&partition);
+
+  rd_kafka_topic_t *c_topic =
+    rd_kafka_topic_new(rk_, target->topic_.c_str(), NULL);
+  if (!c_topic)
+    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
+
+  /* FIXME: Use a C API that takes a topic_partition_list_t instead */
+  const rd_kafka_resp_err_t rc =
+    rd_kafka_seek(c_topic, target->partition_, target->offset_, timeout_ms);
+
+  rd_kafka_topic_destroy(c_topic);
+
+  return static_cast<RdKafka::ErrorCode>(rc);
+}
+
+
+
+
+
+/**
+ * @brief Close the consumer and destroy the underlying handle.
+ *
+ * If rd_kafka_consumer_close() fails, its error is returned and the handle
+ * is left alone. Otherwise the handle is polled in 10 ms steps until
+ * rd_kafka_outq_len() is no longer positive, and then destroyed with
+ * rd_kafka_destroy().
+ *
+ * @returns the error code from rd_kafka_consumer_close().
+ */
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::close () {
+  const rd_kafka_resp_err_t rc = rd_kafka_consumer_close(rk_);
+  if (rc != RD_KAFKA_RESP_ERR_NO_ERROR)
+    return static_cast<RdKafka::ErrorCode>(rc);
+
+  while (rd_kafka_outq_len(rk_) > 0) {
+    rd_kafka_poll(rk_, 10);
+  }
+  rd_kafka_destroy(rk_);
+
+  return static_cast<RdKafka::ErrorCode>(rc);
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/Makefile
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/Makefile b/thirdparty/librdkafka-0.11.4/src-cpp/Makefile
new file mode 100644
index 0000000..16f20b0
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/Makefile
@@ -0,0 +1,49 @@
+PKGNAME= librdkafka
+LIBNAME= librdkafka++
+LIBVER= 1
+
+CXXSRCS= RdKafka.cpp ConfImpl.cpp HandleImpl.cpp \
+ ConsumerImpl.cpp ProducerImpl.cpp KafkaConsumerImpl.cpp \
+ TopicImpl.cpp TopicPartitionImpl.cpp MessageImpl.cpp \
+ QueueImpl.cpp MetadataImpl.cpp
+
+HDRS= rdkafkacpp.h
+
+OBJS= $(CXXSRCS:%.cpp=%.o)
+
+
+
+all: lib check
+
+
+include ../mklove/Makefile.base
+
+# No linker script/symbol hiding for C++ library
+WITH_LDS=n
+
+# OSX, AIX and Cygwin require forward linking of the required libraries
+ifeq ($(_UNAME_S),Darwin)
+ FWD_LINKING_REQ=y
+endif
+ifeq ($(_UNAME_S),AIX)
+ FWD_LINKING_REQ=y
+endif
+ifeq ($(shell uname -o 2>/dev/null),Cygwin)
+ FWD_LINKING_REQ=y
+endif
+
+# Ignore previously defined library dependencies for the C library,
+# we'll get those dependencies through the C library linkage.
+LIBS := -L../src -lrdkafka -lstdc++
+
+CHECK_FILES+= $(LIBFILENAME) $(LIBNAME).a
+
+
+file-check: lib
+check: file-check
+
+install: lib-install
+
+clean: lib-clean
+
+-include $(DEPS)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/MessageImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/MessageImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/MessageImpl.cpp
new file mode 100644
index 0000000..9562402
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/MessageImpl.cpp
@@ -0,0 +1,38 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include <string>
+#include <list>
+#include <cerrno>
+
+#include "rdkafkacpp_int.h"
+
+
+/* Out-of-line (empty) definition of the Message interface destructor. */
+RdKafka::Message::~Message() {
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/MetadataImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/MetadataImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/MetadataImpl.cpp
new file mode 100644
index 0000000..c2869f5
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/MetadataImpl.cpp
@@ -0,0 +1,151 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafkacpp_int.h"
+
+using namespace RdKafka;
+
+/* Out-of-line (empty) destructors of the public metadata interfaces.
+ * No ';' follows the function bodies: at namespace scope that would be an
+ * empty declaration, which pedantic C++98/03 builds diagnose. */
+BrokerMetadata::~BrokerMetadata() {}
+PartitionMetadata::~PartitionMetadata() {}
+TopicMetadata::~TopicMetadata() {}
+Metadata::~Metadata() {}
+
+
+/**
+ * Metadata: Broker information handler implementation.
+ *
+ * Keeps a pointer to the C broker metadata entry and a std::string copy of
+ * its host name; id() and port() read through the pointer on each call.
+ */
+class BrokerMetadataImpl : public BrokerMetadata {
+ public:
+  BrokerMetadataImpl(const rd_kafka_metadata_broker_t *broker_metadata)
+      : broker_metadata_(broker_metadata),
+        host_(broker_metadata->host) {
+  }
+
+  virtual ~BrokerMetadataImpl() {
+  }
+
+  /** Broker id as stored in the C entry. */
+  int32_t id() const {
+    return broker_metadata_->id;
+  }
+
+  /** Copy of the host name captured at construction. */
+  const std::string host() const {
+    return host_;
+  }
+
+  /** Broker port as stored in the C entry. */
+  int port() const {
+    return broker_metadata_->port;
+  }
+
+ private:
+  const rd_kafka_metadata_broker_t *broker_metadata_;  /* never freed by this class */
+  const std::string host_;
+};
+
+/**
+ * Metadata: Partition information handler.
+ *
+ * Copies the replica and ISR id arrays of the C partition entry into
+ * vectors at construction; id, leader and error are read through the
+ * stored pointer on each call.
+ */
+class PartitionMetadataImpl : public PartitionMetadata {
+ public:
+  // @TODO too much memory copy? maybe we should create a new vector class that read directly from C arrays?
+  // @TODO use auto_ptr?
+  PartitionMetadataImpl(const rd_kafka_metadata_partition_t *partition_metadata)
+      : partition_metadata_(partition_metadata) {
+    /* Replica broker ids */
+    replicas_.reserve(partition_metadata->replica_cnt);
+    for (int idx = 0; idx < partition_metadata->replica_cnt; ++idx) {
+      replicas_.push_back(partition_metadata->replicas[idx]);
+    }
+
+    /* In-sync replica broker ids */
+    isrs_.reserve(partition_metadata->isr_cnt);
+    for (int idx = 0; idx < partition_metadata->isr_cnt; ++idx) {
+      isrs_.push_back(partition_metadata->isrs[idx]);
+    }
+  }
+
+  ~PartitionMetadataImpl() {
+  }
+
+  int32_t id() const { return partition_metadata_->id; }
+
+  int32_t leader() const { return partition_metadata_->leader; }
+
+  ErrorCode err() const {
+    return static_cast<ErrorCode>(partition_metadata_->err);
+  }
+
+  /** Pointer to the internal replica id vector (owned by this object). */
+  const std::vector<int32_t> *replicas() const {
+    return &replicas_;
+  }
+
+  /** Pointer to the internal ISR id vector (owned by this object). */
+  const std::vector<int32_t> *isrs() const {
+    return &isrs_;
+  }
+
+ private:
+  const rd_kafka_metadata_partition_t *partition_metadata_;
+  std::vector<int32_t> replicas_, isrs_;
+};
+
+/**
+ * Metadata: Topic information handler.
+ *
+ * Copies the topic name and allocates one PartitionMetadataImpl per
+ * partition entry at construction; those objects are deleted again in
+ * the destructor.
+ */
+class TopicMetadataImpl : public TopicMetadata {
+ public:
+  TopicMetadataImpl(const rd_kafka_metadata_topic_t *topic_metadata)
+      : topic_metadata_(topic_metadata),
+        topic_(topic_metadata->topic) {
+    partitions_.reserve(topic_metadata->partition_cnt);
+    for (int idx = 0; idx < topic_metadata->partition_cnt; ++idx) {
+      const rd_kafka_metadata_partition_t *c_part =
+        &topic_metadata->partitions[idx];
+      partitions_.push_back(new PartitionMetadataImpl(c_part));
+    }
+  }
+
+  ~TopicMetadataImpl() {
+    /* Release the per-partition wrappers allocated by the constructor. */
+    for (size_t idx = 0; idx < partitions_.size(); ++idx) {
+      delete partitions_[idx];
+    }
+  }
+
+  const std::string topic() const {
+    return topic_;
+  }
+
+  /** Pointer to the internal partition vector (owned by this object). */
+  const std::vector<const PartitionMetadata *> *partitions() const {
+    return &partitions_;
+  }
+
+  ErrorCode err() const {
+    return static_cast<ErrorCode>(topic_metadata_->err);
+  }
+
+ private:
+  const rd_kafka_metadata_topic_t *topic_metadata_;
+  const std::string topic_;
+  std::vector<const PartitionMetadata *> partitions_;
+};
+
+/**
+ * Wrap a C metadata object: one BrokerMetadataImpl is allocated per broker
+ * entry and one TopicMetadataImpl per topic entry. The \p metadata pointer
+ * itself is stored in metadata_.
+ */
+MetadataImpl::MetadataImpl(const rd_kafka_metadata_t *metadata)
+    : metadata_(metadata) {
+  /* Brokers */
+  brokers_.reserve(metadata->broker_cnt);
+  for (int idx = 0; idx < metadata->broker_cnt; ++idx) {
+    brokers_.push_back(new BrokerMetadataImpl(&metadata->brokers[idx]));
+  }
+
+  /* Topics */
+  topics_.reserve(metadata->topic_cnt);
+  for (int idx = 0; idx < metadata->topic_cnt; ++idx) {
+    topics_.push_back(new TopicMetadataImpl(&metadata->topics[idx]));
+  }
+}
+
+/**
+ * Delete every broker and topic wrapper held in brokers_ / topics_, then
+ * hand the C metadata object (if any) to rd_kafka_metadata_destroy().
+ */
+MetadataImpl::~MetadataImpl() {
+  for (size_t idx = 0; idx < brokers_.size(); ++idx) {
+    delete brokers_[idx];
+  }
+
+  for (size_t idx = 0; idx < topics_.size(); ++idx) {
+    delete topics_[idx];
+  }
+
+  if (metadata_) {
+    rd_kafka_metadata_destroy(metadata_);
+  }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/ProducerImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/ProducerImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/ProducerImpl.cpp
new file mode 100644
index 0000000..456bc33
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/ProducerImpl.cpp
@@ -0,0 +1,167 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include <string>
+#include <list>
+#include <cerrno>
+
+#include "rdkafkacpp_int.h"
+
+
+/* Out-of-line (empty) definition of the Producer interface destructor. */
+RdKafka::Producer::~Producer () {}
+
+/**
+ * C delivery-report callback that forwards to the C++ dr_cb() of the
+ * HandleImpl passed as \p opaque.
+ *
+ * The C message is wrapped in a stack-allocated MessageImpl for the
+ * duration of the callback.
+ */
+static void dr_msg_cb_trampoline (rd_kafka_t *rk,
+                                  const rd_kafka_message_t *rkmessage,
+                                  void *opaque) {
+  RdKafka::HandleImpl *self = static_cast<RdKafka::HandleImpl *>(opaque);
+
+  /* MessageImpl takes a non-const pointer, so constness is cast away.
+   * NOTE(review): the trailing `false` presumably tells MessageImpl not to
+   * free the C message - confirm against rdkafkacpp_int.h. */
+  rd_kafka_message_t *c_msg = const_cast<rd_kafka_message_t *>(rkmessage);
+  RdKafka::MessageImpl wrapped(NULL, c_msg, false);
+
+  self->dr_cb_->dr_cb(wrapped);
+}
+
+
+
+/**
+ * Create a producer handle.
+ *
+ * @param conf    optional configuration; when it is a ConfImpl it must
+ *                carry a global configuration (rk_conf_ set). A duplicate
+ *                of that configuration is passed to rd_kafka_new().
+ * @param errstr  receives a description when NULL is returned.
+ *
+ * @returns the new producer, or NULL on failure.
+ */
+RdKafka::Producer *RdKafka::Producer::create (RdKafka::Conf *conf,
+                                              std::string &errstr) {
+  RdKafka::ConfImpl *cfg = dynamic_cast<RdKafka::ConfImpl *>(conf);
+  RdKafka::ProducerImpl *producer = new RdKafka::ProducerImpl();
+  rd_kafka_conf_t *c_conf = NULL;
+
+  if (cfg) {
+    if (!cfg->rk_conf_) {
+      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
+      delete producer;
+      return NULL;
+    }
+
+    producer->set_common_config(cfg);
+
+    /* Work on a duplicate of the caller's C configuration. */
+    c_conf = rd_kafka_conf_dup(cfg->rk_conf_);
+
+    if (cfg->dr_cb_) {
+      /* Route C delivery reports to the C++ callback object. */
+      rd_kafka_conf_set_dr_msg_cb(c_conf, dr_msg_cb_trampoline);
+      producer->dr_cb_ = cfg->dr_cb_;
+    }
+  }
+
+  char errbuf[512];
+  rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, c_conf,
+                                errbuf, sizeof(errbuf));
+  if (!rk) {
+    errstr = errbuf;
+    delete producer;
+    return NULL;
+  }
+
+  producer->rk_ = rk;
+  return producer;
+}
+
+
+/**
+ * Produce a message through rd_kafka_produce(), with an optional
+ * std::string key (a NULL \p key is passed on as NULL pointer, length 0).
+ *
+ * @returns ERR_NO_ERROR, or the last librdkafka error when
+ *          rd_kafka_produce() returns -1.
+ */
+RdKafka::ErrorCode RdKafka::ProducerImpl::produce (RdKafka::Topic *topic,
+                                                   int32_t partition,
+                                                   int msgflags,
+                                                   void *payload, size_t len,
+                                                   const std::string *key,
+                                                   void *msg_opaque) {
+  RdKafka::TopicImpl *impl = dynamic_cast<RdKafka::TopicImpl *>(topic);
+  const void *key_ptr = key ? key->c_str() : NULL;
+  const size_t key_len = key ? key->size() : 0;
+
+  const int rc = rd_kafka_produce(impl->rkt_, partition, msgflags,
+                                  payload, len,
+                                  key_ptr, key_len,
+                                  msg_opaque);
+  if (rc != -1)
+    return RdKafka::ERR_NO_ERROR;
+
+  return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
+}
+
+
+/**
+ * Produce a message through rd_kafka_produce(), with a raw key buffer.
+ *
+ * @returns ERR_NO_ERROR, or the last librdkafka error when
+ *          rd_kafka_produce() returns -1.
+ */
+RdKafka::ErrorCode RdKafka::ProducerImpl::produce (RdKafka::Topic *topic,
+                                                   int32_t partition,
+                                                   int msgflags,
+                                                   void *payload, size_t len,
+                                                   const void *key,
+                                                   size_t key_len,
+                                                   void *msg_opaque) {
+  RdKafka::TopicImpl *impl = dynamic_cast<RdKafka::TopicImpl *>(topic);
+
+  const int rc = rd_kafka_produce(impl->rkt_, partition, msgflags,
+                                  payload, len, key, key_len,
+                                  msg_opaque);
+  if (rc != -1)
+    return RdKafka::ERR_NO_ERROR;
+
+  return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
+}
+
+
+/**
+ * Produce a message whose payload and key are given as byte vectors.
+ *
+ * RD_KAFKA_MSG_F_COPY is always passed as the message flag. A NULL
+ * \p payload or \p key is passed on as NULL pointer with length 0.
+ *
+ * NOTE(review): for a non-NULL but empty vector the address of element 0
+ * is still taken - confirm callers never pass empty vectors, or decide
+ * how an empty payload/key should be represented.
+ *
+ * @returns ERR_NO_ERROR, or the last librdkafka error when
+ *          rd_kafka_produce() returns -1.
+ */
+RdKafka::ErrorCode
+RdKafka::ProducerImpl::produce (RdKafka::Topic *topic,
+                                int32_t partition,
+                                const std::vector<char> *payload,
+                                const std::vector<char> *key,
+                                void *msg_opaque) {
+  RdKafka::TopicImpl *impl = dynamic_cast<RdKafka::TopicImpl *>(topic);
+
+  void *payload_ptr = payload ? (void *)&(*payload)[0] : NULL;
+  const size_t payload_len = payload ? payload->size() : 0;
+  const void *key_ptr = key ? &(*key)[0] : NULL;
+  const size_t key_len = key ? key->size() : 0;
+
+  const int rc = rd_kafka_produce(impl->rkt_, partition, RD_KAFKA_MSG_F_COPY,
+                                  payload_ptr, payload_len,
+                                  key_ptr, key_len,
+                                  msg_opaque);
+  if (rc != -1)
+    return RdKafka::ERR_NO_ERROR;
+
+  return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
+}
+
+
+/**
+ * Produce a message addressed by topic name through rd_kafka_producev(),
+ * passing topic, partition, flags, value, key, timestamp and opaque.
+ *
+ * @returns the rd_kafka_producev() result converted to RdKafka::ErrorCode.
+ */
+RdKafka::ErrorCode
+RdKafka::ProducerImpl::produce (const std::string topic_name,
+                                int32_t partition, int msgflags,
+                                void *payload, size_t len,
+                                const void *key, size_t key_len,
+                                int64_t timestamp,
+                                void *msg_opaque) {
+  const rd_kafka_resp_err_t rc =
+    rd_kafka_producev(rk_,
+                      RD_KAFKA_V_TOPIC(topic_name.c_str()),
+                      RD_KAFKA_V_PARTITION(partition),
+                      RD_KAFKA_V_MSGFLAGS(msgflags),
+                      RD_KAFKA_V_VALUE(payload, len),
+                      RD_KAFKA_V_KEY(key, key_len),
+                      RD_KAFKA_V_TIMESTAMP(timestamp),
+                      RD_KAFKA_V_OPAQUE(msg_opaque),
+                      RD_KAFKA_V_END);
+
+  return static_cast<RdKafka::ErrorCode>(rc);
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/QueueImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/QueueImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/QueueImpl.cpp
new file mode 100644
index 0000000..1d8ce93
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/QueueImpl.cpp
@@ -0,0 +1,71 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <cerrno>
+
+#include "rdkafkacpp_int.h"
+
+/* Out-of-line (empty) definition of the Queue interface destructor. */
+RdKafka::Queue::~Queue () {}
+
+/**
+ * Allocate a QueueImpl and attach a new C queue created with
+ * rd_kafka_queue_new() on the client handle behind \p base.
+ */
+RdKafka::Queue *RdKafka::Queue::create (Handle *base) {
+  RdKafka::QueueImpl *q = new RdKafka::QueueImpl;
+  HandleImpl *handle = dynamic_cast<HandleImpl*>(base);
+
+  q->queue_ = rd_kafka_queue_new(handle->rk_);
+  return q;
+}
+
+/**
+ * Forward this queue to \p queue through rd_kafka_queue_forward().
+ *
+ * @param queue  destination queue; when NULL, NULL is passed on as the
+ *               destination.
+ *
+ * @returns ERR__INVALID_ARG if \p queue is not a QueueImpl, otherwise
+ *          ERR_NO_ERROR.
+ */
+RdKafka::ErrorCode
+RdKafka::QueueImpl::forward (Queue *queue) {
+  rd_kafka_queue_t *dst = NULL;
+
+  if (queue) {
+    QueueImpl *queueimpl = dynamic_cast<QueueImpl *>(queue);
+
+    /* dynamic_cast yields NULL for a foreign Queue implementation:
+     * reject it instead of dereferencing a NULL pointer. */
+    if (!queueimpl)
+      return RdKafka::ERR__INVALID_ARG;
+
+    dst = queueimpl->queue_;
+  }
+
+  rd_kafka_queue_forward(queue_, dst);
+  return RdKafka::ERR_NO_ERROR;
+}
+
+/**
+ * Consume one message from this queue via rd_kafka_consume_queue().
+ *
+ * @returns a newly allocated MessageImpl: either wrapping the C message,
+ *          or carrying ERR__TIMED_OUT when no C message was returned.
+ */
+RdKafka::Message *RdKafka::QueueImpl::consume (int timeout_ms) {
+  rd_kafka_message_t *c_msg = rd_kafka_consume_queue(queue_, timeout_ms);
+
+  if (c_msg)
+    return new RdKafka::MessageImpl(c_msg);
+
+  return new RdKafka::MessageImpl(NULL, RdKafka::ERR__TIMED_OUT);
+}
+
+/** @returns the result of rd_kafka_queue_poll_callback() on this queue. */
+int RdKafka::QueueImpl::poll (int timeout_ms) {
+  const int result = rd_kafka_queue_poll_callback(queue_, timeout_ms);
+  return result;
+}
+
+/** Pass \p fd, \p payload and \p size to rd_kafka_queue_io_event_enable()
+ *  for this queue. */
+void RdKafka::QueueImpl::io_event_enable (int fd,
+                                          const void *payload,
+                                          size_t size) {
+  rd_kafka_queue_io_event_enable(queue_, fd, payload, size);
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/README.md
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/README.md b/thirdparty/librdkafka-0.11.4/src-cpp/README.md
new file mode 100644
index 0000000..a484589
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/README.md
@@ -0,0 +1,16 @@
+librdkafka C++ interface
+========================
+
+**See rdkafkacpp.h for the public C++ API**
+
+
+
+Maintainer notes for the C++ interface:
+
+ * The public C++ interface (rdkafkacpp.h) does not include the
+ public C interface (rdkafka.h) in any way. This means that all
+ constants, flags, etc., must be kept in sync manually between the two
+ header files.
+ A regression test that verifies this should be implemented.
+
+ * The public C++ interface is provided using pure virtual abstract classes.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/RdKafka.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/RdKafka.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/RdKafka.cpp
new file mode 100644
index 0000000..7b67a7b
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/RdKafka.cpp
@@ -0,0 +1,52 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <string>
+
+#include "rdkafkacpp_int.h"
+
+/** @returns the value reported by rd_kafka_version(). */
+int RdKafka::version () {
+  const int c_version = rd_kafka_version();
+  return c_version;
+}
+
+/** @returns rd_kafka_version_str() copied into a std::string. */
+std::string RdKafka::version_str () {
+  std::string text(rd_kafka_version_str());
+  return text;
+}
+
+/** @returns the RD_KAFKA_DEBUG_CONTEXTS string as a std::string. */
+std::string RdKafka::get_debug_contexts() {
+  std::string contexts(RD_KAFKA_DEBUG_CONTEXTS);
+  return contexts;
+}
+
+/** @returns rd_kafka_err2str() for \p err, copied into a std::string. */
+std::string RdKafka::err2str (RdKafka::ErrorCode err) {
+  const rd_kafka_resp_err_t c_err = static_cast<rd_kafka_resp_err_t>(err);
+  std::string text(rd_kafka_err2str(c_err));
+  return text;
+}
+
+/** @returns the result of rd_kafka_wait_destroyed() for \p timeout_ms. */
+int RdKafka::wait_destroyed (int timeout_ms) {
+  const int result = rd_kafka_wait_destroyed(timeout_ms);
+  return result;
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/TopicImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/TopicImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/TopicImpl.cpp
new file mode 100644
index 0000000..cd80a4b
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/TopicImpl.cpp
@@ -0,0 +1,128 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include <string>
+#include <list>
+#include <cerrno>
+
+#include "rdkafkacpp_int.h"
+
+const int32_t RdKafka::Topic::PARTITION_UA = RD_KAFKA_PARTITION_UA; /* takes its value from the C macro RD_KAFKA_PARTITION_UA */
+
+const int64_t RdKafka::Topic::OFFSET_BEGINNING = RD_KAFKA_OFFSET_BEGINNING; /* takes its value from RD_KAFKA_OFFSET_BEGINNING */
+
+const int64_t RdKafka::Topic::OFFSET_END = RD_KAFKA_OFFSET_END; /* takes its value from RD_KAFKA_OFFSET_END */
+
+const int64_t RdKafka::Topic::OFFSET_STORED = RD_KAFKA_OFFSET_STORED; /* takes its value from RD_KAFKA_OFFSET_STORED */
+
+const int64_t RdKafka::Topic::OFFSET_INVALID = RD_KAFKA_OFFSET_INVALID; /* takes its value from RD_KAFKA_OFFSET_INVALID */
+
+/* Out-of-line (empty) definition of the Topic interface destructor. */
+RdKafka::Topic::~Topic () {}
+
+/**
+ * C partitioner callback that forwards to the std::string-key based C++
+ * partitioner stored on the TopicImpl passed as \p rkt_opaque.
+ *
+ * The key bytes are copied into a temporary std::string for the call.
+ */
+static int32_t partitioner_cb_trampoline (const rd_kafka_topic_t *rkt,
+                                          const void *keydata,
+                                          size_t keylen,
+                                          int32_t partition_cnt,
+                                          void *rkt_opaque,
+                                          void *msg_opaque) {
+  RdKafka::TopicImpl *self = static_cast<RdKafka::TopicImpl *>(rkt_opaque);
+  const char *key_bytes = static_cast<const char *>(keydata);
+  std::string key(key_bytes, keylen);
+
+  return self->partitioner_cb_->partitioner_cb(self, &key,
+                                               partition_cnt, msg_opaque);
+}
+
+/**
+ * C partitioner callback that forwards to the key-pointer based C++
+ * partitioner stored on the TopicImpl passed as \p rkt_opaque. The key
+ * pointer and length are passed through unchanged.
+ */
+static int32_t partitioner_kp_cb_trampoline (const rd_kafka_topic_t *rkt,
+                                             const void *keydata,
+                                             size_t keylen,
+                                             int32_t partition_cnt,
+                                             void *rkt_opaque,
+                                             void *msg_opaque) {
+  RdKafka::TopicImpl *self = static_cast<RdKafka::TopicImpl *>(rkt_opaque);
+
+  return self->partitioner_kp_cb_->partitioner_cb(self,
+                                                  keydata, keylen,
+                                                  partition_cnt,
+                                                  msg_opaque);
+}
+
+
+
+/**
+ * Create a topic handle on the client behind \p base.
+ *
+ * @param base       client handle; must be a HandleImpl.
+ * @param topic_str  topic name.
+ * @param conf       optional topic configuration. When NULL, a duplicate
+ *                   of the client's default topic configuration is used;
+ *                   otherwise a duplicate of \p conf's topic configuration.
+ * @param errstr     receives a description when NULL is returned.
+ *
+ * @returns the new topic, or NULL on failure.
+ */
+RdKafka::Topic *RdKafka::Topic::create (Handle *base,
+                                        const std::string &topic_str,
+                                        Conf *conf,
+                                        std::string &errstr) {
+  RdKafka::ConfImpl *confimpl = static_cast<RdKafka::ConfImpl *>(conf);
+  RdKafka::HandleImpl *handle = dynamic_cast<RdKafka::HandleImpl *>(base);
+  rd_kafka_topic_t *rkt;
+  rd_kafka_topic_conf_t *rkt_conf;
+
+  /* dynamic_cast yields NULL for a NULL or foreign Handle: fail cleanly
+   * instead of dereferencing a NULL pointer. */
+  if (!handle) {
+    errstr = "Requires a valid RdKafka::Handle object";
+    return NULL;
+  }
+
+  /* Same kind of check as Producer::create makes for rk_conf_: a Conf
+   * without a topic configuration must not reach
+   * rd_kafka_topic_conf_dup(). */
+  if (confimpl && !confimpl->rkt_conf_) {
+    errstr = "Requires RdKafka::Conf::CONF_TOPIC object";
+    return NULL;
+  }
+
+  rd_kafka_t *rk = handle->rk_;
+
+  RdKafka::TopicImpl *topic = new RdKafka::TopicImpl();
+
+  if (!confimpl) {
+    /* Reuse default topic config, but we need our own copy to
+     * set the topic opaque. */
+    rkt_conf = rd_kafka_default_topic_conf_dup(rk);
+  } else {
+    /* Make a copy of conf struct to allow Conf reuse. */
+    rkt_conf = rd_kafka_topic_conf_dup(confimpl->rkt_conf_);
+  }
+
+  /* Set topic opaque to the topic so that we can reach our topic object
+   * from whatever callbacks get registered.
+   * The application itself will not need these opaques since their
+   * callbacks are class based. */
+  rd_kafka_topic_conf_set_opaque(rkt_conf, static_cast<void *>(topic));
+
+  if (confimpl) {
+    /* Only one partitioner is installed; the std::string-key variant
+     * takes precedence over the key-pointer variant. */
+    if (confimpl->partitioner_cb_) {
+      rd_kafka_topic_conf_set_partitioner_cb(rkt_conf,
+                                             partitioner_cb_trampoline);
+      topic->partitioner_cb_ = confimpl->partitioner_cb_;
+    } else if (confimpl->partitioner_kp_cb_) {
+      rd_kafka_topic_conf_set_partitioner_cb(rkt_conf,
+                                             partitioner_kp_cb_trampoline);
+      topic->partitioner_kp_cb_ = confimpl->partitioner_kp_cb_;
+    }
+  }
+
+  if (!(rkt = rd_kafka_topic_new(rk, topic_str.c_str(), rkt_conf))) {
+    errstr = rd_kafka_err2str(rd_kafka_last_error());
+    delete topic;
+    /* NOTE(review): confirm rd_kafka_topic_new() never frees rkt_conf on
+     * its failure paths; if it does, this destroy is a double free. */
+    rd_kafka_topic_conf_destroy(rkt_conf);
+    return NULL;
+  }
+
+  topic->rkt_ = rkt;
+
+  return topic;
+}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/TopicPartitionImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/TopicPartitionImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/TopicPartitionImpl.cpp
new file mode 100644
index 0000000..71a688c
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/TopicPartitionImpl.cpp
@@ -0,0 +1,55 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include <string>
+#include <vector>
+
+#include "rdkafkacpp_int.h"
+
+/* Out-of-line (empty) definition of the TopicPartition interface destructor. */
+RdKafka::TopicPartition::~TopicPartition () {}
+
+/**
+ * Allocate a TopicPartitionImpl for \p topic and \p partition.
+ * The object is created with `new`; nothing here releases it.
+ */
+RdKafka::TopicPartition *
+RdKafka::TopicPartition::create (const std::string &topic, int partition) {
+  TopicPartitionImpl *created = new TopicPartitionImpl(topic, partition);
+  return created;
+}
+
+/**
+ * Allocate a TopicPartitionImpl for \p topic and \p partition with an
+ * explicit \p offset.
+ * The object is created with `new`; nothing here releases it.
+ */
+RdKafka::TopicPartition *
+RdKafka::TopicPartition::create (const std::string &topic, int partition,
+                                 int64_t offset) {
+  TopicPartitionImpl *created =
+    new TopicPartitionImpl(topic, partition, offset);
+  return created;
+}
+
+/**
+ * Delete every TopicPartition pointed to by \p partitions and leave the
+ * vector empty.
+ */
+void
+RdKafka::TopicPartition::destroy (std::vector<TopicPartition*> &partitions) {
+  typedef std::vector<TopicPartition*>::iterator PartIter;
+
+  for (PartIter cur = partitions.begin(); cur != partitions.end(); ++cur) {
+    delete *cur;
+  }
+
+  partitions.clear();
+}