You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ma...@apache.org on 2017/08/23 05:51:50 UTC

[trafficserver] 02/02: Add Flow Control support

This is an automated email from the ASF dual-hosted git repository.

masaori pushed a commit to branch quic-latest
in repository https://gitbox.apache.org/repos/asf/trafficserver.git

commit 08f2639c08f722660317b95f0c6d9116cb01f5ac
Author: Masaori Koshiba <ma...@apache.org>
AuthorDate: Wed Aug 23 11:47:56 2017 +0900

    Add Flow Control support
---
 iocore/net/P_QUICNetVConnection.h       |   5 +-
 iocore/net/QUICNetVConnection.cc        |   9 +-
 iocore/net/quic/Mock.h                  |  10 ++
 iocore/net/quic/QUICConnection.h        |   1 +
 iocore/net/quic/QUICStream.cc           | 128 +++++++++++++++++++++++--
 iocore/net/quic/QUICStream.h            |  17 +++-
 iocore/net/quic/QUICStreamManager.cc    | 164 ++++++++++++++++++++++++++++++--
 iocore/net/quic/QUICStreamManager.h     |  30 +++++-
 iocore/net/quic/QUICTypes.h             |  71 ++++++++++++--
 iocore/net/quic/test/test_QUICStream.cc |   8 +-
 10 files changed, 408 insertions(+), 35 deletions(-)

diff --git a/iocore/net/P_QUICNetVConnection.h b/iocore/net/P_QUICNetVConnection.h
index d42c63d..b4e07de 100644
--- a/iocore/net/P_QUICNetVConnection.h
+++ b/iocore/net/P_QUICNetVConnection.h
@@ -168,6 +168,7 @@ public:
   uint32_t pmtu() override;
   void set_transport_parameters(std::unique_ptr<QUICTransportParameters> tp) override;
   const QUICTransportParameters &local_transport_parameters() override;
+  const QUICTransportParameters &remote_transport_parameters() override;
   void close(QUICError error) override;
 
   // QUICConnection (QUICPacketTransmitter)
@@ -193,8 +194,8 @@ private:
 
   uint32_t _pmtu = 1280;
 
-  std::unique_ptr<QUICTransportParameters> _local_transport_parameters;
-  std::unique_ptr<QUICTransportParameters> _remote_transport_parameters;
+  std::unique_ptr<QUICTransportParameters> _local_transport_parameters  = nullptr;
+  std::unique_ptr<QUICTransportParameters> _remote_transport_parameters = nullptr;
 
   // TODO: use custom allocator and make them std::unique_ptr or std::shared_ptr
   // or make them just member variables.
diff --git a/iocore/net/QUICNetVConnection.cc b/iocore/net/QUICNetVConnection.cc
index 3fc88e0..7e92e60 100644
--- a/iocore/net/QUICNetVConnection.cc
+++ b/iocore/net/QUICNetVConnection.cc
@@ -94,7 +94,7 @@ QUICNetVConnection::start(SSL_CTX *ssl_ctx)
 
   // Create frame handlers
   this->_stream_manager = new QUICStreamManager();
-  this->_stream_manager->init(this, &this->_application_map);
+  this->_stream_manager->init(this, this, &this->_application_map);
   this->_flow_controller       = new QUICFlowController();
   this->_congestion_controller = new QUICCongestionController();
   this->_loss_detector         = new QUICLossDetector(this);
@@ -174,6 +174,7 @@ void
 QUICNetVConnection::set_transport_parameters(std::unique_ptr<QUICTransportParameters> tp)
 {
   this->_remote_transport_parameters = std::move(tp);
+  this->_stream_manager->init_flow_control_params(*this->_local_transport_parameters, *this->_remote_transport_parameters);
 
   const QUICTransportParametersInClientHello *tp_in_ch =
     dynamic_cast<QUICTransportParametersInClientHello *>(this->_remote_transport_parameters.get());
@@ -212,6 +213,12 @@ QUICNetVConnection::local_transport_parameters()
   return *this->_local_transport_parameters;
 }
 
