Posted to commits@trafficserver.apache.org by ok...@apache.org on 2017/10/31 04:12:19 UTC

[trafficserver] branch master updated: Rework the new udp connection

This is an automated email from the ASF dual-hosted git repository.

oknet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new 83e50f5  Rework the new udp connection
83e50f5 is described below

commit 83e50f5dab49081a223cc4d01005e033a699897e
Author: Oknet Xu <xu...@skyguard.com.cn>
AuthorDate: Fri Oct 27 22:33:56 2017 +0800

    Rework the new udp connection
    
    The UDPNetHandler works like the NetHandler, but pushing a
    UDPConnection into the UDPNetHandler is lock-free.
    The UDPNetHandler takes new UDPConnections from newconn_list (an
    atomic list) and pushes them into open_list, which holds all open
    UDP connections.

    There is no InactivityCop for UDPConnections. Instead, open_list is
    traversed within UDPNetHandler::mainNetEvent() every second.
---
 iocore/net/P_UDPNet.h            | 25 ++++++-----
 iocore/net/P_UnixUDPConnection.h |  3 +-
 iocore/net/UnixUDPConnection.cc  |  6 +--
 iocore/net/UnixUDPNet.cc         | 97 ++++++++++++++++++++++++++++------------
 4 files changed, 86 insertions(+), 45 deletions(-)
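
For readers new to this part of the code, here is a minimal, self-contained sketch of the hand-off described above: producer threads push new connections onto an atomic singly-linked list without taking the handler's lock, and the UDP net thread drains that list once per mainNetEvent() pass into an open list that only it touches, sweeping it roughly once per second in place of an InactivityCop. The names below (SketchUDPConn, AtomicConnList, a std::list standing in for open_list) are simplified illustrations, not the actual ATS ASLL/Que/UnixUDPConnection classes.

// Minimal sketch of the lock-free new-connection hand-off (simplified stand-ins,
// not the actual ATS types). C++11 or later.
#include <atomic>
#include <cstdio>
#include <list>

struct SketchUDPConn {
  int fd;
  bool should_destroy;
  SketchUDPConn *newconn_next; // link used only by the atomic new-connection list
};

// Push-only Treiber stack: any thread may push; the handler thread detaches everything at once.
class AtomicConnList
{
public:
  void
  push(SketchUDPConn *c)
  {
    SketchUDPConn *old = head_.load(std::memory_order_relaxed);
    do {
      c->newconn_next = old;
    } while (!head_.compare_exchange_weak(old, c, std::memory_order_release, std::memory_order_relaxed));
  }

  // Mirrors newconn_list.popall(): one atomic exchange detaches the whole list.
  SketchUDPConn *
  popall()
  {
    return head_.exchange(nullptr, std::memory_order_acquire);
  }

private:
  std::atomic<SketchUDPConn *> head_{nullptr};
};

int
main()
{
  AtomicConnList newconn_list;          // pushed to by UDPConnection::bindToThread() callers
  std::list<SketchUDPConn *> open_list; // touched only by the UDP net thread

  // Producer side (any thread): lock-free hand-off of a new connection.
  newconn_list.push(new SketchUDPConn{7, false, nullptr});

  // Handler side, once per mainNetEvent() pass: drain newconn_list into open_list.
  for (SketchUDPConn *uc = newconn_list.popall(); uc != nullptr;) {
    SketchUDPConn *next = uc->newconn_next;
    if (uc->should_destroy) {
      delete uc; // the real code calls uc->Release()
    } else {
      open_list.push_back(uc);
    }
    uc = next;
  }

  // Periodic sweep (about once per second): there is no InactivityCop for UDP,
  // so dead connections are reaped here instead.
  open_list.remove_if([](SketchUDPConn *uc) {
    if (uc->should_destroy) {
      delete uc;
      return true;
    }
    return false;
  });

  std::printf("open connections: %zu\n", open_list.size());

  for (SketchUDPConn *uc : open_list) {
    delete uc;
  }
  return 0;
}

The property the patch relies on is that only the push is concurrent; everything that touches open_list runs on the UDP net thread, whose mutex the handler now shares (nh->mutex = thread->mutex.get()).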

diff --git a/iocore/net/P_UDPNet.h b/iocore/net/P_UDPNet.h
index f2f8c0f..509064e 100644
--- a/iocore/net/P_UDPNet.h
+++ b/iocore/net/P_UDPNet.h
@@ -31,6 +31,7 @@
 #ifndef __P_UDPNET_H_
 #define __P_UDPNET_H_
 
+#include "ts/ink_platform.h"
 #include "I_UDPNet.h"
 #include "P_UDPPacket.h"
 
@@ -40,7 +41,7 @@ static inline PollCont *get_UDPPollCont(EThread *);
 #include "P_UnixUDPConnection.h"
 #include "P_UDPIOEvent.h"
 
-struct UDPNetHandler;
+class UDPNetHandler;
 
 struct UDPNetProcessorInternal : public UDPNetProcessor {
   virtual int start(int n_udp_threads, size_t stacksize);
@@ -303,19 +304,21 @@ public:
 
 void initialize_thread_for_udp_net(EThread *thread);
 
-struct UDPNetHandler : public Continuation {
+class UDPNetHandler : public Continuation
+{
 public:
-  // to be polled for read
-  Que(UnixUDPConnection, polling_link) udp_polling;
+  // engine for outgoing packets
+  UDPQueue udpOutQueue{};
+
+  // New UDPConnections
+  // to hold the newly created descriptors before scheduling them on the servicing buckets.
+  // atomically added to by a thread creating a new connection with UDPBind
+  ASLL(UnixUDPConnection, newconn_alink) newconn_list;
+  // All opened UDPConnections
+  Que(UnixUDPConnection, link) open_list;
   // to be called back with data
   Que(UnixUDPConnection, callback_link) udp_callbacks;
-  // outgoing packets
-  UDPQueue udpOutQueue{};
-  // to hold the newly created descriptors before scheduling them on
-  // the servicing buckets.
-  // atomically added to by a thread creating a new connection with
-  // UDPBind
-  InkAtomicList udpNewConnections;
+
   Event *trigger_event = nullptr;
   ink_hrtime nextCheck;
   ink_hrtime lastCheck;
diff --git a/iocore/net/P_UnixUDPConnection.h b/iocore/net/P_UnixUDPConnection.h
index 29b6d82..4815f8a 100644
--- a/iocore/net/P_UnixUDPConnection.h
+++ b/iocore/net/P_UnixUDPConnection.h
@@ -42,9 +42,8 @@ public:
   void errorAndDie(int e);
   int callbackHandler(int event, void *data);
 
-  LINK(UnixUDPConnection, polling_link);
-  LINK(UnixUDPConnection, callback_link);
   SLINK(UnixUDPConnection, newconn_alink);
+  LINK(UnixUDPConnection, callback_link);
 
   // Incoming UDP Packet Queue
   ASLL(UDPPacketInternal, alink) inQueue;
diff --git a/iocore/net/UnixUDPConnection.cc b/iocore/net/UnixUDPConnection.cc
index 9cbab85..4d1eaf3 100644
--- a/iocore/net/UnixUDPConnection.cc
+++ b/iocore/net/UnixUDPConnection.cc
@@ -109,7 +109,7 @@ UDPConnection::bindToThread(Continuation *c)
   AddRef();
   uc->continuation = c;
   mutex            = c->mutex;
-  ink_atomiclist_push(&get_UDPNetHandler(t)->udpNewConnections, uc);
+  get_UDPNetHandler(t)->newconn_list.push(uc);
 }
 
 Action *
@@ -145,8 +145,8 @@ UDPConnection::Release()
 
     ink_assert(p->callback_link.next == nullptr);
     ink_assert(p->callback_link.prev == nullptr);
-    ink_assert(p->polling_link.next == nullptr);
-    ink_assert(p->polling_link.prev == nullptr);
+    ink_assert(p->link.next == nullptr);
+    ink_assert(p->link.prev == nullptr);
     ink_assert(p->newconn_alink.next == nullptr);
 
     delete this;
diff --git a/iocore/net/UnixUDPNet.cc b/iocore/net/UnixUDPNet.cc
index 008eafc..dcf3e7e 100644
--- a/iocore/net/UnixUDPNet.cc
+++ b/iocore/net/UnixUDPNet.cc
@@ -64,8 +64,13 @@ sockaddr_in6 G_bwGrapherLoc;
 void
 initialize_thread_for_udp_net(EThread *thread)
 {
+  UDPNetHandler *nh = get_UDPNetHandler(thread);
+
+  new ((ink_dummy_for_new *)nh) UDPNetHandler;
   new ((ink_dummy_for_new *)get_UDPPollCont(thread)) PollCont(thread->mutex);
-  new ((ink_dummy_for_new *)get_UDPNetHandler(thread)) UDPNetHandler;
+  // The UDPNetHandler must not be accessed across EThreads, because it
+  // has to be called back immediately after UDPPollCont on the same thread.
+  nh->mutex = thread->mutex.get();
 
   // This variable controls how often we cleanup the cancelled packets.
   // If it is set to 0, then cleanup never occurs.
@@ -807,8 +812,6 @@ UDPQueue::send(UDPPacket *p)
 
 UDPNetHandler::UDPNetHandler()
 {
-  mutex = new_ProxyMutex();
-  ink_atomiclist_init(&udpNewConnections, "UDP Connection queue", offsetof(UnixUDPConnection, newconn_alink.next));
   nextCheck = Thread::get_hrtime_updated() + HRTIME_MSECONDS(1000);
   lastCheck = 0;
   SET_HANDLER((UDPNetContHandler)&UDPNetHandler::startNetEvent);
@@ -831,44 +834,80 @@ UDPNetHandler::mainNetEvent(int event, Event *e)
   (void)event;
   (void)e;
 
-  PollCont *pc = get_UDPPollCont(e->ethread);
+  UnixUDPConnection *uc;
+
+  /* Notice: the race between the traversal of newconn_list and UDPBind()
+   *
+   * If UDPBind() is called after newconn_list has been traversed, the
+   * UDPConnection that shows up in pollDescriptor->result has not been pushed into open_list yet.
+   *
+   * TODO:
+   *
+   * Use UnixNetVConnection::acceptEvent() as a reference to create UnixUDPConnection::newconnEvent().
+   */
+
+  // handle new UDP connection
+  SList(UnixUDPConnection, newconn_alink) ncq(newconn_list.popall());
+  while ((uc = ncq.pop())) {
+    if (uc->shouldDestroy()) {
+      open_list.remove(uc); // due to the above race
+      uc->Release();
+    } else {
+      ink_assert(uc->mutex && uc->continuation);
+      open_list.in_or_enqueue(uc); // due to the above race
+    }
+  }
 
   // handle UDP outgoing engine
   udpOutQueue.service(this);
 
   // handle UDP read operations
-  UnixUDPConnection *uc, *next;
-  int i;
-  int nread = 0;
-
-  EventIO *temp_eptr = nullptr;
+  int i, nread = 0;
+  PollCont *pc = get_UDPPollCont(e->ethread);
+  EventIO *epd = nullptr;
   for (i = 0; i < pc->pollDescriptor->result; i++) {
-    temp_eptr = (EventIO *)get_ev_data(pc->pollDescriptor, i);
-    if ((get_ev_events(pc->pollDescriptor, i) & EVENTIO_READ) && temp_eptr->type == EVENTIO_UDP_CONNECTION) {
-      uc = temp_eptr->data.uc;
-      ink_assert(uc && uc->mutex && uc->continuation);
-      ink_assert(uc->refcount >= 1);
-      if (uc->shouldDestroy()) {
-        // udp_polling->remove(uc,uc->polling_link);
-        uc->Release();
+    epd = (EventIO *)get_ev_data(pc->pollDescriptor, i);
+    if (epd->type == EVENTIO_UDP_CONNECTION) {
+      // TODO: handle EVENTIO_ERROR
+      if (get_ev_events(pc->pollDescriptor, i) & EVENTIO_READ) {
+        uc = epd->data.uc;
+        ink_assert(uc && uc->mutex && uc->continuation);
+        ink_assert(uc->refcount >= 1);
+        open_list.in_or_enqueue(uc); // due to the above race
+        if (uc->shouldDestroy()) {
+          open_list.remove(uc);
+          uc->Release();
+        } else {
+          udpNetInternal.udp_read_from_net(this, uc);
+          nread++;
+        }
       } else {
-        udpNetInternal.udp_read_from_net(this, uc);
-        nread++;
+        Debug("iocore_udp_main", "Unhandled epoll event: 0x%04x", get_ev_events(pc->pollDescriptor, i));
+      }
+    } else if (epd->type == EVENTIO_DNS_CONNECTION) {
+      // TODO: handle DNS conn if there is ET_UDP
+      if (epd->data.dnscon != nullptr) {
+        epd->data.dnscon->trigger();
+#if defined(USE_EDGE_TRIGGER)
+        epd->refresh(EVENTIO_READ);
+#endif
       }
-    } // if EPOLLIN
-  }   // end for
+    } else if (epd->type == EVENTIO_ASYNC_SIGNAL) {
+      // TODO: receive signal from event system
+      // net_signal_hook_callback(this->trigger_event->ethread);
+    }
+  } // end for
 
   // remove dead UDP connections
   ink_hrtime now = Thread::get_hrtime_updated();
   if (now >= nextCheck) {
-    for (uc = udp_polling.head; uc; uc = next) {
-      ink_assert(uc->mutex && uc->continuation);
-      ink_assert(uc->refcount >= 1);
-      next = uc->polling_link.next;
-      if (uc->shouldDestroy()) {
-        // changed by YTS Team, yamsat
-        // udp_polling->remove(uc,uc->polling_link);
-        uc->Release();
+    forl_LL(UnixUDPConnection, xuc, open_list)
+    {
+      ink_assert(xuc->mutex && xuc->continuation);
+      ink_assert(xuc->refcount >= 1);
+      if (xuc->shouldDestroy()) {
+        open_list.remove(xuc);
+        xuc->Release();
       }
     }
     nextCheck = Thread::get_hrtime_updated() + HRTIME_MSECONDS(1000);
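
One more clarifying sketch, tied to the race documented in the comment inside mainNetEvent(): a connection can show up in pollDescriptor->result before its newconn_list entry has been drained, which is why the patch calls open_list.in_or_enqueue() on both paths. The snippet below is a hypothetical stand-in for that idempotent-insert idiom, not the actual ATS Que implementation.

// Hypothetical stand-in for the Que::in_or_enqueue() idiom used in the patch:
// insert only if not already linked, so both code paths can call it safely.
#include <cassert>
#include <vector>

struct Conn {
  bool in_open_list = false; // the real intrusive Que checks the link pointers instead
};

void
in_or_enqueue(std::vector<Conn *> &open_list, Conn *c)
{
  if (!c->in_open_list) {
    c->in_open_list = true;
    open_list.push_back(c);
  }
}

int
main()
{
  std::vector<Conn *> open_list;
  Conn c;
  in_or_enqueue(open_list, &c); // from the newconn_list drain
  in_or_enqueue(open_list, &c); // again from the poll-result path: no duplicate entry
  assert(open_list.size() == 1);
  return 0;
}

The TODO in the patch suggests modelling a UnixUDPConnection::newconnEvent() on UnixNetVConnection::acceptEvent(), which would presumably make this guard unnecessary.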

-- 
To stop receiving notification emails like this one, please contact
['"commits@trafficserver.apache.org" <co...@trafficserver.apache.org>'].