Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:36 UTC

[08/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade to librdkafka 0.11.4

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/examples/CMakeLists.txt b/thirdparty/librdkafka-0.11.4/examples/CMakeLists.txt
new file mode 100644
index 0000000..dae7f9a
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/examples/CMakeLists.txt
@@ -0,0 +1,30 @@
+if(WIN32)
+    set(win32_sources ../win32/wingetopt.c ../win32/wingetopt.h)
+    set(win32_compile_defs "LIBRDKAFKACPP_EXPORTS=0")
+endif(WIN32)
+
+add_executable(rdkafka_simple_producer rdkafka_simple_producer.c ${win32_sources})
+target_link_libraries(rdkafka_simple_producer PUBLIC rdkafka)
+
+add_executable(rdkafka_performance rdkafka_performance.c ${win32_sources})
+target_link_libraries(rdkafka_performance PUBLIC rdkafka)
+
+add_executable(rdkafka_example_cpp rdkafka_example.cpp ${win32_sources})
+target_link_libraries(rdkafka_example_cpp PUBLIC rdkafka++)
+target_compile_definitions(rdkafka_example_cpp PRIVATE ${win32_compile_defs})
+
+add_executable(rdkafka_consumer_example_cpp rdkafka_consumer_example.cpp ${win32_sources})
+target_link_libraries(rdkafka_consumer_example_cpp PUBLIC rdkafka++)
+target_compile_definitions(rdkafka_consumer_example_cpp PRIVATE ${win32_compile_defs})
+
+# The targets below have Unix include dirs and do not compile on Windows.
+if(NOT WIN32)
+    add_executable(rdkafka_example rdkafka_example.c)
+    target_link_libraries(rdkafka_example PUBLIC rdkafka)
+    
+    add_executable(rdkafka_consumer_example rdkafka_consumer_example.c)
+    target_link_libraries(rdkafka_consumer_example PUBLIC rdkafka)
+    
+    add_executable(kafkatest_verifiable_client kafkatest_verifiable_client.cpp)
+    target_link_libraries(kafkatest_verifiable_client PUBLIC rdkafka++)
+endif(NOT WIN32)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/examples/Makefile
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/examples/Makefile b/thirdparty/librdkafka-0.11.4/examples/Makefile
new file mode 100644
index 0000000..d3e0832
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/examples/Makefile
@@ -0,0 +1,96 @@
+EXAMPLES ?= rdkafka_example rdkafka_performance rdkafka_example_cpp \
+	rdkafka_consumer_example rdkafka_consumer_example_cpp \
+	kafkatest_verifiable_client rdkafka_simple_producer
+
+all: $(EXAMPLES)
+
+include ../mklove/Makefile.base
+
+CFLAGS += -I../src
+CXXFLAGS += -I../src-cpp
+
+# librdkafka must be compiled with -gstrict-dwarf, but rdkafka_example must not,
+# due to some clang bug on OSX 10.9
+CPPFLAGS := $(subst strict-dwarf,,$(CPPFLAGS))
+
+rdkafka_example: ../src/librdkafka.a rdkafka_example.c
+	$(CC) $(CPPFLAGS) $(CFLAGS) rdkafka_example.c -o $@ $(LDFLAGS) \
+		../src/librdkafka.a $(LIBS)
+	@echo "# $@ is ready"
+	@echo "#"
+	@echo "# Run producer (write messages on stdin)"
+	@echo "./$@ -P -t <topic> -p <partition>"
+	@echo ""
+	@echo "# or consumer"
+	@echo "./$@ -C -t <topic> -p <partition>"
+	@echo ""
+	@echo "#"
+	@echo "# More usage options:"
+	@echo "./$@ -h"
+
+rdkafka_simple_producer: ../src/librdkafka.a rdkafka_simple_producer.c
+	$(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \
+		../src/librdkafka.a $(LIBS)
+
+rdkafka_consumer_example: ../src/librdkafka.a rdkafka_consumer_example.c
+	$(CC) $(CPPFLAGS) $(CFLAGS) rdkafka_consumer_example.c -o $@ $(LDFLAGS) \
+		../src/librdkafka.a $(LIBS)
+	@echo "# $@ is ready"
+	@echo "#"
+	@echo "./$@ <topic[:part]> <topic2[:part]> .."
+	@echo ""
+	@echo "#"
+	@echo "# More usage options:"
+	@echo "./$@ -h"
+
+rdkafka_performance: ../src/librdkafka.a rdkafka_performance.c
+	$(CC) $(CPPFLAGS) $(CFLAGS) rdkafka_performance.c -o $@ $(LDFLAGS) \
+		../src/librdkafka.a $(LIBS)
+	@echo "# $@ is ready"
+	@echo "#"
+	@echo "# Run producer"
+	@echo "./$@ -P -t <topic> -p <partition> -s <msgsize>"
+	@echo ""
+	@echo "# or consumer"
+	@echo "./$@ -C -t <topic> -p <partition>"
+	@echo ""
+	@echo "#"
+	@echo "# More usage options:"
+	@echo "./$@ -h"
+
+
+rdkafka_example_cpp: ../src-cpp/librdkafka++.a ../src/librdkafka.a rdkafka_example.cpp
+	$(CXX) $(CPPFLAGS) $(CXXFLAGS) rdkafka_example.cpp -o $@ $(LDFLAGS) \
+		../src-cpp/librdkafka++.a ../src/librdkafka.a $(LIBS) -lstdc++
+
+kafkatest_verifiable_client: ../src-cpp/librdkafka++.a ../src/librdkafka.a kafkatest_verifiable_client.cpp
+	$(CXX) $(CPPFLAGS) $(CXXFLAGS) kafkatest_verifiable_client.cpp -o $@ $(LDFLAGS) \
+		../src-cpp/librdkafka++.a ../src/librdkafka.a $(LIBS) -lstdc++
+
+
+rdkafka_consumer_example_cpp: ../src-cpp/librdkafka++.a ../src/librdkafka.a rdkafka_consumer_example.cpp
+	$(CXX) $(CPPFLAGS) $(CXXFLAGS) rdkafka_consumer_example.cpp -o $@ $(LDFLAGS) \
+		../src-cpp/librdkafka++.a ../src/librdkafka.a $(LIBS) -lstdc++
+
+rdkafka_consume_batch: ../src-cpp/librdkafka++.a ../src/librdkafka.a rdkafka_consume_batch.cpp
+	$(CXX) $(CPPFLAGS) $(CXXFLAGS) rdkafka_consume_batch.cpp -o $@ $(LDFLAGS) \
+		../src-cpp/librdkafka++.a ../src/librdkafka.a $(LIBS) -lstdc++
+
+rdkafka_zookeeper_example: ../src/librdkafka.a rdkafka_zookeeper_example.c
+	$(CC) $(CPPFLAGS) $(CFLAGS) -I/usr/include/zookeeper rdkafka_zookeeper_example.c -o $@ $(LDFLAGS) \
+		../src/librdkafka.a $(LIBS) -lzookeeper_mt -ljansson
+	@echo "# $@ is ready"
+	@echo "#"
+	@echo "# Run producer (write messages on stdin)"
+	@echo "./$@ -P -t <topic> -p <partition>"
+	@echo ""
+	@echo "# or consumer"
+	@echo "./$@ -C -t <topic> -p <partition>"
+	@echo ""
+	@echo "#"
+	@echo "# More usage options:"
+	@echo "./$@ -h"
+
+clean:
+	rm -f $(EXAMPLES)
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/examples/globals.json
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/examples/globals.json b/thirdparty/librdkafka-0.11.4/examples/globals.json
new file mode 100644
index 0000000..527e126
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/examples/globals.json
@@ -0,0 +1,11 @@
+{"VerifiableConsumer":
+ {
+     "class": "kafkatest.services.verifiable_client.VerifiableClientApp",
+     "exec_cmd": "/vagrant/tests/c/kafkatest_verifiable_client --consumer --debug cgrp,topic,protocol,broker"
+ },
+ "VerifiableProducer":
+ {
+     "class": "kafkatest.services.verifiable_client.VerifiableClientApp",
+     "exec_cmd": "/vagrant/tests/c/kafkatest_verifiable_client --producer --debug topic,broker"
+ }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/examples/kafkatest_verifiable_client.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/examples/kafkatest_verifiable_client.cpp b/thirdparty/librdkafka-0.11.4/examples/kafkatest_verifiable_client.cpp
new file mode 100644
index 0000000..26e1ae0
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/examples/kafkatest_verifiable_client.cpp
@@ -0,0 +1,960 @@
+/*
+ * Copyright (c) 2015, Confluent Inc
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * librdkafka version of the Java VerifiableProducer and VerifiableConsumer
+ * for use with the official Kafka client tests.
+ */
+
+
+#include <iostream>
+#include <fstream>
+#include <sstream>
+#include <map>
+#include <string>
+#include <algorithm>
+#include <cstdlib>
+#include <cstdio>
+#include <csignal>
+#include <cstring>
+#include <unistd.h>
+#include <sys/time.h>
+#include <assert.h>
+#include <ctype.h>
+#include <strings.h>
+
+#ifdef _MSC_VER
+#include "../win32/wingetopt.h"
+#elif _AIX
+#include <unistd.h>
+#else
+#include <getopt.h>
+#endif
+
+/*
+ * Typically include path in a real application would be
+ * #include <librdkafka/rdkafkacpp.h>
+ */
+#include "rdkafkacpp.h"
+
+static bool run = true;
+static bool exit_eof = false;
+static int verbosity = 1;
+static std::string value_prefix;
+
+class Assignment {
+
+ public:
+  static std::string name (const std::string &t, int partition) {
+    std::stringstream stm;
+    stm << t << "." << partition;
+    return stm.str();
+  }
+
+  Assignment(): topic(""), partition(-1), consumedMessages(0),
+                minOffset(-1), maxOffset(0) {
+    printf("Created assignment\n");
+  }
+  Assignment(const Assignment &a) {
+    topic = a.topic;
+    partition = a.partition;
+    consumedMessages = a.consumedMessages;
+    minOffset = a.minOffset;
+    maxOffset = a.maxOffset;
+  }
+
+  Assignment &operator=(const Assignment &a) {
+    this->topic = a.topic;
+    this->partition = a.partition;
+    this->consumedMessages = a.consumedMessages;
+    this->minOffset = a.minOffset;
+    this->maxOffset = a.maxOffset;
+    return *this;
+  }
+
+  int operator==(const Assignment &a) const {
+    return !(this->topic == a.topic &&
+             this->partition == a.partition);
+  }
+
+  int operator<(const Assignment &a) const {
+    if (this->topic < a.topic) return 1;
+    if (this->topic >= a.topic) return 0;
+    return (this->partition < a.partition);
+  }
+
+  void setup (std::string t, int32_t p) {
+    assert(!t.empty());
+    assert(topic.empty() || topic == t);
+    assert(partition == -1 || partition == p);
+    topic = t;
+    partition = p;
+  }
+
+  std::string topic;
+  int partition;
+  int consumedMessages;
+  int64_t minOffset;
+  int64_t maxOffset;
+};
+
+
+
+
+static struct {
+  int maxMessages;
+
+  struct {
+    int numAcked;
+    int numSent;
+    int numErr;
+  } producer;
+
+  struct {
+    int consumedMessages;
+    int consumedMessagesLastReported;
+    int consumedMessagesAtLastCommit;
+    bool useAutoCommit;
+    std::map<std::string, Assignment> assignments;
+  } consumer;
+} state = {
+  /* .maxMessages = */ -1
+};
+
+
+static RdKafka::KafkaConsumer *consumer;
+
+
+static std::string now () {
+  struct timeval tv;
+  gettimeofday(&tv, NULL);
+  time_t t = tv.tv_sec;
+  struct tm tm;
+  char buf[64];
+
+  localtime_r(&t, &tm);
+  strftime(buf, sizeof(buf), "%H:%M:%S", &tm);
+  snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf), ".%03d",
+           (int)(tv.tv_usec / 1000));
+
+  return buf;
+}
+
+
+static time_t watchdog_last_kick;
+static const int watchdog_timeout = 20; /* Must be > socket.timeout.ms */
+static void sigwatchdog (int sig) {
+  time_t t = time(NULL);
+  if (watchdog_last_kick + watchdog_timeout <= t) {
+    std::cerr << now() << ": WATCHDOG TIMEOUT (" <<
+        (int)(t - watchdog_last_kick) << "s): TERMINATING" << std::endl;
+    int *i = NULL;  /* deliberate NULL dereference to force a crash/core dump */
+    *i = 100;
+    abort();
+  }
+}
+
+static void watchdog_kick () {
+  watchdog_last_kick = time(NULL);
+
+  /* Safeguard against hangs-on-exit */
+  alarm(watchdog_timeout);
+}
+
+
+
+
+
+static void errorString (const std::string &name,
+                         const std::string &errmsg,
+                         const std::string &topic,
+                         const std::string *key,
+                         const std::string &value) {
+  std::cout << "{ "
+            << "\"name\": \"" << name << "\", "
+            << "\"_time\": \"" << now() << "\", "
+            << "\"message\": \"" << errmsg << "\", "
+            << "\"topic\": \"" << topic << "\", "
+            << "\"key\": \"" << (key ? *key : "NULL") << "\", "
+            << "\"value\": \"" << value << "\" "
+            << "}" << std::endl;
+}
+
+
+static void successString (const std::string &name,
+                           const std::string &topic,
+                           int partition,
+                           int64_t offset,
+                           const std::string *key,
+                           const std::string &value) {
+  std::cout << "{ "
+            << "\"name\": \"" << name << "\", "
+            << "\"_time\": \"" << now() << "\", "
+            << "\"topic\": \"" << topic << "\", "
+            << "\"partition\": " << partition << ", "
+            << "\"offset\": " << offset << ", "
+            << "\"key\": \"" << (key ? *key : "NULL") << "\", "
+            << "\"value\": \"" << value << "\" "
+            << "}" << std::endl;
+}
+
+
+#if FIXME
+static void offsetStatus (bool success,
+                          const std::string &topic,
+                          int partition,
+                          int64_t offset,
+                          const std::string &errstr) {
+  std::cout << "{ "
+      "\"name\": \"offsets_committed\", " <<
+      "\"success\": " << success << ", " <<
+      "\"offsets\": [ " <<
+      " { " <<
+      " \"topic\": \"" << topic << "\", " <<
+      " \"partition\": " << partition << ", " <<
+      " \"offset\": " << (int)offset << ", " <<
+      " \"error\": \"" << errstr << "\" " <<
+      " } " <<
+      "] }" << std::endl;
+
+}
+#endif
+
+
+static void sigterm (int sig) {
+
+  std::cerr << now() << ": Terminating because of signal " << sig << std::endl;
+
+  if (!run) {
+    std::cerr << now() << ": Forced termination" << std::endl;
+    exit(1);
+  }
+  run = false;
+}
+
+
+class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
+ public:
+  void dr_cb (RdKafka::Message &message) {
+    if (message.err()) {
+      state.producer.numErr++;
+      errorString("producer_send_error", message.errstr(),
+                  message.topic_name(),
+                  message.key(),
+                  std::string(static_cast<const char*>(message.payload()),
+                              message.len()));
+    } else {
+      successString("producer_send_success",
+                    message.topic_name(),
+                    (int)message.partition(),
+                    message.offset(),
+                    message.key(),
+                    std::string(static_cast<const char*>(message.payload()),
+                                message.len()));
+      state.producer.numAcked++;
+    }
+  }
+};
+
+
+class ExampleEventCb : public RdKafka::EventCb {
+ public:
+  void event_cb (RdKafka::Event &event) {
+    switch (event.type())
+    {
+      case RdKafka::Event::EVENT_ERROR:
+        std::cerr << now() << ": ERROR (" << RdKafka::err2str(event.err()) << "): " <<
+            event.str() << std::endl;
+        break;
+
+      case RdKafka::Event::EVENT_STATS:
+        std::cerr << now() << ": \"STATS\": " << event.str() << std::endl;
+        break;
+
+      case RdKafka::Event::EVENT_LOG:
+        std::cerr << now() << ": LOG-" << event.severity() << "-"
+                  << event.fac() << ": " << event.str() << std::endl;
+        break;
+
+      default:
+        std::cerr << now() << ": EVENT " << event.type() <<
+            " (" << RdKafka::err2str(event.err()) << "): " <<
+            event.str() << std::endl;
+        break;
+    }
+  }
+};
+
+
+/* Use of this partitioner is pretty pointless since no key is provided
+ * in the produce() call. */
+class MyHashPartitionerCb : public RdKafka::PartitionerCb {
+ public:
+  int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,
+                          int32_t partition_cnt, void *msg_opaque) {
+    return djb_hash(key->c_str(), key->size()) % partition_cnt;
+  }
+ private:
+
+  static inline unsigned int djb_hash (const char *str, size_t len) {
+    unsigned int hash = 5381;
+    for (size_t i = 0 ; i < len ; i++)
+      hash = ((hash << 5) + hash) + str[i];
+    return hash;
+  }
+};
+
+
+
+
+
+/**
+ * Print number of records consumed, every 100 messages or on timeout.
+ */
+static void report_records_consumed (int immediate) {
+  std::map<std::string,Assignment> *assignments = &state.consumer.assignments;
+
+  if (state.consumer.consumedMessages <=
+      state.consumer.consumedMessagesLastReported + (immediate ? 0 : 999))
+    return;
+
+  std::cout << "{ "
+      "\"name\": \"records_consumed\", " <<
+      "\"_totcount\": " << state.consumer.consumedMessages << ", " <<
+      "\"count\": " << (state.consumer.consumedMessages -
+                        state.consumer.consumedMessagesLastReported) << ", " <<
+      "\"partitions\": [ ";
+
+  for (std::map<std::string,Assignment>::iterator ii = assignments->begin() ;
+       ii != assignments->end() ; ii++) {
+    Assignment *a = &(*ii).second;
+    assert(!a->topic.empty());
+    std::cout << (ii == assignments->begin() ? "": ", ") << " { " <<
+        " \"topic\": \"" << a->topic << "\", " <<
+        " \"partition\": " << a->partition << ", " <<
+        " \"minOffset\": " << a->minOffset << ", " <<
+        " \"maxOffset\": " << a->maxOffset << " " <<
+        " } ";
+    a->minOffset = -1;
+  }
+
+  std::cout << "] }" << std::endl;
+
+  state.consumer.consumedMessagesLastReported = state.consumer.consumedMessages;
+}
+
+
+class ExampleOffsetCommitCb : public RdKafka::OffsetCommitCb {
+ public:
+  void offset_commit_cb (RdKafka::ErrorCode err,
+                         std::vector<RdKafka::TopicPartition*> &offsets) {
+    std::cerr << now() << ": Propagate offset for " << offsets.size() << " partitions, error: " << RdKafka::err2str(err) << std::endl;
+
+    /* No offsets to commit, don't report anything. */
+    if (err == RdKafka::ERR__NO_OFFSET)
+      return;
+
+    /* Send up-to-date records_consumed report to make sure consumed > committed */
+    report_records_consumed(1);
+
+    std::cout << "{ " <<
+        "\"name\": \"offsets_committed\", " <<
+        "\"success\": " << (err ? "false" : "true") << ", " <<
+        "\"error\": \"" << (err ? RdKafka::err2str(err) : "") << "\", " <<
+        "\"_autocommit\": " << (state.consumer.useAutoCommit ? "true":"false") << ", " <<
+        "\"offsets\": [ ";
+    assert(offsets.size() > 0);
+    for (unsigned int i = 0 ; i < offsets.size() ; i++) {
+      std::cout << (i == 0 ? "" : ", ") << "{ " <<
+          " \"topic\": \"" << offsets[i]->topic() << "\", " <<
+          " \"partition\": " << offsets[i]->partition() << ", " <<
+          " \"offset\": " << (int)offsets[i]->offset() << ", " <<
+          " \"error\": \"" <<
+          (offsets[i]->err() ? RdKafka::err2str(offsets[i]->err()) : "") <<
+          "\" " <<
+          " }";
+    }
+    std::cout << " ] }" << std::endl;
+
+  }
+};
+
+static ExampleOffsetCommitCb ex_offset_commit_cb;
+
+
+/**
+ * Commit every 1000 messages or whenever there is a consume timeout.
+ */
+static void do_commit (RdKafka::KafkaConsumer *consumer,
+                      int immediate) {
+  if (!immediate &&
+      (state.consumer.useAutoCommit ||
+       state.consumer.consumedMessagesAtLastCommit + 1000 >
+       state.consumer.consumedMessages))
+    return;
+
+  /* Make sure we report consumption before commit,
+   * otherwise tests may fail because of commit > consumed. */
+  if (state.consumer.consumedMessagesLastReported <
+      state.consumer.consumedMessages)
+    report_records_consumed(1);
+
+  std::cerr << now() << ": committing " <<
+    (state.consumer.consumedMessages -
+     state.consumer.consumedMessagesAtLastCommit) << " messages" << std::endl;
+
+  RdKafka::ErrorCode err;
+  err = consumer->commitSync(&ex_offset_commit_cb);
+
+  std::cerr << now() << ": " <<
+    "sync commit returned " << RdKafka::err2str(err) << std::endl;
+
+  state.consumer.consumedMessagesAtLastCommit =
+    state.consumer.consumedMessages;
+}
+
+
+void msg_consume(RdKafka::KafkaConsumer *consumer,
+                 RdKafka::Message* msg, void* opaque) {
+  switch (msg->err()) {
+    case RdKafka::ERR__TIMED_OUT:
+      /* Try reporting consumed messages */
+      report_records_consumed(1);
+      /* Commit once per consume() timeout instead of on every message.
+       * Also commit every 1000 messages, whichever comes first. */
+      do_commit(consumer, 1);
+      break;
+
+
+    case RdKafka::ERR_NO_ERROR:
+      {
+        /* Real message */
+        if (verbosity > 2)
+          std::cerr << now() << ": Read msg from " << msg->topic_name() <<
+              " [" << (int)msg->partition() << "]  at offset " <<
+              msg->offset() << std::endl;
+
+        if (state.maxMessages >= 0 &&
+            state.consumer.consumedMessages >= state.maxMessages)
+          return;
+
+
+        Assignment *a =
+            &state.consumer.assignments[Assignment::name(msg->topic_name(),
+                                                         msg->partition())];
+        a->setup(msg->topic_name(), msg->partition());
+
+        a->consumedMessages++;
+        if (a->minOffset == -1)
+          a->minOffset = msg->offset();
+        if (a->maxOffset < msg->offset())
+          a->maxOffset = msg->offset();
+
+        if (msg->key()) {
+          if (verbosity >= 3)
+            std::cerr << now() << ": Key: " << *msg->key() << std::endl;
+        }
+
+        if (verbosity >= 3)
+          fprintf(stderr, "%.*s\n",
+                  static_cast<int>(msg->len()),
+                  static_cast<const char *>(msg->payload()));
+
+        state.consumer.consumedMessages++;
+
+        report_records_consumed(0);
+
+        do_commit(consumer, 0);
+      }
+      break;
+
+    case RdKafka::ERR__PARTITION_EOF:
+      /* Last message */
+      if (exit_eof) {
+        std::cerr << now() << ": Terminate: exit on EOF" << std::endl;
+        run = false;
+      }
+      break;
+
+    case RdKafka::ERR__UNKNOWN_TOPIC:
+    case RdKafka::ERR__UNKNOWN_PARTITION:
+      std::cerr << now() << ": Consume failed: " << msg->errstr() << std::endl;
+      run = false;
+      break;
+
+    case RdKafka::ERR_GROUP_COORDINATOR_NOT_AVAILABLE:
+      std::cerr << now() << ": Warning: " << msg->errstr() << std::endl;
+      break;
+
+    default:
+      /* Errors */
+      std::cerr << now() << ": Consume failed: " << msg->errstr() << std::endl;
+      run = false;
+  }
+}
+
+
+
+
+class ExampleConsumeCb : public RdKafka::ConsumeCb {
+ public:
+  void consume_cb (RdKafka::Message &msg, void *opaque) {
+    msg_consume(consumer_, &msg, opaque);
+  }
+  RdKafka::KafkaConsumer *consumer_;
+};
+
+class ExampleRebalanceCb : public RdKafka::RebalanceCb {
+ private:
+  static std::string part_list_json (const std::vector<RdKafka::TopicPartition*> &partitions) {
+    std::ostringstream out;
+    for (unsigned int i = 0 ; i < partitions.size() ; i++)
+      out << (i==0?"":", ") << "{ " <<
+          " \"topic\": \"" << partitions[i]->topic() << "\", " <<
+          " \"partition\": " << partitions[i]->partition() <<
+          " }";
+    return out.str();
+  }
+ public:
+  void rebalance_cb (RdKafka::KafkaConsumer *consumer,
+                     RdKafka::ErrorCode err,
+                     std::vector<RdKafka::TopicPartition*> &partitions) {
+
+    std::cerr << now() << ": rebalance_cb " << RdKafka::err2str(err) <<
+        " for " << partitions.size() << " partitions" << std::endl;
+    /* Send message report prior to rebalancing event to make sure they
+     * are accounted for on the "right side" of the rebalance. */
+    report_records_consumed(1);
+
+    if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
+      consumer->assign(partitions);
+    else {
+      do_commit(consumer, 1);
+      consumer->unassign();
+    }
+
+    std::cout <<
+      "{ " <<
+      "\"name\": \"partitions_" << (err == RdKafka::ERR__ASSIGN_PARTITIONS ?
+                                    "assigned" : "revoked") << "\", " <<
+      "\"partitions\": [ " << part_list_json(partitions) << "] }" << std::endl;
+
+  }
+};
+
+
+
+/**
+ * @brief Read (Java client) configuration file
+ */
+static void read_conf_file (RdKafka::Conf *conf, const std::string &conf_file) {
+  std::ifstream inf(conf_file.c_str());
+
+  if (!inf) {
+    std::cerr << now() << ": " << conf_file << ": could not open file" << std::endl;
+    exit(1);
+  }
+
+  std::cerr << now() << ": " << conf_file << ": read config file" << std::endl;
+
+  std::string line;
+  int linenr = 0;
+
+  while (std::getline(inf, line)) {
+    linenr++;
+
+    // Ignore comments and empty lines
+    if (line.length() == 0 || line[0] == '#')
+      continue;
+
+    // Match on key=value..
+    size_t d = line.find("=");
+    if (d == 0 || d == std::string::npos) {
+      std::cerr << now() << ": " << conf_file << ":" << linenr << ": " << line << ": ignoring invalid line (expect key=value): " << ::std::endl;
+      continue;
+    }
+
+    std::string key = line.substr(0, d);
+    std::string val = line.substr(d+1);
+
+    std::string errstr;
+    if (conf->set(key, val, errstr)) {
+      std::cerr << now() << ": " << conf_file << ":" << linenr << ": " << key << "=" << val << ": " << errstr << ": ignoring error" << std::endl;
+    } else {
+      std::cerr << now() << ": " << conf_file << ":" << linenr << ": " << key << "=" << val << ": applied to configuration" << std::endl;
+    }
+  }
+
+  inf.close();
+}
+
+
+
+
+int main (int argc, char **argv) {
+  std::string brokers = "localhost";
+  std::string errstr;
+  std::vector<std::string> topics;
+  std::string mode = "P";
+  int throughput = 0;
+  int32_t partition = RdKafka::Topic::PARTITION_UA;
+  MyHashPartitionerCb hash_partitioner;
+  int64_t create_time = -1;
+
+  std::cerr << now() << ": librdkafka version " << RdKafka::version_str() <<
+    " (" << RdKafka::version() << ")" << std::endl;
+
+  /*
+   * Create configuration objects
+   */
+  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
+
+  /* Java VerifiableProducer defaults to acks=all */
+  if (conf->set("acks", "all", errstr)) {
+    std::cerr << now() << ": " << errstr << std::endl;
+    exit(1);
+  }
+
+  /* Avoid slow shutdown on error */
+  if (conf->set("message.timeout.ms", "60000", errstr)) {
+    std::cerr << now() << ": " << errstr << std::endl;
+    exit(1);
+  }
+
+  {
+    char hostname[128];
+    gethostname(hostname, sizeof(hostname)-1);
+    conf->set("client.id", std::string("rdkafka@") + hostname, errstr);
+  }
+
+  conf->set("log.thread.name", "true", errstr);
+
+  /* correct producer offsets */
+  conf->set("produce.offset.report", "true", errstr);
+
+  /* auto commit is explicitly enabled with --enable-autocommit */
+  conf->set("enable.auto.commit", "false", errstr);
+
+  /* keep protocol request timeouts under the watchdog timeout
+   * to make sure things like commitSync() don't fall victim to the watchdog. */
+  conf->set("socket.timeout.ms", "10000", errstr);
+
+  conf->set("fetch.wait.max.ms", "500", errstr);
+  conf->set("fetch.min.bytes", "4096", errstr);
+
+  for (int i = 1 ; i < argc ; i++) {
+    const char *name = argv[i];
+    const char *val = i+1 < argc ? argv[i+1] : NULL;
+
+    if (val && !strncmp(val, "-", 1))
+      val = NULL;
+
+    std::cout << now() << ": argument: " << name << " " <<
+        (val?val:"") << std::endl;
+
+    if (val) {
+      if (!strcmp(name, "--topic"))
+        topics.push_back(val);
+      else if (!strcmp(name, "--broker-list"))
+        brokers = val;
+      else if (!strcmp(name, "--max-messages"))
+        state.maxMessages = atoi(val);
+      else if (!strcmp(name, "--throughput"))
+        throughput = atoi(val);
+      else if (!strcmp(name, "--producer.config") ||
+               !strcmp(name, "--consumer.config"))
+        read_conf_file(conf, val);
+      else if (!strcmp(name, "--group-id"))
+        conf->set("group.id", val, errstr);
+      else if (!strcmp(name, "--session-timeout"))
+        conf->set("session.timeout.ms", val, errstr);
+      else if (!strcmp(name, "--reset-policy")) {
+        if (conf->set("auto.offset.reset", val, errstr)) {
+          std::cerr << now() << ": " << errstr << std::endl;
+          exit(1);
+        }
+      } else if (!strcmp(name, "--assignment-strategy")) {
+        /* The system tests pass the Java class name(s) rather than
+         * the configuration value. Fix it.
+         * "org.apache.kafka.clients.consumer.RangeAssignor,.." -> "range,.."
+         */
+        std::string s = val;
+        size_t pos;
+
+        while ((pos = s.find("org.apache.kafka.clients.consumer.")) !=
+               std::string::npos)
+          s.erase(pos, strlen("org.apache.kafka.clients.consumer."));
+
+        while ((pos = s.find("Assignor")) != std::string::npos)
+          s.erase(pos, strlen("Assignor"));
+
+        std::transform(s.begin(), s.end(), s.begin(), tolower);
+
+        std::cerr << now() << ": converted " << name << " "
+                  << val << " to " << s << std::endl;
+
+        if  (conf->set("partition.assignment.strategy", s.c_str(), errstr)) {
+          std::cerr << now() << ": " << errstr << std::endl;
+          exit(1);
+        }
+      } else if (!strcmp(name, "--value-prefix")) {
+        value_prefix = std::string(val) + ".";
+      } else if (!strcmp(name, "--acks")) {
+       if (conf->set("acks", val, errstr)) {
+         std::cerr << now() << ": " << errstr << std::endl;
+         exit(1);
+       }
+      } else if (!strcmp(name, "--message-create-time")) {
+       create_time = (int64_t)atoi(val);
+      } else if (!strcmp(name, "--debug")) {
+        conf->set("debug", val, errstr);
+      } else if (!strcmp(name, "-X")) {
+        char *s = strdup(val);
+        char *t = strchr(s, '=');
+        if (!t)
+          t = (char *)"";
+        else {
+          *t = '\0';
+          t++;
+        }
+        if (conf->set(s, t, errstr)) {
+          std::cerr << now() << ": " << errstr << std::endl;
+          exit(1);
+        }
+        free(s);
+      } else {
+        std::cerr << now() << ": Unknown option " << name << std::endl;
+        exit(1);
+      }
+
+      i++;
+
+    } else {
+      if (!strcmp(name, "--consumer"))
+        mode = "C";
+      else if (!strcmp(name, "--producer"))
+        mode = "P";
+      else if (!strcmp(name, "--enable-autocommit")) {
+        state.consumer.useAutoCommit = true;
+        conf->set("enable.auto.commit", "true", errstr);
+      } else if (!strcmp(name, "-v"))
+        verbosity++;
+      else if (!strcmp(name, "-q"))
+        verbosity--;
+      else {
+        std::cerr << now() << ": Unknown option or missing argument to " << name << std::endl;
+        exit(1);
+      }
+    }
+  }
+
+  if (topics.empty() || brokers.empty()) {
+    std::cerr << now() << ": Missing --topic and --broker-list" << std::endl;
+    exit(1);
+  }
+
+
+  /*
+   * Set configuration properties
+   */
+  conf->set("metadata.broker.list", brokers, errstr);
+
+  ExampleEventCb ex_event_cb;
+  conf->set("event_cb", &ex_event_cb, errstr);
+
+  signal(SIGINT, sigterm);
+  signal(SIGTERM, sigterm);
+  signal(SIGALRM,  sigwatchdog);
+
+
+  if (mode == "P") {
+    /*
+     * Producer mode
+     */
+
+    ExampleDeliveryReportCb ex_dr_cb;
+
+    /* Set delivery report callback */
+    conf->set("dr_cb", &ex_dr_cb, errstr);
+
+    /*
+     * Create producer using accumulated global configuration.
+     */
+    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
+    if (!producer) {
+      std::cerr << now() << ": Failed to create producer: " << errstr << std::endl;
+      exit(1);
+    }
+
+    std::cerr << now() << ": % Created producer " << producer->name() << std::endl;
+
+    /*
+     * Create topic handle.
+     */
+    RdKafka::Topic *topic = RdKafka::Topic::create(producer, topics[0],
+                                                   NULL, errstr);
+    if (!topic) {
+      std::cerr << now() << ": Failed to create topic: " << errstr << std::endl;
+      exit(1);
+    }
+
+    static const int delay_us = throughput ? 1000000/throughput : 10;
+
+    if (state.maxMessages == -1)
+      state.maxMessages = 1000000; /* Avoid infinite produce */
+
+    for (int i = 0 ; run && i < state.maxMessages ; i++) {
+      /*
+       * Produce message
+       */
+      std::ostringstream msg;
+      msg << value_prefix << i;
+      while (true) {
+        RdKafka::ErrorCode resp;
+       if (create_time == -1) {
+         resp = producer->produce(topic, partition,
+                                  RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
+                                  const_cast<char *>(msg.str().c_str()),
+                                  msg.str().size(), NULL, NULL);
+       } else {
+         resp = producer->produce(topics[0], partition,
+                                  RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
+                                  const_cast<char *>(msg.str().c_str()),
+                                  msg.str().size(),
+                                  NULL, 0,
+                                  create_time,
+                                  NULL);
+       }
+
+        if (resp == RdKafka::ERR__QUEUE_FULL) {
+          producer->poll(100);
+          continue;
+        } else if (resp != RdKafka::ERR_NO_ERROR) {
+          errorString("producer_send_error",
+                      RdKafka::err2str(resp), topic->name(), NULL, msg.str());
+          state.producer.numErr++;
+        } else {
+          state.producer.numSent++;
+        }
+        break;
+      }
+
+      producer->poll(delay_us / 1000);
+      usleep(1000);
+      watchdog_kick();
+    }
+    run = true;
+
+    while (run && producer->outq_len() > 0) {
+      std::cerr << now() << ": Waiting for " << producer->outq_len() << std::endl;
+      producer->poll(1000);
+      watchdog_kick();
+    }
+
+    std::cerr << now() << ": " << state.producer.numAcked << "/" <<
+        state.producer.numSent << "/" << state.maxMessages <<
+        " msgs acked/sent/max, " << state.producer.numErr <<
+        " errored" << std::endl;
+
+    delete topic;
+    delete producer;
+
+
+  } else if (mode == "C") {
+    /*
+     * Consumer mode
+     */
+
+    conf->set("auto.offset.reset", "smallest", errstr);
+
+    ExampleRebalanceCb ex_rebalance_cb;
+    conf->set("rebalance_cb", &ex_rebalance_cb, errstr);
+
+    conf->set("offset_commit_cb", &ex_offset_commit_cb, errstr);
+
+
+    /*
+     * Create consumer using accumulated global configuration.
+     */
+    consumer = RdKafka::KafkaConsumer::create(conf, errstr);
+    if (!consumer) {
+      std::cerr << now() << ": Failed to create consumer: " <<
+          errstr << std::endl;
+      exit(1);
+    }
+
+    std::cerr << now() << ": % Created consumer " << consumer->name() <<
+        std::endl;
+
+    /*
+     * Subscribe to topic(s)
+     */
+    RdKafka::ErrorCode resp = consumer->subscribe(topics);
+    if (resp != RdKafka::ERR_NO_ERROR) {
+      std::cerr << now() << ": Failed to subscribe to " << topics.size() << " topics: "
+                << RdKafka::err2str(resp) << std::endl;
+      exit(1);
+    }
+
+    watchdog_kick();
+
+    /*
+     * Consume messages
+     */
+    while (run) {
+      RdKafka::Message *msg = consumer->consume(500);
+      msg_consume(consumer, msg, NULL);
+      delete msg;
+      watchdog_kick();
+    }
+
+    std::cerr << now() << ": Final commit on termination" << std::endl;
+
+    /* Final commit */
+    do_commit(consumer, 1);
+
+    /*
+     * Stop consumer
+     */
+    consumer->close();
+
+    delete consumer;
+  }
+
+  std::cout << "{ \"name\": \"shutdown_complete\" }" << std::endl;
+
+  /*
+   * Wait for RdKafka to decommission.
+   * This is not strictly needed (given the outq_len() check above), but
+   * allows RdKafka to clean up all its resources before the application
+   * exits so that memory profilers such as valgrind won't complain about
+   * memory leaks.
+   */
+  RdKafka::wait_destroyed(5000);
+
+  std::cerr << now() << ": EXITING WITH RETURN VALUE 0" << std::endl;
+  return 0;
+}
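
[Editor's note: for orientation, the produce path in the verifiable client
above reduces to a small pattern: create a Conf, register a DeliveryReportCb,
create a Producer, produce() with RK_MSG_COPY, and poll() until the outqueue
drains. The following is a minimal standalone sketch of that pattern, not part
of this commit; error handling is trimmed and the broker address
localhost:9092 and topic name "test" are assumptions.]

/* Minimal producer sketch. Real applications would include
 * <librdkafka/rdkafkacpp.h>; "rdkafkacpp.h" matches the in-tree
 * examples above. */
#include <iostream>
#include <string>
#include "rdkafkacpp.h"

class DrCb : public RdKafka::DeliveryReportCb {
 public:
  void dr_cb (RdKafka::Message &message) {
    if (message.err())
      std::cerr << "delivery failed: " << message.errstr() << std::endl;
    else
      std::cerr << "delivered to " << message.topic_name() << " ["
                << message.partition() << "] @ " << message.offset()
                << std::endl;
  }
};

int main () {
  std::string errstr;
  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  conf->set("metadata.broker.list", "localhost:9092", errstr); /* assumed broker */

  DrCb dr_cb;
  conf->set("dr_cb", &dr_cb, errstr);

  RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
  if (!producer) {
    std::cerr << "failed to create producer: " << errstr << std::endl;
    return 1;
  }

  RdKafka::Topic *topic = RdKafka::Topic::create(producer, "test", NULL, errstr);

  std::string payload = "hello";
  RdKafka::ErrorCode resp =
      producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                        RdKafka::Producer::RK_MSG_COPY /* copy payload */,
                        const_cast<char *>(payload.c_str()), payload.size(),
                        NULL, NULL);
  if (resp != RdKafka::ERR_NO_ERROR)
    std::cerr << "produce failed: " << RdKafka::err2str(resp) << std::endl;

  /* Serve delivery reports until the outqueue drains. */
  while (producer->outq_len() > 0)
    producer->poll(100);

  delete topic;
  delete producer;
  RdKafka::wait_destroyed(5000);
  return 0;
}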

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/examples/rdkafka_consume_batch.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/examples/rdkafka_consume_batch.cpp b/thirdparty/librdkafka-0.11.4/examples/rdkafka_consume_batch.cpp
new file mode 100644
index 0000000..ea4a169
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/examples/rdkafka_consume_batch.cpp
@@ -0,0 +1,260 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2018, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Apache Kafka consumer & producer example programs
+ * using the Kafka driver from librdkafka
+ * (https://github.com/edenhill/librdkafka)
+ *
+ * This example shows how to read batches of messages.
+ * Note that messages are fetched from the broker in batches regardless
+ * of how the application polls messages from librdkafka; this example
+ * merely shows how to accumulate a set of messages in the application.
+ */
+
+#include <iostream>
+#include <string>
+#include <cstdlib>
+#include <cstdio>
+#include <csignal>
+#include <cstring>
+
+#ifndef _MSC_VER
+#include <sys/time.h>
+#endif
+
+#ifdef _MSC_VER
+#include "../win32/wingetopt.h"
+#include <atltime.h>
+#elif _AIX
+#include <unistd.h>
+#else
+#include <getopt.h>
+#include <unistd.h>
+#endif
+
+/*
+ * Typically include path in a real application would be
+ * #include <librdkafka/rdkafkacpp.h>
+ */
+#include "rdkafkacpp.h"
+
+
+
+static bool run = true;
+
+static void sigterm (int sig) {
+  run = false;
+}
+
+
+
+/**
+ * @returns the current wall-clock time in milliseconds
+ */
+static int64_t now () {
+#ifndef _MSC_VER
+        struct timeval tv;
+        gettimeofday(&tv, NULL);
+        return ((int64_t)tv.tv_sec * 1000) + (tv.tv_usec / 1000);
+#else
+#error "now() not implemented for Windows, please submit a PR"
+#endif
+}
+
+
+
+/**
+ * @brief Accumulate a batch of \p batch_size messages, but wait
+ *        no longer than \p batch_tmout milliseconds.
+ */
+static std::vector<RdKafka::Message *>
+consume_batch (RdKafka::KafkaConsumer *consumer, size_t batch_size, int batch_tmout) {
+
+  std::vector<RdKafka::Message *> msgs;
+  msgs.reserve(batch_size);
+
+  int64_t end = now() + batch_tmout;
+  int remaining_timeout = batch_tmout;
+
+  while (msgs.size() < batch_size) {
+    RdKafka::Message *msg = consumer->consume(remaining_timeout);
+
+    switch (msg->err()) {
+    case RdKafka::ERR__TIMED_OUT:
+      delete msg;
+      return msgs;
+
+    case RdKafka::ERR_NO_ERROR:
+      msgs.push_back(msg);
+      break;
+
+    default:
+      std::cerr << "%% Consumer error: " << msg->errstr() << std::endl;
+      run = false;
+      delete msg;
+      return msgs;
+    }
+
+    remaining_timeout = end - now();
+    if (remaining_timeout < 0)
+      break;
+  }
+
+  return msgs;
+}
+
+
+int main (int argc, char **argv) {
+  std::string errstr;
+  std::string topic_str;
+  std::vector<std::string> topics;
+  int batch_size = 100;
+  int batch_tmout = 1000;
+
+  /* Create configuration objects */
+  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
+
+  if (conf->set("enable.partition.eof", "false", errstr) != RdKafka::Conf::CONF_OK) {
+    std::cerr << errstr << std::endl;
+    exit(1);
+  }
+
+  /* Read command line arguments */
+  int opt;
+  while ((opt = getopt(argc, argv, "g:B:T::b:X:")) != -1) {
+    switch (opt) {
+    case 'g':
+      if (conf->set("group.id",  optarg, errstr) != RdKafka::Conf::CONF_OK) {
+        std::cerr << errstr << std::endl;
+        exit(1);
+      }
+      break;
+
+    case 'B':
+      batch_size = atoi(optarg);
+      break;
+
+    case 'T':
+      batch_tmout = atoi(optarg);
+      break;
+
+    case 'b':
+      if (conf->set("bootstrap.servers", optarg, errstr) != RdKafka::Conf::CONF_OK) {
+        std::cerr << errstr << std::endl;
+        exit(1);
+      }
+      break;
+
+    case 'X':
+      {
+        char *name, *val;
+
+        name = optarg;
+        if (!(val = strchr(name, '='))) {
+          std::cerr << "%% Expected -X property=value, not " <<
+              name << std::endl;
+          exit(1);
+        }
+
+        *val = '\0';
+        val++;
+
+        if (conf->set(name, val, errstr) != RdKafka::Conf::CONF_OK) {
+          std::cerr << errstr << std::endl;
+          exit(1);
+        }
+      }
+      break;
+
+    default:
+      goto usage;
+    }
+  }
+
+  /* Topics to consume */
+  for (; optind < argc ; optind++)
+    topics.push_back(std::string(argv[optind]));
+
+  if (topics.empty() || optind != argc) {
+  usage:
+    fprintf(stderr,
+            "Usage: %s -g <group-id> -B <batch-size> [options] topic1 topic2..\n"
+            "\n"
+            "librdkafka version %s (0x%08x)\n"
+            "\n"
+            " Options:\n"
+            "  -g <group-id>    Consumer group id\n"
+            "  -B <batch-size>  How many messages to batch (default: 100).\n"
+            "  -T <batch-tmout> How long to wait for batch-size to accumulate in milliseconds. (default 1000 ms)\n"
+            "  -b <brokers>    Broker address (localhost:9092)\n"
+            "  -X <prop=name>  Set arbitrary librdkafka configuration property\n"
+            "\n",
+            argv[0],
+            RdKafka::version_str().c_str(), RdKafka::version());
+        exit(1);
+  }
+
+
+  signal(SIGINT, sigterm);
+  signal(SIGTERM, sigterm);
+
+  /* Create consumer */
+  RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
+  if (!consumer) {
+    std::cerr << "Failed to create consumer: " << errstr << std::endl;
+    exit(1);
+  }
+
+  delete conf;
+
+  /* Subscribe to topics */
+  RdKafka::ErrorCode err = consumer->subscribe(topics);
+  if (err) {
+    std::cerr << "Failed to subscribe to " << topics.size() << " topics: "
+              << RdKafka::err2str(err) << std::endl;
+    exit(1);
+  }
+
+  /* Consume messages in batches of \p batch_size */
+  while (run) {
+    auto msgs = consume_batch(consumer, batch_size, batch_tmout);
+    std::cout << "Accumulated " << msgs.size() << " messages:" << std::endl;
+
+    for (auto &msg : msgs) {
+      std::cout << " Message in " << msg->topic_name() << " [" << msg->partition() << "] at offset " << msg->offset() << std::endl;
+      delete msg;
+    }
+  }
+
+  /* Close and destroy consumer */
+  consumer->close();
+  delete consumer;
+
+  return 0;
+}
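
[Editor's note: the now() helper above deliberately hits an #error on Windows.
As a hedged sketch only -- GetSystemTimeAsFileTime is a standard Win32 call,
but nothing in this commit wires it up -- the same wall-clock milliseconds
value could be derived from a FILETIME, which counts 100 ns ticks since
1601-01-01:]

#ifdef _MSC_VER
#include <windows.h>

/* Sketch of a Windows now(): shift the FILETIME epoch (1601-01-01)
 * to the Unix epoch (1970-01-01) and scale 100 ns ticks to ms.
 * 116444736000000000 = seconds between the two epochs * 10^7. */
static int64_t now () {
        FILETIME ft;
        GetSystemTimeAsFileTime(&ft);
        ULARGE_INTEGER t;
        t.LowPart  = ft.dwLowDateTime;
        t.HighPart = ft.dwHighDateTime;
        return (int64_t)((t.QuadPart - 116444736000000000ULL) / 10000);
}
#endif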

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/examples/rdkafka_consumer_example.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/examples/rdkafka_consumer_example.c b/thirdparty/librdkafka-0.11.4/examples/rdkafka_consumer_example.c
new file mode 100644
index 0000000..3896df8
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/examples/rdkafka_consumer_example.c
@@ -0,0 +1,624 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2015, Magnus Edenhill
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met: 
+ * 
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer. 
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution. 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Apache Kafka high level consumer example program
+ * using the Kafka driver from librdkafka
+ * (https://github.com/edenhill/librdkafka)
+ */
+
+#include <ctype.h>
+#include <signal.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <syslog.h>
+#include <sys/time.h>
+#include <errno.h>
+#include <getopt.h>
+
+/* Typical include path would be <librdkafka/rdkafka.h>, but this program
+ * is built from within the librdkafka source tree and thus differs. */
+#include "rdkafka.h"  /* for Kafka driver */
+
+
+static int run = 1;
+static rd_kafka_t *rk;
+static int exit_eof = 0;
+static int wait_eof = 0;  /* number of partitions awaiting EOF */
+static int quiet = 0;
+static enum {
+	OUTPUT_HEXDUMP,
+	OUTPUT_RAW,
+} output = OUTPUT_HEXDUMP;
+
+static void stop (int sig) {
+        if (!run)
+                exit(1);
+	run = 0;
+	fclose(stdin); /* abort fgets() */
+}
+
+
+static void hexdump (FILE *fp, const char *name, const void *ptr, size_t len) {
+	const char *p = (const char *)ptr;
+	unsigned int of = 0;
+
+
+	if (name)
+		fprintf(fp, "%s hexdump (%zd bytes):\n", name, len);
+
+	for (of = 0 ; of < len ; of += 16) {
+		char hexen[16*3+1];
+		char charen[16+1];
+		int hof = 0;
+
+		int cof = 0;
+		int i;
+
+		for (i = of ; i < (int)of + 16 && i < (int)len ; i++) {
+			hof += sprintf(hexen+hof, "%02x ", p[i] & 0xff);
+			cof += sprintf(charen+cof, "%c",
+				       isprint((int)p[i]) ? p[i] : '.');
+		}
+		fprintf(fp, "%08x: %-48s %-16s\n",
+			of, hexen, charen);
+	}
+}
+
+/**
+ * Kafka logger callback (optional)
+ */
+static void logger (const rd_kafka_t *rk, int level,
+		    const char *fac, const char *buf) {
+	struct timeval tv;
+	gettimeofday(&tv, NULL);
+	fprintf(stdout, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
+		(int)tv.tv_sec, (int)(tv.tv_usec / 1000),
+		level, fac, rd_kafka_name(rk), buf);
+}
+
+
+
+/**
+ * Handle and print a consumed message.
+ * Internally crafted messages are also used to propagate state from
+ * librdkafka to the application. The application needs to check
+ * the `rkmessage->err` field for this purpose.
+ */
+static void msg_consume (rd_kafka_message_t *rkmessage) {
+	if (rkmessage->err) {
+		if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
+			fprintf(stderr,
+				"%% Consumer reached end of %s [%"PRId32"] "
+			       "message queue at offset %"PRId64"\n",
+			       rd_kafka_topic_name(rkmessage->rkt),
+			       rkmessage->partition, rkmessage->offset);
+
+			if (exit_eof && --wait_eof == 0) {
+                                fprintf(stderr,
+                                        "%% All partition(s) reached EOF: "
+                                        "exiting\n");
+				run = 0;
+                        }
+
+			return;
+		}
+
+                if (rkmessage->rkt)
+                        fprintf(stderr, "%% Consume error for "
+                                "topic \"%s\" [%"PRId32"] "
+                                "offset %"PRId64": %s\n",
+                                rd_kafka_topic_name(rkmessage->rkt),
+                                rkmessage->partition,
+                                rkmessage->offset,
+                                rd_kafka_message_errstr(rkmessage));
+                else
+                        fprintf(stderr, "%% Consumer error: %s: %s\n",
+                                rd_kafka_err2str(rkmessage->err),
+                                rd_kafka_message_errstr(rkmessage));
+
+                if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
+                    rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
+                        run = 0;
+		return;
+	}
+
+	if (!quiet)
+		fprintf(stdout, "%% Message (topic %s [%"PRId32"], "
+                        "offset %"PRId64", %zd bytes):\n",
+                        rd_kafka_topic_name(rkmessage->rkt),
+                        rkmessage->partition,
+			rkmessage->offset, rkmessage->len);
+
+	if (rkmessage->key_len) {
+		if (output == OUTPUT_HEXDUMP)
+			hexdump(stdout, "Message Key",
+				rkmessage->key, rkmessage->key_len);
+		else
+			printf("Key: %.*s\n",
+			       (int)rkmessage->key_len, (char *)rkmessage->key);
+	}
+
+	if (output == OUTPUT_HEXDUMP)
+		hexdump(stdout, "Message Payload",
+			rkmessage->payload, rkmessage->len);
+	else
+		printf("%.*s\n",
+		       (int)rkmessage->len, (char *)rkmessage->payload);
+}
+
+
+static void print_partition_list (FILE *fp,
+                                  const rd_kafka_topic_partition_list_t
+                                  *partitions) {
+        int i;
+        for (i = 0 ; i < partitions->cnt ; i++) {
+                fprintf(stderr, "%s %s [%"PRId32"] offset %"PRId64,
+                        i > 0 ? ",":"",
+                        partitions->elems[i].topic,
+                        partitions->elems[i].partition,
+			partitions->elems[i].offset);
+        }
+        fprintf(stderr, "\n");
+
+}
+static void rebalance_cb (rd_kafka_t *rk,
+                          rd_kafka_resp_err_t err,
+			  rd_kafka_topic_partition_list_t *partitions,
+                          void *opaque) {
+
+	fprintf(stderr, "%% Consumer group rebalanced: ");
+
+	switch (err)
+	{
+	case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
+		fprintf(stderr, "assigned:\n");
+		print_partition_list(stderr, partitions);
+		rd_kafka_assign(rk, partitions);
+		wait_eof += partitions->cnt;
+		break;
+
+	case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
+		fprintf(stderr, "revoked:\n");
+		print_partition_list(stderr, partitions);
+		rd_kafka_assign(rk, NULL);
+		wait_eof = 0;
+		break;
+
+	default:
+		fprintf(stderr, "failed: %s\n",
+                        rd_kafka_err2str(err));
+                rd_kafka_assign(rk, NULL);
+		break;
+	}
+}
+
+
+static int describe_groups (rd_kafka_t *rk, const char *group) {
+        rd_kafka_resp_err_t err;
+        const struct rd_kafka_group_list *grplist;
+        int i;
+
+        err = rd_kafka_list_groups(rk, group, &grplist, 10000);
+
+        if (err) {
+                fprintf(stderr, "%% Failed to acquire group list: %s\n",
+                        rd_kafka_err2str(err));
+                return -1;
+        }
+
+        for (i = 0 ; i < grplist->group_cnt ; i++) {
+                const struct rd_kafka_group_info *gi = &grplist->groups[i];
+                int j;
+
+                printf("Group \"%s\" in state %s on broker %d (%s:%d)\n",
+                       gi->group, gi->state,
+                       gi->broker.id, gi->broker.host, gi->broker.port);
+                if (gi->err)
+                        printf(" Error: %s\n", rd_kafka_err2str(gi->err));
+                printf(" Protocol type \"%s\", protocol \"%s\", "
+                       "with %d member(s):\n",
+                       gi->protocol_type, gi->protocol, gi->member_cnt);
+
+                for (j = 0 ; j < gi->member_cnt ; j++) {
+                        const struct rd_kafka_group_member_info *mi;
+                        mi = &gi->members[j];
+
+                        printf("  \"%s\", client id \"%s\" on host %s\n",
+                               mi->member_id, mi->client_id, mi->client_host);
+                        printf("    metadata: %d bytes\n",
+                               mi->member_metadata_size);
+                        printf("    assignment: %d bytes\n",
+                               mi->member_assignment_size);
+                }
+                printf("\n");
+        }
+
+        if (group && !grplist->group_cnt)
+                fprintf(stderr, "%% No matching group (%s)\n", group);
+
+        rd_kafka_group_list_destroy(grplist);
+
+        return 0;
+}
+
+
+
+static void sig_usr1 (int sig) {
+	rd_kafka_dump(stdout, rk);
+}
+
+int main (int argc, char **argv) {
+        char mode = 'C';
+	char *brokers = "localhost:9092";
+	int opt;
+	rd_kafka_conf_t *conf;
+	rd_kafka_topic_conf_t *topic_conf;
+	char errstr[512];
+	const char *debug = NULL;
+	int do_conf_dump = 0;
+	char tmp[16];
+        rd_kafka_resp_err_t err;
+        char *group = NULL;
+        rd_kafka_topic_partition_list_t *topics;
+        int is_subscription;
+        int i;
+
+	quiet = !isatty(STDIN_FILENO);
+
+	/* Kafka configuration */
+	conf = rd_kafka_conf_new();
+
+        /* Set logger */
+        rd_kafka_conf_set_log_cb(conf, logger);
+
+	/* Quick termination */
+	snprintf(tmp, sizeof(tmp), "%i", SIGIO);
+	rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
+
+	/* Topic configuration */
+	topic_conf = rd_kafka_topic_conf_new();
+
+	while ((opt = getopt(argc, argv, "g:b:qd:eX:ADO")) != -1) {
+		switch (opt) {
+		case 'b':
+			brokers = optarg;
+			break;
+                case 'g':
+                        group = optarg;
+                        break;
+		case 'e':
+			exit_eof = 1;
+			break;
+		case 'd':
+			debug = optarg;
+			break;
+		case 'q':
+			quiet = 1;
+			break;
+		case 'A':
+			output = OUTPUT_RAW;
+			break;
+		case 'X':
+		{
+			char *name, *val;
+			rd_kafka_conf_res_t res;
+
+			if (!strcmp(optarg, "list") ||
+			    !strcmp(optarg, "help")) {
+				rd_kafka_conf_properties_show(stdout);
+				exit(0);
+			}
+
+			if (!strcmp(optarg, "dump")) {
+				do_conf_dump = 1;
+				continue;
+			}
+
+			name = optarg;
+			if (!(val = strchr(name, '='))) {
+				fprintf(stderr, "%% Expected "
+					"-X property=value, not %s\n", name);
+				exit(1);
+			}
+
+			*val = '\0';
+			val++;
+
+			res = RD_KAFKA_CONF_UNKNOWN;
+			/* Try "topic." prefixed properties on topic
+			 * conf first, and then fall through to global if
+			 * it didn't match a topic configuration property. */
+			if (!strncmp(name, "topic.", strlen("topic.")))
+				res = rd_kafka_topic_conf_set(topic_conf,
+							      name+
+							      strlen("topic."),
+							      val,
+							      errstr,
+							      sizeof(errstr));
+
+			if (res == RD_KAFKA_CONF_UNKNOWN)
+				res = rd_kafka_conf_set(conf, name, val,
+							errstr, sizeof(errstr));
+
+			if (res != RD_KAFKA_CONF_OK) {
+				fprintf(stderr, "%% %s\n", errstr);
+				exit(1);
+			}
+		}
+		break;
+
+                case 'D':
+                case 'O':
+                        mode = opt;
+                        break;
+
+		default:
+			goto usage;
+		}
+	}
+
+
+	if (do_conf_dump) {
+		const char **arr;
+		size_t cnt;
+		int pass;
+
+		for (pass = 0 ; pass < 2 ; pass++) {
+			if (pass == 0) {
+				arr = rd_kafka_conf_dump(conf, &cnt);
+				printf("# Global config\n");
+			} else {
+				printf("# Topic config\n");
+				arr = rd_kafka_topic_conf_dump(topic_conf,
+							       &cnt);
+			}
+
+			for (i = 0 ; i < (int)cnt ; i += 2)
+				printf("%s = %s\n",
+				       arr[i], arr[i+1]);
+
+			printf("\n");
+
+			rd_kafka_conf_dump_free(arr, cnt);
+		}
+
+		exit(0);
+	}
+
+
+	if (strchr("OC", mode) && optind == argc) {
+	usage:
+		fprintf(stderr,
+			"Usage: %s [options] <topic[:part]> <topic[:part]>..\n"
+			"\n"
+			"librdkafka version %s (0x%08x)\n"
+			"\n"
+			" Options:\n"
+                        "  -g <group>      Consumer group (%s)\n"
+			"  -b <brokers>    Broker address (%s)\n"
+			"  -e              Exit consumer when last message\n"
+			"                  in partition has been received.\n"
+                        "  -D              Describe group.\n"
+                        "  -O              Get commmitted offset(s)\n"
+			"  -d [facs..]     Enable debugging contexts:\n"
+			"                  %s\n"
+			"  -q              Be quiet\n"
+			"  -A              Raw payload output (consumer)\n"
+			"  -X <prop=value> Set arbitrary librdkafka "
+			"configuration property\n"
+			"               Properties prefixed with \"topic.\" "
+			"will be set on topic object.\n"
+			"               Use '-X list' to see the full list\n"
+			"               of supported properties.\n"
+			"\n"
+                        "For balanced consumer groups use the 'topic1 topic2..'"
+                        " format\n"
+                        "and for static assignment use "
+                        "'topic1:part1 topic1:part2 topic2:part1..'\n"
+			"\n",
+			argv[0],
+			rd_kafka_version_str(), rd_kafka_version(),
+                        group ? group : "(none)", brokers,
+			RD_KAFKA_DEBUG_CONTEXTS);
+		exit(1);
+	}
+
+
+	signal(SIGINT, stop);
+	signal(SIGUSR1, sig_usr1);
+
+	if (debug &&
+	    rd_kafka_conf_set(conf, "debug", debug, errstr, sizeof(errstr)) !=
+	    RD_KAFKA_CONF_OK) {
+		fprintf(stderr, "%% Debug configuration failed: %s: %s\n",
+			errstr, debug);
+		exit(1);
+	}
+
+        /*
+         * Client/Consumer group
+         */
+
+        if (strchr("CO", mode)) {
+                /* Consumer groups require a group id */
+                if (!group)
+                        group = "rdkafka_consumer_example";
+                if (rd_kafka_conf_set(conf, "group.id", group,
+                                      errstr, sizeof(errstr)) !=
+                    RD_KAFKA_CONF_OK) {
+                        fprintf(stderr, "%% %s\n", errstr);
+                        exit(1);
+                }
+
+                /* Consumer groups always use broker based offset storage */
+                if (rd_kafka_topic_conf_set(topic_conf, "offset.store.method",
+                                            "broker",
+                                            errstr, sizeof(errstr)) !=
+                    RD_KAFKA_CONF_OK) {
+                        fprintf(stderr, "%% %s\n", errstr);
+                        exit(1);
+                }
+
+                /* Set default topic config for pattern-matched topics. */
+                rd_kafka_conf_set_default_topic_conf(conf, topic_conf);
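+                /* conf now owns topic_conf; topic_conf must not be used
+                 * or destroyed separately after this call. */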
+
+                /* Callback called on partition assignment changes */
+                rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
+        }
+
+        /* Create Kafka handle */
+        if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
+                                errstr, sizeof(errstr)))) {
+                fprintf(stderr,
+                        "%% Failed to create new consumer: %s\n",
+                        errstr);
+                exit(1);
+        }
+
+        /* Add brokers */
+        if (rd_kafka_brokers_add(rk, brokers) == 0) {
+                fprintf(stderr, "%% No valid brokers specified\n");
+                exit(1);
+        }
+
+
+        if (mode == 'D') {
+                int r;
+                /* Describe groups */
+                r = describe_groups(rk, group);
+
+                rd_kafka_destroy(rk);
+                exit(r == -1 ? 1 : 0);
+        }
+
+        /* Redirect rd_kafka_poll() to consumer_poll() */
+        rd_kafka_poll_set_consumer(rk);
+
+        topics = rd_kafka_topic_partition_list_new(argc - optind);
+        is_subscription = 1;
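+        /* Bare topic names form a subscription (balanced consumer group);
+         * any "topic:part" argument switches to static assignment below. */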
+        for (i = optind ; i < argc ; i++) {
+                /* Parse "topic[:part]" */
+                char *topic = argv[i];
+                char *t;
+                int32_t partition = -1;
+
+                if ((t = strstr(topic, ":"))) {
+                        *t = '\0';
+                        partition = atoi(t+1);
+                        is_subscription = 0; /* is assignment */
+                        wait_eof++;
+                }
+
+                rd_kafka_topic_partition_list_add(topics, topic, partition);
+        }
+
+        if (mode == 'O') {
+                /* Offset query */
+
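+                /* Blocks for up to 5000 ms while the committed offsets are
+                 * fetched from the group coordinator. */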
+                err = rd_kafka_committed(rk, topics, 5000);
+                if (err) {
+                        fprintf(stderr, "%% Failed to fetch offsets: %s\n",
+                                rd_kafka_err2str(err));
+                        exit(1);
+                }
+
+                for (i = 0 ; i < topics->cnt ; i++) {
+                        rd_kafka_topic_partition_t *p = &topics->elems[i];
+                        printf("Topic \"%s\" partition %"PRId32,
+                               p->topic, p->partition);
+                        if (p->err)
+                                printf(" error %s",
+                                       rd_kafka_err2str(p->err));
+                        else {
+                                printf(" offset %"PRId64"",
+                                       p->offset);
+
+                                if (p->metadata_size)
+                                        printf(" (%d bytes of metadata)",
+                                               (int)p->metadata_size);
+                        }
+                        printf("\n");
+                }
+
+                goto done;
+        }
+
+
+        if (is_subscription) {
+                fprintf(stderr, "%% Subscribing to %d topics\n", topics->cnt);
+
+                if ((err = rd_kafka_subscribe(rk, topics))) {
+                        fprintf(stderr,
+                                "%% Failed to start consuming topics: %s\n",
+                                rd_kafka_err2str(err));
+                        exit(1);
+                }
+        } else {
+                fprintf(stderr, "%% Assigning %d partitions\n", topics->cnt);
+
+                if ((err = rd_kafka_assign(rk, topics))) {
+                        fprintf(stderr,
+                                "%% Failed to assign partitions: %s\n",
+                                rd_kafka_err2str(err));
+                        exit(1);
+                }
+        }
+
+        while (run) {
+                rd_kafka_message_t *rkmessage;
+
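+                /* consumer_poll() also serves rebalance and offset-commit
+                 * callbacks, so it must be called regularly. */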
+                rkmessage = rd_kafka_consumer_poll(rk, 1000);
+                if (rkmessage) {
+                        msg_consume(rkmessage);
+                        rd_kafka_message_destroy(rkmessage);
+                }
+        }
+
+done:
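+        /* Leaving the group commits final offsets (if enabled) and
+         * revokes the partition assignment. */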
+        err = rd_kafka_consumer_close(rk);
+        if (err)
+                fprintf(stderr, "%% Failed to close consumer: %s\n",
+                        rd_kafka_err2str(err));
+        else
+                fprintf(stderr, "%% Consumer closed\n");
+
+        rd_kafka_topic_partition_list_destroy(topics);
+
+        /* Destroy handle */
+        rd_kafka_destroy(rk);
+
+	/* Let background threads clean up and terminate cleanly. */
+	run = 5;
+	while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1)
+		printf("Waiting for librdkafka to decommission\n");
+	if (run <= 0)
+		fprintf(stderr, "%% librdkafka did not finish decommissioning in time\n");
+
+	return 0;
+}
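
Stripped of option parsing and diagnostics, the high-level consumer flow the example above implements is: configure, create the handle, subscribe, poll in a loop, close. The sketch below restates that flow against the same C API; the broker address, group id, and topic name are hypothetical placeholders, and error handling is omitted for brevity.

/* Minimal high-level consumer sketch (hypothetical broker/group/topic;
 * error handling omitted). */
#include <librdkafka/rdkafka.h>
#include <stdio.h>

int main (void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        rd_kafka_topic_partition_list_t *topics;
        rd_kafka_t *rk;
        int i;

        rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "group.id", "example_group",
                          errstr, sizeof(errstr));

        /* rd_kafka_new() takes ownership of conf on success. */
        rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
        rd_kafka_poll_set_consumer(rk);

        /* Subscribe to one topic; the group coordinator assigns partitions. */
        topics = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(topics, "example_topic",
                                          RD_KAFKA_PARTITION_UA);
        rd_kafka_subscribe(rk, topics);
        rd_kafka_topic_partition_list_destroy(topics);

        /* Poll for roughly a minute, printing message payloads. */
        for (i = 0 ; i < 60 ; i++) {
                rd_kafka_message_t *m = rd_kafka_consumer_poll(rk, 1000);
                if (!m)
                        continue;          /* poll timed out */
                if (!m->err)
                        printf("%.*s\n", (int)m->len,
                               (const char *)m->payload);
                rd_kafka_message_destroy(m);
        }

        rd_kafka_consumer_close(rk);       /* leave the group cleanly */
        rd_kafka_destroy(rk);
        return 0;
}

With librdkafka installed this builds with something like "cc sketch.c -lrdkafka".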

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/examples/rdkafka_consumer_example.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/examples/rdkafka_consumer_example.cpp b/thirdparty/librdkafka-0.11.4/examples/rdkafka_consumer_example.cpp
new file mode 100644
index 0000000..83da691
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/examples/rdkafka_consumer_example.cpp
@@ -0,0 +1,485 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2014, Magnus Edenhill
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met: 
+ * 
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer. 
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution. 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Apache Kafka high-level consumer example program
+ * using the Kafka driver from librdkafka
+ * (https://github.com/edenhill/librdkafka)
+ */
+
+#include <iostream>
+#include <string>
+#include <cstdlib>
+#include <cstdio>
+#include <csignal>
+#include <cstring>
+
+#ifndef _MSC_VER
+#include <sys/time.h>
+#endif
+
+#ifdef _MSC_VER
+#include "../win32/wingetopt.h"
+#include <atltime.h>
+#elif _AIX
+#include <unistd.h>
+#else
+#include <getopt.h>
+#include <unistd.h>
+#endif
+
+/*
+ * Typically include path in a real application would be
+ * #include <librdkafka/rdkafkacpp.h>
+ */
+#include "rdkafkacpp.h"
+
+
+
+static bool run = true;
+static bool exit_eof = false;
+static int eof_cnt = 0;
+static int partition_cnt = 0;
+static int verbosity = 1;
+static long msg_cnt = 0;
+static int64_t msg_bytes = 0;
+static void sigterm (int sig) {
+  run = false;
+}
+
+
+/**
+ * @brief Print a formatted timestamp for the current time to stderr
+ */
+static void print_time () {
+#ifndef _MSC_VER
+        struct timeval tv;
+        char buf[64];
+        gettimeofday(&tv, NULL);
+        strftime(buf, sizeof(buf) - 1, "%Y-%m-%d %H:%M:%S", localtime(&tv.tv_sec));
+        fprintf(stderr, "%s.%03d: ", buf, (int)(tv.tv_usec / 1000));
+#else
+        std::wcerr << CTime::GetCurrentTime().Format(_T("%Y-%m-%d %H:%M:%S")).GetString()
+                << ": ";
+#endif
+}
+class ExampleEventCb : public RdKafka::EventCb {
+ public:
+  void event_cb (RdKafka::Event &event) {
+
+    print_time();
+
+    switch (event.type())
+    {
+      case RdKafka::Event::EVENT_ERROR:
+        std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
+            event.str() << std::endl;
+        if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
+          run = false;
+        break;
+
+      case RdKafka::Event::EVENT_STATS:
+        std::cerr << "\"STATS\": " << event.str() << std::endl;
+        break;
+
+      case RdKafka::Event::EVENT_LOG:
+        fprintf(stderr, "LOG-%i-%s: %s\n",
+                event.severity(), event.fac().c_str(), event.str().c_str());
+        break;
+
+      case RdKafka::Event::EVENT_THROTTLE:
+	std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " <<
+	  event.broker_name() << " id " << (int)event.broker_id() << std::endl;
+	break;
+
+      default:
+        std::cerr << "EVENT " << event.type() <<
+            " (" << RdKafka::err2str(event.err()) << "): " <<
+            event.str() << std::endl;
+        break;
+    }
+  }
+};
+
+
+class ExampleRebalanceCb : public RdKafka::RebalanceCb {
+private:
+  static void part_list_print (const std::vector<RdKafka::TopicPartition*>&partitions){
+    for (unsigned int i = 0 ; i < partitions.size() ; i++)
+      std::cerr << partitions[i]->topic() <<
+	"[" << partitions[i]->partition() << "], ";
+    std::cerr << "\n";
+  }
+
+public:
+  void rebalance_cb (RdKafka::KafkaConsumer *consumer,
+		     RdKafka::ErrorCode err,
+                     std::vector<RdKafka::TopicPartition*> &partitions) {
+    std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
+
+    part_list_print(partitions);
+
+    if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
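+      /* New assignment from the group coordinator: start fetching. */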
+      consumer->assign(partitions);
+      partition_cnt = (int)partitions.size();
+    } else {
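+      /* Revocation (or error): stop fetching and drop the assignment. */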
+      consumer->unassign();
+      partition_cnt = 0;
+    }
+    eof_cnt = 0;
+  }
+};
+
+
+void msg_consume(RdKafka::Message* message, void* opaque) {
+  switch (message->err()) {
+    case RdKafka::ERR__TIMED_OUT:
+      break;
+
+    case RdKafka::ERR_NO_ERROR:
+      /* Real message */
+      msg_cnt++;
+      msg_bytes += message->len();
+      if (verbosity >= 3)
+        std::cerr << "Read msg at offset " << message->offset() << std::endl;
+      RdKafka::MessageTimestamp ts;
+      ts = message->timestamp();
+      if (verbosity >= 2 &&
+	  ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {
+	std::string tsname = "?";
+	if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME)
+	  tsname = "create time";
+        else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME)
+          tsname = "log append time";
+        std::cout << "Timestamp: " << tsname << " " << ts.timestamp << std::endl;
+      }
+      if (verbosity >= 2 && message->key()) {
+        std::cout << "Key: " << *message->key() << std::endl;
+      }
+      if (verbosity >= 1) {
+        printf("%.*s\n",
+               static_cast<int>(message->len()),
+               static_cast<const char *>(message->payload()));
+      }
+      break;
+
+    case RdKafka::ERR__PARTITION_EOF:
+      /* Last message */
+      if (exit_eof && ++eof_cnt == partition_cnt) {
+        std::cerr << "%% EOF reached for all " << partition_cnt <<
+            " partition(s)" << std::endl;
+        run = false;
+      }
+      break;
+
+    case RdKafka::ERR__UNKNOWN_TOPIC:
+    case RdKafka::ERR__UNKNOWN_PARTITION:
+      std::cerr << "Consume failed: " << message->errstr() << std::endl;
+      run = false;
+      break;
+
+    default:
+      /* Errors */
+      std::cerr << "Consume failed: " << message->errstr() << std::endl;
+      run = false;
+  }
+}
+
+
+class ExampleConsumeCb : public RdKafka::ConsumeCb {
+ public:
+  void consume_cb (RdKafka::Message &msg, void *opaque) {
+    msg_consume(&msg, opaque);
+  }
+};
+
+
+
+int main (int argc, char **argv) {
+  std::string brokers = "localhost";
+  std::string errstr;
+  std::string topic_str;
+  std::string mode;
+  std::string debug;
+  std::vector<std::string> topics;
+  bool do_conf_dump = false;
+  int opt;
+  int use_ccb = 0;
+
+  /*
+   * Create configuration objects
+   */
+  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
+  RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
+
+  ExampleRebalanceCb ex_rebalance_cb;
+  conf->set("rebalance_cb", &ex_rebalance_cb, errstr);
+
+  while ((opt = getopt(argc, argv, "g:b:z:qd:eX:AM:f:v")) != -1) {
+    switch (opt) {
+    case 'g':
+      if (conf->set("group.id",  optarg, errstr) != RdKafka::Conf::CONF_OK) {
+        std::cerr << errstr << std::endl;
+        exit(1);
+      }
+      break;
+    case 'b':
+      brokers = optarg;
+      break;
+    case 'z':
+      if (conf->set("compression.codec", optarg, errstr) !=
+	  RdKafka::Conf::CONF_OK) {
+	std::cerr << errstr << std::endl;
+	exit(1);
+      }
+      break;
+    case 'e':
+      exit_eof = true;
+      break;
+    case 'd':
+      debug = optarg;
+      break;
+    case 'M':
+      if (conf->set("statistics.interval.ms", optarg, errstr) !=
+          RdKafka::Conf::CONF_OK) {
+        std::cerr << errstr << std::endl;
+        exit(1);
+      }
+      break;
+    case 'X':
+      {
+	char *name, *val;
+
+	if (!strcmp(optarg, "dump")) {
+	  do_conf_dump = true;
+	  continue;
+	}
+
+	name = optarg;
+	if (!(val = strchr(name, '='))) {
+          std::cerr << "%% Expected -X property=value, not " <<
+              name << std::endl;
+	  exit(1);
+	}
+
+	*val = '\0';
+	val++;
+
+	/* Try "topic." prefixed properties on topic
+	 * conf first, and then fall through to global if
+	 * it didn't match a topic configuration property. */
+        RdKafka::Conf::ConfResult res = RdKafka::Conf::CONF_UNKNOWN;
+	if (!strncmp(name, "topic.", strlen("topic.")))
+          res = tconf->set(name+strlen("topic."), val, errstr);
+        if (res == RdKafka::Conf::CONF_UNKNOWN)
+	  res = conf->set(name, val, errstr);
+
+	if (res != RdKafka::Conf::CONF_OK) {
+          std::cerr << errstr << std::endl;
+	  exit(1);
+	}
+      }
+      break;
+
+      case 'f':
+        if (!strcmp(optarg, "ccb"))
+          use_ccb = 1;
+        else {
+          std::cerr << "Unknown option: " << optarg << std::endl;
+          exit(1);
+        }
+        break;
+
+      case 'q':
+        verbosity--;
+        break;
+
+      case 'v':
+        verbosity++;
+        break;
+
+    default:
+      goto usage;
+    }
+  }
+
+  for (; optind < argc ; optind++)
+    topics.push_back(std::string(argv[optind]));
+
+  if (topics.empty()) {
+  usage:
+    fprintf(stderr,
+            "Usage: %s -g <group-id> [options] topic1 topic2..\n"
+            "\n"
+            "librdkafka version %s (0x%08x)\n"
+            "\n"
+            " Options:\n"
+            "  -g <group-id>   Consumer group id\n"
+            "  -b <brokers>    Broker address (localhost:9092)\n"
+            "  -z <codec>      Enable compression:\n"
+            "                  none|gzip|snappy\n"
+            "  -e              Exit consumer when last message\n"
+            "                  in partition has been received.\n"
+            "  -d [facs..]     Enable debugging contexts:\n"
+            "                  %s\n"
+            "  -M <intervalms> Enable statistics\n"
+            "  -X <prop=name>  Set arbitrary librdkafka "
+            "configuration property\n"
+            "                  Properties prefixed with \"topic.\" "
+            "will be set on topic object.\n"
+            "                  Use '-X list' to see the full list\n"
+            "                  of supported properties.\n"
+            "  -f <flag>       Set option:\n"
+            "                     ccb - use consume_callback\n"
+            "  -q              Quiet / Decrease verbosity\n"
+            "  -v              Increase verbosity\n"
+            "\n"
+            "\n",
+            argv[0],
+            RdKafka::version_str().c_str(), RdKafka::version(),
+            RdKafka::get_debug_contexts().c_str());
+    exit(1);
+  }
+
+
+  /*
+   * Set configuration properties
+   */
+  conf->set("metadata.broker.list", brokers, errstr);
+
+  if (!debug.empty()) {
+    if (conf->set("debug", debug, errstr) != RdKafka::Conf::CONF_OK) {
+      std::cerr << errstr << std::endl;
+      exit(1);
+    }
+  }
+
+  ExampleConsumeCb ex_consume_cb;
+
+  if (use_ccb) {
+    conf->set("consume_cb", &ex_consume_cb, errstr);
+  }
+
+  ExampleEventCb ex_event_cb;
+  conf->set("event_cb", &ex_event_cb, errstr);
+
+  if (do_conf_dump) {
+    int pass;
+
+    for (pass = 0 ; pass < 2 ; pass++) {
+      std::list<std::string> *dump;
+      if (pass == 0) {
+        dump = conf->dump();
+        std::cout << "# Global config" << std::endl;
+      } else {
+        dump = tconf->dump();
+        std::cout << "# Topic config" << std::endl;
+      }
+
+      for (std::list<std::string>::iterator it = dump->begin();
+           it != dump->end(); ) {
+        std::cout << *it << " = ";
+        it++;
+        std::cout << *it << std::endl;
+        it++;
+      }
+      std::cout << std::endl;
+    }
+    exit(0);
+  }
+
+  conf->set("default_topic_conf", tconf, errstr);
+  delete tconf;
+
+  signal(SIGINT, sigterm);
+  signal(SIGTERM, sigterm);
+
+
+  /*
+   * Consumer mode
+   */
+
+  /*
+   * Create consumer using accumulated global configuration.
+   */
+  RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
+  if (!consumer) {
+    std::cerr << "Failed to create consumer: " << errstr << std::endl;
+    exit(1);
+  }
+
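+  /* create() duplicated the configuration; the application still owns conf. */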
+  delete conf;
+
+  std::cout << "% Created consumer " << consumer->name() << std::endl;
+
+
+  /*
+   * Subscribe to topics
+   */
+  RdKafka::ErrorCode err = consumer->subscribe(topics);
+  if (err) {
+    std::cerr << "Failed to subscribe to " << topics.size() << " topics: "
+              << RdKafka::err2str(err) << std::endl;
+    exit(1);
+  }
+
+  /*
+   * Consume messages
+   */
+  while (run) {
+    RdKafka::Message *msg = consumer->consume(1000);
+    if (!use_ccb) {
+      msg_consume(msg, NULL);
+    }
+    delete msg;
+  }
+
+#ifndef _MSC_VER
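+  /* No SIGALRM handler is installed, so the default action terminates the
+   * process if close() below takes longer than 10 seconds (POSIX only). */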
+  alarm(10);
+#endif
+
+  /*
+   * Stop consumer
+   */
+  consumer->close();
+  delete consumer;
+
+  std::cerr << "% Consumed " << msg_cnt << " messages ("
+            << msg_bytes << " bytes)" << std::endl;
+
+  /*
+   * Wait for RdKafka to decommission.
+   * This is not strictly needed, but it allows RdKafka to clean up
+   * all its resources before the application exits so that memory
+   * profilers such as valgrind won't complain about memory leaks.
+   */
+  RdKafka::wait_destroyed(5000);
+
+  return 0;
+}