You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by em...@apache.org on 2021/09/02 06:51:15 UTC

[thrift] branch master updated: Updated TNonblockingServerSocket to better match TServerSocket

This is an automated email from the ASF dual-hosted git repository.

emmenlau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/thrift.git


The following commit(s) were added to refs/heads/master by this push:
     new dd0bf89  Updated TNonblockingServerSocket to better match TServerSocket
     new b8069cb  Merge pull request #2449 from BioDataAnalysis/bda_unify_nonblockingserversocket
dd0bf89 is described below

commit dd0bf89ca39e3b342183342df31f9e89525ef6af
Author: Mario Emmenlauer <me...@biodataanalysis.de>
AuthorDate: Wed Sep 1 11:33:04 2021 +0200

    Updated TNonblockingServerSocket to better match TServerSocket
---
 .../thrift/transport/TNonblockingServerSocket.cpp  | 136 ++++++++++++++-------
 .../thrift/transport/TNonblockingServerSocket.h    |  13 +-
 lib/cpp/src/thrift/transport/TServerSocket.cpp     |   5 +-
 3 files changed, 104 insertions(+), 50 deletions(-)

diff --git a/lib/cpp/src/thrift/transport/TNonblockingServerSocket.cpp b/lib/cpp/src/thrift/transport/TNonblockingServerSocket.cpp
index 5ef0835..28d94ce 100644
--- a/lib/cpp/src/thrift/transport/TNonblockingServerSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TNonblockingServerSocket.cpp
@@ -43,6 +43,9 @@
 #ifdef HAVE_UNISTD_H
 #include <unistd.h>
 #endif
+#ifdef HAVE_SYS_STAT_H
+#include <sys/stat.h>
+#endif
 
 #include <thrift/transport/PlatformSocket.h>
 #include <thrift/transport/TNonblockingServerSocket.h>
@@ -72,13 +75,13 @@ inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
   return reinterpret_cast<SOCKOPT_CAST_T*>(v);
 }
 
