Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:34 UTC

[06/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade to librdkafka 0.11.4

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/examples/rdkafka_performance.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/examples/rdkafka_performance.c b/thirdparty/librdkafka-0.11.4/examples/rdkafka_performance.c
new file mode 100644
index 0000000..bceb23b
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/examples/rdkafka_performance.c
@@ -0,0 +1,1651 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012, Magnus Edenhill
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met: 
+ * 
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer. 
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution. 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Apache Kafka consumer & producer performance tester
+ * using the Kafka driver from librdkafka
+ * (https://github.com/edenhill/librdkafka)
+ */
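+
+/*
+ * Illustrative invocations (broker address and topic name are just
+ * examples; the full option list is in the usage text below):
+ *
+ *   rdkafka_performance -P -t mytopic -b localhost:9092 -s 100 -c 1000000
+ *   rdkafka_performance -C -t mytopic -b localhost:9092 -o beginning
+ */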
+
+#ifdef _MSC_VER
+#define  _CRT_SECURE_NO_WARNINGS /* Silence nonsense on MSVC */
+#endif
+
+#include "../src/rd.h"
+
+#define _GNU_SOURCE /* for strndup() */
+#include <ctype.h>
+#include <signal.h>
+#include <string.h>
+#include <errno.h>
+
+/* Typical include path would be <librdkafka/rdkafka.h>, but this program
+ * is built from within the librdkafka source tree and thus differs. */
+#include "rdkafka.h"  /* for Kafka driver */
+/* These internal headers are not installed or provided by librdkafka;
+ * do not include them in your own programs. */
+#include "rd.h"
+#include "rdtime.h"
+
+#ifdef _MSC_VER
+#include "../win32/wingetopt.h"
+#include "../win32/wintime.h"
+#endif
+
+
+static int run = 1;
+static int forever = 1;
+static rd_ts_t dispintvl = 1000;
+static int do_seq = 0;
+static int exit_after = 0;
+static int exit_eof = 0;
+static FILE *stats_fp;
+static int dr_disp_div;
+static int verbosity = 1;
+static int latency_mode = 0;
+static int report_offset = 0;
+static FILE *latency_fp = NULL;
+static int msgcnt = -1;
+static int incremental_mode = 0;
+static int partition_cnt = 0;
+static int eof_cnt = 0;
+static int with_dr = 1;
+static int read_hdrs = 0;
+
+
+static void stop (int sig) {
+        if (!run)
+                exit(0);
+        run = 0;
+}
+
+static long int msgs_wait_cnt = 0;
+static long int msgs_wait_produce_cnt = 0;
+static rd_ts_t t_end;
+static rd_kafka_t *global_rk;
+
+struct avg {
+        int64_t  val;
+        int      cnt;
+        uint64_t ts_start;
+};
+
+static struct {
+	rd_ts_t  t_start;
+	rd_ts_t  t_end;
+	rd_ts_t  t_end_send;
+	uint64_t msgs;
+	uint64_t msgs_last;
+        uint64_t msgs_dr_ok;
+        uint64_t msgs_dr_err;
+        uint64_t bytes_dr_ok;
+	uint64_t bytes;
+	uint64_t bytes_last;
+	uint64_t tx;
+	uint64_t tx_err;
+        uint64_t avg_rtt;
+        uint64_t offset;
+	rd_ts_t  t_fetch_latency;
+	rd_ts_t  t_last;
+        rd_ts_t  t_enobufs_last;
+	rd_ts_t  t_total;
+        rd_ts_t  latency_last;
+        rd_ts_t  latency_lo;
+        rd_ts_t  latency_hi;
+        rd_ts_t  latency_sum;
+        int      latency_cnt;
+        int64_t  last_offset;
+} cnt;
+
+
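+/* Wall-clock time in microseconds since the epoch. */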
+uint64_t wall_clock (void) {
+        struct timeval tv;
+        gettimeofday(&tv, NULL);
+        return ((uint64_t)tv.tv_sec * 1000000LLU) +
+		((uint64_t)tv.tv_usec);
+}
+
+static void err_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) {
+	printf("%% ERROR CALLBACK: %s: %s: %s\n",
+	       rd_kafka_name(rk), rd_kafka_err2str(err), reason);
+}
+
+static void throttle_cb (rd_kafka_t *rk, const char *broker_name,
+			 int32_t broker_id, int throttle_time_ms,
+			 void *opaque) {
+	printf("%% THROTTLED %dms by %s (%"PRId32")\n", throttle_time_ms,
+	       broker_name, broker_id);
+}
+
+static void offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
+                              rd_kafka_topic_partition_list_t *offsets,
+                              void *opaque) {
+        int i;
+
+        if (err || verbosity >= 2)
+                printf("%% Offset commit of %d partition(s): %s\n",
+                       offsets->cnt, rd_kafka_err2str(err));
+
+        for (i = 0 ; i < offsets->cnt ; i++) {
+                rd_kafka_topic_partition_t *rktpar = &offsets->elems[i];
+                if (rktpar->err || verbosity >= 2)
+                        printf("%%  %s [%"PRId32"] @ %"PRId64": %s\n",
+                               rktpar->topic, rktpar->partition,
+                               rktpar->offset, rd_kafka_err2str(err));
+        }
+}
+
+/**
+ * @brief Add latency measurement
+ */
+static void latency_add (int64_t ts, const char *who) {
+        if (ts > cnt.latency_hi)
+                cnt.latency_hi = ts;
+        if (!cnt.latency_lo || ts < cnt.latency_lo)
+                cnt.latency_lo = ts;
+        cnt.latency_last = ts;
+        cnt.latency_cnt++;
+        cnt.latency_sum += ts;
+        if (latency_fp)
+                fprintf(latency_fp, "%"PRId64"\n", ts);
+}
+
+
+static void msg_delivered (rd_kafka_t *rk,
+                           const rd_kafka_message_t *rkmessage, void *opaque) {
+	static rd_ts_t last;
+	rd_ts_t now = rd_clock();
+	static int msgs;
+
+	msgs++;
+
+	msgs_wait_cnt--;
+
+	if (rkmessage->err)
+                cnt.msgs_dr_err++;
+        else {
+                cnt.msgs_dr_ok++;
+                cnt.bytes_dr_ok += rkmessage->len;
+        }
+
+        if (latency_mode) {
+                /* Extract latency */
+                int64_t source_ts;
+                if (sscanf(rkmessage->payload, "LATENCY:%"SCNd64,
+                           &source_ts) == 1)
+                        latency_add(wall_clock() - source_ts, "producer");
+        }
+
+
+	if ((rkmessage->err &&
+	     (cnt.msgs_dr_err < 50 ||
+              !(cnt.msgs_dr_err % (dispintvl / 1000)))) ||
+	    !last || msgs_wait_cnt < 5 ||
+	    !(msgs_wait_cnt % dr_disp_div) || 
+	    (now - last) >= dispintvl * 1000 ||
+            verbosity >= 3) {
+		if (rkmessage->err && verbosity >= 2)
+			printf("%% Message delivery failed: %s [%"PRId32"]: "
+			       "%s (%li remain)\n",
+			       rd_kafka_topic_name(rkmessage->rkt),
+			       rkmessage->partition,
+			       rd_kafka_err2str(rkmessage->err),
+			       msgs_wait_cnt);
+		else if (verbosity > 2)
+			printf("%% Message delivered (offset %"PRId64"): "
+                               "%li remain\n",
+                               rkmessage->offset, msgs_wait_cnt);
+		if (verbosity >= 3 && do_seq)
+			printf(" --> \"%.*s\"\n",
+                               (int)rkmessage->len,
+                               (const char *)rkmessage->payload);
+		last = now;
+	}
+
+        if (report_offset)
+                cnt.last_offset = rkmessage->offset;
+
+	if (msgs_wait_produce_cnt == 0 && msgs_wait_cnt == 0 && !forever) {
+		if (verbosity >= 2)
+			printf("All messages delivered!\n");
+		t_end = rd_clock();
+		run = 0;
+	}
+
+	if (exit_after && exit_after <= msgs) {
+		printf("%% Hard exit after %i messages, as requested\n",
+		       exit_after);
+		exit(0);
+	}
+}
+
+
+static void msg_consume (rd_kafka_message_t *rkmessage, void *opaque) {
+
+	if (rkmessage->err) {
+		if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
+                        cnt.offset = rkmessage->offset;
+
+                        if (verbosity >= 1)
+                                printf("%% Consumer reached end of "
+                                       "%s [%"PRId32"] "
+                                       "message queue at offset %"PRId64"\n",
+                                       rd_kafka_topic_name(rkmessage->rkt),
+                                       rkmessage->partition, rkmessage->offset);
+
+			if (exit_eof && ++eof_cnt == partition_cnt)
+				run = 0;
+
+			return;
+		}
+
+		printf("%% Consume error for topic \"%s\" [%"PRId32"] "
+		       "offset %"PRId64": %s\n",
+		       rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt):"",
+		       rkmessage->partition,
+		       rkmessage->offset,
+		       rd_kafka_message_errstr(rkmessage));
+
+                if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
+                    rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
+                        run = 0;
+
+                cnt.msgs_dr_err++;
+		return;
+	}
+
+	/* Start measuring from first message received */
+	if (!cnt.t_start)
+		cnt.t_start = cnt.t_last = rd_clock();
+
+        cnt.offset = rkmessage->offset;
+	cnt.msgs++;
+	cnt.bytes += rkmessage->len;
+
+	if (verbosity >= 3 ||
+            (verbosity >= 2 && !(cnt.msgs % 1000000)))
+		printf("@%"PRId64": %.*s: %.*s\n",
+		       rkmessage->offset,
+                       (int)rkmessage->key_len, (char *)rkmessage->key,
+		       (int)rkmessage->len, (char *)rkmessage->payload);
+
+
+        if (latency_mode) {
+                int64_t remote_ts, ts;
+
+                if (rkmessage->len > 8 &&
+                    !memcmp(rkmessage->payload, "LATENCY:", 8) &&
+                    sscanf(rkmessage->payload, "LATENCY:%"SCNd64,
+                           &remote_ts) == 1) {
+                        ts = wall_clock() - remote_ts;
+                        if (ts > 0 && ts < (1000000 * 60 * 5)) {
+                                latency_add(ts, "consumer");
+                        } else {
+                                if (verbosity >= 1)
+                                        printf("Received latency timestamp is too far off: %"PRId64"us (message offset %"PRId64"): ignored\n",
+                                               ts, rkmessage->offset);
+                        }
+                } else if (verbosity > 1)
+                        printf("not a LATENCY payload: %.*s\n",
+                               (int)rkmessage->len,
+                               (char *)rkmessage->payload);
+
+        }
+
+        if (read_hdrs) {
+                rd_kafka_headers_t *hdrs;
+                /* Force parsing of headers but don't do anything with them. */
+                rd_kafka_message_headers(rkmessage, &hdrs);
+        }
+
+        if (msgcnt != -1 && (int)cnt.msgs >= msgcnt)
+                run = 0;
+}
+
+
+static void rebalance_cb (rd_kafka_t *rk,
+			  rd_kafka_resp_err_t err,
+			  rd_kafka_topic_partition_list_t *partitions,
+			  void *opaque) {
+
+	switch (err)
+	{
+	case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
+		fprintf(stderr,
+			"%% Group rebalanced: %d partition(s) assigned\n",
+			partitions->cnt);
+		eof_cnt = 0;
+		partition_cnt = partitions->cnt;
+		rd_kafka_assign(rk, partitions);
+		break;
+
+	case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
+		fprintf(stderr,
+			"%% Group rebalanced: %d partition(s) revoked\n",
+			partitions->cnt);
+		eof_cnt = 0;
+		partition_cnt = 0;
+		rd_kafka_assign(rk, NULL);
+		break;
+
+	default:
+		break;
+	}
+}
+
+
+/**
+ * Find and extract single value from a two-level search.
+ * First find 'field1', then find 'field2' and extract its value.
+ * Returns 0 on miss else the value.
+ */
+static uint64_t json_parse_fields (const char *json, const char **end,
+                                   const char *field1, const char *field2) {
+        const char *t = json;
+        const char *t2;
+        int len1 = (int)strlen(field1);
+        int len2 = (int)strlen(field2);
+
+        while ((t2 = strstr(t, field1))) {
+                uint64_t v;
+
+                t = t2;
+                t += len1;
+
+                /* Find field */
+                if (!(t2 = strstr(t, field2)))
+                        continue;
+                t2 += len2;
+
+                while (isspace((unsigned char)*t2))
+                        t2++;
+
+                v = strtoull(t2, (char **)&t, 10);
+                if (t2 == t)
+                        continue;
+
+                *end = t;
+                return v;
+        }
+
+        *end = t + strlen(t);
+        return 0;
+}
+
+/**
+ * Parse various values from rdkafka stats
+ */
+static void json_parse_stats (const char *json) {
+        const char *t;
+#define MAX_AVGS 100 /* max number of brokers to scan for rtt */
+        uint64_t avg_rtt[MAX_AVGS+1];
+        int avg_rtt_i     = 0;
+
+        /* Store totals at end of array */
+        avg_rtt[MAX_AVGS]     = 0;
+
+        /* Extract all broker RTTs */
+        t = json;
+        while (avg_rtt_i < MAX_AVGS && *t) {
+                avg_rtt[avg_rtt_i] = json_parse_fields(t, &t,
+                                                       "\"rtt\":",
+                                                       "\"avg\":");
+
+                /* Skip low RTT values; they indicate that no messages are passing */
+                if (avg_rtt[avg_rtt_i] < 100 /*0.1ms*/)
+                        continue;
+
+
+                avg_rtt[MAX_AVGS] += avg_rtt[avg_rtt_i];
+                avg_rtt_i++;
+        }
+
+        if (avg_rtt_i > 0)
+                avg_rtt[MAX_AVGS] /= avg_rtt_i;
+
+        cnt.avg_rtt = avg_rtt[MAX_AVGS];
+}
+
+
+static int stats_cb (rd_kafka_t *rk, char *json, size_t json_len,
+		     void *opaque) {
+
+        /* Extract values for our own stats */
+        json_parse_stats(json);
+
+        if (stats_fp)
+                fprintf(stats_fp, "%s\n", json);
+	return 0;
+}
+
+#define _OTYPE_TAB      0x1  /* tabular format */
+#define _OTYPE_SUMMARY  0x2  /* summary format */
+#define _OTYPE_FORCE    0x4  /* force output regardless of interval timing */
+static void print_stats (rd_kafka_t *rk,
+                         int mode, int otype, const char *compression) {
+	rd_ts_t now = rd_clock();
+	rd_ts_t t_total;
+        static int rows_written = 0;
+        int print_header;
+        double latency_avg = 0.0f;
+        char extra[512];
+        int extra_of = 0;
+        *extra = '\0';
+
+	if (!(otype & _OTYPE_FORCE) &&
+            (((otype & _OTYPE_SUMMARY) && verbosity == 0) ||
+             cnt.t_last + dispintvl > now))
+		return;
+
+        print_header = !rows_written || (verbosity > 0 && !(rows_written % 20));
+
+	if (cnt.t_end_send)
+		t_total = cnt.t_end_send - cnt.t_start;
+	else if (cnt.t_end)
+		t_total = cnt.t_end - cnt.t_start;
+	else if (cnt.t_start)
+		t_total = now - cnt.t_start;
+	else
+		t_total = 1;
+
+        if (latency_mode && cnt.latency_cnt)
+                latency_avg = (double)cnt.latency_sum /
+                        (double)cnt.latency_cnt;
+
+        if (mode == 'P') {
+
+                if (otype & _OTYPE_TAB) {
+#define ROW_START()        do {} while (0)
+#define COL_HDR(NAME)      printf("| %10.10s ", (NAME))
+#define COL_PR64(NAME,VAL) printf("| %10"PRIu64" ", (VAL))
+#define COL_PRF(NAME,VAL)  printf("| %10.2f ", (VAL))
+#define ROW_END()          do {                 \
+                                printf("\n");   \
+                                rows_written++; \
+                        } while (0)
+
+                        if (print_header) {
+                                /* First time, print header */
+                                ROW_START();
+                                COL_HDR("elapsed");
+                                COL_HDR("msgs");
+                                COL_HDR("bytes");
+                                COL_HDR("rtt");
+                                COL_HDR("dr");
+                                COL_HDR("dr_m/s");
+                                COL_HDR("dr_MB/s");
+                                COL_HDR("dr_err");
+                                COL_HDR("tx_err");
+                                COL_HDR("outq");
+                                if (report_offset)
+                                        COL_HDR("offset");
+                                if (latency_mode) {
+                                        COL_HDR("lat_curr");
+                                        COL_HDR("lat_avg");
+                                        COL_HDR("lat_lo");
+                                        COL_HDR("lat_hi");
+                                }
+
+                                ROW_END();
+                        }
+
+                        ROW_START();
+                        COL_PR64("elapsed", t_total / 1000);
+                        COL_PR64("msgs", cnt.msgs);
+                        COL_PR64("bytes", cnt.bytes);
+                        COL_PR64("rtt", cnt.avg_rtt / 1000);
+                        COL_PR64("dr", cnt.msgs_dr_ok);
+                        COL_PR64("dr_m/s",
+                                 ((cnt.msgs_dr_ok * 1000000) / t_total));
+                        COL_PRF("dr_MB/s",
+                                (float)((cnt.bytes_dr_ok) / (float)t_total));
+                        COL_PR64("dr_err", cnt.msgs_dr_err);
+                        COL_PR64("tx_err", cnt.tx_err);
+                        COL_PR64("outq",
+                                 rk ? (uint64_t)rd_kafka_outq_len(rk) : 0);
+                        if (report_offset)
+                                COL_PR64("offset", (uint64_t)cnt.last_offset);
+                        if (latency_mode) {
+                                COL_PRF("lat_curr", cnt.latency_last / 1000.0f);
+                                COL_PRF("lat_avg", latency_avg / 1000.0f);
+                                COL_PRF("lat_lo", cnt.latency_lo / 1000.0f);
+                                COL_PRF("lat_hi", cnt.latency_hi / 1000.0f);
+                        }
+                        ROW_END();
+                }
+
+                if (otype & _OTYPE_SUMMARY) {
+                        printf("%% %"PRIu64" messages produced "
+                               "(%"PRIu64" bytes), "
+                               "%"PRIu64" delivered "
+                               "(offset %"PRId64", %"PRIu64" failed) "
+                               "in %"PRIu64"ms: %"PRIu64" msgs/s and "
+                               "%.02f MB/s, "
+                               "%"PRIu64" produce failures, %i in queue, "
+                               "%s compression\n",
+                               cnt.msgs, cnt.bytes,
+                               cnt.msgs_dr_ok, cnt.last_offset, cnt.msgs_dr_err,
+                               t_total / 1000,
+                               ((cnt.msgs_dr_ok * 1000000) / t_total),
+                               (float)((cnt.bytes_dr_ok) / (float)t_total),
+                               cnt.tx_err,
+                               rk ? rd_kafka_outq_len(rk) : 0,
+                               compression);
+                }
+
+        } else {
+
+                if (otype & _OTYPE_TAB) {
+                        if (print_header) {
+                                /* First time, print header */
+                                ROW_START();
+                                COL_HDR("elapsed");
+                                COL_HDR("msgs");
+                                COL_HDR("bytes");
+                                COL_HDR("rtt");
+                                COL_HDR("m/s");
+                                COL_HDR("MB/s");
+                                COL_HDR("rx_err");
+                                COL_HDR("offset");
+                                if (latency_mode) {
+                                        COL_HDR("lat_curr");
+                                        COL_HDR("lat_avg");
+                                        COL_HDR("lat_lo");
+                                        COL_HDR("lat_hi");
+                                }
+                                ROW_END();
+                        }
+
+                        ROW_START();
+                        COL_PR64("elapsed", t_total / 1000);
+                        COL_PR64("msgs", cnt.msgs);
+                        COL_PR64("bytes", cnt.bytes);
+                        COL_PR64("rtt", cnt.avg_rtt / 1000);
+                        COL_PR64("m/s",
+                                 ((cnt.msgs * 1000000) / t_total));
+                        COL_PRF("MB/s",
+                                (float)((cnt.bytes) / (float)t_total));
+                        COL_PR64("rx_err", cnt.msgs_dr_err);
+                        COL_PR64("offset", cnt.offset);
+                        if (latency_mode) {
+                                COL_PRF("lat_curr", cnt.latency_last / 1000.0f);
+                                COL_PRF("lat_avg", latency_avg / 1000.0f);
+                                COL_PRF("lat_lo", cnt.latency_lo / 1000.0f);
+                                COL_PRF("lat_hi", cnt.latency_hi / 1000.0f);
+                        }
+                        ROW_END();
+
+                }
+
+                if (otype & _OTYPE_SUMMARY) {
+                        if (latency_avg >= 1.0f)
+                                extra_of += rd_snprintf(extra+extra_of,
+                                                     sizeof(extra)-extra_of,
+                                                     ", latency "
+                                                     "curr/avg/lo/hi "
+                                                     "%.2f/%.2f/%.2f/%.2fms",
+                                                     cnt.latency_last / 1000.0f,
+                                                     latency_avg / 1000.0f,
+                                                     cnt.latency_lo / 1000.0f,
+                                                     cnt.latency_hi / 1000.0f);
+                        printf("%% %"PRIu64" messages (%"PRIu64" bytes) "
+                               "consumed in %"PRIu64"ms: %"PRIu64" msgs/s "
+                               "(%.02f MB/s)"
+                               "%s\n",
+                               cnt.msgs, cnt.bytes,
+                               t_total / 1000,
+                               ((cnt.msgs * 1000000) / t_total),
+                               (float)((cnt.bytes) / (float)t_total),
+                               extra);
+                }
+
+                /* Require a valid t_last so the first interval does not
+                 * divide by zero. */
+                if (incremental_mode && cnt.t_last && now > cnt.t_last) {
+                        uint64_t i_msgs = cnt.msgs - cnt.msgs_last;
+                        uint64_t i_bytes = cnt.bytes - cnt.bytes_last;
+                        uint64_t i_time = now - cnt.t_last;
+
+                        printf("%% INTERVAL: %"PRIu64" messages "
+                               "(%"PRIu64" bytes) "
+                               "consumed in %"PRIu64"ms: %"PRIu64" msgs/s "
+                               "(%.02f MB/s)"
+                               "%s\n",
+                               i_msgs, i_bytes,
+                               i_time / 1000,
+                               ((i_msgs * 1000000) / i_time),
+                               (float)((i_bytes) / (float)i_time),
+                               extra);
+
+                }
+        }
+
+	cnt.t_last = now;
+	cnt.msgs_last = cnt.msgs;
+	cnt.bytes_last = cnt.bytes;
+}
+
+
+static void sig_usr1 (int sig) {
+	rd_kafka_dump(stdout, global_rk);
+}
+
+
+/**
+ * @brief Read config from file
+ * @returns -1 on error, else 0.
+ */
+static int read_conf_file (rd_kafka_conf_t *conf,
+                           rd_kafka_topic_conf_t *tconf, const char *path) {
+        FILE *fp;
+        char buf[512];
+        int line = 0;
+        char errstr[512];
+
+        if (!(fp = fopen(path, "r"))) {
+                fprintf(stderr, "%% Failed to open %s: %s\n",
+                        path, strerror(errno));
+                return -1;
+        }
+
+        while (fgets(buf, sizeof(buf), fp)) {
+                char *s = buf;
+                char *t;
+                rd_kafka_conf_res_t r = RD_KAFKA_CONF_UNKNOWN;
+
+                line++;
+
+                while (isspace((unsigned char)*s))
+                        s++;
+
+                if (!*s || *s == '#')
+                        continue;
+
+                if ((t = strchr(buf, '\n')))
+                        *t = '\0';
+
+                t = strchr(buf, '=');
+                if (!t || t == s || !*(t+1)) {
+                        fprintf(stderr, "%% %s:%d: expected key=value\n",
+                                path, line);
+                        fclose(fp);
+                        return -1;
+                }
+
+                *(t++) = '\0';
+
+                /* Try property on topic config first */
+                if (tconf)
+                        r = rd_kafka_topic_conf_set(tconf, s, t,
+                                                    errstr, sizeof(errstr));
+
+                /* Try global config */
+                if (r == RD_KAFKA_CONF_UNKNOWN)
+                        r = rd_kafka_conf_set(conf, s, t,
+                                              errstr, sizeof(errstr));
+
+                if (r == RD_KAFKA_CONF_OK)
+                        continue;
+
+                fprintf(stderr, "%% %s:%d: %s=%s: %s\n",
+                        path, line, s, t, errstr);
+                fclose(fp);
+                return -1;
+        }
+
+        fclose(fp);
+
+        return 0;
+}
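+
+/*
+ * Example file for -X file=<path> (illustrative contents):
+ *
+ *   # one key=value per line; '#' starts a comment
+ *   compression.codec=snappy
+ *   request.required.acks=1
+ */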
+
+
+static rd_kafka_resp_err_t do_produce (rd_kafka_t *rk,
+                                       rd_kafka_topic_t *rkt, int32_t partition,
+                                       int msgflags,
+                                       void *payload, size_t size,
+                                       const void *key, size_t key_size,
+                                       const rd_kafka_headers_t *hdrs) {
+
+        /* Send/Produce message. */
+        if (hdrs) {
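+                /* rd_kafka_producev() takes ownership of the headers on
+                 * success, so pass a copy and destroy that copy ourselves
+                 * only if the produce call fails. */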
+                rd_kafka_headers_t *hdrs_copy;
+                rd_kafka_resp_err_t err;
+
+                hdrs_copy = rd_kafka_headers_copy(hdrs);
+
+                err = rd_kafka_producev(
+                        rk,
+                        RD_KAFKA_V_RKT(rkt),
+                        RD_KAFKA_V_PARTITION(partition),
+                        RD_KAFKA_V_MSGFLAGS(msgflags),
+                        RD_KAFKA_V_VALUE(payload, size),
+                        RD_KAFKA_V_KEY(key, key_size),
+                        RD_KAFKA_V_HEADERS(hdrs_copy),
+                        RD_KAFKA_V_END);
+
+                if (err)
+                        rd_kafka_headers_destroy(hdrs_copy);
+
+                return err;
+
+        } else {
+                if (rd_kafka_produce(rkt, partition, msgflags, payload, size,
+                                     key, key_size, NULL) == -1)
+                        return rd_kafka_last_error();
+        }
+
+        return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+int main (int argc, char **argv) {
+	char *brokers = NULL;
+	char mode = 'C';
+	char *topic = NULL;
+	const char *key = NULL;
+        int *partitions = NULL;
+	int opt;
+	int sendflags = 0;
+	char *msgpattern = "librdkafka_performance testing!";
+	int msgsize = (int)strlen(msgpattern);
+	const char *debug = NULL;
+	rd_ts_t now;
+	char errstr[512];
+	uint64_t seq = 0;
+	int seed = (int)time(NULL);
+        rd_kafka_t *rk;
+	rd_kafka_topic_t *rkt;
+	rd_kafka_conf_t *conf;
+	rd_kafka_topic_conf_t *topic_conf;
+	rd_kafka_queue_t *rkqu = NULL;
+	const char *compression = "no";
+	int64_t start_offset = 0;
+	int batch_size = 0;
+	int idle = 0;
+        const char *stats_cmd = NULL;
+        char *stats_intvlstr = NULL;
+        char tmp[128];
+        char *tmp2;
+        int otype = _OTYPE_SUMMARY;
+        double dtmp;
+        int rate_sleep = 0;
+	rd_kafka_topic_partition_list_t *topics;
+        int exitcode = 0;
+        rd_kafka_headers_t *hdrs = NULL;
+        rd_kafka_resp_err_t err;
+
+	/* Kafka configuration */
+	conf = rd_kafka_conf_new();
+	rd_kafka_conf_set_error_cb(conf, err_cb);
+	rd_kafka_conf_set_throttle_cb(conf, throttle_cb);
+        rd_kafka_conf_set_offset_commit_cb(conf, offset_commit_cb);
+
+#ifdef SIGIO
+        /* Quick termination */
+	rd_snprintf(tmp, sizeof(tmp), "%i", SIGIO);
+	rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
+#endif
+
+	/* Producer config */
+	rd_kafka_conf_set(conf, "queue.buffering.max.messages", "500000",
+			  NULL, 0);
+	rd_kafka_conf_set(conf, "message.send.max.retries", "3", NULL, 0);
+	rd_kafka_conf_set(conf, "retry.backoff.ms", "500", NULL, 0);
+
+	/* Consumer config */
+	/* Tell rdkafka to (try to) maintain 1M messages
+	 * in its internal receive buffers. This is to avoid
+	 * application -> rdkafka -> broker  per-message ping-pong
+	 * latency.
+	 * The larger the local queue, the higher the performance.
+	 * Try other values with: ... -X queued.min.messages=1000
+	 */
+	rd_kafka_conf_set(conf, "queued.min.messages", "1000000", NULL, 0);
+	rd_kafka_conf_set(conf, "session.timeout.ms", "6000", NULL, 0);
+
+	/* Kafka topic configuration */
+	topic_conf = rd_kafka_topic_conf_new();
+	rd_kafka_topic_conf_set(topic_conf, "auto.offset.reset", "earliest",
+				NULL, 0);
+
+	topics = rd_kafka_topic_partition_list_new(1);
+
+	while ((opt =
+		getopt(argc, argv,
+		       "PCG:t:p:b:s:k:c:fi:MDd:m:S:x:"
+                       "R:a:z:o:X:B:eT:Y:qvIur:lA:OwNHH:")) != -1) {
+		switch (opt) {
+		case 'G':
+			if (rd_kafka_conf_set(conf, "group.id", optarg,
+					      errstr, sizeof(errstr)) !=
+			    RD_KAFKA_CONF_OK) {
+				fprintf(stderr, "%% %s\n", errstr);
+				exit(1);
+			}
+			/* FALLTHRU */
+		case 'P':
+		case 'C':
+			mode = opt;
+			break;
+		case 't':
+			rd_kafka_topic_partition_list_add(topics, optarg,
+							  RD_KAFKA_PARTITION_UA);
+			break;
+		case 'p':
+                        partition_cnt++;
+			partitions = realloc(partitions, sizeof(*partitions) * partition_cnt);
+			partitions[partition_cnt-1] = atoi(optarg);
+			break;
+
+		case 'b':
+			brokers = optarg;
+			break;
+		case 's':
+			msgsize = atoi(optarg);
+			break;
+		case 'k':
+			key = optarg;
+			break;
+		case 'c':
+			msgcnt = atoi(optarg);
+			break;
+		case 'D':
+			sendflags |= RD_KAFKA_MSG_F_FREE;
+			break;
+		case 'i':
+			dispintvl = atoi(optarg);
+			break;
+		case 'm':
+			msgpattern = optarg;
+			break;
+		case 'S':
+			seq = strtoull(optarg, NULL, 10);
+			do_seq = 1;
+			break;
+		case 'x':
+			exit_after = atoi(optarg);
+			break;
+		case 'R':
+			seed = atoi(optarg);
+			break;
+		case 'a':
+			if (rd_kafka_topic_conf_set(topic_conf,
+						    "request.required.acks",
+						    optarg,
+						    errstr, sizeof(errstr)) !=
+			    RD_KAFKA_CONF_OK) {
+				fprintf(stderr, "%% %s\n", errstr);
+				exit(1);
+			}
+			break;
+		case 'B':
+			batch_size = atoi(optarg);
+			break;
+		case 'z':
+			if (rd_kafka_conf_set(conf, "compression.codec",
+					      optarg,
+					      errstr, sizeof(errstr)) !=
+			    RD_KAFKA_CONF_OK) {
+				fprintf(stderr, "%% %s\n", errstr);
+				exit(1);
+			}
+			compression = optarg;
+			break;
+		case 'o':
+			if (!strcmp(optarg, "end"))
+				start_offset = RD_KAFKA_OFFSET_END;
+			else if (!strcmp(optarg, "beginning"))
+				start_offset = RD_KAFKA_OFFSET_BEGINNING;
+			else if (!strcmp(optarg, "stored"))
+				start_offset = RD_KAFKA_OFFSET_STORED;
+			else {
+				start_offset = strtoll(optarg, NULL, 10);
+
+				if (start_offset < 0)
+					start_offset = RD_KAFKA_OFFSET_TAIL(-start_offset);
+			}
+
+			break;
+		case 'e':
+			exit_eof = 1;
+			break;
+		case 'd':
+			debug = optarg;
+			break;
+                case 'H':
+                {
+                        char *name, *val;
+                        size_t name_sz = -1;
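+                        /* A size of -1 makes rd_kafka_header_add() take
+                         * strlen() of the NUL-terminated name. */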
+
+                        if (!optarg) {
+                                read_hdrs = 1;
+                                break;
+                        }
+
+                        name = optarg;
+                        val = strchr(name, '=');
+                        if (val) {
+                                name_sz = (size_t)(val-name);
+                                val++; /* past the '=' */
+                        }
+
+                        if (!hdrs)
+                                hdrs = rd_kafka_headers_new(8);
+
+                        err = rd_kafka_header_add(hdrs, name, name_sz, val, -1);
+                        if (err) {
+                                fprintf(stderr,
+                                        "%% Failed to add header %s: %s\n",
+                                        name, rd_kafka_err2str(err));
+                                exit(1);
+                        }
+
+                        read_hdrs = 1;
+                }
+                break;
+		case 'X':
+		{
+			char *name, *val;
+			rd_kafka_conf_res_t res;
+
+			if (!strcmp(optarg, "list") ||
+			    !strcmp(optarg, "help")) {
+				rd_kafka_conf_properties_show(stdout);
+				exit(0);
+			}
+
+			name = optarg;
+			if (!(val = strchr(name, '='))) {
+				fprintf(stderr, "%% Expected "
+					"-X property=value, not %s\n", name);
+				exit(1);
+			}
+
+			*val = '\0';
+			val++;
+
+                        if (!strcmp(name, "file")) {
+                                if (read_conf_file(conf, topic_conf, val) == -1)
+                                        exit(1);
+                                break;
+                        }
+
+			res = RD_KAFKA_CONF_UNKNOWN;
+			/* Try "topic." prefixed properties on topic
+			 * conf first, and then fall through to global if
+			 * it didn't match a topic configuration property. */
+			if (!strncmp(name, "topic.", strlen("topic.")))
+				res = rd_kafka_topic_conf_set(topic_conf,
+							      name+
+							      strlen("topic."),
+							      val,
+							      errstr,
+							      sizeof(errstr));
+
+			if (res == RD_KAFKA_CONF_UNKNOWN)
+				res = rd_kafka_conf_set(conf, name, val,
+							errstr, sizeof(errstr));
+
+			if (res != RD_KAFKA_CONF_OK) {
+				fprintf(stderr, "%% %s\n", errstr);
+				exit(1);
+			}
+		}
+		break;
+
+		case 'T':
+                        stats_intvlstr = optarg;
+			break;
+                case 'Y':
+                        stats_cmd = optarg;
+                        break;
+
+		case 'q':
+                        verbosity--;
+			break;
+
+		case 'v':
+                        verbosity++;
+			break;
+
+		case 'I':
+			idle = 1;
+			break;
+
+                case 'u':
+                        otype = _OTYPE_TAB;
+                        verbosity--; /* remove some fluff */
+                        break;
+
+                case 'r':
+                        dtmp = strtod(optarg, &tmp2);
+                        if (tmp2 == optarg ||
+                            (dtmp >= -0.001 && dtmp <= 0.001)) {
+                                fprintf(stderr, "%% Invalid rate: %s\n",
+                                        optarg);
+                                exit(1);
+                        }
+
+                        rate_sleep = (int)(1000000.0 / dtmp);
+                        break;
+
+                case 'l':
+                        latency_mode = 1;
+			break;
+
+		case 'A':
+			if (!(latency_fp = fopen(optarg, "w"))) {
+				fprintf(stderr,
+					"%% Cant open %s: %s\n",
+					optarg, strerror(errno));
+				exit(1);
+			}
+                        break;
+
+                case 'O':
+                        if (rd_kafka_topic_conf_set(topic_conf,
+                                                    "produce.offset.report",
+                                                    "true",
+                                                    errstr, sizeof(errstr)) !=
+                            RD_KAFKA_CONF_OK) {
+                                fprintf(stderr, "%% %s\n", errstr);
+                                exit(1);
+                        }
+                        report_offset = 1;
+                        break;
+
+		case 'M':
+			incremental_mode = 1;
+			break;
+
+		case 'N':
+			with_dr = 0;
+			break;
+
+		default:
+                        fprintf(stderr, "Unknown option: %c\n", opt);
+			goto usage;
+		}
+	}
+
+	if (topics->cnt == 0 || optind != argc) {
+                if (optind < argc)
+                        fprintf(stderr, "Unknown argument: %s\n", argv[optind]);
+	usage:
+		fprintf(stderr,
+			"Usage: %s [-C|-P] -t <topic> "
+			"[-p <partition>] [-b <broker,broker..>] [options..]\n"
+			"\n"
+			"librdkafka version %s (0x%08x)\n"
+			"\n"
+			" Options:\n"
+			"  -C | -P |    Consumer or Producer mode\n"
+			"  -G <groupid> High-level Kafka Consumer mode\n"
+			"  -t <topic>   Topic to consume / produce\n"
+			"  -p <num>     Partition (defaults to random). "
+			"Multiple partitions are allowed in -C consumer mode.\n"
+			"  -M           Print consumer interval stats\n"
+			"  -b <brokers> Broker address list (host[:port],..)\n"
+			"  -s <size>    Message size (producer)\n"
+			"  -k <key>     Message key (producer)\n"
+                        "  -H <name[=value]> Add header to message (producer)\n"
+                        "  -H           Read message headers (consumer)\n"
+			"  -c <cnt>     Messages to transmit/receive\n"
+			"  -x <cnt>     Hard exit after transmitting <cnt> messages (producer)\n"
+			"  -D           Copy/Duplicate data buffer (producer)\n"
+			"  -i <ms>      Display interval\n"
+			"  -m <msg>     Message payload pattern\n"
+			"  -S <start>   Send a sequence number starting at "
+			"<start> as payload\n"
+			"  -R <seed>    Random seed value (defaults to time)\n"
+			"  -a <acks>    Required acks (producer): "
+			"-1, 0, 1, >1\n"
+			"  -B <size>    Consume batch size (# of msgs)\n"
+			"  -z <codec>   Enable compression:\n"
+			"               none|gzip|snappy\n"
+			"  -o <offset>  Start offset (consumer)\n"
+			"               beginning, end, NNNNN or -NNNNN\n"
+			"  -d [facs..]  Enable debugging contexts:\n"
+			"               %s\n"
+			"  -X <prop=name> Set arbitrary librdkafka "
+			"configuration property\n"
+			"               Properties prefixed with \"topic.\" "
+			"will be set on topic object.\n"
+			"               Use '-X list' to see the full list\n"
+			"               of supported properties.\n"
+                        "  -X file=<path> Read config from file.\n"
+			"  -T <intvl>   Enable statistics from librdkafka at "
+			"specified interval (ms)\n"
+                        "  -Y <command> Pipe statistics to <command>\n"
+			"  -I           Idle: dont produce any messages\n"
+			"  -q           Decrease verbosity\n"
+                        "  -v           Increase verbosity (default 1)\n"
+                        "  -u           Output stats in table format\n"
+                        "  -r <rate>    Producer msg/s limit\n"
+                        "  -l           Latency measurement.\n"
+                        "               Needs two matching instances, one\n"
+                        "               consumer and one producer, both\n"
+                        "               running with the -l switch.\n"
+                        "  -l           Producer: per-message latency stats\n"
+			"  -A <file>    Write per-message latency stats to "
+			"<file>. Requires -l\n"
+                        "  -O           Report produced offset (producer)\n"
+			"  -N           No delivery reports (producer)\n"
+			"\n"
+			" In Consumer mode:\n"
+			"  consumes messages and prints thruput\n"
+			"  If -B <..> is supplied the batch consumer\n"
+			"  mode is used, else the callback mode is used.\n"
+			"\n"
+			" In Producer mode:\n"
+			"  writes messages of size -s <..> and prints thruput\n"
+			"\n",
+			argv[0],
+			rd_kafka_version_str(), rd_kafka_version(),
+			RD_KAFKA_DEBUG_CONTEXTS);
+		exit(1);
+	}
+
+
+	dispintvl *= 1000; /* us */
+
+        if (verbosity > 1)
+                printf("%% Using random seed %i, verbosity level %i\n",
+                       seed, verbosity);
+	srand(seed);
+	signal(SIGINT, stop);
+#ifdef SIGUSR1
+	signal(SIGUSR1, sig_usr1);
+#endif
+
+
+	if (debug &&
+	    rd_kafka_conf_set(conf, "debug", debug, errstr, sizeof(errstr)) !=
+	    RD_KAFKA_CONF_OK) {
+		printf("%% Debug configuration failed: %s: %s\n",
+		       errstr, debug);
+		exit(1);
+	}
+
+        /* Always enable stats (for RTT extraction), and if the user supplied
+         * the -T <intvl> option, forward the stats to them as well. */
+        rd_kafka_conf_set_stats_cb(conf, stats_cb);
+
+        if (!stats_intvlstr) {
+                /* If the user did not request stats, match the stats
+                 * interval to the display interval. */
+                rd_snprintf(tmp, sizeof(tmp), "%"PRId64, dispintvl / 1000);
+        }
+
+        if (rd_kafka_conf_set(conf, "statistics.interval.ms",
+                              stats_intvlstr ? stats_intvlstr : tmp,
+                              errstr, sizeof(errstr)) !=
+            RD_KAFKA_CONF_OK) {
+                fprintf(stderr, "%% %s\n", errstr);
+                exit(1);
+        }
+
+        if (latency_mode)
+                do_seq = 0;
+
+        if (stats_intvlstr) {
+                /* User enabled stats (-T) */
+
+#ifndef _MSC_VER
+                if (stats_cmd) {
+                        if (!(stats_fp = popen(stats_cmd, "we"))) {
+                                fprintf(stderr,
+                                        "%% Failed to start stats command: "
+                                        "%s: %s", stats_cmd, strerror(errno));
+                                exit(1);
+                        }
+                } else
+#endif
+                        stats_fp = stdout;
+        }
+
+	if (msgcnt != -1)
+		forever = 0;
+
+	topic = topics->elems[0].topic;
+
+	if (mode == 'P') {
+		/*
+		 * Producer
+		 */
+		char *sbuf;
+		char *pbuf;
+		int outq;
+		int keylen = key ? (int)strlen(key) : 0;
+		off_t rof = 0;
+		size_t plen = strlen(msgpattern);
+		int partition = partitions ? partitions[0] :
+			RD_KAFKA_PARTITION_UA;
+
+                if (latency_mode) {
+                        int minlen = (int)(strlen("LATENCY:") +
+                                           strlen("18446744073709551615 ")+1);
+                        msgsize = RD_MAX(minlen, msgsize);
+                        sendflags |= RD_KAFKA_MSG_F_COPY;
+		} else if (do_seq) {
+                        int minlen = (int)strlen("18446744073709551615 ")+1;
+                        if (msgsize < minlen)
+                                msgsize = minlen;
+
+			/* Force duplication of payload */
+                        sendflags |= RD_KAFKA_MSG_F_FREE;
+		}
+
+		sbuf = malloc(msgsize);
+
+		/* Copy payload content to new buffer */
+		while (rof < msgsize) {
+			size_t xlen = RD_MIN((size_t)msgsize-rof, plen);
+			memcpy(sbuf+rof, msgpattern, xlen);
+			rof += (off_t)xlen;
+		}
+
+		if (msgcnt == -1)
+			printf("%% Sending messages of size %i bytes\n",
+			       msgsize);
+		else
+			printf("%% Sending %i messages of size %i bytes\n",
+			       msgcnt, msgsize);
+
+		if (with_dr)
+			rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered);
+
+		/* Create Kafka handle */
+		if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
+ 					errstr, sizeof(errstr)))) {
+			fprintf(stderr,
+				"%% Failed to create Kafka producer: %s\n",
+				errstr);
+			exit(1);
+		}
+
+                global_rk = rk;
+
+		/* Add broker(s) */
+		if (brokers && rd_kafka_brokers_add(rk, brokers) < 1) {
+			fprintf(stderr, "%% No valid brokers specified\n");
+			exit(1);
+		}
+
+		/* Explicitly create topic to avoid per-msg lookups. */
+		rkt = rd_kafka_topic_new(rk, topic, topic_conf);
+
+
+                if (rate_sleep && verbosity >= 2)
+                        fprintf(stderr,
+                                "%% Inter message rate limiter sleep %ius\n",
+                                rate_sleep);
+
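+                /* Limit delivery-report progress output to roughly 50
+                 * updates over the whole run. */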
+                dr_disp_div = msgcnt / 50;
+                if (dr_disp_div == 0)
+                        dr_disp_div = 10;
+
+		cnt.t_start = cnt.t_last = rd_clock();
+
+		msgs_wait_produce_cnt = msgcnt;
+
+		while (run && (msgcnt == -1 || (int)cnt.msgs < msgcnt)) {
+			/* Send/Produce message. */
+
+			if (idle) {
+				rd_kafka_poll(rk, 1000);
+				continue;
+			}
+
+                        if (latency_mode) {
+                                rd_snprintf(sbuf, msgsize-1,
+                                         "LATENCY:%"PRIu64,  wall_clock());
+                        } else if (do_seq) {
+                                rd_snprintf(sbuf,
+                                         msgsize-1, "%"PRIu64": ", seq);
+                                seq++;
+			}
+
+			if (sendflags & RD_KAFKA_MSG_F_FREE) {
+				/* Duplicate memory */
+				pbuf = malloc(msgsize);
+				memcpy(pbuf, sbuf, msgsize);
+			} else
+				pbuf = sbuf;
+
+                        if (msgsize == 0)
+                                pbuf = NULL;
+
+			cnt.tx++;
+			while (run &&
+                               (err = do_produce(rk, rkt, partition, sendflags,
+                                                 pbuf, msgsize,
+                                                 key, keylen, hdrs))) {
+				if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
+					printf("%% No such partition: "
+						   "%"PRId32"\n", partition);
+				else if (verbosity >= 3 ||
+					(err != RD_KAFKA_RESP_ERR__QUEUE_FULL && verbosity >= 1))
+					printf("%% produce error: %s%s\n",
+						   rd_kafka_err2str(err),
+						   err == RD_KAFKA_RESP_ERR__QUEUE_FULL ?
+						   " (backpressure)" : "");
+
+				cnt.tx_err++;
+				if (err != RD_KAFKA_RESP_ERR__QUEUE_FULL) {
+					run = 0;
+					break;
+				}
+				now = rd_clock();
+				if (verbosity >= 2 &&
+                                    cnt.t_enobufs_last + dispintvl <= now) {
+					printf("%% Backpressure %i "
+					       "(tx %"PRIu64", "
+					       "txerr %"PRIu64")\n",
+					       rd_kafka_outq_len(rk),
+					       cnt.tx, cnt.tx_err);
+					cnt.t_enobufs_last = now;
+				}
+
+				/* Poll to handle delivery reports */
+				rd_kafka_poll(rk, 10);
+
+                                print_stats(rk, mode, otype, compression);
+			}
+
+			msgs_wait_cnt++;
+			if (msgs_wait_produce_cnt != -1)
+				msgs_wait_produce_cnt--;
+			cnt.msgs++;
+			cnt.bytes += msgsize;
+
+                        if (rate_sleep) {
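+				/* usleep()/Sleep() granularity is too coarse
+				 * for very short sleeps, so busy-wait below
+				 * ~100us instead. */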
+				if (rate_sleep > 100) {
+#ifdef _MSC_VER
+					Sleep(rate_sleep / 1000);
+#else
+					usleep(rate_sleep);
+#endif
+				} else {
+					rd_ts_t next = rd_clock() + rate_sleep;
+					while (next > rd_clock())
+						;
+				}
+                        }
+
+			/* Must poll to handle delivery reports */
+			rd_kafka_poll(rk, 0);
+
+			print_stats(rk, mode, otype, compression);
+		}
+
+		forever = 0;
+                if (verbosity >= 2)
+                        printf("%% All messages produced, "
+                               "now waiting for %li deliveries\n",
+                               msgs_wait_cnt);
+
+		/* Wait for messages to be delivered */
+                while (run && rd_kafka_poll(rk, 1000) != -1)
+			print_stats(rk, mode, otype, compression);
+
+
+		outq = rd_kafka_outq_len(rk);
+                if (verbosity >= 2)
+                        printf("%% %i messages in outq\n", outq);
+		cnt.msgs -= outq;
+		cnt.bytes -= msgsize * outq;
+
+		cnt.t_end = t_end;
+
+		if (cnt.tx_err > 0)
+			printf("%% %"PRIu64" backpressures for %"PRIu64
+			       " produce calls: %.3f%% backpressure rate\n",
+			       cnt.tx_err, cnt.tx,
+			       ((double)cnt.tx_err / (double)cnt.tx) * 100.0);
+
+		/* Destroy topic */
+		rd_kafka_topic_destroy(rkt);
+
+		/* Destroy the handle */
+		rd_kafka_destroy(rk);
+                global_rk = rk = NULL;
+
+		free(sbuf);
+
+                exitcode = cnt.msgs == cnt.msgs_dr_ok ? 0 : 1;
+
+	} else if (mode == 'C') {
+		/*
+		 * Consumer
+		 */
+
+		rd_kafka_message_t **rkmessages = NULL;
+		size_t i = 0;
+
+		/* Create Kafka handle */
+		if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
+					errstr, sizeof(errstr)))) {
+			fprintf(stderr,
+				"%% Failed to create Kafka consumer: %s\n",
+				errstr);
+			exit(1);
+		}
+
+                global_rk = rk;
+
+		/* Add broker(s) */
+		if (brokers && rd_kafka_brokers_add(rk, brokers) < 1) {
+			fprintf(stderr, "%% No valid brokers specified\n");
+			exit(1);
+		}
+
+		/* Create topic to consume from */
+		rkt = rd_kafka_topic_new(rk, topic, topic_conf);
+
+		/* Batch consumer */
+		if (batch_size)
+			rkmessages = malloc(sizeof(*rkmessages) * batch_size);
+
+		/* Start consuming */
+		rkqu = rd_kafka_queue_new(rk);
+		for (i=0 ; i<(size_t)partition_cnt ; ++i) {
+			const int r = rd_kafka_consume_start_queue(rkt,
+				partitions[i], start_offset, rkqu);
+
+			if (r == -1) {
+                                fprintf(stderr, "%% Error creating queue: %s\n",
+                                        rd_kafka_err2str(rd_kafka_last_error()));
+				exit(1);
+			}
+		}
+
+		while (run && (msgcnt == -1 || msgcnt > (int)cnt.msgs)) {
+			/* Consume messages.
+			 * A message may either be a real message, or
+			 * an error signaling (if rkmessage->err is set).
+			 */
+			uint64_t fetch_latency;
+			ssize_t r;
+
+			fetch_latency = rd_clock();
+
+			if (batch_size) {
+				int i;
+				int partition = partitions ? partitions[0] :
+				    RD_KAFKA_PARTITION_UA;
+
+				/* Batch fetch mode */
+				r = rd_kafka_consume_batch(rkt, partition,
+							   1000,
+							   rkmessages,
+							   batch_size);
+				if (r != -1) {
+					for (i = 0 ; i < r ; i++) {
+						msg_consume(rkmessages[i],
+							NULL);
+						rd_kafka_message_destroy(
+							rkmessages[i]);
+					}
+				}
+			} else {
+				/* Queue mode */
+				r = rd_kafka_consume_callback_queue(rkqu, 1000,
+							msg_consume,
+							NULL);
+			}
+
+			cnt.t_fetch_latency += rd_clock() - fetch_latency;
+                        if (r == -1)
+                                fprintf(stderr, "%% Error: %s\n",
+                                        rd_kafka_err2str(rd_kafka_last_error()));
+
+			print_stats(rk, mode, otype, compression);
+
+			/* Poll to handle stats callbacks */
+			rd_kafka_poll(rk, 0);
+		}
+		cnt.t_end = rd_clock();
+
+		/* Stop consuming */
+		for (i=0 ; i<(size_t)partition_cnt ; ++i) {
+			int r = rd_kafka_consume_stop(rkt, partitions[i]);
+			if (r == -1) {
+                                fprintf(stderr,
+                                        "%% Error in consume_stop: %s\n",
+                                        rd_kafka_err2str(rd_kafka_last_error()));
+			}
+		}
+		rd_kafka_queue_destroy(rkqu);
+
+		/* Destroy topic */
+		rd_kafka_topic_destroy(rkt);
+
+		if (batch_size)
+			free(rkmessages);
+
+		/* Destroy the handle */
+		rd_kafka_destroy(rk);
+
+                global_rk = rk = NULL;
+
+	} else if (mode == 'G') {
+		/*
+		 * High-level balanced Consumer
+		 */
+		rd_kafka_resp_err_t err;
+
+		rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
+		rd_kafka_conf_set_default_topic_conf(conf, topic_conf);
+
+		/* Create Kafka handle */
+		if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
+					errstr, sizeof(errstr)))) {
+			fprintf(stderr,
+				"%% Failed to create Kafka consumer: %s\n",
+				errstr);
+			exit(1);
+		}
+
+		/* Forward all events to consumer queue */
+		rd_kafka_poll_set_consumer(rk);
+
+                global_rk = rk;
+
+		/* Add broker(s) */
+		if (brokers && rd_kafka_brokers_add(rk, brokers) < 1) {
+			fprintf(stderr, "%% No valid brokers specified\n");
+			exit(1);
+		}
+
+		err = rd_kafka_subscribe(rk, topics);
+		if (err) {
+			fprintf(stderr, "%% Subscribe failed: %s\n",
+				rd_kafka_err2str(err));
+			exit(1);
+		}
+		fprintf(stderr, "%% Waiting for group rebalance..\n");
+
+		while (run && (msgcnt == -1 || msgcnt > (int)cnt.msgs)) {
+			/* Consume messages.
+			 * A message may either be a real message, or
+			 * an event (if rkmessage->err is set).
+			 */
+			rd_kafka_message_t *rkmessage;
+			uint64_t fetch_latency;
+
+			fetch_latency = rd_clock();
+
+			rkmessage = rd_kafka_consumer_poll(rk, 1000);
+			if (rkmessage) {
+				msg_consume(rkmessage, NULL);
+				rd_kafka_message_destroy(rkmessage);
+			}
+
+			cnt.t_fetch_latency += rd_clock() - fetch_latency;
+
+			print_stats(rk, mode, otype, compression);
+		}
+		cnt.t_end = rd_clock();
+
+		err = rd_kafka_consumer_close(rk);
+		if (err)
+			fprintf(stderr, "%% Failed to close consumer: %s\n",
+				rd_kafka_err2str(err));
+
+		rd_kafka_destroy(rk);
+	}
+
+        if (hdrs)
+                rd_kafka_headers_destroy(hdrs);
+
+	print_stats(NULL, mode, otype|_OTYPE_FORCE, compression);
+
+	if (cnt.t_fetch_latency && cnt.msgs)
+		printf("%% Average application fetch latency: %"PRIu64"us\n",
+		       cnt.t_fetch_latency / cnt.msgs);
+
+	if (latency_fp)
+		fclose(latency_fp);
+
+        if (stats_fp) {
+#ifndef _MSC_VER
+                pclose(stats_fp);
+#endif
+                stats_fp = NULL;
+        }
+
+        if (partitions)
+                free(partitions);
+
+	rd_kafka_topic_partition_list_destroy(topics);
+
+	/* Let background threads clean up and terminate cleanly. */
+	rd_kafka_wait_destroyed(2000);
+
+	return exitcode;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/examples/rdkafka_simple_producer.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/examples/rdkafka_simple_producer.c b/thirdparty/librdkafka-0.11.4/examples/rdkafka_simple_producer.c
new file mode 100644
index 0000000..a353d01
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/examples/rdkafka_simple_producer.c
@@ -0,0 +1,260 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2017, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Simple Apache Kafka producer
+ * using the Kafka driver from librdkafka
+ * (https://github.com/edenhill/librdkafka)
+ */
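+
+/*
+ * Illustrative usage (broker address and topic name are examples):
+ *
+ *   $ ./rdkafka_simple_producer localhost:9092 test
+ *
+ * Each line typed on stdin is produced as a single message.
+ */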
+
+#include <stdio.h>
+#include <signal.h>
+#include <string.h>
+
+
+/* Typical include path would be <librdkafka/rdkafka.h>, but this program
+ * is built from within the librdkafka source tree and thus differs. */
+#include "rdkafka.h"
+
+
+static int run = 1;
+
+/**
+ * @brief Signal termination of program
+ */
+static void stop (int sig) {
+        run = 0;
+        fclose(stdin); /* abort fgets() */
+}
+
+
+/**
+ * @brief Message delivery report callback.
+ *
+ * This callback is called exactly once per message, indicating if
+ * the message was successfully delivered
+ * (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) or permanently
+ * failed delivery (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR).
+ *
+ * The callback is triggered from rd_kafka_poll() and executes on
+ * the application's thread.
+ */
+static void dr_msg_cb (rd_kafka_t *rk,
+                       const rd_kafka_message_t *rkmessage, void *opaque) {
+        if (rkmessage->err)
+                fprintf(stderr, "%% Message delivery failed: %s\n",
+                        rd_kafka_err2str(rkmessage->err));
+        else
+                fprintf(stderr,
+                        "%% Message delivered (%zd bytes, "
+                        "partition %"PRId32")\n",
+                        rkmessage->len, rkmessage->partition);
+
+        /* The rkmessage is destroyed automatically by librdkafka */
+}
+
+
+
+int main (int argc, char **argv) {
+        rd_kafka_t *rk;         /* Producer instance handle */
+        rd_kafka_topic_t *rkt;  /* Topic object */
+        rd_kafka_conf_t *conf;  /* Temporary configuration object */
+        char errstr[512];       /* librdkafka API error reporting buffer */
+        char buf[512];          /* Message value temporary buffer */
+        const char *brokers;    /* Argument: broker list */
+        const char *topic;      /* Argument: topic to produce to */
+
+        /*
+         * Argument validation
+         */
+        if (argc != 3) {
+                fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);
+                return 1;
+        }
+
+        brokers = argv[1];
+        topic   = argv[2];
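+
+        /* Illustrative invocation (the broker address and topic name are
+         * placeholders; any reachable host:port and topic will do):
+         *   ./rdkafka_simple_producer localhost:9092 test
+         */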
+
+
+        /*
+         * Create Kafka client configuration place-holder
+         */
+        conf = rd_kafka_conf_new();
+
+        /* Set bootstrap broker(s) as a comma-separated list of
+         * host or host:port (default port 9092).
+         * librdkafka will use the bootstrap brokers to acquire the full
+         * set of brokers from the cluster. */
+        if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
+                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+                fprintf(stderr, "%s\n", errstr);
+                return 1;
+        }
+
+        /* Set the delivery report callback.
+         * This callback will be called once per message to inform
+         * the application if delivery succeeded or failed.
+         * See dr_msg_cb() above. */
+        rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
+
+
+        /*
+         * Create producer instance.
+         *
+         * NOTE: rd_kafka_new() takes ownership of the conf object
+         *       and the application must not reference it again after
+         *       this call.
+         */
+        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
+        if (!rk) {
+                fprintf(stderr,
+                        "%% Failed to create new producer: %s\n", errstr);
+                return 1;
+        }
+
+
+        /* Create topic object that will be reused for each message
+         * produced.
+         *
+         * Both the producer instance (rd_kafka_t) and topic objects (topic_t)
+         * are long-lived objects that should be reused as much as possible.
+         */
+        rkt = rd_kafka_topic_new(rk, topic, NULL);
+        if (!rkt) {
+                fprintf(stderr, "%% Failed to create topic object: %s\n",
+                        rd_kafka_err2str(rd_kafka_last_error()));
+                rd_kafka_destroy(rk);
+                return 1;
+        }
+
+        /* Signal handler for clean shutdown */
+        signal(SIGINT, stop);
+
+        fprintf(stderr,
+                "%% Type some text and hit enter to produce message\n"
+                "%% Or just hit enter to only serve delivery reports\n"
+                "%% Press Ctrl-C or Ctrl-D to exit\n");
+
+        while (run && fgets(buf, sizeof(buf), stdin)) {
+                size_t len = strlen(buf);
+
+                if (len > 0 && buf[len-1] == '\n') /* Remove newline */
+                        buf[--len] = '\0';
+
+                if (len == 0) {
+                        /* Empty line: only serve delivery reports */
+                        rd_kafka_poll(rk, 0/*non-blocking */);
+                        continue;
+                }
+
+                /*
+                 * Send/Produce message.
+                 * This is an asynchronous call, on success it will only
+                 * enqueue the message on the internal producer queue.
+                 * The actual delivery attempts to the broker are handled
+                 * by background threads.
+                 * The previously registered delivery report callback
+                 * (dr_msg_cb) is used to signal back to the application
+                 * when the message has been delivered (or failed).
+                 */
+        retry:
+                if (rd_kafka_produce(
+                            /* Topic object */
+                            rkt,
+                            /* Use builtin partitioner to select partition*/
+                            RD_KAFKA_PARTITION_UA,
+                            /* Make a copy of the payload. */
+                            RD_KAFKA_MSG_F_COPY,
+                            /* Message payload (value) and length */
+                            buf, len,
+                            /* Optional key and its length */
+                            NULL, 0,
+                            /* Message opaque, provided in
+                             * delivery report callback as
+                             * msg_opaque. */
+                            NULL) == -1) {
+                        /**
+                         * Failed to *enqueue* message for producing.
+                         */
+                        fprintf(stderr,
+                                "%% Failed to produce to topic %s: %s\n",
+                                rd_kafka_topic_name(rkt),
+                                rd_kafka_err2str(rd_kafka_last_error()));
+
+                        /* Poll to handle delivery reports */
+                        if (rd_kafka_last_error() ==
+                            RD_KAFKA_RESP_ERR__QUEUE_FULL) {
+                                /* If the internal queue is full, wait for
+                                 * messages to be delivered and then retry.
+                                 * The internal queue represents both
+                                 * messages to be sent and messages that have
+                                 * been sent or failed, awaiting their
+                                 * delivery report callback to be called.
+                                 *
+                                 * The internal queue is limited by the
+                                 * configuration property
+                                 * queue.buffering.max.messages */
+                                rd_kafka_poll(rk, 1000/*block for max 1000ms*/);
+                                goto retry;
+                        }
+                } else {
+                        fprintf(stderr, "%% Enqueued message (%zd bytes) "
+                                "for topic %s\n",
+                                len, rd_kafka_topic_name(rkt));
+                }
+
+
+                /* A producer application should continually serve
+                 * the delivery report queue by calling rd_kafka_poll()
+                 * at frequent intervals.
+                 * Either put the poll call in your main loop, or in a
+                 * dedicated thread, or call it after every
+                 * rd_kafka_produce() call.
+                 * Just make sure that rd_kafka_poll() is still called
+                 * during periods where you are not producing any messages,
+                 * so that previously produced messages have their delivery
+                 * report callbacks served (along with any other callbacks
+                 * you register). */
+                rd_kafka_poll(rk, 0/*non-blocking*/);
+        }
+
+
+        /* Wait for final messages to be delivered or fail.
+         * rd_kafka_flush() is an abstraction over rd_kafka_poll() which
+         * waits for all messages to be delivered. */
+        fprintf(stderr, "%% Flushing final messages..\n");
+        rd_kafka_flush(rk, 10*1000 /* wait for max 10 seconds */);
+
+        /* Destroy topic object */
+        rd_kafka_topic_destroy(rkt);
+
+        /* Destroy the producer instance */
+        rd_kafka_destroy(rk);
+
+        return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/examples/rdkafka_zookeeper_example.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/examples/rdkafka_zookeeper_example.c b/thirdparty/librdkafka-0.11.4/examples/rdkafka_zookeeper_example.c
new file mode 100644
index 0000000..2f9a61e
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/examples/rdkafka_zookeeper_example.c
@@ -0,0 +1,728 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012, Magnus Edenhill
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met: 
+ * 
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer. 
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution. 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Apache Kafka consumer & producer example programs
+ * using the Kafka driver from librdkafka
+ * (https://github.com/edenhill/librdkafka)
+ */
+
+#include <ctype.h>
+#include <signal.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <syslog.h>
+#include <sys/time.h>
+#include <errno.h>
+
+/* Typical include path would be <librdkafka/rdkafka.h>, but this program
+ * is built from within the librdkafka source tree and thus differs. */
+#include "rdkafka.h"  /* for Kafka driver */
+
+#include <zookeeper.h>
+#include <zookeeper.jute.h>
+#include <jansson.h>
+
+#define BROKER_PATH "/brokers/ids"
+
+static int run = 1;
+static rd_kafka_t *rk;
+static int exit_eof = 0;
+static int quiet = 0;
+static enum {
+	OUTPUT_HEXDUMP,
+	OUTPUT_RAW,
+} output = OUTPUT_HEXDUMP;
+
+static void stop (int sig) {
+	run = 0;
+	fclose(stdin); /* abort fgets() */
+}
+
+
+static void hexdump (FILE *fp, const char *name, const void *ptr, size_t len) {
+	const char *p = (const char *)ptr;
+	int of = 0;
+
+
+	if (name)
+		fprintf(fp, "%s hexdump (%zd bytes):\n", name, len);
+
+	for (of = 0 ; of < len ; of += 16) {
+		char hexen[16*3+1];
+		char charen[16+1];
+		int hof = 0;
+
+		int cof = 0;
+		int i;
+
+		for (i = of ; i < of + 16 && i < len ; i++) {
+			hof += sprintf(hexen+hof, "%02x ", p[i] & 0xff);
+			cof += sprintf(charen+cof, "%c",
+				       isprint((int)p[i]) ? p[i] : '.');
+		}
+		fprintf(fp, "%08x: %-48s %-16s\n",
+			of, hexen, charen);
+	}
+}
+
+/**
+ * Kafka logger callback (optional)
+ */
+static void logger (const rd_kafka_t *rk, int level,
+		    const char *fac, const char *buf) {
+	struct timeval tv;
+	gettimeofday(&tv, NULL);
+	fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
+		(int)tv.tv_sec, (int)(tv.tv_usec / 1000),
+		level, fac, rd_kafka_name(rk), buf);
+}
+
+/**
+ * Message delivery report callback.
+ * Called once for each message.
+ * See rdkafka.h for more information.
+ */
+static void msg_delivered (rd_kafka_t *rk,
+			   void *payload, size_t len,
+			   int error_code,
+			   void *opaque, void *msg_opaque) {
+
+	if (error_code)
+		fprintf(stderr, "%% Message delivery failed: %s\n",
+			rd_kafka_err2str(error_code));
+	else if (!quiet)
+		fprintf(stderr, "%% Message delivered (%zd bytes)\n", len);
+}
+
+
+static void msg_consume (rd_kafka_message_t *rkmessage,
+			 void *opaque) {
+	if (rkmessage->err) {
+		if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
+			fprintf(stderr,
+				"%% Consumer reached end of %s [%"PRId32"] "
+			       "message queue at offset %"PRId64"\n",
+			       rd_kafka_topic_name(rkmessage->rkt),
+			       rkmessage->partition, rkmessage->offset);
+
+			if (exit_eof)
+				run = 0;
+
+			return;
+		}
+
+		fprintf(stderr, "%% Consume error for topic \"%s\" [%"PRId32"] "
+		       "offset %"PRId64": %s\n",
+		       rd_kafka_topic_name(rkmessage->rkt),
+		       rkmessage->partition,
+		       rkmessage->offset,
+		       rd_kafka_message_errstr(rkmessage));
+		return;
+	}
+
+	if (!quiet)
+		fprintf(stdout, "%% Message (offset %"PRId64", %zd bytes):\n",
+			rkmessage->offset, rkmessage->len);
+
+	if (rkmessage->key_len) {
+		if (output == OUTPUT_HEXDUMP)
+			hexdump(stdout, "Message Key",
+				rkmessage->key, rkmessage->key_len);
+		else
+			printf("Key: %.*s\n",
+			       (int)rkmessage->key_len, (char *)rkmessage->key);
+	}
+
+	if (output == OUTPUT_HEXDUMP)
+		hexdump(stdout, "Message Payload",
+			rkmessage->payload, rkmessage->len);
+	else
+		printf("%.*s\n",
+		       (int)rkmessage->len, (char *)rkmessage->payload);
+}
+
+
+static void metadata_print (const char *topic,
+                            const struct rd_kafka_metadata *metadata) {
+        int i, j, k;
+
+        printf("Metadata for %s (from broker %"PRId32": %s):\n",
+               topic ? : "all topics",
+               metadata->orig_broker_id,
+               metadata->orig_broker_name);
+
+
+        /* Iterate brokers */
+        printf(" %i brokers:\n", metadata->broker_cnt);
+        for (i = 0 ; i < metadata->broker_cnt ; i++)
+                printf("  broker %"PRId32" at %s:%i\n",
+                       metadata->brokers[i].id,
+                       metadata->brokers[i].host,
+                       metadata->brokers[i].port);
+
+        /* Iterate topics */
+        printf(" %i topics:\n", metadata->topic_cnt);
+        for (i = 0 ; i < metadata->topic_cnt ; i++) {
+                const struct rd_kafka_metadata_topic *t = &metadata->topics[i];
+                printf("  topic \"%s\" with %i partitions:",
+                       t->topic,
+                       t->partition_cnt);
+                if (t->err) {
+                        printf(" %s", rd_kafka_err2str(t->err));
+                        if (t->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE)
+                                printf(" (try again)");
+                }
+                printf("\n");
+
+                /* Iterate topic's partitions */
+                for (j = 0 ; j < t->partition_cnt ; j++) {
+                        const struct rd_kafka_metadata_partition *p;
+                        p = &t->partitions[j];
+                        printf("    partition %"PRId32", "
+                               "leader %"PRId32", replicas: ",
+                               p->id, p->leader);
+
+                        /* Iterate partition's replicas */
+                        for (k = 0 ; k < p->replica_cnt ; k++)
+                                printf("%s%"PRId32,
+                                       k > 0 ? ",":"", p->replicas[k]);
+
+                        /* Iterate partition's ISRs */
+                        printf(", isrs: ");
+                        for (k = 0 ; k < p->isr_cnt ; k++)
+                                printf("%s%"PRId32,
+                                       k > 0 ? ",":"", p->isrs[k]);
+                        if (p->err)
+                                printf(", %s\n", rd_kafka_err2str(p->err));
+                        else
+                                printf("\n");
+                }
+        }
+}
+
+
+static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers)
+{
+	if (zzh)
+	{
+		struct String_vector brokerlist;
+		if (zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK)
+		{
+			fprintf(stderr, "No brokers found on path %s\n", BROKER_PATH);
+			return;
+		}
+
+		int i;
+		char *brokerptr = brokers;
+		for (i = 0; i < brokerlist.count; i++)
+		{
+			char path[255], cfg[1024];
+			sprintf(path, "/brokers/ids/%s", brokerlist.data[i]);
+			int len = sizeof(cfg) - 1; /* leave room for NUL */
+			if (zoo_get(zzh, path, 0, cfg, &len, NULL) != ZOK)
+				continue;
+
+			if (len > 0)
+			{
+				cfg[len] = '\0';
+				json_error_t jerror;
+				json_t *jobj = json_loads(cfg, 0, &jerror);
+				if (jobj)
+				{
+					json_t *jhost = json_object_get(jobj, "host");
+					json_t *jport = json_object_get(jobj, "port");
+
+					if (jhost && jport)
+					{
+						const char *host = json_string_value(jhost);
+						const int   port = json_integer_value(jport);
+						sprintf(brokerptr, "%s:%d", host, port);
+
+						brokerptr += strlen(brokerptr);
+						if (i < brokerlist.count - 1)
+						{
+							*brokerptr++ = ',';
+						}
+					}
+					json_decref(jobj);
+				}
+			}
+		}
+		deallocate_String_vector(&brokerlist);
+		printf("Found brokers %s\n", brokers);
+	}
+}
+
+
+static void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx)
+{
+	char brokers[1024];
+	if (type == ZOO_CHILD_EVENT && strncmp(path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0)
+	{
+		brokers[0] = '\0';
+		set_brokerlist_from_zookeeper(zh, brokers);
+		if (brokers[0] != '\0' && rk != NULL)
+		{
+			rd_kafka_brokers_add(rk, brokers);
+			rd_kafka_poll(rk, 10);
+		}
+	}
+}
+
+
+static zhandle_t* initialize_zookeeper(const char * zookeeper, const int debug)
+{
+	zhandle_t *zh;
+	if (debug)
+	{
+		zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG);
+	}
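+	/* Establish the ZooKeeper session: watcher() is invoked on
+	 * connection and child events; 10000 ms session timeout, no
+	 * client id, context or flags. */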
+	zh = zookeeper_init(zookeeper, watcher, 10000, 0, 0, 0);
+	if (zh == NULL)
+	{
+		fprintf(stderr, "Zookeeper connection not established.\n");
+		exit(1);
+	}
+	return zh;
+}
+
+
+static void sig_usr1 (int sig) {
+	rd_kafka_dump(stdout, rk);
+}
+
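+/*
+ * Illustrative invocation (topic name and addresses are placeholders):
+ * consume a topic from the beginning, resolving the broker list via a
+ * local ZooKeeper:
+ *   ./rdkafka_zookeeper_example -C -t mytopic -p 0 -k localhost:2181 -o beginning
+ */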
+int main (int argc, char **argv) {
+	rd_kafka_topic_t *rkt;
+	char *zookeeper = "localhost:2181";
+	zhandle_t *zh = NULL;
+	char brokers[1024];
+	char mode = 'C';
+	char *topic = NULL;
+	int partition = RD_KAFKA_PARTITION_UA;
+	int opt;
+	rd_kafka_conf_t *conf;
+	rd_kafka_topic_conf_t *topic_conf;
+	char errstr[512];
+	const char *debug = NULL;
+	int64_t start_offset = 0;
+	int do_conf_dump = 0;
+
+	memset(brokers, 0, sizeof(brokers));
+	quiet = !isatty(STDIN_FILENO);
+
+	/* Kafka configuration */
+	conf = rd_kafka_conf_new();
+
+	/* Topic configuration */
+	topic_conf = rd_kafka_topic_conf_new();
+
+	while ((opt = getopt(argc, argv, "PCLt:p:k:z:qd:o:eX:A")) != -1) {
+		switch (opt) {
+		case 'P':
+		case 'C':
+		case 'L':
+			mode = opt;
+			break;
+		case 't':
+			topic = optarg;
+			break;
+		case 'p':
+			partition = atoi(optarg);
+			break;
+		case 'k':
+			zookeeper = optarg;
+			break;
+		case 'z':
+			if (rd_kafka_conf_set(conf, "compression.codec",
+					      optarg,
+					      errstr, sizeof(errstr)) !=
+			    RD_KAFKA_CONF_OK) {
+				fprintf(stderr, "%% %s\n", errstr);
+				exit(1);
+			}
+			break;
+		case 'o':
+			if (!strcmp(optarg, "end"))
+				start_offset = RD_KAFKA_OFFSET_END;
+			else if (!strcmp(optarg, "beginning"))
+				start_offset = RD_KAFKA_OFFSET_BEGINNING;
+			else if (!strcmp(optarg, "stored"))
+				start_offset = RD_KAFKA_OFFSET_STORED;
+			else
+				start_offset = strtoll(optarg, NULL, 10);
+			break;
+		case 'e':
+			exit_eof = 1;
+			break;
+		case 'd':
+			debug = optarg;
+			break;
+		case 'q':
+			quiet = 1;
+			break;
+		case 'A':
+			output = OUTPUT_RAW;
+			break;
+		case 'X':
+		{
+			char *name, *val;
+			rd_kafka_conf_res_t res;
+
+			if (!strcmp(optarg, "list") ||
+			    !strcmp(optarg, "help")) {
+				rd_kafka_conf_properties_show(stdout);
+				exit(0);
+			}
+
+			if (!strcmp(optarg, "dump")) {
+				do_conf_dump = 1;
+				continue;
+			}
+
+			name = optarg;
+			if (!(val = strchr(name, '='))) {
+				fprintf(stderr, "%% Expected "
+					"-X property=value, not %s\n", name);
+				exit(1);
+			}
+
+			*val = '\0';
+			val++;
+
+			res = RD_KAFKA_CONF_UNKNOWN;
+			/* Try "topic." prefixed properties on topic
+			 * conf first, and then fall through to global if
+			 * it didn't match a topic configuration property. */
+			if (!strncmp(name, "topic.", strlen("topic.")))
+				res = rd_kafka_topic_conf_set(topic_conf,
+							      name+
+							      strlen("topic."),
+							      val,
+							      errstr,
+							      sizeof(errstr));
+
+			if (res == RD_KAFKA_CONF_UNKNOWN)
+				res = rd_kafka_conf_set(conf, name, val,
+							errstr, sizeof(errstr));
+
+			if (res != RD_KAFKA_CONF_OK) {
+				fprintf(stderr, "%% %s\n", errstr);
+				exit(1);
+			}
+		}
+		break;
+
+		default:
+			goto usage;
+		}
+	}
+
+
+	if (do_conf_dump) {
+		const char **arr;
+		size_t cnt;
+		int pass;
+
+		for (pass = 0 ; pass < 2 ; pass++) {
+			int i;
+
+			if (pass == 0) {
+				arr = rd_kafka_conf_dump(conf, &cnt);
+				printf("# Global config\n");
+			} else {
+				printf("# Topic config\n");
+				arr = rd_kafka_topic_conf_dump(topic_conf,
+							       &cnt);
+			}
+
+			for (i = 0 ; i < cnt ; i += 2)
+				printf("%s = %s\n",
+				       arr[i], arr[i+1]);
+
+			printf("\n");
+
+			rd_kafka_conf_dump_free(arr, cnt);
+		}
+
+		exit(0);
+	}
+
+
+	if (optind != argc || (mode != 'L' && !topic)) {
+	usage:
+		fprintf(stderr,
+			"Usage: %s -C|-P|-L -t <topic> "
+			"[-p <partition>] [-k <zk1:port1,zk2:port2,..>]\n"
+			"\n"
+			"librdkafka version %s (0x%08x)\n"
+			"\n"
+			" Options:\n"
+			"  -C | -P         Consumer or Producer mode\n"
+                        "  -L              Metadata list mode\n"
+			"  -t <topic>      Topic to fetch / produce\n"
+			"  -p <num>        Partition (random partitioner)\n"
+			"  -k <zookeepers> Zookeeper address (localhost:2181)\n"
+			"  -z <codec>      Enable compression:\n"
+			"                  none|gzip|snappy\n"
+			"  -o <offset>     Start offset (consumer)\n"
+			"  -e              Exit consumer when last message\n"
+			"                  in partition has been received.\n"
+			"  -d [facs..]     Enable debugging contexts:\n"
+			"  -q              Be quiet\n"
+			"  -A              Raw payload output (consumer)\n"
+			"                  %s\n"
+			"  -X <prop=value> Set arbitrary librdkafka "
+			"configuration property\n"
+			"               Properties prefixed with \"topic.\" "
+			"will be set on topic object.\n"
+			"               Use '-X list' to see the full list\n"
+			"               of supported properties.\n"
+			"\n"
+			" In Consumer mode:\n"
+			"  writes fetched messages to stdout\n"
+			" In Producer mode:\n"
+			"  reads messages from stdin and sends to broker\n"
+                        " In List mode:\n"
+                        "  queries broker for metadata information, "
+                        "topic is optional.\n"
+			"\n"
+			"\n"
+			"\n",
+			argv[0],
+			rd_kafka_version_str(), rd_kafka_version(),
+			RD_KAFKA_DEBUG_CONTEXTS);
+		exit(1);
+	}
+
+
+	signal(SIGINT, stop);
+	signal(SIGUSR1, sig_usr1);
+
+	if (debug &&
+	    rd_kafka_conf_set(conf, "debug", debug, errstr, sizeof(errstr)) !=
+	    RD_KAFKA_CONF_OK) {
+		fprintf(stderr, "%% Debug configuration failed: %s: %s\n",
+			errstr, debug);
+		exit(1);
+	}
+
+        /* Set logger */
+        rd_kafka_conf_set_log_cb(conf, logger);
+
+	/* Initialize ZooKeeper */
+	zh = initialize_zookeeper(zookeeper, debug != NULL);
+
+	/* Add brokers */
+	set_brokerlist_from_zookeeper(zh, brokers);
+        if (rd_kafka_conf_set(conf, "metadata.broker.list",
+                              brokers, errstr, sizeof(errstr)) !=
+            RD_KAFKA_CONF_OK) {
+                fprintf(stderr, "%% Failed to set brokers: %s\n", errstr);
+                exit(1);
+        }
+
+	if (debug) {
+		printf("Broker list from zookeeper cluster %s: %s\n", zookeeper, brokers);
+	}
+
+	if (mode == 'P') {
+		/*
+		 * Producer
+		 */
+		char buf[2048];
+		int sendcnt = 0;
+
+		/* Set up a message delivery report callback.
+		 * It will be called once for each message, either on successful
+		 * delivery to broker, or upon failure to deliver to broker. */
+		rd_kafka_conf_set_dr_cb(conf, msg_delivered);
+
+		/* Create Kafka handle */
+		if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
+					errstr, sizeof(errstr)))) {
+			fprintf(stderr,
+				"%% Failed to create new producer: %s\n",
+				errstr);
+			exit(1);
+		}
+
+		/* Create topic */
+		rkt = rd_kafka_topic_new(rk, topic, topic_conf);
+
+		if (!quiet)
+			fprintf(stderr,
+				"%% Type stuff and hit enter to send\n");
+
+		while (run && fgets(buf, sizeof(buf), stdin)) {
+			size_t len = strlen(buf);
+			if (len > 0 && buf[len-1] == '\n')
+				buf[--len] = '\0';
+
+			/* Send/Produce message. */
+			if (rd_kafka_produce(rkt, partition,
+					     RD_KAFKA_MSG_F_COPY,
+					     /* Payload and length */
+					     buf, len,
+					     /* Optional key and its length */
+					     NULL, 0,
+					     /* Message opaque, provided in
+					      * delivery report callback as
+					      * msg_opaque. */
+					     NULL) == -1) {
+				fprintf(stderr,
+					"%% Failed to produce to topic %s "
+					"partition %i: %s\n",
+					rd_kafka_topic_name(rkt), partition,
+					rd_kafka_err2str(
+						rd_kafka_errno2err(errno)));
+				/* Poll to handle delivery reports */
+				rd_kafka_poll(rk, 0);
+				continue;
+			}
+
+			if (!quiet)
+				fprintf(stderr, "%% Sent %zd bytes to topic "
+					"%s partition %i\n",
+					len, rd_kafka_topic_name(rkt), partition);
+			sendcnt++;
+			/* Poll to handle delivery reports */
+			rd_kafka_poll(rk, 0);
+		}
+
+		/* Poll to handle delivery reports */
+		rd_kafka_poll(rk, 0);
+
+		/* Wait for messages to be delivered */
+		while (run && rd_kafka_outq_len(rk) > 0)
+			rd_kafka_poll(rk, 100);
+
+		/* Destroy the handle */
+		rd_kafka_destroy(rk);
+
+	} else if (mode == 'C') {
+		/*
+		 * Consumer
+		 */
+
+		/* Create Kafka handle */
+		if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
+					errstr, sizeof(errstr)))) {
+			fprintf(stderr,
+				"%% Failed to create new consumer: %s\n",
+				errstr);
+			exit(1);
+		}
+
+		/* Create topic */
+		rkt = rd_kafka_topic_new(rk, topic, topic_conf);
+
+		/* Start consuming */
+		if (rd_kafka_consume_start(rkt, partition, start_offset) == -1){
+			fprintf(stderr, "%% Failed to start consuming: %s\n",
+				rd_kafka_err2str(rd_kafka_errno2err(errno)));
+			exit(1);
+		}
+
+		while (run) {
+			rd_kafka_message_t *rkmessage;
+
+			/* Consume single message.
+			 * See rdkafka_performance.c for high speed
+			 * consuming of messages. */
+			rkmessage = rd_kafka_consume(rkt, partition, 1000);
+			if (!rkmessage) /* timeout */
+				continue;
+
+			msg_consume(rkmessage, NULL);
+
+			/* Return message to rdkafka */
+			rd_kafka_message_destroy(rkmessage);
+		}
+
+		/* Stop consuming */
+		rd_kafka_consume_stop(rkt, partition);
+
+		rd_kafka_topic_destroy(rkt);
+
+		rd_kafka_destroy(rk);
+
+	} else if (mode == 'L') {
+		rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
+
+		/* Create Kafka handle */
+		if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
+					errstr, sizeof(errstr)))) {
+			fprintf(stderr,
+				"%% Failed to create new producer: %s\n",
+				errstr);
+			exit(1);
+		}
+
+		/* Create topic */
+		if (topic)
+			rkt = rd_kafka_topic_new(rk, topic, topic_conf);
+		else
+			rkt = NULL;
+
+		while (run) {
+			const struct rd_kafka_metadata *metadata;
+
+			/* Fetch metadata */
+			err = rd_kafka_metadata(rk, rkt ? 0 : 1, rkt,
+						&metadata, 5000);
+			if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+				fprintf(stderr,
+					"%% Failed to acquire metadata: %s\n",
+					rd_kafka_err2str(err));
+				run = 0;
+				break;
+			}
+
+			metadata_print(topic, metadata);
+
+			rd_kafka_metadata_destroy(metadata);
+			run = 0;
+		}
+
+		/* Destroy the handle */
+		rd_kafka_destroy(rk);
+
+		/* Exit right away, don't wait for background cleanup, we haven't
+		 * done anything important anyway. */
+		exit(err ? 2 : 0);
+	}
+
+	/* Let background threads clean up and terminate cleanly. */
+	rd_kafka_wait_destroyed(2000);
+
+	/* Close the ZooKeeper connection. */
+	zookeeper_close(zh);
+
+	return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/lds-gen.py
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/lds-gen.py b/thirdparty/librdkafka-0.11.4/lds-gen.py
new file mode 100755
index 0000000..1136580
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/lds-gen.py
@@ -0,0 +1,38 @@
+#!/usr/bin/env python
+#
+#
+# Generate linker script to only expose symbols of the public API
+#
+
+import sys
+import re
+
+
+if __name__ == '__main__':
+
+    funcs = list()
+    last_line = ''
+
+    for line in sys.stdin:
+        m = re.match(r'^(\S+.*\s+\**)?(rd_kafka_\S+)\s*\(', line)
+        if m:
+            sym = m.group(2)
+            # Ignore static (unused) functions
+            m2 = re.match(r'(RD_UNUSED|__attribute__\(\(unused\)\))', last_line)
+            if not m2:
+                funcs.append(sym)
+            last_line = ''
+        else:
+            last_line = line
+
+    print('# Automatically generated by lds-gen.py - DO NOT EDIT')
+    print('{\n global:')
+    if len(funcs) == 0:
+        print('    *;')
+    else:
+        for f in sorted(funcs):
+            print('    %s;' % f)
+
+        print('local:\n    *;')
+
+    print('};')
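+
+# Illustrative usage (the actual build integration may differ): feed the
+# public header through the script to produce a GNU ld version script,
+# then link the shared library against it:
+#
+#   cat src/rdkafka.h | ./lds-gen.py > librdkafka.lds
+#   cc -shared -Wl,--version-script=librdkafka.lds -o librdkafka.so <objects>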

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/mainpage.doxy
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/mainpage.doxy b/thirdparty/librdkafka-0.11.4/mainpage.doxy
new file mode 100644
index 0000000..97f2456
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/mainpage.doxy
@@ -0,0 +1,35 @@
+/**
+ * @mainpage librdkafka documentation
+ *
+ * librdkafka is the Apache Kafka C/C++ client library.
+ *
+ * @section intro Introduction
+ *
+ * For an introduction and manual to librdkafka see INTRODUCTION.md
+ *
+ * @section conf Configuration
+ *
+ * librdkafka is highly configurable to meet any deployment demands.
+ * It is usually safe to leave most configuration properties at their
+ * default values.
+ *
+ * See CONFIGURATION.md for the full list of supported configuration properties.
+ *
+ * @remark Application developers are encouraged to expose librdkafka's
+ *         string-based name-value configuration properties through a
+ *         non-hardcoded interface, allowing users to set any librdkafka
+ *         property directly without changes to the application.
+ *         This allows for seamless upgrades, where linking against a new
+ *         version of librdkafka automatically makes new configuration-based
+ *         features available.
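+ *
+ *         For example (an illustrative sketch; `name', `value', `conf' and
+ *         `errstr' are assumed to be defined by the application):
+ *         @code
+ *         // Forward an application-supplied "name=value" pair unmodified:
+ *         if (rd_kafka_conf_set(conf, name, value,
+ *                               errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
+ *                 fprintf(stderr, "%s\n", errstr);
+ *         @endcode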
+ *
+ * @section c_api C API
+ *
+ * The C API is documented in rdkafka.h
+ *
+ * @section cpp_api C++ API
+ *
+ * The C++ API is documented in rdkafkacpp.h
+ */
+