You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by je...@apache.org on 2007/11/14 20:59:06 UTC

svn commit: r595028 - /httpd/sandbox/amsterdam/d/modules/proxy/mod_serf.c

Author: jerenkrantz
Date: Wed Nov 14 11:59:05 2007
New Revision: 595028

URL: http://svn.apache.org/viewvc?rev=595028&view=rev
Log:
Amsterdam sandbox: add serf input/output filters that replace the core filters.

Modified:
    httpd/sandbox/amsterdam/d/modules/proxy/mod_serf.c

Modified: httpd/sandbox/amsterdam/d/modules/proxy/mod_serf.c
URL: http://svn.apache.org/viewvc/httpd/sandbox/amsterdam/d/modules/proxy/mod_serf.c?rev=595028&r1=595027&r2=595028&view=diff
==============================================================================
--- httpd/sandbox/amsterdam/d/modules/proxy/mod_serf.c (original)
+++ httpd/sandbox/amsterdam/d/modules/proxy/mod_serf.c Wed Nov 14 11:59:05 2007
@@ -21,6 +21,8 @@
 #include "http_log.h"
 
 #include "serf.h"
+#include "serf_bucket_types.h"
+#include "serf_bucket_util.h"
 #include "apr_uri.h"
 
 module AP_MODULE_DECLARE_DATA serf_module;
@@ -370,6 +372,207 @@
     return baton.rstatus;
 }
 
