You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@httpd.apache.org by Graham Leggett <mi...@sharp.fm> on 2020/02/22 18:42:50 UTC

POC: Allowing ap_process_connection() to return EAGAIN

Hi all,

I’ve put together a proof of concept as to how ap_process_connection() might be able to return EAGAIN (or AGAIN in this case).

The idea is that ap_process_connection() can return AGAIN at any time, and if so, we’ll jump ahead to where we left off and run the hook again. This way the MPMs aren’t obliged to swallow a whole request before returning.

This doesn’t yet work, it’s just to illustrate the idea.

Regards,
Graham
--

Index: include/ap_mpm.h
===================================================================
--- include/ap_mpm.h	(revision 1874370)
+++ include/ap_mpm.h	(working copy)
@@ -182,6 +182,8 @@
 #define AP_MPMQ_CAN_SUSPEND          17
 /** MPM supports additional pollfds */
 #define AP_MPMQ_CAN_POLL             18
+/** MPM supports EAGAIN */
+#define AP_MPMQ_CAN_AGAIN            19
 /** @} */
 
 /**
Index: include/http_connection.h
===================================================================
--- include/http_connection.h	(revision 1874370)
+++ include/http_connection.h	(working copy)
@@ -40,8 +40,11 @@
  * @param csd The mechanism on which this connection is to be read.
  *            Most times this will be a socket, but it is up to the module
  *            that accepts the request to determine the exact type.
+ * @return OK if processing is complete, SUSPENDED if processing
+ *         should be suspended and retried at a later time, and
+ *         AGAIN if processing should be retried when data is available.
  */
-AP_CORE_DECLARE(void) ap_process_connection(conn_rec *c, void *csd);
+AP_CORE_DECLARE(int) ap_process_connection(conn_rec *c, void *csd);
 
 /**
  * Shutdown the connection for writing.
@@ -109,7 +112,7 @@
  * @param csd The mechanism on which this connection is to be read.
  *            Most times this will be a socket, but it is up to the module
  *            that accepts the request to determine the exact type.
- * @return OK or DECLINED
+ * @return OK, DECLINED, SUSPENDED or AGAIN.
  */
 AP_DECLARE_HOOK(int,pre_connection,(conn_rec *c, void *csd))
 
@@ -119,7 +122,7 @@
  * function does that for each protocol module.  The first protocol module
  * to handle the request is the last module run.
  * @param c The connection on which the request has been received.
- * @return OK or DECLINED
+ * @return OK, DECLINED, SUSPENDED or AGAIN.
  */
 AP_DECLARE_HOOK(int,process_connection,(conn_rec *c))
 
Index: include/httpd.h
===================================================================
--- include/httpd.h	(revision 1874370)
+++ include/httpd.h	(working copy)
@@ -464,6 +464,9 @@
                                  */
 #define SUSPENDED -3 /**< Module will handle the remainder of the request.
                       * The core will never invoke the request again, */
+#define AGAIN -4                /**< Module has not finished handling the
+                                  * stage and wants to be called again
+                                  */
 
 /** Returned by the bottom-most filter if no data was written.
  *  @see ap_pass_brigade(). */
Index: server/connection.c
===================================================================
--- server/connection.c	(revision 1874370)
+++ server/connection.c	(working copy)
@@ -30,6 +30,8 @@
 #include "http_log.h"
 #include "util_filter.h"
 
+#include "core.h"
+
 APR_HOOK_STRUCT(
             APR_HOOK_LINK(create_connection)
             APR_HOOK_LINK(process_connection)
@@ -205,17 +207,38 @@
     apr_socket_close(csd);
 }
 
-AP_CORE_DECLARE(void) ap_process_connection(conn_rec *c, void *csd)
+AP_CORE_DECLARE(int) ap_process_connection(conn_rec *c, void *csd)
 {
-    int rc;
+    int rc = OK;
+    conn_config_t *conn_config = ap_get_core_module_config(c->conn_config);
+
+switch (conn_config->process_connection_st) {
+case STATE_UPDATE_VHOST_GIVEN_IP:
+
     ap_update_vhost_given_ip(c);
 
+conn_config->process_connection_st = STATE_RUN_PRE_CONNECTION;
+case STATE_RUN_PRE_CONNECTION:
+
     rc = ap_run_pre_connection(c, csd);
+    if (rc == SUSPENDED || rc == AGAIN) {
+        return rc;
+    }
     if (rc != OK && rc != DONE) {
         c->aborted = 1;
     }
 
     if (!c->aborted) {
-        ap_run_process_connection(c);
+
+conn_config->process_connection_st = STATE_RUN_PROCESS_CONNECTION;
+case STATE_RUN_PROCESS_CONNECTION:
+
+        rc = ap_run_process_connection(c);
+
     }
+
+}; /* end of switch */
+
+    return rc;
 }
