You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by pq...@apache.org on 2008/04/09 09:21:18 UTC

svn commit: r646206 - in /httpd/sandbox/amsterdam/d: build/build-modules-c.awk include/http_core.h server/Makefile.in server/serf_filters.c

Author: pquerna
Date: Wed Apr  9 00:21:01 2008
New Revision: 646206

URL: http://svn.apache.org/viewvc?rev=646206&view=rev
Log:
Move the core serf filters to the core.

Added:
    httpd/sandbox/amsterdam/d/server/serf_filters.c
Modified:
    httpd/sandbox/amsterdam/d/build/build-modules-c.awk
    httpd/sandbox/amsterdam/d/include/http_core.h
    httpd/sandbox/amsterdam/d/server/Makefile.in

Modified: httpd/sandbox/amsterdam/d/build/build-modules-c.awk
URL: http://svn.apache.org/viewvc/httpd/sandbox/amsterdam/d/build/build-modules-c.awk?rev=646206&r1=646205&r2=646206&view=diff
==============================================================================
--- httpd/sandbox/amsterdam/d/build/build-modules-c.awk (original)
+++ httpd/sandbox/amsterdam/d/build/build-modules-c.awk Wed Apr  9 00:21:01 2008
@@ -16,6 +16,8 @@
     RS = " "
     modules[n++] = "core"
     pmodules[pn++] = "core"
