You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bq...@apache.org on 2017/11/16 21:15:38 UTC
[08/42] nifi-minifi-cpp git commit: MINIFICPP-274: PutKafka Processor
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl.c
new file mode 100644
index 0000000..9ece5cb
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl.c
@@ -0,0 +1,343 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_transport.h"
+#include "rdkafka_transport_int.h"
+#include "rdkafka_sasl.h"
+#include "rdkafka_sasl_int.h"
+
+
/**
 * Send auth message with framing.
 * This is a blocking call.
 *
 * @param rktrans     transport to send on
 * @param payload     frame payload (may be NULL for an empty frame)
 * @param len         payload length in bytes
 * @param errstr      error string buffer, written on failure
 * @param errstr_size size of \p errstr
 * @returns 0 on success or -1 on send failure (errstr is set).
 */
int rd_kafka_sasl_send (rd_kafka_transport_t *rktrans,
                        const void *payload, int len,
                        char *errstr, size_t errstr_size) {
        rd_buf_t buf;
        rd_slice_t slice;
        int32_t hdr;

        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
                   "Send SASL frame to broker (%d bytes)", len);

        rd_buf_init(&buf, 1+1, sizeof(hdr));

        /* 4-byte big-endian length prefix precedes the payload. */
        hdr = htobe32(len);
        rd_buf_write(&buf, &hdr, sizeof(hdr));
        if (payload)
                /* Payload is pushed by reference (not copied): it must
                 * remain valid until the frame has been fully sent below. */
                rd_buf_push(&buf, payload, len, NULL);

        rd_slice_init_full(&slice, &buf);

        /* Simulate blocking behaviour on non-blocking socket..
         * FIXME: This isn't optimal but is highly unlikely to stall since
         * the socket buffer will most likely not be exceeded. */
        do {
                int r;

                r = (int)rd_kafka_transport_send(rktrans, &slice,
                                                 errstr, errstr_size);
                if (r == -1) {
                        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
                                   "SASL send failed: %s", errstr);
                        rd_buf_destroy(&buf);
                        return -1;
                }

                /* Done when the entire slice has been transmitted. */
                if (rd_slice_remains(&slice) == 0)
                        break;

                /* Avoid busy-looping */
                rd_usleep(10*1000, NULL);

        } while (1);

        rd_buf_destroy(&buf);

        return 0;
}
+
+
/**
 * @brief Authentication successful
 *
 * Transition to next connect state.
 */
void rd_kafka_sasl_auth_done (rd_kafka_transport_t *rktrans) {
        /* Authenticated */
        rd_kafka_broker_connect_up(rktrans->rktrans_rkb);
}
+
+
/**
 * @brief Handle transport IO events during SASL authentication:
 *        receive one framed SASL response from the broker and pass its
 *        payload to the selected provider's recv handler.
 *
 * @returns the provider recv result (0 or the provider's code) on
 *          success, 0 if the frame is not fully received yet, or -1 on
 *          error (errstr is set).
 */
int rd_kafka_sasl_io_event (rd_kafka_transport_t *rktrans, int events,
                            char *errstr, size_t errstr_size) {
        rd_kafka_buf_t *rkbuf;
        int r;
        const void *buf;
        size_t len;

        /* Only readable-socket events are of interest here. */
        if (!(events & POLLIN))
                return 0;

        r = rd_kafka_transport_framed_recv(rktrans, &rkbuf,
                                           errstr, errstr_size);
        if (r == -1) {
                /* A bare disconnect during authentication typically means
                 * the broker rejected the credentials: enrich the error. */
                if (!strcmp(errstr, "Disconnected"))
                        rd_snprintf(errstr, errstr_size,
                                    "Disconnected: check client %s credentials "
                                    "and broker logs",
                                    rktrans->rktrans_rkb->rkb_rk->rk_conf.
                                    sasl.mechanisms);
                return -1;
        } else if (r == 0) /* not fully received yet */
                return 0;

        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
                   "Received SASL frame from broker (%"PRIusz" bytes)",
                   rkbuf ? rkbuf->rkbuf_totlen : 0);

        if (rkbuf) {
                rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
                /* Seek past framing header */
                rd_slice_seek(&rkbuf->rkbuf_reader, 4);
                len = rd_slice_remains(&rkbuf->rkbuf_reader);
                buf = rd_slice_ensure_contig(&rkbuf->rkbuf_reader, len);
        } else {
                /* Empty frame: pass a NULL payload to the provider. */
                buf = NULL;
                len = 0;
        }

        r = rktrans->rktrans_rkb->rkb_rk->
                rk_conf.sasl.provider->recv(rktrans, buf, len,
                                            errstr, errstr_size);
        rd_kafka_buf_destroy(rkbuf);

        return r;
}
+
+
+/**
+ * @brief Close SASL session (from transport code)
+ * @remark May be called on non-SASL transports (no-op)
+ */
+void rd_kafka_sasl_close (rd_kafka_transport_t *rktrans) {
+ const struct rd_kafka_sasl_provider *provider =
+ rktrans->rktrans_rkb->rkb_rk->rk_conf.
+ sasl.provider;
+
+ if (provider && provider->close)
+ provider->close(rktrans);
+}
+
+
+
/**
 * Initialize and start SASL authentication.
 *
 * Verifies that the broker advertises support for the configured
 * mechanism, then hands off to the selected provider's client_new.
 *
 * Returns 0 on successful init and -1 on error (errstr is set).
 *
 * Locality: broker thread
 */
int rd_kafka_sasl_client_new (rd_kafka_transport_t *rktrans,
                              char *errstr, size_t errstr_size) {
        int r;
        rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
        rd_kafka_t *rk = rkb->rkb_rk;
        char *hostname, *t;
        const struct rd_kafka_sasl_provider *provider =
                rk->rk_conf.sasl.provider;

        /* Verify broker support:
         * - RD_KAFKA_FEATURE_SASL_GSSAPI - GSSAPI supported
         * - RD_KAFKA_FEATURE_SASL_HANDSHAKE - GSSAPI, PLAIN and possibly
         *   other mechanisms supported. */
        if (!strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI")) {
                if (!(rkb->rkb_features & RD_KAFKA_FEATURE_SASL_GSSAPI)) {
                        rd_snprintf(errstr, errstr_size,
                                    "SASL GSSAPI authentication not supported "
                                    "by broker");
                        return -1;
                }
        } else if (!(rkb->rkb_features & RD_KAFKA_FEATURE_SASL_HANDSHAKE)) {
                rd_snprintf(errstr, errstr_size,
                            "SASL Handshake not supported by broker "
                            "(required by mechanism %s)%s",
                            rk->rk_conf.sasl.mechanisms,
                            rk->rk_conf.api_version_request ? "" :
                            ": try api.version.request=true");
                return -1;
        }

        /* Use a stack copy of the nodename so the port suffix can be
         * stripped without modifying broker state. */
        rd_strdupa(&hostname, rktrans->rktrans_rkb->rkb_nodename);
        if ((t = strchr(hostname, ':')))
                *t = '\0'; /* remove ":port" */

        rd_rkb_dbg(rkb, SECURITY, "SASL",
                   "Initializing SASL client: service name %s, "
                   "hostname %s, mechanisms %s, provider %s",
                   rk->rk_conf.sasl.service_name, hostname,
                   rk->rk_conf.sasl.mechanisms,
                   provider->name);

        r = provider->client_new(rktrans, hostname, errstr, errstr_size);
        if (r != -1)
                /* Wait for the broker's handshake response. */
                rd_kafka_transport_poll_set(rktrans, POLLIN);

        return r;
}
+
+
+
+
+
+
+
+/**
+ * Per handle SASL term.
+ *
+ * Locality: broker thread
+ */
+void rd_kafka_sasl_broker_term (rd_kafka_broker_t *rkb) {
+ const struct rd_kafka_sasl_provider *provider =
+ rkb->rkb_rk->rk_conf.sasl.provider;
+ if (provider->broker_term)
+ provider->broker_term(rkb);
+}
+
+/**
+ * Broker SASL init.
+ *
+ * Locality: broker thread
+ */
+void rd_kafka_sasl_broker_init (rd_kafka_broker_t *rkb) {
+ const struct rd_kafka_sasl_provider *provider =
+ rkb->rkb_rk->rk_conf.sasl.provider;
+ if (provider->broker_init)
+ provider->broker_init(rkb);
+}
+
+
+
/**
 * @brief Select SASL provider for configured mechanism (singularis)
 * @returns 0 on success or -1 on failure.
 *
 * On success the chosen provider is stored in rk->rk_conf.sasl.provider
 * after its (optional) conf_validate hook has accepted the config.
 */
