You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:31 UTC

[03/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade to librdkafka 0.11.4

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/CMakeLists.txt b/thirdparty/librdkafka-0.11.4/src-cpp/CMakeLists.txt
new file mode 100644
index 0000000..db71516
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/CMakeLists.txt
@@ -0,0 +1,35 @@
+add_library(
+    rdkafka++
+    ConfImpl.cpp
+    ConsumerImpl.cpp
+    HandleImpl.cpp
+    KafkaConsumerImpl.cpp
+    MessageImpl.cpp
+    MetadataImpl.cpp
+    ProducerImpl.cpp
+    QueueImpl.cpp
+    RdKafka.cpp
+    TopicImpl.cpp
+    TopicPartitionImpl.cpp
+)
+
+target_link_libraries(rdkafka++ PUBLIC rdkafka)
+
+# Support '#include <rdkafcpp.h>'
+target_include_directories(rdkafka++ PUBLIC "$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}>")
+if(NOT RDKAFKA_BUILD_STATIC)
+    target_compile_definitions(rdkafka++ PRIVATE LIBRDKAFKACPP_EXPORTS)
+endif()
+install(
+    TARGETS rdkafka++
+    EXPORT "${targets_export_name}"
+    LIBRARY DESTINATION "${CMAKE_INSTALL_LIBDIR}"
+    ARCHIVE DESTINATION "${CMAKE_INSTALL_LIBDIR}"
+    RUNTIME DESTINATION "${CMAKE_INSTALL_BINDIR}"
+    INCLUDES DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}"
+)
+
+install(
+    FILES "rdkafkacpp.h"
+    DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/librdkafka"
+)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/ConfImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/ConfImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/ConfImpl.cpp
new file mode 100644
index 0000000..709c728
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/ConfImpl.cpp
@@ -0,0 +1,89 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include <string>
+#include <list>
+
+#include "rdkafkacpp_int.h"
+
+
+
+RdKafka::ConfImpl::ConfResult RdKafka::ConfImpl::set(const std::string &name,
+						     const std::string &value,
+						     std::string &errstr) {
+  rd_kafka_conf_res_t res;
+  char errbuf[512];
+
+  if (this->conf_type_ == CONF_GLOBAL)
+    res = rd_kafka_conf_set(this->rk_conf_,
+                            name.c_str(), value.c_str(),
+                            errbuf, sizeof(errbuf));
+  else
+    res = rd_kafka_topic_conf_set(this->rkt_conf_,
+                                  name.c_str(), value.c_str(),
+                                  errbuf, sizeof(errbuf));
+
+  if (res != RD_KAFKA_CONF_OK)
+    errstr = errbuf;
+
+  return static_cast<Conf::ConfResult>(res);
+}
+
+
+std::list<std::string> *RdKafka::ConfImpl::dump () {
+
+  const char **arrc;
+  size_t cnt;
+  std::list<std::string> *arr;
+
+  if (rk_conf_)
+    arrc = rd_kafka_conf_dump(rk_conf_, &cnt);
+  else
+    arrc = rd_kafka_topic_conf_dump(rkt_conf_, &cnt);
+
+  arr = new std::list<std::string>();
+  for (int i = 0 ; i < static_cast<int>(cnt) ; i++)
+    arr->push_back(std::string(arrc[i]));
+
+  rd_kafka_conf_dump_free(arrc, cnt);
+  return arr;
+}
+
+RdKafka::Conf *RdKafka::Conf::create (ConfType type) {
+  ConfImpl *conf = new ConfImpl();
+
+  conf->conf_type_ = type;
+
+  if (type == CONF_GLOBAL)
+    conf->rk_conf_ = rd_kafka_conf_new();
+  else
+    conf->rkt_conf_ = rd_kafka_topic_conf_new();
+
+  return conf;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/ConsumerImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/ConsumerImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/ConsumerImpl.cpp
new file mode 100644
index 0000000..bb46877
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/ConsumerImpl.cpp
@@ -0,0 +1,233 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include <string>
+#include <list>
+#include <cerrno>
+
+#include "rdkafkacpp_int.h"
+
+RdKafka::Consumer::~Consumer () {}
+
+RdKafka::Consumer *RdKafka::Consumer::create (RdKafka::Conf *conf,
+                                              std::string &errstr) {
+  char errbuf[512];
+  RdKafka::ConfImpl *confimpl = dynamic_cast<RdKafka::ConfImpl *>(conf);
+  RdKafka::ConsumerImpl *rkc = new RdKafka::ConsumerImpl();
+  rd_kafka_conf_t *rk_conf = NULL;
+
+  if (confimpl) {
+    if (!confimpl->rk_conf_) {
+      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
+      delete rkc;
+      return NULL;
+    }
+
+    rkc->set_common_config(confimpl);
+
+    rk_conf = rd_kafka_conf_dup(confimpl->rk_conf_);
+  }
+
+  rd_kafka_t *rk;
+  if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, rk_conf,
+                          errbuf, sizeof(errbuf)))) {
+    errstr = errbuf;
+    delete rkc;
+    return NULL;
+  }
+
+  rkc->rk_ = rk;
+
+
+  return rkc;
+}
+
+int64_t RdKafka::Consumer::OffsetTail (int64_t offset) {
+  return RD_KAFKA_OFFSET_TAIL(offset);
+}
+
+RdKafka::ErrorCode RdKafka::ConsumerImpl::start (Topic *topic,
+                                                 int32_t partition,
+                                                 int64_t offset) {
+  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);
+
+  if (rd_kafka_consume_start(topicimpl->rkt_, partition, offset) == -1)
+    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
+
+  return RdKafka::ERR_NO_ERROR;
+}
+
+
+RdKafka::ErrorCode RdKafka::ConsumerImpl::start (Topic *topic,
+                                                 int32_t partition,
+                                                 int64_t offset,
+                                                 Queue *queue) {
+  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);
+  RdKafka::QueueImpl *queueimpl = dynamic_cast<RdKafka::QueueImpl *>(queue);
+
+  if (rd_kafka_consume_start_queue(topicimpl->rkt_, partition, offset,
+                                   queueimpl->queue_) == -1)
+    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
+
+  return RdKafka::ERR_NO_ERROR;
+}
+
+
+RdKafka::ErrorCode RdKafka::ConsumerImpl::stop (Topic *topic,
+                                                int32_t partition) {
+  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);
+
+  if (rd_kafka_consume_stop(topicimpl->rkt_, partition) == -1)
+    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
+
+  return RdKafka::ERR_NO_ERROR;
+}
+
+RdKafka::ErrorCode RdKafka::ConsumerImpl::seek (Topic *topic,
+						int32_t partition,
+						int64_t offset,
+						int timeout_ms) {
+  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);
+
+  if (rd_kafka_seek(topicimpl->rkt_, partition, offset, timeout_ms) == -1)
+    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
+
+  return RdKafka::ERR_NO_ERROR;
+}
+
+RdKafka::Message *RdKafka::ConsumerImpl::consume (Topic *topic,
+                                                  int32_t partition,
+                                                  int timeout_ms) {
+  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);
+  rd_kafka_message_t *rkmessage;
+
+  rkmessage = rd_kafka_consume(topicimpl->rkt_, partition, timeout_ms);
+  if (!rkmessage)
+    return new RdKafka::MessageImpl(topic,
+                                    static_cast<RdKafka::ErrorCode>
+                                    (rd_kafka_last_error()));
+
+  return new RdKafka::MessageImpl(topic, rkmessage);
+}
+
+namespace {
+  /* Helper struct for `consume_callback'.
+   * Encapsulates the values we need in order to call `rd_kafka_consume_callback'
+   * and keep track of the C++ callback function and `opaque' value.
+   */
+  struct ConsumerImplCallback {
+    ConsumerImplCallback(RdKafka::Topic* topic, RdKafka::ConsumeCb* cb, void* data)
+      : topic(topic), cb_cls(cb), cb_data(data) {
+    }
+    /* This function is the one we give to `rd_kafka_consume_callback', with
+     * the `opaque' pointer pointing to an instance of this struct, in which
+     * we can find the C++ callback and `cb_data'.
+     */
+    static void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque) {
+      ConsumerImplCallback *instance = static_cast<ConsumerImplCallback*>(opaque);
+      RdKafka::MessageImpl message(instance->topic, msg, false /*don't free*/);
+      instance->cb_cls->consume_cb(message, instance->cb_data);
+    }
+    RdKafka::Topic *topic;
+    RdKafka::ConsumeCb *cb_cls;
+    void *cb_data;
+  };
+}
+
+int RdKafka::ConsumerImpl::consume_callback (RdKafka::Topic* topic,
+                                             int32_t partition,
+                                             int timeout_ms,
+                                             RdKafka::ConsumeCb *consume_cb,
+                                             void *opaque) {
+  RdKafka::TopicImpl *topicimpl = static_cast<RdKafka::TopicImpl *>(topic);
+  ConsumerImplCallback context(topic, consume_cb, opaque);
+  return rd_kafka_consume_callback(topicimpl->rkt_, partition, timeout_ms,
+                                   &ConsumerImplCallback::consume_cb_trampoline, &context);
+}
+
+
+RdKafka::Message *RdKafka::ConsumerImpl::consume (Queue *queue,
+                                                  int timeout_ms) {
+  RdKafka::QueueImpl *queueimpl = dynamic_cast<RdKafka::QueueImpl *>(queue);
+  rd_kafka_message_t *rkmessage;
+
+  rkmessage = rd_kafka_consume_queue(queueimpl->queue_, timeout_ms);
+  if (!rkmessage)
+    return new RdKafka::MessageImpl(NULL,
+                                    static_cast<RdKafka::ErrorCode>
+                                    (rd_kafka_last_error()));
+  /*
+   * Recover our Topic * from the topic conf's opaque field, which we
+   * set in RdKafka::Topic::create() for just this kind of situation.
+   */
+  void *opaque = rd_kafka_topic_opaque(rkmessage->rkt);
+  Topic *topic = static_cast<Topic *>(opaque);
+
+  return new RdKafka::MessageImpl(topic, rkmessage);
+}
+
+namespace {
+  /* Helper struct for `consume_callback' with a Queue.
+   * Encapsulates the values we need in order to call `rd_kafka_consume_callback'
+   * and keep track of the C++ callback function and `opaque' value.
+   */
+  struct ConsumerImplQueueCallback {
+    ConsumerImplQueueCallback(RdKafka::ConsumeCb *cb, void *data)
+      : cb_cls(cb), cb_data(data) {
+    }
+    /* This function is the one we give to `rd_kafka_consume_callback', with
+     * the `opaque' pointer pointing to an instance of this struct, in which
+     * we can find the C++ callback and `cb_data'.
+     */
+    static void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque) {
+      ConsumerImplQueueCallback *instance = static_cast<ConsumerImplQueueCallback *>(opaque);
+      /*
+       * Recover our Topic * from the topic conf's opaque field, which we
+       * set in RdKafka::Topic::create() for just this kind of situation.
+       */
+      void *topic_opaque = rd_kafka_topic_opaque(msg->rkt);
+      RdKafka::Topic *topic = static_cast<RdKafka::Topic *>(topic_opaque);
+      RdKafka::MessageImpl message(topic, msg, false /*don't free*/);
+      instance->cb_cls->consume_cb(message, instance->cb_data);
+    }
+    RdKafka::ConsumeCb *cb_cls;
+    void *cb_data;
+  };
+}
+
+int RdKafka::ConsumerImpl::consume_callback (Queue *queue,
+                                             int timeout_ms,
+                                             RdKafka::ConsumeCb *consume_cb,
+                                             void *opaque) {
+  RdKafka::QueueImpl *queueimpl = dynamic_cast<RdKafka::QueueImpl *>(queue);
+  ConsumerImplQueueCallback context(consume_cb, opaque);
+  return rd_kafka_consume_callback_queue(queueimpl->queue_, timeout_ms,
+                                         &ConsumerImplQueueCallback::consume_cb_trampoline,
+                                         &context);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/HandleImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/HandleImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/HandleImpl.cpp
new file mode 100644
index 0000000..3bdccbf
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/HandleImpl.cpp
@@ -0,0 +1,365 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include <string>
+#include <list>
+
+#include "rdkafkacpp_int.h"
+
+void RdKafka::consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque) {
+  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+  RdKafka::Topic* topic = static_cast<Topic *>(rd_kafka_topic_opaque(msg->rkt));
+
+  RdKafka::MessageImpl message(topic, msg, false /*don't free*/);
+
+  handle->consume_cb_->consume_cb(message, opaque);
+}
+
+void RdKafka::log_cb_trampoline (const rd_kafka_t *rk, int level,
+                                 const char *fac, const char *buf) {
+  if (!rk) {
+    rd_kafka_log_print(rk, level, fac, buf);
+    return;
+  }
+
+  void *opaque = rd_kafka_opaque(rk);
+  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+
+  if (!handle->event_cb_) {
+    rd_kafka_log_print(rk, level, fac, buf);
+    return;
+  }
+
+  RdKafka::EventImpl event(RdKafka::Event::EVENT_LOG,
+                           RdKafka::ERR_NO_ERROR,
+                           static_cast<RdKafka::Event::Severity>(level),
+                           fac, buf);
+
+  handle->event_cb_->event_cb(event);
+}
+
+
+void RdKafka::error_cb_trampoline (rd_kafka_t *rk, int err,
+                                   const char *reason, void *opaque) {
+  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+
+  RdKafka::EventImpl event(RdKafka::Event::EVENT_ERROR,
+                           static_cast<RdKafka::ErrorCode>(err),
+                           RdKafka::Event::EVENT_SEVERITY_ERROR,
+                           NULL,
+                           reason);
+
+  handle->event_cb_->event_cb(event);
+}
+
+
+void RdKafka::throttle_cb_trampoline (rd_kafka_t *rk, const char *broker_name,
+				      int32_t broker_id,
+				      int throttle_time_ms,
+				      void *opaque) {
+  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+
+  RdKafka::EventImpl event(RdKafka::Event::EVENT_THROTTLE);
+  event.str_ = broker_name;
+  event.id_ = broker_id;
+  event.throttle_time_ = throttle_time_ms;
+
+  handle->event_cb_->event_cb(event);
+}
+
+
+int RdKafka::stats_cb_trampoline (rd_kafka_t *rk, char *json, size_t json_len,
+                                  void *opaque) {
+  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+
+  RdKafka::EventImpl event(RdKafka::Event::EVENT_STATS,
+                           RdKafka::ERR_NO_ERROR,
+                           RdKafka::Event::EVENT_SEVERITY_INFO,
+                           NULL, json);
+
+  handle->event_cb_->event_cb(event);
+
+  return 0;
+}
+
+
+int RdKafka::socket_cb_trampoline (int domain, int type, int protocol,
+                                   void *opaque) {
+  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+
+  return handle->socket_cb_->socket_cb(domain, type, protocol);
+}
+
+int RdKafka::open_cb_trampoline (const char *pathname, int flags, mode_t mode,
+                                 void *opaque) {
+  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+
+  return handle->open_cb_->open_cb(pathname, flags, static_cast<int>(mode));
+}
+
+RdKafka::ErrorCode RdKafka::HandleImpl::metadata (bool all_topics,
+                                                  const Topic *only_rkt,
+                                                  Metadata **metadatap, 
+                                                  int timeout_ms) {
+
+  const rd_kafka_metadata_t *cmetadatap=NULL;
+
+  rd_kafka_topic_t *topic = only_rkt ? 
+    static_cast<const TopicImpl *>(only_rkt)->rkt_ : NULL;
+
+  const rd_kafka_resp_err_t rc = rd_kafka_metadata(rk_, all_topics, topic,
+                                                   &cmetadatap,timeout_ms);
+
+  *metadatap = (rc == RD_KAFKA_RESP_ERR_NO_ERROR) ? 
+    new RdKafka::MetadataImpl(cmetadatap) : NULL;
+
+  return static_cast<RdKafka::ErrorCode>(rc);
+}
+
+/**
+ * Convert a list of C partitions to C++ partitions
+ */
+static void c_parts_to_partitions (const rd_kafka_topic_partition_list_t
+                                   *c_parts,
+                                   std::vector<RdKafka::TopicPartition*>
+                                   &partitions) {
+  partitions.resize(c_parts->cnt);
+  for (int i = 0 ; i < c_parts->cnt ; i++)
+    partitions[i] = new RdKafka::TopicPartitionImpl(&c_parts->elems[i]);
+}
+
+static void free_partition_vector (std::vector<RdKafka::TopicPartition*> &v) {
+  for (unsigned int i = 0 ; i < v.size() ; i++)
+    delete v[i];
+  v.clear();
+}
+
+void
+RdKafka::rebalance_cb_trampoline (rd_kafka_t *rk,
+                                  rd_kafka_resp_err_t err,
+                                  rd_kafka_topic_partition_list_t *c_partitions,
+                                  void *opaque) {
+  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+  std::vector<RdKafka::TopicPartition*> partitions;
+
+  c_parts_to_partitions(c_partitions, partitions);
+
+  handle->rebalance_cb_->rebalance_cb(
+				      dynamic_cast<RdKafka::KafkaConsumer*>(handle),
+				      static_cast<RdKafka::ErrorCode>(err),
+				      partitions);
+
+  free_partition_vector(partitions);
+}
+
+
+void
+RdKafka::offset_commit_cb_trampoline0 (
+    rd_kafka_t *rk,
+    rd_kafka_resp_err_t err,
+    rd_kafka_topic_partition_list_t *c_offsets, void *opaque) {
+  OffsetCommitCb *cb = static_cast<RdKafka::OffsetCommitCb *>(opaque);
+  std::vector<RdKafka::TopicPartition*> offsets;
+
+  if (c_offsets)
+    c_parts_to_partitions(c_offsets, offsets);
+
+  cb->offset_commit_cb(static_cast<RdKafka::ErrorCode>(err), offsets);
+
+  free_partition_vector(offsets);
+}
+
+static void
+offset_commit_cb_trampoline (
+    rd_kafka_t *rk,
+    rd_kafka_resp_err_t err,
+    rd_kafka_topic_partition_list_t *c_offsets, void *opaque) {
+  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+  RdKafka::offset_commit_cb_trampoline0(rk, err, c_offsets,
+                                        handle->offset_commit_cb_);
+}
+
+
+void RdKafka::HandleImpl::set_common_config (RdKafka::ConfImpl *confimpl) {
+
+  rd_kafka_conf_set_opaque(confimpl->rk_conf_, this);
+
+  if (confimpl->event_cb_) {
+    rd_kafka_conf_set_log_cb(confimpl->rk_conf_,
+                             RdKafka::log_cb_trampoline);
+    rd_kafka_conf_set_error_cb(confimpl->rk_conf_,
+                               RdKafka::error_cb_trampoline);
+    rd_kafka_conf_set_throttle_cb(confimpl->rk_conf_,
+				  RdKafka::throttle_cb_trampoline);
+    rd_kafka_conf_set_stats_cb(confimpl->rk_conf_,
+                               RdKafka::stats_cb_trampoline);
+    event_cb_ = confimpl->event_cb_;
+  }
+
+  if (confimpl->socket_cb_) {
+    rd_kafka_conf_set_socket_cb(confimpl->rk_conf_,
+                                RdKafka::socket_cb_trampoline);
+    socket_cb_ = confimpl->socket_cb_;
+  }
+
+  if (confimpl->open_cb_) {
+#ifndef _MSC_VER
+    rd_kafka_conf_set_open_cb(confimpl->rk_conf_, RdKafka::open_cb_trampoline);
+    open_cb_ = confimpl->open_cb_;
+#endif
+  }
+
+  if (confimpl->rebalance_cb_) {
+    rd_kafka_conf_set_rebalance_cb(confimpl->rk_conf_,
+                                   RdKafka::rebalance_cb_trampoline);
+    rebalance_cb_ = confimpl->rebalance_cb_;
+  }
+
+  if (confimpl->offset_commit_cb_) {
+    rd_kafka_conf_set_offset_commit_cb(confimpl->rk_conf_,
+                                       offset_commit_cb_trampoline);
+    offset_commit_cb_ = confimpl->offset_commit_cb_;
+  }
+
+  if (confimpl->consume_cb_) {
+    rd_kafka_conf_set_consume_cb(confimpl->rk_conf_,
+                                 RdKafka::consume_cb_trampoline);
+    consume_cb_ = confimpl->consume_cb_;
+  }
+
+}
+
+
+RdKafka::ErrorCode
+RdKafka::HandleImpl::pause (std::vector<RdKafka::TopicPartition*> &partitions) {
+  rd_kafka_topic_partition_list_t *c_parts;
+  rd_kafka_resp_err_t err;
+
+  c_parts = partitions_to_c_parts(partitions);
+
+  err = rd_kafka_pause_partitions(rk_, c_parts);
+
+  if (!err)
+    update_partitions_from_c_parts(partitions, c_parts);
+
+  rd_kafka_topic_partition_list_destroy(c_parts);
+
+  return static_cast<RdKafka::ErrorCode>(err);
+}
+
+
+RdKafka::ErrorCode
+RdKafka::HandleImpl::resume (std::vector<RdKafka::TopicPartition*> &partitions) {
+  rd_kafka_topic_partition_list_t *c_parts;
+  rd_kafka_resp_err_t err;
+
+  c_parts = partitions_to_c_parts(partitions);
+
+  err = rd_kafka_resume_partitions(rk_, c_parts);
+
+  if (!err)
+    update_partitions_from_c_parts(partitions, c_parts);
+
+  rd_kafka_topic_partition_list_destroy(c_parts);
+
+  return static_cast<RdKafka::ErrorCode>(err);
+}
+
+RdKafka::Queue *
+RdKafka::HandleImpl::get_partition_queue (const TopicPartition *part) {
+  rd_kafka_queue_t *rkqu;
+  rkqu = rd_kafka_queue_get_partition(rk_,
+                                      part->topic().c_str(),
+                                      part->partition());
+
+  if (rkqu == NULL)
+    return NULL;
+
+  RdKafka::QueueImpl *queueimpl = new RdKafka::QueueImpl;
+  queueimpl->queue_ = rkqu;
+
+  return queueimpl;
+}
+
+RdKafka::ErrorCode
+RdKafka::HandleImpl::set_log_queue (RdKafka::Queue *queue) {
+        rd_kafka_queue_t *rkqu = NULL;
+        if (queue) {
+                QueueImpl *queueimpl = dynamic_cast<QueueImpl *>(queue);
+                rkqu = queueimpl->queue_;
+        }
+        return static_cast<RdKafka::ErrorCode>(
+                rd_kafka_set_log_queue(rk_, rkqu));
+}
+
+namespace RdKafka {
+
+rd_kafka_topic_partition_list_t *
+partitions_to_c_parts (const std::vector<RdKafka::TopicPartition*> &partitions){
+  rd_kafka_topic_partition_list_t *c_parts;
+
+  c_parts = rd_kafka_topic_partition_list_new((int)partitions.size());
+
+  for (unsigned int i = 0 ; i < partitions.size() ; i++) {
+    const RdKafka::TopicPartitionImpl *tpi =
+        dynamic_cast<const RdKafka::TopicPartitionImpl*>(partitions[i]);
+    rd_kafka_topic_partition_t *rktpar =
+      rd_kafka_topic_partition_list_add(c_parts,
+					tpi->topic_.c_str(), tpi->partition_);
+    rktpar->offset = tpi->offset_;
+  }
+
+  return c_parts;
+}
+
+
+/**
+ * @brief Update the application provided 'partitions' with info from 'c_parts'
+ */
+void
+update_partitions_from_c_parts (std::vector<RdKafka::TopicPartition*> &partitions,
+				const rd_kafka_topic_partition_list_t *c_parts) {
+  for (int i = 0 ; i < c_parts->cnt ; i++) {
+    rd_kafka_topic_partition_t *p = &c_parts->elems[i];
+
+    /* Find corresponding C++ entry */
+    for (unsigned int j = 0 ; j < partitions.size() ; j++) {
+      RdKafka::TopicPartitionImpl *pp =
+	dynamic_cast<RdKafka::TopicPartitionImpl*>(partitions[j]);
+      if (!strcmp(p->topic, pp->topic_.c_str()) &&
+	  p->partition == pp->partition_) {
+	pp->offset_ = p->offset;
+	pp->err_ = static_cast<RdKafka::ErrorCode>(p->err);
+      }
+    }
+  }
+}
+
+};
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/KafkaConsumerImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/KafkaConsumerImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/KafkaConsumerImpl.cpp
new file mode 100644
index 0000000..f4e79d3
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/KafkaConsumerImpl.cpp
@@ -0,0 +1,257 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <string>
+#include <vector>
+
+#include "rdkafkacpp_int.h"
+
+RdKafka::KafkaConsumer::~KafkaConsumer () {}
+
+RdKafka::KafkaConsumer *RdKafka::KafkaConsumer::create (RdKafka::Conf *conf,
+                                                        std::string &errstr) {
+  char errbuf[512];
+  RdKafka::ConfImpl *confimpl = dynamic_cast<RdKafka::ConfImpl *>(conf);
+  RdKafka::KafkaConsumerImpl *rkc = new RdKafka::KafkaConsumerImpl();
+  rd_kafka_conf_t *rk_conf = NULL;
+  size_t grlen;
+
+  if (!confimpl->rk_conf_) {
+    errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
+    delete rkc;
+    return NULL;
+  }
+
+  if (rd_kafka_conf_get(confimpl->rk_conf_, "group.id",
+                        NULL, &grlen) != RD_KAFKA_CONF_OK ||
+      grlen <= 1 /* terminating null only */) {
+    errstr = "\"group.id\" must be configured";
+    delete rkc;
+    return NULL;
+  }
+
+  rkc->set_common_config(confimpl);
+
+  rk_conf = rd_kafka_conf_dup(confimpl->rk_conf_);
+
+  rd_kafka_t *rk;
+  if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, rk_conf,
+                          errbuf, sizeof(errbuf)))) {
+    errstr = errbuf;
+    delete rkc;
+    return NULL;
+  }
+
+  rkc->rk_ = rk;
+
+  /* Redirect handle queue to cgrp's queue to provide a single queue point */
+  rd_kafka_poll_set_consumer(rk);
+
+  return rkc;
+}
+
+
+
+
+
+
+
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::subscribe (const std::vector<std::string> &topics) {
+  rd_kafka_topic_partition_list_t *c_topics;
+  rd_kafka_resp_err_t err;
+
+  c_topics = rd_kafka_topic_partition_list_new((int)topics.size());
+
+  for (unsigned int i = 0 ; i < topics.size() ; i++)
+    rd_kafka_topic_partition_list_add(c_topics, topics[i].c_str(),
+                                      RD_KAFKA_PARTITION_UA);
+
+  err = rd_kafka_subscribe(rk_, c_topics);
+
+  rd_kafka_topic_partition_list_destroy(c_topics);
+
+  return static_cast<RdKafka::ErrorCode>(err);
+}
+
+
+
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::unsubscribe () {
+  return static_cast<RdKafka::ErrorCode>(rd_kafka_unsubscribe(this->rk_));
+}
+
+RdKafka::Message *RdKafka::KafkaConsumerImpl::consume (int timeout_ms) {
+  rd_kafka_message_t *rkmessage;
+
+  rkmessage = rd_kafka_consumer_poll(this->rk_, timeout_ms);
+
+  if (!rkmessage)
+    return new RdKafka::MessageImpl(NULL, RdKafka::ERR__TIMED_OUT);
+
+  return new RdKafka::MessageImpl(rkmessage);
+
+}
+
+
+
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::assignment (std::vector<RdKafka::TopicPartition*> &partitions) {
+  rd_kafka_topic_partition_list_t *c_parts;
+  rd_kafka_resp_err_t err;
+
+  if ((err = rd_kafka_assignment(rk_, &c_parts)))
+    return static_cast<RdKafka::ErrorCode>(err);
+
+  partitions.resize(c_parts->cnt);
+
+  for (int i = 0 ; i < c_parts->cnt ; i++)
+    partitions[i] = new RdKafka::TopicPartitionImpl(&c_parts->elems[i]);
+
+  rd_kafka_topic_partition_list_destroy(c_parts);
+
+  return RdKafka::ERR_NO_ERROR;
+}
+
+
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::subscription (std::vector<std::string> &topics) {
+  rd_kafka_topic_partition_list_t *c_topics;
+  rd_kafka_resp_err_t err;
+
+  if ((err = rd_kafka_subscription(rk_, &c_topics)))
+    return static_cast<RdKafka::ErrorCode>(err);
+
+  topics.resize(c_topics->cnt);
+  for (int i = 0 ; i < c_topics->cnt ; i++)
+    topics[i] = std::string(c_topics->elems[i].topic);
+
+  rd_kafka_topic_partition_list_destroy(c_topics);
+
+  return RdKafka::ERR_NO_ERROR;
+}
+
+
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::assign (const std::vector<TopicPartition*> &partitions) {
+  rd_kafka_topic_partition_list_t *c_parts;
+  rd_kafka_resp_err_t err;
+
+  c_parts = partitions_to_c_parts(partitions);
+
+  err = rd_kafka_assign(rk_, c_parts);
+
+  rd_kafka_topic_partition_list_destroy(c_parts);
+  return static_cast<RdKafka::ErrorCode>(err);
+}
+
+
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::unassign () {
+  return static_cast<RdKafka::ErrorCode>(rd_kafka_assign(rk_, NULL));
+}
+
+
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::committed (std::vector<RdKafka::TopicPartition*> &partitions, int timeout_ms) {
+  rd_kafka_topic_partition_list_t *c_parts;
+  rd_kafka_resp_err_t err;
+
+  c_parts = partitions_to_c_parts(partitions);
+
+  err = rd_kafka_committed(rk_, c_parts, timeout_ms);
+
+  if (!err) {
+    update_partitions_from_c_parts(partitions, c_parts);
+  }
+
+  rd_kafka_topic_partition_list_destroy(c_parts);
+
+  return static_cast<RdKafka::ErrorCode>(err);
+}
+
+
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::position (std::vector<RdKafka::TopicPartition*> &partitions) {
+  rd_kafka_topic_partition_list_t *c_parts;
+  rd_kafka_resp_err_t err;
+
+  c_parts = partitions_to_c_parts(partitions);
+
+  err = rd_kafka_position(rk_, c_parts);
+
+  if (!err) {
+    update_partitions_from_c_parts(partitions, c_parts);
+  }
+
+  rd_kafka_topic_partition_list_destroy(c_parts);
+
+  return static_cast<RdKafka::ErrorCode>(err);
+}
+
+
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::seek (const RdKafka::TopicPartition &partition,
+                                  int timeout_ms) {
+  const RdKafka::TopicPartitionImpl *p =
+    dynamic_cast<const RdKafka::TopicPartitionImpl*>(&partition);
+  rd_kafka_topic_t *rkt;
+
+  if (!(rkt = rd_kafka_topic_new(rk_, p->topic_.c_str(), NULL)))
+    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
+
+  /* FIXME: Use a C API that takes a topic_partition_list_t instead */
+  RdKafka::ErrorCode err =
+    static_cast<RdKafka::ErrorCode>
+    (rd_kafka_seek(rkt, p->partition_, p->offset_, timeout_ms));
+
+  rd_kafka_topic_destroy(rkt);
+
+  return err;
+}
+
+
+
+
+
+RdKafka::ErrorCode
+RdKafka::KafkaConsumerImpl::close () {
+  rd_kafka_resp_err_t err;
+  err = rd_kafka_consumer_close(rk_);
+  if (err)
+    return static_cast<RdKafka::ErrorCode>(err);
+
+  while (rd_kafka_outq_len(rk_) > 0)
+    rd_kafka_poll(rk_, 10);
+  rd_kafka_destroy(rk_);
+
+  return static_cast<RdKafka::ErrorCode>(err);
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/Makefile
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/Makefile b/thirdparty/librdkafka-0.11.4/src-cpp/Makefile
new file mode 100644
index 0000000..16f20b0
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/Makefile
@@ -0,0 +1,49 @@
+PKGNAME=	librdkafka
+LIBNAME=	librdkafka++
+LIBVER=		1
+
+CXXSRCS=	RdKafka.cpp ConfImpl.cpp HandleImpl.cpp \
+		ConsumerImpl.cpp ProducerImpl.cpp KafkaConsumerImpl.cpp \
+		TopicImpl.cpp TopicPartitionImpl.cpp MessageImpl.cpp \
+		QueueImpl.cpp MetadataImpl.cpp
+
+HDRS=		rdkafkacpp.h
+
+OBJS=		$(CXXSRCS:%.cpp=%.o)
+
+
+
+all: lib check
+
+
+include ../mklove/Makefile.base
+
+# No linker script/symbol hiding for C++ library
+WITH_LDS=n
+
+# OSX and Cygwin requires linking required libraries
+ifeq ($(_UNAME_S),Darwin)
+	FWD_LINKING_REQ=y
+endif
+ifeq ($(_UNAME_S),AIX)
+	FWD_LINKING_REQ=y
+endif
+ifeq ($(shell uname -o 2>/dev/null),Cygwin)
+	FWD_LINKING_REQ=y
+endif
+
+# Ignore previously defined library dependencies for the C library,
+# we'll get those dependencies through the C library linkage.
+LIBS := -L../src -lrdkafka -lstdc++
+
+CHECK_FILES+= $(LIBFILENAME) $(LIBNAME).a
+
+
+file-check: lib
+check: file-check
+
+install: lib-install
+
+clean: lib-clean
+
+-include $(DEPS)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/MessageImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/MessageImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/MessageImpl.cpp
new file mode 100644
index 0000000..9562402
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/MessageImpl.cpp
@@ -0,0 +1,38 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include <string>
+#include <list>
+#include <cerrno>
+
+#include "rdkafkacpp_int.h"
+
+
+RdKafka::Message::~Message() {}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/MetadataImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/MetadataImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/MetadataImpl.cpp
new file mode 100644
index 0000000..c2869f5
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/MetadataImpl.cpp
@@ -0,0 +1,151 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafkacpp_int.h"
+
+using namespace RdKafka;
+
+BrokerMetadata::~BrokerMetadata() {};
+PartitionMetadata::~PartitionMetadata() {};
+TopicMetadata::~TopicMetadata() {};
+Metadata::~Metadata() {};
+
+
+/**
+ * Metadata: Broker information handler implementation
+ */
+class BrokerMetadataImpl : public BrokerMetadata {
+ public:
+  BrokerMetadataImpl(const rd_kafka_metadata_broker_t *broker_metadata)
+  :broker_metadata_(broker_metadata),host_(broker_metadata->host) {}
+
+  int32_t      id() const{return broker_metadata_->id;}
+
+  const std::string host() const {return host_;}
+  int port() const {return broker_metadata_->port;}
+
+  virtual ~BrokerMetadataImpl() {}
+
+ private:
+  const rd_kafka_metadata_broker_t *broker_metadata_;
+  const std::string host_;
+};
+
+/**
+ * Metadata: Partition information handler
+ */
+class PartitionMetadataImpl : public PartitionMetadata {
+ public:
+  // @TODO too much memory copy? maybe we should create a new vector class that read directly from C arrays?
+  // @TODO use auto_ptr?
+  PartitionMetadataImpl(const rd_kafka_metadata_partition_t *partition_metadata)
+  :partition_metadata_(partition_metadata) {
+    replicas_.reserve(partition_metadata->replica_cnt);
+    for(int i=0;i<partition_metadata->replica_cnt;++i)
+      replicas_.push_back(partition_metadata->replicas[i]);
+
+    isrs_.reserve(partition_metadata->isr_cnt);
+    for(int i=0;i<partition_metadata->isr_cnt;++i)
+      isrs_.push_back(partition_metadata->isrs[i]);
+  }
+
+  int32_t                    id() const {
+    return partition_metadata_->id;
+  }
+  int32_t                    leader() const {
+    return partition_metadata_->leader;
+  }
+  ErrorCode                  err() const {
+    return static_cast<ErrorCode>(partition_metadata_->err);
+  }
+
+  const std::vector<int32_t> *replicas() const {return &replicas_;}
+  const std::vector<int32_t> *isrs() const {return &isrs_;}
+
+  ~PartitionMetadataImpl() {};
+
+ private:
+  const rd_kafka_metadata_partition_t *partition_metadata_;
+  std::vector<int32_t> replicas_,isrs_;
+};
+
+/**
+ * Metadata: Topic information handler
+ */
+class TopicMetadataImpl : public TopicMetadata{
+ public:
+  TopicMetadataImpl(const rd_kafka_metadata_topic_t *topic_metadata)
+  :topic_metadata_(topic_metadata),topic_(topic_metadata->topic) {
+    partitions_.reserve(topic_metadata->partition_cnt);
+    for(int i=0;i<topic_metadata->partition_cnt;++i)
+      partitions_.push_back(
+        new PartitionMetadataImpl(&topic_metadata->partitions[i])
+      );
+  }
+
+  ~TopicMetadataImpl(){
+    for(size_t i=0;i<partitions_.size();++i)
+      delete partitions_[i];
+  }
+
+  const std::string topic() const {return topic_;}
+  const std::vector<const PartitionMetadata *> *partitions() const {
+    return &partitions_;
+  }
+  ErrorCode err() const {return static_cast<ErrorCode>(topic_metadata_->err);}
+
+ private:
+  const rd_kafka_metadata_topic_t *topic_metadata_;
+  const std::string topic_;
+  std::vector<const PartitionMetadata *> partitions_;
+
+};
+
+MetadataImpl::MetadataImpl(const rd_kafka_metadata_t *metadata)
+:metadata_(metadata)
+{
+  brokers_.reserve(metadata->broker_cnt);
+  for(int i=0;i<metadata->broker_cnt;++i)
+    brokers_.push_back(new BrokerMetadataImpl(&metadata->brokers[i]));
+
+  topics_.reserve(metadata->topic_cnt);
+  for(int i=0;i<metadata->topic_cnt;++i)
+    topics_.push_back(new TopicMetadataImpl(&metadata->topics[i]));
+
+}
+
+MetadataImpl::~MetadataImpl() {
+  for(size_t i=0;i<brokers_.size();++i)
+    delete brokers_[i];
+  for(size_t i=0;i<topics_.size();++i)
+    delete topics_[i];
+
+
+  if(metadata_)
+    rd_kafka_metadata_destroy(metadata_);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/ProducerImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/ProducerImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/ProducerImpl.cpp
new file mode 100644
index 0000000..456bc33
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/ProducerImpl.cpp
@@ -0,0 +1,167 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include <string>
+#include <list>
+#include <cerrno>
+
+#include "rdkafkacpp_int.h"
+
+
+RdKafka::Producer::~Producer () {
+
+}
+
+static void dr_msg_cb_trampoline (rd_kafka_t *rk,
+                                  const rd_kafka_message_t *
+                                  rkmessage,
+                                  void *opaque) {
+  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
+  RdKafka::MessageImpl message(NULL, (rd_kafka_message_t *)rkmessage, false);
+  handle->dr_cb_->dr_cb(message);
+}
+
+
+
+RdKafka::Producer *RdKafka::Producer::create (RdKafka::Conf *conf,
+                                              std::string &errstr) {
+  char errbuf[512];
+  RdKafka::ConfImpl *confimpl = dynamic_cast<RdKafka::ConfImpl *>(conf);
+  RdKafka::ProducerImpl *rkp = new RdKafka::ProducerImpl();
+  rd_kafka_conf_t *rk_conf = NULL;
+
+  if (confimpl) {
+    if (!confimpl->rk_conf_) {
+      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
+      delete rkp;
+      return NULL;
+    }
+
+    rkp->set_common_config(confimpl);
+
+    rk_conf = rd_kafka_conf_dup(confimpl->rk_conf_);
+
+    if (confimpl->dr_cb_) {
+      rd_kafka_conf_set_dr_msg_cb(rk_conf, dr_msg_cb_trampoline);
+      rkp->dr_cb_ = confimpl->dr_cb_;
+    }
+  }
+
+
+  rd_kafka_t *rk;
+  if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf,
+                          errbuf, sizeof(errbuf)))) {
+    errstr = errbuf;
+    delete rkp;
+    return NULL;
+  }
+
+  rkp->rk_ = rk;
+
+  return rkp;
+}
+
+
+RdKafka::ErrorCode RdKafka::ProducerImpl::produce (RdKafka::Topic *topic,
+                                                   int32_t partition,
+                                                   int msgflags,
+                                                   void *payload, size_t len,
+                                                   const std::string *key,
+                                                   void *msg_opaque) {
+  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);
+
+  if (rd_kafka_produce(topicimpl->rkt_, partition, msgflags,
+                       payload, len,
+                       key ? key->c_str() : NULL, key ? key->size() : 0,
+                       msg_opaque) == -1)
+    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
+
+  return RdKafka::ERR_NO_ERROR;
+}
+
+
+RdKafka::ErrorCode RdKafka::ProducerImpl::produce (RdKafka::Topic *topic,
+                                                   int32_t partition,
+                                                   int msgflags,
+                                                   void *payload, size_t len,
+                                                   const void *key,
+                                                   size_t key_len,
+                                                   void *msg_opaque) {
+  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);
+
+  if (rd_kafka_produce(topicimpl->rkt_, partition, msgflags,
+                       payload, len, key, key_len,
+                       msg_opaque) == -1)
+    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
+
+  return RdKafka::ERR_NO_ERROR;
+}
+
+
+RdKafka::ErrorCode
+RdKafka::ProducerImpl::produce (RdKafka::Topic *topic,
+                                int32_t partition,
+                                const std::vector<char> *payload,
+                                const std::vector<char> *key,
+                                void *msg_opaque) {
+  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);
+
+  if (rd_kafka_produce(topicimpl->rkt_, partition, RD_KAFKA_MSG_F_COPY,
+                       payload ? (void *)&(*payload)[0] : NULL,
+                       payload ? payload->size() : 0,
+                       key ? &(*key)[0] : NULL, key ? key->size() : 0,
+                       msg_opaque) == -1)
+    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());
+
+  return RdKafka::ERR_NO_ERROR;
+
+}
+
+
+RdKafka::ErrorCode
+RdKafka::ProducerImpl::produce (const std::string topic_name,
+                                int32_t partition, int msgflags,
+                                void *payload, size_t len,
+                                const void *key, size_t key_len,
+                                int64_t timestamp,
+                                void *msg_opaque) {
+  return
+    static_cast<RdKafka::ErrorCode>
+    (
+     rd_kafka_producev(rk_,
+                       RD_KAFKA_V_TOPIC(topic_name.c_str()),
+                       RD_KAFKA_V_PARTITION(partition),
+                       RD_KAFKA_V_MSGFLAGS(msgflags),
+                       RD_KAFKA_V_VALUE(payload, len),
+                       RD_KAFKA_V_KEY(key, key_len),
+                       RD_KAFKA_V_TIMESTAMP(timestamp),
+                       RD_KAFKA_V_OPAQUE(msg_opaque),
+                       RD_KAFKA_V_END)
+     );
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/QueueImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/QueueImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/QueueImpl.cpp
new file mode 100644
index 0000000..1d8ce93
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/QueueImpl.cpp
@@ -0,0 +1,71 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <cerrno>
+
+#include "rdkafkacpp_int.h"
+
+RdKafka::Queue::~Queue () {
+
+}
+
+RdKafka::Queue *RdKafka::Queue::create (Handle *base) {
+  RdKafka::QueueImpl *queueimpl = new RdKafka::QueueImpl;
+  queueimpl->queue_ = rd_kafka_queue_new(dynamic_cast<HandleImpl*>(base)->rk_);
+  return queueimpl;
+}
+
+RdKafka::ErrorCode
+RdKafka::QueueImpl::forward (Queue *queue) {
+  if (!queue) {
+    rd_kafka_queue_forward(queue_, NULL);
+  } else {
+    QueueImpl *queueimpl = dynamic_cast<QueueImpl *>(queue);
+    rd_kafka_queue_forward(queue_, queueimpl->queue_);
+  }
+  return RdKafka::ERR_NO_ERROR;
+}
+
+RdKafka::Message *RdKafka::QueueImpl::consume (int timeout_ms) {
+  rd_kafka_message_t *rkmessage;
+  rkmessage = rd_kafka_consume_queue(queue_, timeout_ms);
+
+  if (!rkmessage)
+    return new RdKafka::MessageImpl(NULL, RdKafka::ERR__TIMED_OUT);
+
+  return new RdKafka::MessageImpl(rkmessage);
+}
+
+int RdKafka::QueueImpl::poll (int timeout_ms) {
+        return rd_kafka_queue_poll_callback(queue_, timeout_ms);
+}
+
+void RdKafka::QueueImpl::io_event_enable (int fd, const void *payload,
+                                          size_t size) {
+        rd_kafka_queue_io_event_enable(queue_, fd, payload, size);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/README.md
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/README.md b/thirdparty/librdkafka-0.11.4/src-cpp/README.md
new file mode 100644
index 0000000..a484589
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/README.md
@@ -0,0 +1,16 @@
+librdkafka C++ interface
+========================
+
+**See rdkafkacpp.h for the public C++ API**
+
+
+
+Maintainer notes for the C++ interface:
+
+ * The public C++ interface (rdkafkacpp.h) does not include the
+   public C interface (rdkafka.h) in any way, this means that all
+   constants, flags, etc, must be kept in sync manually between the two
+   header files.
+   A regression test should be implemented that checks this is true.
+
+ * The public C++ interface is provided using pure virtual abstract classes.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/RdKafka.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/RdKafka.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/RdKafka.cpp
new file mode 100644
index 0000000..7b67a7b
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/RdKafka.cpp
@@ -0,0 +1,52 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <string>
+
+#include "rdkafkacpp_int.h"
+
+int RdKafka::version () {
+  return rd_kafka_version();
+}
+
+std::string RdKafka::version_str () {
+  return std::string(rd_kafka_version_str());
+}
+
+std::string RdKafka::get_debug_contexts() {
+	return std::string(RD_KAFKA_DEBUG_CONTEXTS);
+}
+
+std::string RdKafka::err2str (RdKafka::ErrorCode err) {
+  return std::string(rd_kafka_err2str(static_cast<rd_kafka_resp_err_t>(err)));
+}
+
+int RdKafka::wait_destroyed (int timeout_ms) {
+  return rd_kafka_wait_destroyed(timeout_ms);
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/TopicImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/TopicImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/TopicImpl.cpp
new file mode 100644
index 0000000..cd80a4b
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/TopicImpl.cpp
@@ -0,0 +1,128 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2014 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include <string>
+#include <list>
+#include <cerrno>
+
+#include "rdkafkacpp_int.h"
+
+const int32_t RdKafka::Topic::PARTITION_UA = RD_KAFKA_PARTITION_UA;
+
+const int64_t RdKafka::Topic::OFFSET_BEGINNING = RD_KAFKA_OFFSET_BEGINNING;
+
+const int64_t RdKafka::Topic::OFFSET_END = RD_KAFKA_OFFSET_END;
+
+const int64_t RdKafka::Topic::OFFSET_STORED = RD_KAFKA_OFFSET_STORED;
+
+const int64_t RdKafka::Topic::OFFSET_INVALID = RD_KAFKA_OFFSET_INVALID;
+
+RdKafka::Topic::~Topic () {
+
+}
+
+static int32_t partitioner_cb_trampoline (const rd_kafka_topic_t *rkt,
+                                          const void *keydata,
+                                          size_t keylen,
+                                          int32_t partition_cnt,
+                                          void *rkt_opaque,
+                                          void *msg_opaque) {
+  RdKafka::TopicImpl *topicimpl = static_cast<RdKafka::TopicImpl *>(rkt_opaque);
+  std::string key(static_cast<const char *>(keydata), keylen);
+  return topicimpl->partitioner_cb_->partitioner_cb(topicimpl, &key,
+                                                    partition_cnt, msg_opaque);
+}
+
+static int32_t partitioner_kp_cb_trampoline (const rd_kafka_topic_t *rkt,
+                                             const void *keydata,
+                                             size_t keylen,
+                                             int32_t partition_cnt,
+                                             void *rkt_opaque,
+                                             void *msg_opaque) {
+  RdKafka::TopicImpl *topicimpl = static_cast<RdKafka::TopicImpl *>(rkt_opaque);
+  return topicimpl->partitioner_kp_cb_->partitioner_cb(topicimpl,
+                                                       keydata, keylen,
+                                                       partition_cnt,
+                                                       msg_opaque);
+}
+
+
+
+RdKafka::Topic *RdKafka::Topic::create (Handle *base,
+					const std::string &topic_str,
+					Conf *conf,
+					std::string &errstr) {
+  RdKafka::ConfImpl *confimpl = static_cast<RdKafka::ConfImpl *>(conf);
+  rd_kafka_topic_t *rkt;
+  rd_kafka_topic_conf_t *rkt_conf;
+  rd_kafka_t *rk = dynamic_cast<HandleImpl*>(base)->rk_;
+
+  RdKafka::TopicImpl *topic = new RdKafka::TopicImpl();
+
+  if (!confimpl) {
+    /* Reuse default topic config, but we need our own copy to
+     * set the topic opaque. */
+    rkt_conf = rd_kafka_default_topic_conf_dup(rk);
+  } else {
+    /* Make a copy of conf struct to allow Conf reuse. */
+    rkt_conf = rd_kafka_topic_conf_dup(confimpl->rkt_conf_);
+  }
+
+  /* Set topic opaque to the topic so that we can reach our topic object
+   * from whatever callbacks get registered.
+   * The application itself will not need these opaques since their
+   * callbacks are class based. */
+  rd_kafka_topic_conf_set_opaque(rkt_conf, static_cast<void *>(topic));
+
+  if (confimpl) {
+    if (confimpl->partitioner_cb_) {
+      rd_kafka_topic_conf_set_partitioner_cb(rkt_conf,
+                                             partitioner_cb_trampoline);
+      topic->partitioner_cb_ = confimpl->partitioner_cb_;
+    } else if (confimpl->partitioner_kp_cb_) {
+      rd_kafka_topic_conf_set_partitioner_cb(rkt_conf,
+                                             partitioner_kp_cb_trampoline);
+      topic->partitioner_kp_cb_ = confimpl->partitioner_kp_cb_;
+    }
+  }
+
+
+  if (!(rkt = rd_kafka_topic_new(rk, topic_str.c_str(), rkt_conf))) {
+    errstr = rd_kafka_err2str(rd_kafka_last_error());
+    delete topic;
+    rd_kafka_topic_conf_destroy(rkt_conf);
+    return NULL;
+  }
+
+  topic->rkt_ = rkt;
+
+  return topic;
+
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/src-cpp/TopicPartitionImpl.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/src-cpp/TopicPartitionImpl.cpp b/thirdparty/librdkafka-0.11.4/src-cpp/TopicPartitionImpl.cpp
new file mode 100644
index 0000000..71a688c
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/src-cpp/TopicPartitionImpl.cpp
@@ -0,0 +1,55 @@
+/*
+ * librdkafka - Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include <string>
+#include <vector>
+
+#include "rdkafkacpp_int.h"
+
+RdKafka::TopicPartition::~TopicPartition () {
+}
+
+RdKafka::TopicPartition *
+RdKafka::TopicPartition::create (const std::string &topic, int partition) {
+  return new TopicPartitionImpl(topic, partition);
+}
+
+RdKafka::TopicPartition *
+RdKafka::TopicPartition::create (const std::string &topic, int partition,
+                                 int64_t offset) {
+  return new TopicPartitionImpl(topic, partition, offset);
+}
+
+void
+RdKafka::TopicPartition::destroy (std::vector<TopicPartition*> &partitions) {
+  for (std::vector<TopicPartition*>::iterator it = partitions.begin() ;
+       it != partitions.end(); ++it)
+    delete(*it);
+  partitions.clear();
+}