Posted to commits@trafficserver.apache.org by jp...@apache.org on 2009/12/10 00:01:30 UTC

svn commit: r889011 [2/2] - in /incubator/trafficserver/traffic/trunk: iocore/net/ proxy/

Modified: incubator/trafficserver/traffic/trunk/iocore/net/UnixNetVConnection.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/UnixNetVConnection.cc?rev=889011&r1=889010&r2=889011&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/net/UnixNetVConnection.cc (original)
+++ incubator/trafficserver/traffic/trunk/iocore/net/UnixNetVConnection.cc Wed Dec  9 23:01:29 2009
@@ -38,8 +38,6 @@
 #define NET_MAX_IOV UIO_MAXIOV
 #endif
 
-#define NET_THREAD_STEALING
-
 #ifdef DEBUG
 // Initialize class UnixNetVConnection static data
 int
@@ -58,34 +56,24 @@
 
 //
 // Reschedule a UnixNetVConnection by placing the VC 
-// into ReadyQueue or WaitList
+// into ready_list or wait_list
 //
 static inline void
 read_reschedule(NetHandler * nh, UnixNetVConnection * vc)
 {
   if (vc->read.triggered && vc->read.enabled) {
-    if (!vc->read.netready_queue) {
-      nh->ready_queue.epoll_addto_read_ready_queue(vc);
-    }
-  } else {
-    if (vc->read.netready_queue) {
-      ReadyQueue::epoll_remove_from_read_ready_queue(vc);
-    }
-  }
+    nh->read_ready_list.in_or_enqueue(vc);
+  } else
+    nh->read_ready_list.remove(vc);
 }
 
 static inline void
 write_reschedule(NetHandler * nh, UnixNetVConnection * vc)
 {
   if (vc->write.triggered && vc->write.enabled) {
-    if (!vc->write.netready_queue) {
-      nh->ready_queue.epoll_addto_write_ready_queue(vc);
-    }
-  } else {
-    if (vc->write.netready_queue) {
-      ReadyQueue::epoll_remove_from_write_ready_queue(vc);
-    }
-  }
+    nh->write_ready_list.in_or_enqueue(vc);
+  } else
+    nh->write_ready_list.remove(vc);
 }
 
 void
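
The rewritten reschedule helpers rely on list operations that are safe to call unconditionally: in_or_enqueue() is a no-op when the VC is already queued and remove() is a no-op when it is not, so callers no longer probe netready_queue membership by hand. A minimal sketch of that idempotent intrusive-list pattern, using hypothetical names rather than the real List.h types:

    struct VC;

    struct ReadyList {                 // doubly linked, intrusive
      VC *head = nullptr;
      VC *tail = nullptr;
      void in_or_enqueue(VC *vc);
      void remove(VC *vc);
    };

    struct VC {
      bool in_ready_list = false;      // membership flag owned by the list
      VC *prev = nullptr;
      VC *next = nullptr;
    };

    void ReadyList::in_or_enqueue(VC *vc) {
      if (vc->in_ready_list) return;   // already queued: nothing to do
      vc->in_ready_list = true;
      vc->prev = tail;
      vc->next = nullptr;
      (tail ? tail->next : head) = vc;
      tail = vc;
    }

    void ReadyList::remove(VC *vc) {
      if (!vc->in_ready_list) return;  // not queued: nothing to do
      (vc->prev ? vc->prev->next : head) = vc->next;
      (vc->next ? vc->next->prev : tail) = vc->prev;
      vc->prev = vc->next = nullptr;
      vc->in_ready_list = false;
    }

Moving the membership bookkeeping into the queue itself removes the double-enqueue and double-remove hazards the old call sites had to guard against individually.
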
@@ -114,7 +102,6 @@
 
 //
 // Function used to close a UnixNetVConnection and free the vc
-// Modified by YTS Team, yamsat
 //
 void
 close_UnixNetVConnection(UnixNetVConnection * vc, EThread * t)
@@ -122,17 +109,13 @@
   if (vc->loggingEnabled()) {
     vc->addLogMessage("close_UnixNetVConnection");
     // display the slow log for the http client session
-    if (vc->getLogsTotalTime() / 1000000 > 30000) {
+    if (vc->getLogsTotalTime() / 1000000 > 30000)
       vc->printLogs();
-    }
     vc->clearLogs();
   }
 
-  XTIME(printf("%d %d close\n", vc->id, (int) ((ink_get_hrtime_internal() - vc->submit_time) / HRTIME_MSECOND)));
-
   vc->cancel_OOB();
 
-  //added by YTS Team, yamsat
   PollDescriptor *pd = get_PollDescriptor(t);
 #if defined(USE_EPOLL)
   struct epoll_event ev;
@@ -147,10 +130,6 @@
 #else
 #error port me
 #endif
-  if (vc->ep != NULL) {
-    xfree(vc->ep);
-    vc->ep = NULL;
-  }
 
   socketManager.fast_close(vc->con.fd);
   vc->con.fd = NO_FD;
@@ -170,38 +149,23 @@
   }
   vc->active_timeout_in = 0;
 
-  //added by YTS Team, yamsat
-
-  if (vc->read.queue) {
-    WaitList::epoll_remove_from_read_wait_list(vc);
-  }
-  if (vc->write.queue) {
-    WaitList::epoll_remove_from_write_wait_list(vc);
-  }
-
-  if (vc->read.netready_queue) {
-    ReadyQueue::epoll_remove_from_read_ready_queue(vc);
-  }
-  if (vc->write.netready_queue) {
-    ReadyQueue::epoll_remove_from_write_ready_queue(vc);
+  NetHandler *nh = vc->nh;
+  nh->open_list.remove(vc);
+  nh->read_ready_list.remove(vc);
+  nh->write_ready_list.remove(vc);
+  if (vc->read.in_enabled_list) {
+    ink_assert(vc->closed);
+    ink_assert(nh->read_enable_list.remove(vc) == vc);
+    vc->read.in_enabled_list = 0;
+  }
+  ink_assert(vc->read.enable_link.next == NULL);
+  if (vc->write.in_enabled_list) {
+    ink_assert(vc->closed);
+    ink_assert(nh->write_enable_list.remove(vc) == vc);
+    vc->write.in_enabled_list = 0;
   }
+  ink_assert(vc->write.enable_link.next == NULL);
 
-  if (vc->read.enable_queue) {
-    ((Queue<UnixNetVConnection> *)vc->read.enable_queue)->remove(vc, vc->read.enable_link);
-    vc->read.enable_queue = NULL;
-  }
-  if (vc->write.enable_queue) {
-    ((Queue<UnixNetVConnection> *)vc->write.enable_queue)->remove(vc, vc->write.enable_link);
-    vc->write.enable_queue = NULL;
-  }
-  // clear variables for reuse
-  vc->nh = NULL;                //added by YTS Team, yamsat
-  vc->closed = 1;
-  vc->read.ifd = -1;
-  vc->read.triggered = 0;       //added by YTS Team, yamsat
-  vc->write.ifd = -1;
-  vc->write.triggered = 0;      //added by YTS Team, yamsat
-  vc->options.reset();
   vc->free(t);
 }
 
@@ -278,19 +242,16 @@
 
 // read the data for a UnixNetVConnection.
 // Rescheduling the UnixNetVConnection by placing the VC into 
-// ReadyQueue (or) WaitList
+// ready_list (or) wait_list
 // Had to wrap this function with net_read_io to make SSL work..
 static void
 read_from_net(NetHandler * nh, UnixNetVConnection * vc, EThread * thread)
 {
   vc->addLogMessage("read from net");
-  NetState *
-    s = &vc->read;
-  ProxyMutex *
-    mutex = thread->mutex;
+  NetState * s = &vc->read;
+  ProxyMutex * mutex = thread->mutex;
   MIOBufferAccessor & buf = s->vio.buffer;
-  int
-    r = 0;
+  int r = 0;
 
   MUTEX_TRY_LOCK_FOR(lock, s->vio.mutex, thread, s->vio._cont);
 
@@ -381,34 +342,28 @@
       if (r == -EAGAIN || r == -ENOTCONN) {
         NET_DEBUG_COUNT_DYN_STAT(net_calls_to_read_nodata_stat, 1);
         vc->addLogMessage("EAGAIN or ENOTCONN");
-
         vc->read.triggered = 0;
-        ReadyQueue::epoll_remove_from_read_ready_queue(vc);
-
+        nh->read_ready_list.remove(vc);
         return;
       }
 
       if (!r || r == -ECONNRESET) {
         // display the slow log for the http client session
         if (vc->loggingEnabled()) {
-          if (vc->getLogsTotalTime() / 1000000 > 30000) {
+          if (vc->getLogsTotalTime() / 1000000 > 30000)
             vc->printLogs();
-          }
           vc->clearLogs();
         }
         // connection is closed
         vc->read.triggered = 0;
-        ReadyQueue::epoll_remove_from_read_ready_queue(vc);
+        nh->read_ready_list.remove(vc);
         read_signal_done(VC_EVENT_EOS, nh, vc);
-
         return;
       }
       vc->read.triggered = 0;
       read_signal_error(nh, vc, -r);
       return;
     }
-    NET_TRUSS(Debug("net_truss_read", "VC[%d:%d], read %d bytes", vc->id, vc->con.fd, r));
-    XTIME(printf("%d %d read: %d\n", vc->id, (int) ((ink_get_hrtime() - vc->submit_time) / HRTIME_MSECOND), r));
     NET_SUM_DYN_STAT(net_read_bytes_stat, r);
 
     // Add data to buffer and signal continuation.
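
The error handling above funnels every outcome of the non-blocking read into a handful of cases. A compact sketch of that classification, where r is the value returned by the read wrapper (bytes read, or -errno on failure):

    #include <cerrno>

    enum ReadOutcome { RETRY_LATER, PEER_CLOSED, HARD_ERROR, GOT_DATA };

    ReadOutcome classify_read(long r) {
      if (r == -EAGAIN || r == -ENOTCONN)
        return RETRY_LATER;   // clear read.triggered, leave the ready list
      if (r == 0 || r == -ECONNRESET)
        return PEER_CLOSED;   // signal VC_EVENT_EOS to the continuation
      if (r < 0)
        return HARD_ERROR;    // read_signal_error(nh, vc, -r)
      return GOT_DATA;        // bump stats, hand the bytes to the MIOBuffer
    }
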
@@ -461,8 +416,7 @@
 void
 write_to_net(NetHandler * nh, UnixNetVConnection * vc, PollDescriptor * pd, EThread * thread)
 {
-  ProxyMutex *
-    mutex = thread->mutex;
+  ProxyMutex *mutex = thread->mutex;
 
   NET_DEBUG_COUNT_DYN_STAT(net_calls_to_writetonet_stat, 1);
   NET_DEBUG_COUNT_DYN_STAT(net_calls_to_writetonet_afterpoll_stat, 1);
@@ -476,22 +430,18 @@
 {
   vc->addLogMessage("write to net io");
 
-  NetState *
-    s = &vc->write;
-  ProxyMutex *
-    mutex = thread->mutex;
+  NetState *s = &vc->write;
+  ProxyMutex *mutex = thread->mutex;
 
   MUTEX_TRY_LOCK_FOR(lock, s->vio.mutex, thread, s->vio._cont);
 
-  if (!lock || lock.m.m_ptr != s->vio.mutex.m_ptr) {
+  if (!lock || lock.m.m_ptr != s->vio.mutex.m_ptr)
     return;
-  }
+
   // This function will always return true unless
   // vc is an SSLNetVConnection.
   if (!vc->getSSLHandShakeComplete()) {
-    int
-      err,
-      ret;
+    int err, ret;
 
     if (vc->getSSLClientConnection())
       ret = vc->sslStartHandShake(SSL_EVENT_CLIENT, err);
@@ -499,31 +449,20 @@
       ret = vc->sslStartHandShake(SSL_EVENT_SERVER, err);
 
     if (ret == EVENT_ERROR) {
-      if (vc->write.triggered) {
-        vc->write.triggered = 0;
-      }
+      vc->write.triggered = 0;
       write_signal_error(nh, vc, err);
     } else if (ret == SSL_HANDSHAKE_WANT_READ || ret == SSL_HANDSHAKE_WANT_ACCEPT || ret == SSL_HANDSHAKE_WANT_CONNECT
                || ret == SSL_HANDSHAKE_WANT_WRITE) {
       vc->read.triggered = 0;
-      if (vc->read.netready_queue) {
-        ReadyQueue::epoll_remove_from_read_ready_queue(vc);
-      }
+      nh->read_ready_list.remove(vc);
       vc->write.triggered = 0;
-      if (vc->write.netready_queue) {
-        ReadyQueue::epoll_remove_from_write_ready_queue(vc);
-      }
+      nh->write_ready_list.remove(vc);
     } else if (ret == EVENT_DONE) {
       vc->read.triggered = 1;
-      if (vc->read.enabled) {
-        if (!vc->read.netready_queue) {
-          nh->ready_queue.epoll_addto_read_ready_queue(vc);
-        }
-      }
-
-    } else {
+      if (vc->read.enabled)
+        nh->read_ready_list.in_or_enqueue(vc); 
+    } else
       write_reschedule(nh, vc);
-    }
     return;
   }
   // If it is not enabled,add to WaitList.
@@ -532,8 +471,7 @@
     return;
   }
   // If there is nothing to do, disable
-  int
-    ntodo = s->vio.ntodo();
+  int ntodo = s->vio.ntodo();
   if (ntodo <= 0) {
     write_disable(nh, vc);
     return;
@@ -543,12 +481,10 @@
   ink_debug_assert(buf.writer());
 
   // Calculate amount to write
-  int
-    towrite = buf.reader()->read_avail();
+  int towrite = buf.reader()->read_avail();
   if (towrite > ntodo)
     towrite = ntodo;
-  int
-    signalled = 0;
+  int signalled = 0;
 
   // signal write ready to allow user to fill the buffer
   if (towrite != ntodo && buf.writer()->write_avail()) {
@@ -573,12 +509,10 @@
     return;
   }
 
-  int
-    r = 0, total_wrote = 0, wattempted = 0;
-  r = vc->loadBufferAndCallWrite(towrite, wattempted, total_wrote, buf);
+  int total_wrote = 0, wattempted = 0;
+  int r = vc->loadBufferAndCallWrite(towrite, wattempted, total_wrote, buf);
   if (vc->loggingEnabled()) {
-    char
-      message[256];
+    char message[256];
     snprintf(message, sizeof(message), "rval: %d towrite: %d ntodo: %d total_wrote: %d", r, towrite, ntodo,
              total_wrote);
     vc->addLogMessage(message);
@@ -595,9 +529,7 @@
     if (r == -EAGAIN || r == -ENOTCONN) {
       NET_DEBUG_COUNT_DYN_STAT(net_calls_to_write_nodata_stat, 1);
       vc->write.triggered = 0;
-      if (vc->write.netready_queue) {
-        ReadyQueue::epoll_remove_from_write_ready_queue(vc);
-      }
+      nh->write_ready_list.remove(vc);
       return;
     }
     if (!r || r == -ECONNRESET) {
@@ -609,9 +541,6 @@
     write_signal_error(nh, vc, -r);
     return;
   } else {
-    NET_TRUSS(Debug("net_truss_write", "VC[%d:%d], write %d bytes", vc->id, vc->con.fd, r));
-    XTIME(printf("%d %d write: %d\n", vc->id,
-                 (int) ((ink_get_hrtime_internal() - vc->submit_time) / HRTIME_MSECOND), r));
     NET_SUM_DYN_STAT(net_write_bytes_stat, r);
 
     // Remove data from the buffer and signal continuation.
@@ -650,23 +579,17 @@
 VIO *
 UnixNetVConnection::do_io_read(Continuation * c, int nbytes, MIOBuffer * buf)
 {
-  //  addLogMessage("do_io_read");
-  NET_TRUSS(Debug("net_truss_read", "VC[%d:%d] do_io_read(%d)", id, con.fd, nbytes));
   ink_assert(!closed);
   if (buf)
     read.vio.buffer.writer_for(buf);
   else
     read.vio.buffer.clear();
-  // boost the NetVC
-  read.priority = INK_MIN_PRIORITY;
-  write.priority = INK_MIN_PRIORITY;
   read.vio.op = VIO::READ;
   read.vio.mutex = c->mutex;
   read.vio._cont = c;
   read.vio.nbytes = nbytes;
   read.vio.ndone = 0;
   read.vio.vc_server = (VConnection *) this;
-  XTIME(printf("%d %d do_io_read\n", id, (int) ((ink_get_hrtime_internal() - submit_time) / HRTIME_MSECOND)));
   if (buf) {
     if (!read.enabled)
       read.vio.reenable();
@@ -680,24 +603,18 @@
 UnixNetVConnection::do_io_write(Continuation * acont, int anbytes, IOBufferReader * abuffer, bool owner)
 {
   addLogMessage("do_io_write");
-
-  NET_TRUSS(Debug("net_truss_write", "VC[%d:%d] do_io_write(%d)", id, con.fd, anbytes));
   ink_assert(!closed);
   if (abuffer) {
     ink_assert(!owner);
     write.vio.buffer.reader_for(abuffer);
   } else
     write.vio.buffer.clear();
-  // boost the NetVC
-  read.priority = INK_MIN_PRIORITY;
-  write.priority = INK_MIN_PRIORITY;
   write.vio.op = VIO::WRITE;
   write.vio.mutex = acont->mutex;
   write.vio._cont = acont;
   write.vio.nbytes = anbytes;
   write.vio.ndone = 0;
   write.vio.vc_server = (VConnection *) this;
-  XTIME(printf("%d %d do_io_write\n", id, (int) ((ink_get_hrtime_internal() - submit_time) / HRTIME_MSECOND)));
   if (abuffer) {
     if (!write.enabled)
       write.vio.reenable();
@@ -834,142 +751,92 @@
 
 //
 // Function used to reenable the VC for reading or
-// writing. Modified by YTS Team, yamsat
+// writing.
 //
 void
 UnixNetVConnection::reenable(VIO * vio)
 {
-
-  if (STATE_FROM_VIO(vio)->enabled) {
+  if (STATE_FROM_VIO(vio)->enabled)
     return;
-  }
-  NET_TRUSS(Debug("net_truss", "VC[%d:%d] UnixNetVConnection::reenable", id, con.fd));
   set_enabled(vio);
-#ifdef NET_THREAD_STEALING
   if (!thread)
     return;
   EThread *t = vio->mutex->thread_holding;
   ink_debug_assert(t == this_ethread());
-  //Modified by YTS Team, yamsat
   if (nh->mutex->thread_holding == t) {
     if (vio == &read.vio) {
-      if (read.triggered) {
-        if (!read.netready_queue) {
-          nh->ready_queue.epoll_addto_read_ready_queue(this);
-        }
-      } else {
-        if (read.netready_queue) {
-          ReadyQueue::epoll_remove_from_read_ready_queue(this);
-        }
-      }
-
+      if (read.triggered)
+        nh->read_ready_list.in_or_enqueue(this);
+      else
+        nh->read_ready_list.remove(this);
     } else {
-      if (write.triggered) {
-        if (!write.netready_queue) {
-          nh->ready_queue.epoll_addto_write_ready_queue(this);
-        }
-      } else {
-        if (write.netready_queue) {
-          ReadyQueue::epoll_remove_from_write_ready_queue(this);
-        }
-      }
+      if (write.triggered)
+        nh->write_ready_list.in_or_enqueue(this);
+      else
+        nh->write_ready_list.remove(this);
     }
   } else if (!nh->mutex->is_thread()) {
     MUTEX_TRY_LOCK(lock, nh->mutex, t);
     if (!lock) {
       if (vio == &read.vio) {
-        ink_mutex_acquire(&nh->read_enable_mutex.m_ptr->the_mutex);
-        if (!read.enable_queue) {
-          read.enable_queue = &nh->read_enable_list;
-          nh->read_enable_list.enqueue(this, read.enable_link);
+        if (!read.in_enabled_list) {
+          read.in_enabled_list = 1;
+          nh->read_enable_list.push(this);
         }
-        ink_mutex_release(&nh->read_enable_mutex.m_ptr->the_mutex);
       } else {
-        ink_mutex_acquire(&nh->write_enable_mutex.m_ptr->the_mutex);
-        if (!write.enable_queue) {
-          write.enable_queue = &nh->write_enable_list;
-          nh->write_enable_list.enqueue(this, write.enable_link);
-        }
-        ink_mutex_release(&nh->write_enable_mutex.m_ptr->the_mutex);
-      }
-      return;
-    }
-    if (vio == &read.vio) {
-      if (read.triggered) {
-        if (!read.netready_queue) {
-          nh->ready_queue.epoll_addto_read_ready_queue(this);
-        }
-      } else {
-        if (read.netready_queue) {
-          ReadyQueue::epoll_remove_from_read_ready_queue(this);
+        if (!write.in_enabled_list) {
+          write.in_enabled_list = 1;
+          nh->write_enable_list.push(this);
         }
       }
     } else {
-      if (write.triggered) {
-        if (!write.netready_queue) {
-          nh->ready_queue.epoll_addto_write_ready_queue(this);
-        }
+      if (vio == &read.vio) {
+        if (read.triggered)
+          nh->read_ready_list.in_or_enqueue(this);
+        else
+          nh->read_ready_list.remove(this);
       } else {
-        if (write.netready_queue) {
-          ReadyQueue::epoll_remove_from_write_ready_queue(this);
-        }
+        if (write.triggered)
+          nh->write_ready_list.in_or_enqueue(this);
+        else
+          nh->write_ready_list.remove(this);
       }
     }
   }
-#endif
 }
 
 void
 UnixNetVConnection::reenable_re(VIO * vio)
 {
-
   set_enabled(vio);
-
-#ifdef NET_THREAD_STEALING
-  if (!thread) {
+  if (!thread)
     return;
-  }
   EThread *t = vio->mutex->thread_holding;
   ink_debug_assert(t == this_ethread());
   if (nh->mutex->thread_holding == t) {
     if (vio == &read.vio) {
-      if (read.triggered) {
+      if (read.triggered)
         net_read_io(nh, t);
-      } else {
-        if (read.netready_queue != NULL) {
-          ReadyQueue::epoll_remove_from_read_ready_queue(this);
-        }
-      }
+      else
+        nh->read_ready_list.remove(this);
     } else {
-      if (write.triggered) {
+      if (write.triggered)
         write_to_net(nh, this, NULL, t);
-      } else {
-        if (write.netready_queue != NULL) {
-          ReadyQueue::epoll_remove_from_write_ready_queue(this);
-        }
-      }
+      else
+        nh->write_ready_list.remove(this);
     }
   }
-#endif
-}
-
-
-void
-UnixNetVConnection::boost()
-{
-  ink_assert(thread);
 }
 
 
 UnixNetVConnection::UnixNetVConnection():
-closed(1), inactivity_timeout_in(0), active_timeout_in(0),
+closed(0), inactivity_timeout_in(0), active_timeout_in(0),
 #ifdef INACTIVITY_TIMEOUT
   inactivity_timeout(NULL),
 #else
   next_inactivity_timeout_at(0),
 #endif
-  active_timeout(NULL), ep(NULL),       //added by YTS Team, yamsat
-  nh(NULL),                     //added by YTS Team, yamsat
+  active_timeout(NULL), nh(NULL),
   id(0), ip(0), _interface(0), accept_port(0), port(0), flags(0), recursion(0), submit_time(0), oob_ptr(0)
 {
   memset(&local_sa, 0, sizeof local_sa);
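
In the reenable() path above, a thread that cannot grab the NetHandler lock now just sets in_enabled_list and pushes the VC onto read_enable_list or write_enable_list, where it previously had to take a dedicated enable mutex around the enqueue. A rough sketch of that hand-off, assuming a lock-free push list drained wholesale by the owning net thread; the std::atomic spelling here is illustrative, not the actual ink_atomiclist API:

    #include <atomic>

    struct VC {
      std::atomic<bool> in_enabled_list{false};
      VC *enable_link = nullptr;          // intrusive next pointer
    };

    struct EnableList {                   // push-only stack, popall by owner
      std::atomic<VC *> head{nullptr};

      void push(VC *vc) {
        VC *old = head.load(std::memory_order_relaxed);
        do {
          vc->enable_link = old;
        } while (!head.compare_exchange_weak(old, vc,
                                             std::memory_order_release,
                                             std::memory_order_relaxed));
      }

      VC *popall() {                      // net thread drains the whole list
        return head.exchange(nullptr, std::memory_order_acquire);
      }
    };

    // Cross-thread reenable: the flag keeps a VC from being pushed twice.
    void enqueue_enable(EnableList &list, VC *vc) {
      if (!vc->in_enabled_list.exchange(true))
        list.push(vc);
    }
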
@@ -987,11 +854,9 @@
   STATE_FROM_VIO(vio)->enabled = 1;
 #ifdef DEBUG
   if (vio == &read.vio) {
-    XTIME(printf("%d %d reenable read\n", id, (int) ((ink_get_hrtime_internal() - submit_time) / HRTIME_MSECOND)));
     if (enable_debug_trace && (vio->buffer.mbuf && !vio->buffer.writer()->write_avail()));
   } else {
     ink_assert(vio == &write.vio);
-    XTIME(printf("%d %d reenable write\n", id, (int) ((ink_get_hrtime_internal() - submit_time) / HRTIME_MSECOND)));
     if (enable_debug_trace && (vio->buffer.mbuf && !vio->buffer.reader()->read_avail()));
   }
 #endif
@@ -1004,8 +869,6 @@
 #endif
 }
 
-
-
 void
 UnixNetVConnection::net_read_io(NetHandler * nh, EThread * lthread)
 {
@@ -1145,25 +1008,17 @@
 
   SET_HANDLER((NetVConnHandler) & UnixNetVConnection::mainEvent);
 
-  //added by YTS Team, yamsat
   nh = get_NetHandler(thread);
   PollDescriptor *pd = get_PollDescriptor(thread);
-
-  struct epoll_data_ptr *eptr;
-  eptr = (struct epoll_data_ptr *) xmalloc(sizeof(struct epoll_data_ptr));
-  eptr->type = EPOLL_READWRITE_VC;
-  eptr->data.vc = this;
-
-  this->ep = eptr;
-
-  closed = 0;
+  ep.type = EPOLL_READWRITE_VC;
+  ep.data.vc = this;
 
 #if defined(USE_EPOLL)
   struct epoll_event ev;
   memset(&ev, 0, sizeof(struct epoll_event));
 
   ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
-  ev.data.ptr = eptr;
+  ev.data.ptr = &ep;
   //printf("Added to epoll ctl fd %d and number is %d\n",con.fd,id);
   if (epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, con.fd, &ev) < 0) {
     Debug("iocore_net", "acceptEvent : Failed to add to epoll list\n");
@@ -1172,8 +1027,8 @@
   }
 #elif defined(USE_KQUEUE)
   struct kevent ev[2];
-  EV_SET(&ev[0], con.fd, EVFILT_READ, EV_ADD, 0, 0, eptr);
-  EV_SET(&ev[1], con.fd, EVFILT_WRITE, EV_ADD, 0, 0, eptr);
+  EV_SET(&ev[0], con.fd, EVFILT_READ, EV_ADD, 0, 0, &ep);
+  EV_SET(&ev[1], con.fd, EVFILT_WRITE, EV_ADD, 0, 0, &ep);
   if (kevent(pd->kqueue_fd, &ev[0], 2, NULL, 0, NULL) < 0) {
     Debug("iocore_net", "acceptEvent : Failed to add to kqueue list\n");
     close_UnixNetVConnection(this, e->ethread);
@@ -1183,14 +1038,7 @@
 #error port me
 #endif
 
-  Debug("iocore_net", "acceptEvent : Adding fd %d to read wait list\n", con.fd);
-  nh->wait_list.epoll_addto_read_wait_list(this);
-  Debug("iocore_net", "acceptEvent : Adding fd %d to write wait list\n", con.fd);
-  nh->wait_list.epoll_addto_write_wait_list(this);
-
-  //Debug("iocore_net", "acceptEvent : Setting triggered and adding to the read ready queue");
-  //read.triggered = 1;
-  //nh->ready_queue.epoll_addto_read_ready_queue(this);
+  nh->open_list.enqueue(this);
 
   if (inactivity_timeout_in)
     UnixNetVConnection::set_inactivity_timeout(inactivity_timeout_in);
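
acceptEvent no longer xmalloc()s a per-connection epoll_data_ptr: ep is now an embedded member of the VC, so registration points the kernel at &ep and there is nothing to xfree on close. A minimal sketch of the embedded-event-data pattern (Linux epoll; the type and field names are placeholders):

    #include <sys/epoll.h>
    #include <cstring>

    struct NetVC;

    struct EventIO {          // stands in for epoll_data_ptr / the ep member
      int    type;
      NetVC *vc;
    };

    struct NetVC {
      int     fd;
      EventIO ep;             // embedded: allocated and freed with the VC
    };

    int register_vc(int epoll_fd, NetVC *vc) {
      vc->ep.type = 1;        // e.g. EPOLL_READWRITE_VC
      vc->ep.vc   = vc;
      struct epoll_event ev;
      memset(&ev, 0, sizeof(ev));
      ev.events   = EPOLLIN | EPOLLOUT | EPOLLET;
      ev.data.ptr = &vc->ep;  // the kernel hands this back on every event
      return epoll_ctl(epoll_fd, EPOLL_CTL_ADD, vc->fd, &ev);
    }

Besides saving an allocation per accepted connection, this eliminates the error paths where the heap-allocated eptr had to be freed by hand.
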
@@ -1209,7 +1057,6 @@
 UnixNetVConnection::mainEvent(int event, Event * e)
 {
   addLogMessage("main event");
-  NET_TRUSS(Debug("netvc_truss_timeout", "UnixNetVConnection[%d]::timeout", con.fd));
   ink_debug_assert(event == EVENT_IMMEDIATE || event == EVENT_INTERVAL);
   /* BZ 31932 */
   ink_debug_assert(thread == this_ethread());
@@ -1223,14 +1070,11 @@
     if (e == active_timeout)
 #endif
       e->schedule_in(NET_RETRY_DELAY);
-
     return EVENT_CONT;
   }
-
-  if (e->cancelled) {
-
+  if (e->cancelled)
     return EVENT_DONE;
-  }
+
   int signal_event;
   Event **signal_timeout;
   Continuation *reader_cont = NULL;
@@ -1250,9 +1094,8 @@
     /* BZ 49408 */
     //ink_debug_assert(inactivity_timeout_in);
     //ink_debug_assert(next_inactivity_timeout_at < ink_get_hrtime());
-    if (!inactivity_timeout_in || next_inactivity_timeout_at > ink_get_hrtime()) {
+    if (!inactivity_timeout_in || next_inactivity_timeout_at > ink_get_hrtime())
       return EVENT_CONT;
-    }
     signal_event = VC_EVENT_INACTIVITY_TIMEOUT;
     signal_timeout_at = &next_inactivity_timeout_at;
   }
@@ -1265,27 +1108,24 @@
   *signal_timeout = 0;
   *signal_timeout_at = 0;
   writer_cont = write.vio._cont;
-
+  
   if (closed) {
-    //added by YTS Team, yamsat
     close_UnixNetVConnection(this, thread);
     return EVENT_DONE;
   }
+
   if (read.vio.op == VIO::READ && !(f.shutdown & NET_VC_SHUTDOWN_READ)) {
     reader_cont = read.vio._cont;
-    if (read_signal_and_update(signal_event, this) == EVENT_DONE) {
+    if (read_signal_and_update(signal_event, this) == EVENT_DONE)
       return EVENT_DONE;
-    }
   }
 
   if (!*signal_timeout &&
       !*signal_timeout_at &&
       !closed && write.vio.op == VIO::WRITE &&
-      !(f.shutdown & NET_VC_SHUTDOWN_WRITE) && reader_cont != write.vio._cont && writer_cont == write.vio._cont) {
-    if (write_signal_and_update(signal_event, this) == EVENT_DONE) {
+      !(f.shutdown & NET_VC_SHUTDOWN_WRITE) && reader_cont != write.vio._cont && writer_cont == write.vio._cont)
+    if (write_signal_and_update(signal_event, this) == EVENT_DONE)
       return EVENT_DONE;
-    }
-  }
   return EVENT_DONE;
 }
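
Note that the inactivity branch above tolerates the periodic event firing early: if next_inactivity_timeout_at is still in the future, the event is ignored and rescheduled. The guard reduces to a small predicate:

    #include <cstdint>

    // true only when an inactivity timeout is armed and its deadline has passed
    bool inactivity_expired(int64_t timeout_in, int64_t deadline, int64_t now) {
      return timeout_in != 0 && now >= deadline;
    }
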
 
@@ -1320,19 +1160,14 @@
       nh = get_NetHandler(t);
       PollDescriptor *pd = get_PollDescriptor(t);
 
-      closed = 0;               // need to set this before adding it to epoll
-
-      struct epoll_data_ptr *eptr;
-      eptr = (struct epoll_data_ptr *) xmalloc(sizeof(struct epoll_data_ptr));
-      eptr->type = EPOLL_READWRITE_VC;
-      eptr->data.vc = this;
-      ep = eptr;
+      ep.type = EPOLL_READWRITE_VC;
+      ep.data.vc = this;
 
 #if defined(USE_EPOLL)
       struct epoll_event ev;
       memset(&ev, 0, sizeof(struct epoll_event));
       ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
-      ev.data.ptr = eptr;
+      ev.data.ptr = &ep;
       int rval = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, socketFd, &ev);
       if (rval != 0) {
         lerrno = errno;
@@ -1343,8 +1178,8 @@
       }
 #elif defined(USE_KQUEUE)
       struct kevent ev[2];
-      EV_SET(&ev[0], socketFd, EVFILT_READ, EV_ADD, 0, 0, eptr);
-      EV_SET(&ev[1], socketFd, EVFILT_WRITE, EV_ADD, 0, 0, eptr);
+      EV_SET(&ev[0], socketFd, EVFILT_READ, EV_ADD, 0, 0, &ep);
+      EV_SET(&ev[1], socketFd, EVFILT_WRITE, EV_ADD, 0, 0, &ep);
       int rval = kevent(pd->kqueue_fd, &ev[0], 2, NULL, 0, NULL);
       if (rval < 0) {
         lerrno = errno;
@@ -1378,24 +1213,18 @@
   // function code not to be duplicated in the inherited SSL class.
   //  sslStartHandShake (SSL_EVENT_CLIENT, err);
 
-  //added for epoll by YTS Team, yamsat
-
   if (_interface || options.local_port || options.spoof_ip) {
     nh = get_NetHandler(t);
     PollDescriptor *pd = get_PollDescriptor(t);
 
-    struct epoll_data_ptr *eptr;
-    eptr = (struct epoll_data_ptr *) xmalloc(sizeof(struct epoll_data_ptr));
-    eptr->type = EPOLL_READWRITE_VC;
-    eptr->data.vc = this;
-
-    ep = eptr;
+    ep.type = EPOLL_READWRITE_VC;
+    ep.data.vc = this;
 
 #if defined(USE_EPOLL)
     struct epoll_event ev;
     memset(&ev, 0, sizeof(struct epoll_event));
     ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
-    ev.data.ptr = eptr;
+    ev.data.ptr = &ep;
     res = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, con.fd, &ev);
     if (res < 0) {
       Debug("iocore_net", "connectUp : Failed to add to epoll list\n");
@@ -1406,8 +1235,8 @@
     }
 #elif defined(USE_KQUEUE)
     struct kevent ev[2];
-    EV_SET(&ev[0], con.fd, EVFILT_READ, EV_ADD, 0, 0, eptr);
-    EV_SET(&ev[1], con.fd, EVFILT_WRITE, EV_ADD, 0, 0, eptr);
+    EV_SET(&ev[0], con.fd, EVFILT_READ, EV_ADD, 0, 0, &ep);
+    EV_SET(&ev[1], con.fd, EVFILT_WRITE, EV_ADD, 0, 0, &ep);
     res = kevent(pd->kqueue_fd, &ev[0], 2, NULL, 0, NULL);
     if (res < 0) {
       lerrno = errno;
@@ -1421,18 +1250,11 @@
 #endif
   }
 
-  closed = 0;
-
-  Debug("iocore_net", "connectUp : Adding fd %d to read wait list\n", con.fd);
-  nh->wait_list.epoll_addto_read_wait_list(this);
-
-  Debug("iocore_net", "connectUp : Adding fd %d to write wait list\n", con.fd);
-  nh->wait_list.epoll_addto_write_wait_list(this);
+  nh->open_list.enqueue(this);
 
   ink_assert(!inactivity_timeout_in);
   ink_assert(!active_timeout_in);
   action_.continuation->handleEvent(NET_EVENT_OPEN, this);
-  XTIME(printf("%d 2connect\n", id));
   return CONNECT_SUCCESS;
 }
 
@@ -1441,6 +1263,7 @@
 UnixNetVConnection::free(EThread * t)
 {
   NET_DECREMENT_THREAD_DYN_STAT(net_connections_currently_open_stat, t);
+  // clear variables for reuse
   got_remote_addr = 0;
   got_local_addr = 0;
   read.vio.mutex.clear();
@@ -1450,26 +1273,18 @@
   flags = 0;
   accept_port = 0;
   SET_CONTINUATION_HANDLER(this, (NetVConnHandler) & UnixNetVConnection::startEvent);
-  //added for epoll by YTS Team, yamsat
-  if (ep != NULL) {
-    xfree(ep);
-    ep = NULL;
-  }
-  if (nh != NULL) {
-    nh = NULL;
-  }
-  ink_debug_assert(!read.queue && !write.queue);
-  ink_debug_assert(!read.netready_queue && !write.netready_queue);
-  ink_debug_assert(!read.enable_queue && !write.enable_queue);
-  ink_debug_assert(!read.link.prev && !read.link.next);
-  ink_debug_assert(!read.netready_link.prev && !read.netready_link.next);
-  ink_debug_assert(!read.enable_link.prev && !read.enable_link.next);
-  ink_debug_assert(!write.link.prev && !write.link.next);
-  ink_debug_assert(!write.netready_link.prev && !write.netready_link.next);
-  ink_debug_assert(!write.enable_link.prev && !write.enable_link.next);
+  nh = NULL;
+  read.triggered = 0;
+  write.triggered = 0;
+  options.reset();
+  ink_debug_assert(!read.ready_link.prev && !read.ready_link.next);
+  ink_debug_assert(!read.enable_link.next);
+  ink_debug_assert(!write.ready_link.prev && !write.ready_link.next);
+  ink_debug_assert(!write.enable_link.next);
   ink_debug_assert(!link.next && !link.prev);
   ink_debug_assert(!active_timeout);
   ink_debug_assert(con.fd == NO_FD);
   ink_debug_assert(t == this_ethread());
+  closed = 0;
   THREAD_FREE(this, netVCAllocator, t);
 }
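
free() now performs all of the clear-for-reuse work (nh, the triggered flags, options, closed) immediately before handing the object back to netVCAllocator, where close_UnixNetVConnection used to reset part of that state. The recycling shape, reduced to a toy pool with hypothetical fields:

    #include <vector>

    struct Conn {
      int  fd     = -1;
      bool closed = false;
      void clear_for_reuse() { fd = -1; closed = false; }
    };

    struct Pool {                 // toy stand-in for the THREAD_FREE allocator
      std::vector<Conn *> freelist;
      Conn *alloc() {
        if (freelist.empty()) return new Conn;
        Conn *c = freelist.back();
        freelist.pop_back();
        return c;
      }
      void free(Conn *c) {
        c->clear_for_reuse();     // every field reset in exactly one place
        freelist.push_back(c);
      }
    };

Centralizing the reset in free() means a recycled VC cannot come back with stale state regardless of which path closed it.
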

Modified: incubator/trafficserver/traffic/trunk/iocore/net/UnixUDPConnection.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/UnixUDPConnection.cc?rev=889011&r1=889010&r2=889011&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/net/UnixUDPConnection.cc (original)
+++ incubator/trafficserver/traffic/trunk/iocore/net/UnixUDPConnection.cc Wed Dec  9 23:01:29 2009
@@ -35,8 +35,8 @@
 {
   UDPPacketInternal *p = (UDPPacketInternal *) ink_atomiclist_popall(&inQueue);
 
-  if (!m_tobedestroyed)
-    m_tobedestroyed = 1;
+  if (!tobedestroyed)
+    tobedestroyed = 1;
 
   if (p) {
     UDPPacketInternal *pnext = NULL;
@@ -44,7 +44,6 @@
       pnext = p->alink.next;
       p->alink.next = NULL;
       p->free();
-      //delete p;
       p = pnext;
     }
   }
@@ -53,10 +52,10 @@
     callbackAction = NULL;
   }
   Debug("udpnet", "Destroying udp port = %d", getPortNum());
-  if (m_fd != -1) {
-    socketManager.close(m_fd, keSocket);
+  if (fd != -1) {
+    socketManager.close(fd, keSocket);
   }
-  m_fd = -1;
+  fd = -1;
 }
 
 // called with continuation lock taken out
@@ -81,7 +80,7 @@
     if (p) {
       Debug("udpnet", "UDPConnection::callbackHandler");
       UDPPacketInternal *pnext = NULL;
-      Queue<UDPPacketInternal> result;
+      Que(UDPPacketInternal, link) result;
       while (p) {
         pnext = p->alink.next;
         p->alink.next = NULL;
@@ -91,9 +90,8 @@
       if (!shouldDestroy())
         continuation->handleEvent(NET_EVENT_DATAGRAM_READ_READY, &result);
       else {
-        while ((p = result.dequeue())) {
+        while ((p = result.dequeue()))
           p->free();
-        }
       }
     }
   }
@@ -109,7 +107,7 @@
   EThread *t = eventProcessor.assign_thread(ET_UDP);
   ink_assert(t);
   ink_assert(get_UDPNetHandler(t));
-  uc->m_ethread = t;
+  uc->ethread = t;
   AddRef();
   uc->continuation = c;
   mutex = c->mutex;
@@ -134,7 +132,7 @@
   conn->continuation = c;
   ink_assert(conn->continuation != NULL);
   mutex = c->mutex;
-  p->m_reqGenerationNum = conn->m_sendGenerationNum;
-  get_UDPNetHandler(conn->m_ethread)->udpOutQueue->send(p);
+  p->reqGenerationNum = conn->sendGenerationNum;
+  get_UDPNetHandler(conn->ethread)->udpOutQueue.send(p);
   return ACTION_RESULT_NONE;
 }
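
Note the call-shape change in the last hunk: udpOutQueue is now held by value inside UDPNetHandler (udpOutQueue.send(p)) rather than behind a pointer (udpOutQueue->send(p)). Sketched with stand-in types:

    struct UDPQueueSketch {
      void send(int /*pkt*/) {}
    };

    struct HandlerBefore {        // old: heap-allocated in the constructor
      UDPQueueSketch *udpOutQueue;
    };

    struct HandlerAfter {         // new: embedded member
      UDPQueueSketch udpOutQueue;
    };

    int main() {
      HandlerAfter h;
      h.udpOutQueue.send(42);     // no NEW()/xfree pairing to maintain
    }
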

Modified: incubator/trafficserver/traffic/trunk/iocore/net/UnixUDPNet.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/UnixUDPNet.cc?rev=889011&r1=889010&r2=889011&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/net/UnixUDPNet.cc (original)
+++ incubator/trafficserver/traffic/trunk/iocore/net/UnixUDPNet.cc Wed Dec  9 23:01:29 2009
@@ -106,28 +106,11 @@
     return -1;
 
   pollCont_offset = eventProcessor.allocate(sizeof(PollCont));
-  udpNetHandler_offset = eventProcessor.allocate(sizeof(NetHandler));
+  udpNetHandler_offset = eventProcessor.allocate(sizeof(UDPNetHandler));
 
   for (int i = 0; i < eventProcessor.n_threads_for_type[ET_UDP]; i++)
     initialize_thread_for_udp_net(eventProcessor.eventthread[ET_UDP][i]);
 
-#if 0
-  unsigned long hoodIpaddr;
-  unsigned char *ip;
-  struct sockaddr_in addr;
-  Action *status;
-
-  ip = (unsigned char *) &hoodIpaddr;
-  ip[0] = 216;
-  ip[1] = 155;
-  ip[2] = 202;
-  ip[3] = 240;
-  G_bwGrapherLoc.sin_family = AF_INET;
-  G_bwGrapherLoc.sin_addr.s_addr = hoodIpaddr;
-  G_bwGrapherLoc.sin_port = 7777;
-  CreateUDPSocket(&G_bwGrapherFd, &addr, &status, 0, 0, 65536, 65536);
-#endif
-
   return 0;
 }
 
@@ -138,21 +121,6 @@
   (void) thread;
   UnixUDPConnection *uc = (UnixUDPConnection *) xuc;
 
-//epoll changes 
-/*
-  int i = uc->getPollvecIndex();
-  if (pd && i >= 0) {
-    Pollfd * pfd = &pd->pfd[i];
-    uc->clearPollvecIndex();
-    if (!(pfd->revents & POLLIN)) { // not ready for read
-      return;
-    }
-  } else {
-    uc->clearPollvecIndex();
-    return;
-  }
- */
-//epoll changes ends here
   // receive packet and queue onto UDPConnection.
   // don't call back connection at this time.
   int r;
@@ -187,7 +155,7 @@
     ink_assert(uc->callback_link.next == NULL);
     ink_assert(uc->callback_link.prev == NULL);
     uc->AddRef();
-    nh->udp_callbacks->enqueue(uc, uc->callback_link);
+    nh->udp_callbacks.enqueue(uc);
     uc->onCallbackQueue = 1;
   }
 }
@@ -610,7 +578,7 @@
 
   worker->init(cont, nPairs, myIP, destIP, send_bufsize, recv_bufsize);
   eventProcessor.schedule_imm(worker, ET_UDP);
-  return &(worker->m_action);
+  return &(worker->action);
 }
 
 
@@ -689,28 +657,28 @@
   int i;
   UDPConnectionInternal *p = (UDPConnectionInternal *) udpConn;
 
-  if (G_inkPipeInfo.m_numPipes == 0) {
-    p->m_pipe_class = 0;
+  if (G_inkPipeInfo.numPipes == 0) {
+    p->pipe_class = 0;
     return;
   }
-  p->m_pipe_class = -1;
+  p->pipe_class = -1;
   // find a match: 0 is best-effort
-  for (i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++)
-    if (G_inkPipeInfo.m_perPipeInfo[i].m_destIP == destIP)
-      p->m_pipe_class = i;
+  for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++)
+    if (G_inkPipeInfo.perPipeInfo[i].destIP == destIP)
+      p->pipe_class = i;
   // no match; set it to the destIP=0 class
-  if (p->m_pipe_class == -1) {
-    for (i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++)
-      if (G_inkPipeInfo.m_perPipeInfo[i].m_destIP == 0) {
-        p->m_pipe_class = i;
+  if (p->pipe_class == -1) {
+    for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++)
+      if (G_inkPipeInfo.perPipeInfo[i].destIP == 0) {
+        p->pipe_class = i;
         break;
       }
   }
-  Debug("udpnet-pipe", "Pipe class = %d", p->m_pipe_class);
-  ink_debug_assert(p->m_pipe_class != -1);
-  if (p->m_pipe_class == -1)
-    p->m_pipe_class = 0;
-  G_inkPipeInfo.m_perPipeInfo[p->m_pipe_class].m_count++;
+  Debug("udpnet-pipe", "Pipe class = %d", p->pipe_class);
+  ink_debug_assert(p->pipe_class != -1);
+  if (p->pipe_class == -1)
+    p->pipe_class = 0;
+  G_inkPipeInfo.perPipeInfo[p->pipe_class].count++;
 }
 
 Action *
@@ -788,24 +756,24 @@
   UDPConnectionInternal *udpIntConn = (UDPConnectionInternal *) udpConn;
   ink64 desiredbps = (ink64) (desiredMbps * 1024.0 * 1024.0);
 
-  if (G_inkPipeInfo.m_numPipes == 0) {
-    udpIntConn->m_flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
+  if (G_inkPipeInfo.numPipes == 0) {
+    udpIntConn->flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
     return true;
   }
 
-  if ((udpIntConn->m_pipe_class == 0) ||
-      (G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwAlloc + desiredbps >
-       G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwLimit)) {
+  if ((udpIntConn->pipe_class == 0) ||
+      (G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc + desiredbps >
+       G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwLimit)) {
     Debug("udpnet-admit", "Denying flow with %lf Mbps", desiredMbps);
     return false;
   }
-  udpIntConn->m_flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
-  udpIntConn->m_allocedbps = desiredbps;
-  ink_atomic_increment64(&G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwAlloc, desiredbps);
+  udpIntConn->flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
+  udpIntConn->allocedbps = desiredbps;
+  ink_atomic_increment64(&G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc, desiredbps);
   Debug("udpnet-admit", "Admitting flow with %lf Mbps (a=%lld, lim=%lld)",
         desiredMbps,
-        G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwAlloc,
-        G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwLimit);
+        G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc,
+        G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwLimit);
   return true;
 }
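
The conversions above juggle three units: desiredMbps (megabits per second, with 1 Mbit taken as 1024*1024 bits), desiredbps (bits per second, compared against bwAlloc/bwLimit for admission), and flowRateBps (bytes per second, used for pacing). A worked example:

    #include <cstdio>

    int main() {
      double desiredMbps = 2.0;                        // requested flow rate
      long long desiredbps =
          (long long) (desiredMbps * 1024.0 * 1024.0); // 2097152 bits/sec
      double flowRateBps =
          (desiredMbps * 1024.0 * 1024.0) / 8.0;       // 262144 bytes/sec
      printf("%lld bits/sec = %.0f bytes/sec\n", desiredbps, flowRateBps);
    }
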
 
@@ -814,27 +782,27 @@
 {
   UDPConnectionInternal *udpIntConn = (UDPConnectionInternal *) udpConn;
   ink64 desiredbps = (ink64) (desiredMbps * 1024.0 * 1024.0);
-  ink64 oldbps = (ink64) (udpIntConn->m_flowRateBps * 8.0);
+  ink64 oldbps = (ink64) (udpIntConn->flowRateBps * 8.0);
 
-  if (G_inkPipeInfo.m_numPipes == 0) {
-    udpIntConn->m_flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
+  if (G_inkPipeInfo.numPipes == 0) {
+    udpIntConn->flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
     return true;
   }
   // arithmetic here is in bits-per-sec.
-  if ((udpIntConn->m_pipe_class == 0) ||
-      (G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwAlloc +
-       desiredbps - oldbps) > G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwLimit) {
+  if ((udpIntConn->pipe_class == 0) ||
+      (G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc +
+       desiredbps - oldbps) > G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwLimit) {
     Debug("udpnet-admit", "Unable to change b/w for flow to %lf Mbps", desiredMbps);
     return false;
   }
-  udpIntConn->m_flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
-  udpIntConn->m_allocedbps = desiredbps;
-  ink_atomic_increment64(&G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwAlloc, desiredbps - oldbps);
+  udpIntConn->flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
+  udpIntConn->allocedbps = desiredbps;
+  ink_atomic_increment64(&G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc, desiredbps - oldbps);
   Debug("udpnet-admit", "Changing flow's b/w from %lf Mbps to %lf Mbps (a=%lld, lim=%lld)",
         (double) oldbps / (1024.0 * 1024.0),
         desiredMbps,
-        G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwAlloc,
-        G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwLimit);
+        G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc,
+        G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwLimit);
   return true;
 }
 
@@ -844,24 +812,24 @@
   UDPConnectionInternal *udpIntConn = (UDPConnectionInternal *) udpConn;
   ink64 bps;
 
-  if (G_inkPipeInfo.m_numPipes == 0)
+  if (G_inkPipeInfo.numPipes == 0)
     return;
 
-  Debug("udpnet-free", "Trying to releasing %lf (%lld) Kbps", udpIntConn->m_flowRateBps, udpIntConn->m_allocedbps);
+  Debug("udpnet-free", "Trying to releasing %lf (%lld) Kbps", udpIntConn->flowRateBps, udpIntConn->allocedbps);
 
-  bps = udpIntConn->m_allocedbps;
+  bps = udpIntConn->allocedbps;
   if (bps <= 0)
     return;
 
-  ink_atomic_increment64(&G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwAlloc, -bps);
+  ink_atomic_increment64(&G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc, -bps);
 
   Debug("udpnet-free", "Releasing %lf Kbps", bps / 1024.0);
 
-  if (G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwAlloc < 0)
-    G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwAlloc = 0;
+  if (G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc < 0)
+    G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc = 0;
 
-  udpIntConn->m_flowRateBps = 0.0;
-  udpIntConn->m_allocedbps = 0;
+  udpIntConn->flowRateBps = 0.0;
+  udpIntConn->allocedbps = 0;
 }
 
 double
@@ -870,36 +838,33 @@
   int i;
   double usedBw = 0.0;
 
-  if (G_inkPipeInfo.m_numPipes == 0)
+  if (G_inkPipeInfo.numPipes == 0)
     // return 100Mbps if there are no pipes
     return 100.0;
 
-  for (i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++) {
-    usedBw += G_inkPipeInfo.m_perPipeInfo[i].m_bwUsed;
+  for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
+    usedBw += G_inkPipeInfo.perPipeInfo[i].bwUsed;
   }
-  return G_inkPipeInfo.m_interfaceMbps - usedBw;
+  return G_inkPipeInfo.interfaceMbps - usedBw;
 }
 
 // send out all packets that need to be sent out as of time=now
-UDPQueue::UDPQueue(InkAtomicList * pQueue)
-:m_atomicQueue(pQueue)
-  , m_last_report(0)
-  , m_last_service(0)
-  , m_last_byteperiod(0)
-  , m_bytesSent(0)
-  , m_packets(0)
-  , m_added(0)
+UDPQueue::UDPQueue()
+: last_report(0)
+, last_service(0)
+, last_byteperiod(0)
+, bytesSent(0)
+, packets(0)
+, added(0)
 {
-
 }
 
 UDPQueue::~UDPQueue()
 {
   UDPPacketInternal *p;
 
-  while ((p = m_reliabilityPktQueue.dequeue()) != NULL) {
+  while ((p = reliabilityPktQueue.dequeue()) != NULL)
     p->free();
-  }
 }
 
 /*
@@ -926,11 +891,11 @@
   schedJitter += ink_hrtime_to_msec(now - lastSchedTime);
   numTimesSched++;
 
-  p = (UDPPacketInternal *) ink_atomiclist_popall(m_atomicQueue);
+  p = (UDPPacketInternal *) ink_atomiclist_popall(&atomicQueue);
   if (p) {
 
     UDPPacketInternal *pnext = NULL;
-    Queue<UDPPacketInternal> stk;
+    Que(UDPPacketInternal, link) stk;
 
     while (p) {
       pnext = p->alink.next;
@@ -943,49 +908,49 @@
       p = stk.pop();
       ink_assert(p->link.prev == NULL);
       ink_assert(p->link.next == NULL);
-      if (p->m_isReliabilityPkt) {
-        m_reliabilityPktQueue.enqueue(p);
+      if (p->isReliabilityPkt) {
+        reliabilityPktQueue.enqueue(p);
         continue;
       }
       // insert into our queue.
       Debug("udp-send", "Adding 0x%x", p);
-      addToGuaranteedQ = ((p->m_conn->m_pipe_class > 0) && (p->m_conn->m_flowRateBps > 10.0));
+      addToGuaranteedQ = ((p->conn->pipe_class > 0) && (p->conn->flowRateBps > 10.0));
       pktLen = p->getPktLength();
-      if (p->m_conn->m_lastPktStartTime == 0) {
-        p->m_pktSendStartTime = MAX(now, p->m_delivery_time);
+      if (p->conn->lastPktStartTime == 0) {
+        p->pktSendStartTime = MAX(now, p->delivery_time);
       } else {
         pktSize = MAX(INK_ETHERNET_MTU_SIZE, pktLen);
         if (addToGuaranteedQ) {
           // NOTE: this is flow rate in Bytes per sec.; convert to milli-sec.
-          minPktSpacing = 1000.0 / (p->m_conn->m_flowRateBps / p->m_conn->m_avgPktSize);
+          minPktSpacing = 1000.0 / (p->conn->flowRateBps / p->conn->avgPktSize);
 
-          pktSendTime = p->m_conn->m_lastPktStartTime + ink_hrtime_from_msec((inku32) minPktSpacing);
+          pktSendTime = p->conn->lastPktStartTime + ink_hrtime_from_msec((inku32) minPktSpacing);
         } else {
           minPktSpacing = 0.0;
-          pktSendTime = p->m_delivery_time;
+          pktSendTime = p->delivery_time;
         }
-        p->m_pktSendStartTime = MAX(MAX(now, pktSendTime), p->m_delivery_time);
-        if (p->m_conn->m_flowRateBps > 25600.0)
+        p->pktSendStartTime = MAX(MAX(now, pktSendTime), p->delivery_time);
+        if (p->conn->flowRateBps > 25600.0)
           Debug("udpnet-pkt", "Pkt size = %.1lf now = %lld, send = %lld, del = %lld, Delay delta = %lld; delta = %lld",
-                p->m_conn->m_avgPktSize,
-                now, pktSendTime, p->m_delivery_time,
-                ink_hrtime_to_msec(p->m_pktSendStartTime - now),
-                ink_hrtime_to_msec(p->m_pktSendStartTime - p->m_conn->m_lastPktStartTime));
+                p->conn->avgPktSize,
+                now, pktSendTime, p->delivery_time,
+                ink_hrtime_to_msec(p->pktSendStartTime - now),
+                ink_hrtime_to_msec(p->pktSendStartTime - p->conn->lastPktStartTime));
 
-        p->m_conn->m_avgPktSize = ((4.0 * p->m_conn->m_avgPktSize) / 5.0) + (pktSize / 5.0);
+        p->conn->avgPktSize = ((4.0 * p->conn->avgPktSize) / 5.0) + (pktSize / 5.0);
       }
-      p->m_conn->m_lastPktStartTime = p->m_pktSendStartTime;
-      p->m_delivery_time = p->m_pktSendStartTime;
-      p->m_conn->m_nBytesTodo += pktLen;
+      p->conn->lastPktStartTime = p->pktSendStartTime;
+      p->delivery_time = p->pktSendStartTime;
+      p->conn->nBytesTodo += pktLen;
 
       g_udp_bytesPending += pktLen;
 
       if (addToGuaranteedQ)
-        G_inkPipeInfo.m_perPipeInfo[p->m_conn->m_pipe_class].m_queue->addPacket(p, now);
+        G_inkPipeInfo.perPipeInfo[p->conn->pipe_class].queue->addPacket(p, now);
       else {
         // stick in the best-effort queue: either it was a best-effort flow or
         // the thingy wasn't alloc'ed bandwidth
-        G_inkPipeInfo.m_perPipeInfo[0].m_queue->addPacket(p, now);
+        G_inkPipeInfo.perPipeInfo[0].queue->addPacket(p, now);
       }
     }
   }
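
The minPktSpacing computation above paces a guaranteed-bandwidth flow at its average packet time: the flow sends flowRateBps / avgPktSize packets per second, so consecutive packets are spaced 1000 / (flowRateBps / avgPktSize) milliseconds apart. For example:

    #include <cstdio>

    int main() {
      double flowRateBps = 262144.0;   // bytes/sec (a 2 Mbps flow)
      double avgPktSize  = 1500.0;     // bytes, about one Ethernet MTU
      double minPktSpacing = 1000.0 / (flowRateBps / avgPktSize);
      printf("one packet every %.2f ms\n", minPktSpacing);   // ~5.72 ms
    }
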
@@ -998,8 +963,8 @@
     lastPrintTime = now;
   }
 
-  for (i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++)
-    G_inkPipeInfo.m_perPipeInfo[i].m_queue->advanceNow(now);
+  for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++)
+    G_inkPipeInfo.perPipeInfo[i].queue->advanceNow(now);
 
   if (G_bulkIOState) {
     BulkIOSend();
@@ -1007,9 +972,9 @@
     SendPackets();
   }
 
-  timeSpent = ink_hrtime_to_msec(now - m_last_report);
+  timeSpent = ink_hrtime_to_msec(now - last_report);
   if (timeSpent > 10000) {
-    // if (m_bytesSent > 0)
+    // if (bytesSent > 0)
     // timespent is in milli-seconds
     char temp[2048], *p1;
     char bwMessage[2048];
@@ -1020,45 +985,45 @@
     bwMessage[0] = '\0';
     p1 = temp;
 
-    if (m_bytesSent > 0)
-      totalBw = (m_bytesSent * 8.0 * 1000.0) / (timeSpent * 1024.0 * 1024.0);
+    if (bytesSent > 0)
+      totalBw = (bytesSent * 8.0 * 1000.0) / (timeSpent * 1024.0 * 1024.0);
     else
       totalBw = 1.0;
 
-    for (i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++) {
+    for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
       // bw is in Mbps 
-      bw = (G_inkPipeInfo.m_perPipeInfo[i].m_bytesSent * 8.0 * 1000.0) / (timeSpent * 1024.0 * 1024.0);
+      bw = (G_inkPipeInfo.perPipeInfo[i].bytesSent * 8.0 * 1000.0) / (timeSpent * 1024.0 * 1024.0);
       snprintf(p1, sizeof(temp), "\t class[%d] = %f Mbps, alloc = %f Mbps, (conf'ed = %f, got = %f) \n",
-               i, bw, (G_inkPipeInfo.m_perPipeInfo[i].m_bwAlloc / (1024.0 * 1024.0)),
-               G_inkPipeInfo.m_perPipeInfo[i].m_wt, bw / totalBw);
+               i, bw, (G_inkPipeInfo.perPipeInfo[i].bwAlloc / (1024.0 * 1024.0)),
+               G_inkPipeInfo.perPipeInfo[i].wt, bw / totalBw);
       p1 += strlen(p1);
 
-      ip = (unsigned char *) &(G_inkPipeInfo.m_perPipeInfo[i].m_destIP);
+      ip = (unsigned char *) &(G_inkPipeInfo.perPipeInfo[i].destIP);
 #if 0
       if (i == 0)
-        sprintf(bwMessage, "%d mixt Best-Effort %f %f\n", time(0), bw, bw / G_inkPipeInfo.m_interfaceMbps);
+        sprintf(bwMessage, "%d mixt Best-Effort %f %f\n", time(0), bw, bw / G_inkPipeInfo.interfaceMbps);
       else
         sprintf(bwMessage, "%d mixt %d.%d.%d.%d %f %f\n",
-                time(0), ip[0], ip[1], ip[2], ip[3], bw, bw / G_inkPipeInfo.m_interfaceMbps);
+                time(0), ip[0], ip[1], ip[2], ip[3], bw, bw / G_inkPipeInfo.interfaceMbps);
 
       ::sendto(G_bwGrapherFd, bwMessage, strlen(bwMessage), 0,
                (struct sockaddr *) &G_bwGrapherLoc, sizeof(struct sockaddr_in));
 #endif
       // use a weighted estimator of current usage
-      G_inkPipeInfo.m_perPipeInfo[i].m_bwUsed = (4.0 * G_inkPipeInfo.m_perPipeInfo[i].m_bwUsed / 5.0) + (bw / 5.0);
-      G_inkPipeInfo.m_perPipeInfo[i].m_bytesSent = 0;
-      G_inkPipeInfo.m_perPipeInfo[i].m_pktsSent = 0;
+      G_inkPipeInfo.perPipeInfo[i].bwUsed = (4.0 * G_inkPipeInfo.perPipeInfo[i].bwUsed / 5.0) + (bw / 5.0);
+      G_inkPipeInfo.perPipeInfo[i].bytesSent = 0;
+      G_inkPipeInfo.perPipeInfo[i].pktsSent = 0;
     }
     if (temp[0])
       Debug("udpnet-bw", "B/w: %f Mbps; breakdown: \n%s", totalBw, temp);
 
 
-    m_bytesSent = 0;
-    m_last_report = now;
-    m_added = 0;
-    m_packets = 0;
+    bytesSent = 0;
+    last_report = now;
+    added = 0;
+    packets = 0;
   }
-  m_last_service = now;
+  last_service = now;
 }
 
 void
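
Both avgPktSize and bwUsed above use the same 4/5-old plus 1/5-new update, i.e. an exponentially weighted moving average with alpha = 0.2:

    #include <cstdio>

    double ewma(double prev, double sample) {
      return (4.0 * prev / 5.0) + (sample / 5.0);   // alpha = 0.2
    }

    int main() {
      double bwUsed = 0.0;
      for (double bw : {10.0, 10.0, 10.0, 2.0})     // Mbps per report interval
        printf("bwUsed = %.3f Mbps\n", bwUsed = ewma(bwUsed, bw));
    }
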
@@ -1075,28 +1040,28 @@
   ink32 pktLen;
   ink_hrtime timeDelta = 0;
 
-  if (now > m_last_service)
-    timeDelta = ink_hrtime_to_msec(now - m_last_service);
+  if (now > last_service)
+    timeDelta = ink_hrtime_to_msec(now - last_service);
 
-  if (G_inkPipeInfo.m_numPipes > 0) {
-    bytesThisSlot = (ink32) (((G_inkPipeInfo.m_reliabilityMbps * 1024.0 * 1024.0) / (8.0 * 1000.0)) * timeDelta);
+  if (G_inkPipeInfo.numPipes > 0) {
+    bytesThisSlot = (ink32) (((G_inkPipeInfo.reliabilityMbps * 1024.0 * 1024.0) / (8.0 * 1000.0)) * timeDelta);
     if (bytesThisSlot == 0) {
       // use at most 10% for reliability
-      bytesThisSlot = (ink32) (((G_inkPipeInfo.m_interfaceMbps * 1024.0 * 1024.0) / (8.0 * 1000.0)) * timeDelta * 0.1);
+      bytesThisSlot = (ink32) (((G_inkPipeInfo.interfaceMbps * 1024.0 * 1024.0) / (8.0 * 1000.0)) * timeDelta * 0.1);
       reliabilityBytes = bytesThisSlot;
     }
   }
 
-  while ((p = m_reliabilityPktQueue.dequeue()) != NULL) {
+  while ((p = reliabilityPktQueue.dequeue()) != NULL) {
     pktLen = p->getPktLength();
     g_udp_bytesPending -= pktLen;
 
-    p->m_conn->m_nBytesTodo -= pktLen;
-    p->m_conn->m_nBytesDone += pktLen;
+    p->conn->nBytesTodo -= pktLen;
+    p->conn->nBytesDone += pktLen;
 
-    if (p->m_conn->shouldDestroy())
+    if (p->conn->shouldDestroy())
       goto next_pkt_3;
-    if (p->m_conn->GetSendGenerationNumber() != p->m_reqGenerationNum)
+    if (p->conn->GetSendGenerationNumber() != p->reqGenerationNum)
       goto next_pkt_3;
 
     SendUDPPacket(p, pktLen);
@@ -1108,8 +1073,8 @@
   }
 
 
-  if (G_inkPipeInfo.m_numPipes > 0)
-    bytesThisSlot = (ink32) (((G_inkPipeInfo.m_interfaceMbps * 1024.0 * 1024.0) /
+  if (G_inkPipeInfo.numPipes > 0)
+    bytesThisSlot = (ink32) (((G_inkPipeInfo.interfaceMbps * 1024.0 * 1024.0) /
                               (8.0 * 1000.0)) * timeDelta - reliabilityBytes);
   else
     bytesThisSlot = INT_MAX;
@@ -1117,21 +1082,21 @@
 sendPackets:
   sentOne = false;
   send_threshold_time = now + SLOT_TIME;
-  for (i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++) {
-    bytesThisPipe = (ink32) (bytesThisSlot * G_inkPipeInfo.m_perPipeInfo[i].m_wt);
-    while ((bytesThisPipe > 0) && (G_inkPipeInfo.m_perPipeInfo[i].m_queue->firstPacket(send_threshold_time))) {
-      p = G_inkPipeInfo.m_perPipeInfo[i].m_queue->getFirstPacket();
+  for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
+    bytesThisPipe = (ink32) (bytesThisSlot * G_inkPipeInfo.perPipeInfo[i].wt);
+    while ((bytesThisPipe > 0) && (G_inkPipeInfo.perPipeInfo[i].queue->firstPacket(send_threshold_time))) {
+      p = G_inkPipeInfo.perPipeInfo[i].queue->getFirstPacket();
       pktLen = p->getPktLength();
       g_udp_bytesPending -= pktLen;
 
-      p->m_conn->m_nBytesTodo -= pktLen;
-      p->m_conn->m_nBytesDone += pktLen;
-      if (p->m_conn->shouldDestroy())
+      p->conn->nBytesTodo -= pktLen;
+      p->conn->nBytesDone += pktLen;
+      if (p->conn->shouldDestroy())
         goto next_pkt;
-      if (p->m_conn->GetSendGenerationNumber() != p->m_reqGenerationNum)
+      if (p->conn->GetSendGenerationNumber() != p->reqGenerationNum)
         goto next_pkt;
 
-      G_inkPipeInfo.m_perPipeInfo[i].m_bytesSent += pktLen;
+      G_inkPipeInfo.perPipeInfo[i].bytesSent += pktLen;
       SendUDPPacket(p, pktLen);
       bytesUsed += pktLen;
       bytesThisPipe -= pktLen;
@@ -1149,9 +1114,9 @@
   if ((bytesThisSlot > 0) && (sentOne)) {
     // redistribute the slack...
     now = ink_get_hrtime_internal();
-    for (i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++) {
-      if (G_inkPipeInfo.m_perPipeInfo[i].m_queue->firstPacket(now) == NULL) {
-        G_inkPipeInfo.m_perPipeInfo[i].m_queue->advanceNow(now);
+    for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
+      if (G_inkPipeInfo.perPipeInfo[i].queue->firstPacket(now) == NULL) {
+        G_inkPipeInfo.perPipeInfo[i].queue->advanceNow(now);
       }
     }
     goto sendPackets;
@@ -1161,8 +1126,8 @@
       (now - lastCleanupTime > ink_hrtime_from_sec(g_udp_periodicFreeCancelledPkts))) {
     inku64 nbytes = g_udp_bytesPending;
     ink_hrtime startTime = ink_get_hrtime_internal(), endTime;
-    for (i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++) {
-      G_inkPipeInfo.m_perPipeInfo[i].m_queue->FreeCancelledPackets(g_udp_periodicCleanupSlots);
+    for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
+      G_inkPipeInfo.perPipeInfo[i].queue->FreeCancelledPackets(g_udp_periodicCleanupSlots);
     }
     endTime = ink_get_hrtime_internal();
     Debug("udp-pending-packets", "Did cleanup of %d buckets: %lld bytes in %d m.sec",
@@ -1180,20 +1145,20 @@
   int real_len = 0;
   int n, count, iov_len = 0;
 
-  if (!p->m_isReliabilityPkt) {
-    p->m_conn->SetLastSentPktTSSeqNum(p->m_pktTSSeqNum);
-    p->m_conn->m_lastSentPktStartTime = p->m_delivery_time;
+  if (!p->isReliabilityPkt) {
+    p->conn->SetLastSentPktTSSeqNum(p->pktTSSeqNum);
+    p->conn->lastSentPktStartTime = p->delivery_time;
   }
 
   Debug("udp-send", "Sending 0x%x", p);
   msg.msg_control = 0;
   msg.msg_controllen = 0;
   msg.msg_flags = 0;
-  msg.msg_name = (caddr_t) & p->m_to;
-  msg.msg_namelen = sizeof(p->m_to);
+  msg.msg_name = (caddr_t) & p->to;
+  msg.msg_namelen = sizeof(p->to);
   iov_len = 0;
-  m_bytesSent += pktLen;
-  for (b = p->m_chain; b != NULL; b = b->next) {
+  bytesSent += pktLen;
+  for (b = p->chain; b != NULL; b = b->next) {
     iov[iov_len].iov_base = (caddr_t) b->start();
     iov[iov_len].iov_len = b->size();
     real_len += iov[iov_len].iov_len;
@@ -1205,7 +1170,7 @@
   count = 0;
   while (1) {
     // stupid Linux problem: sendmsg can return EAGAIN
-    n =::sendmsg(p->m_conn->getFd(), &msg, 0);
+    n =::sendmsg(p->conn->getFd(), &msg, 0);
     if ((n >= 0) || ((n < 0) && (errno != EAGAIN)))
       // send succeeded or some random error happened.
       break;
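
SendUDPPacket gathers the packet's buffer chain into an iovec array and retries sendmsg() on EAGAIN, since Linux can return EAGAIN even for UDP sockets. Reduced to a standalone sketch, with Block standing in for the IOBufferBlock chain:

    #include <sys/socket.h>
    #include <sys/uio.h>
    #include <cerrno>
    #include <cstddef>

    struct Block { const char *data; size_t len; Block *next; };

    ssize_t send_chain(int fd, const sockaddr *to, socklen_t tolen, Block *chain) {
      iovec iov[16];                        // capped, like NET_MAX_IOV
      int iov_len = 0;
      for (Block *b = chain; b && iov_len < 16; b = b->next) {
        iov[iov_len].iov_base = const_cast<char *>(b->data);
        iov[iov_len].iov_len  = b->len;
        ++iov_len;
      }
      msghdr msg = {};
      msg.msg_name    = const_cast<sockaddr *>(to);
      msg.msg_namelen = tolen;
      msg.msg_iov     = iov;
      msg.msg_iovlen  = iov_len;

      ssize_t n;
      do {
        n = ::sendmsg(fd, &msg, 0);         // retry only the EAGAIN case
      } while (n < 0 && errno == EAGAIN);
      return n;
    }
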
@@ -1235,13 +1200,13 @@
   ink_hrtime now = ink_get_hrtime_internal();
   ink_hrtime send_threshold_time = now + SLOT_TIME;
 
-  for (int i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++) {
-    while (p = G_inkPipeInfo.m_perPipeInfo[i].m_queue->firstPacket(send_threshold_time)) {
-      p = G_inkPipeInfo.m_perPipeInfo[i].m_queue->getFirstPacket();
+  for (int i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
+    while (p = G_inkPipeInfo.perPipeInfo[i].queue->firstPacket(send_threshold_time)) {
+      p = G_inkPipeInfo.perPipeInfo[i].queue->getFirstPacket();
       sentOne = true;
       Debug("bulk-io-pkt", "Adding a packet...");
-      BulkIOAddPkt(G_bulkIOState, &G_bulkIOAggregator, p, p->m_conn->getPortNum());
-      m_bytesSent += p->getPktLength();
+      BulkIOAddPkt(G_bulkIOState, &G_bulkIOAggregator, p, p->conn->getPortNum());
+      bytesSent += p->getPktLength();
       // Now the packet is "sent"; get rid of it
       p->free();
     }
@@ -1256,34 +1221,16 @@
 UDPQueue::send(UDPPacket * p)
 {
   // XXX: maybe fastpath for immediate send?
-//  Debug("udpnet","add packet %x, delivery=%lld",p,p->m_delivery_time);
-  ink_atomiclist_push(m_atomicQueue, p);
-  //m_queue->addPacket(p,ink_get_hrtime());
+  ink_atomiclist_push(&atomicQueue, p);
 }
 
 #undef LINK
 
 UDPNetHandler::UDPNetHandler()
 {
-
   mutex = new_ProxyMutex();
-
-  //pollDescriptor = (PollDescriptor*)xmalloc(sizeof(PollDescriptor));
-  //pollDescriptor->init();
-  udpConnections = (UnixUDPConnection **) xmalloc(sizeof(UnixUDPConnection *) * MAX_UDP_CONNECTION);
-  int i;
-  for (i = 0; i < MAX_UDP_CONNECTION; i++) {
-    udpConnections[i] = NULL;
-  }
-
-  ink_atomiclist_init(&udpAtomicQueue, "Outgoing UDP Packet queue", (uintptr_t) &((UDPPacketInternal *) 0)->alink.next);
-
-  ink_atomiclist_init(&udpNewConnections, "UDP Connection queue", (uintptr_t)
-                      &((UnixUDPConnection *) 0)->newconn_alink.next);
-
-  udpOutQueue = NEW(new UDPQueue(&udpAtomicQueue));
-  udp_polling = NEW(new Queue<UnixUDPConnection> ());
-  udp_callbacks = NEW(new Queue<UnixUDPConnection> ());
+  ink_atomiclist_init(&udpOutQueue.atomicQueue, "Outgoing UDP Packet queue", offsetof(UDPPacketInternal, alink.next));
+  ink_atomiclist_init(&udpNewConnections, "UDP Connection queue", offsetof(UnixUDPConnection, newconn_alink.next));
   nextCheck = ink_get_hrtime_internal() + HRTIME_MSECONDS(1000);
   lastCheck = 0;
   SET_HANDLER((UDPNetContHandler) & UDPNetHandler::startNetEvent);
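
The atomic-list initializers above replace the old (uintptr_t) &((T *) 0)->member trick with offsetof(), which yields the same link offset without formally dereferencing a null pointer:

    #include <cstddef>
    #include <cstdio>

    struct Packet {
      int payload;
      struct { Packet *next; } alink;   // intrusive link, as in UDPPacketInternal
    };

    int main() {
      // same value the null-pointer cast produced, but well-defined
      printf("link offset = %zu\n", offsetof(Packet, alink.next));
    }
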
@@ -1294,23 +1241,18 @@
 {
   (void) event;
   SET_HANDLER((UDPNetContHandler) & UDPNetHandler::mainNetEvent);
-  e->schedule_every(-HRTIME_MSECONDS(9));
   trigger_event = e;
+  e->schedule_every(-HRTIME_MSECONDS(9));
   return EVENT_CONT;
 }
+
 inline PollDescriptor *
 UDPNetHandler::build_one_udpread_poll(int fd, UnixUDPConnection * uc, PollDescriptor * pd)
 {
-  int i;
-
   // XXX: just hack until figure things out
   ink_assert(uc->getFd() > 0);
-  Pollfd *pfd;
-  pfd = pd->alloc();
+  Pollfd *pfd = pd->alloc();
   pfd->fd = fd;
-  i = pfd - pd->pfd;
-  uc->setPollvecIndex(i);
-  udpConnections[i] = uc;
   pfd->events = POLLIN;
   pfd->revents = 0;
   return pd;
@@ -1320,10 +1262,9 @@
 UDPNetHandler::build_poll(PollDescriptor * pd)
 {
   // build read poll for UDP connections.
-  UnixUDPConnection *uc;
   ink_assert(pd->empty());
   int i = 0;
-  for (uc = udp_polling->head; uc; uc = uc->polling_link.next) {
+  forl_LL(UnixUDPConnection, uc, udp_polling) {
     if (uc->recvActive) {
       pd = build_one_udpread_poll(uc->getFd(), uc, pd);
       i++;
@@ -1344,46 +1285,14 @@
 
   PollCont *pc = get_UDPPollCont(e->ethread);
 
-  //changed by YTS Team, yamsat 
-  // pick up new UDP connections for servicing
-  /* if (!INK_ATOMICLIST_EMPTY(udpNewConnections)) {
-     UnixUDPConnection *c = 
-     (UnixUDPConnection *)ink_atomiclist_popall(&udpNewConnections);
-     if (c) {
-     UnixUDPConnection *cnext = NULL;
-     while (c) {
-     if (c->shouldDestroy()) {
-     cnext = c->newconn_alink.next;
-     c->newconn_alink.next = NULL;
-     if (G_inkPipeInfo.m_numPipes > 0)
-     G_inkPipeInfo.m_perPipeInfo[c->m_pipe_class].m_count--;
-     c->Release();
-     c = cnext;
-     continue;
-     }
-     ink_assert(!c->mutex == !c->continuation);
-     cnext = c->newconn_alink.next;
-     c->newconn_alink.next = NULL;
-     ink_assert(c->polling_link.next ==NULL);
-     ink_assert(c->polling_link.prev ==NULL);
-     ink_assert(c->m_ethread == trigger_event->ethread);
-     c->setEthread(trigger_event->ethread);
-     //udp_polling->enqueue(c,c->polling_link);
-     c = cnext;
-     }
-     }
-     } */
-
   // handle UDP outgoing engine
-  udpOutQueue->service(this);
+  udpOutQueue.service(this);
 
   // handle UDP read operations
   UnixUDPConnection *uc, *next;
   int i;
   int nread = 0;
-//epoll changes
 
-  //changed by YTS Team, yamsat 
   struct epoll_data_ptr *temp_eptr = NULL;
   for (i = 0; i < pc->pollDescriptor->result; i++) {
     temp_eptr = (struct epoll_data_ptr *) get_ev_data(pc->pollDescriptor,i);
@@ -1391,7 +1300,7 @@
         && temp_eptr->type == EPOLL_UDP_CONNECTION) {
       uc = temp_eptr->data.uc;
       ink_assert(uc && uc->mutex && uc->continuation);
-      ink_assert(uc->m_refcount >= 1);
+      ink_assert(uc->refcount >= 1);
       if (uc->shouldDestroy()) {
         // udp_polling->remove(uc,uc->polling_link);
         uc->Release();
@@ -1402,34 +1311,16 @@
     }                           //if EPOLLIN        
   }                             //end for
 
-//epoll changes ends here
-
-/*
-  for (i = 0; i <pc->pollDescriptor->nfds; i++) {
-    if (pc->pollDescriptor->pfd[i].revents & POLLIN) {
-      uc = udpConnections[i];
-      ink_assert(uc && uc->mutex && uc->continuation);
-      ink_assert(uc->m_refcount >= 1);
-      if (uc->shouldDestroy()) {
-	udp_polling->remove(uc,uc->polling_link);
-	uc->Release();
-      } else {
-	udpNetInternal.udp_read_from_net(this,uc,pc->pollDescriptor,trigger_event->ethread);
-	nread++;
-      }
-    }
-  } */
-
   // remove dead UDP connections
   ink_hrtime now = ink_get_hrtime_internal();
   if (now >= nextCheck) {
-    for (uc = udp_polling->head; uc; uc = next) {
+    for (uc = udp_polling.head; uc; uc = next) {
       ink_assert(uc->mutex && uc->continuation);
-      ink_assert(uc->m_refcount >= 1);
+      ink_assert(uc->refcount >= 1);
       next = uc->polling_link.next;
       if (uc->shouldDestroy()) {
-        if (G_inkPipeInfo.m_numPipes > 0)
-          G_inkPipeInfo.m_perPipeInfo[uc->m_pipe_class].m_count--;
+        if (G_inkPipeInfo.numPipes > 0)
+          G_inkPipeInfo.perPipeInfo[uc->pipe_class].count--;
         //changed by YTS Team, yamsat
         //udp_polling->remove(uc,uc->polling_link);
         uc->Release();
@@ -1438,15 +1329,15 @@
     nextCheck = ink_get_hrtime_internal() + HRTIME_MSECONDS(1000);
   }
   // service UDPConnections with data ready for callback.
-  Queue<UnixUDPConnection> q = *udp_callbacks;
-  udp_callbacks->clear();
-  while ((uc = q.dequeue(q.head, q.head->callback_link))) {
+  Que(UnixUDPConnection, callback_link) q = udp_callbacks;
+  udp_callbacks.clear();
+  while ((uc = q.dequeue())) {
     ink_assert(uc->mutex && uc->continuation);
     if (udpNetInternal.udp_callback(this, uc, trigger_event->ethread)) {        // not successful
       // schedule on a thread of its own.
       ink_assert(uc->callback_link.next == NULL);
       ink_assert(uc->callback_link.prev == NULL);
-      udp_callbacks->enqueue(uc, uc->callback_link);
+      udp_callbacks.enqueue(uc);
     } else {
       ink_assert(uc->callback_link.next == NULL);
       ink_assert(uc->callback_link.prev == NULL);
@@ -1455,7 +1346,6 @@
     }
   }
 
-  //changed by YTS Team, yamsat 
   return EVENT_CONT;
 }
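
The callback-servicing loop above uses a swap-and-drain idiom: copy the shared queue into a local one, clear the shared queue, then drain the local copy. A connection whose callback does not complete is re-enqueued on the now-empty shared queue, so it is retried on the next event rather than spinning inside the current loop. A minimal sketch of the control flow (illustrative stand-ins, not the ATS types):

    #include <cstdio>
    #include <deque>

    struct Conn { int id; };

    std::deque<Conn *> callbacks;   // shared queue (protected by the handler's mutex in ATS)

    // Stand-in for udp_callback(); returns false when the callback could not run.
    static bool do_callback(Conn *c) { std::printf("cb %d\n", c->id); return true; }

    static void service_callbacks() {
      std::deque<Conn *> q;
      q.swap(callbacks);            // detach everything queued so far
      while (!q.empty()) {
        Conn *c = q.front();
        q.pop_front();
        if (!do_callback(c))
          callbacks.push_back(c);   // failed: retry on the next pass, not this one
      }
    }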
 
@@ -1471,14 +1361,14 @@
                           unsigned int my_ip, unsigned int dest_ip, int s_bufsize, int r_bufsize)
 {
   mutex = c->mutex;
-  m_cont = c;
-  m_action = c;
-  m_numPairs = numPairs;
-  m_myIP = my_ip;
-  m_destIP = dest_ip;
-  m_sendbufsize = s_bufsize;
-  m_recvbufsize = r_bufsize;
-  m_udpConns = NULL;
+  cont = c;
+  action = c;
+  this->numPairs = numPairs;   // the parameter shadows the member; qualify explicitly
+  myIP = my_ip;
+  destIP = dest_ip;
+  sendbufsize = s_bufsize;
+  recvbufsize = r_bufsize;
+  udpConns = NULL;
   SET_HANDLER((UDPWorkContinuation_Handler) & UDPWorkContinuation::StateCreatePortPairs);
 }
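
A note on the rename above: dropping the m_ prefix makes the numPairs parameter shadow the member of the same name, which is why the assignment needs the explicit this-> qualifier; an unqualified `numPairs = numPairs;` would be a silent self-assignment that leaves the member unset. In miniature (hypothetical type, for illustration):

    struct Worker {
      int numPairs;
      void init(int numPairs) {      // parameter shadows the member
        // numPairs = numPairs;      // self-assignment: the member is never set
        this->numPairs = numPairs;   // correct: qualify the member explicitly
      }
    };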
 
@@ -1486,7 +1376,7 @@
 UDPWorkContinuation::StateCreatePortPairs(int event, void *data)
 {
 //  int res = 0;
-  int numUdpPorts = 2 * m_numPairs;
+  int numUdpPorts = 2 * numPairs;
   int fd1 = -1, fd2 = -1;
 //  struct sockaddr_in bind_sa;
   struct sockaddr_in myaddr1, myaddr2;
@@ -1497,14 +1387,12 @@
   Action *status;
   //epoll changes
 
-  //added by YTS Team, yamsat 
-  struct epoll_data_ptr *eptr = NULL;
   PollCont *pc = NULL;
   //epoll changes ends here
   ink_debug_assert(mutex->thread_holding == this_ethread());
 
-  if (m_action.cancelled) {
-    m_action = NULL;
+  if (action.cancelled) {
+    action = NULL;
     mutex = NULL;
     udpWorkContinuationAllocator.free(this);
     return EVENT_CONT;
@@ -1512,9 +1400,9 @@
 
   startTime = ink_get_hrtime_internal();
 
-  m_udpConns = NEW(new UnixUDPConnection *[numUdpPorts]);
+  udpConns = NEW(new UnixUDPConnection *[numUdpPorts]);
   for (i = 0; i < numUdpPorts; i++)
-    m_udpConns[i] = NULL;
+    udpConns[i] = NULL;
   ink_atomic_swap(&portNum, lastAllocPort);
   portNum %= 50000;
   if (portNum == 0)
@@ -1523,13 +1411,13 @@
   i = 0;
   while (i < numUdpPorts) {
 
-    if (udpNet.CreateUDPSocket(&fd1, &myaddr1, &status, portNum, m_myIP, m_sendbufsize, m_recvbufsize)) {
-      if (udpNet.CreateUDPSocket(&fd2, &myaddr2, &status, portNum + 1, m_myIP, m_sendbufsize, m_recvbufsize)) {
-        m_udpConns[i] = NEW(new UnixUDPConnection(fd1));        // new_UnixUDPConnection(fd1);
-        m_udpConns[i]->setBinding(&myaddr1);
+    if (udpNet.CreateUDPSocket(&fd1, &myaddr1, &status, portNum, myIP, sendbufsize, recvbufsize)) {
+      if (udpNet.CreateUDPSocket(&fd2, &myaddr2, &status, portNum + 1, myIP, sendbufsize, recvbufsize)) {
+        udpConns[i] = NEW(new UnixUDPConnection(fd1));        // new_UnixUDPConnection(fd1);
+        udpConns[i]->setBinding(&myaddr1);
         i++;
-        m_udpConns[i] = NEW(new UnixUDPConnection(fd2));        // new_UnixUDPConnection(fd2);
-        m_udpConns[i]->setBinding(&myaddr2);
+        udpConns[i] = NEW(new UnixUDPConnection(fd2));        // new_UnixUDPConnection(fd2);
+        udpConns[i]->setBinding(&myaddr2);
         i++;
         // remember the last alloc'ed port
         ink_atomic_swap(&lastAllocPort, portNum + 2);
@@ -1558,21 +1446,18 @@
   }
 
   for (i = 0; i < numUdpPorts; i++) {
-    udpNet.UDPClassifyConnection(m_udpConns[i], m_destIP);
+    udpNet.UDPClassifyConnection(udpConns[i], destIP);
     Debug("udpnet-pipe", "Adding (port = %d) to Pipe class: %d",
-          m_udpConns[i]->getPortNum(), m_udpConns[i]->m_pipe_class);
+          udpConns[i]->getPortNum(), udpConns[i]->pipe_class);
   }
 
   // assert should *never* fire; we check for this at the beginning of the function.
-  ink_assert(!m_action.cancelled);
+  ink_assert(!action.cancelled);
 
   // Bind to threads only on success.  Currently, once a UDPConnection
   // has been bound to a thread, the only way to remove it is to call
   // destroy(); the thread to which the UDPConnection is bound will then
   // remove the connection from its linked list and call delete.
-  //struct epoll_event ev;
-  //struct epoll_data_ptr *eptr=NULL;
-  //PollCont * pc  = NULL;
 
 #if defined(USE_EPOLL)
   struct epoll_event ev;
@@ -1583,22 +1468,19 @@
 #endif
   //changed by YTS Team, yamsat
   for (i = 0; i < numUdpPorts; i++) {
-    m_udpConns[i]->bindToThread(m_cont);
+    udpConns[i]->bindToThread(cont);
     //epoll changes
-    pc = get_UDPPollCont(m_udpConns[i]->m_ethread);
-    eptr = (struct epoll_data_ptr *) malloc(sizeof(struct epoll_data_ptr));
-
-    eptr->type = 5;             //UDP
-    eptr->data.uc = m_udpConns[i];
-    m_udpConns[i]->eptr = eptr;
+    pc = get_UDPPollCont(udpConns[i]->ethread);
+    udpConns[i]->ep.type = EPOLL_UDP_CONNECTION;   // tag checked in UDPNetHandler::mainNetEvent
+    udpConns[i]->ep.data.uc = udpConns[i];
 
 #if defined(USE_EPOLL)
     memset(&ev, 0, sizeof(struct epoll_event));
     ev.events = EPOLLIN | EPOLLET;
-    ev.data.ptr = eptr;
-    epoll_ctl(pc->pollDescriptor->epoll_fd, EPOLL_CTL_ADD, m_udpConns[i]->getFd(), &ev);
+    ev.data.ptr = &udpConns[i]->ep;
+    epoll_ctl(pc->pollDescriptor->epoll_fd, EPOLL_CTL_ADD, udpConns[i]->getFd(), &ev);
 #elif defined(USE_KQUEUE)
-    EV_SET(&ev, m_udpConns[i]->getFd(), EVFILT_READ, EV_ADD, 0, 0, eptr);
+    EV_SET(&ev, udpConns[i]->getFd(), EVFILT_READ, EV_ADD, 0, 0, &udpConns[i]->ep);
     kevent(pc->pollDescriptor->kqueue_fd, &ev, 1, NULL, 0, NULL);
 #else
 #error port me
@@ -1606,16 +1488,16 @@
     //epoll changes ends here
   }                             //for
 
-  m_resultCode = NET_EVENT_DATAGRAM_OPEN;
+  resultCode = NET_EVENT_DATAGRAM_OPEN;
   goto out;
 
 Lerror:
-  m_resultCode = NET_EVENT_DATAGRAM_ERROR;
+  resultCode = NET_EVENT_DATAGRAM_ERROR;
   for (i = 0; i < numUdpPorts; i++) {
-    delete m_udpConns[i];
+    delete udpConns[i];
   }
-  delete[]m_udpConns;
-  m_udpConns = NULL;
+  delete[]udpConns;
+  udpConns = NULL;
 
 out:
   SET_HANDLER((UDPWorkContinuation_Handler) & UDPWorkContinuation::StateDoCallback);
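
Note the lifetime change in the epoll/kqueue registration above: the per-connection epoll_data_ptr is no longer malloc'ed and tracked through a separate pointer (to be xfree'd on close); it is embedded in the connection object itself, so it is created and destroyed with the connection and cannot leak. Schematically, with simplified stand-in types (the real epoll_data_ptr carries a type tag and a union of back-pointers):

    #include <cstring>
    #include <sys/epoll.h>

    struct EpollDataPtr { int type; void *data; };
    struct Conn { int fd; EpollDataPtr ep; };   // ep embedded: same lifetime as Conn

    static void register_conn(int epoll_fd, Conn *conn) {
      struct epoll_event ev;
      memset(&ev, 0, sizeof ev);
      ev.events = EPOLLIN | EPOLLET;
      ev.data.ptr = &conn->ep;   // address of the member; nothing to malloc or xfree
      epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn->fd, &ev);
    }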
@@ -1625,26 +1507,26 @@
 int
 UDPWorkContinuation::StateDoCallback(int event, void *data)
 {
-  MUTEX_TRY_LOCK(lock, m_action.mutex, this_ethread());
+  MUTEX_TRY_LOCK(lock, action.mutex, this_ethread());
   if (!lock) {
     this_ethread()->schedule_in(this, MUTEX_RETRY_DELAY);
     return EVENT_CONT;
   }
-  if (!m_action.cancelled) {
-    m_action.continuation->handleEvent(m_resultCode, m_udpConns);
+  if (!action.cancelled) {
+    action.continuation->handleEvent(resultCode, udpConns);
   } else {
-    // else m_action.cancelled
-    if (m_resultCode == NET_EVENT_DATAGRAM_OPEN) {
-      for (int i = 0; i < m_numPairs * 2; i++)
+    // else action.cancelled
+    if (resultCode == NET_EVENT_DATAGRAM_OPEN) {
+      for (int i = 0; i < numPairs * 2; i++)
         // don't call delete on individual connections; the udp thread will do
         // that when it cleans up an fd.
-        m_udpConns[i]->destroy();
-      delete[]m_udpConns;       // I think this is OK to delete the array, what we shouldn't do is loop over
-      m_udpConns = NULL;        // the conns and and do delete m_udpConns[i].
+        udpConns[i]->destroy();
+      delete[]udpConns;       // I think it's OK to delete the array; what we shouldn't do is loop over
+      udpConns = NULL;        // the conns and do delete udpConns[i].
     }
   }
 
-  m_action = NULL;
+  action = NULL;
   mutex = NULL;
   udpWorkContinuationAllocator.free(this);
 

Modified: incubator/trafficserver/traffic/trunk/proxy/CoreUtils.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/proxy/CoreUtils.cc?rev=889011&r1=889010&r2=889011&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/proxy/CoreUtils.cc (original)
+++ incubator/trafficserver/traffic/trunk/proxy/CoreUtils.cc Wed Dec  9 23:01:29 2009
@@ -1143,7 +1143,7 @@
 print_netstate(NetState * n)
 {
   // These might not be 64-bit correct. /leif
-  printf("      enabled: %d  priority: %d\n", n->enabled, n->priority);
+  printf("      enabled: %d\n", n->enabled);
   printf("      op: %d  _cont: 0x%p\n", n->vio.op, n->vio._cont);
   printf("      nbytes: %d  done: %d\n", n->vio.nbytes, n->vio.ndone);
   printf("      vc_server: 0x%p   mutex: 0x%p\n\n", n->vio.vc_server, n->vio.mutex.m_ptr);

Modified: incubator/trafficserver/traffic/trunk/proxy/InkIOCoreAPI.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/proxy/InkIOCoreAPI.cc?rev=889011&r1=889010&r2=889011&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/proxy/InkIOCoreAPI.cc (original)
+++ incubator/trafficserver/traffic/trunk/proxy/InkIOCoreAPI.cc Wed Dec  9 23:01:29 2009
@@ -400,9 +400,9 @@
   UDPPacket *packet = new_UDPPacket();
   UDPConnection *conn = (UDPConnection *) udp;
 
-  packet->m_to.sin_family = PF_INET;
-  packet->m_to.sin_port = htons(port);
-  packet->m_to.sin_addr.s_addr = ip;
+  packet->to.sin_family = PF_INET;
+  packet->to.sin_port = htons(port);
+  packet->to.sin_addr.s_addr = ip;
 
   IOBufferBlock *blockp = new_IOBufferBlock();
   blockp->alloc(BUFFER_SIZE_INDEX_32K);
@@ -458,14 +458,14 @@
 INKUDPPacketFromAddressGet(INKUDPPacket packet)
 {
   UDPPacket *p = (UDPPacket *) packet;
-  return (p->m_from.sin_addr.s_addr);
+  return (p->from.sin_addr.s_addr);
 }
 
 int
 INKUDPPacketFromPortGet(INKUDPPacket packet)
 {
   UDPPacket *p = (UDPPacket *) packet;
-  return (ntohs(p->m_from.sin_port));
+  return (ntohs(p->from.sin_port));
 }
 
 INKUDPConn
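
For context, the two accessors above return the packet's source address in network byte order and its source port in host byte order (ntohs is applied internally). A hedged usage sketch; the helper name is hypothetical and packet acquisition from the datagram event machinery is elided:

    #include <cstdio>

    // Hypothetical helper: given an INKUDPPacket delivered by the datagram
    // event machinery, report where it came from.
    static void log_packet_source(INKUDPPacket pkt) {
      unsigned int ip = INKUDPPacketFromAddressGet(pkt);   // network byte order
      int port        = INKUDPPacketFromPortGet(pkt);      // host byte order
      unsigned char *b = (unsigned char *) &ip;
      std::printf("packet from %d.%d.%d.%d:%d\n", b[0], b[1], b[2], b[3], port);
    }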

Modified: incubator/trafficserver/traffic/trunk/proxy/Main.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/proxy/Main.cc?rev=889011&r1=889010&r2=889011&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/proxy/Main.cc (original)
+++ incubator/trafficserver/traffic/trunk/proxy/Main.cc Wed Dec  9 23:01:29 2009
@@ -1991,10 +1991,10 @@
   // file doesn't exist
   if (node->getNodeName() == NULL) {
     // alloc 1-elt array to store stuff for best-effort traffic
-    G_inkPipeInfo.m_perPipeInfo = NEW(new InkSinglePipeInfo[1]);
-    G_inkPipeInfo.m_perPipeInfo[0].m_wt = 1.0;
-    G_inkPipeInfo.m_numPipes = 0;
-    G_inkPipeInfo.m_interfaceMbps = 0.0;
+    G_inkPipeInfo.perPipeInfo = NEW(new InkSinglePipeInfo[1]);
+    G_inkPipeInfo.perPipeInfo[0].wt = 1.0;
+    G_inkPipeInfo.numPipes = 0;
+    G_inkPipeInfo.interfaceMbps = 0.0;
     return true;
   }
 
@@ -2002,24 +2002,24 @@
     Debug("bw-mgmt", "Root node should be an interface tag!\n");
     return false;
   }
-  // First entry G_inkPipeInfo.m_perPipeInfo[0] is the one for "best-effort" traffic.
-  G_inkPipeInfo.m_perPipeInfo = NEW(new InkSinglePipeInfo[node->getChildCount() + 1]);
-  G_inkPipeInfo.m_perPipeInfo[0].m_wt = 1.0;
-  G_inkPipeInfo.m_numPipes = 0;
-  G_inkPipeInfo.m_reliabilityMbps = 1.0;
-  G_inkPipeInfo.m_interfaceMbps = 30.0;
+  // First entry G_inkPipeInfo.perPipeInfo[0] is the one for "best-effort" traffic.
+  G_inkPipeInfo.perPipeInfo = NEW(new InkSinglePipeInfo[node->getChildCount() + 1]);
+  G_inkPipeInfo.perPipeInfo[0].wt = 1.0;
+  G_inkPipeInfo.numPipes = 0;
+  G_inkPipeInfo.reliabilityMbps = 1.0;
+  G_inkPipeInfo.interfaceMbps = 30.0;
   for (i = 0; i < node->getChildCount(); i++) {
     if ((child = node->getChildNode(i))) {
       if (strcmp(child->getNodeName(), "pipe") == 0) {
-        G_inkPipeInfo.m_numPipes++;
+        G_inkPipeInfo.numPipes++;
         for (k = 0; k < child->getChildCount(); k++) {
           c2 = child->getChildNode(k);
           for (int l = 0; l < c2->m_nACount; l++) {
             if (strcmp(c2->m_pAList[l].pAName, "weight") == 0) {
-              G_inkPipeInfo.m_perPipeInfo[G_inkPipeInfo.m_numPipes].m_wt = atof(c2->m_pAList[l].pAValue);
-              G_inkPipeInfo.m_perPipeInfo[0].m_wt -= G_inkPipeInfo.m_perPipeInfo[G_inkPipeInfo.m_numPipes].m_wt;
+              G_inkPipeInfo.perPipeInfo[G_inkPipeInfo.numPipes].wt = atof(c2->m_pAList[l].pAValue);
+              G_inkPipeInfo.perPipeInfo[0].wt -= G_inkPipeInfo.perPipeInfo[G_inkPipeInfo.numPipes].wt;
             } else if (strcmp(c2->m_pAList[l].pAName, "dest_ip") == 0) {
-              p = (unsigned char *) &(G_inkPipeInfo.m_perPipeInfo[G_inkPipeInfo.m_numPipes].m_destIP);
+              p = (unsigned char *) &(G_inkPipeInfo.perPipeInfo[G_inkPipeInfo.numPipes].destIP);
               ip = c2->m_pAList[l].pAValue;
               for (j = 0; j < 4; j++) {
                 p[j] = atoi(ip);
@@ -2033,21 +2033,21 @@
       } else if (strcmp(child->getNodeName(), "bandwidth") == 0) {
         for (j = 0; j < child->m_nACount; j++) {
           if (strcmp(child->m_pAList[j].pAName, "limit_mbps") == 0) {
-            G_inkPipeInfo.m_interfaceMbps = atof(child->m_pAList[j].pAValue);
+            G_inkPipeInfo.interfaceMbps = atof(child->m_pAList[j].pAValue);
           } else if (strcmp(child->m_pAList[j].pAName, "reliability_mbps") == 0) {
-            G_inkPipeInfo.m_reliabilityMbps = atof(child->m_pAList[j].pAValue);
+            G_inkPipeInfo.reliabilityMbps = atof(child->m_pAList[j].pAValue);
           }
         }
       }
     }
   }
-  Debug("bw-mgmt", "Read in: limit_mbps = %lf\n", G_inkPipeInfo.m_interfaceMbps);
-  for (i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++) {
-    G_inkPipeInfo.m_perPipeInfo[i].m_bwLimit =
-      (ink64) (G_inkPipeInfo.m_perPipeInfo[i].m_wt * G_inkPipeInfo.m_interfaceMbps * 1024.0 * 1024.0);
-    p = (unsigned char *) &(G_inkPipeInfo.m_perPipeInfo[i].m_destIP);
+  Debug("bw-mgmt", "Read in: limit_mbps = %lf\n", G_inkPipeInfo.interfaceMbps);
+  for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
+    G_inkPipeInfo.perPipeInfo[i].bwLimit =
+      (ink64) (G_inkPipeInfo.perPipeInfo[i].wt * G_inkPipeInfo.interfaceMbps * 1024.0 * 1024.0);
+    p = (unsigned char *) &(G_inkPipeInfo.perPipeInfo[i].destIP);
     Debug("bw-mgmt", "Pipe [%d]: wt = %lf, dest ip = %d.%d.%d.%d\n",
-          i, G_inkPipeInfo.m_perPipeInfo[i].m_wt, p[0], p[1], p[2], p[3]);
+          i, G_inkPipeInfo.perPipeInfo[i].wt, p[0], p[1], p[2], p[3]);
   }
   return true;
 }
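
Reading the parser above, the bandwidth-management config it expects appears to be shaped roughly as follows. This is reconstructed from the tag and attribute names in the code; the element name inside <pipe> is not visible here, so the inner tag is a placeholder:

    <interface>
      <pipe>
        <dest weight="0.25" dest_ip="10.0.0.1"/>   <!-- inner tag name unknown -->
      </pipe>
      <bandwidth limit_mbps="30.0" reliability_mbps="1.0"/>
    </interface>

With those numbers, the pipe would get bwLimit = (ink64)(0.25 * 30.0 * 1024.0 * 1024.0) = 7,864,320, and pipe 0 (best-effort traffic) keeps the residual weight 1.0 - 0.25 = 0.75, since each pipe's weight is subtracted from perPipeInfo[0].wt as it is read.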