You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by fr...@apache.org on 2019/07/02 14:02:19 UTC

[impala] 02/03: Update squeasel to 7973705170f4744d1806e32695f7ea1e8308ee95

This is an automated email from the ASF dual-hosted git repository.

fredyw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e8c60a30839463f162e31fc76a87de9753531b72
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Fri Jun 28 16:40:07 2019 -0700

    Update squeasel to 7973705170f4744d1806e32695f7ea1e8308ee95
    
    This updates to the latest build of squeasel, which fixes a few small
    issues and also improves support for keepalive. This patch doesn't
    itself enable keepalive, but switches to the new APIs.
    
    Change-Id: I17f90561e0ea6b0917fff51b055225060a4fa549
    Reviewed-on: http://gerrit.cloudera.org:8080/13768
    Reviewed-by: Todd Lipcon <to...@apache.org>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/thirdparty/squeasel/squeasel.c | 63 +++++++++++++++++++++++++++--------
 be/src/thirdparty/squeasel/squeasel.h | 19 ++++++++---
 be/src/util/webserver.cc              | 21 +++++-------
 be/src/util/webserver.h               |  5 +--
 4 files changed, 77 insertions(+), 31 deletions(-)

diff --git a/be/src/thirdparty/squeasel/squeasel.c b/be/src/thirdparty/squeasel/squeasel.c
index 045740d..b301f5c 100644
--- a/be/src/thirdparty/squeasel/squeasel.c
+++ b/be/src/thirdparty/squeasel/squeasel.c
@@ -923,6 +923,18 @@ static int64_t push(FILE *fp, SOCKET sock, SSL *ssl, const char *buf,
   return sent;
 }
 
