You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by sc...@apache.org on 2018/02/07 06:19:48 UTC

[trafficserver] 02/06: complete pollcont

This is an automated email from the ASF dual-hosted git repository.

scw00 pushed a commit to branch quic-latest
in repository https://gitbox.apache.org/repos/asf/trafficserver.git

commit c85f903a11d12b4e1505bd68603f2c157805c4a8
Author: scw00 <sc...@apache.org>
AuthorDate: Sun Jan 28 21:22:41 2018 +0800

    complete pollcont
---
 cmd/traffic_quic/quic_client.h    |   1 +
 cmd/traffic_quic/traffic_quic.cc  |   2 +
 iocore/net/I_UDPPacket.h          |   2 +-
 iocore/net/Makefile.am            |   2 +
 iocore/net/P_Net.h                |   1 +
 iocore/net/P_QUICNet.h            |   6 ++
 iocore/net/P_QUICNetProcessor.h   |   1 +
 iocore/net/P_QUICNetVConnection.h |   3 +
 iocore/net/QUICNet.cc             | 141 +++++++++++++++++++++++---------------
 iocore/net/QUICNetProcessor.cc    |  10 +++
 iocore/net/QUICNetVConnection.cc  |   1 +
 iocore/net/QUICPacketHandler.cc   |  14 ++--
 proxy/Main.cc                     |   4 ++
 13 files changed, 129 insertions(+), 59 deletions(-)

diff --git a/cmd/traffic_quic/quic_client.h b/cmd/traffic_quic/quic_client.h
index 05d0c47..575d7a1 100644
--- a/cmd/traffic_quic/quic_client.h
+++ b/cmd/traffic_quic/quic_client.h
@@ -23,6 +23,7 @@
 
 #pragma once
 
+#include "P_Net.h"
 #include "I_EventSystem.h"
 #include "I_NetVConnection.h"
 #include "P_QUICNetProcessor.h"
diff --git a/cmd/traffic_quic/traffic_quic.cc b/cmd/traffic_quic/traffic_quic.cc
index 5e48349..19ea9c2 100644
--- a/cmd/traffic_quic/traffic_quic.cc
+++ b/cmd/traffic_quic/traffic_quic.cc
@@ -78,6 +78,8 @@ main(int argc, const char **argv)
   SSLInitializeLibrary();
   SSLConfig::startup();
 
+  quic_NetProcessor.init();
+
   ink_event_system_init(EVENT_SYSTEM_MODULE_VERSION);
   eventProcessor.start(THREADS);
   udpNet.start(1, stacksize);
diff --git a/iocore/net/I_UDPPacket.h b/iocore/net/I_UDPPacket.h
index 271a250..fd047e5 100644
--- a/iocore/net/I_UDPPacket.h
+++ b/iocore/net/I_UDPPacket.h
@@ -63,7 +63,7 @@ public:
 
   int from_size;
   typedef union udppacket_data {
-    void    *ptr;
+    void *ptr;
     uint32_t u32;
     uint64_t u64;
   } udppacket_data_t;
diff --git a/iocore/net/Makefile.am b/iocore/net/Makefile.am
index b044e80..f32cc13 100644
--- a/iocore/net/Makefile.am
+++ b/iocore/net/Makefile.am
@@ -162,10 +162,12 @@ libinknet_a_SOURCES = \
 if ENABLE_QUIC
 libinknet_a_SOURCES += \
   P_QUICPacketHandler.h \
+  P_QUICNet.h \
   P_QUICNetProcessor.h \
   P_QUICNetVConnection.h \
   P_QUICNextProtocolAccept.h \
   QUICPacketHandler.cc \
+  QUICNet.cc \
   QUICNetProcessor.cc \
   QUICNetVConnection.cc \
   QUICNextProtocolAccept.cc
diff --git a/iocore/net/P_Net.h b/iocore/net/P_Net.h
index 0a7a502..38504ee 100644
--- a/iocore/net/P_Net.h
+++ b/iocore/net/P_Net.h
@@ -113,6 +113,7 @@ extern RecRawStatBlock *net_rsb;
 #include "P_QUICNetVConnection.h"
 #include "P_QUICNetProcessor.h"
 #include "P_QUICPacketHandler.h"
+#include "P_QUICNet.h"
 #endif
 // #include "P_QUICCertLookup.h"
 
diff --git a/iocore/net/P_QUICNet.h b/iocore/net/P_QUICNet.h
index fba0e96..7231207 100644
--- a/iocore/net/P_QUICNet.h
+++ b/iocore/net/P_QUICNet.h
@@ -32,6 +32,8 @@
 class NetHandler;
 typedef int (NetHandler::*NetContHandler)(int, void *);
 
