You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2014/02/28 19:41:14 UTC

svn commit: r1573028 - /qpid/trunk/qpid/python/qpid/compat.py

Author: kgiusti
Date: Fri Feb 28 18:41:14 2014
New Revision: 1573028

URL: http://svn.apache.org/r1573028
Log:
QPID-5588: [python-client] use poll() instead of select() where available.

Modified:
    qpid/trunk/qpid/python/qpid/compat.py

Modified: qpid/trunk/qpid/python/qpid/compat.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/compat.py?rev=1573028&r1=1573027&r2=1573028&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/compat.py (original)
+++ qpid/trunk/qpid/python/qpid/compat.py Fri Feb 28 18:41:14 2014
@@ -20,6 +20,8 @@
 import sys
 import errno
 import time
+from logging import getLogger
+log = getLogger("qpid.messaging")
 
 try:
   set = set
@@ -38,13 +40,69 @@ except ImportError:
   def format_exc():
     return "".join(traceback.format_exception(*sys.exc_info()))
 
-if tuple(sys.version_info[0:2]) < (2, 4):
-  from select import select as old_select
+# prefer poll() to select(), as it performs better at scale:
+import select as _select_mod
+if hasattr(_select_mod, "poll"):
+
+  from select import error as SelectError
   def select(rlist, wlist, xlist, timeout=None):
-    return old_select(list(rlist), list(wlist), list(xlist), timeout)
+    fd_count = 0
+    rset = set(rlist)
+    wset = set(wlist)
+    xset = set(xlist)
+    if timeout:
+      # select expects seconds, poll milliseconds
+      timeout = float(timeout) * 1000
+    poller = _select_mod.poll()
+
+    rwset = rset.intersection(wset)
+    for rw in rwset:
+      poller.register(rw, (_select_mod.POLLIN | _select_mod.POLLOUT))
+      fd_count += 1
+    for ro in rset.difference(rwset):
+      poller.register(ro, _select_mod.POLLIN)
+      fd_count += 1
+    for wo in wset.difference(rwset):
+      poller.register(wo, _select_mod.POLLOUT)
+      fd_count += 1
+    for x in xset:
+      poller.register(x, _select_mod.POLLPRI)
+      fd_count += 1
+
+    # select returns the objects passed in, but poll gives us back only the
+    # integer fds.  Maintain a map to get back:
+    fd_map = {}
+    for o in rset | wset | xset:
+      if hasattr(o, "fileno"):
+        fd_map[o.fileno()] = o
+
+    log.debug("poll(%d fds, timeout=%s)", fd_count, timeout)
+    active = poller.poll(timeout)
+    log.debug("poll() returned %s fds", len(active))
+
+    rfds = []
+    wfds = []
+    xfds = []
+    # set the error conditions so we do a read(), which will report the error
+    rflags = (_select_mod.POLLIN | _select_mod.POLLERR | _select_mod.POLLHUP)
+    for fds, flags in active:
+      if fds in fd_map:
+        fds = fd_map[fds]
+      if (flags & rflags):
+        rfds.append(fds)
+      if (flags & _select_mod.POLLOUT):
+        wfds.append(fds)
+      if (flags & _select_mod.POLLPRI):
+        xfds.append(fds)
+    return (rfds, wfds, xfds)
 else:
-  from select import select
-  from select import error as SelectError
+  if tuple(sys.version_info[0:2]) < (2, 4):
+    from select import select as old_select
+    def select(rlist, wlist, xlist, timeout=None):
+      return old_select(list(rlist), list(wlist), list(xlist), timeout)
+  else:
+    from select import select
+    from select import error as SelectError
 
 class BaseWaiter:
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org