You are viewing a plain-text version of this content. The canonical link for it is given in the URL line below.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/09/23 14:30:50 UTC

svn commit: r818075 - in /qpid/trunk/qpid/python/qpid: address.py driver.py messaging.py selector.py

Author: rhs
Date: Wed Sep 23 12:30:49 2009
New Revision: 818075

URL: http://svn.apache.org/viewvc?rev=818075&view=rev
Log:
switched the API over to a select-based driver; added an address parser

Added:
    qpid/trunk/qpid/python/qpid/address.py
    qpid/trunk/qpid/python/qpid/selector.py
Modified:
    qpid/trunk/qpid/python/qpid/driver.py
    qpid/trunk/qpid/python/qpid/messaging.py

Added: qpid/trunk/qpid/python/qpid/address.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/address.py?rev=818075&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qpid/address.py (added)
+++ qpid/trunk/qpid/python/qpid/address.py Wed Sep 23 12:30:49 2009
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import re
+
+TYPES = []
+
+class Type:
+
+  def __init__(self, name, pattern=None):
+    self.name = name
+    self.pattern = pattern
+    if self.pattern:
+      TYPES.append(self)
+
+  def __repr__(self):
+    return self.name
+
+LBRACE = Type("LBRACE", r"\{")
+RBRACE = Type("RBRACE", r"\}")
+COLON = Type("COLON", r":")
+COMMA = Type("COMMA", r",")
+ID = Type("ID", r'[a-zA-Z_][a-zA-Z0-9_]*')
+NUMBER = Type("NUMBER", r'[+-]?[0-9]*\.?[0-9]+')
+STRING = Type("STRING", r""""(?:[^\\"]|\\.)*"|'(?:[^\\']|\\.)*'""")
+WSPACE = Type("WSPACE", r"[ \n\r\t]+")
+EOF = Type("EOF")
+
+class Token:
+
+  def __init__(self, type, value):
+    self.type = type
+    self.value = value
+
+  def __repr__(self):
+    return "%s: %r" % (self.type, self.value)
+
+joined = "|".join(["(%s)" % t.pattern for t in TYPES])
+LEXER = re.compile(joined)
+
+def lex(st):
+  pos = 0
+  while pos < len(st):
+    m = LEXER.match(st, pos)
+    if m is None:
+      raise ValueError(repr(st[pos:]))
+    else:
+      idx = m.lastindex
+      t = Token(TYPES[idx - 1], m.group(idx))
+      yield t
+    pos = m.end()
+  yield Token(EOF, None)
+
+class ParseError(Exception): pass
+
+class EOF(Exception): pass
+
+class Parser:
+
+  def __init__(self, tokens):
+    self.tokens = [t for t in tokens if t.type is not WSPACE]
+    self.idx = 0
+
+  def next(self):
+    return self.tokens[self.idx]
+
+  def matches(self, *types):
+    return self.next().type in types
+
+  def eat(self, *types):
+    if types and not self.matches(*types):
+      raise ParseError("expecting %s -- got %s" % (", ".join(map(str, types)), self.next()))
+    else:
+      t = self.next()
+      self.idx += 1
+      return t
+
+  def parse(self):
+    result = self.address()
+    self.eat(EOF)
+    return result
+
+  def address(self):
+    name = self.eat(ID).value
+    if self.matches(LBRACE):
+      options = self.map()
+    else:
+      options = None
+    return name, options
+
+  def map(self):
+    self.eat(LBRACE)
+    result = {}
+    while True:
+      if self.matches(RBRACE):
+        self.eat(RBRACE)
+        break
+      else:
+        if self.matches(ID):
+          n, v = self.nameval()
+          result[n] = v
+        elif self.matches(COMMA):
+          self.eat(COMMA)
+        else:
+          raise ParseError("expecting (ID, COMMA), got %s" % self.next())
+    return result
+
+  def nameval(self):
+    name = self.eat(ID).value
+    self.eat(COLON)
+    val = self.value()
+    return (name, val)
+
+  def value(self):
+    if self.matches(NUMBER, STRING):
+      return eval(self.eat().value)
+    elif self.matches(LBRACE):
+      return self.map()
+    else:
+      raise ParseError("expecting (NUMBER, STRING, LBRACE) got %s" % self.next())
+
+def parse(addr):
+  return Parser(lex(addr)).parse()
+
+__all__ = ["parse"]

Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=818075&r1=818074&r2=818075&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Wed Sep 23 12:30:49 2009
@@ -17,13 +17,15 @@
 # under the License.
 #
 
-import compat, connection, socket, sys, time
+import compat, connection, socket, struct, sys, time
 from concurrency import synchronized
-from datatypes import RangedSet, Message as Message010
-from exceptions import Timeout
+from datatypes import RangedSet, Serial
+from exceptions import Timeout, VersionError
+from framing import OpEncoder, SegmentEncoder, FrameEncoder, FrameDecoder, SegmentDecoder, OpDecoder
 from logging import getLogger
 from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED
-from ops import delivery_mode
+from ops import *
+from selector import Selector
 from session import Client, INCOMPLETE, SessionDetached
 from threading import Condition, Thread
 from util import connect
@@ -50,145 +52,361 @@
   def __init__(self, target):
     self.target = target
 
+# XXX
+
 DURABLE_DEFAULT=True
 
+# XXX
+
 FILTER_DEFAULTS = {
   "topic": Pattern("*")
   }
 
-def delegate(handler, session):
-  class Delegate(Client):
+# XXX
+
+CLIENT_PROPERTIES = {"product": "qpid python client",
+                     "version": "development",
+                     "platform": os.name,
+                     "qpid.client_process": os.path.basename(sys.argv[0]),
+                     "qpid.client_pid": os.getpid(),
+                     "qpid.client_ppid": os.getppid()}
+
+def noop(): pass
+
+class SessionState:
+
+  def __init__(self, driver, session, name, channel):
+    self.driver = driver
+    self.session = session
+    self.name = name
+    self.channel = channel
+    self.detached = False
+    self.committing = False
+    self.aborting = False
+
+    # sender state
+    self.sent = Serial(0)
+    self.acknowledged = RangedSet()
+    self.completions = {}
+    self.min_completion = self.sent
+    self.max_completion = self.sent
+
+    # receiver state
+    self.received = None
+    self.executed = RangedSet()
+
+    # XXX: need to periodically exchange completion/known_completion
+
+  def write_cmd(self, cmd, completion=noop):
+    cmd.id = self.sent
+    self.sent += 1
+    self.completions[cmd.id] = completion
+    self.max_completion = cmd.id
+    self.write_op(cmd)
+
+  def write_op(self, op):
+    op.channel = self.channel
+    self.driver.write_op(op)
 
-    def message_transfer(self, cmd):
-      return handler._message_transfer(session, cmd)
-  return Delegate
+# XXX
+HEADER="!4s4B"
+
+EMPTY_DP = DeliveryProperties()
+EMPTY_MP = MessageProperties()
 
 class Driver:
 
   def __init__(self, connection):
     self.connection = connection
     self._lock = self.connection._lock
-    self._wakeup_cond = Condition()
-    self._socket = None
-    self._conn = None
+
+    self._selector = Selector.default()
+    self.reset()
+
+  def reset(self):
+    self._opening = False
+    self._closing = False
     self._connected = False
     self._attachments = {}
-    self._modcount = self.connection._modcount
-    self.thread = Thread(target=self.run)
-    self.thread.setDaemon(True)
-    # XXX: need to figure out how to join on this thread
 
+    self._channel_max = 65536
+    self._channels = 0
+    self._sessions = {}
+
+    self._socket = None
+    self._buf = ""
+    self._hdr = ""
+    self._op_enc = OpEncoder()
+    self._seg_enc = SegmentEncoder()
+    self._frame_enc = FrameEncoder()
+    self._frame_dec = FrameDecoder()
+    self._seg_dec = SegmentDecoder()
+    self._op_dec = OpDecoder()
+    self._timeout = None
+
+    for ssn in self.connection.sessions.values():
+      for m in ssn.acked + ssn.unacked + ssn.incoming:
+        m._transfer_id = None
+      for rcv in ssn.receivers:
+        rcv.impending = rcv.received
+
+  @synchronized
   def wakeup(self):
-    self._wakeup_cond.acquire()
-    try:
-      self._wakeup_cond.notifyAll()
-    finally:
-      self._wakeup_cond.release()
+    self.dispatch()
+    self._selector.wakeup()
 
   def start(self):
-    self.thread.start()
+    self._selector.register(self)
 
-  def run(self):
-    while True:
-      self._wakeup_cond.acquire()
+  def fileno(self):
+    return self._socket.fileno()
+
+  @synchronized
+  def reading(self):
+    return self._socket is not None
+
+  @synchronized
+  def writing(self):
+    return self._socket is not None and self._buf
+
+  @synchronized
+  def timing(self):
+    return self._timeout
+
+  @synchronized
+  def readable(self):
+    error = None
+    recoverable = False
+    try:
+      data = self._socket.recv(64*1024)
+      if data:
+        log.debug("READ: %r", data)
+      else:
+        error = ("connection aborted",)
+        recoverable = True
+    except socket.error, e:
+      error = (e,)
+      recoverable = True
+
+    if not error:
       try:
-        if self.connection._modcount <= self._modcount:
-          self._wakeup_cond.wait(10)
-      finally:
-        self._wakeup_cond.release()
-      self.dispatch(self.connection._modcount)
+        if len(self._hdr) < 8:
+          r = 8 - len(self._hdr)
+          self._hdr += data[:r]
+          data = data[r:]
+
+          if len(self._hdr) == 8:
+            self.do_header(self._hdr)
+
+        self._frame_dec.write(data)
+        self._seg_dec.write(*self._frame_dec.read())
+        self._op_dec.write(*self._seg_dec.read())
+        for op in self._op_dec.read():
+          self.assign_id(op)
+          log.debug("RCVD: %r", op)
+          op.dispatch(self)
+      except VersionError, e:
+        error = (e,)
+      except:
+        msg = compat.format_exc()
+        error = (msg,)
+
+    if error:
+      self._socket.close()
+      self.reset()
+      if recoverable and self.connection.reconnect:
+        self._timeout = time.time() + 3
+        log.warn("recoverable error: %s" % error)
+        log.warn("sleeping 3 seconds")
+      else:
+        self.connection.error = error
+
+    self.connection._waiter.notifyAll()
+
+  def assign_id(self, op):
+    if isinstance(op, Command):
+      sst = self.get_sst(op)
+      op.id = sst.received
+      sst.received += 1
+
+  @synchronized
+  def writeable(self):
+    n = self._socket.send(self._buf)
+    log.debug("SENT: %r", self._buf[:n])
+    self._buf = self._buf[n:]
 
   @synchronized
-  def dispatch(self, modcount):
+  def timeout(self):
+    log.warn("retrying ...")
+    self.dispatch()
+    self.connection._waiter.notifyAll()
+
+  def write_op(self, op):
+    log.debug("SENT: %r", op)
+    self._op_enc.write(op)
+    self._seg_enc.write(*self._op_enc.read())
+    self._frame_enc.write(*self._seg_enc.read())
+    self._buf += self._frame_enc.read()
+
+  def do_header(self, hdr):
+    cli_major = 0; cli_minor = 10
+    magic, _, _, major, minor = struct.unpack(HEADER, hdr)
+    if major != cli_major or minor != cli_minor:
+      raise VersionError("client: %s-%s, server: %s-%s" %
+                         (cli_major, cli_minor, major, minor))
+
+  def do_connection_start(self, start):
+    # XXX: should we use some sort of callback for this?
+    r = "\0%s\0%s" % (self.connection.username, self.connection.password)
+    m = self.connection.mechanism
+    self.write_op(ConnectionStartOk(client_properties=CLIENT_PROPERTIES,
+                                    mechanism=m, response=r))
+
+  def do_connection_tune(self, tune):
+    # XXX: is heartbeat protocol specific?
+    if tune.channel_max is not None:
+      self.channel_max = tune.channel_max
+    self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat,
+                                   channel_max=self.channel_max))
+    self.write_op(ConnectionOpen())
+
+  def do_connection_open_ok(self, open_ok):
+    self._connected = True
+    # XXX: maybe think about a more generic way to catchup with
+    # deferred work
+    self.dispatch()
+
+  def connection_heartbeat(self, hrt):
+    self.write_op(ConnectionHeartbeat())
+
+  def do_connection_close(self, close):
+    self.write_op(ConnectionCloseOk())
+    if close.reply_ok != close_code.normal:
+      self.connection.error = (close.reply_code, close.reply_text)
+    # XXX: should we do a half shutdown on the socket here?
+    # XXX: we really need to test this, we may end up reporting a
+    # connection abort after this, if we were to do a shutdown on read
+    # and stop reading, then we wouldn't report the abort, that's
+    # probably the right thing to do
+
+  def do_connection_close_ok(self, close_ok):
+    self._socket.close()
+    self.reset()
+
+  def do_session_attached(self, atc):
+    pass
+
+  def do_session_command_point(self, cp):
+    sst = self.get_sst(cp)
+    sst.received = cp.command_id
+
+  def do_session_completed(self, sc):
+    sst = self.get_sst(sc)
+    for r in sc.commands:
+      sst.acknowledged.add(r.lower, r.upper)
+
+    if not sc.commands.empty():
+      while sst.min_completion in sc.commands:
+        if sst.completions.has_key(sst.min_completion):
+          sst.completions.pop(sst.min_completion)()
+        sst.min_completion += 1
+
+  def session_known_completed(self, kcmp):
+    sst = self.get_sst(kcmp)
+    executed = RangedSet()
+    for e in sst.executed.ranges:
+      for ke in kcmp.ranges:
+        if e.lower in ke and e.upper in ke:
+          break
+      else:
+        executed.add_range(e)
+    sst.executed = completed
+
+  def do_session_flush(self, sf):
+    sst = self.get_sst(sf)
+    if sf.expected:
+      if sst.received is None:
+        exp = None
+      else:
+        exp = RangedSet(sst.received)
+      sst.write_op(SessionExpected(exp))
+    if sf.confirmed:
+      sst.write_op(SessionConfirmed(sst.executed))
+    if sf.completed:
+      sst.write_op(SessionCompleted(sst.executed))
+
+  def dispatch(self):
     try:
-      if self._conn is None and self.connection._connected:
+      if self._socket is None and self.connection._connected and not self._opening:
         self.connect()
-      elif self._conn is not None and not self.connection._connected:
+      elif self._socket is not None and not self.connection._connected and not self._closing:
         self.disconnect()
 