+const QUICTransportParameters &
+QUICNetVConnection::remote_transport_parameters()
+{
+  return *this->_remote_transport_parameters;
+}
+
 uint32_t
 QUICNetVConnection::minimum_quic_packet_size()
 {
diff --git a/iocore/net/quic/Mock.h b/iocore/net/quic/Mock.h
index 7848e5b..195b285 100644
--- a/iocore/net/quic/Mock.h
+++ b/iocore/net/quic/Mock.h
@@ -169,6 +169,12 @@ public:
     return dummy_transport_parameters;
   }
 
+  const QUICTransportParameters &
+  remote_transport_parameters() override
+  {
+    return dummy_transport_parameters;
+  }
+
   void
   close(QUICError error) override
   {
@@ -285,6 +291,10 @@ public:
     return _totalFrameCount;
   }
 
+  bool is_recv_avail_more_than(uint64_t /* size */) override { return true; }
+
+  void send_frame(std::unique_ptr<QUICFrame, QUICFrameDeleterFunc> /* frame */) override { return; }
+
 private:
   int _totalFrameCount = 0;
   int _frameCount[256] = {0};
diff --git a/iocore/net/quic/QUICConnection.h b/iocore/net/quic/QUICConnection.h
index a90b73f..9aad687 100644
--- a/iocore/net/quic/QUICConnection.h
+++ b/iocore/net/quic/QUICConnection.h
@@ -38,5 +38,6 @@ public:
   virtual uint32_t pmtu()                                                            = 0;
   virtual void set_transport_parameters(std::unique_ptr<QUICTransportParameters> tp) = 0;
   virtual const QUICTransportParameters &local_transport_parameters()                = 0;
+  virtual const QUICTransportParameters &remote_transport_parameters()               = 0;
   virtual void close(QUICError error)                                                = 0;
 };
diff --git a/iocore/net/quic/QUICStream.cc b/iocore/net/quic/QUICStream.cc
index 8a10c57..3623377 100644
--- a/iocore/net/quic/QUICStream.cc
+++ b/iocore/net/quic/QUICStream.cc
@@ -26,15 +26,21 @@
 #include "I_Event.h"
 #include "QUICStreamManager.h"
 #include "QUICDebugNames.h"
+#include "QUICConfig.h"
 
 const static char *tag = "quic_stream";
 
+constexpr uint64_t MAX_DATA_HEADSPACE        = 10240; // in units of octets
+constexpr uint64_t MAX_STREAM_DATA_HEADSPACE = 1024;
+
 void
-QUICStream::init(QUICStreamManager *manager, QUICFrameTransmitter *tx, QUICStreamId id)
+QUICStream::init(QUICStreamManager *manager, QUICFrameTransmitter *tx, QUICStreamId id, uint64_t recv_max_stream_data,
+                 uint64_t send_max_stream_data)
 {
   this->_streamManager = manager;
   this->_tx            = tx;
   this->_id            = id;
+  init_flow_control_params(recv_max_stream_data, send_max_stream_data);
 
   this->mutex = new_ProxyMutex();
 }
@@ -45,6 +51,14 @@ QUICStream::start()
   SET_HANDLER(&QUICStream::main_event_handler);
 }
 
