You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by jk...@apache.org on 2018/03/12 10:56:45 UTC

thrift git commit: THRIFT-4465: Fix C++ TNonblockingServer and THRIFT_EAGAIN issues Client: cpp

Repository: thrift
Updated Branches:
  refs/heads/master 70b33fb6b -> 8bcb7ac2b


THRIFT-4465: Fix C++ TNonblockingServer and THRIFT_EAGAIN issues
Client: cpp

This closes #1497


Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/8bcb7ac2
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/8bcb7ac2
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/8bcb7ac2

Branch: refs/heads/master
Commit: 8bcb7ac2be2f28e5fddfe578645c2aaa98b94717
Parents: 70b33fb
Author: Bugra Gedik <bg...@unscrambl.com>
Authored: Sun Jan 21 09:43:49 2018 -0800
Committer: James E. King III <jk...@apache.org>
Committed: Mon Mar 12 06:55:14 2018 -0400

----------------------------------------------------------------------
 build/cmake/ConfigureChecks.cmake               |  3 ++-
 build/cmake/config.h.in                         |  7 ++++--
 configure.ac                                    |  1 +
 .../src/thrift/server/TNonblockingServer.cpp    | 23 +++++++++++---------
 lib/cpp/src/thrift/transport/PlatformSocket.h   |  4 ++++
 lib/cpp/src/thrift/transport/TSSLSocket.cpp     | 11 ++++++++++
 lib/cpp/src/thrift/transport/TSSLSocket.h       |  1 +
 lib/cpp/src/thrift/transport/TSocket.cpp        | 23 ++++++++++++++++++++
 lib/cpp/src/thrift/transport/TSocket.h          | 15 ++++++++++++-
 9 files changed, 74 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/thrift/blob/8bcb7ac2/build/cmake/ConfigureChecks.cmake
----------------------------------------------------------------------
diff --git a/build/cmake/ConfigureChecks.cmake b/build/cmake/ConfigureChecks.cmake
index 12a50df..e4793d4 100644
--- a/build/cmake/ConfigureChecks.cmake
+++ b/build/cmake/ConfigureChecks.cmake
@@ -37,11 +37,12 @@ check_include_file(netinet/in.h HAVE_NETINET_IN_H)
 check_include_file(stdint.h HAVE_STDINT_H)
 check_include_file(unistd.h HAVE_UNISTD_H)
 check_include_file(pthread.h HAVE_PTHREAD_H)
-check_include_file(sys/time.h HAVE_SYS_TIME_H)
+check_include_file(sys/ioctl.h HAVE_SYS_IOCTL_H)
 check_include_file(sys/param.h HAVE_SYS_PARAM_H)
 check_include_file(sys/resource.h HAVE_SYS_RESOURCE_H)
 check_include_file(sys/socket.h HAVE_SYS_SOCKET_H)
 check_include_file(sys/stat.h HAVE_SYS_STAT_H)
+check_include_file(sys/time.h HAVE_SYS_TIME_H)
 check_include_file(sys/un.h HAVE_SYS_UN_H)
 check_include_file(sys/poll.h HAVE_SYS_POLL_H)
 check_include_file(sys/select.h HAVE_SYS_SELECT_H)

http://git-wip-us.apache.org/repos/asf/thrift/blob/8bcb7ac2/build/cmake/config.h.in
----------------------------------------------------------------------
diff --git a/build/cmake/config.h.in b/build/cmake/config.h.in
index 083bc55..21561b2 100644
--- a/build/cmake/config.h.in
+++ b/build/cmake/config.h.in
@@ -100,8 +100,8 @@
 /* Define to 1 if you have the <pthread.h> header file. */
 #cmakedefine HAVE_PTHREAD_H 1
 
-/* Define to 1 if you have the <sys/time.h> header file. */
-#cmakedefine HAVE_SYS_TIME_H 1
+/* Define to 1 if you have the <sys/ioctl.h> header file. */
+#cmakedefine HAVE_SYS_IOCTL_H 1
 
 /* Define to 1 if you have the <sys/param.h> header file. */
 #cmakedefine HAVE_SYS_PARAM_H 1
@@ -124,6 +124,9 @@
 /* Define to 1 if you have the <sys/select.h> header file. */
 #cmakedefine HAVE_SYS_SELECT_H 1
 
+/* Define to 1 if you have the <sys/time.h> header file. */
+#cmakedefine HAVE_SYS_TIME_H 1
+
 /* Define to 1 if you have the <sched.h> header file. */
 #cmakedefine HAVE_SCHED_H 1
 

