You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2009/12/10 00:01:30 UTC
svn commit: r889011 [2/2] - in /incubator/trafficserver/traffic/trunk:
iocore/net/ proxy/
Modified: incubator/trafficserver/traffic/trunk/iocore/net/UnixNetVConnection.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/UnixNetVConnection.cc?rev=889011&r1=889010&r2=889011&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/net/UnixNetVConnection.cc (original)
+++ incubator/trafficserver/traffic/trunk/iocore/net/UnixNetVConnection.cc Wed Dec 9 23:01:29 2009
@@ -38,8 +38,6 @@
#define NET_MAX_IOV UIO_MAXIOV
#endif
-#define NET_THREAD_STEALING
-
#ifdef DEBUG
// Initialize class UnixNetVConnection static data
int
@@ -58,34 +56,24 @@
//
// Reschedule a UnixNetVConnection by placing the VC
-// into ReadyQueue or WaitList
+// into ready_list or wait_list
//
static inline void
read_reschedule(NetHandler * nh, UnixNetVConnection * vc)
{
if (vc->read.triggered && vc->read.enabled) {
- if (!vc->read.netready_queue) {
- nh->ready_queue.epoll_addto_read_ready_queue(vc);
- }
- } else {
- if (vc->read.netready_queue) {
- ReadyQueue::epoll_remove_from_read_ready_queue(vc);
- }
- }
+ nh->read_ready_list.in_or_enqueue(vc);
+ } else
+ nh->read_ready_list.remove(vc);
}
static inline void
write_reschedule(NetHandler * nh, UnixNetVConnection * vc)
{
if (vc->write.triggered && vc->write.enabled) {
- if (!vc->write.netready_queue) {
- nh->ready_queue.epoll_addto_write_ready_queue(vc);
- }
- } else {
- if (vc->write.netready_queue) {
- ReadyQueue::epoll_remove_from_write_ready_queue(vc);
- }
- }
+ nh->write_ready_list.in_or_enqueue(vc);
+ } else
+ nh->write_ready_list.remove(vc);
}
void
@@ -114,7 +102,6 @@
//
// Function used to close a UnixNetVConnection and free the vc
-// Modified by YTS Team, yamsat
//
void
close_UnixNetVConnection(UnixNetVConnection * vc, EThread * t)
@@ -122,17 +109,13 @@
if (vc->loggingEnabled()) {
vc->addLogMessage("close_UnixNetVConnection");
// display the slow log for the http client session
- if (vc->getLogsTotalTime() / 1000000 > 30000) {
+ if (vc->getLogsTotalTime() / 1000000 > 30000)
vc->printLogs();
- }
vc->clearLogs();
}
- XTIME(printf("%d %d close\n", vc->id, (int) ((ink_get_hrtime_internal() - vc->submit_time) / HRTIME_MSECOND)));
-
vc->cancel_OOB();
- //added by YTS Team, yamsat
PollDescriptor *pd = get_PollDescriptor(t);
#if defined(USE_EPOLL)
struct epoll_event ev;
@@ -147,10 +130,6 @@
#else
#error port me
#endif
- if (vc->ep != NULL) {
- xfree(vc->ep);
- vc->ep = NULL;
- }
socketManager.fast_close(vc->con.fd);
vc->con.fd = NO_FD;
@@ -170,38 +149,23 @@
}
vc->active_timeout_in = 0;
- //added by YTS Team, yamsat
-
- if (vc->read.queue) {
- WaitList::epoll_remove_from_read_wait_list(vc);
- }
- if (vc->write.queue) {
- WaitList::epoll_remove_from_write_wait_list(vc);
- }
-
- if (vc->read.netready_queue) {
- ReadyQueue::epoll_remove_from_read_ready_queue(vc);
- }
- if (vc->write.netready_queue) {
- ReadyQueue::epoll_remove_from_write_ready_queue(vc);
+ NetHandler *nh = vc->nh;
+ nh->open_list.remove(vc);
+ nh->read_ready_list.remove(vc);
+ nh->write_ready_list.remove(vc);
+ if (vc->read.in_enabled_list) {
+ ink_assert(vc->closed);
+ ink_assert(nh->read_enable_list.remove(vc) == vc);
+ vc->read.in_enabled_list = 0;
+ }
+ ink_assert(vc->read.enable_link.next == NULL);
+ if (vc->write.in_enabled_list) {
+ ink_assert(vc->closed);
+ ink_assert(nh->write_enable_list.remove(vc) == vc);
+ vc->write.in_enabled_list = 0;
}
+ ink_assert(vc->write.enable_link.next == NULL);
- if (vc->read.enable_queue) {
- ((Queue<UnixNetVConnection> *)vc->read.enable_queue)->remove(vc, vc->read.enable_link);
- vc->read.enable_queue = NULL;
- }
- if (vc->write.enable_queue) {
- ((Queue<UnixNetVConnection> *)vc->write.enable_queue)->remove(vc, vc->write.enable_link);
- vc->write.enable_queue = NULL;
- }
- // clear variables for reuse
- vc->nh = NULL; //added by YTS Team, yamsat
- vc->closed = 1;
- vc->read.ifd = -1;
- vc->read.triggered = 0; //added by YTS Team, yamsat
- vc->write.ifd = -1;
- vc->write.triggered = 0; //added by YTS Team, yamsat
- vc->options.reset();
vc->free(t);
}
@@ -278,19 +242,16 @@
// read the data for a UnixNetVConnection.
// Rescheduling the UnixNetVConnection by placing the VC into
-// ReadyQueue (or) WaitList
+// ready_list (or) wait_list
// Had to wrap this function with net_read_io to make SSL work..
static void
read_from_net(NetHandler * nh, UnixNetVConnection * vc, EThread * thread)
{
vc->addLogMessage("read from net");
- NetState *
- s = &vc->read;
- ProxyMutex *
- mutex = thread->mutex;
+ NetState * s = &vc->read;
+ ProxyMutex * mutex = thread->mutex;
MIOBufferAccessor & buf = s->vio.buffer;
- int
- r = 0;
+ int r = 0;
MUTEX_TRY_LOCK_FOR(lock, s->vio.mutex, thread, s->vio._cont);
@@ -381,34 +342,28 @@
if (r == -EAGAIN || r == -ENOTCONN) {
NET_DEBUG_COUNT_DYN_STAT(net_calls_to_read_nodata_stat, 1);
vc->addLogMessage("EAGAIN or ENOTCONN");
-
vc->read.triggered = 0;
- ReadyQueue::epoll_remove_from_read_ready_queue(vc);
-
+ nh->read_ready_list.remove(vc);
return;
}
if (!r || r == -ECONNRESET) {
// display the slow log for the http client session
if (vc->loggingEnabled()) {
- if (vc->getLogsTotalTime() / 1000000 > 30000) {
+ if (vc->getLogsTotalTime() / 1000000 > 30000)
vc->printLogs();
- }
vc->clearLogs();
}
// connection is closed
vc->read.triggered = 0;
- ReadyQueue::epoll_remove_from_read_ready_queue(vc);
+ nh->read_ready_list.remove(vc);
read_signal_done(VC_EVENT_EOS, nh, vc);
-
return;
}
vc->read.triggered = 0;
read_signal_error(nh, vc, -r);
return;
}
- NET_TRUSS(Debug("net_truss_read", "VC[%d:%d], read %d bytes", vc->id, vc->con.fd, r));
- XTIME(printf("%d %d read: %d\n", vc->id, (int) ((ink_get_hrtime() - vc->submit_time) / HRTIME_MSECOND), r));
NET_SUM_DYN_STAT(net_read_bytes_stat, r);
// Add data to buffer and signal continuation.
@@ -461,8 +416,7 @@
void
write_to_net(NetHandler * nh, UnixNetVConnection * vc, PollDescriptor * pd, EThread * thread)
{
- ProxyMutex *
- mutex = thread->mutex;
+ ProxyMutex *mutex = thread->mutex;
NET_DEBUG_COUNT_DYN_STAT(net_calls_to_writetonet_stat, 1);
NET_DEBUG_COUNT_DYN_STAT(net_calls_to_writetonet_afterpoll_stat, 1);
@@ -476,22 +430,18 @@
{
vc->addLogMessage("write to net io");
- NetState *
- s = &vc->write;
- ProxyMutex *
- mutex = thread->mutex;
+ NetState *s = &vc->write;
+ ProxyMutex *mutex = thread->mutex;
MUTEX_TRY_LOCK_FOR(lock, s->vio.mutex, thread, s->vio._cont);
- if (!lock || lock.m.m_ptr != s->vio.mutex.m_ptr) {
+ if (!lock || lock.m.m_ptr != s->vio.mutex.m_ptr)
return;
- }
+
// This function will always return true unless
// vc is an SSLNetVConnection.
if (!vc->getSSLHandShakeComplete()) {
- int
- err,
- ret;
+ int err, ret;
if (vc->getSSLClientConnection())
ret = vc->sslStartHandShake(SSL_EVENT_CLIENT, err);
@@ -499,31 +449,20 @@
ret = vc->sslStartHandShake(SSL_EVENT_SERVER, err);
if (ret == EVENT_ERROR) {
- if (vc->write.triggered) {
- vc->write.triggered = 0;
- }
+ vc->write.triggered = 0;
write_signal_error(nh, vc, err);
} else if (ret == SSL_HANDSHAKE_WANT_READ || ret == SSL_HANDSHAKE_WANT_ACCEPT || ret == SSL_HANDSHAKE_WANT_CONNECT
|| ret == SSL_HANDSHAKE_WANT_WRITE) {
vc->read.triggered = 0;
- if (vc->read.netready_queue) {
- ReadyQueue::epoll_remove_from_read_ready_queue(vc);
- }
+ nh->read_ready_list.remove(vc);
vc->write.triggered = 0;
- if (vc->write.netready_queue) {
- ReadyQueue::epoll_remove_from_write_ready_queue(vc);
- }
+ nh->write_ready_list.remove(vc);
} else if (ret == EVENT_DONE) {
vc->read.triggered = 1;
- if (vc->read.enabled) {
- if (!vc->read.netready_queue) {
- nh->ready_queue.epoll_addto_read_ready_queue(vc);
- }
- }
-
- } else {
+ if (vc->read.enabled)
+ nh->read_ready_list.in_or_enqueue(vc);
+ } else
write_reschedule(nh, vc);
- }
return;
}
// If it is not enabled,add to WaitList.
@@ -532,8 +471,7 @@
return;
}
// If there is nothing to do, disable
- int
- ntodo = s->vio.ntodo();
+ int ntodo = s->vio.ntodo();
if (ntodo <= 0) {
write_disable(nh, vc);
return;
@@ -543,12 +481,10 @@
ink_debug_assert(buf.writer());
// Calculate amount to write
- int
- towrite = buf.reader()->read_avail();
+ int towrite = buf.reader()->read_avail();
if (towrite > ntodo)
towrite = ntodo;
- int
- signalled = 0;
+ int signalled = 0;
// signal write ready to allow user to fill the buffer
if (towrite != ntodo && buf.writer()->write_avail()) {
@@ -573,12 +509,10 @@
return;
}
- int
- r = 0, total_wrote = 0, wattempted = 0;
- r = vc->loadBufferAndCallWrite(towrite, wattempted, total_wrote, buf);
+ int total_wrote = 0, wattempted = 0;
+ int r = vc->loadBufferAndCallWrite(towrite, wattempted, total_wrote, buf);
if (vc->loggingEnabled()) {
- char
- message[256];
+ char message[256];
snprintf(message, sizeof(message), "rval: %d towrite: %d ntodo: %d total_wrote: %d", r, towrite, ntodo,
total_wrote);
vc->addLogMessage(message);
@@ -595,9 +529,7 @@
if (r == -EAGAIN || r == -ENOTCONN) {
NET_DEBUG_COUNT_DYN_STAT(net_calls_to_write_nodata_stat, 1);
vc->write.triggered = 0;
- if (vc->write.netready_queue) {
- ReadyQueue::epoll_remove_from_write_ready_queue(vc);
- }
+ nh->write_ready_list.remove(vc);
return;
}
if (!r || r == -ECONNRESET) {
@@ -609,9 +541,6 @@
write_signal_error(nh, vc, -r);
return;
} else {
- NET_TRUSS(Debug("net_truss_write", "VC[%d:%d], write %d bytes", vc->id, vc->con.fd, r));
- XTIME(printf("%d %d write: %d\n", vc->id,
- (int) ((ink_get_hrtime_internal() - vc->submit_time) / HRTIME_MSECOND), r));
NET_SUM_DYN_STAT(net_write_bytes_stat, r);
// Remove data from the buffer and signal continuation.
@@ -650,23 +579,17 @@
VIO *
UnixNetVConnection::do_io_read(Continuation * c, int nbytes, MIOBuffer * buf)
{
- // addLogMessage("do_io_read");
- NET_TRUSS(Debug("net_truss_read", "VC[%d:%d] do_io_read(%d)", id, con.fd, nbytes));
ink_assert(!closed);
if (buf)
read.vio.buffer.writer_for(buf);
else
read.vio.buffer.clear();
- // boost the NetVC
- read.priority = INK_MIN_PRIORITY;
- write.priority = INK_MIN_PRIORITY;
read.vio.op = VIO::READ;
read.vio.mutex = c->mutex;
read.vio._cont = c;
read.vio.nbytes = nbytes;
read.vio.ndone = 0;
read.vio.vc_server = (VConnection *) this;
- XTIME(printf("%d %d do_io_read\n", id, (int) ((ink_get_hrtime_internal() - submit_time) / HRTIME_MSECOND)));
if (buf) {
if (!read.enabled)
read.vio.reenable();
@@ -680,24 +603,18 @@
UnixNetVConnection::do_io_write(Continuation * acont, int anbytes, IOBufferReader * abuffer, bool owner)
{
addLogMessage("do_io_write");
-
- NET_TRUSS(Debug("net_truss_write", "VC[%d:%d] do_io_write(%d)", id, con.fd, anbytes));
ink_assert(!closed);
if (abuffer) {
ink_assert(!owner);
write.vio.buffer.reader_for(abuffer);
} else
write.vio.buffer.clear();
- // boost the NetVC
- read.priority = INK_MIN_PRIORITY;
- write.priority = INK_MIN_PRIORITY;
write.vio.op = VIO::WRITE;
write.vio.mutex = acont->mutex;
write.vio._cont = acont;
write.vio.nbytes = anbytes;
write.vio.ndone = 0;
write.vio.vc_server = (VConnection *) this;
- XTIME(printf("%d %d do_io_write\n", id, (int) ((ink_get_hrtime_internal() - submit_time) / HRTIME_MSECOND)));
if (abuffer) {
if (!write.enabled)
write.vio.reenable();
@@ -834,142 +751,92 @@
//
// Function used to reenable the VC for reading or
-// writing. Modified by YTS Team, yamsat
+// writing.
//
void
UnixNetVConnection::reenable(VIO * vio)
{
-
- if (STATE_FROM_VIO(vio)->enabled) {
+ if (STATE_FROM_VIO(vio)->enabled)
return;
- }
- NET_TRUSS(Debug("net_truss", "VC[%d:%d] UnixNetVConnection::reenable", id, con.fd));
set_enabled(vio);
-#ifdef NET_THREAD_STEALING
if (!thread)
return;
EThread *t = vio->mutex->thread_holding;
ink_debug_assert(t == this_ethread());
- //Modified by YTS Team, yamsat
if (nh->mutex->thread_holding == t) {
if (vio == &read.vio) {
- if (read.triggered) {
- if (!read.netready_queue) {
- nh->ready_queue.epoll_addto_read_ready_queue(this);
- }
- } else {
- if (read.netready_queue) {
- ReadyQueue::epoll_remove_from_read_ready_queue(this);
- }
- }
-
+ if (read.triggered)
+ nh->read_ready_list.in_or_enqueue(this);
+ else
+ nh->read_ready_list.remove(this);
} else {
- if (write.triggered) {
- if (!write.netready_queue) {
- nh->ready_queue.epoll_addto_write_ready_queue(this);
- }
- } else {
- if (write.netready_queue) {
- ReadyQueue::epoll_remove_from_write_ready_queue(this);
- }
- }
+ if (write.triggered)
+ nh->write_ready_list.in_or_enqueue(this);
+ else
+ nh->write_ready_list.remove(this);
}
} else if (!nh->mutex->is_thread()) {
MUTEX_TRY_LOCK(lock, nh->mutex, t);
if (!lock) {
if (vio == &read.vio) {
- ink_mutex_acquire(&nh->read_enable_mutex.m_ptr->the_mutex);
- if (!read.enable_queue) {
- read.enable_queue = &nh->read_enable_list;
- nh->read_enable_list.enqueue(this, read.enable_link);
+ if (!read.in_enabled_list) {
+ read.in_enabled_list = 1;
+ nh->read_enable_list.push(this);
}
- ink_mutex_release(&nh->read_enable_mutex.m_ptr->the_mutex);
} else {
- ink_mutex_acquire(&nh->write_enable_mutex.m_ptr->the_mutex);
- if (!write.enable_queue) {
- write.enable_queue = &nh->write_enable_list;
- nh->write_enable_list.enqueue(this, write.enable_link);
- }
- ink_mutex_release(&nh->write_enable_mutex.m_ptr->the_mutex);
- }
- return;
- }
- if (vio == &read.vio) {
- if (read.triggered) {
- if (!read.netready_queue) {
- nh->ready_queue.epoll_addto_read_ready_queue(this);
- }
- } else {
- if (read.netready_queue) {
- ReadyQueue::epoll_remove_from_read_ready_queue(this);
+ if (!write.in_enabled_list) {
+ write.in_enabled_list = 1;
+ nh->write_enable_list.push(this);
}
}
} else {
- if (write.triggered) {
- if (!write.netready_queue) {
- nh->ready_queue.epoll_addto_write_ready_queue(this);
- }
+ if (vio == &read.vio) {
+ if (read.triggered)
+ nh->read_ready_list.in_or_enqueue(this);
+ else
+ nh->read_ready_list.remove(this);
} else {
- if (write.netready_queue) {
- ReadyQueue::epoll_remove_from_write_ready_queue(this);
- }
+ if (write.triggered)
+ nh->write_ready_list.in_or_enqueue(this);
+ else
+ nh->write_ready_list.remove(this);
}
}
}
-#endif
}
void
UnixNetVConnection::reenable_re(VIO * vio)
{
-
set_enabled(vio);
-
-#ifdef NET_THREAD_STEALING
- if (!thread) {
+ if (!thread)
return;
- }
EThread *t = vio->mutex->thread_holding;
ink_debug_assert(t == this_ethread());
if (nh->mutex->thread_holding == t) {
if (vio == &read.vio) {
- if (read.triggered) {
+ if (read.triggered)
net_read_io(nh, t);
- } else {
- if (read.netready_queue != NULL) {
- ReadyQueue::epoll_remove_from_read_ready_queue(this);
- }
- }
+ else
+ nh->read_ready_list.remove(this);
} else {
- if (write.triggered) {
+ if (write.triggered)
write_to_net(nh, this, NULL, t);
- } else {
- if (write.netready_queue != NULL) {
- ReadyQueue::epoll_remove_from_write_ready_queue(this);
- }
- }
+ else
+ nh->write_ready_list.remove(this);
}
}
-#endif
-}
-
-
-void
-UnixNetVConnection::boost()
-{
- ink_assert(thread);
}
UnixNetVConnection::UnixNetVConnection():
-closed(1), inactivity_timeout_in(0), active_timeout_in(0),
+closed(0), inactivity_timeout_in(0), active_timeout_in(0),
#ifdef INACTIVITY_TIMEOUT
inactivity_timeout(NULL),
#else
next_inactivity_timeout_at(0),
#endif
- active_timeout(NULL), ep(NULL), //added by YTS Team, yamsat
- nh(NULL), //added by YTS Team, yamsat
+ active_timeout(NULL), nh(NULL),
id(0), ip(0), _interface(0), accept_port(0), port(0), flags(0), recursion(0), submit_time(0), oob_ptr(0)
{
memset(&local_sa, 0, sizeof local_sa);
@@ -987,11 +854,9 @@
STATE_FROM_VIO(vio)->enabled = 1;
#ifdef DEBUG
if (vio == &read.vio) {
- XTIME(printf("%d %d reenable read\n", id, (int) ((ink_get_hrtime_internal() - submit_time) / HRTIME_MSECOND)));
if (enable_debug_trace && (vio->buffer.mbuf && !vio->buffer.writer()->write_avail()));
} else {
ink_assert(vio == &write.vio);
- XTIME(printf("%d %d reenable write\n", id, (int) ((ink_get_hrtime_internal() - submit_time) / HRTIME_MSECOND)));
if (enable_debug_trace && (vio->buffer.mbuf && !vio->buffer.reader()->read_avail()));
}
#endif
@@ -1004,8 +869,6 @@
#endif
}
-
-
void
UnixNetVConnection::net_read_io(NetHandler * nh, EThread * lthread)
{
@@ -1145,25 +1008,17 @@
SET_HANDLER((NetVConnHandler) & UnixNetVConnection::mainEvent);
- //added by YTS Team, yamsat
nh = get_NetHandler(thread);
PollDescriptor *pd = get_PollDescriptor(thread);
-
- struct epoll_data_ptr *eptr;
- eptr = (struct epoll_data_ptr *) xmalloc(sizeof(struct epoll_data_ptr));
- eptr->type = EPOLL_READWRITE_VC;
- eptr->data.vc = this;
-
- this->ep = eptr;
-
- closed = 0;
+ ep.type = EPOLL_READWRITE_VC;
+ ep.data.vc = this;
#if defined(USE_EPOLL)
struct epoll_event ev;
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
- ev.data.ptr = eptr;
+ ev.data.ptr = &ep;
//printf("Added to epoll ctl fd %d and number is %d\n",con.fd,id);
if (epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, con.fd, &ev) < 0) {
Debug("iocore_net", "acceptEvent : Failed to add to epoll list\n");
@@ -1172,8 +1027,8 @@
}
#elif defined(USE_KQUEUE)
struct kevent ev[2];
- EV_SET(&ev[0], con.fd, EVFILT_READ, EV_ADD, 0, 0, eptr);
- EV_SET(&ev[1], con.fd, EVFILT_WRITE, EV_ADD, 0, 0, eptr);
+ EV_SET(&ev[0], con.fd, EVFILT_READ, EV_ADD, 0, 0, &ep);
+ EV_SET(&ev[1], con.fd, EVFILT_WRITE, EV_ADD, 0, 0, &ep);
if (kevent(pd->kqueue_fd, &ev[0], 2, NULL, 0, NULL) < 0) {
Debug("iocore_net", "acceptEvent : Failed to add to kqueue list\n");
close_UnixNetVConnection(this, e->ethread);
@@ -1183,14 +1038,7 @@
#error port me
#endif
- Debug("iocore_net", "acceptEvent : Adding fd %d to read wait list\n", con.fd);
- nh->wait_list.epoll_addto_read_wait_list(this);
- Debug("iocore_net", "acceptEvent : Adding fd %d to write wait list\n", con.fd);
- nh->wait_list.epoll_addto_write_wait_list(this);
-
- //Debug("iocore_net", "acceptEvent : Setting triggered and adding to the read ready queue");
- //read.triggered = 1;
- //nh->ready_queue.epoll_addto_read_ready_queue(this);
+ nh->open_list.enqueue(this);
if (inactivity_timeout_in)
UnixNetVConnection::set_inactivity_timeout(inactivity_timeout_in);
@@ -1209,7 +1057,6 @@
UnixNetVConnection::mainEvent(int event, Event * e)
{
addLogMessage("main event");
- NET_TRUSS(Debug("netvc_truss_timeout", "UnixNetVConnection[%d]::timeout", con.fd));
ink_debug_assert(event == EVENT_IMMEDIATE || event == EVENT_INTERVAL);
/* BZ 31932 */
ink_debug_assert(thread == this_ethread());
@@ -1223,14 +1070,11 @@
if (e == active_timeout)
#endif
e->schedule_in(NET_RETRY_DELAY);
-
return EVENT_CONT;
}
-
- if (e->cancelled) {
-
+ if (e->cancelled)
return EVENT_DONE;
- }
+
int signal_event;
Event **signal_timeout;
Continuation *reader_cont = NULL;
@@ -1250,9 +1094,8 @@
/* BZ 49408 */
//ink_debug_assert(inactivity_timeout_in);
//ink_debug_assert(next_inactivity_timeout_at < ink_get_hrtime());
- if (!inactivity_timeout_in || next_inactivity_timeout_at > ink_get_hrtime()) {
+ if (!inactivity_timeout_in || next_inactivity_timeout_at > ink_get_hrtime())
return EVENT_CONT;
- }
signal_event = VC_EVENT_INACTIVITY_TIMEOUT;
signal_timeout_at = &next_inactivity_timeout_at;
}
@@ -1265,27 +1108,24 @@
*signal_timeout = 0;
*signal_timeout_at = 0;
writer_cont = write.vio._cont;
-
+
if (closed) {
- //added by YTS Team, yamsat
close_UnixNetVConnection(this, thread);
return EVENT_DONE;
}
+
if (read.vio.op == VIO::READ && !(f.shutdown & NET_VC_SHUTDOWN_READ)) {
reader_cont = read.vio._cont;
- if (read_signal_and_update(signal_event, this) == EVENT_DONE) {
+ if (read_signal_and_update(signal_event, this) == EVENT_DONE)
return EVENT_DONE;
- }
}
if (!*signal_timeout &&
!*signal_timeout_at &&
!closed && write.vio.op == VIO::WRITE &&
- !(f.shutdown & NET_VC_SHUTDOWN_WRITE) && reader_cont != write.vio._cont && writer_cont == write.vio._cont) {
- if (write_signal_and_update(signal_event, this) == EVENT_DONE) {
+ !(f.shutdown & NET_VC_SHUTDOWN_WRITE) && reader_cont != write.vio._cont && writer_cont == write.vio._cont)
+ if (write_signal_and_update(signal_event, this) == EVENT_DONE)
return EVENT_DONE;
- }
- }
return EVENT_DONE;
}
@@ -1320,19 +1160,14 @@
nh = get_NetHandler(t);
PollDescriptor *pd = get_PollDescriptor(t);
- closed = 0; // need to set this before adding it to epoll
-
- struct epoll_data_ptr *eptr;
- eptr = (struct epoll_data_ptr *) xmalloc(sizeof(struct epoll_data_ptr));
- eptr->type = EPOLL_READWRITE_VC;
- eptr->data.vc = this;
- ep = eptr;
+ ep.type = EPOLL_READWRITE_VC;
+ ep.data.vc = this;
#if defined(USE_EPOLL)
struct epoll_event ev;
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
- ev.data.ptr = eptr;
+ ev.data.ptr = &ep;
int rval = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, socketFd, &ev);
if (rval != 0) {
lerrno = errno;
@@ -1343,8 +1178,8 @@
}
#elif defined(USE_KQUEUE)
struct kevent ev[2];
- EV_SET(&ev[0], socketFd, EVFILT_READ, EV_ADD, 0, 0, eptr);
- EV_SET(&ev[1], socketFd, EVFILT_WRITE, EV_ADD, 0, 0, eptr);
+ EV_SET(&ev[0], socketFd, EVFILT_READ, EV_ADD, 0, 0, &ep);
+ EV_SET(&ev[1], socketFd, EVFILT_WRITE, EV_ADD, 0, 0, &ep);
int rval = kevent(pd->kqueue_fd, &ev[0], 2, NULL, 0, NULL);
if (rval < 0) {
lerrno = errno;
@@ -1378,24 +1213,18 @@
// function code not to be duplicated in the inherited SSL class.
// sslStartHandShake (SSL_EVENT_CLIENT, err);
- //added for epoll by YTS Team, yamsat
-
if (_interface || options.local_port || options.spoof_ip) {
nh = get_NetHandler(t);
PollDescriptor *pd = get_PollDescriptor(t);
- struct epoll_data_ptr *eptr;
- eptr = (struct epoll_data_ptr *) xmalloc(sizeof(struct epoll_data_ptr));
- eptr->type = EPOLL_READWRITE_VC;
- eptr->data.vc = this;
-
- ep = eptr;
+ ep.type = EPOLL_READWRITE_VC;
+ ep.data.vc = this;
#if defined(USE_EPOLL)
struct epoll_event ev;
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
- ev.data.ptr = eptr;
+ ev.data.ptr = &ep;
res = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, con.fd, &ev);
if (res < 0) {
Debug("iocore_net", "connectUp : Failed to add to epoll list\n");
@@ -1406,8 +1235,8 @@
}
#elif defined(USE_KQUEUE)
struct kevent ev[2];
- EV_SET(&ev[0], con.fd, EVFILT_READ, EV_ADD, 0, 0, eptr);
- EV_SET(&ev[1], con.fd, EVFILT_WRITE, EV_ADD, 0, 0, eptr);
+ EV_SET(&ev[0], con.fd, EVFILT_READ, EV_ADD, 0, 0, &ep);
+ EV_SET(&ev[1], con.fd, EVFILT_WRITE, EV_ADD, 0, 0, &ep);
res = kevent(pd->kqueue_fd, &ev[0], 2, NULL, 0, NULL);
if (res < 0) {
lerrno = errno;
@@ -1421,18 +1250,11 @@
#endif
}
- closed = 0;
-
- Debug("iocore_net", "connectUp : Adding fd %d to read wait list\n", con.fd);
- nh->wait_list.epoll_addto_read_wait_list(this);
-
- Debug("iocore_net", "connectUp : Adding fd %d to write wait list\n", con.fd);
- nh->wait_list.epoll_addto_write_wait_list(this);
+ nh->open_list.enqueue(this);
ink_assert(!inactivity_timeout_in);
ink_assert(!active_timeout_in);
action_.continuation->handleEvent(NET_EVENT_OPEN, this);
- XTIME(printf("%d 2connect\n", id));
return CONNECT_SUCCESS;
}
@@ -1441,6 +1263,7 @@
UnixNetVConnection::free(EThread * t)
{
NET_DECREMENT_THREAD_DYN_STAT(net_connections_currently_open_stat, t);
+ // clear variables for reuse
got_remote_addr = 0;
got_local_addr = 0;
read.vio.mutex.clear();
@@ -1450,26 +1273,18 @@
flags = 0;
accept_port = 0;
SET_CONTINUATION_HANDLER(this, (NetVConnHandler) & UnixNetVConnection::startEvent);
- //added for epoll by YTS Team, yamsat
- if (ep != NULL) {
- xfree(ep);
- ep = NULL;
- }
- if (nh != NULL) {
- nh = NULL;
- }
- ink_debug_assert(!read.queue && !write.queue);
- ink_debug_assert(!read.netready_queue && !write.netready_queue);
- ink_debug_assert(!read.enable_queue && !write.enable_queue);
- ink_debug_assert(!read.link.prev && !read.link.next);
- ink_debug_assert(!read.netready_link.prev && !read.netready_link.next);
- ink_debug_assert(!read.enable_link.prev && !read.enable_link.next);
- ink_debug_assert(!write.link.prev && !write.link.next);
- ink_debug_assert(!write.netready_link.prev && !write.netready_link.next);
- ink_debug_assert(!write.enable_link.prev && !write.enable_link.next);
+ nh = NULL;
+ read.triggered = 0;
+ write.triggered = 0;
+ options.reset();
+ ink_debug_assert(!read.ready_link.prev && !read.ready_link.next);
+ ink_debug_assert(!read.enable_link.next);
+ ink_debug_assert(!write.ready_link.prev && !write.ready_link.next);
+ ink_debug_assert(!write.enable_link.next);
ink_debug_assert(!link.next && !link.prev);
ink_debug_assert(!active_timeout);
ink_debug_assert(con.fd == NO_FD);
ink_debug_assert(t == this_ethread());
+ closed = 0;
THREAD_FREE(this, netVCAllocator, t);
}
Modified: incubator/trafficserver/traffic/trunk/iocore/net/UnixUDPConnection.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/UnixUDPConnection.cc?rev=889011&r1=889010&r2=889011&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/net/UnixUDPConnection.cc (original)
+++ incubator/trafficserver/traffic/trunk/iocore/net/UnixUDPConnection.cc Wed Dec 9 23:01:29 2009
@@ -35,8 +35,8 @@
{
UDPPacketInternal *p = (UDPPacketInternal *) ink_atomiclist_popall(&inQueue);
- if (!m_tobedestroyed)
- m_tobedestroyed = 1;
+ if (!tobedestroyed)
+ tobedestroyed = 1;
if (p) {
UDPPacketInternal *pnext = NULL;
@@ -44,7 +44,6 @@
pnext = p->alink.next;
p->alink.next = NULL;
p->free();
- //delete p;
p = pnext;
}
}
@@ -53,10 +52,10 @@
callbackAction = NULL;
}
Debug("udpnet", "Destroying udp port = %d", getPortNum());
- if (m_fd != -1) {
- socketManager.close(m_fd, keSocket);
+ if (fd != -1) {
+ socketManager.close(fd, keSocket);
}
- m_fd = -1;
+ fd = -1;
}
// called with continuation lock taken out
@@ -81,7 +80,7 @@
if (p) {
Debug("udpnet", "UDPConnection::callbackHandler");
UDPPacketInternal *pnext = NULL;
- Queue<UDPPacketInternal> result;
+ Que(UDPPacketInternal, link) result;
while (p) {
pnext = p->alink.next;
p->alink.next = NULL;
@@ -91,9 +90,8 @@
if (!shouldDestroy())
continuation->handleEvent(NET_EVENT_DATAGRAM_READ_READY, &result);
else {
- while ((p = result.dequeue())) {
+ while ((p = result.dequeue()))
p->free();
- }
}
}
}
@@ -109,7 +107,7 @@
EThread *t = eventProcessor.assign_thread(ET_UDP);
ink_assert(t);
ink_assert(get_UDPNetHandler(t));
- uc->m_ethread = t;
+ uc->ethread = t;
AddRef();
uc->continuation = c;
mutex = c->mutex;
@@ -134,7 +132,7 @@
conn->continuation = c;
ink_assert(conn->continuation != NULL);
mutex = c->mutex;
- p->m_reqGenerationNum = conn->m_sendGenerationNum;
- get_UDPNetHandler(conn->m_ethread)->udpOutQueue->send(p);
+ p->reqGenerationNum = conn->sendGenerationNum;
+ get_UDPNetHandler(conn->ethread)->udpOutQueue.send(p);
return ACTION_RESULT_NONE;
}
Modified: incubator/trafficserver/traffic/trunk/iocore/net/UnixUDPNet.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/UnixUDPNet.cc?rev=889011&r1=889010&r2=889011&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/net/UnixUDPNet.cc (original)
+++ incubator/trafficserver/traffic/trunk/iocore/net/UnixUDPNet.cc Wed Dec 9 23:01:29 2009
@@ -106,28 +106,11 @@
return -1;
pollCont_offset = eventProcessor.allocate(sizeof(PollCont));
- udpNetHandler_offset = eventProcessor.allocate(sizeof(NetHandler));
+ udpNetHandler_offset = eventProcessor.allocate(sizeof(UDPNetHandler));
for (int i = 0; i < eventProcessor.n_threads_for_type[ET_UDP]; i++)
initialize_thread_for_udp_net(eventProcessor.eventthread[ET_UDP][i]);
-#if 0
- unsigned long hoodIpaddr;
- unsigned char *ip;
- struct sockaddr_in addr;
- Action *status;
-
- ip = (unsigned char *) &hoodIpaddr;
- ip[0] = 216;
- ip[1] = 155;
- ip[2] = 202;
- ip[3] = 240;
- G_bwGrapherLoc.sin_family = AF_INET;
- G_bwGrapherLoc.sin_addr.s_addr = hoodIpaddr;
- G_bwGrapherLoc.sin_port = 7777;
- CreateUDPSocket(&G_bwGrapherFd, &addr, &status, 0, 0, 65536, 65536);
-#endif
-
return 0;
}
@@ -138,21 +121,6 @@
(void) thread;
UnixUDPConnection *uc = (UnixUDPConnection *) xuc;
-//epoll changes
-/*
- int i = uc->getPollvecIndex();
- if (pd && i >= 0) {
- Pollfd * pfd = &pd->pfd[i];
- uc->clearPollvecIndex();
- if (!(pfd->revents & POLLIN)) { // not ready for read
- return;
- }
- } else {
- uc->clearPollvecIndex();
- return;
- }
- */
-//epoll changes ends here
// receive packet and queue onto UDPConnection.
// don't call back connection at this time.
int r;
@@ -187,7 +155,7 @@
ink_assert(uc->callback_link.next == NULL);
ink_assert(uc->callback_link.prev == NULL);
uc->AddRef();
- nh->udp_callbacks->enqueue(uc, uc->callback_link);
+ nh->udp_callbacks.enqueue(uc);
uc->onCallbackQueue = 1;
}
}
@@ -610,7 +578,7 @@
worker->init(cont, nPairs, myIP, destIP, send_bufsize, recv_bufsize);
eventProcessor.schedule_imm(worker, ET_UDP);
- return &(worker->m_action);
+ return &(worker->action);
}
@@ -689,28 +657,28 @@
int i;
UDPConnectionInternal *p = (UDPConnectionInternal *) udpConn;
- if (G_inkPipeInfo.m_numPipes == 0) {
- p->m_pipe_class = 0;
+ if (G_inkPipeInfo.numPipes == 0) {
+ p->pipe_class = 0;
return;
}
- p->m_pipe_class = -1;
+ p->pipe_class = -1;
// find a match: 0 is best-effort
- for (i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++)
- if (G_inkPipeInfo.m_perPipeInfo[i].m_destIP == destIP)
- p->m_pipe_class = i;
+ for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++)
+ if (G_inkPipeInfo.perPipeInfo[i].destIP == destIP)
+ p->pipe_class = i;
// no match; set it to the destIP=0 class
- if (p->m_pipe_class == -1) {
- for (i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++)
- if (G_inkPipeInfo.m_perPipeInfo[i].m_destIP == 0) {
- p->m_pipe_class = i;
+ if (p->pipe_class == -1) {
+ for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++)
+ if (G_inkPipeInfo.perPipeInfo[i].destIP == 0) {
+ p->pipe_class = i;
break;
}
}
- Debug("udpnet-pipe", "Pipe class = %d", p->m_pipe_class);
- ink_debug_assert(p->m_pipe_class != -1);
- if (p->m_pipe_class == -1)
- p->m_pipe_class = 0;
- G_inkPipeInfo.m_perPipeInfo[p->m_pipe_class].m_count++;
+ Debug("udpnet-pipe", "Pipe class = %d", p->pipe_class);
+ ink_debug_assert(p->pipe_class != -1);
+ if (p->pipe_class == -1)
+ p->pipe_class = 0;
+ G_inkPipeInfo.perPipeInfo[p->pipe_class].count++;
}
Action *
@@ -788,24 +756,24 @@
UDPConnectionInternal *udpIntConn = (UDPConnectionInternal *) udpConn;
ink64 desiredbps = (ink64) (desiredMbps * 1024.0 * 1024.0);
- if (G_inkPipeInfo.m_numPipes == 0) {
- udpIntConn->m_flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
+ if (G_inkPipeInfo.numPipes == 0) {
+ udpIntConn->flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
return true;
}
- if ((udpIntConn->m_pipe_class == 0) ||
- (G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwAlloc + desiredbps >
- G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwLimit)) {
+ if ((udpIntConn->pipe_class == 0) ||
+ (G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc + desiredbps >
+ G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwLimit)) {
Debug("udpnet-admit", "Denying flow with %lf Mbps", desiredMbps);
return false;
}
- udpIntConn->m_flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
- udpIntConn->m_allocedbps = desiredbps;
- ink_atomic_increment64(&G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwAlloc, desiredbps);
+ udpIntConn->flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
+ udpIntConn->allocedbps = desiredbps;
+ ink_atomic_increment64(&G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc, desiredbps);
Debug("udpnet-admit", "Admitting flow with %lf Mbps (a=%lld, lim=%lld)",
desiredMbps,
- G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwAlloc,
- G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwLimit);
+ G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc,
+ G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwLimit);
return true;
}
@@ -814,27 +782,27 @@
{
UDPConnectionInternal *udpIntConn = (UDPConnectionInternal *) udpConn;
ink64 desiredbps = (ink64) (desiredMbps * 1024.0 * 1024.0);
- ink64 oldbps = (ink64) (udpIntConn->m_flowRateBps * 8.0);
+ ink64 oldbps = (ink64) (udpIntConn->flowRateBps * 8.0);
- if (G_inkPipeInfo.m_numPipes == 0) {
- udpIntConn->m_flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
+ if (G_inkPipeInfo.numPipes == 0) {
+ udpIntConn->flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
return true;
}
// arithmetic here is in bits-per-sec.
- if ((udpIntConn->m_pipe_class == 0) ||
- (G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwAlloc +
- desiredbps - oldbps) > G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwLimit) {
+ if ((udpIntConn->pipe_class == 0) ||
+ (G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc +
+ desiredbps - oldbps) > G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwLimit) {
Debug("udpnet-admit", "Unable to change b/w for flow to %lf Mbps", desiredMbps);
return false;
}
- udpIntConn->m_flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
- udpIntConn->m_allocedbps = desiredbps;
- ink_atomic_increment64(&G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwAlloc, desiredbps - oldbps);
+ udpIntConn->flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
+ udpIntConn->allocedbps = desiredbps;
+ ink_atomic_increment64(&G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc, desiredbps - oldbps);
Debug("udpnet-admit", "Changing flow's b/w from %lf Mbps to %lf Mbps (a=%lld, lim=%lld)",
(double) oldbps / (1024.0 * 1024.0),
desiredMbps,
- G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwAlloc,
- G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwLimit);
+ G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc,
+ G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwLimit);
return true;
}
@@ -844,24 +812,24 @@
UDPConnectionInternal *udpIntConn = (UDPConnectionInternal *) udpConn;
ink64 bps;
- if (G_inkPipeInfo.m_numPipes == 0)
+ if (G_inkPipeInfo.numPipes == 0)
return;
- Debug("udpnet-free", "Trying to releasing %lf (%lld) Kbps", udpIntConn->m_flowRateBps, udpIntConn->m_allocedbps);
+ Debug("udpnet-free", "Trying to releasing %lf (%lld) Kbps", udpIntConn->flowRateBps, udpIntConn->allocedbps);
- bps = udpIntConn->m_allocedbps;
+ bps = udpIntConn->allocedbps;
if (bps <= 0)
return;
- ink_atomic_increment64(&G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwAlloc, -bps);
+ ink_atomic_increment64(&G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc, -bps);
Debug("udpnet-free", "Releasing %lf Kbps", bps / 1024.0);
- if (G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwAlloc < 0)
- G_inkPipeInfo.m_perPipeInfo[udpIntConn->m_pipe_class].m_bwAlloc = 0;
+ if (G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc < 0)
+ G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc = 0;
- udpIntConn->m_flowRateBps = 0.0;
- udpIntConn->m_allocedbps = 0;
+ udpIntConn->flowRateBps = 0.0;
+ udpIntConn->allocedbps = 0;
}
double
@@ -870,36 +838,33 @@
int i;
double usedBw = 0.0;
- if (G_inkPipeInfo.m_numPipes == 0)
+ if (G_inkPipeInfo.numPipes == 0)
// return 100Mbps if there are no pipes
return 100.0;
- for (i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++) {
- usedBw += G_inkPipeInfo.m_perPipeInfo[i].m_bwUsed;
+ for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
+ usedBw += G_inkPipeInfo.perPipeInfo[i].bwUsed;
}
- return G_inkPipeInfo.m_interfaceMbps - usedBw;
+ return G_inkPipeInfo.interfaceMbps - usedBw;
}
// send out all packets that need to be sent out as of time=now
-UDPQueue::UDPQueue(InkAtomicList * pQueue)
-:m_atomicQueue(pQueue)
- , m_last_report(0)
- , m_last_service(0)
- , m_last_byteperiod(0)
- , m_bytesSent(0)
- , m_packets(0)
- , m_added(0)
+UDPQueue::UDPQueue()
+: last_report(0)
+, last_service(0)
+, last_byteperiod(0)
+, bytesSent(0)
+, packets(0)
+, added(0)
{
-
}
UDPQueue::~UDPQueue()
{
UDPPacketInternal *p;
- while ((p = m_reliabilityPktQueue.dequeue()) != NULL) {
+ while ((p = reliabilityPktQueue.dequeue()) != NULL)
p->free();
- }
}
/*
@@ -926,11 +891,11 @@
schedJitter += ink_hrtime_to_msec(now - lastSchedTime);
numTimesSched++;
- p = (UDPPacketInternal *) ink_atomiclist_popall(m_atomicQueue);
+ p = (UDPPacketInternal *) ink_atomiclist_popall(&atomicQueue);
if (p) {
UDPPacketInternal *pnext = NULL;
- Queue<UDPPacketInternal> stk;
+ Que(UDPPacketInternal, link) stk;
while (p) {
pnext = p->alink.next;
@@ -943,49 +908,49 @@
p = stk.pop();
ink_assert(p->link.prev == NULL);
ink_assert(p->link.next == NULL);
- if (p->m_isReliabilityPkt) {
- m_reliabilityPktQueue.enqueue(p);
+ if (p->isReliabilityPkt) {
+ reliabilityPktQueue.enqueue(p);
continue;
}
// insert into our queue.
Debug("udp-send", "Adding 0x%x", p);
- addToGuaranteedQ = ((p->m_conn->m_pipe_class > 0) && (p->m_conn->m_flowRateBps > 10.0));
+ addToGuaranteedQ = ((p->conn->pipe_class > 0) && (p->conn->flowRateBps > 10.0));
pktLen = p->getPktLength();
- if (p->m_conn->m_lastPktStartTime == 0) {
- p->m_pktSendStartTime = MAX(now, p->m_delivery_time);
+ if (p->conn->lastPktStartTime == 0) {
+ p->pktSendStartTime = MAX(now, p->delivery_time);
} else {
pktSize = MAX(INK_ETHERNET_MTU_SIZE, pktLen);
if (addToGuaranteedQ) {
// NOTE: this is flow rate in Bytes per sec.; convert to milli-sec.
- minPktSpacing = 1000.0 / (p->m_conn->m_flowRateBps / p->m_conn->m_avgPktSize);
+ minPktSpacing = 1000.0 / (p->conn->flowRateBps / p->conn->avgPktSize);
- pktSendTime = p->m_conn->m_lastPktStartTime + ink_hrtime_from_msec((inku32) minPktSpacing);
+ pktSendTime = p->conn->lastPktStartTime + ink_hrtime_from_msec((inku32) minPktSpacing);
} else {
minPktSpacing = 0.0;
- pktSendTime = p->m_delivery_time;
+ pktSendTime = p->delivery_time;
}
- p->m_pktSendStartTime = MAX(MAX(now, pktSendTime), p->m_delivery_time);
- if (p->m_conn->m_flowRateBps > 25600.0)
+ p->pktSendStartTime = MAX(MAX(now, pktSendTime), p->delivery_time);
+ if (p->conn->flowRateBps > 25600.0)
Debug("udpnet-pkt", "Pkt size = %.1lf now = %lld, send = %lld, del = %lld, Delay delta = %lld; delta = %lld",
- p->m_conn->m_avgPktSize,
- now, pktSendTime, p->m_delivery_time,
- ink_hrtime_to_msec(p->m_pktSendStartTime - now),
- ink_hrtime_to_msec(p->m_pktSendStartTime - p->m_conn->m_lastPktStartTime));
+ p->conn->avgPktSize,
+ now, pktSendTime, p->delivery_time,
+ ink_hrtime_to_msec(p->pktSendStartTime - now),
+ ink_hrtime_to_msec(p->pktSendStartTime - p->conn->lastPktStartTime));
- p->m_conn->m_avgPktSize = ((4.0 * p->m_conn->m_avgPktSize) / 5.0) + (pktSize / 5.0);
+ p->conn->avgPktSize = ((4.0 * p->conn->avgPktSize) / 5.0) + (pktSize / 5.0);
}
- p->m_conn->m_lastPktStartTime = p->m_pktSendStartTime;
- p->m_delivery_time = p->m_pktSendStartTime;
- p->m_conn->m_nBytesTodo += pktLen;
+ p->conn->lastPktStartTime = p->pktSendStartTime;
+ p->delivery_time = p->pktSendStartTime;
+ p->conn->nBytesTodo += pktLen;
g_udp_bytesPending += pktLen;
if (addToGuaranteedQ)
- G_inkPipeInfo.m_perPipeInfo[p->m_conn->m_pipe_class].m_queue->addPacket(p, now);
+ G_inkPipeInfo.perPipeInfo[p->conn->pipe_class].queue->addPacket(p, now);
else {
// stick in the best-effort queue: either it was a best-effort flow or
// the thingy wasn't alloc'ed bandwidth
- G_inkPipeInfo.m_perPipeInfo[0].m_queue->addPacket(p, now);
+ G_inkPipeInfo.perPipeInfo[0].queue->addPacket(p, now);
}
}
}
@@ -998,8 +963,8 @@
lastPrintTime = now;
}
- for (i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++)
- G_inkPipeInfo.m_perPipeInfo[i].m_queue->advanceNow(now);
+ for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++)
+ G_inkPipeInfo.perPipeInfo[i].queue->advanceNow(now);
if (G_bulkIOState) {
BulkIOSend();
@@ -1007,9 +972,9 @@
SendPackets();
}
- timeSpent = ink_hrtime_to_msec(now - m_last_report);
+ timeSpent = ink_hrtime_to_msec(now - last_report);
if (timeSpent > 10000) {
- // if (m_bytesSent > 0)
+ // if (bytesSent > 0)
// timespent is in milli-seconds
char temp[2048], *p1;
char bwMessage[2048];
@@ -1020,45 +985,45 @@
bwMessage[0] = '\0';
p1 = temp;
- if (m_bytesSent > 0)
- totalBw = (m_bytesSent * 8.0 * 1000.0) / (timeSpent * 1024.0 * 1024.0);
+ if (bytesSent > 0)
+ totalBw = (bytesSent * 8.0 * 1000.0) / (timeSpent * 1024.0 * 1024.0);
else
totalBw = 1.0;
- for (i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++) {
+ for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
// bw is in Mbps
- bw = (G_inkPipeInfo.m_perPipeInfo[i].m_bytesSent * 8.0 * 1000.0) / (timeSpent * 1024.0 * 1024.0);
+ bw = (G_inkPipeInfo.perPipeInfo[i].bytesSent * 8.0 * 1000.0) / (timeSpent * 1024.0 * 1024.0);
snprintf(p1, sizeof(temp), "\t class[%d] = %f Mbps, alloc = %f Mbps, (conf'ed = %f, got = %f) \n",
- i, bw, (G_inkPipeInfo.m_perPipeInfo[i].m_bwAlloc / (1024.0 * 1024.0)),
- G_inkPipeInfo.m_perPipeInfo[i].m_wt, bw / totalBw);
+ i, bw, (G_inkPipeInfo.perPipeInfo[i].bwAlloc / (1024.0 * 1024.0)),
+ G_inkPipeInfo.perPipeInfo[i].wt, bw / totalBw);
p1 += strlen(p1);
- ip = (unsigned char *) &(G_inkPipeInfo.m_perPipeInfo[i].m_destIP);
+ ip = (unsigned char *) &(G_inkPipeInfo.perPipeInfo[i].destIP);
#if 0
if (i == 0)
- sprintf(bwMessage, "%d mixt Best-Effort %f %f\n", time(0), bw, bw / G_inkPipeInfo.m_interfaceMbps);
+ sprintf(bwMessage, "%d mixt Best-Effort %f %f\n", time(0), bw, bw / G_inkPipeInfo.interfaceMbps);
else
sprintf(bwMessage, "%d mixt %d.%d.%d.%d %f %f\n",
- time(0), ip[0], ip[1], ip[2], ip[3], bw, bw / G_inkPipeInfo.m_interfaceMbps);
+ time(0), ip[0], ip[1], ip[2], ip[3], bw, bw / G_inkPipeInfo.interfaceMbps);
::sendto(G_bwGrapherFd, bwMessage, strlen(bwMessage), 0,
(struct sockaddr *) &G_bwGrapherLoc, sizeof(struct sockaddr_in));
#endif
// use a weighted estimator of current usage
- G_inkPipeInfo.m_perPipeInfo[i].m_bwUsed = (4.0 * G_inkPipeInfo.m_perPipeInfo[i].m_bwUsed / 5.0) + (bw / 5.0);
- G_inkPipeInfo.m_perPipeInfo[i].m_bytesSent = 0;
- G_inkPipeInfo.m_perPipeInfo[i].m_pktsSent = 0;
+ G_inkPipeInfo.perPipeInfo[i].bwUsed = (4.0 * G_inkPipeInfo.perPipeInfo[i].bwUsed / 5.0) + (bw / 5.0);
+ G_inkPipeInfo.perPipeInfo[i].bytesSent = 0;
+ G_inkPipeInfo.perPipeInfo[i].pktsSent = 0;
}
if (temp[0])
Debug("udpnet-bw", "B/w: %f Mbps; breakdown: \n%s", totalBw, temp);
- m_bytesSent = 0;
- m_last_report = now;
- m_added = 0;
- m_packets = 0;
+ bytesSent = 0;
+ last_report = now;
+ added = 0;
+ packets = 0;
}
- m_last_service = now;
+ last_service = now;
}
void
@@ -1075,28 +1040,28 @@
ink32 pktLen;
ink_hrtime timeDelta = 0;
- if (now > m_last_service)
- timeDelta = ink_hrtime_to_msec(now - m_last_service);
+ if (now > last_service)
+ timeDelta = ink_hrtime_to_msec(now - last_service);
- if (G_inkPipeInfo.m_numPipes > 0) {
- bytesThisSlot = (ink32) (((G_inkPipeInfo.m_reliabilityMbps * 1024.0 * 1024.0) / (8.0 * 1000.0)) * timeDelta);
+ if (G_inkPipeInfo.numPipes > 0) {
+ bytesThisSlot = (ink32) (((G_inkPipeInfo.reliabilityMbps * 1024.0 * 1024.0) / (8.0 * 1000.0)) * timeDelta);
if (bytesThisSlot == 0) {
// use at most 10% for reliability
- bytesThisSlot = (ink32) (((G_inkPipeInfo.m_interfaceMbps * 1024.0 * 1024.0) / (8.0 * 1000.0)) * timeDelta * 0.1);
+ bytesThisSlot = (ink32) (((G_inkPipeInfo.interfaceMbps * 1024.0 * 1024.0) / (8.0 * 1000.0)) * timeDelta * 0.1);
reliabilityBytes = bytesThisSlot;
}
}
- while ((p = m_reliabilityPktQueue.dequeue()) != NULL) {
+ while ((p = reliabilityPktQueue.dequeue()) != NULL) {
pktLen = p->getPktLength();
g_udp_bytesPending -= pktLen;
- p->m_conn->m_nBytesTodo -= pktLen;
- p->m_conn->m_nBytesDone += pktLen;
+ p->conn->nBytesTodo -= pktLen;
+ p->conn->nBytesDone += pktLen;
- if (p->m_conn->shouldDestroy())
+ if (p->conn->shouldDestroy())
goto next_pkt_3;
- if (p->m_conn->GetSendGenerationNumber() != p->m_reqGenerationNum)
+ if (p->conn->GetSendGenerationNumber() != p->reqGenerationNum)
goto next_pkt_3;
SendUDPPacket(p, pktLen);
@@ -1108,8 +1073,8 @@
}
- if (G_inkPipeInfo.m_numPipes > 0)
- bytesThisSlot = (ink32) (((G_inkPipeInfo.m_interfaceMbps * 1024.0 * 1024.0) /
+ if (G_inkPipeInfo.numPipes > 0)
+ bytesThisSlot = (ink32) (((G_inkPipeInfo.interfaceMbps * 1024.0 * 1024.0) /
(8.0 * 1000.0)) * timeDelta - reliabilityBytes);
else
bytesThisSlot = INT_MAX;
@@ -1117,21 +1082,21 @@
sendPackets:
sentOne = false;
send_threshold_time = now + SLOT_TIME;
- for (i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++) {
- bytesThisPipe = (ink32) (bytesThisSlot * G_inkPipeInfo.m_perPipeInfo[i].m_wt);
- while ((bytesThisPipe > 0) && (G_inkPipeInfo.m_perPipeInfo[i].m_queue->firstPacket(send_threshold_time))) {
- p = G_inkPipeInfo.m_perPipeInfo[i].m_queue->getFirstPacket();
+ for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
+ bytesThisPipe = (ink32) (bytesThisSlot * G_inkPipeInfo.perPipeInfo[i].wt);
+ while ((bytesThisPipe > 0) && (G_inkPipeInfo.perPipeInfo[i].queue->firstPacket(send_threshold_time))) {
+ p = G_inkPipeInfo.perPipeInfo[i].queue->getFirstPacket();
pktLen = p->getPktLength();
g_udp_bytesPending -= pktLen;
- p->m_conn->m_nBytesTodo -= pktLen;
- p->m_conn->m_nBytesDone += pktLen;
- if (p->m_conn->shouldDestroy())
+ p->conn->nBytesTodo -= pktLen;
+ p->conn->nBytesDone += pktLen;
+ if (p->conn->shouldDestroy())
goto next_pkt;
- if (p->m_conn->GetSendGenerationNumber() != p->m_reqGenerationNum)
+ if (p->conn->GetSendGenerationNumber() != p->reqGenerationNum)
goto next_pkt;
- G_inkPipeInfo.m_perPipeInfo[i].m_bytesSent += pktLen;
+ G_inkPipeInfo.perPipeInfo[i].bytesSent += pktLen;
SendUDPPacket(p, pktLen);
bytesUsed += pktLen;
bytesThisPipe -= pktLen;
@@ -1149,9 +1114,9 @@
if ((bytesThisSlot > 0) && (sentOne)) {
// redistribute the slack...
now = ink_get_hrtime_internal();
- for (i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++) {
- if (G_inkPipeInfo.m_perPipeInfo[i].m_queue->firstPacket(now) == NULL) {
- G_inkPipeInfo.m_perPipeInfo[i].m_queue->advanceNow(now);
+ for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
+ if (G_inkPipeInfo.perPipeInfo[i].queue->firstPacket(now) == NULL) {
+ G_inkPipeInfo.perPipeInfo[i].queue->advanceNow(now);
}
}
goto sendPackets;
@@ -1161,8 +1126,8 @@
(now - lastCleanupTime > ink_hrtime_from_sec(g_udp_periodicFreeCancelledPkts))) {
inku64 nbytes = g_udp_bytesPending;
ink_hrtime startTime = ink_get_hrtime_internal(), endTime;
- for (i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++) {
- G_inkPipeInfo.m_perPipeInfo[i].m_queue->FreeCancelledPackets(g_udp_periodicCleanupSlots);
+ for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
+ G_inkPipeInfo.perPipeInfo[i].queue->FreeCancelledPackets(g_udp_periodicCleanupSlots);
}
endTime = ink_get_hrtime_internal();
Debug("udp-pending-packets", "Did cleanup of %d buckets: %lld bytes in %d m.sec",
@@ -1180,20 +1145,20 @@
int real_len = 0;
int n, count, iov_len = 0;
- if (!p->m_isReliabilityPkt) {
- p->m_conn->SetLastSentPktTSSeqNum(p->m_pktTSSeqNum);
- p->m_conn->m_lastSentPktStartTime = p->m_delivery_time;
+ if (!p->isReliabilityPkt) {
+ p->conn->SetLastSentPktTSSeqNum(p->pktTSSeqNum);
+ p->conn->lastSentPktStartTime = p->delivery_time;
}
Debug("udp-send", "Sending 0x%x", p);
msg.msg_control = 0;
msg.msg_controllen = 0;
msg.msg_flags = 0;
- msg.msg_name = (caddr_t) & p->m_to;
- msg.msg_namelen = sizeof(p->m_to);
+ msg.msg_name = (caddr_t) & p->to;
+ msg.msg_namelen = sizeof(p->to);
iov_len = 0;
- m_bytesSent += pktLen;
- for (b = p->m_chain; b != NULL; b = b->next) {
+ bytesSent += pktLen;
+ for (b = p->chain; b != NULL; b = b->next) {
iov[iov_len].iov_base = (caddr_t) b->start();
iov[iov_len].iov_len = b->size();
real_len += iov[iov_len].iov_len;
@@ -1205,7 +1170,7 @@
count = 0;
while (1) {
// stupid Linux problem: sendmsg can return EAGAIN
- n =::sendmsg(p->m_conn->getFd(), &msg, 0);
+ n =::sendmsg(p->conn->getFd(), &msg, 0);
if ((n >= 0) || ((n < 0) && (errno != EAGAIN)))
// send succeeded or some random error happened.
break;
@@ -1235,13 +1200,13 @@
ink_hrtime now = ink_get_hrtime_internal();
ink_hrtime send_threshold_time = now + SLOT_TIME;
- for (int i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++) {
- while (p = G_inkPipeInfo.m_perPipeInfo[i].m_queue->firstPacket(send_threshold_time)) {
- p = G_inkPipeInfo.m_perPipeInfo[i].m_queue->getFirstPacket();
+ for (int i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
+ while (p = G_inkPipeInfo.perPipeInfo[i].queue->firstPacket(send_threshold_time)) {
+ p = G_inkPipeInfo.perPipeInfo[i].queue->getFirstPacket();
sentOne = true;
Debug("bulk-io-pkt", "Adding a packet...");
- BulkIOAddPkt(G_bulkIOState, &G_bulkIOAggregator, p, p->m_conn->getPortNum());
- m_bytesSent += p->getPktLength();
+ BulkIOAddPkt(G_bulkIOState, &G_bulkIOAggregator, p, p->conn->getPortNum());
+ bytesSent += p->getPktLength();
// Now the packet is "sent"; get rid of it
p->free();
}
@@ -1256,34 +1221,16 @@
UDPQueue::send(UDPPacket * p)
{
// XXX: maybe fastpath for immediate send?
-// Debug("udpnet","add packet %x, delivery=%lld",p,p->m_delivery_time);
- ink_atomiclist_push(m_atomicQueue, p);
- //m_queue->addPacket(p,ink_get_hrtime());
+ ink_atomiclist_push(&atomicQueue, p);
}
#undef LINK
UDPNetHandler::UDPNetHandler()
{
-
mutex = new_ProxyMutex();
-
- //pollDescriptor = (PollDescriptor*)xmalloc(sizeof(PollDescriptor));
- //pollDescriptor->init();
- udpConnections = (UnixUDPConnection **) xmalloc(sizeof(UnixUDPConnection *) * MAX_UDP_CONNECTION);
- int i;
- for (i = 0; i < MAX_UDP_CONNECTION; i++) {
- udpConnections[i] = NULL;
- }
-
- ink_atomiclist_init(&udpAtomicQueue, "Outgoing UDP Packet queue", (uintptr_t) &((UDPPacketInternal *) 0)->alink.next);
-
- ink_atomiclist_init(&udpNewConnections, "UDP Connection queue", (uintptr_t)
- &((UnixUDPConnection *) 0)->newconn_alink.next);
-
- udpOutQueue = NEW(new UDPQueue(&udpAtomicQueue));
- udp_polling = NEW(new Queue<UnixUDPConnection> ());
- udp_callbacks = NEW(new Queue<UnixUDPConnection> ());
+ ink_atomiclist_init(&udpOutQueue.atomicQueue, "Outgoing UDP Packet queue", offsetof(UDPPacketInternal, alink.next));
+ ink_atomiclist_init(&udpNewConnections, "UDP Connection queue", offsetof(UnixUDPConnection, newconn_alink.next));
nextCheck = ink_get_hrtime_internal() + HRTIME_MSECONDS(1000);
lastCheck = 0;
SET_HANDLER((UDPNetContHandler) & UDPNetHandler::startNetEvent);
@@ -1294,23 +1241,18 @@
{
(void) event;
SET_HANDLER((UDPNetContHandler) & UDPNetHandler::mainNetEvent);
- e->schedule_every(-HRTIME_MSECONDS(9));
trigger_event = e;
+ e->schedule_every(-HRTIME_MSECONDS(9));
return EVENT_CONT;
}
+
inline PollDescriptor *
UDPNetHandler::build_one_udpread_poll(int fd, UnixUDPConnection * uc, PollDescriptor * pd)
{
- int i;
-
// XXX: just hack until figure things out
ink_assert(uc->getFd() > 0);
- Pollfd *pfd;
- pfd = pd->alloc();
+ Pollfd *pfd = pd->alloc();
pfd->fd = fd;
- i = pfd - pd->pfd;
- uc->setPollvecIndex(i);
- udpConnections[i] = uc;
pfd->events = POLLIN;
pfd->revents = 0;
return pd;
@@ -1320,10 +1262,9 @@
UDPNetHandler::build_poll(PollDescriptor * pd)
{
// build read poll for UDP connections.
- UnixUDPConnection *uc;
ink_assert(pd->empty());
int i = 0;
- for (uc = udp_polling->head; uc; uc = uc->polling_link.next) {
+ forl_LL(UnixUDPConnection, uc, udp_polling) {
if (uc->recvActive) {
pd = build_one_udpread_poll(uc->getFd(), uc, pd);
i++;
@@ -1344,46 +1285,14 @@
PollCont *pc = get_UDPPollCont(e->ethread);
- //changed by YTS Team, yamsat
- // pick up new UDP connections for servicing
- /* if (!INK_ATOMICLIST_EMPTY(udpNewConnections)) {
- UnixUDPConnection *c =
- (UnixUDPConnection *)ink_atomiclist_popall(&udpNewConnections);
- if (c) {
- UnixUDPConnection *cnext = NULL;
- while (c) {
- if (c->shouldDestroy()) {
- cnext = c->newconn_alink.next;
- c->newconn_alink.next = NULL;
- if (G_inkPipeInfo.m_numPipes > 0)
- G_inkPipeInfo.m_perPipeInfo[c->m_pipe_class].m_count--;
- c->Release();
- c = cnext;
- continue;
- }
- ink_assert(!c->mutex == !c->continuation);
- cnext = c->newconn_alink.next;
- c->newconn_alink.next = NULL;
- ink_assert(c->polling_link.next ==NULL);
- ink_assert(c->polling_link.prev ==NULL);
- ink_assert(c->m_ethread == trigger_event->ethread);
- c->setEthread(trigger_event->ethread);
- //udp_polling->enqueue(c,c->polling_link);
- c = cnext;
- }
- }
- } */
-
// handle UDP outgoing engine
- udpOutQueue->service(this);
+ udpOutQueue.service(this);
// handle UDP read operations
UnixUDPConnection *uc, *next;
int i;
int nread = 0;
-//epoll changes
- //changed by YTS Team, yamsat
struct epoll_data_ptr *temp_eptr = NULL;
for (i = 0; i < pc->pollDescriptor->result; i++) {
temp_eptr = (struct epoll_data_ptr *) get_ev_data(pc->pollDescriptor,i);
@@ -1391,7 +1300,7 @@
&& temp_eptr->type == EPOLL_UDP_CONNECTION) {
uc = temp_eptr->data.uc;
ink_assert(uc && uc->mutex && uc->continuation);
- ink_assert(uc->m_refcount >= 1);
+ ink_assert(uc->refcount >= 1);
if (uc->shouldDestroy()) {
// udp_polling->remove(uc,uc->polling_link);
uc->Release();
@@ -1402,34 +1311,16 @@
} //if EPOLLIN
} //end for
-//epoll changes ends here
-
-/*
- for (i = 0; i <pc->pollDescriptor->nfds; i++) {
- if (pc->pollDescriptor->pfd[i].revents & POLLIN) {
- uc = udpConnections[i];
- ink_assert(uc && uc->mutex && uc->continuation);
- ink_assert(uc->m_refcount >= 1);
- if (uc->shouldDestroy()) {
- udp_polling->remove(uc,uc->polling_link);
- uc->Release();
- } else {
- udpNetInternal.udp_read_from_net(this,uc,pc->pollDescriptor,trigger_event->ethread);
- nread++;
- }
- }
- } */
-
// remove dead UDP connections
ink_hrtime now = ink_get_hrtime_internal();
if (now >= nextCheck) {
- for (uc = udp_polling->head; uc; uc = next) {
+ for (uc = udp_polling.head; uc; uc = next) {
ink_assert(uc->mutex && uc->continuation);
- ink_assert(uc->m_refcount >= 1);
+ ink_assert(uc->refcount >= 1);
next = uc->polling_link.next;
if (uc->shouldDestroy()) {
- if (G_inkPipeInfo.m_numPipes > 0)
- G_inkPipeInfo.m_perPipeInfo[uc->m_pipe_class].m_count--;
+ if (G_inkPipeInfo.numPipes > 0)
+ G_inkPipeInfo.perPipeInfo[uc->pipe_class].count--;
//changed by YTS Team, yamsat
//udp_polling->remove(uc,uc->polling_link);
uc->Release();
@@ -1438,15 +1329,15 @@
nextCheck = ink_get_hrtime_internal() + HRTIME_MSECONDS(1000);
}
// service UDPConnections with data ready for callback.
- Queue<UnixUDPConnection> q = *udp_callbacks;
- udp_callbacks->clear();
- while ((uc = q.dequeue(q.head, q.head->callback_link))) {
+ Que(UnixUDPConnection, callback_link) q = udp_callbacks;
+ udp_callbacks.clear();
+ while ((uc = q.dequeue())) {
ink_assert(uc->mutex && uc->continuation);
if (udpNetInternal.udp_callback(this, uc, trigger_event->ethread)) { // not successful
// schedule on a thread of its own.
ink_assert(uc->callback_link.next == NULL);
ink_assert(uc->callback_link.prev == NULL);
- udp_callbacks->enqueue(uc, uc->callback_link);
+ udp_callbacks.enqueue(uc);
} else {
ink_assert(uc->callback_link.next == NULL);
ink_assert(uc->callback_link.prev == NULL);
@@ -1455,7 +1346,6 @@
}
}
- //changed by YTS Team, yamsat
return EVENT_CONT;
}
@@ -1471,14 +1361,14 @@
unsigned int my_ip, unsigned int dest_ip, int s_bufsize, int r_bufsize)
{
mutex = c->mutex;
- m_cont = c;
- m_action = c;
- m_numPairs = numPairs;
- m_myIP = my_ip;
- m_destIP = dest_ip;
- m_sendbufsize = s_bufsize;
- m_recvbufsize = r_bufsize;
- m_udpConns = NULL;
+ cont = c;
+ action = c;
+ numPairs = numPairs;
+ myIP = my_ip;
+ destIP = dest_ip;
+ sendbufsize = s_bufsize;
+ recvbufsize = r_bufsize;
+ udpConns = NULL;
SET_HANDLER((UDPWorkContinuation_Handler) & UDPWorkContinuation::StateCreatePortPairs);
}
@@ -1486,7 +1376,7 @@
UDPWorkContinuation::StateCreatePortPairs(int event, void *data)
{
// int res = 0;
- int numUdpPorts = 2 * m_numPairs;
+ int numUdpPorts = 2 * numPairs;
int fd1 = -1, fd2 = -1;
// struct sockaddr_in bind_sa;
struct sockaddr_in myaddr1, myaddr2;
@@ -1497,14 +1387,12 @@
Action *status;
//epoll changes
- //added by YTS Team, yamsat
- struct epoll_data_ptr *eptr = NULL;
PollCont *pc = NULL;
//epoll changes ends here
ink_debug_assert(mutex->thread_holding == this_ethread());
- if (m_action.cancelled) {
- m_action = NULL;
+ if (action.cancelled) {
+ action = NULL;
mutex = NULL;
udpWorkContinuationAllocator.free(this);
return EVENT_CONT;
@@ -1512,9 +1400,9 @@
startTime = ink_get_hrtime_internal();
- m_udpConns = NEW(new UnixUDPConnection *[numUdpPorts]);
+ udpConns = NEW(new UnixUDPConnection *[numUdpPorts]);
for (i = 0; i < numUdpPorts; i++)
- m_udpConns[i] = NULL;
+ udpConns[i] = NULL;
ink_atomic_swap(&portNum, lastAllocPort);
portNum %= 50000;
if (portNum == 0)
@@ -1523,13 +1411,13 @@
i = 0;
while (i < numUdpPorts) {
- if (udpNet.CreateUDPSocket(&fd1, &myaddr1, &status, portNum, m_myIP, m_sendbufsize, m_recvbufsize)) {
- if (udpNet.CreateUDPSocket(&fd2, &myaddr2, &status, portNum + 1, m_myIP, m_sendbufsize, m_recvbufsize)) {
- m_udpConns[i] = NEW(new UnixUDPConnection(fd1)); // new_UnixUDPConnection(fd1);
- m_udpConns[i]->setBinding(&myaddr1);
+ if (udpNet.CreateUDPSocket(&fd1, &myaddr1, &status, portNum, myIP, sendbufsize, recvbufsize)) {
+ if (udpNet.CreateUDPSocket(&fd2, &myaddr2, &status, portNum + 1, myIP, sendbufsize, recvbufsize)) {
+ udpConns[i] = NEW(new UnixUDPConnection(fd1)); // new_UnixUDPConnection(fd1);
+ udpConns[i]->setBinding(&myaddr1);
i++;
- m_udpConns[i] = NEW(new UnixUDPConnection(fd2)); // new_UnixUDPConnection(fd2);
- m_udpConns[i]->setBinding(&myaddr2);
+ udpConns[i] = NEW(new UnixUDPConnection(fd2)); // new_UnixUDPConnection(fd2);
+ udpConns[i]->setBinding(&myaddr2);
i++;
// remember the last alloc'ed port
ink_atomic_swap(&lastAllocPort, portNum + 2);
@@ -1558,21 +1446,18 @@
}
for (i = 0; i < numUdpPorts; i++) {
- udpNet.UDPClassifyConnection(m_udpConns[i], m_destIP);
+ udpNet.UDPClassifyConnection(udpConns[i], destIP);
Debug("udpnet-pipe", "Adding (port = %d) to Pipe class: %d",
- m_udpConns[i]->getPortNum(), m_udpConns[i]->m_pipe_class);
+ udpConns[i]->getPortNum(), udpConns[i]->pipe_class);
}
// assert should *never* fire; we check for this at the begin of the func.
- ink_assert(!m_action.cancelled);
+ ink_assert(!action.cancelled);
// Bind to threads only on a success. Currently, after you have
// bound to have a thread, the only way to remove a UDPConnection is
// to call destroy(); the thread to which the UDPConnection will
// remove the connection from a linked list and call delete.
- //struct epoll_event ev;
- //struct epoll_data_ptr *eptr=NULL;
- //PollCont * pc = NULL;
#if defined(USE_EPOLL)
struct epoll_event ev;
@@ -1583,22 +1468,19 @@
#endif
//changed by YTS Team, yamsat
for (i = 0; i < numUdpPorts; i++) {
- m_udpConns[i]->bindToThread(m_cont);
+ udpConns[i]->bindToThread(cont);
//epoll changes
- pc = get_UDPPollCont(m_udpConns[i]->m_ethread);
- eptr = (struct epoll_data_ptr *) malloc(sizeof(struct epoll_data_ptr));
-
- eptr->type = 5; //UDP
- eptr->data.uc = m_udpConns[i];
- m_udpConns[i]->eptr = eptr;
+ pc = get_UDPPollCont(udpConns[i]->ethread);
+ udpConns[i]->ep.type = 5; //UDP
+ udpConns[i]->ep.data.uc = udpConns[i];
#if defined(USE_EPOLL)
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN | EPOLLET;
- ev.data.ptr = eptr;
- epoll_ctl(pc->pollDescriptor->epoll_fd, EPOLL_CTL_ADD, m_udpConns[i]->getFd(), &ev);
+ ev.data.ptr = &udpConns[i]->ep;
+ epoll_ctl(pc->pollDescriptor->epoll_fd, EPOLL_CTL_ADD, udpConns[i]->getFd(), &ev);
#elif defined(USE_KQUEUE)
- EV_SET(&ev, m_udpConns[i]->getFd(), EVFILT_READ, EV_ADD, 0, 0, eptr);
+ EV_SET(&ev, udpConns[i]->getFd(), EVFILT_READ, EV_ADD, 0, 0, &udpConns[i].ep);
kevent(pc->pollDescriptor->kqueue_fd, &ev, 1, NULL, 0, NULL);
#else
#error port me
@@ -1606,16 +1488,16 @@
//epoll changes ends here
} //for
- m_resultCode = NET_EVENT_DATAGRAM_OPEN;
+ resultCode = NET_EVENT_DATAGRAM_OPEN;
goto out;
Lerror:
- m_resultCode = NET_EVENT_DATAGRAM_ERROR;
+ resultCode = NET_EVENT_DATAGRAM_ERROR;
for (i = 0; i < numUdpPorts; i++) {
- delete m_udpConns[i];
+ delete udpConns[i];
}
- delete[]m_udpConns;
- m_udpConns = NULL;
+ delete[]udpConns;
+ udpConns = NULL;
out:
SET_HANDLER((UDPWorkContinuation_Handler) & UDPWorkContinuation::StateDoCallback);
@@ -1625,26 +1507,26 @@
int
UDPWorkContinuation::StateDoCallback(int event, void *data)
{
- MUTEX_TRY_LOCK(lock, m_action.mutex, this_ethread());
+ MUTEX_TRY_LOCK(lock, action.mutex, this_ethread());
if (!lock) {
this_ethread()->schedule_in(this, MUTEX_RETRY_DELAY);
return EVENT_CONT;
}
- if (!m_action.cancelled) {
- m_action.continuation->handleEvent(m_resultCode, m_udpConns);
+ if (!action.cancelled) {
+ action.continuation->handleEvent(resultCode, udpConns);
} else {
- // else m_action.cancelled
- if (m_resultCode == NET_EVENT_DATAGRAM_OPEN) {
- for (int i = 0; i < m_numPairs * 2; i++)
+ // else action.cancelled
+ if (resultCode == NET_EVENT_DATAGRAM_OPEN) {
+ for (int i = 0; i < numPairs * 2; i++)
// don't call delete on individual connections; the udp thread will do
// that when it cleans up an fd.
- m_udpConns[i]->destroy();
- delete[]m_udpConns; // I think this is OK to delete the array, what we shouldn't do is loop over
- m_udpConns = NULL; // the conns and and do delete m_udpConns[i].
+ udpConns[i]->destroy();
+ delete[]udpConns; // I think this is OK to delete the array, what we shouldn't do is loop over
+ udpConns = NULL; // the conns and and do delete udpConns[i].
}
}
- m_action = NULL;
+ action = NULL;
mutex = NULL;
udpWorkContinuationAllocator.free(this);
Modified: incubator/trafficserver/traffic/trunk/proxy/CoreUtils.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/proxy/CoreUtils.cc?rev=889011&r1=889010&r2=889011&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/proxy/CoreUtils.cc (original)
+++ incubator/trafficserver/traffic/trunk/proxy/CoreUtils.cc Wed Dec 9 23:01:29 2009
@@ -1143,7 +1143,7 @@
print_netstate(NetState * n)
{
// These might not be 64-bit correct. /leif
- printf(" enabled: %d priority: %d\n", n->enabled, n->priority);
+ printf(" enabled: %d\n", n->enabled);
printf(" op: %d _cont: 0x%p\n", n->vio.op, n->vio._cont);
printf(" nbytes: %d done: %d\n", n->vio.nbytes, n->vio.ndone);
printf(" vc_server: 0x%p mutex: 0x%p\n\n", n->vio.vc_server, n->vio.mutex.m_ptr);
Modified: incubator/trafficserver/traffic/trunk/proxy/InkIOCoreAPI.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/proxy/InkIOCoreAPI.cc?rev=889011&r1=889010&r2=889011&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/proxy/InkIOCoreAPI.cc (original)
+++ incubator/trafficserver/traffic/trunk/proxy/InkIOCoreAPI.cc Wed Dec 9 23:01:29 2009
@@ -400,9 +400,9 @@
UDPPacket *packet = new_UDPPacket();
UDPConnection *conn = (UDPConnection *) udp;
- packet->m_to.sin_family = PF_INET;
- packet->m_to.sin_port = htons(port);
- packet->m_to.sin_addr.s_addr = ip;
+ packet->to.sin_family = PF_INET;
+ packet->to.sin_port = htons(port);
+ packet->to.sin_addr.s_addr = ip;
IOBufferBlock *blockp = new_IOBufferBlock();
blockp->alloc(BUFFER_SIZE_INDEX_32K);
@@ -458,14 +458,14 @@
INKUDPPacketFromAddressGet(INKUDPPacket packet)
{
UDPPacket *p = (UDPPacket *) packet;
- return (p->m_from.sin_addr.s_addr);
+ return (p->from.sin_addr.s_addr);
}
int
INKUDPPacketFromPortGet(INKUDPPacket packet)
{
UDPPacket *p = (UDPPacket *) packet;
- return (ntohs(p->m_from.sin_port));
+ return (ntohs(p->from.sin_port));
}
INKUDPConn
Modified: incubator/trafficserver/traffic/trunk/proxy/Main.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/proxy/Main.cc?rev=889011&r1=889010&r2=889011&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/proxy/Main.cc (original)
+++ incubator/trafficserver/traffic/trunk/proxy/Main.cc Wed Dec 9 23:01:29 2009
@@ -1991,10 +1991,10 @@
// file doesn't exist
if (node->getNodeName() == NULL) {
// alloc 1-elt array to store stuff for best-effort traffic
- G_inkPipeInfo.m_perPipeInfo = NEW(new InkSinglePipeInfo[1]);
- G_inkPipeInfo.m_perPipeInfo[0].m_wt = 1.0;
- G_inkPipeInfo.m_numPipes = 0;
- G_inkPipeInfo.m_interfaceMbps = 0.0;
+ G_inkPipeInfo.perPipeInfo = NEW(new InkSinglePipeInfo[1]);
+ G_inkPipeInfo.perPipeInfo[0].wt = 1.0;
+ G_inkPipeInfo.numPipes = 0;
+ G_inkPipeInfo.interfaceMbps = 0.0;
return true;
}
@@ -2002,24 +2002,24 @@
Debug("bw-mgmt", "Root node should be an interface tag!\n");
return false;
}
- // First entry G_inkPipeInfo.m_perPipeInfo[0] is the one for "best-effort" traffic.
- G_inkPipeInfo.m_perPipeInfo = NEW(new InkSinglePipeInfo[node->getChildCount() + 1]);
- G_inkPipeInfo.m_perPipeInfo[0].m_wt = 1.0;
- G_inkPipeInfo.m_numPipes = 0;
- G_inkPipeInfo.m_reliabilityMbps = 1.0;
- G_inkPipeInfo.m_interfaceMbps = 30.0;
+ // First entry G_inkPipeInfo.perPipeInfo[0] is the one for "best-effort" traffic.
+ G_inkPipeInfo.perPipeInfo = NEW(new InkSinglePipeInfo[node->getChildCount() + 1]);
+ G_inkPipeInfo.perPipeInfo[0].wt = 1.0;
+ G_inkPipeInfo.numPipes = 0;
+ G_inkPipeInfo.reliabilityMbps = 1.0;
+ G_inkPipeInfo.interfaceMbps = 30.0;
for (i = 0; i < node->getChildCount(); i++) {
if ((child = node->getChildNode(i))) {
if (strcmp(child->getNodeName(), "pipe") == 0) {
- G_inkPipeInfo.m_numPipes++;
+ G_inkPipeInfo.numPipes++;
for (k = 0; k < child->getChildCount(); k++) {
c2 = child->getChildNode(k);
for (int l = 0; l < c2->m_nACount; l++) {
if (strcmp(c2->m_pAList[l].pAName, "weight") == 0) {
- G_inkPipeInfo.m_perPipeInfo[G_inkPipeInfo.m_numPipes].m_wt = atof(c2->m_pAList[l].pAValue);
- G_inkPipeInfo.m_perPipeInfo[0].m_wt -= G_inkPipeInfo.m_perPipeInfo[G_inkPipeInfo.m_numPipes].m_wt;
+ G_inkPipeInfo.perPipeInfo[G_inkPipeInfo.numPipes].wt = atof(c2->m_pAList[l].pAValue);
+ G_inkPipeInfo.perPipeInfo[0].wt -= G_inkPipeInfo.perPipeInfo[G_inkPipeInfo.numPipes].wt;
} else if (strcmp(c2->m_pAList[l].pAName, "dest_ip") == 0) {
- p = (unsigned char *) &(G_inkPipeInfo.m_perPipeInfo[G_inkPipeInfo.m_numPipes].m_destIP);
+ p = (unsigned char *) &(G_inkPipeInfo.perPipeInfo[G_inkPipeInfo.numPipes].destIP);
ip = c2->m_pAList[l].pAValue;
for (j = 0; j < 4; j++) {
p[j] = atoi(ip);
@@ -2033,21 +2033,21 @@
} else if (strcmp(child->getNodeName(), "bandwidth") == 0) {
for (j = 0; j < child->m_nACount; j++) {
if (strcmp(child->m_pAList[j].pAName, "limit_mbps") == 0) {
- G_inkPipeInfo.m_interfaceMbps = atof(child->m_pAList[j].pAValue);
+ G_inkPipeInfo.interfaceMbps = atof(child->m_pAList[j].pAValue);
} else if (strcmp(child->m_pAList[j].pAName, "reliability_mbps") == 0) {
- G_inkPipeInfo.m_reliabilityMbps = atof(child->m_pAList[j].pAValue);
+ G_inkPipeInfo.reliabilityMbps = atof(child->m_pAList[j].pAValue);
}
}
}
}
}
- Debug("bw-mgmt", "Read in: limit_mbps = %lf\n", G_inkPipeInfo.m_interfaceMbps);
- for (i = 0; i < G_inkPipeInfo.m_numPipes + 1; i++) {
- G_inkPipeInfo.m_perPipeInfo[i].m_bwLimit =
- (ink64) (G_inkPipeInfo.m_perPipeInfo[i].m_wt * G_inkPipeInfo.m_interfaceMbps * 1024.0 * 1024.0);
- p = (unsigned char *) &(G_inkPipeInfo.m_perPipeInfo[i].m_destIP);
+ Debug("bw-mgmt", "Read in: limit_mbps = %lf\n", G_inkPipeInfo.interfaceMbps);
+ for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
+ G_inkPipeInfo.perPipeInfo[i].bwLimit =
+ (ink64) (G_inkPipeInfo.perPipeInfo[i].wt * G_inkPipeInfo.interfaceMbps * 1024.0 * 1024.0);
+ p = (unsigned char *) &(G_inkPipeInfo.perPipeInfo[i].destIP);
Debug("bw-mgmt", "Pipe [%d]: wt = %lf, dest ip = %d.%d.%d.%d\n",
- i, G_inkPipeInfo.m_perPipeInfo[i].m_wt, p[0], p[1], p[2], p[3]);
+ i, G_inkPipeInfo.perPipeInfo[i].wt, p[0], p[1], p[2], p[3]);
}
return true;
}