You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/10/12 01:22:20 UTC
svn commit: r824198 [9/9] - in /qpid/branches/java-network-refactor: ./
qpid/cpp/ qpid/cpp/bindings/qmf/ qpid/cpp/bindings/qmf/python/
qpid/cpp/bindings/qmf/ruby/ qpid/cpp/bindings/qmf/tests/
qpid/cpp/boost-1.32-support/ qpid/cpp/etc/ qpid/cpp/examples...
Modified: qpid/branches/java-network-refactor/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/qpid/driver.py?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/qpid/driver.py (original)
+++ qpid/branches/java-network-refactor/qpid/python/qpid/driver.py Sun Oct 11 23:22:08 2009
@@ -17,25 +17,23 @@
# under the License.
#
-import compat, connection, socket, sys, time
+import address, compat, connection, socket, struct, sys, time
from concurrency import synchronized
-from datatypes import RangedSet, Message as Message010
-from exceptions import Timeout
+from datatypes import RangedSet, Serial
+from exceptions import Timeout, VersionError
+from framing import OpEncoder, SegmentEncoder, FrameEncoder, FrameDecoder, SegmentDecoder, OpDecoder
from logging import getLogger
from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED
-from ops import delivery_mode
-from session import Client, INCOMPLETE, SessionDetached
+from ops import *
+from selector import Selector
from threading import Condition, Thread
from util import connect
log = getLogger("qpid.messaging")
-def parse_addr(address):
- parts = address.split("/", 1)
- if len(parts) == 1:
- return parts[0], None
- else:
- return parts[0], parts[i1]
+def addr2reply_to(addr):
+ name, subject, options = address.parse(addr)
+ return ReplyTo(name, subject)
def reply_to2addr(reply_to):
if reply_to.routing_key is None:
@@ -50,287 +48,617 @@
def __init__(self, target):
self.target = target
+# XXX
+
DURABLE_DEFAULT=True
+# XXX
+
FILTER_DEFAULTS = {
"topic": Pattern("*")
}
-def delegate(handler, session):
- class Delegate(Client):
+# XXX
+
+CLIENT_PROPERTIES = {"product": "qpid python client",
+ "version": "development",
+ "platform": os.name,
+ "qpid.client_process": os.path.basename(sys.argv[0]),
+ "qpid.client_pid": os.getpid(),
+ "qpid.client_ppid": os.getppid()}
+
+def noop(): pass
+
+class SessionState:
+
+ def __init__(self, driver, session, name, channel):
+ self.driver = driver
+ self.session = session
+ self.name = name
+ self.channel = channel
+ self.detached = False
+ self.committing = False
+ self.aborting = False
+
+ # sender state
+ self.sent = Serial(0)
+ self.acknowledged = RangedSet()
+ self.completions = {}
+ self.min_completion = self.sent
+ self.max_completion = self.sent
+ self.results = {}
+
+ # receiver state
+ self.received = None
+ self.executed = RangedSet()
+
+ # XXX: need to periodically exchange completion/known_completion
+
+ def write_query(self, query, handler):
+ id = self.sent
+ query.sync = True
+ self.write_cmd(query, lambda: handler(self.results.pop(id)))
+
+ def write_cmd(self, cmd, completion=noop):
+ if self.detached:
+ raise Exception("detached")
+ cmd.id = self.sent
+ self.sent += 1
+ self.completions[cmd.id] = completion
+ self.max_completion = cmd.id
+ self.write_op(cmd)
+
+ def write_op(self, op):
+ op.channel = self.channel
+ self.driver.write_op(op)
- def message_transfer(self, cmd):
- return handler._message_transfer(session, cmd)
- return Delegate
+# XXX
+HEADER="!4s4B"
+
+EMPTY_DP = DeliveryProperties()
+EMPTY_MP = MessageProperties()
class Driver:
def __init__(self, connection):
self.connection = connection
self._lock = self.connection._lock
- self._wakeup_cond = Condition()
- self._socket = None
- self._conn = None
+
+ self._selector = Selector.default()
+ self.reset()
+
+ def reset(self):
+ self._opening = False
+ self._closing = False
self._connected = False
self._attachments = {}
- self._modcount = self.connection._modcount
- self.thread = Thread(target=self.run)
- self.thread.setDaemon(True)
- # XXX: need to figure out how to join on this thread
+ self._channel_max = 65536
+ self._channels = 0
+ self._sessions = {}
+
+ self._socket = None
+ self._buf = ""
+ self._hdr = ""
+ self._op_enc = OpEncoder()
+ self._seg_enc = SegmentEncoder()
+ self._frame_enc = FrameEncoder()
+ self._frame_dec = FrameDecoder()
+ self._seg_dec = SegmentDecoder()
+ self._op_dec = OpDecoder()
+ self._timeout = None
+
+ for ssn in self.connection.sessions.values():
+ for m in ssn.acked + ssn.unacked + ssn.incoming:
+ m._transfer_id = None
+ for snd in ssn.senders:
+ snd.linked = False
+ for rcv in ssn.receivers:
+ rcv.impending = rcv.received
+ rcv.linked = False
+
+ @synchronized
def wakeup(self):
- self._wakeup_cond.acquire()
- try:
- self._wakeup_cond.notifyAll()
- finally:
- self._wakeup_cond.release()
+ self.dispatch()
+ self._selector.wakeup()
def start(self):
- self.thread.start()
+ self._selector.register(self)
+
+ def fileno(self):
+ return self._socket.fileno()
+
+ @synchronized
+ def reading(self):
+ return self._socket is not None
+
+ @synchronized
+ def writing(self):
+ return self._socket is not None and self._buf
+
+ @synchronized
+ def timing(self):
+ return self._timeout
+
+ @synchronized
+ def readable(self):
+ error = None
+ recoverable = False
+ try:
+ data = self._socket.recv(64*1024)
+ if data:
+ log.debug("READ: %r", data)
+ else:
+ log.debug("ABORTED: %s", self._socket.getpeername())
+ error = "connection aborted"
+ recoverable = True
+ except socket.error, e:
+ error = e
+ recoverable = True
- def run(self):
- while True:
- self._wakeup_cond.acquire()
+ if not error:
try:
- if self.connection._modcount <= self._modcount:
- self._wakeup_cond.wait(10)
- finally:
- self._wakeup_cond.release()
- self.dispatch(self.connection._modcount)
+ if len(self._hdr) < 8:
+ r = 8 - len(self._hdr)
+ self._hdr += data[:r]
+ data = data[r:]
+
+ if len(self._hdr) == 8:
+ self.do_header(self._hdr)
+
+ self._frame_dec.write(data)
+ self._seg_dec.write(*self._frame_dec.read())
+ self._op_dec.write(*self._seg_dec.read())
+ for op in self._op_dec.read():
+ self.assign_id(op)
+ log.debug("RCVD: %r", op)
+ op.dispatch(self)
+ except VersionError, e:
+ error = e
+ except:
+ msg = compat.format_exc()
+ error = msg
+
+ if error:
+ self._error(error, recoverable)
+ else:
+ self.dispatch()
+
+ self.connection._waiter.notifyAll()
+
+ def assign_id(self, op):
+ if isinstance(op, Command):
+ sst = self.get_sst(op)
+ op.id = sst.received
+ sst.received += 1
@synchronized
- def dispatch(self, modcount):
+ def writeable(self):
try:
- if self._conn is None and self.connection._connected:
+ n = self._socket.send(self._buf)
+ log.debug("SENT: %r", self._buf[:n])
+ self._buf = self._buf[n:]
+ except socket.error, e:
+ self._error(e, True)
+ self.connection._waiter.notifyAll()
+
+ @synchronized
+ def timeout(self):
+ log.warn("retrying ...")
+ self.dispatch()
+ self.connection._waiter.notifyAll()
+
+ def _error(self, err, recoverable):
+ if self._socket is not None:
+ self._socket.close()
+ self.reset()
+ if recoverable and self.connection.reconnect:
+ self._timeout = time.time() + 3
+ log.warn("recoverable error: %s" % err)
+ log.warn("sleeping 3 seconds")
+ else:
+ self.connection.error = (err,)
+
+ def write_op(self, op):
+ log.debug("SENT: %r", op)
+ self._op_enc.write(op)
+ self._seg_enc.write(*self._op_enc.read())
+ self._frame_enc.write(*self._seg_enc.read())
+ self._buf += self._frame_enc.read()
+
+ def do_header(self, hdr):
+ cli_major = 0; cli_minor = 10
+ magic, _, _, major, minor = struct.unpack(HEADER, hdr)
+ if major != cli_major or minor != cli_minor:
+ raise VersionError("client: %s-%s, server: %s-%s" %
+ (cli_major, cli_minor, major, minor))
+
+ def do_connection_start(self, start):
+ # XXX: should we use some sort of callback for this?
+ r = "\0%s\0%s" % (self.connection.username, self.connection.password)
+ m = self.connection.mechanism
+ self.write_op(ConnectionStartOk(client_properties=CLIENT_PROPERTIES,
+ mechanism=m, response=r))
+
+ def do_connection_tune(self, tune):
+ # XXX: is heartbeat protocol specific?
+ if tune.channel_max is not None:
+ self.channel_max = tune.channel_max
+ self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat,
+ channel_max=self.channel_max))
+ self.write_op(ConnectionOpen())
+
+ def do_connection_open_ok(self, open_ok):
+ self._connected = True
+
+ def connection_heartbeat(self, hrt):
+ self.write_op(ConnectionHeartbeat())
+
+ def do_connection_close(self, close):
+ self.write_op(ConnectionCloseOk())
+ if close.reply_code != close_code.normal:
+ self.connection.error = (close.reply_code, close.reply_text)
+ # XXX: should we do a half shutdown on the socket here?
+ # XXX: we really need to test this, we may end up reporting a
+ # connection abort after this, if we were to do a shutdown on read
+ # and stop reading, then we wouldn't report the abort, that's
+ # probably the right thing to do
+
+ def do_connection_close_ok(self, close_ok):
+ self._socket.close()
+ self.reset()
+
+ def do_session_attached(self, atc):
+ pass
+
+ def do_session_command_point(self, cp):
+ sst = self.get_sst(cp)
+ sst.received = cp.command_id
+
+ def do_session_completed(self, sc):
+ sst = self.get_sst(sc)
+ for r in sc.commands:
+ sst.acknowledged.add(r.lower, r.upper)
+
+ if not sc.commands.empty():
+ while sst.min_completion in sc.commands:
+ if sst.completions.has_key(sst.min_completion):
+ sst.completions.pop(sst.min_completion)()
+ sst.min_completion += 1
+
+ def session_known_completed(self, kcmp):
+ sst = self.get_sst(kcmp)
+ executed = RangedSet()
+ for e in sst.executed.ranges:
+ for ke in kcmp.ranges:
+ if e.lower in ke and e.upper in ke:
+ break
+ else:
+ executed.add_range(e)
+ sst.executed = completed
+
+ def do_session_flush(self, sf):
+ sst = self.get_sst(sf)
+ if sf.expected:
+ if sst.received is None:
+ exp = None
+ else:
+ exp = RangedSet(sst.received)
+ sst.write_op(SessionExpected(exp))
+ if sf.confirmed:
+ sst.write_op(SessionConfirmed(sst.executed))
+ if sf.completed:
+ sst.write_op(SessionCompleted(sst.executed))
+
+ def do_execution_result(self, er):
+ sst = self.get_sst(er)
+ sst.results[er.command_id] = er.value
+
+ def do_execution_exception(self, ex):
+ sst = self.get_sst(ex)
+ sst.session.error = (ex,)
+
+ def dispatch(self):
+ try:
+ if self._socket is None and self.connection._connected and not self._opening:
self.connect()
- elif self._conn is not None and not self.connection._connected:
+ elif self._socket is not None and not self.connection._connected and not self._closing:
self.disconnect()
- if self._conn is not None:
+ if self._connected and not self._closing:
for ssn in self.connection.sessions.values():
self.attach(ssn)
self.process(ssn)
-
- exi = None
except:
- exi = sys.exc_info()
-
- if exi:
msg = compat.format_exc()
- recoverable = ["aborted", "Connection refused", "SessionDetached", "Connection reset by peer",
- "Bad file descriptor", "start timed out", "Broken pipe"]
- for r in recoverable:
- if self.connection.reconnect and r in msg:
- print "waiting to retry"
- self.reset()
- time.sleep(3)
- print "retrying..."
- return
- else:
- self.connection.error = (msg,)
-
- self._modcount = modcount
- self.connection._waiter.notifyAll()
+ self.connection.error = (msg,)
def connect(self):
- if self._conn is not None:
- return
try:
+ # XXX: should make this non blocking
self._socket = connect(self.connection.host, self.connection.port)
+ self._timeout = None
except socket.error, e:
- raise ConnectError(e)
- self._conn = connection.Connection(self._socket)
- try:
- self._conn.start(timeout=10)
- self._connected = True
- except connection.VersionError, e:
- raise ConnectError(e)
- except Timeout:
- print "start timed out"
- raise ConnectError("start timed out")
+ if self.connection.reconnect:
+ self._error(e, True)
+ return
+ else:
+ raise e
+ self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
+ self._opening = True
def disconnect(self):
- self._conn.close()
- self.reset()
-
- def reset(self):
- self._conn = None
- self._connected = False
- self._attachments.clear()
- for ssn in self.connection.sessions.values():
- for m in ssn.acked + ssn.unacked + ssn.incoming:
- m._transfer_id = None
- for rcv in ssn.receivers:
- rcv.impending = rcv.received
-
- def connected(self):
- return self._conn is not None
+ self.write_op(ConnectionClose(close_code.normal))
+ self._closing = True
def attach(self, ssn):
- _ssn = self._attachments.get(ssn)
- if _ssn is None:
- _ssn = self._conn.session(ssn.name, delegate=delegate(self, ssn))
- _ssn.auto_sync = False
- _ssn.invoke_lock = self._lock
- _ssn.lock = self._lock
- _ssn.condition = self.connection._condition
+ sst = self._attachments.get(ssn)
+ if sst is None and not ssn.closed:
+ for i in xrange(0, self.channel_max):
+ if not self._sessions.has_key(i):
+ ch = i
+ break
+ else:
+ raise RuntimeError("all channels used")
+ sst = SessionState(self, ssn, ssn.name, ch)
+ sst.write_op(SessionAttach(name=ssn.name))
+ sst.write_op(SessionCommandPoint(sst.sent, 0))
+ sst.outgoing_idx = 0
+ sst.acked = []
if ssn.transactional:
- # XXX: adding an attribute to qpid.session.Session
- _ssn.acked = []
- _ssn.tx_select()
- self._attachments[ssn] = _ssn
+ sst.write_cmd(TxSelect())
+ self._attachments[ssn] = sst
+ self._sessions[sst.channel] = sst
for snd in ssn.senders:
self.link_out(snd)
for rcv in ssn.receivers:
self.link_in(rcv)
- if ssn.closing:
- _ssn.close()
- del self._attachments[ssn]
- ssn.closed = True
-
- def _exchange_query(self, ssn, address):
- # XXX: auto sync hack is to avoid deadlock on future
- result = ssn.exchange_query(name=address, sync=True)
- ssn.sync()
- return result.get()
+ if sst is not None and ssn.closing and not sst.detached:
+ sst.detached = True
+ sst.write_op(SessionDetach(name=ssn.name))
+
+ def get_sst(self, op):
+ return self._sessions[op.channel]
+
+ def do_session_detached(self, dtc):
+ sst = self._sessions.pop(dtc.channel)
+ ssn = sst.session
+ del self._attachments[ssn]
+ ssn.closed = True
+
+ def do_session_detach(self, dtc):
+ sst = self.get_sst(dtc)
+ sst.write_op(SessionDetached(name=dtc.name))
+ self.do_session_detached(dtc)
def link_out(self, snd):
- _ssn = self._attachments[snd.session]
+ sst = self._attachments.get(snd.session)
_snd = self._attachments.get(snd)
- if _snd is None:
+ if _snd is None and not snd.closing and not snd.closed:
_snd = Attachment(snd)
- node, _snd._subject = parse_addr(snd.target)
- result = self._exchange_query(_ssn, node)
- if result.not_found:
- # XXX: should check 'create' option
- _ssn.queue_declare(queue=node, durable=DURABLE_DEFAULT, sync=True)
- _ssn.sync()
- _snd._exchange = ""
- _snd._routing_key = node
- else:
- _snd._exchange = node
- _snd._routing_key = _snd._subject
+
+ try:
+ _snd.name, _snd.subject, _snd.options = address.parse(snd.target)
+ except address.LexError, e:
+ snd.error = e
+ snd.closed = True
+ return
+ except address.ParseError, e:
+ snd.error = e
+ snd.closed = True
+ return
+
+ # XXX: subject
+ if _snd.options is None:
+ _snd.options = {}
+
+ def do_link():
+ snd.linked = True
+
+ def do_queue_q(result):
+ if sst.detached:
+ return
+
+ if result.queue:
+ _snd._exchange = ""
+ _snd._routing_key = _snd.name
+ do_link()
+ else:
+ snd.error = ("no such queue: %s" % _snd.name,)
+ del self._attachments[snd]
+ snd.closed = True
+
+ def do_exchange_q(result):
+ if sst.detached:
+ return
+
+ if result.not_found:
+ if _snd.options.get("create") in ("always", "receiver"):
+ sst.write_cmd(QueueDeclare(queue=_snd.name, durable=DURABLE_DEFAULT))
+ _snd._exchange = ""
+ _snd._routing_key = _snd.name
+ else:
+ sst.write_query(QueueQuery(queue=_snd.name), do_queue_q)
+ return
+ else:
+ _snd._exchange = _snd.name
+ _snd._routing_key = _snd.subject
+ do_link()
+
+ sst.write_query(ExchangeQuery(name=_snd.name), do_exchange_q)
self._attachments[snd] = _snd
- if snd.closed:
+ if snd.closing and not snd.closed:
del self._attachments[snd]
- return None
- else:
- return _snd
+ snd.closed = True
def link_in(self, rcv):
- _ssn = self._attachments[rcv.session]
+ sst = self._attachments.get(rcv.session)
_rcv = self._attachments.get(rcv)
- if _rcv is None:
+ if _rcv is None and not rcv.closing and not rcv.closed:
_rcv = Attachment(rcv)
- result = self._exchange_query(_ssn, rcv.source)
- if result.not_found:
- _rcv._queue = rcv.source
- # XXX: should check 'create' option
- _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT)
- else:
- _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
- _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)
- if rcv.filter is None:
- f = FILTER_DEFAULTS[result.type]
+ _rcv.canceled = False
+ _rcv.draining = False
+
+ try:
+ _rcv.name, _rcv.subject, _rcv.options = address.parse(rcv.source)
+ except address.LexError, e:
+ rcv.error = e
+ rcv.closed = True
+ return
+ except address.ParseError, e:
+ rcv.error = e
+ rcv.closed = True
+ return
+
+ # XXX: subject
+ if _rcv.options is None:
+ _rcv.options = {}
+
+ def do_link():
+ sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
+ sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
+ rcv.linked = True
+
+ def do_queue_q(result):
+ if sst.detached:
+ return
+ if result.queue:
+ _rcv._queue = _rcv.name
+ do_link()
else:
- f = rcv.filter
- f._bind(_ssn, rcv.source, _rcv._queue)
- _ssn.message_subscribe(queue=_rcv._queue, destination=rcv.destination)
- _ssn.message_set_flow_mode(rcv.destination, _ssn.flow_mode.credit, sync=True)
+ rcv.error = ("no such queue: %s" % _rcv.name,)
+ del self._attachments[rcv]
+ rcv.closed = True
+
+ def do_exchange_q(result):
+ if sst.detached:
+ return
+ if result.not_found:
+ if _rcv.options.get("create") in ("always", "receiver"):
+ _rcv._queue = _rcv.name
+ sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT))
+ else:
+ sst.write_query(QueueQuery(queue=_rcv.name), do_queue_q)
+ return
+ else:
+ _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
+ sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
+ filter = _rcv.options.get("filter")
+ if _rcv.subject is None and filter is None:
+ f = FILTER_DEFAULTS[result.type]
+ elif _rcv.subject and filter:
+ # XXX
+ raise Exception("can't supply both subject and filter")
+ elif _rcv.subject:
+ # XXX
+ from messaging import Pattern
+ f = Pattern(_rcv.subject)
+ else:
+ f = filter
+ f._bind(sst, _rcv.name, _rcv._queue)
+ do_link()
+ sst.write_query(ExchangeQuery(name=_rcv.name), do_exchange_q)
self._attachments[rcv] = _rcv
- # XXX: need to kill syncs
- _ssn.sync()
- if rcv.closing:
- _ssn.message_cancel(rcv.destination, sync=True)
- # XXX: need to kill syncs
- _ssn.sync()
- del self._attachments[rcv]
- rcv.closed = True
- return None
- else:
- return _rcv
+ if rcv.closing and not rcv.closed:
+ if rcv.linked:
+ if not _rcv.canceled:
+ def close_rcv():
+ del self._attachments[rcv]
+ rcv.closed = True
+ sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv)
+ _rcv.canceled = True
+ else:
+ rcv.closed = True
def process(self, ssn):
if ssn.closing: return
- _ssn = self._attachments[ssn]
+ sst = self._attachments[ssn]
- while ssn.outgoing:
- msg = ssn.outgoing[0]
+ while sst.outgoing_idx < len(ssn.outgoing):
+ msg = ssn.outgoing[sst.outgoing_idx]
snd = msg._sender
- self.send(snd, msg)
- ssn.outgoing.pop(0)
+ # XXX: should check for sender error here
+ _snd = self._attachments.get(snd)
+ if _snd and snd.linked:
+ self.send(snd, msg)
+ sst.outgoing_idx += 1
+ else:
+ break
for rcv in ssn.receivers:
self.process_receiver(rcv)
if ssn.acked:
- messages = ssn.acked[:]
- ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
- for range in ids:
- _ssn.receiver._completed.add_range(range)
- ch = _ssn.channel
- if ch is None:
- raise SessionDetached()
- ch.session_completed(_ssn.receiver._completed)
- _ssn.message_accept(ids, sync=True)
- # XXX: really need to make this async so that we don't give up the lock
- _ssn.sync()
-
- # XXX: we're ignoring acks that get lost when disconnected
- for m in messages:
- ssn.acked.remove(m)
- if ssn.transactional:
- _ssn.acked.append(m)
-
- if ssn.committing:
- _ssn.tx_commit(sync=True)
- # XXX: need to kill syncs
- _ssn.sync()
- del _ssn.acked[:]
- ssn.committing = False
- ssn.committed = True
- ssn.aborting = False
- ssn.aborted = False
+ messages = [m for m in ssn.acked if m not in sst.acked]
+ if messages:
+ # XXX: we're ignoring acks that get lost when disconnected,
+ # could we deal this via some message-id based purge?
+ ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
+ for range in ids:
+ sst.executed.add_range(range)
+ sst.write_op(SessionCompleted(sst.executed))
+ def ack_ack():
+ for m in messages:
+ ssn.acked.remove(m)
+ if not ssn.transactional:
+ sst.acked.remove(m)
+ sst.write_cmd(MessageAccept(ids, sync=True), ack_ack)
+ sst.acked.extend(messages)
+
+ if ssn.committing and not sst.committing:
+ def commit_ok():
+ del sst.acked[:]
+ ssn.committing = False
+ ssn.committed = True
+ ssn.aborting = False
+ ssn.aborted = False
+ sst.write_cmd(TxCommit(sync=True), commit_ok)
+ sst.committing = True
+
+ if ssn.aborting and not sst.aborting:
+ sst.aborting = True
+ def do_rb():
+ messages = sst.acked + ssn.unacked + ssn.incoming
+ ids = RangedSet(*[m._transfer_id for m in messages])
+ for range in ids:
+ sst.executed.add_range(range)
+ sst.write_op(SessionCompleted(sst.executed))
+ sst.write_cmd(MessageRelease(ids))
+ sst.write_cmd(TxRollback(sync=True), do_rb_ok)
+
+ def do_rb_ok():
+ del ssn.incoming[:]
+ del ssn.unacked[:]
+ del sst.acked[:]
+
+ for rcv in ssn.receivers:
+ rcv.impending = rcv.received
+ rcv.returned = rcv.received
+ # XXX: do we need to update granted here as well?
+
+ for rcv in ssn.receivers:
+ self.process_receiver(rcv)
+
+ ssn.aborting = False
+ ssn.aborted = True
+ ssn.committing = False
+ ssn.committed = False
+ sst.aborting = False
- if ssn.aborting:
for rcv in ssn.receivers:
- _ssn.message_stop(rcv.destination)
- _ssn.sync()
-
- messages = _ssn.acked + ssn.unacked + ssn.incoming
- ids = RangedSet(*[m._transfer_id for m in messages])
- for range in ids:
- _ssn.receiver._completed.add_range(range)
- _ssn.channel.session_completed(_ssn.receiver._completed)
- _ssn.message_release(ids)
- _ssn.tx_rollback(sync=True)
- _ssn.sync()
-
- del ssn.incoming[:]
- del ssn.unacked[:]
- del _ssn.acked[:]
-
- for rcv in ssn.receivers:
- rcv.impending = rcv.received
- rcv.returned = rcv.received
- # XXX: do we need to update granted here as well?
-
- for rcv in ssn.receivers:
- self.process_receiver(rcv)
-
- ssn.aborting = False
- ssn.aborted = True
- ssn.committing = False
- ssn.committed = False
+ sst.write_cmd(MessageStop(rcv.destination))
+ sst.write_cmd(ExecutionSync(sync=True), do_rb)
def grant(self, rcv):
- _ssn = self._attachments[rcv.session]
- _rcv = self.link_in(rcv)
+ sst = self._attachments[rcv.session]
+ _rcv = self._attachments.get(rcv)
+ if _rcv is None or not rcv.linked or _rcv.canceled or _rcv.draining:
+ return
if rcv.granted is UNLIMITED:
if rcv.impending is UNLIMITED:
@@ -343,30 +671,37 @@
delta = max(rcv.granted, rcv.received) - rcv.impending
if delta is UNLIMITED:
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, UNLIMITED.value)
+ sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value))
+ sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, UNLIMITED.value))
rcv.impending = UNLIMITED
elif delta > 0:
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, delta)
+ sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value))
+ sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, delta))
rcv.impending += delta
- elif delta < 0:
- if rcv.drain:
- _ssn.message_flush(rcv.destination, sync=True)
- else:
- _ssn.message_stop(rcv.destination, sync=True)
- # XXX: need to kill syncs
- _ssn.sync()
- rcv.impending = rcv.received
- self.grant(rcv)
+ elif delta < 0 and not rcv.draining:
+ _rcv.draining = True
+ def do_stop():
+ rcv.impending = rcv.received
+ _rcv.draining = False
+ self.grant(rcv)
+ sst.write_cmd(MessageStop(rcv.destination, sync=True), do_stop)
+
+ if rcv.draining:
+ def do_flush():
+ rcv.impending = rcv.received
+ rcv.granted = rcv.impending
+ _rcv.draining = False
+ rcv.draining = False
+ sst.write_cmd(MessageFlush(rcv.destination, sync=True), do_flush)
+
def process_receiver(self, rcv):
if rcv.closed: return
self.grant(rcv)
def send(self, snd, msg):
- _ssn = self._attachments[snd.session]
- _snd = self.link_out(snd)
+ sst = self._attachments[snd.session]
+ _snd = self._attachments[snd]
# XXX: what if subject is specified for a normal queue?
if _snd._routing_key is None:
@@ -375,16 +710,16 @@
rk = _snd._routing_key
# XXX: do we need to query to figure out how to create the reply-to interoperably?
if msg.reply_to:
- rt = _ssn.reply_to(*parse_addr(msg.reply_to))
+ rt = addr2reply_to(msg.reply_to)
else:
rt = None
- dp = _ssn.delivery_properties(routing_key=rk)
- mp = _ssn.message_properties(message_id=msg.id,
- user_id=msg.user_id,
- reply_to=rt,
- correlation_id=msg.correlation_id,
- content_type=msg.content_type,
- application_headers=msg.properties)
+ dp = DeliveryProperties(routing_key=rk)
+ mp = MessageProperties(message_id=msg.id,
+ user_id=msg.user_id,
+ reply_to=rt,
+ correlation_id=msg.correlation_id,
+ content_type=msg.content_type,
+ application_headers=msg.properties)
if msg.subject is not None:
if mp.application_headers is None:
mp.application_headers = {}
@@ -397,37 +732,42 @@
dp.delivery_mode = delivery_mode.persistent
enc, dec = get_codec(msg.content_type)
body = enc(msg.content)
- _ssn.message_transfer(destination=_snd._exchange,
- message=Message010(dp, mp, body),
- sync=True)
- log.debug("SENT [%s] %s", snd.session, msg)
- # XXX: really need to make this async so that we don't give up the lock
- _ssn.sync()
- # XXX: should we log the ack somehow too?
- snd.acked += 1
+ def msg_acked():
+ # XXX: should we log the ack somehow too?
+ snd.acked += 1
+ m = snd.session.outgoing.pop(0)
+ sst.outgoing_idx -= 1
+ assert msg == m
+ sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
+ payload=body, sync=True), msg_acked)
+
+ def do_message_transfer(self, xfr):
+ sst = self.get_sst(xfr)
+ ssn = sst.session
- @synchronized
- def _message_transfer(self, ssn, cmd):
- m = Message010(cmd.payload)
- m.headers = cmd.headers
- m.id = cmd.id
- msg = self._decode(m)
- rcv = ssn.receivers[int(cmd.destination)]
+ msg = self._decode(xfr)
+ rcv = ssn.receivers[int(xfr.destination)]
msg._receiver = rcv
if rcv.impending is not UNLIMITED:
- assert rcv.received < rcv.impending
+ assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
rcv.received += 1
log.debug("RECV [%s] %s", ssn, msg)
ssn.incoming.append(msg)
self.connection._waiter.notifyAll()
- return INCOMPLETE
- def _decode(self, message):
- dp = message.get("delivery_properties")
- mp = message.get("message_properties")
+ def _decode(self, xfr):
+ dp = EMPTY_DP
+ mp = EMPTY_MP
+
+ for h in xfr.headers:
+ if isinstance(h, DeliveryProperties):
+ dp = h
+ elif isinstance(h, MessageProperties):
+ mp = h
+
ap = mp.application_headers
enc, dec = get_codec(mp.content_type)
- content = dec(message.body)
+ content = dec(xfr.payload)
msg = Message(content)
msg.id = mp.message_id
if ap is not None:
@@ -440,5 +780,5 @@
msg.durable = dp.delivery_mode == delivery_mode.persistent
msg.properties = mp.application_headers
msg.content_type = mp.content_type
- msg._transfer_id = message.id
+ msg._transfer_id = xfr.id
return msg
Modified: qpid/branches/java-network-refactor/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/qpid/messaging.py?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/qpid/messaging.py (original)
+++ qpid/branches/java-network-refactor/qpid/python/qpid/messaging.py Sun Oct 11 23:22:08 2009
@@ -77,7 +77,8 @@
"""
@static
- def open(host, port=None):
+ def open(host, port=None, username="guest", password="guest",
+ mechanism="PLAIN", heartbeat=None, **options):
"""
Creates an AMQP connection and connects it to the given host and port.
@@ -88,11 +89,12 @@
@rtype: Connection
@return: a connected Connection
"""
- conn = Connection(host, port)
+ conn = Connection(host, port, username, password, mechanism, heartbeat, **options)
conn.connect()
return conn
- def __init__(self, host, port=None):
+ def __init__(self, host, port=None, username="guest", password="guest",
+ mechanism="PLAIN", heartbeat=None, **options):
"""
Creates a connection. A newly created connection must be connected
with the Connection.connect() method before it can be started.
@@ -106,11 +108,16 @@
"""
self.host = host
self.port = default(port, AMQP_PORT)
+ self.username = username
+ self.password = password
+ self.mechanism = mechanism
+ self.heartbeat = heartbeat
+
self.started = False
self.id = str(uuid4())
self.session_counter = 0
self.sessions = {}
- self.reconnect = False
+ self.reconnect = options.get("reconnect", False)
self._connected = False
self._lock = RLock()
self._condition = Condition(self._lock)
@@ -230,9 +237,10 @@
self.value = value
# XXX: this should become part of the driver
- def _bind(self, ssn, exchange, queue):
- ssn.exchange_bind(exchange=exchange, queue=queue,
- binding_key=self.value.replace("*", "#"))
+ def _bind(self, sst, exchange, queue):
+ from qpid.ops import ExchangeBind
+ sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
+ binding_key=self.value.replace("*", "#")))
class SessionError(Exception):
pass
@@ -282,6 +290,7 @@
# XXX: I hate this name.
self.ack_capacity = UNLIMITED
+ self.error = None
self.closing = False
self.closed = False
@@ -302,12 +311,16 @@
def _check_error(self, exc=SessionError):
self.connection._check_error(exc)
+ if self.error:
+ raise exc(*self.error)
def _ewait(self, predicate, timeout=None, exc=SessionError):
- return self.connection._ewait(predicate, timeout, exc)
+ result = self.connection._ewait(lambda: self.error or predicate(), timeout, exc)
+ self._check_error(exc)
+ return result
@synchronized
- def sender(self, target):
+ def sender(self, target, **options):
"""
Creates a L{Sender} that may be used to send L{Messages<Message>}
to the specified target.
@@ -317,7 +330,7 @@
@rtype: Sender
@return: a new Sender for the specified target
"""
- sender = Sender(self, len(self.senders), target)
+ sender = Sender(self, len(self.senders), target, options)
self.senders.append(sender)
self._wakeup()
# XXX: because of the lack of waiting here we can end up getting
@@ -327,7 +340,7 @@
return sender
@synchronized
- def receiver(self, source, filter=None):
+ def receiver(self, source, **options):
"""
Creates a receiver that may be used to actively fetch or to listen
for the arrival of L{Messages<Message>} from the specified source.
@@ -337,7 +350,7 @@
@rtype: Receiver
@return: a new Receiver for the specified source
"""
- receiver = Receiver(self, len(self.receivers), source, filter,
+ receiver = Receiver(self, len(self.receivers), source, options,
self.started)
self.receivers.append(receiver)
self._wakeup()
@@ -368,8 +381,8 @@
@synchronized
def _get(self, predicate, timeout=None):
- if self._wait(lambda: ((self._peek(predicate) is not None) or self.closing),
- timeout):
+ if self._ewait(lambda: ((self._peek(predicate) is not None) or self.closing),
+ timeout):
msg = self._pop(predicate)
if msg is not None:
msg._receiver.returned += 1
@@ -505,13 +518,18 @@
Sends outgoing messages.
"""
- def __init__(self, session, index, target):
+ def __init__(self, session, index, target, options):
self.session = session
self.index = index
self.target = target
- self.capacity = UNLIMITED
+ self.options = options
+ self.capacity = options.get("capacity", UNLIMITED)
+ self.durable = options.get("durable")
self.queued = Serial(0)
self.acked = Serial(0)
+ self.error = None
+ self.linked = False
+ self.closing = False
self.closed = False
self._lock = self.session._lock
@@ -520,9 +538,13 @@
def _check_error(self, exc=SendError):
self.session._check_error(exc)
+ if self.error:
+ raise exc(*self.error)
def _ewait(self, predicate, timeout=None, exc=SendError):
- return self.session._ewait(predicate, timeout, exc)
+ result = self.session._ewait(lambda: self.error or predicate(), timeout, exc)
+ self._check_error(exc)
+ return result
@synchronized
def pending(self):
@@ -558,11 +580,16 @@
if not self.session.connection._connected or self.session.closing:
raise Disconnected()
+ self._ewait(lambda: self.linked)
+
if isinstance(object, Message):
message = object
else:
message = Message(object)
+ if message.durable is None:
+ message.durable = self.durable
+
if self.capacity is not UNLIMITED:
if self.capacity <= 0:
raise InsufficientCapacity("capacity = %s" % self.capacity)
@@ -573,15 +600,19 @@
message._sender = self
self.session.outgoing.append(message)
self.queued += 1
- mno = self.queued
self._wakeup()
if sync:
- self._ewait(lambda: self.acked >= mno)
+ self.sync()
assert message not in self.session.outgoing
@synchronized
+ def sync(self):
+ mno = self.queued
+ self._ewait(lambda: self.acked >= mno)
+
+ @synchronized
def close(self):
"""
Close the Sender.
@@ -609,21 +640,23 @@
L{listen}.
"""
- def __init__(self, session, index, source, filter, started):
+ def __init__(self, session, index, source, options, started):
self.session = session
self.index = index
self.destination = str(self.index)
self.source = source
- self.filter = filter
+ self.options = options
self.started = started
- self.capacity = UNLIMITED
+ self.capacity = options.get("capacity", UNLIMITED)
self.granted = Serial(0)
- self.drain = False
+ self.draining = False
self.impending = Serial(0)
self.received = Serial(0)
self.returned = Serial(0)
+ self.error = None
+ self.linked = False
self.closing = False
self.closed = False
self.listener = None
@@ -634,9 +667,13 @@
def _check_error(self, exc=ReceiveError):
self.session._check_error(exc)
+ if self.error:
+ raise exc(*self.error)
def _ewait(self, predicate, timeout=None, exc=ReceiveError):
- return self.session._ewait(predicate, timeout, exc)
+ result = self.session._ewait(lambda: self.error or predicate(), timeout, exc)
+ self._check_error(exc)
+ return result
@synchronized
def pending(self):
@@ -680,17 +717,18 @@
@type timeout: float
@param timeout: the time to wait for a message to be available
"""
+
+ self._ewait(lambda: self.linked)
+
if self._capacity() == 0:
self.granted = self.returned + 1
self._wakeup()
self._ewait(lambda: self.impending >= self.granted)
msg = self.session._get(self._pred, timeout=timeout)
if msg is None:
- self.drain = True
- self.granted = self.received
+ self.draining = True
self._wakeup()
- self._ewait(lambda: self.impending == self.received)
- self.drain = False
+ self._ewait(lambda: not self.draining)
self._grant()
self._wakeup()
msg = self.session._get(self._pred, timeout=0)
@@ -738,7 +776,7 @@
self.closing = True
self._wakeup()
try:
- self._ewait(lambda: self.closed)
+ self.session._ewait(lambda: self.closed)
finally:
self.session.receivers.remove(self)
@@ -778,6 +816,8 @@
def get_codec(content_type):
return TYPE_CODEC[content_type]
+UNSPECIFIED = object()
+
class Message:
"""
@@ -802,7 +842,9 @@
@ivar content: the message content
"""
- def __init__(self, content=None):
+ def __init__(self, content=None, content_type=UNSPECIFIED, id=None,
+ subject=None, to=None, user_id=None, reply_to=None,
+ correlation_id=None, durable=None, properties=None):
"""
Construct a new message with the supplied content. The
content-type of the message will be automatically inferred from
@@ -810,20 +852,44 @@
@type content: str, unicode, buffer, dict, list
@param content: the message content
+
+ @type content_type: str
+ @param content_type: the content-type of the message
"""
- self.id = None
- self.subject = None
- self.user_id = None
- self.to = None
- self.reply_to = None
- self.correlation_id = None
- self.durable = False
- self.properties = {}
- self.content_type = get_type(content)
+ self.id = id
+ self.subject = subject
+ self.to = to
+ self.user_id = user_id
+ self.reply_to = reply_to
+ self.correlation_id = correlation_id
+ self.durable = durable
+ if properties is None:
+ self.properties = {}
+ else:
+ self.properties = properties
+ if content_type is UNSPECIFIED:
+ self.content_type = get_type(content)
+ else:
+ self.content_type = content_type
self.content = content
def __repr__(self):
- return "Message(%r)" % self.content
+ args = []
+ for name in ["id", "subject", "to", "user_id", "reply_to",
+ "correlation_id"]:
+ value = self.__dict__[name]
+ if value is not None: args.append("%s=%r" % (name, value))
+ for name in ["durable", "properties"]:
+ value = self.__dict__[name]
+ if value: args.append("%s=%r" % (name, value))
+ if self.content_type != get_type(self.content):
+ args.append("content_type=%r" % self.content_type)
+ if self.content is not None:
+ if args:
+ args.append("content=%r" % self.content)
+ else:
+ args.append(repr(self.content))
+ return "Message(%s)" % ", ".join(args)
__all__ = ["Connection", "Session", "Sender", "Receiver", "Pattern", "Message",
"ConnectionError", "ConnectError", "SessionError", "Disconnected",
Modified: qpid/branches/java-network-refactor/qpid/python/qpid/ops.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/qpid/ops.py?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/qpid/ops.py (original)
+++ qpid/branches/java-network-refactor/qpid/python/qpid/ops.py Sun Oct 11 23:22:08 2009
@@ -80,7 +80,7 @@
return "%s(%s)" % (self.__class__.__name__,
", ".join(["%s=%r" % (f.name, getattr(self, f.name))
for f in self.ARGS
- if getattr(self, f.name) is not f.default]))
+ if getattr(self, f.name) != f.default]))
class Command(Compound):
UNENCODED=[Field("channel", "uint16", 0),
@@ -209,8 +209,8 @@
from qpid_config import amqp_spec as file
pclfile = "%s.ops.pcl" % file
-if False and (os.path.exists(pclfile) and
- os.path.getmtime(pclfile) > os.path.getmtime(file)):
+if os.path.exists(pclfile) and \
+ os.path.getmtime(pclfile) > os.path.getmtime(file):
f = open(pclfile, "read")
types = pickle.load(f)
f.close()
Modified: qpid/branches/java-network-refactor/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/qpid/tests/messaging.py?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/branches/java-network-refactor/qpid/python/qpid/tests/messaging.py Sun Oct 11 23:22:08 2009
@@ -24,7 +24,7 @@
from qpid.tests import Test
from qpid.harness import Skipped
from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \
- InsufficientCapacity, Message, UNLIMITED, uuid4
+ InsufficientCapacity, Message, ReceiveError, SendError, UNLIMITED, uuid4
from Queue import Queue, Empty as QueueEmpty
class Base(Test):
@@ -50,6 +50,8 @@
raise Skipped(e)
self.ssn = self.setup_session()
self.snd = self.setup_sender()
+ if self.snd is not None:
+ self.snd.durable = self.durable()
self.rcv = self.setup_receiver()
def teardown(self):
@@ -63,11 +65,12 @@
return "%s[%s, %s]" % (base, count, self.test_id)
def ping(self, ssn):
+ PING_Q = 'ping-queue {create: always}'
# send a message
- sender = ssn.sender("ping-queue")
+ sender = ssn.sender(PING_Q, durable=self.durable())
content = self.content("ping")
sender.send(content)
- receiver = ssn.receiver("ping-queue")
+ receiver = ssn.receiver(PING_Q)
msg = receiver.fetch(0)
ssn.acknowledge()
assert msg.content == content, "expected %r, got %r" % (content, msg.content)
@@ -97,16 +100,27 @@
def delay(self):
return float(self.config.defines.get("delay", "2"))
+ def get_bool(self, name):
+ return self.config.defines.get(name, "false").lower() in ("true", "yes", "1")
+
+ def durable(self):
+ return self.get_bool("durable")
+
+ def reconnect(self):
+ return self.get_bool("reconnect")
+
class SetupTests(Base):
def testOpen(self):
# XXX: need to flesh out URL support/syntax
- self.conn = Connection.open(self.broker.host, self.broker.port)
+ self.conn = Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
self.ping(self.conn.session())
def testConnect(self):
# XXX: need to flesh out URL support/syntax
- self.conn = Connection(self.broker.host, self.broker.port)
+ self.conn = Connection(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
self.conn.connect()
self.ping(self.conn.session())
@@ -121,7 +135,8 @@
class ConnectionTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port)
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
def testSessionAnon(self):
ssn1 = self.conn.session()
@@ -174,17 +189,21 @@
self.conn.close()
assert not self.conn.connected()
+ACK_Q = 'test-ack-queue {create: always}'
+
class SessionTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port)
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
def setup_session(self):
return self.conn.session()
def testSender(self):
- snd = self.ssn.sender("test-snd-queue")
- snd2 = self.ssn.sender(snd.target)
+ snd = self.ssn.sender('test-snd-queue {create: always}',
+ durable=self.durable())
+ snd2 = self.ssn.sender(snd.target, durable=self.durable())
assert snd is not snd2
snd2.close()
@@ -196,47 +215,49 @@
self.ssn.acknowledge(msg)
def testReceiver(self):
- rcv = self.ssn.receiver("test-rcv-queue")
+ rcv = self.ssn.receiver('test-rcv-queue {create: always}')
rcv2 = self.ssn.receiver(rcv.source)
assert rcv is not rcv2
rcv2.close()
content = self.content("testReceiver")
- snd = self.ssn.sender(rcv.source)
+ snd = self.ssn.sender(rcv.source, durable=self.durable())
snd.send(content)
msg = rcv.fetch(0)
assert msg.content == content
self.ssn.acknowledge(msg)
def testStart(self):
- rcv = self.ssn.receiver("test-start-queue")
+ START_Q = 'test-start-queue {create: always}'
+ rcv = self.ssn.receiver(START_Q)
assert not rcv.started
self.ssn.start()
assert rcv.started
- rcv = self.ssn.receiver("test-start-queue")
+ rcv = self.ssn.receiver(START_Q)
assert rcv.started
def testStop(self):
+ STOP_Q = 'test-stop-queue {create: always}'
self.ssn.start()
- rcv = self.ssn.receiver("test-stop-queue")
+ rcv = self.ssn.receiver(STOP_Q)
assert rcv.started
self.ssn.stop()
assert not rcv.started
- rcv = self.ssn.receiver("test-stop-queue")
+ rcv = self.ssn.receiver(STOP_Q)
assert not rcv.started
# XXX, we need a convenient way to assert that required queues are
# empty on setup, and possibly also to drain queues on teardown
def ackTest(self, acker, ack_capacity=None):
# send a bunch of messages
- snd = self.ssn.sender("test-ack-queue")
+ snd = self.ssn.sender(ACK_Q, durable=self.durable())
contents = [self.content("ackTest", i) for i in range(15)]
for c in contents:
snd.send(c)
# drain the queue, verify the messages are there and then close
# without acking
- rcv = self.ssn.receiver(snd.target)
+ rcv = self.ssn.receiver(ACK_Q)
self.drain(rcv, expected=contents)
self.ssn.close()
@@ -245,7 +266,7 @@
self.ssn = self.conn.session()
if ack_capacity is not None:
self.ssn.ack_capacity = ack_capacity
- rcv = self.ssn.receiver("test-ack-queue")
+ rcv = self.ssn.receiver(ACK_Q)
self.drain(rcv, expected=contents)
acker(self.ssn)
self.ssn.close()
@@ -253,7 +274,7 @@
# drain the queue a final time and verify that the messages were
# dequeued
self.ssn = self.conn.session()
- rcv = self.ssn.receiver("test-ack-queue")
+ rcv = self.ssn.receiver(ACK_Q)
self.assertEmpty(rcv)
def testAcknowledge(self):
@@ -271,7 +292,7 @@
pass
finally:
self.ssn.ack_capacity = UNLIMITED
- self.drain(self.ssn.receiver("test-ack-queue"))
+ self.drain(self.ssn.receiver(ACK_Q))
self.ssn.acknowledge()
def testAcknowledgeAsyncAckCap1(self):
@@ -284,7 +305,7 @@
self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
def send(self, ssn, queue, base, count=1):
- snd = ssn.sender(queue)
+ snd = ssn.sender(queue, durable=self.durable())
contents = []
for i in range(count):
c = self.content(base, i)
@@ -294,10 +315,12 @@
return contents
def txTest(self, commit):
+ TX_Q = 'test-tx-queue {create: always}'
+ TX_Q_COPY = 'test-tx-queue-copy {create: always}'
txssn = self.conn.session(transactional=True)
- contents = self.send(self.ssn, "test-tx-queue", "txTest", 3)
- txrcv = txssn.receiver("test-tx-queue")
- txsnd = txssn.sender("test-tx-queue-copy")
+ contents = self.send(self.ssn, TX_Q, "txTest", 3)
+ txrcv = txssn.receiver(TX_Q)
+ txsnd = txssn.sender(TX_Q_COPY, durable=self.durable())
rcv = self.ssn.receiver(txrcv.source)
copy_rcv = self.ssn.receiver(txsnd.target)
self.assertEmpty(copy_rcv)
@@ -323,9 +346,10 @@
self.txTest(False)
def txTestSend(self, commit):
+ TX_SEND_Q = 'test-tx-send-queue {create: always}'
txssn = self.conn.session(transactional=True)
- contents = self.send(txssn, "test-tx-send-queue", "txTestSend", 3)
- rcv = self.ssn.receiver("test-tx-send-queue")
+ contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
+ rcv = self.ssn.receiver(TX_SEND_Q)
self.assertEmpty(rcv)
if commit:
@@ -345,10 +369,11 @@
self.txTestSend(False)
def txTestAck(self, commit):
+ TX_ACK_Q = 'test-tx-ack-queue {create: always}'
txssn = self.conn.session(transactional=True)
- txrcv = txssn.receiver("test-tx-ack-queue")
+ txrcv = txssn.receiver(TX_ACK_Q)
self.assertEmpty(txrcv)
- contents = self.send(self.ssn, "test-tx-ack-queue", "txTestAck", 3)
+ contents = self.send(self.ssn, TX_ACK_Q, "txTestAck", 3)
assert contents == self.drain(txrcv)
if commit:
@@ -366,11 +391,11 @@
txssn.close()
txssn = self.conn.session(transactional=True)
- txrcv = txssn.receiver("test-tx-ack-queue")
+ txrcv = txssn.receiver(TX_ACK_Q)
assert contents == self.drain(txrcv)
txssn.acknowledge()
txssn.commit()
- rcv = self.ssn.receiver("test-tx-ack-queue")
+ rcv = self.ssn.receiver(TX_ACK_Q)
self.assertEmpty(rcv)
txssn.close()
self.assertEmpty(rcv)
@@ -389,19 +414,22 @@
except Disconnected:
pass
+RECEIVER_Q = 'test-receiver-queue {create: always}'
+
class ReceiverTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port)
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
def setup_session(self):
return self.conn.session()
def setup_sender(self):
- return self.ssn.sender("test-receiver-queue")
+ return self.ssn.sender(RECEIVER_Q)
def setup_receiver(self):
- return self.ssn.receiver("test-receiver-queue")
+ return self.ssn.receiver(RECEIVER_Q)
def send(self, base, count = None):
content = self.content(base, count)
@@ -516,7 +544,7 @@
self.assertPending(self.rcv, 5)
drained = self.drain(self.rcv)
- assert len(drained) == 10
+ assert len(drained) == 10, "%s, %s" % (len(drained), drained)
self.assertPending(self.rcv, 0)
self.ssn.acknowledge()
@@ -538,19 +566,81 @@
# XXX: need testClose
+NOSUCH_Q = "this-queue-should-not-exist"
+UNPARSEABLE_ADDR = "{bad address}"
+UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
+
+class AddressErrorTests(Base):
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def sendErrorTest(self, addr, exc, check=lambda e: True):
+ snd = self.ssn.sender(addr, durable=self.durable())
+ try:
+ snd.send("hello")
+ assert False, "send succeeded"
+ except exc, e:
+ assert check(e), "unexpected error: %s" % e
+ snd.close()
+
+ def fetchErrorTest(self, addr, exc, check=lambda e: True):
+ rcv = self.ssn.receiver(addr)
+ try:
+ rcv.fetch(timeout=0)
+ assert False, "fetch succeeded"
+ except exc, e:
+ assert check(e), "unexpected error: %s" % e
+ rcv.close()
+
+ def testNoTarget(self):
+ # XXX: should have specific exception for this
+ self.sendErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e))
+
+ def testNoSource(self):
+ # XXX: should have specific exception for this
+ self.fetchErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e))
+
+ def testUnparseableTarget(self):
+ # XXX: should have specific exception for this
+ self.sendErrorTest(UNPARSEABLE_ADDR, SendError,
+ lambda e: "expecting ID" in str(e))
+
+ def testUnparseableSource(self):
+ # XXX: should have specific exception for this
+ self.fetchErrorTest(UNPARSEABLE_ADDR, ReceiveError,
+ lambda e: "expecting ID" in str(e))
+
+ def testUnlexableTarget(self):
+ # XXX: should have specific exception for this
+ self.sendErrorTest(UNLEXABLE_ADDR, SendError,
+ lambda e: "unrecognized character" in str(e))
+
+ def testUnlexableSource(self):
+ # XXX: should have specific exception for this
+ self.fetchErrorTest(UNLEXABLE_ADDR, ReceiveError,
+ lambda e: "unrecognized character" in str(e))
+
+SENDER_Q = 'test-sender-q {create: always}'
+
class SenderTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port)
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
def setup_session(self):
return self.conn.session()
def setup_sender(self):
- return self.ssn.sender("test-sender-queue")
+ return self.ssn.sender(SENDER_Q)
def setup_receiver(self):
- return self.ssn.receiver("test-sender-queue")
+ return self.ssn.receiver(SENDER_Q)
def checkContent(self, content):
self.snd.send(content)
@@ -611,6 +701,7 @@
except InsufficientCapacity:
caught = True
break
+ self.snd.sync()
self.drain(self.rcv, expected=msgs)
self.ssn.acknowledge()
assert caught, "did not exceed capacity"
@@ -643,19 +734,22 @@
m.content = u"<html/>"
assert m.content_type == "text/html; charset=utf8"
+ECHO_Q = 'test-message-echo-queue {create: always}'
+
class MessageEchoTests(Base):
def setup_connection(self):
- return Connection.open(self.broker.host, self.broker.port)
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
def setup_session(self):
return self.conn.session()
def setup_sender(self):
- return self.ssn.sender("test-message-echo-queue")
+ return self.ssn.sender(ECHO_Q)
def setup_receiver(self):
- return self.ssn.receiver("test-message-echo-queue")
+ return self.ssn.receiver(ECHO_Q)
def check(self, msg):
self.snd.send(msg)
Modified: qpid/branches/java-network-refactor/qpid/wcf/ReadMe.txt
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/ReadMe.txt?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/ReadMe.txt (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/ReadMe.txt Sun Oct 11 23:22:08 2009
@@ -49,9 +49,9 @@
qpid source code location e.g. C:\trunk\qpid
5. Build Qpid cpp
-Run CMake and choose "%QPID_ROOT%\cpp\build" as the location for "Where to
-build the binaries". Build at least the "qpidd", "qpidclient" and
-"qpidcommon" projects.
+Build at least the "qpidd", "qpidclient" and "qpidcommon" projects.
+Create an environment variable called QPID_BUILD_ROOT and store the
+path to the Qpid build directory in it.
4. Building the solution file
@@ -81,7 +81,7 @@
%QPID_ROOT%\wcf\test\Apache\Qpid\Test\Channel\Functional\RunTests.bat has the correct
values for the nunit_exe, qpid_dll_location and configuration_name variables as per
your installation.
-2. Start the qpid broker from the qpid build folder e.g. %QPID_ROOT%\cpp\build\src\Debug.
+2. Start the qpid broker from the qpid build folder e.g. %QPID_BUILD_ROOT%\src\Debug.
3. Execute RunTests.bat from its location e.g. %QPID_ROOT%\wcf\test\Apache\Qpid\Test\Channel\Functional.
Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs Sun Oct 11 23:22:08 2009
@@ -64,6 +64,12 @@
set { transport.BrokerPort = value; }
}
+ public int PrefetchLimit
+ {
+ get { return transport.PrefetchLimit; }
+ set { transport.PrefetchLimit = value; }
+ }
+
public bool Shared
{
get { return transport.Shared; }
Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs Sun Oct 11 23:22:08 2009
@@ -63,6 +63,13 @@
set { brokerPort = value; }
}
+ [ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit, DefaultValue = false)]
+ public int PrefetchLimit
+ {
+ get { return (int)base[AmqpConfigurationStrings.PrefetchLimit]; }
+ set { base[AmqpConfigurationStrings.PrefetchLimit] = value; }
+ }
+
[ConfigurationProperty(AmqpConfigurationStrings.Shared, DefaultValue = false)]
public bool Shared
{
@@ -95,6 +102,8 @@
get
{
ConfigurationPropertyCollection properties = base.Properties;
+ properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit,
+ typeof(int), 0, null, null, ConfigurationPropertyOptions.None));
properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.Shared,
typeof(bool), false, null, null, ConfigurationPropertyOptions.None));
properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.TransferMode,
@@ -112,6 +121,7 @@
this.BrokerPort = amqpBinding.BrokerPort;
this.TransferMode = amqpBinding.TransferMode;
this.Shared = amqpBinding.Shared;
+ this.PrefetchLimit = amqpBinding.PrefetchLimit;
AmqpProperties props = amqpBinding.DefaultMessageProperties;
}
@@ -133,6 +143,7 @@
amqpBinding.BrokerPort = this.BrokerPort;
amqpBinding.TransferMode = this.TransferMode;
amqpBinding.Shared = this.Shared;
+ amqpBinding.PrefetchLimit = this.PrefetchLimit;
}
protected override void PostDeserialize()
Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs Sun Oct 11 23:22:08 2009
@@ -32,6 +32,7 @@
AmqpChannelProperties channelProperties;
long maxBufferPoolSize;
bool shared;
+ int prefetchLimit;
internal AmqpChannelFactory(AmqpTransportBindingElement bindingElement, BindingContext context)
: base(context.Binding)
@@ -39,6 +40,7 @@
this.bindingElement = bindingElement;
this.channelProperties = bindingElement.ChannelProperties.Clone();
this.shared = bindingElement.Shared;
+ this.prefetchLimit = bindingElement.PrefetchLimit;
this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize;
Collection<MessageEncodingBindingElement> messageEncoderBindingElements
= context.BindingParameters.FindAll<MessageEncodingBindingElement>();
@@ -91,7 +93,7 @@
protected override TChannel OnCreateChannel(EndpointAddress remoteAddress, Uri via)
{
- return (TChannel)(object) new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared);
+ return (TChannel)(object) new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared, this.prefetchLimit);
}
}
Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs Sun Oct 11 23:22:08 2009
@@ -46,6 +46,7 @@
public const string TransferMode = "transferMode";
public const string Brokers = "brokers";
public const string Shared = "shared";
+ public const string PrefetchLimit = "prefetchLimit";
public const string MaxBufferPoolSize = "maxBufferPoolSize";
public const string MaxReceivedMessageSize = "maxReceivedMessageSize";
}
@@ -55,7 +56,6 @@
internal const string BrokerHost = "localhost";
internal const int BrokerPort = 5672;
internal const TransferMode TransferMode = System.ServiceModel.TransferMode.Buffered;
- internal const byte Priority = 4;
internal const long MaxBufferPoolSize = 64 * 1024;
internal const int MaxReceivedMessageSize = 5 * 1024 * 1024; //64 * 1024;
}
Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs Sun Oct 11 23:22:08 2009
@@ -32,6 +32,7 @@
AmqpTransportBindingElement bindingElement;
AmqpChannelProperties channelProperties;
bool shared;
+ int prefetchLimit;
long maxBufferPoolSize;
Uri uri;
AmqpTransportChannel amqpTransportChannel;
@@ -45,6 +46,7 @@
this.bindingElement = bindingElement;
this.channelProperties = bindingElement.ChannelProperties.Clone();
this.shared = bindingElement.Shared;
+ this.prefetchLimit = bindingElement.PrefetchLimit;
this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize;
@@ -132,7 +134,7 @@
{
amqpTransportChannel = new AmqpTransportChannel(this, this.channelProperties,
new EndpointAddress(uri), messageEncoderFactory.Encoder,
- maxBufferPoolSize, this.shared);
+ maxBufferPoolSize, this.shared, this.prefetchLimit);
return (IInputChannel)(object) amqpTransportChannel;
}
Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs Sun Oct 11 23:22:08 2009
@@ -29,6 +29,7 @@
{
AmqpChannelProperties channelProperties;
bool shared;
+ int prefetchLimit;
public AmqpTransportBindingElement()
{
@@ -41,6 +42,7 @@
{
this.channelProperties = other.channelProperties.Clone();
this.shared = other.shared;
+ this.prefetchLimit = other.prefetchLimit;
}
public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
@@ -98,6 +100,12 @@
set { this.channelProperties.BrokerPort = value; }
}
+ public int PrefetchLimit
+ {
+ get { return this.prefetchLimit; }
+ set { this.prefetchLimit = value; }
+ }
+
public bool Shared
{
get { return this.shared; }
Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs Sun Oct 11 23:22:08 2009
@@ -50,6 +50,7 @@
private MessageEncoder encoder;
private AmqpChannelProperties factoryChannelProperties;
private bool shared;
+ private int prefetchLimit;
private string encoderContentType;
// input = 0-10 queue, output = 0-10 exchange
@@ -68,7 +69,7 @@
private AsyncTimeSpanCaller asyncOpenCaller;
private AsyncTimeSpanCaller asyncCloseCaller;
- internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection)
+ internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection, int prefetchLimit)
: base(factory)
{
this.isInputChannel = (factory is ChannelListenerBase) || (factory is AmqpChannelFactory<IInputChannel>);
@@ -80,6 +81,7 @@
this.factoryChannelProperties = channelProperties;
this.shared = sharedConnection;
+ this.prefetchLimit = prefetchLimit;
this.remoteAddress = remoteAddress;
// pull out host, port, queue, and connection arguments
@@ -128,6 +130,7 @@
if (this.isInputChannel)
{
this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, this.queueName);
+ this.inputLink.PrefetchLimit = this.prefetchLimit;
}
else
{
@@ -287,7 +290,7 @@
return false;
}
-
+
public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
{
return this.inputLink.BeginTryReceive(timeout, callback, state);
@@ -464,7 +467,7 @@
}
return amqpMessage;
}
-
+
private Message QpidToWcf(AmqpMessage amqpMessage)
{
@@ -531,7 +534,7 @@
{
this.bufferManager.ReturnBuffer(managedBuffer);
}
- }
+ }
return wcfMessage;
}
Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp Sun Oct 11 23:22:08 2009
@@ -63,11 +63,12 @@
sessionp = new qpid::client::AsyncSession;
*sessionp = qpidConnectionp->newSession();
subs_mgrp = new SubscriptionManager (*sessionp);
- success = true;
waiters = gcnew Collections::Generic::List<CompletionWaiter^>();
+ success = true;
} finally {
if (!success) {
Cleanup();
+ // TODO: include inner exception information
throw gcnew QpidException ("session creation failure");
}
}
@@ -76,12 +77,6 @@
void AmqpSession::Cleanup()
{
- if (subscriptionp != NULL) {
- subscriptionp->cancel();
- delete subscriptionp;
- subscriptionp=NULL;
- }
-
if (subs_mgrp != NULL) {
subs_mgrp->stop();
delete subs_mgrp;
@@ -112,6 +107,7 @@
void AmqpSession::ConnectionClosed()
{
+ lock l(waiters);
Cleanup();
}
@@ -283,5 +279,27 @@
}
}
+bool AmqpSession::MessageStop(Completion &comp, std::string &name)
+{
+ lock l(waiters);
+
+ if (sessionp == NULL)
+ return false;
+
+ comp = sessionp->messageStop(name, true);
+ return true;
+}
+
+void AmqpSession::AcceptAndComplete(SequenceSet& transfers)
+{
+ lock l(waiters);
+
+ if (sessionp == NULL)
+ throw gcnew ObjectDisposedException("Accept");
+
+ sessionp->markCompleted(transfers, false);
+ sessionp->messageAccept(transfers, false);
+}
+
}}} // namespace Apache::Qpid::Cli
Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h Sun Oct 11 23:22:08 2009
@@ -44,7 +44,6 @@
AsyncSession* sessionp;
SessionImpl* sessionImplp;
SubscriptionManager* subs_mgrp;
- Subscription* subscriptionp;
LocalQueue* localQueuep;
Collections::Generic::List<CompletionWaiter^>^ waiters;
bool helperRunning;
@@ -69,6 +68,8 @@
void ConnectionClosed();
void internalWaitForCompletion(IntPtr Future);
void removeWaiter(CompletionWaiter^ waiter);
+ bool MessageStop(Completion &comp, std::string &name);
+ void AcceptAndComplete(SequenceSet& transfers);
property AmqpConnection^ Connection {
AmqpConnection^ get () { return connection; }
Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h Sun Oct 11 23:22:08 2009
@@ -32,7 +32,6 @@
bool timedOut;
// has an owner thread
bool assigned;
- // can Run (i.e. earlier CompletionWaiters in the queue have completed)
System::Exception^ runException;
AsyncCallback^ asyncCallback;
Threading::Timer ^timer;
Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp Sun Oct 11 23:22:08 2009
@@ -59,6 +59,12 @@
// with proposed changes to the native library to reduce the number of servicing
// threads for large numbers of subscriptions.
+// synchronization is accomplished with locks, but also by ensuring that only one
+// MessageWaiter (the one at the front of the line) is ever active.
+// async threads to watch for: Close/finalizer, Timers, SyncCredit and the native Dispatch
+// thread (who deposits FrameSets into the local queue and is oblivious to the
+// managed space locks).
+
// The following def must match the "Frames" private typedef.
// TODO, make Qpid-cpp "Frames" definition visible.
@@ -94,6 +100,8 @@
localQueuep = new LocalQueue;
SubscriptionSettings settings;
settings.flowControl = FlowControl::messageCredit(0);
+ settings.completionMode = CompletionMode::MANUAL_COMPLETION;
+
Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings);
subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup
@@ -197,6 +205,7 @@
IntPtr InputLink::nextLocalMessage()
{
lock l(waiters);
+
if (disposed)
return (IntPtr) NULL;
@@ -279,8 +288,7 @@
if (haveMessage())
return true;
- // TODO: prefetch window of messages, compatible with both 0-10 and 1.0.
- subscriptionp->grantMessageCredit(1);
+ AdjustCredit();
// get a scoped smart ptr ref to guard against async close or hangup
demuxQueuePtr = *queuePtrp;
@@ -350,7 +358,12 @@
}
return;
}
+
waiters->RemoveAt(idx);
+ if (waiter->TimedOut) {
+ // may have to give back message if it arrives momentarily
+ AdjustCredit();
+ }
// let the next waiter know it's his turn.
if (waiters->Count > 0) {
@@ -411,6 +424,129 @@
}
+void InputLink::PrefetchLimit::set(int value)
+{
+ lock l(waiters);
+ prefetchLimit = value;
+
+ int delta = 0;
+
+ // rough rule of thumb to keep the flow, but reduce chatter.
+ // for small messages, the credit request is almost as expensive as the transfer itself.
+ // experience may suggest a better heuristic or require a property for the low water mark
+ if (prefetchLimit >= 3) {
+ delta = prefetchLimit / 3;
+ }
+ minWorkingCredit = prefetchLimit - delta;
+ AdjustCredit();
+}
+
+
+// call with lock held
+void InputLink::AdjustCredit()
+{
+ if (creditSyncPending || disposed)
+ return;
+
+ // low watermark check
+ if ((prefetchLimit != 0) &&
+ (workingCredit >= minWorkingCredit) &&
+ (workingCredit >= waiters->Count))
+ return;
+
+ // should have enough for all waiters or to satisfy the prefetch window
+ int targetCredit = waiters->Count;
+ if (targetCredit < prefetchLimit)
+ targetCredit = prefetchLimit;
+
+ if (targetCredit > workingCredit) {
+ subscriptionp->grantMessageCredit(targetCredit - workingCredit);
+ workingCredit = targetCredit;
+ return;
+ }
+ if (targetCredit < workingCredit) {
+ if ((targetCredit == 0) && (prefetchLimit == 0)) {
+ creditSyncPending = true;
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &InputLink::SyncCredit));
+ }
+ // TODO: also shrink credit when prefetchLimit != 0
+ }
+}
+
+void InputLink::SyncCredit(Object ^unused)
+{
+ lock l(waiters);
+
+ try {
+ if (disposed)
+ return;
+
+ Completion comp;
+ if (!amqpSession->MessageStop(comp, subscriptionp->getName())) {
+ // connection closed
+ return;
+ }
+
+ // get a private scoped copy to use outside the lock
+ Subscription s(*subscriptionp);
+
+ l.release();
+ // use setFlowControl to re-enable credit flow on the broker.
+ // previously used comp.wait() here, but setFlowControl is a sync operation
+ s.setFlowControl(s.getSettings().flowControl);
+ l.acquire();
+
+ if (disposed)
+ return;
+
+ // let existing waiters use up any
+ // local queue size can only decrease until more credit is issued
+ while (true) {
+ if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) {
+ l.release();
+ // a rare use case and not used in performance oriented code.
+ // optimization can wait until the qpid/messaging api is used
+ Thread::Sleep(10);
+ l.acquire();
+ if (disposed)
+ return;
+ }
+ else {
+ break;
+ }
+ }
+
+ // At this point, the lock is held and we are fully synced with the broker
+ // so we have a valid snapshot
+
+ if ((prefetchLimit == 0) && ((*queuePtrp)->size() > 0)) {
+ // can't be sure application will request a message again any time soon
+ QpidFrameSetPtr frameSetp;
+ while (!(*queuePtrp)->empty()) {
+ (*queuePtrp)->pop(frameSetp);
+ SequenceSet frameSetID(frameSetp->getId());
+ subscriptionp->release(frameSetID);
+ }
+
+ // don't touch dequeuedFrameSetpp. It is spoken for: explicitly from a
+ // MessageWaiter about to get the nextLocalMessage(), or implicitly
+ // from a WaitForMessage().
+ }
+ // TODO: if prefetchLimit != 0, release messages from back of the queue that exceed targetCredit
+
+ workingCredit = (*queuePtrp)->size();
+ if (dequeuedFrameSetpp != NULL) {
+ workingCredit++;
+ }
+ }
+ finally {
+ creditSyncPending = false;
+ }
+
+ AdjustCredit();
+}
+
+
AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp)
{
QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer();
@@ -539,7 +675,15 @@
// We have a message we can return to the caller.
// Tell the broker we got it.
- subscriptionp->accept(frameSetID);
+
+ // subscriptionp->accept(frameSetID) is a slow sync operation in the native API
+ // so do it within the AsyncSession directly
+ amqpSession->AcceptAndComplete(frameSetID);
+
+ workingCredit--;
+ // check if more messages need to be requested from broker
+ AdjustCredit();
+
return amqpMessage;
}
finally {
Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h Sun Oct 11 23:22:08 2009
@@ -47,6 +47,14 @@
bool finalizing;
QpidFrameSetPtr* dequeuedFrameSetpp;
ManualResetEvent^ asyncHelperWaitHandle;
+ // number of messages to buffer locally for future consumption
+ int prefetchLimit;
+ // the number of messages requested and not yet processed
+ int workingCredit;
+ // stopping and restarting the message flow
+ bool creditSyncPending;
+ // working credit low water mark
+ int minWorkingCredit;
void Cleanup();
void ReleaseNative();
@@ -54,6 +62,8 @@
void addWaiter(MessageWaiter^ waiter);
void asyncHelper();
AmqpMessage^ createAmqpMessage(IntPtr msgp);
+ void AdjustCredit();
+ void SyncCredit(Object ^);
internal:
InputLink(AmqpSession^ session, System::String^ sourceQueue, qpid::client::AsyncSession *qpidSessionp,
@@ -80,6 +90,11 @@
IAsyncResult^ BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state);
bool EndWaitForMessage(IAsyncResult^ result);
+ property int PrefetchLimit {
+ int get () { return prefetchLimit; }
+ void set (int value);
+ }
+
};
}}} // namespace Apache::Qpid::Interop
Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj Sun Oct 11 23:22:08 2009
@@ -65,7 +65,7 @@
Name="VCCLCompilerTool"
AdditionalOptions="/FU Debug\Apache.Qpid.AmqpTypes.netmodule"
Optimization="0"
- AdditionalIncludeDirectories="..\..\..\..\..\cpp\build\include;..\..\..\..\..\cpp\build\src;..\..\..\..\..\cpp\include;..\..\..\..\..\cpp\src;"$(BOOST_ROOT)""
+ AdditionalIncludeDirectories=""$(QPID_BUILD_ROOT)\include";"$(QPID_BUILD_ROOT)\src";..\..\..\..\..\cpp\include;..\..\..\..\..\cpp\src;"$(BOOST_ROOT)""
PreprocessorDefinitions="WIN32;_DEBUG;_CRT_NONSTDC_NO_WARNINGS;WIN32_LEAN_AND_MEAN;NOMINMAX;_SCL_SECURE_NO_WARNINGS;BOOST_ALL_DYN_LINK"
RuntimeLibrary="3"
UsePrecompiledHeader="0"
@@ -83,7 +83,7 @@
/>
<Tool
Name="VCLinkerTool"
- AdditionalOptions="..\..\..\..\..\cpp\build\src\Debug\qpidcommon.lib ..\..\..\..\..\cpp\build\src\Debug\qpidclient.lib Debug\Apache.Qpid.AmqpTypes.netmodule"
+ AdditionalOptions="$(QPID_BUILD_ROOT)\src\Debug\qpidcommond.lib $(QPID_BUILD_ROOT)\src\Debug\qpidclientd.lib Debug\Apache.Qpid.AmqpTypes.netmodule"
AdditionalDependencies="$(NoInherit)"
OutputFile="$(OutDir)\Apache.Qpid.Interop.dll"
LinkIncremental="2"
Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h Sun Oct 11 23:22:08 2009
@@ -66,7 +66,6 @@
void Activate();
void WaitForCompletion();
-// inline void SetCompletedSynchronously (bool v) { completedSynchronously = v; }
property IntPtr Message {
IntPtr get () {
@@ -78,7 +77,6 @@
GC::SuppressFinalize(this);
return v;
}
- // void set (IntPtr v) { message = v; }
}
property bool Assigned {
@@ -89,11 +87,11 @@
bool get () { return timedOut; }
}
-
property System::Exception^ RunException {
System::Exception^ get() { return runException; }
}
+
public:
virtual property bool IsCompleted {
Modified: qpid/branches/java-network-refactor/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat Sun Oct 11 23:22:08 2009
@@ -19,15 +19,15 @@
set nunit_exe=%programfiles%\NUnit 2.5.1\bin\net-2.0\nunit-console.exe
-set qpid_dll_location=..\..\..\..\..\..\..\cpp\build\src\Debug
+set qpid_dll_location=%QPID_BUILD_ROOT%\src\Debug
set configuration_name=bin\Debug
set qcreate_location=..\..\..\..\..\..\tools\QCreate\Debug
-copy %qpid_dll_location%\qpidclient.dll %configuration_name%
-copy %qpid_dll_location%\qpidcommon.dll %configuration_name%
+copy %qpid_dll_location%\qpidclientd.dll %configuration_name%
+copy %qpid_dll_location%\qpidcommond.dll %configuration_name%
-copy %qpid_dll_location%\qpidclient.dll %qcreate_location%
-copy %qpid_dll_location%\qpidcommon.dll %qcreate_location%
+copy %qpid_dll_location%\qpidclientd.dll %qcreate_location%
+copy %qpid_dll_location%\qpidcommond.dll %qcreate_location%
%qcreate_location%\QCreate.exe amq.direct routing_key message_queue
Modified: qpid/branches/java-network-refactor/qpid/wcf/tools/QCreate/QCreate.vcproj
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/tools/QCreate/QCreate.vcproj?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/tools/QCreate/QCreate.vcproj (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/tools/QCreate/QCreate.vcproj Sun Oct 11 23:22:08 2009
@@ -61,7 +61,7 @@
<Tool
Name="VCCLCompilerTool"
Optimization="0"
- AdditionalIncludeDirectories=""$(BOOST_ROOT)\include\$(BOOST_VERSION)";"$(BOOST_ROOT)\.";..\..\..\cpp\include;..\..\..\cpp\build\include"
+ AdditionalIncludeDirectories=""$(BOOST_ROOT)\include\$(BOOST_VERSION)";"$(BOOST_ROOT)\.";..\..\..\cpp\include;"$(QPID_BUILD_ROOT)\include""
PreprocessorDefinitions="WIN32;_DEBUG;_CONSOLE"
MinimalRebuild="true"
BasicRuntimeChecks="3"
@@ -81,9 +81,9 @@
/>
<Tool
Name="VCLinkerTool"
- AdditionalDependencies="qpidcommon.lib qpidclient.lib"
+ AdditionalDependencies="qpidcommond.lib qpidclientd.lib"
LinkIncremental="2"
- AdditionalLibraryDirectories=".;"$(BOOST_ROOT)\lib";..\..\..\cpp\build\src\Debug"
+ AdditionalLibraryDirectories=".;"$(BOOST_ROOT)\lib";"$(QPID_BUILD_ROOT)\src\Debug""
GenerateDebugInformation="true"
SubSystem="1"
TargetMachine="1"
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org