http://git-wip-us.apache.org/repos/asf/thrift/blob/8bcb7ac2/configure.ac
----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index 7634823..9efc28b 100755
--- a/configure.ac
+++ b/configure.ac
@@ -635,6 +635,7 @@ AC_CHECK_HEADERS([netinet/in.h])
 AC_CHECK_HEADERS([pthread.h])
 AC_CHECK_HEADERS([stddef.h])
 AC_CHECK_HEADERS([stdlib.h])
+AC_CHECK_HEADERS([sys/ioctl.h])
 AC_CHECK_HEADERS([sys/socket.h])
 AC_CHECK_HEADERS([sys/time.h])
 AC_CHECK_HEADERS([sys/un.h])

http://git-wip-us.apache.org/repos/asf/thrift/blob/8bcb7ac2/lib/cpp/src/thrift/server/TNonblockingServer.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index d17f77c..e60bffc 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -472,6 +472,18 @@ void TNonblockingServer::TConnection::workSocket() {
     }
     // size known; now get the rest of the frame
     transition();
+
+    // If the socket has more data than the frame header, continue to work on it. This is not strictly necessary for
+    // regular sockets, because if there is more data, libevent will fire the event handler registered for read
+    // readiness, which will in turn call workSocket(). However, some socket types (such as TSSLSocket) may have the
+    // data sitting in their internal buffers and from libevent's perspective, there is no further data available. In
+    // that case, not having this workSocket() call here would result in a hang as we will never get to work the socket,
+    // despite having more data.
+    if (tSocket_->hasPendingDataToRead())
+    {
+        workSocket();
+    }
+
     return;
 
   case SOCKET_RECV:
@@ -677,9 +689,6 @@ void TNonblockingServer::TConnection::transition() {
       appState_ = APP_SEND_RESULT;
       setWrite();
 
-      // Try to work the socket immediately
-      // workSocket();
-
       return;
     }
 
