You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/11/14 02:12:54 UTC
svn commit: r836085 - in /qpid/trunk/qpid/python/qpid: messaging.py tests/messaging.py
Author: rhs
Date: Sat Nov 14 01:12:54 2009
New Revision: 836085
URL: http://svn.apache.org/viewvc?rev=836085&view=rev
Log:
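Removed the start()/stop() methods from Connection, Session, and Receiver. Incoming message delivery is now controlled through the settable Receiver.capacity property, which defaults to 0 instead of UNLIMITED.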
Modified:
qpid/trunk/qpid/python/qpid/messaging.py
qpid/trunk/qpid/python/qpid/tests/messaging.py
Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=836085&r1=836084&r2=836085&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Sat Nov 14 01:12:54 2009
@@ -97,7 +97,7 @@
mechanism="PLAIN", heartbeat=None, **options):
"""
Creates a connection. A newly created connection must be connected
- with the Connection.connect() method before it can be started.
+ with the Connection.connect() method before it can be used.
@type host: str
@param host: the name or ip address of the remote host
@@ -113,7 +113,6 @@
self.mechanism = mechanism
self.heartbeat = heartbeat
- self.started = False
self.id = str(uuid4())
self.session_counter = 0
self.sessions = {}
@@ -166,7 +165,7 @@
if self.sessions.has_key(name):
return self.sessions[name]
else:
- ssn = Session(self, name, self.started, transactional=transactional)
+ ssn = Session(self, name, transactional)
self.sessions[name] = ssn
self._wakeup()
return ssn
@@ -201,24 +200,6 @@
return self._connected
@synchronized
- def start(self):
- """
- Start incoming message delivery for all sessions.
- """
- self.started = True
- for ssn in self.sessions.values():
- ssn.start()
-
- @synchronized
- def stop(self):
- """
- Stop incoming message deliveries for all sessions.
- """
- for ssn in self.sessions.values():
- ssn.stop()
- self.started = False
-
- @synchronized
def close(self):
"""
Close the connection and all sessions.
@@ -269,10 +250,9 @@
messages, and manage various Senders and Receivers.
"""
- def __init__(self, connection, name, started, transactional):
+ def __init__(self, connection, name, transactional):
self.connection = connection
self.name = name
- self.started = started
self.transactional = transactional
@@ -346,8 +326,7 @@
@rtype: Receiver
@return: a new Receiver for the specified source
"""
- receiver = Receiver(self, len(self.receivers), source, options,
- self.started)
+ receiver = Receiver(self, len(self.receivers), source, options)
self.receivers.append(receiver)
self._wakeup()
return receiver
@@ -453,24 +432,6 @@
assert self.aborted
@synchronized
- def start(self):
- """
- Start incoming message delivery for the session.
- """
- self.started = True
- for rcv in self.receivers:
- rcv.start()
-
- @synchronized
- def stop(self):
- """
- Stop incoming message delivery for the session.
- """
- for rcv in self.receivers:
- rcv.stop()
- self.started = False
-
- @synchronized
def close(self):
"""
Close the session.
@@ -611,22 +572,20 @@
"""
pass
-class Receiver:
+class Receiver(object):
"""
Receives incoming messages from a remote source. Messages may be
fetched with L{fetch}.
"""
- def __init__(self, session, index, source, options, started):
+ def __init__(self, session, index, source, options):
self.session = session
self.index = index
self.destination = str(self.index)
self.source = source
self.options = options
- self.started = started
- self.capacity = options.get("capacity", UNLIMITED)
self.granted = Serial(0)
self.draining = False
self.impending = Serial(0)
@@ -638,6 +597,26 @@
self.closing = False
self.closed = False
self._lock = self.session._lock
+ self._capacity = 0
+ self._set_capacity(options.get("capacity", 0), False)
+
+ @synchronized
+ def _set_capacity(self, c, wakeup=True):
+ if c is UNLIMITED:
+ self._capacity = c.value
+ else:
+ self._capacity = c
+ self._grant()
+ if wakeup:
+ self._wakeup()
+
+ def _get_capacity(self):
+ if self._capacity == UNLIMITED.value:
+ return UNLIMITED
+ else:
+ return self._capacity
+
+ capacity = property(_get_capacity, _set_capacity)
def _wakeup(self):
self.session._wakeup()
@@ -663,14 +642,6 @@
"""
return self.received - self.returned
- def _capacity(self):
- if not self.started:
- return 0
- elif self.capacity is UNLIMITED:
- return self.capacity.value
- else:
- return self.capacity
-
def _pred(self, msg):
return msg._receiver == self
@@ -687,7 +658,7 @@
self._ewait(lambda: self.linked)
- if self._capacity() == 0:
+ if self._capacity == 0:
self.granted = self.returned + 1
self._wakeup()
self._ewait(lambda: self.impending >= self.granted)
@@ -701,39 +672,16 @@
msg = self.session._get(self._pred, timeout=0)
if msg is None:
raise Empty()
- elif self._capacity() not in (0, UNLIMITED.value):
+ elif self._capacity not in (0, UNLIMITED.value):
self.granted += 1
self._wakeup()
return msg
def _grant(self):
- if self.started:
- if self.capacity is UNLIMITED:
- self.granted = UNLIMITED
- else:
- self.granted = self.received + self._capacity()
+ if self._capacity == UNLIMITED.value:
+ self.granted = UNLIMITED
else:
- self.granted = self.received
-
-
- @synchronized
- def start(self):
- """
- Start incoming message delivery for this receiver.
- """
- self.started = True
- self._grant()
- self._wakeup()
-
- @synchronized
- def stop(self):
- """
- Stop incoming message delivery for this receiver.
- """
- self.started = False
- self._grant()
- self._wakeup()
- self._ewait(lambda: self.impending == self.received)
+ self.granted = self.received + self._capacity
@synchronized
def close(self):
Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=836085&r1=836084&r2=836085&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Sat Nov 14 01:12:54 2009
@@ -169,23 +169,6 @@
self.conn.connect()
self.ping(ssn)
- def testStart(self):
- ssn = self.conn.session()
- assert not ssn.started
- self.conn.start()
- assert ssn.started
- ssn2 = self.conn.session()
- assert ssn2.started
-
- def testStop(self):
- self.conn.start()
- ssn = self.conn.session()
- assert ssn.started
- self.conn.stop()
- assert not ssn.started
- ssn2 = self.conn.session()
- assert not ssn2.started
-
def testClose(self):
self.conn.close()
assert not self.conn.connected()
@@ -234,9 +217,6 @@
rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
- # XXX: this won't work if it is before the receiver creation
- self.ssn.start()
-
snd = self.ssn.sender(ADDR)
msgs = []
@@ -257,25 +237,6 @@
assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched)
self.ssn.acknowledge()
- def testStart(self):
- START_Q = 'test-start-queue; {create: always}'
- rcv = self.ssn.receiver(START_Q)
- assert not rcv.started
- self.ssn.start()
- assert rcv.started
- rcv = self.ssn.receiver(START_Q)
- assert rcv.started
-
- def testStop(self):
- STOP_Q = 'test-stop-queue; {create: always}'
- self.ssn.start()
- rcv = self.ssn.receiver(STOP_Q)
- assert rcv.started
- self.ssn.stop()
- assert not rcv.started
- rcv = self.ssn.receiver(STOP_Q)
- assert not rcv.started
-
# XXX, we need a convenient way to assert that required queues are
# empty on setup, and possibly also to drain queues on teardown
def ackTest(self, acker, ack_capacity=None):
@@ -491,11 +452,11 @@
assert msg.content == three
self.ssn.acknowledge()
- def testStart(self):
- content = self.send("testStart")
+ def testCapacityIncrease(self):
+ content = self.send("testCapacityIncrease")
self.sleep()
assert self.rcv.pending() == 0
- self.rcv.start()
+ self.rcv.capacity = UNLIMITED
self.sleep()
assert self.rcv.pending() == 1
msg = self.rcv.fetch(0)
@@ -503,17 +464,17 @@
assert self.rcv.pending() == 0
self.ssn.acknowledge()
- def testStop(self):
- self.rcv.start()
- one = self.send("testStop", 1)
+ def testCapacityDecrease(self):
+ self.rcv.capacity = UNLIMITED
+ one = self.send("testCapacityDecrease", 1)
self.sleep()
assert self.rcv.pending() == 1
msg = self.rcv.fetch(0)
assert msg.content == one
- self.rcv.stop()
+ self.rcv.capacity = 0
- two = self.send("testStop", 2)
+ two = self.send("testCapacityDecrease", 2)
self.sleep()
assert self.rcv.pending() == 0
msg = self.rcv.fetch(0)
@@ -522,7 +483,7 @@
self.ssn.acknowledge()
def testPending(self):
- self.rcv.start()
+ self.rcv.capacity = UNLIMITED
assert self.rcv.pending() == 0
for i in range(3):
@@ -545,7 +506,6 @@
def testCapacity(self):
self.rcv.capacity = 5
- self.rcv.start()
self.assertPending(self.rcv, 0)
for i in range(15):
@@ -565,7 +525,6 @@
def testCapacityUNLIMITED(self):
self.rcv.capacity = UNLIMITED
- self.rcv.start()
self.assertPending(self.rcv, 0)
for i in range(10):
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org