-      if self._conn is not None:
+      if self._connected and not self._closing:
         for ssn in self.connection.sessions.values():
           self.attach(ssn)
           self.process(ssn)
-
-      exi = None
     except:
-      exi = sys.exc_info()
-
-    if exi:
       msg = compat.format_exc()
-      recoverable = ["aborted", "Connection refused", "SessionDetached", "Connection reset by peer",
-                     "Bad file descriptor", "start timed out", "Broken pipe"]
-      for r in recoverable:
-        if self.connection.reconnect and r in msg:
-          print "waiting to retry"
-          self.reset()
-          time.sleep(3)
-          print "retrying..."
-          return
-      else:
-        self.connection.error = (msg,)
-
-    self._modcount = modcount
-    self.connection._waiter.notifyAll()
+      self.connection.error = (msg,)
 
   def connect(self):
-    if self._conn is not None:
-      return
     try:
+      # XXX: should make this non blocking
       self._socket = connect(self.connection.host, self.connection.port)
+      self._timeout = None
     except socket.error, e:
-      raise ConnectError(e)
-    self._conn = connection.Connection(self._socket)
-    try:
-      self._conn.start(timeout=10)
-      self._connected = True
-    except connection.VersionError, e:
-      raise ConnectError(e)
-    except Timeout:
-      print "start timed out"
-      raise ConnectError("start timed out")
+      if self.connection.reconnect:
+        self.reset()
+        self._timeout = time.time() + 3
+        log.warn("recoverable error: %s", e)
+        log.warn("sleeping 3 seconds")
+        return
+      else:
+        raise e
+    self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
+    self._opening = True
 
   def disconnect(self):
-    self._conn.close()
-    self.reset()
-
-  def reset(self):
-    self._conn = None
-    self._connected = False
-    self._attachments.clear()
-    for ssn in self.connection.sessions.values():
-      for m in ssn.acked + ssn.unacked + ssn.incoming:
-        m._transfer_id = None
-      for rcv in ssn.receivers:
-        rcv.impending = rcv.received
-
-  def connected(self):
-    return self._conn is not None
+    self.write_op(ConnectionClose(close_code.normal))
+    self._closing = True
 
   def attach(self, ssn):
-    _ssn = self._attachments.get(ssn)
-    if _ssn is None:
-      _ssn = self._conn.session(ssn.name, delegate=delegate(self, ssn))
-      _ssn.auto_sync = False
-      _ssn.invoke_lock = self._lock
-      _ssn.lock = self._lock
-      _ssn.condition = self.connection._condition
+    sst = self._attachments.get(ssn)
+    if sst is None:
+      for i in xrange(0, self.channel_max):
+        if not self._sessions.has_key(i):
+          ch = i
+          break
+      else:
+        raise RuntimeError("all channels used")
+      sst = SessionState(self, ssn, ssn.name, ch)
+      sst.write_op(SessionAttach(name=ssn.name))
+      sst.write_op(SessionCommandPoint(sst.sent, 0))
+      sst.outgoing_idx = 0
+      sst.acked = []
       if ssn.transactional:
-        # XXX: adding an attribute to qpid.session.Session
-        _ssn.acked = []
-        _ssn.tx_select()
-      self._attachments[ssn] = _ssn
+        sst.write_cmd(TxSelect())
+      self._attachments[ssn] = sst
+      self._sessions[sst.channel] = sst
 
     for snd in ssn.senders:
       self.link_out(snd)
     for rcv in ssn.receivers:
       self.link_in(rcv)
 
-    if ssn.closing:
-      _ssn.close()
-      del self._attachments[ssn]
-      ssn.closed = True
+    if ssn.closing and not sst.detached:
+      sst.detached = True
+      sst.write_op(SessionDetach(name=ssn.name))
+
+  def get_sst(self, op):
+    return self._sessions[op.channel]
+
+  def do_session_detached(self, dtc):
+    sst = self._sessions.pop(dtc.channel)
+    ssn = sst.session
+    del self._attachments[ssn]
+    ssn.closed = True
 
   def _exchange_query(self, ssn, address):
     # XXX: auto sync hack is to avoid deadlock on future
@@ -197,16 +415,16 @@
     return result.get()
 
   def link_out(self, snd):
-    _ssn = self._attachments[snd.session]
+    sst = self._attachments[snd.session]
     _snd = self._attachments.get(snd)
     if _snd is None:
       _snd = Attachment(snd)
       node, _snd._subject = parse_addr(snd.target)
-      result = self._exchange_query(_ssn, node)
-      if result.not_found:
+      # XXX: result = self._exchange_query(sst, node)
+#      if result.not_found:
+      if True:
         # XXX: should check 'create' option
-        _ssn.queue_declare(queue=node, durable=DURABLE_DEFAULT, sync=True)
-        _ssn.sync()
+        sst.write_cmd(QueueDeclare(queue=node, durable=DURABLE_DEFAULT))
         _snd._exchange = ""
         _snd._routing_key = node
       else:
@@ -221,35 +439,37 @@
       return _snd
 
   def link_in(self, rcv):
-    _ssn = self._attachments[rcv.session]
+    sst = self._attachments[rcv.session]
     _rcv = self._attachments.get(rcv)
     if _rcv is None:
       _rcv = Attachment(rcv)
-      result = self._exchange_query(_ssn, rcv.source)
-      if result.not_found:
+      # XXX: result = self._exchange_query(sst, rcv.source)
+#      if result.not_found:
+      _rcv.canceled = False
+      _rcv.draining = False
+      if True:
         _rcv._queue = rcv.source
         # XXX: should check 'create' option
-        _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT)
+        sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT))
       else:
         _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
-        _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)
+        sst.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)
         if rcv.filter is None:
           f = FILTER_DEFAULTS[result.type]
         else:
           f = rcv.filter
-        f._bind(_ssn, rcv.source, _rcv._queue)
-      _ssn.message_subscribe(queue=_rcv._queue, destination=rcv.destination)
-      _ssn.message_set_flow_mode(rcv.destination, _ssn.flow_mode.credit, sync=True)
+        f._bind(sst, rcv.source, _rcv._queue)
+      sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
+      sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
       self._attachments[rcv] = _rcv
-      # XXX: need to kill syncs
-      _ssn.sync()
 
     if rcv.closing:
-      _ssn.message_cancel(rcv.destination, sync=True)
-      # XXX: need to kill syncs
-      _ssn.sync()
-      del self._attachments[rcv]
-      rcv.closed = True
+      if not _rcv.canceled:
+        def close_rcv():
+          del self._attachments[rcv]
+          rcv.closed = True
+        sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv)
+        _rcv.canceled = True
       return None
     else:
       return _rcv
@@ -257,80 +477,83 @@
   def process(self, ssn):
     if ssn.closing: return
 
-    _ssn = self._attachments[ssn]
+    sst = self._attachments[ssn]
 
-    while ssn.outgoing:
-      msg = ssn.outgoing[0]
+    while sst.outgoing_idx < len(ssn.outgoing):
+      msg = ssn.outgoing[sst.outgoing_idx]
       snd = msg._sender
       self.send(snd, msg)
-      ssn.outgoing.pop(0)
+      sst.outgoing_idx += 1
 
     for rcv in ssn.receivers:
       self.process_receiver(rcv)
 
     if ssn.acked:
-      messages = ssn.acked[:]
-      ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
-      for range in ids:
-        _ssn.receiver._completed.add_range(range)
-      ch = _ssn.channel
-      if ch is None:
-        raise SessionDetached()
-      ch.session_completed(_ssn.receiver._completed)
-      _ssn.message_accept(ids, sync=True)
-      # XXX: really need to make this async so that we don't give up the lock
-      _ssn.sync()
-
-      # XXX: we're ignoring acks that get lost when disconnected
-      for m in messages:
-        ssn.acked.remove(m)
-        if ssn.transactional:
-          _ssn.acked.append(m)
-
-    if ssn.committing:
-      _ssn.tx_commit(sync=True)
-      # XXX: need to kill syncs
-      _ssn.sync()
-      del _ssn.acked[:]
-      ssn.committing = False
-      ssn.committed = True
-      ssn.aborting = False
-      ssn.aborted = False
+      messages = [m for m in ssn.acked if m not in sst.acked]
+      if messages:
+        # XXX: we're ignoring acks that get lost when disconnected,
+        # could we deal this via some message-id based purge?
+        ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
+        for range in ids:
+          sst.executed.add_range(range)
+        sst.write_op(SessionCompleted(sst.executed))
+        def ack_ack():
+          for m in messages:
+            ssn.acked.remove(m)
+            if not ssn.transactional:
+              sst.acked.remove(m)
+        sst.write_cmd(MessageAccept(ids, sync=True), ack_ack)
+        sst.acked.extend(messages)
+
+    if ssn.committing and not sst.committing:
+      def commit_ok():
+        del sst.acked[:]
+        ssn.committing = False
+        ssn.committed = True
+        ssn.aborting = False
+        ssn.aborted = False
+      sst.write_cmd(TxCommit(sync=True), commit_ok)
+      sst.committing = True
+
+    if ssn.aborting and not sst.aborting:
+      sst.aborting = True
+      def do_rb():
+        messages = sst.acked + ssn.unacked + ssn.incoming
+        ids = RangedSet(*[m._transfer_id for m in messages])
+        for range in ids:
+          sst.executed.add_range(range)
+        sst.write_op(SessionCompleted(sst.executed))
+        sst.write_cmd(MessageRelease(ids))
+        sst.write_cmd(TxRollback(sync=True), do_rb_ok)
+
+      def do_rb_ok():
+        del ssn.incoming[:]
+        del ssn.unacked[:]
+        del sst.acked[:]
+
+        for rcv in ssn.receivers:
+          rcv.impending = rcv.received
+          rcv.returned = rcv.received
+          # XXX: do we need to update granted here as well?
+
+        for rcv in ssn.receivers:
+          self.process_receiver(rcv)
+
+        ssn.aborting = False
+        ssn.aborted = True
+        ssn.committing = False
+        ssn.committed = False
+        sst.aborting = False
 