+typedef struct {
+    serf_context_t *serf_ctx;
+    serf_bucket_alloc_t *serf_bkt_alloc;
+    serf_bucket_t *serf_in_bucket;
+    serf_bucket_t *serf_out_bucket;
+    apr_bucket_brigade *out_brigade;
+    apr_bucket_brigade *tmp_brigade;
+    apr_status_t serf_bucket_status;
+} serf_core_ctx_t;
+
+typedef struct {
+    apr_pool_t *pool;
+    serf_bucket_alloc_t *allocator;
+    serf_core_ctx_t *core_ctx;
+    apr_bucket_brigade *bb;
+    apr_bucket_brigade *tmp_bb;
+} brigade_bucket_ctx_t;
+
+/* Forward-declare */
+const serf_bucket_type_t serf_bucket_type_brigade;
+
+static serf_bucket_t * brigade_create(ap_filter_t *f, serf_core_ctx_t *core_ctx)
+{
+    brigade_bucket_ctx_t *ctx;
+
+    ctx = serf_bucket_mem_alloc(core_ctx->serf_bkt_alloc, sizeof(*ctx));
+    ctx->allocator = core_ctx->serf_bkt_alloc;
+    ctx->pool = serf_bucket_allocator_get_pool(ctx->allocator);
+    ctx->core_ctx = core_ctx;
+    ctx->bb = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
+    ctx->tmp_bb = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
+
+    return serf_bucket_create(&serf_bucket_type_brigade, ctx->allocator, ctx);
+}
+
+static apr_status_t brigade_read(serf_bucket_t *bucket,
+                                 apr_size_t requested,
+                                 const char **data, apr_size_t *len)
+{
+    brigade_bucket_ctx_t *ctx = bucket->data;
+    apr_status_t status;
+    apr_bucket *b, *end, *f;
+
+    b = APR_BRIGADE_FIRST(ctx->bb);
+    status = apr_bucket_read(b, data, len, APR_BLOCK_READ);
+
+    if (requested < *len) {
+        *len = requested;
+    }
+    status = apr_brigade_partition(ctx->bb, *len, &end);
+    f = APR_BRIGADE_FIRST(ctx->bb);
+    while (f != end && f != APR_BRIGADE_SENTINEL(ctx->bb)) {
+        apr_bucket_delete(f);
+        f = APR_BRIGADE_FIRST(ctx->bb);
+    }
+    return status;
+}
+
+static apr_status_t brigade_readline(serf_bucket_t *bucket,
+                                     int acceptable, int *found,
+                                     const char **data, apr_size_t *len)
+{
+    brigade_bucket_ctx_t *ctx = bucket->data;
+    apr_status_t status;
+
+    status = apr_brigade_split_line(ctx->tmp_bb, ctx->bb,
+                                    APR_BLOCK_READ, HUGE_STRING_LEN);
+    if (APR_STATUS_IS_EAGAIN(status)) {
+        if (found) {
+            *found = SERF_NEWLINE_NONE;
+        }
+        status = APR_SUCCESS;
+    }
+    apr_brigade_pflatten(ctx->bb, data, len, ctx->pool);
+    return status;
+}
+
+static apr_status_t brigade_peek(serf_bucket_t *bucket,
+                                 const char **data,
+                                 apr_size_t *len)
+{
+    return APR_ENOTIMPL;
+}
+
+static void brigade_destroy(serf_bucket_t *bucket)
+{
+    serf_default_destroy_and_data(bucket);
+}
+
+const serf_bucket_type_t serf_bucket_type_brigade = {
+    brigade_read,
+    brigade_readline,
+    serf_default_read_iovec,
+    serf_default_read_for_sendfile,
+    serf_default_read_bucket,
+    brigade_peek,
+    brigade_destroy,
+};
+
+static serf_core_ctx_t* init_ctx(ap_filter_t *f, apr_socket_t *socket)
+{
+    serf_core_ctx_t *ctx;
+
+    ctx = apr_pcalloc(f->c->pool, sizeof(*ctx));
+
+    ctx->serf_ctx = serf_context_create(f->c->pool);
+    ctx->serf_bkt_alloc = serf_bucket_allocator_create(f->c->pool, NULL, NULL);
+    ctx->serf_in_bucket = serf_bucket_socket_create(socket,
+                                                    ctx->serf_bkt_alloc);
+    ctx->serf_out_bucket = serf_bucket_aggregate_create(ctx->serf_bkt_alloc);
+
+    ctx->out_brigade = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
+    ctx->tmp_brigade = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
+
+    return ctx;
+}
+
+static int serf_input_filter(ap_filter_t *f, apr_bucket_brigade *bb,
+                             ap_input_mode_t mode, apr_read_type_e block,
+                             apr_off_t readbytes)
+{
+    apr_status_t status;
+    core_net_rec *net = f->ctx;
+    serf_core_ctx_t *ctx = (serf_core_ctx_t*)net->in_ctx;
+
+    if (mode == AP_MODE_INIT) {
+        return APR_SUCCESS;
+    }
+    if (!ctx)
+    {
+        ctx = init_ctx(f, net->client_socket);
+    }
+
+    if (mode == AP_MODE_GETLINE) {
+        const char *data;
+        apr_size_t len;
+        int found;
+        apr_bucket *b;
+
+        ctx->serf_bucket_status = serf_bucket_readline(ctx->serf_in_bucket,
+                                                       SERF_NEWLINE_ANY,
+                                                       &found, &data, &len);
+        b = apr_bucket_transient_create(data, len, f->c->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(bb, b);
+        return APR_SUCCESS;
+    }
+    if (mode == AP_MODE_READBYTES) {
+        const char *data;
+        apr_size_t len;
+        apr_bucket *b;
+
+        ctx->serf_bucket_status = serf_bucket_read(ctx->serf_in_bucket,
+                                                   readbytes, &data, &len);
+        b = apr_bucket_transient_create(data, len, f->c->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(bb, b);
+        return APR_SUCCESS;
+    }
+
+    if (mode == AP_MODE_EATCRLF || mode == AP_MODE_EXHAUSTIVE ||
+        mode == AP_MODE_SPECULATIVE) {
+        abort();
+    }
+}
+
+static apr_status_t serf_output_filter(ap_filter_t *f,
+                                       apr_bucket_brigade *new_bb)
+{
+    conn_rec *c = f->c;
+    core_net_rec *net = f->ctx;
+    serf_core_ctx_t *ctx = (serf_core_ctx_t*)net->in_ctx;
+    if (!ctx) {
+        ctx = init_ctx(f, net->client_socket);
+    }
+
+    ap_save_brigade(f, &ctx->tmp_brigade, &new_bb, c->pool);
+    apr_brigade_destroy(new_bb);
+    APR_BRIGADE_CONCAT(ctx->out_brigade, ctx->tmp_brigade);
+
+    return APR_SUCCESS;
+}
+
+static ap_filter_rec_t *serf_input_filter_handle;
+static ap_filter_rec_t *serf_output_filter_handle;
+
+static int serf_pre_connection(conn_rec *c, void *csd)
+{
+    core_net_rec *net = apr_palloc(c->pool, sizeof(*net));
+    apr_status_t status;
+
+    net->c = c;
+    net->in_ctx = NULL;
+    net->out_ctx = NULL;
+    net->client_socket = csd;
+
+    ap_set_module_config(net->c->conn_config, &serf_module, csd);
+    ap_add_input_filter_handle(serf_input_filter_handle, net, NULL, net->c);
+    ap_add_output_filter_handle(serf_output_filter_handle, net, NULL, net->c);
+
+    return DONE;
+}
+
 static int serf_handler(request_rec *r)
 {
     serf_config_rec *conf = ap_get_module_config(r->per_dir_config,
@@ -424,7 +627,16 @@
 
 static void register_hooks(apr_pool_t *p)
 {
+    ap_hook_pre_connection(serf_pre_connection, NULL, NULL, APR_HOOK_MIDDLE);
     ap_hook_handler(serf_handler, NULL, NULL, APR_HOOK_FIRST);
+
+    serf_input_filter_handle =
+        ap_register_input_filter("SERF_IN", serf_input_filter, NULL,
+                                 AP_FTYPE_NETWORK);
+    serf_output_filter_handle =
+        ap_register_output_filter("SERF_OUT", serf_output_filter, NULL,
+                                  AP_FTYPE_NETWORK);
+
 }
 
 module AP_MODULE_DECLARE_DATA serf_module =



Re: svn commit: r595028 - /httpd/sandbox/amsterdam/d/modules/proxy/mod_serf.c

Posted by Justin Erenkrantz <ju...@erenkrantz.com>.
On Nov 14, 2007 4:18 PM, Ruediger Pluem <rp...@apache.org> wrote:
> > +    b = APR_BRIGADE_FIRST(ctx->bb);
> > +    status = apr_bucket_read(b, data, len, APR_BLOCK_READ);
>
> Isn't it dangerous that we do not copy *data here?
> Doesn't this data get lost when we delete the bucket in the while loop below?

Yup.  Should be addressed in r595070.

> How do we set *found if the status is not EAGAIN?

Doh.  r595077.

> > +    apr_brigade_pflatten(ctx->bb, data, len, ctx->pool);
>
> Shouldn't this be ctx->tmp_bb?
> Shouldn't we call apr_brigade_cleanup(ctx->tmp_bb) here?

*don paper bag* r595079.

Thanks.  -- justin

Re: svn commit: r595028 - /httpd/sandbox/amsterdam/d/modules/proxy/mod_serf.c

Posted by Ruediger Pluem <rp...@apache.org>.

On 11/14/2007 08:59 PM, jerenkrantz@apache.org wrote:
> Author: jerenkrantz
> Date: Wed Nov 14 11:59:05 2007
> New Revision: 595028
> 
> URL: http://svn.apache.org/viewvc?rev=595028&view=rev
> Log:
> Amsterdam sandbox: add serf input/output filters that replace the core filters.
> 
> Modified:
>     httpd/sandbox/amsterdam/d/modules/proxy/mod_serf.c
> 
> Modified: httpd/sandbox/amsterdam/d/modules/proxy/mod_serf.c
> URL: http://svn.apache.org/viewvc/httpd/sandbox/amsterdam/d/modules/proxy/mod_serf.c?rev=595028&r1=595027&r2=595028&view=diff
> ==============================================================================
> --- httpd/sandbox/amsterdam/d/modules/proxy/mod_serf.c (original)
> +++ httpd/sandbox/amsterdam/d/modules/proxy/mod_serf.c Wed Nov 14 11:59:05 2007

> @@ -370,6 +372,207 @@
>      return baton.rstatus;
>  }
>  
> +typedef struct {
> +    serf_context_t *serf_ctx;
> +    serf_bucket_alloc_t *serf_bkt_alloc;
> +    serf_bucket_t *serf_in_bucket;
> +    serf_bucket_t *serf_out_bucket;
> +    apr_bucket_brigade *out_brigade;
> +    apr_bucket_brigade *tmp_brigade;
> +    apr_status_t serf_bucket_status;
> +} serf_core_ctx_t;
> +
> +typedef struct {
> +    apr_pool_t *pool;
> +    serf_bucket_alloc_t *allocator;
> +    serf_core_ctx_t *core_ctx;
> +    apr_bucket_brigade *bb;
> +    apr_bucket_brigade *tmp_bb;
> +} brigade_bucket_ctx_t;
> +
> +/* Forward-declare */
> +const serf_bucket_type_t serf_bucket_type_brigade;
> +
> +static serf_bucket_t * brigade_create(ap_filter_t *f, serf_core_ctx_t *core_ctx)
> +{
> +    brigade_bucket_ctx_t *ctx;
> +
> +    ctx = serf_bucket_mem_alloc(core_ctx->serf_bkt_alloc, sizeof(*ctx));
> +    ctx->allocator = core_ctx->serf_bkt_alloc;
> +    ctx->pool = serf_bucket_allocator_get_pool(ctx->allocator);
> +    ctx->core_ctx = core_ctx;
> +    ctx->bb = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
> +    ctx->tmp_bb = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
> +
> +    return serf_bucket_create(&serf_bucket_type_brigade, ctx->allocator, ctx);
> +}
> +
> +static apr_status_t brigade_read(serf_bucket_t *bucket,
> +                                 apr_size_t requested,
> +                                 const char **data, apr_size_t *len)
> +{
> +    brigade_bucket_ctx_t *ctx = bucket->data;
> +    apr_status_t status;
> +    apr_bucket *b, *end, *f;
> +
> +    b = APR_BRIGADE_FIRST(ctx->bb);
> +    status = apr_bucket_read(b, data, len, APR_BLOCK_READ);

Isn't it dangerous that we do not copy *data here?
Doesn't this data get lost when we delete the bucket in the while loop below?

> +
> +    if (requested < *len) {
> +        *len = requested;
> +    }
> +    status = apr_brigade_partition(ctx->bb, *len, &end);
> +    f = APR_BRIGADE_FIRST(ctx->bb);
> +    while (f != end && f != APR_BRIGADE_SENTINEL(ctx->bb)) {
> +        apr_bucket_delete(f);
> +        f = APR_BRIGADE_FIRST(ctx->bb);
> +    }
> +    return status;
> +}
> +
> +static apr_status_t brigade_readline(serf_bucket_t *bucket,
> +                                     int acceptable, int *found,
> +                                     const char **data, apr_size_t *len)
> +{
> +    brigade_bucket_ctx_t *ctx = bucket->data;
> +    apr_status_t status;
> +
> +    status = apr_brigade_split_line(ctx->tmp_bb, ctx->bb,
> +                                    APR_BLOCK_READ, HUGE_STRING_LEN);
> +    if (APR_STATUS_IS_EAGAIN(status)) {
> +        if (found) {
> +            *found = SERF_NEWLINE_NONE;
> +        }

How do we set *found if the status is not EAGAIN?

> +        status = APR_SUCCESS;
> +    }
> +    apr_brigade_pflatten(ctx->bb, data, len, ctx->pool);

Shouldn't this be ctx->tmp_bb?
Shouldn't we call apr_brigade_cleanup(ctx->tmp_bb) here?

Regards

Rüdiger