+
Index: server/core.h
===================================================================
--- server/core.h	(revision 1874370)
+++ server/core.h	(working copy)
@@ -26,11 +26,22 @@
 #define CORE_H
 
 /**
+ * @brief States for the ap_process_connection function
+ */
+typedef enum __attribute__((__packed__)) ap_process_connection_e {
+  STATE_UPDATE_VHOST_GIVEN_IP = 0,
+  STATE_RUN_PRE_CONNECTION,
+  STATE_RUN_PROCESS_CONNECTION
+} ap_process_connection_e;
+
+/**
  * @brief A structure to contain connection state information
  */
 typedef struct conn_config_t {
     /** Socket belonging to the connection */
     apr_socket_t *socket;
+    /** State of ap_process_connection() */
+    ap_process_connection_e process_connection_st;
 } conn_config_t;
 
 /**
Index: server/mpm/event/event.c
===================================================================
--- server/mpm/event/event.c	(revision 1874370)
+++ server/mpm/event/event.c	(working copy)
@@ -221,6 +221,16 @@
 typedef struct event_conn_state_t event_conn_state_t;
 
 /*
+ * States for the process_socket function
+ */
+typedef enum __attribute__((__packed__)) process_socket_e {
+  STATE_PROCESS_SOCKET = 0,
+  STATE_UPDATE_VHOST_GIVEN_IP,
+  STATE_RUN_PRE_CONNECTION,
+  STATE_RUN_PROCESS_CONNECTION
+} process_socket_e;
+
+/*
  * The chain of connections to be shutdown by a worker thread (deferred),
  * linked list updated atomically.
  */
@@ -254,6 +264,8 @@
     conn_state_t pub;
     /** chaining in defer_linger_chain */
     struct event_conn_state_t *chain;
+    /** state of process_socket() */
+    process_socket_e process_socket_st;
 };
 
 APR_RING_HEAD(timeout_head_t, event_conn_state_t);
@@ -725,6 +737,9 @@
     case AP_MPMQ_CAN_POLL:
         *result = 1;
         break;
+    case AP_MPMQ_CAN_AGAIN:
+        *result = 1;
+        break;
     default:
         *rv = APR_ENOTIMPL;
         break;
@@ -997,6 +1012,9 @@
     apr_status_t rv;
     int rc = OK;
 
+switch (cs->process_socket_st) {
+case STATE_PROCESS_SOCKET:
+
     if (cs == NULL) {           /* This is a new connection */
         listener_poll_type *pt = apr_pcalloc(p, sizeof(*pt));
         cs = apr_pcalloc(p, sizeof(event_conn_state_t));
@@ -1027,9 +1045,18 @@
         apr_pool_pre_cleanup_register(p, cs, ptrans_pre_cleanup);
         TO_QUEUE_ELEM_INIT(cs);
 
+cs->process_socket_st = STATE_UPDATE_VHOST_GIVEN_IP;
+case STATE_UPDATE_VHOST_GIVEN_IP:
+
         ap_update_vhost_given_ip(c);
 
+cs->process_socket_st = STATE_RUN_PRE_CONNECTION;
+case STATE_RUN_PRE_CONNECTION:
+
         rc = ap_run_pre_connection(c, sock);
+        if (rc == AGAIN) {
+            return;
+        }
         if (rc != OK && rc != DONE) {
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(00469)
                           "process_socket: connection aborted");
@@ -1084,7 +1111,13 @@
             if (clogging) {
                 apr_atomic_inc32(&clogged_count);
             }
+
+cs->process_socket_st = STATE_RUN_PROCESS_CONNECTION;
+case STATE_RUN_PROCESS_CONNECTION:
             rc = ap_run_process_connection(c);
+            if (rc == AGAIN) {
+                return;
+            }
             if (clogging) {
                 apr_atomic_dec32(&clogged_count);
             }
@@ -1239,6 +1272,7 @@
         else {
             apr_thread_mutex_unlock(timeout_mutex);
         }
+cs->process_socket_st = STATE_PROCESS_SOCKET;
         return;
     }
 
@@ -1246,6 +1280,7 @@
         cs->c->suspended_baton = cs;
         apr_atomic_inc32(&suspended_count);
         notify_suspend(cs);
+cs->process_socket_st = STATE_PROCESS_SOCKET;
         return;
     }
 
@@ -1256,6 +1291,10 @@
                      cs->pub.state == CONN_STATE_LINGER_SHORT)) {
         process_lingering_close(cs);
     }
+
+}; /* end of switch */
+
+cs->process_socket_st = STATE_PROCESS_SOCKET;
 }
 
 /* Put a SUSPENDED connection back into a queue. */