You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/10/12 01:22:20 UTC

svn commit: r824198 [9/9] - in /qpid/branches/java-network-refactor: ./ qpid/cpp/ qpid/cpp/bindings/qmf/ qpid/cpp/bindings/qmf/python/ qpid/cpp/bindings/qmf/ruby/ qpid/cpp/bindings/qmf/tests/ qpid/cpp/boost-1.32-support/ qpid/cpp/etc/ qpid/cpp/examples...

Modified: qpid/branches/java-network-refactor/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/qpid/driver.py?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/qpid/driver.py (original)
+++ qpid/branches/java-network-refactor/qpid/python/qpid/driver.py Sun Oct 11 23:22:08 2009
@@ -17,25 +17,23 @@
 # under the License.
 #
 
-import compat, connection, socket, sys, time
+import address, compat, connection, socket, struct, sys, time
 from concurrency import synchronized
-from datatypes import RangedSet, Message as Message010
-from exceptions import Timeout
+from datatypes import RangedSet, Serial
+from exceptions import Timeout, VersionError
+from framing import OpEncoder, SegmentEncoder, FrameEncoder, FrameDecoder, SegmentDecoder, OpDecoder
 from logging import getLogger
 from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED
-from ops import delivery_mode
-from session import Client, INCOMPLETE, SessionDetached
+from ops import *
+from selector import Selector
 from threading import Condition, Thread
 from util import connect
 
 log = getLogger("qpid.messaging")
 
-def parse_addr(address):
-  parts = address.split("/", 1)
-  if len(parts) == 1:
-    return parts[0], None
-  else:
-    return parts[0], parts[i1]
+def addr2reply_to(addr):
+  name, subject, options = address.parse(addr)
+  return ReplyTo(name, subject)
 
 def reply_to2addr(reply_to):
   if reply_to.routing_key is None:
@@ -50,287 +48,617 @@
   def __init__(self, target):
     self.target = target
 
+# XXX
+
 DURABLE_DEFAULT=True
 
+# XXX
+
 FILTER_DEFAULTS = {
   "topic": Pattern("*")
   }
 
-def delegate(handler, session):
-  class Delegate(Client):
+# XXX
+
+CLIENT_PROPERTIES = {"product": "qpid python client",
+                     "version": "development",
+                     "platform": os.name,
+                     "qpid.client_process": os.path.basename(sys.argv[0]),
+                     "qpid.client_pid": os.getpid(),
+                     "qpid.client_ppid": os.getppid()}
+
+def noop(): pass
+
+class SessionState:
+
+  def __init__(self, driver, session, name, channel):
+    self.driver = driver
+    self.session = session
+    self.name = name
+    self.channel = channel
+    self.detached = False
+    self.committing = False
+    self.aborting = False
+
+    # sender state
+    self.sent = Serial(0)
+    self.acknowledged = RangedSet()
+    self.completions = {}
+    self.min_completion = self.sent
+    self.max_completion = self.sent
+    self.results = {}
+
+    # receiver state
+    self.received = None
+    self.executed = RangedSet()
+
+    # XXX: need to periodically exchange completion/known_completion
+
+  def write_query(self, query, handler):
+    id = self.sent
+    query.sync = True
+    self.write_cmd(query, lambda: handler(self.results.pop(id)))
+
+  def write_cmd(self, cmd, completion=noop):
+    if self.detached:
+      raise Exception("detached")
+    cmd.id = self.sent
+    self.sent += 1
+    self.completions[cmd.id] = completion
+    self.max_completion = cmd.id
+    self.write_op(cmd)
+
+  def write_op(self, op):
+    op.channel = self.channel
+    self.driver.write_op(op)
 
-    def message_transfer(self, cmd):
-      return handler._message_transfer(session, cmd)
-  return Delegate
+# XXX
+HEADER="!4s4B"
+
+EMPTY_DP = DeliveryProperties()
+EMPTY_MP = MessageProperties()
 
 class Driver:
 
   def __init__(self, connection):
     self.connection = connection
     self._lock = self.connection._lock
-    self._wakeup_cond = Condition()
-    self._socket = None
-    self._conn = None
+
+    self._selector = Selector.default()
+    self.reset()
+
+  def reset(self):
+    self._opening = False
+    self._closing = False
     self._connected = False
     self._attachments = {}
-    self._modcount = self.connection._modcount
-    self.thread = Thread(target=self.run)
-    self.thread.setDaemon(True)
-    # XXX: need to figure out how to join on this thread
 
+    self._channel_max = 65536
+    self._channels = 0
+    self._sessions = {}
+
+    self._socket = None
+    self._buf = ""
+    self._hdr = ""
+    self._op_enc = OpEncoder()
+    self._seg_enc = SegmentEncoder()
+    self._frame_enc = FrameEncoder()
+    self._frame_dec = FrameDecoder()
+    self._seg_dec = SegmentDecoder()
+    self._op_dec = OpDecoder()
+    self._timeout = None
+
+    for ssn in self.connection.sessions.values():
+      for m in ssn.acked + ssn.unacked + ssn.incoming:
+        m._transfer_id = None
+      for snd in ssn.senders:
+        snd.linked = False
+      for rcv in ssn.receivers:
+        rcv.impending = rcv.received
+        rcv.linked = False
+
+  @synchronized
   def wakeup(self):
-    self._wakeup_cond.acquire()
-    try:
-      self._wakeup_cond.notifyAll()
-    finally:
-      self._wakeup_cond.release()
+    self.dispatch()
+    self._selector.wakeup()
 
   def start(self):
-    self.thread.start()
+    self._selector.register(self)
+
+  def fileno(self):
+    return self._socket.fileno()
+
+  @synchronized
+  def reading(self):
+    return self._socket is not None
+
+  @synchronized
+  def writing(self):
+    return self._socket is not None and self._buf
+
+  @synchronized
+  def timing(self):
+    return self._timeout
+
+  @synchronized
+  def readable(self):
+    error = None
+    recoverable = False
+    try:
+      data = self._socket.recv(64*1024)
+      if data:
+        log.debug("READ: %r", data)
+      else:
+        log.debug("ABORTED: %s", self._socket.getpeername())
+        error = "connection aborted"
+        recoverable = True
+    except socket.error, e:
+      error = e
+      recoverable = True
 
-  def run(self):
-    while True:
-      self._wakeup_cond.acquire()
+    if not error:
       try:
-        if self.connection._modcount <= self._modcount:
-          self._wakeup_cond.wait(10)
-      finally:
-        self._wakeup_cond.release()
-      self.dispatch(self.connection._modcount)
+        if len(self._hdr) < 8:
+          r = 8 - len(self._hdr)
+          self._hdr += data[:r]
+          data = data[r:]
+
+          if len(self._hdr) == 8:
+            self.do_header(self._hdr)
+
+        self._frame_dec.write(data)
+        self._seg_dec.write(*self._frame_dec.read())
+        self._op_dec.write(*self._seg_dec.read())
+        for op in self._op_dec.read():
+          self.assign_id(op)
+          log.debug("RCVD: %r", op)
+          op.dispatch(self)
+      except VersionError, e:
+        error = e
+      except:
+        msg = compat.format_exc()
+        error = msg
+
+    if error:
+      self._error(error, recoverable)
+    else:
+      self.dispatch()
+
+    self.connection._waiter.notifyAll()
+
+  def assign_id(self, op):
+    if isinstance(op, Command):
+      sst = self.get_sst(op)
+      op.id = sst.received
+      sst.received += 1
 
   @synchronized
-  def dispatch(self, modcount):
+  def writeable(self):
     try:
-      if self._conn is None and self.connection._connected:
+      n = self._socket.send(self._buf)
+      log.debug("SENT: %r", self._buf[:n])
+      self._buf = self._buf[n:]
+    except socket.error, e:
+      self._error(e, True)
+      self.connection._waiter.notifyAll()
+
+  @synchronized
+  def timeout(self):
+    log.warn("retrying ...")
+    self.dispatch()
+    self.connection._waiter.notifyAll()
+
+  def _error(self, err, recoverable):
+    if self._socket is not None:
+      self._socket.close()
+    self.reset()
+    if recoverable and self.connection.reconnect:
+      self._timeout = time.time() + 3
+      log.warn("recoverable error: %s" % err)
+      log.warn("sleeping 3 seconds")
+    else:
+      self.connection.error = (err,)
+
+  def write_op(self, op):
+    log.debug("SENT: %r", op)
+    self._op_enc.write(op)
+    self._seg_enc.write(*self._op_enc.read())
+    self._frame_enc.write(*self._seg_enc.read())
+    self._buf += self._frame_enc.read()
+
+  def do_header(self, hdr):
+    cli_major = 0; cli_minor = 10
+    magic, _, _, major, minor = struct.unpack(HEADER, hdr)
+    if major != cli_major or minor != cli_minor:
+      raise VersionError("client: %s-%s, server: %s-%s" %
+                         (cli_major, cli_minor, major, minor))
+
+  def do_connection_start(self, start):
+    # XXX: should we use some sort of callback for this?
+    r = "\0%s\0%s" % (self.connection.username, self.connection.password)
+    m = self.connection.mechanism
+    self.write_op(ConnectionStartOk(client_properties=CLIENT_PROPERTIES,
+                                    mechanism=m, response=r))
+
+  def do_connection_tune(self, tune):
+    # XXX: is heartbeat protocol specific?
+    if tune.channel_max is not None:
+      self.channel_max = tune.channel_max
+    self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat,
+                                   channel_max=self.channel_max))
+    self.write_op(ConnectionOpen())
+
+  def do_connection_open_ok(self, open_ok):
+    self._connected = True
+
+  def connection_heartbeat(self, hrt):
+    self.write_op(ConnectionHeartbeat())
+
+  def do_connection_close(self, close):
+    self.write_op(ConnectionCloseOk())
+    if close.reply_code != close_code.normal:
+      self.connection.error = (close.reply_code, close.reply_text)
+    # XXX: should we do a half shutdown on the socket here?
+    # XXX: we really need to test this, we may end up reporting a
+    # connection abort after this, if we were to do a shutdown on read
+    # and stop reading, then we wouldn't report the abort, that's
+    # probably the right thing to do
+
+  def do_connection_close_ok(self, close_ok):
+    self._socket.close()
+    self.reset()
+
+  def do_session_attached(self, atc):
+    pass
+
+  def do_session_command_point(self, cp):
+    sst = self.get_sst(cp)
+    sst.received = cp.command_id
+
+  def do_session_completed(self, sc):
+    sst = self.get_sst(sc)
+    for r in sc.commands:
+      sst.acknowledged.add(r.lower, r.upper)
+
+    if not sc.commands.empty():
+      while sst.min_completion in sc.commands:
+        if sst.completions.has_key(sst.min_completion):
+          sst.completions.pop(sst.min_completion)()
+        sst.min_completion += 1
+
+  def session_known_completed(self, kcmp):
+    sst = self.get_sst(kcmp)
+    executed = RangedSet()
+    for e in sst.executed.ranges:
+      for ke in kcmp.ranges:
+        if e.lower in ke and e.upper in ke:
+          break
+      else:
+        executed.add_range(e)
+    sst.executed = completed
+
+  def do_session_flush(self, sf):
+    sst = self.get_sst(sf)
+    if sf.expected:
+      if sst.received is None:
+        exp = None
+      else:
+        exp = RangedSet(sst.received)
+      sst.write_op(SessionExpected(exp))
+    if sf.confirmed:
+      sst.write_op(SessionConfirmed(sst.executed))
+    if sf.completed:
+      sst.write_op(SessionCompleted(sst.executed))
+
+  def do_execution_result(self, er):
+    sst = self.get_sst(er)
+    sst.results[er.command_id] = er.value
+
+  def do_execution_exception(self, ex):
+    sst = self.get_sst(ex)
+    sst.session.error = (ex,)
+
+  def dispatch(self):
+    try:
+      if self._socket is None and self.connection._connected and not self._opening:
         self.connect()