@@ -718,9 +727,6 @@ void TNonblockingServer::TConnection::transition() {
     // Register read event
     setRead();
 
-    // Try to work the socket right away
-    // workSocket();
-
     return;
 
   case APP_READ_FRAME_SIZE:
@@ -753,9 +759,6 @@ void TNonblockingServer::TConnection::transition() {
     socketState_ = SOCKET_RECV;
     appState_ = APP_READ_REQUEST;
 
-    // Work the socket right away
-    workSocket();
-
     return;
 
   case APP_CLOSE_CONNECTION:
@@ -1063,7 +1066,7 @@ void TNonblockingServer::expireClose(stdcxx::shared_ptr<Runnable> task) {
   connection->forceClose();
 }
 
-void TNonblockingServer::stop() { 
+void TNonblockingServer::stop() {
   // Breaks the event loop in all threads so that they end ASAP.
   for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
     ioThreads_[i]->stop();

http://git-wip-us.apache.org/repos/asf/thrift/blob/8bcb7ac2/lib/cpp/src/thrift/transport/PlatformSocket.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/PlatformSocket.h b/lib/cpp/src/thrift/transport/PlatformSocket.h
index 1890b60..9591058 100644
--- a/lib/cpp/src/thrift/transport/PlatformSocket.h
+++ b/lib/cpp/src/thrift/transport/PlatformSocket.h
@@ -51,6 +51,8 @@
 #  define THRIFT_LSEEK _lseek
 #  define THRIFT_WRITE _write
 #  define THRIFT_READ _read
+#  define THRIFT_IOCTL_SOCKET ioctlsocket
+#  define THRIFT_IOCTL_SOCKET_NUM_BYTES_TYPE u_long
 #  define THRIFT_FSTAT _fstat
 #  define THRIFT_STAT _stat
 #  ifdef _WIN32_WCE
@@ -111,6 +113,8 @@
 #  define THRIFT_LSEEK lseek
 #  define THRIFT_WRITE write
 #  define THRIFT_READ read
+#  define THRIFT_IOCTL_SOCKET ioctl
+#  define THRIFT_IOCTL_SOCKET_NUM_BYTES_TYPE int
 #  define THRIFT_STAT stat
 #  define THRIFT_FSTAT fstat
 #  define THRIFT_GAI_STRERROR gai_strerror

http://git-wip-us.apache.org/repos/asf/thrift/blob/8bcb7ac2/lib/cpp/src/thrift/transport/TSSLSocket.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TSSLSocket.cpp b/lib/cpp/src/thrift/transport/TSSLSocket.cpp
index 3f0e28e..7bdacb0 100644
--- a/lib/cpp/src/thrift/transport/TSSLSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TSSLSocket.cpp
@@ -249,6 +249,17 @@ TSSLSocket::~TSSLSocket() {
   close();
 }
 
+bool TSSLSocket::hasPendingDataToRead() {
+  if (!isOpen()) {
+    return false;
+  }
+  initializeHandshake();
+  if (!checkHandshake())
+    throw TSSLException("TSSLSocket::hasPendingDataToRead: Handshake is not completed");
+  // data may be available in SSL buffers (note: SSL_pending does not have a failure mode)
+  return SSL_pending(ssl_) > 0 || TSocket::hasPendingDataToRead();
+}
+
 void TSSLSocket::init() {
   handshakeCompleted_ = false;
   readRetryCount_ = 0;

http://git-wip-us.apache.org/repos/asf/thrift/blob/8bcb7ac2/lib/cpp/src/thrift/transport/TSSLSocket.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TSSLSocket.h b/lib/cpp/src/thrift/transport/TSSLSocket.h
index 8527209..ec30cc1 100644
--- a/lib/cpp/src/thrift/transport/TSSLSocket.h
+++ b/lib/cpp/src/thrift/transport/TSSLSocket.h
@@ -78,6 +78,7 @@ public:
   bool peek();
   void open();
   void close();
+  bool hasPendingDataToRead();
   uint32_t read(uint8_t* buf, uint32_t len);
   void write(const uint8_t* buf, uint32_t len);
   uint32_t write_partial(const uint8_t* buf, uint32_t len);

http://git-wip-us.apache.org/repos/asf/thrift/blob/8bcb7ac2/lib/cpp/src/thrift/transport/TSocket.cpp
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TSocket.cpp b/lib/cpp/src/thrift/transport/TSocket.cpp
index d93d0ff..c90593d 100644
--- a/lib/cpp/src/thrift/transport/TSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TSocket.cpp
@@ -21,6 +21,9 @@
 
 #include <cstring>
 #include <sstream>
+#ifdef HAVE_SYS_IOCTL_H
+#include <sys/ioctl.h>
+#endif
 #ifdef HAVE_SYS_SOCKET_H
 #include <sys/socket.h>
 #endif
@@ -167,6 +170,26 @@ TSocket::~TSocket() {
   close();
 }
 
+bool TSocket::hasPendingDataToRead() {
+  if (!isOpen()) {
+    return false;
+  }
+
+  int32_t retries = 0;
+  THRIFT_IOCTL_SOCKET_NUM_BYTES_TYPE numBytesAvailable;
+try_again:
+  int r = THRIFT_IOCTL_SOCKET(socket_, FIONREAD, &numBytesAvailable);
+  if (r == -1) {
+    int errno_copy = THRIFT_GET_SOCKET_ERROR;
+    if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
+      goto try_again;
+    }
+    GlobalOutput.perror("TSocket::hasPendingDataToRead() THRIFT_IOCTL_SOCKET() " + getSocketInfo(), errno_copy);
+    throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
+  }
+  return numBytesAvailable > 0;
+}
+
 bool TSocket::isOpen() {
   return (socket_ != THRIFT_INVALID_SOCKET);
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/8bcb7ac2/lib/cpp/src/thrift/transport/TSocket.h
----------------------------------------------------------------------
diff --git a/lib/cpp/src/thrift/transport/TSocket.h b/lib/cpp/src/thrift/transport/TSocket.h
index 1f95e68..66d9e6c 100644
--- a/lib/cpp/src/thrift/transport/TSocket.h
+++ b/lib/cpp/src/thrift/transport/TSocket.h
@@ -84,7 +84,9 @@ public:
   virtual bool isOpen();
 
   /**
-   * Calls select on the socket to see if there is more data available.
+   * Checks whether there is more data available in the socket to read.
+   *
+   * This call blocks until at least one byte is available or the socket is closed.
    */
   virtual bool peek();
 
@@ -101,6 +103,17 @@ public:
   virtual void close();
 
   /**
+   * Determines whether there is pending data to read or not.
+   *
+   * This call does not block.
+   * \throws TTransportException of type UNKNOWN if the query for pending
+   *           bytes fails (interrupted calls are retried first); a socket
+   *           that is not open does not throw, the call returns false
+   * \returns true if there is pending data to read, false otherwise
+   */
+  virtual bool hasPendingDataToRead();
+
+  /**
    * Reads from the underlying socket.
    * \returns the number of bytes read or 0 indicates EOF
    * \throws TTransportException of types: