You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@apr.apache.org by Paul Querna <ch...@force-elite.com> on 2006/08/04 01:25:36 UTC

[PATCH] APR Memcache Multi-Get Support

Attached is a patch to add support for multiple parallel fetching of
keys from a memcache cluster.  It enables fetching of thousands of
values from multiple memcache nodes without any significant slow downs
for adding more nodes or keys.

The basic logic behind the main function, apr_memcache_multgetp, is
currently pounding away with thousands of concurrent queries per second,
without any problem.  I wouldn't call the function itself 'pretty', but
I believe the logic embedded in it is correct.

[[[
Add memcache multi-get support to apr_memcache.

* include/apr_memcache.h
    Add _value_t structure for holding an individual value from
    memcached.
    Add new functions: apr_memcache_add_multget_key,
    apr_memcache_multgetp.

* memcache/apr_memcache.c:
    Add a local baton structure for associating severs to queries.
    Add new functions: apr_memcache_add_multget_key,
    mget_conn_result, apr_memcache_multgetp

Submitted By: Rob Emanuele <rob.emanuele ask.com>, Paul Querna
<paul.querna ask.com>
]]]

Re: [PATCH] APR Memcache Multi-Get Support

Posted by Davi Arnaut <da...@haxent.com.br>.
Em 03/08/2006, às 20:25, Paul Querna escreveu:

> Attached is a patch to add support for multiple parallel fetching of
> keys from a memcache cluster.  It enables fetching of thousands of
> values from multiple memcache nodes without any significant slow downs
> for adding more nodes or keys.

Nice!

> The basic logic behind the main function, apr_memcache_multgetp, is
> currently pounding away with thousands of concurrent queries per  
> second,
> without any problem.  I wouldn't call the function itself 'pretty',  
> but
> I believe the logic embedded in it is correct.

This might be very interesting for a cache storage module that I'm  
working
on.

> [[[
> Add memcache multi-get support to apr_memcache.
>
> * include/apr_memcache.h
>     Add _value_t structure for holding an individual value from
>     memcached.
>     Add new functions: apr_memcache_add_multget_key,
>     apr_memcache_multgetp.
>
> * memcache/apr_memcache.c:
>     Add a local baton structure for associating severs to queries.
>     Add new functions: apr_memcache_add_multget_key,
>     mget_conn_result, apr_memcache_multgetp
>
> Submitted By: Rob Emanuele <rob.emanuele ask.com>, Paul Querna
> <paul.querna ask.com>
> ]]]
> Index: memcache/apr_memcache.c
> ===================================================================
> --- memcache/apr_memcache.c	(revision 428526)
> +++ memcache/apr_memcache.c	(working copy)
> @@ -17,6 +17,7 @@
>   */
>
>  #include "apr_memcache.h"
> +#include "apr_poll.h"
>  #include "apr_version.h"
>  #include <stdlib.h>
>
> @@ -38,6 +39,9 @@
>  #define MC_EOL "\r\n"
>  #define MC_EOL_LEN (sizeof(MC_EOL)-1)
>
> +#define MC_WS " "
> +#define MC_WS_LEN (sizeof(MC_WS)-1)
> +
>  #define MC_GET "get "
>  #define MC_GET_LEN (sizeof(MC_GET)-1)
>
> @@ -97,7 +101,16 @@
>  #define MS_END "END"
>  #define MS_END_LEN (sizeof(MS_END)-1)
>
> +/** Server and Query Structure for a multiple get */
> +struct cache_server_query_t {
> +    apr_memcache_server_t* ms;
> +    apr_memcache_conn_t* conn;
> +    struct iovec* query_vec;
> +    unsigned int query_vec_count;
> +};
>
> +#define MULT_GET_TIMEOUT 50000

Someone might want to modify this, function parameter ?

