You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/11/14 02:12:54 UTC

svn commit: r836085 - in /qpid/trunk/qpid/python/qpid: messaging.py tests/messaging.py

Author: rhs
Date: Sat Nov 14 01:12:54 2009
New Revision: 836085

URL: http://svn.apache.org/viewvc?rev=836085&view=rev
Log:
removed start/stop

Modified:
    qpid/trunk/qpid/python/qpid/messaging.py
    qpid/trunk/qpid/python/qpid/tests/messaging.py

Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=836085&r1=836084&r2=836085&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Sat Nov 14 01:12:54 2009
@@ -97,7 +97,7 @@
                mechanism="PLAIN", heartbeat=None, **options):
     """
     Creates a connection. A newly created connection must be connected
-    with the Connection.connect() method before it can be started.
+    with the Connection.connect() method before it can be used.
 
     @type host: str
     @param host: the name or ip address of the remote host
@@ -113,7 +113,6 @@
     self.mechanism = mechanism
     self.heartbeat = heartbeat
 
-    self.started = False
     self.id = str(uuid4())
     self.session_counter = 0
     self.sessions = {}
@@ -166,7 +165,7 @@
     if self.sessions.has_key(name):
       return self.sessions[name]
     else:
-      ssn = Session(self, name, self.started, transactional=transactional)
+      ssn = Session(self, name, transactional)
       self.sessions[name] = ssn
       self._wakeup()
       return ssn
@@ -201,24 +200,6 @@
     return self._connected
 
   @synchronized
-  def start(self):
-    """
-    Start incoming message delivery for all sessions.
-    """
-    self.started = True
-    for ssn in self.sessions.values():
-      ssn.start()
-
-  @synchronized
-  def stop(self):
-    """
-    Stop incoming message deliveries for all sessions.
-    """
-    for ssn in self.sessions.values():
-      ssn.stop()
-    self.started = False
-
-  @synchronized
   def close(self):
     """
     Close the connection and all sessions.
@@ -269,10 +250,9 @@
   messages, and manage various Senders and Receivers.
   """
 
-  def __init__(self, connection, name, started, transactional):
+  def __init__(self, connection, name, transactional):
     self.connection = connection
     self.name = name
-    self.started = started
 
     self.transactional = transactional
 
@@ -346,8 +326,7 @@
     @rtype: Receiver
     @return: a new Receiver for the specified source
     """
-    receiver = Receiver(self, len(self.receivers), source, options,
-                        self.started)
+    receiver = Receiver(self, len(self.receivers), source, options)
     self.receivers.append(receiver)
     self._wakeup()
     return receiver
@@ -453,24 +432,6 @@
     assert self.aborted
 
   @synchronized
-  def start(self):
-    """
-    Start incoming message delivery for the session.
-    """
-    self.started = True
-    for rcv in self.receivers:
-      rcv.start()
-
-  @synchronized
-  def stop(self):
-    """
-    Stop incoming message delivery for the session.
-    """
-    for rcv in self.receivers:
-      rcv.stop()
-    self.started = False
-
-  @synchronized
   def close(self):
     """
     Close the session.
@@ -611,22 +572,20 @@
   """
   pass
 
-class Receiver:
+class Receiver(object):
 
   """
   Receives incoming messages from a remote source. Messages may be
   fetched with L{fetch}.
   """
 
-  def __init__(self, session, index, source, options, started):
+  def __init__(self, session, index, source, options):
     self.session = session
     self.index = index
     self.destination = str(self.index)
     self.source = source
     self.options = options
 
-    self.started = started
-    self.capacity = options.get("capacity", UNLIMITED)
     self.granted = Serial(0)
     self.draining = False
     self.impending = Serial(0)
@@ -638,6 +597,26 @@
     self.closing = False
     self.closed = False
     self._lock = self.session._lock
+    self._capacity = 0
+    self._set_capacity(options.get("capacity", 0), False)
+
+  @synchronized
+  def _set_capacity(self, c, wakeup=True):
+    if c is UNLIMITED:
+      self._capacity = c.value
+    else:
+      self._capacity = c
+    self._grant()
+    if wakeup:
+      self._wakeup()
+
+  def _get_capacity(self):
+    if self._capacity == UNLIMITED.value:
+      return UNLIMITED
+    else:
+      return self._capacity
+
+  capacity = property(_get_capacity, _set_capacity)
 
   def _wakeup(self):
     self.session._wakeup()