-      elif self._conn is not None and not self.connection._connected:
+      elif self._socket is not None and not self.connection._connected and not self._closing:
         self.disconnect()
 
-      if self._conn is not None:
+      if self._connected and not self._closing:
         for ssn in self.connection.sessions.values():
           self.attach(ssn)
           self.process(ssn)
-
-      exi = None
     except:
-      exi = sys.exc_info()
-
-    if exi:
       msg = compat.format_exc()
-      recoverable = ["aborted", "Connection refused", "SessionDetached", "Connection reset by peer",
-                     "Bad file descriptor", "start timed out", "Broken pipe"]
-      for r in recoverable:
-        if self.connection.reconnect and r in msg:
-          print "waiting to retry"
-          self.reset()
-          time.sleep(3)
-          print "retrying..."
-          return
-      else:
-        self.connection.error = (msg,)
-
-    self._modcount = modcount
-    self.connection._waiter.notifyAll()
+      self.connection.error = (msg,)
 
   def connect(self):
-    if self._conn is not None:
-      return
     try:
+      # XXX: should make this non blocking
       self._socket = connect(self.connection.host, self.connection.port)
+      self._timeout = None
     except socket.error, e:
-      raise ConnectError(e)
-    self._conn = connection.Connection(self._socket)
-    try:
-      self._conn.start(timeout=10)
-      self._connected = True
-    except connection.VersionError, e:
-      raise ConnectError(e)
-    except Timeout:
-      print "start timed out"
-      raise ConnectError("start timed out")
+      if self.connection.reconnect:
+        self._error(e, True)
+        return
+      else:
+        raise e
+    self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
+    self._opening = True
 
   def disconnect(self):
-    self._conn.close()
-    self.reset()
-
-  def reset(self):
-    self._conn = None
-    self._connected = False
-    self._attachments.clear()
-    for ssn in self.connection.sessions.values():
-      for m in ssn.acked + ssn.unacked + ssn.incoming:
-        m._transfer_id = None
-      for rcv in ssn.receivers:
-        rcv.impending = rcv.received
-
-  def connected(self):
-    return self._conn is not None
+    self.write_op(ConnectionClose(close_code.normal))
+    self._closing = True
 
   def attach(self, ssn):
-    _ssn = self._attachments.get(ssn)
-    if _ssn is None:
-      _ssn = self._conn.session(ssn.name, delegate=delegate(self, ssn))
-      _ssn.auto_sync = False
-      _ssn.invoke_lock = self._lock
-      _ssn.lock = self._lock
-      _ssn.condition = self.connection._condition
+    sst = self._attachments.get(ssn)
+    if sst is None and not ssn.closed:
+      for i in xrange(0, self.channel_max):
+        if not self._sessions.has_key(i):
+          ch = i
+          break
+      else:
+        raise RuntimeError("all channels used")
+      sst = SessionState(self, ssn, ssn.name, ch)
+      sst.write_op(SessionAttach(name=ssn.name))
+      sst.write_op(SessionCommandPoint(sst.sent, 0))
+      sst.outgoing_idx = 0
+      sst.acked = []
       if ssn.transactional:
-        # XXX: adding an attribute to qpid.session.Session
-        _ssn.acked = []
-        _ssn.tx_select()
-      self._attachments[ssn] = _ssn
+        sst.write_cmd(TxSelect())
+      self._attachments[ssn] = sst
+      self._sessions[sst.channel] = sst
 
     for snd in ssn.senders:
       self.link_out(snd)
     for rcv in ssn.receivers:
       self.link_in(rcv)
 
-    if ssn.closing:
-      _ssn.close()
-      del self._attachments[ssn]
-      ssn.closed = True
-
-  def _exchange_query(self, ssn, address):
-    # XXX: auto sync hack is to avoid deadlock on future
-    result = ssn.exchange_query(name=address, sync=True)
-    ssn.sync()
-    return result.get()
+    if sst is not None and ssn.closing and not sst.detached:
+      sst.detached = True
+      sst.write_op(SessionDetach(name=ssn.name))
+
+  def get_sst(self, op):
+    return self._sessions[op.channel]
+
+  def do_session_detached(self, dtc):
+    sst = self._sessions.pop(dtc.channel)
+    ssn = sst.session
+    del self._attachments[ssn]
+    ssn.closed = True
+
+  def do_session_detach(self, dtc):
+    sst = self.get_sst(dtc)
+    sst.write_op(SessionDetached(name=dtc.name))
+    self.do_session_detached(dtc)
 
   def link_out(self, snd):
-    _ssn = self._attachments[snd.session]
+    sst = self._attachments.get(snd.session)
     _snd = self._attachments.get(snd)
-    if _snd is None:
+    if _snd is None and not snd.closing and not snd.closed:
       _snd = Attachment(snd)
-      node, _snd._subject = parse_addr(snd.target)
-      result = self._exchange_query(_ssn, node)
-      if result.not_found:
-        # XXX: should check 'create' option
-        _ssn.queue_declare(queue=node, durable=DURABLE_DEFAULT, sync=True)
-        _ssn.sync()
-        _snd._exchange = ""
-        _snd._routing_key = node
-      else:
-        _snd._exchange = node
-        _snd._routing_key = _snd._subject
+
+      try:
+        _snd.name, _snd.subject, _snd.options = address.parse(snd.target)
+      except address.LexError, e:
+        snd.error = e
+        snd.closed = True
+        return
+      except address.ParseError, e:
+        snd.error = e
+        snd.closed = True
+        return
+
+      # XXX: subject
+      if _snd.options is None:
+        _snd.options = {}
+
+      def do_link():
+        snd.linked = True
+
+      def do_queue_q(result):
+        if sst.detached:
+          return
+
+        if result.queue:
+          _snd._exchange = ""
+          _snd._routing_key = _snd.name
+          do_link()
+        else:
+          snd.error = ("no such queue: %s" % _snd.name,)
+          del self._attachments[snd]
+          snd.closed = True
+
+      def do_exchange_q(result):
+        if sst.detached:
+          return
+
+        if result.not_found:
+          if _snd.options.get("create") in ("always", "receiver"):
+            sst.write_cmd(QueueDeclare(queue=_snd.name, durable=DURABLE_DEFAULT))
+            _snd._exchange = ""
+            _snd._routing_key = _snd.name
+          else:
+            sst.write_query(QueueQuery(queue=_snd.name), do_queue_q)
+            return
+        else:
+          _snd._exchange = _snd.name
+          _snd._routing_key = _snd.subject
+        do_link()
+
+      sst.write_query(ExchangeQuery(name=_snd.name), do_exchange_q)
       self._attachments[snd] = _snd
 
-    if snd.closed:
+    if snd.closing and not snd.closed:
       del self._attachments[snd]
-      return None
-    else:
-      return _snd
+      snd.closed = True
 
   def link_in(self, rcv):
-    _ssn = self._attachments[rcv.session]
+    sst = self._attachments.get(rcv.session)
     _rcv = self._attachments.get(rcv)
-    if _rcv is None:
+    if _rcv is None and not rcv.closing and not rcv.closed:
       _rcv = Attachment(rcv)
-      result = self._exchange_query(_ssn, rcv.source)
-      if result.not_found:
-        _rcv._queue = rcv.source
-        # XXX: should check 'create' option
-        _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT)
-      else:
-        _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
-        _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)
-        if rcv.filter is None:
-          f = FILTER_DEFAULTS[result.type]
+      _rcv.canceled = False
+      _rcv.draining = False
+
+      try:
+        _rcv.name, _rcv.subject, _rcv.options = address.parse(rcv.source)
+      except address.LexError, e:
+        rcv.error = e
+        rcv.closed = True
+        return
+      except address.ParseError, e:
+        rcv.error = e
+        rcv.closed = True
+        return
+
+      # XXX: subject
+      if _rcv.options is None:
+        _rcv.options = {}
+
+      def do_link():
+        sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=rcv.destination))
+        sst.write_cmd(MessageSetFlowMode(rcv.destination, flow_mode.credit))
+        rcv.linked = True
+
+      def do_queue_q(result):
+        if sst.detached:
+          return
+        if result.queue:
+          _rcv._queue = _rcv.name
+          do_link()
         else:
-          f = rcv.filter
-        f._bind(_ssn, rcv.source, _rcv._queue)
-      _ssn.message_subscribe(queue=_rcv._queue, destination=rcv.destination)
-      _ssn.message_set_flow_mode(rcv.destination, _ssn.flow_mode.credit, sync=True)
+          rcv.error = ("no such queue: %s" % _rcv.name,)
+          del self._attachments[rcv]
+          rcv.closed = True
+
+      def do_exchange_q(result):
+        if sst.detached:
+          return
+        if result.not_found:
+          if _rcv.options.get("create") in ("always", "receiver"):
+            _rcv._queue = _rcv.name
+            sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT))
+          else:
+            sst.write_query(QueueQuery(queue=_rcv.name), do_queue_q)
+            return
+        else:
+          _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
+          sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
+          filter = _rcv.options.get("filter")
+          if _rcv.subject is None and filter is None:
+            f = FILTER_DEFAULTS[result.type]
+          elif _rcv.subject and filter:
+            # XXX
+            raise Exception("can't supply both subject and filter")
+          elif _rcv.subject:
+            # XXX
+            from messaging import Pattern
+            f = Pattern(_rcv.subject)
+          else:
+            f = filter
+          f._bind(sst, _rcv.name, _rcv._queue)
+        do_link()
+      sst.write_query(ExchangeQuery(name=_rcv.name), do_exchange_q)
       self._attachments[rcv] = _rcv
