You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/03/27 05:02:41 UTC

svn commit: r928138 - /qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py

Author: tross
Date: Sat Mar 27 04:02:41 2010
New Revision: 928138

URL: http://svn.apache.org/viewvc?rev=928138&view=rev
Log:
Python console: cancel pending queries immediately upon loss of agent connectivity.

Modified:
    qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py

Modified: qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py?rev=928138&r1=928137&r2=928138&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py Sat Mar 27 04:02:41 2010
@@ -465,6 +465,7 @@ class Session:
     list: 21
     }  
 
+
   def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True,
                manageConnections=False, userBindings=False):
     """
@@ -513,6 +514,7 @@ class Session:
     if self.userBindings and not self.rcvObjects:
       raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
 
+
   def _getBrokerForAgentAddr(self, agent_addr):
     try:
       self.cv.acquire()
@@ -524,6 +526,7 @@ class Session:
       self.cv.release()
     return None
 
+
   def _getAgentForAgentAddr(self, agent_addr):
     try:
       self.cv.acquire()
@@ -535,9 +538,11 @@ class Session:
       self.cv.release()
     return None
 
+
   def __repr__(self):
     return "QMF Console Session Manager (brokers: %d)" % len(self.brokers)
 
+
   def addBroker(self, target="localhost", timeout=None, mechanisms=None):
     """ Connect to a Qpid broker.  Returns an object of type Broker. """
     url = BrokerURL(target)
@@ -549,6 +554,7 @@ class Session:
       self.getObjects(broker=broker, _class="agent", _agent=broker.getAgent(1,0))
     return broker
 
+
   def delBroker(self, broker):
     """ Disconnect from a broker.  The 'broker' argument is the object
     returned from the addBroker call """
@@ -559,24 +565,28 @@ class Session:
     self.brokers.remove(broker)
     del broker
 
+
   def getPackages(self):
     """ Get the list of known QMF packages """
     for broker in self.brokers:
       broker._waitForStable()
     return self.schemaCache.getPackages()
 
+
   def getClasses(self, packageName):
     """ Get the list of known classes within a QMF package """
     for broker in self.brokers:
       broker._waitForStable()
     return self.schemaCache.getClasses(packageName)
 
+
   def getSchema(self, classKey):
     """ Get the schema for a QMF class """
     for broker in self.brokers:
       broker._waitForStable()
     return self.schemaCache.getSchema(classKey)
 
+
   def bindPackage(self, packageName):
     """ Request object updates for all table classes within a package. """
     if not self.userBindings or not self.rcvObjects:
@@ -588,6 +598,7 @@ class Session:
         broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
                                          binding_key=key)
 
+
   def bindClass(self, pname, cname):
     """ Request object updates for a particular table class by package and class name. """
     if not self.userBindings or not self.rcvObjects:
@@ -598,6 +609,7 @@ class Session:
       if broker.isConnected():
         broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
                                          binding_key=key)
+
       
   def bindClassKey(self, classKey):
     """ Request object updates for a particular table class by class key. """
@@ -605,6 +617,7 @@ class Session:
     cname = classKey.getClassName()
     self.bindClass(pname, cname)
 
+
   def getAgents(self, broker=None):
     """ Get a list of currently known agents """
     brokerList = []
@@ -622,6 +635,7 @@ class Session:
         agentList.append(a)
     return agentList
 
+
   def makeObject(self, classKey, **kwargs):
     """ Create a new, unmanaged object of the schema indicated by classKey """
     schema = self.getSchema(classKey)
@@ -629,6 +643,7 @@ class Session:
       raise Exception("Schema not found for classKey")
     return Object(None, schema, None, True, True, kwargs)
 
+
   def getObjects(self, **kwargs):
     """ Get a list of objects from QMF agents.
     All arguments are passed by name(keyword).
@@ -768,10 +783,12 @@ class Session:
       raise RuntimeError("No agent responded within timeout period")
     return self.getResult
 
+
   def setEventFilter(self, **kwargs):
     """ """
     pass
 
+
   def _bindingKeys(self):
     keyList = []
     keyList.append("schema.#")
@@ -788,18 +805,21 @@ class Session:
         keyList.append("console.heartbeat.#")
     return keyList
 
+
   def _handleBrokerConnect(self, broker):
     if self.console:
       for agent in broker.getAgents():
         self.console.newAgent(agent)
       self.console.brokerConnected(broker)
 
+
   def _handleBrokerDisconnect(self, broker):
     if self.console:
       for agent in broker.getAgents():
         self.console.delAgent(agent)
       self.console.brokerDisconnected(broker)
 
+
   def _handleBrokerResp(self, broker, codec, seq):
     broker.brokerId = codec.read_uuid()
     if self.console != None:
@@ -813,6 +833,7 @@ class Session:
     smsg = broker._message(sendCodec.encoded)
     broker._send(smsg)
 