+    modules[n++] = "serf_filters"
+    pmodules[pn++] = "serf_filters"
 } 
 {
     modules[n] = $1;

Modified: httpd/sandbox/amsterdam/d/include/http_core.h
URL: http://svn.apache.org/viewvc/httpd/sandbox/amsterdam/d/include/http_core.h?rev=646206&r1=646205&r2=646206&view=diff
==============================================================================
--- httpd/sandbox/amsterdam/d/include/http_core.h (original)
+++ httpd/sandbox/amsterdam/d/include/http_core.h Wed Apr  9 00:21:01 2008
@@ -315,6 +315,7 @@
  * the code that cares really is in http_core.c.  Also, another accessor.
  */
 AP_DECLARE_DATA extern module core_module;
+AP_DECLARE_DATA extern module serf_filters_module;
 
 /**
  * @brief  Per-request configuration 

Modified: httpd/sandbox/amsterdam/d/server/Makefile.in
URL: http://svn.apache.org/viewvc/httpd/sandbox/amsterdam/d/server/Makefile.in?rev=646206&r1=646205&r2=646206&view=diff
==============================================================================
--- httpd/sandbox/amsterdam/d/server/Makefile.in (original)
+++ httpd/sandbox/amsterdam/d/server/Makefile.in Wed Apr  9 00:21:01 2008
@@ -14,7 +14,7 @@
 	mpm_common.c util_charset.c util_cookies.c util_debug.c util_xml.c \
 	util_expr.c util_filter.c util_pcre.c exports.c \
 	scoreboard.c error_bucket.c protocol.c core.c request.c provider.c \
-	eoc_bucket.c eor_bucket.c core_filters.c
+	eoc_bucket.c eor_bucket.c core_filters.c serf_filters.c
 
 TARGETS = delete-exports $(LTLIBRARY_NAME) $(CORE_IMPLIB_FILE) export_vars.h httpd.exp
 

Added: httpd/sandbox/amsterdam/d/server/serf_filters.c
URL: http://svn.apache.org/viewvc/httpd/sandbox/amsterdam/d/server/serf_filters.c?rev=646206&view=auto
==============================================================================
--- httpd/sandbox/amsterdam/d/server/serf_filters.c (added)
+++ httpd/sandbox/amsterdam/d/server/serf_filters.c Wed Apr  9 00:21:01 2008
@@ -0,0 +1,376 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "httpd.h"
+#include "http_core.h"
+#include "http_config.h"
+#include "http_protocol.h"
+#include "http_request.h"
+#include "http_log.h"
+
+#include "serf.h"
+#include "serf_bucket_types.h"
+#include "serf_bucket_util.h"
+#include "apr_uri.h"
+
+typedef struct {    /* Per-connection serf state; allocated and filled in by init_ctx(). */
+    serf_context_t *serf_ctx;               /* from serf_context_create() on the connection pool */
+    serf_bucket_alloc_t *serf_bkt_alloc;    /* allocator used for every serf bucket made in this file */
+    serf_bucket_t *serf_in_bucket;          /* socket bucket over the client socket; read by serf_input_filter() */
+    serf_bucket_t *serf_out_bucket;         /* aggregate bucket; serf_output_filter() appends to it and drains it */
+    apr_bucket_brigade *out_brigade;        /* created in init_ctx(); not referenced elsewhere in this file */
+    apr_bucket_brigade *tmp_brigade;        /* created in init_ctx(); not referenced elsewhere in this file */
+    apr_status_t serf_bucket_status;        /* result of the last serf read/readline/peek in serf_input_filter() */
+} serf_core_ctx_t;
+
+typedef struct {    /* Private state of one "BRIGADE" serf bucket; set up by brigade_create(). */
+    apr_pool_t *pool;                   /* pool of 'allocator'; brigade_readline() flattens lines into it */
+    serf_bucket_alloc_t *allocator;     /* serf allocator this context was allocated from */
+    serf_core_ctx_t *core_ctx;          /* owning per-connection serf state */
+    apr_bucket_brigade *bb;             /* data still to be read, saved from the caller's brigade */
+    apr_bucket_brigade *tmp_bb;         /* scratch brigade used by brigade_readline() */
+    apr_size_t last_read;               /* bytes returned by the last brigade_read(); dropped by brigade_collapse() */
+} brigade_bucket_ctx_t;
+
+/* Forward declaration: brigade_create() refers to the bucket type before
+ * the initialised definition that follows the brigade_* callbacks. */
+const serf_bucket_type_t serf_bucket_type_brigade;
+
+/*
+ * Wrap the contents of an APR bucket brigade in a new serf "BRIGADE" bucket.
+ *
+ * f        - filter on whose behalf the data is held; its connection pool
+ *            and APR bucket allocator back the saved and scratch brigades
+ * bb       - brigade whose buckets are saved into the new serf bucket
+ * core_ctx - per-connection serf state supplying the serf bucket allocator
+ *
+ * Returns the new serf bucket.  Its private context is allocated from
+ * core_ctx->serf_bkt_alloc and released again by brigade_destroy().
+ */
+static serf_bucket_t * brigade_create(ap_filter_t *f,
+                                      apr_bucket_brigade *bb,
+                                      serf_core_ctx_t *core_ctx)
+{
+    brigade_bucket_ctx_t *ctx;
+
+    ctx = serf_bucket_mem_alloc(core_ctx->serf_bkt_alloc, sizeof(*ctx));
+    ctx->allocator = core_ctx->serf_bkt_alloc;
+    ctx->pool = serf_bucket_allocator_get_pool(ctx->allocator);
+    ctx->core_ctx = core_ctx;
+    ctx->last_read = 0;
+
+    /* ap_save_brigade() creates the destination brigade only when *saveto
+     * is NULL.  The context memory is not zeroed by the allocator, so the
+     * pointer has to be cleared before it is handed over. */
+    ctx->bb = NULL;
+
+    /* Replace with a more optimized mechanism for converting */
+    ap_save_brigade(f, &ctx->bb, &bb, f->c->pool);
+    ctx->tmp_bb = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
+
+    return serf_bucket_create(&serf_bucket_type_brigade, ctx->allocator, ctx);
+}
+
+static void brigade_collapse(serf_bucket_t *bucket)
+{   /* Remove from the front of ctx->bb the ctx->last_read bytes that the
+     * previous brigade_read() reported, then reset the counter.  Does
+     * nothing when last_read is 0.
+     *
+     * bucket - serf bucket whose data is a brigade_bucket_ctx_t
+     */
+    brigade_bucket_ctx_t *ctx = bucket->data;
+    apr_status_t status;    /* NOTE(review): assigned below but never checked */
+
+    if (ctx->last_read) {
+        apr_bucket *end, *f;
+        status = apr_brigade_partition(ctx->bb, ctx->last_read, &end);  /* split so 'end' starts the unread data */
+        f = APR_BRIGADE_FIRST(ctx->bb);
+        while (f != end && f != APR_BRIGADE_SENTINEL(ctx->bb)) {    /* NOTE(review): 'end' may be unset if the partition failed - confirm */
+            apr_bucket_delete(f);
+            f = APR_BRIGADE_FIRST(ctx->bb);     /* deleting unlinks f, so re-fetch the head */
+        }
+        ctx->last_read = 0;
+    }
+}
+
+static apr_status_t brigade_read(serf_bucket_t *bucket,
+                                 apr_size_t requested,
+                                 const char **data, apr_size_t *len)
+{   /* serf read callback: expose up to 'requested' bytes from the first
+     * APR bucket of the brigade.
+     *
+     * bucket    - serf bucket whose data is a brigade_bucket_ctx_t
+     * requested - upper bound on the byte count reported in *len
+     * data, len - out: pointer to the first APR bucket's data and the
+     *             (possibly clamped) number of bytes
+     *
+     * Returns APR_EOF with *len = 0 when the brigade is empty or starts
+     * with an EOS or EOR bucket; otherwise the status of apr_bucket_read().
+     * The bytes are not removed here: the count is remembered in
+     * ctx->last_read and dropped by brigade_collapse() at the start of the
+     * next read or peek.
+     *
+     * NOTE(review): *len is used even when apr_bucket_read() fails, and a
+     * zero-length first bucket leaves last_read at 0, so it looks like it
+     * would never be dropped - confirm both are acceptable.
+     */
+    brigade_bucket_ctx_t *ctx = bucket->data;
+    apr_status_t status;
+
+    brigade_collapse(bucket);   /* consume what the previous read returned */
+
+    if (APR_BRIGADE_EMPTY(ctx->bb) ||
+        APR_BUCKET_IS_EOS(APR_BRIGADE_FIRST(ctx->bb)) ||
+        AP_BUCKET_IS_EOR(APR_BRIGADE_FIRST(ctx->bb))) {
+        *len = 0;
+        return APR_EOF;
+    }
+
+    status = apr_bucket_read(APR_BRIGADE_FIRST(ctx->bb), data, len,
+                             APR_BLOCK_READ);
+
+    if (requested < *len) {     /* never report more than the caller asked for */
+        *len = requested;
+    }
+    ctx->last_read = *len;
+
+    return status;
+}
+
+/*
+ * serf readline callback: return the next line held in the brigade.
+ *
+ * bucket     - serf bucket whose data is a brigade_bucket_ctx_t
+ * acceptable - set of newline styles the caller accepts (not consulted here)
+ * found      - out, may be NULL: SERF_NEWLINE_CRLF, _CR or _LF according to
+ *              how the returned data ends, otherwise SERF_NEWLINE_NONE
+ * data, len  - out: flattened copy of the line, allocated from ctx->pool
+ *
+ * Returns the status of apr_brigade_split_line(), except that EAGAIN is
+ * mapped to APR_SUCCESS.
+ *
+ * NOTE(review): unlike brigade_read() and brigade_peek() this does not call
+ * brigade_collapse() first, and the result of apr_brigade_pflatten() is not
+ * checked - confirm both are intended.
+ */
+static apr_status_t brigade_readline(serf_bucket_t *bucket,
+                                     int acceptable, int *found,
+                                     const char **data, apr_size_t *len)
+{
+    brigade_bucket_ctx_t *ctx = bucket->data;
+    apr_status_t orig_status, status;
+
+    /* Default for every path that does not classify a terminator below,
+     * so the caller never reads an unset value. */
+    if (found) {
+        *found = SERF_NEWLINE_NONE;
+    }
+
+    orig_status = apr_brigade_split_line(ctx->tmp_bb, ctx->bb,
+                                         APR_BLOCK_READ, HUGE_STRING_LEN);
+    /* "Try again" is reported to serf as success with no newline found. */
+    status = APR_STATUS_IS_EAGAIN(orig_status) ? APR_SUCCESS : orig_status;
+
+    apr_brigade_pflatten(ctx->tmp_bb, (char**)data, len, ctx->pool);
+    apr_brigade_cleanup(ctx->tmp_bb);
+
+    if (orig_status == APR_SUCCESS && found) {
+        /* ">= 2" so that a line consisting solely of "\r\n" counts as CRLF */
+        if (*len >= 2 && (*data)[*len-2] == '\r' && (*data)[*len-1] == '\n') {
+            *found = SERF_NEWLINE_CRLF;
+        }
+        else if (*len && (*data)[*len-1] == '\r') {
+            *found = SERF_NEWLINE_CR;
+        }
+        else if (*len && (*data)[*len-1] == '\n') {
+            *found = SERF_NEWLINE_LF;
+        }
+    }
+    return status;
+}
+
+static apr_status_t brigade_peek(serf_bucket_t *bucket,
+                                 const char **data,
+                                 apr_size_t *len)
+{   /* serf peek callback: show the data of the first APR bucket without
+     * marking it as consumed (ctx->last_read is not updated here).
+     *
+     * bucket    - serf bucket whose data is a brigade_bucket_ctx_t
+     * data, len - out: pointer to and length of the first APR bucket's data
+     *
+     * Returns APR_EOF with *len = 0 when the brigade is empty or starts
+     * with an EOS or EOR bucket; otherwise the status of apr_bucket_read().
+     */
+    brigade_bucket_ctx_t *ctx = bucket->data;
+
+    brigade_collapse(bucket);   /* first drop what an earlier brigade_read() returned */
+
+    if (APR_BRIGADE_EMPTY(ctx->bb) ||
+        APR_BUCKET_IS_EOS(APR_BRIGADE_FIRST(ctx->bb)) ||
+        AP_BUCKET_IS_EOR(APR_BRIGADE_FIRST(ctx->bb))) {
+        *len = 0;
+        return APR_EOF;
+    }
+
+    return apr_bucket_read(APR_BRIGADE_FIRST(ctx->bb), data, len,
+                           APR_BLOCK_READ);
+}
+
+static void brigade_destroy(serf_bucket_t *bucket)
+{   /* serf destroy callback: destroy both APR brigades held by the bucket,
+     * then let serf release the bucket and its private context.
+     *
+     * bucket - serf bucket whose data is a brigade_bucket_ctx_t
+     */
+    brigade_bucket_ctx_t *ctx = bucket->data;
+
+    apr_brigade_destroy(ctx->bb);
+    apr_brigade_destroy(ctx->tmp_bb);
+
+    serf_default_destroy_and_data(bucket);
+}
+
+const serf_bucket_type_t serf_bucket_type_brigade = {    /* serf bucket type whose data comes from an APR brigade */
+    "BRIGADE",                          /* type name */
+    brigade_read,                       /* read */
+    brigade_readline,                   /* readline */
+    serf_default_read_iovec,            /* this and the next two entries are serf's default implementations */
+    serf_default_read_for_sendfile,
+    serf_default_read_bucket,
+    brigade_peek,                       /* peek */
+    brigade_destroy,                    /* destroy */
+};
+
+static serf_core_ctx_t* init_ctx(ap_filter_t *f, apr_socket_t *socket)
+{   /* Create the per-connection serf state shared by the two filters.
+     *
+     * f      - filter whose connection (f->c) supplies the pool and the
+     *          APR bucket allocator
+     * socket - client socket wrapped by the serf input bucket
+     *
+     * Returns a context allocated zero-filled from f->c->pool, holding a
+     * serf context, a serf bucket allocator, a socket bucket for input, an
+     * empty aggregate bucket for output and two empty APR brigades.
+     */
+    serf_core_ctx_t *ctx;
+
+    ctx = apr_pcalloc(f->c->pool, sizeof(*ctx));
+
+    ctx->serf_ctx = serf_context_create(f->c->pool);
+    ctx->serf_bkt_alloc = serf_bucket_allocator_create(f->c->pool, NULL, NULL);
+    ctx->serf_in_bucket = serf_bucket_socket_create(socket,
+                                                    ctx->serf_bkt_alloc);
+    ctx->serf_out_bucket = serf_bucket_aggregate_create(ctx->serf_bkt_alloc);
+
+    ctx->out_brigade = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
+    ctx->tmp_brigade = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
+
+    return ctx;
+}
+
+static int serf_input_filter(ap_filter_t *f, apr_bucket_brigade *bb,
+                             ap_input_mode_t mode, apr_read_type_e block,
+                             apr_off_t readbytes)
+{   /* Network-level input filter: pull data from the serf socket bucket
+     * and append it to 'bb' as one transient APR bucket.
+     *
+     * f         - filter whose ctx is the core_net_rec built in
+     *             serf_pre_connection()
+     * bb        - brigade that receives the data
+     * mode      - AP_MODE_INIT does nothing; GETLINE, READBYTES and
+     *             SPECULATIVE map to serf readline, read and peek
+     * block     - not consulted in this function
+     * readbytes - byte limit passed to serf for AP_MODE_READBYTES only
+     *
+     * Returns APR_SUCCESS for the four modes above.  AP_MODE_EATCRLF and
+     * AP_MODE_EXHAUSTIVE call abort().
+     *
+     * NOTE(review): the serf status is only stored in
+     * ctx->serf_bucket_status and never returned, so a failed or closed
+     * connection is still reported as success; 'data' and 'len' are used
+     * without checking that the serf call filled them in; and any mode not
+     * listed above reaches the end of the function without a return value.
+     */
+    apr_status_t status;    /* NOTE(review): never used */
+    core_net_rec *net = f->ctx;
+    serf_core_ctx_t *ctx = (serf_core_ctx_t*)net->in_ctx;
+
+    if (mode == AP_MODE_INIT) {
+        return APR_SUCCESS;
+    }
+    if (!ctx)   /* first real read on this connection: create the shared serf state */
+    {
+        ctx = init_ctx(f, net->client_socket);
+        net->in_ctx = (void*)ctx;
+    }
+
+    if (mode == AP_MODE_GETLINE) {
+        const char *data;
+        apr_size_t len;
+        int found;      /* newline style reported by serf; not examined afterwards */
+        apr_bucket *b;
+
+        ctx->serf_bucket_status = serf_bucket_readline(ctx->serf_in_bucket,
+                                                       SERF_NEWLINE_ANY,
+                                                       &found, &data, &len);
+        b = apr_bucket_transient_create(data, len, f->c->bucket_alloc);    /* refers to serf's bytes, no copy made here */
+        APR_BRIGADE_INSERT_TAIL(bb, b);
+        return APR_SUCCESS;
+    }
+    if (mode == AP_MODE_READBYTES) {
+        const char *data;
+        apr_size_t len;
+        apr_bucket *b;
+
+        ctx->serf_bucket_status = serf_bucket_read(ctx->serf_in_bucket,
+                                                   readbytes, &data, &len);
+        b = apr_bucket_transient_create(data, len, f->c->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(bb, b);
+        return APR_SUCCESS;
+    }
+
+    if (mode == AP_MODE_SPECULATIVE) {
+        const char *data;
+        apr_size_t len;
+        apr_bucket *b;
+        serf_bucket_t *sb;      /* NOTE(review): never used */
+
+        ctx->serf_bucket_status = serf_bucket_peek(ctx->serf_in_bucket,
+                                                   &data, &len);    /* peek, so the data stays in the serf bucket; readbytes is not applied */
+
+        b = apr_bucket_transient_create(data, len, f->c->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(bb, b);
+        return APR_SUCCESS;
+    }
+
+    if (mode == AP_MODE_EATCRLF || mode == AP_MODE_EXHAUSTIVE) {
+        abort();    /* TODO: these two modes are not implemented */
+    }
+}
+
+static apr_status_t serf_output_filter(ap_filter_t *f,
+                                       apr_bucket_brigade *new_bb)
+{   /* Network-level output filter.
+     *
+     * f      - filter whose ctx is the core_net_rec built in
+     *          serf_pre_connection()
+     * new_bb - data to queue, or NULL to have queued data written out
+     *
+     * A non-NULL brigade is wrapped in a BRIGADE serf bucket and appended
+     * to ctx->serf_out_bucket, and c->data_in_output_filters is set;
+     * nothing is written during that call.  With new_bb == NULL and data
+     * pending, the aggregate bucket is read and sent on the client socket
+     * until serf reports EOF (which clears data_in_output_filters) or
+     * EAGAIN, or until 'rv' is no longer APR_SUCCESS.
+     *
+     * Returns the serf status when reading the aggregate bucket fails,
+     * otherwise APR_SUCCESS.
+     *
+     * NOTE(review): the result of apr_socket_send() only ends the loop and
+     * is never returned, so a send failure is reported as success.
+     */
+    apr_status_t rv;    /* NOTE(review): tested by the loop condition even if no send has assigned it (len == 0) */
+    serf_bucket_t *b;
+    conn_rec *c = f->c;
+    core_net_rec *net = f->ctx;
+    serf_core_ctx_t *ctx = (serf_core_ctx_t*)net->in_ctx;     /* same slot as the input filter; net->out_ctx is not used */
+    if (!ctx) {
+        ctx = init_ctx(f, net->client_socket);
+        net->in_ctx = (void*)ctx;
+    }
+
+    if (new_bb) {
+        b = brigade_create(f, new_bb, ctx);
+        serf_bucket_aggregate_append(ctx->serf_out_bucket, b);
+        c->data_in_output_filters = 1;
+    }
+
+    if (c->data_in_output_filters && new_bb == NULL) {
+        do {
+            apr_status_t srv;
+            const char *buf;
+            apr_size_t len = 0;
+
+            srv = serf_bucket_read(ctx->serf_out_bucket, SERF_READ_ALL_AVAIL,
+                                   &buf, &len);
+
+            if (SERF_BUCKET_READ_ERROR(srv)) {
+                /* read error from serf; TODO: log it */
+                return srv;
+            }
+
+            /* write data to network here. */
+            if (len > 0) {
+                apr_size_t blen = len;      /* in: bytes to send, out: bytes actually sent */
+                rv = apr_socket_send(net->client_socket, buf, &blen);
+                if (blen != len) {          /* short write: put the unsent tail back at the front of the queue */
+                    b = serf_bucket_simple_create(buf+blen, len - blen, NULL, NULL, ctx->serf_bkt_alloc);   /* NOTE(review): points into buf without copying - confirm buf outlives this bucket */
+                    serf_bucket_aggregate_prepend(ctx->serf_out_bucket, b);
+                    srv = APR_SUCCESS;      /* data is pending again, so do not act on EOF/EAGAIN from the read */
+                }
+            }
+
+            if (APR_STATUS_IS_EOF(srv)) {   /* queue fully drained */
+                c->data_in_output_filters = 0;
+                break;
+            }
+            if (APR_STATUS_IS_EAGAIN(srv)) {
+                break;
+            }
+        } while (rv == APR_SUCCESS);
+    }
+
+    return APR_SUCCESS;
+}
+
+static ap_filter_rec_t *serf_input_filter_handle;     /* "SERF_IN" registration, set in register_hooks() */
+static ap_filter_rec_t *serf_output_filter_handle;    /* "SERF_OUT" registration, set in register_hooks() */
+
+static int serf_pre_connection(conn_rec *c, void *csd)
+{   /* pre_connection hook: attach the serf input and output filters to a
+     * new connection.
+     *
+     * c   - the connection being set up
+     * csd - the client socket (used as an apr_socket_t below)
+     *
+     * Builds a core_net_rec from c->pool that both filters receive as
+     * their ctx, stores the socket as this module's connection config and
+     * returns DONE.
+     *
+     * NOTE(review): returning DONE presumably keeps later pre_connection
+     * hooks from also installing network filters - confirm.
+     */
+    core_net_rec *net = apr_palloc(c->pool, sizeof(*net));
+    apr_status_t status;    /* NOTE(review): never used */
+
+    apr_socket_timeout_set(csd, 0);     /* timeout 0: socket calls do not block */
+
+    net->c = c;
+    net->in_ctx = NULL;     /* filled in lazily by whichever filter runs first */
+    net->out_ctx = NULL;
+    net->client_socket = csd;
+
+    ap_set_module_config(net->c->conn_config, &serf_filters_module, csd);
+    ap_add_input_filter_handle(serf_input_filter_handle, net, NULL, net->c);
+    ap_add_output_filter_handle(serf_output_filter_handle, net, NULL, net->c);
+
+    return DONE;
+}
+
+static const command_rec serf_cmds[] =    /* configuration directive table: terminator entry only */
+{
+    {NULL}
+};
+
+static void register_hooks(apr_pool_t *p)
+{   /* Module registration callback: hook serf_pre_connection() into
+     * connection setup and register the "SERF_IN" / "SERF_OUT" filters at
+     * network level, keeping their handles for serf_pre_connection().
+     *
+     * p - pool supplied by the caller; not used here
+     */
+    ap_hook_pre_connection(serf_pre_connection, NULL, NULL, APR_HOOK_MIDDLE);
+    serf_input_filter_handle =
+        ap_register_input_filter("SERF_IN", serf_input_filter, NULL,
+                                 AP_FTYPE_NETWORK);
+    serf_output_filter_handle =
+        ap_register_output_filter("SERF_OUT", serf_output_filter, NULL,
+                                  AP_FTYPE_NETWORK);
+
+}
+
+module AP_MODULE_DECLARE_DATA serf_filters_module =    /* module record for the serf network filters */
+{
+    STANDARD20_MODULE_STUFF,
+    NULL,               /* create per-directory config: none */
+    NULL,               /* merge per-directory config: none */
+    NULL,               /* create per-server config: none */
+    NULL,               /* merge per-server config: none */
+    serf_cmds,          /* directive table (empty) */
+    register_hooks      /* installs the pre_connection hook and both filters */
+};
+



Re: svn commit: r646206 - in /httpd/sandbox/amsterdam/d: build/build-modules-c.awk include/http_core.h server/Makefile.in server/serf_filters.c

Posted by Ruediger Pluem <rp...@apache.org>.
On 09.04.2008 09:21, pquerna@apache.org wrote:
> Author: pquerna
> Date: Wed Apr  9 00:21:01 2008
> New Revision: 646206
> 
> URL: http://svn.apache.org/viewvc?rev=646206&view=rev
> Log:
> Move the core serf filters to the core.
> 
> Added:
>     httpd/sandbox/amsterdam/d/server/serf_filters.c
> Modified:
>     httpd/sandbox/amsterdam/d/build/build-modules-c.awk
>     httpd/sandbox/amsterdam/d/include/http_core.h
>     httpd/sandbox/amsterdam/d/server/Makefile.in
> 

> Added: httpd/sandbox/amsterdam/d/server/serf_filters.c
> URL: http://svn.apache.org/viewvc/httpd/sandbox/amsterdam/d/server/serf_filters.c?rev=646206&view=auto
> ==============================================================================
> --- httpd/sandbox/amsterdam/d/server/serf_filters.c (added)
> +++ httpd/sandbox/amsterdam/d/server/serf_filters.c Wed Apr  9 00:21:01 2008

> +
> +static int serf_input_filter(ap_filter_t *f, apr_bucket_brigade *bb,
> +                             ap_input_mode_t mode, apr_read_type_e block,
> +                             apr_off_t readbytes)
> +{
> +    apr_status_t status;
> +    core_net_rec *net = f->ctx;
> +    serf_core_ctx_t *ctx = (serf_core_ctx_t*)net->in_ctx;
> +
> +    if (mode == AP_MODE_INIT) {
> +        return APR_SUCCESS;
> +    }
> +    if (!ctx)
> +    {
> +        ctx = init_ctx(f, net->client_socket);
> +        net->in_ctx = (void*)ctx;
> +    }
> +
> +    if (mode == AP_MODE_GETLINE) {
> +        const char *data;
> +        apr_size_t len;
> +        int found;
> +        apr_bucket *b;
> +
> +        ctx->serf_bucket_status = serf_bucket_readline(ctx->serf_in_bucket,
> +                                                       SERF_NEWLINE_ANY,
> +                                                       &found, &data, &len);
> +        b = apr_bucket_transient_create(data, len, f->c->bucket_alloc);
> +        APR_BRIGADE_INSERT_TAIL(bb, b);
> +        return APR_SUCCESS;

Why return APR_SUCCESS here in all cases? What if the connection to the 
client is broken?

> +    }
> +    if (mode == AP_MODE_READBYTES) {
> +        const char *data;
> +        apr_size_t len;
> +        apr_bucket *b;
> +
> +        ctx->serf_bucket_status = serf_bucket_read(ctx->serf_in_bucket,
> +                                                   readbytes, &data, &len);
> +        b = apr_bucket_transient_create(data, len, f->c->bucket_alloc);
> +        APR_BRIGADE_INSERT_TAIL(bb, b);
> +        return APR_SUCCESS;

Same as above.

> +    }
> +
> +    if (mode == AP_MODE_SPECULATIVE) {
> +        const char *data;
> +        apr_size_t len;
> +        apr_bucket *b;
> +        serf_bucket_t *sb;
> +
> +        ctx->serf_bucket_status = serf_bucket_peek(ctx->serf_in_bucket,
> +                                                   &data, &len);
> +
> +        b = apr_bucket_transient_create(data, len, f->c->bucket_alloc);
> +        APR_BRIGADE_INSERT_TAIL(bb, b);
> +        return APR_SUCCESS;

Same as above.

> +    }
> +
> +    if (mode == AP_MODE_EATCRLF || mode == AP_MODE_EXHAUSTIVE) {
> +        abort();

Looks like a TODO here :-).

> +    }
> +}
> +
> +static apr_status_t serf_output_filter(ap_filter_t *f,
> +                                       apr_bucket_brigade *new_bb)
> +{
> +    apr_status_t rv;
> +    serf_bucket_t *b;
> +    conn_rec *c = f->c;
> +    core_net_rec *net = f->ctx;
> +    serf_core_ctx_t *ctx = (serf_core_ctx_t*)net->in_ctx;
> +    if (!ctx) {
> +        ctx = init_ctx(f, net->client_socket);
> +        net->in_ctx = (void*)ctx;
> +    }
> +
> +    if (new_bb) {
> +        b = brigade_create(f, new_bb, ctx);
> +        serf_bucket_aggregate_append(ctx->serf_out_bucket, b);
> +        c->data_in_output_filters = 1;
> +    }
> +
> +    if (c->data_in_output_filters && new_bb == NULL) {

How do we get into the situation where new_bb is NULL?

> +        do {
> +            apr_status_t srv;
> +            const char *buf;
> +            apr_size_t len = 0;
> +
> +            srv = serf_bucket_read(ctx->serf_out_bucket, SERF_READ_ALL_AVAIL,
> +                                   &buf, &len);
> +
> +            if (SERF_BUCKET_READ_ERROR(srv)) {
> +                /* bad, logme */
> +                return srv;
> +            }
> +
> +            /* write data to network here. */
> +            if (len > 0) {
> +                apr_size_t blen = len;
> +                rv = apr_socket_send(net->client_socket, buf, &blen);
> +                if (blen != len) {
> +                    b = serf_bucket_simple_create(buf+blen, len - blen, NULL, NULL, ctx->serf_bkt_alloc);
> +                    serf_bucket_aggregate_prepend(ctx->serf_out_bucket, b);
> +                    srv = APR_SUCCESS;
> +                }
> +            }
> +
> +            if (APR_STATUS_IS_EOF(srv)) {
> +                c->data_in_output_filters = 0;
> +                break;
> +            }
> +            if (APR_STATUS_IS_EAGAIN(srv)) {
> +                break;
> +            }
> +        } while (rv == APR_SUCCESS);
> +    }
> +
> +    return APR_SUCCESS;

Is it really ok to return APR_SUCCESS in all cases?

Regards

Rüdiger