You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/09/23 14:30:50 UTC
svn commit: r818075 - in /qpid/trunk/qpid/python/qpid: address.py driver.py
messaging.py selector.py
Author: rhs
Date: Wed Sep 23 12:30:49 2009
New Revision: 818075
URL: http://svn.apache.org/viewvc?rev=818075&view=rev
Log:
switched API over to select based driver; added address parser
Added:
qpid/trunk/qpid/python/qpid/address.py
qpid/trunk/qpid/python/qpid/selector.py
Modified:
qpid/trunk/qpid/python/qpid/driver.py
qpid/trunk/qpid/python/qpid/messaging.py
Added: qpid/trunk/qpid/python/qpid/address.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/address.py?rev=818075&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qpid/address.py (added)
+++ qpid/trunk/qpid/python/qpid/address.py Wed Sep 23 12:30:49 2009
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import re
+
+TYPES = []
+
+class Type:
+
+ def __init__(self, name, pattern=None):
+ self.name = name
+ self.pattern = pattern
+ if self.pattern:
+ TYPES.append(self)
+
+ def __repr__(self):
+ return self.name
+
+LBRACE = Type("LBRACE", r"\{")
+RBRACE = Type("RBRACE", r"\}")
+COLON = Type("COLON", r":")
+COMMA = Type("COMMA", r",")
+ID = Type("ID", r'[a-zA-Z_][a-zA-Z0-9_]*')
+NUMBER = Type("NUMBER", r'[+-]?[0-9]*\.?[0-9]+')
+STRING = Type("STRING", r""""(?:[^\\"]|\\.)*"|'(?:[^\\']|\\.)*'""")
+WSPACE = Type("WSPACE", r"[ \n\r\t]+")
+EOF = Type("EOF")
+
+class Token:
+
+ def __init__(self, type, value):
+ self.type = type
+ self.value = value
+
+ def __repr__(self):
+ return "%s: %r" % (self.type, self.value)
+
+joined = "|".join(["(%s)" % t.pattern for t in TYPES])
+LEXER = re.compile(joined)
+
+def lex(st):
+ pos = 0
+ while pos < len(st):
+ m = LEXER.match(st, pos)
+ if m is None:
+ raise ValueError(repr(st[pos:]))
+ else:
+ idx = m.lastindex
+ t = Token(TYPES[idx - 1], m.group(idx))
+ yield t
+ pos = m.end()
+ yield Token(EOF, None)
+
+class ParseError(Exception): pass
+
+class EOF(Exception): pass
+
+class Parser:
+
+ def __init__(self, tokens):
+ self.tokens = [t for t in tokens if t.type is not WSPACE]
+ self.idx = 0
+
+ def next(self):
+ return self.tokens[self.idx]
+
+ def matches(self, *types):
+ return self.next().type in types
+
+ def eat(self, *types):
+ if types and not self.matches(*types):
+ raise ParseError("expecting %s -- got %s" % (", ".join(map(str, types)), self.next()))
+ else:
+ t = self.next()
+ self.idx += 1
+ return t
+
+ def parse(self):
+ result = self.address()
+ self.eat(EOF)
+ return result
+
+ def address(self):
+ name = self.eat(ID).value
+ if self.matches(LBRACE):
+ options = self.map()
+ else:
+ options = None
+ return name, options
+
+ def map(self):
+ self.eat(LBRACE)
+ result = {}
+ while True:
+ if self.matches(RBRACE):
+ self.eat(RBRACE)
+ break
+ else:
+ if self.matches(ID):
+ n, v = self.nameval()
+ result[n] = v
+ elif self.matches(COMMA):
+ self.eat(COMMA)
+ else:
+ raise ParseError("expecting (ID, COMMA), got %s" % self.next())
+ return result
+
+ def nameval(self):
+ name = self.eat(ID).value
+ self.eat(COLON)
+ val = self.value()
+ return (name, val)
+
+ def value(self):
+ if self.matches(NUMBER, STRING):
+ return eval(self.eat().value)
+ elif self.matches(LBRACE):
+ return self.map()
+ else:
+ raise ParseError("expecting (NUMBER, STRING, LBRACE) got %s" % self.next())
+
+def parse(addr):
+ return Parser(lex(addr)).parse()
+
+__all__ = ["parse"]
Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=818075&r1=818074&r2=818075&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Wed Sep 23 12:30:49 2009
@@ -17,13 +17,15 @@
# under the License.
#
-import compat, connection, socket, sys, time
+import compat, connection, socket, struct, sys, time
from concurrency import synchronized
-from datatypes import RangedSet, Message as Message010
-from exceptions import Timeout
+from datatypes import RangedSet, Serial
+from exceptions import Timeout, VersionError
+from framing import OpEncoder, SegmentEncoder, FrameEncoder, FrameDecoder, SegmentDecoder, OpDecoder
from logging import getLogger
from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED
-from ops import delivery_mode
+from ops import *
+from selector import Selector
from session import Client, INCOMPLETE, SessionDetached
from threading import Condition, Thread
from util import connect
@@ -50,145 +52,361 @@
def __init__(self, target):
self.target = target
+# XXX
+
DURABLE_DEFAULT=True
+# XXX
+
FILTER_DEFAULTS = {
"topic": Pattern("*")
}
-def delegate(handler, session):
- class Delegate(Client):
+# XXX
+
+CLIENT_PROPERTIES = {"product": "qpid python client",
+ "version": "development",
+ "platform": os.name,
+ "qpid.client_process": os.path.basename(sys.argv[0]),
+ "qpid.client_pid": os.getpid(),
+ "qpid.client_ppid": os.getppid()}
+
+def noop(): pass
+
+class SessionState:
+
+ def __init__(self, driver, session, name, channel):
+ self.driver = driver
+ self.session = session
+ self.name = name
+ self.channel = channel
+ self.detached = False
+ self.committing = False
+ self.aborting = False
+
+ # sender state
+ self.sent = Serial(0)
+ self.acknowledged = RangedSet()
+ self.completions = {}
+ self.min_completion = self.sent
+ self.max_completion = self.sent
+
+ # receiver state
+ self.received = None
+ self.executed = RangedSet()
+
+ # XXX: need to periodically exchange completion/known_completion
+
+ def write_cmd(self, cmd, completion=noop):
+ cmd.id = self.sent
+ self.sent += 1
+ self.completions[cmd.id] = completion
+ self.max_completion = cmd.id
+ self.write_op(cmd)
+
+ def write_op(self, op):
+ op.channel = self.channel
+ self.driver.write_op(op)
- def message_transfer(self, cmd):
- return handler._message_transfer(session, cmd)
- return Delegate
+# XXX
+HEADER="!4s4B"
+
+EMPTY_DP = DeliveryProperties()
+EMPTY_MP = MessageProperties()
class Driver:
def __init__(self, connection):
self.connection = connection
self._lock = self.connection._lock
- self._wakeup_cond = Condition()
- self._socket = None
- self._conn = None
+
+ self._selector = Selector.default()
+ self.reset()
+
+ def reset(self):
+ self._opening = False
+ self._closing = False
self._connected = False
self._attachments = {}
- self._modcount = self.connection._modcount
- self.thread = Thread(target=self.run)
- self.thread.setDaemon(True)
- # XXX: need to figure out how to join on this thread
+ self._channel_max = 65536
+ self._channels = 0
+ self._sessions = {}
+
+ self._socket = None
+ self._buf = ""
+ self._hdr = ""
+ self._op_enc = OpEncoder()
+ self._seg_enc = SegmentEncoder()
+ self._frame_enc = FrameEncoder()
+ self._frame_dec = FrameDecoder()
+ self._seg_dec = SegmentDecoder()
+ self._op_dec = OpDecoder()
+ self._timeout = None
+
+ for ssn in self.connection.sessions.values():
+ for m in ssn.acked + ssn.unacked + ssn.incoming:
+ m._transfer_id = None
+ for rcv in ssn.receivers:
+ rcv.impending = rcv.received
+
+ @synchronized
def wakeup(self):
- self._wakeup_cond.acquire()
- try:
- self._wakeup_cond.notifyAll()
- finally:
- self._wakeup_cond.release()
+ self.dispatch()
+ self._selector.wakeup()
def start(self):
- self.thread.start()
+ self._selector.register(self)
- def run(self):
- while True:
- self._wakeup_cond.acquire()
+ def fileno(self):
+ return self._socket.fileno()
+
+ @synchronized
+ def reading(self):
+ return self._socket is not None
+
+ @synchronized
+ def writing(self):
+ return self._socket is not None and self._buf
+
+ @synchronized
+ def timing(self):
+ return self._timeout
+
+ @synchronized
+ def readable(self):
+ error = None
+ recoverable = False
+ try:
+ data = self._socket.recv(64*1024)
+ if data:
+ log.debug("READ: %r", data)
+ else:
+ error = ("connection aborted",)
+ recoverable = True
+ except socket.error, e:
+ error = (e,)
+ recoverable = True
+
+ if not error:
try:
- if self.connection._modcount <= self._modcount:
- self._wakeup_cond.wait(10)
- finally:
- self._wakeup_cond.release()
- self.dispatch(self.connection._modcount)
+ if len(self._hdr) < 8:
+ r = 8 - len(self._hdr)
+ self._hdr += data[:r]
+ data = data[r:]
+
+ if len(self._hdr) == 8:
+ self.do_header(self._hdr)
+
+ self._frame_dec.write(data)
+ self._seg_dec.write(*self._frame_dec.read())
+ self._op_dec.write(*self._seg_dec.read())
+ for op in self._op_dec.read():
+ self.assign_id(op)
+ log.debug("RCVD: %r", op)
+ op.dispatch(self)
+ except VersionError, e:
+ error = (e,)
+ except:
+ msg = compat.format_exc()
+ error = (msg,)
+
+ if error:
+ self._socket.close()
+ self.reset()
+ if recoverable and self.connection.reconnect:
+ self._timeout = time.time() + 3
+ log.warn("recoverable error: %s" % error)
+ log.warn("sleeping 3 seconds")
+ else:
+ self.connection.error = error
+
+ self.connection._waiter.notifyAll()
+
+ def assign_id(self, op):
+ if isinstance(op, Command):
+ sst = self.get_sst(op)
+ op.id = sst.received
+ sst.received += 1
+
+ @synchronized
+ def writeable(self):
+ n = self._socket.send(self._buf)
+ log.debug("SENT: %r", self._buf[:n])
+ self._buf = self._buf[n:]
@synchronized
- def dispatch(self, modcount):
+ def timeout(self):
+ log.warn("retrying ...")
+ self.dispatch()
+ self.connection._waiter.notifyAll()
+
+ def write_op(self, op):
+ log.debug("SENT: %r", op)
+ self._op_enc.write(op)
+ self._seg_enc.write(*self._op_enc.read())
+ self._frame_enc.write(*self._seg_enc.read())
+ self._buf += self._frame_enc.read()
+
+ def do_header(self, hdr):
+ cli_major = 0; cli_minor = 10
+ magic, _, _, major, minor = struct.unpack(HEADER, hdr)
+ if major != cli_major or minor != cli_minor:
+ raise VersionError("client: %s-%s, server: %s-%s" %
+ (cli_major, cli_minor, major, minor))
+
+ def do_connection_start(self, start):
+ # XXX: should we use some sort of callback for this?
+ r = "\0%s\0%s" % (self.connection.username, self.connection.password)
+ m = self.connection.mechanism
+ self.write_op(ConnectionStartOk(client_properties=CLIENT_PROPERTIES,
+ mechanism=m, response=r))
+
+ def do_connection_tune(self, tune):
+ # XXX: is heartbeat protocol specific?
+ if tune.channel_max is not None:
+ self.channel_max = tune.channel_max
+ self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat,
+ channel_max=self.channel_max))
+ self.write_op(ConnectionOpen())
+
+ def do_connection_open_ok(self, open_ok):
+ self._connected = True
+ # XXX: maybe think about a more generic way to catchup with
+ # deferred work
+ self.dispatch()
+
+ def connection_heartbeat(self, hrt):
+ self.write_op(ConnectionHeartbeat())
+
+ def do_connection_close(self, close):
+ self.write_op(ConnectionCloseOk())
+ if close.reply_ok != close_code.normal:
+ self.connection.error = (close.reply_code, close.reply_text)
+ # XXX: should we do a half shutdown on the socket here?
+ # XXX: we really need to test this, we may end up reporting a
+ # connection abort after this, if we were to do a shutdown on read
+ # and stop reading, then we wouldn't report the abort, that's
+ # probably the right thing to do
+
+ def do_connection_close_ok(self, close_ok):
+ self._socket.close()
+ self.reset()
+
+ def do_session_attached(self, atc):
+ pass
+
+ def do_session_command_point(self, cp):
+ sst = self.get_sst(cp)
+ sst.received = cp.command_id
+
+ def do_session_completed(self, sc):
+ sst = self.get_sst(sc)
+ for r in sc.commands:
+ sst.acknowledged.add(r.lower, r.upper)
+
+ if not sc.commands.empty():
+ while sst.min_completion in sc.commands:
+ if sst.completions.has_key(sst.min_completion):
+ sst.completions.pop(sst.min_completion)()
+ sst.min_completion += 1
+
+ def session_known_completed(self, kcmp):
+ sst = self.get_sst(kcmp)
+ executed = RangedSet()
+ for e in sst.executed.ranges:
+ for ke in kcmp.ranges:
+ if e.lower in ke and e.upper in ke:
+ break
+ else:
+ executed.add_range(e)
+ sst.executed = completed
+
+ def do_session_flush(self, sf):
+ sst = self.get_sst(sf)
+ if sf.expected:
+ if sst.received is None:
+ exp = None
+ else:
+ exp = RangedSet(sst.received)
+ sst.write_op(SessionExpected(exp))
+ if sf.confirmed:
+ sst.write_op(SessionConfirmed(sst.executed))
+ if sf.completed:
+ sst.write_op(SessionCompleted(sst.executed))
+
+ def dispatch(self):
try:
- if self._conn is None and self.connection._connected:
+ if self._socket is None and self.connection._connected and not self._opening:
self.connect()
- elif self._conn is not None and not self.connection._connected:
+ elif self._socket is not None and not self.connection._connected and not self._closing:
self.disconnect()
- if self._conn is not None:
+ if self._connected and not self._closing:
for ssn in self.connection.sessions.values():
self.attach(ssn)
self.process(ssn)
-
- exi = None
except:
- exi = sys.exc_info()
-
- if exi:
msg = compat.format_exc()
- recoverable = ["aborted", "Connection refused", "SessionDetached", "Connection reset by peer",
- "Bad file descriptor", "start timed out", "Broken pipe"]
- for r in recoverable:
- if self.connection.reconnect and r in msg:
- print "waiting to retry"
- self.reset()
- time.sleep(3)
- print "retrying..."
- return
- else:
- self.connection.error = (msg,)
-
- self._modcount = modcount
- self.connection._waiter.notifyAll()
+ self.connection.error = (msg,)
def connect(self):
- if self._conn is not None:
- return
try:
+ # XXX: should make this non blocking
self._socket = connect(self.connection.host, self.connection.port)
+ self._timeout = None
except socket.error, e:
- raise ConnectError(e)
- self._conn = connection.Connection(self._socket)
- try:
- self._conn.start(timeout=10)
- self._connected = True
- except connection.VersionError, e:
- raise ConnectError(e)
- except Timeout:
- print "start timed out"
- raise ConnectError("start timed out")
+ if self.connection.reconnect:
+ self.reset()
+ self._timeout = time.time() + 3
+ log.warn("recoverable error: %s", e)
+ log.warn("sleeping 3 seconds")
+ return
+ else:
+ raise e
+ self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
+ self._opening = True
def disconnect(self):
- self._conn.close()
- self.reset()
-
- def reset(self):
- self._conn = None
- self._connected = False
- self._attachments.clear()
- for ssn in self.connection.sessions.values():
- for m in ssn.acked + ssn.unacked + ssn.incoming:
- m._transfer_id = None
- for rcv in ssn.receivers:
- rcv.impending = rcv.received
-
- def connected(self):
- return self._conn is not None
+ self.write_op(ConnectionClose(close_code.normal))
+ self._closing = True
def attach(self, ssn):
- _ssn = self._attachments.get(ssn)
- if _ssn is None:
- _ssn = self._conn.session(ssn.name, delegate=delegate(self, ssn))
- _ssn.auto_sync = False
- _ssn.invoke_lock = self._lock
- _ssn.lock = self._lock
- _ssn.condition = self.connection._condition
+ sst = self._attachments.get(ssn)
+ if sst is None:
+ for i in xrange(0, self.channel_max):
+ if not self._sessions.has_key(i):
+ ch = i
+ break
+ else:
+ raise RuntimeError("all channels used")
+ sst = SessionState(self, ssn, ssn.name, ch)
+ sst.write_op(SessionAttach(name=ssn.name))
+ sst.write_op(SessionCommandPoint(sst.sent, 0))
+ sst.outgoing_idx = 0
+ sst.acked = []
if ssn.transactional:
- # XXX: adding an attribute to qpid.session.Session
- _ssn.acked = []
- _ssn.tx_select()
- self._attachments[ssn] = _ssn
+ sst.write_cmd(TxSelect())
+ self._attachments[ssn] = sst
+ self._sessions[sst.channel] = sst
for snd in ssn.senders:
self.link_out(snd)
for rcv in ssn.receivers:
self.link_in(rcv)
- if ssn.closing:
- _ssn.close()
- del self._attachments[ssn]
- ssn.closed = True
+ if ssn.closing and not sst.detached:
+ sst.detached = True
+ sst.write_op(SessionDetach(name=ssn.name))
+
+ def get_sst(self, op):
+ return self._sessions[op.channel]
+
+ def do_session_detached(self, dtc):
+ sst = self._sessions.pop(dtc.channel)
+ ssn = sst.session
+ del self._attachments[ssn]
+ ssn.closed = True
def _exchange_query(self, ssn, address):
# XXX: auto sync hack is to avoid deadlock on future
@@ -197,16 +415,16 @@
return result.get()
def link_out(self, snd):
- _ssn = self._attachments[snd.session]
+ sst = self._attachments[snd.session]
_snd = self._attachments.get(snd)
if _snd is None:
_snd = Attachment(snd)
node, _snd._subject = parse_addr(snd.target)
- result = self._exchange_query(_ssn, node)
- if result.not_found:
+ # XXX: result = self._exchange_query(sst, node)
+# if result.not_found:
+ if True:
# XXX: should check 'create' option
- _ssn.queue_declare(queue=node, durable=DURABLE_DEFAULT, sync=True)
- _ssn.sync()
+ sst.write_cmd(QueueDeclare(queue=node, durable=DURABLE_DEFAULT))
_snd._exchange = ""
_snd._routing_key = node
else:
@@ -221,35 +439,37 @@
return _snd
def link_in(self, rcv):
- _ssn = self._attachments[rcv.session]
+ sst = self._attachments[rcv.session]
_rcv = self._attachments.get(rcv)
if _rcv is None:
_rcv = Attachment(rcv)
- result = self._exchange_query(_ssn, rcv.source)
- if result.not_found:
+ # XXX: result = self._exchange_query(sst, rcv.source)
+# if result.not_found:
+ _rcv.canceled = False
+ _rcv.draining = False
+ if True:
_rcv._queue = rcv.source
# XXX: should check 'create' option
- _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT)
+ sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT))
else:
_rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
- _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)
+ sst.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)
if rcv.filter is None:
f = FILTER_DEFAULTS[result.type]
else:
f = rcv.filter
- f._bind(_ssn, rcv.source, _rcv._queue)
- _ssn.message_subscribe(queue=_rcv._queue, destination=rcv.destination)
- _ssn.message_set_flow_mode(rcv.destination, _ssn.flow_mode.credit, sync=True)
+ f._bind(sst, rcv.source, _rcv._queue)
+ sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
+ sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
self._attachments[rcv] = _rcv
- # XXX: need to kill syncs
- _ssn.sync()
if rcv.closing:
- _ssn.message_cancel(rcv.destination, sync=True)
- # XXX: need to kill syncs
- _ssn.sync()
- del self._attachments[rcv]
- rcv.closed = True
+ if not _rcv.canceled:
+ def close_rcv():
+ del self._attachments[rcv]
+ rcv.closed = True
+ sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv)
+ _rcv.canceled = True
return None
else:
return _rcv
@@ -257,80 +477,83 @@
def process(self, ssn):
if ssn.closing: return
- _ssn = self._attachments[ssn]
+ sst = self._attachments[ssn]
- while ssn.outgoing:
- msg = ssn.outgoing[0]
+ while sst.outgoing_idx < len(ssn.outgoing):
+ msg = ssn.outgoing[sst.outgoing_idx]
snd = msg._sender
self.send(snd, msg)
- ssn.outgoing.pop(0)
+ sst.outgoing_idx += 1
for rcv in ssn.receivers:
self.process_receiver(rcv)
if ssn.acked:
- messages = ssn.acked[:]
- ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
- for range in ids:
- _ssn.receiver._completed.add_range(range)
- ch = _ssn.channel
- if ch is None:
- raise SessionDetached()
- ch.session_completed(_ssn.receiver._completed)
- _ssn.message_accept(ids, sync=True)
- # XXX: really need to make this async so that we don't give up the lock
- _ssn.sync()
-
- # XXX: we're ignoring acks that get lost when disconnected
- for m in messages:
- ssn.acked.remove(m)
- if ssn.transactional:
- _ssn.acked.append(m)
-
- if ssn.committing:
- _ssn.tx_commit(sync=True)
- # XXX: need to kill syncs
- _ssn.sync()
- del _ssn.acked[:]
- ssn.committing = False
- ssn.committed = True
- ssn.aborting = False
- ssn.aborted = False
+ messages = [m for m in ssn.acked if m not in sst.acked]
+ if messages:
+ # XXX: we're ignoring acks that get lost when disconnected,
+ # could we deal this via some message-id based purge?
+ ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
+ for range in ids:
+ sst.executed.add_range(range)
+ sst.write_op(SessionCompleted(sst.executed))
+ def ack_ack():
+ for m in messages:
+ ssn.acked.remove(m)
+ if not ssn.transactional:
+ sst.acked.remove(m)
+ sst.write_cmd(MessageAccept(ids, sync=True), ack_ack)
+ sst.acked.extend(messages)
+
+ if ssn.committing and not sst.committing:
+ def commit_ok():
+ del sst.acked[:]
+ ssn.committing = False
+ ssn.committed = True
+ ssn.aborting = False
+ ssn.aborted = False
+ sst.write_cmd(TxCommit(sync=True), commit_ok)
+ sst.committing = True
+
+ if ssn.aborting and not sst.aborting:
+ sst.aborting = True
+ def do_rb():
+ messages = sst.acked + ssn.unacked + ssn.incoming
+ ids = RangedSet(*[m._transfer_id for m in messages])
+ for range in ids:
+ sst.executed.add_range(range)
+ sst.write_op(SessionCompleted(sst.executed))
+ sst.write_cmd(MessageRelease(ids))
+ sst.write_cmd(TxRollback(sync=True), do_rb_ok)
+
+ def do_rb_ok():
+ del ssn.incoming[:]
+ del ssn.unacked[:]
+ del sst.acked[:]
+
+ for rcv in ssn.receivers:
+ rcv.impending = rcv.received
+ rcv.returned = rcv.received
+ # XXX: do we need to update granted here as well?
+
+ for rcv in ssn.receivers:
+ self.process_receiver(rcv)
+
+ ssn.aborting = False
+ ssn.aborted = True
+ ssn.committing = False
+ ssn.committed = False
+ sst.aborting = False
- if ssn.aborting:
for rcv in ssn.receivers:
- _ssn.message_stop(rcv.destination)
- _ssn.sync()
-
- messages = _ssn.acked + ssn.unacked + ssn.incoming
- ids = RangedSet(*[m._transfer_id for m in messages])
- for range in ids:
- _ssn.receiver._completed.add_range(range)
- _ssn.channel.session_completed(_ssn.receiver._completed)
- _ssn.message_release(ids)
- _ssn.tx_rollback(sync=True)
- _ssn.sync()
-
- del ssn.incoming[:]
- del ssn.unacked[:]
- del _ssn.acked[:]
-
- for rcv in ssn.receivers:
- rcv.impending = rcv.received
- rcv.returned = rcv.received
- # XXX: do we need to update granted here as well?
-
- for rcv in ssn.receivers:
- self.process_receiver(rcv)
-
- ssn.aborting = False
- ssn.aborted = True
- ssn.committing = False
- ssn.committed = False
+ sst.write_cmd(MessageStop(rcv.destination))
+ sst.write_cmd(ExecutionSync(sync=True), do_rb)
def grant(self, rcv):
- _ssn = self._attachments[rcv.session]
+ sst = self._attachments[rcv.session]
_rcv = self.link_in(rcv)
+ if _rcv is None or _rcv.draining:
+ return
if rcv.granted is UNLIMITED:
if rcv.impending is UNLIMITED:
@@ -343,29 +566,30 @@
delta = max(rcv.granted, rcv.received) - rcv.impending
if delta is UNLIMITED:
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, UNLIMITED.value)
+ sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value))
+ sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, UNLIMITED.value))
rcv.impending = UNLIMITED
elif delta > 0:
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, delta)
+ sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value))
+ sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, delta))
rcv.impending += delta
elif delta < 0:
+ _rcv.draining = True
+ def flush_stop_cmplt():
+ rcv.impending = rcv.received
+ _rcv.draining = False
+ self.grant(rcv)
if rcv.drain:
- _ssn.message_flush(rcv.destination, sync=True)
+ sst.write_cmd(MessageFlush(rcv.destination, sync=True), flush_stop_cmplt)
else:
- _ssn.message_stop(rcv.destination, sync=True)
- # XXX: need to kill syncs
- _ssn.sync()
- rcv.impending = rcv.received
- self.grant(rcv)
+ sst.write_cmd(MessageStop(rcv.destination, sync=True), flush_stop_cmplt)
def process_receiver(self, rcv):
if rcv.closed: return
self.grant(rcv)
def send(self, snd, msg):
- _ssn = self._attachments[snd.session]
+ sst = self._attachments[snd.session]
_snd = self.link_out(snd)
# XXX: what if subject is specified for a normal queue?
@@ -375,16 +599,16 @@
rk = _snd._routing_key
# XXX: do we need to query to figure out how to create the reply-to interoperably?
if msg.reply_to:
- rt = _ssn.reply_to(*parse_addr(msg.reply_to))
+ rt = ReplyTo(*parse_addr(msg.reply_to))
else:
rt = None
- dp = _ssn.delivery_properties(routing_key=rk)
- mp = _ssn.message_properties(message_id=msg.id,
- user_id=msg.user_id,
- reply_to=rt,
- correlation_id=msg.correlation_id,
- content_type=msg.content_type,
- application_headers=msg.properties)
+ dp = DeliveryProperties(routing_key=rk)
+ mp = MessageProperties(message_id=msg.id,
+ user_id=msg.user_id,
+ reply_to=rt,
+ correlation_id=msg.correlation_id,
+ content_type=msg.content_type,
+ application_headers=msg.properties)
if msg.subject is not None:
if mp.application_headers is None:
mp.application_headers = {}
@@ -397,37 +621,43 @@
dp.delivery_mode = delivery_mode.persistent
enc, dec = get_codec(msg.content_type)
body = enc(msg.content)
- _ssn.message_transfer(destination=_snd._exchange,
- message=Message010(dp, mp, body),
- sync=True)
- log.debug("SENT [%s] %s", snd.session, msg)
- # XXX: really need to make this async so that we don't give up the lock
- _ssn.sync()
- # XXX: should we log the ack somehow too?
- snd.acked += 1
+ def msg_acked():
+ # XXX: should we log the ack somehow too?
+ snd.acked += 1
+ m = snd.session.outgoing.pop(0)
+ sst.outgoing_idx -= 1
+ assert msg == m
+ sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
+ payload=body, sync=True), msg_acked)
+
+ def do_message_transfer(self, xfr):
+ sst = self.get_sst(xfr)
+ ssn = sst.session
- @synchronized
- def _message_transfer(self, ssn, cmd):
- m = Message010(cmd.payload)
- m.headers = cmd.headers
- m.id = cmd.id
- msg = self._decode(m)
- rcv = ssn.receivers[int(cmd.destination)]
+ msg = self._decode(xfr)
+ rcv = ssn.receivers[int(xfr.destination)]
msg._receiver = rcv
if rcv.impending is not UNLIMITED:
- assert rcv.received < rcv.impending
+ assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
rcv.received += 1
log.debug("RECV [%s] %s", ssn, msg)
ssn.incoming.append(msg)
self.connection._waiter.notifyAll()
return INCOMPLETE
- def _decode(self, message):
- dp = message.get("delivery_properties")
- mp = message.get("message_properties")
+ def _decode(self, xfr):
+ dp = EMPTY_DP
+ mp = EMPTY_MP
+
+ for h in xfr.headers:
+ if isinstance(h, DeliveryProperties):
+ dp = h
+ elif isinstance(h, MessageProperties):
+ mp = h
+
ap = mp.application_headers
enc, dec = get_codec(mp.content_type)
- content = dec(message.body)
+ content = dec(xfr.payload)
msg = Message(content)
msg.id = mp.message_id
if ap is not None:
@@ -440,5 +670,5 @@
msg.durable = dp.delivery_mode == delivery_mode.persistent
msg.properties = mp.application_headers
msg.content_type = mp.content_type
- msg._transfer_id = message.id
+ msg._transfer_id = xfr.id
return msg
Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=818075&r1=818074&r2=818075&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Wed Sep 23 12:30:49 2009
@@ -77,7 +77,8 @@
"""
@static
- def open(host, port=None):
+ def open(host, port=None, username="guest", password="guest",
+ mechanism="PLAIN", heartbeat=None):
"""
Creates an AMQP connection and connects it to the given host and port.
@@ -88,11 +89,12 @@
@rtype: Connection
@return: a connected Connection
"""
- conn = Connection(host, port)
+ conn = Connection(host, port, username, password, mechanism, heartbeat)
conn.connect()
return conn
- def __init__(self, host, port=None):
+ def __init__(self, host, port=None, username="guest", password="guest",
+ mechanism="PLAIN", heartbeat=None):
"""
Creates a connection. A newly created connection must be connected
with the Connection.connect() method before it can be started.
@@ -106,6 +108,11 @@
"""
self.host = host
self.port = default(port, AMQP_PORT)
+ self.username = username
+ self.password = password
+ self.mechanism = mechanism
+ self.heartbeat = heartbeat
+
self.started = False
self.id = str(uuid4())
self.session_counter = 0
Added: qpid/trunk/qpid/python/qpid/selector.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/selector.py?rev=818075&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qpid/selector.py (added)
+++ qpid/trunk/qpid/python/qpid/selector.py Wed Sep 23 12:30:49 2009
@@ -0,0 +1,157 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import atexit, os, time
+from select import select
+from threading import Thread, Lock
+
+class Acceptor:
+
+ def __init__(self, sock, handler):
+ self.sock = sock
+ self.handler = handler
+
+ def fileno(self):
+ return self.sock.fileno()
+
+ def reading(self):
+ return True
+
+ def writing(self):
+ return False
+
+ def readable(self):
+ sock, addr = self.sock.accept()
+ self.handler(sock)
+
+class Sink:
+
+ def __init__(self, fd):
+ self.fd = fd
+
+ def fileno(self):
+ return self.fd
+
+ def reading(self):
+ return True
+
+ def readable(self):
+ os.read(self.fd, 65536)
+
+ def __repr__(self):
+ return "Sink(%r)" % self.fd
+
+class Selector:
+
+ lock = Lock()
+ DEFAULT = None
+
+ @staticmethod
+ def default():
+ Selector.lock.acquire()
+ try:
+ if Selector.DEFAULT is None:
+ sel = Selector()
+ atexit.register(sel.stop)
+ sel.start()
+ Selector.DEFAULT = sel
+ return Selector.DEFAULT
+ finally:
+ Selector.lock.release()
+
+ def __init__(self):
+ self.selectables = set()
+ self.reading = set()
+ self.writing = set()
+ self.wait_fd, self.wakeup_fd = os.pipe()
+ self.reading.add(Sink(self.wait_fd))
+ self.stopped = False
+ self.thread = None
+
+ def wakeup(self):
+ while True:
+ select([], [self.wakeup_fd], [])
+ if os.write(self.wakeup_fd, "\0") > 0:
+ break
+
+ def register(self, selectable):
+ self.selectables.add(selectable)
+ self.modify(selectable)
+
+ def _update(self, selectable):
+ if selectable.reading():
+ self.reading.add(selectable)
+ else:
+ self.reading.discard(selectable)
+ if selectable.writing():
+ self.writing.add(selectable)
+ else:
+ self.writing.discard(selectable)
+ return selectable.timing()
+
+ def modify(self, selectable):
+ self._update(selectable)
+ self.wakeup()
+
+ def unregister(self, selectable):
+ self.reading.discard(selectable)
+ self.writing.discard(selectable)
+ self.selectables.discard(selectable)
+ self.wakeup()
+
+ def start(self):
+ self.stopped = False
+ self.thread = Thread(target=self.run)
+ self.thread.setDaemon(True)
+ self.thread.start();
+
+ def run(self):
+ while not self.stopped:
+ wakeup = None
+ for sel in self.selectables.copy():
+ t = self._update(sel)
+ if t is not None:
+ if wakeup is None:
+ wakeup = t
+ else:
+ wakeup = min(wakeup, t)
+
+ if wakeup is None:
+ timeout = None
+ else:
+ timeout = max(0, wakeup - time.time())
+
+ rd, wr, ex = select(self.reading, self.writing, (), timeout)
+
+ for sel in wr:
+ sel.writeable()
+
+ for sel in rd:
+ sel.readable()
+
+ now = time.time()
+ for sel in self.selectables.copy():
+ w = sel.timing()
+ if w is not None and now > w:
+ sel.timeout()
+
+ def stop(self, timeout=None):
+ self.stopped = True
+ self.wakeup()
+ self.thread.join(timeout)
+ self.thread = None
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org