@@ -663,14 +642,6 @@
     """
     return self.received - self.returned
 
-  def _capacity(self):
-    if not self.started:
-      return 0
-    elif self.capacity is UNLIMITED:
-      return self.capacity.value
-    else:
-      return self.capacity
-
   def _pred(self, msg):
     return msg._receiver == self
 
@@ -687,7 +658,7 @@
 
     self._ewait(lambda: self.linked)
 
-    if self._capacity() == 0:
+    if self._capacity == 0:
       self.granted = self.returned + 1
       self._wakeup()
     self._ewait(lambda: self.impending >= self.granted)
@@ -701,39 +672,16 @@
       msg = self.session._get(self._pred, timeout=0)
       if msg is None:
         raise Empty()
-    elif self._capacity() not in (0, UNLIMITED.value):
+    elif self._capacity not in (0, UNLIMITED.value):
       self.granted += 1
       self._wakeup()
     return msg
 
   def _grant(self):
-    if self.started:
-      if self.capacity is UNLIMITED:
-        self.granted = UNLIMITED
-      else:
-        self.granted = self.received + self._capacity()
+    if self._capacity == UNLIMITED.value:
+      self.granted = UNLIMITED
     else:
-      self.granted = self.received
-
-
-  @synchronized
-  def start(self):
-    """
-    Start incoming message delivery for this receiver.
-    """
-    self.started = True
-    self._grant()
-    self._wakeup()
-
-  @synchronized
-  def stop(self):
-    """
-    Stop incoming message delivery for this receiver.
-    """
-    self.started = False
-    self._grant()
-    self._wakeup()
-    self._ewait(lambda: self.impending == self.received)
+      self.granted = self.received + self._capacity
 
   @synchronized
   def close(self):

Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=836085&r1=836084&r2=836085&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Sat Nov 14 01:12:54 2009
@@ -169,23 +169,6 @@
     self.conn.connect()
     self.ping(ssn)
 
-  def testStart(self):
-    ssn = self.conn.session()
-    assert not ssn.started
-    self.conn.start()
-    assert ssn.started
-    ssn2 = self.conn.session()
-    assert ssn2.started
-
-  def testStop(self):
-    self.conn.start()
-    ssn = self.conn.session()
-    assert ssn.started
-    self.conn.stop()
-    assert not ssn.started
-    ssn2 = self.conn.session()
-    assert not ssn2.started
-
   def testClose(self):
     self.conn.close()
     assert not self.conn.connected()
@@ -234,9 +217,6 @@
     rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
     rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
 
-    # XXX: this won't work if it is before the receiver creation
-    self.ssn.start()
-
     snd = self.ssn.sender(ADDR)
 
     msgs = []
@@ -257,25 +237,6 @@
     assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched)
     self.ssn.acknowledge()
 
-  def testStart(self):
-    START_Q = 'test-start-queue; {create: always}'
-    rcv = self.ssn.receiver(START_Q)
-    assert not rcv.started
-    self.ssn.start()
-    assert rcv.started
-    rcv = self.ssn.receiver(START_Q)
-    assert rcv.started
-
-  def testStop(self):
-    STOP_Q = 'test-stop-queue; {create: always}'
-    self.ssn.start()
-    rcv = self.ssn.receiver(STOP_Q)
-    assert rcv.started
-    self.ssn.stop()
-    assert not rcv.started
-    rcv = self.ssn.receiver(STOP_Q)
-    assert not rcv.started
-
   # XXX, we need a convenient way to assert that required queues are
   # empty on setup, and possibly also to drain queues on teardown
   def ackTest(self, acker, ack_capacity=None):
@@ -491,11 +452,11 @@
     assert msg.content == three
     self.ssn.acknowledge()
 
-  def testStart(self):
-    content = self.send("testStart")
+  def testCapacityIncrease(self):
+    content = self.send("testCapacityIncrease")
     self.sleep()
     assert self.rcv.pending() == 0
-    self.rcv.start()
+    self.rcv.capacity = UNLIMITED
     self.sleep()
     assert self.rcv.pending() == 1
     msg = self.rcv.fetch(0)
@@ -503,17 +464,17 @@
     assert self.rcv.pending() == 0
     self.ssn.acknowledge()
 
-  def testStop(self):
-    self.rcv.start()
-    one = self.send("testStop", 1)
+  def testCapacityDecrease(self):
+    self.rcv.capacity = UNLIMITED
+    one = self.send("testCapacityDecrease", 1)
     self.sleep()
     assert self.rcv.pending() == 1
     msg = self.rcv.fetch(0)
     assert msg.content == one
 
-    self.rcv.stop()
+    self.rcv.capacity = 0
 
-    two = self.send("testStop", 2)
+    two = self.send("testCapacityDecrease", 2)
     self.sleep()
     assert self.rcv.pending() == 0
     msg = self.rcv.fetch(0)
@@ -522,7 +483,7 @@
     self.ssn.acknowledge()
 
   def testPending(self):
-    self.rcv.start()
+    self.rcv.capacity = UNLIMITED
     assert self.rcv.pending() == 0
 
     for i in range(3):
@@ -545,7 +506,6 @@
 
   def testCapacity(self):
     self.rcv.capacity = 5
-    self.rcv.start()
     self.assertPending(self.rcv, 0)
 
     for i in range(15):
@@ -565,7 +525,6 @@
 
   def testCapacityUNLIMITED(self):
     self.rcv.capacity = UNLIMITED
-    self.rcv.start()
     self.assertPending(self.rcv, 0)
 
     for i in range(10):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org