You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by sc...@apache.org on 2018/02/07 06:19:48 UTC
[trafficserver] 02/06: complete pollcont
This is an automated email from the ASF dual-hosted git repository.
scw00 pushed a commit to branch quic-latest
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
commit c85f903a11d12b4e1505bd68603f2c157805c4a8
Author: scw00 <sc...@apache.org>
AuthorDate: Sun Jan 28 21:22:41 2018 +0800
complete pollcont
---
cmd/traffic_quic/quic_client.h | 1 +
cmd/traffic_quic/traffic_quic.cc | 2 +
iocore/net/I_UDPPacket.h | 2 +-
iocore/net/Makefile.am | 2 +
iocore/net/P_Net.h | 1 +
iocore/net/P_QUICNet.h | 6 ++
iocore/net/P_QUICNetProcessor.h | 1 +
iocore/net/P_QUICNetVConnection.h | 3 +
iocore/net/QUICNet.cc | 141 +++++++++++++++++++++++---------------
iocore/net/QUICNetProcessor.cc | 10 +++
iocore/net/QUICNetVConnection.cc | 1 +
iocore/net/QUICPacketHandler.cc | 14 ++--
proxy/Main.cc | 4 ++
13 files changed, 129 insertions(+), 59 deletions(-)
diff --git a/cmd/traffic_quic/quic_client.h b/cmd/traffic_quic/quic_client.h
index 05d0c47..575d7a1 100644
--- a/cmd/traffic_quic/quic_client.h
+++ b/cmd/traffic_quic/quic_client.h
@@ -23,6 +23,7 @@
#pragma once
+#include "P_Net.h"
#include "I_EventSystem.h"
#include "I_NetVConnection.h"
#include "P_QUICNetProcessor.h"
diff --git a/cmd/traffic_quic/traffic_quic.cc b/cmd/traffic_quic/traffic_quic.cc
index 5e48349..19ea9c2 100644
--- a/cmd/traffic_quic/traffic_quic.cc
+++ b/cmd/traffic_quic/traffic_quic.cc
@@ -78,6 +78,8 @@ main(int argc, const char **argv)
SSLInitializeLibrary();
SSLConfig::startup();
+ quic_NetProcessor.init();
+
ink_event_system_init(EVENT_SYSTEM_MODULE_VERSION);
eventProcessor.start(THREADS);
udpNet.start(1, stacksize);
diff --git a/iocore/net/I_UDPPacket.h b/iocore/net/I_UDPPacket.h
index 271a250..fd047e5 100644
--- a/iocore/net/I_UDPPacket.h
+++ b/iocore/net/I_UDPPacket.h
@@ -63,7 +63,7 @@ public:
int from_size;
typedef union udppacket_data {
- void *ptr;
+ void *ptr;
uint32_t u32;
uint64_t u64;
} udppacket_data_t;
diff --git a/iocore/net/Makefile.am b/iocore/net/Makefile.am
index b044e80..f32cc13 100644
--- a/iocore/net/Makefile.am
+++ b/iocore/net/Makefile.am
@@ -162,10 +162,12 @@ libinknet_a_SOURCES = \
if ENABLE_QUIC
libinknet_a_SOURCES += \
P_QUICPacketHandler.h \
+ P_QUICNet.h \
P_QUICNetProcessor.h \
P_QUICNetVConnection.h \
P_QUICNextProtocolAccept.h \
QUICPacketHandler.cc \
+ QUICNet.cc \
QUICNetProcessor.cc \
QUICNetVConnection.cc \
QUICNextProtocolAccept.cc
diff --git a/iocore/net/P_Net.h b/iocore/net/P_Net.h
index 0a7a502..38504ee 100644
--- a/iocore/net/P_Net.h
+++ b/iocore/net/P_Net.h
@@ -113,6 +113,7 @@ extern RecRawStatBlock *net_rsb;
#include "P_QUICNetVConnection.h"
#include "P_QUICNetProcessor.h"
#include "P_QUICPacketHandler.h"
+#include "P_QUICNet.h"
#endif
// #include "P_QUICCertLookup.h"
diff --git a/iocore/net/P_QUICNet.h b/iocore/net/P_QUICNet.h
index fba0e96..7231207 100644
--- a/iocore/net/P_QUICNet.h
+++ b/iocore/net/P_QUICNet.h
@@ -32,6 +32,8 @@
class NetHandler;
typedef int (NetHandler::*NetContHandler)(int, void *);
+void initialize_thread_for_quic_net(EThread *thread);
+
struct QUICPollCont : public Continuation {
NetHandler *net_handler;
PollDescriptor *pollDescriptor;
@@ -49,6 +51,10 @@ public:
Que(UDPPacket, link) longInQueue;
// Internal Queue to save Short Header Packet
Que(UDPPacket, link) shortInQueue;
+
+private:
+ void _process_short_header_packet(UDPPacketInternal *p, NetHandler *nh);
+ void _process_long_header_packet(UDPPacketInternal *p, NetHandler *nh);
};
static inline QUICPollCont *
diff --git a/iocore/net/P_QUICNetProcessor.h b/iocore/net/P_QUICNetProcessor.h
index 2b7eb10..c9d6c2a 100644
--- a/iocore/net/P_QUICNetProcessor.h
+++ b/iocore/net/P_QUICNetProcessor.h
@@ -56,6 +56,7 @@ public:
QUICNetProcessor();
virtual ~QUICNetProcessor();
+ void init() override;
virtual int start(int, size_t stacksize) override;
void cleanup();
// TODO: refactoring NetProcessor::connect_re and UnixNetProcessor::connect_re_internal
diff --git a/iocore/net/P_QUICNetVConnection.h b/iocore/net/P_QUICNetVConnection.h
index 4e999d9..8f4d7e5 100644
--- a/iocore/net/P_QUICNetVConnection.h
+++ b/iocore/net/P_QUICNetVConnection.h
@@ -145,6 +145,9 @@ public:
QUICNetVConnection() {}
void init(QUICConnectionId original_cid, UDPConnection *, QUICPacketHandler *, QUICConnectionTable *ctable = nullptr);
+ // accept new conn_id
+ int acceptEvent(int event, Event *e);
+
// UnixNetVConnection
void reenable(VIO *vio) override;
VIO *do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf) override;
diff --git a/iocore/net/QUICNet.cc b/iocore/net/QUICNet.cc
index c40121f..3e730af 100644
--- a/iocore/net/QUICNet.cc
+++ b/iocore/net/QUICNet.cc
@@ -23,14 +23,12 @@
#include "P_Net.h"
-QUICPollCont::QUICPollCont(Ptr<ProxyMutex> &m)
- : Continuation(m.get()), net_handler(nullptr)
+QUICPollCont::QUICPollCont(Ptr<ProxyMutex> &m) : Continuation(m.get()), net_handler(nullptr)
{
- SET_HANDLER(&PollCont::pollEvent);
+ SET_HANDLER(&QUICPollCont::pollEvent);
}
-QUICPollCont::QUICPollCont(Ptr<ProxyMutex> &m, NetHandler *nh)
- : Continuation(m.get()), net_handler(nh)
+QUICPollCont::QUICPollCont(Ptr<ProxyMutex> &m, NetHandler *nh) : Continuation(m.get()), net_handler(nh)
{
SET_HANDLER(&QUICPollCont::pollEvent);
}
@@ -39,6 +37,83 @@ QUICPollCont::~QUICPollCont()
{
}
+// Dispatch one long-header UDP packet taken from the poll continuation's inQueue.
+//
+// p  - the packet; p->data.ptr carries the QUICNetVConnection the packet handler
+//      attached to it, which may be nullptr.
+// nh - the NetHandler whose read_enable_list receives the connection.
+//
+// Packets without a connection id are freed. Packets with no attached connection
+// are parked on longInQueue. Otherwise the packet is handed to the connection;
+// an INITIAL packet additionally schedules the connection immediately on this
+// continuation's thread, every other type puts it on the read-enable list.
+void
+QUICPollCont::_process_long_header_packet(UDPPacketInternal *p, NetHandler *nh)
+{
+  QUICNetVConnection *vc = static_cast<QUICNetVConnection *>(p->data.ptr);
+  const uint8_t *buf     = reinterpret_cast<const uint8_t *>(p->getIOBlockChain()->buf());
+
+  if (!QUICTypeUtil::has_connection_id(buf)) {
+    // TODO: Some packets may not have connection id
+    p->free();
+    return;
+  }
+
+  // Read the type before the packet is handed off; the low 7 bits of the first
+  // byte are interpreted as the long-header packet type.
+  const QUICPacketType ptype = static_cast<QUICPacketType>(buf[0] & 0x7f);
+
+  if (vc == nullptr) {
+    // No connection is attached to this packet. Park it instead of dereferencing
+    // a null pointer; this now also covers INITIAL packets, which were previously
+    // dereferenced unconditionally.
+    // NOTE(review): nothing visible here drains longInQueue - confirm the consumer.
+    longInQueue.push(p);
+    return;
+  }
+
+  vc->read.triggered = 1;
+  vc->handle_received_packet(p);
+
+  switch (ptype) {
+  case QUICPacketType::INITIAL:
+    // Run the connection's handler right away on the thread holding our mutex.
+    this->mutex->thread_holding->schedule_imm(vc);
+    return;
+  case QUICPacketType::ZERO_RTT_PROTECTED:
+    // TODO: 0-RTT currently gets no special treatment.
+  case QUICPacketType::HANDSHAKE:
+  default:
+    // Push QUICNetVC into nethandler's enabled list, at most once.
+    if (!ink_atomic_swap(&vc->read.in_enabled_list, 1)) {
+      nh->read_enable_list.push(vc);
+    }
+    break;
+  }
+}
+
+// Dispatch one short-header UDP packet taken from the poll continuation's inQueue.
+//
+// Packets without a connection id are freed. A packet with no attached
+// connection (p->data.ptr is null) is parked on shortInQueue. Otherwise the
+// packet is handed to its connection, which is then placed on the NetHandler's
+// read-enable list unless it is already there.
+void
+QUICPollCont::_process_short_header_packet(UDPPacketInternal *p, NetHandler *nh)
+{
+  QUICNetVConnection *conn = static_cast<QUICNetVConnection *>(p->data.ptr);
+  uint8_t *raw             = (uint8_t *)p->getIOBlockChain()->buf();
+
+  if (!QUICTypeUtil::has_connection_id(reinterpret_cast<const uint8_t *>(raw))) {
+    // TODO: Some packets may not have connection id
+    p->free();
+    return;
+  }
+
+  if (conn == nullptr) {
+    // Nobody to deliver to yet; keep the packet for later.
+    shortInQueue.push(p);
+    return;
+  }
+
+  conn->read.triggered = 1;
+  conn->handle_received_packet(p);
+
+  // Enqueue the connection for reading only if the flag was not already set.
+  if (!ink_atomic_swap(&conn->read.in_enabled_list, 1)) {
+    nh->read_enable_list.push(conn);
+  }
+}
+
//
// QUICPollCont continuation which traverse the inQueue(ASLL)
// and create new QUICNetVC for Initial Packet,
@@ -47,15 +122,10 @@ QUICPollCont::~QUICPollCont()
int
QUICPollCont::pollEvent(int, Event *)
{
- UnixUDPConnection *uc;
- QUICPacketHandler *ph;
- QUICNetVConnection *vc;
- QUICConnectionId cid;
+ ink_assert(this->mutex->thread_holding == this_thread());
uint8_t *buf;
- uint8_t ptype;
- UDPPacket *packet_r;
UDPPacketInternal *p = nullptr;
- NetHandler *nh = get_NetHandler(t);
+ NetHandler *nh = get_NetHandler(this->mutex->thread_holding);
// Process the ASLL
SList(UDPPacketInternal, alink) aq(inQueue.popall());
@@ -65,50 +135,14 @@ QUICPollCont::pollEvent(int, Event *)
}
while ((p = result.pop())) {
- uc = static_cast<UnixUDPConnection *>(p->getConnection());
- ph = static_cast<QUICPacketHandler *>(uc->continuation);
- vc = static_cast<QUICNetVConnection *>(p->data.ptr);
buf = (uint8_t *)p->getIOBlockChain()->buf();
- cid = QUICPacket::connection_id(buf)
- if (buf[0] & 0x80) { // Long Header Packet with Connection ID, has a valid type value.
- ptype = buf[0] & 0x7f;
- if (ptype == QUICPacketType::INITIAL) { // Initial Packet
- vc->read.triggered = 1;
- vc->push_packet(p);
- // reschedule the vc and callback vc->acceptEvent
- this_ethread()->schedule_imm(vc);
- } elseif (ptype == QUICPacketType::ZERO_RTT_PROTECTED) { // 0-RTT Packet
- // TODO:
- } elseif (ptype == QUICPacketType::HANDSHAKE) { // Handshake Packet
- if (vc) {
- vc->read.triggered = 1;
- vc->push_packet(p);
- } else {
- longInQueue.push(p);
- }
- } else {
- ink_assert(!"not reached!");
- }
- } elseif (buf[0] & 0x40) { // Short Header Packet with Connection ID, has a valid type value.
- if (vc) {
- vc->read.triggered = 1;
- vc->push_packet(p);
- } else {
- shortInQueue.push(p);
- }
- } else {
- ink_assert(!"not reached!");
- }
-
- // Push QUICNetVC into nethandler's enabled list
- if (vc != nullptr) {
- int isin = ink_atomic_swap(&vc->read.in_enabled_list, 1);
- if (!isin) {
- nh->read_enable_list.push(vc);
- }
+ if (QUICTypeUtil::has_long_header(buf)) { // Long Header Packet with Connection ID, has a valid type value.
+ this->_process_long_header_packet(p, nh);
+ } else { // Short Header Packet with Connection ID, has a valid type value.
+ this->_process_short_header_packet(p, nh);
}
}
-
+
return EVENT_CONT;
}
@@ -122,4 +156,3 @@ initialize_thread_for_quic_net(EThread *thread)
thread->schedule_every(quicpc, -9);
}
-
diff --git a/iocore/net/QUICNetProcessor.cc b/iocore/net/QUICNetProcessor.cc
index 8dc335f..da372c6 100644
--- a/iocore/net/QUICNetProcessor.cc
+++ b/iocore/net/QUICNetProcessor.cc
@@ -50,6 +50,16 @@ QUICNetProcessor::cleanup()
SSL_CTX_free(this->_ssl_ctx);
}
+void
+QUICNetProcessor::init()
+{
+ // Ask the event processor for sizeof(QUICPollCont) bytes and remember the returned offset in quicPollCont_offset.
+ this->quicPollCont_offset = eventProcessor.allocate(sizeof(QUICPollCont));
+
+ // Register initialize_thread_for_quic_net as a spawn callback for ET_NET threads (presumably must run before those threads start - confirm against callers).
+ eventProcessor.schedule_spawn(&initialize_thread_for_quic_net, ET_NET);
+}
+
int
QUICNetProcessor::start(int, size_t stacksize)
{
diff --git a/iocore/net/QUICNetVConnection.cc b/iocore/net/QUICNetVConnection.cc
index b0088c8..e71c109 100644
--- a/iocore/net/QUICNetVConnection.cc
+++ b/iocore/net/QUICNetVConnection.cc
@@ -117,6 +117,7 @@ QUICNetVConnection::acceptEvent(int event, Event *e)
free(t);
return EVENT_DONE;
}
+ this->read.enabled = 1;
// Handshake callback handler.
SET_HANDLER((NetVConnHandler)&QUICNetVConnection::state_pre_handshake);
diff --git a/iocore/net/QUICPacketHandler.cc b/iocore/net/QUICPacketHandler.cc
index 8ac76c0..a09c394 100644
--- a/iocore/net/QUICPacketHandler.cc
+++ b/iocore/net/QUICPacketHandler.cc
@@ -125,7 +125,8 @@ void
QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet)
{
EThread *eth;
- IOBufferBlock *block = udp_packet->getIOBlockChain();
+ QUICNetVConnection *vc = nullptr;
+ IOBufferBlock *block = udp_packet->getIOBlockChain();
if (is_debug_tag_set("quic_sec")) {
ip_port_text_buffer ipb;
@@ -163,7 +164,7 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet)
// Create a new NetVConnection
QUICConnectionId original_cid = this->_read_connection_id(block);
- QUICNetVConnection *vc = static_cast<QUICNetVConnection *>(getNetProcessor()->allocate_vc(nullptr));
+ vc = static_cast<QUICNetVConnection *>(getNetProcessor()->allocate_vc(nullptr));
vc->init(original_cid, udp_packet->getConnection(), this, &this->_ctable);
vc->id = net_next_connection_number();
vc->con.move(con);
@@ -173,16 +174,21 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet)
vc->action_ = *this->action_;
vc->set_is_transparent(this->opt.f_inbound_transparent);
vc->set_context(NET_VCONNECTION_IN);
+ vc->read.triggered = 1;
+ vc->start(this->_ssl_ctx);
vc->options.ip_proto = NetVCOptions::USE_UDP;
vc->options.ip_family = udp_packet->from.sa.sa_family;
qc = vc;
+ } else {
+ vc = static_cast<QUICNetVConnection *>(qc);
+ eth = vc->thread;
}
// Push the packet into QUICPollCont
udp_packet->data.ptr = vc;
- get_QUICPollCont(eth)->inQueue.push(udp_packet);
-
+ // should we use dynamic_cast ??
+ get_QUICPollCont(eth)->inQueue.push(static_cast<UDPPacketInternal *>(udp_packet));
}
// TODO: Should be called via eventProcessor?
diff --git a/proxy/Main.cc b/proxy/Main.cc
index 5e36c0e..5614444 100644
--- a/proxy/Main.cc
+++ b/proxy/Main.cc
@@ -1800,6 +1800,10 @@ main(int /* argc ATS_UNUSED */, const char **argv)
// Do the inits for NetProcessors that use ET_NET threads. MUST be before starting those threads.
netProcessor.init();
init_HttpProxyServer();
+#if TS_USE_QUIC == 1
+ // OK, pushing a spawn scheduling here
+ quic_NetProcessor.init();
+#endif
// !! ET_NET threads start here !!
// This means any spawn scheduling must be done before this point.
--
To stop receiving notification emails like this one, please contact
scw00@apache.org.