You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by zy...@apache.org on 2012/05/23 04:03:04 UTC

git commit: TS-1222 single tcp connection will limit the cluster throughput

Updated Branches:
  refs/heads/master bad6bf092 -> e67b0e403


TS-1222 single tcp connection will limit the cluster throughput

Changes in this patch:
1. Allow multiple ClusterHandlers per ClusterMachine.
2. Restrict cluster request processing to ET_CLUSTER only.

After this patch, clustering performance should be far better;
we expect it to be usable on 10G networks.


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/e67b0e40
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/e67b0e40
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/e67b0e40

Branch: refs/heads/master
Commit: e67b0e4038d9144c02cf41d6bf06e42083295a52
Parents: bad6bf0
Author: Chen Bin <ku...@taobao.com>
Authored: Tue May 22 14:15:41 2012 +0800
Committer: Zhao Yongming <mi...@gmail.com>
Committed: Wed May 23 10:00:47 2012 +0800

----------------------------------------------------------------------
 iocore/cache/P_CacheInternal.h          |   13 ++---
 iocore/cluster/ClusterAPI.cc            |    8 ++--
 iocore/cluster/ClusterCache.cc          |   72 ++++++++++++++-----------
 iocore/cluster/ClusterConfig.cc         |   15 ++++--
 iocore/cluster/ClusterHandler.cc        |   48 +++++++++++------
 iocore/cluster/ClusterHandlerBase.cc    |   74 +++++++++++++++++--------
 iocore/cluster/ClusterLoadMonitor.cc    |   17 +++---
 iocore/cluster/ClusterMachine.cc        |   18 ++++++-
 iocore/cluster/ClusterProcessor.cc      |   39 +++++++-------
 iocore/cluster/ClusterRPC.cc            |   47 ++++++++---------
 iocore/cluster/ClusterVConnection.cc    |   17 +++----
 iocore/cluster/P_ClusterCache.h         |   39 +++++++------
 iocore/cluster/P_ClusterCacheInternal.h |    1 +
 iocore/cluster/P_ClusterHandler.h       |    5 ++-
 iocore/cluster/P_ClusterInline.h        |    3 +-
 iocore/cluster/P_ClusterLoadMonitor.h   |    6 +-
 iocore/cluster/P_ClusterMachine.h       |   13 +++--
 iocore/hostdb/HostDB.cc                 |   12 ++--
 iocore/net/UnixNetAccept.cc             |    2 +-
 mgmt/RecordsConfig.cc                   |    2 +
 proxy/http/HttpCacheSM.cc               |    3 +-
 21 files changed, 266 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cache/P_CacheInternal.h