+void initialize_thread_for_quic_net(EThread *thread);
+
 struct QUICPollCont : public Continuation {
   NetHandler *net_handler;
   PollDescriptor *pollDescriptor;
@@ -49,6 +51,10 @@ public:
   Que(UDPPacket, link) longInQueue;
   // Internal Queue to save Short Header Packet
   Que(UDPPacket, link) shortInQueue;
+
+private:
+  void _process_short_header_packet(UDPPacketInternal *p, NetHandler *nh);
+  void _process_long_header_packet(UDPPacketInternal *p, NetHandler *nh);
 };
 
 static inline QUICPollCont *
diff --git a/iocore/net/P_QUICNetProcessor.h b/iocore/net/P_QUICNetProcessor.h
index 2b7eb10..c9d6c2a 100644
--- a/iocore/net/P_QUICNetProcessor.h
+++ b/iocore/net/P_QUICNetProcessor.h
@@ -56,6 +56,7 @@ public:
   QUICNetProcessor();
   virtual ~QUICNetProcessor();
 
+  void init() override;
   virtual int start(int, size_t stacksize) override;
   void cleanup();
   // TODO: refactoring NetProcessor::connect_re and UnixNetProcessor::connect_re_internal
diff --git a/iocore/net/P_QUICNetVConnection.h b/iocore/net/P_QUICNetVConnection.h
index 4e999d9..8f4d7e5 100644
--- a/iocore/net/P_QUICNetVConnection.h
+++ b/iocore/net/P_QUICNetVConnection.h
@@ -145,6 +145,9 @@ public:
   QUICNetVConnection() {}
   void init(QUICConnectionId original_cid, UDPConnection *, QUICPacketHandler *, QUICConnectionTable *ctable = nullptr);
 
+  // accept new conn_id
+  int acceptEvent(int event, Event *e);
+
   // UnixNetVConnection
   void reenable(VIO *vio) override;
   VIO *do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf) override;
diff --git a/iocore/net/QUICNet.cc b/iocore/net/QUICNet.cc
index c40121f..3e730af 100644
--- a/iocore/net/QUICNet.cc
+++ b/iocore/net/QUICNet.cc
@@ -23,14 +23,12 @@
 
 #include "P_Net.h"
 
-QUICPollCont::QUICPollCont(Ptr<ProxyMutex> &m)
-  : Continuation(m.get()), net_handler(nullptr)
+QUICPollCont::QUICPollCont(Ptr<ProxyMutex> &m) : Continuation(m.get()), net_handler(nullptr)
 {
-  SET_HANDLER(&PollCont::pollEvent);
+  SET_HANDLER(&QUICPollCont::pollEvent);
 }
 
-QUICPollCont::QUICPollCont(Ptr<ProxyMutex> &m, NetHandler *nh)
-  : Continuation(m.get()), net_handler(nh)
+QUICPollCont::QUICPollCont(Ptr<ProxyMutex> &m, NetHandler *nh) : Continuation(m.get()), net_handler(nh)
 {
   SET_HANDLER(&QUICPollCont::pollEvent);
 }
@@ -39,6 +37,83 @@ QUICPollCont::~QUICPollCont()
 {
 }
 
+void
+QUICPollCont::_process_long_header_packet(UDPPacketInternal *p, NetHandler *nh)
+{
+  QUICNetVConnection *vc;
+  QUICPacketType ptype;
+  uint8_t *buf;
+
+  // FIXME: VC is nullptr ?
+  vc  = static_cast<QUICNetVConnection *>(p->data.ptr);
+  buf = (uint8_t *)p->getIOBlockChain()->buf();
+  if (!QUICTypeUtil::has_connection_id(reinterpret_cast<const uint8_t *>(buf))) {
+    // TODO: Some packets may not have connection id
+    p->free();
+    return;
+  }
+
+  ptype = static_cast<QUICPacketType>(buf[0] & 0x7f);
+  switch (ptype) {
+  case QUICPacketType::INITIAL:
+    vc->read.triggered = 1;
+    vc->handle_received_packet(p);
+    this->mutex->thread_holding->schedule_imm(vc);
+    return;
+  case QUICPacketType::ZERO_RTT_PROTECTED:
+  // TODO:: do something ?
+  // break;
+  case QUICPacketType::HANDSHAKE:
+  default:
+    // Just Pass Through
+    if (vc) {
+      vc->read.triggered = 1;
+      vc->handle_received_packet(p);
+    } else {
+      longInQueue.push(p);
+    }
+
+    // Push QUICNetVC into nethandler's enabled list
+    if (vc != nullptr) {
+      int isin = ink_atomic_swap(&vc->read.in_enabled_list, 1);
+      if (!isin) {
+        nh->read_enable_list.push(vc);
+      }
+    }
+    break;
+  }
+}
+
+void
+QUICPollCont::_process_short_header_packet(UDPPacketInternal *p, NetHandler *nh)
+{
+  QUICNetVConnection *vc;
+  uint8_t *buf;
+
+  vc  = static_cast<QUICNetVConnection *>(p->data.ptr);
+  buf = (uint8_t *)p->getIOBlockChain()->buf();
+  if (!QUICTypeUtil::has_connection_id(reinterpret_cast<const uint8_t *>(buf))) {
+    // TODO: Some packets may not have connection id
+    p->free();
+    return;
+  }
+
+  if (vc) {
+    vc->read.triggered = 1;
+    vc->handle_received_packet(p);
+  } else {
+    shortInQueue.push(p);
+  }
+
+  // Push QUICNetVC into nethandler's enabled list
+  if (vc != nullptr) {
+    int isin = ink_atomic_swap(&vc->read.in_enabled_list, 1);
+    if (!isin) {
+      nh->read_enable_list.push(vc);
+    }
+  }
+}
+
 //
 // QUICPollCont continuation which traverse the inQueue(ASLL)
 // and create new QUICNetVC for Initial Packet,
@@ -47,15 +122,10 @@ QUICPollCont::~QUICPollCont()
 int
 QUICPollCont::pollEvent(int, Event *)
 {
-  UnixUDPConnection *uc;
-  QUICPacketHandler *ph;
-  QUICNetVConnection *vc;
-  QUICConnectionId cid;
+  ink_assert(this->mutex->thread_holding == this_thread());
   uint8_t *buf;
-  uint8_t ptype;
-  UDPPacket *packet_r;
   UDPPacketInternal *p = nullptr;
-  NetHandler *nh       = get_NetHandler(t);
+  NetHandler *nh       = get_NetHandler(this->mutex->thread_holding);
 
   // Process the ASLL
   SList(UDPPacketInternal, alink) aq(inQueue.popall());
@@ -65,50 +135,14 @@ QUICPollCont::pollEvent(int, Event *)
   }
 
   while ((p = result.pop())) {
-    uc  = static_cast<UnixUDPConnection *>(p->getConnection());
-    ph  = static_cast<QUICPacketHandler *>(uc->continuation);
-    vc  = static_cast<QUICNetVConnection *>(p->data.ptr);
     buf = (uint8_t *)p->getIOBlockChain()->buf();
-    cid = QUICPacket::connection_id(buf)
-    if (buf[0] & 0x80) { // Long Header Packet with Connection ID, has a valid type value.
-      ptype = buf[0] & 0x7f;
-      if (ptype == QUICPacketType::INITIAL) { // Initial Packet
-        vc->read.triggered = 1;
-        vc->push_packet(p);
-        // reschedule the vc and callback vc->acceptEvent
-        this_ethread()->schedule_imm(vc);
-      } elseif (ptype == QUICPacketType::ZERO_RTT_PROTECTED) { // 0-RTT Packet
-        // TODO:
-      } elseif (ptype == QUICPacketType::HANDSHAKE) { // Handshake Packet
-        if (vc) {
-          vc->read.triggered = 1;
-          vc->push_packet(p);
-        } else {
-          longInQueue.push(p);
-        }
-      } else {
-        ink_assert(!"not reached!");
-      }
-    } elseif (buf[0] & 0x40) { // Short Header Packet with Connection ID, has a valid type value.
-      if (vc) {
-        vc->read.triggered = 1;
-        vc->push_packet(p);
-      } else {
-        shortInQueue.push(p);
-      }
-    } else {
-      ink_assert(!"not reached!");
-    }
-
-    // Push QUICNetVC into nethandler's enabled list
-    if (vc != nullptr) {
-      int isin = ink_atomic_swap(&vc->read.in_enabled_list, 1);
-      if (!isin) {
-        nh->read_enable_list.push(vc);
-      }
+    if (QUICTypeUtil::has_long_header(buf)) { // Long Header Packet with Connection ID, has a valid type value.
+      this->_process_long_header_packet(p, nh);
+    } else { // Short Header Packet with Connection ID, has a valid type value.
+      this->_process_short_header_packet(p, nh);
     }
   }
-  
+
   return EVENT_CONT;
 }
 