+using std::shared_ptr;
+using std::string;
+
 namespace apache {
 namespace thrift {
 namespace transport {
 
-using std::shared_ptr;
-using std::string;
-
 TNonblockingServerSocket::TNonblockingServerSocket(int port)
   : port_(port),
     listenPort_(port),
@@ -145,6 +148,35 @@ TNonblockingServerSocket::~TNonblockingServerSocket() {
   close();
 }
 
+bool TNonblockingServerSocket::isOpen() const {
+  if (serverSocket_ == THRIFT_INVALID_SOCKET)
+    return false;
+
+  if (!listening_)
+    return false;
+
+  if (isUnixDomainSocket() && (path_[0] != '\0')) {
+    // On some platforms the domain socket file may not be instantly
+    // available yet, i.e. the Windows file system can be slow. Therefore
+    // we should check that the domain socket file actually exists.
+#ifdef _MSC_VER
+    // Currently there is a bug in ClangCl on Windows so the stat() call
+    // does not work. The workaround is a Windows-specific check that the file exists:
+    DWORD const f_attrib = GetFileAttributesA(path_.c_str());
+    if (f_attrib == INVALID_FILE_ATTRIBUTES) {
+#else
+    struct THRIFT_STAT path_info;
+    if (::THRIFT_STAT(path_.c_str(), &path_info) < 0) {
+#endif
+      const std::string vError = "TNonblockingServerSocket::isOpen(): The domain socket path '" + path_ + "' does not exist (yet).";
+      GlobalOutput.perror(vError.c_str(), THRIFT_GET_SOCKET_ERROR);
+      return false;
+    }
+  }
+
+  return true;
+}
+
 void TNonblockingServerSocket::setSendTimeout(int sendTimeout) {
   sendTimeout_ = sendTimeout;
 }
@@ -174,26 +206,28 @@ void TNonblockingServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) {
 }
 
 void TNonblockingServerSocket::_setup_sockopts() {
-
-  // Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept
   int one = 1;
-  if (-1 == setsockopt(serverSocket_,
-                       SOL_SOCKET,
-                       THRIFT_NO_SOCKET_CACHING,
-                       cast_sockopt(&one),
-                       sizeof(one))) {
-// ignore errors coming out of this setsockopt on Windows.  This is because
-// SO_EXCLUSIVEADDRUSE requires admin privileges on WinXP, but we don't
-// want to force servers to be an admin.
-#ifndef _WIN32
-    int errno_copy = THRIFT_GET_SOCKET_ERROR;
-    GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() THRIFT_NO_SOCKET_CACHING ",
-                        errno_copy);
-    close();
-    throw TTransportException(TTransportException::NOT_OPEN,
-                              "Could not set THRIFT_NO_SOCKET_CACHING",
-                              errno_copy);
-#endif
+  if (!isUnixDomainSocket()) {
+    // Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept.
+    // This does not work with Domain sockets on most platforms. And
+    // on Windows it completely breaks the socket. Therefore do not
+    // use this on Domain sockets.
+    if (-1 == setsockopt(serverSocket_,
+                         SOL_SOCKET,
+                         THRIFT_NO_SOCKET_CACHING,
+                         cast_sockopt(&one),
+                         sizeof(one))) {
+      // NOTE: The SO_EXCLUSIVEADDRUSE socket option can only be used by members
+      // of the Administrators security group on Windows XP and earlier. But
+      // we do not target WinXP anymore, so no special checks are required.
+      int errno_copy = THRIFT_GET_SOCKET_ERROR;
+      GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() THRIFT_NO_SOCKET_CACHING ",
+                          errno_copy);
+      close();
+      throw TTransportException(TTransportException::NOT_OPEN,
+                                "Could not set THRIFT_NO_SOCKET_CACHING",
+                                errno_copy);
+    }
   }
 
   // Set TCP buffer sizes
@@ -265,8 +299,10 @@ void TNonblockingServerSocket::_setup_sockopts() {
                               "THRIFT_FCNTL() THRIFT_F_SETFL THRIFT_O_NONBLOCK failed",
                               errno_copy);
   }
+}
 
-} // _setup_sockopts()
+void TNonblockingServerSocket::_setup_unixdomain_sockopts() {
+}
 
 void TNonblockingServerSocket::_setup_tcp_sockopts() {
   int one = 1;
@@ -302,13 +338,10 @@ void TNonblockingServerSocket::_setup_tcp_sockopts() {
 } // _setup_tcp_sockopts()
 
 void TNonblockingServerSocket::listen() {
-  listening_ = true;
 #ifdef _WIN32
   TWinsockSingleton::create();
 #endif // _WIN32
 
-  // tcp == false means Unix Domain socket
-  bool tcp = (path_.empty());
 
   // Validate port number
   if (port_ < 0 || port_ > 0xFFFF) {
@@ -317,12 +350,16 @@ void TNonblockingServerSocket::listen() {
 
   // Resolve host:port strings into an iterable of struct addrinfo*
   AddressResolutionHelper resolved_addresses;
-  if (tcp) {
+  if (!isUnixDomainSocket()) {
     try {
       resolved_addresses.resolve(address_, std::to_string(port_), SOCK_STREAM,
+#ifdef ANDROID
+                                 AI_PASSIVE | AI_ADDRCONFIG);
+#else
                                  AI_PASSIVE | AI_V4MAPPED);
+#endif
     } catch (const std::system_error& e) {
-      GlobalOutput.printf("getaddrinfo() -> %d. %s", e.code().value(), e.what());
+      GlobalOutput.printf("getaddrinfo() -> %d; %s", e.code().value(), e.what());
       close();
       throw TTransportException(TTransportException::NOT_OPEN,
                                 "Could not resolve host for server socket.");
@@ -334,14 +371,14 @@ void TNonblockingServerSocket::listen() {
   int retries = 0;
   int errno_copy = 0;
 
-  if (!tcp) {
+  if (isUnixDomainSocket()) {
     // -- Unix Domain Socket -- //
 
     serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
 
     if (serverSocket_ == THRIFT_INVALID_SOCKET) {
       int errno_copy = THRIFT_GET_SOCKET_ERROR;
-      GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
+      GlobalOutput.perror("TNonblockingServerSocket::listen() socket() ", errno_copy);
       close();
       throw TTransportException(TTransportException::NOT_OPEN,
                                 "Could not create server socket.",
@@ -349,14 +386,11 @@ void TNonblockingServerSocket::listen() {
     }
 
     _setup_sockopts();
-    //_setup_unixdomain_sockopts();
-
-/*
- * TODO: seems that windows now support unix sockets,
- *       see: https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/
- */
-#ifndef _WIN32
+    _setup_unixdomain_sockopts();
 
+    // Windows supports Unix domain sockets when the macro HAVE_AF_UNIX_H is
+    // defined (see https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/)
+#if (!defined(_WIN32) || defined(HAVE_AF_UNIX_H))
     struct sockaddr_un address;
     socklen_t structlen = fillUnixSocketAddr(address, path_);
 
@@ -368,12 +402,11 @@ void TNonblockingServerSocket::listen() {
       // use short circuit evaluation here to only sleep if we need to
     } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));
 #else
-    GlobalOutput.perror("TSocket::open() Unix Domain socket path not supported on windows", -99);
+    GlobalOutput.perror("TNonblockingServerSocket::open() Unix Domain socket path not supported on this version of Windows", -99);
     throw TTransportException(TTransportException::NOT_OPEN,
                               " Unix Domain socket path not supported");
 #endif
   } else {
-
     // -- TCP socket -- //
 
     auto addr_iter = AddressResolutionHelper::Iter{};
@@ -405,7 +438,7 @@ void TNonblockingServerSocket::listen() {
                              IPV6_V6ONLY,
                              cast_sockopt(&zero),
                              sizeof(zero))) {
-          GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR);
+          GlobalOutput.perror("TNonblockingServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR);
         }
       }
 #endif // #ifdef IPV6_V6ONLY
@@ -440,7 +473,7 @@ void TNonblockingServerSocket::listen() {
 
   // throw error if socket still wasn't created successfully
   if (serverSocket_ == THRIFT_INVALID_SOCKET) {
-    GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
+    GlobalOutput.perror("TNonblockingServerSocket::listen() socket() ", errno_copy);
     close();
     throw TTransportException(TTransportException::NOT_OPEN,
                               "Could not create server socket.",
@@ -450,10 +483,15 @@ void TNonblockingServerSocket::listen() {
   // throw an error if we failed to bind properly
   if (retries > retryLimit_) {
     char errbuf[1024];
-    if (!tcp) {
-      THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() PATH %s", path_.c_str());
+    if (isUnixDomainSocket()) {
+#ifdef _WIN32
+      THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() Could not bind to domain socket path %s, error %d", path_.c_str(), WSAGetLastError());
+#else
+      // Fixme: This does not currently handle abstract domain sockets:
+      THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() Could not bind to domain socket path %s", path_.c_str());
+#endif
     } else {
-      THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() BIND %d", port_);
+      THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() Could not bind to port %d", port_);
     }
     GlobalOutput(errbuf);
     close();
@@ -474,6 +512,7 @@ void TNonblockingServerSocket::listen() {
   }
 
   // The socket is now listening!
+  listening_ = true;
 }
 
 int TNonblockingServerSocket::getPort() {
@@ -484,6 +523,14 @@ int TNonblockingServerSocket::getListenPort() {
   return listenPort_;
 }
 
+std::string TNonblockingServerSocket::getPath() const {
+    return path_;
+}
+
+bool TNonblockingServerSocket::isUnixDomainSocket() const {
+    return !path_.empty();
+}
+
 shared_ptr<TSocket> TNonblockingServerSocket::acceptImpl() {
   if (serverSocket_ == THRIFT_INVALID_SOCKET) {
     throw TTransportException(TTransportException::NOT_OPEN,
@@ -524,6 +571,7 @@ shared_ptr<TSocket> TNonblockingServerSocket::acceptImpl() {
   }
 
   shared_ptr<TSocket> client = createSocket(clientSocket);
+  client->setPath(path_);
   if (sendTimeout_ > 0) {
     client->setSendTimeout(sendTimeout_);
   }
diff --git a/lib/cpp/src/thrift/transport/TNonblockingServerSocket.h b/lib/cpp/src/thrift/transport/TNonblockingServerSocket.h
index 1ed2b07..a5cd664 100644
--- a/lib/cpp/src/thrift/transport/TNonblockingServerSocket.h
+++ b/lib/cpp/src/thrift/transport/TNonblockingServerSocket.h
@@ -73,6 +73,8 @@ public:
 
   ~TNonblockingServerSocket() override;
 
+  bool isOpen() const;
+
   void setSendTimeout(int sendTimeout);
   void setRecvTimeout(int recvTimeout);
 
@@ -103,6 +105,10 @@ public:
 
   int getListenPort() override;
 
+  std::string getPath() const;
+
+  bool isUnixDomainSocket() const;
+
   void listen() override;
   void close() override;
 
@@ -111,6 +117,10 @@ protected:
   virtual std::shared_ptr<TSocket> createSocket(THRIFT_SOCKET client);
 
 private:
+  void _setup_sockopts();
+  void _setup_unixdomain_sockopts();
+  void _setup_tcp_sockopts();
+
   int port_;
   int listenPort_;
   std::string address_;
@@ -128,9 +138,6 @@ private:
 
   socket_func_t listenCallback_;
   socket_func_t acceptCallback_;
-
-  void _setup_sockopts();
-  void _setup_tcp_sockopts();
 };
 }
 }
diff --git a/lib/cpp/src/thrift/transport/TServerSocket.cpp b/lib/cpp/src/thrift/transport/TServerSocket.cpp
index 8d6e7ef..ede6c9b 100644
--- a/lib/cpp/src/thrift/transport/TServerSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TServerSocket.cpp
@@ -90,13 +90,13 @@ void destroyer_of_fine_sockets(THRIFT_SOCKET* ssock) {
   delete ssock;
 }
 
+using std::shared_ptr;
 using std::string;
 
 namespace apache {
 namespace thrift {
 namespace transport {
 
-using std::shared_ptr;
 
 TServerSocket::TServerSocket(int port)
   : interruptableChildren_(true),
@@ -249,12 +249,12 @@ void TServerSocket::setInterruptableChildren(bool enable) {
 }
 
 void TServerSocket::_setup_sockopts() {
+  int one = 1;
   if (!isUnixDomainSocket()) {
     // Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept.
     // This does not work with Domain sockets on most platforms. And
     // on Windows it completely breaks the socket. Therefore do not
     // use this on Domain sockets.
-    int one = 1;
     if (-1 == setsockopt(serverSocket_,
                         SOL_SOCKET,
                         THRIFT_NO_SOCKET_CACHING,
@@ -458,7 +458,6 @@ void TServerSocket::listen() {
                               " Unix Domain socket path not supported");
 #endif
   } else {
-
     // -- TCP socket -- //
 
     auto addr_iter = AddressResolutionHelper::Iter{};