You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2010/02/18 05:43:57 UTC

svn commit: r911251 - /qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java

Author: rajith
Date: Thu Feb 18 04:43:57 2010
New Revision: 911251

URL: http://svn.apache.org/viewvc?rev=911251&view=rev
Log:
This is related to QPID-2242.
I modified the code to keep a single map, keyed by exchange name, that records each exchange's type (along with the corresponding destination type), so that the correct kind of destination can be created when needed.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java?rev=911251&r1=911250&r2=911251&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java Thu Feb 18 04:43:57 2010
@@ -20,16 +20,16 @@
  */
 package org.apache.qpid.client.message;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.client.AMQAnyDestination;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQUndefinedDestination;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * This abstract class provides exchange lookup functionality that is shared
  * between all MessageDelegates. Update facilities are provided so that the 0-10
@@ -43,28 +43,42 @@
 public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate
 {
 
-    private static Map<AMQShortString, Integer> _exchangeTypeMap = new ConcurrentHashMap<AMQShortString, Integer>();
-    private static Map<String, Integer> _exchangeTypeStringMap = new ConcurrentHashMap<String, Integer>();
-    private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String, Integer>();
+    private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String, Integer>();    
+    private static Map<String,ExchangeInfo> _exchangeMap = new  ConcurrentHashMap<String, ExchangeInfo>();
 
     /**
      * Add default Mappings for the Direct, Default, Topic and Fanout exchanges.
      */
     static
     {
-        _exchangeTypeMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME, AMQDestination.QUEUE_TYPE);
-        _exchangeTypeMap.put(AMQShortString.EMPTY_STRING, AMQDestination.QUEUE_TYPE);
-        _exchangeTypeMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE);
-        _exchangeTypeMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE);
-
-        _exchangeTypeStringMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(), AMQDestination.QUEUE_TYPE);
-        _exchangeTypeStringMap.put("", AMQDestination.QUEUE_TYPE);
-        _exchangeTypeStringMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
-        _exchangeTypeStringMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
-
-        _exchangeTypeToDestinationType.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(), AMQDestination.QUEUE_TYPE);
-        _exchangeTypeToDestinationType.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
-        _exchangeTypeToDestinationType.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
+        _exchangeTypeToDestinationType.put("", AMQDestination.QUEUE_TYPE);
+        _exchangeTypeToDestinationType.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS.toString(), AMQDestination.QUEUE_TYPE);
+        _exchangeTypeToDestinationType.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS.toString(), AMQDestination.TOPIC_TYPE);
+        _exchangeTypeToDestinationType.put(ExchangeDefaults.FANOUT_EXCHANGE_CLASS.toString(), AMQDestination.TOPIC_TYPE);
+        _exchangeTypeToDestinationType.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS.toString(), AMQDestination.QUEUE_TYPE);
+        
+        _exchangeMap.put("", new ExchangeInfo("","",AMQDestination.QUEUE_TYPE));
+        
+        _exchangeMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(),
+                         new ExchangeInfo(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(),
+                                          ExchangeDefaults.DIRECT_EXCHANGE_CLASS.toString(),
+                                          AMQDestination.QUEUE_TYPE));
+        
+        _exchangeMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(),
+                         new ExchangeInfo(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(),
+                                          ExchangeDefaults.TOPIC_EXCHANGE_CLASS.toString(),
+                                          AMQDestination.TOPIC_TYPE));
+        
+        _exchangeMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(),
+                         new ExchangeInfo(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(),
+                                          ExchangeDefaults.FANOUT_EXCHANGE_CLASS.toString(),
+                                          AMQDestination.TOPIC_TYPE));
+        
+        _exchangeMap.put(ExchangeDefaults.HEADERS_EXCHANGE_NAME.toString(),
+                         new ExchangeInfo(ExchangeDefaults.HEADERS_EXCHANGE_NAME.toString(),
+                                          ExchangeDefaults.HEADERS_EXCHANGE_CLASS.toString(),
+                                          AMQDestination.QUEUE_TYPE));        
+        
     }
 
     /**
@@ -79,16 +93,26 @@
     protected AMQDestination generateDestination(AMQShortString exchange, AMQShortString routingKey)
     {
         AMQDestination dest;
-        switch (getExchangeType(exchange))
+        ExchangeInfo exchangeInfo = _exchangeMap.get(exchange.asString());
+        
+        if ("topic".equals(exchangeInfo.exchangeType))
+        {
+            dest = new AMQTopic(exchange, routingKey, null);
+        }
+        else if ("direct".equals(exchangeInfo.exchangeType))
         {
-            case AMQDestination.QUEUE_TYPE:
-                dest = new AMQQueue(exchange, routingKey, routingKey);
-                break;
-            case AMQDestination.TOPIC_TYPE:
-                dest = new AMQTopic(exchange, routingKey, null);
-                break;
-            default:
-                dest = new AMQUndefinedDestination(exchange, routingKey, null);
+            dest = new AMQQueue(exchange, routingKey, routingKey); 
+        }
+        else
+        {
+            dest = new AMQAnyDestination(exchange,
+                                         new AMQShortString(exchangeInfo.exchangeType),
+                                         routingKey,
+                                         false,
+                                         false,
+                                         routingKey,
+                                         false,
+                                         new AMQShortString[] {routingKey});
         }
 
         return dest;
@@ -102,7 +126,7 @@
      * create a suitable AMQDestination representation
      *
      * @param exchange the name of the exchange
-     * @param newtype the AMQP exchange class name i.e. amq.direct
+     * @param newtype the AMQP exchange class name i.e. direct
      */
     protected static void updateExchangeType(String exchange, String newtype)
     {
@@ -111,8 +135,8 @@
         {
             type = AMQDestination.UNKNOWN_TYPE;
         }
-        _exchangeTypeStringMap.put(exchange, type);
-        _exchangeTypeMap.put(new AMQShortString(exchange), type);
+        
+        _exchangeMap.put(exchange, new ExchangeInfo(exchange,newtype,type));
     }
 
     /**
@@ -126,26 +150,52 @@
      */
     protected static boolean exchangeMapContains(String exchange)
     {
-        return _exchangeTypeStringMap.containsKey(exchange);
+        return _exchangeMap.containsKey(exchange);
     }
+}
 
-    /**
-     * Returns an int representing the exchange type. This is used in the
-     * createDestination method to ensure the correct AMQDestiation is created. 
-     *
-     * @param exchange the exchange name to lookup
-     * @return int representing the Exchange type
-     */
-    private int getExchangeType(AMQShortString exchange)
+class ExchangeInfo
+{
+    String exchangeName;
+    String exchangeType;
+    int destType = AMQDestination.QUEUE_TYPE;
+    
+    public ExchangeInfo(String exchangeName, String exchangeType,
+                        int destType)
     {
-        Integer type = _exchangeTypeMap.get(exchange == null ? AMQShortString.EMPTY_STRING : exchange);
+        super();
+        this.exchangeName = exchangeName;
+        this.exchangeType = exchangeType;
+        this.destType = destType;
+    }
 
-        if (type == null)
-        {
-            return AMQDestination.UNKNOWN_TYPE;
-        }
+    public String getExchangeName()
+    {
+        return exchangeName;
+    }
+
+    public void setExchangeName(String exchangeName)
+    {
+        this.exchangeName = exchangeName;
+    }
 
-        return type;
+    public String getExchangeType()
+    {
+        return exchangeType;
+    }
+
+    public void setExchangeType(String exchangeType)
+    {
+        this.exchangeType = exchangeType;
     }
 
+    public int getDestType()
+    {
+        return destType;
+    }
+
+    public void setDestType(int destType)
+    {
+        this.destType = destType;
+    }        
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org