-      # XXX: need to kill syncs
-      _ssn.sync()
 
-    if rcv.closing:
-      _ssn.message_cancel(rcv.destination, sync=True)
-      # XXX: need to kill syncs
-      _ssn.sync()
-      del self._attachments[rcv]
-      rcv.closed = True
-      return None
-    else:
-      return _rcv
+    if rcv.closing and not rcv.closed:
+      if rcv.linked:
+        if not _rcv.canceled:
+          def close_rcv():
+            del self._attachments[rcv]
+            rcv.closed = True
+          sst.write_cmd(MessageCancel(rcv.destination, sync=True), close_rcv)
+          _rcv.canceled = True
+      else:
+        rcv.closed = True
 
   def process(self, ssn):
     if ssn.closing: return
 
-    _ssn = self._attachments[ssn]
+    sst = self._attachments[ssn]
 
-    while ssn.outgoing:
-      msg = ssn.outgoing[0]
+    while sst.outgoing_idx < len(ssn.outgoing):
+      msg = ssn.outgoing[sst.outgoing_idx]
       snd = msg._sender
-      self.send(snd, msg)
-      ssn.outgoing.pop(0)
+      # XXX: should check for sender error here
+      _snd = self._attachments.get(snd)
+      if _snd and snd.linked:
+        self.send(snd, msg)
+        sst.outgoing_idx += 1
+      else:
+        break
 
     for rcv in ssn.receivers:
       self.process_receiver(rcv)
 
     if ssn.acked:
-      messages = ssn.acked[:]
-      ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
-      for range in ids:
-        _ssn.receiver._completed.add_range(range)
-      ch = _ssn.channel
-      if ch is None:
-        raise SessionDetached()
-      ch.session_completed(_ssn.receiver._completed)
-      _ssn.message_accept(ids, sync=True)
-      # XXX: really need to make this async so that we don't give up the lock
-      _ssn.sync()
-
-      # XXX: we're ignoring acks that get lost when disconnected
-      for m in messages:
-        ssn.acked.remove(m)
-        if ssn.transactional:
-          _ssn.acked.append(m)
-
-    if ssn.committing:
-      _ssn.tx_commit(sync=True)
-      # XXX: need to kill syncs
-      _ssn.sync()
-      del _ssn.acked[:]
-      ssn.committing = False
-      ssn.committed = True
-      ssn.aborting = False
-      ssn.aborted = False
+      messages = [m for m in ssn.acked if m not in sst.acked]
+      if messages:
+        # XXX: we're ignoring acks that get lost when disconnected,
+        # could we deal this via some message-id based purge?
+        ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
+        for range in ids:
+          sst.executed.add_range(range)
+        sst.write_op(SessionCompleted(sst.executed))
+        def ack_ack():
+          for m in messages:
+            ssn.acked.remove(m)
+            if not ssn.transactional:
+              sst.acked.remove(m)
+        sst.write_cmd(MessageAccept(ids, sync=True), ack_ack)
+        sst.acked.extend(messages)
+
+    if ssn.committing and not sst.committing:
+      def commit_ok():
+        del sst.acked[:]
+        ssn.committing = False
+        ssn.committed = True
+        ssn.aborting = False
+        ssn.aborted = False
+      sst.write_cmd(TxCommit(sync=True), commit_ok)
+      sst.committing = True
+
+    if ssn.aborting and not sst.aborting:
+      sst.aborting = True
+      def do_rb():
+        messages = sst.acked + ssn.unacked + ssn.incoming
+        ids = RangedSet(*[m._transfer_id for m in messages])
+        for range in ids:
+          sst.executed.add_range(range)
+        sst.write_op(SessionCompleted(sst.executed))
+        sst.write_cmd(MessageRelease(ids))
+        sst.write_cmd(TxRollback(sync=True), do_rb_ok)
+
+      def do_rb_ok():
+        del ssn.incoming[:]
+        del ssn.unacked[:]
+        del sst.acked[:]
+
+        for rcv in ssn.receivers:
+          rcv.impending = rcv.received
+          rcv.returned = rcv.received
+          # XXX: do we need to update granted here as well?
+
+        for rcv in ssn.receivers:
+          self.process_receiver(rcv)
+
+        ssn.aborting = False
+        ssn.aborted = True
+        ssn.committing = False
+        ssn.committed = False
+        sst.aborting = False
 
-    if ssn.aborting:
       for rcv in ssn.receivers:
-        _ssn.message_stop(rcv.destination)
-      _ssn.sync()
-
-      messages = _ssn.acked + ssn.unacked + ssn.incoming
-      ids = RangedSet(*[m._transfer_id for m in messages])
-      for range in ids:
-        _ssn.receiver._completed.add_range(range)
-      _ssn.channel.session_completed(_ssn.receiver._completed)
-      _ssn.message_release(ids)
-      _ssn.tx_rollback(sync=True)
-      _ssn.sync()
-
-      del ssn.incoming[:]
-      del ssn.unacked[:]
-      del _ssn.acked[:]
-
-      for rcv in ssn.receivers:
-        rcv.impending = rcv.received
-        rcv.returned = rcv.received
-        # XXX: do we need to update granted here as well?
-
-      for rcv in ssn.receivers:
-        self.process_receiver(rcv)
-
-      ssn.aborting = False
-      ssn.aborted = True
-      ssn.committing = False
-      ssn.committed = False
+        sst.write_cmd(MessageStop(rcv.destination))
+      sst.write_cmd(ExecutionSync(sync=True), do_rb)
 
   def grant(self, rcv):
-    _ssn = self._attachments[rcv.session]
-    _rcv = self.link_in(rcv)
+    sst = self._attachments[rcv.session]
+    _rcv = self._attachments.get(rcv)
+    if _rcv is None or not rcv.linked or _rcv.canceled or _rcv.draining:
+      return
 
     if rcv.granted is UNLIMITED:
       if rcv.impending is UNLIMITED:
@@ -343,30 +671,37 @@
       delta = max(rcv.granted, rcv.received) - rcv.impending
 
     if delta is UNLIMITED:
-      _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
-      _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, UNLIMITED.value)
+      sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value))
+      sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, UNLIMITED.value))
       rcv.impending = UNLIMITED
     elif delta > 0:
-      _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
-      _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, delta)
+      sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value))
+      sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, delta))
       rcv.impending += delta
-    elif delta < 0:
-      if rcv.drain:
-        _ssn.message_flush(rcv.destination, sync=True)
-      else:
-        _ssn.message_stop(rcv.destination, sync=True)
-      # XXX: need to kill syncs
-      _ssn.sync()
-      rcv.impending = rcv.received
-      self.grant(rcv)
+    elif delta < 0 and not rcv.draining:
+      _rcv.draining = True
+      def do_stop():
+        rcv.impending = rcv.received
+        _rcv.draining = False
+        self.grant(rcv)
+      sst.write_cmd(MessageStop(rcv.destination, sync=True), do_stop)
+
+    if rcv.draining:
+      def do_flush():
+        rcv.impending = rcv.received
+        rcv.granted = rcv.impending
+        _rcv.draining = False
+        rcv.draining = False
+      sst.write_cmd(MessageFlush(rcv.destination, sync=True), do_flush)
+
 
   def process_receiver(self, rcv):
     if rcv.closed: return
     self.grant(rcv)
 
   def send(self, snd, msg):
-    _ssn = self._attachments[snd.session]
-    _snd = self.link_out(snd)
+    sst = self._attachments[snd.session]
+    _snd = self._attachments[snd]
 
     # XXX: what if subject is specified for a normal queue?
     if _snd._routing_key is None:
@@ -375,16 +710,16 @@
       rk = _snd._routing_key
     # XXX: do we need to query to figure out how to create the reply-to interoperably?
     if msg.reply_to:
-      rt = _ssn.reply_to(*parse_addr(msg.reply_to))
+      rt = addr2reply_to(msg.reply_to)
     else:
       rt = None
-    dp = _ssn.delivery_properties(routing_key=rk)
-    mp = _ssn.message_properties(message_id=msg.id,
-                                 user_id=msg.user_id,
-                                 reply_to=rt,
-                                 correlation_id=msg.correlation_id,
-                                 content_type=msg.content_type,
-                                 application_headers=msg.properties)
+    dp = DeliveryProperties(routing_key=rk)
+    mp = MessageProperties(message_id=msg.id,
+                           user_id=msg.user_id,
+                           reply_to=rt,
+                           correlation_id=msg.correlation_id,
+                           content_type=msg.content_type,
+                           application_headers=msg.properties)
     if msg.subject is not None:
       if mp.application_headers is None:
         mp.application_headers = {}
@@ -397,37 +732,42 @@
       dp.delivery_mode = delivery_mode.persistent
     enc, dec = get_codec(msg.content_type)
     body = enc(msg.content)
-    _ssn.message_transfer(destination=_snd._exchange,
-                          message=Message010(dp, mp, body),
-                          sync=True)
-    log.debug("SENT [%s] %s", snd.session, msg)
-    # XXX: really need to make this async so that we don't give up the lock
-    _ssn.sync()
-    # XXX: should we log the ack somehow too?
-    snd.acked += 1
+    def msg_acked():
+      # XXX: should we log the ack somehow too?
+      snd.acked += 1
+      m = snd.session.outgoing.pop(0)
+      sst.outgoing_idx -= 1
+      assert msg == m
+    sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
+                                   payload=body, sync=True), msg_acked)
+
+  def do_message_transfer(self, xfr):
+    sst = self.get_sst(xfr)
+    ssn = sst.session
 
-  @synchronized
-  def _message_transfer(self, ssn, cmd):
-    m = Message010(cmd.payload)
-    m.headers = cmd.headers
-    m.id = cmd.id
-    msg = self._decode(m)
-    rcv = ssn.receivers[int(cmd.destination)]
+    msg = self._decode(xfr)
+    rcv = ssn.receivers[int(xfr.destination)]
     msg._receiver = rcv
     if rcv.impending is not UNLIMITED:
