You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by fa...@apache.org on 2014/08/30 10:48:02 UTC

svn commit: r1621430 - /qpid/trunk/qpid/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/agent/Agent.java

Author: fadams
Date: Sat Aug 30 08:48:02 2014
New Revision: 1621430

URL: http://svn.apache.org/r1621430
Log:
QPID-6059: Java QMF2 Agent code made some now invalid assumptions about replyTo

Modified:
    qpid/trunk/qpid/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/agent/Agent.java

Modified: qpid/trunk/qpid/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/agent/Agent.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/agent/Agent.java?rev=1621430&r1=1621429&r2=1621430&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/agent/Agent.java (original)
+++ qpid/trunk/qpid/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/agent/Agent.java Sat Aug 30 08:48:02 2014
@@ -199,8 +199,8 @@ public class Agent extends QmfData imple
             
                 // Send heartbeat messages with a Time To Live (in msecs) set to two times the _heartbeatInterval
                 // to prevent stale heartbeats from getting to the consoles.
-                _broadcaster.send(response, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY,
-                                  _heartbeatInterval*2000);
+                _producer.send(_topicAddress, response, Message.DEFAULT_DELIVERY_MODE,
+                               Message.DEFAULT_PRIORITY, _heartbeatInterval*2000);
             }
             catch (JMSException jmse)
             {
@@ -311,46 +311,48 @@ public class Agent extends QmfData imple
     private MessageConsumer _mainConsumer;
     // _aliasConsumer is used for the alias address if the Agent is a broker Agent (used in Java Broker QMF plugin)
     private MessageConsumer _aliasConsumer;
-    private MessageProducer _responder; 
-    private MessageProducer _broadcaster;
 
-    /**
-     * This contains a String representation of the broadcastAddress used to decide whether to use _responder or
-     * _broadcaster as the MessageProducer for Message responses. See the comments for the sendResponse() method below.
-     */
-    private String _broadcastAddress;
+    private MessageProducer _producer;
+
+    private String _quotedDirectBase;
+    private Destination _directAddress;
+
+    private String _quotedTopicBase;
+    private Destination _topicAddress;
 
     //                                  private implementation methods
     // ********************************************************************************************************
 
 
     /**
-     * There's some slight "hackery" below. The Agent clearly needs to respond to requests and quite possibly using
-     * the JMS replyTo is the correct thing to do, however in older versions of Qpid invoking send() on the replyTo 
-     * causes spurious exchangeDeclares to occur and the caching of replyTo wasn't as good as it might be.
-     * To get around this the Agent actually uses the relevant exchange name as the core address and sets the Message
-     * "qpid.subject" with an appropriate Routing Key. The problem occurs if Console clients decide to use the
-     * qmf.default.topic as a replyTo instead of qmf.default.direct so we check the start of the replyTo using
-     * _broadcastAddress. It's slightly hacky because the Destination.toString() could change as it's implemenation
-     * specific. That shouldn't be too much of a pain as most clients should use qmf.default.direct.
+     * There's some slight "hackery" below. The Agent clearly needs to respond
+     * to requests and quite possibly using the JMS replyTo is the correct thing
+     * to do, however in older versions of Qpid invoking send() on the replyTo 
+     * causes spurious exchangeDeclares to occur and the caching of replyTo wasn't
+     * as good as it might be. To get around this the Agent uses exchange name
+     * as the core address and sets the Message "qpid.subject" property with an
+     * appropriate Routing Key.
      * @param handle the reply handle that contains the replyTo Address.
      * @param message the JMS Message to be sent.
      */
     private final void sendResponse(final Handle handle, final Message message) throws JMSException
     {
-        // A replyTo looks a bit like 'qmf.default.direct'/'direct.95dab79b-0d3e-4214-9f55-f9efb146c101'; None
-        // so we check if it starts with 'qmf.default.topic' and if so use the _broadcaster MessageProducer
-        // otherwise use the _responder. N.B. if the Destination.toString() format changes this will fail and
-        // always sent to _responder, though *most* clients (hear me qpid-config!!!) use qmf.default.direct.
+        // Just in case the replyTo issues still exist check if the replyTo starts
+        // with qmf.default.topic or qmf.default.direct and if so send to the
+        // main topic or direct Destinations, if not fall back to using the real
+        // replyTo Destination. TODO check if original replyTo issue still exists.
         String replyTo = handle.getReplyTo().toString();
-
-        if (replyTo.startsWith(_broadcastAddress))
+        if (replyTo.startsWith(_quotedTopicBase))
+        {
+            _producer.send(_topicAddress, message);
+        }
+        else if (replyTo.startsWith(_quotedDirectBase))
         {
-            _broadcaster.send(message);
+            _producer.send(_directAddress, message);
         }
         else
         {
-            _responder.send(message);
+            _producer.send(handle.getReplyTo(), message);
         }
     }
 
@@ -452,7 +454,7 @@ public class Agent extends QmfData imple
                     // taken by the C++ broker ManagementAgent, so if it's a problem here........
 
                     // N.B. the results list declared here is a generic List of Objects. We *must* only pass a List of
-                    // Map to queryResponse(), but conversely if the response items are sortable we need tp sort them
+                    // Map to queryResponse(), but conversely if the response items are sortable we need to sort them
                     // before doing mapEncode(). Unfortunately we don't know if the items are sortable a priori so
                     // we either add a Map or we add a QmfAgentData, then sort then mapEncode() each item. I'm not
                     // sure of a more elegant way to do this without creating two lists, which might not be so bad
@@ -1020,21 +1022,21 @@ public class Agent extends QmfData imple
 
         try
         {
-            String directBase = "qmf." + _domain + ".direct";
-            String topicBase  = "qmf." + _domain + ".topic";
-            String address = directBase + "/" + _name + addressOptions;
-
             _asyncSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             _syncSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-            // Create a MessageProducer for the QMF topic address used to broadcast Events & Heartbeats.
-            Destination topicAddress = _syncSession.createQueue(topicBase);
-            _broadcaster = _syncSession.createProducer(topicAddress);
-            _broadcastAddress = "'" + topicBase + "'";
-
-            // Create a MessageProducer for the QMF direct address, mainly used for request/response
-            Destination directAddress = _syncSession.createQueue(directBase);
-            _responder = _syncSession.createProducer(directAddress);
+            // Create a Destination for the QMF direct address, mainly used for request/response
+            String directBase = "qmf." + _domain + ".direct";
+            _quotedDirectBase = "'" + directBase + "'";
+            _directAddress = _syncSession.createQueue(directBase);
+
+            // Create a Destination for the QMF topic address used to broadcast Events & Heartbeats.
+            String topicBase  = "qmf." + _domain + ".topic";
+            _quotedTopicBase = "'" + topicBase + "'";
+            _topicAddress = _syncSession.createQueue(topicBase);
+
+            // Create an unidentified MessageProducer for sending to various destinations.
+            _producer = _syncSession.createProducer(null);
 
             // TODO it should be possible to bind _locateConsumer, _mainConsumer and _aliasConsumer to the
             // same queue if I can figure out the correct AddressString to use, probably not a big deal though.
@@ -1045,6 +1047,7 @@ public class Agent extends QmfData imple
             _locateConsumer.setMessageListener(this);
 
             // Set up MessageListener on the Agent address
+            String address = directBase + "/" + _name + addressOptions;
             Destination agentAddress = _asyncSession.createQueue(address);
             _mainConsumer = _asyncSession.createConsumer(agentAddress);
             _mainConsumer.setMessageListener(this);
@@ -1158,7 +1161,7 @@ public class Agent extends QmfData imple
             List<Map> results = new ArrayList<Map>();
             results.add(event.mapEncode());
             AMQPMessage.setList(response, results);
-            _broadcaster.send(response);
+            _producer.send(_topicAddress, response);
         }
         catch (JMSException jmse)
         {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org