-    if ssn.aborting:
       for rcv in ssn.receivers:
-        _ssn.message_stop(rcv.destination)
-      _ssn.sync()
-
-      messages = _ssn.acked + ssn.unacked + ssn.incoming
-      ids = RangedSet(*[m._transfer_id for m in messages])
-      for range in ids:
-        _ssn.receiver._completed.add_range(range)
-      _ssn.channel.session_completed(_ssn.receiver._completed)
-      _ssn.message_release(ids)
-      _ssn.tx_rollback(sync=True)
-      _ssn.sync()
-
-      del ssn.incoming[:]
-      del ssn.unacked[:]
-      del _ssn.acked[:]
-
-      for rcv in ssn.receivers:
-        rcv.impending = rcv.received
-        rcv.returned = rcv.received
-        # XXX: do we need to update granted here as well?
-
-      for rcv in ssn.receivers:
-        self.process_receiver(rcv)
-
-      ssn.aborting = False
-      ssn.aborted = True
-      ssn.committing = False
-      ssn.committed = False
+        sst.write_cmd(MessageStop(rcv.destination))
+      sst.write_cmd(ExecutionSync(sync=True), do_rb)
 
   def grant(self, rcv):
-    _ssn = self._attachments[rcv.session]
+    sst = self._attachments[rcv.session]
     _rcv = self.link_in(rcv)
+    if _rcv is None or _rcv.draining:
+      return
 
     if rcv.granted is UNLIMITED:
       if rcv.impending is UNLIMITED:
@@ -343,29 +566,30 @@
       delta = max(rcv.granted, rcv.received) - rcv.impending
 
     if delta is UNLIMITED:
-      _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
-      _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, UNLIMITED.value)
+      sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value))
+      sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, UNLIMITED.value))
       rcv.impending = UNLIMITED
     elif delta > 0:
-      _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
-      _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, delta)
+      sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value))
+      sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, delta))
       rcv.impending += delta
     elif delta < 0:
+      _rcv.draining = True
+      def flush_stop_cmplt():
+        rcv.impending = rcv.received
+        _rcv.draining = False
+        self.grant(rcv)
       if rcv.drain:
-        _ssn.message_flush(rcv.destination, sync=True)
+        sst.write_cmd(MessageFlush(rcv.destination, sync=True), flush_stop_cmplt)
       else:
-        _ssn.message_stop(rcv.destination, sync=True)
-      # XXX: need to kill syncs
-      _ssn.sync()
-      rcv.impending = rcv.received
-      self.grant(rcv)
+        sst.write_cmd(MessageStop(rcv.destination, sync=True), flush_stop_cmplt)
 
   def process_receiver(self, rcv):
     if rcv.closed: return
     self.grant(rcv)
 
   def send(self, snd, msg):
-    _ssn = self._attachments[snd.session]
+    sst = self._attachments[snd.session]
     _snd = self.link_out(snd)
 
     # XXX: what if subject is specified for a normal queue?
@@ -375,16 +599,16 @@
       rk = _snd._routing_key
     # XXX: do we need to query to figure out how to create the reply-to interoperably?
     if msg.reply_to:
-      rt = _ssn.reply_to(*parse_addr(msg.reply_to))
+      rt = ReplyTo(*parse_addr(msg.reply_to))
     else:
       rt = None
-    dp = _ssn.delivery_properties(routing_key=rk)
-    mp = _ssn.message_properties(message_id=msg.id,
-                                 user_id=msg.user_id,
-                                 reply_to=rt,
-                                 correlation_id=msg.correlation_id,
-                                 content_type=msg.content_type,
-                                 application_headers=msg.properties)
+    dp = DeliveryProperties(routing_key=rk)
+    mp = MessageProperties(message_id=msg.id,
+                           user_id=msg.user_id,
+                           reply_to=rt,
+                           correlation_id=msg.correlation_id,
+                           content_type=msg.content_type,
+                           application_headers=msg.properties)
     if msg.subject is not None:
       if mp.application_headers is None:
         mp.application_headers = {}
@@ -397,37 +621,43 @@
       dp.delivery_mode = delivery_mode.persistent
     enc, dec = get_codec(msg.content_type)
     body = enc(msg.content)
-    _ssn.message_transfer(destination=_snd._exchange,
-                          message=Message010(dp, mp, body),
-                          sync=True)
-    log.debug("SENT [%s] %s", snd.session, msg)
-    # XXX: really need to make this async so that we don't give up the lock
-    _ssn.sync()
-    # XXX: should we log the ack somehow too?
-    snd.acked += 1
+    def msg_acked():
+      # XXX: should we log the ack somehow too?
+      snd.acked += 1
+      m = snd.session.outgoing.pop(0)
+      sst.outgoing_idx -= 1
+      assert msg == m
+    sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
+                                   payload=body, sync=True), msg_acked)
+
+  def do_message_transfer(self, xfr):
+    sst = self.get_sst(xfr)
+    ssn = sst.session
 