-      assert rcv.received < rcv.impending
+      assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
     rcv.received += 1
     log.debug("RECV [%s] %s", ssn, msg)
     ssn.incoming.append(msg)
     self.connection._waiter.notifyAll()
-    return INCOMPLETE
 
-  def _decode(self, message):
-    dp = message.get("delivery_properties")
-    mp = message.get("message_properties")
+  def _decode(self, xfr):
+    dp = EMPTY_DP
+    mp = EMPTY_MP
+
+    for h in xfr.headers:
+      if isinstance(h, DeliveryProperties):
+        dp = h
+      elif isinstance(h, MessageProperties):
+        mp = h
+
     ap = mp.application_headers
     enc, dec = get_codec(mp.content_type)
-    content = dec(message.body)
+    content = dec(xfr.payload)
     msg = Message(content)
     msg.id = mp.message_id
     if ap is not None:
@@ -440,5 +780,5 @@
     msg.durable = dp.delivery_mode == delivery_mode.persistent
     msg.properties = mp.application_headers
     msg.content_type = mp.content_type
-    msg._transfer_id = message.id
+    msg._transfer_id = xfr.id
     return msg

Modified: qpid/branches/java-network-refactor/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/qpid/messaging.py?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/qpid/messaging.py (original)
+++ qpid/branches/java-network-refactor/qpid/python/qpid/messaging.py Sun Oct 11 23:22:08 2009
@@ -77,7 +77,8 @@
   """
 
   @static
-  def open(host, port=None):
+  def open(host, port=None, username="guest", password="guest",
+           mechanism="PLAIN", heartbeat=None, **options):
     """
     Creates an AMQP connection and connects it to the given host and port.
 
@@ -88,11 +89,12 @@
     @rtype: Connection
     @return: a connected Connection
     """
-    conn = Connection(host, port)
+    conn = Connection(host, port, username, password, mechanism, heartbeat, **options)
     conn.connect()
     return conn
 
-  def __init__(self, host, port=None):
+  def __init__(self, host, port=None, username="guest", password="guest",
+               mechanism="PLAIN", heartbeat=None, **options):
     """
     Creates a connection. A newly created connection must be connected
     with the Connection.connect() method before it can be started.
@@ -106,11 +108,16 @@
     """
     self.host = host
     self.port = default(port, AMQP_PORT)
+    self.username = username
+    self.password = password
+    self.mechanism = mechanism
+    self.heartbeat = heartbeat
+
     self.started = False
     self.id = str(uuid4())
     self.session_counter = 0
     self.sessions = {}
-    self.reconnect = False
+    self.reconnect = options.get("reconnect", False)
     self._connected = False
     self._lock = RLock()
     self._condition = Condition(self._lock)
@@ -230,9 +237,10 @@
     self.value = value
 
   # XXX: this should become part of the driver
-  def _bind(self, ssn, exchange, queue):
-    ssn.exchange_bind(exchange=exchange, queue=queue,
-                      binding_key=self.value.replace("*", "#"))
+  def _bind(self, sst, exchange, queue):
+    from qpid.ops import ExchangeBind
+    sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
+                               binding_key=self.value.replace("*", "#")))
 
 class SessionError(Exception):
   pass
@@ -282,6 +290,7 @@
     # XXX: I hate this name.
     self.ack_capacity = UNLIMITED
 
+    self.error = None
     self.closing = False
     self.closed = False
 
@@ -302,12 +311,16 @@
 
   def _check_error(self, exc=SessionError):
     self.connection._check_error(exc)
+    if self.error:
+      raise exc(*self.error)
 
   def _ewait(self, predicate, timeout=None, exc=SessionError):
-    return self.connection._ewait(predicate, timeout, exc)
+    result = self.connection._ewait(lambda: self.error or predicate(), timeout, exc)
+    self._check_error(exc)
+    return result
 
   @synchronized
-  def sender(self, target):
+  def sender(self, target, **options):
     """
     Creates a L{Sender} that may be used to send L{Messages<Message>}
     to the specified target.
@@ -317,7 +330,7 @@
     @rtype: Sender
     @return: a new Sender for the specified target
     """
-    sender = Sender(self, len(self.senders), target)
+    sender = Sender(self, len(self.senders), target, options)
     self.senders.append(sender)
     self._wakeup()
     # XXX: because of the lack of waiting here we can end up getting
@@ -327,7 +340,7 @@
     return sender
 
   @synchronized
-  def receiver(self, source, filter=None):
+  def receiver(self, source, **options):
     """
     Creates a receiver that may be used to actively fetch or to listen
     for the arrival of L{Messages<Message>} from the specified source.
@@ -337,7 +350,7 @@
     @rtype: Receiver
     @return: a new Receiver for the specified source
     """
-    receiver = Receiver(self, len(self.receivers), source, filter,
+    receiver = Receiver(self, len(self.receivers), source, options,
                         self.started)
     self.receivers.append(receiver)
     self._wakeup()
@@ -368,8 +381,8 @@
 
   @synchronized
   def _get(self, predicate, timeout=None):
-    if self._wait(lambda: ((self._peek(predicate) is not None) or self.closing),
-                  timeout):
+    if self._ewait(lambda: ((self._peek(predicate) is not None) or self.closing),
+                   timeout):
       msg = self._pop(predicate)
       if msg is not None:
         msg._receiver.returned += 1
@@ -505,13 +518,18 @@
   Sends outgoing messages.
   """
 
-  def __init__(self, session, index, target):
+  def __init__(self, session, index, target, options):
     self.session = session
     self.index = index
     self.target = target
-    self.capacity = UNLIMITED
+    self.options = options
+    self.capacity = options.get("capacity", UNLIMITED)
+    self.durable = options.get("durable")
     self.queued = Serial(0)
     self.acked = Serial(0)
+    self.error = None
+    self.linked = False
+    self.closing = False
     self.closed = False
     self._lock = self.session._lock
 
@@ -520,9 +538,13 @@
 
   def _check_error(self, exc=SendError):
     self.session._check_error(exc)
+    if self.error:
+      raise exc(*self.error)
 
   def _ewait(self, predicate, timeout=None, exc=SendError):
-    return self.session._ewait(predicate, timeout, exc)
+    result = self.session._ewait(lambda: self.error or predicate(), timeout, exc)
+    self._check_error(exc)
+    return result
 
   @synchronized
   def pending(self):
@@ -558,11 +580,16 @@
     if not self.session.connection._connected or self.session.closing:
       raise Disconnected()
 
+    self._ewait(lambda: self.linked)
+
     if isinstance(object, Message):
       message = object
     else:
       message = Message(object)
 
+    if message.durable is None:
+      message.durable = self.durable
+
     if self.capacity is not UNLIMITED:
       if self.capacity <= 0:
         raise InsufficientCapacity("capacity = %s" % self.capacity)
@@ -573,15 +600,19 @@
     message._sender = self
     self.session.outgoing.append(message)
     self.queued += 1
-    mno = self.queued
 
     self._wakeup()
 
     if sync:
-      self._ewait(lambda: self.acked >= mno)
+      self.sync()
       assert message not in self.session.outgoing
 
   @synchronized
+  def sync(self):
+    mno = self.queued
+    self._ewait(lambda: self.acked >= mno)
+
+  @synchronized
   def close(self):
     """
     Close the Sender.
@@ -609,21 +640,23 @@
   L{listen}.
   """
 
-  def __init__(self, session, index, source, filter, started):
+  def __init__(self, session, index, source, options, started):
     self.session = session
     self.index = index
     self.destination = str(self.index)
     self.source = source
-    self.filter = filter
+    self.options = options
 
     self.started = started
-    self.capacity = UNLIMITED
+    self.capacity = options.get("capacity", UNLIMITED)
     self.granted = Serial(0)
-    self.drain = False
+    self.draining = False
     self.impending = Serial(0)
     self.received = Serial(0)
     self.returned = Serial(0)
 
+    self.error = None
+    self.linked = False
     self.closing = False
     self.closed = False
     self.listener = None
@@ -634,9 +667,13 @@
 
   def _check_error(self, exc=ReceiveError):
     self.session._check_error(exc)
+    if self.error:
+      raise exc(*self.error)
 
   def _ewait(self, predicate, timeout=None, exc=ReceiveError):
-    return self.session._ewait(predicate, timeout, exc)
+    result = self.session._ewait(lambda: self.error or predicate(), timeout, exc)
+    self._check_error(exc)
+    return result
 
   @synchronized
   def pending(self):
@@ -680,17 +717,18 @@
     @type timeout: float
     @param timeout: the time to wait for a message to be available
     """
+
+    self._ewait(lambda: self.linked)
+
     if self._capacity() == 0:
       self.granted = self.returned + 1
       self._wakeup()
     self._ewait(lambda: self.impending >= self.granted)
     msg = self.session._get(self._pred, timeout=timeout)
     if msg is None:
-      self.drain = True
-      self.granted = self.received
+      self.draining = True
       self._wakeup()
-      self._ewait(lambda: self.impending == self.received)
-      self.drain = False
+      self._ewait(lambda: not self.draining)
       self._grant()
       self._wakeup()
       msg = self.session._get(self._pred, timeout=0)
@@ -738,7 +776,7 @@
     self.closing = True
     self._wakeup()
     try:
-      self._ewait(lambda: self.closed)
+      self.session._ewait(lambda: self.closed)
     finally:
       self.session.receivers.remove(self)
 
@@ -778,6 +816,8 @@
 def get_codec(content_type):
   return TYPE_CODEC[content_type]
 
+UNSPECIFIED = object()
+
 class Message:
 
   """
@@ -802,7 +842,9 @@
   @ivar content: the message content
   """
 
-  def __init__(self, content=None):
+  def __init__(self, content=None, content_type=UNSPECIFIED, id=None,
+               subject=None, to=None, user_id=None, reply_to=None,
+               correlation_id=None, durable=None, properties=None):
     """
     Construct a new message with the supplied content. The
     content-type of the message will be automatically inferred from
@@ -810,20 +852,44 @@
 
     @type content: str, unicode, buffer, dict, list
     @param content: the message content
