You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ph...@apache.org on 2018/06/06 14:14:35 UTC

[07/51] [partial] nifi-minifi-cpp git commit: MINIFICPP-512 - upgrade to librdkafka 0.11.4

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/examples/rdkafka_example.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/examples/rdkafka_example.c b/thirdparty/librdkafka-0.11.4/examples/rdkafka_example.c
new file mode 100644
index 0000000..77c345e
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/examples/rdkafka_example.c
@@ -0,0 +1,885 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012, Magnus Edenhill
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met: 
+ * 
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer. 
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution. 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Apache Kafka consumer & producer example programs
+ * using the Kafka driver from librdkafka
+ * (https://github.com/edenhill/librdkafka)
+ */
+
+#include <ctype.h>
+#include <signal.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <syslog.h>
+#include <time.h>
+#include <sys/time.h>
+#include <getopt.h>
+
+/* Typical include path would be <librdkafka/rdkafka.h>, but this program
+ * is built from within the librdkafka source tree and thus differs. */
+#include "rdkafka.h"  /* for Kafka driver */
+
+
+static volatile sig_atomic_t run = 1; /* loop flag; cleared by stop() in signal context */
+static rd_kafka_t *rk;                /* client handle, also read by sig_usr1() */
+static int exit_eof = 0;              /* -e: stop consuming at partition EOF */
+static int quiet = 0;                 /* -q, or set by main's isatty() checks */
+static enum {
+	OUTPUT_HEXDUMP,
+	OUTPUT_RAW,
+} output = OUTPUT_HEXDUMP;            /* consumer output format; -A selects OUTPUT_RAW */
+
+static void stop (int sig) { /* SIGINT handler (installed in main); sig is unused */
+	run = 0; /* main's produce/consume/list loops test this flag */
+	fclose(stdin); /* abort fgets(); NOTE(review): fclose() is not async-signal-safe */
+}
+
+
+static void hexdump (FILE *fp, const char *name, const void *ptr, size_t len) {
+	/* Write len bytes at ptr to fp as rows of 16: offset, hex, printable text. */
+	const unsigned char *p = (const unsigned char *)ptr;
+	size_t of;
+
+	if (name) /* optional title line; a NULL name suppresses it */
+		fprintf(fp, "%s hexdump (%zu bytes):\n", name, len);
+
+	for (of = 0 ; of < len ; of += 16) {
+		char hexen[16*3+1];
+		char charen[16+1];
+		int hof = 0;
+		int cof = 0;
+		size_t i;
+
+		/* Bytes are read as unsigned char so that values >= 0x80
+		 * are valid arguments to isprint(). */
+		for (i = of ; i < of + 16 && i < len ; i++) {
+			hof += sprintf(hexen+hof, "%02x ", (unsigned int)p[i]);
+			charen[cof++] = isprint(p[i]) ? (char)p[i] : '.';
+		}
+		charen[cof] = '\0';
+		fprintf(fp, "%08zx: %-48s %-16s\n", of, hexen, charen);
+	}
+}
+
+/**
+ * Kafka logger callback (optional): one "sec.msec RDKAFKA-level-fac" line on stderr
+ */
+static void logger (const rd_kafka_t *rk, int level,
+		    const char *fac, const char *buf) {
+	struct timeval tv;
+	gettimeofday(&tv, NULL); /* wall-clock time for the line prefix */
+	fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
+		(unsigned int)tv.tv_sec, (unsigned int)(tv.tv_usec / 1000),
+		level, fac, rk ? rd_kafka_name(rk) : "(null)", buf); /* NULL for %s is UB */
+}
+
+/**
+ * Message delivery report callback (installed when -o report is not given).
+ * Called once for each message; prints the failure or, unless quiet, the payload.
+ * See rdkafka.h for more information.
+ */
+static void msg_delivered (rd_kafka_t *rk,
+			   void *payload, size_t len,
+			   int error_code,
+			   void *opaque, void *msg_opaque) {
+	/* rk, opaque and msg_opaque are not used by this example. */
+	if (error_code)
+		fprintf(stderr, "%% Message delivery failed: %s\n",
+			rd_kafka_err2str((rd_kafka_resp_err_t)error_code));
+	else if (!quiet)
+		fprintf(stderr, "%% Message delivered (%zu bytes): %.*s\n", len,
+			(int)len, (const char *)payload);
+}
+
+/** Message delivery report callback using the richer rd_kafka_message_t object
+ *  (installed for -o report): prints the message's offset and partition.
+ *  rk and opaque are not used. */
+static void msg_delivered2 (rd_kafka_t *rk,
+                            const rd_kafka_message_t *rkmessage, void *opaque) {
+	printf("del: %s: offset %"PRId64"\n",
+	       rd_kafka_err2str(rkmessage->err), rkmessage->offset);
+        if (rkmessage->err)
+		fprintf(stderr, "%% Message delivery failed: %s\n",
+                        rd_kafka_err2str(rkmessage->err));
+	else if (!quiet)
+		fprintf(stderr,
+                        "%% Message delivered (%zu bytes, offset %"PRId64", "
+                        "partition %"PRId32"): %.*s\n",
+                        rkmessage->len, rkmessage->offset,
+			rkmessage->partition,
+			(int)rkmessage->len, (const char *)rkmessage->payload);
+}
+
+
+static void msg_consume (rd_kafka_message_t *rkmessage,
+			 void *opaque) { /* handles one consumed message; opaque is unused */
+	if (rkmessage->err) {
+		if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
+			fprintf(stderr,
+				"%% Consumer reached end of %s [%"PRId32"] "
+			       "message queue at offset %"PRId64"\n",
+			       rd_kafka_topic_name(rkmessage->rkt),
+			       rkmessage->partition, rkmessage->offset);
+			/* -e: reaching the end of the partition ends the run */
+			if (exit_eof)
+				run = 0;
+
+			return;
+		}
+
+		fprintf(stderr, "%% Consume error for topic \"%s\" [%"PRId32"] "
+		       "offset %"PRId64": %s\n",
+		       rd_kafka_topic_name(rkmessage->rkt),
+		       rkmessage->partition,
+		       rkmessage->offset,
+		       rd_kafka_message_errstr(rkmessage));
+		/* an unknown partition or topic stops the consume loop */
+                if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
+                    rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
+                        run = 0;
+		return;
+	}
+
+	if (!quiet) {
+		rd_kafka_timestamp_type_t tstype;
+		int64_t timestamp;
+                rd_kafka_headers_t *hdrs;
+
+		fprintf(stdout, "%% Message (offset %"PRId64", %zu bytes):\n",
+			rkmessage->offset, rkmessage->len);
+
+		timestamp = rd_kafka_message_timestamp(rkmessage, &tstype);
+		if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
+			const char *tsname = "?";
+			if (tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME)
+				tsname = "create time";
+			else if (tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
+				tsname = "log append time";
+			/* timestamp/1000 is compared with time(); age is 0 if unset */
+			fprintf(stdout, "%% Message timestamp: %s %"PRId64
+				" (%ds ago)\n",
+				tsname, timestamp,
+				!timestamp ? 0 :
+				(int)(time(NULL) - (time_t)(timestamp/1000)));
+		}
+		/* list the headers when the lookup succeeds (returns 0) */
+                if (!rd_kafka_message_headers(rkmessage, &hdrs)) {
+                        size_t idx = 0;
+                        const char *name;
+                        const void *val;
+                        size_t size;
+
+                        fprintf(stdout, "%% Headers:");
+
+                        while (!rd_kafka_header_get_all(hdrs, idx++,
+                                                        &name, &val, &size)) {
+                                fprintf(stdout, "%s%s=",
+                                        idx == 1 ? " " : ", ", name);
+                                if (val)
+                                        fprintf(stdout, "\"%.*s\"",
+                                                (int)size, (const char *)val);
+                                else
+                                        fprintf(stdout, "NULL");
+                        }
+                        fprintf(stdout, "\n");
+                }
+	}
+	/* key and payload are printed even in quiet mode */
+	if (rkmessage->key_len) {
+		if (output == OUTPUT_HEXDUMP)
+			hexdump(stdout, "Message Key",
+				rkmessage->key, rkmessage->key_len);
+		else
+			printf("Key: %.*s\n",
+			       (int)rkmessage->key_len, (char *)rkmessage->key);
+	}
+
+	if (output == OUTPUT_HEXDUMP)
+		hexdump(stdout, "Message Payload",
+			rkmessage->payload, rkmessage->len);
+	else
+		printf("%.*s\n",
+		       (int)rkmessage->len, (char *)rkmessage->payload);
+}
+
+
+static void metadata_print (const char *topic,
+                            const struct rd_kafka_metadata *metadata) {
+        int i, j, k;
+        /* Print brokers, topics and partitions to stdout; topic may be NULL. */
+        printf("Metadata for %s (from broker %"PRId32": %s):\n",
+               topic ? topic : "all topics",
+               metadata->orig_broker_id,
+               metadata->orig_broker_name);
+
+
+        /* Iterate brokers */
+        printf(" %i brokers:\n", metadata->broker_cnt);
+        for (i = 0 ; i < metadata->broker_cnt ; i++)
+                printf("  broker %"PRId32" at %s:%i\n",
+                       metadata->brokers[i].id,
+                       metadata->brokers[i].host,
+                       metadata->brokers[i].port);
+
+        /* Iterate topics */
+        printf(" %i topics:\n", metadata->topic_cnt);
+        for (i = 0 ; i < metadata->topic_cnt ; i++) {
+                const struct rd_kafka_metadata_topic *t = &metadata->topics[i];
+                printf("  topic \"%s\" with %i partitions:",
+                       t->topic,
+                       t->partition_cnt);
+                if (t->err) {
+                        printf(" %s", rd_kafka_err2str(t->err));
+                        if (t->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE)
+                                printf(" (try again)");
+                }
+                printf("\n");
+
+                /* Iterate topic's partitions */
+                for (j = 0 ; j < t->partition_cnt ; j++) {
+                        const struct rd_kafka_metadata_partition *p;
+                        p = &t->partitions[j];
+                        printf("    partition %"PRId32", "
+                               "leader %"PRId32", replicas: ",
+                               p->id, p->leader);
+
+                        /* Iterate partition's replicas */
+                        for (k = 0 ; k < p->replica_cnt ; k++)
+                                printf("%s%"PRId32,
+                                       k > 0 ? ",":"", p->replicas[k]);
+
+                        /* Iterate partition's ISRs */
+                        printf(", isrs: ");
+                        for (k = 0 ; k < p->isr_cnt ; k++)
+                                printf("%s%"PRId32,
+                                       k > 0 ? ",":"", p->isrs[k]);
+                        if (p->err)
+                                printf(", %s\n", rd_kafka_err2str(p->err));
+                        else
+                                printf("\n");
+                }
+        }
+}
+
+
+static void sig_usr1 (int sig) { /* SIGUSR1 handler (installed in main); sig is unused */
+	rd_kafka_dump(stdout, rk); /* rd_kafka_dump() on the global handle, output to stdout */
+}
+
+int main (int argc, char **argv) { /* parse options, then run producer (-P), consumer (-C) or list (-L) mode */
+	rd_kafka_topic_t *rkt;
+	char *brokers = "localhost:9092"; /* -b */
+	char mode = 'C'; /* -P, -C or -L; consumer unless told otherwise */
+	char *topic = NULL; /* -t */
+	int partition = RD_KAFKA_PARTITION_UA; /* -p */
+	int opt;
+	rd_kafka_conf_t *conf;
+	rd_kafka_topic_conf_t *topic_conf;
+	char errstr[512];
+	int64_t start_offset = 0; /* -o */
+        int report_offsets = 0; /* -o report */
+	int do_conf_dump = 0; /* -X dump */
+	char tmp[16];
+        int64_t seek_offset = 0; /* -s */
+        int64_t tmp_offset = 0;
+	int get_wmarks = 0; /* -o wmark */
+        rd_kafka_headers_t *hdrs = NULL; /* -H; only used by the producer */
+        rd_kafka_resp_err_t err;
+
+	/* Kafka configuration */
+	conf = rd_kafka_conf_new();
+
+        /* Set logger */
+        rd_kafka_conf_set_log_cb(conf, logger);
+
+	/* Quick termination */
+	snprintf(tmp, sizeof(tmp), "%i", SIGIO);
+	rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
+
+	/* Topic configuration */
+	topic_conf = rd_kafka_topic_conf_new();
+
+	while ((opt = getopt(argc, argv, "PCLt:p:b:z:qd:o:eX:As:H:")) != -1) {
+		switch (opt) {
+		case 'P':
+		case 'C':
+                case 'L':
+			mode = opt;
+			break;
+		case 't':
+			topic = optarg;
+			break;
+		case 'p':
+			partition = atoi(optarg);
+			break;
+		case 'b':
+			brokers = optarg;
+			break;
+		case 'z':
+			if (rd_kafka_conf_set(conf, "compression.codec",
+					      optarg,
+					      errstr, sizeof(errstr)) !=
+			    RD_KAFKA_CONF_OK) {
+				fprintf(stderr, "%% %s\n", errstr);
+				exit(1);
+			}
+			break;
+		case 'o':
+                case 's': /* -o and -s share the same offset parsing */
+			if (!strcmp(optarg, "end"))
+				tmp_offset = RD_KAFKA_OFFSET_END;
+			else if (!strcmp(optarg, "beginning"))
+				tmp_offset = RD_KAFKA_OFFSET_BEGINNING;
+			else if (!strcmp(optarg, "stored"))
+				tmp_offset = RD_KAFKA_OFFSET_STORED;
+                        else if (!strcmp(optarg, "report"))
+                                report_offsets = 1;
+			else if (!strcmp(optarg, "wmark"))
+				get_wmarks = 1;
+			else {
+				tmp_offset = strtoll(optarg, NULL, 10);
+				/* a negative number is passed on as a tail offset */
+				if (tmp_offset < 0)
+					tmp_offset = RD_KAFKA_OFFSET_TAIL(-tmp_offset);
+			}
+
+                        if (opt == 'o')
+                                start_offset = tmp_offset;
+                        else if (opt == 's')
+                                seek_offset = tmp_offset;
+			break;
+		case 'e':
+			exit_eof = 1;
+			break;
+		case 'd':
+			if (rd_kafka_conf_set(conf, "debug", optarg,
+					      errstr, sizeof(errstr)) !=
+			    RD_KAFKA_CONF_OK) {
+				fprintf(stderr,
+					"%% Debug configuration failed: "
+					"%s: %s\n",
+					errstr, optarg);
+				exit(1);
+			}
+			break;
+		case 'q':
+			quiet = 1;
+			break;
+		case 'A':
+			output = OUTPUT_RAW;
+			break;
+                case 'H':
+                {
+                        char *name, *val;
+                        ssize_t name_sz = -1; /* -1 unless an '=' bounds the name */
+
+                        name = optarg;
+                        val = strchr(name, '=');
+                        if (val) {
+                                name_sz = (ssize_t)(val-name);
+                                val++; /* past the '=' */
+                        }
+
+                        if (!hdrs)
+                                hdrs = rd_kafka_headers_new(8);
+
+                        err = rd_kafka_header_add(hdrs, name, name_sz, val, -1);
+                        if (err) {
+                                fprintf(stderr,
+                                        "%% Failed to add header %s: %s\n",
+                                        name, rd_kafka_err2str(err));
+                                exit(1);
+                        }
+                }
+                break;
+
+		case 'X':
+		{
+			char *name, *val;
+			rd_kafka_conf_res_t res;
+
+			if (!strcmp(optarg, "list") ||
+			    !strcmp(optarg, "help")) {
+				rd_kafka_conf_properties_show(stdout);
+				exit(0);
+			}
+
+			if (!strcmp(optarg, "dump")) {
+				do_conf_dump = 1;
+				continue; /* on to the next option (continues the getopt loop) */
+			}
+
+			name = optarg;
+			if (!(val = strchr(name, '='))) {
+				char dest[512];
+				size_t dest_size = sizeof(dest);
+				/* Return current value for property. */
+
+				res = RD_KAFKA_CONF_UNKNOWN;
+				if (!strncmp(name, "topic.", strlen("topic.")))
+					res = rd_kafka_topic_conf_get(
+						topic_conf,
+						name+strlen("topic."),
+						dest, &dest_size);
+				if (res == RD_KAFKA_CONF_UNKNOWN)
+					res = rd_kafka_conf_get(
+						conf, name, dest, &dest_size);
+
+				if (res == RD_KAFKA_CONF_OK) {
+					printf("%s = %s\n", name, dest);
+					exit(0);
+				} else {
+					fprintf(stderr,
+						"%% %s property\n",
+						res == RD_KAFKA_CONF_UNKNOWN ?
+						"Unknown" : "Invalid");
+					exit(1);
+				}
+			}
+
+			*val = '\0'; /* split "name=value" in place */
+			val++;
+
+			res = RD_KAFKA_CONF_UNKNOWN;
+			/* Try "topic." prefixed properties on topic
+			 * conf first, and then fall through to global if
+			 * it didn't match a topic configuration property. */
+			if (!strncmp(name, "topic.", strlen("topic.")))
+				res = rd_kafka_topic_conf_set(topic_conf,
+							      name+
+							      strlen("topic."),
+							      val,
+							      errstr,
+							      sizeof(errstr));
+
+			if (res == RD_KAFKA_CONF_UNKNOWN)
+				res = rd_kafka_conf_set(conf, name, val,
+							errstr, sizeof(errstr));
+
+			if (res != RD_KAFKA_CONF_OK) {
+				fprintf(stderr, "%% %s\n", errstr);
+				exit(1);
+			}
+		}
+		break;
+
+		default:
+			goto usage;
+		}
+	}
+
+	/* -X dump: print both configurations and exit */
+	if (do_conf_dump) {
+		const char **arr;
+		size_t cnt;
+		int pass;
+
+		for (pass = 0 ; pass < 2 ; pass++) {
+			int i;
+
+			if (pass == 0) {
+				arr = rd_kafka_conf_dump(conf, &cnt);
+				printf("# Global config\n");
+			} else {
+				printf("# Topic config\n");
+				arr = rd_kafka_topic_conf_dump(topic_conf,
+							       &cnt);
+			}
+			/* entries are printed pairwise: arr[i] = arr[i+1] */
+			for (i = 0 ; i < (int)cnt ; i += 2)
+				printf("%s = %s\n",
+				       arr[i], arr[i+1]);
+
+			printf("\n");
+
+			rd_kafka_conf_dump_free(arr, cnt);
+		}
+
+		exit(0);
+	}
+
+	/* stray arguments, or no topic outside list mode: show usage */
+	if (optind != argc || (mode != 'L' && !topic)) {
+	usage:
+		fprintf(stderr,
+			"Usage: %s -C|-P|-L -t <topic> "
+			"[-p <partition>] [-b <host1:port1,host2:port2,..>]\n"
+			"\n"
+			"librdkafka version %s (0x%08x)\n"
+			"\n"
+			" Options:\n"
+			"  -C | -P         Consumer or Producer mode\n"
+                        "  -L              Metadata list mode\n"
+			"  -t <topic>      Topic to fetch / produce\n"
+			"  -p <num>        Partition (random partitioner)\n"
+			"  -b <brokers>    Broker address (localhost:9092)\n"
+			"  -z <codec>      Enable compression:\n"
+			"                  none|gzip|snappy\n"
+			"  -o <offset>     Start offset (consumer):\n"
+			"                  beginning, end, NNNNN or -NNNNN\n"
+			"                  wmark returns the current hi&lo "
+			"watermarks.\n"
+                        "  -o report       Report message offsets (producer)\n"
+			"  -e              Exit consumer when last message\n"
+			"                  in partition has been received.\n"
+			"  -d [facs..]     Enable debugging contexts:\n"
+			"                  %s\n"
+			"  -q              Be quiet\n"
+			"  -A              Raw payload output (consumer)\n"
+                        "  -H <name[=value]> Add header to message (producer)\n"
+			"  -X <prop=name>  Set arbitrary librdkafka "
+			"configuration property\n"
+			"                  Properties prefixed with \"topic.\" "
+			"will be set on topic object.\n"
+			"  -X list         Show full list of supported "
+			"properties.\n"
+			"  -X <prop>       Get single property value\n"
+			"\n"
+			" In Consumer mode:\n"
+			"  writes fetched messages to stdout\n"
+			" In Producer mode:\n"
+			"  reads messages from stdin and sends to broker\n"
+                        " In List mode:\n"
+                        "  queries broker for metadata information, "
+                        "topic is optional.\n"
+			"\n"
+			"\n"
+			"\n",
+			argv[0],
+			rd_kafka_version_str(), rd_kafka_version(),
+			RD_KAFKA_DEBUG_CONTEXTS);
+		exit(1);
+	}
+
+	if ((mode == 'C' && !isatty(STDIN_FILENO)) ||
+	    (mode != 'C' && !isatty(STDOUT_FILENO)))
+		quiet = 1;
+
+
+	signal(SIGINT, stop);
+	signal(SIGUSR1, sig_usr1);
+
+	if (mode == 'P') {
+		/*
+		 * Producer
+		 */
+		char buf[2048];
+		int sendcnt = 0;
+
+		/* Set up a message delivery report callback.
+		 * It will be called once for each message, either on successful
+		 * delivery to broker, or upon failure to deliver to broker. */
+
+                /* If offset reporting (-o report) is enabled, use the
+                 * richer dr_msg_cb instead. */
+                if (report_offsets) {
+                        rd_kafka_topic_conf_set(topic_conf,
+                                                "produce.offset.report",
+                                                "true", errstr, sizeof(errstr));
+                        rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered2);
+                } else
+                        rd_kafka_conf_set_dr_cb(conf, msg_delivered);
+
+		/* Create Kafka handle */
+		if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
+					errstr, sizeof(errstr)))) {
+			fprintf(stderr,
+				"%% Failed to create new producer: %s\n",
+				errstr);
+			exit(1);
+		}
+
+		/* Add brokers */
+		if (rd_kafka_brokers_add(rk, brokers) == 0) {
+			fprintf(stderr, "%% No valid brokers specified\n");
+			exit(1);
+		}
+
+		/* Create topic */
+		rkt = rd_kafka_topic_new(rk, topic, topic_conf);
+                topic_conf = NULL; /* Now owned by topic */
+
+		if (!quiet)
+			fprintf(stderr,
+				"%% Type stuff and hit enter to send\n");
+
+		while (run && fgets(buf, sizeof(buf), stdin)) {
+			size_t len = strlen(buf);
+			if (len > 0 && buf[len-1] == '\n') /* strip the newline, if any */
+				buf[--len] = '\0';
+			err = RD_KAFKA_RESP_ERR_NO_ERROR; /* reset per message: no stale or unset err below */
+			/* Send/Produce message. */
+                        if (hdrs) {
+                                rd_kafka_headers_t *hdrs_copy;
+
+                                hdrs_copy = rd_kafka_headers_copy(hdrs);
+
+                                err = rd_kafka_producev(
+                                        rk,
+                                        RD_KAFKA_V_RKT(rkt),
+                                        RD_KAFKA_V_PARTITION(partition),
+                                        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
+                                        RD_KAFKA_V_VALUE(buf, len),
+                                        RD_KAFKA_V_HEADERS(hdrs_copy),
+                                        RD_KAFKA_V_END);
+
+                                if (err) /* the copy is destroyed here only on failure */
+                                        rd_kafka_headers_destroy(hdrs_copy);
+
+                        } else {
+                                if (rd_kafka_produce(
+                                            rkt, partition,
+                                            RD_KAFKA_MSG_F_COPY,
+                                            /* Payload and length */
+                                            buf, len,
+                                            /* Optional key and its length */
+                                            NULL, 0,
+                                            /* Message opaque, provided in
+                                             * delivery report callback as
+                                             * msg_opaque. */
+                                            NULL) == -1) {
+                                        err = rd_kafka_last_error();
+                                }
+                        }
+
+                        if (err) {
+                                fprintf(stderr,
+                                        "%% Failed to produce to topic %s "
+					"partition %i: %s\n",
+					rd_kafka_topic_name(rkt), partition,
+					rd_kafka_err2str(err));
+
+				/* Poll to handle delivery reports */
+				rd_kafka_poll(rk, 0);
+				continue;
+			}
+
+			if (!quiet)
+				fprintf(stderr, "%% Sent %zu bytes to topic "
+					"%s partition %i\n",
+				len, rd_kafka_topic_name(rkt), partition);
+			sendcnt++;
+			/* Poll to handle delivery reports */
+			rd_kafka_poll(rk, 0);
+		}
+
+		/* Poll to handle delivery reports */
+		rd_kafka_poll(rk, 0);
+
+		/* Wait for messages to be delivered */
+		while (run && rd_kafka_outq_len(rk) > 0)
+			rd_kafka_poll(rk, 100);
+
+		/* Destroy topic */
+		rd_kafka_topic_destroy(rkt);
+
+		/* Destroy the handle */
+		rd_kafka_destroy(rk);
+
+	} else if (mode == 'C') {
+		/*
+		 * Consumer
+		 */
+
+		/* Create Kafka handle */
+		if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
+					errstr, sizeof(errstr)))) {
+			fprintf(stderr,
+				"%% Failed to create new consumer: %s\n",
+				errstr);
+			exit(1);
+		}
+
+		/* Add brokers */
+		if (rd_kafka_brokers_add(rk, brokers) == 0) {
+			fprintf(stderr, "%% No valid brokers specified\n");
+			exit(1);
+		}
+
+		if (get_wmarks) {
+			int64_t lo, hi;
+                        rd_kafka_resp_err_t err;
+
+			/* Only query for hi&lo partition watermarks */
+
+			if ((err = rd_kafka_query_watermark_offsets(
+				     rk, topic, partition, &lo, &hi, 5000))) {
+				fprintf(stderr, "%% query_watermark_offsets() "
+					"failed: %s\n",
+					rd_kafka_err2str(err));
+				exit(1);
+			}
+
+			printf("%s [%d]: low - high offsets: "
+			       "%"PRId64" - %"PRId64"\n",
+			       topic, partition, lo, hi);
+
+			rd_kafka_destroy(rk);
+			exit(0);
+		}
+
+
+		/* Create topic */
+		rkt = rd_kafka_topic_new(rk, topic, topic_conf);
+                topic_conf = NULL; /* Now owned by topic */
+
+		/* Start consuming */
+		if (rd_kafka_consume_start(rkt, partition, start_offset) == -1){
+			rd_kafka_resp_err_t err = rd_kafka_last_error();
+			fprintf(stderr, "%% Failed to start consuming: %s\n",
+				rd_kafka_err2str(err));
+                        if (err == RD_KAFKA_RESP_ERR__INVALID_ARG)
+                                fprintf(stderr,
+                                        "%% Broker based offset storage "
+                                        "requires a group.id, "
+                                        "add: -X group.id=yourGroup\n");
+			exit(1);
+		}
+
+		while (run) {
+			rd_kafka_message_t *rkmessage;
+                        rd_kafka_resp_err_t err;
+
+                        /* Poll for errors, etc. */
+                        rd_kafka_poll(rk, 0);
+
+			/* Consume single message.
+			 * See rdkafka_performance.c for high speed
+			 * consuming of messages. */
+			rkmessage = rd_kafka_consume(rkt, partition, 1000);
+			if (!rkmessage) /* timeout */
+				continue;
+
+			msg_consume(rkmessage, NULL);
+
+			/* Return message to rdkafka */
+			rd_kafka_message_destroy(rkmessage);
+			/* -s: seek once, after the first consumed message */
+                        if (seek_offset) {
+                                err = rd_kafka_seek(rkt, partition, seek_offset,
+                                                    2000);
+                                if (err)
+                                        printf("Seek failed: %s\n",
+                                               rd_kafka_err2str(err));
+                                else
+                                        printf("Seeked to %"PRId64"\n",
+                                               seek_offset);
+                                seek_offset = 0;
+                        }
+		}
+
+		/* Stop consuming */
+		rd_kafka_consume_stop(rkt, partition);
+
+                while (rd_kafka_outq_len(rk) > 0)
+                        rd_kafka_poll(rk, 10);
+
+		/* Destroy topic */
+		rd_kafka_topic_destroy(rkt);
+
+		/* Destroy handle */
+		rd_kafka_destroy(rk);
+
+        } else if (mode == 'L') {
+                rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
+
+		/* Create Kafka handle */
+		if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
+					errstr, sizeof(errstr)))) {
+			fprintf(stderr,
+				"%% Failed to create new producer: %s\n",
+				errstr);
+			exit(1);
+		}
+
+		/* Add brokers */
+		if (rd_kafka_brokers_add(rk, brokers) == 0) {
+			fprintf(stderr, "%% No valid brokers specified\n");
+			exit(1);
+		}
+
+                /* Create topic */
+                if (topic) {
+                        rkt = rd_kafka_topic_new(rk, topic, topic_conf);
+                        topic_conf = NULL; /* Now owned by topic */
+                } else
+                        rkt = NULL;
+
+                while (run) { /* at most one pass: run is cleared on both paths */
+                        const struct rd_kafka_metadata *metadata;
+
+                        /* Fetch metadata */
+                        err = rd_kafka_metadata(rk, rkt ? 0 : 1, rkt,
+                                                &metadata, 5000);
+                        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+                                fprintf(stderr,
+                                        "%% Failed to acquire metadata: %s\n",
+                                        rd_kafka_err2str(err));
+                                run = 0;
+                                break;
+                        }
+
+                        metadata_print(topic, metadata);
+
+                        rd_kafka_metadata_destroy(metadata);
+                        run = 0;
+                }
+
+		/* Destroy topic */
+		if (rkt)
+			rd_kafka_topic_destroy(rkt);
+
+		/* Destroy the handle */
+		rd_kafka_destroy(rk);
+
+                if (topic_conf)
+                        rd_kafka_topic_conf_destroy(topic_conf);
+
+
+                /* Exit right away, don't wait for background cleanup, we haven't
+                 * done anything important anyway. */
+                exit(err ? 2 : 0);
+        }
+
+        if (hdrs)
+                rd_kafka_headers_destroy(hdrs);
+
+        if (topic_conf)
+                rd_kafka_topic_conf_destroy(topic_conf);
+
+	/* Let background threads clean up and terminate cleanly. */
+	run = 5; /* run is reused here as a retry counter */
+	while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1)
+		printf("Waiting for librdkafka to decommission\n");
+	if (run <= 0) /* NOTE(review): rk was already passed to rd_kafka_destroy() above - confirm the dump is safe */
+		rd_kafka_dump(stdout, rk);
+
+	return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7528d23e/thirdparty/librdkafka-0.11.4/examples/rdkafka_example.cpp
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.4/examples/rdkafka_example.cpp b/thirdparty/librdkafka-0.11.4/examples/rdkafka_example.cpp
new file mode 100644
index 0000000..30d0d0e
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.4/examples/rdkafka_example.cpp
@@ -0,0 +1,645 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2014, Magnus Edenhill
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met: 
+ * 
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer. 
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution. 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Apache Kafka consumer & producer example programs
+ * using the Kafka driver from librdkafka
+ * (https://github.com/edenhill/librdkafka)
+ */
+
+#include <iostream>
+#include <string>
+#include <cstdlib>
+#include <cstdio>
+#include <csignal>
+#include <cstring>
+
+#ifdef _MSC_VER
+#include "../win32/wingetopt.h"
+#elif _AIX
+#include <unistd.h>
+#else
+#include <getopt.h>
+#endif
+
+/*
+ * Typically include path in a real application would be
+ * #include <librdkafka/rdkafkacpp.h>
+ */
+#include "rdkafkacpp.h"
+
+
+/**
+ * Print cluster metadata (brokers, topics, partitions, ISRs) to stdout.
+ * @param topic    Requested topic; an empty string is shown as "all topics".
+ * @param metadata Metadata to print; dereferenced unconditionally.
+ */
+static void metadata_print (const std::string &topic,
+                            const RdKafka::Metadata *metadata) {
+  std::cout << "Metadata for "
+            << (topic.empty() ? std::string("all topics") : topic)
+            << " (from broker " << metadata->orig_broker_id()
+            << ":" << metadata->orig_broker_name() << "):" << std::endl;
+
+  /* Iterate brokers */
+  std::cout << " " << metadata->brokers()->size() << " brokers:" << std::endl;
+  RdKafka::Metadata::BrokerMetadataIterator ib;
+  for (ib = metadata->brokers()->begin(); ib != metadata->brokers()->end();
+       ++ib)
+    std::cout << "  broker " << (*ib)->id() << " at "
+              << (*ib)->host() << ":" << (*ib)->port() << std::endl;
+
+  /* Iterate topics */
+  std::cout << metadata->topics()->size() << " topics:" << std::endl;
+  RdKafka::Metadata::TopicMetadataIterator it;
+  for (it = metadata->topics()->begin(); it != metadata->topics()->end();
+       ++it) {
+    std::cout << "  topic \""<< (*it)->topic() << "\" with "
+              << (*it)->partitions()->size() << " partitions:";
+    /* A topic-level error is appended to the topic's summary line. */
+    if ((*it)->err() != RdKafka::ERR_NO_ERROR) {
+      std::cout << " " << RdKafka::err2str((*it)->err());
+      if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE)
+        std::cout << " (try again)";
+    }
+    std::cout << std::endl;
+
+    /* Iterate topic's partitions */
+    RdKafka::TopicMetadata::PartitionMetadataIterator ip;
+    for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end();
+         ++ip) {
+      std::cout << "    partition " << (*ip)->id()
+                << ", leader " << (*ip)->leader()
+                << ", replicas: ";
+
+      /* Iterate partition's replicas (comma separated) */
+      RdKafka::PartitionMetadata::ReplicasIterator ir;
+      for (ir = (*ip)->replicas()->begin(); ir != (*ip)->replicas()->end();
+           ++ir)
+        std::cout << (ir == (*ip)->replicas()->begin() ? "":",") << *ir;
+
+      /* Iterate partition's ISRs (comma separated) */
+      std::cout << ", isrs: ";
+      RdKafka::PartitionMetadata::ISRSIterator iis;
+      for (iis = (*ip)->isrs()->begin(); iis != (*ip)->isrs()->end() ; ++iis)
+        std::cout << (iis == (*ip)->isrs()->begin() ? "":",") << *iis;
+
+      if ((*ip)->err() != RdKafka::ERR_NO_ERROR)
+        std::cout << ", " << RdKafka::err2str((*ip)->err());
+      std::cout << std::endl;
+    }
+  }
+}
+
+/* Loop flag; cleared from the signal handler, hence volatile sig_atomic_t. */
+static volatile std::sig_atomic_t run = 1;
+static bool exit_eof = false;  /* stop consuming at partition EOF when set */
+
+/* Signal handler: request that the running loop in main() terminates. */
+static void sigterm (int sig) { (void)sig; run = 0; }
+
+
+/* Delivery report callback: prints the delivery result of each message. */
+class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
+ public:
+  void dr_cb (RdKafka::Message &message) {
+    std::cout << "Message delivery for (" << message.len() << " bytes): "
+              << message.errstr() << std::endl;
+    if (const std::string *key = message.key())
+      std::cout << "Key: " << *key << ";" << std::endl;
+  }
+};
+
+/* Event callback: reports errors, statistics and log lines on stderr. */
+class ExampleEventCb : public RdKafka::EventCb {
+ public:
+  void event_cb (RdKafka::Event &event) {
+    switch (event.type()) {
+    case RdKafka::Event::EVENT_ERROR:
+      std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): "
+                << event.str() << std::endl;
+      /* All brokers down: ask the main loop to stop. */
+      if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
+        run = false;
+      break;
+
+    case RdKafka::Event::EVENT_STATS:
+      std::cerr << "\"STATS\": " << event.str() << std::endl;
+      break;
+
+    case RdKafka::Event::EVENT_LOG:
+      fprintf(stderr, "LOG-%i-%s: %s\n",
+              event.severity(), event.fac().c_str(), event.str().c_str());
+      break;
+
+    default:
+      std::cerr << "EVENT " << event.type()
+                << " (" << RdKafka::err2str(event.err()) << "): "
+                << event.str() << std::endl;
+      break;
+    }
+  }
+};
+
+/* Partitioner that picks the partition from a DJB hash of the message key.
+ * Pretty pointless here since no key is provided in the produce() call. */
+class MyHashPartitionerCb : public RdKafka::PartitionerCb {
+ public:
+  int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,
+                          int32_t partition_cnt, void *msg_opaque) {
+    return djb_hash(key->c_str(), key->size()) % partition_cnt;
+  }
+ private:
+  /* DJB hash: seeded with 5381, each byte is folded in as hash * 33 + byte. */
+  static inline unsigned int djb_hash (const char *str, size_t len) {
+    unsigned int acc = 5381;
+    for (const char *p = str, *end = str + len ; p != end ; ++p)
+      acc = acc * 33 + *p;
+    return acc;
+  }
+};
+
+/**
+ * Handle the outcome of a single consume call.
+ *
+ * A fetched message has its offset, key (if any) and payload printed to
+ * stdout. Timeouts are ignored, partition EOF clears `run` only when
+ * exit_eof is set, and every other error is reported on stderr and
+ * clears `run`.
+ */
+void msg_consume(RdKafka::Message* message, void* opaque) {
+  const RdKafka::ErrorCode code = message->err();
+
+  if (code == RdKafka::ERR__TIMED_OUT)
+    return;
+
+  if (code == RdKafka::ERR__PARTITION_EOF) {
+    /* Last message */
+    if (exit_eof)
+      run = false;
+    return;
+  }
+
+  if (code != RdKafka::ERR_NO_ERROR) {
+    /* Unknown topic/partition and all remaining errors end the run. */
+    std::cerr << "Consume failed: " << message->errstr() << std::endl;
+    run = false;
+    return;
+  }
+
+  /* Real message */
+  std::cout << "Read msg at offset " << message->offset() << std::endl;
+  if (const std::string *key = message->key())
+    std::cout << "Key: " << *key << std::endl;
+  printf("%.*s\n",
+         static_cast<int>(message->len()),
+         static_cast<const char *>(message->payload()));
+}
+
+/* Consume callback that hands every message over to msg_consume(). */
+class ExampleConsumeCb : public RdKafka::ConsumeCb {
+ public:
+  void consume_cb (RdKafka::Message &msg, void *opaque) {
+    RdKafka::Message *message = &msg;
+    msg_consume(message, opaque);
+  }
+};
+
+/**
+ * Example entry point.
+ *
+ * Parses the command line into the global and topic configuration, then
+ * runs one of three modes: producer (-P: stdin lines are produced to the
+ * topic), consumer (-C: fetched messages are printed) or metadata listing
+ * (-L: metadata is printed once).
+ *
+ * @returns 0 on normal completion; usage, configuration and setup errors
+ *          terminate the process through exit(1).
+ */
+int main (int argc, char **argv) {
+  std::string brokers = "localhost";
+  std::string errstr;
+  std::string topic_str;
+  std::string mode;
+  std::string debug;
+  int32_t partition = RdKafka::Topic::PARTITION_UA;
+  int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
+  bool do_conf_dump = false;
+  int opt;
+  MyHashPartitionerCb hash_partitioner;
+  int use_ccb = 0;
+
+  /* Create configuration objects */
+  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
+  RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
+
+  while ((opt = getopt(argc, argv, "PCLt:p:b:z:qd:o:eX:AM:f:")) != -1) {
+    switch (opt) {
+    case 'P':
+    case 'C':
+    case 'L':
+      /* opt is an int; convert explicitly to the single mode character. */
+      mode = static_cast<char>(opt);
+      break;
+    case 't':
+      topic_str = optarg;
+      break;
+    case 'p':
+      if (!strcmp(optarg, "random"))
+        /* default */;
+      else if (!strcmp(optarg, "hash")) {
+        if (tconf->set("partitioner_cb", &hash_partitioner, errstr) !=
+            RdKafka::Conf::CONF_OK) {
+          std::cerr << errstr << std::endl;
+          exit(1);
+        }
+      } else
+        partition = std::atoi(optarg);
+      break;
+    case 'b':
+      brokers = optarg;
+      break;
+    case 'z':
+      if (conf->set("compression.codec", optarg, errstr) !=
+          RdKafka::Conf::CONF_OK) {
+        std::cerr << errstr << std::endl;
+        exit(1);
+      }
+      break;
+    case 'o':
+      if (!strcmp(optarg, "end"))
+        start_offset = RdKafka::Topic::OFFSET_END;
+      else if (!strcmp(optarg, "beginning"))
+        start_offset = RdKafka::Topic::OFFSET_BEGINNING;
+      else if (!strcmp(optarg, "stored"))
+        start_offset = RdKafka::Topic::OFFSET_STORED;
+      else
+        start_offset = strtoll(optarg, NULL, 10);
+      break;
+    case 'e':
+      exit_eof = true;
+      break;
+    case 'd':
+      debug = optarg;
+      break;
+    case 'M':
+      if (conf->set("statistics.interval.ms", optarg, errstr) !=
+          RdKafka::Conf::CONF_OK) {
+        std::cerr << errstr << std::endl;
+        exit(1);
+      }
+      break;
+    case 'X':
+      {
+        char *name, *val;
+
+        if (!strcmp(optarg, "dump")) {
+          do_conf_dump = true;
+          continue;
+        }
+
+        name = optarg;
+        if (!(val = strchr(name, '='))) {
+          /* iostream output: '%' needs no printf-style escaping. */
+          std::cerr << "% Expected -X property=value, not " <<
+              name << std::endl;
+          exit(1);
+        }
+
+        /* Split "name=value" in place. */
+        *val = '\0';
+        val++;
+
+        /* Properties prefixed with "topic." are set on the topic
+         * configuration (with the prefix stripped); all other
+         * properties are set on the global configuration. */
+        RdKafka::Conf::ConfResult res;
+        if (!strncmp(name, "topic.", strlen("topic.")))
+          res = tconf->set(name+strlen("topic."), val, errstr);
+        else
+          res = conf->set(name, val, errstr);
+
+        if (res != RdKafka::Conf::CONF_OK) {
+          std::cerr << errstr << std::endl;
+          exit(1);
+        }
+      }
+      break;
+
+    case 'f':
+      if (!strcmp(optarg, "ccb"))
+        use_ccb = 1;
+      else {
+        std::cerr << "Unknown option: " << optarg << std::endl;
+        exit(1);
+      }
+      break;
+
+    default:
+      goto usage;
+    }
+  }
+
+  if (mode.empty() || (topic_str.empty() && mode != "L") || optind != argc) {
+  usage:
+    std::string features;
+    conf->get("builtin.features", features);
+    fprintf(stderr,
+            "Usage: %s [-C|-P] -t <topic> "
+            "[-p <partition>] [-b <host1:port1,host2:port2,..>]\n"
+            "\n"
+            "librdkafka version %s (0x%08x, builtin.features \"%s\")\n"
+            "\n"
+            " Options:\n"
+            "  -C | -P         Consumer or Producer mode\n"
+            "  -L              Metadata list mode\n"
+            "  -t <topic>      Topic to fetch / produce\n"
+            "  -p <num>        Partition (random partitioner)\n"
+            "  -p <func>       Use partitioner:\n"
+            "                  random (default), hash\n"
+            "  -b <brokers>    Broker address (localhost:9092)\n"
+            "  -z <codec>      Enable compression:\n"
+            "                  none|gzip|snappy\n"
+            "  -o <offset>     Start offset (consumer)\n"
+            "  -e              Exit consumer when last message\n"
+            "                  in partition has been received.\n"
+            "  -d [facs..]     Enable debugging contexts:\n"
+            "                  %s\n"
+            "  -M <intervalms> Enable statistics\n"
+            "  -X <prop=value> Set arbitrary librdkafka "
+            "configuration property\n"
+            "                  Properties prefixed with \"topic.\" "
+            "will be set on topic object.\n"
+            "                  Use '-X dump' to print the resulting\n"
+            "                  configuration and exit.\n"
+            "  -f <flag>       Set option:\n"
+            "                     ccb - use consume_callback\n"
+            "\n"
+            " In Consumer mode:\n"
+            "  writes fetched messages to stdout\n"
+            " In Producer mode:\n"
+            "  reads messages from stdin and sends to broker\n"
+            "\n"
+            "\n"
+            "\n",
+            argv[0],
+            RdKafka::version_str().c_str(), RdKafka::version(),
+            features.c_str(),
+            RdKafka::get_debug_contexts().c_str());
+    exit(1);
+  }
+
+  /* Set configuration properties */
+  if (conf->set("metadata.broker.list", brokers, errstr) !=
+      RdKafka::Conf::CONF_OK) {
+    std::cerr << errstr << std::endl;
+    exit(1);
+  }
+
+  if (!debug.empty()) {
+    if (conf->set("debug", debug, errstr) != RdKafka::Conf::CONF_OK) {
+      std::cerr << errstr << std::endl;
+      exit(1);
+    }
+  }
+
+  ExampleEventCb ex_event_cb;
+  conf->set("event_cb", &ex_event_cb, errstr);
+
+  if (do_conf_dump) {
+    int pass;
+
+    for (pass = 0 ; pass < 2 ; pass++) {
+      std::list<std::string> *dump;
+      if (pass == 0) {
+        dump = conf->dump();
+        std::cout << "# Global config" << std::endl;
+      } else {
+        dump = tconf->dump();
+        std::cout << "# Topic config" << std::endl;
+      }
+
+      /* The list holds alternating property names and values. */
+      for (std::list<std::string>::iterator it = dump->begin();
+           it != dump->end(); ) {
+        std::cout << *it << " = ";
+        it++;
+        std::cout << *it << std::endl;
+        it++;
+      }
+      std::cout << std::endl;
+      delete dump;  /* the list returned by dump() is owned by the caller */
+    }
+    exit(0);
+  }
+
+  signal(SIGINT, sigterm);
+  signal(SIGTERM, sigterm);
+
+  if (mode == "P") {
+    /* Producer mode */
+
+    if(topic_str.empty())
+      goto usage;
+
+    ExampleDeliveryReportCb ex_dr_cb;
+
+    /* Set delivery report callback */
+    conf->set("dr_cb", &ex_dr_cb, errstr);
+
+    /* Create producer using accumulated global configuration. */
+    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
+    if (!producer) {
+      std::cerr << "Failed to create producer: " << errstr << std::endl;
+      exit(1);
+    }
+
+    std::cout << "% Created producer " << producer->name() << std::endl;
+
+    /* Create topic handle. */
+    RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str,
+                                                   tconf, errstr);
+    if (!topic) {
+      std::cerr << "Failed to create topic: " << errstr << std::endl;
+      exit(1);
+    }
+
+    /* Read messages from stdin and produce to broker. */
+    for (std::string line; run && std::getline(std::cin, line);) {
+      if (line.empty()) {
+        producer->poll(0);
+        continue;
+      }
+
+      /* Produce message */
+      RdKafka::ErrorCode resp =
+        producer->produce(topic, partition,
+                          RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
+                          const_cast<char *>(line.c_str()), line.size(),
+                          NULL, NULL);
+      if (resp != RdKafka::ERR_NO_ERROR)
+        std::cerr << "% Produce failed: " <<
+          RdKafka::err2str(resp) << std::endl;
+      else
+        std::cerr << "% Produced message (" << line.size() << " bytes)" <<
+          std::endl;
+
+      producer->poll(0);
+    }
+    /* Reset the flag so the wait loop below runs even when the input loop
+     * was ended by a signal; a further signal still aborts the wait. */
+    run = true;
+
+    while (run && producer->outq_len() > 0) {
+      std::cerr << "Waiting for " << producer->outq_len() << std::endl;
+      producer->poll(1000);
+    }
+
+    delete topic;
+    delete producer;
+
+  } else if (mode == "C") {
+    /* Consumer mode */
+
+    if(topic_str.empty())
+      goto usage;
+
+    /* Create consumer using accumulated global configuration. */
+    RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
+    if (!consumer) {
+      std::cerr << "Failed to create consumer: " << errstr << std::endl;
+      exit(1);
+    }
+
+    std::cout << "% Created consumer " << consumer->name() << std::endl;
+
+    /* Create topic handle. */
+    RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str,
+                                                   tconf, errstr);
+    if (!topic) {
+      std::cerr << "Failed to create topic: " << errstr << std::endl;
+      exit(1);
+    }
+
+    /* Start consumer for topic+partition at start offset */
+    RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
+    if (resp != RdKafka::ERR_NO_ERROR) {
+      std::cerr << "Failed to start consumer: " <<
+        RdKafka::err2str(resp) << std::endl;
+      exit(1);
+    }
+
+    ExampleConsumeCb ex_consume_cb;
+
+    /* Consume messages until `run` is cleared: either through the consume
+     * callback (-f ccb) or by fetching and handling one message at a
+     * time. */
+    while (run) {
+      if (use_ccb) {
+        consumer->consume_callback(topic, partition, 1000,
+                                   &ex_consume_cb, &use_ccb);
+      } else {
+        RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
+        msg_consume(msg, NULL);
+        delete msg;
+      }
+      consumer->poll(0);
+    }
+
+    /* Stop consumer */
+    consumer->stop(topic, partition);
+
+    consumer->poll(1000);
+
+    delete topic;
+    delete consumer;
+  } else {
+    /* Metadata mode */
+
+    /* Create producer using accumulated global configuration. */
+    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
+    if (!producer) {
+      std::cerr << "Failed to create producer: " << errstr << std::endl;
+      exit(1);
+    }
+
+    std::cout << "% Created producer " << producer->name() << std::endl;
+
+    /* Create topic handle (only when a topic was given with -t). */
+    RdKafka::Topic *topic = NULL;
+    if(!topic_str.empty()) {
+      topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
+      if (!topic) {
+        std::cerr << "Failed to create topic: " << errstr << std::endl;
+        exit(1);
+      }
+    }
+
+    while (run) {
+      class RdKafka::Metadata *metadata;
+
+      /* Fetch metadata: request all topics only when no specific topic
+       * was given, otherwise only that topic. */
+      RdKafka::ErrorCode err = producer->metadata(topic == NULL, topic,
+                                                  &metadata, 5000);
+      if (err != RdKafka::ERR_NO_ERROR) {
+        /* iostream output: '%' needs no printf-style escaping. */
+        std::cerr << "% Failed to acquire metadata: "
+                  << RdKafka::err2str(err) << std::endl;
+        run = 0;
+        break;
+      }
+
+      metadata_print(topic_str, metadata);
+
+      delete metadata;
+      run = 0;
+    }
+
+    /* Release the topic (if any) and producer handles, as the producer
+     * and consumer branches do. */
+    delete topic;
+    delete producer;
+  }
+
+  /* Release the configuration objects created at the top of main(); the
+   * handles created from them have already been deleted. */
+  delete conf;
+  delete tconf;
+
+  /*
+   * Wait for RdKafka to decommission.
+   * This is not strictly needed (when check outq_len() above), but
+   * allows RdKafka to clean up all its resources before the application
+   * exits so that memory profilers such as valgrind wont complain about
+   * memory leaks.
+   */
+  RdKafka::wait_destroyed(5000);
+
+  return 0;
+}