You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by sc...@apache.org on 2018/02/16 10:41:34 UTC

[trafficserver] branch quic-latest updated: QUIC: Push closed conn into closed queue

This is an automated email from the ASF dual-hosted git repository.

scw00 pushed a commit to branch quic-latest
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/quic-latest by this push:
     new e4c3201  QUIC: Push closed conn into closed queue
e4c3201 is described below

commit e4c3201015f3d62438b7843ad20072b12219f200
Author: scw00 <sc...@apache.org>
AuthorDate: Fri Feb 16 09:25:05 2018 +0800

    QUIC: Push closed conn into closed queue
---
 iocore/net/Makefile.am                 |  2 ++
 iocore/net/P_QUICClosedConCollector.h  | 39 +++++++++++++++++++++
 iocore/net/P_QUICNetVConnection.h      |  8 +++++
 iocore/net/P_QUICPacketHandler.h       |  8 +++++
 iocore/net/QUICClosedConCollector.cc   | 62 ++++++++++++++++++++++++++++++++++
 iocore/net/QUICNetVConnection.cc       | 13 +++++++
 iocore/net/QUICPacketHandler.cc        | 33 ++++++++++++++++++
 iocore/net/quic/QUICConnectionTable.cc |  7 +++-
 8 files changed, 171 insertions(+), 1 deletion(-)

diff --git a/iocore/net/Makefile.am b/iocore/net/Makefile.am
index f32cc13..0bc196f 100644
--- a/iocore/net/Makefile.am
+++ b/iocore/net/Makefile.am
@@ -161,11 +161,13 @@ libinknet_a_SOURCES = \
 
 if ENABLE_QUIC
 libinknet_a_SOURCES += \
+  P_QUICClosedConCollector.h \
   P_QUICPacketHandler.h \
   P_QUICNet.h \
   P_QUICNetProcessor.h \
   P_QUICNetVConnection.h \
   P_QUICNextProtocolAccept.h \
+  QUICClosedConCollector.cc \
   QUICPacketHandler.cc \
   QUICNet.cc \
   QUICNetProcessor.cc \