> +
>  static apr_status_t make_server_dead(apr_memcache_t *mc,  
> apr_memcache_server_t *ms)
>  {
>  #if APR_HAS_THREADS
> @@ -1021,6 +1034,334 @@
>  }
>
>
> +APR_DECLARE(void)
> +apr_memcache_add_multget_key(apr_pool_t *data_pool,
> +                             const char* key,
> +                             apr_hash_t **values)
> +{
> +    apr_memcache_value_t* value;
> +    int klen = strlen(key);

s/int/apr_size_t/

> +    // create the value hash if need be

C++ comment.

> +    if (!*values) {
> +        *values = apr_hash_make(data_pool);
> +    }
> +
> +    // init key and add it to the value hash

C++ comment.

> +    value = apr_pcalloc(data_pool, sizeof(apr_memcache_value_t));
> +
> +    value->status = APR_NOTFOUND;
> +    value->key = apr_pstrdup(data_pool, key);
> +
> +    apr_hash_set(*values, value->key, klen, value);

Strange, why does apr_hash_set takes an apr_ssize_t argument ?

> +}
> +
> +static void mget_conn_result(int up,
> +                             apr_status_t rv,
> +                             apr_memcache_t *mc,
> +                             apr_memcache_server_t *ms,
> +                             apr_memcache_conn_t *conn,
> +                             struct cache_server_query_t  
> *server_query,
> +                             apr_hash_t *values,
> +                             apr_hash_t *server_queries)
> +{
> +    int j;

Should be unsigned int

> +    apr_memcache_value_t* value;
> +
> +    if (!up) {
> +        ms_bad_conn(ms, conn);
> +        apr_memcache_disable_server(mc, ms);
> +    }
> +
> +    for (j = 1; j < server_query->query_vec_count ; j+=2) {
> +        if (server_query->query_vec[j].iov_base) {
> +            value = apr_hash_get(values, server_query->query_vec 
> [j].iov_base,
> +                                 strlen(server_query->query_vec 
> [j].iov_base));

Why not use iov_len ? or at least APR_HASH_KEY_STRING

> +
> +            if (value->status == APR_NOTFOUND) {
> +                value->status = rv;
> +            }
> +        }
> +    }
> +
> +    ms_release_conn(ms, conn);
> +
> +    apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
> +}
> +
> +APR_DECLARE(apr_status_t)
> +apr_memcache_multgetp(apr_memcache_t *mc,
> +                      apr_pool_t *temp_pool,
> +                      apr_pool_t *data_pool,
> +                      apr_hash_t *values)
> +{
> +    apr_status_t rv;
> +    apr_memcache_server_t* ms;
> +    apr_memcache_conn_t* conn;
> +    apr_uint32_t hash;
> +    apr_size_t written;
> +    int klen;

apr_size_t

> +
> +    apr_memcache_value_t* value;
> +    apr_hash_index_t* value_hash_index;
> +
> +    /* this is a little over aggresive, but beats multiple loops
> +     * to figure out how long each vector needs to be per-server.
> +     */
> +    unsigned int veclen = 2 + 2 * apr_hash_count(values) - 1; /*  
> get <key>[<space><key>...]\r\n */
> +    unsigned int i, j;
> +    unsigned int queries_sent;
> +    apr_int32_t queries_recvd;
> +
> +    apr_hash_t * server_queries = apr_hash_make(temp_pool);
> +    struct cache_server_query_t* server_query;
> +    apr_hash_index_t * query_hash_index;
> +
> +    apr_pollset_t* pollset;
> +    const apr_pollfd_t* activefds;
> +    apr_pollfd_t* pollfds;
> +
> +
> +    // build all the queries

C++ comment.

> +    value_hash_index = apr_hash_first(temp_pool, values);
> +    while (value_hash_index) {
> +        apr_hash_this(value_hash_index, NULL, NULL, (void**)&value);
> +        value_hash_index = apr_hash_next(value_hash_index);
> +        klen = strlen(value->key);
> +
> +        hash = apr_memcache_hash(value->key, klen);
> +        ms = apr_memcache_find_server_hash(mc, hash);
> +        if (ms == NULL) {
> +            continue;
> +        }
> +
> +        server_query = apr_hash_get(server_queries, &ms, sizeof(ms));
> +
> +        if (!server_query) {
> +            rv = ms_find_conn(ms, &conn);
> +
> +            if (rv != APR_SUCCESS) {
> +                apr_memcache_disable_server(mc, ms);
> +                value->status = rv;
> +                continue;
> +            }
> +
> +            server_query = apr_pcalloc(temp_pool,sizeof(struct  
> cache_server_query_t));

Missing space ",sizeof"

> +
> +            apr_hash_set(server_queries, &ms, sizeof(ms),  
> server_query);
> +
> +            server_query->ms = ms;
> +            server_query->conn = conn;
> +            server_query->query_vec = apr_pcalloc(temp_pool, sizeof 
> (struct iovec)*veclen);

s/veclen/3/ ?

> +
> +            // set up the first key
> +            server_query->query_vec[0].iov_base = MC_GET;
> +            server_query->query_vec[0].iov_len  = MC_GET_LEN;
> +
> +            server_query->query_vec[1].iov_base = (void*)(value- 
> >key);
> +            server_query->query_vec[1].iov_len  = klen;
> +
> +            server_query->query_vec[2].iov_base = MC_EOL;
> +            server_query->query_vec[2].iov_len  = MC_EOL_LEN;
> +
> +            server_query->query_vec_count = 3;
> +        }
> +        else {
> +            j = server_query->query_vec_count - 1;
> +
> +            server_query->query_vec[j].iov_base = MC_WS;
> +            server_query->query_vec[j].iov_len  = MC_WS_LEN;
> +            j++;
> +
> +            server_query->query_vec[j].iov_base = (void*)(value- 
> >key);
> +            server_query->query_vec[j].iov_len  = klen;
> +            j++;
> +
> +            server_query->query_vec[j].iov_base = MC_EOL;
> +            server_query->query_vec[j].iov_len  = MC_EOL_LEN;
> +            j++;
> +
> +           server_query->query_vec_count = j;
> +        }
> +    }
> +
> +    // create polling structures

C++ comment...etc :)