int rd_kafka_sasl_select_provider (rd_kafka_t *rk,
                                   char *errstr, size_t errstr_size) {
        const struct rd_kafka_sasl_provider *provider = NULL;

        if (!strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI")) {
                /* GSSAPI / Kerberos */
#ifdef _MSC_VER
                provider = &rd_kafka_sasl_win32_provider;
#elif WITH_SASL_CYRUS
                provider = &rd_kafka_sasl_cyrus_provider;
#endif

        } else if (!strcmp(rk->rk_conf.sasl.mechanisms, "PLAIN")) {
                /* SASL PLAIN */
                provider = &rd_kafka_sasl_plain_provider;

        } else if (!strncmp(rk->rk_conf.sasl.mechanisms, "SCRAM-SHA-",
                            strlen("SCRAM-SHA-"))) {
                /* SASL SCRAM */
#if WITH_SASL_SCRAM
                provider = &rd_kafka_sasl_scram_provider;
#endif

        } else {
                /* Unsupported mechanism */
                rd_snprintf(errstr, errstr_size,
                            "Unsupported SASL mechanism: %s",
                            rk->rk_conf.sasl.mechanisms);
                return -1;
        }

        /* Mechanism recognized but no backend was compiled in. */
        if (!provider) {
                rd_snprintf(errstr, errstr_size,
                            "No provider for SASL mechanism %s"
                            ": recompile librdkafka with "
#ifndef _MSC_VER
                            "libsasl2 or "
#endif
                            "openssl support. "
                            "Current build options:"
                            " PLAIN"
#ifdef _MSC_VER
                            " WindowsSSPI(GSSAPI)"
#endif
#if WITH_SASL_CYRUS
                            " SASL_CYRUS"
#endif
#if WITH_SASL_SCRAM
                            " SASL_SCRAM"
#endif
                            ,
                            rk->rk_conf.sasl.mechanisms);
                return -1;
        }

        rd_kafka_dbg(rk, SECURITY, "SASL",
                     "Selected provider %s for SASL mechanism %s",
                     provider->name, rk->rk_conf.sasl.mechanisms);

        /* Validate SASL config */
        if (provider->conf_validate &&
            provider->conf_validate(rk, errstr, errstr_size) == -1)
                return -1;

        rk->rk_conf.sasl.provider = provider;

        return 0;
}
+
+
+
/**
 * Global SASL termination.
 *
 * Counterpart to rd_kafka_sasl_global_init(); tears down
 * provider-specific global state.
 */
void rd_kafka_sasl_global_term (void) {
#if WITH_SASL_CYRUS
        rd_kafka_sasl_cyrus_global_term();
#endif
}
+
+
/**
 * Global SASL init, called once per runtime.
 *
 * @returns 0 on success or -1 if the Cyrus backend failed to
 *          initialize; always 0 when Cyrus is not compiled in.
 */
int rd_kafka_sasl_global_init (void) {
#if WITH_SASL_CYRUS
        return rd_kafka_sasl_cyrus_global_init();
#else
        return 0;
#endif
}
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl.h
new file mode 100644
index 0000000..496e04e
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl.h
@@ -0,0 +1,46 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#pragma once
+
+
+
/* Handle transport IO events during SASL auth: receive one framed
 * response and feed it to the provider. Returns -1 on error. */
int rd_kafka_sasl_io_event (rd_kafka_transport_t *rktrans, int events,
                            char *errstr, size_t errstr_size);
/* Close the transport's SASL session (no-op on non-SASL transports). */
void rd_kafka_sasl_close (rd_kafka_transport_t *rktrans);
/* Initialize and start SASL authentication on a connected transport.
 * Returns 0 on success or -1 on error (errstr is set). */
int rd_kafka_sasl_client_new (rd_kafka_transport_t *rktrans,
                              char *errstr, size_t errstr_size);

/* Per-broker SASL teardown/setup hooks (e.g. kinit refresh timer). */
void rd_kafka_sasl_broker_term (rd_kafka_broker_t *rkb);
void rd_kafka_sasl_broker_init (rd_kafka_broker_t *rkb);

/* Once-per-runtime global SASL termination/initialization. */
void rd_kafka_sasl_global_term (void);
int rd_kafka_sasl_global_init (void);

/* Select the provider matching the configured sasl.mechanisms.
 * Returns 0 on success or -1 on failure (errstr is set). */
int rd_kafka_sasl_select_provider (rd_kafka_t *rk,
                                   char *errstr, size_t errstr_size);
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_cyrus.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_cyrus.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_cyrus.c
new file mode 100644
index 0000000..35b3183
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_cyrus.c
@@ -0,0 +1,623 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_transport.h"
+#include "rdkafka_transport_int.h"
+#include "rdkafka_sasl.h"
+#include "rdkafka_sasl_int.h"
+#include "rdstring.h"
+
+#ifdef __FreeBSD__
+#include <sys/wait.h> /* For WIF.. */
+#endif
+
+#ifdef __APPLE__
+/* Apple has deprecated most of the SASL API for unknown reason,
+ * silence those warnings. */
+#pragma clang diagnostic ignored "-Wdeprecated-declarations"
+#endif
+
+#include <sasl/sasl.h>
+
+static mtx_t rd_kafka_sasl_cyrus_kinit_lock;
+
+typedef struct rd_kafka_sasl_cyrus_state_s {
+ sasl_conn_t *conn;
+ sasl_callback_t callbacks[16];
+} rd_kafka_sasl_cyrus_state_t;
+
+
+
/**
 * Handle received frame from broker.
 *
 * Feeds the broker's challenge to libsasl (sasl_client_step) and sends
 * any client response back; on SASL_OK completion, reports auth done.
 *
 * @returns 0 on success/continue or -1 on error (errstr is set).
 */
static int rd_kafka_sasl_cyrus_recv (struct rd_kafka_transport_s *rktrans,
                                     const void *buf, size_t size,
                                     char *errstr, size_t errstr_size) {
        rd_kafka_sasl_cyrus_state_t *state = rktrans->rktrans_sasl.state;
        int r;

        /* PLAIN completes client-side before the broker responds; the
         * empty response frame must not be passed to libsasl. */
        if (rktrans->rktrans_sasl.complete && size == 0)
                goto auth_successful;

        do {
                sasl_interact_t *interact = NULL;
                const char *out;
                unsigned int outlen;

                r = sasl_client_step(state->conn,
                                     size > 0 ? buf : NULL, size,
                                     &interact,
                                     &out, &outlen);

                if (r >= 0) {
                        /* Note: outlen may be 0 here for an empty response */
                        if (rd_kafka_sasl_send(rktrans, out, outlen,
                                               errstr, errstr_size) == -1)
                                return -1;
                }

                if (r == SASL_INTERACT)
                        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
                                   "SASL_INTERACT: %lu %s, %s, %s, %p",
                                   interact->id,
                                   interact->challenge,
                                   interact->prompt,
                                   interact->defresult,
                                   interact->result);

        } while (r == SASL_INTERACT);

        if (r == SASL_CONTINUE)
                return 0; /* Wait for more data from broker */
        else if (r != SASL_OK) {
                rd_snprintf(errstr, errstr_size,
                            "SASL handshake failed (step): %s",
                            sasl_errdetail(state->conn));
                return -1;
        }

        /* Authentication successful */
auth_successful:
        /* Log negotiated identity details when security debugging is on. */
        if (rktrans->rktrans_rkb->rkb_rk->rk_conf.debug &
            RD_KAFKA_DBG_SECURITY) {
                const char *user, *mech, *authsrc;

                if (sasl_getprop(state->conn, SASL_USERNAME,
                                 (const void **)&user) != SASL_OK)
                        user = "(unknown)";

                if (sasl_getprop(state->conn, SASL_MECHNAME,
                                 (const void **)&mech) != SASL_OK)
                        mech = "(unknown)";

                if (sasl_getprop(state->conn, SASL_AUTHSOURCE,
                                 (const void **)&authsrc) != SASL_OK)
                        authsrc = "(unknown)";

                rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
                           "Authenticated as %s using %s (%s)",
                           user, mech, authsrc);
        }

        rd_kafka_sasl_auth_done(rktrans);

        return 0;
}
+
+
+
+
/**
 * @brief Template-variable lookup for rd_string_render(): resolves
 *        "broker.name" to the broker's hostname (port stripped), and
 *        any other key through the configuration table.
 *
 * @returns the full value length (which may exceed \p size; presumably
 *          the renderer uses this to detect truncation — TODO confirm
 *          against rd_string_render), or -1 if the key is unknown.
 */
static ssize_t render_callback (const char *key, char *buf,
                                size_t size, void *opaque) {
        rd_kafka_broker_t *rkb = opaque;

        if (!strcmp(key, "broker.name")) {
                char *val, *t;
                size_t len;
                /* Hold broker lock while copying the nodename. */
                rd_kafka_broker_lock(rkb);
                rd_strdupa(&val, rkb->rkb_nodename);
                rd_kafka_broker_unlock(rkb);

                /* Just the broker name, no port */
                if ((t = strchr(val, ':')))
                        len = (size_t)(t-val);
                else
                        len = strlen(val);

                /* Copy at most \p size bytes; full length is returned. */
                if (buf)
                        memcpy(buf, val, RD_MIN(len, size));

                return len;

        } else {
                rd_kafka_conf_res_t res;
                size_t destsize = size;

                /* Try config lookup. */
                res = rd_kafka_conf_get(&rkb->rkb_rk->rk_conf, key,
                                        buf, &destsize);
                if (res != RD_KAFKA_CONF_OK)
                        return -1;

                /* Dont include \0 in returned size */
                return (destsize > 0 ? destsize-1 : destsize);
        }
}
+
+
+/**
+ * Execute kinit to refresh ticket.
+ *
+ * Returns 0 on success, -1 on error.
+ *
+ * Locality: any
+ */
+static int rd_kafka_sasl_cyrus_kinit_refresh (rd_kafka_broker_t *rkb) {
+ rd_kafka_t *rk = rkb->rkb_rk;
+ int r;
+ char *cmd;
+ char errstr[128];
+
+ if (!rk->rk_conf.sasl.kinit_cmd ||
+ !strstr(rk->rk_conf.sasl.mechanisms, "GSSAPI"))
+ return 0; /* kinit not configured */
+
+ /* Build kinit refresh command line using string rendering and config */
+ cmd = rd_string_render(rk->rk_conf.sasl.kinit_cmd,
+ errstr, sizeof(errstr),
+ render_callback, rkb);
+ if (!cmd) {
+ rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH",
+ "Failed to construct kinit command "
+ "from sasl.kerberos.kinit.cmd template: %s",
+ errstr);
+ return -1;
+ }
+
+ /* Execute kinit */
+ rd_rkb_dbg(rkb, SECURITY, "SASLREFRESH",
+ "Refreshing SASL keys with command: %s", cmd);
+
+ mtx_lock(&rd_kafka_sasl_cyrus_kinit_lock);
+ r = system(cmd);
+ mtx_unlock(&rd_kafka_sasl_cyrus_kinit_lock);
+
+ if (r == -1) {
+ rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH",
+ "SASL key refresh failed: Failed to execute %s",
+ cmd);
+ rd_free(cmd);
+ return -1;
+ } else if (WIFSIGNALED(r)) {
+ rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH",
+ "SASL key refresh failed: %s: received signal %d",
+ cmd, WTERMSIG(r));
+ rd_free(cmd);
+ return -1;
+ } else if (WIFEXITED(r) && WEXITSTATUS(r) != 0) {
+ rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH",
+ "SASL key refresh failed: %s: exited with code %d",
+ cmd, WEXITSTATUS(r));
+ rd_free(cmd);
+ return -1;
+ }
+
+ rd_free(cmd);
+
+ rd_rkb_dbg(rkb, SECURITY, "SASLREFRESH", "SASL key refreshed");
+ return 0;
+}
+
+
+/**
+ * Refresh timer callback
+ *
+ * Locality: kafka main thread
+ */
+static void rd_kafka_sasl_cyrus_kinit_refresh_tmr_cb (rd_kafka_timers_t *rkts,
+ void *arg) {
+ rd_kafka_broker_t *rkb = arg;
+
+ rd_kafka_sasl_cyrus_kinit_refresh(rkb);
+}
+
+
+
/**
 *
 * libsasl callbacks
 *
 */