-  @synchronized
-  def _message_transfer(self, ssn, cmd):
-    m = Message010(cmd.payload)
-    m.headers = cmd.headers
-    m.id = cmd.id
-    msg = self._decode(m)
-    rcv = ssn.receivers[int(cmd.destination)]
+    msg = self._decode(xfr)
+    rcv = ssn.receivers[int(xfr.destination)]
     msg._receiver = rcv
     if rcv.impending is not UNLIMITED:
-      assert rcv.received < rcv.impending
+      assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
     rcv.received += 1
     log.debug("RECV [%s] %s", ssn, msg)
     ssn.incoming.append(msg)
     self.connection._waiter.notifyAll()
     return INCOMPLETE
 
-  def _decode(self, message):
-    dp = message.get("delivery_properties")
-    mp = message.get("message_properties")
+  def _decode(self, xfr):
+    dp = EMPTY_DP
+    mp = EMPTY_MP
+
+    for h in xfr.headers:
+      if isinstance(h, DeliveryProperties):
+        dp = h
+      elif isinstance(h, MessageProperties):
+        mp = h
+
     ap = mp.application_headers
     enc, dec = get_codec(mp.content_type)
-    content = dec(message.body)
+    content = dec(xfr.payload)
     msg = Message(content)
     msg.id = mp.message_id
     if ap is not None:
@@ -440,5 +670,5 @@
     msg.durable = dp.delivery_mode == delivery_mode.persistent
     msg.properties = mp.application_headers
     msg.content_type = mp.content_type
-    msg._transfer_id = message.id
+    msg._transfer_id = xfr.id
     return msg

Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=818075&r1=818074&r2=818075&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Wed Sep 23 12:30:49 2009
@@ -77,7 +77,8 @@
   """
 
   @static
-  def open(host, port=None):
+  def open(host, port=None, username="guest", password="guest",
+           mechanism="PLAIN", heartbeat=None):
     """
     Creates an AMQP connection and connects it to the given host and port.
 
@@ -88,11 +89,12 @@
     @rtype: Connection
     @return: a connected Connection
     """
-    conn = Connection(host, port)
+    conn = Connection(host, port, username, password, mechanism, heartbeat)
     conn.connect()
     return conn
 
-  def __init__(self, host, port=None):
+  def __init__(self, host, port=None, username="guest", password="guest",
+               mechanism="PLAIN", heartbeat=None):
     """
     Creates a connection. A newly created connection must be connected
     with the Connection.connect() method before it can be started.
