You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ma...@apache.org on 2018/05/25 00:02:44 UTC

[trafficserver] branch quic-latest updated: Set nbytes when total stream data length become clear

This is an automated email from the ASF dual-hosted git repository.

masaori pushed a commit to branch quic-latest
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/quic-latest by this push:
     new 7b3a66f  Set nbytes when total stream data length become clear
7b3a66f is described below

commit 7b3a66fda73336af6fa47b3c46080bd8756e8510
Author: Masaori Koshiba <ma...@apache.org>
AuthorDate: Wed May 23 16:14:59 2018 +0900

    Set nbytes when total stream data length become clear
    
    This indicates the FIN flag between QUICStream and QUICApplication. This is similar to processing chunked transfer encoding.
    Mark the VIO wrapper APIs in QUICStreamIO as deprecated. QUICApplications should use the read/write/reenable APIs.
---
 cmd/traffic_quic/quic_client.cc    |   7 ++-
 iocore/net/quic/QUICApplication.cc | 107 +++++++++++++++++++++++++------------
 iocore/net/quic/QUICApplication.h  |  31 +++++++----
 iocore/net/quic/QUICStream.cc      |  40 ++++----------
 iocore/net/quic/QUICStream.h       |   6 ---
 proxy/hq/HQClientTransaction.cc    |   8 ++-
 proxy/hq/HQFrameCollector.cc       |   2 +-
 proxy/hq/QUICSimpleApp.cc          |   2 +-
 8 files changed, 114 insertions(+), 89 deletions(-)

diff --git a/cmd/traffic_quic/quic_client.cc b/cmd/traffic_quic/quic_client.cc
index a39d160..da43583 100644
--- a/cmd/traffic_quic/quic_client.cc
+++ b/cmd/traffic_quic/quic_client.cc
@@ -147,7 +147,7 @@ QUICClientApp::start(const char *path)
   QUICStreamIO *stream_io = this->_find_stream_io(stream_id);
 
   stream_io->write(reinterpret_cast<uint8_t *>(request), request_len);
-  stream_io->shutdown();
+  stream_io->write_done();
   stream_io->write_reenable();
 }
 
@@ -191,6 +191,11 @@ QUICClientApp::main_event_handler(int event, Event *data)
       std::cout.rdbuf(default_stream);
     }
 
+    if (stream_io->is_read_done()) {
+      // Connection Close Exercise
+      this->_qc->close(QUICConnectionErrorUPtr(new QUICConnectionError(QUICTransErrorCode::NO_ERROR, "Close Exercise")));
+    }
+
     break;
   }
   case VC_EVENT_WRITE_READY:
diff --git a/iocore/net/quic/QUICApplication.cc b/iocore/net/quic/QUICApplication.cc
index b6b4b98..cd178f0 100644
--- a/iocore/net/quic/QUICApplication.cc
+++ b/iocore/net/quic/QUICApplication.cc
@@ -34,31 +34,41 @@ QUICStreamIO::QUICStreamIO(QUICApplication *app, QUICStream *stream) : _stream(s
   this->_read_buffer  = new_MIOBuffer(BUFFER_SIZE_INDEX_8K);
   this->_write_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_8K);
 
-  this->_read_buffer_reader  = _read_buffer->alloc_reader();
-  this->_write_buffer_reader = _write_buffer->alloc_reader();
+  this->_read_buffer_reader  = this->_read_buffer->alloc_reader();
+  this->_write_buffer_reader = this->_write_buffer->alloc_reader();
 
   this->_read_vio  = stream->do_io_read(app, INT64_MAX, this->_read_buffer);
   this->_write_vio = stream->do_io_write(app, INT64_MAX, this->_write_buffer_reader);
 }
 
-int64_t
-QUICStreamIO::read_avail()
+QUICStreamIO::~QUICStreamIO()
 {
-  return this->_read_buffer_reader->read_avail();
-}
+  // All readers will be deallocated
+  free_MIOBuffer(this->_read_buffer);
+  free_MIOBuffer(this->_write_buffer);
+};
 