diff --git a/iocore/net/P_QUICClosedConCollector.h b/iocore/net/P_QUICClosedConCollector.h
new file mode 100644
index 0000000..4484e97
--- /dev/null
+++ b/iocore/net/P_QUICClosedConCollector.h
@@ -0,0 +1,39 @@
+/** @file
+  This file implements an I/O Processor for network I/O
+  @section license License
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#ifndef _QUIC_CLOSED_CON_COLLECTOR_H_
+#define _QUIC_CLOSED_CON_COLLECTOR_H_
+
+#include "P_QUICNetVConnection.h"
+
+class QUICClosedConCollector : public Continuation
+{
+public:
+  QUICClosedConCollector();
+
+  int mainEvent(int event, Event *e);
+
+  ASLL(QUICNetVConnection, closed_alink) closedQueue;
+
+private:
+  Que(QUICNetVConnection, closed_link) _localClosedQueue;
+
+  void _process_closed_connection(EThread *t);
+};
+
+#endif
diff --git a/iocore/net/P_QUICNetVConnection.h b/iocore/net/P_QUICNetVConnection.h
index 8f4d7e5..a7c7ca0 100644
--- a/iocore/net/P_QUICNetVConnection.h
+++ b/iocore/net/P_QUICNetVConnection.h
@@ -164,6 +164,7 @@ public:
   int state_connection_closed(int event, Event *data);
   void start(SSL_CTX *);
   void free(EThread *t) override;
+  void destroy(EThread *t);
 
   UDPConnection *get_udp_con();
   virtual void net_read_io(NetHandler *nh, EThread *lthread) override;
@@ -203,6 +204,13 @@ public:
   std::vector<QUICFrameType> interests() override;
   QUICErrorUPtr handle_frame(std::shared_ptr<const QUICFrame> frame) override;
 
+  int in_closed_queue = 0;
+
+  bool shouldDestroy();
+
+  LINK(QUICNetVConnection, closed_link);
+  SLINK(QUICNetVConnection, closed_alink);
+
 private:
   class AltConnectionInfo
   {
diff --git a/iocore/net/P_QUICPacketHandler.h b/iocore/net/P_QUICPacketHandler.h
index 84d43b8..70b3048 100644
--- a/iocore/net/P_QUICPacketHandler.h
+++ b/iocore/net/P_QUICPacketHandler.h
@@ -29,18 +29,26 @@
 #include "quic/QUICTypes.h"
 #include "quic/QUICConnectionTable.h"
 
+class QUICClosedConCollector;
 class QUICNetVConnection;
 class QUICPacket;
 
 class QUICPacketHandler
 {
 public:
+  QUICPacketHandler();
+  ~QUICPacketHandler();
+
   virtual void send_packet(const QUICPacket &packet, QUICNetVConnection *vc) = 0;
+  virtual void close_conenction(QUICNetVConnection *conn);
 
 protected:
   static void _send_packet(Continuation *c, const QUICPacket &packet, UDPConnection *udp_con, IpEndpoint &addr, uint32_t pmtu);
   static QUICConnectionId _read_connection_id(IOBufferBlock *block);
 
+  Event *_collector_event                       = nullptr;
+  QUICClosedConCollector *_closed_con_collector = nullptr;
+
   virtual void _recv_packet(int event, UDPPacket *udpPacket) = 0;
 };
 
diff --git a/iocore/net/QUICClosedConCollector.cc b/iocore/net/QUICClosedConCollector.cc
new file mode 100644
index 0000000..19095fe
--- /dev/null
+++ b/iocore/net/QUICClosedConCollector.cc
@@ -0,0 +1,62 @@
+/** @file
+  This file implements an I/O Processor for network I/O
+  @section license License
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#include "P_QUICClosedConCollector.h"
+
+QUICClosedConCollector::QUICClosedConCollector()
+{
+  SET_HANDLER(&QUICClosedConCollector::mainEvent);
+}
+
+int
+QUICClosedConCollector::mainEvent(int event, Event *e)
+{
+  EThread *t = this->mutex->thread_holding;
+  ink_assert(t == this_thread());
+
+  this->_process_closed_connection(t);
+  return 0;
+}
+
+void
+QUICClosedConCollector::_process_closed_connection(EThread *t)
+{
+  ink_release_assert(t != nullptr);
+
+  QUICNetVConnection *qvc;
+  Que(QUICNetVConnection, closed_link) local_queue;
+
+  while ((qvc = this->_localClosedQueue.pop())) {
+    if (qvc->shouldDestroy()) {
+      qvc->free(t);
+    } else {
+      local_queue.push(qvc);
+    }
+  }
+
+  SList(QUICNetVConnection, closed_alink) aq(this->closedQueue.popall());
+  while ((qvc = aq.pop())) {
+    if (qvc->shouldDestroy()) {
+      qvc->free(t);
+    } else {
+      local_queue.push(qvc);
+    }
+  }
+
+  this->_localClosedQueue.append(local_queue);
+}
diff --git a/iocore/net/QUICNetVConnection.cc b/iocore/net/QUICNetVConnection.cc
index 372403e..8ea1461 100644
--- a/iocore/net/QUICNetVConnection.cc
+++ b/iocore/net/QUICNetVConnection.cc
@@ -79,6 +79,13 @@ QUICNetVConnection::init(QUICConnectionId original_cid, UDPConnection *udp_con,
                static_cast<uint64_t>(this->_quic_connection_id));
 }
 
+bool
+QUICNetVConnection::shouldDestroy()
+{
+  // TODO: return this->refcount == 0;
+  return true;
+}
+
 VIO *
 QUICNetVConnection::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
 {
@@ -654,6 +661,12 @@ QUICNetVConnection::state_connection_closed(int event, Event *data)
     // Shutdown loss detector
     this->_loss_detector->handleEvent(QUIC_EVENT_LD_SHUTDOWN, nullptr);
 
+    if (this->nh) {
+      this->nh->stopCop(this);
+      this->nh->stopIO(this);
+    }
+
+    this->_packet_handler->close_conenction(this);
     break;
   }
   case QUIC_EVENT_PACKET_WRITE_READY: {
diff --git a/iocore/net/QUICPacketHandler.cc b/iocore/net/QUICPacketHandler.cc
index 6bf37e7..2f94cb8 100644
--- a/iocore/net/QUICPacketHandler.cc
+++ b/iocore/net/QUICPacketHandler.cc
@@ -22,6 +22,7 @@
 #include "ts/ink_config.h"
 #include "P_Net.h"
 
+#include "P_QUICClosedConCollector.h"
 #include "QUICConfig.h"
 #include "QUICPacket.h"
 #include "QUICDebugNames.h"
@@ -30,6 +31,34 @@
 //
 // QUICPacketHandler
 //
+QUICPacketHandler::QUICPacketHandler()
+{
+  this->_closed_con_collector        = new QUICClosedConCollector;
+  this->_closed_con_collector->mutex = new_ProxyMutex();
+}
+
+QUICPacketHandler::~QUICPacketHandler()
+{
+  if (this->_collector_event != nullptr) {
+    this->_collector_event->cancel();
+    this->_collector_event = nullptr;
+  }
+
+  if (this->_closed_con_collector != nullptr) {
+    delete this->_closed_con_collector;
+    this->_closed_con_collector = nullptr;
+  }
+}
+
+void
+QUICPacketHandler::close_conenction(QUICNetVConnection *conn)
+{
+  int isin = ink_atomic_swap(&conn->in_closed_queue, 1);
+  if (!isin) {
+    this->_closed_con_collector->closedQueue.push(conn);
+  }
+}
+
 void
 QUICPacketHandler::_send_packet(Continuation *c, const QUICPacket &packet, UDPConnection *udp_con, IpEndpoint &addr, uint32_t pmtu)
 {
@@ -96,6 +125,10 @@ QUICPacketHandlerIn::acceptEvent(int event, void *data)
     // Nothing to do.
     return EVENT_CONT;
   } else if (event == NET_EVENT_DATAGRAM_READ_READY) {
+    if (this->_collector_event == nullptr) {
+      this->_collector_event = this_ethread()->schedule_every(this->_closed_con_collector, HRTIME_MSECONDS(100));
+    }
+
     Queue<UDPPacket> *queue = (Queue<UDPPacket> *)data;
     UDPPacket *packet_r;
     while ((packet_r = queue->dequeue())) {
diff --git a/iocore/net/quic/QUICConnectionTable.cc b/iocore/net/quic/QUICConnectionTable.cc
index ce48a8a..eb048d7 100644
--- a/iocore/net/quic/QUICConnectionTable.cc
+++ b/iocore/net/quic/QUICConnectionTable.cc
@@ -36,7 +36,12 @@ QUICConnectionTable::insert(QUICConnectionId cid, QUICConnection *connection)
 void
 QUICConnectionTable::erase(QUICConnectionId cid, QUICConnection *connection)
 {
-  ink_assert(this->_connections.get(cid) == connection);
+  QUICConnection *qc = this->_connections.get(cid);
+  if (qc == nullptr) {
+    return;
+  }
+  ink_assert(qc == connection);
+  Debug("quic_ctable", "ctable erase cid: [%" PRIx64 "] ", static_cast<uint64_t>(cid));
   // if (this->_cids.get(connection->endpoint(), connection->connection_id()) == cid) {
   //   this->_cids.put(connection->endpoint(), nullptr);
   // }

-- 
To stop receiving notification emails like this one, please contact
scw00@apache.org.