@@ -106,6 +108,11 @@
     """
     self.host = host
     self.port = default(port, AMQP_PORT)
+    self.username = username
+    self.password = password
+    self.mechanism = mechanism
+    self.heartbeat = heartbeat
+
     self.started = False
     self.id = str(uuid4())
     self.session_counter = 0

Added: qpid/trunk/qpid/python/qpid/selector.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/selector.py?rev=818075&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qpid/selector.py (added)
+++ qpid/trunk/qpid/python/qpid/selector.py Wed Sep 23 12:30:49 2009
@@ -0,0 +1,157 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import atexit, os, time
+from select import select
+from threading import Thread, Lock
+
+class Acceptor:
+  """Selectable wrapping a listening socket: each pending connection is
+  accepted and handed to the supplied handler callable.
+
+  NOTE(review): Acceptor defines no timing() method, but Selector._update
+  and Selector.run call selectable.timing() on every registered
+  selectable -- confirm Acceptors gain timing() elsewhere before being
+  passed to Selector.register().
+  """
+
+  def __init__(self, sock, handler):
+    # sock: a bound, listening socket; handler: callable invoked with
+    # each newly accepted connection socket.
+    self.sock = sock
+    self.handler = handler
+
+  def fileno(self):
+    # Descriptor polled by select() in the Selector loop.
+    return self.sock.fileno()
+
+  def reading(self):
+    # Always interested in readability (a readable listening socket
+    # means a connection is pending).
+    return True
+
+  def writing(self):
+    # A listening socket is never waited on for writability.
+    return False
+
+  def readable(self):
+    # Invoked by the Selector loop: accept the pending connection and
+    # delegate it to the handler.
+    sock, addr = self.sock.accept()
+    self.handler(sock)
+
+class Sink:
+  """Read-only selectable that drains and discards bytes from a file
+  descriptor.  Used by Selector for the wait end of its wakeup pipe;
+  it is added straight to Selector.reading (never registered), so it
+  needs no writing()/timing() methods.
+  """
+
+  def __init__(self, fd):
+    # fd: a raw OS file descriptor (e.g. the read end of os.pipe()).
+    self.fd = fd
+
+  def fileno(self):
+    return self.fd
+
+  def reading(self):
+    # Always readable-interested so the pipe never fills up.
+    return True
+
+  def readable(self):
+    # Drain up to 64K and discard: the data itself is meaningless,
+    # only the wakeup side effect of the select() return matters.
+    os.read(self.fd, 65536)
+
+  def __repr__(self):
+    return "Sink(%r)" % self.fd
+
+class Selector:
+  """A select()-based event loop running in a daemon thread.
+
+  Registered selectables must provide fileno(), reading(), writing(),
+  timing(), and the corresponding readable()/writeable()/timeout()
+  callbacks; the loop dispatches to them as descriptors become ready
+  or deadlines expire.  An internal pipe (drained by a Sink) lets
+  other threads interrupt a blocking select() via wakeup().
+  """
+
+  # Class-level state guarding lazy creation of the shared instance.
+  lock = Lock()
+  DEFAULT = None
+
+  @staticmethod
+  def default():
+    # Return the process-wide singleton, creating and starting it on
+    # first use; atexit ensures its thread is stopped at interpreter
+    # shutdown.  The lock makes concurrent first calls safe.
+    Selector.lock.acquire()
+    try:
+      if Selector.DEFAULT is None:
+        sel = Selector()
+        atexit.register(sel.stop)
+        sel.start()
+        Selector.DEFAULT = sel
+      return Selector.DEFAULT
+    finally:
+      Selector.lock.release()
+
+  def __init__(self):
+    self.selectables = set()
+    self.reading = set()
+    self.writing = set()
+    # wait_fd/wakeup_fd: pipe used to break the loop out of select();
+    # the Sink silently drains whatever wakeup() writes.
+    self.wait_fd, self.wakeup_fd = os.pipe()
+    self.reading.add(Sink(self.wait_fd))
+    self.stopped = False
+    self.thread = None
+
+  def wakeup(self):
+    # Write one byte into the pipe so a blocked select() in run()
+    # returns.  select() on the write end first, and retry until the
+    # write actually succeeds (a full pipe would make write block or
+    # write zero bytes).
+    while True:
+      select([], [self.wakeup_fd], [])
+      if os.write(self.wakeup_fd, "\0") > 0:
+        break
+
+  def register(self, selectable):
+    # Add to the managed set and immediately sync its read/write
+    # interest (modify() also wakes the loop so the change is seen).
+    self.selectables.add(selectable)
+    self.modify(selectable)
+
+  def _update(self, selectable):
+    # Reconcile the reading/writing sets with the selectable's current
+    # interest; returns its next deadline (timing()), or None.
+    if selectable.reading():
+      self.reading.add(selectable)
+    else:
+      self.reading.discard(selectable)
+    if selectable.writing():
+      self.writing.add(selectable)
+    else:
+      self.writing.discard(selectable)
+    return selectable.timing()
+
+  def modify(self, selectable):
+    # Re-read a selectable's interest and interrupt select() so the
+    # loop picks up the change promptly.
+    self._update(selectable)
+    self.wakeup()
+
+  def unregister(self, selectable):
+    self.reading.discard(selectable)
+    self.writing.discard(selectable)
+    self.selectables.discard(selectable)
+    self.wakeup()
+
+  def start(self):
+    # Daemon thread: does not keep the process alive on its own;
+    # atexit-driven stop() handles orderly shutdown.
+    self.stopped = False
+    self.thread = Thread(target=self.run)
+    self.thread.setDaemon(True)
+    self.thread.start();
+
+  def run(self):
+    while not self.stopped:
+      # Refresh every selectable's interest and find the earliest
+      # deadline; iterate a copy since callbacks may mutate the set.
+      wakeup = None
+      for sel in self.selectables.copy():
+        t = self._update(sel)
+        if t is not None:
+          if wakeup is None:
+            wakeup = t
+          else:
+            wakeup = min(wakeup, t)
+
+      # Convert the absolute deadline into a select() timeout;
+      # None blocks indefinitely until I/O or a wakeup() byte.
+      if wakeup is None:
+        timeout = None
+      else:
+        timeout = max(0, wakeup - time.time())
+
+      rd, wr, ex = select(self.reading, self.writing, (), timeout)
+
+      # NOTE(review): exceptions raised by these callbacks propagate
+      # and terminate the loop thread -- confirm that is intended.
+      for sel in wr:
+        sel.writeable()
+
+      for sel in rd:
+        sel.readable()
+
+      # Fire timeout() on every selectable whose deadline has passed
+      # (strictly: now > deadline).
+      now = time.time()
+      for sel in self.selectables.copy():
+        w = sel.timing()
+        if w is not None and now > w:
+          sel.timeout()
+
+  def stop(self, timeout=None):
+    # Flag the loop to exit, unblock select(), and join the thread
+    # (bounded by timeout if given).
+    self.stopped = True
+    self.wakeup()
+    self.thread.join(timeout)
+    self.thread = None



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org