You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ok...@apache.org on 2017/10/31 04:12:19 UTC
[trafficserver] branch master updated: Rework the new udp connection
This is an automated email from the ASF dual-hosted git repository.
oknet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 83e50f5 Rework the new udp connection
83e50f5 is described below
commit 83e50f5dab49081a223cc4d01005e033a699897e
Author: Oknet Xu <xu...@skyguard.com.cn>
AuthorDate: Fri Oct 27 22:33:56 2017 +0800
Rework the new udp connection
The UDPNetHandler works like the NetHandler, but pushing a UDPConnection
into the UDPNetHandler is lock-free.
The UDPNetHandler pops new UDPConnections from newconn_list (an atomic
list) and pushes them onto open_list, which holds all opened UDP connections.
There is no InactivityCop for UDPConnection. Instead, the open_list is traversed
within UDPNetHandler::mainNetEvent() every second.
---
iocore/net/P_UDPNet.h | 25 ++++++-----
iocore/net/P_UnixUDPConnection.h | 3 +-
iocore/net/UnixUDPConnection.cc | 6 +--
iocore/net/UnixUDPNet.cc | 97 ++++++++++++++++++++++++++++------------
4 files changed, 86 insertions(+), 45 deletions(-)
diff --git a/iocore/net/P_UDPNet.h b/iocore/net/P_UDPNet.h
index f2f8c0f..509064e 100644
--- a/iocore/net/P_UDPNet.h
+++ b/iocore/net/P_UDPNet.h
@@ -31,6 +31,7 @@
#ifndef __P_UDPNET_H_
#define __P_UDPNET_H_
+#include "ts/ink_platform.h"
#include "I_UDPNet.h"
#include "P_UDPPacket.h"
@@ -40,7 +41,7 @@ static inline PollCont *get_UDPPollCont(EThread *);
#include "P_UnixUDPConnection.h"
#include "P_UDPIOEvent.h"
-struct UDPNetHandler;
+class UDPNetHandler;
struct UDPNetProcessorInternal : public UDPNetProcessor {
virtual int start(int n_udp_threads, size_t stacksize);
@@ -303,19 +304,21 @@ public:
void initialize_thread_for_udp_net(EThread *thread);
-struct UDPNetHandler : public Continuation {
+class UDPNetHandler : public Continuation
+{
public:
- // to be polled for read
- Que(UnixUDPConnection, polling_link) udp_polling;
+ // engine for outgoing packets
+ UDPQueue udpOutQueue{};
+
+ // New UDPConnections
+ // to hold the newly created descriptors before scheduling them on the servicing buckets.
+ // atomically added to by a thread creating a new connection with UDPBind
+ ASLL(UnixUDPConnection, newconn_alink) newconn_list;
+ // All opened UDPConnections
+ Que(UnixUDPConnection, link) open_list;
// to be called back with data
Que(UnixUDPConnection, callback_link) udp_callbacks;
- // outgoing packets
- UDPQueue udpOutQueue{};
- // to hold the newly created descriptors before scheduling them on
- // the servicing buckets.
- // atomically added to by a thread creating a new connection with
- // UDPBind
- InkAtomicList udpNewConnections;
+
Event *trigger_event = nullptr;
ink_hrtime nextCheck;
ink_hrtime lastCheck;
diff --git a/iocore/net/P_UnixUDPConnection.h b/iocore/net/P_UnixUDPConnection.h
index 29b6d82..4815f8a 100644
--- a/iocore/net/P_UnixUDPConnection.h
+++ b/iocore/net/P_UnixUDPConnection.h
@@ -42,9 +42,8 @@ public:
void errorAndDie(int e);
int callbackHandler(int event, void *data);
- LINK(UnixUDPConnection, polling_link);
- LINK(UnixUDPConnection, callback_link);
SLINK(UnixUDPConnection, newconn_alink);
+ LINK(UnixUDPConnection, callback_link);
// Incoming UDP Packet Queue
ASLL(UDPPacketInternal, alink) inQueue;
diff --git a/iocore/net/UnixUDPConnection.cc b/iocore/net/UnixUDPConnection.cc
index 9cbab85..4d1eaf3 100644
--- a/iocore/net/UnixUDPConnection.cc
+++ b/iocore/net/UnixUDPConnection.cc
@@ -109,7 +109,7 @@ UDPConnection::bindToThread(Continuation *c)
AddRef();
uc->continuation = c;
mutex = c->mutex;
- ink_atomiclist_push(&get_UDPNetHandler(t)->udpNewConnections, uc);
+ get_UDPNetHandler(t)->newconn_list.push(uc);
}
Action *
@@ -145,8 +145,8 @@ UDPConnection::Release()
ink_assert(p->callback_link.next == nullptr);
ink_assert(p->callback_link.prev == nullptr);
- ink_assert(p->polling_link.next == nullptr);
- ink_assert(p->polling_link.prev == nullptr);
+ ink_assert(p->link.next == nullptr);
+ ink_assert(p->link.prev == nullptr);
ink_assert(p->newconn_alink.next == nullptr);
delete this;
diff --git a/iocore/net/UnixUDPNet.cc b/iocore/net/UnixUDPNet.cc
index 008eafc..dcf3e7e 100644
--- a/iocore/net/UnixUDPNet.cc
+++ b/iocore/net/UnixUDPNet.cc
@@ -64,8 +64,13 @@ sockaddr_in6 G_bwGrapherLoc;
void
initialize_thread_for_udp_net(EThread *thread)
{
+ UDPNetHandler *nh = get_UDPNetHandler(thread);
+
+ new ((ink_dummy_for_new *)nh) UDPNetHandler;
new ((ink_dummy_for_new *)get_UDPPollCont(thread)) PollCont(thread->mutex);
- new ((ink_dummy_for_new *)get_UDPNetHandler(thread)) UDPNetHandler;
+ // The UDPNetHandler cannot be accessed across EThreads.
+ // Because the UDPNetHandler should be called back immediately after UDPPollCont.
+ nh->mutex = thread->mutex.get();
// This variable controls how often we cleanup the cancelled packets.
// If it is set to 0, then cleanup never occurs.
@@ -807,8 +812,6 @@ UDPQueue::send(UDPPacket *p)
UDPNetHandler::UDPNetHandler()
{
- mutex = new_ProxyMutex();
- ink_atomiclist_init(&udpNewConnections, "UDP Connection queue", offsetof(UnixUDPConnection, newconn_alink.next));
nextCheck = Thread::get_hrtime_updated() + HRTIME_MSECONDS(1000);
lastCheck = 0;
SET_HANDLER((UDPNetContHandler)&UDPNetHandler::startNetEvent);
@@ -831,44 +834,80 @@ UDPNetHandler::mainNetEvent(int event, Event *e)
(void)event;
(void)e;
- PollCont *pc = get_UDPPollCont(e->ethread);
+ UnixUDPConnection *uc;
+
+ /* Notice: the race between traversal of newconn_list and UDPBind()
+ *
+ * If the UDPBind() is called after the traversal of newconn_list,
+ * the UDPConnection, the one from the pollDescriptor->result, did not push into the open_list.
+ *
+ * TODO:
+ *
+ * Take UnixNetVConnection::acceptEvent() as reference to create UnixUDPConnection::newconnEvent().
+ */
+
+ // handle new UDP connection
+ SList(UnixUDPConnection, newconn_alink) ncq(newconn_list.popall());
+ while ((uc = ncq.pop())) {
+ if (uc->shouldDestroy()) {
+ open_list.remove(uc); // due to the above race
+ uc->Release();
+ } else {
+ ink_assert(uc->mutex && uc->continuation);
+ open_list.in_or_enqueue(uc); // due to the above race
+ }
+ }
// handle UDP outgoing engine
udpOutQueue.service(this);
// handle UDP read operations
- UnixUDPConnection *uc, *next;
- int i;
- int nread = 0;
-
- EventIO *temp_eptr = nullptr;
+ int i, nread = 0;
+ PollCont *pc = get_UDPPollCont(e->ethread);
+ EventIO *epd = nullptr;
for (i = 0; i < pc->pollDescriptor->result; i++) {
- temp_eptr = (EventIO *)get_ev_data(pc->pollDescriptor, i);
- if ((get_ev_events(pc->pollDescriptor, i) & EVENTIO_READ) && temp_eptr->type == EVENTIO_UDP_CONNECTION) {
- uc = temp_eptr->data.uc;
- ink_assert(uc && uc->mutex && uc->continuation);
- ink_assert(uc->refcount >= 1);
- if (uc->shouldDestroy()) {
- // udp_polling->remove(uc,uc->polling_link);
- uc->Release();
+ epd = (EventIO *)get_ev_data(pc->pollDescriptor, i);
+ if (epd->type == EVENTIO_UDP_CONNECTION) {
+ // TODO: handle EVENTIO_ERROR
+ if (get_ev_events(pc->pollDescriptor, i) & EVENTIO_READ) {
+ uc = epd->data.uc;
+ ink_assert(uc && uc->mutex && uc->continuation);
+ ink_assert(uc->refcount >= 1);
+ open_list.in_or_enqueue(uc); // due to the above race
+ if (uc->shouldDestroy()) {
+ open_list.remove(uc);
+ uc->Release();
+ } else {
+ udpNetInternal.udp_read_from_net(this, uc);
+ nread++;
+ }
} else {
- udpNetInternal.udp_read_from_net(this, uc);
- nread++;
+ Debug("iocore_udp_main", "Unhandled epoll event: 0x%04x", get_ev_events(pc->pollDescriptor, i));
+ }
+ } else if (epd->type == EVENTIO_DNS_CONNECTION) {
+ // TODO: handle DNS conn if there is ET_UDP
+ if (epd->data.dnscon != nullptr) {
+ epd->data.dnscon->trigger();
+#if defined(USE_EDGE_TRIGGER)
+ epd->refresh(EVENTIO_READ);
+#endif
}
- } // if EPOLLIN
- } // end for
+ } else if (epd->type == EVENTIO_ASYNC_SIGNAL) {
+ // TODO: receive signal from event system
+ // net_signal_hook_callback(this->trigger_event->ethread);
+ }
+ } // end for
// remove dead UDP connections
ink_hrtime now = Thread::get_hrtime_updated();
if (now >= nextCheck) {
- for (uc = udp_polling.head; uc; uc = next) {
- ink_assert(uc->mutex && uc->continuation);
- ink_assert(uc->refcount >= 1);
- next = uc->polling_link.next;
- if (uc->shouldDestroy()) {
- // changed by YTS Team, yamsat
- // udp_polling->remove(uc,uc->polling_link);
- uc->Release();
+ forl_LL(UnixUDPConnection, xuc, open_list)
+ {
+ ink_assert(xuc->mutex && xuc->continuation);
+ ink_assert(xuc->refcount >= 1);
+ if (xuc->shouldDestroy()) {
+ open_list.remove(xuc);
+ xuc->Release();
}
}
nextCheck = Thread::get_hrtime_updated() + HRTIME_MSECONDS(1000);
--
To stop receiving notification emails like this one, please contact
['"commits@trafficserver.apache.org" <co...@trafficserver.apache.org>'].