/* Option getter callback: pins the client mechanism list to GSSAPI and
 * the canon_user plugin to INTERNAL.
 * NOTE(review): for any other option *result is left untouched; this
 * presumably relies on libsasl pre-initializing it — confirm before
 * relying on the debug log line below for such options. */
static RD_UNUSED int
rd_kafka_sasl_cyrus_cb_getopt (void *context, const char *plugin_name,
                               const char *option,
                               const char **result, unsigned *len) {
        rd_kafka_transport_t *rktrans = context;

        if (!strcmp(option, "client_mech_list"))
                *result = "GSSAPI";
        if (!strcmp(option, "canon_user_plugin"))
                *result = "INTERNAL";

        if (*result && len)
                *len = strlen(*result);

        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
                   "CB_GETOPT: plugin %s, option %s: returning %s",
                   plugin_name, option, *result);

        return SASL_OK;
}
+
+static int rd_kafka_sasl_cyrus_cb_log (void *context, int level, const char *message){
+ rd_kafka_transport_t *rktrans = context;
+
+ if (level >= LOG_DEBUG)
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
+ "%s", message);
+ else
+ rd_rkb_log(rktrans->rktrans_rkb, level, "LIBSASL",
+ "%s", message);
+ return SASL_OK;
+}
+
+
+static int rd_kafka_sasl_cyrus_cb_getsimple (void *context, int id,
+ const char **result, unsigned *len) {
+ rd_kafka_transport_t *rktrans = context;
+
+ switch (id)
+ {
+ case SASL_CB_USER:
+ case SASL_CB_AUTHNAME:
+ *result = rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.username;
+ break;
+
+ default:
+ *result = NULL;
+ break;
+ }
+
+ if (len)
+ *len = *result ? strlen(*result) : 0;
+
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
+ "CB_GETSIMPLE: id 0x%x: returning %s", id, *result);
+
+ return *result ? SASL_OK : SASL_FAIL;
+}
+
+
/* libsasl secret callback: hands out the configured sasl.password.
 * NOTE(review): *psecret is realloc'd in place — this assumes libsasl
 * initializes it to NULL or a heap pointer it owns, and that libsasl
 * frees the returned secret; confirm against the sasl_getsecret_t
 * contract. rd_realloc's result is presumably never NULL (allocator
 * aborts on OOM) — verify. */
static int rd_kafka_sasl_cyrus_cb_getsecret (sasl_conn_t *conn, void *context,
                                             int id, sasl_secret_t **psecret) {
        rd_kafka_transport_t *rktrans = context;
        const char *password;

        password = rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.password;

        if (!password) {
                *psecret = NULL;
        } else {
                size_t passlen = strlen(password);
                /* sasl_secret_t has a trailing data[] array: allocate
                 * header plus password bytes (no NUL needed). */
                *psecret = rd_realloc(*psecret, sizeof(**psecret) + passlen);
                (*psecret)->len = passlen;
                memcpy((*psecret)->data, password, passlen);
        }

        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
                   "CB_GETSECRET: id 0x%x: returning %s",
                   id, *psecret ? "(hidden)":"NULL");

        return SASL_OK;
}
+
+static int rd_kafka_sasl_cyrus_cb_chalprompt (void *context, int id,
+ const char *challenge,
+ const char *prompt,
+ const char *defres,
+ const char **result, unsigned *len) {
+ rd_kafka_transport_t *rktrans = context;
+
+ *result = "min_chalprompt";
+ *len = strlen(*result);
+
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
+ "CB_CHALPROMPT: id 0x%x, challenge %s, prompt %s, "
+ "default %s: returning %s",
+ id, challenge, prompt, defres, *result);
+
+ return SASL_OK;
+}
+
+static int rd_kafka_sasl_cyrus_cb_getrealm (void *context, int id,
+ const char **availrealms,
+ const char **result) {
+ rd_kafka_transport_t *rktrans = context;
+
+ *result = *availrealms;
+
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
+ "CB_GETREALM: id 0x%x: returning %s", id, *result);
+
+ return SASL_OK;
+}
+
+
/* libsasl user canonicalization callback: for GSSAPI the configured
 * principal is used as the canonical name; for PLAIN the input name is
 * passed through unchanged; other mechanisms fail.
 * NOTE(review): on the failure path *out_len is never written before
 * being read in the debug log below — confirm callers tolerate this. */
static RD_UNUSED int
rd_kafka_sasl_cyrus_cb_canon (sasl_conn_t *conn,
                              void *context,
                              const char *in, unsigned inlen,
                              unsigned flags,
                              const char *user_realm,
                              char *out, unsigned out_max,
                              unsigned *out_len) {
        rd_kafka_transport_t *rktrans = context;

        if (strstr(rktrans->rktrans_rkb->rkb_rk->rk_conf.
                   sasl.mechanisms, "GSSAPI")) {
                *out_len = rd_snprintf(out, out_max, "%s",
                                       rktrans->rktrans_rkb->rkb_rk->
                                       rk_conf.sasl.principal);
        } else if (!strcmp(rktrans->rktrans_rkb->rkb_rk->rk_conf.
                           sasl.mechanisms, "PLAIN")) {
                *out_len = rd_snprintf(out, out_max, "%.*s", inlen, in);
        } else
                out = NULL;

        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
                   "CB_CANON: flags 0x%x, \"%.*s\" @ \"%s\": returning \"%.*s\"",
                   flags, (int)inlen, in, user_realm, (int)(*out_len), out);

        return out ? SASL_OK : SASL_FAIL;
}
+
+
+static void rd_kafka_sasl_cyrus_close (struct rd_kafka_transport_s *rktrans) {
+ rd_kafka_sasl_cyrus_state_t *state = rktrans->rktrans_sasl.state;
+
+ if (!state)
+ return;
+
+ if (state->conn)
+ sasl_dispose(&state->conn);
+ rd_free(state);
+}
+
+
/**
 * Initialize and start SASL authentication.
 *
 * Sets up the per-connection libsasl session, optionally refreshes
 * the Kerberos ticket, and sends the initial handshake frame.
 *
 * Returns 0 on successful init and -1 on error (errstr is set).
 *
 * Locality: broker thread
 */