@@ -122,4 +156,3 @@ initialize_thread_for_quic_net(EThread *thread)
 
   thread->schedule_every(quicpc, -9);
 }
-
diff --git a/iocore/net/QUICNetProcessor.cc b/iocore/net/QUICNetProcessor.cc
index 8dc335f..da372c6 100644
--- a/iocore/net/QUICNetProcessor.cc
+++ b/iocore/net/QUICNetProcessor.cc
@@ -50,6 +50,16 @@ QUICNetProcessor::cleanup()
   SSL_CTX_free(this->_ssl_ctx);
 }
 
+void
+QUICNetProcessor::init()
+{
+  // first we allocate a QUICPollCont.
+  this->quicPollCont_offset = eventProcessor.allocate(sizeof(QUICPollCont));
+
+  // schedule event
+  eventProcessor.schedule_spawn(&initialize_thread_for_quic_net, ET_NET);
+}
+
 int
 QUICNetProcessor::start(int, size_t stacksize)
 {
diff --git a/iocore/net/QUICNetVConnection.cc b/iocore/net/QUICNetVConnection.cc
index b0088c8..e71c109 100644
--- a/iocore/net/QUICNetVConnection.cc
+++ b/iocore/net/QUICNetVConnection.cc
@@ -117,6 +117,7 @@ QUICNetVConnection::acceptEvent(int event, Event *e)
     free(t);
     return EVENT_DONE;
   }
+  this->read.enabled = 1;
 
   // Handshake callback handler.
   SET_HANDLER((NetVConnHandler)&QUICNetVConnection::state_pre_handshake);
diff --git a/iocore/net/QUICPacketHandler.cc b/iocore/net/QUICPacketHandler.cc
index 8ac76c0..a09c394 100644
--- a/iocore/net/QUICPacketHandler.cc
+++ b/iocore/net/QUICPacketHandler.cc
@@ -125,7 +125,8 @@ void
 QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet)
 {
   EThread *eth;
-  IOBufferBlock *block = udp_packet->getIOBlockChain();
+  QUICNetVConnection *vc = nullptr;
+  IOBufferBlock *block   = udp_packet->getIOBlockChain();
 
   if (is_debug_tag_set("quic_sec")) {
     ip_port_text_buffer ipb;
@@ -163,7 +164,7 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet)
 
     // Create a new NetVConnection
     QUICConnectionId original_cid = this->_read_connection_id(block);
-    QUICNetVConnection *vc        = static_cast<QUICNetVConnection *>(getNetProcessor()->allocate_vc(nullptr));
+    vc                            = static_cast<QUICNetVConnection *>(getNetProcessor()->allocate_vc(nullptr));
     vc->init(original_cid, udp_packet->getConnection(), this, &this->_ctable);
     vc->id = net_next_connection_number();
     vc->con.move(con);
@@ -173,16 +174,21 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet)
     vc->action_     = *this->action_;
     vc->set_is_transparent(this->opt.f_inbound_transparent);
     vc->set_context(NET_VCONNECTION_IN);
+    vc->read.triggered = 1;
+    vc->start(this->_ssl_ctx);
     vc->options.ip_proto  = NetVCOptions::USE_UDP;
     vc->options.ip_family = udp_packet->from.sa.sa_family;
 
     qc = vc;
+  } else {
+    vc  = static_cast<QUICNetVConnection *>(qc);
+    eth = vc->thread;
   }
 
   // Push the packet into QUICPollCont
   udp_packet->data.ptr = vc;
-  get_QUICPollCont(eth)->inQueue.push(udp_packet);
-
+  // should we use dynamic_cast ??
+  get_QUICPollCont(eth)->inQueue.push(static_cast<UDPPacketInternal *>(udp_packet));
 }
 
 // TODO: Should be called via eventProcessor?
diff --git a/proxy/Main.cc b/proxy/Main.cc
index 5e36c0e..5614444 100644
--- a/proxy/Main.cc
+++ b/proxy/Main.cc
@@ -1800,6 +1800,10 @@ main(int /* argc ATS_UNUSED */, const char **argv)
   // Do the inits for NetProcessors that use ET_NET threads. MUST be before starting those threads.
   netProcessor.init();
   init_HttpProxyServer();
+#if TS_USE_QUIC == 1
+  // OK, pushing a spawn scheduling here
+  quic_NetProcessor.init();
+#endif
 
   // !! ET_NET threads start here !!
   // This means any spawn scheduling must be done before this point.

-- 
To stop receiving notification emails like this one, please contact
scw00@apache.org.