Posted to commits@trafficserver.apache.org by we...@apache.org on 2013/12/04 04:39:06 UTC

[4/6] refine the cluster code

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/ClusterConfig.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterConfig.cc b/iocore/cluster/ClusterConfig.cc
index a541364..b969f44 100644
--- a/iocore/cluster/ClusterConfig.cc
+++ b/iocore/cluster/ClusterConfig.cc
@@ -28,6 +28,9 @@
 ****************************************************************************/
 
 #include "P_Cluster.h"
+#include "machine.h"
+#include "connection.h"
+
 // updated from the cluster port configuration variable
 int cluster_port = DEFAULT_CLUSTER_PORT_NUMBER;
 
@@ -155,16 +158,30 @@ ClusterAccept::ClusterAcceptMachine(NetVConnection * NetVC)
 }
 
 static void
-make_cluster_connections(MachineList * l)
+make_cluster_connections(MachineList * l, MachineList * old)
 {
   //
   // Connect to all new machines.
   //
-  uint32_t ip = this_cluster_machine()->ip;
-  int num_connections = this_cluster_machine()->num_connections;
+  //uint32_t ip = this_cluster_machine()->ip;
+  //int num_connections = this_cluster_machine()->num_connections;
 
+  int i;
+  int k;
+  ClusterMachine *m;
   if (l) {
-    for (int i = 0; i < l->n; i++) {
+    for (i = 0; i < l->n; i++) {
+      struct in_addr in;
+      in.s_addr = l->machine[i].ip;
+      m = add_machine(l->machine[i].ip, l->machine[i].port);
+      if (m != NULL) {
+        machine_make_connections(m);
+      }
+
+      Debug(CL_NOTE, "do connect hostname: %u.%u.%u.%u:%d, %s, cluster_machine_count: %d\n",
+          DOT_SEPARATED(l->machine[i].ip), l->machine[i].port, inet_ntoa(in), cluster_machine_count);
+
+    /*
 #ifdef LOCAL_CLUSTER_TEST_MODE
       if (ip < l->machine[i].ip || (ip == l->machine[i].ip && (cluster_port < l->machine[i].port))) {
 #else
@@ -175,6 +192,48 @@ make_cluster_connections(MachineList * l)
         }
       }
     }
+    */
+    }
+  }
+
+  if (old == NULL) {
+    return;
+  }
+
+  // find machines that have gone down
+  if (l == NULL) {
+    for (i = 0; i < old->n; i++) {
+      struct in_addr in;
+      in.s_addr = old->machine[i].ip;
+      Debug(CL_NOTE, "stop connect hostname: %u.%u.%u.%u:%d, %s\n",
+          DOT_SEPARATED(old->machine[i].ip), old->machine[i].port, inet_ntoa(in));
+      m = get_machine(old->machine[i].ip, old->machine[i].port);
+      if (m != NULL) {
+        machine_stop_reconnect(m);
+      }
+    }
+  }
+  else {
+    for (i = 0; i < old->n; i++) {
+      for (k = 0; k < l->n; k++) {
+        if (l->machine[k].ip == old->machine[i].ip &&
+            l->machine[k].port == old->machine[i].port)
+        {
+          break;
+        }
+      }
+
+      if (k == l->n) {  // not in the new list: machine is down
+        struct in_addr in;
+        in.s_addr = old->machine[i].ip;
+        Debug(CL_NOTE, "stop connect hostname: %u.%u.%u.%u:%d, %s\n",
+            DOT_SEPARATED(old->machine[i].ip), old->machine[i].port, inet_ntoa(in));
+        m = get_machine(old->machine[i].ip, old->machine[i].port);
+        if (m != NULL) {
+          machine_stop_reconnect(m);
+        }
+      }
+    }
   }
 }
 
@@ -201,7 +260,7 @@ machine_config_change(const char * /* name ATS_UNUSED */, RecDataT /* data_type
   case CLUSTER_CONFIG:
     old = cluster_config;
     cluster_config = l;
-    make_cluster_connections(l);
+    make_cluster_connections(l, old);
     break;
   }
 #else
@@ -209,7 +268,7 @@ machine_config_change(const char * /* name ATS_UNUSED */, RecDataT /* data_type
   old = cluster_config;
   machines_config = l;
   cluster_config = l;
-  make_cluster_connections(l);
+  make_cluster_connections(l, old);
 #endif
   if (old)
     free_MachineList(old);
@@ -291,8 +350,10 @@ configuration_add_machine(ClusterConfiguration * c, ClusterMachine * m)
   // Build a new cluster configuration with the new machine.
   // Machines are stored in ip sorted order.
   //
+  /*
   EThread *thread = this_ethread();
   ProxyMutex *mutex = thread->mutex;
+  */
   int i = 0;
   ClusterConfiguration *cc = NEW(new ClusterConfiguration(*c));
 
@@ -319,7 +380,7 @@ configuration_add_machine(ClusterConfiguration * c, ClusterMachine * m)
 
   build_cluster_hash_table(cc);
   INK_MEMORY_BARRIER;           // commit writes before freeing old hash table
-  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
+  //CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
 
   free_configuration(c, cc);
   return cc;
@@ -328,9 +389,6 @@ configuration_add_machine(ClusterConfiguration * c, ClusterMachine * m)
 ClusterConfiguration *
 configuration_remove_machine(ClusterConfiguration * c, ClusterMachine * m)
 {
-  EThread *thread = this_ethread();
-  ProxyMutex *mutex = thread->mutex;
-
   //
   // Build a new cluster configuration without a machine
   //
@@ -350,7 +408,7 @@ configuration_remove_machine(ClusterConfiguration * c, ClusterMachine * m)
 
   build_cluster_hash_table(cc);
   INK_MEMORY_BARRIER;           // commit writes before freeing old hash table
-  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
+  //CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
 
   free_configuration(c, cc);
   return cc;

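The new reconfiguration logic above is a set difference over (ip, port) pairs: every node in the new list gets a connection attempt, and every node in the old list that is absent from the new list is told to stop reconnecting. A small worked example, assuming add_machine() tolerates already-tracked nodes and get_machine() returns NULL for unknown ones (semantics implied by the call sites; machine.h itself is not part of this diff):

    // old = {A, B}, new list l = {B, C}
    // pass 1: add_machine() + machine_make_connections() for B and C
    // pass 2: B is found in l (inner loop breaks early);
    //         A is not (k reaches l->n), so
    //         get_machine(A) + machine_stop_reconnect(A)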
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/ClusterMachine.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterMachine.cc b/iocore/cluster/ClusterMachine.cc
index 00e99e6..1ac4fae 100644
--- a/iocore/cluster/ClusterMachine.cc
+++ b/iocore/cluster/ClusterMachine.cc
@@ -77,9 +77,9 @@ ClusterMachine::ClusterMachine(char *ahostname, unsigned int aip, int aport)
     msg_proto_minor(0),
     clusterHandlers(0)
 {
-  EThread *thread = this_ethread();
-  ProxyMutex *mutex = thread->mutex;
-  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_MACHINES_ALLOCATED_STAT);
+  //EThread *thread = this_ethread();
+  //ProxyMutex *mutex = thread->mutex;
+  //CLUSTER_INCREMENT_DYN_STAT(CLUSTER_MACHINES_ALLOCATED_STAT);
   if (!aip) {
     char localhost[1024];
     if (!ahostname) {
@@ -166,7 +166,7 @@ ClusterHandler *ClusterMachine::pop_ClusterHandler(int no_rr)
 ClusterMachine::~ClusterMachine()
 {
   ats_free(hostname);
-  ats_free(clusterHandlers);
+  // ats_free(clusterHandlers);
 }
 
 struct MachineTimeoutContinuation;
@@ -193,10 +193,10 @@ struct MachineTimeoutContinuation: public Continuation
 void
 free_ClusterMachine(ClusterMachine * m)
 {
-  EThread *thread = this_ethread();
-  ProxyMutex *mutex = thread->mutex;
+  //EThread *thread = this_ethread();
+  //ProxyMutex *mutex = thread->mutex;
   // delay before the final free
-  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_MACHINES_FREED_STAT);
+  //CLUSTER_INCREMENT_DYN_STAT(CLUSTER_MACHINES_FREED_STAT);
   m->dead = true;
   eventProcessor.schedule_in(NEW(new MachineTimeoutContinuation(m)), MACHINE_TIMEOUT, ET_CALL);
 }

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/ClusterProcessor.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterProcessor.cc b/iocore/cluster/ClusterProcessor.cc
index b01e0ff..ab6c0a0 100644
--- a/iocore/cluster/ClusterProcessor.cc
+++ b/iocore/cluster/ClusterProcessor.cc
@@ -28,21 +28,34 @@
 ****************************************************************************/
 
 #include "P_Cluster.h"
+#include "global.h"
+#include "connection.h"
+
 /*************************************************************************/
 // ClusterProcessor member functions (Public class)
 /*************************************************************************/
 int cluster_port_number = DEFAULT_CLUSTER_PORT_NUMBER;
 int cache_clustering_enabled = 0;
 int num_of_cluster_threads = DEFAULT_NUMBER_OF_CLUSTER_THREADS;
+int num_of_cluster_connections = 0;
 
 ClusterProcessor clusterProcessor;
 RecRawStatBlock *cluster_rsb = NULL;
 int ET_CLUSTER;
 
+void cluster_main_handler(ClusterSession session, void *context,
+    const int func_id, IOBufferBlock *data, const int data_len);
 ClusterProcessor::ClusterProcessor():accept_handler(NULL), this_cluster(NULL)
 {
 }
 
+
+//void cluster_error_handler(int event, void *arg);
+//void cluster_main_handler(ClusterSession *session, const int func_id,
+//  void *data, const int data_len) {
+//  ClusterRPC[func_id](session, data, data_len);
+//}
+
 ClusterProcessor::~ClusterProcessor()
 {
   if (accept_handler) {
@@ -55,98 +68,111 @@ int
 ClusterProcessor::internal_invoke_remote(ClusterHandler *ch, int cluster_fn,
                                          void *data, int len, int options, void *cmsg)
 {
-  EThread *thread = this_ethread();
-  ProxyMutex *mutex = thread->mutex;
-  //
-  // RPC facility for intercluster communication available to other
-  //  subsystems.
-  //
-  bool steal = (options & CLUSTER_OPT_STEAL ? true : false);
-  bool delay = (options & CLUSTER_OPT_DELAY ? true : false);
-  bool data_in_ocntl = (options & CLUSTER_OPT_DATA_IS_OCONTROL ? true : false);
-  bool malloced = (cluster_fn == CLUSTER_FUNCTION_MALLOCED);
-  OutgoingControl *c;
-
-  if (!ch || (!malloced && !((unsigned int) cluster_fn < (uint32_t) SIZE_clusterFunction))) {
-    // Invalid message or node is down, free message data
-    if (malloced) {
-      ats_free(data);
-    }
-    if (cmsg) {
-      invoke_remote_data_args *args = (invoke_remote_data_args *)
-        (((OutgoingControl *) cmsg)->data + sizeof(int32_t));
-      ink_assert(args->magicno == invoke_remote_data_args::MagicNo);
+//  EThread *thread = this_ethread();
+//  ProxyMutex *mutex = thread->mutex;
+//  //
+//  // RPC facility for intercluster communication available to other
+//  //  subsystems.
+//  //
+//  bool steal = (options & CLUSTER_OPT_STEAL ? true : false);
+//  bool delay = (options & CLUSTER_OPT_DELAY ? true : false);
+//  bool data_in_ocntl = (options & CLUSTER_OPT_DATA_IS_OCONTROL ? true : false);
+//  bool malloced = (cluster_fn == CLUSTER_FUNCTION_MALLOCED);
+//  OutgoingControl *c;
+//
+//  if (!ch || (!malloced && !((unsigned int) cluster_fn < (uint32_t) SIZE_clusterFunction))) {
+//    // Invalid message or node is down, free message data
+//    if (malloced) {
+//      ats_free(data);
+//    }
+//    if (cmsg) {
+//      invoke_remote_data_args *args = (invoke_remote_data_args *)
+//        (((OutgoingControl *) cmsg)->data + sizeof(int32_t));
+//      ink_assert(args->magicno == invoke_remote_data_args::MagicNo);
+//
+//      args->data_oc->freeall();
+//      ((OutgoingControl *) cmsg)->freeall();
+//    }
+//    if (data_in_ocntl) {
+//      c = *((OutgoingControl **) ((char *) data - sizeof(OutgoingControl *)));
+//      c->freeall();
+//    }
+//    return -1;
+//  }
+//
+//  if (data_in_ocntl) {
+//    c = *((OutgoingControl **) ((char *) data - sizeof(OutgoingControl *)));
+//  } else {
+//    c = OutgoingControl::alloc();
+//  }
+//  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CTRL_MSGS_SENT_STAT);
+//  c->submit_time = ink_get_hrtime();
+//  if ((c->zero_body = zero_body)) {
+//    c->free_proc = &CacheContinuation::disposeOfDataBuffer;
+//    c->free_proc_arg = cc;
+//  }
+//
+//  if (malloced) {
+//    c->set_data((char *) data, len);
+//  } else {
+//    if (!data_in_ocntl) {
+//      c->len = len + sizeof(int32_t);
+//      c->alloc_data();
+//    }
+//    if (!c->fast_data()) {
+//      CLUSTER_INCREMENT_DYN_STAT(CLUSTER_SLOW_CTRL_MSGS_SENT_STAT);
+//    }
+//    *(int32_t *) c->data = cluster_fn;
+//    if (!data_in_ocntl) {
+//      memcpy(c->data + sizeof(int32_t), data, len);
+//    }
+//  }
+//
+//  SET_CONTINUATION_HANDLER(c, (OutgoingCtrlHandler) & OutgoingControl::startEvent);
+//
+//  /////////////////////////////////////
+//  // Compound message adjustments
+//  /////////////////////////////////////
+//  if (cmsg) {
+//    invoke_remote_data_args *args = (invoke_remote_data_args *)
+//      (((OutgoingControl *) cmsg)->data + sizeof(int32_t));
+//    ink_assert(args->magicno == invoke_remote_data_args::MagicNo);
+//    args->msg_oc = c;
+//    c = (OutgoingControl *) cmsg;
+//  }
+//#ifndef CLUSTER_THREAD_STEALING
+//  delay = true;
+//#endif
+//  if (!delay) {
+//    EThread *tt = this_ethread();
+//    {
+//      int q = ClusterFuncToQpri(cluster_fn);
+//      ink_atomiclist_push(&ch->outgoing_control_al[q], (void *) c);
+//
+//      MUTEX_TRY_LOCK(lock, ch->mutex, tt);
+//      if (!lock) {
+//		if(ch->thread && ch->thread->signal_hook)
+//		  ch->thread->signal_hook(ch->thread);
+//		return 1;
+//      }
+//      if (steal)
+//        ch->steal_thread(tt);
+//      return 1;
+//    }
+//  } else {
+//    c->mutex = ch->mutex;
+//    eventProcessor.schedule_imm_signal(c);
+//    return 0;
+//  }
+
+  (void) ch;
+  (void) cluster_fn;
+  (void) data;
+  (void) len;
+  (void) options;
+  (void) cmsg;
 
-      args->data_oc->freeall();
-      ((OutgoingControl *) cmsg)->freeall();
-    }
-    if (data_in_ocntl) {
-      c = *((OutgoingControl **) ((char *) data - sizeof(OutgoingControl *)));
-      c->freeall();
-    }
-    return -1;
-  }
-
-  if (data_in_ocntl) {
-    c = *((OutgoingControl **) ((char *) data - sizeof(OutgoingControl *)));
-  } else {
-    c = OutgoingControl::alloc();
-  }
-  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CTRL_MSGS_SENT_STAT);
-  c->submit_time = ink_get_hrtime();
-
-  if (malloced) {
-    c->set_data((char *) data, len);
-  } else {
-    if (!data_in_ocntl) {
-      c->len = len + sizeof(int32_t);
-      c->alloc_data();
-    }
-    if (!c->fast_data()) {
-      CLUSTER_INCREMENT_DYN_STAT(CLUSTER_SLOW_CTRL_MSGS_SENT_STAT);
-    }
-    *(int32_t *) c->data = cluster_fn;
-    if (!data_in_ocntl) {
-      memcpy(c->data + sizeof(int32_t), data, len);
-    }
-  }
-
-  SET_CONTINUATION_HANDLER(c, (OutgoingCtrlHandler) & OutgoingControl::startEvent);
-
-  /////////////////////////////////////
-  // Compound message adjustments
-  /////////////////////////////////////
-  if (cmsg) {
-    invoke_remote_data_args *args = (invoke_remote_data_args *)
-      (((OutgoingControl *) cmsg)->data + sizeof(int32_t));
-    ink_assert(args->magicno == invoke_remote_data_args::MagicNo);
-    args->msg_oc = c;
-    c = (OutgoingControl *) cmsg;
-  }
-#ifndef CLUSTER_THREAD_STEALING
-  delay = true;
-#endif
-  if (!delay) {
-    EThread *tt = this_ethread();
-    {
-      int q = ClusterFuncToQpri(cluster_fn);
-      ink_atomiclist_push(&ch->outgoing_control_al[q], (void *) c);
-
-      MUTEX_TRY_LOCK(lock, ch->mutex, tt);
-      if (!lock) {
-		if(ch->thread && ch->thread->signal_hook)
-		  ch->thread->signal_hook(ch->thread);
-		return 1;
-      }
-      if (steal)
-        ch->steal_thread(tt);
-      return 1;
-    }
-  } else {
-    c->mutex = ch->mutex;
-    eventProcessor.schedule_imm_signal(c);
-    return 0;
-  }
+  return 0;
 }
 
 int
@@ -162,45 +188,56 @@ ClusterProcessor::invoke_remote_data(ClusterHandler *ch, int cluster_fn,
                                      int dest_channel, ClusterVCToken * token,
                                      void (*bufdata_free_proc) (void *), void *bufdata_free_proc_arg, int options)
 {
-  if (!buf) {
-    // No buffer data, translate this into a invoke_remote() request
-    return internal_invoke_remote(ch, cluster_fn, data, data_len, options, (void *) NULL);
-  }
-  ink_assert(data);
-  ink_assert(data_len);
-  ink_assert(dest_channel);
-  ink_assert(token);
-  ink_assert(bufdata_free_proc);
-  ink_assert(bufdata_free_proc_arg);
-
-  /////////////////////////////////////////////////////////////////////////
-  // Build the compound message as described by invoke_remote_data_args.
-  /////////////////////////////////////////////////////////////////////////
-
-  // Build OutgoingControl for buffer data
-  OutgoingControl *bufdata_oc = OutgoingControl::alloc();
-  bufdata_oc->set_data(buf, bufdata_free_proc, bufdata_free_proc_arg);
-
-  // Build OutgoingControl for compound message header
-  invoke_remote_data_args mh;
-  mh.msg_oc = 0;
-  mh.data_oc = bufdata_oc;
-  mh.dest_channel = dest_channel;
-  mh.token = *token;
-
-  OutgoingControl *chdr = OutgoingControl::alloc();
-  chdr->submit_time = ink_get_hrtime();
-  chdr->len = sizeof(int32_t) + sizeof(mh);
-  chdr->alloc_data();
-  *(int32_t *) chdr->data = -1;   // always -1 for compound message
-  memcpy(chdr->data + sizeof(int32_t), (char *) &mh, sizeof(mh));
-
-  return internal_invoke_remote(ch, cluster_fn, data, data_len, options, (void *) chdr);
+//  if (!buf) {
+//    // No buffer data, translate this into a invoke_remote() request
+//    return internal_invoke_remote(ch, cluster_fn, data, data_len, options, (void *) NULL, zero_body, bufdata_free_proc_arg);
+//  }
+//  ink_assert(data);
+//  ink_assert(data_len);
+//  ink_assert(dest_channel);
+//  ink_assert(token);
+//  ink_assert(bufdata_free_proc);
+//  ink_assert(bufdata_free_proc_arg);
+//
+//  /////////////////////////////////////////////////////////////////////////
+//  // Build the compound message as described by invoke_remote_data_args.
+//  /////////////////////////////////////////////////////////////////////////
+//
+//  // Build OutgoingControl for buffer data
+//  OutgoingControl *bufdata_oc = OutgoingControl::alloc();
+//  bufdata_oc->set_data(buf, bufdata_free_proc, bufdata_free_proc_arg);
+//
+//  // Build OutgoingControl for compound message header
+//  invoke_remote_data_args mh;
+//  mh.msg_oc = 0;
+//  mh.data_oc = bufdata_oc;
+//  mh.dest_channel = dest_channel;
+//  mh.token = *token;
+//
+//  OutgoingControl *chdr = OutgoingControl::alloc();
+//  chdr->submit_time = ink_get_hrtime();
+//  chdr->len = sizeof(int32_t) + sizeof(mh);
+//  chdr->alloc_data();
+//  *(int32_t *) chdr->data = -1;   // always -1 for compound message
+//  memcpy(chdr->data + sizeof(int32_t), (char *) &mh, sizeof(mh));
+//
+//  return internal_invoke_remote(ch, cluster_fn, data, data_len, options, (void *) chdr);
+
+  (void) ch;
+  (void) cluster_fn;
+  (void) data;
+  (void) data_len;
+  (void) buf;
+  (void) dest_channel;
+  (void) token;
+  (void) bufdata_free_proc;
+  (void) bufdata_free_proc_arg;
+  (void) options;
+  return 0;
 }
 
-// TODO: Why pass in the length here if not used ?
 void
-ClusterProcessor::free_remote_data(char *p, int /* l ATS_UNUSED */)
+ClusterProcessor::free_remote_data(char *p, int /* l */)
 {
   char *d = p - sizeof(int32_t);  // reset to ptr to function code
   int data_hdr = ClusterControl::DATA_HDR;
@@ -225,65 +262,70 @@ ClusterProcessor::free_remote_data(char *p, int /* l ATS_UNUSED */)
 }
 
 ClusterVConnection *
-ClusterProcessor::open_local(Continuation * cont, ClusterMachine */* m ATS_UNUSED */, ClusterVCToken & token, int options)
+ClusterProcessor::open_local(Continuation * cont, ClusterMachine * m, ClusterVCToken & token, int options)
 {
-  //
-  //  New connect protocol.
-  //  As a VC initiator, establish the VC connection to the remote node
-  //  by allocating the VC locally and requiring the caller to pass the
-  //  token and channel id in the remote request.  The remote handler calls
-  //  connect_local to establish the remote side of the connection.
-  //
-  bool immediate = ((options & CLUSTER_OPT_IMMEDIATE) ? true : false);
-  bool allow_immediate = ((options & CLUSTER_OPT_ALLOW_IMMEDIATE) ? true : false);
-
-  ClusterHandler *ch = ((CacheContinuation *)cont)->ch;
-  if (!ch)
-    return NULL;
-  EThread *t = ch->thread;
-  if (!t)
-    return NULL;
-
-  EThread *thread = this_ethread();
-  ProxyMutex *mutex = thread->mutex;
-  ClusterVConnection *vc = clusterVCAllocator.alloc();
-  vc->new_connect_read = (options & CLUSTER_OPT_CONN_READ ? 1 : 0);
-  vc->start_time = ink_get_hrtime();
-  vc->last_activity_time = vc->start_time;
-  vc->ch = ch;
-  vc->token.alloc();
-  vc->token.ch_id = ch->id;
-  token = vc->token;
-#ifdef CLUSTER_THREAD_STEALING
-  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPENNED_STAT);
-  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPEN_STAT);
-  MUTEX_TRY_LOCK(lock, ch->mutex, thread);
-  if (!lock) {
-#endif
-    if (immediate) {
-      clusterVCAllocator_free(vc);
-      return NULL;
-    }
-    vc->action_ = cont;
-    ink_atomiclist_push(&ch->external_incoming_open_local, (void *) vc);
-	if(ch->thread && ch->thread->signal_hook)
-	  ch->thread->signal_hook(ch->thread);
-    return CLUSTER_DELAYED_OPEN;
-
-#ifdef CLUSTER_THREAD_STEALING
-  } else {
-    if (!(immediate || allow_immediate))
-      vc->action_ = cont;
-    if (vc->start(thread) < 0) {
-      return NULL;
-    }
-    if (immediate || allow_immediate) {
-      return vc;
-    } else {
-      return CLUSTER_DELAYED_OPEN;
-    }
-  }
-#endif
+//  //
+//  //  New connect protocol.
+//  //  As a VC initiator, establish the VC connection to the remote node
+//  //  by allocating the VC locally and requiring the caller to pass the
+//  //  token and channel id in the remote request.  The remote handler calls
+//  //  connect_local to establish the remote side of the connection.
+//  //
+//  bool immediate = ((options & CLUSTER_OPT_IMMEDIATE) ? true : false);
+//  bool allow_immediate = ((options & CLUSTER_OPT_ALLOW_IMMEDIATE) ? true : false);
+//
+//  ClusterHandler *ch = ((CacheContinuation *)cont)->ch;
+//  if (!ch)
+//    return NULL;
+//  EThread *t = ch->thread;
+//  if (!t)
+//    return NULL;
+//
+//  EThread *thread = this_ethread();
+//  ProxyMutex *mutex = thread->mutex;
+//  ClusterVConnection *vc = clusterVCAllocator.alloc();
+//  vc->new_connect_read = (options & CLUSTER_OPT_CONN_READ ? 1 : 0);
+//  vc->start_time = ink_get_hrtime();
+//  vc->last_activity_time = vc->start_time;
+//  vc->ch = ch;
+//  vc->token.alloc();
+//  vc->token.ch_id = ch->id;
+//  token = vc->token;
+//#ifdef CLUSTER_THREAD_STEALING
+//  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPENNED_STAT);
+//  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPEN_STAT);
+//  MUTEX_TRY_LOCK(lock, ch->mutex, thread);
+//  if (!lock) {
+//#endif
+//    if (immediate) {
+//      clusterVCAllocator_free(vc);
+//      return NULL;
+//    }
+//    vc->action_ = cont;
+//    ink_atomiclist_push(&ch->external_incoming_open_local, (void *) vc);
+//	if(ch->thread && ch->thread->signal_hook)
+//	  ch->thread->signal_hook(ch->thread);
+//    return CLUSTER_DELAYED_OPEN;
+//
+//#ifdef CLUSTER_THREAD_STEALING
+//  } else {
+//    if (!(immediate || allow_immediate))
+//      vc->action_ = cont;
+//    if (vc->start(thread) < 0) {
+//      return NULL;
+//    }
+//    if (immediate || allow_immediate) {
+//      return vc;
+//    } else {
+//      return CLUSTER_DELAYED_OPEN;
+//    }
+//  }
+//#endif
+  (void) cont;
+  (void) m;
+  (void) token;
+  (void) options;
+  return NULL;
 }
 
 ClusterVConnection *
@@ -367,9 +409,10 @@ bool ClusterProcessor::disable_remote_cluster_ops(ClusterMachine * m)
 // Simplify debug access to stats
 ////////////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////////////
-
+/*
 GlobalClusterPeriodicEvent *
   PeriodicClusterEvent;
+*/
 
 #ifdef CLUSTER_TOMCAT
 extern int cache_clustering_enabled;
@@ -386,6 +429,77 @@ unsigned long cluster_packet_tos = 0;
 int RPC_only_CacheCluster = 0;
 #endif
 
+static int machine_change_notify(ClusterMachine * m)
+{
+  //char textbuf[sizeof("255.255.255.255:65535")];
+  int result;
+
+  Debug("cluster_io", "start notify, machine %s %hhu.%hhu.%hhu.%hhu:%d, version: %d.%d",
+      m->dead ? "down" : "up",
+      DOT_SEPARATED(m->ip), m->cluster_port, m->msg_proto_major,
+      m->msg_proto_minor);
+
+  if (m->dead) {
+    ClusterConfiguration *c = this_cluster()->current_configuration();
+    if (c->find(m->ip, m->cluster_port)) {
+      ClusterConfiguration *cc = configuration_remove_machine(c, m);
+      //CLUSTER_DECREMENT_DYN_STAT(CLUSTER_NODES_STAT);
+      this_cluster()->configurations.push(cc);
+      result = 0;
+    }
+    else {
+      result = ENOENT;
+    }
+
+    Note("machine down %hhu.%hhu.%hhu.%hhu:%d, version=%d.%d",
+        DOT_SEPARATED(m->ip), m->cluster_port, m->msg_proto_major,
+        m->msg_proto_minor);
+    /*
+    snprintf(textbuf, sizeof(textbuf), "%hhu.%hhu.%hhu.%hhu:%d", DOT_SEPARATED(m->ip), m->cluster_port);
+    REC_SignalManager(REC_SIGNAL_MACHINE_DOWN, textbuf);
+    */
+  }
+  else {
+    ClusterConfiguration *c = this_cluster()->current_configuration();
+    if (c->find(m->ip, m->cluster_port)) {
+      Warning("machine %hhu.%hhu.%hhu.%hhu:%d already up",
+        DOT_SEPARATED(m->ip), m->cluster_port);
+      result = EEXIST;
+    }
+    else {
+        ClusterConfiguration *cconf = configuration_add_machine(c, m);
+        //CLUSTER_INCREMENT_DYN_STAT(CLUSTER_NODES_STAT);
+        this_cluster()->configurations.push(cconf);
+        result = 0;
+    }
+
+    Note("machine up %hhu.%hhu.%hhu.%hhu:%d, version=%d.%d",
+        DOT_SEPARATED(m->ip), m->cluster_port, m->msg_proto_major,
+        m->msg_proto_minor);
+
+    /*
+    snprintf(textbuf, sizeof(textbuf), "%hhu.%hhu.%hhu.%hhu:%d", DOT_SEPARATED(m->ip), m->cluster_port);
+    REC_SignalManager(REC_SIGNAL_MACHINE_UP, textbuf);
+    */
+  }
+
+  return result;
+}
+
+
+static int
+cluster_ping_config_cb(const char *name, RecDataT /* data_type */, RecData data, void * /* cookie */)
+{
+  if (strcmp(name, "proxy.config.cluster.ping_send_interval_msecs") == 0) {
+    cluster_ping_send_interval = data.rec_int * HRTIME_MSECOND;
+  }
+  else if (strcmp(name, "proxy.config.cluster.ping_latency_threshold_msecs") == 0) {
+    cluster_ping_latency_threshold =  data.rec_int * HRTIME_MSECOND;
+  }
+
+  return 0;
+}
+
 int
 ClusterProcessor::init()
 {
@@ -686,6 +800,11 @@ ClusterProcessor::init()
   if (num_of_cluster_threads == DEFAULT_NUMBER_OF_CLUSTER_THREADS)
     REC_ReadConfigInteger(num_of_cluster_threads, "proxy.config.cluster.threads");
 
+  REC_ReadConfigInteger(num_of_cluster_connections, "proxy.config.cluster.connections");
+  if (num_of_cluster_connections == 0) {
+    num_of_cluster_connections = num_of_cluster_threads;
+  }
+
   REC_EstablishStaticConfigInt32(CacheClusterMonitorEnabled, "proxy.config.cluster.enable_monitor");
   REC_EstablishStaticConfigInt32(CacheClusterMonitorIntervalSecs, "proxy.config.cluster.monitor_interval_secs");
   REC_ReadConfigInteger(cluster_receive_buffer_size, "proxy.config.cluster.receive_buffer_size");
@@ -695,17 +814,28 @@ ClusterProcessor::init()
   REC_ReadConfigInteger(cluster_packet_tos, "proxy.config.cluster.sock_packet_tos");
   REC_EstablishStaticConfigInt32(RPC_only_CacheCluster, "proxy.config.cluster.rpc_cache_cluster");
 
+  REC_EstablishStaticConfigInteger(cluster_flow_ctrl_min_bps, "proxy.config.cluster.flow_ctrl.min_bps");
+  REC_EstablishStaticConfigInteger(cluster_flow_ctrl_max_bps, "proxy.config.cluster.flow_ctrl.max_bps");
+  REC_EstablishStaticConfigInt32(cluster_send_min_wait_time, "proxy.config.cluster.flow_ctrl.min_send_wait_time");
+  REC_EstablishStaticConfigInt32(cluster_send_max_wait_time, "proxy.config.cluster.flow_ctrl.max_send_wait_time");
+  REC_EstablishStaticConfigInt32(cluster_min_loop_interval, "proxy.config.cluster.flow_ctrl.min_loop_interval");
+  REC_EstablishStaticConfigInt32(cluster_max_loop_interval, "proxy.config.cluster.flow_ctrl.max_loop_interval");
+
   int cluster_type = 0;
   REC_ReadConfigInteger(cluster_type, "proxy.local.cluster.type");
 
   create_this_cluster_machine();
+
+  /*
 #ifdef NON_MODULAR
   // Cluster API Initializations
   clusterAPI_init();
 #endif
+
   // Start global Cluster periodic event
   PeriodicClusterEvent = NEW(new GlobalClusterPeriodicEvent);
   PeriodicClusterEvent->init();
+  */
 
   this_cluster = NEW(new Cluster);
   ClusterConfiguration *cc = NEW(new ClusterConfiguration);
@@ -713,19 +843,61 @@ ClusterProcessor::init()
   cc->n_machines = 1;
   cc->machines[0] = this_cluster_machine();
   memset(cc->hash_table, 0, CLUSTER_HASH_TABLE_SIZE);
-  // 0 dummy output data
 
+  /*
+  // 0 dummy output data
   memset(channel_dummy_output, 0, sizeof(channel_dummy_output));
+  */
+
+  int result;
 
   if (cluster_type == 1) {
-    cache_clustering_enabled = 1;
-    Note("cache clustering enabled");
-    compute_cluster_mode();
+    REC_ReadConfigInteger(cluster_ping_send_interval, "proxy.config.cluster.ping_send_interval_msecs");
+    REC_ReadConfigInteger(cluster_ping_latency_threshold, "proxy.config.cluster.ping_latency_threshold_msecs");
+    cluster_ping_send_interval *= HRTIME_MSECOND;
+    cluster_ping_latency_threshold *= HRTIME_MSECOND;
+
+    REC_RegisterConfigUpdateFunc("proxy.config.cluster.ping_send_interval_msecs", cluster_ping_config_cb, NULL);
+    REC_RegisterConfigUpdateFunc("proxy.config.cluster.ping_latency_threshold_msecs", cluster_ping_config_cb, NULL);
+    REC_EstablishStaticConfigInt32(cluster_ping_retries, "proxy.config.cluster.ping_retries");
+
+    REC_ReadConfigInteger(max_session_count_per_machine, "proxy.config.cluster.max_sessions_per_machine");
+    REC_ReadConfigInteger(session_lock_count_per_machine, "proxy.config.cluster.session_locks_per_machine");
+
+    bool found;
+    IpEndpoint cluster_ip;    // ip addr of the cluster interface
+    char *intrName;               // Name of the interface we are to use
+    intrName = REC_readString("proxy.config.cluster.ethernet_interface", &found);
+    ink_assert(found && intrName != NULL);
+
+    found = mgmt_getAddrForIntr(intrName, &cluster_ip.sa);
+    if (!found) {
+      ink_fatal(1, "[ClusterProcessor::init] Unable to find network interface %s.  Exiting...\n", intrName);
+    } else if (!ats_is_ip4(&cluster_ip)) {
+      ink_fatal(1, "[ClusterProcessor::init] Unable to find IPv4 network interface %s.  Exiting...\n", intrName);
+    }
+
+    if (num_of_cluster_connections % 2 != 0) {
+      num_of_cluster_connections++;
+    }
+    cluster_global_init(cluster_main_handler, machine_change_notify);
+
+    result = connection_manager_init(cluster_ip.sin.sin_addr.s_addr);
+    if (result == 0) {
+      cache_clustering_enabled = 1;
+      Note("cache clustering enabled");
+      compute_cluster_mode();
+    }
+    else {
+      cache_clustering_enabled = 0;
+      Note("init fail, cache clustering disabled");
+    }
   } else {
     cache_clustering_enabled = 0;
     Note("cache clustering disabled");
+    result = 0;
   }
-  return 0;
+  return result;
 }
 
 // function added to adhere to the name calling convention of init functions
@@ -742,13 +914,18 @@ ClusterProcessor::start()
   this_cluster_machine()->cluster_port = cluster_port;
 #endif
   if (cache_clustering_enabled && (cacheProcessor.IsCacheEnabled() == CACHE_INITIALIZED)) {
-    size_t stacksize;
 
-    REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
-    ET_CLUSTER = eventProcessor.spawn_event_threads(num_of_cluster_threads, "ET_CLUSTER", stacksize);
+    /*
+    ET_CLUSTER = eventProcessor.spawn_event_threads(num_of_cluster_threads, "ET_CLUSTER");
     for (int i = 0; i < eventProcessor.n_threads_for_type[ET_CLUSTER]; i++) {
-      initialize_thread_for_net(eventProcessor.eventthread[ET_CLUSTER][i]);
+      initialize_thread_for_net(eventProcessor.eventthread[ET_CLUSTER][i], i);
+#ifndef STANDALONE_IOCORE
+      extern void initialize_thread_for_http_sessions(EThread *thread, int thread_index);
+      initialize_thread_for_http_sessions(eventProcessor.eventthread[ET_CLUSTER][i], i);
+#endif
     }
+    */
+
     REC_RegisterConfigUpdateFunc("proxy.config.cluster.cluster_configuration", machine_config_change, (void *) CLUSTER_CONFIG);
     do_machine_config_change((void *) CLUSTER_CONFIG, "proxy.config.cluster.cluster_configuration");
     // TODO: Remove this?
@@ -757,9 +934,12 @@ ClusterProcessor::start()
     do_machine_config_change((void *) MACHINE_CONFIG, "proxy.config.cluster.machine_configuration");
 #endif
 
-    accept_handler = NEW(new ClusterAccept(&cluster_port, cluster_receive_buffer_size, cluster_send_buffer_size));
-    accept_handler->Init();
+    //accept_handler = NEW(new ClusterAccept(&cluster_port, cluster_receive_buffer_size, cluster_send_buffer_size));
+    //accept_handler->Init();
+
+    connection_manager_start();
   }
+
   return 0;
 }
 
@@ -846,4 +1026,53 @@ ClusterProcessor::compute_cluster_mode()
   }
 }
 
+
+void cluster_main_handler(ClusterSession session, void *context,
+    const int func_id, IOBufferBlock *data, const int data_len)
+{
+  int event = func_id < 0 ? -func_id: func_id;
+  switch (event) {
+    case CLUSTER_CACHE_DATA_ABORT:
+    case CLUSTER_CACHE_DATA_READ_REENABLE: {
+      ink_assert(data_len == 0 && context && data == NULL);
+      CacheContinuation *cc = (CacheContinuation *) context;
+      cc->thread->schedule_imm(cc, event);
+      return;
+    }
+    default: {
+      ClusterCont *cc = clusterContAllocator.alloc();
+      SET_CONTINUATION_HANDLER(cc, &ClusterCont::handleEvent);
+      cc->session = session;
+      cc->context = context;
+      cc->func_id = event;
+      cc->data = data;
+      cc->data_len = data_len;
+      cc->_action = (Continuation *) context;
+      if (cc->_action.continuation) {
+        cc->mutex = cc->_action.mutex;
+      }
+#ifdef DEBUG
+      int64_t nbytes = 0;
+      for (IOBufferBlock *b = data; b; b = b->next) {
+        nbytes += b->read_avail();
+      }
+      ink_assert(data_len == nbytes);
+#endif
+
+      if (event == CLUSTER_CACHE_DATA_READ_DONE
+          || event == CLUSTER_CACHE_DATA_ERROR
+          || event == CLUSTER_CACHE_OP_RESULT_CLUSTER_FUNCTION) {
+        ink_assert(context);
+        ClusterCacheVC *cvc = (ClusterCacheVC *) context;
+        cvc->initial_thread->schedule_imm(cc);
+        return;
+      }
+
+      eventProcessor.schedule_imm(cc);
+      return;
+    }
+  }
+}
+
+
 // End of ClusterProcessor.cc

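Two notes on the new ClusterProcessor code above. First, a calling convention for cluster_send_message(), inferred from its call sites here and in ClusterVConnection.cc below (connection.h, which declares it, is not part of this hunk): a positive func_id is paired with an IOBufferBlock chain and len == -1, while a negated func_id is paired with a flat buffer and an explicit length:

    // block-chain payload; the length is taken from the chain itself
    remote_closed = cluster_send_message(cs, CLUSTER_CACHE_DATA_WRITE_DONE, r, -1, priority);

    // flat struct payload with an explicit byte count
    cluster_send_message(cs, -CLUSTER_CACHE_DATA_READ_BEGIN, (char *) &msg, sizeof(msg), PRIORITY_HIGH);

Second, init() now reads several new records.config knobs; the names below are verbatim from the code, the values only illustrative:

    CONFIG proxy.config.cluster.connections INT 0
    CONFIG proxy.config.cluster.ping_send_interval_msecs INT 100
    CONFIG proxy.config.cluster.ping_latency_threshold_msecs INT 500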
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/ClusterVConnection.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterVConnection.cc b/iocore/cluster/ClusterVConnection.cc
index 0630edf..a76c912 100644
--- a/iocore/cluster/ClusterVConnection.cc
+++ b/iocore/cluster/ClusterVConnection.cc
@@ -30,6 +30,17 @@
 #include "P_Cluster.h"
 ClassAllocator<ClusterVConnection> clusterVCAllocator("clusterVCAllocator");
 ClassAllocator<ByteBankDescriptor> byteBankAllocator("byteBankAllocator");
+ClassAllocator<ClusterCacheVC> clusterCacheVCAllocator("clusterCacheVCAllocator");
+
+int ClusterCacheVC::size_to_init = -1;
+
+#define CLUSTER_WRITE_MIN_SIZE (1 << 16)
+
+#define CLUSTER_CACHE_VC_CLOSE_SESSION \
+{ \
+  cluster_close_session(cs); \
+  session_closed = true; \
+}
 
 ByteBankDescriptor *
 ByteBankDescriptor::ByteBankDescriptor_alloc(IOBufferBlock * iob)
@@ -271,13 +282,13 @@ ClusterVConnection::do_io_write(Continuation * c, int64_t nbytes, IOBufferReader
 void
 ClusterVConnection::do_io_close(int alerrno)
 {
-  if ((type == VC_CLUSTER) && current_cont) {
-    if (((CacheContinuation *)current_cont)->read_cluster_vc == this)
-      type = VC_CLUSTER_READ;
-    else if (((CacheContinuation *)current_cont)->write_cluster_vc == this)
-      type = VC_CLUSTER_WRITE;
-  }
-  ch->vcs_push(this, type);
+//  if ((type == VC_CLUSTER) && current_cont) {
+//    if (((CacheContinuation *)current_cont)->read_cluster_vc == this)
+//      type = VC_CLUSTER_READ;
+//    else if (((CacheContinuation *)current_cont)->write_cluster_vc == this)
+//      type = VC_CLUSTER_WRITE;
+//  }
+//  ch->vcs_push(this, type);
 
   ClusterVConnectionBase::do_io_close(alerrno);
 }
@@ -650,4 +661,527 @@ ClusterVConnection::get_disk_io_priority()
   return disk_io_priority;
 }
 
+
+ClusterCacheVC::ClusterCacheVC() {
+  size_to_init = sizeof(ClusterCacheVC) - (size_t) & ((ClusterCacheVC *) 0)->vio;
+  memset((char *) &vio, 0, size_to_init);
+}
+
+int
+ClusterCacheVC::handleRead(int, void *)
+{
+  ink_assert(!in_progress && !remote_closed);
+  PUSH_HANDLER(&ClusterCacheVC::openReadReadDone);
+  if (vio.nbytes > 0 && total_len == 0) {
+    SetIOReadMessage msg;
+    msg.nbytes = vio.nbytes;
+    msg.offset = seek_to;
+    if (!cluster_send_message(cs, -CLUSTER_CACHE_DATA_READ_BEGIN, (char *) &msg,
+        sizeof(msg), PRIORITY_HIGH)) {
+      in_progress = true;
+      cluster_set_events(cs, RESPONSE_EVENT_NOTIFY_DEALER);
+      return EVENT_CONT;
+    }
+    goto Lfailed;
+  }
+
+  if (!cluster_send_message(cs, -CLUSTER_CACHE_DATA_READ_REENABLE, NULL, 0,
+      PRIORITY_HIGH)) {
+    in_progress = true;
+    cluster_set_events(cs, RESPONSE_EVENT_NOTIFY_DEALER);
+    return EVENT_CONT;
+  }
+  Lfailed:
+  CLUSTER_CACHE_VC_CLOSE_SESSION;
+  return calluser(VC_EVENT_ERROR);
+}
+
+int
+ClusterCacheVC::openReadReadDone(int event, void *data)
+{
+  cancel_trigger();
+  ink_assert(in_progress);
+  if (event == EVENT_IMMEDIATE)
+    return EVENT_CONT;
+
+  in_progress = false;
+  POP_HANDLER;
+
+  switch (event) {
+    case CLUSTER_CACHE_DATA_ERROR:
+    {
+      ClusterCont *cc = (ClusterCont *) data;
+      ink_assert(cc && cc->data_len > 0);
+      remote_closed = true;
+      event = *(int *) cc->data->start();
+      break;
+    }
+    case CLUSTER_CACHE_DATA_READ_DONE:
+    {
+      ClusterCont *cc = (ClusterCont *) data;
+      ink_assert(cc && d_len == 0);
+
+      d_len = cc->data_len;
+      total_len += d_len;
+      blocks = cc->data;
+      if (total_len >= vio.nbytes)
+        remote_closed = true;
+      break;
+    }
+    case CLUSTER_INTERNEL_ERROR:
+    default:
+      event = VC_EVENT_ERROR;
+      remote_closed = true;
+      break;
+  }
+
+  if (closed) {
+    if (!remote_closed)
+      cluster_send_message(cs, CLUSTER_CACHE_DATA_ABORT, NULL, 0, PRIORITY_HIGH);
+
+    free_ClusterCacheVC(this);
+    return EVENT_DONE;
+  }
+  // received data from the cluster
+
+  return handleEvent(event, data);
+}
+
+int
+ClusterCacheVC::openReadStart(int event, void *data)
+{
+  ink_assert(in_progress);
+  in_progress = false;
+  if (_action.cancelled) {
+    if (!remote_closed)
+      cluster_send_message(cs, CLUSTER_CACHE_DATA_ABORT, NULL, 0, PRIORITY_HIGH);
+    free_ClusterCacheVC(this);
+    return EVENT_DONE;
+  }
+  if (event != CACHE_EVENT_OPEN_READ) {
+    if (event == CACHE_EVENT_OPEN_WRITE) {
+      // the remote side does the pre_write
+      vio.op = VIO::WRITE;
+      SET_HANDLER(&ClusterCacheVC::openWriteMain);
+      _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, this);
+      return EVENT_DONE;
+    }
+    // prevent further trigger
+    remote_closed = true;
+    CLUSTER_CACHE_VC_CLOSE_SESSION;
+    _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, data);
+    free_ClusterCacheVC(this);
+    return EVENT_DONE;
+  }
+
+  SET_HANDLER(&ClusterCacheVC::openReadMain);
+  callcont(CACHE_EVENT_OPEN_READ);
+  return EVENT_CONT;
+}
+int
+ClusterCacheVC::openReadMain(int event, void *e)
+{
+  cancel_trigger();
+  ink_assert(!in_progress);
+  if (event == VC_EVENT_ERROR || event == VC_EVENT_EOS) {
+    remote_closed = true;
+    CLUSTER_CACHE_VC_CLOSE_SESSION;
+    return calluser(event);
+  }
+
+  int64_t bytes = d_len;
+  int64_t ntodo = vio.ntodo();
+  if (ntodo <= 0)
+    return EVENT_CONT;
+  if (vio.buffer.writer()->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone) // initiate read of first block
+    return EVENT_CONT;
+  if (!blocks && vio.ntodo() > 0)
+    goto Lread;
+
+  if (bytes > vio.ntodo())
+    bytes = vio.ntodo();
+  vio.buffer.writer()->append_block(blocks);
+  vio.ndone += bytes;
+  blocks = NULL;
+  d_len -= bytes;
+
+  if (vio.ntodo() <= 0)
+    return calluser(VC_EVENT_READ_COMPLETE);
+  else {
+    if (calluser(VC_EVENT_READ_READY) == EVENT_DONE)
+      return EVENT_DONE;
+    // we have to keep reading until we give the user all the
+    // bytes it wanted or we hit the watermark.
+    if (vio.ntodo() > 0 && !vio.buffer.writer()->high_water())
+      goto Lread;
+    return EVENT_CONT;
+  }
+Lread:
+  if (vio.ndone >= (int64_t) doc_len) {
+    // reached the end of the document and the user still wants more
+    return calluser(VC_EVENT_EOS);
+  }
+  // if the state machine calls reenable on the callback from the cache,
+  // we set up a schedule_imm event. The openReadReadDone discards
+  // EVENT_IMMEDIATE events. So, we have to cancel that trigger and set
+  // a new EVENT_INTERVAL event.
+  cancel_trigger();
+  return handleRead(event, e);
+}
+
+int
+ClusterCacheVC::openWriteStart(int event, void *data)
+{
+  ink_assert(in_progress);
+  in_progress = false;
+  if (_action.cancelled) {
+    if (!remote_closed)
+      cluster_send_message(cs, CLUSTER_CACHE_DATA_ABORT, NULL, 0, PRIORITY_HIGH);
+    free_ClusterCacheVC(this);
+    return EVENT_DONE;
+  }
+  // process the data
+  if (event != CACHE_EVENT_OPEN_WRITE) {
+    // prevent further trigger
+    remote_closed = true;
+    CLUSTER_CACHE_VC_CLOSE_SESSION;
+    _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, data);
+    free_ClusterCacheVC(this);
+    return EVENT_DONE;
+  }
+  SET_HANDLER(&ClusterCacheVC::openWriteMain);
+  return callcont(CACHE_EVENT_OPEN_WRITE);
+}
+int
+ClusterCacheVC::openWriteMain(int , void *)
+{
+  cancel_trigger();
+  ink_assert(!in_progress);
+
+Lagain:
+  if (remote_closed) {
+    if (calluser(VC_EVENT_ERROR) == EVENT_DONE)
+      return EVENT_DONE;
+    return EVENT_CONT;
+  }
+
+  if (!vio.buffer.writer()) {
+    if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE)
+      return EVENT_DONE;
+    if (!vio.buffer.writer())
+      return EVENT_CONT;
+  }
+
+  int64_t ntodo = vio.ntodo();
+
+  if (ntodo <= 0) {
+    if (calluser(VC_EVENT_WRITE_COMPLETE) == EVENT_DONE)
+      return EVENT_DONE;
+    ink_assert(!"close expected after write COMPLETE");
+    if (vio.ntodo() <= 0)
+      return EVENT_CONT;
+  }
+
+  ntodo = vio.ntodo() + length;
+  int64_t total_avail = vio.buffer.reader()->read_avail();
+  int64_t avail = total_avail;
+  int64_t towrite = avail + length;
+  if (towrite > ntodo) {
+    avail -= (towrite - ntodo);
+    towrite = ntodo;
+  }
+
+  if (!blocks && towrite) {
+    blocks = vio.buffer.reader()->block;
+    offset = vio.buffer.reader()->start_offset;
+  }
+
+  if (avail > 0) {
+    vio.buffer.reader()->consume(avail);
+    vio.ndone += avail;
+    total_len += avail;
+  }
+
+  ink_assert(towrite >= 0);
+  length = towrite;
+
+  int flen = cache_config_target_fragment_size;
+
+  while (length >= flen) {
+    IOBufferBlock *r = clone_IOBufferBlockList(blocks, offset, flen);
+    blocks = iobufferblock_skip(blocks, &offset, &length, flen);
+
+    remote_closed = cluster_send_message(cs, CLUSTER_CACHE_DATA_WRITE_DONE, r, -1,
+        priority);
+    if (remote_closed)
+      goto Lagain;
+
+    data_sent += flen;
+    Debug("data_sent", "sent bytes %d, reminds %"PRId64"", flen, length);
+  }
+  // flush smaller trailing chunks so read_from_writer works better,
+  // especially with a slow origin
+  flen = CLUSTER_WRITE_MIN_SIZE;
+  if (length >= flen || (vio.ntodo() <= 0 && length > 0)) {
+    data_sent += length;
+    IOBufferBlock *r = clone_IOBufferBlockList(blocks, offset, length);
+    blocks = iobufferblock_skip(blocks, &offset, &length, length);
+    remote_closed = cluster_send_message(cs, CLUSTER_CACHE_DATA_WRITE_DONE, r,
+              -1, priority);
+    if (remote_closed)
+      goto Lagain;
+    Debug("data_sent", "sent bytes %d, reminds %"PRId64"", flen, length);
+  }
+
+  if (vio.ntodo() <= 0) {
+    ink_assert(length == 0 && total_len == vio.nbytes);
+    return calluser(VC_EVENT_WRITE_COMPLETE);
+  }
+  return calluser(VC_EVENT_WRITE_READY);
+}
+
+int
+ClusterCacheVC::removeEvent(int event, void *data)
+{
+  ink_assert(in_progress);
+  in_progress = false;
+  remote_closed = true;
+  CLUSTER_CACHE_VC_CLOSE_SESSION;
+  if (!_action.cancelled)
+    _action.continuation->handleEvent(event, data);
+  free_ClusterCacheVC(this);
+  return EVENT_DONE;
+}
+
+VIO *
+ClusterCacheVC::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *abuf)
+{
+  ink_assert(vio.op == VIO::READ && alternate.valid());
+  vio.buffer.writer_for(abuf);
+  vio.set_continuation(c);
+  vio.ndone = 0;
+  vio.nbytes = nbytes;
+  vio.vc_server = this;
+  seek_to = 0;
+  ink_assert(c->mutex->thread_holding);
+
+  ink_assert(!in_progress);
+  if (!trigger && !recursive)
+    trigger = c->mutex->thread_holding->schedule_imm_local(this);
+  return &vio;
+}
+
+VIO *
+ClusterCacheVC::do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *abuf, int64_t offset)
+{
+  ink_assert(vio.op == VIO::READ && alternate.valid());
+  vio.buffer.writer_for(abuf);
+  vio.set_continuation(c);
+  vio.ndone = 0;
+  vio.nbytes = nbytes;
+  vio.vc_server = this;
+  seek_to = offset;
+  ink_assert(c->mutex->thread_holding);
+
+  ink_assert(!in_progress);
+  if (!trigger && !recursive)
+    trigger = c->mutex->thread_holding->schedule_imm_local(this);
+  return &vio;
+}
+
+VIO *
+ClusterCacheVC::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuf, bool owner)
+{
+  ink_assert(vio.op == VIO::WRITE);
+  ink_assert(!owner && !in_progress);
+  vio.buffer.reader_for(abuf);
+  vio.set_continuation(c);
+  vio.ndone = 0;
+  vio.nbytes = nbytes;
+  doc_len = nbytes; // note: doc_len may not be the real length of the body
+  vio.vc_server = this;
+  ink_assert(c->mutex->thread_holding);
+
+  if (nbytes < (1 << 20))
+    priority = PRIORITY_MID;
+  else
+    priority = PRIORITY_LOW;
+
+  CacheHTTPInfo *r = &alternate;
+  SetIOWriteMessage msg;
+  msg.nbytes = nbytes;
+  int len = r->valid() ? r->marshal_length() : 0;
+  msg.hdr_len = len;
+  ink_assert(total_len == 0);
+  ink_assert((frag_type == CACHE_FRAG_TYPE_HTTP && len > 0) ||
+      (frag_type != CACHE_FRAG_TYPE_HTTP && len == 0));
+
+  if (len > 0) {
+    Ptr<IOBufferData> data;
+    data = new_IOBufferData(iobuffer_size_to_index(sizeof msg + len, MAX_BUFFER_SIZE_INDEX));
+    memcpy((char *) data->data(), &msg, sizeof(msg));
+    char *p = (char *) data->data() + sizeof msg;
+    int res = r->marshal(p, len);
+    ink_assert(res >= 0);
+    IOBufferBlock *ret = new_IOBufferBlock(data, sizeof msg + len, 0);
+    ret->_buf_end = ret->_end;
+    remote_closed = cluster_send_message(cs, CLUSTER_CACHE_DATA_WRITE_BEGIN, ret, -1, priority);
+  } else
+    remote_closed = cluster_send_message(cs, -CLUSTER_CACHE_DATA_WRITE_BEGIN, &msg, sizeof msg, priority);
+
+  if (!trigger && !recursive)
+    trigger = c->mutex->thread_holding->schedule_imm_local(this);
+  return &vio;
+}
+
+void
+ClusterCacheVC::do_io_close(int alerrno)
+{
+  ink_assert(mutex->thread_holding == this_ethread());
+  int previous_closed = closed;
+  closed = (alerrno == -1) ? 1 : -1;    // Stupid default arguments
+  DDebug("cache_close", "do_io_close %p %d %d", this, alerrno, closed);
+
+  // special case: caching a zero-byte document
+  if (f.force_empty)
+    closed = 1;
+
+  if (!remote_closed) {
+    if (closed > 0 && vio.op == VIO::WRITE) {
+      if ((f.update && vio.nbytes == 0) || f.force_empty) {
+        // header-only update
+        //
+        if (frag_type == CACHE_FRAG_TYPE_HTTP) {
+          if (alternate.valid()) {
+            SetIOCloseMessage msg;
+            msg.h_len = alternate.marshal_length();
+            msg.d_len = 0;
+            msg.total_len = 0;
+
+            Ptr<IOBufferData> d;
+            d = new_IOBufferData(iobuffer_size_to_index(sizeof msg + msg.h_len));
+            char *data = d->data();
+            memcpy(data, &msg, sizeof msg);
+
+            int res = alternate.marshal((char *) data + sizeof msg, msg.h_len);
+            ink_assert(res >= 0 && res <= msg.h_len);
+
+            IOBufferBlock *ret = new_IOBufferBlock(d, sizeof msg + msg.h_len, 0);
+            ret->_buf_end = ret->_end;
+
+            remote_closed = cluster_send_message(cs,
+                -CLUSTER_CACHE_HEADER_ONLY_UPDATE, ret, -1, PRIORITY_HIGH);
+          } else
+            remote_closed = cluster_send_message(cs, -CLUSTER_CACHE_DATA_CLOSE, &total_len,
+                sizeof total_len, PRIORITY_HIGH);
+        } else {
+          remote_closed = cluster_send_message(cs, -CLUSTER_CACHE_DATA_CLOSE,
+              &total_len, sizeof total_len, priority);
+        }
+
+        goto Lfree;
+      } else if ((total_len < vio.nbytes) || length > 0) {
+        int64_t ntodo = vio.ntodo() + length;
+        int64_t total_avail = vio.buffer.reader()->read_avail();
+        int64_t avail = total_avail;
+        int64_t towrite = avail + length;
+        if (towrite > ntodo) {
+          avail -= (towrite - ntodo);
+          towrite = ntodo;
+        }
+
+        if (!blocks && towrite) {
+          blocks = vio.buffer.reader()->block;
+          offset = vio.buffer.reader()->start_offset;
+        }
+
+        if (avail > 0) {
+          vio.buffer.reader()->consume(avail);
+          vio.ndone += avail;
+          total_len += avail;
+        }
+
+        if (vio.ntodo() > 0) {
+          Warning("writer closed success but still want more data");
+          remote_closed = cluster_send_message(cs, CLUSTER_CACHE_DATA_ABORT, NULL, 0,
+                        priority);
+          goto Lfree;
+        }
+
+        length = towrite;
+        ink_assert(total_len == vio.nbytes);
+        int flen = cache_config_target_fragment_size;
+        while (length >= flen) {
+          IOBufferBlock *ret = clone_IOBufferBlockList(blocks, offset, flen);
+          blocks = iobufferblock_skip(blocks, &offset, &length, flen);
+
+          remote_closed = cluster_send_message(cs, CLUSTER_CACHE_DATA_WRITE_DONE, ret,
+              -1, priority);
+          if (remote_closed)
+            goto Lfree;
+
+          data_sent += flen;
+          Debug("data_sent", "sent bytes %d, reminds %"PRId64"", flen, length);
+        }
+
+        if (length > 0) {
+          data_sent += length;
+          IOBufferBlock *ret = clone_IOBufferBlockList(blocks, offset, length);
+          blocks = iobufferblock_skip(blocks, &offset, &length, length);
+          remote_closed = cluster_send_message(cs, CLUSTER_CACHE_DATA_WRITE_DONE, ret, -1,
+              priority);
+          if (remote_closed)
+            goto Lfree;
+          Debug("data_sent", "sent bytes done: %"PRId64", reminds %"PRId64"", data_sent, length);
+        }
+      }
+
+      if (doc_len != vio.nbytes) {
+        // for trunk
+        ink_assert(total_len == vio.nbytes && length == 0);
+        remote_closed = cluster_send_message(cs, CLUSTER_CACHE_DATA_CLOSE,
+            &total_len, sizeof total_len, priority);
+        goto Lfree;
+      }
+      ink_assert(data_sent == total_len);
+    }
+
+    if (closed < 0 && vio.op == VIO::WRITE)
+      remote_closed = cluster_send_message(cs, CLUSTER_CACHE_DATA_ABORT, NULL, 0, PRIORITY_HIGH);
+
+    if (vio.op == VIO::READ && !in_progress) {
+      remote_closed = cluster_send_message(cs, CLUSTER_CACHE_DATA_ABORT, NULL, 0, PRIORITY_HIGH);
+    }
+  }
+Lfree:
+  if (!previous_closed && !recursive && !in_progress) {
+    free_ClusterCacheVC(this);
+  }
+}
+
+void
+ClusterCacheVC::reenable(VIO *avio)
+{
+  DDebug("cache_reenable", "reenable %p, trigger %p, in_progress %d", this, trigger, in_progress);
+  (void) avio;
+  ink_assert(avio->mutex->thread_holding);
+  if (!trigger && !in_progress) {
+    trigger = avio->mutex->thread_holding->schedule_imm_local(this);
+  }
+}
+
+void
+ClusterCacheVC::reenable_re(VIO *avio)
+{
+  DDebug("cache_reenable", "reenable %p", this);
+  (void) avio;
+  ink_assert(avio->mutex->thread_holding);
+
+  if (!trigger) {
+    if (!in_progress && !recursive) {
+      handleEvent(EVENT_NONE, (void *) 0);
+    } else if (!in_progress)
+      trigger = avio->mutex->thread_holding->schedule_imm_local(this);
+  }
+}
 // End of ClusterVConnection.cc

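For orientation, the read side of the new ClusterCacheVC drives itself through the cache's usual handler-stack idiom (PUSH_HANDLER/POP_HANDLER); the flow implemented above is:

    // do_io_read()/do_io_pread()   set up the VIO and schedule the VC
    //   -> handleRead()            sends -CLUSTER_CACHE_DATA_READ_BEGIN (first
    //                              read) or -CLUSTER_CACHE_DATA_READ_REENABLE,
    //                              pushes openReadReadDone and waits
    //   -> openReadReadDone()      runs on the cluster callback, pops back
    //   -> openReadMain()          appends the received blocks to the VIO
    //                              buffer, calls the user back, loops to Lread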
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/EventPoll.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/EventPoll.cc b/iocore/cluster/EventPoll.cc
new file mode 100644
index 0000000..8a6b9e6
--- /dev/null
+++ b/iocore/cluster/EventPoll.cc
@@ -0,0 +1,158 @@
+/** @file
+
+  A brief file description
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#include "EventPoll.h"
+
+EventPoll::EventPoll(const int size, int timeout) : _size(size)
+{
+  int bytes;
+
+#if TS_USE_EPOLL
+  _extra_events = EPOLLET;
+  _timeout = timeout;
+  _poll_fd = epoll_create(_size);
+  bytes = sizeof(struct epoll_event) * size;
+  _events = (struct epoll_event *)ats_malloc(bytes);
+#elif TS_USE_KQUEUE
+  _extra_events = INK_EV_EDGE_TRIGGER;
+  _timeout.tv_sec = timeout / 1000;
+  _timeout.tv_nsec = 1000000 * (timeout % 1000);
+  _poll_fd = kqueue();
+  bytes = sizeof(struct kevent) * size;
+  _events = (struct kevent *)ats_malloc(bytes);
+#elif TS_USE_PORT
+  _extra_events = 0;
+  _timeout.tv_sec = timeout / 1000;
+  _timeout.tv_nsec = 1000000 * (timeout % 1000);
+  _poll_fd = port_create();
+  bytes = sizeof(port_event_t) * size;
+  _events = (port_event_t *)ats_malloc(bytes);
+#endif
+}
+
+EventPoll::~EventPoll()
+{
+  ats_free(_events);
+  close(_poll_fd);
+}
+
+int EventPoll::attach(const int fd, const int e, void *data)
+{
+#if TS_USE_EPOLL
+  struct epoll_event ev;
+  memset(&ev, 0, sizeof(ev));
+  ev.events = e | _extra_events;
+  ev.data.ptr = data;
+  return epoll_ctl(_poll_fd, EPOLL_CTL_ADD, fd, &ev);
+#elif TS_USE_KQUEUE
+  struct kevent ev[2];
+  int n = 0;
+  if (e & EVENTIO_READ) {
+    EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD | _extra_events, 0, 0, data);
+  }
+  if (e & EVENTIO_WRITE) {
+    EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | _extra_events, 0, 0, data);
+  }
+  return kevent(_poll_fd, ev, n, NULL, 0, NULL);
+#elif TS_USE_PORT
+  return port_associate(_poll_fd, PORT_SOURCE_FD, fd, e, data);
+#endif
+}
+
+int EventPoll::modify(const int fd, const int e, void *data)
+{
+#if TS_USE_EPOLL
+  struct epoll_event ev;
+  memset(&ev, 0, sizeof(ev));
+  ev.events = e | _extra_events;
+  ev.data.ptr = data;
+  return epoll_ctl(_poll_fd, EPOLL_CTL_MOD, fd, &ev);
+#elif TS_USE_KQUEUE
+  struct kevent ev[2];
+  int n = 0;
+  if (e & EVENTIO_READ) {
+    EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD | _extra_events, 0, 0, data);
+  }
+  else {
+    EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, data);
+  }
+
+  if (e & EVENTIO_WRITE) {
+    EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | _extra_events, 0, 0, data);
+  }
+  else {
+    EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, data);
+  }
+  return kevent(_poll_fd, ev, n, NULL, 0, NULL);
+#elif TS_USE_PORT
+  return port_associate(_poll_fd, PORT_SOURCE_FD, fd, e, data);
+#endif
+}
+
+int EventPoll::detach(const int fd)
+{
+#if TS_USE_EPOLL
+  return epoll_ctl(_poll_fd, EPOLL_CTL_DEL, fd, NULL);
+#elif TS_USE_PORT
+  return port_dissociate(_poll_fd, PORT_SOURCE_FD, fd);
+#else
+  return 0;
+#endif
+}
+
+int EventPoll::poll()
+{
+#if TS_USE_EPOLL
+  return epoll_wait(_poll_fd, _events, _size, _timeout);
+#elif TS_USE_KQUEUE
+  return kevent(_poll_fd, NULL, 0, _events, _size, &_timeout);
+#elif TS_USE_PORT
+  int retval, result;
+  unsigned nget = 1;
+  if((retval = port_getn(_poll_fd, _events,
+          _size, &nget, &_timeout)) == 0)
+  {
+    result = (int)nget;
+  } else {
+    switch(errno) {
+      case EINTR:
+      case EAGAIN:
+      case ETIME:
+        if (nget > 0) {
+          result = (int)nget;
+        }
+        else {
+          result = 0;
+        }
+        break;
+      default:
+        result = -1;
+        break;
+    }
+  }
+  return result;
+#else
+#error port me
+#endif
+}
+

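Note that attach() registers descriptors edge-triggered (EPOLLET on Linux, INK_EV_EDGE_TRIGGER for kqueue), so the consumer of these events has to drain a socket until EAGAIN on every readiness notification. A minimal sketch of the expected read discipline (consume() is a placeholder, not a function in this patch):

    for (;;) {
      ssize_t n = read(fd, buf, sizeof(buf));
      if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
        break;                  // drained; wait for the next edge
      if (n <= 0)
        break;                  // EOF or hard error
      consume(buf, n);          // hand the bytes to the session layer
    }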
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/EventPoll.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/EventPoll.h b/iocore/cluster/EventPoll.h
new file mode 100644
index 0000000..ccb65de
--- /dev/null
+++ b/iocore/cluster/EventPoll.h
@@ -0,0 +1,105 @@
+/** @file
+
+  A brief file description
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#ifndef __EVENT_POLL_H__
+#define __EVENT_POLL_H__
+
+#include "P_Net.h"
+
+class EventPoll {
+  public:
+    EventPoll(const int size, int timeout);
+    ~EventPoll();
+    int attach(const int fd, const int e, void *data);
+    int modify(const int fd, const int e, void *data);
+    int detach(const int fd);
+    int poll();
+
+#if TS_USE_KQUEUE
+    /* we define these here as numbers, because for kqueue mapping them to a combination of
+     * filters / flags is hard to do. */
+    inline int kq_event_convert(int16_t event, uint16_t flags)
+    {
+      int r = 0;
+
+      if (event == EVFILT_READ) {
+        r |= INK_EVP_IN;
+      }
+      else if (event == EVFILT_WRITE) {
+        r |= INK_EVP_OUT;
+      }
+
+      if (flags & EV_EOF) {
+        r |= INK_EVP_HUP;
+      }
+      return r;
+    }
+#endif
+
+    inline int getEvents(const int index)
+    {
+#if TS_USE_EPOLL
+      return _events[index].events;
+#elif TS_USE_KQUEUE
+      return kq_event_convert(_events[index].filter, _events[index].flags);
+#elif TS_USE_PORT
+      return _events[index].portev_events;
+#else
+#error port me
+#endif
+    }
+
+    inline void *getData(const int index)
+    {
+#if TS_USE_EPOLL
+      return _events[index].data.ptr;
+#elif TS_USE_KQUEUE
+      return _events[index].udata;
+#elif TS_USE_PORT
+      return _events[index].portev_user;
+#else
+#error port me
+#endif
+    }
+
+  protected:
+    int _size;        // max number of tracked events (fds)
+    int _extra_events;
+    int _poll_fd;
+
+#if TS_USE_EPOLL
+    struct epoll_event *_events;
+    int _timeout;
+#elif TS_USE_KQUEUE
+    struct kevent *_events;
+    struct timespec _timeout;
+#elif TS_USE_PORT
+    port_event_t *_events;
+    timespec_t _timeout;
+#endif
+};
+
+#endif
+
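
A note on intended use: EventPoll hides the epoll/kqueue/event-ports
differences behind attach()/modify()/detach()/poll(), while getEvents() and
getData() normalize the readiness mask and the user pointer. A minimal
consumer loop might look like the sketch below; Connection, handle_read()
and handle_close() are hypothetical stand-ins for the caller's own types,
and the INK_EVP_* masks are assumed to match the values the kqueue
converter above produces.

    // Sketch only; not part of the patch.
    void event_loop(EventPoll *ep, Connection *conn)
    {
      ep->attach(conn->fd, EVENTIO_READ, conn);  // register for read readiness
      for (;;) {
        int n = ep->poll();                      // waits up to the ctor timeout
        for (int i = 0; i < n; i++) {
          int events = ep->getEvents(i);         // normalized event mask
          Connection *c = (Connection *) ep->getData(i);
          if (events & INK_EVP_IN)
            handle_read(c);
          if (events & INK_EVP_HUP)
            handle_close(c);
        }
      }
    }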

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/Makefile.am
----------------------------------------------------------------------
diff --git a/iocore/cluster/Makefile.am b/iocore/cluster/Makefile.am
index 1d8266d..b49046d 100644
--- a/iocore/cluster/Makefile.am
+++ b/iocore/cluster/Makefile.am
@@ -57,5 +57,12 @@ libinkcluster_a_SOURCES = \
   P_ClusterLoadMonitor.h \
   P_ClusterMachine.h \
   P_TimeTrace.h \
-  Inline.cc
+  Inline.cc \
+  global.cc \
+  nio.cc \
+  session.cc \
+  message.cc \
+  connection.cc \
+  machine.cc \
+  EventPoll.cc
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/P_Cluster.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/P_Cluster.h b/iocore/cluster/P_Cluster.h
index aa3d6a5..f24d3de 100644
--- a/iocore/cluster/P_Cluster.h
+++ b/iocore/cluster/P_Cluster.h
@@ -123,6 +123,8 @@ enum
   cluster_stat_count
 };
 
+#define SIZE_OF_FRAGEMENT ((1 << 20) - 128)
+
 extern RecRawStatBlock *cluster_rsb;
 #define CLUSTER_INCREMENT_DYN_STAT(x) \
 	RecIncrRawStat(cluster_rsb, mutex->thread_holding, (int) x, 1);
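
For reference, SIZE_OF_FRAGEMENT above works out to (1 << 20) - 128 =
1,048,576 - 128 = 1,048,448 bytes; presumably one fragment is sized to fit
a 1 MiB buffer with 128 bytes of headroom left for per-message headers,
though the patch does not state the rationale.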

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/P_ClusterCache.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/P_ClusterCache.h b/iocore/cluster/P_ClusterCache.h
index 10fc46a..162fef0 100644
--- a/iocore/cluster/P_ClusterCache.h
+++ b/iocore/cluster/P_ClusterCache.h
@@ -53,6 +53,9 @@
 /****************************************************************************/
 
 #include "P_ClusterMachine.h"
+#include "clusterinterface.h"
+
+extern int enable_cache_empty_http_doc;
 
 //
 // Cluster Processor
@@ -310,6 +313,7 @@ struct ClusterVCToken
 //
 typedef void ClusterFunction(ClusterHandler * ch, void *data, int len);
 typedef ClusterFunction *ClusterFunctionPtr;
+typedef void ClusterFunctionExt(ClusterSession cs, void *context, void *data);
 
 struct ClusterVConnectionBase;
 
@@ -512,7 +516,9 @@ struct ClusterVConnection: public ClusterVConnectionBase
   ClusterVConnection(int is_new_connect_read = 0);
   ~ClusterVConnection();
   void free();                  // Destructor actions (we are using ClassAllocator)
-
+  virtual bool is_read_from_writer() {
+    return false;
+  }
   virtual void do_io_close(int lerrno = -1);
   virtual VIO *do_io_read(Continuation * c, int64_t nbytes, MIOBuffer * buf);
   virtual VIO *do_io_write(Continuation * c, int64_t nbytes, IOBufferReader * buf, bool owner = false);
@@ -739,9 +745,9 @@ extern ClusterFunction close_channel_ClusterFunction;
 extern ClusterFunction get_hostinfo_ClusterFunction;
 extern ClusterFunction put_hostinfo_ClusterFunction;
 extern ClusterFunction cache_lookup_ClusterFunction;
-extern ClusterFunction cache_op_ClusterFunction;
+//extern ClusterFunction cache_op_ClusterFunction;
 extern ClusterFunction cache_op_malloc_ClusterFunction;
-extern ClusterFunction cache_op_result_ClusterFunction;
+//extern ClusterFunction cache_op_result_ClusterFunction;
 extern ClusterFunction set_channel_data_ClusterFunction;
 extern ClusterFunction post_setchan_send_ClusterFunction;
 extern ClusterFunction set_channel_pin_ClusterFunction;
@@ -750,6 +756,9 @@ extern ClusterFunction set_channel_priority_ClusterFunction;
 extern ClusterFunction post_setchan_priority_ClusterFunction;
 extern ClusterFunction default_api_ClusterFunction;
 
+extern ClusterFunctionExt cache_op_ClusterFunction;
+extern ClusterFunctionExt cache_op_result_ClusterFunction;
+
 struct ClusterFunctionDescriptor
 {
   bool fMalloced;               // the function will free the data
@@ -767,7 +776,8 @@ struct ClusterFunctionDescriptor
 #ifndef DEFINE_CLUSTER_FUNCTIONS
 extern
 #endif
-ClusterFunctionDescriptor clusterFunction[]
+ClusterFunctionDescriptor clusterFunction[1]
+#if 0
 #ifdef DEFINE_CLUSTER_FUNCTIONS
   = {
   {false, true, CMSG_LOW_PRI, test_ClusterFunction, 0},
@@ -863,7 +873,7 @@ ClusterFunctionDescriptor clusterFunction[]
   // ********** ADD NEW ENTRIES ABOVE THIS LINE ************
 }
 #endif
-
+#endif
 ;
 extern unsigned SIZE_clusterFunction;        // clusterFunction[] entries
 
@@ -983,10 +993,27 @@ ClusterFuncToQpri(int cluster_func)
 #define API_F29_CLUSTER_FUNCTION  	     	     79
 #define API_F30_CLUSTER_FUNCTION  	     	     80
 
-#define API_STARECT_CLUSTER_FUNCTION		     API_F01_CLUSTER_FUNCTION
-#define API_END_CLUSTER_FUNCTION		     API_F30_CLUSTER_FUNCTION
+#define CLUSTER_CACHE_OP_CLUSTER_FUNCTION      (CLUSTER_MSG_START+81)
+#define CLUSTER_CACHE_DATA_READ_BEGIN          (CLUSTER_MSG_START+82)
+#define CLUSTER_CACHE_DATA_READ_REENABLE       (CLUSTER_MSG_START+83)
+#define CLUSTER_CACHE_DATA_WRITE_BEGIN         (CLUSTER_MSG_START+84)
+#define CLUSTER_CACHE_HEADER_ONLY_UPDATE       (CLUSTER_MSG_START+85)
+#define CLUSTER_CACHE_DATA_CLOSE               (CLUSTER_MSG_START+86)
+#define CLUSTER_CACHE_DATA_ABORT               (CLUSTER_MSG_START+87)
+#define CLUSTER_CACHE_DATA_WRITE_DONE          (CLUSTER_MSG_START+88)
+
+#define CLUSTER_CACHE_OP_RESULT_CLUSTER_FUNCTION (CLUSTER_MSG_START+89)
+#define CLUSTER_CACHE_DATA_READ_DONE           (CLUSTER_MSG_START+90)
+#define CLUSTER_CACHE_DATA_ERROR               (CLUSTER_MSG_START+91)
 
-#define UNDEFINED_CLUSTER_FUNCTION                   0xFDEFFDEF
+#define CLUSTER_INTERNEL_ERROR                 (CLUSTER_MSG_START+100)
+#define CLUSTER_PING_CLUSTER_FUNCTION          (CLUSTER_MSG_START+101)
+#define CLUSTER_PING_REPLY_CLUSTER_FUNCTION    (CLUSTER_MSG_START+102)
+
+#define API_STARECT_CLUSTER_FUNCTION           API_F01_CLUSTER_FUNCTION
+#define API_END_CLUSTER_FUNCTION               API_F30_CLUSTER_FUNCTION
+
+#define UNDEFINED_CLUSTER_FUNCTION             0xFDEFFDEF
 
 //////////////////////////////////////////////
 // Initial cluster connect exchange message
@@ -1171,4 +1198,328 @@ ClusterVC_remove_write(ClusterVConnectionBase * vc)
 }
 
 
+struct ClusterCacheVC: public CacheVConnection
+{
+  static int size_to_init;
+  Action _action;
+  Ptr<IOBufferData> buf;          // for read
+  Ptr<IOBufferData> first_buf;    // the head fragment
+  Ptr<IOBufferBlock> blocks; // data available to write
+
+  CacheHTTPInfo alternate;
+
+  VIO vio;
+  ink_hrtime start_time;
+  CacheFragType frag_type;
+  int64_t seek_to;                // pread offset
+  int64_t offset;                 // offset into 'blocks' of data to write
+  int64_t length;                 // length of data available to write
+  int64_t total_len;
+  int64_t data_sent;
+  int64_t doc_len;
+
+  int doc_pos;                // read position in 'buf'
+  int d_len;                  // the length of data in 'buf'
+
+  int closed;
+  int recursive;
+  int disk_io_priority;
+  int probe_depth;
+  MessagePriority priority;
+
+  time_t time_pin;
+  EThread *initial_thread;  // initial thread open_XX was called on
+  ClusterSession cs;
+  Event *trigger;
+  ContinuationHandler save_handler;
+
+  bool in_progress;         // remote operation still in flight; VC must not be freed yet
+  bool remote_closed;
+  bool session_closed;
+
+  union
+  {
+    uint32_t flags;
+    struct
+    {
+      unsigned int use_first_key:1;
+      unsigned int overwrite:1; // overwrite first_key Dir if it exists
+      unsigned int close_complete:1; // WRITE_COMPLETE is final
+      unsigned int sync:1; // write to be committed to durable storage before WRITE_COMPLETE
+      unsigned int evacuator:1;
+      unsigned int single_fragment:1;
+      unsigned int evac_vector:1;
+      unsigned int lookup:1;
+      unsigned int update:1;
+      unsigned int remove:1;
+      unsigned int remove_aborted_writers:1;
+      unsigned int open_read_timeout:1; // UNUSED
+      unsigned int data_done:1;
+      unsigned int read_from_writer_called:1;
+      unsigned int not_from_ram_cache:1;        // set when the object was not served entirely from ram cache
+      unsigned int rewrite_resident_alt:1;
+      unsigned int readers:1;
+      unsigned int doc_from_ram_cache:1;
+#ifdef HIT_EVACUATE
+      unsigned int hit_evacuate:1;
+#endif
+#ifdef HTTP_CACHE
+      unsigned int force_empty:1; // used for cache empty http document
+#endif
+#ifdef SSD_CACHE
+      unsigned int read_from_ssd:1;
+      unsigned int write_into_ssd:1;
+      unsigned int ram_fixup:1;
+      unsigned int transistor:1;
+#endif
+    } f;
+  };
+  ClusterCacheVC();
+  VIO *do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf); // invoke remote
+  VIO *do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *buf, int64_t offset); // invoke remote
+  VIO *do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner = false); // invoke remote
+  void do_io_close(int lerrno = -1); // invoke remote ?
+  void reenable(VIO *avio); // invoke remote ?
+  void reenable_re(VIO *avio); // invoke remote ?
+
+  void do_remote_close(); // invoke remote, for cancel or error
+
+  virtual int get_header(void **, int *)
+  {
+    ink_assert(!"implemented");
+    return -1;
+  }
+  virtual int set_header(void *, int)
+  {
+    ink_assert(!"implemented");
+    return -1;
+  }
+  virtual int get_single_data(void **, int *)
+  {
+    ink_assert(!"implemented");
+    return -1;
+  }
+
+#ifdef HTTP_CACHE
+  virtual void set_http_info(CacheHTTPInfo *info) {
+    if (enable_cache_empty_http_doc) {
+      MIMEField *field = info->m_alt->m_response_hdr.field_find(
+          MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH);
+      if (field && !field->value_get_int64())
+        f.force_empty = 1;
+      else
+        f.force_empty = 0;
+    } else
+      f.force_empty = 0;
+    alternate.copy_shallow(info);
+    info->clear();
+  }
+  virtual void get_http_info(CacheHTTPInfo ** info) {
+    *info = &alternate;
+  }
+#endif
+
+  bool is_ram_cache_hit() {
+    ink_assert(vio.op == VIO::READ);
+    return !f.not_from_ram_cache;
+  }
+  virtual bool set_disk_io_priority(int priority)
+  {
+    disk_io_priority = priority;
+    return true;
+  }
+  virtual int get_disk_io_priority() {
+    return disk_io_priority;
+  }
+  virtual bool set_pin_in_cache(time_t t) {
+    time_pin = t;
+    return true;
+  }
+  virtual time_t get_pin_in_cache() {
+    return time_pin;
+  }
+  virtual int64_t get_object_size()
+  {
+    return alternate.object_size_get();
+  }
+  virtual bool is_read_from_writer()
+  {
+    return f.read_from_writer_called;
+  }
+  virtual bool is_ram_cache_hit() const
+  {
+    return !f.not_from_ram_cache;
+  }
+  virtual bool is_pread_capable()
+  {
+    return true;
+  }
+  void
+  cancel_trigger()
+  {
+    if (trigger) {
+      trigger->cancel_action();
+      trigger = NULL;
+    }
+  }
+
+  int calluser(int event);
+  int callcont(int event);
+  int handleRead(int event, void *data);
+  int openReadReadDone(int event, void *data);
+//  int handleWrite(int event, void *data);
+//  int openWriteWriteDone(int event, void *data);
+  int openReadStart(int event, void *data);
+  int openWriteStart(int event, void *data);
+  int openReadMain(int event, void *data);
+  int openWriteMain(int event, void *data);
+  int removeEvent(int event, void *data);
+};
+
+struct SetIOReadMessage: public ClusterMessageHeader
+{
+  int64_t nbytes;
+  int64_t offset;
+};
+
+struct SetIOWriteMessage: public ClusterMessageHeader
+{
+  int32_t hdr_len;
+  int64_t nbytes;
+};
+
+struct SetIOCloseMessage: public ClusterMessageHeader
+{
+  int h_len;
+  int d_len;
+  int64_t total_len;
+};
+
+struct SetIOReenableMessage: public ClusterMessageHeader
+{
+  int reenable;
+};
+
+struct SetResponseMessage: public ClusterMessageHeader
+{
+};
+
+inline IOBufferBlock *
+clone_IOBufferBlockList(IOBufferBlock *ab, int64_t offset, int64_t len)
+{
+  IOBufferBlock *b = ab;
+  IOBufferBlock *head = NULL;
+  IOBufferBlock *clone = NULL;
+
+  while (b && len >= 0) {
+    int64_t max_bytes = b->read_avail();
+    max_bytes -= offset;
+    if (max_bytes <= 0) {
+      offset = -max_bytes;
+      b = b->next;
+      continue;
+    }
+
+    if (!head) {
+      head = b->clone();
+      head->consume(offset);
+      clone = head;
+    } else {
+      clone->next = b->clone();
+      clone = clone->next;
+    }
+
+    len -= max_bytes;
+    b = b->next;
+    offset = 0;
+  }
+  if (clone && len < 0)
+    clone->fill(len);
+  return head;
+}
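
clone_IOBufferBlockList() clones the byte range [offset, offset + len) out
of a block chain without copying payload: each clone shares the source
block's IOBufferData, the head is trimmed via consume(), and the tail is
trimmed by the final fill() with a negative length. A small sketch of a
caller, with the sizes and offsets being arbitrary assumptions:

    // Sketch only; new_IOBufferBlock(), alloc() and fill() are existing
    // ATS buffer calls.
    Ptr<IOBufferBlock> chain = new_IOBufferBlock();
    chain->alloc(BUFFER_SIZE_INDEX_4K);   // backing IOBufferData
    chain->fill(2048);                    // pretend 2048 bytes were written
    // Clone 1000 bytes starting at offset 100; both chains now reference
    // the same underlying IOBufferData, no payload bytes are copied.
    Ptr<IOBufferBlock> sub = clone_IOBufferBlockList(chain, 100, 1000);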
+
+ClusterCacheVC *new_ClusterCacheVC(Continuation *cont);
+void free_ClusterCacheVC(ClusterCacheVC *ccvc);
+
+inline int
+ClusterCacheVC::calluser(int event)
+{
+  recursive++;
+  ink_assert(this_ethread() == vio._cont->mutex->thread_holding);
+  vio._cont->handleEvent(event, (void *) &vio);
+  recursive--;
+  if (closed && !in_progress) {
+    free_ClusterCacheVC(this);
+    return EVENT_DONE;
+  }
+  return EVENT_CONT;
+}
+
+inline int
+ClusterCacheVC::callcont(int event)
+{
+  recursive++;
+  ink_assert(this_ethread() == _action.mutex->thread_holding);
+  _action.continuation->handleEvent(event, this);
+  recursive--;
+  if (closed && !in_progress) {
+    free_ClusterCacheVC(this);
+    return EVENT_DONE;
+  } else if (vio.vc_server)
+    handleEvent(EVENT_IMMEDIATE, 0);
+  return EVENT_DONE;
+}
+
+extern ClassAllocator<ClusterCacheVC> clusterCacheVCAllocator;
+
+inline ClusterCacheVC *
+new_ClusterCacheVC(Continuation *cont)
+{
+  EThread *t = cont->mutex->thread_holding;
+  ClusterCacheVC *c = clusterCacheVCAllocator.alloc();
+  c->_action = cont;
+  c->initial_thread = t;
+  c->mutex = cont->mutex;
+  c->start_time = ink_get_hrtime();
+  ink_assert(c->trigger == NULL);
+
+  Debug("cluster_cache_new", "new %p", c);
+  return c;
+}
+
+inline void
+free_ClusterCacheVC(ClusterCacheVC *cont)
+{
+  Debug("cluster_cache_free", "free %p", cont);
+  ink_assert(cont->mutex->thread_holding == this_ethread());
+
+  if (cont->trigger)
+    cont->trigger->cancel();
+  ink_assert(!cont->in_progress);
+
+  if (!cont->session_closed)
+    cluster_close_session(cont->cs);
+
+  cont->vio.buffer.clear();
+  cont->vio.mutex.clear();
+#ifdef HTTP_CACHE
+  if (cont->vio.op == VIO::WRITE)
+    cont->alternate.destroy();
+  else
+    cont->alternate.clear();
+#endif
+  cont->_action.cancelled = 0;
+  cont->_action.mutex.clear();
+  cont->mutex.clear();
+  cont->buf.clear();
+  cont->first_buf.clear();
+  cont->blocks.clear();
+
+  memset((char *) &cont->vio, 0, cont->size_to_init);
+
+  clusterCacheVCAllocator.free(cont);
+}
 #endif /* _Cluster_h */
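
The new_ClusterCacheVC()/free_ClusterCacheVC() pair above follows the usual
ClassAllocator recycling pattern: the allocation helper borrows the
caller's mutex and thread, and the free side clears every Ptr<> member and
zeroes the POD tail before returning the object to clusterCacheVCAllocator.
A hedged sketch of the expected call sequence; the session is assumed to be
opened by the caller through the new cluster session API, whose
cluster_close_session() counterpart appears in free_ClusterCacheVC():

    // Sketch only; not part of the patch.
    void start_remote_read(Continuation *cont, ClusterSession session,
                           int64_t nbytes, MIOBuffer *readbuf)
    {
      ClusterCacheVC *vc = new_ClusterCacheVC(cont); // borrows cont's mutex
      vc->cs = session;                              // session from the caller
      vc->do_io_read(cont, nbytes, readbuf);         // kicks off the remote read;
                                                     // do_io_close() later funnels
                                                     // into free_ClusterCacheVC()
    }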

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/P_ClusterCacheInternal.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/P_ClusterCacheInternal.h b/iocore/cluster/P_ClusterCacheInternal.h
index 8b62d44..44b675c 100644
--- a/iocore/cluster/P_ClusterCacheInternal.h
+++ b/iocore/cluster/P_ClusterCacheInternal.h
@@ -81,6 +81,88 @@ extern int ET_CLUSTER;
 #define PROBE_LOCAL_CACHE_FIRST        DO_REPLICATION
 #define PROBE_LOCAL_CACHE_LAST         false
 
+struct ClusterCont: public Continuation
+{
+  ClusterSession session;
+  Ptr<IOBufferBlock> data;
+  void *context;
+  int func_id;
+  int data_len;
+
+  Action _action;
+  int handleEvent(int event, void *d);
+  IOBufferData *copy_data();
+  int copy_data(char *buf, int size);
+  void consume(int size);
+};
+
+inline IOBufferData *
+ClusterCont::copy_data() {
+  IOBufferData *buf = new_IOBufferData(iobuffer_size_to_index(data_len, MAX_BUFFER_SIZE_INDEX));
+  char *p = buf->data();
+  for (IOBufferBlock *b = data; b; b = b->next) {
+    memcpy(p, b->_start, b->_end - b->_start);
+    p += b->_end - b->_start;
+  }
+  return buf;
+}
+
+inline void
+ClusterCont::consume(int size) {
+
+  int64_t sz = size;
+  while (data && sz >= data->read_avail()) {
+    sz -= data->read_avail();
+    data = data->next;
+  }
+  if (data)
+    data->_start += sz;
+
+  data_len = data_len > size ? (data_len - size) : 0;
+}
+
+inline int
+ClusterCont::copy_data(char *buf, int len)
+{
+  ink_assert(data_len >= len);
+  IOBufferBlock *b = data;
+  int64_t sz = len;
+  while (len > 0 && b) {
+    int64_t avail = b->read_avail();
+    sz -= avail;
+    if (sz < 0) {
+      memcpy(buf, b->_start, avail + sz);
+      sz = 0;
+      break;
+    } else {
+      memcpy(buf, b->_start, avail);
+      buf += avail;
+      b = b->next;
+    }
+  }
+  return len - (int) sz;
+}
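
copy_data(buf, len) flattens up to len bytes from the head of the received
block chain into a flat buffer and returns the number of bytes actually
copied; consume(size) then drops those bytes from the chain and from
data_len. Together they give the usual unmarshal-then-advance pattern,
sketched here with SetIOWriteMessage (from this patch) standing in as the
header being peeled off:

    // Sketch only; 'cc' is the ClusterCont delivered to a cluster handler.
    SetIOWriteMessage msg;
    if (cc->copy_data((char *) &msg, sizeof(msg)) == (int) sizeof(msg)) {
      cc->consume(sizeof(msg));  // advance past the header
      // cc->data / cc->data_len now describe the remaining payload
    }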
+extern ClassAllocator<ClusterCont> clusterContAllocator;
+
+inline int
+ClusterCont::handleEvent(int, void *) {
+  if (func_id == CLUSTER_CACHE_OP_CLUSTER_FUNCTION)
+    cache_op_ClusterFunction(session, context, this);
+  else if (func_id == CLUSTER_CACHE_OP_RESULT_CLUSTER_FUNCTION)
+    cache_op_result_ClusterFunction(session, context, this);
+  else if (func_id == CLUSTER_INTERNEL_ERROR)
+    _action.continuation->handleEvent(func_id, NULL);
+  else
+    _action.continuation->handleEvent(func_id, this);
+
+  mutex.clear();
+  _action.mutex.clear();
+  data = NULL;
+
+  clusterContAllocator.free(this);
+  return EVENT_DONE;
+}
+
 //
 // This continuation handles all cache cluster traffic, on both
 // sides (state machine client and cache server)
@@ -89,111 +171,91 @@ struct CacheContinuation;
 typedef int (CacheContinuation::*CacheContHandler) (int, void *);
 struct CacheContinuation:public Continuation
 {
+  static int size_to_init;
   enum
   {
     MagicNo = 0x92183123
   };
   int magicno;
-  void *callback_data;
-  void *callback_data_2;
   INK_MD5 url_md5;
-  Event *timeout;
-  Action action;
-  ClusterMachine *target_machine;
-  int probe_depth;
+
   ClusterMachine *past_probes[CONFIGURATION_HISTORY_PROBE_DEPTH];
+
+  ClusterVCToken token;
+
+  CacheHTTPInfo cache_vc_info; // for get_http_info
+//  MIOBuffer doc_data;
+  Ptr<IOBufferBlock> doc_data;
+  // Incoming data generated from unmarshaling request/response ops
+  Ptr<IOBufferData> rw_buf_msg;
+  Arena ic_arena;
+  CacheHTTPHdr ic_request; // for lookup or read
+  CacheHTTPInfo ic_old_info; // for update
+  CacheHTTPInfo ic_new_info; // for set_http_info
+
+  ClusterSession cs;
+  char *ic_hostname;
+  int ic_hostname_len;
+
   ink_hrtime start_time;
-  ClusterMachine *from;
-  ClusterHandler *ch;
-  VConnection *cache_vc;
+  ClusterMachine *target_machine;
+  int probe_depth;
+
+  CacheVC *cache_vc;
+  Action *pending_action;
   bool cache_read;
+  bool request_purge;
+  bool have_all_data;           // all object data in response
+  bool expect_next;
+  bool writer_aborted;
   int result;                   // return event code
   int result_error;             // error code associated with event
-  ClusterVCToken token;
-  unsigned int seq_number;
   uint16_t cfl_flags;             // Request flags; see CFL_XXX defines
+
+  unsigned int seq_number;
   CacheFragType frag_type;
-  int nbytes;
+  int nbytes;       // nbytes carried in the request message
   unsigned int target_ip;
   int request_opcode;
-  bool request_purge;
-  bool local_lookup_only;
-  bool no_reply_message;
-  bool request_timeout;         // timeout occurred before
-  //   op complete
-  bool expect_cache_callback;
-
-  // remove_and_delete() specific data
-  bool use_deferred_callback;
-
-  // open_read/write data
-
-  time_t pin_in_cache;
-
-  // setMsgBufferLen(), allocMsgBuffer() and freeMsgBuffer() data
-
-    Ptr<IOBufferData> rw_buf_msg;
+  int header_len;
   int rw_buf_msg_len;
 
-  // open data
-
-  ClusterVConnection *read_cluster_vc;
-  ClusterVConnection *write_cluster_vc;
-  int cluster_vc_channel;
-  ClusterVCToken open_local_token;
-
-  // Readahead on open read specific data
-
-  int caller_buf_freebytes;     // remote bufsize for
-  //  initial data
-  VIO *readahead_vio;
-  IOBufferReader *readahead_reader;
-    Ptr<IOBufferBlock> readahead_data;
-  bool have_all_data;           // all object data in response
-
-  CacheHTTPInfo cache_vc_info;
-  OneWayTunnel *tunnel;
-    Ptr<ProxyMutex> tunnel_mutex;
-  CacheContinuation *tunnel_cont;
-  bool tunnel_closed;
-  Action *cache_action;
-  Event *lookup_open_write_vc_event;
-
-  // Incoming data generated from unmarshaling request/response ops
-
-  Arena ic_arena;
-  CacheHTTPHdr ic_request;
-  CacheHTTPHdr ic_response;
+  time_t pin_in_cache;
+  int64_t doc_size;
+  int64_t total_length;
+  VIO *vio;
+  IOBufferReader *reader;    // for normal read
   CacheLookupHttpConfig *ic_params;
-  CacheHTTPInfo ic_old_info;
-  CacheHTTPInfo ic_new_info;
-    Ptr<IOBufferData> ic_hostname;
-  int ic_hostname_len;
-
-  // debugging
-  int cache_op_ClusterFunction;
-
-  int lookupEvent(int event, void *d);
-  int probeLookupEvent(int event, void *d);
-  int remoteOpEvent(int event, Event * e);
-  int replyLookupEvent(int event, void *d);
-  int replyOpEvent(int event, VConnection * vc);
-  int handleReplyEvent(int event, Event * e);
-  int callbackEvent(int event, Event * e);
-  int setupVCdataRead(int event, VConnection * vc);
-  int VCdataRead(int event, VIO * target_vio);
-  int setupReadWriteVC(int, VConnection *);
-  ClusterVConnection *lookupOpenWriteVC();
-  int lookupOpenWriteVCEvent(int, Event *);
-  int localVCsetupEvent(int event, ClusterVConnection * vc);
-  void insert_cache_callback_user(ClusterVConnection *, int, void *);
-  int insertCallbackEvent(int, Event *);
-  void callback_user(int result, void *d);
-  void defer_callback_result(int result, void *d);
-  int callbackResultEvent(int event, Event * e);
-  void setupReadBufTunnel(VConnection *, VConnection *);
-  int tunnelClosedEvent(int event, void *);
-  int remove_and_delete(int, Event *);
+  MIOBuffer *mbuf;
+  EThread *thread;
+
+//  int lookupEvent(int event, void *d);
+//  int probeLookupEvent(int event, void *d);
+//  int remoteOpEvent(int event, Event * e);
+//  int replyLookupEvent(int event, void *d);
+  int replyOpEvent();
+//  int handleReplyEvent(int event, Event * e);
+//  int callbackEvent(int event, Event * e);
+  int setupVCdataRead(int event, void *data);
+  int setupVCdataWrite(int event, void *data);
+  int setupVCdataRemove(int event, void *data);
+  int setupVCdataLink(int event, void *data);
+  int setupVCdataDeref(int event, void *data);
+  int VCdataRead(int event, void *data);
+  int VCdataWrite(int event, void *data);
+  int VCSmallDataRead(int event, void *data);
+//  int setupReadWriteVC(int, VConnection *);
+//  ClusterVConnection *lookupOpenWriteVC();
+//  int lookupOpenWriteVCEvent(int, Event *);
+//  int localVCsetupEvent(int event, ClusterVConnection * vc);
+//  void insert_cache_callback_user(ClusterVConnection *, int, void *);
+//  int insertCallbackEvent(int, Event *);
+//  void callback_user(int result, void *d);
+//  void defer_callback_result(int result, void *d);
+//  int callbackResultEvent(int event, Event * e);
+//  void setupReadBufTunnel(VConnection *, VConnection *);
+//  int tunnelClosedEvent(int event, void *);
+//  int remove_and_delete(int, Event *);
 
 
   inline void setMsgBufferLen(int l, IOBufferData * b = 0) {
@@ -254,66 +316,26 @@ struct CacheContinuation:public Continuation
     if (ic_request.valid()) {
       ic_request.clear();
     }
-    if (ic_response.valid()) {
-      ic_response.clear();
-    }
+//    if (ic_response.valid()) {
+//      ic_response.clear();
+//    }
     if (ic_old_info.valid()) {
       ic_old_info.destroy();
     }
     if (ic_new_info.valid()) {
       ic_new_info.destroy();
     }
-    ic_arena.reset();
+//    ic_arena.reset();
     freeMsgBuffer();
-
-    tunnel_mutex = 0;
-    readahead_data = 0;
+//
+//    tunnel_mutex = 0;
+//    readahead_data = 0;
     ic_hostname = 0;
   }
 
-CacheContinuation():
-  Continuation(NULL),
-    magicno(MagicNo),
-    callback_data(0),
-    callback_data_2(0),
-    timeout(0),
-    target_machine(0),
-    probe_depth(0),
-    start_time(0),
-    cache_read(false),
-    result(0),
-    result_error(0),
-    seq_number(0),
-    cfl_flags(0),
-    frag_type(CACHE_FRAG_TYPE_NONE),
-    nbytes(0),
-    target_ip(0),
-    request_opcode(0),
-    request_purge(false),
-    local_lookup_only(0),
-    no_reply_message(0),
-    request_timeout(0),
-    expect_cache_callback(true),
-    use_deferred_callback(0),
-    pin_in_cache(0),
-    rw_buf_msg_len(0),
-    read_cluster_vc(0),
-    write_cluster_vc(0),
-    cluster_vc_channel(0),
-    caller_buf_freebytes(0),
-    readahead_vio(0),
-    readahead_reader(0),
-    have_all_data(false),
-    cache_vc_info(),
-    tunnel(0),
-    tunnel_cont(0),
-    tunnel_closed(0),
-    lookup_open_write_vc_event(0),
-    ic_arena(),
-    ic_request(),
-    ic_response(), ic_params(0), ic_old_info(), ic_new_info(), ic_hostname_len(0), cache_op_ClusterFunction(0) {
-    token.clear();
-    SET_HANDLER((CacheContHandler) & CacheContinuation::remoteOpEvent);
+  CacheContinuation(): magicno(MagicNo) {
+    size_to_init = sizeof(CacheContinuation) - (size_t) & ((CacheContinuation *) 0)->cs;
+    memset((char *) &cs, 0, size_to_init);
   }
 
   inline static bool is_ClusterThread(EThread * et)
@@ -334,14 +356,66 @@ CacheContinuation():
   static void cacheContAllocator_free(CacheContinuation *);
   inkcoreapi static Action *callback_failure(Action *, int, int, CacheContinuation * this_cc = 0);
   static Action *do_remote_lookup(Continuation *, CacheKey *, CacheContinuation *, CacheFragType, char *, int);
-  inkcoreapi static Action *do_op(Continuation *, ClusterMachine *, void *, int, char *, int,
-                                  int nbytes = -1, MIOBuffer * b = 0);
+  inkcoreapi static Action *do_op(Continuation * c, ClusterSession cs, void *args,
+                           int user_opcode, IOBufferData *data, int data_len, int nbytes = -1, MIOBuffer * b = 0);
   static int setup_local_vc(char *data, int data_len, CacheContinuation * cc, ClusterMachine * mp, Action **);
   static void disposeOfDataBuffer(void *buf);
   static int handleDisposeEvent(int event, CacheContinuation * cc);
-  static int32_t getObjectSize(VConnection *, int, CacheHTTPInfo *);
+  int32_t getObjectSize(VConnection *, int, CacheHTTPInfo *);
 };
 
+extern ClassAllocator<CacheContinuation> cacheContAllocator;
+
+inline CacheContinuation *
+new_CacheCont(EThread *t) {
+  ink_assert(t == this_ethread());
+  CacheContinuation *c = cacheContAllocator.alloc();
+  c->mutex = new_ProxyMutex();
+  c->start_time = ink_get_hrtime();
+  c->thread = t;
+  return c;
+}
+
+inline void
+free_CacheCont(CacheContinuation *c) {
+  ink_assert(c->magicno == (int) c->MagicNo && !c->expect_next);
+//  ink_assert(!c->cache_op_ClusterFunction);
+  if (c->pending_action) {
+    c->pending_action->cancel();
+    c->pending_action = NULL;
+  }
+  if (c->cache_vc) {
+    if (c->cache_vc->vio.op == VIO::READ)
+      c->cache_vc->do_io(VIO::CLOSE);
+    else
+      c->cache_vc->do_io(VIO::ABORT);
+    c->cache_vc = NULL;
+  }
+  if (c->mbuf) {
+    free_MIOBuffer(c->mbuf);
+    c->mbuf = NULL;
+  }
+
+  c->magicno = -1;
+  c->token.clear();
+  c->cache_vc_info.clear();
+  if (c->ic_params) {
+    delete c->ic_params;
+    c->ic_params = 0;
+  }
+  c->ic_request.clear();
+  c->ic_old_info.clear();
+  c->ic_new_info.destroy();
+  c->ic_arena.reset();
+  c->freeMsgBuffer();
+  c->ic_hostname = 0;
+  c->mutex.clear();
+
+  c->doc_data = NULL;
+
+  cacheContAllocator.free(c);
+}
+
 /////////////////////////////////////////
 // Cache OP specific args for do_op()  //
 /////////////////////////////////////////
@@ -595,18 +669,19 @@ struct CacheOpReplyMsg:public ClusterMessageHeader
 {
   uint32_t seq_number;
   int32_t result;
-  ClusterVCToken token;
-  bool is_ram_cache_hit;          // Entire object was from ram cache
-  Alias32 moi;                 // Used by CACHE_OPEN_READ & CACHE_LINK reply
+  int32_t h_len;
+  int32_t d_len;
+  int32_t reason;   // Used by CACHE_OPEN_READ & CACHE_LINK reply
+  int64_t doc_size;
+
   enum
   {
     MIN_VERSION = 1,
     MAX_VERSION = 1,
     CACHE_OP_REPLY_MESSAGE_VERSION = MAX_VERSION
   };
-  CacheOpReplyMsg(uint16_t vers = CACHE_OP_REPLY_MESSAGE_VERSION)
-    : ClusterMessageHeader(vers), seq_number(0), result(0), is_ram_cache_hit(false) {
-    moi.u32 = 0;
+  CacheOpReplyMsg(uint16_t vers = CACHE_OP_REPLY_MESSAGE_VERSION):
+  ClusterMessageHeader(vers), seq_number(0), result(0), h_len(0), d_len(0), reason(0), doc_size(0) {
   }
 
   //////////////////////////////////////////////////////////////////////////
@@ -617,7 +692,7 @@ struct CacheOpReplyMsg:public ClusterMessageHeader
   }
   static int sizeof_fixedlen_msg()
   {
-    return (int) ALIGN_DOUBLE(offsetof(CacheOpReplyMsg, moi));
+    return INK_ALIGN(sizeof (CacheOpReplyMsg), 16);
   }
   void init(uint16_t vers = CACHE_OP_REPLY_MESSAGE_VERSION) {
     _init(vers);
@@ -627,12 +702,12 @@ struct CacheOpReplyMsg:public ClusterMessageHeader
     if (NeedByteSwap()) {
       ats_swap32(&seq_number);
       ats_swap32((uint32_t *) & result);
-      token.SwapBytes();
+      ats_swap32((uint32_t *) & h_len);
+      ats_swap32((uint32_t *) & d_len);
+      ats_swap32((uint32_t *) & reason);
+      ats_swap64((uint64_t *) & doc_size);
     }
   }
   //////////////////////////////////////////////////////////////////////////
 };
-
 inline int
 maxval(int a, int b)
 {
@@ -795,6 +870,7 @@ event_reply_may_have_moi(int event)
 {
   switch (event) {
   case CACHE_EVENT_OPEN_READ:
+  case CACHE_EVENT_OPEN_WRITE:
   case CACHE_EVENT_LINK:
   case CACHE_EVENT_LINK_FAILED:
   case CACHE_EVENT_OPEN_READ_FAILED: