You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apr.apache.org by pq...@apache.org on 2007/07/10 12:28:37 UTC
svn commit: r554898 - in /apr/apr-util/branches/mc-binary-protocol-dev:
include/apr_memcache.h memcache/apr_memcache.c
Author: pquerna
Date: Tue Jul 10 03:28:24 2007
New Revision: 554898
URL: http://svn.apache.org/viewvc?view=rev&rev=554898
Log:
First rev of the work in progress support for a binary memcached protocol, as discussed at the memcached hackathon today:
<http://code.sixapart.com/svn/memcached/trunk/server/doc/binary-protocol-plan.jpg>
This version strips out support for multi-get, stats, and version commands, and stubs out all the other commands with assumed values. Once the memcache server side of the binary protocol is written, I wil sync our API side, and then add support for more complicated things like multi-get.
Modified:
apr/apr-util/branches/mc-binary-protocol-dev/include/apr_memcache.h
apr/apr-util/branches/mc-binary-protocol-dev/memcache/apr_memcache.c
Modified: apr/apr-util/branches/mc-binary-protocol-dev/include/apr_memcache.h
URL: http://svn.apache.org/viewvc/apr/apr-util/branches/mc-binary-protocol-dev/include/apr_memcache.h?view=diff&rev=554898&r1=554897&r2=554898
==============================================================================
--- apr/apr-util/branches/mc-binary-protocol-dev/include/apr_memcache.h (original)
+++ apr/apr-util/branches/mc-binary-protocol-dev/include/apr_memcache.h Tue Jul 10 03:28:24 2007
@@ -249,7 +249,7 @@
const char* key,
char **baton,
apr_size_t *len,
- apr_uint16_t *flags);
+ apr_uint32_t *flags);
/**
Modified: apr/apr-util/branches/mc-binary-protocol-dev/memcache/apr_memcache.c
URL: http://svn.apache.org/viewvc/apr/apr-util/branches/mc-binary-protocol-dev/memcache/apr_memcache.c?view=diff&rev=554898&r1=554897&r2=554898
==============================================================================
--- apr/apr-util/branches/mc-binary-protocol-dev/memcache/apr_memcache.c (original)
+++ apr/apr-util/branches/mc-binary-protocol-dev/memcache/apr_memcache.c Tue Jul 10 03:28:24 2007
@@ -19,10 +19,8 @@
#include "apr_version.h"
#include <stdlib.h>
-#define BUFFER_SIZE 512
struct apr_memcache_conn_t
{
- char *buffer;
apr_size_t blen;
apr_pool_t *p;
apr_socket_t *sock;
@@ -32,72 +30,45 @@
apr_memcache_server_t *ms;
};
-/* Strings for Client Commands */
+#define REQ_MAGIC_BYTE (0xF)
-#define MC_EOL "\r\n"
-#define MC_EOL_LEN (sizeof(MC_EOL)-1)
-
-#define MC_WS " "
-#define MC_WS_LEN (sizeof(MC_WS)-1)
-
-#define MC_GET "get "
-#define MC_GET_LEN (sizeof(MC_GET)-1)
-
-#define MC_SET "set "
-#define MC_SET_LEN (sizeof(MC_SET)-1)
-
-#define MC_ADD "add "
-#define MC_ADD_LEN (sizeof(MC_ADD)-1)
-
-#define MC_REPLACE "replace "
-#define MC_REPLACE_LEN (sizeof(MC_REPLACE)-1)
-
-#define MC_DELETE "delete "
-#define MC_DELETE_LEN (sizeof(MC_DELETE)-1)
-
-#define MC_INCR "incr "
-#define MC_INCR_LEN (sizeof(MC_INCR)-1)
-
-#define MC_DECR "decr "
-#define MC_DECR_LEN (sizeof(MC_DECR)-1)
-
-#define MC_VERSION "version"
-#define MC_VERSION_LEN (sizeof(MC_VERSION)-1)
-
-#define MC_STATS "stats"
-#define MC_STATS_LEN (sizeof(MC_STATS)-1)
-
-#define MC_QUIT "quit"
-#define MC_QUIT_LEN (sizeof(MC_QUIT)-1)
-
-/* Strings for Server Replies */
-
-#define MS_STORED "STORED"
-#define MS_STORED_LEN (sizeof(MS_STORED)-1)
-
-#define MS_NOT_STORED "NOT_STORED"
-#define MS_NOT_STORED_LEN (sizeof(MS_NOT_STORED)-1)
-
-#define MS_DELETED "DELETED"
-#define MS_DELETED_LEN (sizeof(MS_DELETED)-1)
-
-#define MS_NOT_FOUND "NOT_FOUND"
-#define MS_NOT_FOUND_LEN (sizeof(MS_NOT_FOUND)-1)
-
-#define MS_VALUE "VALUE"
-#define MS_VALUE_LEN (sizeof(MS_VALUE)-1)
-
-#define MS_ERROR "ERROR"
-#define MS_ERROR_LEN (sizeof(MS_ERROR)-1)
-
-#define MS_VERSION "VERSION"
-#define MS_VERSION_LEN (sizeof(MS_VERSION)-1)
+/* XXXX: sync these with memcached server impl */
+enum mc_cmd_type {
+ CMD_GET = 0,
+ CMD_SET = 1,
+ CMD_ADD = 2,
+ CMD_REPLACE = 3,
+ CMD_DELETE = 4,
+ CMD_INCR = 4,
+ CMD_DECR = 4,
+ CMD_QUIT = 99,
+};
-#define MS_STAT "STAT"
-#define MS_STAT_LEN (sizeof(MS_STAT)-1)
+#define MC_STATUS_SUCCESS (0)
+#define MC_STATUS_STORED (1)
+#define MC_STATUS_NOT_STORED (2)
+#define MC_STATUS_DELETED (3)
+#define MC_STATUS_NOT_FOUND (4)
+
+/* XXXX: why isn't this in apr.h? */
+typedef unsigned char apr_uint8_t;
+
+typedef struct {
+ apr_uint8_t magic;
+ apr_uint8_t cmd;
+ union {
+ apr_uint8_t rep_status;
+ apr_uint8_t req_keylen;
+ };
+ union {
+ apr_uint8_t req_reserved;
+ apr_uint8_t rep_reserved;
+ };
+ apr_uint32_t opaque_id;
+ apr_uint32_t lenleft;
+} apr_mc_hdr_t;
-#define MS_END "END"
-#define MS_END_LEN (sizeof(MS_END)-1)
+#define MC_HDR_LEN (sizeof(apr_mc_hdr_t))
/** Server and Query Structure for a multiple get */
struct cache_server_query_t {
@@ -324,7 +295,6 @@
conn->balloc = apr_bucket_alloc_create(conn->p);
conn->bb = apr_brigade_create(conn->p, conn->balloc);
conn->tb = apr_brigade_create(conn->p, conn->balloc);
- conn->buffer = apr_palloc(conn->p, BUFFER_SIZE);
conn->blen = 0;
conn->ms = ms;
@@ -342,22 +312,107 @@
return rv;
}
+static void setup_req_hdr(apr_mc_hdr_t *hdr, apr_uint8_t cmd)
+{
+ hdr->magic = REQ_MAGIC_BYTE;
+ hdr->cmd = cmd;
+ hdr->req_keylen = 0;
+ hdr->req_reserved = 0;
+ hdr->opaque_id = 0;
+ hdr->lenleft = 0;
+}
+
+static void finalize_req_hdr(apr_mc_hdr_t *hdr)
+{
+ /* XXXX: convert to network byte order */
+
+}
+
+static void repair_hdr_rep(apr_mc_hdr_t *hdr)
+{
+ /* XXXX: convert from network byte order */
+}
+
+static apr_status_t brigade_split_offset(apr_bucket_brigade *bbOut,
+ apr_bucket_brigade *bbIn,
+ apr_read_type_e block,
+ apr_off_t offset)
+{
+ apr_off_t readbytes = 0;
+
+ while (!APR_BRIGADE_EMPTY(bbIn)) {
+ const char *str;
+ apr_size_t len;
+ apr_status_t rv;
+ apr_bucket *e;
+
+ e = APR_BRIGADE_FIRST(bbIn);
+ rv = apr_bucket_read(e, &str, &len, block);
+
+ if (rv != APR_SUCCESS) {
+ /* if we didn't read the entire thing, stick what we have buffered
+ * back into bbOut */
+ if (!APR_BRIGADE_EMPTY(bbOut)) {
+ APR_BRIGADE_PREPEND(bbIn, bbOut);
+ }
+ return rv;
+ }
+
+ if (readbytes+len > offset) {
+ apr_bucket_split(e, offset - readbytes);
+ APR_BUCKET_REMOVE(e);
+ APR_BRIGADE_INSERT_TAIL(bbOut, e);
+ return APR_SUCCESS;
+ }
+
+ APR_BUCKET_REMOVE(e);
+ APR_BRIGADE_INSERT_TAIL(bbOut, e);
+ readbytes += len;
+ }
+
+ return APR_SUCCESS;
+}
+
+static apr_status_t get_server_rep(apr_memcache_conn_t *conn, apr_mc_hdr_t *hdr)
+{
+ apr_size_t bsize = MC_HDR_LEN;
+ apr_status_t rv = APR_SUCCESS;
+
+ rv = brigade_split_offset(conn->tb, conn->bb, APR_BLOCK_READ, bsize);
+
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ rv = apr_brigade_flatten(conn->tb, (char*)hdr, &bsize);
+
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ repair_hdr_rep(hdr);
+
+ return apr_brigade_cleanup(conn->tb);
+}
+
+
static apr_status_t
mc_conn_destruct(void *conn_, void *params, apr_pool_t *pool)
{
apr_memcache_conn_t *conn = (apr_memcache_conn_t*)conn_;
+ apr_mc_hdr_t hdr;
+
struct iovec vec[2];
apr_size_t written;
/* send a quit message to the memcached server to be nice about it. */
- vec[0].iov_base = MC_QUIT;
- vec[0].iov_len = MC_QUIT_LEN;
+ setup_req_hdr(&hdr, CMD_QUIT);
+ finalize_req_hdr(&hdr);
+ vec[0].iov_base = &hdr;
+ vec[0].iov_len = MC_HDR_LEN;
- vec[1].iov_base = MC_EOL;
- vec[1].iov_len = MC_EOL_LEN;
-
/* Return values not checked, since we just want to make it go away. */
- apr_socket_sendv(conn->sock, vec, 2, &written);
+ apr_socket_sendv(conn->sock, vec, 1, &written);
apr_socket_close(conn->sock);
return APR_SUCCESS;
@@ -538,44 +593,21 @@
}
}
-static apr_status_t get_server_line(apr_memcache_conn_t *conn)
-{
- apr_size_t bsize = BUFFER_SIZE;
- apr_status_t rv = APR_SUCCESS;
-
- rv = apr_brigade_split_line(conn->tb, conn->bb, APR_BLOCK_READ, BUFFER_SIZE);
-
- if (rv != APR_SUCCESS) {
- return rv;
- }
-
- rv = apr_brigade_flatten(conn->tb, conn->buffer, &bsize);
-
- if (rv != APR_SUCCESS) {
- return rv;
- }
-
- conn->blen = bsize;
- conn->buffer[bsize] = '\0';
-
- return apr_brigade_cleanup(conn->tb);
-}
-
static apr_status_t storage_cmd_write(apr_memcache_t *mc,
- char *cmd,
- const apr_size_t cmd_size,
+ apr_uint16_t cmdtype,
const char *key,
char *data,
const apr_size_t data_size,
apr_uint32_t timeout,
apr_uint16_t flags)
{
+ apr_mc_hdr_t hdr;
apr_uint32_t hash;
apr_memcache_server_t *ms;
apr_memcache_conn_t *conn;
apr_status_t rv;
apr_size_t written;
- struct iovec vec[5];
+ struct iovec vec[4];
int klen;
apr_size_t key_size = strlen(key);
@@ -594,27 +626,29 @@
return rv;
}
- /* <command name> <key> <flags> <exptime> <bytes>\r\n<data>\r\n */
+ setup_req_hdr(&hdr, cmdtype);
+ hdr.req_keylen = key_size;
+ hdr.opaque_id = 1;
+ hdr.lenleft = 12 + key_size + data_size;
+
+ finalize_req_hdr(&hdr);
- vec[0].iov_base = cmd;
- vec[0].iov_len = cmd_size;
+ vec[0].iov_base = &hdr;
+ vec[0].iov_len = MC_HDR_LEN;
vec[1].iov_base = (void*)key;
vec[1].iov_len = key_size;
- klen = apr_snprintf(conn->buffer, BUFFER_SIZE, " %u %u %" APR_SIZE_T_FMT " " MC_EOL,
- flags, timeout, data_size);
-
- vec[2].iov_base = conn->buffer;
- vec[2].iov_len = klen;
+ vec[2].iov_base = (void*)&flags;
+ vec[2].iov_len = sizeof(apr_uint32_t);
+
+ vec[2].iov_base = (void*)&timeout;
+ vec[2].iov_len = sizeof(apr_uint32_t);
vec[3].iov_base = data;
vec[3].iov_len = data_size;
- vec[4].iov_base = MC_EOL;
- vec[4].iov_len = MC_EOL_LEN;
-
- rv = apr_socket_sendv(conn->sock, vec, 5, &written);
+ rv = apr_socket_sendv(conn->sock, vec, 4, &written);
if (rv != APR_SUCCESS) {
ms_bad_conn(ms, conn);
@@ -622,7 +656,7 @@
return rv;
}
- rv = get_server_line(conn);
+ rv = get_server_rep(conn, &hdr);
if (rv != APR_SUCCESS) {
ms_bad_conn(ms, conn);
@@ -630,16 +664,16 @@
return rv;
}
- if (strcmp(conn->buffer, MS_STORED MC_EOL) == 0) {
- rv = APR_SUCCESS;
- }
- else if (strcmp(conn->buffer, MS_NOT_STORED MC_EOL) == 0) {
- rv = APR_EEXIST;
- }
- else {
- rv = APR_EGENERAL;
+ rv = APR_EGENERAL;
+ if (hdr.rep_status) {
+ if (hdr.rep_status == MC_STATUS_STORED) {
+ rv = APR_SUCCESS;
+ }
+ else if (hdr.rep_status == MC_STATUS_NOT_STORED) {
+ rv = APR_EEXIST;
+ }
}
-
+
ms_release_conn(ms, conn);
return rv;
@@ -654,7 +688,7 @@
apr_uint16_t flags)
{
return storage_cmd_write(mc,
- MC_SET, MC_SET_LEN,
+ CMD_SET,
key,
data, data_size,
timeout, flags);
@@ -669,7 +703,7 @@
apr_uint16_t flags)
{
return storage_cmd_write(mc,
- MC_ADD, MC_ADD_LEN,
+ CMD_ADD,
key,
data, data_size,
timeout, flags);
@@ -684,7 +718,7 @@
apr_uint16_t flags)
{
return storage_cmd_write(mc,
- MC_REPLACE, MC_REPLACE_LEN,
+ CMD_REPLACE,
key,
data, data_size,
timeout, flags);
@@ -697,15 +731,16 @@
const char *key,
char **baton,
apr_size_t *new_length,
- apr_uint16_t *flags_)
+ apr_uint32_t *flags)
{
+ apr_mc_hdr_t hdr;
apr_status_t rv;
apr_memcache_server_t *ms;
apr_memcache_conn_t *conn;
apr_uint32_t hash;
apr_size_t written;
int klen = strlen(key);
- struct iovec vec[3];
+ struct iovec vec[2];
hash = apr_memcache_hash(mc, key, klen);
ms = apr_memcache_find_server_hash(mc, hash);
@@ -719,17 +754,20 @@
return rv;
}
- /* get <key>[ <key>[...]]\r\n */
- vec[0].iov_base = MC_GET;
- vec[0].iov_len = MC_GET_LEN;
-
+ setup_req_hdr(&hdr, CMD_GET);
+ hdr.req_keylen = klen;
+ hdr.opaque_id = 1;
+ hdr.lenleft = klen;
+
+ finalize_req_hdr(&hdr);
+
+ vec[0].iov_base = &hdr;
+ vec[0].iov_len = MC_HDR_LEN;
+
vec[1].iov_base = (void*)key;
vec[1].iov_len = klen;
-
- vec[2].iov_base = MC_EOL;
- vec[2].iov_len = MC_EOL_LEN;
-
- rv = apr_socket_sendv(conn->sock, vec, 3, &written);
+
+ rv = apr_socket_sendv(conn->sock, vec, 2, &written);
if (rv != APR_SUCCESS) {
ms_bad_conn(ms, conn);
@@ -737,86 +775,41 @@
return rv;
}
- rv = get_server_line(conn);
+ rv = get_server_rep(conn, &hdr);
+
if (rv != APR_SUCCESS) {
ms_bad_conn(ms, conn);
apr_memcache_disable_server(mc, ms);
return rv;
}
-
- if (strncmp(MS_VALUE, conn->buffer, MS_VALUE_LEN) == 0) {
- char *flags;
- char *length;
- char *start;
- char *last;
+
+ {
apr_size_t len;
- start = conn->buffer;
- flags = apr_strtok(conn->buffer," ",&last);
- flags = apr_strtok(NULL," ",&last);
- flags = apr_strtok(NULL," ",&last);
-
- if (flags_)
- *flags_ = atoi(flags);
-
- length = apr_strtok(NULL," ",&last);
- len = atoi(length);
- if (len < 0) {
- *new_length = 0;
- *baton = NULL;
- }
- else {
- apr_bucket_brigade *bbb;
- apr_bucket *e;
-
- /* eat the trailing \r\n */
- rv = apr_brigade_partition(conn->bb, len+2, &e);
-
- if (rv != APR_SUCCESS) {
- ms_bad_conn(ms, conn);
- apr_memcache_disable_server(mc, ms);
- return rv;
- }
-
- bbb = apr_brigade_split(conn->bb, e);
-
- rv = apr_brigade_pflatten(conn->bb, baton, &len, p);
-
- if (rv != APR_SUCCESS) {
- ms_bad_conn(ms, conn);
- apr_memcache_disable_server(mc, ms);
- return rv;
- }
-
- rv = apr_brigade_destroy(conn->bb);
- if (rv != APR_SUCCESS) {
- ms_bad_conn(ms, conn);
- apr_memcache_disable_server(mc, ms);
- return rv;
- }
-
- conn->bb = bbb;
-
- *new_length = len - 2;
- (*baton)[*new_length] = '\0';
- }
+ rv = brigade_split_offset(conn->tb, conn->bb, APR_BLOCK_READ, hdr.lenleft);
- rv = get_server_line(conn);
if (rv != APR_SUCCESS) {
ms_bad_conn(ms, conn);
apr_memcache_disable_server(mc, ms);
return rv;
}
+
+ len = hdr.lenleft + 1;
+
+ rv = apr_brigade_pflatten(conn->tb, baton, &len, p);
- if (strncmp(MS_END, conn->buffer, MS_END_LEN) != 0) {
- rv = APR_EGENERAL;
+ if (rv != APR_SUCCESS) {
+ ms_bad_conn(ms, conn);
+ apr_memcache_disable_server(mc, ms);
+ return rv;
}
- }
- else if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) {
- rv = APR_NOTFOUND;
- }
- else {
- rv = APR_EGENERAL;
+
+ apr_brigade_cleanup(conn->tb);
+
+ *new_length = len - 1 - 4;
+ memcpy(flags, *baton, 4);
+ *baton = *baton + 4;
+ (*baton)[*new_length] = '\0';
}
ms_release_conn(ms, conn);
@@ -829,6 +822,7 @@
const char *key,
apr_uint32_t timeout)
{
+ apr_mc_hdr_t hdr;
apr_status_t rv;
apr_memcache_server_t *ms;
apr_memcache_conn_t *conn;
@@ -849,19 +843,20 @@
return rv;
}
- /* delete <key> <time>\r\n */
- vec[0].iov_base = MC_DELETE;
- vec[0].iov_len = MC_DELETE_LEN;
-
+ setup_req_hdr(&hdr, CMD_DELETE);
+ hdr.req_keylen = klen;
+ hdr.opaque_id = 1;
+ hdr.lenleft = klen;
+
+ finalize_req_hdr(&hdr);
+
+ vec[0].iov_base = &hdr;
+ vec[0].iov_len = MC_HDR_LEN;
+
vec[1].iov_base = (void*)key;
vec[1].iov_len = klen;
- klen = apr_snprintf(conn->buffer, BUFFER_SIZE, " %u" MC_EOL, timeout);
-
- vec[2].iov_base = conn->buffer;
- vec[2].iov_len = klen;
-
- rv = apr_socket_sendv(conn->sock, vec, 3, &written);
+ rv = apr_socket_sendv(conn->sock, vec, 2, &written);
if (rv != APR_SUCCESS) {
ms_bad_conn(ms, conn);
@@ -869,35 +864,36 @@
return rv;
}
- rv = get_server_line(conn);
+ rv = get_server_rep(conn, &hdr);
+
if (rv != APR_SUCCESS) {
ms_bad_conn(ms, conn);
apr_memcache_disable_server(mc, ms);
return rv;
}
- if (strncmp(MS_DELETED, conn->buffer, MS_DELETED_LEN) == 0) {
- rv = APR_SUCCESS;
- }
- else if (strncmp(MS_NOT_FOUND, conn->buffer, MS_NOT_FOUND_LEN) == 0) {
- rv = APR_NOTFOUND;
- }
- else {
- rv = APR_EGENERAL;
+ rv = APR_EGENERAL;
+ if (hdr.rep_status) {
+ if (hdr.rep_status == MC_STATUS_DELETED) {
+ rv = APR_SUCCESS;
+ }
+ else if (hdr.rep_status == MC_STATUS_NOT_FOUND) {
+ rv = APR_NOTFOUND;
+ }
}
-
+
ms_release_conn(ms, conn);
return rv;
}
static apr_status_t num_cmd_write(apr_memcache_t *mc,
- char *cmd,
- const apr_uint32_t cmd_size,
- const char *key,
- const apr_int32_t inc,
- apr_uint32_t *new_value)
+ apr_uint8_t cmd,
+ const char *key,
+ const apr_int32_t inc,
+ apr_uint32_t *new_value)
{
+ apr_mc_hdr_t hdr;
apr_status_t rv;
apr_memcache_server_t *ms;
apr_memcache_conn_t *conn;
@@ -918,17 +914,21 @@
return rv;
}
- /* <cmd> <key> <value>\r\n */
- vec[0].iov_base = cmd;
- vec[0].iov_len = cmd_size;
-
+ setup_req_hdr(&hdr, CMD_DELETE);
+ hdr.req_keylen = klen;
+ hdr.opaque_id = 1;
+ hdr.lenleft = klen + 4;
+
+ finalize_req_hdr(&hdr);
+
+ vec[0].iov_base = &hdr;
+ vec[0].iov_len = MC_HDR_LEN;
+
vec[1].iov_base = (void*)key;
vec[1].iov_len = klen;
- klen = apr_snprintf(conn->buffer, BUFFER_SIZE, " %u" MC_EOL, inc);
-
- vec[2].iov_base = conn->buffer;
- vec[2].iov_len = klen;
+ vec[2].iov_base = (void*)&inc;
+ vec[2].iov_len = sizeof(inc);
rv = apr_socket_sendv(conn->sock, vec, 3, &written);
@@ -938,24 +938,42 @@
return rv;
}
- rv = get_server_line(conn);
+ rv = get_server_rep(conn, &hdr);
+
if (rv != APR_SUCCESS) {
ms_bad_conn(ms, conn);
apr_memcache_disable_server(mc, ms);
return rv;
}
-
- if (strncmp(MS_ERROR, conn->buffer, MS_ERROR_LEN) == 0) {
- rv = APR_EGENERAL;
- }
- else if (strncmp(MS_NOT_FOUND, conn->buffer, MS_NOT_FOUND_LEN) == 0) {
- rv = APR_NOTFOUND;
+
+ rv = APR_EGENERAL;
+ if (hdr.rep_status) {
+ if (hdr.rep_status == MC_STATUS_DELETED) {
+ rv = APR_SUCCESS;
+ }
+ else if (hdr.rep_status == MC_STATUS_NOT_FOUND) {
+ rv = APR_NOTFOUND;
+ }
}
else {
+ rv = brigade_split_offset(conn->tb, conn->bb, APR_BLOCK_READ, hdr.lenleft);
+ if (rv) {
+ ms_bad_conn(ms, conn);
+ apr_memcache_disable_server(mc, ms);
+ return rv;
+ }
+
if (new_value) {
- *new_value = atoi(conn->buffer);
+ apr_size_t len = 4;
+ rv = apr_brigade_flatten(conn->tb, (char*)new_value, &len);
+ if (rv) {
+ ms_bad_conn(ms, conn);
+ apr_memcache_disable_server(mc, ms);
+ return rv;
+ }
+
+ apr_brigade_cleanup(conn->tb);
}
- rv = APR_SUCCESS;
}
ms_release_conn(ms, conn);
@@ -970,8 +988,7 @@
apr_uint32_t *new_value)
{
return num_cmd_write(mc,
- MC_INCR,
- MC_INCR_LEN,
+ CMD_INCR,
key,
inc,
new_value);
@@ -985,8 +1002,7 @@
apr_uint32_t *new_value)
{
return num_cmd_write(mc,
- MC_DECR,
- MC_DECR_LEN,
+ CMD_DECR,
key,
inc,
new_value);
@@ -999,84 +1015,14 @@
apr_pool_t *p,
char **baton)
{
- apr_status_t rv;
- apr_memcache_conn_t *conn;
- apr_size_t written;
- struct iovec vec[2];
-
- rv = ms_find_conn(ms, &conn);
-
- if (rv != APR_SUCCESS) {
- return rv;
- }
-
- /* version\r\n */
- vec[0].iov_base = MC_VERSION;
- vec[0].iov_len = MC_VERSION_LEN;
-
- vec[1].iov_base = MC_EOL;
- vec[1].iov_len = MC_EOL_LEN;
-
- rv = apr_socket_sendv(conn->sock, vec, 2, &written);
-
- if (rv != APR_SUCCESS) {
- ms_bad_conn(ms, conn);
- return rv;
- }
-
- rv = get_server_line(conn);
- if (rv != APR_SUCCESS) {
- ms_bad_conn(ms, conn);
- return rv;
- }
-
- if (strncmp(MS_VERSION, conn->buffer, MS_VERSION_LEN) == 0) {
- *baton = apr_pstrmemdup(p, conn->buffer+MS_VERSION_LEN+1,
- conn->blen - MS_VERSION_LEN - 2);
- rv = APR_SUCCESS;
- }
- else {
- rv = APR_EGENERAL;
- }
-
- ms_release_conn(ms, conn);
-
- return rv;
+ return APR_ENOTIMPL;
}
apr_status_t mc_version_ping(apr_memcache_server_t *ms)
{
- apr_status_t rv;
- apr_size_t written;
- struct iovec vec[2];
- apr_memcache_conn_t *conn;
-
- rv = ms_find_conn(ms, &conn);
-
- if (rv != APR_SUCCESS) {
- return rv;
- }
-
- /* version\r\n */
- vec[0].iov_base = MC_VERSION;
- vec[0].iov_len = MC_VERSION_LEN;
-
- vec[1].iov_base = MC_EOL;
- vec[1].iov_len = MC_EOL_LEN;
-
- rv = apr_socket_sendv(conn->sock, vec, 2, &written);
-
- if (rv != APR_SUCCESS) {
- ms_bad_conn(ms, conn);
- return rv;
- }
-
- rv = get_server_line(conn);
- ms_release_conn(ms, conn);
- return rv;
+ return APR_ENOTIMPL;
}
-
APR_DECLARE(void)
apr_memcache_add_multget_key(apr_pool_t *data_pool,
const char* key,
@@ -1099,560 +1045,21 @@
apr_hash_set(*values, value->key, klen, value);
}
-static void mget_conn_result(int up,
- apr_status_t rv,
- apr_memcache_t *mc,
- apr_memcache_server_t *ms,
- apr_memcache_conn_t *conn,
- struct cache_server_query_t *server_query,
- apr_hash_t *values,
- apr_hash_t *server_queries)
-{
- int j;
- apr_memcache_value_t* value;
-
- if (!up) {
- ms_bad_conn(ms, conn);
- apr_memcache_disable_server(mc, ms);
- }
-
- for (j = 1; j < server_query->query_vec_count ; j+=2) {
- if (server_query->query_vec[j].iov_base) {
- value = apr_hash_get(values, server_query->query_vec[j].iov_base,
- strlen(server_query->query_vec[j].iov_base));
-
- if (value->status == APR_NOTFOUND) {
- value->status = rv;
- }
- }
- }
-
- ms_release_conn(ms, conn);
-
- apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
-}
-
APR_DECLARE(apr_status_t)
apr_memcache_multgetp(apr_memcache_t *mc,
apr_pool_t *temp_pool,
apr_pool_t *data_pool,
apr_hash_t *values)
{
- apr_status_t rv;
- apr_memcache_server_t* ms;
- apr_memcache_conn_t* conn;
- apr_uint32_t hash;
- apr_size_t written;
- int klen;
-
- apr_memcache_value_t* value;
- apr_hash_index_t* value_hash_index;
-
- /* this is a little over aggresive, but beats multiple loops
- * to figure out how long each vector needs to be per-server.
- */
- unsigned int veclen = 2 + 2 * apr_hash_count(values) - 1; /* get <key>[<space><key>...]\r\n */
- unsigned int i, j;
- unsigned int queries_sent;
- apr_int32_t queries_recvd;
-
- apr_hash_t * server_queries = apr_hash_make(temp_pool);
- struct cache_server_query_t* server_query;
- apr_hash_index_t * query_hash_index;
-
- apr_pollset_t* pollset;
- const apr_pollfd_t* activefds;
- apr_pollfd_t* pollfds;
-
-
- /* build all the queries */
- value_hash_index = apr_hash_first(temp_pool, values);
- while (value_hash_index) {
- void *v;
- apr_hash_this(value_hash_index, NULL, NULL, &v);
- value = v;
- value_hash_index = apr_hash_next(value_hash_index);
- klen = strlen(value->key);
-
- hash = apr_memcache_hash(mc, value->key, klen);
- ms = apr_memcache_find_server_hash(mc, hash);
- if (ms == NULL) {
- continue;
- }
-
- server_query = apr_hash_get(server_queries, &ms, sizeof(ms));
-
- if (!server_query) {
- rv = ms_find_conn(ms, &conn);
-
- if (rv != APR_SUCCESS) {
- apr_memcache_disable_server(mc, ms);
- value->status = rv;
- continue;
- }
-
- server_query = apr_pcalloc(temp_pool,sizeof(struct cache_server_query_t));
-
- apr_hash_set(server_queries, &ms, sizeof(ms), server_query);
-
- server_query->ms = ms;
- server_query->conn = conn;
- server_query->query_vec = apr_pcalloc(temp_pool, sizeof(struct iovec)*veclen);
-
- /* set up the first key */
- server_query->query_vec[0].iov_base = MC_GET;
- server_query->query_vec[0].iov_len = MC_GET_LEN;
-
- server_query->query_vec[1].iov_base = (void*)(value->key);
- server_query->query_vec[1].iov_len = klen;
-
- server_query->query_vec[2].iov_base = MC_EOL;
- server_query->query_vec[2].iov_len = MC_EOL_LEN;
-
- server_query->query_vec_count = 3;
- }
- else {
- j = server_query->query_vec_count - 1;
-
- server_query->query_vec[j].iov_base = MC_WS;
- server_query->query_vec[j].iov_len = MC_WS_LEN;
- j++;
-
- server_query->query_vec[j].iov_base = (void*)(value->key);
- server_query->query_vec[j].iov_len = klen;
- j++;
-
- server_query->query_vec[j].iov_base = MC_EOL;
- server_query->query_vec[j].iov_len = MC_EOL_LEN;
- j++;
-
- server_query->query_vec_count = j;
- }
- }
-
- /* create polling structures */
- pollfds = apr_pcalloc(temp_pool, apr_hash_count(server_queries) * sizeof(apr_pollfd_t));
-
- rv = apr_pollset_create(&pollset, apr_hash_count(server_queries), temp_pool, 0);
-
- if (rv != APR_SUCCESS) {
- return rv;
- }
-
- /* send all the queries */
- queries_sent = 0;
- query_hash_index = apr_hash_first(temp_pool, server_queries);
-
- while (query_hash_index) {
- void *v;
- apr_hash_this(query_hash_index, NULL, NULL, &v);
- server_query = v;
- query_hash_index = apr_hash_next(query_hash_index);
-
- conn = server_query->conn;
- ms = server_query->ms;
-
- for (i = 0, rv = APR_SUCCESS; i < veclen && rv == APR_SUCCESS; i += APR_MAX_IOVEC_SIZE) {
- rv = apr_socket_sendv(conn->sock, &(server_query->query_vec[i]),
- veclen-i>APR_MAX_IOVEC_SIZE ? APR_MAX_IOVEC_SIZE : veclen-i , &written);
- }
-
- if (rv != APR_SUCCESS) {
- mget_conn_result(FALSE, rv, mc, ms, conn,
- server_query, values, server_queries);
- continue;
- }
-
- pollfds[queries_sent].desc_type = APR_POLL_SOCKET;
- pollfds[queries_sent].reqevents = APR_POLLIN;
- pollfds[queries_sent].p = temp_pool;
- pollfds[queries_sent].desc.s = conn->sock;
- pollfds[queries_sent].client_data = (void *)server_query;
- apr_pollset_add (pollset, &pollfds[queries_sent]);
-
- queries_sent++;
- }
-
- while (queries_sent) {
- rv = apr_pollset_poll(pollset, MULT_GET_TIMEOUT, &queries_recvd, &activefds);
-
- if (rv != APR_SUCCESS) {
- /* timeout */
- queries_sent = 0;
- continue;
- }
- for (i = 0; i < queries_recvd; i++) {
- server_query = activefds[i].client_data;
- conn = server_query->conn;
- ms = server_query->ms;
-
- rv = get_server_line(conn);
-
- if (rv != APR_SUCCESS) {
- apr_pollset_remove (pollset, &activefds[i]);
- mget_conn_result(FALSE, rv, mc, ms, conn,
- server_query, values, server_queries);
- queries_sent--;
- continue;
- }
-
- if (strncmp(MS_VALUE, conn->buffer, MS_VALUE_LEN) == 0) {
- char *key;
- char *flags;
- char *length;
- char *start;
- char *last;
- char *data;
- apr_size_t len;
-
- start = conn->buffer;
- key = apr_strtok(conn->buffer, " ", &last); /* just the VALUE, ignore */
- key = apr_strtok(NULL, " ", &last);
- flags = apr_strtok(NULL, " ", &last);
-
-
- length = apr_strtok(NULL, " ", &last);
- len = atoi(length);
-
- value = apr_hash_get(values, key, strlen(key));
-
-
- if (value) {
- if (len > 0) {
- apr_bucket_brigade *bbb;
- apr_bucket *e;
-
- /* eat the trailing \r\n */
- rv = apr_brigade_partition(conn->bb, len+2, &e);
-
- if (rv != APR_SUCCESS) {
- apr_pollset_remove (pollset, &activefds[i]);
- mget_conn_result(FALSE, rv, mc, ms, conn,
- server_query, values, server_queries);
- queries_sent--;
- continue;
- }
-
- bbb = apr_brigade_split(conn->bb, e);
-
- rv = apr_brigade_pflatten(conn->bb, &data, &len, data_pool);
-
- if (rv != APR_SUCCESS) {
- apr_pollset_remove (pollset, &activefds[i]);
- mget_conn_result(FALSE, rv, mc, ms, conn,
- server_query, values, server_queries);
- queries_sent--;
- continue;
- }
-
- rv = apr_brigade_destroy(conn->bb);
- if (rv != APR_SUCCESS) {
- apr_pollset_remove (pollset, &activefds[i]);
- mget_conn_result(FALSE, rv, mc, ms, conn,
- server_query, values, server_queries);
- queries_sent--;
- continue;
- }
-
- conn->bb = bbb;
-
- value->len = len - 2;
- data[value->len] = '\0';
- value->data = data;
- }
-
- value->status = rv;
- value->flags = atoi(flags);
-
- /* stay on the server */
- i--;
-
- }
- else {
- /* TODO: Server Sent back a key I didn't ask for or my
- * hash is corrupt */
- }
- }
- else if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) {
- /* this connection is done */
- apr_pollset_remove (pollset, &activefds[i]);
- ms_release_conn(ms, conn);
- apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
-
- queries_sent--;
- }
- else {
- /* unknown reply? */
- rv = APR_EGENERAL;
- }
-
- } /* /for */
- } /* /while */
-
- query_hash_index = apr_hash_first(temp_pool, server_queries);
- while (query_hash_index) {
- void *v;
- apr_hash_this(query_hash_index, NULL, NULL, &v);
- server_query = v;
- query_hash_index = apr_hash_next(query_hash_index);
-
- conn = server_query->conn;
- ms = server_query->ms;
-
- mget_conn_result(TRUE, rv, mc, ms, conn,
- server_query, values, server_queries);
- continue;
- }
-
- apr_pool_clear(temp_pool);
- apr_pollset_destroy(pollset);
- return APR_SUCCESS;
-
-}
-
-
-
-/**
- * Define all of the strings for stats
- */
-
-#define STAT_pid MS_STAT " pid "
-#define STAT_pid_LEN (sizeof(STAT_pid)-1)
-
-#define STAT_uptime MS_STAT " uptime "
-#define STAT_uptime_LEN (sizeof(STAT_uptime)-1)
-
-#define STAT_time MS_STAT " time "
-#define STAT_time_LEN (sizeof(STAT_time)-1)
-
-#define STAT_version MS_STAT " version "
-#define STAT_version_LEN (sizeof(STAT_version)-1)
-
-#define STAT_pointer_size MS_STAT " pointer_size "
-#define STAT_pointer_size_LEN (sizeof(STAT_pointer_size)-1)
-
-#define STAT_rusage_user MS_STAT " rusage_user "
-#define STAT_rusage_user_LEN (sizeof(STAT_rusage_user)-1)
-
-#define STAT_rusage_system MS_STAT " rusage_system "
-#define STAT_rusage_system_LEN (sizeof(STAT_rusage_system)-1)
-
-#define STAT_curr_items MS_STAT " curr_items "
-#define STAT_curr_items_LEN (sizeof(STAT_curr_items)-1)
-
-#define STAT_total_items MS_STAT " total_items "
-#define STAT_total_items_LEN (sizeof(STAT_total_items)-1)
-
-#define STAT_bytes MS_STAT " bytes "
-#define STAT_bytes_LEN (sizeof(STAT_bytes)-1)
-
-#define STAT_curr_connections MS_STAT " curr_connections "
-#define STAT_curr_connections_LEN (sizeof(STAT_curr_connections)-1)
-
-#define STAT_total_connections MS_STAT " total_connections "
-#define STAT_total_connections_LEN (sizeof(STAT_total_connections)-1)
-
-#define STAT_connection_structures MS_STAT " connection_structures "
-#define STAT_connection_structures_LEN (sizeof(STAT_connection_structures)-1)
-
-#define STAT_cmd_get MS_STAT " cmd_get "
-#define STAT_cmd_get_LEN (sizeof(STAT_cmd_get)-1)
-
-#define STAT_cmd_set MS_STAT " cmd_set "
-#define STAT_cmd_set_LEN (sizeof(STAT_cmd_set)-1)
-
-#define STAT_get_hits MS_STAT " get_hits "
-#define STAT_get_hits_LEN (sizeof(STAT_get_hits)-1)
-
-#define STAT_get_misses MS_STAT " get_misses "
-#define STAT_get_misses_LEN (sizeof(STAT_get_misses)-1)
-
-#define STAT_evictions MS_STAT " evictions "
-#define STAT_evictions_LEN (sizeof(STAT_evictions)-1)
-
-#define STAT_bytes_read MS_STAT " bytes_read "
-#define STAT_bytes_read_LEN (sizeof(STAT_bytes_read)-1)
-
-#define STAT_bytes_written MS_STAT " bytes_written "
-#define STAT_bytes_written_LEN (sizeof(STAT_bytes_written)-1)
-
-#define STAT_limit_maxbytes MS_STAT " limit_maxbytes "
-#define STAT_limit_maxbytes_LEN (sizeof(STAT_limit_maxbytes)-1)
-
-#define STAT_threads MS_STAT " threads "
-#define STAT_threads_LEN (sizeof(STAT_threads)-1)
-
-static const char *stat_read_string(apr_pool_t *p, char *buf, int len)
-{
- /* remove trailing \r\n and null char */
- return apr_pstrmemdup(p, buf, len-2);
-}
-
-static apr_uint32_t stat_read_uint32(apr_pool_t *p, char *buf, int len)
-{
- buf[len-2] = '\0';
- return atoi(buf);
-}
-
-static apr_uint64_t stat_read_uint64(apr_pool_t *p, char *buf, int len)
-{
- buf[len-2] = '\0';
- return apr_atoi64(buf);
-}
-
-static apr_time_t stat_read_time(apr_pool_t *p, char *buf, int len)
-{
- buf[len-2] = '\0';
- return apr_time_from_sec(atoi(buf));
+ return APR_ENOTIMPL;
}
-static apr_time_t stat_read_rtime(apr_pool_t *p, char *buf, int len)
-{
- char *tok;
- char *secs;
- char *usecs;
- const char *sep = ":";
-
- buf[len-2] = '\0';
-
- secs = apr_strtok(buf, sep, &tok);
- if (secs == NULL) {
- sep = ".";
- secs = apr_strtok(buf, sep, &tok);
- }
- usecs = apr_strtok(NULL, sep, &tok);
- if (secs && usecs) {
- return apr_time_make(atoi(secs), atoi(usecs));
- }
- else {
- return apr_time_make(0, 0);
- }
-}
-
-/**
- * I got tired of Typing. Meh.
- *
- * TODO: Convert it to static tables to make it cooler.
- */
-
-#define mc_stat_cmp(name) \
- strncmp(STAT_ ## name, conn->buffer, STAT_ ## name ## _LEN) == 0
-
-#define mc_stat_str(name) \
- stat_read_string(p, conn->buffer + name, \
- conn->blen - name)
-
-#define mc_stat_uint32(name) \
- stat_read_uint32(p, conn->buffer + name, \
- conn->blen - name)
-
-#define mc_stat_uint64(name) \
- stat_read_uint64(p, conn->buffer + name, \
- conn->blen - name)
-
-#define mc_stat_time(name) \
- stat_read_time(p, conn->buffer + name, \
- conn->blen - name)
-
-#define mc_stat_rtime(name) \
- stat_read_rtime(p, conn->buffer + name, \
- conn->blen - name)
-
-
-#define mc_do_stat(name, type) \
- if (mc_stat_cmp(name)) { \
- stats-> name = mc_stat_ ## type ((STAT_ ## name ## _LEN)); \
- }
-
-static void update_stats(apr_pool_t *p, apr_memcache_conn_t *conn,
- apr_memcache_stats_t *stats)
-{
-
- mc_do_stat(version, str)
- else mc_do_stat(pid, uint32)
- else mc_do_stat(uptime, uint32)
- else mc_do_stat(pointer_size, uint32)
- else mc_do_stat(time, time)
- else mc_do_stat(rusage_user, rtime)
- else mc_do_stat(rusage_system, rtime)
- else mc_do_stat(curr_items, uint32)
- else mc_do_stat(total_items, uint32)
- else mc_do_stat(bytes, uint64)
- else mc_do_stat(curr_connections, uint32)
- else mc_do_stat(total_connections, uint32)
- else mc_do_stat(connection_structures, uint32)
- else mc_do_stat(cmd_get, uint32)
- else mc_do_stat(cmd_set, uint32)
- else mc_do_stat(get_hits, uint32)
- else mc_do_stat(get_misses, uint32)
- else mc_do_stat(evictions, uint64)
- else mc_do_stat(bytes_read, uint64)
- else mc_do_stat(bytes_written, uint64)
- else mc_do_stat(limit_maxbytes, uint32)
- else mc_do_stat(threads, uint32)
-}
APR_DECLARE(apr_status_t)
apr_memcache_stats(apr_memcache_server_t *ms,
apr_pool_t *p,
apr_memcache_stats_t **stats)
{
- apr_memcache_stats_t *ret;
- apr_status_t rv;
- apr_memcache_conn_t *conn;
- apr_size_t written;
- struct iovec vec[2];
-
- rv = ms_find_conn(ms, &conn);
-
- if (rv != APR_SUCCESS) {
- return rv;
- }
-
- /* version\r\n */
- vec[0].iov_base = MC_STATS;
- vec[0].iov_len = MC_STATS_LEN;
-
- vec[1].iov_base = MC_EOL;
- vec[1].iov_len = MC_EOL_LEN;
-
- rv = apr_socket_sendv(conn->sock, vec, 2, &written);
-
- if (rv != APR_SUCCESS) {
- ms_bad_conn(ms, conn);
- return rv;
- }
-
- ret = apr_pcalloc(p, sizeof(apr_memcache_stats_t));
-
- do {
- rv = get_server_line(conn);
- if (rv != APR_SUCCESS) {
- ms_bad_conn(ms, conn);
- return rv;
- }
-
- if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) {
- rv = APR_SUCCESS;
- break;
- }
- else if (strncmp(MS_STAT, conn->buffer, MS_STAT_LEN) == 0) {
- update_stats(p, conn, ret);
- continue;
- }
- else {
- rv = APR_EGENERAL;
- break;
- }
-
- } while(1);
-
- ms_release_conn(ms, conn);
-
- if (stats) {
- *stats = ret;
- }
-
- return rv;
+ return APR_ENOTIMPL;
}