-bool
-QUICStreamIO::is_read_avail_more_than(int64_t size)
+uint32_t
+QUICStreamIO::stream_id() const
 {
-  return this->_read_buffer_reader->is_read_avail_more_than(size);
+  return this->_stream->id();
 }
 
 int64_t
 QUICStreamIO::read(uint8_t *buf, int64_t len)
 {
-  int64_t read_len = this->_read_buffer_reader->read(const_cast<uint8_t *>(buf), len);
-  this->_read_vio->ndone += read_len;
-  return read_len;
+  int64_t nread = this->_read_buffer_reader->read(const_cast<uint8_t *>(buf), len);
+  if (nread > 0) {
+    this->_read_vio->ndone += nread;
+  }
+
+  return nread;
+}
+
+bool
+QUICStreamIO::is_read_done()
+{
+  return this->_read_vio->ntodo() == 0;
 }
 
 int64_t
@@ -66,32 +76,71 @@ QUICStreamIO::write(const uint8_t *buf, int64_t len)
 {
   SCOPED_MUTEX_LOCK(lock, this->_write_vio->mutex, this_ethread());
 
-  return this->_write_buffer->write(buf, len);
-}
+  int64_t nwritten = this->_write_buffer->write(buf, len);
+  if (nwritten > 0) {
+    this->_nwritten += nwritten;
+  }
 
-int64_t
-QUICStreamIO::write_avail()
-{
-  return this->_write_buffer->write_avail();
+  return len;
 }
 
 int64_t
-QUICStreamIO::write(IOBufferReader *r, int64_t alen, int64_t offset)
+QUICStreamIO::write(IOBufferReader *r, int64_t len)
 {
   SCOPED_MUTEX_LOCK(lock, this->_write_vio->mutex, this_ethread());
 
   int64_t bytes_avail = this->_write_buffer->write_avail();
-  Debug(tag, "nbytes=%" PRId64 " ndone=%" PRId64 " write_avail=%" PRId64 " write_len=%" PRId64, this->_write_vio->nbytes,
-        this->_write_vio->ndone, bytes_avail, alen);
 
   if (bytes_avail > 0) {
-    int64_t len = std::min(bytes_avail, alen);
-    return this->_write_buffer->write(r, len, offset);
+    Debug(tag, "nbytes=%" PRId64 " ndone=%" PRId64 " write_avail=%" PRId64 " write_len=%" PRId64, this->_write_vio->nbytes,
+          this->_write_vio->ndone, bytes_avail, len);
+
+    int64_t bytes_len = std::min(bytes_avail, len);
+    int64_t nwritten  = this->_write_buffer->write(r, bytes_len);
+
+    if (nwritten > 0) {
+      this->_nwritten += nwritten;
+    }
+
+    return nwritten;
   } else {
     return 0;
   }
 }
 
+// TODO: Similar to other "write" apis, but do not copy.
+int64_t
+QUICStreamIO::write(IOBufferBlock *b)
+{
+  ink_assert(!"not implemented yet");
+  return 0;
+}
+
+void
+QUICStreamIO::write_done()
+{
+  this->_write_vio->nbytes = this->_nwritten;
+}
+
+// !!! DEPRECATED !!!
+int64_t
+QUICStreamIO::read_avail()
+{
+  return this->_read_buffer_reader->read_avail();
+}
+
+bool
+QUICStreamIO::is_read_avail_more_than(int64_t size)
+{
+  return this->_read_buffer_reader->is_read_avail_more_than(size);
+}
+
+int64_t
+QUICStreamIO::write_avail()
+{
+  return this->_write_buffer->write_avail();
+}
+
 void
 QUICStreamIO::set_write_vio_nbytes(int64_t nbytes)
 {
@@ -116,18 +165,6 @@ QUICStreamIO::get_read_buffer_reader()
   return this->_read_buffer_reader;
 }
 
-void
-QUICStreamIO::shutdown()
-{
-  return this->_stream->shutdown();
-}
-
-uint32_t
-QUICStreamIO::get_transaction_id() const
-{
-  return this->_stream->id();
-}
-
 //
 // QUICApplication
 //