> +    pollfds = apr_pcalloc(temp_pool, apr_hash_count 
> (server_queries) * sizeof(apr_pollfd_t));
> +
> +    rv = apr_pollset_create(&pollset, apr_hash_count 
> (server_queries), temp_pool, 0);
> +
> +    if (rv != APR_SUCCESS) {
> +        return rv;
> +    }
> +
> +    // send all the queries
> +    queries_sent = 0;
> +    query_hash_index = apr_hash_first(temp_pool, server_queries);
> +
> +    while (query_hash_index) {
> +        apr_hash_this(query_hash_index, NULL, NULL, (void**) 
> &server_query);
> +        query_hash_index = apr_hash_next(query_hash_index);
> +
> +        conn = server_query->conn;
> +        ms = server_query->ms;
> +
> +        for (i = 0, rv = APR_SUCCESS; i < veclen && rv ==  
> APR_SUCCESS; i += IOV_MAX) {
> +            rv = apr_socket_sendv(conn->sock, &(server_query- 
> >query_vec[i]),
> +                                  veclen-i>IOV_MAX ? IOV_MAX :  
> veclen-i , &written);
> +        }
> +
> +        if (rv != APR_SUCCESS) {
> +            mget_conn_result(FALSE, rv, mc, ms, conn,
> +                             server_query, values, server_queries);
> +            continue;
> +        }
> +
> +        pollfds[queries_sent].desc_type = APR_POLL_SOCKET;
> +        pollfds[queries_sent].reqevents = APR_POLLIN;
> +        pollfds[queries_sent].p = temp_pool;
> +        pollfds[queries_sent].desc.s = conn->sock;
> +        pollfds[queries_sent].client_data = (void *)server_query;

Useless cast.

> +        apr_pollset_add (pollset, &pollfds[queries_sent]);
> +
> +        queries_sent++;
> +    }
> +
> +    while (queries_sent) {
> +        rv = apr_pollset_poll(pollset, MULT_GET_TIMEOUT,  
> &queries_recvd, &activefds);
> +
> +        if (rv != APR_SUCCESS) {
> +            // timeout
> +            queries_sent = 0;
> +            continue;
> +        }
> +        for (i = 0; i < queries_recvd; i++) {
> +            server_query = activefds[i].client_data;
> +            conn = server_query->conn;
> +            ms = server_query->ms;
> +
> +           rv = get_server_line(conn);
> +
> +           if (rv != APR_SUCCESS) {
> +               apr_pollset_remove (pollset, &activefds[i]);
> +               mget_conn_result(FALSE, rv, mc, ms, conn,
> +                                server_query, values,  
> server_queries);
> +               queries_sent--;
> +               continue;
> +           }
> +
> +           if (strncmp(MS_VALUE, conn->buffer, MS_VALUE_LEN) == 0) {

I think you should move this to a function .

> +               char *key;
> +               char *flags;
> +               char *length;
> +               char *start;
> +               char *last;
> +               char *data;
> +               apr_size_t len;
> +
> +               start = conn->buffer;
> +               key = apr_strtok(conn->buffer, " ", &last); // just  
> the VALUE, ignore
> +               key = apr_strtok(NULL, " ", &last);
> +               flags = apr_strtok(NULL, " ", &last);
> +
> +

Too much new lines..

> +               length = apr_strtok(NULL, " ", &last);
> +               len = atoi(length);
> +
> +               value = apr_hash_get(values, key, strlen(key));
> +
> +

\n\n ??

>
> +               if (value) {
> +                   if (len > 0)  {
> +                       apr_bucket_brigade *bbb;
> +                       apr_bucket *e;
> +
> +                       /* eat the trailing \r\n */
> +                       rv = apr_brigade_partition(conn->bb, len+2,  
> &e);
> +
> +                       if (rv != APR_SUCCESS) {
> +                           apr_pollset_remove (pollset, &activefds 
> [i]);
> +                           mget_conn_result(FALSE, rv, mc, ms, conn,
> +                                            server_query, values,  
> server_queries);
> +                           queries_sent--;
> +                           continue;
> +                       }

error_path_out_whatever

> +
> +                       bbb = apr_brigade_split(conn->bb, e);
> +
> +                       rv = apr_brigade_pflatten(conn->bb, &data,  
> &len, data_pool);
> +
> +                       if (rv != APR_SUCCESS) {
> +                           apr_pollset_remove (pollset, &activefds 
> [i]);
> +                           mget_conn_result(FALSE, rv, mc, ms, conn,
> +                                            server_query, values,  
> server_queries);
> +                           queries_sent--;
> +                           continue;

if (rv != APR_SUCCESS)
   goto error_path_out_whatever;

error_path_out_whatever:
> +                           apr_pollset_remove (pollset, &activefds 
> [i]);
> +                           mget_conn_result(FALSE, rv, mc, ms, conn,
> +                                            server_query, values,  
> server_queries);
> +                           queries_sent--;
> +                           continue;



> +                       }
> +
> +                       rv = apr_brigade_destroy(conn->bb);
> +                       if (rv != APR_SUCCESS) {
> +                           apr_pollset_remove (pollset, &activefds 
> [i]);
> +                           mget_conn_result(FALSE, rv, mc, ms, conn,
> +                                            server_query, values,  
> server_queries);
> +                           queries_sent--;
> +                           continue;
> +                       }

error_path_out_whatever

> +
> +                       conn->bb = bbb;
> +
> +                       value->len = len - 2;
> +                       data[value->len] = '\0';
> +                       value->data = data;
> +                   }
> +
> +                   value->status = rv;
> +                   value->flags = atoi(flags);
> +
> +                   // stay on the server
> +                   i--;
> +
> +               }
> +               else {
> +                   // TODO: Server Sent back a key I didn't ask  
> for or my hash is corrupt
> +               }
> +           }
> +           else if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) {
> +               // this connection is done
> +               ms_release_conn(ms, conn);
> +               apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
> +
> +               apr_pollset_remove (pollset, &activefds[i]);
> +               queries_sent--;
> +           }
> +           else {
> +               /* unknown reply? */
> +               rv = APR_EGENERAL;
> +           }
> +
> +        } /* /for */
> +    } /* /while */
> +
> +    query_hash_index = apr_hash_first(temp_pool, server_queries);
> +    while (query_hash_index) {
> +        apr_hash_this(query_hash_index, NULL, NULL, (void**) 
> &server_query);
> +        query_hash_index = apr_hash_next(query_hash_index);
> +
> +        conn = server_query->conn;
> +        ms = server_query->ms;
> +
> +        mget_conn_result(TRUE, rv, mc, ms, conn,
> +                         server_query, values, server_queries);
> +        continue;
> +    }
> +
> +    apr_pool_clear(temp_pool);
> +    apr_pollset_destroy(pollset);
> +    return APR_SUCCESS;
> +
> +}
> +
> +
> +
>  /**
>   * Define all of the strings for stats
>   */
> Index: include/apr_memcache.h
> ===================================================================
> --- include/apr_memcache.h	(revision 428526)
> +++ include/apr_memcache.h	(working copy)
> @@ -35,6 +35,7 @@
>  #include "apr_ring.h"
>  #include "apr_buckets.h"
>  #include "apr_reslist.h"
> +#include "apr_hash.h"
>
>  #ifdef __cplusplus
>  extern "C" {
> @@ -85,6 +86,16 @@
>      apr_pool_t *p; /** Pool to use for allocations */
>  } apr_memcache_t;
>
> +/** Returned Data from a multiple get */
> +typedef struct
> +{
> +    apr_status_t status;
> +    const char* key;
> +    apr_size_t len;
> +    char *data;
> +    apr_uint16_t flags;
> +} apr_memcache_value_t;
> +
>  /**
>   * Creates a crc32 hash used to split keys between servers
>   * @param data Data to be hashed
> @@ -194,7 +205,37 @@
>                                              apr_size_t *len,
>                                              apr_uint16_t *flags);
>
> +
>  /**
> + * Add a key to a hash for a multiget query
> + *  if the hash (*value) is NULL it will be created
> + * @param data_pool pool from where the hash and their items are  
> created from
> + * @param key null terminated string containing the key
> + * @param values hash of keys and values that this key will be  
> added to
> + * @return
> + */
> +APR_DECLARE(void)
> +apr_memcache_add_multget_key(apr_pool_t *data_pool,
> +                             const char* key,
> +                             apr_hash_t **values);
> +
> +/**
> + * Gets multiple values from the server, allocating the values out  
> of p
> + * @param mc client to use
> + * @param temp_pool Pool used for tempoary allocations. May be  
> cleared inside this
> + *        call.
> + * @param data_pool Pool used to allocate data for the returned  
> values.
> + * @param values hash of apr_memcache_value_t keyed by strings,  
> contains the
> + *        result of the multiget call.
> + * @return
> + */
> +APR_DECLARE(apr_status_t)
> +apr_memcache_multgetp(apr_memcache_t *mc,
> +                      apr_pool_t *temp_pool,
> +                      apr_pool_t *data_pool,
> +                      apr_hash_t *values);
> +
> +/**
>   * Sets a value by key on the server
>   * @param mc client to use
>   * @param key   null terminated string containing the key


Re: [PATCH] APR Memcache Multi-Get Support

Posted by Paul Querna <ch...@force-elite.com>.
Since no one seemed to object, I have committed this patch to trunk in
r428931.

-Paul

Paul Querna wrote:
> Attached is a patch to add support for multiple parallel fetching of
> keys from a memcache cluster.  It enables fetching of thousands of
> values from multiple memcache nodes without any significant slow downs
> for adding more nodes or keys.
> 
> The basic logic behind the main function, apr_memcache_multgetp, is
> currently pounding away with thousands of concurrent queries per second,
> without any problem.  I wouldn't call the function itself 'pretty', but
> I believe the logic embedded in it is correct.
> 
> [[[
> Add memcache multi-get support to apr_memcache.
> 
> * include/apr_memcache.h
>     Add _value_t structure for holding an individual value from
>     memcached.
>     Add new functions: apr_memcache_add_multget_key,
>     apr_memcache_multgetp.
> 
> * memcache/apr_memcache.c:
>     Add a local baton structure for associating severs to queries.
>     Add new functions: apr_memcache_add_multget_key,
>     mget_conn_result, apr_memcache_multgetp
> 
> Submitted By: Rob Emanuele <rob.emanuele ask.com>, Paul Querna
> <paul.querna ask.com>
> ]]]
> 
> 
> ------------------------------------------------------------------------
> 
> Index: memcache/apr_memcache.c
> ===================================================================
> --- memcache/apr_memcache.c	(revision 428526)
> +++ memcache/apr_memcache.c	(working copy)
> @@ -17,6 +17,7 @@
>   */
>  
>  #include "apr_memcache.h"
> +#include "apr_poll.h"
>  #include "apr_version.h"
>  #include <stdlib.h>
>  
> @@ -38,6 +39,9 @@
>  #define MC_EOL "\r\n"
>  #define MC_EOL_LEN (sizeof(MC_EOL)-1)
>  
> +#define MC_WS " "
> +#define MC_WS_LEN (sizeof(MC_WS)-1)
> +
>  #define MC_GET "get "
>  #define MC_GET_LEN (sizeof(MC_GET)-1)
>  
> @@ -97,7 +101,16 @@
>  #define MS_END "END"
>  #define MS_END_LEN (sizeof(MS_END)-1)
>  
> +/** Server and Query Structure for a multiple get */
> +struct cache_server_query_t {
> +    apr_memcache_server_t* ms;
> +    apr_memcache_conn_t* conn;
> +    struct iovec* query_vec;
> +    unsigned int query_vec_count;
> +};
>  
> +#define MULT_GET_TIMEOUT 50000
> +
>  static apr_status_t make_server_dead(apr_memcache_t *mc, apr_memcache_server_t *ms)
>  {
>  #if APR_HAS_THREADS
> @@ -1021,6 +1034,334 @@
>  }
>  
>  
> +APR_DECLARE(void) 
> +apr_memcache_add_multget_key(apr_pool_t *data_pool,
> +                             const char* key,
> +                             apr_hash_t **values)
> +{
> +    apr_memcache_value_t* value;
> +    int klen = strlen(key);
> +
> +    // create the value hash if need be
> +    if (!*values) {
> +        *values = apr_hash_make(data_pool);
> +    }
> +
> +    // init key and add it to the value hash
> +    value = apr_pcalloc(data_pool, sizeof(apr_memcache_value_t));
> +
> +    value->status = APR_NOTFOUND;
> +    value->key = apr_pstrdup(data_pool, key);
> +
> +    apr_hash_set(*values, value->key, klen, value);
> +}
> +
> +static void mget_conn_result(int up,
> +                             apr_status_t rv,
> +                             apr_memcache_t *mc,
> +                             apr_memcache_server_t *ms,
> +                             apr_memcache_conn_t *conn,
> +                             struct cache_server_query_t *server_query,
> +                             apr_hash_t *values,
> +                             apr_hash_t *server_queries)
> +{
> +    int j;
> +    apr_memcache_value_t* value;
> +    
> +    if (!up) {
> +        ms_bad_conn(ms, conn);
> +        apr_memcache_disable_server(mc, ms);
> +    }
> +    
> +    for (j = 1; j < server_query->query_vec_count ; j+=2) {
> +        if (server_query->query_vec[j].iov_base) {
> +            value = apr_hash_get(values, server_query->query_vec[j].iov_base,
> +                                 strlen(server_query->query_vec[j].iov_base));
> +            
> +            if (value->status == APR_NOTFOUND) {
> +                value->status = rv;
> +            }
> +        }
> +    }
> +
> +    ms_release_conn(ms, conn);
> +    
> +    apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
> +}
> +
> +APR_DECLARE(apr_status_t)
> +apr_memcache_multgetp(apr_memcache_t *mc,
> +                      apr_pool_t *temp_pool,
> +                      apr_pool_t *data_pool,
> +                      apr_hash_t *values)
> +{
> +    apr_status_t rv;
> +    apr_memcache_server_t* ms;
> +    apr_memcache_conn_t* conn;
> +    apr_uint32_t hash;
> +    apr_size_t written;
> +    int klen;
> +
> +    apr_memcache_value_t* value;
> +    apr_hash_index_t* value_hash_index;
> +
> +    /* this is a little over aggresive, but beats multiple loops
> +     * to figure out how long each vector needs to be per-server.
> +     */
> +    unsigned int veclen = 2 + 2 * apr_hash_count(values) - 1; /* get <key>[<space><key>...]\r\n */
> +    unsigned int i, j;
> +    unsigned int queries_sent;
> +    apr_int32_t queries_recvd;
> +
> +    apr_hash_t * server_queries = apr_hash_make(temp_pool);
> +    struct cache_server_query_t* server_query;
> +    apr_hash_index_t * query_hash_index;
> +
> +    apr_pollset_t* pollset;
> +    const apr_pollfd_t* activefds;
> +    apr_pollfd_t* pollfds;
> +
> +
> +    // build all the queries
> +    value_hash_index = apr_hash_first(temp_pool, values);
> +    while (value_hash_index) {
> +        apr_hash_this(value_hash_index, NULL, NULL, (void**)&value);
> +        value_hash_index = apr_hash_next(value_hash_index);
> +        klen = strlen(value->key);
> +
> +        hash = apr_memcache_hash(value->key, klen);
> +        ms = apr_memcache_find_server_hash(mc, hash);
> +        if (ms == NULL) {
> +            continue;
> +        }
> +
> +        server_query = apr_hash_get(server_queries, &ms, sizeof(ms));
> +
> +        if (!server_query) {
> +            rv = ms_find_conn(ms, &conn);
> +
> +            if (rv != APR_SUCCESS) {
> +                apr_memcache_disable_server(mc, ms);
> +                value->status = rv;
> +                continue;
> +            }
> +
> +            server_query = apr_pcalloc(temp_pool,sizeof(struct cache_server_query_t));
> +
> +            apr_hash_set(server_queries, &ms, sizeof(ms), server_query);
> +
> +            server_query->ms = ms;
> +            server_query->conn = conn;
> +            server_query->query_vec = apr_pcalloc(temp_pool, sizeof(struct iovec)*veclen);
> +
> +            // set up the first key
> +            server_query->query_vec[0].iov_base = MC_GET;
> +            server_query->query_vec[0].iov_len  = MC_GET_LEN;
> +
> +            server_query->query_vec[1].iov_base = (void*)(value->key);
> +            server_query->query_vec[1].iov_len  = klen;
> +
> +            server_query->query_vec[2].iov_base = MC_EOL;
> +            server_query->query_vec[2].iov_len  = MC_EOL_LEN;
> +
> +            server_query->query_vec_count = 3;
> +        }
> +        else {
> +            j = server_query->query_vec_count - 1;
> +
> +            server_query->query_vec[j].iov_base = MC_WS;
> +            server_query->query_vec[j].iov_len  = MC_WS_LEN;
> +            j++;
> +
> +            server_query->query_vec[j].iov_base = (void*)(value->key);
> +            server_query->query_vec[j].iov_len  = klen;
> +            j++;
> +
> +            server_query->query_vec[j].iov_base = MC_EOL;
> +            server_query->query_vec[j].iov_len  = MC_EOL_LEN;
> +            j++;
> +
> +           server_query->query_vec_count = j;
> +        }
> +    }
> +
> +    // create polling structures
> +    pollfds = apr_pcalloc(temp_pool, apr_hash_count(server_queries) * sizeof(apr_pollfd_t));
> +    
> +    rv = apr_pollset_create(&pollset, apr_hash_count(server_queries), temp_pool, 0);
> +
> +    if (rv != APR_SUCCESS) {
> +        return rv;
> +    }
> +
> +    // send all the queries
> +    queries_sent = 0;
> +    query_hash_index = apr_hash_first(temp_pool, server_queries);
> +
> +    while (query_hash_index) {
> +        apr_hash_this(query_hash_index, NULL, NULL, (void**)&server_query);
> +        query_hash_index = apr_hash_next(query_hash_index);
> +
> +        conn = server_query->conn;
> +        ms = server_query->ms;
> +
> +        for (i = 0, rv = APR_SUCCESS; i < veclen && rv == APR_SUCCESS; i += IOV_MAX) {
> +            rv = apr_socket_sendv(conn->sock, &(server_query->query_vec[i]),
> +                                  veclen-i>IOV_MAX ? IOV_MAX : veclen-i , &written);
> +        }
> +
> +        if (rv != APR_SUCCESS) {
> +            mget_conn_result(FALSE, rv, mc, ms, conn,
> +                             server_query, values, server_queries);
> +            continue;
> +        }
> +
> +        pollfds[queries_sent].desc_type = APR_POLL_SOCKET;
> +        pollfds[queries_sent].reqevents = APR_POLLIN;
> +        pollfds[queries_sent].p = temp_pool;
> +        pollfds[queries_sent].desc.s = conn->sock;
> +        pollfds[queries_sent].client_data = (void *)server_query;
> +        apr_pollset_add (pollset, &pollfds[queries_sent]);
> +
> +        queries_sent++;
> +    }
> +
> +    while (queries_sent) {
> +        rv = apr_pollset_poll(pollset, MULT_GET_TIMEOUT, &queries_recvd, &activefds);
> +
> +        if (rv != APR_SUCCESS) {
> +            // timeout
> +            queries_sent = 0;
> +            continue;
> +        }
> +        for (i = 0; i < queries_recvd; i++) {
> +            server_query = activefds[i].client_data;
> +            conn = server_query->conn;
> +            ms = server_query->ms;
> +
> +           rv = get_server_line(conn);
> +
> +           if (rv != APR_SUCCESS) {
> +               apr_pollset_remove (pollset, &activefds[i]);
> +               mget_conn_result(FALSE, rv, mc, ms, conn,
> +                                server_query, values, server_queries);
> +               queries_sent--;
> +               continue;
> +           }
> +
> +           if (strncmp(MS_VALUE, conn->buffer, MS_VALUE_LEN) == 0) {
> +               char *key;
> +               char *flags;
> +               char *length;
> +               char *start;
> +               char *last;
> +               char *data;
> +               apr_size_t len;
> +
> +               start = conn->buffer;
> +               key = apr_strtok(conn->buffer, " ", &last); // just the VALUE, ignore
> +               key = apr_strtok(NULL, " ", &last);
> +               flags = apr_strtok(NULL, " ", &last);
> +
> +
> +               length = apr_strtok(NULL, " ", &last);
> +               len = atoi(length);
> +
> +               value = apr_hash_get(values, key, strlen(key));
> +
> +               
> +               if (value) {
> +                   if (len > 0)  {
> +                       apr_bucket_brigade *bbb;
> +                       apr_bucket *e;
> +                       
> +                       /* eat the trailing \r\n */
> +                       rv = apr_brigade_partition(conn->bb, len+2, &e);
> +                       
> +                       if (rv != APR_SUCCESS) {
> +                           apr_pollset_remove (pollset, &activefds[i]);
> +                           mget_conn_result(FALSE, rv, mc, ms, conn,
> +                                            server_query, values, server_queries);
> +                           queries_sent--;
> +                           continue;
> +                       }
> +                       
> +                       bbb = apr_brigade_split(conn->bb, e);
> +                       
> +                       rv = apr_brigade_pflatten(conn->bb, &data, &len, data_pool);
> +                       
> +                       if (rv != APR_SUCCESS) {
> +                           apr_pollset_remove (pollset, &activefds[i]);
> +                           mget_conn_result(FALSE, rv, mc, ms, conn,
> +                                            server_query, values, server_queries);
> +                           queries_sent--;
> +                           continue;
> +                       }
> +                       
> +                       rv = apr_brigade_destroy(conn->bb);
> +                       if (rv != APR_SUCCESS) {
> +                           apr_pollset_remove (pollset, &activefds[i]);
> +                           mget_conn_result(FALSE, rv, mc, ms, conn,
> +                                            server_query, values, server_queries);
> +                           queries_sent--;
> +                           continue;
> +                       }
> +                       
> +                       conn->bb = bbb;
> +                       
> +                       value->len = len - 2;
> +                       data[value->len] = '\0';
> +                       value->data = data;
> +                   }
> +                   
> +                   value->status = rv;
> +                   value->flags = atoi(flags);
> +                   
> +                   // stay on the server
> +                   i--;
> +                   
> +               }
> +               else {
> +                   // TODO: Server Sent back a key I didn't ask for or my hash is corrupt
> +               }
> +           }
> +           else if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) {
> +               // this connection is done
> +               ms_release_conn(ms, conn);
> +               apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
> +               
> +               apr_pollset_remove (pollset, &activefds[i]);
> +               queries_sent--;
> +           }
> +           else {
> +               /* unknown reply? */
> +               rv = APR_EGENERAL;
> +           }
> +           
> +        } /* /for */
> +    } /* /while */
> +    
> +    query_hash_index = apr_hash_first(temp_pool, server_queries);
> +    while (query_hash_index) {
> +        apr_hash_this(query_hash_index, NULL, NULL, (void**)&server_query);
> +        query_hash_index = apr_hash_next(query_hash_index);
> +        
> +        conn = server_query->conn;
> +        ms = server_query->ms;
> +        
> +        mget_conn_result(TRUE, rv, mc, ms, conn,
> +                         server_query, values, server_queries);
> +        continue;
> +    }
> +    
> +    apr_pool_clear(temp_pool);
> +    apr_pollset_destroy(pollset);
> +    return APR_SUCCESS;
> +    
> +}
> +
> +
> +
>  /**
>   * Define all of the strings for stats
>   */
> Index: include/apr_memcache.h
> ===================================================================
> --- include/apr_memcache.h	(revision 428526)
> +++ include/apr_memcache.h	(working copy)
> @@ -35,6 +35,7 @@
>  #include "apr_ring.h"
>  #include "apr_buckets.h"
>  #include "apr_reslist.h"
> +#include "apr_hash.h"
>  
>  #ifdef __cplusplus
>  extern "C" {
> @@ -85,6 +86,16 @@
>      apr_pool_t *p; /** Pool to use for allocations */
>  } apr_memcache_t;
>  
> +/** Returned Data from a multiple get */
> +typedef struct
> +{
> +    apr_status_t status;
> +    const char* key;
> +    apr_size_t len;
> +    char *data;
> +    apr_uint16_t flags;
> +} apr_memcache_value_t;
> +
>  /**
>   * Creates a crc32 hash used to split keys between servers
>   * @param data Data to be hashed
> @@ -194,7 +205,37 @@
>                                              apr_size_t *len,
>                                              apr_uint16_t *flags);
>  
> +
>  /**
> + * Add a key to a hash for a multiget query
> + *  if the hash (*value) is NULL it will be created
> + * @param data_pool pool from where the hash and their items are created from
> + * @param key null terminated string containing the key
> + * @param values hash of keys and values that this key will be added to
> + * @return
> + */
> +APR_DECLARE(void) 
> +apr_memcache_add_multget_key(apr_pool_t *data_pool,
> +                             const char* key,
> +                             apr_hash_t **values);
> +
> +/**
> + * Gets multiple values from the server, allocating the values out of p
> + * @param mc client to use
> + * @param temp_pool Pool used for tempoary allocations. May be cleared inside this
> + *        call.
> + * @param data_pool Pool used to allocate data for the returned values.
> + * @param values hash of apr_memcache_value_t keyed by strings, contains the
> + *        result of the multiget call.
> + * @return
> + */
> +APR_DECLARE(apr_status_t)
> +apr_memcache_multgetp(apr_memcache_t *mc,
> +                      apr_pool_t *temp_pool,
> +                      apr_pool_t *data_pool,
> +                      apr_hash_t *values);
> +
> +/**
>   * Sets a value by key on the server
>   * @param mc client to use
>   * @param key   null terminated string containing the key