+// Wait until 'fd' or ctx->wakeup_fds[0] is readable, or 'timeout_ms' elapses.
+static void wait_for_readable_or_wakeup(struct sq_context *ctx,
+    int fd, int timeout_ms) {
+  struct pollfd pfd[2];
+  pfd[0].fd = fd;
+  pfd[0].events = POLLIN;
+  pfd[1].fd = ctx->wakeup_fds[0];
+  pfd[1].events = POLLIN;
+  int poll_rc;
+  RETRY_ON_EINTR(poll_rc, poll(pfd, 2, timeout_ms));
+}
+
 // Read from IO channel - opened file descriptor, socket, or SSL descriptor.
 // Return negative value on error, or number of bytes read on success.
 static int pull(FILE *fp, struct sq_connection *conn, char *buf, int len) {
@@ -939,7 +951,7 @@ static int pull(FILE *fp, struct sq_connection *conn, char *buf, int len) {
 #endif
   } else {
     RETRY_ON_EINTR(nread, recv(conn->client.sock, buf, (size_t) len, 0));
-    if (nread == -1) {
+    if (nread == -1 && errno != EAGAIN) {
       cry(conn, "error reading: %s", strerror(errno));
     }
   }
@@ -2535,6 +2547,14 @@ static int read_request(FILE *fp, struct sq_connection *conn,
   int request_len, n = 0;
 
   request_len = get_request_len(buf, *nread);
+  if (request_len == 0) {
+    // If we are starting to read a new request, with nothing buffered,
+    // wait for either the beginning of the request, or for the shutdown
+    // signal.
+    wait_for_readable_or_wakeup(conn->ctx, fp ? fileno(fp) : conn->client.sock,
+        atoi(conn->ctx->config[REQUEST_TIMEOUT]));
+  }
+
   while (conn->ctx->stop_flag == 0 &&
          *nread < bufsiz && request_len == 0 &&
          (n = pull(fp, conn, buf + *nread, bufsiz - *nread)) > 0) {
@@ -3850,6 +3870,7 @@ static void handle_request(struct sq_connection *conn) {
   char path[PATH_MAX];
   int uri_len, ssl_index;
   struct file file = STRUCT_FILE_INITIALIZER;
+  sq_callback_result_t callback_result = SQ_HANDLED_OK;
 
   if ((conn->request_info.query_string = strchr(ri->uri, '?')) != NULL) {
     * ((char *) conn->request_info.query_string++) = '\0';
@@ -3871,8 +3892,10 @@ static void handle_request(struct sq_connection *conn) {
              !check_authorization(conn, path)) {
     send_authorization_request(conn);
   } else if (conn->ctx->callbacks.begin_request != NULL &&
-      conn->ctx->callbacks.begin_request(conn)) {
+      ((callback_result = conn->ctx->callbacks.begin_request(conn))
+          != SQ_CONTINUE_HANDLING)) {
     // Do nothing, callback has served the request
+    conn->must_close = (callback_result == SQ_HANDLED_CLOSE_CONNECTION);
 #if defined(USE_WEBSOCKET)
   } else if (is_websocket_request(conn)) {
     handle_websocket_request(conn);
@@ -4313,11 +4336,12 @@ static int set_ssl_option(struct sq_context *ctx) {
     EC_KEY* ecdh = EC_KEY_new_by_curve_name(NID_X9_62_prime256v1);
     if (ecdh == NULL) {
       cry(fc(ctx), "EC_KEY_new_by_curve_name: %s", ssl_error());
-    }
-
-    int rc = SSL_CTX_set_tmp_ecdh(ctx->ssl_ctx, ecdh);
-    if (rc <= 0) {
-      cry(fc(ctx), "SSL_CTX_set_tmp_ecdh: %s", ssl_error());
+    } else {
+      int rc = SSL_CTX_set_tmp_ecdh(ctx->ssl_ctx, ecdh);
+      if (rc <= 0) {
+        cry(fc(ctx), "SSL_CTX_set_tmp_ecdh: %s", ssl_error());
+      }
+      EC_KEY_free(ecdh);
     }
 #elif OPENSSL_VERSION_NUMBER < 0x10100000L
     // OpenSSL 1.0.2 provides the set_ecdh_auto API which internally figures out
@@ -4459,7 +4483,14 @@ static int is_valid_uri(const char *uri) {
   return uri[0] == '/' || (uri[0] == '*' && uri[1] == '\0');
 }
 
-static int getreq(struct sq_connection *conn, char *ebuf, size_t ebuf_len) {
+
+typedef enum {
+  GETREQ_OK,
+  GETREQ_KEEPALIVE_TIMEOUT,
+  GETREQ_ERROR
+} GetReqResult;
+
+static GetReqResult getreq(struct sq_connection *conn, char *ebuf, size_t ebuf_len) {
   const char *cl;
 
   ebuf[0] = '\0';
@@ -4470,11 +4501,13 @@ static int getreq(struct sq_connection *conn, char *ebuf, size_t ebuf_len) {
 
   if (conn->request_len == 0 && conn->data_len == conn->buf_size) {
     snprintf(ebuf, ebuf_len, "%s", "Request Too Large");
+    return GETREQ_ERROR;
   } else if (conn->request_len <= 0) {
-    snprintf(ebuf, ebuf_len, "%s", "Client closed connection");
+    return GETREQ_KEEPALIVE_TIMEOUT;
   } else if (parse_http_message(conn->buf, conn->buf_size,
                                 &conn->request_info) <= 0) {
     snprintf(ebuf, ebuf_len, "Bad request: [%.*s]", conn->data_len, conn->buf);
+    return GETREQ_ERROR;
   } else {
     // Request is valid
     if ((cl = get_header(&conn->request_info, "Content-Length")) != NULL) {
@@ -4487,7 +4520,7 @@ static int getreq(struct sq_connection *conn, char *ebuf, size_t ebuf_len) {
     }
     conn->birth_time = time(NULL);
   }
-  return ebuf[0] == '\0';
+  return GETREQ_OK;
 }
 
 struct sq_connection *sq_download(const char *host, int port, int use_ssl,
@@ -4517,6 +4550,7 @@ static void process_new_connection(struct sq_connection *conn) {
   struct sq_request_info *ri = &conn->request_info;
   int keep_alive_enabled, keep_alive, discard_len;
   char ebuf[100];
+  GetReqResult getreq_status;
 
   keep_alive_enabled = !strcmp(conn->ctx->config[ENABLE_KEEP_ALIVE], "yes");
   keep_alive = 0;
@@ -4525,8 +4559,11 @@ static void process_new_connection(struct sq_connection *conn) {
   // to crule42.
   conn->data_len = 0;
   do {
-    if (!getreq(conn, ebuf, sizeof(ebuf))) {
-      send_http_error(conn, 500, "Server Error", "%s", ebuf);
+    getreq_status = getreq(conn, ebuf, sizeof(ebuf));
+    if (getreq_status != GETREQ_OK) {
+      if (getreq_status == GETREQ_ERROR) {
+        send_http_error(conn, 500, "Server Error", "%s", ebuf);
+      }
       conn->must_close = 1;
     } else if (!is_valid_uri(conn->request_info.uri)) {
       char* encoded = (char*) malloc(SQ_BUF_LEN);
@@ -4540,7 +4577,7 @@ static void process_new_connection(struct sq_connection *conn) {
       send_http_error(conn, 505, "Bad HTTP version", "%s", ebuf);
     }
 
-    if (ebuf[0] == '\0') {
+    if (getreq_status == GETREQ_OK) {
       handle_request(conn);
       if (conn->ctx->callbacks.end_request != NULL) {
         conn->ctx->callbacks.end_request(conn, conn->status_code);
diff --git a/be/src/thirdparty/squeasel/squeasel.h b/be/src/thirdparty/squeasel/squeasel.h
index 68a4034..d4ca155 100644
--- a/be/src/thirdparty/squeasel/squeasel.h
+++ b/be/src/thirdparty/squeasel/squeasel.h
@@ -88,18 +88,29 @@ struct sq_request_info {
   } http_headers[64];         // Maximum 64 headers
 };
 
+typedef enum sq_callback_result {
+  // The callback didn't handle the request, and squeasel should
+  // continue with request processing.
+  SQ_CONTINUE_HANDLING = 0,
+  // The callback handled the request, and the connection is still
+  // in a valid state.
+  SQ_HANDLED_OK = 1,
+  // The callback handled the request, but no more requests should
+  // be read from this connection (e.g. the request was invalid).
+  SQ_HANDLED_CLOSE_CONNECTION = 2
+} sq_callback_result_t;
 
 // This structure needs to be passed to sq_start(), to let squeasel know
 // which callbacks to invoke. For detailed description, see
 // https://github.com/cloudera/squeasel/blob/master/UserManual.md
 struct sq_callbacks {
   // Called when squeasel has received new HTTP request.
-  // If callback returns non-zero,
+  // If callback returns one of the SQ_HANDLED_* results,
   // callback must process the request by sending valid HTTP headers and body,
   // and squeasel will not do any further processing.
-  // If callback returns 0, squeasel processes the request itself. In this case,
-  // callback must not send any data to the client.
-  int  (*begin_request)(struct sq_connection *);
+  // If callback returns SQ_CONTINUE_HANDLING, squeasel processes the request itself.
+  // In this case, callback must not send any data to the client.
+  sq_callback_result_t (*begin_request)(struct sq_connection *);
 
   // Called when squeasel has finished processing request.
   void (*end_request)(const struct sq_connection *, int reply_status_code);
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index 3ea3e65..caf45d3 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -112,10 +112,6 @@ DECLARE_string(ssl_cipher_list);
 static const char* DOC_FOLDER = "/www/";
 static const int DOC_FOLDER_LEN = strlen(DOC_FOLDER);
 
-// Easy-to-read constants for Squeasel return codes
-static const uint32_t PROCESSING_COMPLETE = 1;
-static const uint32_t NOT_PROCESSED = 0;
-
 // Standard key in the json document sent to templates for rendering. Must be kept in
 // sync with the templates themselves.
 static const char* COMMON_JSON_KEY = "__common__";
@@ -391,23 +387,24 @@ int Webserver::LogMessageCallbackStatic(const struct sq_connection* connection,
   if (message != nullptr) {
     LOG(INFO) << "Webserver: " << message;
   }
-  return PROCESSING_COMPLETE;
+  return SQ_HANDLED_OK;
 }
 
-int Webserver::BeginRequestCallbackStatic(struct sq_connection* connection) {
+sq_callback_result_t Webserver::BeginRequestCallbackStatic(
+    struct sq_connection* connection) {
   struct sq_request_info* request_info = sq_get_request_info(connection);
   Webserver* instance = reinterpret_cast<Webserver*>(request_info->user_data);
   return instance->BeginRequestCallback(connection, request_info);
 }
 
-int Webserver::BeginRequestCallback(struct sq_connection* connection,
+sq_callback_result_t Webserver::BeginRequestCallback(struct sq_connection* connection,
     struct sq_request_info* request_info) {
   if (!FLAGS_webserver_doc_root.empty() && FLAGS_enable_webserver_doc_root) {
     if (strncmp(DOC_FOLDER, request_info->uri, DOC_FOLDER_LEN) == 0) {
       VLOG(2) << "HTTP File access: " << request_info->uri;
       // Let Squeasel deal with this request; returning NULL will fall through
       // to the default handler which will serve files.
-      return NOT_PROCESSED;
+      return SQ_CONTINUE_HANDLING;
     }
   }
 
@@ -445,7 +442,7 @@ int Webserver::BeginRequestCallback(struct sq_connection* connection,
       sq_printf(connection,
                 "HTTP/1.1 %s\r\n",
                 HttpStatusCodeToString(HttpStatusCode::LengthRequired).c_str());
-      return 1;
+      return SQ_HANDLED_OK;
     }
     if (content_len > FLAGS_webserver_max_post_length_bytes) {
       // TODO: for this and other HTTP requests, we should log the
@@ -454,7 +451,7 @@ int Webserver::BeginRequestCallback(struct sq_connection* connection,
       sq_printf(connection,
                 "HTTP/1.1 %s\r\n",
                 HttpStatusCodeToString(HttpStatusCode::RequestEntityTooLarge).c_str());
-      return 1;
+      return SQ_HANDLED_CLOSE_CONNECTION;
     }
 
     char buf[8192];
@@ -468,7 +465,7 @@ int Webserver::BeginRequestCallback(struct sq_connection* connection,
         sq_printf(connection,
                   "HTTP/1.1 %s\r\n",
                   HttpStatusCodeToString(HttpStatusCode::InternalServerError).c_str());
-        return 1;
+        return SQ_HANDLED_CLOSE_CONNECTION;
       }
 
       req.post_data.append(buf, n);
@@ -501,7 +498,7 @@ int Webserver::BeginRequestCallback(struct sq_connection* connection,
 
   // Make sure to use sq_write for printing the body; sq_printf truncates at 8kb
   sq_write(connection, str.c_str(), str.length());
-  return PROCESSING_COMPLETE;
+  return SQ_HANDLED_OK;
 }
 
 void Webserver::RenderUrlWithTemplate(const WebRequest& req,
diff --git a/be/src/util/webserver.h b/be/src/util/webserver.h
index 2fbc0cd..21b85be 100644
--- a/be/src/util/webserver.h
+++ b/be/src/util/webserver.h
@@ -156,10 +156,11 @@ class Webserver {
 
   /// Squeasel callback for HTTP request events. Static so that it can act as a function
   /// pointer, and then call the next method. Returns squeasel success code.
-  static int BeginRequestCallbackStatic(struct sq_connection* connection);
+  static sq_callback_result_t BeginRequestCallbackStatic(
+      struct sq_connection* connection);
 
   /// Dispatch point for all incoming requests. Returns squeasel success code.
-  int BeginRequestCallback(struct sq_connection* connection,
+  sq_callback_result_t BeginRequestCallback(struct sq_connection* connection,
       struct sq_request_info* request_info);
 
   /// Renders URLs through the Mustache templating library.