You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@trafficserver.apache.org by James Peach <jp...@apache.org> on 2013/11/20 18:50:51 UTC

Re: [1/4] refine cluster

This is a huge commit with

	- no links back to the Jira ticket
	- no source code comments
	- no indication of the problem that it is solving or how it solves it
	- no performance comparison (I assume this is for performance)
	- no documentation of the new configuration parameters
	- no indication of compatibility (is it really OK for the 4.x branch?)
	- no tests

Please, can we talk about the above issues?

On Nov 19, 2013, at 11:45 PM, weijin@apache.org wrote:

> Updated Branches:
>  refs/heads/refine_cluster [created] 7ffc10a9c
> 
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/iocore/cluster/P_ClusterInline.h
> ----------------------------------------------------------------------
> diff --git a/iocore/cluster/P_ClusterInline.h b/iocore/cluster/P_ClusterInline.h
> index c653956..1d26b3a 100644
> --- a/iocore/cluster/P_ClusterInline.h
> +++ b/iocore/cluster/P_ClusterInline.h
> @@ -36,25 +36,30 @@ inline Action *
> Cluster_lookup(Continuation * cont, CacheKey * key, CacheFragType frag_type, char *hostname, int host_len)
> {
>   // Try to send remote, if not possible, handle locally
> -  Action *retAct;
> -  ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
> -  if (m && !clusterProcessor.disable_remote_cluster_ops(m)) {
> -    CacheContinuation *cc = CacheContinuation::cacheContAllocator_alloc();
> -    cc->action = cont;
> -    cc->mutex = cont->mutex;
> -    retAct = CacheContinuation::do_remote_lookup(cont, key, cc, frag_type, hostname, host_len);
> -    if (retAct) {
> -      return retAct;
> -    } else {
> -      // not remote, do local lookup
> -      CacheContinuation::cacheContAllocator_free(cc);
> -      return (Action *) NULL;
> -    }
> -  } else {
> -    Action a;
> -    a = cont;
> -    return CacheContinuation::callback_failure(&a, CACHE_EVENT_LOOKUP_FAILED, 0);
> -  }
> +//  Action *retAct;
> +//  ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
> +//  if (m && !clusterProcessor.disable_remote_cluster_ops(m)) {
> +//    CacheContinuation *cc = CacheContinuation::cacheContAllocator_alloc();
> +//    cc->action = cont;
> +//    cc->mutex = cont->mutex;
> +//    retAct = CacheContinuation::do_remote_lookup(cont, key, cc, frag_type, hostname, host_len);
> +//    if (retAct) {
> +//      return retAct;
> +//    } else {
> +//      // not remote, do local lookup
> +//      CacheContinuation::cacheContAllocator_free(cc);
> +//      return (Action *) NULL;
> +//    }
> +//  } else {
> +//    Action a;
> +//    a = cont;
> +//    return CacheContinuation::callback_failure(&a, CACHE_EVENT_LOOKUP_FAILED, 0);
> +//  }
> +  (void) cont;
> +  (void) key;
> +  (void) frag_type;
> +  (void) hostname;
> +  (void) host_len;
>   return (Action *) NULL;
> }
> 
> @@ -66,18 +71,24 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>              time_t pin_in_cache, CacheFragType frag_type, char *hostname, int host_len)
> {
>   (void) params;
> -  if (clusterProcessor.disable_remote_cluster_ops(owner_machine)) {
> -    Action a;
> -    a = cont;
> -    return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_READ_FAILED, 0);
> +  ink_assert(cont);
> +  ClusterSession session;
> +  if (cluster_create_session(&session, owner_machine, NULL, 0)) {
> +    cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, NULL);
> +    return ACTION_RESULT_DONE;
>   }
> +
>   int vers = CacheOpMsg_long::protoToVersion(owner_machine->msg_proto_major);
> +  CacheOpArgs_General readArgs;
> +  Ptr<IOBufferData> d;
> +
>   int flen;
>   int len = 0;
>   int cur_len;
>   int res = 0;
> -  char *msg;
> +  char *msg = 0;
>   char *data;
> +  Action *action = NULL;
> 
>   if (vers == CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) {
>     if ((opcode == CACHE_OPEN_READ_LONG)
> @@ -87,20 +98,21 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
> 
>       const char *url_hostname;
>       int url_hlen;
> -      INK_MD5 url_only_md5;
> +      INK_MD5 url_md5;
> 
> -      Cache::generate_key(&url_only_md5, url, 0);
> +      Cache::generate_key(&url_md5, url);
>       url_hostname = url->host_get(&url_hlen);
> 
>       len += request->m_heap->marshal_length();
> -      len += params->marshal_length();
> +      len += sizeof(CacheLookupHttpConfig) + params->marshal_length();
>       len += url_hlen;
> 
>       if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)       // Bound marshalled data
>         goto err_exit;
> 
>       // Perform data Marshal operation
> -      msg = (char *) ALLOCA_DOUBLE(flen + len);
> +      d = new_IOBufferData(iobuffer_size_to_index(flen + len));
> +      msg = (char *) d->data();
>       data = msg + flen;
> 
>       cur_len = len;
> @@ -110,6 +122,13 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>       }
>       data += res;
>       cur_len -= res;
> +
> +      if (cur_len < (int) sizeof(CacheLookupHttpConfig))
> +        goto err_exit;
> +      memcpy(data, params, sizeof(CacheLookupHttpConfig));
> +      data += sizeof(CacheLookupHttpConfig);
> +      cur_len -= sizeof(CacheLookupHttpConfig);
> +
>       if ((res = params->marshal(data, cur_len)) < 0)
>         goto err_exit;
>       data += res;
> @@ -117,37 +136,33 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>       memcpy(data, url_hostname, url_hlen);
> 
>       CacheOpArgs_General readArgs;
> -      readArgs.url_md5 = &url_only_md5;
> +      readArgs.url_md5 = &url_md5;
>       readArgs.pin_in_cache = pin_in_cache;
>       readArgs.frag_type = frag_type;
> -      return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs,
> -                                      opcode, (char *) msg, (flen + len), -1, buf);
> +
> +      action = CacheContinuation::do_op(cont, session, (void *) &readArgs,
> +                                            opcode, d, (flen + len), -1, buf);
>     } else {
>       // Build message if we have host data.
> +      flen = op_to_sizeof_fixedlen_msg(opcode);
> +      len = host_len;
> 
> -      if (host_len) {
> -        // Determine length of data to Marshal
> -        flen = op_to_sizeof_fixedlen_msg(opcode);
> -        len = host_len;
> -
> -        if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)     // Bound marshalled data
> -          goto err_exit;
> +      if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)     // Bound marshalled data
> +        goto err_exit;
> 
> -        msg = (char *) ALLOCA_DOUBLE(flen + len);
> -        data = msg + flen;
> +      d = new_IOBufferData(iobuffer_size_to_index(flen + len));
> +      msg = (char *) d->data();
> +      data = msg + flen;
> +      if (host_len)
>         memcpy(data, hostname, host_len);
> 
> -      } else {
> -        msg = 0;
> -        flen = 0;
> -        len = 0;
> -      }
> -      CacheOpArgs_General readArgs;
>       readArgs.url_md5 = key;
>       readArgs.frag_type = frag_type;
> -      return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs,
> -                                      opcode, (char *) msg, (flen + len), -1, buf);
> +
> +      action = CacheContinuation::do_op(cont, session, (void *) &readArgs,
> +                                            opcode, d, (flen + len), -1, buf);
>     }
> +    ink_assert(msg);
> 
>   } else {
>     //////////////////////////////////////////////////////////////
> @@ -155,10 +170,12 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>     //////////////////////////////////////////////////////////////
>     ink_release_assert(!"CacheOpMsg_long [read] bad msg version");
>   }
> +
> +  if (action)
> +    return action;
> err_exit:
> -  Action a;
> -  a = cont;
> -  return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_READ_FAILED, 0);
> +  cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, NULL);
> +  return ACTION_RESULT_DONE;
> }
> 
> inline Action *
> @@ -171,10 +188,11 @@ Cluster_write(Continuation * cont, int expected_size,
> {
>   (void) key;
>   (void) request;
> -  if (clusterProcessor.disable_remote_cluster_ops(m)) {
> -    Action a;
> -    a = cont;
> -    return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_WRITE_FAILED, 0);
> +  ClusterSession session;
> +  ink_assert(cont);
> +  if (cluster_create_session(&session, m, NULL, 0)) {
> +     cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, NULL);
> +     return ACTION_RESULT_DONE;
>   }
>   char *msg = 0;
>   char *data = 0;
> @@ -182,24 +200,22 @@ Cluster_write(Continuation * cont, int expected_size,
>   int len = 0;
>   int flen = 0;
>   int vers = CacheOpMsg_long::protoToVersion(m->msg_proto_major);
> +  Ptr<IOBufferData> d;
> 
>   switch (opcode) {
>   case CACHE_OPEN_WRITE:
>     {
>       // Build message if we have host data
> -      if (host_len) {
> -        // Determine length of data to Marshal
> -        flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE);
> -        len = host_len;
> -
> -        if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)     // Bound marshalled data
> -          goto err_exit;
> -
> -        msg = (char *) ALLOCA_DOUBLE(flen + len);
> -        data = msg + flen;
> +      len = host_len;
> +      flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE);
> +      if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)     // Bound marshalled data
> +        goto err_exit;
> 
> +      d = new_IOBufferData(iobuffer_size_to_index(flen + len));
> +      msg = (char *) d->data();
> +      data = msg + flen;
> +      if (host_len)
>         memcpy(data, hostname, host_len);
> -      }
>       break;
>     }
>   case CACHE_OPEN_WRITE_LONG:
> @@ -223,8 +239,9 @@ Cluster_write(Continuation * cont, int expected_size,
>       if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)       // Bound marshalled data
>         goto err_exit;
> 
> +      d = new_IOBufferData(iobuffer_size_to_index(flen + len));
> +      msg = (char *) d->data();
>       // Perform data Marshal operation
> -      msg = (char *) ALLOCA_DOUBLE(flen + len);
>       data = msg + flen;
>       int res = 0;
> 
> @@ -257,7 +274,9 @@ Cluster_write(Continuation * cont, int expected_size,
>     writeArgs.cfl_flags |= (old_info ? CFL_LOPENWRITE_HAVE_OLDINFO : 0);
>     writeArgs.cfl_flags |= (allow_multiple_writes ? CFL_ALLOW_MULTIPLE_WRITES : 0);
> 
> -    return CacheContinuation::do_op(cont, m, (void *) &writeArgs, opcode, msg, flen + len, expected_size, buf);
> +    Action *action = CacheContinuation::do_op(cont, session, (void *) &writeArgs, opcode, d, flen + len, expected_size, buf);
> +    if (action)
> +      return action;
>   } else {
>     //////////////////////////////////////////////////////////////
>     // Create the specified down rev version of this message
> @@ -267,19 +286,21 @@ Cluster_write(Continuation * cont, int expected_size,
>   }
> 
> err_exit:
> -  Action a;
> -  a = cont;
> -  return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_WRITE_FAILED, 0);
> +  cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, NULL);
> +  return ACTION_RESULT_DONE;
> }
> 
> inline Action *
> Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey * to,
>              CacheFragType type, char *hostname, int host_len)
> {
> -  if (clusterProcessor.disable_remote_cluster_ops(m)) {
> -    Action a;
> -    a = cont;
> -    return CacheContinuation::callback_failure(&a, CACHE_EVENT_LINK_FAILED, 0);
> +  ClusterSession session;
> +  Ptr<IOBufferData> d;
> +  char *msg = NULL;
> +
> +  if (cluster_create_session(&session, m, NULL, 0)) {
> +    cont->handleEvent(CACHE_EVENT_LINK_FAILED, NULL);
> +    return ACTION_RESULT_DONE;
>   }
> 
>   int vers = CacheOpMsg_short_2::protoToVersion(m->msg_proto_major);
> @@ -293,7 +314,8 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey
>     if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>       goto err_exit;
> 
> -    char *msg = (char *) ALLOCA_DOUBLE(flen + len);
> +    d = new_IOBufferData(iobuffer_size_to_index(flen + len));
> +    msg = (char *) d->data();
>     memcpy((msg + flen), hostname, host_len);
> 
>     // Setup args for remote link
> @@ -301,7 +323,9 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey
>     linkArgs.from = from;
>     linkArgs.to = to;
>     linkArgs.frag_type = type;
> -    return CacheContinuation::do_op(cont, m, (void *) &linkArgs, CACHE_LINK, msg, (flen + len));
> +    Action *action = CacheContinuation::do_op(cont, session, (void *) &linkArgs, CACHE_LINK, d, (flen + len));
> +    if (action)
> +      return action;
>   } else {
>     //////////////////////////////////////////////////////////////
>     // Create the specified down rev version of this message
> @@ -311,18 +335,20 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey
>   }
> 
> err_exit:
> -  Action a;
> -  a = cont;
> -  return CacheContinuation::callback_failure(&a, CACHE_EVENT_LINK_FAILED, 0);
> +  cont->handleEvent(CACHE_EVENT_LINK_FAILED, NULL);
> +  return ACTION_RESULT_DONE;
> }
> 
> inline Action *
> Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFragType type, char *hostname, int host_len)
> {
> -  if (clusterProcessor.disable_remote_cluster_ops(m)) {
> -    Action a;
> -    a = cont;
> -    return CacheContinuation::callback_failure(&a, CACHE_EVENT_DEREF_FAILED, 0);
> +  ClusterSession session;
> +  Ptr<IOBufferData> d;
> +  char *msg = NULL;
> +
> +  if (cluster_create_session(&session, m, NULL, 0)) {
> +    cont->handleEvent(CACHE_EVENT_DEREF_FAILED, NULL);
> +    return ACTION_RESULT_DONE ;
>   }
> 
>   int vers = CacheOpMsg_short::protoToVersion(m->msg_proto_major);
> @@ -336,14 +362,17 @@ Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFrag
>     if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>       goto err_exit;
> 
> -    char *msg = (char *) ALLOCA_DOUBLE(flen + len);
> +    d = new_IOBufferData(iobuffer_size_to_index(flen + len));
> +    msg = (char *) d->data();
>     memcpy((msg + flen), hostname, host_len);
> 
>     // Setup args for remote deref
>     CacheOpArgs_Deref drefArgs;
>     drefArgs.md5 = key;
>     drefArgs.frag_type = type;
> -    return CacheContinuation::do_op(cont, m, (void *) &drefArgs, CACHE_DEREF, msg, (flen + len));
> +    Action *action = CacheContinuation::do_op(cont, session, (void *) &drefArgs, CACHE_DEREF, d, (flen + len));
> +    if (action)
> +      return action;
>   } else {
>     //////////////////////////////////////////////////////////////
>     // Create the specified down rev version of this message
> @@ -353,19 +382,22 @@ Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFrag
>   }
> 
> err_exit:
> -  Action a;
> -  a = cont;
> -  return CacheContinuation::callback_failure(&a, CACHE_EVENT_DEREF_FAILED, 0);
> +  cont->handleEvent(CACHE_EVENT_DEREF_FAILED, NULL);
> +  return ACTION_RESULT_DONE ;
> }
> 
> inline Action *
> Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>                bool rm_user_agents, bool rm_link, CacheFragType frag_type, char *hostname, int host_len)
> {
> -  if (clusterProcessor.disable_remote_cluster_ops(m)) {
> -    Action a;
> -    a = cont;
> -    return CacheContinuation::callback_failure(&a, CACHE_EVENT_REMOVE_FAILED, 0);
> +  ClusterSession session;
> +  Ptr<IOBufferData> d;
> +  char *msg = NULL;
> +
> +  if (cluster_create_session(&session, m, NULL, 0)) {
> +    if (cont)
> +      cont->handleEvent(CACHE_EVENT_REMOVE_FAILED, NULL);
> +    return ACTION_RESULT_DONE;
>   }
> 
>   int vers = CacheOpMsg_short::protoToVersion(m->msg_proto_major);
> @@ -379,7 +411,8 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>     if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>       goto err_exit;
> 
> -    char *msg = (char *) ALLOCA_DOUBLE(flen + len);
> +    d = new_IOBufferData(iobuffer_size_to_index(flen + len));
> +    msg = (char *) d->data();
>     memcpy((msg + flen), hostname, host_len);
> 
>     // Setup args for remote update
> @@ -388,7 +421,9 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>     updateArgs.cfl_flags |= (rm_user_agents ? CFL_REMOVE_USER_AGENTS : 0);
>     updateArgs.cfl_flags |= (rm_link ? CFL_REMOVE_LINK : 0);
>     updateArgs.frag_type = frag_type;
> -    return CacheContinuation::do_op(cont, m, (void *) &updateArgs, CACHE_REMOVE, msg, (flen + len));
> +    Action *action = CacheContinuation::do_op(cont, session, (void *) &updateArgs, CACHE_REMOVE, d, (flen + len));
> +    if (action)
> +      return action;
>   } else {
>     //////////////////////////////////////////////////////////////
>     // Create the specified down rev version of this message
> @@ -398,9 +433,8 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>   }
> 
> err_exit:
> -  Action a;
> -  a = cont;
> -  return CacheContinuation::callback_failure(&a, CACHE_EVENT_REMOVE_FAILED, 0);
> +  if (cont)
> +    cont->handleEvent(CACHE_EVENT_REMOVE_FAILED, NULL);
> +  return ACTION_RESULT_DONE;
> }
> -
> #endif /* __CLUSTERINLINE_H__ */
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/iocore/eventsystem/I_Event.h
> ----------------------------------------------------------------------
> diff --git a/iocore/eventsystem/I_Event.h b/iocore/eventsystem/I_Event.h
> index 7a37ea0..2659131 100644
> --- a/iocore/eventsystem/I_Event.h
> +++ b/iocore/eventsystem/I_Event.h
> @@ -85,6 +85,7 @@
> #define BLOCK_CACHE_EVENT_EVENTS_START            4000
> #define UTILS_EVENT_EVENTS_START                  5000
> #define CONGESTION_EVENT_EVENTS_START             5100
> +#define CLUSTER_MSG_START                         6000
> #define INK_API_EVENT_EVENTS_START                60000
> #define SRV_EVENT_EVENTS_START	                  62000
> #define REMAP_EVENT_EVENTS_START                  63000
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/iocore/eventsystem/P_IOBuffer.h
> ----------------------------------------------------------------------
> diff --git a/iocore/eventsystem/P_IOBuffer.h b/iocore/eventsystem/P_IOBuffer.h
> index 0842aff..261aa1f 100644
> --- a/iocore/eventsystem/P_IOBuffer.h
> +++ b/iocore/eventsystem/P_IOBuffer.h
> @@ -203,7 +203,7 @@ new_IOBufferData_internal(
>                            void *b, int64_t size, int64_t asize_index)
> {
>   (void) size;
> -  IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_ethread());
> +  IOBufferData *d = ioDataAllocator.alloc();
>   d->_size_index = asize_index;
>   ink_assert(BUFFER_SIZE_INDEX_IS_CONSTANT(asize_index)
>              || size <= d->block_size());
> @@ -263,7 +263,7 @@ new_IOBufferData_internal(
> #endif
>                            int64_t size_index, AllocType type)
> {
> -  IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_ethread());
> +  IOBufferData *d = ioDataAllocator.alloc();
> #ifdef TRACK_BUFFER_USER
>   d->_location = loc;
> #endif
> @@ -336,7 +336,7 @@ TS_INLINE void
> IOBufferData::free()
> {
>   dealloc();
> -  THREAD_FREE(this, ioDataAllocator, this_ethread());
> +  ioDataAllocator.free(this);
> }
> 
> //////////////////////////////////////////////////////////////////
> @@ -352,7 +352,7 @@ new_IOBufferBlock_internal(
> #endif
>   )
> {
> -  IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_ethread());
> +  IOBufferBlock *b = ioBlockAllocator.alloc();
> #ifdef TRACK_BUFFER_USER
>   b->_location = location;
> #endif
> @@ -366,7 +366,7 @@ new_IOBufferBlock_internal(
> #endif
>                             IOBufferData * d, int64_t len, int64_t offset)
> {
> -  IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_ethread());
> +  IOBufferBlock *b = ioBlockAllocator.alloc();
> #ifdef TRACK_BUFFER_USER
>   b->_location = location;
> #endif
> @@ -468,7 +468,7 @@ TS_INLINE void
> IOBufferBlock::free()
> {
>   dealloc();
> -  THREAD_FREE(this, ioBlockAllocator, this_ethread());
> +  ioBlockAllocator.free(this);
> }
> 
> TS_INLINE void
> @@ -777,7 +777,7 @@ TS_INLINE MIOBuffer * new_MIOBuffer_internal(
> #endif
>                                                int64_t size_index)
> {
> -  MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_ethread());
> +  MIOBuffer *b = ioAllocator.alloc();
> #ifdef TRACK_BUFFER_USER
>   b->_location = location;
> #endif
> @@ -790,7 +790,7 @@ free_MIOBuffer(MIOBuffer * mio)
> {
>   mio->_writer = NULL;
>   mio->dealloc_all_readers();
> -  THREAD_FREE(mio, ioAllocator, this_ethread());
> +  ioAllocator.free(mio);
> }
> 
> TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal(
> @@ -799,7 +799,7 @@ TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal(
> #endif
>                                                      int64_t size_index)
> {
> -  MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_ethread());
> +  MIOBuffer *b = ioAllocator.alloc();
>   b->size_index = size_index;
> #ifdef TRACK_BUFFER_USER
>   b->_location = location;
> @@ -810,7 +810,7 @@ TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal(
> TS_INLINE void
> free_empty_MIOBuffer(MIOBuffer * mio)
> {
> -  THREAD_FREE(mio, ioAllocator, this_ethread());
> +  ioAllocator.free(mio);
> }
> 
> TS_INLINE IOBufferReader *
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/mgmt/RecordsConfig.cc
> ----------------------------------------------------------------------
> diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
> index 4a73f19..7677471 100644
> --- a/mgmt/RecordsConfig.cc
> +++ b/mgmt/RecordsConfig.cc
> @@ -814,6 +814,24 @@ RecordElement RecordsConfig[] = {
>   ,
>   {RECT_CONFIG, "proxy.config.cluster.cluster_port", RECD_INT, "8086", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>   ,
> +  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_bps", RECD_INT, "804857600", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
> +  ,
> +  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_bps", RECD_INT, "4194304000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
> +  ,
> +  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_send_wait_time", RECD_INT, "1000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
> +  ,
> +  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_send_wait_time", RECD_INT, "5000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
> +  ,
> +  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_loop_interval", RECD_INT, "0", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
> +  ,
> +  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_loop_interval", RECD_INT, "1000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
> +  ,
> +  {RECT_CONFIG, "proxy.config.cluster.max_sessions_per_machine", RECD_INT, "1000000", RECU_RESTART_TS, RR_NULL, RECC_INT, "[1000-4000000]", RECA_NULL}
> +  ,
> +  {RECT_CONFIG, "proxy.config.cluster.session_locks_per_machine", RECD_INT, "10949", RECU_RESTART_TS, RR_NULL, RECC_INT, "[1-100000]", RECA_NULL}
> +  ,
> +  {RECT_CONFIG, "proxy.config.cluster.read_buffer_size", RECD_INT, "2097152", RECU_RESTART_TS, RR_NULL, RECC_INT, "[65536-2097152]", RECA_NULL}
> +  ,
>   {RECT_CONFIG, "proxy.config.cluster.cluster_configuration", RECD_STRING, "cluster.config", RECU_NULL, RR_NULL, RECC_NULL, NULL, RECA_NULL}
>   ,
>   {RECT_CONFIG, "proxy.config.cluster.ethernet_interface", RECD_STRING, TS_BUILD_DEFAULT_LOOPBACK_IFACE, RECU_RESTART_TS, RR_REQUIRED, RECC_STR, "^[^[:space:]]*$", RECA_NULL}
> 


Re: [1/4] refine cluster

Posted by James Peach <jp...@apache.org>.
On Nov 20, 2013, at 9:50 AM, James Peach <jp...@apache.org> wrote:

> This is a huge commit with
> 
> 	- no links back to the Jira ticket
> 	- no source code comments
> 	- no indication of the problem that it is solving or how it solves it
> 	- no performance comparison (I assume this is for performance)
> 	- no documentation of the new configuration parameters
> 	- no indication of compatibility (is it really OK for the 4.x branch?)
> 	- no tests

Oh, I didn't notice that this was on a branch. Thanks, that's excellent!

> 
> Please, can we talk about the above issues?
> 
> On Nov 19, 2013, at 11:45 PM, weijin@apache.org wrote:
> 
>> Updated Branches:
>> refs/heads/refine_cluster [created] 7ffc10a9c
>> 
>> 
>> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/iocore/cluster/P_ClusterInline.h
>> ----------------------------------------------------------------------
>> diff --git a/iocore/cluster/P_ClusterInline.h b/iocore/cluster/P_ClusterInline.h
>> index c653956..1d26b3a 100644
>> --- a/iocore/cluster/P_ClusterInline.h
>> +++ b/iocore/cluster/P_ClusterInline.h
>> @@ -36,25 +36,30 @@ inline Action *
>> Cluster_lookup(Continuation * cont, CacheKey * key, CacheFragType frag_type, char *hostname, int host_len)
>> {
>>  // Try to send remote, if not possible, handle locally
>> -  Action *retAct;
>> -  ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
>> -  if (m && !clusterProcessor.disable_remote_cluster_ops(m)) {
>> -    CacheContinuation *cc = CacheContinuation::cacheContAllocator_alloc();
>> -    cc->action = cont;
>> -    cc->mutex = cont->mutex;
>> -    retAct = CacheContinuation::do_remote_lookup(cont, key, cc, frag_type, hostname, host_len);
>> -    if (retAct) {
>> -      return retAct;
>> -    } else {
>> -      // not remote, do local lookup
>> -      CacheContinuation::cacheContAllocator_free(cc);
>> -      return (Action *) NULL;
>> -    }
>> -  } else {
>> -    Action a;
>> -    a = cont;
>> -    return CacheContinuation::callback_failure(&a, CACHE_EVENT_LOOKUP_FAILED, 0);
>> -  }
>> +//  Action *retAct;
>> +//  ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
>> +//  if (m && !clusterProcessor.disable_remote_cluster_ops(m)) {
>> +//    CacheContinuation *cc = CacheContinuation::cacheContAllocator_alloc();
>> +//    cc->action = cont;
>> +//    cc->mutex = cont->mutex;
>> +//    retAct = CacheContinuation::do_remote_lookup(cont, key, cc, frag_type, hostname, host_len);
>> +//    if (retAct) {
>> +//      return retAct;
>> +//    } else {
>> +//      // not remote, do local lookup
>> +//      CacheContinuation::cacheContAllocator_free(cc);
>> +//      return (Action *) NULL;
>> +//    }
>> +//  } else {
>> +//    Action a;
>> +//    a = cont;
>> +//    return CacheContinuation::callback_failure(&a, CACHE_EVENT_LOOKUP_FAILED, 0);
>> +//  }
>> +  (void) cont;
>> +  (void) key;
>> +  (void) frag_type;
>> +  (void) hostname;
>> +  (void) host_len;
>>  return (Action *) NULL;
>> }
>> 
>> @@ -66,18 +71,24 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>>             time_t pin_in_cache, CacheFragType frag_type, char *hostname, int host_len)
>> {
>>  (void) params;
>> -  if (clusterProcessor.disable_remote_cluster_ops(owner_machine)) {
>> -    Action a;
>> -    a = cont;
>> -    return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_READ_FAILED, 0);
>> +  ink_assert(cont);
>> +  ClusterSession session;
>> +  if (cluster_create_session(&session, owner_machine, NULL, 0)) {
>> +    cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, NULL);
>> +    return ACTION_RESULT_DONE;
>>  }
>> +
>>  int vers = CacheOpMsg_long::protoToVersion(owner_machine->msg_proto_major);
>> +  CacheOpArgs_General readArgs;
>> +  Ptr<IOBufferData> d;
>> +
>>  int flen;
>>  int len = 0;
>>  int cur_len;
>>  int res = 0;
>> -  char *msg;
>> +  char *msg = 0;
>>  char *data;
>> +  Action *action = NULL;
>> 
>>  if (vers == CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) {
>>    if ((opcode == CACHE_OPEN_READ_LONG)
>> @@ -87,20 +98,21 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>> 
>>      const char *url_hostname;
>>      int url_hlen;
>> -      INK_MD5 url_only_md5;
>> +      INK_MD5 url_md5;
>> 
>> -      Cache::generate_key(&url_only_md5, url, 0);
>> +      Cache::generate_key(&url_md5, url);
>>      url_hostname = url->host_get(&url_hlen);
>> 
>>      len += request->m_heap->marshal_length();
>> -      len += params->marshal_length();
>> +      len += sizeof(CacheLookupHttpConfig) + params->marshal_length();
>>      len += url_hlen;
>> 
>>      if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)       // Bound marshalled data
>>        goto err_exit;
>> 
>>      // Perform data Marshal operation
>> -      msg = (char *) ALLOCA_DOUBLE(flen + len);
>> +      d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> +      msg = (char *) d->data();
>>      data = msg + flen;
>> 
>>      cur_len = len;
>> @@ -110,6 +122,13 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>>      }
>>      data += res;
>>      cur_len -= res;
>> +
>> +      if (cur_len < (int) sizeof(CacheLookupHttpConfig))
>> +        goto err_exit;
>> +      memcpy(data, params, sizeof(CacheLookupHttpConfig));
>> +      data += sizeof(CacheLookupHttpConfig);
>> +      cur_len -= sizeof(CacheLookupHttpConfig);
>> +
>>      if ((res = params->marshal(data, cur_len)) < 0)
>>        goto err_exit;
>>      data += res;
>> @@ -117,37 +136,33 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>>      memcpy(data, url_hostname, url_hlen);
>> 
>>      CacheOpArgs_General readArgs;
>> -      readArgs.url_md5 = &url_only_md5;
>> +      readArgs.url_md5 = &url_md5;
>>      readArgs.pin_in_cache = pin_in_cache;
>>      readArgs.frag_type = frag_type;
>> -      return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs,
>> -                                      opcode, (char *) msg, (flen + len), -1, buf);
>> +
>> +      action = CacheContinuation::do_op(cont, session, (void *) &readArgs,
>> +                                            opcode, d, (flen + len), -1, buf);
>>    } else {
>>      // Build message if we have host data.
>> +      flen = op_to_sizeof_fixedlen_msg(opcode);
>> +      len = host_len;
>> 
>> -      if (host_len) {
>> -        // Determine length of data to Marshal
>> -        flen = op_to_sizeof_fixedlen_msg(opcode);
>> -        len = host_len;
>> -
>> -        if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)     // Bound marshalled data
>> -          goto err_exit;
>> +      if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)     // Bound marshalled data
>> +        goto err_exit;
>> 
>> -        msg = (char *) ALLOCA_DOUBLE(flen + len);
>> -        data = msg + flen;
>> +      d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> +      msg = (char *) d->data();
>> +      data = msg + flen;
>> +      if (host_len)
>>        memcpy(data, hostname, host_len);
>> 
>> -      } else {
>> -        msg = 0;
>> -        flen = 0;
>> -        len = 0;
>> -      }
>> -      CacheOpArgs_General readArgs;
>>      readArgs.url_md5 = key;
>>      readArgs.frag_type = frag_type;
>> -      return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs,
>> -                                      opcode, (char *) msg, (flen + len), -1, buf);
>> +
>> +      action = CacheContinuation::do_op(cont, session, (void *) &readArgs,
>> +                                            opcode, d, (flen + len), -1, buf);
>>    }
>> +    ink_assert(msg);
>> 
>>  } else {
>>    //////////////////////////////////////////////////////////////
>> @@ -155,10 +170,12 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>>    //////////////////////////////////////////////////////////////
>>    ink_release_assert(!"CacheOpMsg_long [read] bad msg version");
>>  }
>> +
>> +  if (action)
>> +    return action;
>> err_exit:
>> -  Action a;
>> -  a = cont;
>> -  return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_READ_FAILED, 0);
>> +  cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, NULL);
>> +  return ACTION_RESULT_DONE;
>> }
>> 
>> inline Action *
>> @@ -171,10 +188,11 @@ Cluster_write(Continuation * cont, int expected_size,
>> {
>>  (void) key;
>>  (void) request;
>> -  if (clusterProcessor.disable_remote_cluster_ops(m)) {
>> -    Action a;
>> -    a = cont;
>> -    return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_WRITE_FAILED, 0);
>> +  ClusterSession session;
>> +  ink_assert(cont);
>> +  if (cluster_create_session(&session, m, NULL, 0)) {
>> +     cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, NULL);
>> +     return ACTION_RESULT_DONE;
>>  }
>>  char *msg = 0;
>>  char *data = 0;
>> @@ -182,24 +200,22 @@ Cluster_write(Continuation * cont, int expected_size,
>>  int len = 0;
>>  int flen = 0;
>>  int vers = CacheOpMsg_long::protoToVersion(m->msg_proto_major);
>> +  Ptr<IOBufferData> d;
>> 
>>  switch (opcode) {
>>  case CACHE_OPEN_WRITE:
>>    {
>>      // Build message if we have host data
>> -      if (host_len) {
>> -        // Determine length of data to Marshal
>> -        flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE);
>> -        len = host_len;
>> -
>> -        if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)     // Bound marshalled data
>> -          goto err_exit;
>> -
>> -        msg = (char *) ALLOCA_DOUBLE(flen + len);
>> -        data = msg + flen;
>> +      len = host_len;
>> +      flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE);
>> +      if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)     // Bound marshalled data
>> +        goto err_exit;
>> 
>> +      d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> +      msg = (char *) d->data();
>> +      data = msg + flen;
>> +      if (host_len)
>>        memcpy(data, hostname, host_len);
>> -      }
>>      break;
>>    }
>>  case CACHE_OPEN_WRITE_LONG:
>> @@ -223,8 +239,9 @@ Cluster_write(Continuation * cont, int expected_size,
>>      if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)       // Bound marshalled data
>>        goto err_exit;
>> 
>> +      d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> +      msg = (char *) d->data();
>>      // Perform data Marshal operation
>> -      msg = (char *) ALLOCA_DOUBLE(flen + len);
>>      data = msg + flen;
>>      int res = 0;
>> 
>> @@ -257,7 +274,9 @@ Cluster_write(Continuation * cont, int expected_size,
>>    writeArgs.cfl_flags |= (old_info ? CFL_LOPENWRITE_HAVE_OLDINFO : 0);
>>    writeArgs.cfl_flags |= (allow_multiple_writes ? CFL_ALLOW_MULTIPLE_WRITES : 0);
>> 
>> -    return CacheContinuation::do_op(cont, m, (void *) &writeArgs, opcode, msg, flen + len, expected_size, buf);
>> +    Action *action = CacheContinuation::do_op(cont, session, (void *) &writeArgs, opcode, d, flen + len, expected_size, buf);
>> +    if (action)
>> +      return action;
>>  } else {
>>    //////////////////////////////////////////////////////////////
>>    // Create the specified down rev version of this message
>> @@ -267,19 +286,21 @@ Cluster_write(Continuation * cont, int expected_size,
>>  }
>> 
>> err_exit:
>> -  Action a;
>> -  a = cont;
>> -  return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_WRITE_FAILED, 0);
>> +  cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, NULL);
>> +  return ACTION_RESULT_DONE;
>> }
>> 
>> inline Action *
>> Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey * to,
>>             CacheFragType type, char *hostname, int host_len)
>> {
>> -  if (clusterProcessor.disable_remote_cluster_ops(m)) {
>> -    Action a;
>> -    a = cont;
>> -    return CacheContinuation::callback_failure(&a, CACHE_EVENT_LINK_FAILED, 0);
>> +  ClusterSession session;
>> +  Ptr<IOBufferData> d;
>> +  char *msg = NULL;
>> +
>> +  if (cluster_create_session(&session, m, NULL, 0)) {
>> +    cont->handleEvent(CACHE_EVENT_LINK_FAILED, NULL);
>> +    return ACTION_RESULT_DONE;
>>  }
>> 
>>  int vers = CacheOpMsg_short_2::protoToVersion(m->msg_proto_major);
>> @@ -293,7 +314,8 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey
>>    if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>>      goto err_exit;
>> 
>> -    char *msg = (char *) ALLOCA_DOUBLE(flen + len);
>> +    d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> +    msg = (char *) d->data();
>>    memcpy((msg + flen), hostname, host_len);
>> 
>>    // Setup args for remote link
>> @@ -301,7 +323,9 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey
>>    linkArgs.from = from;
>>    linkArgs.to = to;
>>    linkArgs.frag_type = type;
>> -    return CacheContinuation::do_op(cont, m, (void *) &linkArgs, CACHE_LINK, msg, (flen + len));
>> +    Action *action = CacheContinuation::do_op(cont, session, (void *) &linkArgs, CACHE_LINK, d, (flen + len));
>> +    if (action)
>> +      return action;
>>  } else {
>>    //////////////////////////////////////////////////////////////
>>    // Create the specified down rev version of this message
>> @@ -311,18 +335,20 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey
>>  }
>> 
>> err_exit:
>> -  Action a;
>> -  a = cont;
>> -  return CacheContinuation::callback_failure(&a, CACHE_EVENT_LINK_FAILED, 0);
>> +  cont->handleEvent(CACHE_EVENT_LINK_FAILED, NULL);
>> +  return ACTION_RESULT_DONE;
>> }
>> 
>> inline Action *
>> Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFragType type, char *hostname, int host_len)
>> {
>> -  if (clusterProcessor.disable_remote_cluster_ops(m)) {
>> -    Action a;
>> -    a = cont;
>> -    return CacheContinuation::callback_failure(&a, CACHE_EVENT_DEREF_FAILED, 0);
>> +  ClusterSession session;
>> +  Ptr<IOBufferData> d;
>> +  char *msg = NULL;
>> +
>> +  if (cluster_create_session(&session, m, NULL, 0)) {
>> +    cont->handleEvent(CACHE_EVENT_DEREF_FAILED, NULL);
>> +    return ACTION_RESULT_DONE ;
>>  }
>> 
>>  int vers = CacheOpMsg_short::protoToVersion(m->msg_proto_major);
>> @@ -336,14 +362,17 @@ Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFrag
>>    if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>>      goto err_exit;
>> 
>> -    char *msg = (char *) ALLOCA_DOUBLE(flen + len);
>> +    d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> +    msg = (char *) d->data();
>>    memcpy((msg + flen), hostname, host_len);
>> 
>>    // Setup args for remote deref
>>    CacheOpArgs_Deref drefArgs;
>>    drefArgs.md5 = key;
>>    drefArgs.frag_type = type;
>> -    return CacheContinuation::do_op(cont, m, (void *) &drefArgs, CACHE_DEREF, msg, (flen + len));
>> +    Action *action = CacheContinuation::do_op(cont, session, (void *) &drefArgs, CACHE_DEREF, d, (flen + len));
>> +    if (action)
>> +      return action;
>>  } else {
>>    //////////////////////////////////////////////////////////////
>>    // Create the specified down rev version of this message
>> @@ -353,19 +382,22 @@ Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFrag
>>  }
>> 
>> err_exit:
>> -  Action a;
>> -  a = cont;
>> -  return CacheContinuation::callback_failure(&a, CACHE_EVENT_DEREF_FAILED, 0);
>> +  cont->handleEvent(CACHE_EVENT_DEREF_FAILED, NULL);
>> +  return ACTION_RESULT_DONE ;
>> }
>> 
>> inline Action *
>> Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>>               bool rm_user_agents, bool rm_link, CacheFragType frag_type, char *hostname, int host_len)
>> {
>> -  if (clusterProcessor.disable_remote_cluster_ops(m)) {
>> -    Action a;
>> -    a = cont;
>> -    return CacheContinuation::callback_failure(&a, CACHE_EVENT_REMOVE_FAILED, 0);
>> +  ClusterSession session;
>> +  Ptr<IOBufferData> d;
>> +  char *msg = NULL;
>> +
>> +  if (cluster_create_session(&session, m, NULL, 0)) {
>> +    if (cont)
>> +      cont->handleEvent(CACHE_EVENT_REMOVE_FAILED, NULL);
>> +    return ACTION_RESULT_DONE;
>>  }
>> 
>>  int vers = CacheOpMsg_short::protoToVersion(m->msg_proto_major);
>> @@ -379,7 +411,8 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>>    if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>>      goto err_exit;
>> 
>> -    char *msg = (char *) ALLOCA_DOUBLE(flen + len);
>> +    d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> +    msg = (char *) d->data();
>>    memcpy((msg + flen), hostname, host_len);
>> 
>>    // Setup args for remote update
>> @@ -388,7 +421,9 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>>    updateArgs.cfl_flags |= (rm_user_agents ? CFL_REMOVE_USER_AGENTS : 0);
>>    updateArgs.cfl_flags |= (rm_link ? CFL_REMOVE_LINK : 0);
>>    updateArgs.frag_type = frag_type;
>> -    return CacheContinuation::do_op(cont, m, (void *) &updateArgs, CACHE_REMOVE, msg, (flen + len));
>> +    Action *action = CacheContinuation::do_op(cont, session, (void *) &updateArgs, CACHE_REMOVE, d, (flen + len));
>> +    if (action)
>> +      return action;
>>  } else {
>>    //////////////////////////////////////////////////////////////
>>    // Create the specified down rev version of this message
>> @@ -398,9 +433,8 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>>  }
>> 
>> err_exit:
>> -  Action a;
>> -  a = cont;
>> -  return CacheContinuation::callback_failure(&a, CACHE_EVENT_REMOVE_FAILED, 0);
>> +  if (cont)
>> +    cont->handleEvent(CACHE_EVENT_REMOVE_FAILED, NULL);
>> +  return ACTION_RESULT_DONE;
>> }
>> -
>> #endif /* __CLUSTERINLINE_H__ */
>> 
>> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/iocore/eventsystem/I_Event.h
>> ----------------------------------------------------------------------
>> diff --git a/iocore/eventsystem/I_Event.h b/iocore/eventsystem/I_Event.h
>> index 7a37ea0..2659131 100644
>> --- a/iocore/eventsystem/I_Event.h
>> +++ b/iocore/eventsystem/I_Event.h
>> @@ -85,6 +85,7 @@
>> #define BLOCK_CACHE_EVENT_EVENTS_START            4000
>> #define UTILS_EVENT_EVENTS_START                  5000
>> #define CONGESTION_EVENT_EVENTS_START             5100
>> +#define CLUSTER_MSG_START                         6000
>> #define INK_API_EVENT_EVENTS_START                60000
>> #define SRV_EVENT_EVENTS_START	                  62000
>> #define REMAP_EVENT_EVENTS_START                  63000
>> 
>> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/iocore/eventsystem/P_IOBuffer.h
>> ----------------------------------------------------------------------
>> diff --git a/iocore/eventsystem/P_IOBuffer.h b/iocore/eventsystem/P_IOBuffer.h
>> index 0842aff..261aa1f 100644
>> --- a/iocore/eventsystem/P_IOBuffer.h
>> +++ b/iocore/eventsystem/P_IOBuffer.h
>> @@ -203,7 +203,7 @@ new_IOBufferData_internal(
>>                           void *b, int64_t size, int64_t asize_index)
>> {
>>  (void) size;
>> -  IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_ethread());
>> +  IOBufferData *d = ioDataAllocator.alloc();
>>  d->_size_index = asize_index;
>>  ink_assert(BUFFER_SIZE_INDEX_IS_CONSTANT(asize_index)
>>             || size <= d->block_size());
>> @@ -263,7 +263,7 @@ new_IOBufferData_internal(
>> #endif
>>                           int64_t size_index, AllocType type)
>> {
>> -  IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_ethread());
>> +  IOBufferData *d = ioDataAllocator.alloc();
>> #ifdef TRACK_BUFFER_USER
>>  d->_location = loc;
>> #endif
>> @@ -336,7 +336,7 @@ TS_INLINE void
>> IOBufferData::free()
>> {
>>  dealloc();
>> -  THREAD_FREE(this, ioDataAllocator, this_ethread());
>> +  ioDataAllocator.free(this);
>> }
>> 
>> //////////////////////////////////////////////////////////////////
>> @@ -352,7 +352,7 @@ new_IOBufferBlock_internal(
>> #endif
>>  )
>> {
>> -  IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_ethread());
>> +  IOBufferBlock *b = ioBlockAllocator.alloc();
>> #ifdef TRACK_BUFFER_USER
>>  b->_location = location;
>> #endif
>> @@ -366,7 +366,7 @@ new_IOBufferBlock_internal(
>> #endif
>>                            IOBufferData * d, int64_t len, int64_t offset)
>> {
>> -  IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_ethread());
>> +  IOBufferBlock *b = ioBlockAllocator.alloc();
>> #ifdef TRACK_BUFFER_USER
>>  b->_location = location;
>> #endif
>> @@ -468,7 +468,7 @@ TS_INLINE void
>> IOBufferBlock::free()
>> {
>>  dealloc();
>> -  THREAD_FREE(this, ioBlockAllocator, this_ethread());
>> +  ioBlockAllocator.free(this);
>> }
>> 
>> TS_INLINE void
>> @@ -777,7 +777,7 @@ TS_INLINE MIOBuffer * new_MIOBuffer_internal(
>> #endif
>>                                               int64_t size_index)
>> {
>> -  MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_ethread());
>> +  MIOBuffer *b = ioAllocator.alloc();
>> #ifdef TRACK_BUFFER_USER
>>  b->_location = location;
>> #endif
>> @@ -790,7 +790,7 @@ free_MIOBuffer(MIOBuffer * mio)
>> {
>>  mio->_writer = NULL;
>>  mio->dealloc_all_readers();
>> -  THREAD_FREE(mio, ioAllocator, this_ethread());
>> +  ioAllocator.free(mio);
>> }
>> 
>> TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal(
>> @@ -799,7 +799,7 @@ TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal(
>> #endif
>>                                                     int64_t size_index)
>> {
>> -  MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_ethread());
>> +  MIOBuffer *b = ioAllocator.alloc();
>>  b->size_index = size_index;
>> #ifdef TRACK_BUFFER_USER
>>  b->_location = location;
>> @@ -810,7 +810,7 @@ TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal(
>> TS_INLINE void
>> free_empty_MIOBuffer(MIOBuffer * mio)
>> {
>> -  THREAD_FREE(mio, ioAllocator, this_ethread());
>> +  ioAllocator.free(mio);
>> }
>> 
>> TS_INLINE IOBufferReader *
>> 
>> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/mgmt/RecordsConfig.cc
>> ----------------------------------------------------------------------
>> diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
>> index 4a73f19..7677471 100644
>> --- a/mgmt/RecordsConfig.cc
>> +++ b/mgmt/RecordsConfig.cc
>> @@ -814,6 +814,24 @@ RecordElement RecordsConfig[] = {
>>  ,
>>  {RECT_CONFIG, "proxy.config.cluster.cluster_port", RECD_INT, "8086", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>>  ,
>> +  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_bps", RECD_INT, "804857600", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> +  ,
>> +  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_bps", RECD_INT, "4194304000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> +  ,
>> +  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_send_wait_time", RECD_INT, "1000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> +  ,
>> +  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_send_wait_time", RECD_INT, "5000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> +  ,
>> +  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_loop_interval", RECD_INT, "0", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> +  ,
>> +  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_loop_interval", RECD_INT, "1000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> +  ,
>> +  {RECT_CONFIG, "proxy.config.cluster.max_sessions_per_machine", RECD_INT, "1000000", RECU_RESTART_TS, RR_NULL, RECC_INT, "[1000-4000000]", RECA_NULL}
>> +  ,
>> +  {RECT_CONFIG, "proxy.config.cluster.session_locks_per_machine", RECD_INT, "10949", RECU_RESTART_TS, RR_NULL, RECC_INT, "[1-100000]", RECA_NULL}
>> +  ,
>> +  {RECT_CONFIG, "proxy.config.cluster.read_buffer_size", RECD_INT, "2097152", RECU_RESTART_TS, RR_NULL, RECC_INT, "[65536-2097152]", RECA_NULL}
>> +  ,
>>  {RECT_CONFIG, "proxy.config.cluster.cluster_configuration", RECD_STRING, "cluster.config", RECU_NULL, RR_NULL, RECC_NULL, NULL, RECA_NULL}
>>  ,
>>  {RECT_CONFIG, "proxy.config.cluster.ethernet_interface", RECD_STRING, TS_BUILD_DEFAULT_LOOPBACK_IFACE, RECU_RESTART_TS, RR_REQUIRED, RECC_STR, "^[^[:space:]]*$", RECA_NULL}
>> 
> 


Re: [1/4] refine cluster

Posted by James Peach <jp...@apache.org>.
On Nov 20, 2013, at 9:50 AM, James Peach <jp...@apache.org> wrote:

> This is a huge commit with
> 
> 	- no links back to the Jira ticket
> 	- no source code comments
> 	- no indication of the problem that it is solving or how it solves it
> 	- no performance comparison (I assume this is for performance)
> 	- no documentation of the new configuration parameters
> 	- no indication of compatibility (is it really OK for the 4.x branch?)
> 	- no tests

Oh, I didn't notice that this was on a branch. Thanks, that's excellent!

> 
> Please, can we talk about the above issues?
> 
> On Nov 19, 2013, at 11:45 PM, weijin@apache.org wrote:
> 
>> Updated Branches:
>> refs/heads/refine_cluster [created] 7ffc10a9c
>> 
>> 
>> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/iocore/cluster/P_ClusterInline.h
>> ----------------------------------------------------------------------
>> diff --git a/iocore/cluster/P_ClusterInline.h b/iocore/cluster/P_ClusterInline.h
>> index c653956..1d26b3a 100644
>> --- a/iocore/cluster/P_ClusterInline.h
>> +++ b/iocore/cluster/P_ClusterInline.h
>> @@ -36,25 +36,30 @@ inline Action *
>> Cluster_lookup(Continuation * cont, CacheKey * key, CacheFragType frag_type, char *hostname, int host_len)
>> {
>>  // Try to send remote, if not possible, handle locally
>> -  Action *retAct;
>> -  ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
>> -  if (m && !clusterProcessor.disable_remote_cluster_ops(m)) {
>> -    CacheContinuation *cc = CacheContinuation::cacheContAllocator_alloc();
>> -    cc->action = cont;
>> -    cc->mutex = cont->mutex;
>> -    retAct = CacheContinuation::do_remote_lookup(cont, key, cc, frag_type, hostname, host_len);
>> -    if (retAct) {
>> -      return retAct;
>> -    } else {
>> -      // not remote, do local lookup
>> -      CacheContinuation::cacheContAllocator_free(cc);
>> -      return (Action *) NULL;
>> -    }
>> -  } else {
>> -    Action a;
>> -    a = cont;
>> -    return CacheContinuation::callback_failure(&a, CACHE_EVENT_LOOKUP_FAILED, 0);
>> -  }
>> +//  Action *retAct;
>> +//  ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
>> +//  if (m && !clusterProcessor.disable_remote_cluster_ops(m)) {
>> +//    CacheContinuation *cc = CacheContinuation::cacheContAllocator_alloc();
>> +//    cc->action = cont;
>> +//    cc->mutex = cont->mutex;
>> +//    retAct = CacheContinuation::do_remote_lookup(cont, key, cc, frag_type, hostname, host_len);
>> +//    if (retAct) {
>> +//      return retAct;
>> +//    } else {
>> +//      // not remote, do local lookup
>> +//      CacheContinuation::cacheContAllocator_free(cc);
>> +//      return (Action *) NULL;
>> +//    }
>> +//  } else {
>> +//    Action a;
>> +//    a = cont;
>> +//    return CacheContinuation::callback_failure(&a, CACHE_EVENT_LOOKUP_FAILED, 0);
>> +//  }
>> +  (void) cont;
>> +  (void) key;
>> +  (void) frag_type;
>> +  (void) hostname;
>> +  (void) host_len;
>>  return (Action *) NULL;
>> }
>> 
>> @@ -66,18 +71,24 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>>             time_t pin_in_cache, CacheFragType frag_type, char *hostname, int host_len)
>> {
>>  (void) params;
>> -  if (clusterProcessor.disable_remote_cluster_ops(owner_machine)) {
>> -    Action a;
>> -    a = cont;
>> -    return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_READ_FAILED, 0);
>> +  ink_assert(cont);
>> +  ClusterSession session;
>> +  if (cluster_create_session(&session, owner_machine, NULL, 0)) {
>> +    cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, NULL);
>> +    return ACTION_RESULT_DONE;
>>  }
>> +
>>  int vers = CacheOpMsg_long::protoToVersion(owner_machine->msg_proto_major);
>> +  CacheOpArgs_General readArgs;
>> +  Ptr<IOBufferData> d;
>> +
>>  int flen;
>>  int len = 0;
>>  int cur_len;
>>  int res = 0;
>> -  char *msg;
>> +  char *msg = 0;
>>  char *data;
>> +  Action *action = NULL;
>> 
>>  if (vers == CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) {
>>    if ((opcode == CACHE_OPEN_READ_LONG)
>> @@ -87,20 +98,21 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>> 
>>      const char *url_hostname;
>>      int url_hlen;
>> -      INK_MD5 url_only_md5;
>> +      INK_MD5 url_md5;
>> 
>> -      Cache::generate_key(&url_only_md5, url, 0);
>> +      Cache::generate_key(&url_md5, url);
>>      url_hostname = url->host_get(&url_hlen);
>> 
>>      len += request->m_heap->marshal_length();
>> -      len += params->marshal_length();
>> +      len += sizeof(CacheLookupHttpConfig) + params->marshal_length();
>>      len += url_hlen;
>> 
>>      if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)       // Bound marshalled data
>>        goto err_exit;
>> 
>>      // Perform data Marshal operation
>> -      msg = (char *) ALLOCA_DOUBLE(flen + len);
>> +      d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> +      msg = (char *) d->data();
>>      data = msg + flen;
>> 
>>      cur_len = len;
>> @@ -110,6 +122,13 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>>      }
>>      data += res;
>>      cur_len -= res;
>> +
>> +      if (cur_len < (int) sizeof(CacheLookupHttpConfig))
>> +        goto err_exit;
>> +      memcpy(data, params, sizeof(CacheLookupHttpConfig));
>> +      data += sizeof(CacheLookupHttpConfig);
>> +      cur_len -= sizeof(CacheLookupHttpConfig);
>> +
>>      if ((res = params->marshal(data, cur_len)) < 0)
>>        goto err_exit;
>>      data += res;
>> @@ -117,37 +136,33 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>>      memcpy(data, url_hostname, url_hlen);
>> 
>>      CacheOpArgs_General readArgs;
>> -      readArgs.url_md5 = &url_only_md5;
>> +      readArgs.url_md5 = &url_md5;
>>      readArgs.pin_in_cache = pin_in_cache;
>>      readArgs.frag_type = frag_type;
>> -      return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs,
>> -                                      opcode, (char *) msg, (flen + len), -1, buf);
>> +
>> +      action = CacheContinuation::do_op(cont, session, (void *) &readArgs,
>> +                                            opcode, d, (flen + len), -1, buf);
>>    } else {
>>      // Build message if we have host data.
>> +      flen = op_to_sizeof_fixedlen_msg(opcode);
>> +      len = host_len;
>> 
>> -      if (host_len) {
>> -        // Determine length of data to Marshal
>> -        flen = op_to_sizeof_fixedlen_msg(opcode);
>> -        len = host_len;
>> -
>> -        if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)     // Bound marshalled data
>> -          goto err_exit;
>> +      if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)     // Bound marshalled data
>> +        goto err_exit;
>> 
>> -        msg = (char *) ALLOCA_DOUBLE(flen + len);
>> -        data = msg + flen;
>> +      d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> +      msg = (char *) d->data();
>> +      data = msg + flen;
>> +      if (host_len)
>>        memcpy(data, hostname, host_len);
>> 
>> -      } else {
>> -        msg = 0;
>> -        flen = 0;
>> -        len = 0;
>> -      }
>> -      CacheOpArgs_General readArgs;
>>      readArgs.url_md5 = key;
>>      readArgs.frag_type = frag_type;
>> -      return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs,
>> -                                      opcode, (char *) msg, (flen + len), -1, buf);
>> +
>> +      action = CacheContinuation::do_op(cont, session, (void *) &readArgs,
>> +                                            opcode, d, (flen + len), -1, buf);
>>    }
>> +    ink_assert(msg);
>> 
>>  } else {
>>    //////////////////////////////////////////////////////////////
>> @@ -155,10 +170,12 @@ Cluster_read(ClusterMachine * owner_machine, int opcode,
>>    //////////////////////////////////////////////////////////////
>>    ink_release_assert(!"CacheOpMsg_long [read] bad msg version");
>>  }
>> +
>> +  if (action)
>> +    return action;
>> err_exit:
>> -  Action a;
>> -  a = cont;
>> -  return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_READ_FAILED, 0);
>> +  cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, NULL);
>> +  return ACTION_RESULT_DONE;
>> }
>> 
>> inline Action *
>> @@ -171,10 +188,11 @@ Cluster_write(Continuation * cont, int expected_size,
>> {
>>  (void) key;
>>  (void) request;
>> -  if (clusterProcessor.disable_remote_cluster_ops(m)) {
>> -    Action a;
>> -    a = cont;
>> -    return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_WRITE_FAILED, 0);
>> +  ClusterSession session;
>> +  ink_assert(cont);
>> +  if (cluster_create_session(&session, m, NULL, 0)) {
>> +     cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, NULL);
>> +     return ACTION_RESULT_DONE;
>>  }
>>  char *msg = 0;
>>  char *data = 0;
>> @@ -182,24 +200,22 @@ Cluster_write(Continuation * cont, int expected_size,
>>  int len = 0;
>>  int flen = 0;
>>  int vers = CacheOpMsg_long::protoToVersion(m->msg_proto_major);
>> +  Ptr<IOBufferData> d;
>> 
>>  switch (opcode) {
>>  case CACHE_OPEN_WRITE:
>>    {
>>      // Build message if we have host data
>> -      if (host_len) {
>> -        // Determine length of data to Marshal
>> -        flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE);
>> -        len = host_len;
>> -
>> -        if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)     // Bound marshalled data
>> -          goto err_exit;
>> -
>> -        msg = (char *) ALLOCA_DOUBLE(flen + len);
>> -        data = msg + flen;
>> +      len = host_len;
>> +      flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE);
>> +      if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)     // Bound marshalled data
>> +        goto err_exit;
>> 
>> +      d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> +      msg = (char *) d->data();
>> +      data = msg + flen;
>> +      if (host_len)
>>        memcpy(data, hostname, host_len);
>> -      }
>>      break;
>>    }
>>  case CACHE_OPEN_WRITE_LONG:
>> @@ -223,8 +239,9 @@ Cluster_write(Continuation * cont, int expected_size,
>>      if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)       // Bound marshalled data
>>        goto err_exit;
>> 
>> +      d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> +      msg = (char *) d->data();
>>      // Perform data Marshal operation
>> -      msg = (char *) ALLOCA_DOUBLE(flen + len);
>>      data = msg + flen;
>>      int res = 0;
>> 
>> @@ -257,7 +274,9 @@ Cluster_write(Continuation * cont, int expected_size,
>>    writeArgs.cfl_flags |= (old_info ? CFL_LOPENWRITE_HAVE_OLDINFO : 0);
>>    writeArgs.cfl_flags |= (allow_multiple_writes ? CFL_ALLOW_MULTIPLE_WRITES : 0);
>> 
>> -    return CacheContinuation::do_op(cont, m, (void *) &writeArgs, opcode, msg, flen + len, expected_size, buf);
>> +    Action *action = CacheContinuation::do_op(cont, session, (void *) &writeArgs, opcode, d, flen + len, expected_size, buf);
>> +    if (action)
>> +      return action;
>>  } else {
>>    //////////////////////////////////////////////////////////////
>>    // Create the specified down rev version of this message
>> @@ -267,19 +286,21 @@ Cluster_write(Continuation * cont, int expected_size,
>>  }
>> 
>> err_exit:
>> -  Action a;
>> -  a = cont;
>> -  return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_WRITE_FAILED, 0);
>> +  cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, NULL);
>> +  return ACTION_RESULT_DONE;
>> }
>> 
>> inline Action *
>> Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey * to,
>>             CacheFragType type, char *hostname, int host_len)
>> {
>> -  if (clusterProcessor.disable_remote_cluster_ops(m)) {
>> -    Action a;
>> -    a = cont;
>> -    return CacheContinuation::callback_failure(&a, CACHE_EVENT_LINK_FAILED, 0);
>> +  ClusterSession session;
>> +  Ptr<IOBufferData> d;
>> +  char *msg = NULL;
>> +
>> +  if (cluster_create_session(&session, m, NULL, 0)) {
>> +    cont->handleEvent(CACHE_EVENT_LINK_FAILED, NULL);
>> +    return ACTION_RESULT_DONE;
>>  }
>> 
>>  int vers = CacheOpMsg_short_2::protoToVersion(m->msg_proto_major);
>> @@ -293,7 +314,8 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey
>>    if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>>      goto err_exit;
>> 
>> -    char *msg = (char *) ALLOCA_DOUBLE(flen + len);
>> +    d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> +    msg = (char *) d->data();
>>    memcpy((msg + flen), hostname, host_len);
>> 
>>    // Setup args for remote link
>> @@ -301,7 +323,9 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey
>>    linkArgs.from = from;
>>    linkArgs.to = to;
>>    linkArgs.frag_type = type;
>> -    return CacheContinuation::do_op(cont, m, (void *) &linkArgs, CACHE_LINK, msg, (flen + len));
>> +    Action *action = CacheContinuation::do_op(cont, session, (void *) &linkArgs, CACHE_LINK, d, (flen + len));
>> +    if (action)
>> +      return action;
>>  } else {
>>    //////////////////////////////////////////////////////////////
>>    // Create the specified down rev version of this message
>> @@ -311,18 +335,20 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey
>>  }
>> 
>> err_exit:
>> -  Action a;
>> -  a = cont;
>> -  return CacheContinuation::callback_failure(&a, CACHE_EVENT_LINK_FAILED, 0);
>> +  cont->handleEvent(CACHE_EVENT_LINK_FAILED, NULL);
>> +  return ACTION_RESULT_DONE;
>> }
>> 
>> inline Action *
>> Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFragType type, char *hostname, int host_len)
>> {
>> -  if (clusterProcessor.disable_remote_cluster_ops(m)) {
>> -    Action a;
>> -    a = cont;
>> -    return CacheContinuation::callback_failure(&a, CACHE_EVENT_DEREF_FAILED, 0);
>> +  ClusterSession session;
>> +  Ptr<IOBufferData> d;
>> +  char *msg = NULL;
>> +
>> +  if (cluster_create_session(&session, m, NULL, 0)) {
>> +    cont->handleEvent(CACHE_EVENT_DEREF_FAILED, NULL);
>> +    return ACTION_RESULT_DONE ;
>>  }
>> 
>>  int vers = CacheOpMsg_short::protoToVersion(m->msg_proto_major);
>> @@ -336,14 +362,17 @@ Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFrag
>>    if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>>      goto err_exit;
>> 
>> -    char *msg = (char *) ALLOCA_DOUBLE(flen + len);
>> +    d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> +    msg = (char *) d->data();
>>    memcpy((msg + flen), hostname, host_len);
>> 
>>    // Setup args for remote deref
>>    CacheOpArgs_Deref drefArgs;
>>    drefArgs.md5 = key;
>>    drefArgs.frag_type = type;
>> -    return CacheContinuation::do_op(cont, m, (void *) &drefArgs, CACHE_DEREF, msg, (flen + len));
>> +    Action *action = CacheContinuation::do_op(cont, session, (void *) &drefArgs, CACHE_DEREF, d, (flen + len));
>> +    if (action)
>> +      return action;
>>  } else {
>>    //////////////////////////////////////////////////////////////
>>    // Create the specified down rev version of this message
>> @@ -353,19 +382,22 @@ Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFrag
>>  }
>> 
>> err_exit:
>> -  Action a;
>> -  a = cont;
>> -  return CacheContinuation::callback_failure(&a, CACHE_EVENT_DEREF_FAILED, 0);
>> +  cont->handleEvent(CACHE_EVENT_DEREF_FAILED, NULL);
>> +  return ACTION_RESULT_DONE ;
>> }
>> 
>> inline Action *
>> Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>>               bool rm_user_agents, bool rm_link, CacheFragType frag_type, char *hostname, int host_len)
>> {
>> -  if (clusterProcessor.disable_remote_cluster_ops(m)) {
>> -    Action a;
>> -    a = cont;
>> -    return CacheContinuation::callback_failure(&a, CACHE_EVENT_REMOVE_FAILED, 0);
>> +  ClusterSession session;
>> +  Ptr<IOBufferData> d;
>> +  char *msg = NULL;
>> +
>> +  if (cluster_create_session(&session, m, NULL, 0)) {
>> +    if (cont)
>> +      cont->handleEvent(CACHE_EVENT_REMOVE_FAILED, NULL);
>> +    return ACTION_RESULT_DONE;
>>  }
>> 
>>  int vers = CacheOpMsg_short::protoToVersion(m->msg_proto_major);
>> @@ -379,7 +411,8 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>>    if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
>>      goto err_exit;
>> 
>> -    char *msg = (char *) ALLOCA_DOUBLE(flen + len);
>> +    d = new_IOBufferData(iobuffer_size_to_index(flen + len));
>> +    msg = (char *) d->data();
>>    memcpy((msg + flen), hostname, host_len);
>> 
>>    // Setup args for remote update
>> @@ -388,7 +421,9 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>>    updateArgs.cfl_flags |= (rm_user_agents ? CFL_REMOVE_USER_AGENTS : 0);
>>    updateArgs.cfl_flags |= (rm_link ? CFL_REMOVE_LINK : 0);
>>    updateArgs.frag_type = frag_type;
>> -    return CacheContinuation::do_op(cont, m, (void *) &updateArgs, CACHE_REMOVE, msg, (flen + len));
>> +    Action *action = CacheContinuation::do_op(cont, session, (void *) &updateArgs, CACHE_REMOVE, d, (flen + len));
>> +    if (action)
>> +      return action;
>>  } else {
>>    //////////////////////////////////////////////////////////////
>>    // Create the specified down rev version of this message
>> @@ -398,9 +433,8 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
>>  }
>> 
>> err_exit:
>> -  Action a;
>> -  a = cont;
>> -  return CacheContinuation::callback_failure(&a, CACHE_EVENT_REMOVE_FAILED, 0);
>> +  if (cont)
>> +    cont->handleEvent(CACHE_EVENT_REMOVE_FAILED, NULL);
>> +  return ACTION_RESULT_DONE;
>> }
>> -
>> #endif /* __CLUSTERINLINE_H__ */
>> 
>> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/iocore/eventsystem/I_Event.h
>> ----------------------------------------------------------------------
>> diff --git a/iocore/eventsystem/I_Event.h b/iocore/eventsystem/I_Event.h
>> index 7a37ea0..2659131 100644
>> --- a/iocore/eventsystem/I_Event.h
>> +++ b/iocore/eventsystem/I_Event.h
>> @@ -85,6 +85,7 @@
>> #define BLOCK_CACHE_EVENT_EVENTS_START            4000
>> #define UTILS_EVENT_EVENTS_START                  5000
>> #define CONGESTION_EVENT_EVENTS_START             5100
>> +#define CLUSTER_MSG_START                         6000
>> #define INK_API_EVENT_EVENTS_START                60000
>> #define SRV_EVENT_EVENTS_START	                  62000
>> #define REMAP_EVENT_EVENTS_START                  63000
>> 
>> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/iocore/eventsystem/P_IOBuffer.h
>> ----------------------------------------------------------------------
>> diff --git a/iocore/eventsystem/P_IOBuffer.h b/iocore/eventsystem/P_IOBuffer.h
>> index 0842aff..261aa1f 100644
>> --- a/iocore/eventsystem/P_IOBuffer.h
>> +++ b/iocore/eventsystem/P_IOBuffer.h
>> @@ -203,7 +203,7 @@ new_IOBufferData_internal(
>>                           void *b, int64_t size, int64_t asize_index)
>> {
>>  (void) size;
>> -  IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_ethread());
>> +  IOBufferData *d = ioDataAllocator.alloc();
>>  d->_size_index = asize_index;
>>  ink_assert(BUFFER_SIZE_INDEX_IS_CONSTANT(asize_index)
>>             || size <= d->block_size());
>> @@ -263,7 +263,7 @@ new_IOBufferData_internal(
>> #endif
>>                           int64_t size_index, AllocType type)
>> {
>> -  IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_ethread());
>> +  IOBufferData *d = ioDataAllocator.alloc();
>> #ifdef TRACK_BUFFER_USER
>>  d->_location = loc;
>> #endif
>> @@ -336,7 +336,7 @@ TS_INLINE void
>> IOBufferData::free()
>> {
>>  dealloc();
>> -  THREAD_FREE(this, ioDataAllocator, this_ethread());
>> +  ioDataAllocator.free(this);
>> }
>> 
>> //////////////////////////////////////////////////////////////////
>> @@ -352,7 +352,7 @@ new_IOBufferBlock_internal(
>> #endif
>>  )
>> {
>> -  IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_ethread());
>> +  IOBufferBlock *b = ioBlockAllocator.alloc();
>> #ifdef TRACK_BUFFER_USER
>>  b->_location = location;
>> #endif
>> @@ -366,7 +366,7 @@ new_IOBufferBlock_internal(
>> #endif
>>                            IOBufferData * d, int64_t len, int64_t offset)
>> {
>> -  IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_ethread());
>> +  IOBufferBlock *b = ioBlockAllocator.alloc();
>> #ifdef TRACK_BUFFER_USER
>>  b->_location = location;
>> #endif
>> @@ -468,7 +468,7 @@ TS_INLINE void
>> IOBufferBlock::free()
>> {
>>  dealloc();
>> -  THREAD_FREE(this, ioBlockAllocator, this_ethread());
>> +  ioBlockAllocator.free(this);
>> }
>> 
>> TS_INLINE void
>> @@ -777,7 +777,7 @@ TS_INLINE MIOBuffer * new_MIOBuffer_internal(
>> #endif
>>                                               int64_t size_index)
>> {
>> -  MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_ethread());
>> +  MIOBuffer *b = ioAllocator.alloc();
>> #ifdef TRACK_BUFFER_USER
>>  b->_location = location;
>> #endif
>> @@ -790,7 +790,7 @@ free_MIOBuffer(MIOBuffer * mio)
>> {
>>  mio->_writer = NULL;
>>  mio->dealloc_all_readers();
>> -  THREAD_FREE(mio, ioAllocator, this_ethread());
>> +  ioAllocator.free(mio);
>> }
>> 
>> TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal(
>> @@ -799,7 +799,7 @@ TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal(
>> #endif
>>                                                     int64_t size_index)
>> {
>> -  MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_ethread());
>> +  MIOBuffer *b = ioAllocator.alloc();
>>  b->size_index = size_index;
>> #ifdef TRACK_BUFFER_USER
>>  b->_location = location;
>> @@ -810,7 +810,7 @@ TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal(
>> TS_INLINE void
>> free_empty_MIOBuffer(MIOBuffer * mio)
>> {
>> -  THREAD_FREE(mio, ioAllocator, this_ethread());
>> +  ioAllocator.free(mio);
>> }
>> 
>> TS_INLINE IOBufferReader *
>> 
>> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7ffc10a9/mgmt/RecordsConfig.cc
>> ----------------------------------------------------------------------
>> diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
>> index 4a73f19..7677471 100644
>> --- a/mgmt/RecordsConfig.cc
>> +++ b/mgmt/RecordsConfig.cc
>> @@ -814,6 +814,24 @@ RecordElement RecordsConfig[] = {
>>  ,
>>  {RECT_CONFIG, "proxy.config.cluster.cluster_port", RECD_INT, "8086", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>>  ,
>> +  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_bps", RECD_INT, "804857600", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> +  ,
>> +  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_bps", RECD_INT, "4194304000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> +  ,
>> +  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_send_wait_time", RECD_INT, "1000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> +  ,
>> +  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_send_wait_time", RECD_INT, "5000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> +  ,
>> +  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_loop_interval", RECD_INT, "0", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> +  ,
>> +  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_loop_interval", RECD_INT, "1000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
>> +  ,
>> +  {RECT_CONFIG, "proxy.config.cluster.max_sessions_per_machine", RECD_INT, "1000000", RECU_RESTART_TS, RR_NULL, RECC_INT, "[1000-4000000]", RECA_NULL}
>> +  ,
>> +  {RECT_CONFIG, "proxy.config.cluster.session_locks_per_machine", RECD_INT, "10949", RECU_RESTART_TS, RR_NULL, RECC_INT, "[1-100000]", RECA_NULL}
>> +  ,
>> +  {RECT_CONFIG, "proxy.config.cluster.read_buffer_size", RECD_INT, "2097152", RECU_RESTART_TS, RR_NULL, RECC_INT, "[65536-2097152]", RECA_NULL}
>> +  ,
>>  {RECT_CONFIG, "proxy.config.cluster.cluster_configuration", RECD_STRING, "cluster.config", RECU_NULL, RR_NULL, RECC_NULL, NULL, RECA_NULL}
>>  ,
>>  {RECT_CONFIG, "proxy.config.cluster.ethernet_interface", RECD_STRING, TS_BUILD_DEFAULT_LOOPBACK_IFACE, RECU_RESTART_TS, RR_REQUIRED, RECC_STR, "^[^[:space:]]*$", RECA_NULL}
>> 
>