You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by je...@apache.org on 2022/04/23 08:30:56 UTC

[thrift] branch master updated: THRIFT-5449: Use poll instead of select in Python TNonblockingServer if available Client: Python

This is an automated email from the ASF dual-hosted git repository.

jensg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/thrift.git


The following commit(s) were added to refs/heads/master by this push:
     new 88a45ac77 THRIFT-5449: Use poll instead of select in Python TNonblockingServer if available Client: Python
88a45ac77 is described below

commit 88a45ac77518eafb57db08938ecdf38c5fcf7a31
Author: Yiyang Zhou <yi...@ucla.edu>
AuthorDate: Wed Aug 4 21:55:04 2021 +0800

    THRIFT-5449: Use poll instead of select in Python TNonblockingServer if available
    Client: Python
---
 lib/py/src/server/TNonblockingServer.py | 46 +++++++++++++++++++++++++++++++--
 1 file changed, 44 insertions(+), 2 deletions(-)

diff --git a/lib/py/src/server/TNonblockingServer.py b/lib/py/src/server/TNonblockingServer.py
index ac0649651..fdf6779ad 100644
--- a/lib/py/src/server/TNonblockingServer.py
+++ b/lib/py/src/server/TNonblockingServer.py
@@ -253,6 +253,7 @@ class TNonblockingServer(object):
         self._read, self._write = socket.socketpair()
         self.prepared = False
         self._stop = False
+        self.poll = select.poll() if hasattr(select, 'poll') else None
 
     def setNumThreads(self, num):
         """Set the number of worker threads that should be created."""
@@ -318,13 +319,53 @@ class TNonblockingServer(object):
         else:
             return select.select(readable, writable, readable) + (True,)
 
+    def _poll_select(self):
+        """Does poll on open connections, if available."""
+        remaining = []
+
+        self.poll.register(self.socket.handle.fileno(), select.POLLIN | select.POLLRDNORM)
+        self.poll.register(self._read.fileno(), select.POLLIN | select.POLLRDNORM)
+
+        for i, connection in list(self.clients.items()):
+            if connection.is_readable():
+                self.poll.register(connection.fileno(), select.POLLIN | select.POLLRDNORM | select.POLLERR | select.POLLHUP | select.POLLNVAL)
+                if connection.remaining or connection.received:
+                    remaining.append(connection.fileno())
+            if connection.is_writeable():
+                self.poll.register(connection.fileno(), select.POLLOUT | select.POLLWRNORM)
+            if connection.is_closed():
+                try:
+                    self.poll.unregister(i)
+                except KeyError:
+                    logger.debug("KeyError in unregistering connections...")
+                del self.clients[i]
+        if remaining:
+            return remaining, [], [], False
+
+        rlist = []
+        wlist = []
+        xlist = []
+        pollres = self.poll.poll()
+        for fd, event in pollres:
+            if event & (select.POLLERR | select.POLLHUP | select.POLLNVAL):
+                xlist.append(fd)
+            elif event & (select.POLLOUT | select.POLLWRNORM):
+                wlist.append(fd)
+            elif event & (select.POLLIN | select.POLLRDNORM):
+                rlist.append(fd)
+            else:  # should be impossible
+                logger.debug("reached an impossible state in _poll_select")
+                xlist.append(fd)
+
+        return rlist, wlist, xlist, True
+
     def handle(self):
         """Handle requests.
 
         WARNING! You must call prepare() BEFORE calling handle()
         """
         assert self.prepared, "You have to call prepare before handle"
-        rset, wset, xset, selected = self._select()
+        rset, wset, xset, selected = self._select() if not self.poll else self._poll_select()
         for readable in rset:
             if readable == self._read.fileno():
                 # don't care i just need to clean readable flag
@@ -343,6 +384,8 @@ class TNonblockingServer(object):
                     connection.read()
                 if connection.received:
                     connection.status = WAIT_PROCESS
+                    if self.poll:
+                        self.poll.unregister(connection.fileno())
                     msg = connection.received.popleft()
                     itransport = TTransport.TMemoryBuffer(msg.buffer, msg.offset)
                     otransport = TTransport.TMemoryBuffer()
@@ -354,7 +397,6 @@ class TNonblockingServer(object):
             self.clients[writeable].write()
         for oob in xset:
             self.clients[oob].close()
-            del self.clients[oob]
 
     def close(self):
         """Closes the server."""