You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@trafficserver.apache.org by James Peach <jp...@apache.org> on 2013/11/20 18:50:51 UTC
Re: [1/4] refine cluster
This is a huge commit with:
- no links back to the Jira ticket
- no source code comments
- no indication of the problem that it is solving or how it solves it
- no performance comparison (I assume this is for performance)
- no documentation of the new configuration parameters
- no indication of compatibility (is it really OK for the 4.x branch?)
- no tests
Please, can we talk about the above issues?
On Nov 19, 2013, at 11:45 PM, weijin@apache.org wrote:
> Updated Branches:
> refs/heads/refine_cluster [created] 7ffc10a9c
>
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/iocore/cluster/P_ClusterInline.h
> ----------------------------------------------------------------------
> diff --git a/iocore/cluster/P_ClusterInline.h b/iocore/cluster/P_ClusterInline.h
> index c653956..1d26b3a 100644
> --- a/iocore/cluster/P_ClusterInline.h
> +++ b/iocore/cluster/P_ClusterInline.h
> @@ -36,25 +36,30 @@ inline Action *
> Cluster_lookup(Continuation * cont, CacheKey * key, CacheFragType frag_type, char *hostname, int host_len)
> {
> // Try to send remote, if not possible, handle locally
> - Action *retAct;
> - ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
> - if (m && !clusterProcessor.disable_remote_cluster_ops(m)) {
> - CacheContinuation *cc = CacheContinuation::cacheContAllocator_alloc();
> - cc->action = cont;
> - cc->mutex = cont->mutex;
> - retAct = CacheContinuation::do_remote_lookup(cont, key, cc, frag_type, hostname, host_len);
> - if (retAct) {
> - return retAct;
> - } else {
> - // not remote, do local lookup
> - CacheContinuation::cacheContAllocator_free(cc);
> - return (Action *) NULL;
> - }
> - } else {
> - Action a;
> - a = cont;
> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_LOOKUP_FAILED, 0);
> - }
> +// Action *retAct;
> +// ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
> +// if (m && !clusterProcessor.disable_remote_cluster_ops(m)) {
> +// CacheContinuation *cc = CacheContinuation::cacheContAllocator_alloc();
> +// cc->action = cont;
> +// cc->mutex = cont->mutex;
> +// retAct = CacheContinuation::do_remote_lookup(cont, key, cc, frag_type, hostname, host_len);
> +// if (retAct) {
> +// return retAct;
> +// } else {
> +// // not remote, do local lookup
> +// CacheContinuation::cacheContAllocator_free(cc);
> +// return (Action *) NULL;
> +// }
> +// } else {
> +// Action a;
> +// a = cont;
> +// return CacheContinuation::callback_failure(&a, CACHE_EVENT_LOOKUP_FAILED, 0);
> +// }
> + (void) cont;
> + (void) key;
> + (void) frag_type;
> + (void) hostname;
> + (void) host_len;
> return (Action *) NULL;
> }
>
> @@ -66,18 +71,24 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
> time_t pin_in_cache, CacheFragType frag_type, char *hostname, int host_len)
> {
> (void) params;
> - if (clusterProcessor.disable_remote_cluster_ops(owner_machine)) {
> - Action a;
> - a = cont;
> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_READ_FAILED, 0);
> + ink_assert(cont);
> + ClusterSession session;
> + if (cluster_create_session(&session, owner_machine, NULL, 0)) {
> + cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, NULL);
> + return ACTION_RESULT_DONE;
> }
> +
> int vers = CacheOpMsg_long::protoToVersion(owner_machine->msg_proto_major);
> + CacheOpArgs_General readArgs;
> + Ptr<IOBufferData> d;
> +
> int flen;
> int len = 0;
> int cur_len;
> int res = 0;
> - char *msg;
> + char *msg = 0;
> char *data;
> + Action *action = NULL;
>
> if (vers == CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) {
> if ((opcode == CACHE_OPEN_READ_LONG)
> @@ -87,20 +98,21 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>
> const char *url_hostname;
> int url_hlen;
> - INK_MD5 url_only_md5;
> + INK_MD5 url_md5;
>
> - Cache::generate_key(&url_only_md5, url, 0);
> + Cache::generate_key(&url_md5, url);
> url_hostname = url->host_get(&url_hlen);
>
> len += request->m_heap->marshal_length();
> - len += params->marshal_length();
> + len += sizeof(CacheLookupHttpConfig) + params->marshal_length();
> len += url_hlen;
>
> if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
> goto err_exit;
>
> // Perform data Marshal operation
> - msg = (char *) ALLOCA_DOUBLE(flen + len);
> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
> + msg = (char *) d->data();
> data = msg + flen;
>
> cur_len = len;
> @@ -110,6 +122,13 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
> }
> data += res;
> cur_len -= res;
> +
> + if (cur_len < (int) sizeof(CacheLookupHttpConfig))
> + goto err_exit;
> + memcpy(data, params, sizeof(CacheLookupHttpConfig));
> + data += sizeof(CacheLookupHttpConfig);
> + cur_len -= sizeof(CacheLookupHttpConfig);
> +
> if ((res = params->marshal(data, cur_len)) < 0)
> goto err_exit;
> data += res;
> @@ -117,37 +136,33 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
> memcpy(data, url_hostname, url_hlen);
>
> CacheOpArgs_General readArgs;
> - readArgs.url_md5 = &url_only_md5;
> + readArgs.url_md5 = &url_md5;
> readArgs.pin_in_cache = pin_in_cache;
> readArgs.frag_type = frag_type;
> - return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs,
> - opcode, (char *) msg, (flen + len), -1, buf);
> +
> + action = CacheContinuation::do_op(cont, session, (void *) &readArgs,
> + opcode, d, (flen + len), -1, buf);
> } else {
> // Build message if we have host data.
> + flen = op_to_sizeof_fixedlen_msg(opcode);
> + len = host_len;
>
> - if (host_len) {
> - // Determine length of data to Marshal
> - flen = op_to_sizeof_fixedlen_msg(opcode);
> - len = host_len;
> -
> - if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
> - goto err_exit;
> + if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
> + goto err_exit;
>
> - msg = (char *) ALLOCA_DOUBLE(flen + len);
> - data = msg + flen;
> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
> + msg = (char *) d->data();
> + data = msg + flen;
> + if (host_len)
> memcpy(data, hostname, host_len);
>
> - } else {
> - msg = 0;
> - flen = 0;
> - len = 0;
> - }
> - CacheOpArgs_General readArgs;
> readArgs.url_md5 = key;
> readArgs.frag_type = frag_type;
> - return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs,
> - opcode, (char *) msg, (flen + len), -1, buf);
> +
> + action = CacheContinuation::do_op(cont, session, (void *) &readArgs,
> + opcode, d, (flen + len), -1, buf);
> }
> + ink_assert(msg);
>
> } else {
> //////////////////////////////////////////////////////////////
> @@ -155,10 +170,12 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
> //////////////////////////////////////////////////////////////
> ink_release_assert(!"CacheOpMsg_long [read] bad msg version");
> }
> +
> + if (action)
> + return action;
> err_exit:
> - Action a;
> - a = cont;
> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_READ_FAILED, 0);
> + cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, NULL);
> + return ACTION_RESULT_DONE;
> }
>
> inline Action *
> @@ -171,10 +188,11 @@ Cluster_write(Continuation * cont, int expected_size,
> {
> (void) key;
> (void) request;
> - if (clusterProcessor.disable_remote_cluster_ops(m)) {
> - Action a;
> - a = cont;
> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_WRITE_FAILED, 0);
> + ClusterSession session;
> + ink_assert(cont);
> + if (cluster_create_session(&session, m, NULL, 0)) {
> + cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, NULL);
> + return ACTION_RESULT_DONE;
> }
> char *msg = 0;
> char *data = 0;
> @@ -182,24 +200,22 @@ Cluster_write(Continuation * cont, int expected_size,
> int len = 0;
> int flen = 0;
> int vers = CacheOpMsg_long::protoToVersion(m->msg_proto_major);
> + Ptr<IOBufferData> d;
>
> switch (opcode) {
> case CACHE_OPEN_WRITE:
> {
> // Build message if we have host data
> - if (host_len) {
> - // Determine length of data to Marshal
> - flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE);
> - len = host_len;
> -
> - if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
> - goto err_exit;
> -
> - msg = (char *) ALLOCA_DOUBLE(flen + len);
> - data = msg + flen;
> + len = host_len;
> + flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE);
> + if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
> + goto err_exit;
>
> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
> + msg = (char *) d->data();
> + data = msg + flen;
> + if (host_len)
> memcpy(data, hostname, host_len);
> - }
> break;
> }
> case CACHE_OPEN_WRITE_LONG:
> @@ -223,8 +239,9 @@ Cluster_write(Continuation * cont, int expected_size,
> if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
> goto err_exit;
>
> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
> + msg = (char *) d->data();
> // Perform data Marshal operation
> - msg = (char *) ALLOCA_DOUBLE(flen + len);
> data = msg + flen;
> int res = 0;
>
> @@ -257,7 +274,9 @@ Cluster_write(Continuation * cont, int expected_size,
> writeArgs.cfl_flags |= (old_info ? CFL_LOPENWRITE_HAVE_OLDINFO : 0);
> writeArgs.cfl_flags |= (allow_multiple_writes ? CFL_ALLOW_MULTIPLE_WRITES : 0);
>
> - return CacheContinuation::do_op(cont, m, (void *) &writeArgs, opcode, msg, flen + len, expected_size, buf);
> + Action *action = CacheContinuation::do_op(cont, session, (void *) &writeArgs, opcode, d, flen + len, expected_size, buf);
> + if (action)
> + return action;
> } else {
> //////////////////////////////////////////////////////////////
> // Create the specified down rev version of this message
> @@ -267,19 +286,21 @@ Cluster_write(Continuation * cont, int expected_size,
> }
>
> err_exit:
> - Action a;
> - a = cont;
> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_WRITE_FAILED, 0);
> + cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, NULL);
> + return ACTION_RESULT_DONE;
> }
>
> inline Action *
> Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey * to,
> CacheFragType type, char *hostname, int host_len)
> {
> - if (clusterProcessor.disable_remote_cluster_ops(m)) {
> - Action a;
> - a = cont;
> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_LINK_FAILED, 0);
> + ClusterSession session;
> + Ptr<IOBufferData> d;
> + char *msg = NULL;
> +
> + if (cluster_create_session(&session, m, NULL, 0)) {
> + cont->handleEvent(CACHE_EVENT_LINK_FAILED, NULL);
> + return ACTION_RESULT_DONE;
> }
>
> int vers = CacheOpMsg_short_2::protoToVersion(m->msg_proto_major);
> @@ -293,7 +314,8 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey
> if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
> goto err_exit;
>
> - char *msg = (char *) ALLOCA_DOUBLE(flen + len);
> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
> + msg = (char *) d->data();
> memcpy((msg + flen), hostname, host_len);
>
> // Setup args for remote link
> @@ -301,7 +323,9 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey
> linkArgs.from = from;
> linkArgs.to = to;
> linkArgs.frag_type = type;
> - return CacheContinuation::do_op(cont, m, (void *) &linkArgs, CACHE_LINK, msg, (flen + len));
> + Action *action = CacheContinuation::do_op(cont, session, (void *) &linkArgs, CACHE_LINK, d, (flen + len));
> + if (action)
> + return action;
> } else {
> //////////////////////////////////////////////////////////////
> // Create the specified down rev version of this message
> @@ -311,18 +335,20 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey
> }
>
> err_exit:
> - Action a;
> - a = cont;
> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_LINK_FAILED, 0);
> + cont->handleEvent(CACHE_EVENT_LINK_FAILED, NULL);
> + return ACTION_RESULT_DONE;
> }
>
> inline Action *
> Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFragType type, char *hostname, int host_len)
> {
> - if (clusterProcessor.disable_remote_cluster_ops(m)) {
> - Action a;
> - a = cont;
> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_DEREF_FAILED, 0);
> + ClusterSession session;
> + Ptr<IOBufferData> d;
> + char *msg = NULL;
> +
> + if (cluster_create_session(&session, m, NULL, 0)) {
> + cont->handleEvent(CACHE_EVENT_DEREF_FAILED, NULL);
> + return ACTION_RESULT_DONE ;
> }
>
> int vers = CacheOpMsg_short::protoToVersion(m->msg_proto_major);
> @@ -336,14 +362,17 @@ Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFrag
> if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
> goto err_exit;
>
> - char *msg = (char *) ALLOCA_DOUBLE(flen + len);
> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
> + msg = (char *) d->data();
> memcpy((msg + flen), hostname, host_len);
>
> // Setup args for remote deref
> CacheOpArgs_Deref drefArgs;
> drefArgs.md5 = key;
> drefArgs.frag_type = type;
> - return CacheContinuation::do_op(cont, m, (void *) &drefArgs, CACHE_DEREF, msg, (flen + len));
> + Action *action = CacheContinuation::do_op(cont, session, (void *) &drefArgs, CACHE_DEREF, d, (flen + len));
> + if (action)
> + return action;
> } else {
> //////////////////////////////////////////////////////////////
> // Create the specified down rev version of this message
> @@ -353,19 +382,22 @@ Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFrag
> }
>
> err_exit:
> - Action a;
> - a = cont;
> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_DEREF_FAILED, 0);
> + cont->handleEvent(CACHE_EVENT_DEREF_FAILED, NULL);
> + return ACTION_RESULT_DONE ;
> }
>
> inline Action *
> Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
> bool rm_user_agents, bool rm_link, CacheFragType frag_type, char *hostname, int host_len)
> {
> - if (clusterProcessor.disable_remote_cluster_ops(m)) {
> - Action a;
> - a = cont;
> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_REMOVE_FAILED, 0);
> + ClusterSession session;
> + Ptr<IOBufferData> d;
> + char *msg = NULL;
> +
> + if (cluster_create_session(&session, m, NULL, 0)) {
> + if (cont)
> + cont->handleEvent(CACHE_EVENT_REMOVE_FAILED, NULL);
> + return ACTION_RESULT_DONE;
> }
>
> int vers = CacheOpMsg_short::protoToVersion(m->msg_proto_major);
> @@ -379,7 +411,8 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
> if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
> goto err_exit;
>
> - char *msg = (char *) ALLOCA_DOUBLE(flen + len);
> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
> + msg = (char *) d->data();
> memcpy((msg + flen), hostname, host_len);
>
> // Setup args for remote update
> @@ -388,7 +421,9 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
> updateArgs.cfl_flags |= (rm_user_agents ? CFL_REMOVE_USER_AGENTS : 0);
> updateArgs.cfl_flags |= (rm_link ? CFL_REMOVE_LINK : 0);
> updateArgs.frag_type = frag_type;
> - return CacheContinuation::do_op(cont, m, (void *) &updateArgs, CACHE_REMOVE, msg, (flen + len));
> + Action *action = CacheContinuation::do_op(cont, session, (void *) &updateArgs, CACHE_REMOVE, d, (flen + len));
> + if (action)
> + return action;
> } else {
> //////////////////////////////////////////////////////////////
> // Create the specified down rev version of this message
> @@ -398,9 +433,8 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
> }
>
> err_exit:
> - Action a;
> - a = cont;
> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_REMOVE_FAILED, 0);
> + if (cont)
> + cont->handleEvent(CACHE_EVENT_REMOVE_FAILED, NULL);
> + return ACTION_RESULT_DONE;
> }
> -
> #endif /* __CLUSTERINLINE_H__ */
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/iocore/eventsystem/I_Event.h
> ----------------------------------------------------------------------
> diff --git a/iocore/eventsystem/I_Event.h b/iocore/eventsystem/I_Event.h
> index 7a37ea0..2659131 100644
> --- a/iocore/eventsystem/I_Event.h
> +++ b/iocore/eventsystem/I_Event.h
> @@ -85,6 +85,7 @@
> #define BLOCK_CACHE_EVENT_EVENTS_START 4000
> #define UTILS_EVENT_EVENTS_START 5000
> #define CONGESTION_EVENT_EVENTS_START 5100
> +#define CLUSTER_MSG_START 6000
> #define INK_API_EVENT_EVENTS_START 60000
> #define SRV_EVENT_EVENTS_START 62000
> #define REMAP_EVENT_EVENTS_START 63000
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/iocore/eventsystem/P_IOBuffer.h
> ----------------------------------------------------------------------
> diff --git a/iocore/eventsystem/P_IOBuffer.h b/iocore/eventsystem/P_IOBuffer.h
> index 0842aff..261aa1f 100644
> --- a/iocore/eventsystem/P_IOBuffer.h
> +++ b/iocore/eventsystem/P_IOBuffer.h
> @@ -203,7 +203,7 @@ new_IOBufferData_internal(
> void *b, int64_t size, int64_t asize_index)
> {
> (void) size;
> - IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_ethread());
> + IOBufferData *d = ioDataAllocator.alloc();
> d->_size_index = asize_index;
> ink_assert(BUFFER_SIZE_INDEX_IS_CONSTANT(asize_index)
> || size <= d->block_size());
> @@ -263,7 +263,7 @@ new_IOBufferData_internal(
> #endif
> int64_t size_index, AllocType type)
> {
> - IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_ethread());
> + IOBufferData *d = ioDataAllocator.alloc();
> #ifdef TRACK_BUFFER_USER
> d->_location = loc;
> #endif
> @@ -336,7 +336,7 @@ TS_INLINE void
> IOBufferData::free()
> {
> dealloc();
> - THREAD_FREE(this, ioDataAllocator, this_ethread());
> + ioDataAllocator.free(this);
> }
>
> //////////////////////////////////////////////////////////////////
> @@ -352,7 +352,7 @@ new_IOBufferBlock_internal(
> #endif
> )
> {
> - IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_ethread());
> + IOBufferBlock *b = ioBlockAllocator.alloc();
> #ifdef TRACK_BUFFER_USER
> b->_location = location;
> #endif
> @@ -366,7 +366,7 @@ new_IOBufferBlock_internal(
> #endif
> IOBufferData * d, int64_t len, int64_t offset)
> {
> - IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_ethread());
> + IOBufferBlock *b = ioBlockAllocator.alloc();
> #ifdef TRACK_BUFFER_USER
> b->_location = location;
> #endif
> @@ -468,7 +468,7 @@ TS_INLINE void
> IOBufferBlock::free()
> {
> dealloc();
> - THREAD_FREE(this, ioBlockAllocator, this_ethread());
> + ioBlockAllocator.free(this);
> }
>
> TS_INLINE void
> @@ -777,7 +777,7 @@ TS_INLINE MIOBuffer * new_MIOBuffer_internal(
> #endif
> int64_t size_index)
> {
> - MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_ethread());
> + MIOBuffer *b = ioAllocator.alloc();
> #ifdef TRACK_BUFFER_USER
> b->_location = location;
> #endif
> @@ -790,7 +790,7 @@ free_MIOBuffer(MIOBuffer * mio)
> {
> mio->_writer = NULL;
> mio->dealloc_all_readers();
> - THREAD_FREE(mio, ioAllocator, this_ethread());
> + ioAllocator.free(mio);
> }
>
> TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal(
> @@ -799,7 +799,7 @@ TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal(
> #endif
> int64_t size_index)
> {
> - MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_ethread());
> + MIOBuffer *b = ioAllocator.alloc();
> b->size_index = size_index;
> #ifdef TRACK_BUFFER_USER
> b->_location = location;
> @@ -810,7 +810,7 @@ TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal(
> TS_INLINE void
> free_empty_MIOBuffer(MIOBuffer * mio)
> {
> - THREAD_FREE(mio, ioAllocator, this_ethread());
> + ioAllocator.free(mio);
> }
>
> TS_INLINE IOBufferReader *
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/mgmt/RecordsConfig.cc
> ----------------------------------------------------------------------
> diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
> index 4a73f19..7677471 100644
> --- a/mgmt/RecordsConfig.cc
> +++ b/mgmt/RecordsConfig.cc
> @@ -814,6 +814,24 @@ RecordElement RecordsConfig[] = {
> ,
> {RECT_CONFIG, "proxy.config.cluster.cluster_port", RECD_INT, "8086", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
> ,
> + {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_bps", RECD_INT, "804857600", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
> + ,
> + {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_bps", RECD_INT, "4194304000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
> + ,
> + {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_send_wait_time", RECD_INT, "1000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
> + ,
> + {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_send_wait_time", RECD_INT, "5000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
> + ,
> + {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_loop_interval", RECD_INT, "0", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
> + ,
> + {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_loop_interval", RECD_INT, "1000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
> + ,
> + {RECT_CONFIG, "proxy.config.cluster.max_sessions_per_machine", RECD_INT, "1000000", RECU_RESTART_TS, RR_NULL, RECC_INT, "[1000-4000000]", RECA_NULL}
> + ,
> + {RECT_CONFIG, "proxy.config.cluster.session_locks_per_machine", RECD_INT, "10949", RECU_RESTART_TS, RR_NULL, RECC_INT, "[1-100000]", RECA_NULL}
> + ,
> + {RECT_CONFIG, "proxy.config.cluster.read_buffer_size", RECD_INT, "2097152", RECU_RESTART_TS, RR_NULL, RECC_INT, "[65536-2097152]", RECA_NULL}
> + ,
> {RECT_CONFIG, "proxy.config.cluster.cluster_configuration", RECD_STRING, "cluster.config", RECU_NULL, RR_NULL, RECC_NULL, NULL, RECA_NULL}
> ,
> {RECT_CONFIG, "proxy.config.cluster.ethernet_interface", RECD_STRING, TS_BUILD_DEFAULT_LOOPBACK_IFACE, RECU_RESTART_TS, RR_REQUIRED, RECC_STR, "^[^[:space:]]*$", RECA_NULL}
>
Re: [1/4] refine cluster
Posted by James Peach <jp...@apache.org>.
On Nov 20, 2013, at 9:50 AM, James Peach <jp...@apache.org> wrote:
> This is a huge commit with
>
> - no links back to the Jira ticket
> - no source code comments
> - no indication of the problem that it is solving or how it solves it
> - no performance comparison (I assume this for performance)
> - no documentation of the new configuration parameters
> - no indication of compatibility (is it really OK to the 4.x branch?)
> - no tests
Oh, I didn't notice that this was on a branch. Thanks, that's excellent!
>
> Please, can we talk about the above issues?
>
> On Nov 19, 2013, at 11:45 PM, weijin@apache.org wrote:
>
>> Updated Branches:
>> refs/heads/refine_cluster [created] 7ffc10a9c
>>
>>
>> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/iocore/cluster/P_ClusterInline.h
>> ----------------------------------------------------------------------
>> diff --git a/iocore/cluster/P_ClusterInline.h b/iocore/cluster/P_ClusterInline.h
>> index c653956..1d26b3a 100644
>> --- a/iocore/cluster/P_ClusterInline.h
>> +++ b/iocore/cluster/P_ClusterInline.h
>> @@ -36,25 +36,30 @@ inline Action *
>> Cluster_lookup(Continuation * cont, CacheKey * key, CacheFragType frag_type, char *hostname, int host_len)
>> {
>> // Try to send remote, if not possible, handle locally
>> - Action *retAct;
>> - ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
>> - if (m && !clusterProcessor.disable_remote_cluster_ops(m)) {
>> - CacheContinuation *cc = CacheContinuation::cacheContAllocator_alloc();
>> - cc->action = cont;
>> - cc->mutex = cont->mutex;
>> - retAct = CacheContinuation::do_remote_lookup(cont, key, cc, frag_type, hostname, host_len);
>> - if (retAct) {
>> - return retAct;
>> - } else {
>> - // not remote, do local lookup
>> - CacheContinuation::cacheContAllocator_free(cc);
>> - return (Action *) NULL;
>> - }
>> - } else {
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_LOOKUP_FAILED, 0);
>> - }
>> +// Action *retAct;
>> +// ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
>> +// if (m && !clusterProcessor.disable_remote_cluster_ops(m)) {
>> +// CacheContinuation *cc = CacheContinuation::cacheContAllocator_alloc();
>> +// cc->action = cont;
>> +// cc->mutex = cont->mutex;
>> +// retAct = CacheContinuation::do_remote_lookup(cont, key, cc, frag_type, hostname, host_len);
>> +// if (retAct) {
>> +// return retAct;
>> +// } else {
>> +// // not remote, do local lookup
>> +// CacheContinuation::cacheContAllocator_free(cc);
>> +// return (Action *) NULL;
>> +// }
>> +// } else {
>> +// Action a;
>> +// a = cont;
>> +// return CacheContinuation::callback_failure(&a, CACHE_EVENT_LOOKUP_FAILED, 0);
>> +// }
>> + (void) cont;
>> + (void) key;
>> + (void) frag_type;
>> + (void) hostname;
>> + (void) host_len;
>> return (Action *) NULL;
>> }
>>
>> @@ -66,18 +71,24 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>> time_t pin_in_cache, CacheFragType frag_type, char *hostname, int host_len)
>> {
>> (void) params;
>> - if (clusterProcessor.disable_remote_cluster_ops(owner_machine)) {
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_READ_FAILED, 0);
>> + ink_assert(cont);
>> + ClusterSession session;
>> + if (cluster_create_session(&session, owner_machine, NULL, 0)) {
>> + cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, NULL);
>> + return ACTION_RESULT_DONE;
>> }
>> +
>> int vers = CacheOpMsg_long::protoToVersion(owner_machine->msg_proto_major);
>> + CacheOpArgs_General readArgs;
>> + Ptr<IOBufferData> d;
>> +
>> int flen;
>> int len = 0;
>> int cur_len;
>> int res = 0;
>> - char *msg;
>> + char *msg = 0;
>> char *data;
>> + Action *action = NULL;
>>
>> if (vers == CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) {
>> if ((opcode == CACHE_OPEN_READ_LONG)
>> @@ -87,20 +98,21 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>>
>> const char *url_hostname;
>> int url_hlen;
>> - INK_MD5 url_only_md5;
>> + INK_MD5 url_md5;
>>
>> - Cache::generate_key(&url_only_md5, url, 0);
>> + Cache::generate_key(&url_md5, url);
>> url_hostname = url->host_get(&url_hlen);
>>
>> len += request->m_heap->marshal_length();
>> - len += params->marshal_length();
>> + len += sizeof(CacheLookupHttpConfig) + params->marshal_length();
>> len += url_hlen;
>>
>> if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>> goto err_exit;
>>
>> // Perform data Marshal operation
>> - msg = (char *) ALLOCA_DOUBLE(flen + len);
>> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> + msg = (char *) d->data();
>> data = msg + flen;
>>
>> cur_len = len;
>> @@ -110,6 +122,13 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>> }
>> data += res;
>> cur_len -= res;
>> +
>> + if (cur_len < (int) sizeof(CacheLookupHttpConfig))
>> + goto err_exit;
>> + memcpy(data, params, sizeof(CacheLookupHttpConfig));
>> + data += sizeof(CacheLookupHttpConfig);
>> + cur_len -= sizeof(CacheLookupHttpConfig);
>> +
>> if ((res = params->marshal(data, cur_len)) < 0)
>> goto err_exit;
>> data += res;
>> @@ -117,37 +136,33 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>> memcpy(data, url_hostname, url_hlen);
>>
>> CacheOpArgs_General readArgs;
>> - readArgs.url_md5 = &url_only_md5;
>> + readArgs.url_md5 = &url_md5;
>> readArgs.pin_in_cache = pin_in_cache;
>> readArgs.frag_type = frag_type;
>> - return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs,
>> - opcode, (char *) msg, (flen + len), -1, buf);
>> +
>> + action = CacheContinuation::do_op(cont, session, (void *) &readArgs,
>> + opcode, d, (flen + len), -1, buf);
>> } else {
>> // Build message if we have host data.
>> + flen = op_to_sizeof_fixedlen_msg(opcode);
>> + len = host_len;
>>
>> - if (host_len) {
>> - // Determine length of data to Marshal
>> - flen = op_to_sizeof_fixedlen_msg(opcode);
>> - len = host_len;
>> -
>> - if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>> - goto err_exit;
>> + if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>> + goto err_exit;
>>
>> - msg = (char *) ALLOCA_DOUBLE(flen + len);
>> - data = msg + flen;
>> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> + msg = (char *) d->data();
>> + data = msg + flen;
>> + if (host_len)
>> memcpy(data, hostname, host_len);
>>
>> - } else {
>> - msg = 0;
>> - flen = 0;
>> - len = 0;
>> - }
>> - CacheOpArgs_General readArgs;
>> readArgs.url_md5 = key;
>> readArgs.frag_type = frag_type;
>> - return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs,
>> - opcode, (char *) msg, (flen + len), -1, buf);
>> +
>> + action = CacheContinuation::do_op(cont, session, (void *) &readArgs,
>> + opcode, d, (flen + len), -1, buf);
>> }
>> + ink_assert(msg);
>>
>> } else {
>> //////////////////////////////////////////////////////////////
>> @@ -155,10 +170,12 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>> //////////////////////////////////////////////////////////////
>> ink_release_assert(!"CacheOpMsg_long [read] bad msg version");
>> }
>> +
>> + if (action)
>> + return action;
>> err_exit:
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_READ_FAILED, 0);
>> + cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, NULL);
>> + return ACTION_RESULT_DONE;
>> }
>>
>> inline Action *
>> @@ -171,10 +188,11 @@ Cluster_write(Continuation * cont, int expected_size,
>> {
>> (void) key;
>> (void) request;
>> - if (clusterProcessor.disable_remote_cluster_ops(m)) {
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_WRITE_FAILED, 0);
>> + ClusterSession session;
>> + ink_assert(cont);
>> + if (cluster_create_session(&session, m, NULL, 0)) {
>> + cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, NULL);
>> + return ACTION_RESULT_DONE;
>> }
>> char *msg = 0;
>> char *data = 0;
>> @@ -182,24 +200,22 @@ Cluster_write(Continuation * cont, int expected_size,
>> int len = 0;
>> int flen = 0;
>> int vers = CacheOpMsg_long::protoToVersion(m->msg_proto_major);
>> + Ptr<IOBufferData> d;
>>
>> switch (opcode) {
>> case CACHE_OPEN_WRITE:
>> {
>> // Build message if we have host data
>> - if (host_len) {
>> - // Determine length of data to Marshal
>> - flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE);
>> - len = host_len;
>> -
>> - if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>> - goto err_exit;
>> -
>> - msg = (char *) ALLOCA_DOUBLE(flen + len);
>> - data = msg + flen;
>> + len = host_len;
>> + flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE);
>> + if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>> + goto err_exit;
>>
>> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> + msg = (char *) d->data();
>> + data = msg + flen;
>> + if (host_len)
>> memcpy(data, hostname, host_len);
>> - }
>> break;
>> }
>> case CACHE_OPEN_WRITE_LONG:
>> @@ -223,8 +239,9 @@ Cluster_write(Continuation * cont, int expected_size,
>> if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>> goto err_exit;
>>
>> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> + msg = (char *) d->data();
>> // Perform data Marshal operation
>> - msg = (char *) ALLOCA_DOUBLE(flen + len);
>> data = msg + flen;
>> int res = 0;
>>
>> @@ -257,7 +274,9 @@ Cluster_write(Continuation * cont, int expected_size,
>> writeArgs.cfl_flags |= (old_info ? CFL_LOPENWRITE_HAVE_OLDINFO : 0);
>> writeArgs.cfl_flags |= (allow_multiple_writes ? CFL_ALLOW_MULTIPLE_WRITES : 0);
>>
>> - return CacheContinuation::do_op(cont, m, (void *) &writeArgs, opcode, msg, flen + len, expected_size, buf);
>> + Action *action = CacheContinuation::do_op(cont, session, (void *) &writeArgs, opcode, d, flen + len, expected_size, buf);
>> + if (action)
>> + return action;
>> } else {
>> //////////////////////////////////////////////////////////////
>> // Create the specified down rev version of this message
>> @@ -267,19 +286,21 @@ Cluster_write(Continuation * cont, int expected_size,
>> }
>>
>> err_exit:
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_WRITE_FAILED, 0);
>> + cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, NULL);
>> + return ACTION_RESULT_DONE;
>> }
>>
>> inline Action *
>> Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey * to,
>> CacheFragType type, char *hostname, int host_len)
>> {
>> - if (clusterProcessor.disable_remote_cluster_ops(m)) {
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_LINK_FAILED, 0);
>> + ClusterSession session;
>> + Ptr<IOBufferData> d;
>> + char *msg = NULL;
>> +
>> + if (cluster_create_session(&session, m, NULL, 0)) {
>> + cont->handleEvent(CACHE_EVENT_LINK_FAILED, NULL);
>> + return ACTION_RESULT_DONE;
>> }
>>
>> int vers = CacheOpMsg_short_2::protoToVersion(m->msg_proto_major);
>> @@ -293,7 +314,8 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey
>> if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>> goto err_exit;
>>
>> - char *msg = (char *) ALLOCA_DOUBLE(flen + len);
>> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> + msg = (char *) d->data();
>> memcpy((msg + flen), hostname, host_len);
>>
>> // Setup args for remote link
>> @@ -301,7 +323,9 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey
>> linkArgs.from = from;
>> linkArgs.to = to;
>> linkArgs.frag_type = type;
>> - return CacheContinuation::do_op(cont, m, (void *) &linkArgs, CACHE_LINK, msg, (flen + len));
>> + Action *action = CacheContinuation::do_op(cont, session, (void *) &linkArgs, CACHE_LINK, d, (flen + len));
>> + if (action)
>> + return action;
>> } else {
>> //////////////////////////////////////////////////////////////
>> // Create the specified down rev version of this message
>> @@ -311,18 +335,20 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey
>> }
>>
>> err_exit:
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_LINK_FAILED, 0);
>> + cont->handleEvent(CACHE_EVENT_LINK_FAILED, NULL);
>> + return ACTION_RESULT_DONE;
>> }
>>
>> inline Action *
>> Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFragType type, char *hostname, int host_len)
>> {
>> - if (clusterProcessor.disable_remote_cluster_ops(m)) {
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_DEREF_FAILED, 0);
>> + ClusterSession session;
>> + Ptr<IOBufferData> d;
>> + char *msg = NULL;
>> +
>> + if (cluster_create_session(&session, m, NULL, 0)) {
>> + cont->handleEvent(CACHE_EVENT_DEREF_FAILED, NULL);
>> + return ACTION_RESULT_DONE ;
>> }
>>
>> int vers = CacheOpMsg_short::protoToVersion(m->msg_proto_major);
>> @@ -336,14 +362,17 @@ Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFrag
>> if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>> goto err_exit;
>>
>> - char *msg = (char *) ALLOCA_DOUBLE(flen + len);
>> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> + msg = (char *) d->data();
>> memcpy((msg + flen), hostname, host_len);
>>
>> // Setup args for remote deref
>> CacheOpArgs_Deref drefArgs;
>> drefArgs.md5 = key;
>> drefArgs.frag_type = type;
>> - return CacheContinuation::do_op(cont, m, (void *) &drefArgs, CACHE_DEREF, msg, (flen + len));
>> + Action *action = CacheContinuation::do_op(cont, session, (void *) &drefArgs, CACHE_DEREF, d, (flen + len));
>> + if (action)
>> + return action;
>> } else {
>> //////////////////////////////////////////////////////////////
>> // Create the specified down rev version of this message
>> @@ -353,19 +382,22 @@ Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFrag
>> }
>>
>> err_exit:
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_DEREF_FAILED, 0);
>> + cont->handleEvent(CACHE_EVENT_DEREF_FAILED, NULL);
>> + return ACTION_RESULT_DONE ;
>> }
>>
>> inline Action *
>> Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>> bool rm_user_agents, bool rm_link, CacheFragType frag_type, char *hostname, int host_len)
>> {
>> - if (clusterProcessor.disable_remote_cluster_ops(m)) {
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_REMOVE_FAILED, 0);
>> + ClusterSession session;
>> + Ptr<IOBufferData> d;
>> + char *msg = NULL;
>> +
>> + if (cluster_create_session(&session, m, NULL, 0)) {
>> + if (cont)
>> + cont->handleEvent(CACHE_EVENT_REMOVE_FAILED, NULL);
>> + return ACTION_RESULT_DONE;
>> }
>>
>> int vers = CacheOpMsg_short::protoToVersion(m->msg_proto_major);
>> @@ -379,7 +411,8 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>> if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>> goto err_exit;
>>
>> - char *msg = (char *) ALLOCA_DOUBLE(flen + len);
>> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> + msg = (char *) d->data();
>> memcpy((msg + flen), hostname, host_len);
>>
>> // Setup args for remote update
>> @@ -388,7 +421,9 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>> updateArgs.cfl_flags |= (rm_user_agents ? CFL_REMOVE_USER_AGENTS : 0);
>> updateArgs.cfl_flags |= (rm_link ? CFL_REMOVE_LINK : 0);
>> updateArgs.frag_type = frag_type;
>> - return CacheContinuation::do_op(cont, m, (void *) &updateArgs, CACHE_REMOVE, msg, (flen + len));
>> + Action *action = CacheContinuation::do_op(cont, session, (void *) &updateArgs, CACHE_REMOVE, d, (flen + len));
>> + if (action)
>> + return action;
>> } else {
>> //////////////////////////////////////////////////////////////
>> // Create the specified down rev version of this message
>> @@ -398,9 +433,8 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>> }
>>
>> err_exit:
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_REMOVE_FAILED, 0);
>> + if (cont)
>> + cont->handleEvent(CACHE_EVENT_REMOVE_FAILED, NULL);
>> + return ACTION_RESULT_DONE;
>> }
>> -
>> #endif /* __CLUSTERINLINE_H__ */
>>
>> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/iocore/eventsystem/I_Event.h
>> ----------------------------------------------------------------------
>> diff --git a/iocore/eventsystem/I_Event.h b/iocore/eventsystem/I_Event.h
>> index 7a37ea0..2659131 100644
>> --- a/iocore/eventsystem/I_Event.h
>> +++ b/iocore/eventsystem/I_Event.h
>> @@ -85,6 +85,7 @@
>> #define BLOCK_CACHE_EVENT_EVENTS_START 4000
>> #define UTILS_EVENT_EVENTS_START 5000
>> #define CONGESTION_EVENT_EVENTS_START 5100
>> +#define CLUSTER_MSG_START 6000
>> #define INK_API_EVENT_EVENTS_START 60000
>> #define SRV_EVENT_EVENTS_START 62000
>> #define REMAP_EVENT_EVENTS_START 63000
>>
>> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/iocore/eventsystem/P_IOBuffer.h
>> ----------------------------------------------------------------------
>> diff --git a/iocore/eventsystem/P_IOBuffer.h b/iocore/eventsystem/P_IOBuffer.h
>> index 0842aff..261aa1f 100644
>> --- a/iocore/eventsystem/P_IOBuffer.h
>> +++ b/iocore/eventsystem/P_IOBuffer.h
>> @@ -203,7 +203,7 @@ new_IOBufferData_internal(
>> void *b, int64_t size, int64_t asize_index)
>> {
>> (void) size;
>> - IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_ethread());
>> + IOBufferData *d = ioDataAllocator.alloc();
>> d->_size_index = asize_index;
>> ink_assert(BUFFER_SIZE_INDEX_IS_CONSTANT(asize_index)
>> || size <= d->block_size());
>> @@ -263,7 +263,7 @@ new_IOBufferData_internal(
>> #endif
>> int64_t size_index, AllocType type)
>> {
>> - IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_ethread());
>> + IOBufferData *d = ioDataAllocator.alloc();
>> #ifdef TRACK_BUFFER_USER
>> d->_location = loc;
>> #endif
>> @@ -336,7 +336,7 @@ TS_INLINE void
>> IOBufferData::free()
>> {
>> dealloc();
>> - THREAD_FREE(this, ioDataAllocator, this_ethread());
>> + ioDataAllocator.free(this);
>> }
>>
>> //////////////////////////////////////////////////////////////////
>> @@ -352,7 +352,7 @@ new_IOBufferBlock_internal(
>> #endif
>> )
>> {
>> - IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_ethread());
>> + IOBufferBlock *b = ioBlockAllocator.alloc();
>> #ifdef TRACK_BUFFER_USER
>> b->_location = location;
>> #endif
>> @@ -366,7 +366,7 @@ new_IOBufferBlock_internal(
>> #endif
>> IOBufferData * d, int64_t len, int64_t offset)
>> {
>> - IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_ethread());
>> + IOBufferBlock *b = ioBlockAllocator.alloc();
>> #ifdef TRACK_BUFFER_USER
>> b->_location = location;
>> #endif
>> @@ -468,7 +468,7 @@ TS_INLINE void
>> IOBufferBlock::free()
>> {
>> dealloc();
>> - THREAD_FREE(this, ioBlockAllocator, this_ethread());
>> + ioBlockAllocator.free(this);
>> }
>>
>> TS_INLINE void
>> @@ -777,7 +777,7 @@ TS_INLINE MIOBuffer * new_MIOBuffer_internal(
>> #endif
>> int64_t size_index)
>> {
>> - MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_ethread());
>> + MIOBuffer *b = ioAllocator.alloc();
>> #ifdef TRACK_BUFFER_USER
>> b->_location = location;
>> #endif
>> @@ -790,7 +790,7 @@ free_MIOBuffer(MIOBuffer * mio)
>> {
>> mio->_writer = NULL;
>> mio->dealloc_all_readers();
>> - THREAD_FREE(mio, ioAllocator, this_ethread());
>> + ioAllocator.free(mio);
>> }
>>
>> TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal(
>> @@ -799,7 +799,7 @@ TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal(
>> #endif
>> int64_t size_index)
>> {
>> - MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_ethread());
>> + MIOBuffer *b = ioAllocator.alloc();
>> b->size_index = size_index;
>> #ifdef TRACK_BUFFER_USER
>> b->_location = location;
>> @@ -810,7 +810,7 @@ TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal(
>> TS_INLINE void
>> free_empty_MIOBuffer(MIOBuffer * mio)
>> {
>> - THREAD_FREE(mio, ioAllocator, this_ethread());
>> + ioAllocator.free(mio);
>> }
>>
>> TS_INLINE IOBufferReader *
>>
>> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/mgmt/RecordsConfig.cc
>> ----------------------------------------------------------------------
>> diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
>> index 4a73f19..7677471 100644
>> --- a/mgmt/RecordsConfig.cc
>> +++ b/mgmt/RecordsConfig.cc
>> @@ -814,6 +814,24 @@ RecordElement RecordsConfig[] = {
>> ,
>> {RECT_CONFIG, "proxy.config.cluster.cluster_port", RECD_INT, "8086", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> ,
>> + {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_bps", RECD_INT, "804857600", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> + ,
>> + {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_bps", RECD_INT, "4194304000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> + ,
>> + {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_send_wait_time", RECD_INT, "1000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> + ,
>> + {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_send_wait_time", RECD_INT, "5000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> + ,
>> + {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_loop_interval", RECD_INT, "0", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> + ,
>> + {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_loop_interval", RECD_INT, "1000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> + ,
>> + {RECT_CONFIG, "proxy.config.cluster.max_sessions_per_machine", RECD_INT, "1000000", RECU_RESTART_TS, RR_NULL, RECC_INT, "[1000-4000000]", RECA_NULL}
>> + ,
>> + {RECT_CONFIG, "proxy.config.cluster.session_locks_per_machine", RECD_INT, "10949", RECU_RESTART_TS, RR_NULL, RECC_INT, "[1-100000]", RECA_NULL}
>> + ,
>> + {RECT_CONFIG, "proxy.config.cluster.read_buffer_size", RECD_INT, "2097152", RECU_RESTART_TS, RR_NULL, RECC_INT, "[65536-2097152]", RECA_NULL}
>> + ,
>> {RECT_CONFIG, "proxy.config.cluster.cluster_configuration", RECD_STRING, "cluster.config", RECU_NULL, RR_NULL, RECC_NULL, NULL, RECA_NULL}
>> ,
>> {RECT_CONFIG, "proxy.config.cluster.ethernet_interface", RECD_STRING, TS_BUILD_DEFAULT_LOOPBACK_IFACE, RECU_RESTART_TS, RR_REQUIRED, RECC_STR, "^[^[:space:]]*$", RECA_NULL}
>>
>
Re: [1/4] refine cluster
Posted by James Peach <jp...@apache.org>.
On Nov 20, 2013, at 9:50 AM, James Peach <jp...@apache.org> wrote:
> This is a huge commit with
>
> - no links back to the Jira ticket
> - no source code comments
> - no indication of the problem that it is solving or how it solves it
> - no performance comparison (I assume this is for performance)
> - no documentation of the new configuration parameters
> - no indication of compatibility (is it really OK for the 4.x branch?)
> - no tests
Oh, I didn't notice that this was on a branch. Thanks, that's excellent!
>
> Please, can we talk about the above issues?
>
> On Nov 19, 2013, at 11:45 PM, weijin@apache.org wrote:
>
>> Updated Branches:
>> refs/heads/refine_cluster [created] 7ffc10a9c
>>
>>
>> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/iocore/cluster/P_ClusterInline.h
>> ----------------------------------------------------------------------
>> diff --git a/iocore/cluster/P_ClusterInline.h b/iocore/cluster/P_ClusterInline.h
>> index c653956..1d26b3a 100644
>> --- a/iocore/cluster/P_ClusterInline.h
>> +++ b/iocore/cluster/P_ClusterInline.h
>> @@ -36,25 +36,30 @@ inline Action *
>> Cluster_lookup(Continuation * cont, CacheKey * key, CacheFragType frag_type, char *hostname, int host_len)
>> {
>> // Try to send remote, if not possible, handle locally
>> - Action *retAct;
>> - ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
>> - if (m && !clusterProcessor.disable_remote_cluster_ops(m)) {
>> - CacheContinuation *cc = CacheContinuation::cacheContAllocator_alloc();
>> - cc->action = cont;
>> - cc->mutex = cont->mutex;
>> - retAct = CacheContinuation::do_remote_lookup(cont, key, cc, frag_type, hostname, host_len);
>> - if (retAct) {
>> - return retAct;
>> - } else {
>> - // not remote, do local lookup
>> - CacheContinuation::cacheContAllocator_free(cc);
>> - return (Action *) NULL;
>> - }
>> - } else {
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_LOOKUP_FAILED, 0);
>> - }
>> +// Action *retAct;
>> +// ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
>> +// if (m && !clusterProcessor.disable_remote_cluster_ops(m)) {
>> +// CacheContinuation *cc = CacheContinuation::cacheContAllocator_alloc();
>> +// cc->action = cont;
>> +// cc->mutex = cont->mutex;
>> +// retAct = CacheContinuation::do_remote_lookup(cont, key, cc, frag_type, hostname, host_len);
>> +// if (retAct) {
>> +// return retAct;
>> +// } else {
>> +// // not remote, do local lookup
>> +// CacheContinuation::cacheContAllocator_free(cc);
>> +// return (Action *) NULL;
>> +// }
>> +// } else {
>> +// Action a;
>> +// a = cont;
>> +// return CacheContinuation::callback_failure(&a, CACHE_EVENT_LOOKUP_FAILED, 0);
>> +// }
>> + (void) cont;
>> + (void) key;
>> + (void) frag_type;
>> + (void) hostname;
>> + (void) host_len;
>> return (Action *) NULL;
>> }
>>
>> @@ -66,18 +71,24 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>> time_t pin_in_cache, CacheFragType frag_type, char *hostname, int host_len)
>> {
>> (void) params;
>> - if (clusterProcessor.disable_remote_cluster_ops(owner_machine)) {
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_READ_FAILED, 0);
>> + ink_assert(cont);
>> + ClusterSession session;
>> + if (cluster_create_session(&session, owner_machine, NULL, 0)) {
>> + cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, NULL);
>> + return ACTION_RESULT_DONE;
>> }
>> +
>> int vers = CacheOpMsg_long::protoToVersion(owner_machine->msg_proto_major);
>> + CacheOpArgs_General readArgs;
>> + Ptr<IOBufferData> d;
>> +
>> int flen;
>> int len = 0;
>> int cur_len;
>> int res = 0;
>> - char *msg;
>> + char *msg = 0;
>> char *data;
>> + Action *action = NULL;
>>
>> if (vers == CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) {
>> if ((opcode == CACHE_OPEN_READ_LONG)
>> @@ -87,20 +98,21 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>>
>> const char *url_hostname;
>> int url_hlen;
>> - INK_MD5 url_only_md5;
>> + INK_MD5 url_md5;
>>
>> - Cache::generate_key(&url_only_md5, url, 0);
>> + Cache::generate_key(&url_md5, url);
>> url_hostname = url->host_get(&url_hlen);
>>
>> len += request->m_heap->marshal_length();
>> - len += params->marshal_length();
>> + len += sizeof(CacheLookupHttpConfig) + params->marshal_length();
>> len += url_hlen;
>>
>> if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>> goto err_exit;
>>
>> // Perform data Marshal operation
>> - msg = (char *) ALLOCA_DOUBLE(flen + len);
>> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> + msg = (char *) d->data();
>> data = msg + flen;
>>
>> cur_len = len;
>> @@ -110,6 +122,13 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>> }
>> data += res;
>> cur_len -= res;
>> +
>> + if (cur_len < (int) sizeof(CacheLookupHttpConfig))
>> + goto err_exit;
>> + memcpy(data, params, sizeof(CacheLookupHttpConfig));
>> + data += sizeof(CacheLookupHttpConfig);
>> + cur_len -= sizeof(CacheLookupHttpConfig);
>> +
>> if ((res = params->marshal(data, cur_len)) < 0)
>> goto err_exit;
>> data += res;
>> @@ -117,37 +136,33 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>> memcpy(data, url_hostname, url_hlen);
>>
>> CacheOpArgs_General readArgs;
>> - readArgs.url_md5 = &url_only_md5;
>> + readArgs.url_md5 = &url_md5;
>> readArgs.pin_in_cache = pin_in_cache;
>> readArgs.frag_type = frag_type;
>> - return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs,
>> - opcode, (char *) msg, (flen + len), -1, buf);
>> +
>> + action = CacheContinuation::do_op(cont, session, (void *) &readArgs,
>> + opcode, d, (flen + len), -1, buf);
>> } else {
>> // Build message if we have host data.
>> + flen = op_to_sizeof_fixedlen_msg(opcode);
>> + len = host_len;
>>
>> - if (host_len) {
>> - // Determine length of data to Marshal
>> - flen = op_to_sizeof_fixedlen_msg(opcode);
>> - len = host_len;
>> -
>> - if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>> - goto err_exit;
>> + if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>> + goto err_exit;
>>
>> - msg = (char *) ALLOCA_DOUBLE(flen + len);
>> - data = msg + flen;
>> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> + msg = (char *) d->data();
>> + data = msg + flen;
>> + if (host_len)
>> memcpy(data, hostname, host_len);
>>
>> - } else {
>> - msg = 0;
>> - flen = 0;
>> - len = 0;
>> - }
>> - CacheOpArgs_General readArgs;
>> readArgs.url_md5 = key;
>> readArgs.frag_type = frag_type;
>> - return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs,
>> - opcode, (char *) msg, (flen + len), -1, buf);
>> +
>> + action = CacheContinuation::do_op(cont, session, (void *) &readArgs,
>> + opcode, d, (flen + len), -1, buf);
>> }
>> + ink_assert(msg);
>>
>> } else {
>> //////////////////////////////////////////////////////////////
>> @@ -155,10 +170,12 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>> //////////////////////////////////////////////////////////////
>> ink_release_assert(!"CacheOpMsg_long [read] bad msg version");
>> }
>> +
>> + if (action)
>> + return action;
>> err_exit:
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_READ_FAILED, 0);
>> + cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, NULL);
>> + return ACTION_RESULT_DONE;
>> }
>>
>> inline Action *
>> @@ -171,10 +188,11 @@ Cluster_write(Continuation * cont, int expected_size,
>> {
>> (void) key;
>> (void) request;
>> - if (clusterProcessor.disable_remote_cluster_ops(m)) {
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_WRITE_FAILED, 0);
>> + ClusterSession session;
>> + ink_assert(cont);
>> + if (cluster_create_session(&session, m, NULL, 0)) {
>> + cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, NULL);
>> + return ACTION_RESULT_DONE;
>> }
>> char *msg = 0;
>> char *data = 0;
>> @@ -182,24 +200,22 @@ Cluster_write(Continuation * cont, int expected_size,
>> int len = 0;
>> int flen = 0;
>> int vers = CacheOpMsg_long::protoToVersion(m->msg_proto_major);
>> + Ptr<IOBufferData> d;
>>
>> switch (opcode) {
>> case CACHE_OPEN_WRITE:
>> {
>> // Build message if we have host data
>> - if (host_len) {
>> - // Determine length of data to Marshal
>> - flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE);
>> - len = host_len;
>> -
>> - if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>> - goto err_exit;
>> -
>> - msg = (char *) ALLOCA_DOUBLE(flen + len);
>> - data = msg + flen;
>> + len = host_len;
>> + flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE);
>> + if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>> + goto err_exit;
>>
>> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> + msg = (char *) d->data();
>> + data = msg + flen;
>> + if (host_len)
>> memcpy(data, hostname, host_len);
>> - }
>> break;
>> }
>> case CACHE_OPEN_WRITE_LONG:
>> @@ -223,8 +239,9 @@ Cluster_write(Continuation * cont, int expected_size,
>> if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>> goto err_exit;
>>
>> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> + msg = (char *) d->data();
>> // Perform data Marshal operation
>> - msg = (char *) ALLOCA_DOUBLE(flen + len);
>> data = msg + flen;
>> int res = 0;
>>
>> @@ -257,7 +274,9 @@ Cluster_write(Continuation * cont, int expected_size,
>> writeArgs.cfl_flags |= (old_info ? CFL_LOPENWRITE_HAVE_OLDINFO : 0);
>> writeArgs.cfl_flags |= (allow_multiple_writes ? CFL_ALLOW_MULTIPLE_WRITES : 0);
>>
>> - return CacheContinuation::do_op(cont, m, (void *) &writeArgs, opcode, msg, flen + len, expected_size, buf);
>> + Action *action = CacheContinuation::do_op(cont, session, (void *) &writeArgs, opcode, d, flen + len, expected_size, buf);
>> + if (action)
>> + return action;
>> } else {
>> //////////////////////////////////////////////////////////////
>> // Create the specified down rev version of this message
>> @@ -267,19 +286,21 @@ Cluster_write(Continuation * cont, int expected_size,
>> }
>>
>> err_exit:
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_WRITE_FAILED, 0);
>> + cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, NULL);
>> + return ACTION_RESULT_DONE;
>> }
>>
>> inline Action *
>> Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey * to,
>> CacheFragType type, char *hostname, int host_len)
>> {
>> - if (clusterProcessor.disable_remote_cluster_ops(m)) {
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_LINK_FAILED, 0);
>> + ClusterSession session;
>> + Ptr<IOBufferData> d;
>> + char *msg = NULL;
>> +
>> + if (cluster_create_session(&session, m, NULL, 0)) {
>> + cont->handleEvent(CACHE_EVENT_LINK_FAILED, NULL);
>> + return ACTION_RESULT_DONE;
>> }
>>
>> int vers = CacheOpMsg_short_2::protoToVersion(m->msg_proto_major);
>> @@ -293,7 +314,8 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey
>> if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>> goto err_exit;
>>
>> - char *msg = (char *) ALLOCA_DOUBLE(flen + len);
>> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> + msg = (char *) d->data();
>> memcpy((msg + flen), hostname, host_len);
>>
>> // Setup args for remote link
>> @@ -301,7 +323,9 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey
>> linkArgs.from = from;
>> linkArgs.to = to;
>> linkArgs.frag_type = type;
>> - return CacheContinuation::do_op(cont, m, (void *) &linkArgs, CACHE_LINK, msg, (flen + len));
>> + Action *action = CacheContinuation::do_op(cont, session, (void *) &linkArgs, CACHE_LINK, d, (flen + len));
>> + if (action)
>> + return action;
>> } else {
>> //////////////////////////////////////////////////////////////
>> // Create the specified down rev version of this message
>> @@ -311,18 +335,20 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey
>> }
>>
>> err_exit:
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_LINK_FAILED, 0);
>> + cont->handleEvent(CACHE_EVENT_LINK_FAILED, NULL);
>> + return ACTION_RESULT_DONE;
>> }
>>
>> inline Action *
>> Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFragType type, char *hostname, int host_len)
>> {
>> - if (clusterProcessor.disable_remote_cluster_ops(m)) {
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_DEREF_FAILED, 0);
>> + ClusterSession session;
>> + Ptr<IOBufferData> d;
>> + char *msg = NULL;
>> +
>> + if (cluster_create_session(&session, m, NULL, 0)) {
>> + cont->handleEvent(CACHE_EVENT_DEREF_FAILED, NULL);
>> + return ACTION_RESULT_DONE ;
>> }
>>
>> int vers = CacheOpMsg_short::protoToVersion(m->msg_proto_major);
>> @@ -336,14 +362,17 @@ Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFrag
>> if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>> goto err_exit;
>>
>> - char *msg = (char *) ALLOCA_DOUBLE(flen + len);
>> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> + msg = (char *) d->data();
>> memcpy((msg + flen), hostname, host_len);
>>
>> // Setup args for remote deref
>> CacheOpArgs_Deref drefArgs;
>> drefArgs.md5 = key;
>> drefArgs.frag_type = type;
>> - return CacheContinuation::do_op(cont, m, (void *) &drefArgs, CACHE_DEREF, msg, (flen + len));
>> + Action *action = CacheContinuation::do_op(cont, session, (void *) &drefArgs, CACHE_DEREF, d, (flen + len));
>> + if (action)
>> + return action;
>> } else {
>> //////////////////////////////////////////////////////////////
>> // Create the specified down rev version of this message
>> @@ -353,19 +382,22 @@ Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFrag
>> }
>>
>> err_exit:
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_DEREF_FAILED, 0);
>> + cont->handleEvent(CACHE_EVENT_DEREF_FAILED, NULL);
>> + return ACTION_RESULT_DONE ;
>> }
>>
>> inline Action *
>> Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>> bool rm_user_agents, bool rm_link, CacheFragType frag_type, char *hostname, int host_len)
>> {
>> - if (clusterProcessor.disable_remote_cluster_ops(m)) {
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_REMOVE_FAILED, 0);
>> + ClusterSession session;
>> + Ptr<IOBufferData> d;
>> + char *msg = NULL;
>> +
>> + if (cluster_create_session(&session, m, NULL, 0)) {
>> + if (cont)
>> + cont->handleEvent(CACHE_EVENT_REMOVE_FAILED, NULL);
>> + return ACTION_RESULT_DONE;
>> }
>>
>> int vers = CacheOpMsg_short::protoToVersion(m->msg_proto_major);
>> @@ -379,7 +411,8 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>> if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>> goto err_exit;
>>
>> - char *msg = (char *) ALLOCA_DOUBLE(flen + len);
>> + d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> + msg = (char *) d->data();
>> memcpy((msg + flen), hostname, host_len);
>>
>> // Setup args for remote update
>> @@ -388,7 +421,9 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>> updateArgs.cfl_flags |= (rm_user_agents ? CFL_REMOVE_USER_AGENTS : 0);
>> updateArgs.cfl_flags |= (rm_link ? CFL_REMOVE_LINK : 0);
>> updateArgs.frag_type = frag_type;
>> - return CacheContinuation::do_op(cont, m, (void *) &updateArgs, CACHE_REMOVE, msg, (flen + len));
>> + Action *action = CacheContinuation::do_op(cont, session, (void *) &updateArgs, CACHE_REMOVE, d, (flen + len));
>> + if (action)
>> + return action;
>> } else {
>> //////////////////////////////////////////////////////////////
>> // Create the specified down rev version of this message
>> @@ -398,9 +433,8 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>> }
>>
>> err_exit:
>> - Action a;
>> - a = cont;
>> - return CacheContinuation::callback_failure(&a, CACHE_EVENT_REMOVE_FAILED, 0);
>> + if (cont)
>> + cont->handleEvent(CACHE_EVENT_REMOVE_FAILED, NULL);
>> + return ACTION_RESULT_DONE;
>> }
>> -
>> #endif /* __CLUSTERINLINE_H__ */
>>
>> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/iocore/eventsystem/I_Event.h
>> ----------------------------------------------------------------------
>> diff --git a/iocore/eventsystem/I_Event.h b/iocore/eventsystem/I_Event.h
>> index 7a37ea0..2659131 100644
>> --- a/iocore/eventsystem/I_Event.h
>> +++ b/iocore/eventsystem/I_Event.h
>> @@ -85,6 +85,7 @@
>> #define BLOCK_CACHE_EVENT_EVENTS_START 4000
>> #define UTILS_EVENT_EVENTS_START 5000
>> #define CONGESTION_EVENT_EVENTS_START 5100
>> +#define CLUSTER_MSG_START 6000
>> #define INK_API_EVENT_EVENTS_START 60000
>> #define SRV_EVENT_EVENTS_START 62000
>> #define REMAP_EVENT_EVENTS_START 63000
>>
>> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/iocore/eventsystem/P_IOBuffer.h
>> ----------------------------------------------------------------------
>> diff --git a/iocore/eventsystem/P_IOBuffer.h b/iocore/eventsystem/P_IOBuffer.h
>> index 0842aff..261aa1f 100644
>> --- a/iocore/eventsystem/P_IOBuffer.h
>> +++ b/iocore/eventsystem/P_IOBuffer.h
>> @@ -203,7 +203,7 @@ new_IOBufferData_internal(
>> void *b, int64_t size, int64_t asize_index)
>> {
>> (void) size;
>> - IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_ethread());
>> + IOBufferData *d = ioDataAllocator.alloc();
>> d->_size_index = asize_index;
>> ink_assert(BUFFER_SIZE_INDEX_IS_CONSTANT(asize_index)
>> || size <= d->block_size());
>> @@ -263,7 +263,7 @@ new_IOBufferData_internal(
>> #endif
>> int64_t size_index, AllocType type)
>> {
>> - IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_ethread());
>> + IOBufferData *d = ioDataAllocator.alloc();
>> #ifdef TRACK_BUFFER_USER
>> d->_location = loc;
>> #endif
>> @@ -336,7 +336,7 @@ TS_INLINE void
>> IOBufferData::free()
>> {
>> dealloc();
>> - THREAD_FREE(this, ioDataAllocator, this_ethread());
>> + ioDataAllocator.free(this);
>> }
>>
>> //////////////////////////////////////////////////////////////////
>> @@ -352,7 +352,7 @@ new_IOBufferBlock_internal(
>> #endif
>> )
>> {
>> - IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_ethread());
>> + IOBufferBlock *b = ioBlockAllocator.alloc();
>> #ifdef TRACK_BUFFER_USER
>> b->_location = location;
>> #endif
>> @@ -366,7 +366,7 @@ new_IOBufferBlock_internal(
>> #endif
>> IOBufferData * d, int64_t len, int64_t offset)
>> {
>> - IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_ethread());
>> + IOBufferBlock *b = ioBlockAllocator.alloc();
>> #ifdef TRACK_BUFFER_USER
>> b->_location = location;
>> #endif
>> @@ -468,7 +468,7 @@ TS_INLINE void
>> IOBufferBlock::free()
>> {
>> dealloc();
>> - THREAD_FREE(this, ioBlockAllocator, this_ethread());
>> + ioBlockAllocator.free(this);
>> }
>>
>> TS_INLINE void
>> @@ -777,7 +777,7 @@ TS_INLINE MIOBuffer * new_MIOBuffer_internal(
>> #endif
>> int64_t size_index)
>> {
>> - MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_ethread());
>> + MIOBuffer *b = ioAllocator.alloc();
>> #ifdef TRACK_BUFFER_USER
>> b->_location = location;
>> #endif
>> @@ -790,7 +790,7 @@ free_MIOBuffer(MIOBuffer * mio)
>> {
>> mio->_writer = NULL;
>> mio->dealloc_all_readers();
>> - THREAD_FREE(mio, ioAllocator, this_ethread());
>> + ioAllocator.free(mio);
>> }
>>
>> TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal(
>> @@ -799,7 +799,7 @@ TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal(
>> #endif
>> int64_t size_index)
>> {
>> - MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_ethread());
>> + MIOBuffer *b = ioAllocator.alloc();
>> b->size_index = size_index;
>> #ifdef TRACK_BUFFER_USER
>> b->_location = location;
>> @@ -810,7 +810,7 @@ TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal(
>> TS_INLINE void
>> free_empty_MIOBuffer(MIOBuffer * mio)
>> {
>> - THREAD_FREE(mio, ioAllocator, this_ethread());
>> + ioAllocator.free(mio);
>> }
>>
>> TS_INLINE IOBufferReader *
>>
>> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/mgmt/RecordsConfig.cc
>> ----------------------------------------------------------------------
>> diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
>> index 4a73f19..7677471 100644
>> --- a/mgmt/RecordsConfig.cc
>> +++ b/mgmt/RecordsConfig.cc
>> @@ -814,6 +814,24 @@ RecordElement RecordsConfig[] = {
>> ,
>> {RECT_CONFIG, "proxy.config.cluster.cluster_port", RECD_INT, "8086", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> ,
>> + {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_bps", RECD_INT, "804857600", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> + ,
>> + {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_bps", RECD_INT, "4194304000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> + ,
>> + {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_send_wait_time", RECD_INT, "1000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> + ,
>> + {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_send_wait_time", RECD_INT, "5000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> + ,
>> + {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_loop_interval", RECD_INT, "0", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> + ,
>> + {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_loop_interval", RECD_INT, "1000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> + ,
>> + {RECT_CONFIG, "proxy.config.cluster.max_sessions_per_machine", RECD_INT, "1000000", RECU_RESTART_TS, RR_NULL, RECC_INT, "[1000-4000000]", RECA_NULL}
>> + ,
>> + {RECT_CONFIG, "proxy.config.cluster.session_locks_per_machine", RECD_INT, "10949", RECU_RESTART_TS, RR_NULL, RECC_INT, "[1-100000]", RECA_NULL}
>> + ,
>> + {RECT_CONFIG, "proxy.config.cluster.read_buffer_size", RECD_INT, "2097152", RECU_RESTART_TS, RR_NULL, RECC_INT, "[65536-2097152]", RECA_NULL}
>> + ,
>> {RECT_CONFIG, "proxy.config.cluster.cluster_configuration", RECD_STRING, "cluster.config", RECU_NULL, RR_NULL, RECC_NULL, NULL, RECA_NULL}
>> ,
>> {RECT_CONFIG, "proxy.config.cluster.ethernet_interface", RECD_STRING, TS_BUILD_DEFAULT_LOOPBACK_IFACE, RECU_RESTART_TS, RR_REQUIRED, RECC_STR, "^[^[:space:]]*$", RECA_NULL}
>>
>