You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by zw...@apache.org on 2013/04/02 23:36:30 UTC
[13/15] git commit: TS-1067 More cleanup of unused code
TS-1067 More cleanup of unused code
Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/7a57cd36
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/7a57cd36
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/7a57cd36
Branch: refs/heads/master
Commit: 7a57cd365b412a3043f7ccc12410860e2cc4d357
Parents: 02bd9a7
Author: Leif Hedstrom <zw...@apache.org>
Authored: Tue Mar 26 21:59:11 2013 -0700
Committer: Leif Hedstrom <zw...@apache.org>
Committed: Tue Apr 2 13:53:42 2013 -0600
----------------------------------------------------------------------
iocore/net/I_UDPPacket.h | 15 ++--
iocore/net/P_UDPNet.h | 4 -
iocore/net/P_UDPPacket.h | 35 +--------
iocore/net/UnixUDPNet.cc | 158 ++++++++++++++++-------------------------
4 files changed, 70 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7a57cd36/iocore/net/I_UDPPacket.h
----------------------------------------------------------------------
diff --git a/iocore/net/I_UDPPacket.h b/iocore/net/I_UDPPacket.h
index 09826dc..08dfa88 100644
--- a/iocore/net/I_UDPPacket.h
+++ b/iocore/net/I_UDPPacket.h
@@ -45,24 +45,22 @@ class UDPPacket
public:
- virtual ~ UDPPacket()
- {
- };
- virtual void free(); // fast deallocate
+ virtual ~UDPPacket()
+ { }
+ virtual void free(); // fast deallocate
void setContinuation(Continuation * c);
void setConnection(UDPConnection * c);
UDPConnection *getConnection();
- void setArrivalTime(ink_hrtime t);
IOBufferBlock *getIOBlockChain();
int64_t getPktLength();
+
/**
Add IOBufferBlock (chain) to end of packet.
@param block block chain to add.
*/
inkcoreapi void append_block(IOBufferBlock * block);
- virtual void UDPPacket_is_abstract() = 0;
IpEndpoint from; // what address came from
IpEndpoint to; // what address to send to
@@ -94,8 +92,9 @@ extern UDPPacket *new_UDPPacket(struct sockaddr const* to, ink_hrtime when = 0,
for packet
@param len # of bytes to reference from block
*/
-extern UDPPacket *new_UDPPacket(struct sockaddr const* to,
- ink_hrtime when = 0, IOBufferBlock * block = NULL, int len = 0);
+
+TS_INLINE UDPPacket *new_UDPPacket(struct sockaddr const* to, ink_hrtime when = 0,
+ IOBufferBlock * block = NULL, int len = 0);
/**
Create a new packet to be sent over UDPConnection. Packet has no
destination or data.
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7a57cd36/iocore/net/P_UDPNet.h
----------------------------------------------------------------------
diff --git a/iocore/net/P_UDPNet.h b/iocore/net/P_UDPNet.h
index 9c564f3..9cba1b3 100644
--- a/iocore/net/P_UDPNet.h
+++ b/iocore/net/P_UDPNet.h
@@ -70,7 +70,6 @@ public:
// Interface exported to the outside world
void send(UDPPacket * p);
- Queue<UDPPacketInternal> reliabilityPktQueue;
InkAtomicList atomicQueue;
ink_hrtime last_report;
ink_hrtime last_service;
@@ -81,8 +80,6 @@ public:
~UDPQueue();
};
-#ifdef PACKETQUEUE_IMPL_AS_RING
-
// 20 ms slots; 2048 slots => 40 sec. into the future
#define SLOT_TIME_MSEC 20
#define SLOT_TIME HRTIME_MSECONDS(SLOT_TIME_MSEC)
@@ -296,7 +293,6 @@ private:
void kill_cancelled_events()
{ }
};
-#endif
void initialize_thread_for_udp_net(EThread * thread);
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7a57cd36/iocore/net/P_UDPPacket.h
----------------------------------------------------------------------
diff --git a/iocore/net/P_UDPPacket.h b/iocore/net/P_UDPPacket.h
index a93f083..f56e0b9 100644
--- a/iocore/net/P_UDPPacket.h
+++ b/iocore/net/P_UDPPacket.h
@@ -26,7 +26,6 @@
P_UDPPacket.h
Implementation of UDPPacket
-
****************************************************************************/
@@ -35,9 +34,6 @@
#include "I_UDPNet.h"
-//#define PACKETQUEUE_IMPL_AS_PQLIST
-#define PACKETQUEUE_IMPL_AS_RING
-
class UDPPacketInternal:public UDPPacket
{
@@ -51,43 +47,32 @@ public:
SLINK(UDPPacketInternal, alink); // atomic link
// packet scheduling stuff: keep it a doubly linked list
- uint64_t pktSendStartTime;
- uint64_t pktSendFinishTime;
uint64_t pktLength;
int reqGenerationNum;
ink_hrtime delivery_time; // when to deliver packet
- ink_hrtime arrival_time; // when packet arrived
Ptr<IOBufferBlock> chain;
Continuation *cont; // callback on error
UDPConnectionInternal *conn; // connection where packet should be sent to.
-#if defined(PACKETQUEUE_IMPL_AS_PQLIST) || defined(PACKETQUEUE_IMPL_AS_RING)
int in_the_priority_queue;
int in_heap;
-#endif
-
- virtual void UDPPacket_is_abstract() { }
};
inkcoreapi extern ClassAllocator<UDPPacketInternal> udpPacketAllocator;
TS_INLINE
UDPPacketInternal::UDPPacketInternal()
- : pktSendStartTime(0), pktSendFinishTime(0), pktLength(0),
- reqGenerationNum(0), delivery_time(0), arrival_time(0), cont(NULL) , conn(NULL)
-#if defined(PACKETQUEUE_IMPL_AS_PQLIST) || defined(PACKETQUEUE_IMPL_AS_RING)
- ,in_the_priority_queue(0), in_heap(0)
-#endif
+ : pktLength(0), reqGenerationNum(0), delivery_time(0), cont(NULL),
+ conn(NULL), in_the_priority_queue(0), in_heap(0)
{
memset(&from, '\0', sizeof(from));
memset(&to, '\0', sizeof(to));
}
TS_INLINE
-UDPPacketInternal::~
-UDPPacketInternal()
+UDPPacketInternal::~UDPPacketInternal()
{
chain = NULL;
}
@@ -181,21 +166,13 @@ UDPPacket::getConnection(void)
return ((UDPPacketInternal *) this)->conn;
}
-TS_INLINE void
-UDPPacket::setArrivalTime(ink_hrtime t)
-{
- ((UDPPacketInternal *) this)->arrival_time = t;
-}
-
TS_INLINE UDPPacket *
new_UDPPacket(struct sockaddr const* to, ink_hrtime when, char *buf, int len)
{
UDPPacketInternal *p = udpPacketAllocator.alloc();
-#if defined(PACKETQUEUE_IMPL_AS_PQLIST) || defined(PACKETQUEUE_IMPL_AS_RING)
p->in_the_priority_queue = 0;
p->in_heap = 0;
-#endif
p->delivery_time = when;
ats_ip_copy(&p->to, to);
@@ -217,10 +194,8 @@ new_UDPPacket(struct sockaddr const* to, ink_hrtime when, IOBufferBlock * buf, i
UDPPacketInternal *p = udpPacketAllocator.alloc();
IOBufferBlock *body;
-#if defined(PACKETQUEUE_IMPL_AS_PQLIST) || defined(PACKETQUEUE_IMPL_AS_RING)
p->in_the_priority_queue = 0;
p->in_heap = 0;
-#endif
p->delivery_time = when;
ats_ip_copy(&p->to, to);
@@ -237,10 +212,8 @@ new_UDPPacket(struct sockaddr const* to, ink_hrtime when, Ptr<IOBufferBlock> buf
{
UDPPacketInternal *p = udpPacketAllocator.alloc();
-#if defined(PACKETQUEUE_IMPL_AS_PQLIST) || defined(PACKETQUEUE_IMPL_AS_RING)
p->in_the_priority_queue = 0;
p->in_heap = 0;
-#endif
p->delivery_time = when;
if (to)
ats_ip_copy(&p->to, to);
@@ -259,10 +232,8 @@ new_incoming_UDPPacket(struct sockaddr * from, char *buf, int len)
{
UDPPacketInternal *p = udpPacketAllocator.alloc();
-#if defined(PACKETQUEUE_IMPL_AS_PQLIST) || defined(PACKETQUEUE_IMPL_AS_RING)
p->in_the_priority_queue = 0;
p->in_heap = 0;
-#endif
p->delivery_time = 0;
ats_ip_copy(&p->from, from);
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7a57cd36/iocore/net/UnixUDPNet.cc
----------------------------------------------------------------------
diff --git a/iocore/net/UnixUDPNet.cc b/iocore/net/UnixUDPNet.cc
index a20c93f..fc9c31d 100644
--- a/iocore/net/UnixUDPNet.cc
+++ b/iocore/net/UnixUDPNet.cc
@@ -136,8 +136,6 @@ UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler * nh,
// create packet
UDPPacket *p = new_incoming_UDPPacket(ats_ip_sa_cast(&fromaddr), buf, r);
p->setConnection(uc);
- // XXX: is this expensive? I] really want to know this information
- p->setArrivalTime(ink_get_hrtime_internal());
// queue onto the UDPConnection
ink_atomiclist_push(&uc->inQueue, p);
iters++;
@@ -189,18 +187,21 @@ public:
~UDPReadContinuation();
inline void free(void);
inline void init_token(Event * completionToken);
+ inline void init_read(int fd, IOBufferBlock * buf, int len, struct sockaddr *fromaddr, socklen_t *fromaddrlen);
- inline void init_read(int fd, IOBufferBlock * buf, int len, struct sockaddr *fromaddr, socklen_t *fromaddrlen); // start up polling
void set_timer(int seconds)
{
timeout_interval = HRTIME_SECONDS(seconds);
}
+
void cancel();
int readPollEvent(int event, Event * e);
+
Action *getAction()
{
return event;
}
+
void setupPollDescriptor();
private:
@@ -222,8 +223,8 @@ ClassAllocator<UDPReadContinuation> udpReadContAllocator("udpReadContAllocator")
#define UNINITIALIZED_EVENT_PTR (Event *)0xdeadbeef
UDPReadContinuation::UDPReadContinuation(Event * completionToken)
-: Continuation(NULL), event(completionToken), readbuf(NULL),
- readlen(0), fromaddrlen(0), fd(-1), ifd(-1), period(0), elapsed_time(0), timeout_interval(0)
+ : Continuation(NULL), event(completionToken), readbuf(NULL), readlen(0), fromaddrlen(0), fd(-1),
+ ifd(-1), period(0), elapsed_time(0), timeout_interval(0)
{
if (completionToken->continuation)
this->mutex = completionToken->continuation->mutex;
@@ -232,10 +233,9 @@ UDPReadContinuation::UDPReadContinuation(Event * completionToken)
}
UDPReadContinuation::UDPReadContinuation()
-: Continuation(NULL), event(UNINITIALIZED_EVENT_PTR), readbuf(NULL),
- readlen(0), fromaddrlen(0), fd(-1), ifd(-1), period(0), elapsed_time(0), timeout_interval(0)
-{
-}
+ : Continuation(NULL), event(UNINITIALIZED_EVENT_PTR), readbuf(NULL), readlen(0), fromaddrlen(0), fd(-1),
+ ifd(-1), period(0), elapsed_time(0), timeout_interval(0)
+{ }
inline void
UDPReadContinuation::free(void)
@@ -345,40 +345,39 @@ UDPReadContinuation::readPollEvent(int event_, Event * e)
//ink_assert(ifd < 0 || event_ == EVENT_INTERVAL || (event_ == EVENT_POLL && pc->pollDescriptor->nfds > ifd && pc->pollDescriptor->pfd[ifd].fd == fd));
//if (ifd < 0 || event_ == EVENT_INTERVAL || (pc->pollDescriptor->pfd[ifd].revents & POLLIN)) {
ink_debug_assert(!"incomplete");
+ c = completionUtil::getContinuation(event);
+ // do read
+ socklen_t tmp_fromlen = *fromaddrlen;
+ int rlen = socketManager.recvfrom(fd, readbuf->end(), readlen, 0, ats_ip_sa_cast(fromaddr), &tmp_fromlen);
+
+ completionUtil::setThread(event, e->ethread);
+ // call back user with their event
+ if (rlen > 0) {
+ // do callback if read is successful
+ *fromaddrlen = tmp_fromlen;
+ completionUtil::setInfo(event, fd, readbuf, rlen, errno);
+ readbuf->fill(rlen);
+ // TODO: Should we deal with the return code?
+ c->handleEvent(NET_EVENT_DATAGRAM_READ_COMPLETE, event);
+ e->cancel();
+ free();
+ // delete this;
+ return EVENT_DONE;
+ } else if (rlen < 0 && rlen != -EAGAIN) {
+ // signal error.
+ *fromaddrlen = tmp_fromlen;
+ completionUtil::setInfo(event, fd, (IOBufferBlock *) readbuf, rlen, errno);
c = completionUtil::getContinuation(event);
- // do read
- socklen_t tmp_fromlen = *fromaddrlen;
- int rlen = socketManager.recvfrom(fd, readbuf->end(), readlen,
- 0, // default flags
- ats_ip_sa_cast(fromaddr), &tmp_fromlen);
- completionUtil::setThread(event, e->ethread);
- // call back user with their event
- if (rlen > 0) {
- // do callback if read is successful
- *fromaddrlen = tmp_fromlen;
- completionUtil::setInfo(event, fd, readbuf, rlen, errno);
- readbuf->fill(rlen);
- // TODO: Should we deal with the return code?
- c->handleEvent(NET_EVENT_DATAGRAM_READ_COMPLETE, event);
- e->cancel();
- free();
- // delete this;
- return EVENT_DONE;
- } else if (rlen < 0 && rlen != -EAGAIN) {
- // signal error.
- *fromaddrlen = tmp_fromlen;
- completionUtil::setInfo(event, fd, (IOBufferBlock *) readbuf, rlen, errno);
- c = completionUtil::getContinuation(event);
- // TODO: Should we deal with the return code?
- c->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
- e->cancel();
- free();
- //delete this;
- return EVENT_DONE;
- } else {
- completionUtil::setThread(event, NULL);
- }
-//}
+ // TODO: Should we deal with the return code?
+ c->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
+ e->cancel();
+ free();
+ //delete this;
+ return EVENT_DONE;
+ } else {
+ completionUtil::setThread(event, NULL);
+ }
+
if (event->cancelled) {
e->cancel();
free();
@@ -417,10 +416,11 @@ UDPNetProcessor::recvfrom_re(Continuation * cont,
ink_assert(buf->write_avail() >= len);
int actual;
Event *event = completionUtil::create();
+
completionUtil::setContinuation(event, cont);
completionUtil::setHandle(event, token);
- actual = socketManager.recvfrom(fd, buf->end(), len, 0, // default flags
- fromaddr, fromaddrlen);
+ actual = socketManager.recvfrom(fd, buf->end(), len, 0, fromaddr, fromaddrlen);
+
if (actual > 0) {
completionUtil::setThread(event, this_ethread());
completionUtil::setInfo(event, fd, buf, actual, errno);
@@ -458,6 +458,7 @@ UDPNetProcessor::sendmsg_re(Continuation * cont, void *token, int fd, struct msg
{
int actual;
Event *event = completionUtil::create();
+
completionUtil::setContinuation(event, cont);
completionUtil::setHandle(event, token);
@@ -489,13 +490,13 @@ UDPNetProcessor::sendmsg_re(Continuation * cont, void *token, int fd, struct msg
*
*/
Action *
-UDPNetProcessor::sendto_re(Continuation * cont,
- void *token, int fd, struct sockaddr const* toaddr, int toaddrlen, IOBufferBlock * buf, int len)
+UDPNetProcessor::sendto_re(Continuation * cont, void *token, int fd, struct sockaddr const* toaddr, int toaddrlen,
+ IOBufferBlock * buf, int len)
{
(void) token;
ink_assert(buf->read_avail() >= len);
- int nbytes_sent = socketManager.sendto(fd, buf->start(), len, 0,
- toaddr, toaddrlen);
+ int nbytes_sent = socketManager.sendto(fd, buf->start(), len, 0, toaddr, toaddrlen);
+
if (nbytes_sent >= 0) {
ink_assert(nbytes_sent == len);
buf->consume(nbytes_sent);
@@ -509,14 +510,9 @@ UDPNetProcessor::sendto_re(Continuation * cont,
bool
-UDPNetProcessor::CreateUDPSocket(
- int *resfd,
- sockaddr const* remote_addr,
- sockaddr* local_addr,
- int *local_addr_len,
- Action ** status,
- int send_bufsize, int recv_bufsize
-) {
+UDPNetProcessor::CreateUDPSocket(int *resfd, sockaddr const* remote_addr, sockaddr* local_addr,
+ int *local_addr_len, Action ** status, int send_bufsize, int recv_bufsize)
+{
int res = 0, fd = -1;
ink_assert(ats_ip_are_compatible(remote_addr, local_addr));
@@ -630,15 +626,10 @@ Lerror:
// send out all packets that need to be sent out as of time=now
UDPQueue::UDPQueue()
: last_report(0), last_service(0), packets(0), added(0)
-{
-}
+{ }
UDPQueue::~UDPQueue()
{
- UDPPacketInternal *p;
-
- while ((p = reliabilityPktQueue.dequeue()) != NULL)
- p->free();
}
/*
@@ -647,26 +638,18 @@ UDPQueue::~UDPQueue()
void
UDPQueue::service(UDPNetHandler * nh)
{
+ (void) nh;
ink_hrtime now = ink_get_hrtime_internal();
uint64_t timeSpent = 0;
+ uint64_t pktSendStartTime;
UDPPacketInternal *p;
ink_hrtime pktSendTime;
double minPktSpacing;
uint32_t pktSize;
int64_t pktLen;
- (void) nh;
- static ink_hrtime lastPrintTime = ink_get_hrtime_internal();
- static ink_hrtime lastSchedTime = ink_get_hrtime_internal();
- static uint32_t schedJitter = 0;
- static uint32_t numTimesSched = 0;
-
- schedJitter += ink_hrtime_to_msec(now - lastSchedTime);
- numTimesSched++;
-
p = (UDPPacketInternal *) ink_atomiclist_popall(&atomicQueue);
if (p) {
-
UDPPacketInternal *pnext = NULL;
Queue<UDPPacketInternal> stk;
@@ -676,6 +659,7 @@ UDPQueue::service(UDPNetHandler * nh)
stk.push(p);
p = pnext;
}
+
// walk backwards down list since this is actually an atomic stack.
while (stk.head) {
p = stk.pop();
@@ -685,27 +669,20 @@ UDPQueue::service(UDPNetHandler * nh)
Debug("udp-send", "Adding %p", p);
pktLen = p->getPktLength();
if (p->conn->lastPktStartTime == 0) {
- p->pktSendStartTime = MAX(now, p->delivery_time);
+ pktSendStartTime = MAX(now, p->delivery_time);
} else {
pktSize = MAX(INK_ETHERNET_MTU_SIZE, pktLen);
minPktSpacing = 0.0;
pktSendTime = p->delivery_time;
- p->pktSendStartTime = MAX(MAX(now, pktSendTime), p->delivery_time);
+ pktSendStartTime = MAX(MAX(now, pktSendTime), p->delivery_time);
}
- p->conn->lastPktStartTime = p->pktSendStartTime;
- p->delivery_time = p->pktSendStartTime;
+ p->conn->lastPktStartTime = pktSendStartTime;
+ p->delivery_time = pktSendStartTime;
G_inkPipeInfo.addPacket(p, now);
}
}
- if ((now - lastPrintTime) > ink_hrtime_from_sec(30)) {
- Debug("udp-sched-jitter", "avg. udp sched jitter: %f", (double) schedJitter / numTimesSched);
- schedJitter = 0;
- numTimesSched = 0;
- lastPrintTime = now;
- }
-
G_inkPipeInfo.advanceNow(now);
SendPackets();
@@ -733,21 +710,6 @@ UDPQueue::SendPackets()
if (now > last_service)
timeDelta = ink_hrtime_to_msec(now - last_service);
- while ((p = reliabilityPktQueue.dequeue()) != NULL) {
- pktLen = p->getPktLength();
- if (p->conn->shouldDestroy())
- goto next_pkt_3;
- if (p->conn->GetSendGenerationNumber() != p->reqGenerationNum)
- goto next_pkt_3;
-
- SendUDPPacket(p, pktLen);
- bytesThisSlot -= pktLen;
- if (bytesThisSlot < 0)
- break;
- next_pkt_3:
- p->free();
- }
-
bytesThisSlot = INT_MAX;
sendPackets: