You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2017/11/16 21:15:38 UTC

[08/42] nifi-minifi-cpp git commit: MINIFICPP-274: PutKafka Processor

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl.c
new file mode 100644
index 0000000..9ece5cb
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl.c
@@ -0,0 +1,343 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_transport.h"
+#include "rdkafka_transport_int.h"
+#include "rdkafka_sasl.h"
+#include "rdkafka_sasl_int.h"
+
+
+/**
+ * @brief Send a SASL auth message to the broker, prefixed with a 4-byte
+ *        big-endian length header.
+ *
+ * This is a blocking call: the transport send is retried until the whole
+ * frame has been consumed or the transport reports an error.
+ *
+ * @returns 0 once the full frame has been sent, or -1 if the transport
+ *          send failed (\p errstr is handed to the transport for the
+ *          error description).
+ */
+int rd_kafka_sasl_send (rd_kafka_transport_t *rktrans,
+                        const void *payload, int len,
+                        char *errstr, size_t errstr_size) {
+        rd_buf_t frame;
+        rd_slice_t unsent;
+        int32_t be_len = htobe32(len);
+
+        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
+                   "Send SASL frame to broker (%d bytes)", len);
+
+        /* Frame layout: length header followed by the (optional) payload. */
+        rd_buf_init(&frame, 1+1, sizeof(be_len));
+        rd_buf_write(&frame, &be_len, sizeof(be_len));
+        if (payload)
+                rd_buf_push(&frame, payload, len, NULL);
+
+        rd_slice_init_full(&unsent, &frame);
+
+        /* Emulate blocking behaviour on top of the non-blocking socket.
+         * FIXME: This isn't optimal but is highly unlikely to stall since
+         *        the socket buffer will most likely not be exceeded. */
+        for (;;) {
+                int sent = (int)rd_kafka_transport_send(rktrans, &unsent,
+                                                        errstr, errstr_size);
+
+                if (sent == -1) {
+                        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
+                                   "SASL send failed: %s", errstr);
+                        rd_buf_destroy(&frame);
+                        return -1;
+                }
+
+                if (!rd_slice_remains(&unsent))
+                        break;
+
+                /* Sleep between attempts to avoid busy-looping */
+                rd_usleep(10*1000, NULL);
+        }
+
+        rd_buf_destroy(&frame);
+
+        return 0;
+}
+
+
+/**
+ * @brief Authentication successful.
+ *
+ * Transitions the transport's broker to the next connect state.
+ */
+void rd_kafka_sasl_auth_done (rd_kafka_transport_t *rktrans) {
+        rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
+
+        /* Authenticated: bring the broker connection up */
+        rd_kafka_broker_connect_up(rkb);
+}
+
+
+/**
+ * @brief Handle a transport IO event while SASL authentication is ongoing.
+ *
+ * Reads one framed message from the broker and passes its payload (the
+ * bytes following the 4-byte framing header) to the configured SASL
+ * provider's recv() callback.
+ *
+ * @param events      poll events; anything without POLLIN is ignored.
+ * @param errstr      error string buffer, written on failure.
+ * @param errstr_size size of \p errstr.
+ *
+ * @returns -1 on receive error, 0 if \p events lacks POLLIN or the frame
+ *          has not been fully received yet, otherwise the return value of
+ *          the provider's recv() callback.
+ */
+int rd_kafka_sasl_io_event (rd_kafka_transport_t *rktrans, int events,
+                            char *errstr, size_t errstr_size) {
+        rd_kafka_buf_t *rkbuf = NULL;
+        int r;
+        const void *buf;
+        size_t len;
+
+        if (!(events & POLLIN))
+                return 0;
+
+        r = rd_kafka_transport_framed_recv(rktrans, &rkbuf,
+                                           errstr, errstr_size);
+        if (r == -1) {
+                /* A plain disconnect during authentication usually means
+                 * the broker rejected us: make the error more helpful. */
+                if (!strcmp(errstr, "Disconnected"))
+                        rd_snprintf(errstr, errstr_size,
+                                    "Disconnected: check client %s credentials "
+                                    "and broker logs",
+                                    rktrans->rktrans_rkb->rkb_rk->rk_conf.
+                                    sasl.mechanisms);
+                return -1;
+        } else if (r == 0) /* not fully received yet */
+                return 0;
+
+        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
+                   "Received SASL frame from broker (%"PRIusz" bytes)",
+                   rkbuf ? rkbuf->rkbuf_totlen : 0);
+
+        if (rkbuf) {
+                rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
+                /* Seek past framing header */
+                rd_slice_seek(&rkbuf->rkbuf_reader, 4);
+                len = rd_slice_remains(&rkbuf->rkbuf_reader);
+                buf = rd_slice_ensure_contig(&rkbuf->rkbuf_reader, len);
+        } else {
+                buf = NULL;
+                len = 0;
+        }
+
+        r = rktrans->rktrans_rkb->rkb_rk->
+                rk_conf.sasl.provider->recv(rktrans, buf, len,
+                                            errstr, errstr_size);
+
+        /* rkbuf is allowed to be NULL above, so it must not be passed
+         * to the destructor unconditionally. */
+        if (rkbuf)
+                rd_kafka_buf_destroy(rkbuf);
+
+        return r;
+}
+
+
+/**
+ * @brief Close the SASL session of \p rktrans (called from transport code).
+ *
+ * @remark Safe to call on non-SASL transports: when no provider is
+ *         configured, or the provider has no close() hook, this is a no-op.
+ */
+void rd_kafka_sasl_close (rd_kafka_transport_t *rktrans) {
+        const struct rd_kafka_sasl_provider *prov;
+
+        prov = rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.provider;
+
+        if (!prov || !prov->close)
+                return;
+
+        prov->close(rktrans);
+}
+
+
+
+/**
+ * @brief Initialize and start SASL authentication for \p rktrans.
+ *
+ * Verifies that the broker advertises the feature required by the
+ * configured mechanism, then calls the provider's client_new() with the
+ * broker node name stripped of its ":port" suffix. Unless the provider
+ * returns -1 the transport is set to poll for POLLIN.
+ *
+ * @returns the provider's client_new() result, or -1 (with \p errstr
+ *          written) if the broker lacks the required SASL feature.
+ *
+ * Locality: broker thread
+ */
+int rd_kafka_sasl_client_new (rd_kafka_transport_t *rktrans,
+                              char *errstr, size_t errstr_size) {
+        rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
+        rd_kafka_t *rk = rkb->rkb_rk;
+        const struct rd_kafka_sasl_provider *provider =
+                rk->rk_conf.sasl.provider;
+        const char *mech = rk->rk_conf.sasl.mechanisms;
+        int is_gssapi = !strcmp(mech, "GSSAPI");
+        char *host, *colon;
+        int r;
+
+        /* Verify broker support:
+         * - RD_KAFKA_FEATURE_SASL_GSSAPI - GSSAPI supported
+         * - RD_KAFKA_FEATURE_SASL_HANDSHAKE - GSSAPI, PLAIN and possibly
+         *   other mechanisms supported. */
+        if (is_gssapi &&
+            !(rkb->rkb_features & RD_KAFKA_FEATURE_SASL_GSSAPI)) {
+                rd_snprintf(errstr, errstr_size,
+                            "SASL GSSAPI authentication not supported "
+                            "by broker");
+                return -1;
+        }
+
+        if (!is_gssapi &&
+            !(rkb->rkb_features & RD_KAFKA_FEATURE_SASL_HANDSHAKE)) {
+                rd_snprintf(errstr, errstr_size,
+                            "SASL Handshake not supported by broker "
+                            "(required by mechanism %s)%s",
+                            mech,
+                            rk->rk_conf.api_version_request ? "" :
+                            ": try api.version.request=true");
+                return -1;
+        }
+
+        /* Work on a private copy of the node name so ":port" can be cut */
+        rd_strdupa(&host, rkb->rkb_nodename);
+        colon = strchr(host, ':');
+        if (colon)
+                *colon = '\0';
+
+        rd_rkb_dbg(rkb, SECURITY, "SASL",
+                   "Initializing SASL client: service name %s, "
+                   "hostname %s, mechanisms %s, provider %s",
+                   rk->rk_conf.sasl.service_name, host,
+                   mech,
+                   provider->name);
+
+        r = provider->client_new(rktrans, host, errstr, errstr_size);
+        if (r == -1)
+                return r;
+
+        rd_kafka_transport_poll_set(rktrans, POLLIN);
+
+        return r;
+}
+
+
+
+
+
+
+
+/**
+ * @brief Per-broker SASL termination: invokes the provider's
+ *        broker_term() hook, if it has one.
+ *
+ * Locality: broker thread
+ */
+void rd_kafka_sasl_broker_term (rd_kafka_broker_t *rkb) {
+        const struct rd_kafka_sasl_provider *prov =
+                rkb->rkb_rk->rk_conf.sasl.provider;
+
+        if (!prov->broker_term)
+                return;
+
+        prov->broker_term(rkb);
+}
+
+/**
+ * @brief Per-broker SASL initialization: invokes the provider's
+ *        broker_init() hook, if it has one.
+ *
+ * Locality: broker thread
+ */
+void rd_kafka_sasl_broker_init (rd_kafka_broker_t *rkb) {
+        const struct rd_kafka_sasl_provider *prov =
+                rkb->rkb_rk->rk_conf.sasl.provider;
+
+        if (!prov->broker_init)
+                return;
+
+        prov->broker_init(rkb);
+}
+
+
+
+/**
+ * @brief Select the SASL provider for the (single) configured mechanism
+ *        and store it in the client configuration.
+ *
+ * The provider's conf_validate() hook, when present, is run before the
+ * provider is stored.
+ *
+ * @returns 0 on success, or -1 if the mechanism is unsupported, no
+ *          provider was compiled in for it, or config validation failed.
+ */
+int rd_kafka_sasl_select_provider (rd_kafka_t *rk,
+                                   char *errstr, size_t errstr_size) {
+        const char *mech = rk->rk_conf.sasl.mechanisms;
+        const struct rd_kafka_sasl_provider *chosen = NULL;
+
+        if (!strcmp(mech, "GSSAPI")) {
+                /* GSSAPI / Kerberos */
+#ifdef _MSC_VER
+                chosen = &rd_kafka_sasl_win32_provider;
+#elif WITH_SASL_CYRUS
+                chosen = &rd_kafka_sasl_cyrus_provider;
+#endif
+
+        } else if (!strcmp(mech, "PLAIN")) {
+                /* SASL PLAIN */
+                chosen = &rd_kafka_sasl_plain_provider;
+
+        } else if (!strncmp(mech, "SCRAM-SHA-", strlen("SCRAM-SHA-"))) {
+                /* SASL SCRAM */
+#if WITH_SASL_SCRAM
+                chosen = &rd_kafka_sasl_scram_provider;
+#endif
+
+        } else {
+                /* Unsupported mechanism */
+                rd_snprintf(errstr, errstr_size,
+                            "Unsupported SASL mechanism: %s",
+                            mech);
+                return -1;
+        }
+
+        /* Known mechanism, but its provider was not compiled in */
+        if (!chosen) {
+                rd_snprintf(errstr, errstr_size,
+                            "No provider for SASL mechanism %s"
+                            ": recompile librdkafka with "
+#ifndef _MSC_VER
+                            "libsasl2 or "
+#endif
+                            "openssl support. "
+                            "Current build options:"
+                            " PLAIN"
+#ifdef _MSC_VER
+                            " WindowsSSPI(GSSAPI)"
+#endif
+#if WITH_SASL_CYRUS
+                            " SASL_CYRUS"
+#endif
+#if WITH_SASL_SCRAM
+                            " SASL_SCRAM"
+#endif
+                            ,
+                            mech);
+                return -1;
+        }
+
+        rd_kafka_dbg(rk, SECURITY, "SASL",
+                     "Selected provider %s for SASL mechanism %s",
+                     chosen->name, mech);
+
+        /* Validate SASL config */
+        if (chosen->conf_validate &&
+            chosen->conf_validate(rk, errstr, errstr_size) == -1)
+                return -1;
+
+        rk->rk_conf.sasl.provider = chosen;
+
+        return 0;
+}
+
+
+
+/**
+ * @brief Global SASL termination.
+ *
+ * Only has work to do when built with Cyrus SASL support, in which case
+ * the Cyrus global termination routine is invoked.
+ */
+void rd_kafka_sasl_global_term (void) {
+#if WITH_SASL_CYRUS
+        rd_kafka_sasl_cyrus_global_term();
+#endif
+}
+
+
+/**
+ * @brief Global SASL init, called once per runtime.
+ *
+ * @returns the result of the Cyrus global init when built with Cyrus SASL
+ *          support, else 0.
+ */
+int rd_kafka_sasl_global_init (void) {
+        int r = 0;
+
+#if WITH_SASL_CYRUS
+        r = rd_kafka_sasl_cyrus_global_init();
+#endif
+
+        return r;
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl.h
new file mode 100644
index 0000000..496e04e
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl.h
@@ -0,0 +1,46 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#pragma once
+
+
+
+int rd_kafka_sasl_io_event (rd_kafka_transport_t *rktrans, int events,
+			    char *errstr, size_t errstr_size);
+void rd_kafka_sasl_close (rd_kafka_transport_t *rktrans);
+int rd_kafka_sasl_client_new (rd_kafka_transport_t *rktrans,
+			      char *errstr, size_t errstr_size);
+
+void rd_kafka_sasl_broker_term (rd_kafka_broker_t *rkb);
+void rd_kafka_sasl_broker_init (rd_kafka_broker_t *rkb);
+
+void rd_kafka_sasl_global_term (void);
+int rd_kafka_sasl_global_init (void);
+
+int rd_kafka_sasl_select_provider (rd_kafka_t *rk,
+                                   char *errstr, size_t errstr_size);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_cyrus.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_cyrus.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_cyrus.c
new file mode 100644
index 0000000..35b3183
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_cyrus.c
@@ -0,0 +1,623 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_transport.h"
+#include "rdkafka_transport_int.h"
+#include "rdkafka_sasl.h"
+#include "rdkafka_sasl_int.h"
+#include "rdstring.h"
+
+#ifdef __FreeBSD__
+#include <sys/wait.h>  /* For WIF.. */
+#endif
+
+#ifdef __APPLE__
+/* Apple has deprecated most of the SASL API for unknown reasons,
+ * silence those warnings. */
+#pragma clang diagnostic ignored "-Wdeprecated-declarations"
+#endif
+
+#include <sasl/sasl.h>
+
+static mtx_t rd_kafka_sasl_cyrus_kinit_lock;
+
+typedef struct rd_kafka_sasl_cyrus_state_s {
+        sasl_conn_t *conn;              /* libsasl connection handle: set up
+                                         * by sasl_client_new() and released
+                                         * with sasl_dispose() on close. */
+        sasl_callback_t callbacks[16];  /* Copy of the callback table that is
+                                         * handed to sasl_client_new(). */
+} rd_kafka_sasl_cyrus_state_t;
+
+
+
+/**
+ * Handle received frame from broker.
+ *
+ * If the transport is already marked complete and the frame is empty,
+ * libsasl is bypassed and the function goes straight to the
+ * authentication-successful path.
+ *
+ * Otherwise the frame is fed to sasl_client_step(), which is called again
+ * for as long as it returns SASL_INTERACT. Whenever the step result is
+ * non-negative the step's output is sent to the broker with
+ * rd_kafka_sasl_send().
+ *
+ * NOTE(review): SASL_INTERACT is presumably non-negative too, in which
+ *               case a frame is also sent on that result - confirm that
+ *               libsasl has set out/outlen in that case.
+ *
+ * On success the authenticated user, mechanism and auth source are logged
+ * (when security debugging is enabled) and rd_kafka_sasl_auth_done() is
+ * called.
+ *
+ * Returns 0 when more data is awaited (SASL_CONTINUE) or when
+ * authentication succeeded, and -1 on send failure or on any other
+ * libsasl result (errstr is written for the latter).
+ */
+static int rd_kafka_sasl_cyrus_recv (struct rd_kafka_transport_s *rktrans,
+                                     const void *buf, size_t size,
+                                     char *errstr, size_t errstr_size) {
+        rd_kafka_sasl_cyrus_state_t *state = rktrans->rktrans_sasl.state;
+        int r;
+
+        if (rktrans->rktrans_sasl.complete && size == 0)
+                goto auth_successful;
+
+        do {
+                sasl_interact_t *interact = NULL;
+                const char *out;
+                unsigned int outlen;
+
+                r = sasl_client_step(state->conn,
+                                     size > 0 ? buf : NULL, size,
+                                     &interact,
+                                     &out, &outlen);
+
+                if (r >= 0) {
+                        /* Note: outlen may be 0 here for an empty response */
+                        if (rd_kafka_sasl_send(rktrans, out, outlen,
+                                               errstr, errstr_size) == -1)
+                                return -1;
+                }
+
+                if (r == SASL_INTERACT)
+                        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
+                                   "SASL_INTERACT: %lu %s, %s, %s, %p",
+                                   interact->id,
+                                   interact->challenge,
+                                   interact->prompt,
+                                   interact->defresult,
+                                   interact->result);
+
+        } while (r == SASL_INTERACT);
+
+        if (r == SASL_CONTINUE)
+                return 0;  /* Wait for more data from broker */
+        else if (r != SASL_OK) {
+                rd_snprintf(errstr, errstr_size,
+                            "SASL handshake failed (step): %s",
+                            sasl_errdetail(state->conn));
+                return -1;
+        }
+
+        /* Authentication successful */
+auth_successful:
+        if (rktrans->rktrans_rkb->rkb_rk->rk_conf.debug &
+            RD_KAFKA_DBG_SECURITY) {
+                const char *user, *mech, *authsrc;
+
+                if (sasl_getprop(state->conn, SASL_USERNAME,
+                                 (const void **)&user) != SASL_OK)
+                        user = "(unknown)";
+
+                if (sasl_getprop(state->conn, SASL_MECHNAME,
+                                 (const void **)&mech) != SASL_OK)
+                        mech = "(unknown)";
+
+                if (sasl_getprop(state->conn, SASL_AUTHSOURCE,
+                                 (const void **)&authsrc) != SASL_OK)
+                        authsrc = "(unknown)";
+
+                rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
+                           "Authenticated as %s using %s (%s)",
+                           user, mech, authsrc);
+        }
+
+        rd_kafka_sasl_auth_done(rktrans);
+
+        return 0;
+}
+
+
+
+
+/**
+ * @brief Value lookup callback passed to rd_string_render() when building
+ *        the kinit command line.
+ *
+ * The key "broker.name" resolves to the broker's node name without the
+ * ":port" suffix; at most \p size bytes are copied to \p buf (no
+ * terminating nul is written) and the full name length is returned.
+ * Any other key is looked up as a configuration property.
+ *
+ * @param opaque the rd_kafka_broker_t the command is rendered for.
+ *
+ * @returns the value length (excluding any terminating nul), or -1 if the
+ *          configuration lookup failed.
+ */
+static ssize_t render_callback (const char *key, char *buf,
+                                size_t size, void *opaque) {
+        rd_kafka_broker_t *rkb = opaque;
+        char *nodename, *colon;
+        size_t namelen;
+
+        if (strcmp(key, "broker.name") != 0) {
+                size_t written = size;
+
+                /* Try config lookup. */
+                if (rd_kafka_conf_get(&rkb->rkb_rk->rk_conf, key,
+                                      buf, &written) != RD_KAFKA_CONF_OK)
+                        return -1;
+
+                /* Dont include \0 in returned size */
+                return (written > 0 ? written-1 : written);
+        }
+
+        /* Snapshot the node name while holding the broker lock */
+        rd_kafka_broker_lock(rkb);
+        rd_strdupa(&nodename, rkb->rkb_nodename);
+        rd_kafka_broker_unlock(rkb);
+
+        /* Just the broker name, no port */
+        colon = strchr(nodename, ':');
+        namelen = colon ? (size_t)(colon-nodename) : strlen(nodename);
+
+        if (buf)
+                memcpy(buf, nodename, RD_MIN(namelen, size));
+
+        return namelen;
+}
+
+
+/**
+ * @brief Execute the configured kinit command to refresh the ticket.
+ *
+ * The command line is rendered from the sasl.kerberos.kinit.cmd template
+ * and run with system() while holding the kinit lock. Nothing is done
+ * (and 0 is returned) when no kinit command is configured or the
+ * configured mechanisms do not contain GSSAPI.
+ *
+ * @returns 0 on success, -1 on error.
+ *
+ * Locality: any
+ */
+static int rd_kafka_sasl_cyrus_kinit_refresh (rd_kafka_broker_t *rkb) {
+        rd_kafka_t *rk = rkb->rkb_rk;
+        char render_err[128];
+        char *cmd;
+        int status;
+        int rc = -1;
+
+        if (!rk->rk_conf.sasl.kinit_cmd ||
+            !strstr(rk->rk_conf.sasl.mechanisms, "GSSAPI"))
+                return 0; /* kinit not configured */
+
+        /* Build kinit refresh command line using string rendering and config */
+        cmd = rd_string_render(rk->rk_conf.sasl.kinit_cmd,
+                               render_err, sizeof(render_err),
+                               render_callback, rkb);
+        if (!cmd) {
+                rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH",
+                           "Failed to construct kinit command "
+                           "from sasl.kerberos.kinit.cmd template: %s",
+                           render_err);
+                return -1;
+        }
+
+        /* Execute kinit */
+        rd_rkb_dbg(rkb, SECURITY, "SASLREFRESH",
+                   "Refreshing SASL keys with command: %s", cmd);
+
+        mtx_lock(&rd_kafka_sasl_cyrus_kinit_lock);
+        status = system(cmd);
+        mtx_unlock(&rd_kafka_sasl_cyrus_kinit_lock);
+
+        /* Classify the outcome; every failure is logged before cmd is
+         * released on the common exit path below. */
+        if (status == -1) {
+                rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH",
+                           "SASL key refresh failed: Failed to execute %s",
+                           cmd);
+        } else if (WIFSIGNALED(status)) {
+                rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH",
+                           "SASL key refresh failed: %s: received signal %d",
+                           cmd, WTERMSIG(status));
+        } else if (WIFEXITED(status) && WEXITSTATUS(status) != 0) {
+                rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH",
+                           "SASL key refresh failed: %s: exited with code %d",
+                           cmd, WEXITSTATUS(status));
+        } else {
+                rc = 0;
+        }
+
+        rd_free(cmd);
+
+        if (rc == 0) {
+                rd_rkb_dbg(rkb, SECURITY, "SASLREFRESH", "SASL key refreshed");
+        }
+
+        return rc;
+}
+
+
+/**
+ * @brief Refresh timer callback: runs a kinit refresh for the broker
+ *        passed as \p arg. The refresh result is not propagated.
+ *
+ * Locality: kafka main thread
+ */
+static void rd_kafka_sasl_cyrus_kinit_refresh_tmr_cb (rd_kafka_timers_t *rkts,
+                                                      void *arg) {
+        rd_kafka_sasl_cyrus_kinit_refresh((rd_kafka_broker_t *)arg);
+}
+
+
+
+/**
+ *
+ * libsasl callbacks
+ *
+ */
+static RD_UNUSED int
+rd_kafka_sasl_cyrus_cb_getopt (void *context, const char *plugin_name,
+                         const char *option,
+                         const char **result, unsigned *len) {
+        /* libsasl option lookup callback: answers "client_mech_list" and
+         * "canon_user_plugin"; for any other option *result is left as
+         * the caller passed it in. Always returns SASL_OK. */
+        rd_kafka_transport_t *rktrans = context;
+
+        if (!strcmp(option, "client_mech_list")) {
+                *result = "GSSAPI";
+        } else if (!strcmp(option, "canon_user_plugin")) {
+                *result = "INTERNAL";
+        }
+
+        if (len && *result)
+                *len = strlen(*result);
+
+        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
+                   "CB_GETOPT: plugin %s, option %s: returning %s",
+                   plugin_name, option, *result);
+
+        return SASL_OK;
+}
+
+static int rd_kafka_sasl_cyrus_cb_log (void *context, int level, const char *message){
+        /* libsasl log callback: messages at LOG_DEBUG level or above go to
+         * the SECURITY debug context, everything else is logged at the
+         * level libsasl supplied. */
+        rd_kafka_transport_t *rktrans = context;
+
+        if (level < LOG_DEBUG) {
+                rd_rkb_log(rktrans->rktrans_rkb, level, "LIBSASL",
+                           "%s", message);
+                return SASL_OK;
+        }
+
+        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
+                   "%s", message);
+
+        return SASL_OK;
+}
+
+
+static int rd_kafka_sasl_cyrus_cb_getsimple (void *context, int id,
+                                       const char **result, unsigned *len) {
+        /* libsasl simple-value callback: SASL_CB_USER and SASL_CB_AUTHNAME
+         * both resolve to the configured sasl username, any other id
+         * yields NULL. Returns SASL_FAIL when there is no value. */
+        rd_kafka_transport_t *rktrans = context;
+        const char *value = NULL;
+
+        if (id == SASL_CB_USER || id == SASL_CB_AUTHNAME)
+                value = rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.username;
+
+        *result = value;
+
+        if (len)
+                *len = value ? strlen(value) : 0;
+
+        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
+                   "CB_GETSIMPLE: id 0x%x: returning %s", id, value);
+
+        if (!value)
+                return SASL_FAIL;
+
+        return SASL_OK;
+}
+
+
+static int rd_kafka_sasl_cyrus_cb_getsecret (sasl_conn_t *conn, void *context,
+                                       int id, sasl_secret_t **psecret) {
+        /* libsasl secret callback: hands the configured sasl password to
+         * libsasl as a sasl_secret_t (length + raw bytes, no terminating
+         * nul is written), or NULL if no password is configured.
+         * Always returns SASL_OK. */
+        rd_kafka_transport_t *rktrans = context;
+        const char *pw = rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.password;
+
+        if (pw) {
+                size_t pwlen = strlen(pw);
+                sasl_secret_t *secret;
+
+                /* Grow whatever libsasl passed in to fit the password */
+                secret = rd_realloc(*psecret, sizeof(*secret) + pwlen);
+                secret->len = pwlen;
+                memcpy(secret->data, pw, pwlen);
+                *psecret = secret;
+        } else {
+                *psecret = NULL;
+        }
+
+        /* Never log the secret itself */
+        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
+                   "CB_GETSECRET: id 0x%x: returning %s",
+                   id, *psecret ? "(hidden)":"NULL");
+
+        return SASL_OK;
+}
+
+static int rd_kafka_sasl_cyrus_cb_chalprompt (void *context, int id,
+                                        const char *challenge,
+                                        const char *prompt,
+                                        const char *defres,
+                                        const char **result, unsigned *len) {
+        /* libsasl challenge/prompt callback: always answers with the same
+         * fixed string, regardless of the challenge. */
+        rd_kafka_transport_t *rktrans = context;
+        const char *answer = "min_chalprompt";
+
+        *result = answer;
+        *len = strlen(answer);
+
+        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
+                   "CB_CHALPROMPT: id 0x%x, challenge %s, prompt %s, "
+                   "default %s: returning %s",
+                   id, challenge, prompt, defres, answer);
+
+        return SASL_OK;
+}
+
+static int rd_kafka_sasl_cyrus_cb_getrealm (void *context, int id,
+                                      const char **availrealms,
+                                      const char **result) {
+        /* libsasl realm callback: picks the first entry of the realm list
+         * offered by libsasl. Always returns SASL_OK. */
+        rd_kafka_transport_t *rktrans = context;
+        const char *realm = *availrealms;
+
+        *result = realm;
+
+        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
+                   "CB_GETREALM: id 0x%x: returning %s", id, realm);
+
+        return SASL_OK;
+}
+
+
+/**
+ * @brief libsasl SASL_CB_CANON_USER callback: produce the canonical user.
+ *
+ * - If the configured mechanisms contain "GSSAPI" the configured sasl
+ *   principal is written to \p out.
+ * - If the configured mechanism is exactly "PLAIN" the input name
+ *   (\p in, \p inlen) is copied to \p out.
+ * - Any other mechanism is rejected and \p out / \p out_len are left
+ *   untouched.
+ *
+ * @returns SASL_OK when \p out and \p out_len were filled in, else
+ *          SASL_FAIL.
+ *
+ * NOTE(review): \p out_len is set from rd_snprintf()'s return value; if
+ *               that is the untruncated length (snprintf semantics -
+ *               confirm) it can exceed \p out_max on truncation.
+ */
+static RD_UNUSED int
+rd_kafka_sasl_cyrus_cb_canon (sasl_conn_t *conn,
+                              void *context,
+                              const char *in, unsigned inlen,
+                              unsigned flags,
+                              const char *user_realm,
+                              char *out, unsigned out_max,
+                              unsigned *out_len) {
+        rd_kafka_transport_t *rktrans = context;
+        const char *mechs =
+                rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.mechanisms;
+        unsigned canon_len = 0;
+        int ok = 1;
+
+        if (strstr(mechs, "GSSAPI")) {
+                canon_len = rd_snprintf(out, out_max, "%s",
+                                        rktrans->rktrans_rkb->rkb_rk->
+                                        rk_conf.sasl.principal);
+        } else if (!strcmp(mechs, "PLAIN")) {
+                /* The "%.*s" precision argument must be an int */
+                canon_len = rd_snprintf(out, out_max, "%.*s",
+                                        (int)inlen, in);
+        } else
+                ok = 0;
+
+        if (ok)
+                *out_len = canon_len;
+
+        /* On failure nothing was written: log an empty result rather than
+         * reading the caller's (possibly uninitialized) *out_len or
+         * passing a NULL string to the formatter. */
+        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
+                   "CB_CANON: flags 0x%x, \"%.*s\" @ \"%s\": returning \"%.*s\"",
+                   flags, (int)inlen, in, user_realm,
+                   (int)canon_len, ok ? out : "");
+
+        return ok ? SASL_OK : SASL_FAIL;
+}
+
+
+static void rd_kafka_sasl_cyrus_close (struct rd_kafka_transport_s *rktrans) {
+        /* Release the transport's Cyrus state: dispose of the libsasl
+         * connection (if one was created) and free the state itself.
+         * No-op when the transport has no state. */
+        rd_kafka_sasl_cyrus_state_t *st = rktrans->rktrans_sasl.state;
+
+        if (st) {
+                if (st->conn)
+                        sasl_dispose(&st->conn);
+
+                rd_free(st);
+        }
+}
+
+
+/**
+ * Initialize and start SASL authentication.
+ *
+ * Allocates the per-transport Cyrus state, installs the libsasl callbacks
+ * (adding SASL_CB_USER for the PLAIN mechanism only), refreshes the
+ * Kerberos ticket if kinit is configured, creates the libsasl client
+ * connection and sends the initial client response to the broker.
+ *
+ * If libsasl reports completion right away, the transport is marked
+ * complete so that the broker's (empty) response is not fed to libsasl.
+ *
+ * Returns 0 on successful init and -1 on error (errstr is written by this
+ * function or by rd_kafka_sasl_send()).
+ *
+ * Locality: broker thread
+ */
+static int rd_kafka_sasl_cyrus_client_new (rd_kafka_transport_t *rktrans,
+                                           const char *hostname,
+                                           char *errstr, size_t errstr_size) {
+        int r;
+        rd_kafka_sasl_cyrus_state_t *state;
+        rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
+        rd_kafka_t *rk = rkb->rkb_rk;
+        sasl_callback_t callbacks[16] = {
+                // { SASL_CB_GETOPT, (void *)rd_kafka_sasl_cyrus_cb_getopt, rktrans },
+                { SASL_CB_LOG, (void *)rd_kafka_sasl_cyrus_cb_log, rktrans },
+                { SASL_CB_AUTHNAME, (void *)rd_kafka_sasl_cyrus_cb_getsimple, rktrans },
+                { SASL_CB_PASS, (void *)rd_kafka_sasl_cyrus_cb_getsecret, rktrans },
+                { SASL_CB_ECHOPROMPT, (void *)rd_kafka_sasl_cyrus_cb_chalprompt, rktrans },
+                { SASL_CB_GETREALM, (void *)rd_kafka_sasl_cyrus_cb_getrealm, rktrans },
+                { SASL_CB_CANON_USER, (void *)rd_kafka_sasl_cyrus_cb_canon, rktrans },
+                { SASL_CB_LIST_END }
+        };
+
+        /* The state is attached to the transport right away, so the
+         * provider's close() releases it on every error path below. */
+        state = rd_calloc(1, sizeof(*state));
+        rktrans->rktrans_sasl.state = state;
+
+        /* SASL_CB_USER is needed for PLAIN but breaks GSSAPI */
+        if (!strcmp(rk->rk_conf.sasl.mechanisms, "PLAIN")) {
+                int endidx;
+                /* Find end of callbacks array */
+                for (endidx = 0 ;
+                     callbacks[endidx].id != SASL_CB_LIST_END ; endidx++)
+                        ;
+
+                callbacks[endidx].id = SASL_CB_USER;
+                callbacks[endidx].proc = (void *)rd_kafka_sasl_cyrus_cb_getsimple;
+                callbacks[endidx].context = rktrans;
+                endidx++;
+                callbacks[endidx].id = SASL_CB_LIST_END;
+        }
+
+        /* libsasl is given the copy stored in the state, not the
+         * stack-local table. */
+        memcpy(state->callbacks, callbacks, sizeof(callbacks));
+
+        /* Acquire or refresh ticket if kinit is configured */
+        rd_kafka_sasl_cyrus_kinit_refresh(rkb);
+
+        r = sasl_client_new(rk->rk_conf.sasl.service_name, hostname,
+                            NULL, NULL, /* no local & remote IP checks */
+                            state->callbacks, 0, &state->conn);
+        if (r != SASL_OK) {
+                rd_snprintf(errstr, errstr_size, "%s",
+                            sasl_errstring(r, NULL, NULL));
+                return -1;
+        }
+
+        if (rk->rk_conf.debug & RD_KAFKA_DBG_SECURITY) {
+                /* sasl_listmech() can fail and leave its output untouched:
+                 * never hand an uninitialized pointer to the logger. */
+                const char *avail_mechs = NULL;
+
+                if (sasl_listmech(state->conn, NULL, NULL, " ", NULL,
+                                  &avail_mechs, NULL, NULL) != SASL_OK ||
+                    !avail_mechs)
+                        avail_mechs = "(unknown)";
+
+                rd_rkb_dbg(rkb, SECURITY, "SASL",
+                           "My supported SASL mechanisms: %s", avail_mechs);
+        }
+
+        do {
+                const char *out;
+                unsigned int outlen;
+                const char *mech = NULL;
+
+                r = sasl_client_start(state->conn,
+                                      rk->rk_conf.sasl.mechanisms,
+                                      NULL, &out, &outlen, &mech);
+
+                /* Any non-negative result has its output sent to broker */
+                if (r >= 0)
+                        if (rd_kafka_sasl_send(rktrans, out, outlen,
+                                               errstr, errstr_size))
+                                return -1;
+        } while (r == SASL_INTERACT);
+
+        if (r == SASL_OK) {
+                /* PLAIN is apparently done here, but we still need to make
+                 * sure the PLAIN frame is sent and we get a response back
+                 * (but we must not pass the response to libsasl or it
+                 * will fail). */
+                rktrans->rktrans_sasl.complete = 1;
+                return 0;
+
+        } else if (r != SASL_CONTINUE) {
+                rd_snprintf(errstr, errstr_size,
+                            "SASL handshake failed (start (%d)): %s",
+                            r, sasl_errdetail(state->conn));
+                return -1;
+        }
+
+        return 0;
+}
+
+
+
+
+
+
+
+/**
+ * @brief Per-broker SASL teardown: stops the broker's kinit refresh timer.
+ *
+ * Nothing is done unless a kinit command is configured.
+ *
+ * Locality: broker thread
+ */
+static void rd_kafka_sasl_cyrus_broker_term (rd_kafka_broker_t *rkb) {
+        rd_kafka_t *handle = rkb->rkb_rk;
+
+        if (handle->rk_conf.sasl.kinit_cmd)
+                rd_kafka_timer_stop(&handle->rk_timers,
+                                    &rkb->rkb_sasl_kinit_refresh_tmr, 1);
+}
+
+/**
+ * @brief Per-broker SASL setup: arms the periodic kinit refresh timer.
+ *
+ * The timer is only started when a kinit command is configured and the
+ * configured mechanisms string contains "GSSAPI". The interval passed to
+ * the timer is rk_conf.sasl.relogin_min_time multiplied by 1000.
+ *
+ * Locality: broker thread
+ */
+static void rd_kafka_sasl_cyrus_broker_init (rd_kafka_broker_t *rkb) {
+        rd_kafka_t *handle = rkb->rkb_rk;
+
+        /* kinit not configured, no need to start timer */
+        if (!handle->rk_conf.sasl.kinit_cmd)
+                return;
+
+        /* kinit is only relevant to GSSAPI */
+        if (!strstr(handle->rk_conf.sasl.mechanisms, "GSSAPI"))
+                return;
+
+        rd_kafka_timer_start(&handle->rk_timers,
+                             &rkb->rkb_sasl_kinit_refresh_tmr,
+                             handle->rk_conf.sasl.relogin_min_time * 1000ll,
+                             rd_kafka_sasl_cyrus_kinit_refresh_tmr_cb, rkb);
+}
+
+
+
+/**
+ * @brief Validate the Cyrus SASL related configuration of \p rk.
+ *
+ * Only a mechanisms string that is exactly "GSSAPI" is checked: if a kinit
+ * command is configured it is test-rendered with rd_string_render() against
+ * a placeholder broker named "ATestBroker:9092".
+ *
+ * @returns 0 if the configuration is accepted, or -1 with a description
+ *          written to \p errstr if the kinit command could not be rendered.
+ */
+static int rd_kafka_sasl_cyrus_conf_validate (rd_kafka_t *rk,
+                                       char *errstr, size_t errstr_size) {
+        rd_kafka_broker_t fake_rkb;
+        char render_err[128];
+        char *rendered;
+
+        /* Nothing to verify for other mechanisms or without a kinit command */
+        if (strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI") != 0 ||
+            !rk->rk_conf.sasl.kinit_cmd)
+                return 0;
+
+        /* Stand-in broker object handed to the render callback */
+        memset(&fake_rkb, 0, sizeof(fake_rkb));
+        strcpy(fake_rkb.rkb_nodename, "ATestBroker:9092");
+        fake_rkb.rkb_rk = rk;
+        mtx_init(&fake_rkb.rkb_lock, mtx_plain);
+
+        rendered = rd_string_render(rk->rk_conf.sasl.kinit_cmd,
+                                    render_err, sizeof(render_err),
+                                    render_callback, &fake_rkb);
+
+        mtx_destroy(&fake_rkb.rkb_lock);
+
+        if (rendered) {
+                /* Only the ability to render matters, not the result */
+                rd_free(rendered);
+                return 0;
+        }
+
+        rd_snprintf(errstr, errstr_size,
+                    "Invalid sasl.kerberos.kinit.cmd value: %s",
+                    render_err);
+        return -1;
+}
+
+
+/**
+ * @brief Global SASL termination: destroys the kinit lock.
+ *
+ * @remark sasl_done() is intentionally not called here since the
+ *         application may be using SASL too.
+ */
+void rd_kafka_sasl_cyrus_global_term (void) {
+        mtx_destroy(&rd_kafka_sasl_cyrus_kinit_lock);
+}
+
+
+/**
+ * @brief Global SASL init, called once per runtime.
+ *
+ * Initializes the kinit lock and then the Cyrus SASL client library.
+ *
+ * @returns 0 on success, or -1 if sasl_client_init() fails, in which case
+ *          the reason is printed to stderr.
+ */
+int rd_kafka_sasl_cyrus_global_init (void) {
+        int sasl_rc;
+
+        mtx_init(&rd_kafka_sasl_cyrus_kinit_lock, mtx_plain);
+
+        sasl_rc = sasl_client_init(NULL);
+        if (sasl_rc == SASL_OK)
+                return 0;
+
+        fprintf(stderr, "librdkafka: sasl_client_init() failed: %s\n",
+                sasl_errstring(sasl_rc, NULL, NULL));
+        return -1;
+}
+
+
+const struct rd_kafka_sasl_provider rd_kafka_sasl_cyrus_provider = { /* Callback table for the Cyrus SASL backed provider */
+        .name          = "Cyrus",
+        .client_new    = rd_kafka_sasl_cyrus_client_new,
+        .recv          = rd_kafka_sasl_cyrus_recv,
+        .close         = rd_kafka_sasl_cyrus_close,
+        .broker_init   = rd_kafka_sasl_cyrus_broker_init,
+        .broker_term   = rd_kafka_sasl_cyrus_broker_term,
+        .conf_validate = rd_kafka_sasl_cyrus_conf_validate
+};

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_int.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_int.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_int.h
new file mode 100644
index 0000000..699174e
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_int.h
@@ -0,0 +1,70 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#pragma once
+
+struct rd_kafka_sasl_provider { /* Table of callbacks implementing one SASL backend */
+        const char *name; /* Provider name, e.g. "Cyrus" or "PLAIN (builtin)" */
+
+        int (*client_new) (rd_kafka_transport_t *rktrans,
+                           const char *hostname,
+                           char *errstr, size_t errstr_size); /* Start authentication on a transport; the PLAIN implementation returns 0 on success, -1 on error */
+
+        int (*recv) (struct rd_kafka_transport_s *s,
+                     const void *buf, size_t size,
+                     char *errstr, size_t errstr_size); /* Handle a frame received from the broker */
+        void (*close) (struct rd_kafka_transport_s *); /* Release per-connection state; NOTE(review): the PLAIN provider leaves this unset, presumably callers check for NULL - confirm */
+
+        void (*broker_init) (rd_kafka_broker_t *rkb); /* Per-broker setup; left unset by the PLAIN provider */
+        void (*broker_term) (rd_kafka_broker_t *rkb); /* Per-broker teardown; left unset by the PLAIN provider */
+
+        int (*conf_validate) (rd_kafka_t *rk,
+                              char *errstr, size_t errstr_size); /* Check configuration: 0 if acceptable, -1 with errstr written */
+};
+
+#ifdef _MSC_VER
+extern const struct rd_kafka_sasl_provider rd_kafka_sasl_win32_provider;
+#endif /* _MSC_VER */
+
+#if WITH_SASL_CYRUS
+extern const struct rd_kafka_sasl_provider rd_kafka_sasl_cyrus_provider;
+void rd_kafka_sasl_cyrus_global_term (void);
+int rd_kafka_sasl_cyrus_global_init (void);
+#endif /* WITH_SASL_CYRUS */
+
+extern const struct rd_kafka_sasl_provider rd_kafka_sasl_plain_provider; /* builtin PLAIN provider, declared unconditionally */
+
+#if WITH_SASL_SCRAM
+extern const struct rd_kafka_sasl_provider rd_kafka_sasl_scram_provider;
+#endif /* WITH_SASL_SCRAM */
+
+void rd_kafka_sasl_auth_done (rd_kafka_transport_t *rktrans); /* called by the PLAIN provider once the broker's response has been received */
+int rd_kafka_sasl_send (rd_kafka_transport_t *rktrans,
+                        const void *payload, int len,
+                        char *errstr, size_t errstr_size); /* the PLAIN provider treats a non-zero return as failure */
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_plain.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_plain.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_plain.c
new file mode 100644
index 0000000..57650ee
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_plain.c
@@ -0,0 +1,128 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2017 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+/**
+ * Builtin SASL PLAIN support when Cyrus SASL is not available
+ */
+#include "rdkafka_int.h"
+#include "rdkafka_transport.h"
+#include "rdkafka_transport_int.h"
+#include "rdkafka_sasl.h"
+#include "rdkafka_sasl_int.h"
+
+
+/**
+ * @brief Handle the broker's reply to the PLAIN authentication token.
+ *
+ * Any reply, empty or not, completes authentication; a non-empty payload
+ * is only reported through the SECURITY debug log.
+ *
+ * @returns 0 always.
+ */
+static int rd_kafka_sasl_plain_recv (struct rd_kafka_transport_s *rktrans,
+                                     const void *buf, size_t size,
+                                     char *errstr, size_t errstr_size) {
+        if (size > 0) {
+                rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLPLAIN",
+                           "Received non-empty SASL PLAIN (builtin) "
+                           "response from broker (%"PRIusz" bytes)", size);
+        }
+
+        rd_kafka_sasl_auth_done(rktrans);
+        return 0;
+}
+
+
+/**
+ * @brief Initialize and start SASL PLAIN (builtin) authentication.
+ *
+ * Builds the PLAIN token from the configured username and password
+ * (each truncated to at most 255 bytes, with an empty authzid) and
+ * sends it to the broker.
+ *
+ * @returns 0 on successful init and -1 on error.
+ *
+ * @locality broker thread
+ */
+int rd_kafka_sasl_plain_client_new (rd_kafka_transport_t *rktrans,
+                                    const char *hostname,
+                                    char *errstr, size_t errstr_size) {
+        rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
+        const char *user = rkb->rkb_rk->rk_conf.sasl.username;
+        const char *pass = rkb->rkb_rk->rk_conf.sasl.password;
+        /* Token layout: [authzid] UTF8NUL authcid UTF8NUL passwd */
+        char token[255+1+255+1+255+1];
+        int tlen = 0;
+
+        /* Empty authzid, followed by its UTF8NUL separator */
+        token[tlen++] = 0;
+
+        /* authcid */
+        if (user) {
+                int ulen = (int)strlen(user);
+                if (ulen > 255)
+                        ulen = 255;
+                memcpy(&token[tlen], user, ulen);
+                tlen += ulen;
+        }
+
+        /* UTF8NUL separator between authcid and passwd */
+        token[tlen++] = 0;
+
+        /* passwd */
+        if (pass) {
+                int plen = (int)strlen(pass);
+                if (plen > 255)
+                        plen = 255;
+                memcpy(&token[tlen], pass, plen);
+                tlen += plen;
+        }
+
+        rd_rkb_dbg(rkb, SECURITY, "SASLPLAIN",
+                   "Sending SASL PLAIN (builtin) authentication token");
+
+        if (rd_kafka_sasl_send(rktrans, token, tlen, errstr, errstr_size))
+                return -1;
+
+        /* PLAIN is apparently done at this point, but the frame must still
+         * be sent and an (empty) response received from the broker. */
+        rktrans->rktrans_sasl.complete = 1;
+        return 0;
+}
+
+
+/**
+ * @brief Validate PLAIN config
+ *
+ * @returns 0 when both a username and a password are configured, else -1
+ *          with an explanation written to \p errstr.
+ */
+static int rd_kafka_sasl_plain_conf_validate (rd_kafka_t *rk,
+                                              char *errstr,
+                                              size_t errstr_size) {
+        if (rk->rk_conf.sasl.username && rk->rk_conf.sasl.password)
+                return 0;
+
+        rd_snprintf(errstr, errstr_size,
+                    "sasl.username and sasl.password must be set");
+        return -1;
+}
+
+
+const struct rd_kafka_sasl_provider rd_kafka_sasl_plain_provider = { /* Callback table for builtin PLAIN; close, broker_init and broker_term are left unset */
+        .name          = "PLAIN (builtin)",
+        .client_new    = rd_kafka_sasl_plain_client_new,
+        .recv          = rd_kafka_sasl_plain_recv,
+        .conf_validate = rd_kafka_sasl_plain_conf_validate
+};

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_scram.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_scram.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_scram.c
new file mode 100644
index 0000000..968d879
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_scram.c
@@ -0,0 +1,901 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2017 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ *    this list of conditions and the following disclaimer in the documentation
+ *    and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+/**
+ * Builtin SASL SCRAM support when Cyrus SASL is not available
+ */
+#include "rdkafka_int.h"
+#include "rdkafka_transport.h"
+#include "rdkafka_transport_int.h"
+#include "rdkafka_sasl.h"
+#include "rdkafka_sasl_int.h"
+#include "rdrand.h"
+
+#if WITH_SSL
+#include <openssl/hmac.h>
+#include <openssl/evp.h>
+#include <openssl/sha.h>
+#else
+#error "WITH_SSL (OpenSSL) is required for SASL SCRAM"
+#endif
+
+
+/**
+ * @brief Per-connection state
+ *
+ * The heap pointers held here (cnonce.ptr, first_msg_bare.ptr and
+ * ServerSignatureB64) are released by rd_kafka_sasl_scram_close().
+ * ServerSignatureB64 is filled in when the client-final-message is built.
+ */
+struct rd_kafka_sasl_scram_state {
+        enum {
+                RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FIRST_MESSAGE,
+                RD_KAFKA_SASL_SCRAM_STATE_SERVER_FIRST_MESSAGE,
+                RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FINAL_MESSAGE,
+        } state;                     /* NOTE(review): presumably the current step of the exchange - confirm against the state machine */
+        rd_chariov_t cnonce;         /* client c-nonce */
+        rd_chariov_t first_msg_bare; /* client-first-message-bare */
+        char *ServerSignatureB64;    /* ServerSignature in Base64 */
+        const EVP_MD *evp;  /* Hash function pointer */
+};
+
+
+/**
+ * @brief Close and free authentication state
+ *
+ * Releases the client nonce, the client-first-message-bare copy and the
+ * stored Base64 ServerSignature (whichever are set), then the state
+ * object itself. A transport without state is left untouched.
+ */
+static void rd_kafka_sasl_scram_close (rd_kafka_transport_t *rktrans) {
+        struct rd_kafka_sasl_scram_state *st = rktrans->rktrans_sasl.state;
+
+        if (st) {
+                RD_IF_FREE(st->cnonce.ptr, rd_free);
+                RD_IF_FREE(st->first_msg_bare.ptr, rd_free);
+                RD_IF_FREE(st->ServerSignatureB64, rd_free);
+                rd_free(st);
+        }
+}
+
+
+
+/**
+ * @brief Generates a nonce string (a random printable string)
+ *
+ * Each of the 32 characters is drawn from the range 0x2d ('-') through
+ * 0x7e ('~'). That range lies above ',' (0x2c), which is the SCRAM
+ * attribute delimiter and therefore must not appear in a nonce.
+ *
+ * The nonce must differ between authentication attempts: a constant
+ * nonce makes the client's part of the exchange fully predictable.
+ *
+ * NOTE(review): the randomness quality of rd_jitter() is not visible
+ * from here - confirm it is adequate for nonce generation.
+ *
+ * @remark dst->ptr will be allocated and must be freed.
+ */
+static void rd_kafka_sasl_scram_generate_nonce (rd_chariov_t *dst) {
+        int i;
+        dst->size = 32;
+        dst->ptr = rd_malloc(dst->size+1);
+        for (i = 0 ; i < (int)dst->size ; i++)
+                dst->ptr[i] = (char)rd_jitter(0x2d/*-*/, 0x7e/*~*/);
+        dst->ptr[i] = 0;
+}
+
+
+/**
+ * @brief Looks up SCRAM attribute \p attr (e.g., 's') in the
+ *        comma-separated "x=value" list held in \p inbuf.
+ *
+ * @returns a newly allocated, NUL-terminated copy of the value of the
+ *          first matching attribute, or NULL if there is no such
+ *          attribute, in which case an error prefixed by \p description
+ *          is written to \p errstr.
+ */
+static char *rd_kafka_sasl_scram_get_attr (const rd_chariov_t *inbuf, char attr,
+                                           const char *description,
+                                           char *errstr, size_t errstr_size) {
+        size_t pos = 0;
+
+        while (pos < inbuf->size) {
+                const char *field = &inbuf->ptr[pos];
+                size_t remaining = inbuf->size - pos;
+                /* The field ends at the next ',' or at the end of input */
+                const char *comma = memchr(field, ',', remaining);
+                size_t fieldlen = comma ? (size_t)(comma - field) : remaining;
+
+                /* A match is the attribute letter directly followed by '=' */
+                if (field[0] == attr && remaining > 1 && field[1] == '=') {
+                        size_t vallen = fieldlen - 2; /* minus "x=" */
+                        char *value = rd_malloc(vallen + 1);
+
+                        memcpy(value, field + 2, vallen);
+                        value[vallen] = '\0';
+                        return value;
+                }
+
+                /* Step over this field and its trailing delimiter */
+                pos += fieldlen + 1;
+        }
+
+        rd_snprintf(errstr, errstr_size,
+                    "%s: could not find attribute (%c)",
+                    description, attr);
+        return NULL;
+}
+
+
+/**
+ * @brief Base64 encode binary input \p in
+ *
+ * The encoding is produced without newlines (BIO_FLAGS_BASE64_NO_NL).
+ *
+ * @returns a newly allocated, NUL-terminated base64 string which the
+ *          caller must release with rd_free().
+ */
+static char *rd_base64_encode (const rd_chariov_t *in) {
+        BIO *buf, *b64f;
+        BUF_MEM *ptr;
+        char *out;
+
+        /* Chain: base64 filter -> memory sink */
+        b64f = BIO_new(BIO_f_base64());
+        buf = BIO_new(BIO_s_mem());
+        buf = BIO_push(b64f, buf);
+
+        BIO_set_flags(buf, BIO_FLAGS_BASE64_NO_NL);
+        BIO_set_close(buf, BIO_CLOSE);
+        BIO_write(buf, in->ptr, (int)in->size);
+        BIO_flush(buf);
+
+        BIO_get_mem_ptr(buf, &ptr);
+        /* Allocate through rd_malloc() like the rest of this file rather
+         * than raw unchecked malloc(): callers release the result
+         * with rd_free(). */
+        out = rd_malloc(ptr->length + 1);
+        memcpy(out, ptr->data, ptr->length);
+        out[ptr->length] = '\0';
+
+        BIO_free_all(buf);
+
+        return out;
+}
+
+/**
+ * @brief Base64 decode input string \p in.
+ *
+ * @returns -1 on invalid Base64, or 0 on success in which case a
+ *          newly allocated binary string is set in \p out (and size).
+ *
+ * @remark On failure nothing is left allocated in \p out; the caller
+ *         must not free out->ptr in that case.
+ */
+static int rd_base64_decode (const rd_chariov_t *in, rd_chariov_t *out) {
+        size_t asize;
+        BIO *b64, *bmem;
+        int r;
+
+        if (in->size == 0 || (in->size % 4) != 0)
+                return -1;
+
+        asize = (in->size * 3) / 4; /* allocation size */
+        out->ptr = rd_malloc(asize+1);
+
+        b64 = BIO_new(BIO_f_base64());
+        BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
+
+        bmem = BIO_new_mem_buf(in->ptr, (int)in->size);
+        bmem = BIO_push(b64, bmem);
+
+        /* BIO_read() returns an int that is <= 0 when nothing could be
+         * decoded. Keep it signed and check it before it is stored in the
+         * unsigned out->size: the input comes from the broker and must
+         * not be able to trigger an assert or a bogus huge size. */
+        r = BIO_read(bmem, out->ptr, (int)asize+1);
+        BIO_free_all(bmem);
+
+        if (r <= 0 || (size_t)r > asize) {
+                /* Same contract as the early return above: on failure
+                 * the caller owns nothing in out. */
+                rd_free(out->ptr);
+                out->ptr = NULL;
+                out->size = 0;
+                return -1;
+        }
+
+        out->size = (size_t)r;
+
+#if ENABLE_DEVEL
+        /* Verify that decode==encode */
+        {
+                char *encoded = rd_base64_encode(out);
+                assert(strlen(encoded) == in->size);
+                assert(!strncmp(encoded, in->ptr, in->size));
+                rd_free(encoded);
+        }
+#endif
+
+        return 0;
+}
+
+
+/**
+ * @brief Apply the configured H() hash function to \p str, storing the
+ *        digest in \p out, whose buffer must be at least EVP_MAX_MD_SIZE.
+ *
+ * out->size is set to the configured digest size.
+ *
+ * @returns 0 (this implementation has no failure path)
+ */
+static int
+rd_kafka_sasl_scram_H (rd_kafka_transport_t *rktrans,
+                       const rd_chariov_t *str,
+                       rd_chariov_t *out) {
+        const rd_kafka_conf_t *conf = &rktrans->rktrans_rkb->rkb_rk->rk_conf;
+
+        conf->sasl.scram_H((const unsigned char *)str->ptr, str->size,
+                           (unsigned char *)out->ptr);
+        out->size = conf->sasl.scram_H_size;
+
+        return 0;
+}
+
+/**
+ * @brief Compute HMAC(key,str) with the configured digest and store the
+ *        result in \p out, whose buffer must be at least EVP_MAX_MD_SIZE.
+ *
+ * out->size is set to the digest length on success.
+ *
+ * @returns 0 on success, else -1
+ */
+static int
+rd_kafka_sasl_scram_HMAC (rd_kafka_transport_t *rktrans,
+                          const rd_chariov_t *key,
+                          const rd_chariov_t *str,
+                          rd_chariov_t *out) {
+        unsigned int mdlen;
+        const EVP_MD *md =
+                rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.scram_evp;
+        const unsigned char *res;
+
+        res = HMAC(md,
+                   (const unsigned char *)key->ptr, (int)key->size,
+                   (const unsigned char *)str->ptr, (int)str->size,
+                   (unsigned char *)out->ptr, &mdlen);
+        if (res) {
+                out->size = mdlen;
+                return 0;
+        }
+
+        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SCRAM",
+                   "HMAC failed");
+        return -1;
+}
+
+
+
+/**
+ * @brief Hi(): perform \p itcnt chained HMAC() iterations keyed by \p in,
+ *        seeded with \p salt, XOR-ing every intermediate result into
+ *        \p out, whose buffer must be at least EVP_MAX_MD_SIZE.
+ *        The digest length is stored in out->size on success.
+ * @returns 0 on success, else -1
+ */
+static int
+rd_kafka_sasl_scram_Hi (rd_kafka_transport_t *rktrans,
+                        const rd_chariov_t *in,
+                        const rd_chariov_t *salt,
+                        int itcnt, rd_chariov_t *out) {
+        const EVP_MD *md =
+                rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.scram_evp;
+        unsigned char prev[EVP_MAX_MD_SIZE];
+        unsigned int  mdlen = 0;
+        size_t saltlen = salt->size;
+        unsigned char *seed;
+        int round;
+
+        /* seed := salt + INT(1): the salt followed by the four
+         * bytes 0, 0, 0, 1 */
+        seed = rd_alloca(saltlen + 4);
+        memcpy(seed, salt->ptr, saltlen);
+        memset(seed + saltlen, 0, 3);
+        seed[saltlen+3] = 1;
+
+        /* U1   := HMAC(str, salt + INT(1)) */
+        if (!HMAC(md,
+                  (const unsigned char *)in->ptr, (int)in->size,
+                  seed, saltlen + 4,
+                  prev, &mdlen)) {
+                rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SCRAM",
+                           "HMAC priming failed");
+                return -1;
+        }
+
+        /* The running XOR starts out as U1 */
+        memcpy(out->ptr, prev, mdlen);
+
+        /* Ui   := HMAC(str, Ui-1) for the remaining iterations */
+        for (round = 1 ; round < itcnt ; round++) {
+                unsigned char cur[EVP_MAX_MD_SIZE];
+                unsigned int k;
+
+                if (unlikely(!HMAC(md,
+                                   (const unsigned char *)in->ptr, (int)in->size,
+                                   prev, mdlen,
+                                   cur, NULL))) {
+                        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SCRAM",
+                                   "Hi() HMAC #%d/%d failed", round, itcnt);
+                        return -1;
+                }
+
+                /* Fold Ui into the result .. */
+                for (k = 0 ; k < mdlen ; k++)
+                        out->ptr[k] ^= cur[k];
+
+                /* .. and feed it to the next round */
+                memcpy(prev, cur, mdlen);
+        }
+
+        out->size = mdlen;
+
+        return 0;
+}
+
+
+/**
+ * @returns a SASL value-safe-char encoded string, replacing "," and "="
+ *          with their escaped counterparts ("=2C" and "=3D") in a newly
+ *          allocated, NUL-terminated string.
+ */
+static char *rd_kafka_sasl_safe_string (const char *str) {
+        char *safe = NULL, *d = NULL/*avoid warning*/;
+        int pass;
+        size_t len = 0;
+
+        /* Pass #1: scan for needed length and allocate.
+         * Pass #2: encode string */
+        for (pass = 0 ; pass < 2 ; pass++) {
+                const char *s;
+                for (s = str ; *s ; s++) {
+                        if (pass == 0) {
+                                /* An escaped character expands to three
+                                 * output bytes ("=2C" / "=3D"), all
+                                 * others to one. Counting fewer than
+                                 * three would let pass #2 write past
+                                 * the end of the allocation. */
+                                len += (*s == ',' || *s == '=') ? 3 : 1;
+                                continue;
+                        }
+
+                        if (*s == ',') {
+                                *(d++) = '=';
+                                *(d++) = '2';
+                                *(d++) = 'C';
+                        } else if (*s == '=') {
+                                *(d++) = '=';
+                                *(d++) = '3';
+                                *(d++) = 'D';
+                        } else
+                                *(d++) = *s;
+                }
+
+                if (pass == 0)
+                        d = safe = rd_malloc(len+1);
+        }
+
+        /* Pass #2 must have produced exactly the length from pass #1 */
+        rd_assert(d == safe + (int)len);
+        *d = '\0';
+
+        return safe;
+}
+
+
+/**
+ * @brief Build client-final-message-without-proof:
+ *        "c=biws,r=<client nonce><server nonce>"
+ * @remark out->ptr will be allocated and must be freed.
+ */
+static void
+rd_kafka_sasl_scram_build_client_final_message_wo_proof (
+        struct rd_kafka_sasl_scram_state *state,
+        const char *snonce,
+        rd_chariov_t *out) {
+        const char *cbind = "biws"; /* base64 encode of "n,," */
+        size_t total;
+
+        /*
+         * client-final-message-without-proof =
+         *            channel-binding "," nonce [","
+         *            extensions]
+         */
+        total = strlen("c=,r=") + strlen(cbind) +
+                state->cnonce.size + strlen(snonce);
+
+        out->ptr = rd_malloc(total + 1);
+        out->size = total;
+        rd_snprintf(out->ptr, total + 1, "c=%s,r=%.*s%s",
+                    cbind, (int)state->cnonce.size, state->cnonce.ptr, snonce);
+}
+
+
+/**
+ * @brief Build client-final-message
+ *
+ * Derives the SCRAM keys from the configured password, \p salt and
+ * \p itcnt, stores the Base64 encoded expected ServerSignature in the
+ * connection state (state->ServerSignatureB64) and writes
+ * "<client-final-message-without-proof>,p=<Base64 ClientProof>"
+ * to \p out.
+ *
+ * @remark On success out->ptr is allocated with rd_malloc() and is not
+ *         freed here.
+ *
+ * @returns 0 on success, -1 on error.
+ */
+static int
+rd_kafka_sasl_scram_build_client_final_message (
+        rd_kafka_transport_t *rktrans,
+        const rd_chariov_t *salt,
+        const char *server_nonce,
+        const rd_chariov_t *server_first_msg,
+        int itcnt, rd_chariov_t *out) {
+        struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state;
+        const rd_kafka_conf_t *conf = &rktrans->rktrans_rkb->rkb_rk->rk_conf;
+        rd_chariov_t SaslPassword =
+                { .ptr = conf->sasl.password,
+                  .size = strlen(conf->sasl.password) };
+        rd_chariov_t SaltedPassword =
+                { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
+        rd_chariov_t ClientKey =
+                { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
+        rd_chariov_t ServerKey =
+                { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
+        rd_chariov_t StoredKey =
+                { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
+        rd_chariov_t AuthMessage = RD_ZERO_INIT;
+        rd_chariov_t ClientSignature =
+                { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
+        rd_chariov_t ServerSignature =
+                { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
+        const rd_chariov_t ClientKeyVerbatim =
+                { .ptr = "Client Key", .size = 10 };
+        const rd_chariov_t ServerKeyVerbatim =
+                { .ptr = "Server Key", .size = 10 };
+        rd_chariov_t ClientProof =
+                { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
+        rd_chariov_t client_final_msg_wo_proof;
+        char *ClientProofB64;
+        int i;
+
+        /* Constructing the ClientProof attribute (p):
+         *
+         * p = Base64-encoded ClientProof
+         * SaltedPassword  := Hi(Normalize(password), salt, i)
+         * ClientKey       := HMAC(SaltedPassword, "Client Key")
+         * StoredKey       := H(ClientKey)
+         * AuthMessage     := client-first-message-bare + "," +
+         *                    server-first-message + "," +
+         *                    client-final-message-without-proof
+         * ClientSignature := HMAC(StoredKey, AuthMessage)
+         * ClientProof     := ClientKey XOR ClientSignature
+         * ServerKey       := HMAC(SaltedPassword, "Server Key")
+         * ServerSignature := HMAC(ServerKey, AuthMessage)
+         */
+
+        /* SaltedPassword  := Hi(Normalize(password), salt, i) */
+        if (rd_kafka_sasl_scram_Hi(
+                    rktrans, &SaslPassword, salt,
+                    itcnt, &SaltedPassword) == -1)
+                return -1;
+
+        /* ClientKey       := HMAC(SaltedPassword, "Client Key") */
+        if (rd_kafka_sasl_scram_HMAC(
+                    rktrans, &SaltedPassword, &ClientKeyVerbatim,
+                    &ClientKey) == -1)
+                return -1;
+
+        /* StoredKey       := H(ClientKey) */
+        if (rd_kafka_sasl_scram_H(rktrans, &ClientKey, &StoredKey) == -1)
+                return -1;
+
+        /* client-final-message-without-proof */
+        rd_kafka_sasl_scram_build_client_final_message_wo_proof(
+                state, server_nonce, &client_final_msg_wo_proof);
+
+        /* AuthMessage     := client-first-message-bare + "," +
+         *                    server-first-message + "," +
+         *                    client-final-message-without-proof */
+        AuthMessage.size =
+                state->first_msg_bare.size + 1 +
+                server_first_msg->size + 1 +
+                client_final_msg_wo_proof.size;
+        AuthMessage.ptr = rd_alloca(AuthMessage.size+1);
+        rd_snprintf(AuthMessage.ptr, AuthMessage.size+1,
+                    "%.*s,%.*s,%.*s",
+                    (int)state->first_msg_bare.size, state->first_msg_bare.ptr,
+                    (int)server_first_msg->size, server_first_msg->ptr,
+                    (int)client_final_msg_wo_proof.size,
+                    client_final_msg_wo_proof.ptr);
+
+        /*
+         * Calculate ServerSignature for later verification when
+         * server-final-message is received.
+         */
+
+        /* ServerKey       := HMAC(SaltedPassword, "Server Key") */
+        if (rd_kafka_sasl_scram_HMAC(
+                    rktrans, &SaltedPassword, &ServerKeyVerbatim,
+                    &ServerKey) == -1) {
+                rd_free(client_final_msg_wo_proof.ptr);
+                return -1;
+        }
+
+        /* ServerSignature := HMAC(ServerKey, AuthMessage) */
+        if (rd_kafka_sasl_scram_HMAC(rktrans, &ServerKey,
+                                     &AuthMessage, &ServerSignature) == -1) {
+                rd_free(client_final_msg_wo_proof.ptr);
+                return -1;
+        }
+
+        /* Store the Base64 encoded ServerSignature for quick comparison */
+        state->ServerSignatureB64 = rd_base64_encode(&ServerSignature);
+
+
+        /*
+         * Continue with client-final-message
+         */
+
+        /* ClientSignature := HMAC(StoredKey, AuthMessage) */
+        if (rd_kafka_sasl_scram_HMAC(rktrans, &StoredKey,
+                                     &AuthMessage, &ClientSignature) == -1) {
+                rd_free(client_final_msg_wo_proof.ptr);
+                return -1;
+        }
+
+        /* ClientProof     := ClientKey XOR ClientSignature */
+        assert(ClientKey.size == ClientSignature.size);
+        for (i = 0 ; i < (int)ClientKey.size ; i++)
+                ClientProof.ptr[i] = ClientKey.ptr[i] ^ ClientSignature.ptr[i];
+        ClientProof.size = ClientKey.size;
+
+
+        /* Base64 encoded ClientProof */
+        ClientProofB64 = rd_base64_encode(&ClientProof);
+
+        /* Construct client-final-message */
+        out->size = client_final_msg_wo_proof.size +
+                strlen(",p=") + strlen(ClientProofB64);
+        out->ptr = rd_malloc(out->size + 1);
+
+        rd_snprintf(out->ptr, out->size+1,
+                    "%.*s,p=%s",
+                    (int)client_final_msg_wo_proof.size,
+                    client_final_msg_wo_proof.ptr,
+                    ClientProofB64);
+        rd_free(ClientProofB64);
+        rd_free(client_final_msg_wo_proof.ptr);
+
+        return 0;
+}
+
+
+/**
+ * @brief Handle first message from server
+ *
+ * Parse server response which looks something like:
+ * "r=fyko+d2lbbFgONR....,s=QSXCR+Q6sek8bf92,i=4096"
+ *
+ * The message is rejected if it carries an "m" (mandatory extension)
+ * attribute, if the "r" nonce is not strictly longer than and prefixed by
+ * our client nonce, if the "s" salt is not valid Base64, or if the "i"
+ * iteration count is not a decimal integer in the range 1..1000000.
+ * On success the client-final-message is built into \p out by
+ * rd_kafka_sasl_scram_build_client_final_message().
+ *
+ * @param rktrans     Transport whose SASL state holds the SCRAM state.
+ * @param in          The received server-first-message.
+ * @param out         Receives the client-final-message on success.
+ * @param errstr      Buffer for a human readable error string.
+ * @param errstr_size Size of \p errstr.
+ *
+ * @returns 0 on success or -1 on error (errstr is set).
+ */
+static int
+rd_kafka_sasl_scram_handle_server_first_message (rd_kafka_transport_t *rktrans,
+                                                 const rd_chariov_t *in,
+                                                 rd_chariov_t *out,
+                                                 char *errstr,
+                                                 size_t errstr_size) {
+        struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state;
+        char *server_nonce;
+        rd_chariov_t salt_b64, salt;
+        char *itcntstr;
+        char *endptr;
+        unsigned long itcnt;
+        int salt_decode_failed;
+        int itcnt_valid;
+        char *attr_m;
+        int r = -1;
+
+        /* Mandatory future extension check */
+        if ((attr_m = rd_kafka_sasl_scram_get_attr(
+                     in, 'm', NULL, NULL, 0))) {
+                rd_snprintf(errstr, errstr_size,
+                            "Unsupported mandatory SCRAM extension");
+                rd_free(attr_m);
+                return -1;
+        }
+
+        /* Server nonce */
+        if (!(server_nonce = rd_kafka_sasl_scram_get_attr(
+                      in, 'r',
+                      "Server nonce in server-first-message",
+                      errstr, errstr_size)))
+                return -1;
+
+        /* The server nonce must be our client nonce with a non-empty
+         * server generated suffix appended. */
+        if (strlen(server_nonce) <= state->cnonce.size ||
+            strncmp(state->cnonce.ptr, server_nonce, state->cnonce.size)) {
+                rd_snprintf(errstr, errstr_size,
+                            "Server/client nonce mismatch in "
+                            "server-first-message");
+                goto free_nonce;
+        }
+
+        /* Salt (Base64) */
+        if (!(salt_b64.ptr = rd_kafka_sasl_scram_get_attr(
+                      in, 's',
+                      "Salt in server-first-message",
+                      errstr, errstr_size)))
+                goto free_nonce;
+        salt_b64.size = strlen(salt_b64.ptr);
+
+        /* Convert Salt to binary.
+         * The Base64 text is released exactly once, regardless of the
+         * decode outcome, and a decode failure aborts the handshake
+         * instead of falling through with an unset salt. */
+        salt_decode_failed = rd_base64_decode(&salt_b64, &salt) == -1;
+        rd_free(salt_b64.ptr);
+        if (salt_decode_failed) {
+                rd_snprintf(errstr, errstr_size,
+                            "Invalid Base64 Salt in server-first-message");
+                goto free_nonce;
+        }
+
+        /* Iteration count (as string) */
+        if (!(itcntstr = rd_kafka_sasl_scram_get_attr(
+                      in, 'i',
+                      "Iteration count in server-first-message",
+                      errstr, errstr_size)))
+                goto free_salt;
+
+        /* Iteration count (as integer).
+         * The range check is performed on the unsigned value, before any
+         * narrowing to int, so that negative or huge inputs cannot wrap
+         * into the accepted range. A count of zero is rejected too.
+         * The validity must be evaluated before itcntstr is freed since
+         * endptr points into it. */
+        errno = 0;
+        itcnt = strtoul(itcntstr, &endptr, 10);
+        itcnt_valid = itcntstr != endptr && *endptr == '\0' && errno == 0 &&
+                itcnt >= 1 && itcnt <= 1000000;
+        rd_free(itcntstr);
+        if (!itcnt_valid) {
+                rd_snprintf(errstr, errstr_size,
+                            "Invalid value (not integer or too large) "
+                            "for Iteration count in server-first-message");
+                goto free_salt;
+        }
+
+        /* Build client-final-message */
+        if (rd_kafka_sasl_scram_build_client_final_message(
+                    rktrans, &salt, server_nonce, in, (int)itcnt, out) == -1) {
+                rd_snprintf(errstr, errstr_size,
+                            "Failed to build SCRAM client-final-message");
+                goto free_salt;
+        }
+
+        r = 0;
+
+ free_salt:
+        rd_free(salt.ptr);
+ free_nonce:
+        rd_free(server_nonce);
+
+        return r;
+}
+
+/**
+ * @brief Handle server-final-message
+ *
+ *        The message is expected to carry either a server-error ("e")
+ *        attribute, in which case authentication has failed, or a
+ *        verifier ("v") attribute which must match the Base64 encoded
+ *        ServerSignature stored in the SCRAM state.
+ *        On a match rd_kafka_sasl_auth_done() is called.
+ *
+ *        NOTE(review): no SCRAM state is released in this function;
+ *        presumably that is left to the provider's close callback - confirm.
+ *
+ * @returns 0 on successful authentication, -1 on failure (errstr set).
+ */
+static int
+rd_kafka_sasl_scram_handle_server_final_message (
+        rd_kafka_transport_t *rktrans,
+        const rd_chariov_t *in,
+        char *errstr, size_t errstr_size) {
+        struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state;
+        const rd_kafka_conf_t *conf;
+        char *server_error;
+        char *verifier;
+
+        /* A server-error attribute means the broker rejected us. */
+        server_error = rd_kafka_sasl_scram_get_attr(
+                in, 'e', "server-error in server-final-message",
+                errstr, errstr_size);
+        if (server_error) {
+                rd_snprintf(errstr, errstr_size,
+                            "SASL SCRAM authentication failed: "
+                            "broker responded with %s",
+                            server_error);
+                rd_free(server_error);
+                return -1;
+        }
+
+        /* Without a server-error there must be a verifier. */
+        verifier = rd_kafka_sasl_scram_get_attr(
+                in, 'v', "verifier in server-final-message",
+                errstr, errstr_size);
+        if (!verifier) {
+                rd_snprintf(errstr, errstr_size,
+                            "SASL SCRAM authentication failed: "
+                            "no verifier or server-error returned from broker");
+                return -1;
+        }
+
+        /* The server accepted our proof, now make sure the server
+         * itself knows the credentials by checking its signature. */
+        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY | RD_KAFKA_DBG_BROKER,
+                   "SCRAMAUTH",
+                   "SASL SCRAM authentication succesful on server: "
+                   "verifying ServerSignature");
+
+        if (strcmp(verifier, state->ServerSignatureB64) != 0) {
+                rd_snprintf(errstr, errstr_size,
+                            "SASL SCRAM authentication failed: "
+                            "ServerSignature mismatch "
+                            "(server's %s != ours %s)",
+                            verifier, state->ServerSignatureB64);
+                rd_free(verifier);
+                return -1;
+        }
+        rd_free(verifier);
+
+        conf = &rktrans->rktrans_rkb->rkb_rk->rk_conf;
+
+        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY | RD_KAFKA_DBG_BROKER,
+                   "SCRAMAUTH",
+                   "Authenticated as %s using %s",
+                   conf->sasl.username,
+                   conf->sasl.mechanisms);
+
+        rd_kafka_sasl_auth_done(rktrans);
+
+        return 0;
+}
+
+
+
+/**
+ * @brief Build client-first-message
+ *
+ *        Generates a new client nonce, formats
+ *        "n,,n=<username>,r=<cnonce>" into a newly allocated \p out buffer
+ *        and saves a copy of the message without its leading three
+ *        character gs2-header in the SCRAM state's first_msg_bare.
+ */
+static void
+rd_kafka_sasl_scram_build_client_first_message (
+        rd_kafka_transport_t *rktrans,
+        rd_chariov_t *out) {
+        /* Length of the gs2-header prefix ("n,,") */
+        const size_t gs2_header_len = 3;
+        const rd_kafka_conf_t *conf = &rktrans->rktrans_rkb->rkb_rk->rk_conf;
+        struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state;
+        char *safe_user;
+
+        rd_kafka_sasl_scram_generate_nonce(&state->cnonce);
+
+        safe_user = rd_kafka_sasl_safe_string(conf->sasl.username);
+
+        /* Fixed parts + username + client nonce */
+        out->size = strlen("n,,n=,r=") + strlen(safe_user) +
+                state->cnonce.size;
+        out->ptr = rd_malloc(out->size+1);
+
+        rd_snprintf(out->ptr, out->size+1,
+                    "n,,n=%s,r=%.*s",
+                    safe_user,
+                    (int)state->cnonce.size, state->cnonce.ptr);
+
+        rd_free(safe_user);
+
+        /* Keep client-first-message-bare, i.e., everything following
+         * the gs2-header. */
+        state->first_msg_bare.size = out->size - gs2_header_len;
+        state->first_msg_bare.ptr  = rd_memdup(out->ptr + gs2_header_len,
+                                               state->first_msg_bare.size);
+}
+
+
+
+/**
+ * @brief SASL SCRAM client state machine
+ *
+ *        Dispatches on the current SCRAM state, sends any message produced
+ *        by the state handler to the broker and emits a debug log if
+ *        handling the state took 100ms or more.
+ *
+ * @returns -1 on failure (errstr set), else 0.
+ */
+static int rd_kafka_sasl_scram_fsm (rd_kafka_transport_t *rktrans,
+                                    const rd_chariov_t *in,
+                                    char *errstr, size_t errstr_size) {
+        static const char *state_names[] = {
+                "client-first-message",
+                "server-first-message",
+                "client-final-message",
+        };
+        struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state;
+        const rd_ts_t ts_begin = rd_clock();
+        const int entry_state = state->state;
+        rd_chariov_t reply = RD_ZERO_INIT;
+        rd_ts_t elapsed_ms;
+        int r = -1;
+
+        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLSCRAM",
+                   "SASL SCRAM client in state %s",
+                   state_names[entry_state]);
+
+        if (entry_state == RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FIRST_MESSAGE) {
+                rd_dassert(!in); /* Not expecting any server-input */
+
+                rd_kafka_sasl_scram_build_client_first_message(rktrans,
+                                                               &reply);
+                state->state = RD_KAFKA_SASL_SCRAM_STATE_SERVER_FIRST_MESSAGE;
+
+        } else if (entry_state ==
+                   RD_KAFKA_SASL_SCRAM_STATE_SERVER_FIRST_MESSAGE) {
+                rd_dassert(in); /* Requires server-input */
+
+                if (rd_kafka_sasl_scram_handle_server_first_message(
+                             rktrans, in, &reply, errstr, errstr_size) == -1)
+                        return -1;
+
+                state->state = RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FINAL_MESSAGE;
+
+        } else if (entry_state ==
+                   RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FINAL_MESSAGE) {
+                rd_dassert(in);  /* Requires server-input */
+
+                r = rd_kafka_sasl_scram_handle_server_final_message(
+                        rktrans, in, errstr, errstr_size);
+        }
+
+        /* Transmit whatever the state handler produced, the result of
+         * the send becomes the result of this call. */
+        if (reply.ptr) {
+                r = rd_kafka_sasl_send(rktrans, reply.ptr, (int)reply.size,
+                                       errstr, errstr_size);
+                rd_free(reply.ptr);
+        }
+
+        elapsed_ms = (rd_clock() - ts_begin) / 1000;
+        if (elapsed_ms >= 100)
+                rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SCRAM",
+                           "SASL SCRAM state %s handled in %"PRId64"ms",
+                           state_names[entry_state], elapsed_ms);
+
+        return r;
+}
+
+
+/**
+ * @brief Handle received frame from broker.
+ *
+ *        Wraps the received buffer in an rd_chariov_t and feeds it to the
+ *        SCRAM state machine, whose return value is passed through.
+ */
+static int rd_kafka_sasl_scram_recv (rd_kafka_transport_t *rktrans,
+                                     const void *buf, size_t size,
+                                     char *errstr, size_t errstr_size) {
+        rd_chariov_t frame = RD_ZERO_INIT;
+
+        frame.ptr  = (char *)buf;
+        frame.size = size;
+
+        return rd_kafka_sasl_scram_fsm(rktrans, &frame, errstr, errstr_size);
+}
+
+
+/**
+ * @brief Initialize and start SASL SCRAM (builtin) authentication.
+ *
+ *        Allocates a zeroed SCRAM state in its initial
+ *        client-first-message state, attaches it to the transport and
+ *        runs the state machine once without any server input.
+ *        \p hostname is not used.
+ *
+ * @returns the state machine's result: 0 on success and -1 on error.
+ *
+ * @locality broker thread
+ */
+static int rd_kafka_sasl_scram_client_new (rd_kafka_transport_t *rktrans,
+                                    const char *hostname,
+                                    char *errstr, size_t errstr_size) {
+        struct rd_kafka_sasl_scram_state *scram;
+
+        scram = rd_calloc(1, sizeof(*scram));
+        scram->state = RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FIRST_MESSAGE;
+
+        rktrans->rktrans_sasl.state = scram;
+
+        /* First FSM round: there is no server input yet. */
+        return rd_kafka_sasl_scram_fsm(rktrans, NULL, errstr, errstr_size);
+}
+
+
+
+/**
+ * @brief Validate SCRAM config and look up the hash function
+ *
+ *        Requires both sasl.username and sasl.password to be set and maps
+ *        the configured mechanism (SCRAM-SHA-1, SCRAM-SHA-256 or
+ *        SCRAM-SHA-512) to its EVP digest, hash function and digest size,
+ *        which are stored in the client's configuration.
+ *
+ * @returns 0 on success or -1 on error (errstr set).
+ */
+static int rd_kafka_sasl_scram_conf_validate (rd_kafka_t *rk,
+                                              char *errstr,
+                                              size_t errstr_size) {
+        rd_kafka_conf_t *conf = &rk->rk_conf;
+        const char *mechanism = conf->sasl.mechanisms;
+
+        /* Credentials are mandatory for SCRAM */
+        if (!conf->sasl.username || !conf->sasl.password) {
+                rd_snprintf(errstr, errstr_size,
+                            "sasl.username and sasl.password must be set");
+                return -1;
+        }
+
+        if (strcmp(mechanism, "SCRAM-SHA-1") == 0) {
+                conf->sasl.scram_evp    = EVP_sha1();
+                conf->sasl.scram_H      = SHA1;
+                conf->sasl.scram_H_size = SHA_DIGEST_LENGTH;
+                return 0;
+        }
+
+        if (strcmp(mechanism, "SCRAM-SHA-256") == 0) {
+                conf->sasl.scram_evp    = EVP_sha256();
+                conf->sasl.scram_H      = SHA256;
+                conf->sasl.scram_H_size = SHA256_DIGEST_LENGTH;
+                return 0;
+        }
+
+        if (strcmp(mechanism, "SCRAM-SHA-512") == 0) {
+                conf->sasl.scram_evp    = EVP_sha512();
+                conf->sasl.scram_H      = SHA512;
+                conf->sasl.scram_H_size = SHA512_DIGEST_LENGTH;
+                return 0;
+        }
+
+        /* None of the supported mechanisms matched */
+        rd_snprintf(errstr, errstr_size,
+                    "Unsupported hash function: %s "
+                    "(try SCRAM-SHA-512)",
+                    mechanism);
+        return -1;
+}
+
+
+
+
+/**
+ * @brief SASL provider definition for the builtin SCRAM implementation,
+ *        binding the provider callbacks to the functions in this file.
+ */
+const struct rd_kafka_sasl_provider rd_kafka_sasl_scram_provider = {
+        .name          = "SCRAM (builtin)",
+        /* Configuration check and hash function lookup */
+        .conf_validate = rd_kafka_sasl_scram_conf_validate,
+        /* Per-transport state setup, kicks off the state machine */
+        .client_new    = rd_kafka_sasl_scram_client_new,
+        /* Frames received from the broker */
+        .recv          = rd_kafka_sasl_scram_recv,
+        .close         = rd_kafka_sasl_scram_close,
+};