+
   def _handlePackageInd(self, broker, codec, seq):
     pname = str(codec.read_str8())
     notify = self.schemaCache.declarePackage(pname)
@@ -828,6 +849,7 @@ class Session:
     smsg = broker._message(sendCodec.encoded)
     broker._send(smsg)
 
+
   def _handleCommandComplete(self, broker, codec, seq, agent):
     code = codec.read_uint32()
     text = codec.read_str8()
@@ -869,6 +891,7 @@ class Session:
       smsg = broker._message(sendCodec.encoded)
       broker._send(smsg)
 
+
   def _handleHeartbeatInd(self, broker, codec, seq, msg):
     brokerBank = 1
     agentBank = 0
@@ -892,6 +915,7 @@ class Session:
       self.console.heartbeat(agent, timestamp)
     broker._ageAgents()
 
+
   def _handleSchemaResp(self, broker, codec, seq, agent_addr):
     kind  = codec.read_uint8()
     classKey = ClassKey(codec)
@@ -908,6 +932,7 @@ class Session:
       if agent:
         agent._schemaInfoFromV2Agent()
 
+
   def _v2HandleHeartbeatInd(self, broker, mp, ah, content):
     try:
       agentName = ah["qmf.agent"]
@@ -927,9 +952,11 @@ class Session:
       self.console.heartbeat(agent, timestamp)
     broker._ageAgents()
 
+
   def _v2HandleAgentLocateRsp(self, broker, mp, ah, content):
     self._v2HandleHeartbeatInd(broker, mp, ah, content)
 
+
   def _handleError(self, error):
     try:
       self.cv.acquire()
@@ -940,6 +967,7 @@ class Session:
     finally:
       self.cv.release()
 
+
   def _selectMatch(self, object):
     """ Check the object against self.getSelect to check for a match """
     for key, value in self.getSelect:
@@ -948,6 +976,7 @@ class Session:
           return False
     return True
   
+
   def _decodeValue(self, codec, typecode, broker=None):
     """ Decode, from the codec, a value based on its typecode. """
     if   typecode == 1:  data = codec.read_uint8()      # U8
@@ -1002,6 +1031,7 @@ class Session:
       raise ValueError("Invalid type code: %d" % typecode)
     return data
 
+
   def _encodeValue(self, codec, value, typecode):
     """ Encode, into the codec, a value based on its typecode. """
     if   typecode == 1:  codec.write_uint8  (int(value))    # U8
@@ -1043,9 +1073,11 @@ class Session:
     else:
       raise ValueError ("Invalid type code: %d" % typecode)
 
+
   def encoding(self, value):
       return self._encoding(value.__class__)
       
+
   def _encoding(self, klass):
     if Session.ENCODINGS.has_key(klass):
       return self.ENCODINGS[klass]
@@ -1054,6 +1086,7 @@ class Session:
       if result != None:
         return result
             
+
   def _displayValue(self, value, typecode):
     """ """
     if   typecode == 1:  return unicode(value)
@@ -1082,6 +1115,7 @@ class Session:
     else:
       raise ValueError ("Invalid type code: %d" % typecode)
     
+
   def _defaultValue(self, stype, broker=None, kwargs={}):
     """ """
     typecode = stype.type
@@ -1120,6 +1154,7 @@ class Session:
     else:
       raise ValueError ("Invalid type code: %d" % typecode)
 
+
   def _bestClassKey(self, pname, cname, preferredList):
     """ """
     if pname == None or cname == None:
@@ -1135,6 +1170,7 @@ class Session:
         return c
     return None
     
+
   def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList):
     """ This function can be used to send a method request to an object given only the
     broker, schemaKey, and objectId.  This is an uncommon usage pattern as methods are
@@ -2227,6 +2263,17 @@ class Agent:
 
   def close(self):
     self.closed = True
+    copy = {}
+    try:
+      self.lock.acquire()
+      for seq in self.contextMap:
+        copy[seq] = self.contextMap[seq]
+    finally:
+      self.lock.release()
+
+    for seq in copy:
+      context = copy[seq]
+      context.cancel("Agent disconnected")
 
 
   def __repr__(self):
@@ -2588,7 +2635,7 @@ class RequestContext(object):
 
 
   def setSequence(self, sequence):
-    self.sequence =  sequence
+    self.sequence = sequence
 
 
   def addV1QueryResult(self, data):
@@ -2610,6 +2657,18 @@ class RequestContext(object):
     return time() - self.startTime
 
 
+  def cancel(self, exception):
+    self.setException(exception)
+    try:
+      self.cv.acquire()
+      self.blocked = None
+      self.waitingForSchema = None
+      self.cv.notify()
+    finally:
+      self.cv.release()
+    self._complete()
+
+
   def waitForSignal(self, timeout):
     try:
       self.cv.acquire()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org