+void
+QUICStream::init_flow_control_params(uint32_t recv_max_stream_data, uint32_t send_max_stream_data)
+{
+  this->_recv_max_stream_data        = recv_max_stream_data;
+  this->_recv_max_stream_data_deleta = recv_max_stream_data;
+  this->_send_max_stream_data        = send_max_stream_data;
+}
+
 uint32_t
 QUICStream::id()
 {
@@ -248,7 +262,7 @@ QUICStream::_reorder_data()
  * If the reordering or writting operation is heavy, split out them to read function,
  * which is called by application via do_io_read() or reenable().
  */
-void
+QUICError
 QUICStream::recv(std::shared_ptr<const QUICStreamFrame> frame)
 {
   ink_assert(_id == frame->stream_id());
@@ -256,17 +270,20 @@ QUICStream::recv(std::shared_ptr<const QUICStreamFrame> frame)
 
   if (!this->_state.is_allowed_to_receive(*frame)) {
     this->reset();
-    return;
+    return QUICError(QUICErrorClass::QUIC_TRANSPORT, QUICErrorCode::QUIC_INTERNAL_ERROR);
   }
   this->_state.update_with_received_frame(*frame);
 
-  if (frame->offset() > this->_recv_largest_offset) {
-    this->_recv_largest_offset = frame->offset();
+  // Flow Control
+  QUICError error = this->_recv_flow_control(frame->offset());
+  if (error.cls != QUICErrorClass::NONE) {
+    return error;
   }
 
+  // Reordering
   if (this->_recv_offset > frame->offset()) {
     // Do nothing. Just ignore STREAM frame.
-    return;
+    return QUICError(QUICErrorClass::NONE);
   } else if (this->_recv_offset == frame->offset()) {
     this->_write_to_read_vio(frame);
     this->_reorder_data();
@@ -276,7 +293,71 @@ QUICStream::recv(std::shared_ptr<const QUICStreamFrame> frame)
     this->_request_stream_frame_buffer.insert(std::make_pair(frame->offset(), frame));
   }
 
-  return;
+  return QUICError(QUICErrorClass::NONE);
+}
+
+QUICError
+QUICStream::recv(std::shared_ptr<const QUICMaxStreamDataFrame> frame)
+{
+  this->_send_max_stream_data += frame->maximum_stream_data();
+  return QUICError(QUICErrorClass::NONE);
+}
+
+QUICError
+QUICStream::recv(std::shared_ptr<const QUICStreamBlockedFrame> frame)
+{
+  this->_slide_recv_max_stream_data();
+  return QUICError(QUICErrorClass::NONE);
+}
+
+void
+QUICStream::_slide_recv_max_stream_data()
+{
+  // TODO: How much should this be increased?
+  this->_recv_max_stream_data += this->_recv_max_stream_data_deleta;
+  this->_streamManager->send_frame(QUICFrameFactory::create_max_stream_data_frame(this->_id, this->_recv_max_stream_data));
+}
+
+QUICError
+QUICStream::_recv_flow_control(uint64_t new_offset)
+{
+  if (this->_recv_largest_offset > new_offset) {
+    return QUICError(QUICErrorClass::NONE);
+  }
+
+  uint64_t delta = new_offset - this->_recv_largest_offset;
+
+  Debug("quic_flow_ctrl", "Con: %" PRIu64 "/%" PRIu64 " Stream: %" PRIu64 "/%" PRIu64,
+        (this->_streamManager->recv_total_offset() + delta) / 1024, this->_streamManager->recv_max_data(), new_offset,
+        this->_recv_max_stream_data);
+
+  // Connection Level Flow Control
+  if (this->_id != STREAM_ID_FOR_HANDSHAKE) {
+    if (!this->_streamManager->is_recv_avail_more_than(delta)) {
+      return QUICError(QUICErrorClass::QUIC_TRANSPORT, QUICErrorCode::QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA);
+    }
+
+    if (!this->_streamManager->is_recv_avail_more_than(delta + MAX_DATA_HEADSPACE)) {
+      this->_streamManager->slide_recv_max_data();
+    }
+
+    this->_streamManager->add_recv_total_offset(delta);
+  }
+
+  // Stream Level Flow Control
+  if (this->_recv_max_stream_data > 0) {
+    if (this->_recv_max_stream_data < new_offset) {
+      return QUICError(QUICErrorClass::QUIC_TRANSPORT, QUICErrorCode::QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA);
+    }
+
+    if (this->_recv_max_stream_data < new_offset + MAX_STREAM_DATA_HEADSPACE) {
+      this->_slide_recv_max_stream_data();
+    }
+  }
+
+  this->_recv_largest_offset = new_offset;
+
+  return QUICError(QUICErrorClass::NONE);
 }
 
 /**
@@ -302,6 +383,10 @@ QUICStream::_send()
       len = data_len;
     }
 
+    if (!this->_send_flow_control(len)) {
+      break;
+    }
+
     std::unique_ptr<QUICStreamFrame, QUICFrameDeleterFunc> frame =
       QUICFrameFactory::create_stream_frame(reinterpret_cast<const uint8_t *>(reader->start()), len, this->_id, this->_send_offset);
 
@@ -315,12 +400,39 @@ QUICStream::_send()
       break;
     }
     this->_state.update_with_sent_frame(*frame);
-    this->_streamManager->send_frame(std::move(frame));
+    this->_streamManager->send_stream_frame(std::move(frame));
   }
 
   return;
 }
 
+bool
+QUICStream::_send_flow_control(uint64_t len)
+{
+  Debug("quic_flow_ctrl", "Con: %" PRIu64 "/%" PRIu64 " Stream: %" PRIu64 "/%" PRIu64,
+        (this->_streamManager->send_total_offset() + len) / 1024, this->_streamManager->send_max_data(), this->_send_offset + len,
+        this->_send_max_stream_data);
+
+  // Stream Level Flow Control
+  // TODO: remove check of _send_max_stream_data when moved to Second Implementation completely
+  if (this->_send_max_stream_data > 0 && len > this->_send_max_stream_data) {
+    this->_streamManager->send_frame(QUICFrameFactory::create_stream_blocked_frame(this->_id));
+
+    return false;
+  }
+
+  // Connection Level Flow Control
+  if (this->_id != STREAM_ID_FOR_HANDSHAKE) {
+    if (!this->_streamManager->is_send_avail_more_than(len)) {
+      this->_streamManager->send_frame(QUICFrameFactory::create_blocked_frame());
+
+      return false;
+    }
+  }
+
+  return true;
+}
+
 void
 QUICStream::reset()
 {
diff --git a/iocore/net/quic/QUICStream.h b/iocore/net/quic/QUICStream.h
index 1992d21..6d35e6d 100644
--- a/iocore/net/quic/QUICStream.h
+++ b/iocore/net/quic/QUICStream.h
@@ -45,8 +45,10 @@ public:
   QUICStream() : VConnection(nullptr) {}
   ~QUICStream() {}
 
-  void init(QUICStreamManager *manager, QUICFrameTransmitter *tx, uint32_t id);
+  void init(QUICStreamManager *manager, QUICFrameTransmitter *tx, uint32_t id, uint64_t recv_max_stream_data = 0,
+            uint64_t send_max_stream_data = 0);
   void start();
+  void init_flow_control_params(uint32_t recv_max_stream_data, uint32_t send_max_stream_data);
   int main_event_handler(int event, void *data);
 
   uint32_t id();
@@ -58,7 +60,10 @@ public:
   void do_io_shutdown(ShutdownHowTo_t howto) override;
   void reenable(VIO *vio) override;
 
-  void recv(std::shared_ptr<const QUICStreamFrame> frame);
+  QUICError recv(std::shared_ptr<const QUICStreamFrame> frame);
+  QUICError recv(std::shared_ptr<const QUICMaxStreamDataFrame> frame);
+  QUICError recv(std::shared_ptr<const QUICStreamBlockedFrame> frame);
+
   void reset();
 
   bool is_read_ready();
@@ -81,11 +86,19 @@ private:
 
   Event *_send_tracked_event(Event *event, int send_event, VIO *vio);
 
+  void _slide_recv_max_stream_data();
+  QUICError _recv_flow_control(uint64_t new_offset);
+  bool _send_flow_control(uint64_t len);
+
   QUICStreamId _id                = 0;
   QUICOffset _recv_offset         = 0;
   QUICOffset _recv_largest_offset = 0;
   QUICOffset _send_offset         = 0;
 
+  uint64_t _recv_max_stream_data        = 0;
+  uint64_t _recv_max_stream_data_deleta = 0;
+  uint64_t _send_max_stream_data        = 0;
+
   VIO _read_vio;
   VIO _write_vio;
 
diff --git a/iocore/net/quic/QUICStreamManager.cc b/iocore/net/quic/QUICStreamManager.cc
index 9b08fd2..fab8dea 100644
--- a/iocore/net/quic/QUICStreamManager.cc
+++ b/iocore/net/quic/QUICStreamManager.cc
@@ -21,9 +21,11 @@
  *  limitations under the License.
  */
 
-#include <QUICStreamManager.h>
+#include "QUICStreamManager.h"
 
-#include <QUICApplication.h>
+#include "QUICApplication.h"
+#include "QUICTransportParameters.h"
+#include "QUICConnection.h"
 
 const static char *tag = "quic_stream_manager";
 
@@ -31,34 +33,112 @@ ClassAllocator<QUICStreamManager> quicStreamManagerAllocator("quicStreamManagerA
 ClassAllocator<QUICStream> quicStreamAllocator("quicStreamAllocator");
 
 int
-QUICStreamManager::init(QUICFrameTransmitter *tx, QUICApplicationMap *app_map)
+QUICStreamManager::init(QUICFrameTransmitter *tx, QUICConnection *qc, QUICApplicationMap *app_map)
 {
   this->_tx      = tx;
+  this->_qc      = qc;
   this->_app_map = app_map;
+
   return 0;
 }
 
 std::vector<QUICFrameType>
 QUICStreamManager::interests()
 {
-  return {QUICFrameType::STREAM, QUICFrameType::RST_STREAM};
+  return {QUICFrameType::STREAM, QUICFrameType::RST_STREAM, QUICFrameType::MAX_DATA, QUICFrameType::MAX_STREAM_DATA,
+          QUICFrameType::BLOCKED};
+}
+
+void
+QUICStreamManager::init_flow_control_params(const QUICTransportParameters &local_tp, const QUICTransportParameters &remote_tp)
+{
+  // Connection level
+  this->_recv_max_data = QUICMaximumData(local_tp.initial_max_data());
+  this->_send_max_data = QUICMaximumData(remote_tp.initial_max_data());
+
+  // Setup a stream for Handshake
+  QUICStream *stream = this->_find_stream(STREAM_ID_FOR_HANDSHAKE);
+  stream->init_flow_control_params(local_tp.initial_max_stream_data(), remote_tp.initial_max_stream_data());
 }
 
 void
 QUICStreamManager::handle_frame(std::shared_ptr<const QUICFrame> frame)
 {
+  QUICError error = QUICError(QUICErrorClass::NONE);
+
   switch (frame->type()) {
+  case QUICFrameType::MAX_DATA: {
+    error = this->_handle_max_data_frame(std::dynamic_pointer_cast<const QUICMaxDataFrame>(frame));
+    break;
+  }
+  case QUICFrameType::BLOCKED: {
+    this->slide_recv_max_data();
+    break;
+  }
+  case QUICFrameType::MAX_STREAM_DATA: {
+    error = this->_handle_max_stream_data_frame(std::dynamic_pointer_cast<const QUICMaxStreamDataFrame>(frame));
+    break;
+  }
+  case QUICFrameType::STREAM_BLOCKED: {
+    error = this->_handle_stream_blocked_frame(std::dynamic_pointer_cast<const QUICStreamBlockedFrame>(frame));
+    break;
+  }
   case QUICFrameType::STREAM:
-    this->_handle_stream_frame(std::dynamic_pointer_cast<const QUICStreamFrame>(frame));
+    error = this->_handle_stream_frame(std::dynamic_pointer_cast<const QUICStreamFrame>(frame));
     break;
   default:
     Debug(tag, "Unexpected frame type: %02x", static_cast<unsigned int>(frame->type()));
     ink_assert(false);
     break;
   }
+
+  if (error.cls != QUICErrorClass::NONE) {
+    // TODO return error
+  }
+}
+
+QUICError
+QUICStreamManager::_handle_max_data_frame(std::shared_ptr<const QUICMaxDataFrame> frame)
+{
+  this->_send_max_data = frame->maximum_data();
+  return QUICError(QUICErrorClass::NONE);
 }
 
 void
+QUICStreamManager::slide_recv_max_data()
+{
+  // TODO: How much should this be increased?
+  this->_recv_max_data += this->_qc->local_transport_parameters().initial_max_data();
+  this->send_frame(QUICFrameFactory::create_max_data_frame(this->_recv_max_data));
+}
+
+QUICError
+QUICStreamManager::_handle_max_stream_data_frame(std::shared_ptr<const QUICMaxStreamDataFrame> frame)
+{
+  QUICStream *stream = this->_find_stream(frame->stream_id());
+  if (stream) {
+    stream->recv(frame);
+  } else {
+    // TODO: connection error?
+  }
+
+  return QUICError(QUICErrorClass::NONE);
+}
+
+QUICError
+QUICStreamManager::_handle_stream_blocked_frame(std::shared_ptr<const QUICStreamBlockedFrame> frame)
+{
+  QUICStream *stream = this->_find_stream(frame->stream_id());
+  if (stream) {
+    stream->recv(frame);
+  } else {
+    // TODO: connection error?
+  }
+
+  return QUICError(QUICErrorClass::NONE);
+}
+
+QUICError
 QUICStreamManager::_handle_stream_frame(std::shared_ptr<const QUICStreamFrame> frame)
 {
   QUICStream *stream           = this->_find_or_create_stream(frame->stream_id());
@@ -68,18 +148,34 @@ QUICStreamManager::_handle_stream_frame(std::shared_ptr<const QUICStreamFrame> f
     application->set_stream(stream);
   }
 
-  stream->recv(frame);
+  QUICError error = stream->recv(frame);
+
   // FIXME: schedule VC_EVENT_READ_READY to application every single frame?
   // If application reading buffer continuously, do not schedule event.
   this_ethread()->schedule_imm(application, VC_EVENT_READ_READY, stream);
 
-  return;
+  return error;
 }
 
 /**
  * @brief Send stream frame
  */
 void
+QUICStreamManager::send_stream_frame(std::unique_ptr<QUICStreamFrame, QUICFrameDeleterFunc> frame)
+{
+  // XXX The offset of sending frame is always largest offset by sending side
+  if (frame->stream_id() != STREAM_ID_FOR_HANDSHAKE) {
+    this->_send_total_offset += frame->size();
+  }
+  this->_tx->transmit_frame(std::move(frame));
+
+  return;
+}
+
+/**
+ * @brief Send frame
+ */
+void
 QUICStreamManager::send_frame(std::unique_ptr<QUICFrame, QUICFrameDeleterFunc> frame)
 {
   this->_tx->transmit_frame(std::move(frame));
@@ -87,6 +183,24 @@ QUICStreamManager::send_frame(std::unique_ptr<QUICFrame, QUICFrameDeleterFunc> f
   return;
 }
 
+bool
+QUICStreamManager::is_send_avail_more_than(uint64_t size)
+{
+  return this->_send_max_data > (this->_send_total_offset + size);
+}
+
+bool
+QUICStreamManager::is_recv_avail_more_than(uint64_t size)
+{
+  return this->_recv_max_data > (this->_recv_total_offset + size);
+}
+
+void
+QUICStreamManager::add_recv_total_offset(uint64_t delta)
+{
+  this->_recv_total_offset += delta;
+}
+
 QUICStream *
 QUICStreamManager::_find_stream(QUICStreamId id)
 {
@@ -105,10 +219,44 @@ QUICStreamManager::_find_or_create_stream(QUICStreamId stream_id)
   if (!stream) {
     // TODO Free the stream somewhere
     stream = THREAD_ALLOC_INIT(quicStreamAllocator, this_ethread());
-    stream->init(this, this->_tx, stream_id);
+    if (stream_id == STREAM_ID_FOR_HANDSHAKE) {
+      // XXX recv/send max_stream_data are going to be set by init_flow_control_params()
+      stream->init(this, this->_tx, stream_id);
+    } else {
+      const QUICTransportParameters &local_tp  = this->_qc->local_transport_parameters();
+      const QUICTransportParameters &remote_tp = this->_qc->remote_transport_parameters();
+
+      // TODO: check local_tp and remote_tp are initialized
+      stream->init(this, this->_tx, stream_id, local_tp.initial_max_stream_data(), remote_tp.initial_max_stream_data());
+    }
+
     stream->start();
 
     this->stream_list.push(stream);
   }
   return stream;
 }
+
+uint64_t
+QUICStreamManager::recv_max_data() const
+{
+  return this->_recv_max_data;
+}
+
+uint64_t
+QUICStreamManager::send_max_data() const
+{
+  return this->_send_max_data;
+}
+
+uint64_t
+QUICStreamManager::recv_total_offset() const
+{
+  return this->_recv_total_offset;
+}
+
+uint64_t
+QUICStreamManager::send_total_offset() const
+{
+  return this->_send_total_offset;
+}
diff --git a/iocore/net/quic/QUICStreamManager.h b/iocore/net/quic/QUICStreamManager.h
index ae80bcd..6df6db3 100644
--- a/iocore/net/quic/QUICStreamManager.h
+++ b/iocore/net/quic/QUICStreamManager.h
@@ -30,25 +30,47 @@
 #include "QUICFrame.h"
 #include "QUICFrameTransmitter.h"
 
+class QUICTransportParameters;
+
 class QUICStreamManager : public QUICFrameHandler
 {
 public:
   QUICStreamManager(){};
 
-  int init(QUICFrameTransmitter *tx, QUICApplicationMap *app_map);
+  int init(QUICFrameTransmitter *tx, QUICConnection *qc, QUICApplicationMap *app_map);
   virtual std::vector<QUICFrameType> interests() override;
   virtual void handle_frame(std::shared_ptr<const QUICFrame>) override;
-  void send_frame(std::unique_ptr<QUICFrame, QUICFrameDeleterFunc> frame);
+  virtual void send_frame(std::unique_ptr<QUICFrame, QUICFrameDeleterFunc> frame);
+  void send_stream_frame(std::unique_ptr<QUICStreamFrame, QUICFrameDeleterFunc> frame);
+  virtual bool is_send_avail_more_than(uint64_t size);
+  virtual bool is_recv_avail_more_than(uint64_t size);
+  void add_recv_total_offset(uint64_t delta);
+  void slide_recv_max_data();
+  void init_flow_control_params(const QUICTransportParameters &local_tp, const QUICTransportParameters &remote_tp);
+  uint64_t recv_max_data() const;
+  uint64_t send_max_data() const;
+  uint64_t recv_total_offset() const;
+  uint64_t send_total_offset() const;
 
   DLL<QUICStream> stream_list;
 
 private:
   QUICStream *_find_or_create_stream(QUICStreamId stream_id);
   QUICStream *_find_stream(QUICStreamId id);
+  QUICError _handle_max_data_frame(std::shared_ptr<const QUICMaxDataFrame>);
+  QUICError _handle_stream_frame(std::shared_ptr<const QUICStreamFrame>);
+  QUICError _handle_max_stream_data_frame(std::shared_ptr<const QUICMaxStreamDataFrame>);
+  QUICError _handle_stream_blocked_frame(std::shared_ptr<const QUICStreamBlockedFrame>);
 
   QUICApplicationMap *_app_map = nullptr;
   QUICFrameTransmitter *_tx    = nullptr;
+  QUICConnection *_qc          = nullptr;
 
-private:
-  void _handle_stream_frame(std::shared_ptr<const QUICStreamFrame>);
+  QUICMaximumData _recv_max_data = {0};
+  QUICMaximumData _send_max_data = {0};
+
+  // TODO: Maximum Data is in units of 1024 octets, but those total offsets are in units of octets.
+  // Add new uint16_t fields for the remainder and treat those total offsets in units of 1024 octets if needed
+  uint64_t _recv_total_offset = 0;
+  uint64_t _send_total_offset = 0;
 };
diff --git a/iocore/net/quic/QUICTypes.h b/iocore/net/quic/QUICTypes.h
index c595abf..83a0500 100644
--- a/iocore/net/quic/QUICTypes.h
+++ b/iocore/net/quic/QUICTypes.h
@@ -122,14 +122,16 @@ enum class QUICErrorClass {
   CRYPTOGRAPHIC,
 };
 
+// TODO: fix for draft-05
 enum class QUICErrorCode : uint32_t {
-  APPLICATION_SPECIFIC_ERROR        = 0,
-  HOST_LOCAL_ERROR                  = 0x40000000,
-  QUIC_TRANSPORT_ERROR              = 0x80000000,
-  QUIC_INTERNAL_ERROR               = 0x80000001,
-  QUIC_VERSION_NEGOTIATION_MISMATCH = 0x80000037,
-  CRYPTOGRAPHIC_ERROR               = 0xC0000000,
-  TLS_HANDSHAKE_FAILED              = 0xC000001C,
+  APPLICATION_SPECIFIC_ERROR               = 0,
+  HOST_LOCAL_ERROR                         = 0x40000000,
+  QUIC_TRANSPORT_ERROR                     = 0x80000000,
+  QUIC_INTERNAL_ERROR                      = 0x80000001,
+  QUIC_VERSION_NEGOTIATION_MISMATCH        = 0x80000037,
+  QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA = 0x8000003b,
+  CRYPTOGRAPHIC_ERROR                      = 0xC0000000,
+  TLS_HANDSHAKE_FAILED                     = 0xC000001C,
   // TODO Add error codes
 };
 
@@ -168,6 +170,61 @@ private:
   uint64_t _id;
 };
 
+class QUICMaximumData
+{
+public:
+  QUICMaximumData(uint64_t d) : _data(d) {}
+
+  bool
+  operator>(uint64_t r) const
+  {
+    return this->_data > (r / 1024);
+  }
+
+  bool
+  operator<(uint64_t r) const
+  {
+    return this->_data < (r / 1024);
+  }
+
+  bool
+  operator>=(uint64_t r) const
+  {
+    return this->_data >= (r / 1024);
+  }
+
+  bool
+  operator<=(uint64_t r) const
+  {
+    return this->_data <= (r / 1024);
+  }
+
+  bool
+  operator==(uint64_t r) const
+  {
+    return this->_data == (r / 1024);
+  }
+
+  QUICMaximumData &
+  operator=(uint64_t d)
+  {
+    this->_data = d;
+    return *this;
+  }
+
+  QUICMaximumData &
+  operator+=(uint64_t d)
+  {
+    this->_data += d;
+    return *this;
+  }
+
+  operator uint64_t() const { return _data; }
+
+private:
+  uint64_t _data = 0; // in units of 1024 octets
+};
+
 class QUICTypeUtil
 {
 public:
diff --git a/iocore/net/quic/test/test_QUICStream.cc b/iocore/net/quic/test/test_QUICStream.cc
index d60867a..e296621 100644
--- a/iocore/net/quic/test/test_QUICStream.cc
+++ b/iocore/net/quic/test/test_QUICStream.cc
@@ -41,13 +41,15 @@ std::shared_ptr<QUICStreamFrame> frame_6 = std::make_shared<QUICStreamFrame>(pay
 std::shared_ptr<QUICStreamFrame> frame_7 = std::make_shared<QUICStreamFrame>(payload + 12, 2, stream_id, 12);
 std::shared_ptr<QUICStreamFrame> frame_8 = std::make_shared<QUICStreamFrame>(payload + 14, 2, stream_id, 14);
 
+MockQUICStreamManager *manager = new MockQUICStreamManager();
+
 TEST_CASE("QUICStream_assembling_byte_stream_1", "[quic]")
 {
   MIOBuffer *read_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_4K);
   IOBufferReader *reader = read_buffer->alloc_reader();
 
   std::unique_ptr<QUICStream> stream(new QUICStream());
-  stream->init(nullptr, nullptr, stream_id);
+  stream->init(manager, nullptr, stream_id, 1024, 1024);
   stream->do_io_read(nullptr, 0, read_buffer);
 
   stream->recv(frame_1);
@@ -73,7 +75,7 @@ TEST_CASE("QUICStream_assembling_byte_stream_2", "[quic]")
   IOBufferReader *reader = read_buffer->alloc_reader();
 
   std::unique_ptr<QUICStream> stream(new QUICStream());
-  stream->init(nullptr, nullptr, stream_id);
+  stream->init(manager, nullptr, stream_id);
   stream->do_io_read(nullptr, 0, read_buffer);
 
   stream->recv(frame_8);
@@ -99,7 +101,7 @@ TEST_CASE("QUICStream_assembling_byte_stream_3", "[quic]")
   IOBufferReader *reader = read_buffer->alloc_reader();
 
   std::unique_ptr<QUICStream> stream(new QUICStream());
-  stream->init(nullptr, nullptr, stream_id);
+  stream->init(manager, nullptr, stream_id);
   stream->do_io_read(nullptr, 0, read_buffer);
 
   stream->recv(frame_8);

-- 
To stop receiving notification emails like this one, please contact
"commits@trafficserver.apache.org" <co...@trafficserver.apache.org>.