You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by fr...@apache.org on 2019/07/02 14:02:19 UTC
[impala] 02/03: Update squeasel to
7973705170f4744d1806e32695f7ea1e8308ee95
This is an automated email from the ASF dual-hosted git repository.
fredyw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit e8c60a30839463f162e31fc76a87de9753531b72
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Fri Jun 28 16:40:07 2019 -0700
Update squeasel to 7973705170f4744d1806e32695f7ea1e8308ee95
This updates to the latest build of squeasel, which fixes a few small
issues and also improves support for keepalive. This patch doesn't
itself enable keepalive, but switches to the new APIs.
Change-Id: I17f90561e0ea6b0917fff51b055225060a4fa549
Reviewed-on: http://gerrit.cloudera.org:8080/13768
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/thirdparty/squeasel/squeasel.c | 63 +++++++++++++++++++++++++++--------
be/src/thirdparty/squeasel/squeasel.h | 19 ++++++++---
be/src/util/webserver.cc | 21 +++++-------
be/src/util/webserver.h | 5 +--
4 files changed, 77 insertions(+), 31 deletions(-)
diff --git a/be/src/thirdparty/squeasel/squeasel.c b/be/src/thirdparty/squeasel/squeasel.c
index 045740d..b301f5c 100644
--- a/be/src/thirdparty/squeasel/squeasel.c
+++ b/be/src/thirdparty/squeasel/squeasel.c
@@ -923,6 +923,18 @@ static int64_t push(FILE *fp, SOCKET sock, SSL *ssl, const char *buf,
return sent;
}
+// Wait in poll() until 'fd' or ctx->wakeup_fds[0] has readable data, or 'timeout_ms' elapses. Returns nothing, so the caller cannot tell which (if any) became ready.
+static void wait_for_readable_or_wakeup(struct sq_context *ctx,
+ int fd, int timeout_ms) {
+ struct pollfd pfd[2];
+ pfd[0].fd = fd;  // descriptor supplied by the caller
+ pfd[0].events = POLLIN;
+ pfd[1].fd = ctx->wakeup_fds[0];  // second descriptor, taken from the context
+ pfd[1].events = POLLIN;
+ int poll_rc;  // receives poll()'s result; not inspected afterwards
+ RETRY_ON_EINTR(poll_rc, poll(pfd, 2, timeout_ms));
+}
+
// Read from IO channel - opened file descriptor, socket, or SSL descriptor.
// Return negative value on error, or number of bytes read on success.
static int pull(FILE *fp, struct sq_connection *conn, char *buf, int len) {
@@ -939,7 +951,7 @@ static int pull(FILE *fp, struct sq_connection *conn, char *buf, int len) {
#endif
} else {
RETRY_ON_EINTR(nread, recv(conn->client.sock, buf, (size_t) len, 0));
- if (nread == -1) {
+ if (nread == -1 && errno != EAGAIN) {
cry(conn, "error reading: %s", strerror(errno));
}
}
@@ -2535,6 +2547,14 @@ static int read_request(FILE *fp, struct sq_connection *conn,
int request_len, n = 0;
request_len = get_request_len(buf, *nread);
+ if (request_len == 0) {
+ // If we are starting to read a new request, with nothing buffered,
+ // wait for either the beginning of the request, or for the shutdown
+ // signal.
+ wait_for_readable_or_wakeup(conn->ctx, fp ? fileno(fp) : conn->client.sock,
+ atoi(conn->ctx->config[REQUEST_TIMEOUT]));
+ }
+
while (conn->ctx->stop_flag == 0 &&
*nread < bufsiz && request_len == 0 &&
(n = pull(fp, conn, buf + *nread, bufsiz - *nread)) > 0) {
@@ -3850,6 +3870,7 @@ static void handle_request(struct sq_connection *conn) {
char path[PATH_MAX];
int uri_len, ssl_index;
struct file file = STRUCT_FILE_INITIALIZER;
+ sq_callback_result_t callback_result = SQ_HANDLED_OK;
if ((conn->request_info.query_string = strchr(ri->uri, '?')) != NULL) {
* ((char *) conn->request_info.query_string++) = '\0';
@@ -3871,8 +3892,10 @@ static void handle_request(struct sq_connection *conn) {
!check_authorization(conn, path)) {
send_authorization_request(conn);
} else if (conn->ctx->callbacks.begin_request != NULL &&
- conn->ctx->callbacks.begin_request(conn)) {
+ ((callback_result = conn->ctx->callbacks.begin_request(conn))
+ != SQ_CONTINUE_HANDLING)) {
// Do nothing, callback has served the request
+ conn->must_close = (callback_result == SQ_HANDLED_CLOSE_CONNECTION);
#if defined(USE_WEBSOCKET)
} else if (is_websocket_request(conn)) {
handle_websocket_request(conn);
@@ -4313,11 +4336,12 @@ static int set_ssl_option(struct sq_context *ctx) {
EC_KEY* ecdh = EC_KEY_new_by_curve_name(NID_X9_62_prime256v1);
if (ecdh == NULL) {
cry(fc(ctx), "EC_KEY_new_by_curve_name: %s", ssl_error());
- }
-
- int rc = SSL_CTX_set_tmp_ecdh(ctx->ssl_ctx, ecdh);
- if (rc <= 0) {
- cry(fc(ctx), "SSL_CTX_set_tmp_ecdh: %s", ssl_error());
+ } else {
+ int rc = SSL_CTX_set_tmp_ecdh(ctx->ssl_ctx, ecdh);
+ if (rc <= 0) {
+ cry(fc(ctx), "SSL_CTX_set_tmp_ecdh: %s", ssl_error());
+ }
+ EC_KEY_free(ecdh);
}
#elif OPENSSL_VERSION_NUMBER < 0x10100000L
// OpenSSL 1.0.2 provides the set_ecdh_auto API which internally figures out
@@ -4459,7 +4483,14 @@ static int is_valid_uri(const char *uri) {
return uri[0] == '/' || (uri[0] == '*' && uri[1] == '\0');
}
-static int getreq(struct sq_connection *conn, char *ebuf, size_t ebuf_len) {
+
+typedef enum {  // outcome reported by getreq()
+ GETREQ_OK,  // returned at the end of getreq() when no earlier branch bailed out
+ GETREQ_KEEPALIVE_TIMEOUT,  // request_len <= 0 and the buffer is not full; getreq() writes no error text for this case
+ GETREQ_ERROR  // request too large or parse_http_message() failed; ebuf holds the message
+} GetReqResult;
+
+static GetReqResult getreq(struct sq_connection *conn, char *ebuf, size_t ebuf_len) {
const char *cl;
ebuf[0] = '\0';
@@ -4470,11 +4501,13 @@ static int getreq(struct sq_connection *conn, char *ebuf, size_t ebuf_len) {
if (conn->request_len == 0 && conn->data_len == conn->buf_size) {
snprintf(ebuf, ebuf_len, "%s", "Request Too Large");
+ return GETREQ_ERROR;
} else if (conn->request_len <= 0) {
- snprintf(ebuf, ebuf_len, "%s", "Client closed connection");
+ return GETREQ_KEEPALIVE_TIMEOUT;
} else if (parse_http_message(conn->buf, conn->buf_size,
&conn->request_info) <= 0) {
snprintf(ebuf, ebuf_len, "Bad request: [%.*s]", conn->data_len, conn->buf);
+ return GETREQ_ERROR;
} else {
// Request is valid
if ((cl = get_header(&conn->request_info, "Content-Length")) != NULL) {
@@ -4487,7 +4520,7 @@ static int getreq(struct sq_connection *conn, char *ebuf, size_t ebuf_len) {
}
conn->birth_time = time(NULL);
}
- return ebuf[0] == '\0';
+ return GETREQ_OK;
}
struct sq_connection *sq_download(const char *host, int port, int use_ssl,
@@ -4517,6 +4550,7 @@ static void process_new_connection(struct sq_connection *conn) {
struct sq_request_info *ri = &conn->request_info;
int keep_alive_enabled, keep_alive, discard_len;
char ebuf[100];
+ GetReqResult getreq_status;
keep_alive_enabled = !strcmp(conn->ctx->config[ENABLE_KEEP_ALIVE], "yes");
keep_alive = 0;
@@ -4525,8 +4559,11 @@ static void process_new_connection(struct sq_connection *conn) {
// to crule42.
conn->data_len = 0;
do {
- if (!getreq(conn, ebuf, sizeof(ebuf))) {
- send_http_error(conn, 500, "Server Error", "%s", ebuf);
+ getreq_status = getreq(conn, ebuf, sizeof(ebuf));
+ if (getreq_status != GETREQ_OK) {
+ if (getreq_status == GETREQ_ERROR) {
+ send_http_error(conn, 500, "Server Error", "%s", ebuf);
+ }
conn->must_close = 1;
} else if (!is_valid_uri(conn->request_info.uri)) {
char* encoded = (char*) malloc(SQ_BUF_LEN);
@@ -4540,7 +4577,7 @@ static void process_new_connection(struct sq_connection *conn) {
send_http_error(conn, 505, "Bad HTTP version", "%s", ebuf);
}
- if (ebuf[0] == '\0') {
+ if (getreq_status == GETREQ_OK) {
handle_request(conn);
if (conn->ctx->callbacks.end_request != NULL) {
conn->ctx->callbacks.end_request(conn, conn->status_code);
diff --git a/be/src/thirdparty/squeasel/squeasel.h b/be/src/thirdparty/squeasel/squeasel.h
index 68a4034..d4ca155 100644
--- a/be/src/thirdparty/squeasel/squeasel.h
+++ b/be/src/thirdparty/squeasel/squeasel.h
@@ -88,18 +88,29 @@ struct sq_request_info {
} http_headers[64]; // Maximum 64 headers
};
+typedef enum sq_callback_result {  // return type of the begin_request callback
+ // The callback didn't handle the request, and squeasel should
+ // continue with request processing.
+ SQ_CONTINUE_HANDLING = 0,
+ // The callback handled the request, and the connection is still
+ // in a valid state.
+ SQ_HANDLED_OK = 1,
+ // The callback handled the request, but no more requests should
+ // be read from this connection (e.g. the request was invalid).
+ SQ_HANDLED_CLOSE_CONNECTION = 2
+} sq_callback_result_t;
// This structure needs to be passed to sq_start(), to let squeasel know
// which callbacks to invoke. For detailed description, see
// https://github.com/cloudera/squeasel/blob/master/UserManual.md
struct sq_callbacks {
// Called when squeasel has received new HTTP request.
- // If callback returns non-zero,
+ // If callback returns one of the SQ_HANDLED_* results,
// callback must process the request by sending valid HTTP headers and body,
// and squeasel will not do any further processing.
- // If callback returns 0, squeasel processes the request itself. In this case,
- // callback must not send any data to the client.
- int (*begin_request)(struct sq_connection *);
+ // If callback returns SQ_CONTINUE_HANDLING, squeasel processes the request itself.
+ // In this case, callback must not send any data to the client.
+ sq_callback_result_t (*begin_request)(struct sq_connection *);
// Called when squeasel has finished processing request.
void (*end_request)(const struct sq_connection *, int reply_status_code);
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index 3ea3e65..caf45d3 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -112,10 +112,6 @@ DECLARE_string(ssl_cipher_list);
static const char* DOC_FOLDER = "/www/";
static const int DOC_FOLDER_LEN = strlen(DOC_FOLDER);
-// Easy-to-read constants for Squeasel return codes
-static const uint32_t PROCESSING_COMPLETE = 1;
-static const uint32_t NOT_PROCESSED = 0;
-
// Standard key in the json document sent to templates for rendering. Must be kept in
// sync with the templates themselves.
static const char* COMMON_JSON_KEY = "__common__";
@@ -391,23 +387,24 @@ int Webserver::LogMessageCallbackStatic(const struct sq_connection* connection,
if (message != nullptr) {
LOG(INFO) << "Webserver: " << message;
}
- return PROCESSING_COMPLETE;
+ return SQ_HANDLED_OK;
}
-int Webserver::BeginRequestCallbackStatic(struct sq_connection* connection) {
+sq_callback_result_t Webserver::BeginRequestCallbackStatic(
+ struct sq_connection* connection) {
struct sq_request_info* request_info = sq_get_request_info(connection);
Webserver* instance = reinterpret_cast<Webserver*>(request_info->user_data);
return instance->BeginRequestCallback(connection, request_info);
}
-int Webserver::BeginRequestCallback(struct sq_connection* connection,
+sq_callback_result_t Webserver::BeginRequestCallback(struct sq_connection* connection,
struct sq_request_info* request_info) {
if (!FLAGS_webserver_doc_root.empty() && FLAGS_enable_webserver_doc_root) {
if (strncmp(DOC_FOLDER, request_info->uri, DOC_FOLDER_LEN) == 0) {
VLOG(2) << "HTTP File access: " << request_info->uri;
// Let Squeasel deal with this request; returning NULL will fall through
// to the default handler which will serve files.
- return NOT_PROCESSED;
+ return SQ_CONTINUE_HANDLING;
}
}
@@ -445,7 +442,7 @@ int Webserver::BeginRequestCallback(struct sq_connection* connection,
sq_printf(connection,
"HTTP/1.1 %s\r\n",
HttpStatusCodeToString(HttpStatusCode::LengthRequired).c_str());
- return 1;
+ return SQ_HANDLED_OK;
}
if (content_len > FLAGS_webserver_max_post_length_bytes) {
// TODO: for this and other HTTP requests, we should log the
@@ -454,7 +451,7 @@ int Webserver::BeginRequestCallback(struct sq_connection* connection,
sq_printf(connection,
"HTTP/1.1 %s\r\n",
HttpStatusCodeToString(HttpStatusCode::RequestEntityTooLarge).c_str());
- return 1;
+ return SQ_HANDLED_CLOSE_CONNECTION;
}
char buf[8192];
@@ -468,7 +465,7 @@ int Webserver::BeginRequestCallback(struct sq_connection* connection,
sq_printf(connection,
"HTTP/1.1 %s\r\n",
HttpStatusCodeToString(HttpStatusCode::InternalServerError).c_str());
- return 1;
+ return SQ_HANDLED_CLOSE_CONNECTION;
}
req.post_data.append(buf, n);
@@ -501,7 +498,7 @@ int Webserver::BeginRequestCallback(struct sq_connection* connection,
// Make sure to use sq_write for printing the body; sq_printf truncates at 8kb
sq_write(connection, str.c_str(), str.length());
- return PROCESSING_COMPLETE;
+ return SQ_HANDLED_OK;
}
void Webserver::RenderUrlWithTemplate(const WebRequest& req,
diff --git a/be/src/util/webserver.h b/be/src/util/webserver.h
index 2fbc0cd..21b85be 100644
--- a/be/src/util/webserver.h
+++ b/be/src/util/webserver.h
@@ -156,10 +156,11 @@ class Webserver {
/// Squeasel callback for HTTP request events. Static so that it can act as a function
/// pointer, and then call the next method. Returns squeasel success code.
- static int BeginRequestCallbackStatic(struct sq_connection* connection);
+ static sq_callback_result_t BeginRequestCallbackStatic(
+ struct sq_connection* connection);
/// Dispatch point for all incoming requests. Returns squeasel success code.
- int BeginRequestCallback(struct sq_connection* connection,
+ sq_callback_result_t BeginRequestCallback(struct sq_connection* connection,
struct sq_request_info* request_info);
/// Renders URLs through the Mustache templating library.