static int rd_kafka_sasl_cyrus_client_new (rd_kafka_transport_t *rktrans,
                                           const char *hostname,
                                           char *errstr, size_t errstr_size) {
        int r;
        rd_kafka_sasl_cyrus_state_t *state;
        rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
        rd_kafka_t *rk = rkb->rkb_rk;
        sasl_callback_t callbacks[16] = {
                // { SASL_CB_GETOPT, (void *)rd_kafka_sasl_cyrus_cb_getopt, rktrans },
                { SASL_CB_LOG, (void *)rd_kafka_sasl_cyrus_cb_log, rktrans },
                { SASL_CB_AUTHNAME, (void *)rd_kafka_sasl_cyrus_cb_getsimple, rktrans },
                { SASL_CB_PASS, (void *)rd_kafka_sasl_cyrus_cb_getsecret, rktrans },
                { SASL_CB_ECHOPROMPT, (void *)rd_kafka_sasl_cyrus_cb_chalprompt, rktrans },
                { SASL_CB_GETREALM, (void *)rd_kafka_sasl_cyrus_cb_getrealm, rktrans },
                { SASL_CB_CANON_USER, (void *)rd_kafka_sasl_cyrus_cb_canon, rktrans },
                { SASL_CB_LIST_END }
        };

        state = rd_calloc(1, sizeof(*state));
        rktrans->rktrans_sasl.state = state;

        /* SASL_CB_USER is needed for PLAIN but breaks GSSAPI */
        if (!strcmp(rk->rk_conf.sasl.mechanisms, "PLAIN")) {
                int endidx;
                /* Find end of callbacks array */
                for (endidx = 0 ;
                     callbacks[endidx].id != SASL_CB_LIST_END ; endidx++)
                        ;

                /* Append the USER callback and re-terminate the array. */
                callbacks[endidx].id = SASL_CB_USER;
                callbacks[endidx].proc = (void *)rd_kafka_sasl_cyrus_cb_getsimple;
                callbacks[endidx].context = rktrans;
                endidx++;
                callbacks[endidx].id = SASL_CB_LIST_END;
        }

        /* Callbacks must outlive the sasl connection: keep a copy in
         * the per-connection state rather than the stack array. */
        memcpy(state->callbacks, callbacks, sizeof(callbacks));

        /* Acquire or refresh ticket if kinit is configured */
        rd_kafka_sasl_cyrus_kinit_refresh(rkb);

        r = sasl_client_new(rk->rk_conf.sasl.service_name, hostname,
                            NULL, NULL, /* no local & remote IP checks */
                            state->callbacks, 0, &state->conn);
        if (r != SASL_OK) {
                rd_snprintf(errstr, errstr_size, "%s",
                            sasl_errstring(r, NULL, NULL));
                return -1;
        }

        if (rk->rk_conf.debug & RD_KAFKA_DBG_SECURITY) {
                const char *avail_mechs;
                sasl_listmech(state->conn, NULL, NULL, " ", NULL,
                              &avail_mechs, NULL, NULL);
                rd_rkb_dbg(rkb, SECURITY, "SASL",
                           "My supported SASL mechanisms: %s", avail_mechs);
        }

        /* Start the client-side exchange and send the initial frame. */
        do {
                const char *out;
                unsigned int outlen;
                const char *mech = NULL;

                r = sasl_client_start(state->conn,
                                      rk->rk_conf.sasl.mechanisms,
                                      NULL, &out, &outlen, &mech);

                if (r >= 0)
                        if (rd_kafka_sasl_send(rktrans, out, outlen,
                                               errstr, errstr_size))
                                return -1;
        } while (r == SASL_INTERACT);

        if (r == SASL_OK) {
                /* PLAIN is apparently done here, but we still need to make sure
                 * the PLAIN frame is sent and we get a response back (but we must
                 * not pass the response to libsasl or it will fail). */
                rktrans->rktrans_sasl.complete = 1;
                return 0;

        } else if (r != SASL_CONTINUE) {
                rd_snprintf(errstr, errstr_size,
                            "SASL handshake failed (start (%d)): %s",
                            r, sasl_errdetail(state->conn));
                return -1;
        }

        return 0;
}
+
+
+
+
+
+
+
+/**
+ * Per handle SASL term.
+ *
+ * Locality: broker thread
+ */
+static void rd_kafka_sasl_cyrus_broker_term (rd_kafka_broker_t *rkb) {
+ rd_kafka_t *rk = rkb->rkb_rk;
+
+ if (!rk->rk_conf.sasl.kinit_cmd)
+ return;
+
+ rd_kafka_timer_stop(&rk->rk_timers, &rkb->rkb_sasl_kinit_refresh_tmr,1);
+}
+
+/**
+ * Broker SASL init.
+ *
+ * Locality: broker thread
+ */
+static void rd_kafka_sasl_cyrus_broker_init (rd_kafka_broker_t *rkb) {
+ rd_kafka_t *rk = rkb->rkb_rk;
+
+ if (!rk->rk_conf.sasl.kinit_cmd ||
+ !strstr(rk->rk_conf.sasl.mechanisms, "GSSAPI"))
+ return; /* kinit not configured, no need to start timer */
+
+ rd_kafka_timer_start(&rk->rk_timers, &rkb->rkb_sasl_kinit_refresh_tmr,
+ rk->rk_conf.sasl.relogin_min_time * 1000ll,
+ rd_kafka_sasl_cyrus_kinit_refresh_tmr_cb, rkb);
+}
+
+
+
/* Validate Cyrus-specific configuration: for GSSAPI with a kinit
 * command configured, dry-run the template rendering against a dummy
 * broker to catch template errors at configure time.
 * Returns 0 on success or -1 with errstr set. */
