You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/11/24 20:49:57 UTC
svn commit: r720275 - /incubator/qpid/trunk/qpid/python/qmf/console.py
Author: tross
Date: Mon Nov 24 11:49:57 2008
New Revision: 720275
URL: http://svn.apache.org/viewvc?rev=720275&view=rev
Log:
QPID-1483 - Connection handling in the Python QMF console API
Modified:
incubator/qpid/trunk/qpid/python/qmf/console.py
Modified: incubator/qpid/trunk/qpid/python/qmf/console.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qmf/console.py?rev=720275&r1=720274&r2=720275&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qmf/console.py (original)
+++ incubator/qpid/trunk/qpid/python/qmf/console.py Mon Nov 24 11:49:57 2008
@@ -29,7 +29,7 @@
from qpid.datatypes import UUID, uuid4, Message, RangedSet
from qpid.util import connect, ssl, URL
from qpid.codec010 import StringCodec as Codec
-from threading import Lock, Condition
+from threading import Lock, Condition, Thread
from time import time, strftime, gmtime
from cStringIO import StringIO
@@ -165,22 +165,18 @@
if self.userBindings and not self.rcvObjects:
raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
- if manageConnections:
- raise Exception("manageConnections - not yet implemented")
-
def __repr__(self):
- return "QMF Console Session Manager (brokers connected: %d)" % len(self.brokers)
+ return "QMF Console Session Manager (brokers: %d)" % len(self.brokers)
def addBroker(self, target="localhost"):
""" Connect to a Qpid broker. Returns an object of type Broker. """
url = BrokerURL(target)
broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass,
ssl = url.scheme == URL.AMQPS)
- if not broker.isConnected and not self.manageConnections:
- raise Exception(broker.error)
self.brokers.append(broker)
- self.getObjects(broker=broker, _class="agent")
+ if not self.manageConnections:
+ self.getObjects(broker=broker, _class="agent")
return broker
def delBroker(self, broker):
@@ -220,22 +216,32 @@
return self.packages[pname][pkey]
def bindPackage(self, packageName):
- """ """
+ """ Request object updates for all table classes within a package. """
if not self.userBindings or not self.rcvObjects:
raise Exception("userBindings option not set for Session")
+ key = "console.obj.*.*.%s.#" % packageName
+ self.bindingKeyList.append(key)
for broker in self.brokers:
- broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
- binding_key="console.obj.*.*.%s.#" % packageName)
+ if broker.isConnected():
+ broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
+ binding_key=key)
- def bindClass(self, classKey):
- """ """
+ def bindClass(self, pname, cname):
+ """ Request object updates for a particular table class by package and class name. """
if not self.userBindings or not self.rcvObjects:
raise Exception("userBindings option not set for Session")
+ key = "console.obj.*.*.%s.%s.#" % (pname, cname)
+ self.bindingKeyList.append(key)
+ for broker in self.brokers:
+ if broker.isConnected():
+ broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
+ binding_key=key)
+
+ def bindClassKey(self, classKey):
+ """ Request object updates for a particular table class by class key. """
pname = classKey.getPackageName()
cname = classKey.getClassName()
- for broker in self.brokers:
- broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
- binding_key="console.obj.*.*.%s.%s.#" % (pname, cname))
+ self.bindClass(pname, cname)
def getAgents(self, broker=None):
""" Get a list of currently known agents """
@@ -293,11 +299,16 @@
agent = kwargs["_agent"]
if agent.broker not in brokerList:
raise Exception("Supplied agent is not accessible through the supplied broker")
- agentList.append(agent)
+ if agent.broker.isConnected():
+ agentList.append(agent)
else:
for broker in brokerList:
for agent in broker.getAgents():
- agentList.append(agent)
+ if agent.broker.isConnected():
+ agentList.append(agent)
+
+ if len(agentList) == 0:
+ return []
pname = None
cname = None
@@ -387,10 +398,12 @@
return keyList
def _handleBrokerConnect(self, broker):
- pass
+ if self.console:
+ self.console.brokerConnected(broker)
def _handleBrokerDisconnect(self, broker):
- pass
+ if self.console:
+ self.console.brokerDisconnected(broker)
def _handleBrokerResp(self, broker, codec, seq):
broker.brokerId = UUID(codec.read_uuid())
@@ -1127,8 +1140,63 @@
def __repr__(self):
return "%s (%d) - %s" % (self.text, self.status, self.outArgs)
+class ManagedConnection(Thread):
+  """ Thread class for managing a connection.
+
+  The thread calls broker._tryToConnect() in a loop.  While the broker
+  reports itself connected the thread sleeps on its condition variable until
+  disconnected() or stop() wakes it.  After a failed attempt it waits before
+  retrying; the wait is multiplied by DELAY_FACTOR on each failure until it
+  reaches DELAY_MAX seconds, and is reset to DELAY_MIN once a connection that
+  had been established is lost.
+  """
+  DELAY_MIN = 1
+  DELAY_MAX = 128
+  DELAY_FACTOR = 2
+
+  def __init__(self, broker):
+    """ Create (but do not start) the thread that manages 'broker'. """
+    Thread.__init__(self)
+    self.broker = broker
+    self.cv = Condition()
+    self.canceled = False
+
+  def stop(self):
+    """ Tell this thread to stop running and return. """
+    # Acquire outside the try block so that the finally clause never
+    # releases a lock that was not obtained.
+    self.cv.acquire()
+    try:
+      self.canceled = True
+      self.cv.notify()
+    finally:
+      self.cv.release()
+
+  def disconnected(self):
+    """ Notify the thread that the connection was lost. """
+    self.cv.acquire()
+    try:
+      self.cv.notify()
+    finally:
+      self.cv.release()
+
+  def run(self):
+    """ Main body of the running thread.
+
+    Returns only after stop() has been called.
+    """
+    delay = self.DELAY_MIN
+    while True:
+      try:
+        self.broker._tryToConnect()
+        self.cv.acquire()
+        try:
+          # Sleep for as long as the connection stays up.
+          while (not self.canceled) and self.broker.connected:
+            self.cv.wait()
+          if self.canceled:
+            return
+          delay = self.DELAY_MIN
+        finally:
+          # Must be 'finally', not 'except': the lock has to be released on
+          # the normal paths too (including the return above), otherwise
+          # stop() and disconnected() block forever in other threads.
+          self.cv.release()
+      except Exception:
+        # The connection attempt failed: back off, then retry.
+        if delay < self.DELAY_MAX:
+          delay *= self.DELAY_FACTOR
+        self.cv.acquire()
+        try:
+          self.cv.wait(delay)
+          if self.canceled:
+            return
+        finally:
+          self.cv.release()
+
class Broker:
- """ """
+ """ This object represents a connection (or potential connection) to a QMF broker. """
SYNC_TIME = 60
def __init__(self, session, host, port, authMech, authUser, authPass, ssl=False):
@@ -1138,24 +1206,24 @@
self.ssl = ssl
self.authUser = authUser
self.authPass = authPass
- self.agents = {}
- self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
- self.topicBound = False
self.cv = Condition()
- self.syncInFlight = False
- self.syncRequest = 0
- self.syncResult = None
- self.reqsOutstanding = 1
self.error = None
self.brokerId = None
- self.isConnected = False
+ self.connected = False
self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid())
- self._tryToConnect()
+ if self.session.manageConnections:
+ self.thread = ManagedConnection(self)
+ self.thread.start()
+ else:
+ self.thread = None
+ self._tryToConnect()
def isConnected(self):
- return self.isConnected
+ """ Return True if there is an active connection to the broker. """
+ return self.connected
def getError(self):
+ """ Return the last error message seen while trying to connect to the broker. """
return self.error
def getBrokerId(self):
@@ -1163,9 +1231,13 @@
return self.brokerId
def getBrokerBank(self):
+ """ Return the broker-bank value. This is the value that the broker assigns to
+ objects within its control. This value appears as a field in the ObjectId
+ of objects created by agents controlled by this broker. """
return 1
def getAgent(self, brokerBank, agentBank):
+ """ Return the agent object associated with a particular broker and agent bank value."""
bankKey = (brokerBank, agentBank)
if bankKey in self.agents:
return self.agents[bankKey]
@@ -1199,13 +1271,21 @@
return "amqp%s://%s%s:%d" % (ssl, auth, self.host, self.port or 5672)
def __repr__(self):
- if self.isConnected:
+ if self.connected:
return "Broker connected at: %s" % self.getUrl()
else:
return "Disconnected Broker"
def _tryToConnect(self):
try:
+ self.agents = {}
+ self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
+ self.topicBound = False
+ self.syncInFlight = False
+ self.syncRequest = 0
+ self.syncResult = None
+ self.reqsOutstanding = 1
+
sock = connect(self.host, self.port)
if self.ssl:
sock = ssl(sock)
@@ -1235,7 +1315,7 @@
self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFF)
self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFF)
- self.isConnected = True
+ self.connected = True
self.session._handleBrokerConnect(self)
codec = Codec(self.conn.spec)
@@ -1245,10 +1325,13 @@
except socket.error, e:
self.error = "Socket Error %s - %s" % (e[0], e[1])
+ raise Exception(self.error)
except Closed, e:
self.error = "Connect Failed %d - %s" % (e[0], e[1])
+ raise Exception(self.error)
except ConnectionFailed, e:
self.error = "Connect Failed %d - %s" % (e[0], e[1])
+ raise Exception(self.error)
def _updateAgent(self, obj):
bankKey = (obj.brokerBank, obj.agentBank)
@@ -1301,19 +1384,22 @@
self.amqpSession.message_transfer(destination=dest, message=msg)
def _shutdown(self):
- if self.isConnected:
+ if self.thread:
+ self.thread.stop()
+ self.thread.join()
+ if self.connected:
self.amqpSession.incoming("rdest").stop()
if self.session.console != None:
self.amqpSession.incoming("tdest").stop()
self.amqpSession.close()
self.conn.close()
- self.isConnected = False
- else:
- raise Exception("Broker already disconnected")
+ self.connected = False
def _waitForStable(self):
try:
self.cv.acquire()
+ if not self.connected:
+ return
if self.reqsOutstanding == 0:
return
self.syncInFlight = True
@@ -1365,7 +1451,7 @@
elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True)
def _exceptionCb(self, data):
- self.isConnected = False
+ self.connected = False
self.error = data
try:
self.cv.acquire()
@@ -1375,6 +1461,8 @@
self.cv.release()
self.session._handleError(self.error)
self.session._handleBrokerDisconnect(self)
+ if self.thread:
+ self.thread.disconnected()
class Agent:
""" """