diff --git a/iocore/net/quic/QUICApplication.h b/iocore/net/quic/QUICApplication.h
index eef90a0..e9b8e4c 100644
--- a/iocore/net/quic/QUICApplication.h
+++ b/iocore/net/quic/QUICApplication.h
@@ -32,26 +32,31 @@
 class QUICApplication;
 
 /**
- * @brief QUICStream I/O interface from QUIC Application
+ @brief QUICStream I/O Interface for QUICApplication
  */
 class QUICStreamIO
 {
 public:
   QUICStreamIO(QUICApplication *app, QUICStream *stream);
-  virtual ~QUICStreamIO(){};
+  virtual ~QUICStreamIO();
+
+  uint32_t stream_id() const;
 
-  int64_t read_avail();
-  bool is_read_avail_more_than(int64_t size);
   int64_t read(uint8_t *buf, int64_t len);
-  int64_t write_avail();
-  int64_t write(const uint8_t *buf, int64_t len);
-  int64_t write(IOBufferReader *r, int64_t len = INT64_MAX, int64_t offset = 0);
-  void set_write_vio_nbytes(int64_t);
+  bool is_read_done();
   virtual void read_reenable();
+
+  int64_t write(const uint8_t *buf, int64_t len);
+  int64_t write(IOBufferReader *r, int64_t len);
+  int64_t write(IOBufferBlock *b);
+  void write_done();
   virtual void write_reenable();
-  IOBufferReader *get_read_buffer_reader();
-  void shutdown();
-  uint32_t get_transaction_id() const;
+
+  [[deprecated]] int64_t read_avail();
+  [[deprecated]] bool is_read_avail_more_than(int64_t size);
+  [[deprecated]] int64_t write_avail();
+  [[deprecated]] void set_write_vio_nbytes(int64_t);
+  [[deprecated]] IOBufferReader *get_read_buffer_reader();
 
 protected:
   MIOBuffer *_read_buffer  = nullptr;
@@ -65,6 +70,10 @@ private:
 
   VIO *_read_vio  = nullptr;
   VIO *_write_vio = nullptr;
+
+  // Track how much data is written to _write_vio. When total size of data become clear,
+  // set it to _write_vio.nbytes.
+  uint64_t _nwritten = 0;
 };
 
 /**
diff --git a/iocore/net/quic/QUICStream.cc b/iocore/net/quic/QUICStream.cc
index c4ac423..f2142c0 100644
--- a/iocore/net/quic/QUICStream.cc
+++ b/iocore/net/quic/QUICStream.cc
@@ -184,6 +184,7 @@ QUICStream::state_stream_closed(int event, void *data)
   return EVENT_DONE;
 }
 
+// this->_read_vio.nbytes should be INT64_MAX until receive FIN flag
 VIO *
 QUICStream::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
 {
@@ -272,25 +273,17 @@ QUICStream::reenable(VIO *vio)
 }
 
 void
-QUICStream::set_read_vio_nbytes(int64_t nbytes)
-{
-  this->_read_vio.nbytes = nbytes;
-}
-
-void
-QUICStream::set_write_vio_nbytes(int64_t nbytes)
-{
-  this->_write_vio.nbytes = nbytes;
-}
-
-void
 QUICStream::_write_to_read_vio(const std::shared_ptr<const QUICStreamFrame> &frame)
 {
   SCOPED_MUTEX_LOCK(lock, this->_read_vio.mutex, this_ethread());
 
-  int bytes_added = this->_read_vio.buffer.writer()->write(frame->data(), frame->data_length());
-  this->_read_vio.nbytes += bytes_added;
-  // frame->offset() + frame->data_length() == this->_recv_offset
+  uint64_t bytes_added = this->_read_vio.buffer.writer()->write(frame->data(), frame->data_length());
+
+  // Until receive FIN flag, keep nbytes INT64_MAX
+  if (frame->has_fin_flag() && bytes_added == frame->data_length()) {
+    this->_read_vio.nbytes = frame->offset() + frame->data_length();
+  }
+
   this->_local_flow_controller.forward_limit(frame->offset() + frame->data_length() + this->_flow_control_buffer_size);
   QUICStreamFCDebug("[LOCAL] %" PRIu64 "/%" PRIu64, this->_local_flow_controller.current_offset(),
                     this->_local_flow_controller.current_limit());
@@ -408,8 +401,9 @@ QUICStream::generate_frame(uint16_t connection_credit, uint16_t maximum_frame_si
   len = std::min(data_len, static_cast<int64_t>(
                              std::min(static_cast<uint64_t>(maximum_frame_size),
                                       std::min(this->_remote_flow_controller.credit(), static_cast<uint64_t>(connection_credit)))));
-  if (len >= bytes_avail) {
-    fin = this->_fin;
+
+  if (this->_write_vio.ntodo() == bytes_avail) {
+    fin = true;
   }
 
   if (len > 0) {
@@ -559,18 +553,6 @@ QUICStream::reset(QUICStreamErrorUPtr error)
   this->_reset_reason = std::move(error);
 }
 
-void
-QUICStream::shutdown()
-{
-  this->_fin = true;
-}
-
-size_t
-QUICStream::nbytes_to_read()
-{
-  return this->_read_vio.ntodo();
-}
-
 QUICOffset
 QUICStream::largest_offset_received()
 {
diff --git a/iocore/net/quic/QUICStream.h b/iocore/net/quic/QUICStream.h
index c7cc012..99c501c 100644
--- a/iocore/net/quic/QUICStream.h
+++ b/iocore/net/quic/QUICStream.h
@@ -74,8 +74,6 @@ public:
   void do_io_close(int lerrno = -1) override;
   void do_io_shutdown(ShutdownHowTo_t howto) override;
   void reenable(VIO *vio) override;
-  void set_read_vio_nbytes(int64_t);
-  void set_write_vio_nbytes(int64_t);
 
   QUICErrorUPtr recv(const std::shared_ptr<const QUICStreamFrame> frame);
   QUICErrorUPtr recv(const std::shared_ptr<const QUICMaxStreamDataFrame> frame);
@@ -83,9 +81,6 @@ public:
   QUICErrorUPtr recv(const std::shared_ptr<const QUICStopSendingFrame> frame);
 
   void reset(QUICStreamErrorUPtr error);
-  void shutdown();
-
-  size_t nbytes_to_read();
 
   QUICOffset largest_offset_received();
   QUICOffset largest_offset_sent();
@@ -106,7 +101,6 @@ private:
   void _write_to_read_vio(const std::shared_ptr<const QUICStreamFrame> &);
 
   QUICStreamState _state;
-  bool _fin                         = false;
   QUICStreamErrorUPtr _reset_reason = nullptr;
   QUICConnectionId _connection_id   = QUICConnectionId::ZERO();
   QUICStreamId _id                  = 0;
diff --git a/proxy/hq/HQClientTransaction.cc b/proxy/hq/HQClientTransaction.cc
index 3574b65..13ad9f6 100644
--- a/proxy/hq/HQClientTransaction.cc
+++ b/proxy/hq/HQClientTransaction.cc
@@ -494,9 +494,6 @@ HQClientTransaction::_process_write_vio()
       int64_t headers_size   = headers->read_avail();
       reader->consume(headers_size);
       this->_write_vio.ndone += headers_size;
-
-      // The size of respons to client
-      this->_stream_io->set_write_vio_nbytes(this->_write_vio.nbytes - headers_size);
     }
 
     // Write HTTP/1.1 response body
@@ -520,7 +517,8 @@ HQClientTransaction::_process_write_vio()
     // NOTE: When Chunked Transfer Coding is supported, check ChunkedState of ChunkedHandler
     // is CHUNK_READ_DONE and set FIN flag
     if (this->_write_vio.ntodo() == 0) {
-      this->_stream_io->shutdown();
+      // The size of respons to client
+      this->_stream_io->write_done();
     }
 
     return total_written;
@@ -541,7 +539,7 @@ HQClientTransaction::transaction_done()
 int
 HQClientTransaction::get_transaction_id() const
 {
-  return this->_stream_io->get_transaction_id();
+  return this->_stream_io->stream_id();
 }
 
 bool
diff --git a/proxy/hq/HQFrameCollector.cc b/proxy/hq/HQFrameCollector.cc
index f316524..c19ca19 100644
--- a/proxy/hq/HQFrameCollector.cc
+++ b/proxy/hq/HQFrameCollector.cc
@@ -49,7 +49,7 @@ HQFrameCollector::on_write_ready(QUICStreamIO *stream_io, size_t &nwritten)
   }
 
   if (all_done) {
-    stream_io->shutdown();
+    stream_io->write_done();
   }
 
   return HQErrorUPtr(new HQNoError());
diff --git a/proxy/hq/QUICSimpleApp.cc b/proxy/hq/QUICSimpleApp.cc
index b06821d..8ce397e 100644
--- a/proxy/hq/QUICSimpleApp.cc
+++ b/proxy/hq/QUICSimpleApp.cc
@@ -65,7 +65,7 @@ QUICSimpleApp::main_event_handler(int event, Event *data)
     return -1;
   }
 
-  QUICStreamId stream_id   = stream_io->get_transaction_id();
+  QUICStreamId stream_id   = stream_io->stream_id();
   HQClientTransaction *txn = this->_client_session->get_transaction(stream_id);
 
   switch (event) {

-- 
To stop receiving notification emails like this one, please contact
masaori@apache.org.