You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by je...@apache.org on 2022/03/14 11:36:38 UTC

[thrift] branch master updated: THRIFT-5515: TConnection::workSocket reads all pending oneway requests.

This is an automated email from the ASF dual-hosted git repository.

jensg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/thrift.git


The following commit(s) were added to refs/heads/master by this push:
     new b941b11  THRIFT-5515: TConnection::workSocket reads all pending oneway requests.
b941b11 is described below

commit b941b1124834d38daaa0e4355655b4ce63b80d3e
Author: Tamas Kovacs <ta...@nokia-sbell.com>
AuthorDate: Fri Feb 11 19:31:40 2022 +0800

    THRIFT-5515: TConnection::workSocket reads all pending oneway requests.
---
 lib/cpp/src/thrift/server/TNonblockingServer.cpp | 251 ++++++++++++-----------
 1 file changed, 129 insertions(+), 122 deletions(-)

diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index f2b3e70..ae92da3 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -419,154 +419,161 @@ void TNonblockingServer::TConnection::setSocket(std::shared_ptr<TSocket> socket)
 }
 
 void TNonblockingServer::TConnection::workSocket() {
-  int got = 0, left = 0, sent = 0;
-  uint32_t fetch = 0;
-
-  switch (socketState_) {
-  case SOCKET_RECV_FRAMING:
-    union {
-      uint8_t buf[sizeof(uint32_t)];
-      uint32_t size;
-    } framing;
-
-    // if we've already received some bytes we kept them here
-    framing.size = readWant_;
-    // determine size of this frame
-    try {
-      // Read from the socket
-      fetch = tSocket_->read(&framing.buf[readBufferPos_],
-                             uint32_t(sizeof(framing.size) - readBufferPos_));
-      if (fetch == 0) {
-        // Whenever we get here it means a remote disconnect
-        close();
-        return;
+  while (true) {
+    int got = 0, left = 0, sent = 0;
+    uint32_t fetch = 0;
+
+    switch (socketState_) {
+    case SOCKET_RECV_FRAMING:
+      union {
+        uint8_t buf[sizeof(uint32_t)];
+        uint32_t size;
+      } framing;
+
+      // if we've already received some bytes we kept them here
+      framing.size = readWant_;
+      // determine size of this frame
+      try {
+        // Read from the socket
+        fetch = tSocket_->read(&framing.buf[readBufferPos_],
+                               uint32_t(sizeof(framing.size) - readBufferPos_));
+        if (fetch == 0) {
+          // Whenever we get here it means a remote disconnect
+          close();
+          return;
+        }
+        readBufferPos_ += fetch;
+      } catch (TTransportException& te) {
+        //In Nonblocking SSLSocket some operations need to be retried again.
+        //Current approach is parsing exception message, but a better solution needs to be investigated.
+        if(!strstr(te.what(), "retry")) {
+          GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+          close();
+
+          return;
+        }
       }
-      readBufferPos_ += fetch;
-    } catch (TTransportException& te) {
-      //In Nonblocking SSLSocket some operations need to be retried again.
-      //Current approach is parsing exception message, but a better solution needs to be investigated.
-      if(!strstr(te.what(), "retry")) {
-        GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
-        close();
 
+      if (readBufferPos_ < sizeof(framing.size)) {
+        // more needed before frame size is known -- save what we have so far
+        readWant_ = framing.size;
         return;
       }
-    }
 
-    if (readBufferPos_ < sizeof(framing.size)) {
-      // more needed before frame size is known -- save what we have so far
-      readWant_ = framing.size;
-      return;
-    }
-
-    readWant_ = ntohl(framing.size);
-    if (readWant_ > server_->getMaxFrameSize()) {
-      // Don't allow giant frame sizes.  This prevents bad clients from
-      // causing us to try and allocate a giant buffer.
-      GlobalOutput.printf(
-          "TNonblockingServer: frame size too large "
-          "(%" PRIu32 " > %" PRIu64
-          ") from client %s. "
-          "Remote side not using TFramedTransport?",
-          readWant_,
-          (uint64_t)server_->getMaxFrameSize(),
-          tSocket_->getSocketInfo().c_str());
-      close();
-      return;
-    }
-    // size known; now get the rest of the frame
-    transition();
-
-    // If the socket has more data than the frame header, continue to work on it. This is not strictly necessary for
-    // regular sockets, because if there is more data, libevent will fire the event handler registered for read
-    // readiness, which will in turn call workSocket(). However, some socket types (such as TSSLSocket) may have the
-    // data sitting in their internal buffers and from libevent's perspective, there is no further data available. In
-    // that case, not having this workSocket() call here would result in a hang as we will never get to work the socket,
-    // despite having more data.
-    if (tSocket_->hasPendingDataToRead())
-    {
-        workSocket();
-    }
+      readWant_ = ntohl(framing.size);
+      if (readWant_ > server_->getMaxFrameSize()) {
+        // Don't allow giant frame sizes.  This prevents bad clients from
+        // causing us to try and allocate a giant buffer.
+        GlobalOutput.printf(
+            "TNonblockingServer: frame size too large "
+            "(%" PRIu32 " > %" PRIu64
+            ") from client %s. "
+            "Remote side not using TFramedTransport?",
+            readWant_,
+            (uint64_t)server_->getMaxFrameSize(),
+            tSocket_->getSocketInfo().c_str());
+        close();
+        return;
+      }
+      // size known; now get the rest of the frame
+      transition();
 
-    return;
+      // If the socket has more data than the frame header, continue to work on it. This is not strictly necessary for
+      // regular sockets, because if there is more data, libevent will fire the event handler registered for read
+      // readiness, which will in turn call workSocket(). However, some socket types (such as TSSLSocket) may have the
+      // data sitting in their internal buffers and from libevent's perspective, there is no further data available. In
+      // that case, not trying another processing cycle here would result in a hang as we will never get to work the socket,
+      // despite having more data.
+      if (tSocket_->hasPendingDataToRead())
+      {
+          continue;
+      }
 
-  case SOCKET_RECV:
-    // It is an error to be in this state if we already have all the data
-    if (!(readBufferPos_ < readWant_)) {
-      GlobalOutput.printf("TNonblockingServer: frame size too short");
-      close();
       return;
-    }
 
-    try {
-      // Read from the socket
-      fetch = readWant_ - readBufferPos_;
-      got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
-    } catch (TTransportException& te) {
-      //In Nonblocking SSLSocket some operations need to be retried again.
-      //Current approach is parsing exception message, but a better solution needs to be investigated.
-      if(!strstr(te.what(), "retry")) {
-        GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+    case SOCKET_RECV:
+      // It is an error to be in this state if we already have all the data
+      if (!(readBufferPos_ < readWant_)) {
+        GlobalOutput.printf("TNonblockingServer: frame size too short");
         close();
+        return;
       }
 
-      return;
-    }
+      try {
+        // Read from the socket
+        fetch = readWant_ - readBufferPos_;
+        got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
+      } catch (TTransportException& te) {
+        //In Nonblocking SSLSocket some operations need to be retried again.
+        //Current approach is parsing exception message, but a better solution needs to be investigated.
+        if(!strstr(te.what(), "retry")) {
+          GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+          close();
+        }
 
-    if (got > 0) {
-      // Move along in the buffer
-      readBufferPos_ += got;
+        return;
+      }
 
-      // Check that we did not overdo it
-      assert(readBufferPos_ <= readWant_);
+      if (got > 0) {
+        // Move along in the buffer
+        readBufferPos_ += got;
 
-      // We are done reading, move onto the next state
-      if (readBufferPos_ == readWant_) {
-        transition();
+        // Check that we did not overdo it
+        assert(readBufferPos_ <= readWant_);
+
+        // We are done reading, move onto the next state
+        if (readBufferPos_ == readWant_) {
+          transition();
+          if (socketState_ == SOCKET_RECV_FRAMING && tSocket_->hasPendingDataToRead())
+          {
+              continue;
+          }
+        }
+        return;
       }
-      return;
-    }
 
-    // Whenever we get down here it means a remote disconnect
-    close();
+      // Whenever we get down here it means a remote disconnect
+      close();
 
-    return;
+      return;
 
-  case SOCKET_SEND:
-    // Should never have position past size
-    assert(writeBufferPos_ <= writeBufferSize_);
+    case SOCKET_SEND:
+      // Should never have position past size
+      assert(writeBufferPos_ <= writeBufferSize_);
 
-    // If there is no data to send, then let us move on
-    if (writeBufferPos_ == writeBufferSize_) {
-      GlobalOutput("WARNING: Send state with no data to send");
-      transition();
-      return;
-    }
+      // If there is no data to send, then let us move on
+      if (writeBufferPos_ == writeBufferSize_) {
+        GlobalOutput("WARNING: Send state with no data to send");
+        transition();
+        return;
+      }
 
-    try {
-      left = writeBufferSize_ - writeBufferPos_;
-      sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
-    } catch (TTransportException& te) {
-      GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
-      close();
-      return;
-    }
+      try {
+        left = writeBufferSize_ - writeBufferPos_;
+        sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
+      } catch (TTransportException& te) {
+        GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
+        close();
+        return;
+      }
 
-    writeBufferPos_ += sent;
+      writeBufferPos_ += sent;
 
-    // Did we overdo it?
-    assert(writeBufferPos_ <= writeBufferSize_);
+      // Did we overdo it?
+      assert(writeBufferPos_ <= writeBufferSize_);
 
-    // We are done!
-    if (writeBufferPos_ == writeBufferSize_) {
-      transition();
-    }
+      // We are done!
+      if (writeBufferPos_ == writeBufferSize_) {
+        transition();
+      }
 
-    return;
+      return;
 
-  default:
-    GlobalOutput.printf("Unexpected Socket State %d", socketState_);
-    assert(0);
+    default:
+      GlobalOutput.printf("Unexpected Socket State %d", socketState_);
+      assert(0);
+      return;
+    }
   }
 }