+
+    @type content_type: str
+    @param content_type: the content-type of the message
     """
-    self.id = None
-    self.subject = None
-    self.user_id = None
-    self.to = None
-    self.reply_to = None
-    self.correlation_id = None
-    self.durable = False
-    self.properties = {}
-    self.content_type = get_type(content)
+    self.id = id
+    self.subject = subject
+    self.to = to
+    self.user_id = user_id
+    self.reply_to = reply_to
+    self.correlation_id = correlation_id
+    self.durable = durable
+    if properties is None:
+      self.properties = {}
+    else:
+      self.properties = properties
+    if content_type is UNSPECIFIED:
+      self.content_type = get_type(content)
+    else:
+      self.content_type = content_type
     self.content = content
 
   def __repr__(self):
-    return "Message(%r)" % self.content
+    args = []
+    for name in ["id", "subject", "to", "user_id", "reply_to",
+                 "correlation_id"]:
+      value = self.__dict__[name]
+      if value is not None: args.append("%s=%r" % (name, value))
+    for name in ["durable", "properties"]:
+      value = self.__dict__[name]
+      if value: args.append("%s=%r" % (name, value))
+    if self.content_type != get_type(self.content):
+      args.append("content_type=%r" % self.content_type)
+    if self.content is not None:
+      if args:
+        args.append("content=%r" % self.content)
+      else:
+        args.append(repr(self.content))
+    return "Message(%s)" % ", ".join(args)
 
 __all__ = ["Connection", "Session", "Sender", "Receiver", "Pattern", "Message",
            "ConnectionError", "ConnectError", "SessionError", "Disconnected",

Modified: qpid/branches/java-network-refactor/qpid/python/qpid/ops.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/qpid/ops.py?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/qpid/ops.py (original)
+++ qpid/branches/java-network-refactor/qpid/python/qpid/ops.py Sun Oct 11 23:22:08 2009
@@ -80,7 +80,7 @@
     return "%s(%s)" % (self.__class__.__name__,
                        ", ".join(["%s=%r" % (f.name, getattr(self, f.name))
                                   for f in self.ARGS
-                                  if getattr(self, f.name) is not f.default]))
+                                  if getattr(self, f.name) != f.default]))
 
 class Command(Compound):
   UNENCODED=[Field("channel", "uint16", 0),
@@ -209,8 +209,8 @@
 from qpid_config import amqp_spec as file
 pclfile = "%s.ops.pcl" % file
 
-if False and (os.path.exists(pclfile) and
-              os.path.getmtime(pclfile) > os.path.getmtime(file)):
+if os.path.exists(pclfile) and \
+      os.path.getmtime(pclfile) > os.path.getmtime(file):
   f = open(pclfile, "read")
   types = pickle.load(f)
   f.close()

Modified: qpid/branches/java-network-refactor/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/qpid/tests/messaging.py?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/branches/java-network-refactor/qpid/python/qpid/tests/messaging.py Sun Oct 11 23:22:08 2009
@@ -24,7 +24,7 @@
 from qpid.tests import Test
 from qpid.harness import Skipped
 from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \
-    InsufficientCapacity, Message, UNLIMITED, uuid4
+    InsufficientCapacity, Message, ReceiveError, SendError, UNLIMITED, uuid4
 from Queue import Queue, Empty as QueueEmpty
 
 class Base(Test):
@@ -50,6 +50,8 @@
       raise Skipped(e)
     self.ssn = self.setup_session()
     self.snd = self.setup_sender()
+    if self.snd is not None:
+      self.snd.durable = self.durable()
     self.rcv = self.setup_receiver()
 
   def teardown(self):
@@ -63,11 +65,12 @@
       return "%s[%s, %s]" % (base, count, self.test_id)
 
   def ping(self, ssn):
+    PING_Q = 'ping-queue {create: always}'
     # send a message
-    sender = ssn.sender("ping-queue")
+    sender = ssn.sender(PING_Q, durable=self.durable())
     content = self.content("ping")
     sender.send(content)
-    receiver = ssn.receiver("ping-queue")
+    receiver = ssn.receiver(PING_Q)
     msg = receiver.fetch(0)
     ssn.acknowledge()
     assert msg.content == content, "expected %r, got %r" % (content, msg.content)
@@ -97,16 +100,27 @@
   def delay(self):
     return float(self.config.defines.get("delay", "2"))
 
+  def get_bool(self, name):
+    return self.config.defines.get(name, "false").lower() in ("true", "yes", "1")
+
+  def durable(self):
+    return self.get_bool("durable")
+
+  def reconnect(self):
+    return self.get_bool("reconnect")
+
 class SetupTests(Base):
 
   def testOpen(self):
     # XXX: need to flesh out URL support/syntax
-    self.conn = Connection.open(self.broker.host, self.broker.port)
+    self.conn = Connection.open(self.broker.host, self.broker.port,
+                                reconnect=self.reconnect())
     self.ping(self.conn.session())
 
   def testConnect(self):
     # XXX: need to flesh out URL support/syntax
-    self.conn = Connection(self.broker.host, self.broker.port)
+    self.conn = Connection(self.broker.host, self.broker.port,
+                           reconnect=self.reconnect())
     self.conn.connect()
     self.ping(self.conn.session())
 
@@ -121,7 +135,8 @@
 class ConnectionTests(Base):
 
   def setup_connection(self):
-    return Connection.open(self.broker.host, self.broker.port)
+    return Connection.open(self.broker.host, self.broker.port,
+                           reconnect=self.reconnect())
 
   def testSessionAnon(self):
     ssn1 = self.conn.session()
@@ -174,17 +189,21 @@
     self.conn.close()
     assert not self.conn.connected()
 
+ACK_Q = 'test-ack-queue {create: always}'
+
 class SessionTests(Base):
 
   def setup_connection(self):
-    return Connection.open(self.broker.host, self.broker.port)
+    return Connection.open(self.broker.host, self.broker.port,
+                           reconnect=self.reconnect())
 
   def setup_session(self):
     return self.conn.session()
 
   def testSender(self):
-    snd = self.ssn.sender("test-snd-queue")
-    snd2 = self.ssn.sender(snd.target)
+    snd = self.ssn.sender('test-snd-queue {create: always}',
+                          durable=self.durable())
+    snd2 = self.ssn.sender(snd.target, durable=self.durable())
     assert snd is not snd2
     snd2.close()
 
@@ -196,47 +215,49 @@
     self.ssn.acknowledge(msg)
 
   def testReceiver(self):
-    rcv = self.ssn.receiver("test-rcv-queue")
+    rcv = self.ssn.receiver('test-rcv-queue {create: always}')
     rcv2 = self.ssn.receiver(rcv.source)
     assert rcv is not rcv2
     rcv2.close()
 
     content = self.content("testReceiver")
-    snd = self.ssn.sender(rcv.source)
+    snd = self.ssn.sender(rcv.source, durable=self.durable())
     snd.send(content)
     msg = rcv.fetch(0)
     assert msg.content == content
     self.ssn.acknowledge(msg)
 
   def testStart(self):
-    rcv = self.ssn.receiver("test-start-queue")
+    START_Q = 'test-start-queue {create: always}'
+    rcv = self.ssn.receiver(START_Q)
     assert not rcv.started
     self.ssn.start()
     assert rcv.started
-    rcv = self.ssn.receiver("test-start-queue")
+    rcv = self.ssn.receiver(START_Q)
     assert rcv.started
 
   def testStop(self):
+    STOP_Q = 'test-stop-queue {create: always}'
     self.ssn.start()
-    rcv = self.ssn.receiver("test-stop-queue")
+    rcv = self.ssn.receiver(STOP_Q)
     assert rcv.started
     self.ssn.stop()
     assert not rcv.started
-    rcv = self.ssn.receiver("test-stop-queue")
+    rcv = self.ssn.receiver(STOP_Q)
     assert not rcv.started
 
   # XXX, we need a convenient way to assert that required queues are
   # empty on setup, and possibly also to drain queues on teardown
   def ackTest(self, acker, ack_capacity=None):
     # send a bunch of messages
-    snd = self.ssn.sender("test-ack-queue")
+    snd = self.ssn.sender(ACK_Q, durable=self.durable())
     contents = [self.content("ackTest", i) for i in range(15)]
     for c in contents:
       snd.send(c)
 
     # drain the queue, verify the messages are there and then close
     # without acking
-    rcv = self.ssn.receiver(snd.target)
+    rcv = self.ssn.receiver(ACK_Q)
     self.drain(rcv, expected=contents)
     self.ssn.close()
 
@@ -245,7 +266,7 @@
     self.ssn = self.conn.session()
     if ack_capacity is not None:
       self.ssn.ack_capacity = ack_capacity
-    rcv = self.ssn.receiver("test-ack-queue")
+    rcv = self.ssn.receiver(ACK_Q)
     self.drain(rcv, expected=contents)
     acker(self.ssn)
     self.ssn.close()
@@ -253,7 +274,7 @@
     # drain the queue a final time and verify that the messages were
     # dequeued
     self.ssn = self.conn.session()
-    rcv = self.ssn.receiver("test-ack-queue")
+    rcv = self.ssn.receiver(ACK_Q)
     self.assertEmpty(rcv)
 
   def testAcknowledge(self):
@@ -271,7 +292,7 @@
         pass
     finally:
       self.ssn.ack_capacity = UNLIMITED
-      self.drain(self.ssn.receiver("test-ack-queue"))
+      self.drain(self.ssn.receiver(ACK_Q))
       self.ssn.acknowledge()
 
   def testAcknowledgeAsyncAckCap1(self):
@@ -284,7 +305,7 @@
     self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
 
   def send(self, ssn, queue, base, count=1):
-    snd = ssn.sender(queue)
+    snd = ssn.sender(queue, durable=self.durable())
     contents = []
     for i in range(count):
       c = self.content(base, i)
@@ -294,10 +315,12 @@
     return contents
 
   def txTest(self, commit):
+    TX_Q = 'test-tx-queue {create: always}'
+    TX_Q_COPY = 'test-tx-queue-copy {create: always}'
     txssn = self.conn.session(transactional=True)
-    contents = self.send(self.ssn, "test-tx-queue", "txTest", 3)
-    txrcv = txssn.receiver("test-tx-queue")
-    txsnd = txssn.sender("test-tx-queue-copy")
+    contents = self.send(self.ssn, TX_Q, "txTest", 3)
+    txrcv = txssn.receiver(TX_Q)
+    txsnd = txssn.sender(TX_Q_COPY, durable=self.durable())
     rcv = self.ssn.receiver(txrcv.source)
     copy_rcv = self.ssn.receiver(txsnd.target)
     self.assertEmpty(copy_rcv)
@@ -323,9 +346,10 @@
     self.txTest(False)
 
   def txTestSend(self, commit):
+    TX_SEND_Q = 'test-tx-send-queue {create: always}'
     txssn = self.conn.session(transactional=True)
-    contents = self.send(txssn, "test-tx-send-queue", "txTestSend", 3)
-    rcv = self.ssn.receiver("test-tx-send-queue")
+    contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
+    rcv = self.ssn.receiver(TX_SEND_Q)
     self.assertEmpty(rcv)
 
     if commit:
@@ -345,10 +369,11 @@
     self.txTestSend(False)
 
   def txTestAck(self, commit):
+    TX_ACK_Q = 'test-tx-ack-queue {create: always}'
     txssn = self.conn.session(transactional=True)
-    txrcv = txssn.receiver("test-tx-ack-queue")
+    txrcv = txssn.receiver(TX_ACK_Q)
     self.assertEmpty(txrcv)
-    contents = self.send(self.ssn, "test-tx-ack-queue", "txTestAck", 3)
+    contents = self.send(self.ssn, TX_ACK_Q, "txTestAck", 3)
     assert contents == self.drain(txrcv)
 
     if commit:
@@ -366,11 +391,11 @@
     txssn.close()
 
     txssn = self.conn.session(transactional=True)
-    txrcv = txssn.receiver("test-tx-ack-queue")
+    txrcv = txssn.receiver(TX_ACK_Q)
     assert contents == self.drain(txrcv)
     txssn.acknowledge()
     txssn.commit()
-    rcv = self.ssn.receiver("test-tx-ack-queue")
+    rcv = self.ssn.receiver(TX_ACK_Q)
     self.assertEmpty(rcv)
     txssn.close()
     self.assertEmpty(rcv)
@@ -389,19 +414,22 @@
     except Disconnected:
       pass
 
+RECEIVER_Q = 'test-receiver-queue {create: always}'
+
 class ReceiverTests(Base):
 
   def setup_connection(self):
-    return Connection.open(self.broker.host, self.broker.port)
+    return Connection.open(self.broker.host, self.broker.port,
+                           reconnect=self.reconnect())
 
   def setup_session(self):
     return self.conn.session()
 
   def setup_sender(self):
-    return self.ssn.sender("test-receiver-queue")
+    return self.ssn.sender(RECEIVER_Q)
 
   def setup_receiver(self):
-    return self.ssn.receiver("test-receiver-queue")
+    return self.ssn.receiver(RECEIVER_Q)
 
   def send(self, base, count = None):
     content = self.content(base, count)
@@ -516,7 +544,7 @@
     self.assertPending(self.rcv, 5)
 
     drained = self.drain(self.rcv)
-    assert len(drained) == 10
+    assert len(drained) == 10, "%s, %s" % (len(drained), drained)
     self.assertPending(self.rcv, 0)
 
     self.ssn.acknowledge()
@@ -538,19 +566,81 @@
 
   # XXX: need testClose
 
+NOSUCH_Q = "this-queue-should-not-exist"
+UNPARSEABLE_ADDR = "{bad address}"
+UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
+
+class AddressErrorTests(Base):
+
+  def setup_connection(self):
+    return Connection.open(self.broker.host, self.broker.port,
+                           reconnect=self.reconnect())
+
+  def setup_session(self):
+    return self.conn.session()
+
+  def sendErrorTest(self, addr, exc, check=lambda e: True):
+    snd = self.ssn.sender(addr, durable=self.durable())
+    try:
+      snd.send("hello")
+      assert False, "send succeeded"
+    except exc, e:
+      assert check(e), "unexpected error: %s" % e
+      snd.close()
+
+  def fetchErrorTest(self, addr, exc, check=lambda e: True):
+    rcv = self.ssn.receiver(addr)
+    try:
+      rcv.fetch(timeout=0)
+      assert False, "fetch succeeded"
+    except exc, e:
+      assert check(e), "unexpected error: %s" % e
+      rcv.close()
+
+  def testNoTarget(self):
+    # XXX: should have specific exception for this
+    self.sendErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e))
+
+  def testNoSource(self):
+    # XXX: should have specific exception for this
+    self.fetchErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e))
+
+  def testUnparseableTarget(self):
+    # XXX: should have specific exception for this
+    self.sendErrorTest(UNPARSEABLE_ADDR, SendError,
+                       lambda e: "expecting ID" in str(e))
+
+  def testUnparseableSource(self):
+    # XXX: should have specific exception for this
+    self.fetchErrorTest(UNPARSEABLE_ADDR, ReceiveError,
+                        lambda e: "expecting ID" in str(e))
+
+  def testUnlexableTarget(self):
+    # XXX: should have specific exception for this
+    self.sendErrorTest(UNLEXABLE_ADDR, SendError,
+                       lambda e: "unrecognized character" in str(e))
+
+  def testUnlexableSource(self):
+    # XXX: should have specific exception for this
+    self.fetchErrorTest(UNLEXABLE_ADDR, ReceiveError,
+                        lambda e: "unrecognized character" in str(e))
+
+SENDER_Q = 'test-sender-q {create: always}'
+
 class SenderTests(Base):
 
   def setup_connection(self):
-    return Connection.open(self.broker.host, self.broker.port)
+    return Connection.open(self.broker.host, self.broker.port,
+                           reconnect=self.reconnect())
 
   def setup_session(self):
     return self.conn.session()
 
   def setup_sender(self):
-    return self.ssn.sender("test-sender-queue")
+    return self.ssn.sender(SENDER_Q)
 
   def setup_receiver(self):
-    return self.ssn.receiver("test-sender-queue")
+    return self.ssn.receiver(SENDER_Q)
 
   def checkContent(self, content):
     self.snd.send(content)
@@ -611,6 +701,7 @@
       except InsufficientCapacity:
         caught = True
         break
+    self.snd.sync()
     self.drain(self.rcv, expected=msgs)
     self.ssn.acknowledge()
     assert caught, "did not exceed capacity"
@@ -643,19 +734,22 @@
     m.content = u"<html/>"
     assert m.content_type == "text/html; charset=utf8"
 
+ECHO_Q = 'test-message-echo-queue {create: always}'
+
 class MessageEchoTests(Base):
 
   def setup_connection(self):
-    return Connection.open(self.broker.host, self.broker.port)
+    return Connection.open(self.broker.host, self.broker.port,
+                           reconnect=self.reconnect())
 
   def setup_session(self):
     return self.conn.session()
 
   def setup_sender(self):
-    return self.ssn.sender("test-message-echo-queue")
+    return self.ssn.sender(ECHO_Q)
 
   def setup_receiver(self):
-    return self.ssn.receiver("test-message-echo-queue")
+    return self.ssn.receiver(ECHO_Q)
 
   def check(self, msg):
     self.snd.send(msg)

Modified: qpid/branches/java-network-refactor/qpid/wcf/ReadMe.txt
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/ReadMe.txt?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/ReadMe.txt (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/ReadMe.txt Sun Oct 11 23:22:08 2009
@@ -49,9 +49,9 @@
 qpid source code location e.g. C:\trunk\qpid
 
 5. Build Qpid cpp
-Run CMake and choose "%QPID_ROOT%\cpp\build" as the location for "Where to
-build the binaries".  Build at least the "qpidd", "qpidclient" and
-"qpidcommon" projects.
+Build at least the "qpidd", "qpidclient" and "qpidcommon" projects.
+Create an environment variable called QPID_BUILD_ROOT and store the 
+path to the Qpid build directory in it.
 
 
 4. Building the solution file
@@ -81,7 +81,7 @@
    %QPID_ROOT%\wcf\test\Apache\Qpid\Test\Channel\Functional\RunTests.bat has the correct
    values for the nunit_exe, qpid_dll_location and configuration_name variables as per
    your installation.
-2. Start the qpid broker from the qpid build folder e.g. %QPID_ROOT%\cpp\build\src\Debug.
+2. Start the qpid broker from the qpid build folder e.g. %QPID_BUILD_ROOT%\src\Debug.
 3. Execute RunTests.bat from its location e.g. %QPID_ROOT%\wcf\test\Apache\Qpid\Test\Channel\Functional.
 
 

Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs Sun Oct 11 23:22:08 2009
@@ -64,6 +64,12 @@
             set { transport.BrokerPort = value; }
         }
 
+        public int PrefetchLimit
+        {
+            get { return transport.PrefetchLimit; }
+            set { transport.PrefetchLimit = value; }
+        }
+
         public bool Shared
         {
             get { return transport.Shared; }

Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs Sun Oct 11 23:22:08 2009
@@ -63,6 +63,13 @@
             set { brokerPort = value; }
         }
 
+        [ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit, DefaultValue = false)]
+        public int PrefetchLimit
+        {
+            get { return (int)base[AmqpConfigurationStrings.PrefetchLimit]; }
+            set { base[AmqpConfigurationStrings.PrefetchLimit] = value; }
+        }
+
         [ConfigurationProperty(AmqpConfigurationStrings.Shared, DefaultValue = false)]
         public bool Shared
         {
@@ -95,6 +102,8 @@
             get
             {
                 ConfigurationPropertyCollection properties = base.Properties;
+                properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit,
+                    typeof(int), 0, null, null, ConfigurationPropertyOptions.None));
                 properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.Shared,
                     typeof(bool), false, null, null, ConfigurationPropertyOptions.None));
                 properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.TransferMode,
@@ -112,6 +121,7 @@
             this.BrokerPort = amqpBinding.BrokerPort;
             this.TransferMode = amqpBinding.TransferMode;
             this.Shared = amqpBinding.Shared;
+            this.PrefetchLimit = amqpBinding.PrefetchLimit;
 
             AmqpProperties props = amqpBinding.DefaultMessageProperties;
         }
@@ -133,6 +143,7 @@
             amqpBinding.BrokerPort = this.BrokerPort;
             amqpBinding.TransferMode = this.TransferMode;
             amqpBinding.Shared = this.Shared;
+            amqpBinding.PrefetchLimit = this.PrefetchLimit;
         }
 
         protected override void PostDeserialize()

Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs Sun Oct 11 23:22:08 2009
@@ -32,6 +32,7 @@
         AmqpChannelProperties channelProperties;
         long maxBufferPoolSize;
         bool shared;
+	int prefetchLimit;
 
         internal AmqpChannelFactory(AmqpTransportBindingElement bindingElement, BindingContext context)
             : base(context.Binding)
@@ -39,6 +40,7 @@
             this.bindingElement = bindingElement;
             this.channelProperties = bindingElement.ChannelProperties.Clone();
             this.shared = bindingElement.Shared;
+            this.prefetchLimit = bindingElement.PrefetchLimit;
             this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize;
             Collection<MessageEncodingBindingElement> messageEncoderBindingElements
                 = context.BindingParameters.FindAll<MessageEncodingBindingElement>();
@@ -91,7 +93,7 @@
 
         protected override TChannel OnCreateChannel(EndpointAddress remoteAddress, Uri via)
         {
-            return (TChannel)(object) new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared);
+            return (TChannel)(object) new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared, this.prefetchLimit);
         }
 
     }

Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs Sun Oct 11 23:22:08 2009
@@ -46,6 +46,7 @@
         public const string TransferMode = "transferMode";
         public const string Brokers = "brokers";
         public const string Shared = "shared";
+        public const string PrefetchLimit = "prefetchLimit";
         public const string MaxBufferPoolSize = "maxBufferPoolSize";
         public const string MaxReceivedMessageSize = "maxReceivedMessageSize";
     }
@@ -55,7 +56,6 @@
         internal const string BrokerHost = "localhost";
         internal const int BrokerPort = 5672;
         internal const TransferMode TransferMode = System.ServiceModel.TransferMode.Buffered;
-        internal const byte Priority = 4;
         internal const long MaxBufferPoolSize = 64 * 1024;
         internal const int MaxReceivedMessageSize = 5 * 1024 * 1024; //64 * 1024;
     }

Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs Sun Oct 11 23:22:08 2009
@@ -32,6 +32,7 @@
         AmqpTransportBindingElement bindingElement;
         AmqpChannelProperties channelProperties;
         bool shared;
+        int prefetchLimit;
         long maxBufferPoolSize;
         Uri uri;
         AmqpTransportChannel amqpTransportChannel;
@@ -45,6 +46,7 @@
             this.bindingElement = bindingElement;
             this.channelProperties = bindingElement.ChannelProperties.Clone();
             this.shared = bindingElement.Shared;
+            this.prefetchLimit = bindingElement.PrefetchLimit;
 
             this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize;
 
@@ -132,7 +134,7 @@
             {
                 amqpTransportChannel = new AmqpTransportChannel(this, this.channelProperties,
                         new EndpointAddress(uri), messageEncoderFactory.Encoder,
-                        maxBufferPoolSize, this.shared);
+                        maxBufferPoolSize, this.shared, this.prefetchLimit);
                 return (IInputChannel)(object) amqpTransportChannel;
             }
 

Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs Sun Oct 11 23:22:08 2009
@@ -29,6 +29,7 @@
     {
         AmqpChannelProperties channelProperties;
         bool shared;
+	int prefetchLimit;
 
         public AmqpTransportBindingElement()
         {
@@ -41,6 +42,7 @@
         {
             this.channelProperties = other.channelProperties.Clone();
             this.shared = other.shared;
+            this.prefetchLimit = other.prefetchLimit;
         }
 
         public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
@@ -98,6 +100,12 @@
             set { this.channelProperties.BrokerPort = value; }
         }
 
+        public int PrefetchLimit
+        {
+            get { return this.prefetchLimit; }
+            set { this.prefetchLimit = value; }
+        }
+
         public bool Shared
         {
             get { return this.shared; }

Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs Sun Oct 11 23:22:08 2009
@@ -50,6 +50,7 @@
         private MessageEncoder encoder;
         private AmqpChannelProperties factoryChannelProperties;
         private bool shared;
+        private int prefetchLimit;
         private string encoderContentType;
 
         // input = 0-10 queue, output = 0-10 exchange
@@ -68,7 +69,7 @@
         private AsyncTimeSpanCaller asyncOpenCaller;
         private AsyncTimeSpanCaller asyncCloseCaller;
 
-        internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection)
+        internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection, int prefetchLimit)
             : base(factory)
         {
             this.isInputChannel = (factory is ChannelListenerBase) || (factory is AmqpChannelFactory<IInputChannel>);
@@ -80,6 +81,7 @@
 
             this.factoryChannelProperties = channelProperties;
             this.shared = sharedConnection;
+            this.prefetchLimit = prefetchLimit;
             this.remoteAddress = remoteAddress;
 
             // pull out host, port, queue, and connection arguments
@@ -128,6 +130,7 @@
             if (this.isInputChannel)
             {
                 this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, this.queueName);
+                this.inputLink.PrefetchLimit = this.prefetchLimit;
             }
             else
             {
@@ -287,7 +290,7 @@
 
             return false;
         }
- 
+
         public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
         {
             return this.inputLink.BeginTryReceive(timeout, callback, state);
@@ -464,7 +467,7 @@
             }
             return amqpMessage;
         }
-     
+
 
         private Message QpidToWcf(AmqpMessage amqpMessage)
         {
@@ -531,7 +534,7 @@
                 {
                     this.bufferManager.ReturnBuffer(managedBuffer);
                 }
-            }  
+            }
 
             return wcfMessage;
         }

Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp Sun Oct 11 23:22:08 2009
@@ -63,11 +63,12 @@
 	sessionp = new qpid::client::AsyncSession;
 	*sessionp = qpidConnectionp->newSession();
 	subs_mgrp = new SubscriptionManager (*sessionp);
-	success = true;
 	waiters = gcnew Collections::Generic::List<CompletionWaiter^>();
+	success = true;
     } finally {
         if (!success) {
  	    Cleanup();
+	    // TODO: include inner exception information
 	    throw gcnew QpidException ("session creation failure");
 	}
     }
@@ -76,12 +77,6 @@
 
 void AmqpSession::Cleanup()
 {
-    if (subscriptionp != NULL) {
-        subscriptionp->cancel();
-	delete subscriptionp;
-	subscriptionp=NULL;
-    }
-
     if (subs_mgrp != NULL) {
 	subs_mgrp->stop();
 	delete subs_mgrp;
@@ -112,6 +107,7 @@
 
 void AmqpSession::ConnectionClosed()
 {
+    lock l(waiters);
     Cleanup();
 }
 
@@ -283,5 +279,27 @@
     }
 }
 
+bool AmqpSession::MessageStop(Completion &comp, std::string &name)
+{
+    lock l(waiters);
+
+    if (sessionp == NULL)
+	return false;
+
+    comp = sessionp->messageStop(name, true);
+    return true;
+}
+
+void AmqpSession::AcceptAndComplete(SequenceSet& transfers)
+{
+    lock l(waiters);
+
+    if (sessionp == NULL)
+	throw gcnew ObjectDisposedException("Accept");
+
+    sessionp->markCompleted(transfers, false);
+    sessionp->messageAccept(transfers, false);
+}
+
 
 }}} // namespace Apache::Qpid::Cli

Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h Sun Oct 11 23:22:08 2009
@@ -44,7 +44,6 @@
     AsyncSession* sessionp;
     SessionImpl* sessionImplp;
     SubscriptionManager* subs_mgrp;
-    Subscription* subscriptionp;
     LocalQueue* localQueuep;
     Collections::Generic::List<CompletionWaiter^>^ waiters;
     bool helperRunning;
@@ -69,6 +68,8 @@
     void ConnectionClosed();
     void internalWaitForCompletion(IntPtr Future);
     void removeWaiter(CompletionWaiter^ waiter);
+    bool MessageStop(Completion &comp, std::string &name);
+    void AcceptAndComplete(SequenceSet& transfers);
 
     property AmqpConnection^ Connection {
 	AmqpConnection^ get () { return connection; }

Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h Sun Oct 11 23:22:08 2009
@@ -32,7 +32,6 @@
     bool timedOut;
     // has an owner thread
     bool assigned;
-    // can Run (i.e. earlier CompletionWaiters in the queue have completed)
     System::Exception^ runException;
     AsyncCallback^ asyncCallback;
     Threading::Timer ^timer;

Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp Sun Oct 11 23:22:08 2009
@@ -59,6 +59,12 @@
 // with proposed changes to the native library to reduce the number of servicing
 // threads for large numbers of subscriptions.
 
+// synchronization is accomplished with locks, but also by ensuring that only one
+// MessageWaiter (the one at the front of the line) is ever active.  
+// async threads to watch for: Close/finalizer, Timers, SyncCredit and the native Dispatch
+// thread (who deposits FrameSets into the local queue and is oblivious to the
+// managed space locks).
+
 
 // The folowing def must match the "Frames" private typedef.
 // TODO, make Qpid-cpp "Frames" definition visible.
@@ -94,6 +100,8 @@
 	localQueuep = new LocalQueue;
 	SubscriptionSettings settings;
 	settings.flowControl = FlowControl::messageCredit(0);
+	settings.completionMode = CompletionMode::MANUAL_COMPLETION;
+
 	Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings);
 	subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup
 
@@ -197,6 +205,7 @@
 IntPtr InputLink::nextLocalMessage()
 {
     lock l(waiters);
+
     if (disposed)
 	return (IntPtr) NULL;
 
@@ -279,8 +288,7 @@
 	if (haveMessage())
 	    return true;
 
-	// TODO: prefetch window of messages, compatible with both 0-10 and 1.0.
-	subscriptionp->grantMessageCredit(1);
+	AdjustCredit();
 
 	// get a scoped smart ptr ref to guard against async close or hangup
 	demuxQueuePtr = *queuePtrp;
@@ -350,7 +358,12 @@
 	}
 	return;
     }
+
     waiters->RemoveAt(idx);
+    if (waiter->TimedOut) {
+	// may have to give back message if it arrives momentarily
+	AdjustCredit();
+    }
 
     // let the next waiter know it's his turn. 
     if (waiters->Count > 0) {
@@ -411,6 +424,129 @@
 }
 
 
+void InputLink::PrefetchLimit::set(int value)
+{
+    lock l(waiters);
+    prefetchLimit = value;
+
+    int delta = 0;
+
+    // rough rule of thumb to keep the flow, but reduce chatter.
+    // for small messages, the credit request is almost as expensive as the transfer itself.
+    // experience may suggest a better heuristic or require a property for the low water mark
+    if (prefetchLimit >= 3) {
+	delta = prefetchLimit / 3;
+    }
+    minWorkingCredit = prefetchLimit - delta;
+    AdjustCredit();
+}
+
+
+// call with lock held
+void InputLink::AdjustCredit()
+{
+    if (creditSyncPending || disposed)
+	return;
+
+    // low watermark check
+    if ((prefetchLimit != 0) &&
+	(workingCredit >= minWorkingCredit) &&
+	(workingCredit >= waiters->Count))
+	return;
+
+    // should have enough for all waiters or to satisfy the prefetch window
+    int targetCredit = waiters->Count;
+    if (targetCredit < prefetchLimit)
+	targetCredit = prefetchLimit;
+
+    if (targetCredit > workingCredit) {
+	subscriptionp->grantMessageCredit(targetCredit - workingCredit);
+	workingCredit = targetCredit;
+	return;
+    }
+    if (targetCredit < workingCredit) {
+	if ((targetCredit == 0) && (prefetchLimit == 0)) {
+	    creditSyncPending = true;
+	    ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &InputLink::SyncCredit));
+	}
+	// TODO: also shrink credit when prefetchLimit != 0
+    }
+}
+
+void InputLink::SyncCredit(Object ^unused)
+{
+    lock l(waiters);
+
+    try {
+	if (disposed)
+	    return;
+
+	Completion comp;
+	if (!amqpSession->MessageStop(comp, subscriptionp->getName())) {
+	    // connection closed
+	    return;
+	}
+
+	// get a private scoped copy to use outside the lock
+	Subscription s(*subscriptionp);
+
+	l.release();
+	// use setFlowControl to re-enable credit flow on the broker.
+	// previously used comp.wait() here, but setFlowControl is a sync operation
+	s.setFlowControl(s.getSettings().flowControl);
+	l.acquire();
+
+	if (disposed)
+	    return;
+
+	// let existing waiters use up any
+	// local queue size can only decrease until more credit is issued
+	while (true) {
+	    if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) {
+		l.release();
+		// a rare use case and not used in performance oriented code.
+		// optimization can wait until the qpid/messaging api is used
+		Thread::Sleep(10);
+		l.acquire();
+		if (disposed)
+		    return;
+	    }
+	    else {
+		break;
+	    }
+	}
+
+	// At this point, the lock is held and we are fully synced with the broker
+	// so we have a valid snapshot 
+
+	if ((prefetchLimit == 0) && ((*queuePtrp)->size() > 0)) {
+	    // can't be sure application will request a message again any time soon
+	    QpidFrameSetPtr frameSetp;
+	    while (!(*queuePtrp)->empty()) {
+		(*queuePtrp)->pop(frameSetp);
+		SequenceSet frameSetID(frameSetp->getId());
+		subscriptionp->release(frameSetID);
+	    }
+
+	    // don't touch dequeuedFrameSetpp.  It is spoken for: explicitely from a
+	    // MessageWaiter about to to get the nextLocalMessage(), or implicitely
+	    // from a WaitForMessage().
+	}
+	// TODO: if prefetchLimit != 0, release messages from back of the queue that exceed targetCredit
+
+	workingCredit = (*queuePtrp)->size();
+	if (dequeuedFrameSetpp != NULL) {
+	    workingCredit++;
+	}
+    }
+    finally {
+	creditSyncPending = false;
+    }
+
+    AdjustCredit();
+}
+
+
 AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp)
 {
     QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer();
@@ -539,7 +675,15 @@
 
 	// We have a message we can return to the caller.
 	// Tell the broker we got it.
-	subscriptionp->accept(frameSetID);
+	
+	// subscriptionp->accept(frameSetID) is a slow sync operation in the native API
+	// so do it within the AsyncSession directly
+	amqpSession->AcceptAndComplete(frameSetID);
+
+	workingCredit--;
+	// check if more messages need to be requested from broker
+	AdjustCredit();
+
 	return amqpMessage;
     }
     finally {

Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h Sun Oct 11 23:22:08 2009
@@ -47,6 +47,14 @@
     bool finalizing;
     QpidFrameSetPtr* dequeuedFrameSetpp;
     ManualResetEvent^ asyncHelperWaitHandle;
+    // number of messages to buffer locally for future consumption
+    int prefetchLimit;
+    // the number of messages requested and not yet processed
+    int workingCredit;
+    // stopping and restarting the message flow
+    bool creditSyncPending;
+    // working credit low water mark
+    int minWorkingCredit;
 
     void Cleanup();
     void ReleaseNative();
@@ -54,6 +62,8 @@
     void addWaiter(MessageWaiter^ waiter);
     void asyncHelper();
     AmqpMessage^ createAmqpMessage(IntPtr msgp);
+    void AdjustCredit();
+    void SyncCredit(Object ^);
 
 internal:
     InputLink(AmqpSession^ session, System::String^ sourceQueue, qpid::client::AsyncSession *qpidSessionp,
@@ -80,6 +90,11 @@
     IAsyncResult^ BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state);
     bool EndWaitForMessage(IAsyncResult^ result);
 
+    property int PrefetchLimit {
+	int get () { return prefetchLimit; }
+	void set (int value);
+    }
+
 };
 
 }}} // namespace Apache::Qpid::Interop

Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj Sun Oct 11 23:22:08 2009
@@ -65,7 +65,7 @@
 				Name="VCCLCompilerTool"
 				AdditionalOptions="/FU Debug\Apache.Qpid.AmqpTypes.netmodule"
 				Optimization="0"
-				AdditionalIncludeDirectories="..\..\..\..\..\cpp\build\include;..\..\..\..\..\cpp\build\src;..\..\..\..\..\cpp\include;..\..\..\..\..\cpp\src;&quot;$(BOOST_ROOT)&quot;"
+				AdditionalIncludeDirectories="&quot;$(QPID_BUILD_ROOT)\include&quot;;&quot;$(QPID_BUILD_ROOT)\src&quot;;..\..\..\..\..\cpp\include;..\..\..\..\..\cpp\src;&quot;$(BOOST_ROOT)&quot;"
 				PreprocessorDefinitions="WIN32;_DEBUG;_CRT_NONSTDC_NO_WARNINGS;WIN32_LEAN_AND_MEAN;NOMINMAX;_SCL_SECURE_NO_WARNINGS;BOOST_ALL_DYN_LINK"
 				RuntimeLibrary="3"
 				UsePrecompiledHeader="0"
@@ -83,7 +83,7 @@
 			/>
 			<Tool
 				Name="VCLinkerTool"
-				AdditionalOptions="..\..\..\..\..\cpp\build\src\Debug\qpidcommon.lib ..\..\..\..\..\cpp\build\src\Debug\qpidclient.lib Debug\Apache.Qpid.AmqpTypes.netmodule"
+				AdditionalOptions="$(QPID_BUILD_ROOT)\src\Debug\qpidcommond.lib $(QPID_BUILD_ROOT)\src\Debug\qpidclientd.lib Debug\Apache.Qpid.AmqpTypes.netmodule"
 				AdditionalDependencies="$(NoInherit)"
 				OutputFile="$(OutDir)\Apache.Qpid.Interop.dll"
 				LinkIncremental="2"

Modified: qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h Sun Oct 11 23:22:08 2009
@@ -66,7 +66,6 @@
     void Activate();
     void WaitForCompletion();
 
-//    inline void SetCompletedSynchronously (bool v) { completedSynchronously = v; }
 
     property IntPtr Message {
 	IntPtr get () { 
@@ -78,7 +77,6 @@
 	    GC::SuppressFinalize(this);
 	    return v;
 	}
-	// void set (IntPtr v) { message = v; }
     }
 
     property bool Assigned {
@@ -89,11 +87,11 @@
 	bool get () { return timedOut; }
     }
 
-
     property System::Exception^ RunException {
 	System::Exception^ get() { return runException; }
     }
 
+
  public:
 
     virtual property bool IsCompleted {

Modified: qpid/branches/java-network-refactor/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat Sun Oct 11 23:22:08 2009
@@ -19,15 +19,15 @@
 
 
 set nunit_exe=%programfiles%\NUnit 2.5.1\bin\net-2.0\nunit-console.exe
-set qpid_dll_location=..\..\..\..\..\..\..\cpp\build\src\Debug
+set qpid_dll_location=%QPID_BUILD_ROOT%\src\Debug
 set configuration_name=bin\Debug
 set qcreate_location=..\..\..\..\..\..\tools\QCreate\Debug
 
-copy %qpid_dll_location%\qpidclient.dll %configuration_name%
-copy %qpid_dll_location%\qpidcommon.dll %configuration_name%
+copy %qpid_dll_location%\qpidclientd.dll %configuration_name%
+copy %qpid_dll_location%\qpidcommond.dll %configuration_name%
 
-copy %qpid_dll_location%\qpidclient.dll %qcreate_location%
-copy %qpid_dll_location%\qpidcommon.dll %qcreate_location%
+copy %qpid_dll_location%\qpidclientd.dll %qcreate_location%
+copy %qpid_dll_location%\qpidcommond.dll %qcreate_location%
 
 %qcreate_location%\QCreate.exe amq.direct routing_key message_queue
 

Modified: qpid/branches/java-network-refactor/qpid/wcf/tools/QCreate/QCreate.vcproj
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/wcf/tools/QCreate/QCreate.vcproj?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/wcf/tools/QCreate/QCreate.vcproj (original)
+++ qpid/branches/java-network-refactor/qpid/wcf/tools/QCreate/QCreate.vcproj Sun Oct 11 23:22:08 2009
@@ -61,7 +61,7 @@
 			<Tool
 				Name="VCCLCompilerTool"
 				Optimization="0"
-				AdditionalIncludeDirectories="&quot;$(BOOST_ROOT)\include\$(BOOST_VERSION)&quot;;&quot;$(BOOST_ROOT)\.&quot;;..\..\..\cpp\include;..\..\..\cpp\build\include"
+				AdditionalIncludeDirectories="&quot;$(BOOST_ROOT)\include\$(BOOST_VERSION)&quot;;&quot;$(BOOST_ROOT)\.&quot;;..\..\..\cpp\include;&quot;$(QPID_BUILD_ROOT)\include&quot;"
 				PreprocessorDefinitions="WIN32;_DEBUG;_CONSOLE"
 				MinimalRebuild="true"
 				BasicRuntimeChecks="3"
@@ -81,9 +81,9 @@
 			/>
 			<Tool
 				Name="VCLinkerTool"
-				AdditionalDependencies="qpidcommon.lib qpidclient.lib"
+				AdditionalDependencies="qpidcommond.lib qpidclientd.lib"
 				LinkIncremental="2"
-				AdditionalLibraryDirectories=".;&quot;$(BOOST_ROOT)\lib&quot;;..\..\..\cpp\build\src\Debug"
+				AdditionalLibraryDirectories=".;&quot;$(BOOST_ROOT)\lib&quot;;&quot;$(QPID_BUILD_ROOT)\src\Debug&quot;"
 				GenerateDebugInformation="true"
 				SubSystem="1"
 				TargetMachine="1"



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org