----------------------------------------------------------------------
diff --git a/iocore/cache/P_CacheInternal.h b/iocore/cache/P_CacheInternal.h
index 69e298f..2dcee3e 100644
--- a/iocore/cache/P_CacheInternal.h
+++ b/iocore/cache/P_CacheInternal.h
@@ -1165,10 +1165,10 @@ CacheProcessor::open_write(Continuation *cont, CacheKey *key, CacheFragType frag
 {
   (void) expected_size;
 #ifdef CLUSTER_CACHE
-  ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
-
-  if (m && (cache_clustering_enabled > 0)) {
-    return Cluster_write(cont, expected_size, (MIOBuffer *) 0, m,
+  if (cache_clustering_enabled > 0) {
+    ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
+    if (m)
+      return Cluster_write(cont, expected_size, (MIOBuffer *) 0, m,
                          key, frag_type, options, pin_in_cache,
                          CACHE_OPEN_WRITE, key, (CacheURL *) 0,
                          (CacheHTTPHdr *) 0, (CacheHTTPInfo *) 0, hostname, host_len);
@@ -1278,10 +1278,9 @@ CacheProcessor::open_read_internal(int opcode,
     url_md5 = *key;
   }
   ClusterMachine *m = cluster_machine_at_depth(cache_hash(url_md5));
-  ClusterMachine *owner_machine = m ? m : this_cluster_machine();
 
-  if (owner_machine != this_cluster_machine()) {
-    return Cluster_read(owner_machine, opcode, cont, buf, url,
+  if (m) {
+    return Cluster_read(m, opcode, cont, buf, url,
                         request, params, key, pin_in_cache, frag_type, hostname, host_len);
   } else {
     if ((opcode == CACHE_OPEN_READ_LONG)

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterAPI.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterAPI.cc b/iocore/cluster/ClusterAPI.cc
index 5e64719..7691e90 100644
--- a/iocore/cluster/ClusterAPI.cc
+++ b/iocore/cluster/ClusterAPI.cc
@@ -487,9 +487,9 @@ TSDeleteClusterRPCFunction(TSClusterRPCHandle_t * rpch)
  *  Cluster calls us here for each RPC API function.
  */
 void
-default_api_ClusterFunction(ClusterMachine * m, void *data, int len)
+default_api_ClusterFunction(ClusterHandler *ch, void *data, int len)
 {
-  Debug("cluster_api", "default_api_ClusterFunction: [%u.%u.%u.%u] data %p len %d", DOT_SEPARATED(m->ip), data, len);
+  Debug("cluster_api", "default_api_ClusterFunction: [%u.%u.%u.%u] data %p len %d", DOT_SEPARATED(ch->machine->ip), data, len);
 
   TSClusterRPCMsg_t *msg = (TSClusterRPCMsg_t *) data;
   RPCHandle_t *rpch = (RPCHandle_t *) & msg->m_handle;
@@ -501,7 +501,7 @@ default_api_ClusterFunction(ClusterMachine * m, void *data, int len)
 
   if (cluster_function < API_END_CLUSTER_FUNCTION && RPC_Functions[cluster_function]) {
     int msg_data_len = len - SIZEOF_RPC_MSG_LESS_DATA;
-    TSNodeHandle_t nh = IP_TO_NODE_HANDLE(m->ip);
+    TSNodeHandle_t nh = IP_TO_NODE_HANDLE(ch->machine->ip);
     (*RPC_Functions[cluster_function]) (&nh, msg, msg_data_len);
   } else {
     clusterProcessor.free_remote_data((char *) data, len);
@@ -572,7 +572,7 @@ TSSendClusterRPC(TSNodeHandle_t * nh, TSClusterRPCMsg_t * msg)
     int len = c->len - sizeof(OutgoingControl *);
     ink_release_assert((size_t) len >= sizeof(TSClusterRPCMsg_t));
 
-    clusterProcessor.invoke_remote(m, rpch->u.internal.cluster_function,
+    clusterProcessor.invoke_remote(m->pop_ClusterHandler(), rpch->u.internal.cluster_function,
                                    msg, len, (CLUSTER_OPT_STEAL | CLUSTER_OPT_DATA_IS_OCONTROL));
     Debug("cluster_api", "TSSendClusterRPC: msg %p dlen %d [%u.%u.%u.%u] sent", msg, len, DOT_SEPARATED(ipaddr.s_addr));
   } else {

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterCache.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterCache.cc b/iocore/cluster/ClusterCache.cc
index 3b06f85..51f375e 100644
--- a/iocore/cluster/ClusterCache.cc
+++ b/iocore/cluster/ClusterCache.cc
@@ -379,6 +379,7 @@ CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args,
   CacheContinuation *cc = 0;
   Action *act = 0;
   char *msg = 0;
+  ClusterHandler *ch = mp->pop_ClusterHandler();
 
   /////////////////////////////////////////////////////////////////////
   // Unconditionally map open read buffer interfaces to open read.
@@ -396,8 +397,12 @@ CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args,
     break;
   }
 
+  if (!ch)
+    goto no_send_exit;
+
   if (c) {
     cc = cacheContAllocator_alloc();
+    cc->ch = ch;
     cc->target_machine = mp;
     cc->request_opcode = opcode;
     cc->mutex = c->mutex;
@@ -610,7 +615,7 @@ CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args,
 #ifdef CACHE_MSG_TRACE
   log_cache_op_sndmsg((c ? cc->seq_number : CACHE_NO_RESPONSE), 0, "do_op");
 #endif
-  clusterProcessor.invoke_remote(mp,
+  clusterProcessor.invoke_remote(ch,
                                  op_needs_marshalled_coi(opcode) ? CACHE_OP_MALLOCED_CLUSTER_FUNCTION
                                  : CACHE_OP_CLUSTER_FUNCTION, (char *) msg, data_len);
 
@@ -753,7 +758,7 @@ CacheContinuation::lookupOpenWriteVC()
     msg.seq_number = seq_number;
     msg.token = vc->token;
 
-    cache_op_result_ClusterFunction(from, (void *) &msg, msglen);
+    cache_op_result_ClusterFunction(ch, (void *) &msg, msglen);
 
   } else {
     // Miss, establish local VC and send remote open_write request
@@ -898,7 +903,7 @@ CacheContinuation::localVCsetupEvent(int event, ClusterVConnection * vc)
 
     if (event != CLUSTER_EVENT_OPEN_EXISTS) {
       // Send request message
-      clusterProcessor.invoke_remote(from,
+      clusterProcessor.invoke_remote(ch,
                                      (op_needs_marshalled_coi(request_opcode) ?
                                       CACHE_OP_MALLOCED_CLUSTER_FUNCTION :
                                       CACHE_OP_CLUSTER_FUNCTION), (char *) getMsgBuffer(), getMsgBufferLen());
@@ -1057,7 +1062,7 @@ init_from_short_2(CacheContinuation * cont, CacheOpMsg_short_2 * msg, ClusterMac
 }
 
 void
-cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
+cache_op_ClusterFunction(ClusterHandler * ch, void *data, int len)
 {
   EThread *thread = this_ethread();
   ProxyMutex *mutex = thread->mutex;
@@ -1084,6 +1089,7 @@ cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
   c->request_opcode = opcode;
   c->token.clear();
   c->start_time = ink_get_hrtime();
+  c->ch = ch;
   SET_CONTINUATION_HANDLER(c, (CacheContHandler)
                            & CacheContinuation::replyOpEvent);
 
@@ -1101,9 +1107,9 @@ cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
   case CACHE_OPEN_READ:
     {
       CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
-      init_from_short(c, msg, from);
+      init_from_short(c, msg, ch->machine);
       Debug("cache_msg",
-            "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, from);
+            "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
       //
       // Establish the remote side of the ClusterVConnection
       //
@@ -1157,9 +1163,9 @@ cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
 
       int flen = CacheOpMsg_long::sizeof_fixedlen_msg();
       CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap());
-      init_from_long(c, msg, from);
+      init_from_long(c, msg, ch->machine);
       Debug("cache_msg",
-            "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, from);
+            "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
 #ifdef CACHE_MSG_TRACE
       log_cache_op_msg(msg->seq_number, len, "cache_op_open_read_long");
 #endif
@@ -1247,9 +1253,9 @@ cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
   case CACHE_OPEN_WRITE:
     {
       CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
-      init_from_short(c, msg, from);
+      init_from_short(c, msg, ch->machine);
       Debug("cache_msg",
-            "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, from);
+            "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
 #ifdef CACHE_MSG_TRACE
       log_cache_op_msg(msg->seq_number, len, "cache_op_open_write");
 #endif
@@ -1302,9 +1308,9 @@ cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
 
       int flen = CacheOpMsg_long::sizeof_fixedlen_msg();
       CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap());
-      init_from_long(c, msg, from);
+      init_from_long(c, msg, ch->machine);
       Debug("cache_msg",
-            "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, from);
+            "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
 #ifdef CACHE_MSG_TRACE
       log_cache_op_msg(msg->seq_number, len, "cache_op_open_write_long");
 #endif
@@ -1373,9 +1379,9 @@ cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
   case CACHE_REMOVE:
     {
       CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
-      init_from_short(c, msg, from);
+      init_from_short(c, msg, ch->machine);
       Debug("cache_msg",
-            "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, from);
+            "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
 #ifdef CACHE_MSG_TRACE
       log_cache_op_msg(msg->seq_number, len, "cache_op_remove");
 #endif
@@ -1400,9 +1406,9 @@ cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
   case CACHE_LINK:
     {
       CacheOpMsg_short_2 *msg = unmarshal_CacheOpMsg_short_2(data, mh->NeedByteSwap());
-      init_from_short_2(c, msg, from);
+      init_from_short_2(c, msg, ch->machine);
       Debug("cache_msg",
-            "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, from);
+            "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
 #ifdef CACHE_MSG_TRACE
       log_cache_op_msg(msg->seq_number, len, "cache_op_link");
 #endif
@@ -1427,9 +1433,9 @@ cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
   case CACHE_DEREF:
     {
       CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
-      init_from_short(c, msg, from);
+      init_from_short(c, msg, ch->machine);
       Debug("cache_msg",
-            "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, from);
+            "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
 #ifdef CACHE_MSG_TRACE
       log_cache_op_msg(msg->seq_number, len, "cache_op_deref");
 #endif
@@ -1459,9 +1465,9 @@ cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
 }
 
 void
-cache_op_malloc_ClusterFunction(ClusterMachine * from, void *data, int len)
+cache_op_malloc_ClusterFunction(ClusterHandler *ch, void *data, int len)
 {
-  cache_op_ClusterFunction(from, data, len);
+  cache_op_ClusterFunction(ch, data, len);
   // We own the message data, free it back to the Cluster subsystem
   clusterProcessor.free_remote_data((char *) data, len);
 }
@@ -1779,7 +1785,7 @@ CacheContinuation::replyOpEvent(int event, VConnection * cvc)
       // Transmit reply message and object data in same cluster message
       Debug("cache_proto", "Sending reply/data seqno=%d buflen=%"PRId64,
             seq_number, readahead_data ? bytes_IOBufferBlockList(readahead_data, 1) : 0);
-      clusterProcessor.invoke_remote_data(from,
+      clusterProcessor.invoke_remote_data(ch,
                                           CACHE_OP_RESULT_CLUSTER_FUNCTION,
                                           (void *) msg, (flen + len),
                                           readahead_data,
@@ -1787,7 +1793,7 @@ CacheContinuation::replyOpEvent(int event, VConnection * cvc)
                                           &CacheContinuation::disposeOfDataBuffer, (void *) this, CLUSTER_OPT_STEAL);
     } else {
       Debug("cache_proto", "Sending reply seqno=%d, (this=%p)", seq_number, this);
-      clusterProcessor.invoke_remote(from, CACHE_OP_RESULT_CLUSTER_FUNCTION,
+      clusterProcessor.invoke_remote(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION,
                                      (void *) msg, (flen + len), CLUSTER_OPT_STEAL);
     }
 
@@ -1966,9 +1972,8 @@ CacheContinuation::handleDisposeEvent(int event, CacheContinuation * cc)
 //   unmarshals the result and calls a continuation in the requesting thread.
 /////////////////////////////////////////////////////////////////////////////
 void
-cache_op_result_ClusterFunction(ClusterMachine * from, void *d, int l)
+cache_op_result_ClusterFunction(ClusterHandler *ch, void *d, int l)
 {
-  (void) from;
   ////////////////////////////////////////////////////////
   // Note: we are running on the ET_CACHE_CONT_SM thread
   ////////////////////////////////////////////////////////
@@ -2036,7 +2041,7 @@ cache_op_result_ClusterFunction(ClusterMachine * from, void *d, int l)
   }
   // See if this response is still expected (expected case == yes)
 
-  unsigned int hash = FOLDHASH(from->ip, msg->seq_number);
+  unsigned int hash = FOLDHASH(ch->machine->ip, msg->seq_number);
   EThread *thread = this_ethread();
   ProxyMutex *mutex = thread->mutex;
   if (MUTEX_TAKE_TRY_LOCK(remoteCacheContQueueMutex[hash], thread)) {
@@ -2044,7 +2049,7 @@ cache_op_result_ClusterFunction(ClusterMachine * from, void *d, int l)
     // Find it in pending list
 
     CacheContinuation *c = find_cache_continuation(msg->seq_number,
-                                                   from->ip);
+                                                   ch->machine->ip);
     if (!c) {
       // Reply took to long, response no longer expected.
       MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], thread);
@@ -2085,7 +2090,7 @@ cache_op_result_ClusterFunction(ClusterMachine * from, void *d, int l)
     CacheContinuation * c = CacheContinuation::cacheContAllocator_alloc();
     c->mutex = new_ProxyMutex();
     c->seq_number = msg->seq_number;
-    c->target_ip = from->ip;
+    c->target_ip = ch->machine->ip;
     SET_CONTINUATION_HANDLER(c, (CacheContHandler)
                              & CacheContinuation::handleReplyEvent);
     c->start_time = ink_get_hrtime();
@@ -2532,6 +2537,9 @@ CacheContinuation::do_remote_lookup(Continuation * cont, CacheKey * key,
 
   if (!m)
     return (Action *) 0;
+  ClusterHandler *ch = m->pop_ClusterHandler();
+  if (!ch)
+    return (Action *) 0;
 
   // If we do not have a continuation, build one
 
@@ -2541,6 +2549,7 @@ CacheContinuation::do_remote_lookup(Continuation * cont, CacheKey * key,
     c->probe_depth = probe_depth;
     memcpy(c->past_probes, past_probes, sizeof(past_probes));
   }
+  c->ch = ch;
   // Save hostname data in case we need to do a local lookup.
   if (hostname && hostname_len) {
     // Alloc buffer, copy hostname data and attach to continuation
@@ -2601,7 +2610,7 @@ CacheContinuation::do_remote_lookup(Continuation * cont, CacheKey * key,
 #ifdef CACHE_MSG_TRACE
   log_cache_op_sndmsg(msg.seq_number, 0, "cache_lookup");
 #endif
-  clusterProcessor.invoke_remote(m, CACHE_LOOKUP_CLUSTER_FUNCTION, data, len);
+  clusterProcessor.invoke_remote(c->ch, CACHE_LOOKUP_CLUSTER_FUNCTION, data, len);
   return &c->action;
 }
 
@@ -2613,9 +2622,8 @@ CacheContinuation::do_remote_lookup(Continuation * cont, CacheKey * key,
 //   continuation set to CacheContinuation::replyLookupEvent()
 ////////////////////////////////////////////////////////////////////////////
 void
-cache_lookup_ClusterFunction(ClusterMachine * from, void *data, int len)
+cache_lookup_ClusterFunction(ClusterHandler *ch, void *data, int len)
 {
-  (void) from;
   (void) len;
   EThread *thread = this_ethread();
   ProxyMutex *mutex = thread->mutex;
@@ -2642,7 +2650,7 @@ cache_lookup_ClusterFunction(ClusterMachine * from, void *data, int len)
   MUTEX_TRY_LOCK(lock, c->mutex, this_ethread());
   c->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
   c->seq_number = msg->seq_number;
-  c->from = from;
+  c->from = ch->machine;
   c->url_md5 = msg->url_md5;
   SET_CONTINUATION_HANDLER(c, (CacheContHandler)
                            & CacheContinuation::replyLookupEvent);
@@ -2692,7 +2700,7 @@ CacheContinuation::replyLookupEvent(int event, void *d)
 #ifdef CACHE_MSG_TRACE
       log_cache_op_sndmsg(seq_number, event, "cache_result");
 #endif
-      clusterProcessor.invoke_remote(from, CACHE_OP_RESULT_CLUSTER_FUNCTION, msg, len);
+      clusterProcessor.invoke_remote(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION, msg, len);
     }
   } else {
     //////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterConfig.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterConfig.cc b/iocore/cluster/ClusterConfig.cc
index 0120158..94a0c5f 100644
--- a/iocore/cluster/ClusterConfig.cc
+++ b/iocore/cluster/ClusterConfig.cc
@@ -162,15 +162,20 @@ make_cluster_connections(MachineList * l, MachineList * old)
   // Connect to all new machines.
   //
   uint32_t ip = this_cluster_machine()->ip;
+  int num_connections = this_cluster_machine()->num_connections;
 
   if (l) {
-    for (int i = 0; i < l->n; i++)
+    for (int i = 0; i < l->n; i++) {
 #ifdef LOCAL_CLUSTER_TEST_MODE
-      if (ip < l->machine[i].ip || (ip == l->machine[i].ip && (cluster_port < l->machine[i].port)))
+      if (ip < l->machine[i].ip || (ip == l->machine[i].ip && (cluster_port < l->machine[i].port))) {
 #else
-      if (ip < l->machine[i].ip)
+      if (ip < l->machine[i].ip) {
 #endif
-        clusterProcessor.connect(l->machine[i].ip, l->machine[i].port);
+        for (int j = 0; j < num_connections; j++) {
+          clusterProcessor.connect(l->machine[i].ip, l->machine[i].port, j);
+        }
+      }
+    }
   }
 }
 
@@ -470,7 +475,7 @@ cluster_machine_at_depth(unsigned int hash, int *pprobe_depth, ClusterMachine **
       continue;
     }
 
-    return m;
+    return (m != this_cluster_machine()) ? m : NULL;
   }
   return NULL;
 }

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterHandler.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterHandler.cc b/iocore/cluster/ClusterHandler.cc
index 9260ac3..1fd2555 100644
--- a/iocore/cluster/ClusterHandler.cc
+++ b/iocore/cluster/ClusterHandler.cc
@@ -141,6 +141,9 @@ ClusterHandler::ClusterHandler()
     hostname(NULL),
     machine(NULL),
     ifd(-1),
+    id(-1),
+    dead(true),
+    downing(false),
     active(false),
     on_stolen_thread(false),
     n_channels(0),
@@ -226,12 +229,19 @@ ClusterHandler::ClusterHandler()
 
 ClusterHandler::~ClusterHandler()
 {
+  bool free_m = false;
   if (net_vc) {
     net_vc->do_io(VIO::CLOSE);
     net_vc = 0;
   }
-  if (machine)
-    free_ClusterMachine(machine);
+  if (machine) {
+    MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread());
+    if (++machine->free_connections >= machine->num_connections)
+      free_m = true;
+    MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
+    if (free_m)
+      free_ClusterMachine(machine);
+  }
   machine = NULL;
   ats_free(hostname);
   hostname = NULL;
@@ -302,10 +312,10 @@ ClusterHandler::close_ClusterVConnection(ClusterVConnection * vc)
   ink_assert(!vc->write_list_bytes);
   ink_assert(!vc->write_bytes_in_transit);
 
-  if (((!vc->remote_closed && !vc->have_all_data) || (vc->remote_closed == FORCE_CLOSE_ON_OPEN_CHANNEL)) && vc->machine) {
+  if (((!vc->remote_closed && !vc->have_all_data) || (vc->remote_closed == FORCE_CLOSE_ON_OPEN_CHANNEL)) && vc->ch) {
 
     CloseMessage msg;
-    int vers = CloseMessage::protoToVersion(vc->machine->msg_proto_major);
+    int vers = CloseMessage::protoToVersion(vc->ch->machine->msg_proto_major);
     void *data;
     int len;
 
@@ -323,7 +333,7 @@ ClusterHandler::close_ClusterVConnection(ClusterVConnection * vc)
       //////////////////////////////////////////////////////////////
       ink_release_assert(!"close_ClusterVConnection() bad msg version");
     }
-    clusterProcessor.invoke_remote(vc->machine, CLOSE_CHANNEL_CLUSTER_FUNCTION, data, len);
+    clusterProcessor.invoke_remote(vc->ch, CLOSE_CHANNEL_CLUSTER_FUNCTION, data, len);
   }
   ink_hrtime now = ink_get_hrtime();
   CLUSTER_DECREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPEN_STAT);
@@ -858,7 +868,7 @@ ClusterHandler::process_set_data_msgs()
 
       if ((cluster_function_index < (uint32_t) SIZE_clusterFunction)
           && (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) {
-        clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(machine, p + (2 * sizeof(uint32_t)), len - sizeof(uint32_t));
+        clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(this, p + (2 * sizeof(uint32_t)), len - sizeof(uint32_t));
         // Mark message as processed.
         *((uint32_t *) (p + sizeof(uint32_t))) = ~*((uint32_t *) (p + sizeof(uint32_t)));
         p += (2 * sizeof(uint32_t)) + (len - sizeof(uint32_t));
@@ -892,7 +902,7 @@ ClusterHandler::process_set_data_msgs()
           && (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) {
 
         char *p = ic->data;
-        clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(machine,
+        clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(this,
                                                                (void *) (p + sizeof(int32_t)), ic->len - sizeof(int32_t));
 
         // Reverse swap since this will be processed again for deallocation.
@@ -953,7 +963,7 @@ ClusterHandler::process_small_control_msgs()
       // Cluster function, can only be processed in ET_CLUSTER thread
       //////////////////////////////////////////////////////////////////////
       p += sizeof(uint32_t);
-      clusterFunction[cluster_function_index].pfn(machine, p, len - sizeof(int32_t));
+      clusterFunction[cluster_function_index].pfn(this, p, len - sizeof(int32_t));
       p += (len - sizeof(int32_t));
 
     } else {
@@ -1011,7 +1021,7 @@ ClusterHandler::process_large_control_msgs()
 
     } else if (clusterFunction[cluster_function_index].ClusterFunc) {
       // Cluster message, process in ET_CLUSTER thread
-      clusterFunction[cluster_function_index].pfn(machine, (void *) (ic->data + sizeof(int32_t)),
+      clusterFunction[cluster_function_index].pfn(this, (void *) (ic->data + sizeof(int32_t)),
                                                   ic->len - sizeof(int32_t));
 
       // Deallocate memory
@@ -1201,7 +1211,7 @@ ClusterHandler::process_incoming_callouts(ProxyMutex * m)
           // Invoke processing function
           ////////////////////////////////
           ink_assert(!clusterFunction[cluster_function_index].ClusterFunc);
-          clusterFunction[cluster_function_index].pfn(machine, p, len - sizeof(int32_t));
+          clusterFunction[cluster_function_index].pfn(this, p, len - sizeof(int32_t));
           now = ink_get_hrtime();
           CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT, now - ic->recognized_time);
         } else {
@@ -1222,7 +1232,7 @@ ClusterHandler::process_incoming_callouts(ProxyMutex * m)
           // Invoke processing function
           ////////////////////////////////
           ink_assert(!clusterFunction[cluster_function_index].ClusterFunc);
-          clusterFunction[cluster_function_index].pfn(machine, (void *) (ic->data + sizeof(int32_t)),
+          clusterFunction[cluster_function_index].pfn(this, (void *) (ic->data + sizeof(int32_t)),
                                                       ic->len - sizeof(int32_t));
           now = ink_get_hrtime();
           CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT, now - ic->recognized_time);
@@ -1732,7 +1742,7 @@ ClusterHandler::build_controlmsg_descriptors()
       control_msgs_built++;
 
       if (clusterFunction[*(int32_t *) c->data].post_pfn) {
-        clusterFunction[*(int32_t *) c->data].post_pfn(machine, c->data + sizeof(int32_t), c->len);
+        clusterFunction[*(int32_t *) c->data].post_pfn(this, c->data + sizeof(int32_t), c->len);
       }
       continue;
     }
@@ -1829,7 +1839,7 @@ ClusterHandler::build_controlmsg_descriptors()
       control_msgs_built++;
 
       if (clusterFunction[*(int32_t *) c->data].post_pfn) {
-        clusterFunction[*(int32_t *) c->data].post_pfn(machine, c->data + sizeof(int32_t), c->len);
+        clusterFunction[*(int32_t *) c->data].post_pfn(this, c->data + sizeof(int32_t), c->len);
       }
     }
   }
@@ -2466,6 +2476,12 @@ ClusterHandler::mainClusterEvent(int event, Event * e)
   while (io_activity) {
     io_activity = 0;
     only_write_control_msgs = 0;
+
+    if (downing) {
+      machine_down();
+      break;
+    }
+
     //////////////////////////
     // Read Processing
     //////////////////////////
@@ -2502,7 +2518,7 @@ ClusterHandler::mainClusterEvent(int event, Event * e)
   }
 
 #ifdef CLUSTER_IMMEDIATE_NETIO
-  if (!machine->dead && ((event == EVENT_POLL) || (event == EVENT_INTERVAL))) {
+  if (!dead && ((event == EVENT_POLL) || (event == EVENT_INTERVAL))) {
     if (res >= 0) {
       build_poll(true);
     }
@@ -2518,7 +2534,7 @@ ClusterHandler::process_read(ink_hrtime now)
 #ifdef CLUSTER_STATS
   _process_read_calls++;
 #endif
-  if (machine->dead) {
+  if (dead) {
     // Node is down
     return 0;
   }
@@ -3011,7 +3027,7 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs)
         //
         // periodic activities
         //
-        if (!on_stolen_thread && !cur_vcs && !machine->dead) {
+        if (!on_stolen_thread && !cur_vcs && !dead) {
           //
           // check if this machine is supposed to be in the cluster
           //

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterHandlerBase.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterHandlerBase.cc b/iocore/cluster/ClusterHandlerBase.cc
index baa97af..c184e89 100644
--- a/iocore/cluster/ClusterHandlerBase.cc
+++ b/iocore/cluster/ClusterHandlerBase.cc
@@ -182,7 +182,7 @@ OutgoingControl::alloc()
 }
 
 OutgoingControl::OutgoingControl()
-:m(NULL), submit_time(0)
+:ch(NULL), submit_time(0)
 {
 }
 
@@ -195,7 +195,6 @@ OutgoingControl::startEvent(int event, Event * e)
   //
   (void) event;
   (void) e;
-  ClusterHandler *ch = m->clusterHandler;
   // verify that the machine has not gone down
   if (!ch || !ch->thread)
     return EVENT_DONE;
@@ -741,7 +740,7 @@ ClusterHandler::machine_down()
 {
   char textbuf[sizeof("255.255.255.255:65535")];
 
-  if (machine->dead) {
+  if (dead) {
     return EVENT_DONE;
   }
   //
@@ -754,7 +753,7 @@ ClusterHandler::machine_down()
 #ifdef LOCAL_CLUSTER_TEST_MODE
   Note("machine down %u.%u.%u.%u:%d", DOT_SEPARATED(ip), port);
 #else
-  Note("machine down %u.%u.%u.%u", DOT_SEPARATED(ip));
+  Note("machine down %u.%u.%u.%u:%d", DOT_SEPARATED(ip), id);
 #endif
 #ifdef NON_MODULAR
   machine_offline_APIcallout(ip);
@@ -771,7 +770,8 @@ ClusterHandler::machine_down()
 
   MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread());
   ClusterConfiguration *c = this_cluster()->current_configuration();
-  if (c->find(ip, port)) {
+  machine->clusterHandlers[id] = NULL;
+  if ((--machine->now_connections == 0) && c->find(ip, port)) {
     ClusterConfiguration *cc = configuration_remove_machine(c, machine);
     CLUSTER_DECREMENT_DYN_STAT(CLUSTER_NODES_STAT);
     this_cluster()->configurations.push(cc);
@@ -780,7 +780,7 @@ ClusterHandler::machine_down()
   MachineList *cc = the_cluster_config();
   if (cc && cc->find(ip, port) && connector) {
     Debug(CL_NOTE, "cluster connect retry for %hhu.%hhu.%hhu.%hhu", DOT_SEPARATED(ip));
-    clusterProcessor.connect(ip, port);
+    clusterProcessor.connect(ip, port, id);
   }
   return zombify();             // defer deletion of *this
 }
@@ -793,8 +793,7 @@ ClusterHandler::zombify(Event * e)
   // Node associated with *this is declared down, setup the event to cleanup
   // and defer deletion of *this
   //
-  machine->clusterHandler = NULL;
-  machine->dead = true;
+  dead = true;
   if (cluster_periodic_event) {
     cluster_periodic_event->cancel(this);
     cluster_periodic_event = NULL;
@@ -910,6 +909,8 @@ ClusterHandler::startClusterEvent(int event, Event * e)
         nodeClusteringVersion._port = cluster_port;
 #endif
         cluster_connect_state = ClusterHandler::CLCON_SEND_MSG_COMPLETE;
+        if (connector)
+          nodeClusteringVersion._id = id;
         build_data_vector((char *) &nodeClusteringVersion, sizeof(nodeClusteringVersion), false);
         if (!write.doIO()) {
           // i/o not initiated, delay and retry
@@ -1019,29 +1020,57 @@ ClusterHandler::startClusterEvent(int event, Event * e)
           break;                // goto next state
         }
 
-        // include this node into the cluster configuration
-        MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread());
-        MachineList *cc = the_cluster_config();
 #ifdef LOCAL_CLUSTER_TEST_MODE
         port = clusteringVersion._port & 0xffff;
 #endif
+        if (!connector)
+          id = clusteringVersion._id & 0xffff;
+
+        // include this node into the cluster configuration
+        MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread());
+        MachineList *cc = the_cluster_config();
         if (cc && cc->find(ip, port)) {
           ClusterConfiguration *c = this_cluster()->current_configuration();
-          if (!c->find(ip, port)) {
+          ClusterMachine *m = c->find(ip, port);
+          
+          if (!m) { // this is the first connection
             ClusterConfiguration *cconf = configuration_add_machine(c, machine);
             CLUSTER_INCREMENT_DYN_STAT(CLUSTER_NODES_STAT);
             this_cluster()->configurations.push(cconf);
           } else {
-            // duplicate cluster connect, ignore
-            failed = -2;
-            Debug(CL_NOTE, "duplicate cluster connect %u.%u.%u.%u", DOT_SEPARATED(ip));
+            if (m->num_connections > machine->num_connections) {
+              // close the now-unneeded old connections when the new num_connections < the old num_connections
+              for (int i = machine->num_connections; i < m->num_connections; i++) {
+                if (m->clusterHandlers[i])
+                  m->clusterHandlers[i]->downing = true;
+              }
+              m->free_connections -= (m->num_connections - machine->num_connections); // must use the old count, so adjust before it is overwritten
+              m->num_connections = machine->num_connections;
+              // delete_this
+              failed = -2;
+              MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
+              goto failed;
+            } else {
+              m->num_connections = machine->num_connections;
+              // reject the new connection if its id is out of range or an old connection already occupies that slot
+              if (id >= m->num_connections || m->clusterHandlers[id]) {
+                failed = -2;
+                MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
+                goto failed;
+              }
+              machine = m;
+            }
           }
+          machine->now_connections++;
+          machine->clusterHandlers[id] = this;
+          machine->dead = false;
+          dead = false;
         } else {
           Debug(CL_NOTE, "cluster connect aborted, machine %u.%u.%u.%u:%d not in cluster", DOT_SEPARATED(ip), port);
           failed = -1;
         }
         MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
-
+failed:
         if (failed) {
           if (failed == -1) {
             if (++configLookupFails <= CONFIG_LOOKUP_RETRIES) {
@@ -1054,7 +1083,6 @@ ClusterHandler::startClusterEvent(int event, Event * e)
         }
 
         this->needByteSwap = !clusteringVersion.NativeByteOrder();
-        machine->clusterHandler = this;
         machine->msg_proto_major = proto_major;
         machine->msg_proto_minor = proto_minor;
 #ifdef NON_MODULAR
@@ -1067,8 +1095,8 @@ ClusterHandler::startClusterEvent(int event, Event * e)
         Note("machine up %hhu.%hhu.%hhu.%hhu:%d, protocol version=%d.%d",
              DOT_SEPARATED(ip), port, clusteringVersion._major, clusteringVersion._minor);
 #else
-        Note("machine up %hhu.%hhu.%hhu.%hhu, protocol version=%d.%d",
-             DOT_SEPARATED(ip), clusteringVersion._major, clusteringVersion._minor);
+        Note("machine up %hhu.%hhu.%hhu.%hhu:%d, protocol version=%d.%d",
+             DOT_SEPARATED(ip), id, clusteringVersion._major, clusteringVersion._minor);
 #endif
         thread = e->ethread;
         read_vcs = NEW((new Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_read_link>[CLUSTER_BUCKETS]));
@@ -1103,7 +1131,7 @@ ClusterHandler::startClusterEvent(int event, Event * e)
         // Start cluster interconnect load monitoring
 
         if (!clm) {
-          clm = new ClusterLoadMonitor(machine);
+          clm = new ClusterLoadMonitor(this);
           clm->init();
         }
         return EVENT_DONE;
@@ -1114,10 +1142,8 @@ ClusterHandler::startClusterEvent(int event, Event * e)
       {
         if (connector) {
           Debug(CL_NOTE, "cluster connect retry for %u.%u.%u.%u", DOT_SEPARATED(ip));
-          ClusterConfiguration *cc = this_cluster()->current_configuration();
           // check for duplicate cluster connect
-          if (!cc->find(ip))
-            clusterProcessor.connect(ip, port, true);
+          clusterProcessor.connect(ip, port, id, true);
         }
         cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT;
         break;                  // goto next state
@@ -1243,7 +1269,7 @@ ClusterHandler::protoZombieEvent(int event, Event * e)
       vc = channels[i];
       if (VALID_CHANNEL(vc)) {
         if (vc->closed) {
-          vc->machine = 0;
+          vc->ch = 0;
           vc->write_list = 0;
           vc->write_list_tail = 0;
           vc->write_list_bytes = 0;

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterLoadMonitor.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterLoadMonitor.cc b/iocore/cluster/ClusterLoadMonitor.cc
index 848307d..7da7b21 100644
--- a/iocore/cluster/ClusterLoadMonitor.cc
+++ b/iocore/cluster/ClusterLoadMonitor.cc
@@ -49,12 +49,12 @@ int
 int
   ClusterLoadMonitor::cf_cluster_load_exceed_duration;
 
-ClusterLoadMonitor::ClusterLoadMonitor(ClusterMachine * m)
-:Continuation(0), machine(m), ping_history_buf_head(0),
+ClusterLoadMonitor::ClusterLoadMonitor(ClusterHandler * ch)
+:Continuation(0), ch(ch), ping_history_buf_head(0),
 periodic_action(0), cluster_overloaded(0), cancel_periodic(0),
 cluster_load_msg_sequence_number(0), cluster_load_msg_start_sequence_number(0)
 {
-  mutex = this->machine->clusterHandler->mutex;
+  mutex = this->ch->mutex;
   SET_HANDLER(&ClusterLoadMonitor::cluster_load_periodic);
 
   ping_message_send_msec_interval = cf_ping_message_send_msec_interval ? cf_ping_message_send_msec_interval : 100;
@@ -220,20 +220,20 @@ ClusterLoadMonitor::compute_cluster_load()
   }
   Debug("cluster_monitor",
         "[%u.%u.%u.%u] overload=%d, clear=%d, exceed=%d, latency=%d",
-        DOT_SEPARATED(this->machine->ip), cluster_overloaded, threshold_clear, threshold_exceeded, n_bucket);
+        DOT_SEPARATED(this->ch->machine->ip), cluster_overloaded, threshold_clear, threshold_exceeded, n_bucket);
 }
 
 void
 ClusterLoadMonitor::note_ping_response_time(ink_hrtime response_time, int sequence_number)
 {
 #ifdef CLUSTER_TOMCAT
-  ProxyMutex *mutex = this->machine->clusterHandler->mutex;     // hack for stats
+  ProxyMutex *mutex = this->ch->mutex;     // hack for stats
 #endif
 
   CLUSTER_SUM_DYN_STAT(CLUSTER_PING_TIME_STAT, response_time);
   int bucket = (int)
     (response_time / HRTIME_MSECONDS(msecs_per_ping_response_bucket));
-  Debug("cluster_monitor_ping", "[%u.%u.%u.%u] ping: %d %d", DOT_SEPARATED(this->machine->ip), bucket, sequence_number);
+  Debug("cluster_monitor_ping", "[%u.%u.%u.%u] ping: %d %d", DOT_SEPARATED(this->ch->machine->ip), bucket, sequence_number);
 
   if (bucket >= num_ping_response_buckets)
     bucket = num_ping_response_buckets - 1;
@@ -255,12 +255,11 @@ ClusterLoadMonitor::recv_cluster_load_msg(cluster_load_ping_msg * m)
 }
 
 void
-ClusterLoadMonitor::cluster_load_ping_rethandler(ClusterMachine * from, void *data, int len)
+ClusterLoadMonitor::cluster_load_ping_rethandler(ClusterHandler *ch, void *data, int len)
 {
   // Global cluster load ping message return handler which
   // dispatches the result to the class specific handler.
 
-  ClusterHandler *ch = from->clusterHandler;
   if (ch) {
     if (len == sizeof(struct cluster_load_ping_msg)) {
       struct cluster_load_ping_msg m;
@@ -283,7 +282,7 @@ ClusterLoadMonitor::send_cluster_load_msg(ink_hrtime current_time)
 
   m.sequence_number = cluster_load_msg_sequence_number++;
   m.send_time = current_time;
-  cluster_ping(machine, cluster_load_ping_rethandler, (void *) &m, sizeof(m));
+  cluster_ping(ch, cluster_load_ping_rethandler, (void *) &m, sizeof(m));
 }
 
 int

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterMachine.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterMachine.cc b/iocore/cluster/ClusterMachine.cc
index 31fab65..cd056e2 100644
--- a/iocore/cluster/ClusterMachine.cc
+++ b/iocore/cluster/ClusterMachine.cc
@@ -68,9 +68,13 @@ dead(false),
 hostname(ahostname),
 ip(aip),
 cluster_port(aport),
+num_connections(0),
+now_connections(0),
+free_connections(0),
+rr_count(0),
 msg_proto_major(0),
 msg_proto_minor(0),
-clusterHandler(0)
+clusterHandlers(0)
 {
   EThread *thread = this_ethread();
   ProxyMutex *mutex = thread->mutex;
@@ -141,11 +145,23 @@ clusterHandler(0)
     hostname_len = strlen(hostname);
   else
     hostname_len = 0;
+
+  IOCORE_ReadConfigInteger(num_connections, "proxy.config.cluster.num_of_cluster_connections");
+  clusterHandlers = (ClusterHandler **)ats_calloc(num_connections, sizeof(ClusterHandler *));
+}
+
+ClusterHandler *ClusterMachine::pop_ClusterHandler(int no_rr)
+{ // Returns the handler slot selected by rr_count (may be NULL); no_rr != 0 reads the slot without advancing rr_count.
+  int64_t now = rr_count;
+  if (no_rr == 0)
+    ink_atomic_increment64(&rr_count, 1);
+  return (this->num_connections > 0) ? this->clusterHandlers[now % this->num_connections] : NULL; // no slots configured: avoid modulo by zero
+}
 
 ClusterMachine::~ClusterMachine()
 {
   ats_free(hostname);
+  ats_free(clusterHandlers);
 }
 
 #ifndef INK_NO_CLUSTER

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterProcessor.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterProcessor.cc b/iocore/cluster/ClusterProcessor.cc
index 6f20f5e..aa2b188 100644
--- a/iocore/cluster/ClusterProcessor.cc
+++ b/iocore/cluster/ClusterProcessor.cc
@@ -52,7 +52,7 @@ ClusterProcessor::~ClusterProcessor()
 }
 
 int
-ClusterProcessor::internal_invoke_remote(ClusterMachine * m, int cluster_fn,
+ClusterProcessor::internal_invoke_remote(ClusterHandler *ch, int cluster_fn,
                                          void *data, int len, int options, void *cmsg)
 {
   EThread *thread = this_ethread();
@@ -67,7 +67,6 @@ ClusterProcessor::internal_invoke_remote(ClusterMachine * m, int cluster_fn,
   bool malloced = (cluster_fn == CLUSTER_FUNCTION_MALLOCED);
   OutgoingControl *c;
 
-  ClusterHandler *ch = m->clusterHandler;
   if (!ch || (!malloced && !((unsigned int) cluster_fn < (uint32_t) SIZE_clusterFunction))) {
     // Invalid message or node is down, free message data
     if (malloced) {
@@ -94,7 +93,6 @@ ClusterProcessor::internal_invoke_remote(ClusterMachine * m, int cluster_fn,
     c = OutgoingControl::alloc();
   }
   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CTRL_MSGS_SENT_STAT);
-  c->m = m;
   c->submit_time = ink_get_hrtime();
 
   if (malloced) {
@@ -152,13 +150,13 @@ ClusterProcessor::internal_invoke_remote(ClusterMachine * m, int cluster_fn,
 }
 
 int
-ClusterProcessor::invoke_remote(ClusterMachine * m, int cluster_fn, void *data, int len, int options)
+ClusterProcessor::invoke_remote(ClusterHandler *ch, int cluster_fn, void *data, int len, int options)
 {
-  return internal_invoke_remote(m, cluster_fn, data, len, options, (void *) NULL);
+  return internal_invoke_remote(ch, cluster_fn, data, len, options, (void *) NULL);
 }
 
 int
-ClusterProcessor::invoke_remote_data(ClusterMachine * m, int cluster_fn,
+ClusterProcessor::invoke_remote_data(ClusterHandler *ch, int cluster_fn,
                                      void *data, int data_len,
                                      IOBufferBlock * buf,
                                      int dest_channel, ClusterVCToken * token,
@@ -166,7 +164,7 @@ ClusterProcessor::invoke_remote_data(ClusterMachine * m, int cluster_fn,
 {
   if (!buf) {
     // No buffer data, translate this into a invoke_remote() request
-    return internal_invoke_remote(m, cluster_fn, data, data_len, options, (void *) NULL);
+    return internal_invoke_remote(ch, cluster_fn, data, data_len, options, (void *) NULL);
   }
   ink_assert(data);
   ink_assert(data_len);
@@ -197,7 +195,7 @@ ClusterProcessor::invoke_remote_data(ClusterMachine * m, int cluster_fn,
   *(int32_t *) chdr->data = -1;   // always -1 for compound message
   memcpy(chdr->data + sizeof(int32_t), (char *) &mh, sizeof(mh));
 
-  return internal_invoke_remote(m, cluster_fn, data, data_len, options, (void *) chdr);
+  return internal_invoke_remote(ch, cluster_fn, data, data_len, options, (void *) chdr);
 }
 
 void
@@ -239,7 +237,7 @@ ClusterProcessor::open_local(Continuation * cont, ClusterMachine * m, ClusterVCT
   bool immediate = ((options & CLUSTER_OPT_IMMEDIATE) ? true : false);
   bool allow_immediate = ((options & CLUSTER_OPT_ALLOW_IMMEDIATE) ? true : false);
 
-  ClusterHandler *ch = m->clusterHandler;
+  ClusterHandler *ch = ((CacheContinuation *)cont)->ch;
   if (!ch)
     return NULL;
   EThread *t = ch->thread;
@@ -252,8 +250,9 @@ ClusterProcessor::open_local(Continuation * cont, ClusterMachine * m, ClusterVCT
   vc->new_connect_read = (options & CLUSTER_OPT_CONN_READ ? 1 : 0);
   vc->start_time = ink_get_hrtime();
   vc->last_activity_time = vc->start_time;
-  vc->machine = m;
+  vc->ch = ch;
   vc->token.alloc();
+  vc->token.ch_id = ch->id;
   token = vc->token;
 #ifdef CLUSTER_THREAD_STEALING
   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPENNED_STAT);
@@ -306,7 +305,9 @@ ClusterProcessor::connect_local(Continuation * cont, ClusterVCToken * token, int
 #endif
   if (!m)
     return NULL;
-  ClusterHandler *ch = m->clusterHandler;
+  if (token->ch_id >= (uint32_t)m->num_connections)
+    return NULL;
+  ClusterHandler *ch = m->clusterHandlers[token->ch_id];
   if (!ch)
     return NULL;
   EThread *t = ch->thread;
@@ -319,7 +320,7 @@ ClusterProcessor::connect_local(Continuation * cont, ClusterVCToken * token, int
   vc->new_connect_read = (options & CLUSTER_OPT_CONN_READ ? 1 : 0);
   vc->start_time = ink_get_hrtime();
   vc->last_activity_time = vc->start_time;
-  vc->machine = m;
+  vc->ch = ch;
   vc->token = *token;
   vc->channel = channel;
 #ifdef CLUSTER_THREAD_STEALING
@@ -354,11 +355,7 @@ ClusterProcessor::connect_local(Continuation * cont, ClusterVCToken * token, int
 
 bool ClusterProcessor::disable_remote_cluster_ops(ClusterMachine * m)
 {
-  if (!m)
-    return false;
-
-  ClusterHandler *
-    ch = m->clusterHandler;
+  ClusterHandler *ch = m->pop_ClusterHandler(1);
   if (ch) {
     return ch->disable_remote_cluster_ops;
   } else {
@@ -753,7 +750,7 @@ ClusterProcessor::start()
 }
 
 void
-ClusterProcessor::connect(char *hostname)
+ClusterProcessor::connect(char *hostname, int16_t id)
 {
   //
   // Construct a cluster link to the given machine
@@ -762,11 +759,12 @@ ClusterProcessor::connect(char *hostname)
   SET_CONTINUATION_HANDLER(ch, (ClusterContHandler) & ClusterHandler::connectClusterEvent);
   ch->hostname = ats_strdup(hostname);
   ch->connector = true;
+  ch->id = id;
   eventProcessor.schedule_imm(ch, ET_CLUSTER);
 }
 
 void
-ClusterProcessor::connect(unsigned int ip, int port, bool delay)
+ClusterProcessor::connect(unsigned int ip, int port, int16_t id, bool delay)
 {
   //
   // Construct a cluster link to the given machine
@@ -776,6 +774,7 @@ ClusterProcessor::connect(unsigned int ip, int port, bool delay)
   ch->ip = ip;
   ch->port = port;
   ch->connector = true;
+  ch->id = id;
   if (delay)
     eventProcessor.schedule_in(ch, CLUSTER_MEMBER_DELAY, ET_CLUSTER);
   else
@@ -814,7 +813,7 @@ ClusterProcessor::send_machine_list(ClusterMachine * m)
     //////////////////////////////////////////////////////////////
     ink_release_assert(!"send_machine_list() bad msg version");
   }
-  invoke_remote(m, MACHINE_LIST_CLUSTER_FUNCTION, data, len);
+  invoke_remote(m->pop_ClusterHandler(), MACHINE_LIST_CLUSTER_FUNCTION, data, len);
 }
 
 void

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterRPC.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterRPC.cc b/iocore/cluster/ClusterRPC.cc
index 58302af..9aa132a 100644
--- a/iocore/cluster/ClusterRPC.cc
+++ b/iocore/cluster/ClusterRPC.cc
@@ -34,27 +34,26 @@
 /////////////////////////////////////////////////////////////////////////
 
 void
-ping_ClusterFunction(ClusterMachine * from, void *data, int len)
+ping_ClusterFunction(ClusterHandler *ch, void *data, int len)
 {
   //
   // Just return the data back
   //
-  clusterProcessor.invoke_remote(from, PING_REPLY_CLUSTER_FUNCTION, data, len);
+  clusterProcessor.invoke_remote(ch, PING_REPLY_CLUSTER_FUNCTION, data, len);
 }
 
 void
-ping_reply_ClusterFunction(ClusterMachine * from, void *data, int len)
+ping_reply_ClusterFunction(ClusterHandler *ch, void *data, int len)
 {
   //
   // Pass back the data.
   //
-  (void) from;
   PingMessage *msg = (PingMessage *) data;
-  msg->fn(from, msg->data, (len - msg->sizeof_fixedlen_msg()));
+  msg->fn(ch, msg->data, (len - msg->sizeof_fixedlen_msg()));
 }
 
 void
-machine_list_ClusterFunction(ClusterMachine * from, void *data, int len)
+machine_list_ClusterFunction(ClusterHandler * from, void *data, int len)
 {
   (void) from;
   ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
@@ -82,14 +81,19 @@ machine_list_ClusterFunction(ClusterMachine * from, void *data, int len)
         goto Lfound;
     }
     // not found, must be a new machine
-    clusterProcessor.connect(m->ip[i]);
+    {
+      int num_connections = this_cluster_machine()->num_connections;
+      for (int k = 0; k < num_connections; k++) {
+        clusterProcessor.connect(m->ip[i], 0, k); // connect(ip, port, id): pass the default port explicitly so k is the connection id, not the port
+      }
+    }
   Lfound:
     ;
   }
 }
 
 void
-close_channel_ClusterFunction(ClusterMachine * from, void *data, int len)
+close_channel_ClusterFunction(ClusterHandler *ch, void *data, int len)
 {
   ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
   CloseMessage *m = (CloseMessage *) data;
@@ -106,7 +110,6 @@ close_channel_ClusterFunction(ClusterMachine * from, void *data, int len)
   // Close the remote side of a VC connection (remote node is originator)
   //
   ink_assert(len >= (int) sizeof(CloseMessage));
-  ClusterHandler *ch = from->clusterHandler;
   if (!ch || !ch->channels)
     return;
   ClusterVConnection *vc = ch->channels[m->channel];
@@ -117,13 +120,13 @@ close_channel_ClusterFunction(ClusterMachine * from, void *data, int len)
 }
 
 void
-test_ClusterFunction(ClusterMachine * m, void *data, int len)
+test_ClusterFunction(ClusterHandler *ch, void *data, int len)
 {
   //
   // Note: Only for testing.
   //
   if (ptest_ClusterFunction)
-    ptest_ClusterFunction(m, data, len);
+    ptest_ClusterFunction(ch, data, len);
 }
 
 CacheVC *
@@ -157,7 +160,7 @@ ChannelToCacheWriteVC(ClusterHandler * ch, int channel, uint32_t channel_seqno,
 }
 
 void
-set_channel_data_ClusterFunction(ClusterMachine * from, void *tdata, int tlen)
+set_channel_data_ClusterFunction(ClusterHandler *ch, void *tdata, int tlen)
 {
   EThread *thread = this_ethread();
   ProxyMutex *mutex = thread->mutex;
@@ -189,7 +192,6 @@ set_channel_data_ClusterFunction(ClusterMachine * from, void *tdata, int tlen)
 
   ClusterVConnection *cvc;
   CacheVC *cache_vc;
-  ClusterHandler *ch = from->clusterHandler;
 
   if (ch) {
     cache_vc = ChannelToCacheWriteVC(ch, m->channel, m->sequence_number, &cvc);
@@ -228,7 +230,7 @@ set_channel_data_ClusterFunction(ClusterMachine * from, void *tdata, int tlen)
 }
 
 void
-post_setchan_send_ClusterFunction(ClusterMachine * to, void *data, int len)
+post_setchan_send_ClusterFunction(ClusterHandler *ch, void *data, int len)
 {
   NOWARN_UNUSED(len);
   EThread *thread = this_ethread();
@@ -240,7 +242,6 @@ post_setchan_send_ClusterFunction(ClusterMachine * to, void *data, int len)
   // initial open_write data after (n_set_data_msgs == 0).
 
   SetChanDataMessage *m = (SetChanDataMessage *) data;
-  ClusterHandler *ch = to->clusterHandler;
   ClusterVConnection *cvc;
 
   if (ch) {
@@ -256,7 +257,7 @@ post_setchan_send_ClusterFunction(ClusterMachine * to, void *data, int len)
 }
 
 void
-set_channel_pin_ClusterFunction(ClusterMachine * from, void *data, int len)
+set_channel_pin_ClusterFunction(ClusterHandler *ch, void *data, int len)
 {
   NOWARN_UNUSED(len);
   // This isn't used. /leif
@@ -279,9 +280,8 @@ set_channel_pin_ClusterFunction(ClusterMachine * from, void *data, int len)
 
   ClusterVConnection *cvc = NULL;       // Just to make GCC happy
   CacheVC *cache_vc;
-  ClusterHandler *ch;
 
-  if ((ch = from->clusterHandler) != 0) {
+  if (ch != 0) {
     cache_vc = ChannelToCacheWriteVC(ch, m->channel, m->sequence_number, &cvc);
     if (cache_vc) {
       cache_vc->set_pin_in_cache(m->pin_time);
@@ -292,7 +292,7 @@ set_channel_pin_ClusterFunction(ClusterMachine * from, void *data, int len)
 }
 
 void
-post_setchan_pin_ClusterFunction(ClusterMachine * to, void *data, int len)
+post_setchan_pin_ClusterFunction(ClusterHandler *ch, void *data, int len)
 {
   NOWARN_UNUSED(len);
   EThread *thread = this_ethread();
@@ -304,7 +304,6 @@ post_setchan_pin_ClusterFunction(ClusterMachine * to, void *data, int len)
   // initial open_write data after (n_set_data_msgs == 0).
 
   SetChanPinMessage *m = (SetChanPinMessage *) data;
-  ClusterHandler *ch = to->clusterHandler;
   ClusterVConnection *cvc;
 
   if (ch) {
@@ -320,7 +319,7 @@ post_setchan_pin_ClusterFunction(ClusterMachine * to, void *data, int len)
 }
 
 void
-set_channel_priority_ClusterFunction(ClusterMachine * from, void *data, int len)
+set_channel_priority_ClusterFunction(ClusterHandler *ch, void *data, int len)
 {
   NOWARN_UNUSED(len);
   // This isn't used.
@@ -342,9 +341,8 @@ set_channel_priority_ClusterFunction(ClusterMachine * from, void *data, int len)
 
   ClusterVConnection *cvc = NULL;       // Just to make GCC happy
   CacheVC *cache_vc;
-  ClusterHandler *ch;
 
-  if ((ch = from->clusterHandler) != 0) {
+  if (ch != 0) {
     cache_vc = ChannelToCacheWriteVC(ch, m->channel, m->sequence_number, &cvc);
     if (cache_vc) {
       cache_vc->set_disk_io_priority(m->disk_priority);
@@ -355,7 +353,7 @@ set_channel_priority_ClusterFunction(ClusterMachine * from, void *data, int len)
 }
 
 void
-post_setchan_priority_ClusterFunction(ClusterMachine * to, void *data, int len)
+post_setchan_priority_ClusterFunction(ClusterHandler *ch, void *data, int len)
 {
   NOWARN_UNUSED(len);
   EThread *thread = this_ethread();
@@ -368,7 +366,6 @@ post_setchan_priority_ClusterFunction(ClusterMachine * to, void *data, int len)
   // initial open_write data after (n_set_data_msgs == 0).
 
   SetChanPriorityMessage *m = (SetChanPriorityMessage *) data;
-  ClusterHandler *ch = to->clusterHandler;
   ClusterVConnection *cvc;
 
   if (ch) {

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterVConnection.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterVConnection.cc b/iocore/cluster/ClusterVConnection.cc
index bec5445..84dd223 100644
--- a/iocore/cluster/ClusterVConnection.cc
+++ b/iocore/cluster/ClusterVConnection.cc
@@ -186,7 +186,7 @@ ClusterVConnectionBase::reenable_re(VIO * vio)
 }
 
 ClusterVConnection::ClusterVConnection(int is_new_connect_read)
-  :  machine(NULL),
+  :  ch(NULL),
      new_connect_read(is_new_connect_read),
      remote_free(0),
      last_local_free(0),
@@ -306,7 +306,6 @@ ClusterVConnection::start(EThread * t)
   ///////////////////////////////////////////////////////////////////////////
 
   int status;
-  ClusterHandler *ch = NULL;
   if (!channel) {
 #ifdef CLUSTER_TOMCAT
     Ptr<ProxyMutex> m = action_.mutex;
@@ -323,7 +322,6 @@ ClusterVConnection::start(EThread * t)
       t->schedule_in(this, CLUSTER_CONNECT_RETRY);
       return EVENT_DONE;
     }
-    ch = machine->clusterHandler;
     if (!ch) {
       if (action_.continuation) {
         action_.continuation->handleEvent(CLUSTER_EVENT_OPEN_FAILED, (void *) -ECLUSTER_NO_MACHINE);
@@ -356,7 +354,6 @@ ClusterVConnection::start(EThread * t)
 
   } else {
     // Establish the remote side of the VC connection
-    ch = machine->clusterHandler;
     if ((status = ch->alloc_channel(this, channel)) < 0) {
       Debug(CL_TRACE, "VC start alloc remote failed chan=%d VC=%p", channel, this);
       clusterVCAllocator_free(this);
@@ -504,7 +501,7 @@ ClusterVConnection::set_http_info(CacheHTTPInfo * d)
   //   already done
   ink_release_assert(this->read.vio.op == VIO::NONE);   // should always be true
 
-  int vers = SetChanDataMessage::protoToVersion(machine->msg_proto_major);
+  int vers = SetChanDataMessage::protoToVersion(ch->machine->msg_proto_major);
   if (vers == SetChanDataMessage::SET_CHANNEL_DATA_MESSAGE_VERSION) {
     flen = SetChanDataMessage::sizeof_fixedlen_msg();
   } else {
@@ -537,7 +534,7 @@ ClusterVConnection::set_http_info(CacheHTTPInfo * d)
   // note pending set_data() msgs on VC.
   ink_atomic_increment(&n_set_data_msgs, 1);
 
-  clusterProcessor.invoke_remote(machine, SET_CHANNEL_DATA_CLUSTER_FUNCTION, data, flen + len);
+  clusterProcessor.invoke_remote(ch, SET_CHANNEL_DATA_CLUSTER_FUNCTION, data, flen + len);
 }
 
 bool ClusterVConnection::set_pin_in_cache(time_t t)
@@ -554,7 +551,7 @@ bool ClusterVConnection::set_pin_in_cache(time_t t)
   ink_release_assert(this->read.vio.op == VIO::NONE);   // should always be true
   time_pin = t;
 
-  int vers = SetChanPinMessage::protoToVersion(machine->msg_proto_major);
+  int vers = SetChanPinMessage::protoToVersion(ch->machine->msg_proto_major);
 
   if (vers == SetChanPinMessage::SET_CHANNEL_PIN_MESSAGE_VERSION) {
     SetChanPinMessage::sizeof_fixedlen_msg();
@@ -571,7 +568,7 @@ bool ClusterVConnection::set_pin_in_cache(time_t t)
   // note pending set_data() msgs on VC.
   ink_atomic_increment(&n_set_data_msgs, 1);
 
-  clusterProcessor.invoke_remote(machine, SET_CHANNEL_PIN_CLUSTER_FUNCTION, (char *) &msg, sizeof(msg));
+  clusterProcessor.invoke_remote(ch, SET_CHANNEL_PIN_CLUSTER_FUNCTION, (char *) &msg, sizeof(msg));
   return true;
 }
 
@@ -594,7 +591,7 @@ bool ClusterVConnection::set_disk_io_priority(int priority)
   ink_release_assert(this->read.vio.op == VIO::NONE);   // should always be true
   disk_io_priority = priority;
 
-  int vers = SetChanPriorityMessage::protoToVersion(machine->msg_proto_major);
+  int vers = SetChanPriorityMessage::protoToVersion(ch->machine->msg_proto_major);
 
   if (vers == SetChanPriorityMessage::SET_CHANNEL_PRIORITY_MESSAGE_VERSION) {
     SetChanPriorityMessage::sizeof_fixedlen_msg();
@@ -611,7 +608,7 @@ bool ClusterVConnection::set_disk_io_priority(int priority)
   // note pending set_data() msgs on VC.
   ink_atomic_increment(&n_set_data_msgs, 1);
 
-  clusterProcessor.invoke_remote(machine, SET_CHANNEL_PRIORITY_CLUSTER_FUNCTION, (char *) &msg, sizeof(msg));
+  clusterProcessor.invoke_remote(ch, SET_CHANNEL_PRIORITY_CLUSTER_FUNCTION, (char *) &msg, sizeof(msg));
   return true;
 }
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/P_ClusterCache.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/P_ClusterCache.h b/iocore/cluster/P_ClusterCache.h
index 4afc6ae..4fac3c0 100644
--- a/iocore/cluster/P_ClusterCache.h
+++ b/iocore/cluster/P_ClusterCache.h
@@ -63,7 +63,7 @@
 //   changes
 //
 #define CLUSTER_MAJOR_VERSION               3
-#define CLUSTER_MINOR_VERSION               0
+#define CLUSTER_MINOR_VERSION               2
 
 // Lowest supported major/minor cluster version
 #define MIN_CLUSTER_MAJOR_VERSION	    CLUSTER_MAJOR_VERSION
@@ -286,6 +286,7 @@ struct ClusterVCToken
   // Marshal this data to send the token across the cluster
   //
   uint32_t ip_created;
+  uint32_t ch_id;
   uint32_t sequence_number;
 
   bool is_clear()
@@ -298,8 +299,8 @@ struct ClusterVCToken
     sequence_number = 0;
   }
 
-  ClusterVCToken(unsigned int aip = 0, unsigned int aseq = 0)
-:  ip_created(aip), sequence_number(aseq) {
+  ClusterVCToken(unsigned int aip = 0, unsigned int id = 0, unsigned int aseq = 0)
+:  ip_created(aip), ch_id(id), sequence_number(aseq) {
   }
   //
   // Private
@@ -308,6 +309,7 @@ struct ClusterVCToken
 
   inline void SwapBytes()
   {
+    ats_swap32(&ch_id);
     ats_swap32(&sequence_number);
   }
 };
@@ -317,7 +319,7 @@ struct ClusterVCToken
 //   A pointer to a procedure which can be invoked accross the cluster.
 //   This must be registered.
 //
-typedef void ClusterFunction(ClusterMachine * from, void *data, int len);
+typedef void ClusterFunction(ClusterHandler * ch, void *data, int len);
 typedef ClusterFunction *ClusterFunctionPtr;
 
 struct ClusterVConnectionBase;
@@ -515,7 +517,7 @@ struct ClusterVConnection: public ClusterVConnectionBase
 
   virtual void do_io_close(int lerrno = -1);
 
-  ClusterMachine *machine;
+  ClusterHandler *ch;
   //
   //  Read Channel: (new_connect_read == true)
   //     - open_local()    caller is reader
@@ -631,18 +633,18 @@ struct ClusterProcessor
   // Options: CLUSTER_OPT_DELAY, CLUSTER_OPT_STEAL, CLUSTER_OPT_DATA_IS_OCONTROL
   // Returns: 1 for immediate send, 0 for delayed, -1 for error
 
-  int invoke_remote(ClusterMachine * m, int cluster_fn_index, void *data, int len, int options = CLUSTER_OPT_STEAL);
+  int invoke_remote(ClusterHandler *ch, int cluster_fn_index, void *data, int len, int options = CLUSTER_OPT_STEAL);
 
-  int invoke_remote_data(ClusterMachine * m, int cluster_fn_index,
+  int invoke_remote_data(ClusterHandler *ch, int cluster_fn_index,
                          void *data, int data_len,
                          IOBufferBlock * buf,
                          int logical_channel, ClusterVCToken * token,
                          void (*bufdata_free) (void *), void *bufdata_free_arg, int options = CLUSTER_OPT_STEAL);
 
   // Pass the data in as a malloc'ed block to be freed by callee
-  int invoke_remote_malloced(ClusterMachine * m, ClusterRemoteDataHeader * data, int len /* including header */ )
+  int invoke_remote_malloced(ClusterHandler *ch, ClusterRemoteDataHeader * data, int len /* including header */ )
   {
-    return invoke_remote(m, CLUSTER_FUNCTION_MALLOCED, data, len);
+    return invoke_remote(ch, CLUSTER_FUNCTION_MALLOCED, data, len);
   }
   void free_remote_data(char *data, int len);
 
@@ -662,7 +664,7 @@ struct ClusterProcessor
 
 #define CLUSTER_DELAYED_OPEN       ((ClusterVConnection*)-1)
 #define CLUSTER_NODE_DOWN          ((ClusterVConnection*)-2)
-  ClusterVConnection *open_local(Continuation * cont, ClusterMachine * m, ClusterVCToken & token, int options = 0);
+  ClusterVConnection *open_local(Continuation * cont, ClusterMachine * mp, ClusterVCToken & token, int options = 0);
 
   // Get the other side of a remote VConnection which was previously
   // allocated with open.
@@ -688,13 +690,13 @@ struct ClusterProcessor
   ClusterAccept *accept_handler;
   Cluster *this_cluster;
   // Connect to a new cluster machine
-  void connect(char *hostname);
-  void connect(unsigned int ip, int port = 0, bool delay = false);
+  void connect(char *hostname, int16_t id = -1);
+  void connect(unsigned int ip, int port = 0, int16_t id = -1, bool delay = false);
   // send the list of known machines to new machine
   void send_machine_list(ClusterMachine * m);
   void compute_cluster_mode();
   // Internal invoke_remote interface
-  int internal_invoke_remote(ClusterMachine * m, int cluster_fn, void *data, int len, int options, void *cmsg);
+  int internal_invoke_remote(ClusterHandler * m, int cluster_fn, void *data, int len, int options, void *cmsg);
 };
 
 inkcoreapi extern ClusterProcessor clusterProcessor;
@@ -996,11 +998,12 @@ struct ClusterHelloMessage
   uint16_t _minor;
   uint16_t _min_major;
   uint16_t _min_minor;
+  int16_t _id;
 #ifdef LOCAL_CLUSTER_TEST_MODE
   int16_t _port;
-  char _pad[116];               // pad out to 128 bytes
+  char _pad[114];               // pad out to 128 bytes
 #else
-  char _pad[118];               // pad out to 128 bytes
+  char _pad[116];               // pad out to 128 bytes
 #endif
 
     ClusterHelloMessage():_NativeByteOrder(1)
@@ -1068,7 +1071,7 @@ struct ClusterMessageHeader
 //
 // cluster_ping
 //
-typedef void (*PingReturnFunction) (ClusterMachine *, void *data, int len);
+typedef void (*PingReturnFunction) (ClusterHandler *, void *data, int len);
 
 struct PingMessage:public ClusterMessageHeader
 {
@@ -1108,13 +1111,13 @@ struct PingMessage:public ClusterMessageHeader
 };
 
 inline void
-cluster_ping(ClusterMachine * m, PingReturnFunction fn, void *data, int len)
+cluster_ping(ClusterHandler *ch, PingReturnFunction fn, void *data, int len)
 {
   PingMessage *msg = (PingMessage *)alloca(PingMessage::sizeof_fixedlen_msg() + len);
   msg->init();
   msg->fn = fn;
   memcpy(msg->data, data, len);
-  clusterProcessor.invoke_remote(m, PING_CLUSTER_FUNCTION, (void *) msg, (msg->sizeof_fixedlen_msg() + len));
+  clusterProcessor.invoke_remote(ch, PING_CLUSTER_FUNCTION, (void *) msg, (msg->sizeof_fixedlen_msg() + len));
 }
 
 // filled with 0's

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/P_ClusterCacheInternal.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/P_ClusterCacheInternal.h b/iocore/cluster/P_ClusterCacheInternal.h
index 3cef837..aea7beb 100644
--- a/iocore/cluster/P_ClusterCacheInternal.h
+++ b/iocore/cluster/P_ClusterCacheInternal.h
@@ -104,6 +104,7 @@ struct CacheContinuation:public Continuation
   ClusterMachine *past_probes[CONFIGURATION_HISTORY_PROBE_DEPTH];
   ink_hrtime start_time;
   ClusterMachine *from;
+  ClusterHandler *ch;
   VConnection *cache_vc;
   bool cache_read;
   int result;                   // return event code

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/P_ClusterHandler.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/P_ClusterHandler.h b/iocore/cluster/P_ClusterHandler.h
index 35bb6b0..979de00 100644
--- a/iocore/cluster/P_ClusterHandler.h
+++ b/iocore/cluster/P_ClusterHandler.h
@@ -83,7 +83,7 @@ struct ClusterControl: public Continuation
 
 struct OutgoingControl: public ClusterControl
 {
-  ClusterMachine *m;
+  ClusterHandler *ch;
   ink_hrtime submit_time;
 
   static OutgoingControl *alloc();
@@ -430,6 +430,9 @@ struct ClusterHandler:public ClusterHandlerBase
   char *hostname;
   ClusterMachine *machine;
   int ifd;
+  int id;
+  bool dead;
+  bool downing;
 
   int32_t active;                 // handler currently running
   bool on_stolen_thread;

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/P_ClusterInline.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/P_ClusterInline.h b/iocore/cluster/P_ClusterInline.h
index d62d54b..c653956 100644
--- a/iocore/cluster/P_ClusterInline.h
+++ b/iocore/cluster/P_ClusterInline.h
@@ -30,6 +30,7 @@
 #define __P_CLUSTERINLINE_H__
 #include "P_ClusterCacheInternal.h"
 #include "P_CacheInternal.h"
+#include "P_ClusterHandler.h"
 
 inline Action *
 Cluster_lookup(Continuation * cont, CacheKey * key, CacheFragType frag_type, char *hostname, int host_len)
@@ -37,7 +38,7 @@ Cluster_lookup(Continuation * cont, CacheKey * key, CacheFragType frag_type, cha
   // Try to send remote, if not possible, handle locally
   Action *retAct;
   ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
-  if (!clusterProcessor.disable_remote_cluster_ops(m)) {
+  if (m && !clusterProcessor.disable_remote_cluster_ops(m)) {
     CacheContinuation *cc = CacheContinuation::cacheContAllocator_alloc();
     cc->action = cont;
     cc->mutex = cont->mutex;

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/P_ClusterLoadMonitor.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/P_ClusterLoadMonitor.h b/iocore/cluster/P_ClusterLoadMonitor.h
index 46f9806..89156bd 100644
--- a/iocore/cluster/P_ClusterLoadMonitor.h
+++ b/iocore/cluster/P_ClusterLoadMonitor.h
@@ -70,10 +70,10 @@ public:
     }
   };
 
-  static void cluster_load_ping_rethandler(ClusterMachine *, void *, int);
+  static void cluster_load_ping_rethandler(ClusterHandler *, void *, int);
 
 public:
-  ClusterLoadMonitor(ClusterMachine * m);
+  ClusterLoadMonitor(ClusterHandler * ch);
   void init();
   ~ClusterLoadMonitor();
   void cancel_monitor();
@@ -101,7 +101,7 @@ private:
   int cluster_load_exceed_duration;
 
   // Class specific data
-  ClusterMachine *machine;
+  ClusterHandler *ch;
   int *ping_response_buckets;
   ink_hrtime *ping_response_history_buf;
   int ping_history_buf_head;

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/P_ClusterMachine.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/P_ClusterMachine.h b/iocore/cluster/P_ClusterMachine.h
index a69227d..61aee3a 100644
--- a/iocore/cluster/P_ClusterMachine.h
+++ b/iocore/cluster/P_ClusterMachine.h
@@ -66,12 +66,17 @@ struct ClusterMachine: public Server
   //
   unsigned int ip;
   int cluster_port;
+  int num_connections;
+  int now_connections;
+  int free_connections;
+  int64_t rr_count;
 
-    Link<ClusterMachine> link;
+  Link<ClusterMachine> link;
 
   // default for localhost
-    ClusterMachine(char *hostname = NULL, unsigned int ip = 0, int acluster_port = 0);
-   ~ClusterMachine();
+  ClusterMachine(char *hostname = NULL, unsigned int ip = 0, int acluster_port = 0);
+  ~ClusterMachine();
+  ClusterHandler *pop_ClusterHandler(int no_rr = 0);
 
   // Cluster message protocol version
   uint16_t msg_proto_major;
@@ -79,7 +84,7 @@ struct ClusterMachine: public Server
 
   // Private data for ClusterProcessor
   //
-  ClusterHandler *clusterHandler;
+  ClusterHandler **clusterHandlers;
 };
 
 struct MachineListElement

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/hostdb/HostDB.cc
----------------------------------------------------------------------
diff --git a/iocore/hostdb/HostDB.cc b/iocore/hostdb/HostDB.cc
index 4841fba..4c17f5a 100644
--- a/iocore/hostdb/HostDB.cc
+++ b/iocore/hostdb/HostDB.cc
@@ -1577,7 +1577,7 @@ bool HostDBContinuation::do_get_response(Event * e)
 
   // Send the message
   //
-  clusterProcessor.invoke_remote(m, GET_HOSTINFO_CLUSTER_FUNCTION, (char *) &msg, len);
+  clusterProcessor.invoke_remote(m->pop_ClusterHandler(), GET_HOSTINFO_CLUSTER_FUNCTION, (char *) &msg, len);
 
   return true;
 }
@@ -1650,7 +1650,7 @@ HostDBContinuation::do_put_response(ClusterMachine * m, HostDBInfo * r, Continua
   HostDB_put_message msg;
   int len = make_put_message(r, c, (char *) &msg, sizeof(HostDB_put_message));
 
-  clusterProcessor.invoke_remote(m, PUT_HOSTINFO_CLUSTER_FUNCTION, (char *) &msg, len);
+  clusterProcessor.invoke_remote(m->pop_ClusterHandler(), PUT_HOSTINFO_CLUSTER_FUNCTION, (char *) &msg, len);
 
 }
 #endif // NON_MODULAR
@@ -1935,7 +1935,7 @@ HostDBContinuation::failed_cluster_request(Event * e)
 
 
 void
-get_hostinfo_ClusterFunction(ClusterMachine * from, void *data, int len)
+get_hostinfo_ClusterFunction(ClusterHandler *ch, void *data, int len)
 {
   NOWARN_UNUSED(len);
   void *pDS = 0;
@@ -1956,7 +1956,7 @@ get_hostinfo_ClusterFunction(ClusterMachine * from, void *data, int len)
 
   HostDBContinuation *c = hostDBContAllocator.alloc();
   SET_CONTINUATION_HANDLER(c, (HostDBContHandler) & HostDBContinuation::probeEvent);
-  c->from = from;
+  c->from = ch->machine;
   c->from_cont = msg->cont;
 
   /* -----------------------------------------
@@ -1975,7 +1975,7 @@ get_hostinfo_ClusterFunction(ClusterMachine * from, void *data, int len)
 
 
 void
-put_hostinfo_ClusterFunction(ClusterMachine * from, void *data, int len)
+put_hostinfo_ClusterFunction(ClusterHandler *ch, void *data, int len)
 {
   NOWARN_UNUSED(len);
   HostDB_put_message *msg = (HostDB_put_message *) data;
@@ -1988,7 +1988,7 @@ put_hostinfo_ClusterFunction(ClusterMachine * from, void *data, int len)
   c->missing = msg->missing;
   c->round_robin = msg->round_robin;
   c->ttl = msg->ttl;
-  c->from = from;
+  c->from = ch->machine;
   dnsProcessor.thread->schedule_imm(c);
 }
 #endif // NON_MODULAR

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/net/UnixNetAccept.cc
----------------------------------------------------------------------
diff --git a/iocore/net/UnixNetAccept.cc b/iocore/net/UnixNetAccept.cc
index 1ff6511..f2ff274 100644
--- a/iocore/net/UnixNetAccept.cc
+++ b/iocore/net/UnixNetAccept.cc
@@ -163,7 +163,7 @@ NetAccept::allocateGlobal()
 // or ET_NET).
 EventType NetAccept::getEtype()
 {
-  return (ET_NET);
+  return etype;
 }
 
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/mgmt/RecordsConfig.cc
----------------------------------------------------------------------
diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
index d86d57c..d08f59a 100644
--- a/mgmt/RecordsConfig.cc
+++ b/mgmt/RecordsConfig.cc
@@ -858,6 +858,8 @@ RecordElement RecordsConfig[] = {
   //##############################################################################
   {RECT_CONFIG, "proxy.config.cluster.threads", RECD_INT, "1", RECU_RESTART_TS, RR_NULL, RECC_INT, "[0-512]", RECA_NULL}
   ,
+  {RECT_CONFIG, "proxy.config.cluster.num_of_cluster_connections", RECD_INT, "1", RECU_RESTART_TS, RR_NULL, RECC_INT, "[1-4096]", RECA_NULL}
+  ,
   {RECT_CONFIG, "proxy.config.cluster.cluster_port", RECD_INT, "8086", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
   ,
   {RECT_CONFIG, "proxy.config.cluster.cluster_configuration", RECD_STRING, "cluster.config", RECU_NULL, RR_NULL, RECC_NULL, NULL, RECA_NULL}

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/proxy/http/HttpCacheSM.cc
----------------------------------------------------------------------
diff --git a/proxy/http/HttpCacheSM.cc b/proxy/http/HttpCacheSM.cc
index f8e26a6..14c9f30 100644
--- a/proxy/http/HttpCacheSM.cc
+++ b/proxy/http/HttpCacheSM.cc
@@ -57,7 +57,8 @@ HttpCacheAction::cancel(Continuation * c)
   ink_assert(this->cancelled == 0);
 
   this->cancelled = 1;
-  sm->pending_action->cancel();
+  if (sm->pending_action)
+    sm->pending_action->cancel();
 }
 
 HttpCacheSM::HttpCacheSM():