static int rd_kafka_sasl_cyrus_conf_validate (rd_kafka_t *rk,
                                              char *errstr, size_t errstr_size) {

        /* Only GSSAPI needs validation here. */
        if (strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI"))
                return 0;

        if (rk->rk_conf.sasl.kinit_cmd) {
                rd_kafka_broker_t rkb;
                char *cmd;
                char tmperr[128];

                /* Fabricate a minimal stack broker so render_callback
                 * can resolve "broker.name" and take the broker lock. */
                memset(&rkb, 0, sizeof(rkb));
                strcpy(rkb.rkb_nodename, "ATestBroker:9092");
                rkb.rkb_rk = rk;
                mtx_init(&rkb.rkb_lock, mtx_plain);

                cmd = rd_string_render(rk->rk_conf.sasl.kinit_cmd,
                                       tmperr, sizeof(tmperr),
                                       render_callback, &rkb);

                mtx_destroy(&rkb.rkb_lock);

                if (!cmd) {
                        rd_snprintf(errstr, errstr_size,
                                    "Invalid sasl.kerberos.kinit.cmd value: %s",
                                    tmperr);
                        return -1;
                }

                rd_free(cmd);
        }

        return 0;
}
+
+
/**
 * Global SASL termination.
 *
 * Destroys the kinit serialization lock; deliberately does NOT call
 * sasl_done() since the application may be using libsasl too.
 */
void rd_kafka_sasl_cyrus_global_term (void) {
        /* NOTE: Should not be called since the application may be using SASL too*/
        /* sasl_done(); */
        mtx_destroy(&rd_kafka_sasl_cyrus_kinit_lock);
}
+
+
+/**
+ * Global SASL init, called once per runtime.
+ */
+int rd_kafka_sasl_cyrus_global_init (void) {
+ int r;
+
+ mtx_init(&rd_kafka_sasl_cyrus_kinit_lock, mtx_plain);
+
+ r = sasl_client_init(NULL);
+ if (r != SASL_OK) {
+ fprintf(stderr, "librdkafka: sasl_client_init() failed: %s\n",
+ sasl_errstring(r, NULL, NULL));
+ return -1;
+ }
+
+ return 0;
+}
+
+
/* Provider vtable registering the Cyrus libsasl backend with the
 * generic SASL layer (see rd_kafka_sasl_select_provider()). */
const struct rd_kafka_sasl_provider rd_kafka_sasl_cyrus_provider = {
        .name          = "Cyrus",
        .client_new    = rd_kafka_sasl_cyrus_client_new,
        .recv          = rd_kafka_sasl_cyrus_recv,
        .close         = rd_kafka_sasl_cyrus_close,
        .broker_init   = rd_kafka_sasl_cyrus_broker_init,
        .broker_term   = rd_kafka_sasl_cyrus_broker_term,
        .conf_validate = rd_kafka_sasl_cyrus_conf_validate
};
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_int.h
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_int.h b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_int.h
new file mode 100644
index 0000000..699174e
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_int.h
@@ -0,0 +1,70 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#pragma once
+
/**
 * @brief SASL provider interface: each mechanism backend (Cyrus,
 *        Win32 SSPI, builtin PLAIN, SCRAM) implements this vtable.
 */
struct rd_kafka_sasl_provider {
        const char *name;   /* Human-readable provider name (for logs) */

        /* Create per-connection session state and send the initial
         * handshake frame. Returns 0 on success or -1 on error. */
        int (*client_new) (rd_kafka_transport_t *rktrans,
                           const char *hostname,
                           char *errstr, size_t errstr_size);

        /* Handle a SASL frame payload received from the broker. */
        int (*recv) (struct rd_kafka_transport_s *s,
                     const void *buf, size_t size,
                     char *errstr, size_t errstr_size);
        /* Destroy per-connection session state (may be NULL: callers
         * check before invoking). */
        void (*close) (struct rd_kafka_transport_s *);

        /* Optional per-broker init/term hooks (may be NULL: callers
         * check before invoking). */
        void (*broker_init) (rd_kafka_broker_t *rkb);
        void (*broker_term) (rd_kafka_broker_t *rkb);

        /* Optional configuration validation hook (may be NULL). */
        int (*conf_validate) (rd_kafka_t *rk,
                              char *errstr, size_t errstr_size);
};

#ifdef _MSC_VER
extern const struct rd_kafka_sasl_provider rd_kafka_sasl_win32_provider;
#endif

#if WITH_SASL_CYRUS
extern const struct rd_kafka_sasl_provider rd_kafka_sasl_cyrus_provider;
void rd_kafka_sasl_cyrus_global_term (void);
int rd_kafka_sasl_cyrus_global_init (void);
#endif

extern const struct rd_kafka_sasl_provider rd_kafka_sasl_plain_provider;

#if WITH_SASL_SCRAM
extern const struct rd_kafka_sasl_provider rd_kafka_sasl_scram_provider;
#endif

/* Transition the connection to the next state after successful auth. */
void rd_kafka_sasl_auth_done (rd_kafka_transport_t *rktrans);
/* Send a length-framed SASL payload to the broker (blocking).
 * Returns 0 on success or -1 on error. */
int rd_kafka_sasl_send (rd_kafka_transport_t *rktrans,
                        const void *payload, int len,
                        char *errstr, size_t errstr_size);
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_plain.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_plain.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_plain.c
new file mode 100644
index 0000000..57650ee
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_plain.c
@@ -0,0 +1,128 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2017 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+/**
+ * Builtin SASL PLAIN support when Cyrus SASL is not available
+ */
+#include "rdkafka_int.h"
+#include "rdkafka_transport.h"
+#include "rdkafka_transport_int.h"
+#include "rdkafka_sasl.h"
+#include "rdkafka_sasl_int.h"
+
+
+/**
+ * @brief Handle received frame from broker.
+ *
+ * For builtin PLAIN any response terminates the handshake: a non-empty
+ * payload is merely logged, then authentication is marked done.
+ *
+ * @returns 0 (always succeeds).
+ */
+static int rd_kafka_sasl_plain_recv (struct rd_kafka_transport_s *rktrans,
+ const void *buf, size_t size,
+ char *errstr, size_t errstr_size) {
+ if (size)
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLPLAIN",
+ "Received non-empty SASL PLAIN (builtin) "
+ "response from broker (%"PRIusz" bytes)", size);
+
+ rd_kafka_sasl_auth_done(rktrans);
+
+ return 0;
+}
+
+
+/**
+ * @brief Initialize and start SASL PLAIN (builtin) authentication.
+ *
+ * Sends the RFC 4616 initial response: [authzid] UTF8NUL authcid
+ * UTF8NUL passwd, with an empty authzid.
+ *
+ * Returns 0 on successful init and -1 on error.
+ *
+ * @locality broker thread
+ */
+int rd_kafka_sasl_plain_client_new (rd_kafka_transport_t *rktrans,
+ const char *hostname,
+ char *errstr, size_t errstr_size) {
+ rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
+ rd_kafka_t *rk = rkb->rkb_rk;
+ /* [authzid] UTF8NUL authcid UTF8NUL passwd */
+ char buf[255+1+255+1+255+1];
+ int of = 0;
+
+ /* authzid: none (empty) */
+ /* UTF8NUL */
+ buf[of++] = 0;
+ /* authcid
+ * NOTE(review): usernames/passwords longer than 255 bytes are
+ * silently truncated by the RD_MIN() below — confirm acceptable. */
+ if (rk->rk_conf.sasl.username) {
+ int r = (int)strlen(rk->rk_conf.sasl.username);
+ r = RD_MIN(255, r);
+ memcpy(&buf[of], rk->rk_conf.sasl.username, r);
+ of += r;
+ }
+ /* UTF8NUL */
+ buf[of++] = 0;
+ /* passwd */
+ if (rk->rk_conf.sasl.password) {
+ int r = (int)strlen(rk->rk_conf.sasl.password);
+ r = RD_MIN(255, r);
+ memcpy(&buf[of], rk->rk_conf.sasl.password, r);
+ of += r;
+ }
+
+ rd_rkb_dbg(rkb, SECURITY, "SASLPLAIN",
+ "Sending SASL PLAIN (builtin) authentication token");
+
+ if (rd_kafka_sasl_send(rktrans, buf, of,
+ errstr, errstr_size))
+ return -1;
+
+ /* PLAIN is apparently done here, but we still need to make sure
+ * the PLAIN frame is sent and we get a response back (empty) */
+ rktrans->rktrans_sasl.complete = 1;
+ return 0;
+}
+
+
+/**
+ * @brief Validate PLAIN config
+ *
+ * Both sasl.username and sasl.password are mandatory for PLAIN.
+ *
+ * @returns 0 on valid config, -1 otherwise (errstr populated).
+ */
+static int rd_kafka_sasl_plain_conf_validate (rd_kafka_t *rk,
+ char *errstr,
+ size_t errstr_size) {
+ if (!rk->rk_conf.sasl.username || !rk->rk_conf.sasl.password) {
+ rd_snprintf(errstr, errstr_size,
+ "sasl.username and sasl.password must be set");
+ return -1;
+ }
+
+ return 0;
+}
+
+
+/* Builtin PLAIN provider vtable; close/broker_init/broker_term are
+ * intentionally NULL since PLAIN keeps no per-connection state. */
+const struct rd_kafka_sasl_provider rd_kafka_sasl_plain_provider = {
+ .name = "PLAIN (builtin)",
+ .client_new = rd_kafka_sasl_plain_client_new,
+ .recv = rd_kafka_sasl_plain_recv,
+ .conf_validate = rd_kafka_sasl_plain_conf_validate
+};
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f66960e/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_scram.c
----------------------------------------------------------------------
diff --git a/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_scram.c b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_scram.c
new file mode 100644
index 0000000..968d879
--- /dev/null
+++ b/thirdparty/librdkafka-0.11.1/src/rdkafka_sasl_scram.c
@@ -0,0 +1,901 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2017 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+/**
+ * Builtin SASL SCRAM support when Cyrus SASL is not available
+ */
+#include "rdkafka_int.h"
+#include "rdkafka_transport.h"
+#include "rdkafka_transport_int.h"
+#include "rdkafka_sasl.h"
+#include "rdkafka_sasl_int.h"
+#include "rdrand.h"
+
+#if WITH_SSL
+#include <openssl/hmac.h>
+#include <openssl/evp.h>
+#include <openssl/sha.h>
+#else
+#error "WITH_SSL (OpenSSL) is required for SASL SCRAM"
+#endif
+
+
+/**
+ * @brief Per-connection state
+ *
+ * Allocated in client_new(), freed in rd_kafka_sasl_scram_close().
+ */
+struct rd_kafka_sasl_scram_state {
+ enum {
+ RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FIRST_MESSAGE,
+ RD_KAFKA_SASL_SCRAM_STATE_SERVER_FIRST_MESSAGE,
+ RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FINAL_MESSAGE,
+ } state; /* Next expected step in the SCRAM exchange */
+ rd_chariov_t cnonce; /* client c-nonce */
+ rd_chariov_t first_msg_bare; /* client-first-message-bare */
+ char *ServerSignatureB64; /* ServerSignature in Base64,
+ * computed in advance for verifying
+ * the server-final-message */
+ const EVP_MD *evp; /* Hash function pointer */
+};
+
+
+/**
+ * @brief Close and free authentication state
+ *
+ * Safe to call with no state allocated (no-op).
+ */
+static void rd_kafka_sasl_scram_close (rd_kafka_transport_t *rktrans) {
+ struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state;
+
+ if (!state)
+ return;
+
+ RD_IF_FREE(state->cnonce.ptr, rd_free);
+ RD_IF_FREE(state->first_msg_bare.ptr, rd_free);
+ RD_IF_FREE(state->ServerSignatureB64, rd_free);
+ rd_free(state);
+}
+
+
+
+/**
+ * @brief Generates a nonce string (a random printable string)
+ * @remark dst->ptr will be allocated and must be freed.
+ */
+static void rd_kafka_sasl_scram_generate_nonce (rd_chariov_t *dst) {
+ int i;
+ dst->size = 32;
+ dst->ptr = rd_malloc(dst->size+1);
+ /* FIX: generate random printable characters. The previous code
+ * filled the nonce with the constant 'a' (a debug leftover; the
+ * intended rd_jitter() call was commented out), which made the
+ * client nonce fully predictable and defeated SCRAM's
+ * replay-protection (RFC 5802 requires a random nonce). */
+ for (i = 0 ; i < (int)dst->size ; i++)
+ dst->ptr[i] = (char)rd_jitter(0x2d/*-*/, 0x7e/*~*/);
+ dst->ptr[i] = 0;
+}
+
+
+/**
+ * @brief Parses inbuf for SCRAM attribute \p attr (e.g., 's')
+ *
+ * Attributes are comma-separated "x=value" pairs (RFC 5802 syntax).
+ *
+ * @returns a newly allocated copy of the value, or NULL
+ * on failure in which case an error is written to \p errstr
+ * prefixed by \p description.
+ *
+ * @remark \p description and \p errstr may be NULL (with errstr_size 0)
+ * when the caller only probes for an attribute's presence.
+ */
+static char *rd_kafka_sasl_scram_get_attr (const rd_chariov_t *inbuf, char attr,
+ const char *description,
+ char *errstr, size_t errstr_size) {
+ size_t of = 0;
+
+ for (of = 0 ; of < inbuf->size ; ) {
+ const char *td;
+ size_t len;
+
+ /* Find next delimiter , (if any) */
+ td = memchr(&inbuf->ptr[of], ',', inbuf->size - of);
+ if (td)
+ len = (size_t)(td - &inbuf->ptr[of]);
+ else
+ len = inbuf->size - of;
+
+ /* Check if attr "x=" matches */
+ if (inbuf->ptr[of] == attr && inbuf->size > of+1 &&
+ inbuf->ptr[of+1] == '=') {
+ char *ret;
+ of += 2; /* past = */
+ /* len includes the leading "x=", so the value
+ * itself is len-2 bytes. */
+ ret = rd_malloc(len - 2 + 1);
+ memcpy(ret, &inbuf->ptr[of], len - 2);
+ ret[len-2] = '\0';
+ return ret;
+ }
+
+ /* Not the attr we are looking for, skip
+ * past the next delimiter and continue looking. */
+ of += len+1;
+ }
+
+ rd_snprintf(errstr, errstr_size,
+ "%s: could not find attribute (%c)",
+ description, attr);
+ return NULL;
+}
+
+
+/**
+ * @brief Base64 encode binary input \p in
+ *
+ * Uses an OpenSSL base64 filter BIO chained onto a memory BIO.
+ *
+ * @returns a newly allocated, NUL-terminated base64 string which
+ * must be freed with rd_free().
+ */
+static char *rd_base64_encode (const rd_chariov_t *in) {
+ BIO *buf, *b64f;
+ BUF_MEM *ptr;
+ char *out;
+
+ b64f = BIO_new(BIO_f_base64());
+ buf = BIO_new(BIO_s_mem());
+ buf = BIO_push(b64f, buf);
+
+ /* No newlines in output: produce one continuous base64 string */
+ BIO_set_flags(buf, BIO_FLAGS_BASE64_NO_NL);
+ BIO_set_close(buf, BIO_CLOSE);
+ BIO_write(buf, in->ptr, (int)in->size);
+ BIO_flush(buf);
+
+ BIO_get_mem_ptr(buf, &ptr);
+ /* FIX: use rd_malloc() instead of bare malloc(): all callers free
+ * the result with rd_free() (allocator consistency), and malloc's
+ * return value was not checked before the memcpy below. */
+ out = rd_malloc(ptr->length + 1);
+ memcpy(out, ptr->data, ptr->length);
+ out[ptr->length] = '\0';
+
+ BIO_free_all(buf);
+
+ return out;
+}
+
+/**
+ * @brief Base64 decode input string \p in of size \p insize.
+ * @returns -1 on invalid Base64, or 0 on successes in which case a
+ * newly allocated binary string is set in out (and size).
+ *
+ * @remark out->ptr must be freed with rd_free() by the caller.
+ */
+static int rd_base64_decode (const rd_chariov_t *in, rd_chariov_t *out) {
+ size_t asize;
+ BIO *b64, *bmem;
+
+ /* Valid base64 is non-empty and a multiple of 4 characters */
+ if (in->size == 0 || (in->size % 4) != 0)
+ return -1;
+
+ asize = (in->size * 3) / 4; /* allocation size */
+ out->ptr = rd_malloc(asize+1);
+
+ b64 = BIO_new(BIO_f_base64());
+ BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
+
+ bmem = BIO_new_mem_buf(in->ptr, (int)in->size);
+ bmem = BIO_push(b64, bmem);
+
+ /* NOTE(review): BIO_read() returns -1 on failure, which would wrap
+ * to a huge size_t here; the assert below catches that in debug
+ * builds only — confirm inputs are pre-validated in release. */
+ out->size = BIO_read(bmem, out->ptr, (int)asize+1);
+ assert(out->size <= asize);
+ BIO_free_all(bmem);
+
+#if ENABLE_DEVEL
+ /* Verify that decode==encode */
+ {
+ char *encoded = rd_base64_encode(out);
+ assert(strlen(encoded) == in->size);
+ assert(!strncmp(encoded, in->ptr, in->size));
+ rd_free(encoded);
+ }
+#endif
+
+ return 0;
+}
+
+
+/**
+ * @brief Perform H(str) hash function and stores the result in \p out
+ * which must be at least EVP_MAX_MD_SIZE.
+ *
+ * The hash function (scram_H) and its digest size (scram_H_size) were
+ * selected from the mechanism name in conf_validate().
+ *
+ * @returns 0 on success, else -1
+ */
+static int
+rd_kafka_sasl_scram_H (rd_kafka_transport_t *rktrans,
+ const rd_chariov_t *str,
+ rd_chariov_t *out) {
+
+ rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.scram_H(
+ (const unsigned char *)str->ptr, str->size,
+ (unsigned char *)out->ptr);
+
+ out->size = rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.scram_H_size;
+ return 0;
+}
+
+/**
+ * @brief Perform HMAC(key,str) and stores the result in \p out
+ * which must be at least EVP_MAX_MD_SIZE.
+ *
+ * Uses the EVP digest selected for the configured SCRAM mechanism.
+ *
+ * @returns 0 on success, else -1
+ */
+static int
+rd_kafka_sasl_scram_HMAC (rd_kafka_transport_t *rktrans,
+ const rd_chariov_t *key,
+ const rd_chariov_t *str,
+ rd_chariov_t *out) {
+ const EVP_MD *evp =
+ rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.scram_evp;
+ unsigned int outsize;
+
+ //printf("HMAC KEY: %s\n", rd_base64_encode(key));
+ //printf("HMAC STR: %s\n", rd_base64_encode(str));
+
+ if (!HMAC(evp,
+ (const unsigned char *)key->ptr, (int)key->size,
+ (const unsigned char *)str->ptr, (int)str->size,
+ (unsigned char *)out->ptr, &outsize)) {
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SCRAM",
+ "HMAC failed");
+ return -1;
+ }
+
+ out->size = outsize;
+ //printf("HMAC OUT: %s\n", rd_base64_encode(out));
+
+ return 0;
+}
+
+
+
+/**
+ * @brief Perform \p itcnt iterations of HMAC() on the given buffer \p in
+ * using \p salt, writing the output into \p out which must be
+ * at least EVP_MAX_MD_SIZE. Actual size is updated in \p *outsize.
+ *
+ * This is the Hi() / PBKDF2-style iterated HMAC from RFC 5802:
+ * U1 = HMAC(in, salt || INT(1)), Ui = HMAC(in, Ui-1),
+ * result = U1 XOR U2 XOR ... XOR Uitcnt.
+ *
+ * @returns 0 on success, else -1
+ */
+static int
+rd_kafka_sasl_scram_Hi (rd_kafka_transport_t *rktrans,
+ const rd_chariov_t *in,
+ const rd_chariov_t *salt,
+ int itcnt, rd_chariov_t *out) {
+ const EVP_MD *evp =
+ rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.scram_evp;
+ unsigned int ressize = 0;
+ unsigned char tempres[EVP_MAX_MD_SIZE];
+ unsigned char *saltplus;
+ int i;
+
+ /* U1 := HMAC(str, salt + INT(1))
+ * Append the big-endian 32-bit block index 1 to the salt. */
+ saltplus = rd_alloca(salt->size + 4);
+ memcpy(saltplus, salt->ptr, salt->size);
+ saltplus[salt->size] = 0;
+ saltplus[salt->size+1] = 0;
+ saltplus[salt->size+2] = 0;
+ saltplus[salt->size+3] = 1;
+
+ /* U1 := HMAC(str, salt + INT(1)) */
+ if (!HMAC(evp,
+ (const unsigned char *)in->ptr, (int)in->size,
+ saltplus, salt->size+4,
+ tempres, &ressize)) {
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SCRAM",
+ "HMAC priming failed");
+ return -1;
+ }
+
+ memcpy(out->ptr, tempres, ressize);
+
+ /* Ui-1 := HMAC(str, Ui-2) .. */
+ for (i = 1 ; i < itcnt ; i++) {
+ unsigned char tempdest[EVP_MAX_MD_SIZE];
+ int j;
+
+ /* NULL output-length is allowed by HMAC(); digest size
+ * stays ressize for a fixed evp. */
+ if (unlikely(!HMAC(evp,
+ (const unsigned char *)in->ptr, (int)in->size,
+ tempres, ressize,
+ tempdest, NULL))) {
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SCRAM",
+ "Hi() HMAC #%d/%d failed", i, itcnt);
+ return -1;
+ }
+
+ /* U1 XOR U2 .. */
+ for (j = 0 ; j < (int)ressize ; j++) {
+ out->ptr[j] ^= tempdest[j];
+ tempres[j] = tempdest[j];
+ }
+ }
+
+ out->size = ressize;
+
+ return 0;
+}
+
+
+/**
+ * @returns a SASL value-safe-char encoded string, replacing "," and "="
+ * with their escaped counterparts ("=2C" and "=3D", per RFC 5802
+ * saslname encoding) in a newly allocated string.
+ *
+ * @remark Result must be freed with rd_free().
+ */
+static char *rd_kafka_sasl_safe_string (const char *str) {
+ char *safe = NULL, *d = NULL/*avoid warning*/;
+ int pass;
+ size_t len = 0;
+
+ /* Pass #1: scan for needed length and allocate.
+ * Pass #2: encode string */
+ for (pass = 0 ; pass < 2 ; pass++) {
+ const char *s;
+ for (s = str ; *s ; s++) {
+ if (pass == 0) {
+ /* Escaped chars expand to 3 bytes; the
+ * "1 +" below accounts for 2 extra. */
+ len += 1 + (*s == ',' || *s == '=');
+ continue;
+ }
+
+ if (*s == ',') {
+ *(d++) = '=';
+ *(d++) = '2';
+ *(d++) = 'C';
+ } else if (*s == '=') {
+ *(d++) = '=';
+ *(d++) = '3';
+ *(d++) = 'D';
+ } else
+ *(d++) = *s;
+ }
+
+ if (pass == 0)
+ d = safe = rd_malloc(len+1);
+ }
+
+ rd_assert(d == safe + (int)len);
+ *d = '\0';
+
+ return safe;
+}
+
+
+/**
+ * @brief Build client-final-message-without-proof
+ * @remark out->ptr will be allocated and must be freed.
+ */
+static void
+rd_kafka_sasl_scram_build_client_final_message_wo_proof (
+ struct rd_kafka_sasl_scram_state *state,
+ const char *snonce,
+ rd_chariov_t *out) {
+ /* Channel-binding attribute: base64 of the gs2-header "n,,"
+ * (no channel binding used). */
+ const char *attr_c = "biws"; /* base64 encode of "n,," */
+
+ /*
+ * client-final-message-without-proof =
+ * channel-binding "," nonce [","
+ * extensions]
+ */
+ out->size = strlen("c=,r=") + strlen(attr_c) +
+ state->cnonce.size + strlen(snonce);
+ out->ptr = rd_malloc(out->size+1);
+ /* Nonce is client-nonce concatenated with server-nonce suffix */
+ rd_snprintf(out->ptr, out->size+1, "c=%s,r=%.*s%s",
+ attr_c, (int)state->cnonce.size, state->cnonce.ptr, snonce);
+}
+
+
+/**
+ * @brief Build client-final-message
+ *
+ * Derives all SCRAM keys (RFC 5802) from the configured password and
+ * the server-provided salt/iteration-count, constructs the
+ * client-final-message in \p out (caller frees out->ptr), and stores
+ * the expected ServerSignature (Base64) in state for later
+ * verification of the server-final-message.
+ *
+ * @returns -1 on error.
+ */
+static int
+rd_kafka_sasl_scram_build_client_final_message (
+ rd_kafka_transport_t *rktrans,
+ const rd_chariov_t *salt,
+ const char *server_nonce,
+ const rd_chariov_t *server_first_msg,
+ int itcnt, rd_chariov_t *out) {
+ struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state;
+ const rd_kafka_conf_t *conf = &rktrans->rktrans_rkb->rkb_rk->rk_conf;
+ rd_chariov_t SaslPassword =
+ { .ptr = conf->sasl.password,
+ .size = strlen(conf->sasl.password) };
+ rd_chariov_t SaltedPassword =
+ { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
+ rd_chariov_t ClientKey =
+ { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
+ rd_chariov_t ServerKey =
+ { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
+ rd_chariov_t StoredKey =
+ { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
+ rd_chariov_t AuthMessage = RD_ZERO_INIT;
+ rd_chariov_t ClientSignature =
+ { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
+ rd_chariov_t ServerSignature =
+ { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
+ const rd_chariov_t ClientKeyVerbatim =
+ { .ptr = "Client Key", .size = 10 };
+ const rd_chariov_t ServerKeyVerbatim =
+ { .ptr = "Server Key", .size = 10 };
+ rd_chariov_t ClientProof =
+ { .ptr = rd_alloca(EVP_MAX_MD_SIZE) };
+ rd_chariov_t client_final_msg_wo_proof;
+ char *ClientProofB64;
+ int i;
+
+ /* Constructing the ClientProof attribute (p):
+ *
+ * p = Base64-encoded ClientProof
+ * SaltedPassword := Hi(Normalize(password), salt, i)
+ * ClientKey := HMAC(SaltedPassword, "Client Key")
+ * StoredKey := H(ClientKey)
+ * AuthMessage := client-first-message-bare + "," +
+ * server-first-message + "," +
+ * client-final-message-without-proof
+ * ClientSignature := HMAC(StoredKey, AuthMessage)
+ * ClientProof := ClientKey XOR ClientSignature
+ * ServerKey := HMAC(SaltedPassword, "Server Key")
+ * ServerSignature := HMAC(ServerKey, AuthMessage)
+ */
+
+ /* SaltedPassword := Hi(Normalize(password), salt, i)
+ * (early returns below this point are leak-free until
+ * client_final_msg_wo_proof is allocated) */
+ if (rd_kafka_sasl_scram_Hi(
+ rktrans, &SaslPassword, salt,
+ itcnt, &SaltedPassword) == -1)
+ return -1;
+
+ /* ClientKey := HMAC(SaltedPassword, "Client Key") */
+ if (rd_kafka_sasl_scram_HMAC(
+ rktrans, &SaltedPassword, &ClientKeyVerbatim,
+ &ClientKey) == -1)
+ return -1;
+
+ /* StoredKey := H(ClientKey) */
+ if (rd_kafka_sasl_scram_H(rktrans, &ClientKey, &StoredKey) == -1)
+ return -1;
+
+ /* client-final-message-without-proof */
+ rd_kafka_sasl_scram_build_client_final_message_wo_proof(
+ state, server_nonce, &client_final_msg_wo_proof);
+
+ /* AuthMessage := client-first-message-bare + "," +
+ * server-first-message + "," +
+ * client-final-message-without-proof */
+ AuthMessage.size =
+ state->first_msg_bare.size + 1 +
+ server_first_msg->size + 1 +
+ client_final_msg_wo_proof.size;
+ AuthMessage.ptr = rd_alloca(AuthMessage.size+1);
+ rd_snprintf(AuthMessage.ptr, AuthMessage.size+1,
+ "%.*s,%.*s,%.*s",
+ (int)state->first_msg_bare.size, state->first_msg_bare.ptr,
+ (int)server_first_msg->size, server_first_msg->ptr,
+ (int)client_final_msg_wo_proof.size,
+ client_final_msg_wo_proof.ptr);
+
+ /*
+ * Calculate ServerSignature for later verification when
+ * server-final-message is received.
+ */
+
+ /* ServerKey := HMAC(SaltedPassword, "Server Key") */
+ if (rd_kafka_sasl_scram_HMAC(
+ rktrans, &SaltedPassword, &ServerKeyVerbatim,
+ &ServerKey) == -1) {
+ rd_free(client_final_msg_wo_proof.ptr);
+ return -1;
+ }
+
+ /* ServerSignature := HMAC(ServerKey, AuthMessage) */
+ if (rd_kafka_sasl_scram_HMAC(rktrans, &ServerKey,
+ &AuthMessage, &ServerSignature) == -1) {
+ rd_free(client_final_msg_wo_proof.ptr);
+ return -1;
+ }
+
+ /* Store the Base64 encoded ServerSignature for quick comparison
+ * (freed in rd_kafka_sasl_scram_close()) */
+ state->ServerSignatureB64 = rd_base64_encode(&ServerSignature);
+
+
+ /*
+ * Continue with client-final-message
+ */
+
+ /* ClientSignature := HMAC(StoredKey, AuthMessage) */
+ if (rd_kafka_sasl_scram_HMAC(rktrans, &StoredKey,
+ &AuthMessage, &ClientSignature) == -1) {
+ rd_free(client_final_msg_wo_proof.ptr);
+ return -1;
+ }
+
+ /* ClientProof := ClientKey XOR ClientSignature */
+ assert(ClientKey.size == ClientSignature.size);
+ for (i = 0 ; i < (int)ClientKey.size ; i++)
+ ClientProof.ptr[i] = ClientKey.ptr[i] ^ ClientSignature.ptr[i];
+ ClientProof.size = ClientKey.size;
+
+
+ /* Base64 encoded ClientProof */
+ ClientProofB64 = rd_base64_encode(&ClientProof);
+
+ /* Construct client-final-message */
+ out->size = client_final_msg_wo_proof.size +
+ strlen(",p=") + strlen(ClientProofB64);
+ out->ptr = rd_malloc(out->size + 1);
+
+ rd_snprintf(out->ptr, out->size+1,
+ "%.*s,p=%s",
+ (int)client_final_msg_wo_proof.size,
+ client_final_msg_wo_proof.ptr,
+ ClientProofB64);
+ rd_free(ClientProofB64);
+ rd_free(client_final_msg_wo_proof.ptr);
+
+ return 0;
+}
+
+
+/**
+ * @brief Handle first message from server
+ *
+ * Parse server response which looks something like:
+ * "r=fyko+d2lbbFgONR....,s=QSXCR+Q6sek8bf92,i=4096"
+ *
+ * On success \p out is set to the client-final-message
+ * (caller frees out->ptr).
+ *
+ * @returns -1 on error (errstr populated).
+ */
+static int
+rd_kafka_sasl_scram_handle_server_first_message (rd_kafka_transport_t *rktrans,
+ const rd_chariov_t *in,
+ rd_chariov_t *out,
+ char *errstr,
+ size_t errstr_size) {
+ struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state;
+ char *server_nonce;
+ rd_chariov_t salt_b64, salt;
+ char *itcntstr;
+ const char *endptr;
+ int itcnt;
+ char *attr_m;
+
+ /* Mandatory future extension check */
+ if ((attr_m = rd_kafka_sasl_scram_get_attr(
+ in, 'm', NULL, NULL, 0))) {
+ rd_snprintf(errstr, errstr_size,
+ "Unsupported mandatory SCRAM extension");
+ rd_free(attr_m);
+ return -1;
+ }
+
+ /* Server nonce */
+ if (!(server_nonce = rd_kafka_sasl_scram_get_attr(
+ in, 'r',
+ "Server nonce in server-first-message",
+ errstr, errstr_size)))
+ return -1;
+
+ /* Server nonce must be the client nonce with a server suffix */
+ if (strlen(server_nonce) <= state->cnonce.size ||
+ strncmp(state->cnonce.ptr, server_nonce, state->cnonce.size)) {
+ rd_snprintf(errstr, errstr_size,
+ "Server/client nonce mismatch in "
+ "server-first-message");
+ rd_free(server_nonce);
+ return -1;
+ }
+
+ /* Salt (Base64) */
+ if (!(salt_b64.ptr = rd_kafka_sasl_scram_get_attr(
+ in, 's',
+ "Salt in server-first-message",
+ errstr, errstr_size))) {
+ rd_free(server_nonce);
+ return -1;
+ }
+ salt_b64.size = strlen(salt_b64.ptr);
+
+ /* Convert Salt to binary */
+ if (rd_base64_decode(&salt_b64, &salt) == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid Base64 Salt in server-first-message");
+ rd_free(server_nonce);
+ rd_free(salt_b64.ptr);
+ /* FIX: this error branch was missing its return, which
+ * caused a double free of salt_b64.ptr just below and
+ * continued execution with an uninitialized salt. */
+ return -1;
+ }
+ rd_free(salt_b64.ptr);
+
+ /* Iteration count (as string) */
+ if (!(itcntstr = rd_kafka_sasl_scram_get_attr(
+ in, 'i',
+ "Iteration count in server-first-message",
+ errstr, errstr_size))) {
+ rd_free(server_nonce);
+ rd_free(salt.ptr);
+ return -1;
+ }
+
+ /* Iteration count (as int); capped at 1M to bound CPU usage */
+ errno = 0;
+ itcnt = (int)strtoul(itcntstr, (char **)&endptr, 10);
+ if (itcntstr == endptr || *endptr != '\0' || errno != 0 ||
+ itcnt > 1000000) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid value (not integer or too large) "
+ "for Iteration count in server-first-message");
+ rd_free(server_nonce);
+ rd_free(salt.ptr);
+ rd_free(itcntstr);
+ return -1;
+ }
+ rd_free(itcntstr);
+
+ /* Build client-final-message */
+ if (rd_kafka_sasl_scram_build_client_final_message(
+ rktrans, &salt, server_nonce, in, itcnt, out) == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "Failed to build SCRAM client-final-message");
+ rd_free(salt.ptr);
+ rd_free(server_nonce);
+ return -1;
+ }
+
+ rd_free(server_nonce);
+ rd_free(salt.ptr);
+
+ return 0;
+}
+
+/**
+ * @brief Handle server-final-message
+ *
+ * This is the end of the SCRAM exchange: either the broker reports an
+ * error (attribute 'e') or supplies a verifier (attribute 'v') that
+ * must match the ServerSignature computed earlier.
+ *
+ * @remark The SCRAM state itself is not freed here; it is released by
+ * rd_kafka_sasl_scram_close() on transport teardown.
+ *
+ * @returns -1 on failure
+ */
+static int
+rd_kafka_sasl_scram_handle_server_final_message (
+ rd_kafka_transport_t *rktrans,
+ const rd_chariov_t *in,
+ char *errstr, size_t errstr_size) {
+ struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state;
+ char *attr_v, *attr_e;
+
+ if ((attr_e = rd_kafka_sasl_scram_get_attr(
+ in, 'e', "server-error in server-final-message",
+ errstr, errstr_size))) {
+ /* Authentication failed */
+
+ rd_snprintf(errstr, errstr_size,
+ "SASL SCRAM authentication failed: "
+ "broker responded with %s",
+ attr_e);
+ rd_free(attr_e);
+ return -1;
+
+ } else if ((attr_v = rd_kafka_sasl_scram_get_attr(
+ in, 'v', "verifier in server-final-message",
+ errstr, errstr_size))) {
+ const rd_kafka_conf_t *conf;
+
+ /* Authentication successful on server,
+ * but we need to verify the ServerSignature too. */
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY | RD_KAFKA_DBG_BROKER,
+ "SCRAMAUTH",
+ "SASL SCRAM authentication succesful on server: "
+ "verifying ServerSignature");
+
+ if (strcmp(attr_v, state->ServerSignatureB64)) {
+ rd_snprintf(errstr, errstr_size,
+ "SASL SCRAM authentication failed: "
+ "ServerSignature mismatch "
+ "(server's %s != ours %s)",
+ attr_v, state->ServerSignatureB64);
+ rd_free(attr_v);
+ return -1;
+ }
+ rd_free(attr_v);
+
+ conf = &rktrans->rktrans_rkb->rkb_rk->rk_conf;
+
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY | RD_KAFKA_DBG_BROKER,
+ "SCRAMAUTH",
+ "Authenticated as %s using %s",
+ conf->sasl.username,
+ conf->sasl.mechanisms);
+
+ rd_kafka_sasl_auth_done(rktrans);
+ return 0;
+
+ } else {
+ rd_snprintf(errstr, errstr_size,
+ "SASL SCRAM authentication failed: "
+ "no verifier or server-error returned from broker");
+ return -1;
+ }
+}
+
+
+
+/**
+ * @brief Build client-first-message
+ *
+ * Format: gs2-header "n,," followed by "n=<user>,r=<cnonce>".
+ * \p out->ptr is allocated and must be freed by the caller.
+ */
+static void
+rd_kafka_sasl_scram_build_client_first_message (
+ rd_kafka_transport_t *rktrans,
+ rd_chariov_t *out) {
+ char *sasl_username;
+ struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state;
+ const rd_kafka_conf_t *conf = &rktrans->rktrans_rkb->rkb_rk->rk_conf;
+
+ rd_kafka_sasl_scram_generate_nonce(&state->cnonce);
+
+ /* Escape ',' and '=' in the username per RFC 5802 saslname rules */
+ sasl_username = rd_kafka_sasl_safe_string(conf->sasl.username);
+
+ out->size = strlen("n,,n=,r=") + strlen(sasl_username) +
+ state->cnonce.size;
+ out->ptr = rd_malloc(out->size+1);
+
+ rd_snprintf(out->ptr, out->size+1,
+ "n,,n=%s,r=%.*s",
+ sasl_username,
+ (int)state->cnonce.size, state->cnonce.ptr);
+ rd_free(sasl_username);
+
+ /* Save client-first-message-bare (skip gs2-header "n,," = 3 bytes);
+ * needed later when computing AuthMessage. */
+ state->first_msg_bare.size = out->size-3;
+ state->first_msg_bare.ptr = rd_memdup(out->ptr+3,
+ state->first_msg_bare.size);
+}
+
+
+
+/**
+ * @brief SASL SCRAM client state machine
+ *
+ * Advances one step of the SCRAM exchange: builds the next outbound
+ * message (if any), sends it, and transitions state.
+ *
+ * @param in server input frame, or NULL for the initial step.
+ * @returns -1 on failure (errstr set), else 0.
+ */
+static int rd_kafka_sasl_scram_fsm (rd_kafka_transport_t *rktrans,
+ const rd_chariov_t *in,
+ char *errstr, size_t errstr_size) {
+ static const char *state_names[] = {
+ "client-first-message",
+ "server-first-message",
+ "client-final-message",
+ };
+ struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state;
+ rd_chariov_t out = RD_ZERO_INIT;
+ int r = -1;
+ rd_ts_t ts_start = rd_clock();
+ int prev_state = state->state;
+
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLSCRAM",
+ "SASL SCRAM client in state %s",
+ state_names[state->state]);
+
+ switch (state->state)
+ {
+ case RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FIRST_MESSAGE:
+ rd_dassert(!in); /* Not expecting any server-input */
+
+ rd_kafka_sasl_scram_build_client_first_message(rktrans, &out);
+ state->state = RD_KAFKA_SASL_SCRAM_STATE_SERVER_FIRST_MESSAGE;
+ break;
+
+
+ case RD_KAFKA_SASL_SCRAM_STATE_SERVER_FIRST_MESSAGE:
+ rd_dassert(in); /* Requires server-input */
+
+ if (rd_kafka_sasl_scram_handle_server_first_message(
+ rktrans, in, &out, errstr, errstr_size) == -1)
+ return -1;
+
+ state->state = RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FINAL_MESSAGE;
+ break;
+
+ case RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FINAL_MESSAGE:
+ rd_dassert(in); /* Requires server-input */
+
+ /* Final step: nothing to send, r carries the outcome */
+ r = rd_kafka_sasl_scram_handle_server_final_message(
+ rktrans, in, errstr, errstr_size);
+ break;
+ }
+
+ if (out.ptr) {
+ r = rd_kafka_sasl_send(rktrans, out.ptr, (int)out.size,
+ errstr, errstr_size);
+ rd_free(out.ptr);
+ }
+
+ /* ts_start is reused to hold elapsed time in milliseconds */
+ ts_start = (rd_clock() - ts_start) / 1000;
+ if (ts_start >= 100)
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SCRAM",
+ "SASL SCRAM state %s handled in %"PRId64"ms",
+ state_names[prev_state], ts_start);
+
+
+ return r;
+}
+
+
+/**
+ * @brief Handle received frame from broker.
+ *
+ * Thin adapter wrapping the raw buffer in an rd_chariov_t and
+ * forwarding it to the SCRAM state machine.
+ */
+static int rd_kafka_sasl_scram_recv (rd_kafka_transport_t *rktrans,
+ const void *buf, size_t size,
+ char *errstr, size_t errstr_size) {
+ const rd_chariov_t in = { .ptr = (char *)buf, .size = size };
+ return rd_kafka_sasl_scram_fsm(rktrans, &in, errstr, errstr_size);
+}
+
+
+/**
+ * @brief Initialize and start SASL SCRAM (builtin) authentication.
+ *
+ * Allocates per-connection state (freed in rd_kafka_sasl_scram_close())
+ * and runs the first FSM step to send client-first-message.
+ *
+ * Returns 0 on successful init and -1 on error.
+ *
+ * @locality broker thread
+ */
+static int rd_kafka_sasl_scram_client_new (rd_kafka_transport_t *rktrans,
+ const char *hostname,
+ char *errstr, size_t errstr_size) {
+ struct rd_kafka_sasl_scram_state *state;
+
+ state = rd_calloc(1, sizeof(*state));
+ state->state = RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FIRST_MESSAGE;
+ rktrans->rktrans_sasl.state = state;
+
+ /* Kick off the FSM */
+ return rd_kafka_sasl_scram_fsm(rktrans, NULL, errstr, errstr_size);
+}
+
+
+
+/**
+ * @brief Validate SCRAM config and look up the hash function
+ *
+ * Maps the configured sasl.mechanisms name (SCRAM-SHA-1/256/512) to
+ * the corresponding OpenSSL EVP digest, raw hash function and digest
+ * size used by H()/HMAC()/Hi().
+ *
+ * @returns 0 on valid config, -1 otherwise (errstr populated).
+ */
+static int rd_kafka_sasl_scram_conf_validate (rd_kafka_t *rk,
+ char *errstr,
+ size_t errstr_size) {
+ const char *mech = rk->rk_conf.sasl.mechanisms;
+
+ if (!rk->rk_conf.sasl.username || !rk->rk_conf.sasl.password) {
+ rd_snprintf(errstr, errstr_size,
+ "sasl.username and sasl.password must be set");
+ return -1;
+ }
+
+ if (!strcmp(mech, "SCRAM-SHA-1")) {
+ rk->rk_conf.sasl.scram_evp = EVP_sha1();
+ rk->rk_conf.sasl.scram_H = SHA1;
+ rk->rk_conf.sasl.scram_H_size = SHA_DIGEST_LENGTH;
+ } else if (!strcmp(mech, "SCRAM-SHA-256")) {
+ rk->rk_conf.sasl.scram_evp = EVP_sha256();
+ rk->rk_conf.sasl.scram_H = SHA256;
+ rk->rk_conf.sasl.scram_H_size = SHA256_DIGEST_LENGTH;
+ } else if (!strcmp(mech, "SCRAM-SHA-512")) {
+ rk->rk_conf.sasl.scram_evp = EVP_sha512();
+ rk->rk_conf.sasl.scram_H = SHA512;
+ rk->rk_conf.sasl.scram_H_size = SHA512_DIGEST_LENGTH;
+ } else {
+ rd_snprintf(errstr, errstr_size,
+ "Unsupported hash function: %s "
+ "(try SCRAM-SHA-512)",
+ mech);
+ return -1;
+ }
+
+ return 0;
+}
+
+
+
+
+/* Builtin SCRAM provider vtable; broker_init/broker_term are
+ * intentionally NULL (no per-broker setup needed). */
+const struct rd_kafka_sasl_provider rd_kafka_sasl_scram_provider = {
+ .name = "SCRAM (builtin)",
+ .client_new = rd_kafka_sasl_scram_client_new,
+ .recv = rd_kafka_sasl_scram_recv,
+ .close = rd_kafka_sasl_scram_close,
+ .conf_validate = rd_kafka_sasl_scram_conf_validate,
+};