You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2006/12/18 14:09:23 UTC

svn commit: r488264 - in /incubator/qpid/branches/jmsselectors/java/broker: ./ bin/ src/main/java/org/apache/qpid/server/exchange/ src/main/java/org/apache/qpid/server/filter/ src/main/java/org/apache/qpid/server/handler/ src/main/java/org/apache/qpid/...

Author: ritchiem
Date: Mon Dec 18 05:09:22 2006
New Revision: 488264

URL: http://svn.apache.org/viewvc?view=rev&rev=488264
Log:
FilterManagerFactory/JMSSelectorFilter.java - Now throw AMQInvalidSelectorException, which is passed back to BasicConsumeMethodHandler.java and then on to the client.
Addition of ConcurrentSelectorDeliveryManager.java. This uses the additional methods on Subscription (hasFilters, hasInterest) to implement selectors.
Trunk Merges

Added:
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
      - copied unchanged from r488007, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
      - copied unchanged from r488007, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
      - copied unchanged from r488007, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java   (with props)
Modified:
    incubator/qpid/branches/jmsselectors/java/broker/bin/qpid-server.bat
    incubator/qpid/branches/jmsselectors/java/broker/pom.xml
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
    incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java

Modified: incubator/qpid/branches/jmsselectors/java/broker/bin/qpid-server.bat
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/bin/qpid-server.bat?view=diff&rev=488264&r1=488263&r2=488264
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/bin/qpid-server.bat (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/bin/qpid-server.bat Mon Dec 18 05:09:22 2006
@@ -63,6 +63,6 @@
 :runCommand
 set LAUNCH_JAR=%QPID_HOME%\lib\qpid-incubating.jar
 set MODULE_JARS=%QPID_MODULE_JARS%
-"%JAVA_HOME%"\bin\java -server -Xmx1024m -DQPID_HOME="%QPID_HOME%" -cp "%LAUNCH_JAR%;%MODULE_JARS%" org.apache.qpid.server.Main %QPID_ARGS%
+"%JAVA_HOME%\bin\java" -server -Xmx1024m -DQPID_HOME="%QPID_HOME%" -cp "%LAUNCH_JAR%;%MODULE_JARS%" org.apache.qpid.server.Main %QPID_ARGS%
 
 :end

Modified: incubator/qpid/branches/jmsselectors/java/broker/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/pom.xml?view=diff&rev=488264&r1=488263&r2=488264
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/pom.xml (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/pom.xml Mon Dec 18 05:09:22 2006
@@ -34,6 +34,7 @@
 
     <properties>
         <topDirectoryLocation>..</topDirectoryLocation>
+        <amqj.logging.level>warn</amqj.logging.level>
     </properties>
 
     <dependencies>
@@ -53,7 +54,7 @@
             <groupId>commons-lang</groupId>
             <artifactId>commons-lang</artifactId>
         </dependency>
-          <dependency>
+        <dependency>
             <groupId>org.apache.geronimo.specs</groupId>
             <artifactId>geronimo-jms_1.1_spec</artifactId>
         </dependency>
@@ -85,7 +86,6 @@
 
     <build>
         <plugins>
-
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>javacc-maven-plugin</artifactId>
@@ -116,7 +116,7 @@
                         </property>
                         <property>
                             <name>amqj.logging.level</name>
-                            <value>WARN</value>
+                            <value>${amqj.logging.level}</value>
                         </property>
                         <property>
                             <name>log4j.configuration</name>

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java?view=diff&rev=488264&r1=488263&r2=488264
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java Mon Dec 18 05:09:22 2006
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -196,5 +196,35 @@
                 q.deliver(payload);
             }
         }
+    }
+
+    public boolean isBound(String routingKey, AMQQueue queue) throws AMQException
+    {
+        final List<AMQQueue> queues = _index.get(routingKey);
+        return queues != null && queues.contains(queue);
+    }
+
+    public boolean isBound(String routingKey) throws AMQException
+    {
+        final List<AMQQueue> queues = _index.get(routingKey);
+        return queues != null && !queues.isEmpty();
+    }
+
+    public boolean isBound(AMQQueue queue) throws AMQException
+    {
+        Map<String, List<AMQQueue>> bindings = _index.getBindingsMap();
+        for (List<AMQQueue> queues : bindings.values())
+        {
+            if (queues.contains(queue))
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean hasBindings() throws AMQException
+    {
+        return !_index.getBindingsMap().isEmpty();
     }
 }

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?view=diff&rev=488264&r1=488263&r2=488264
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Mon Dec 18 05:09:22 2006
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -126,10 +126,11 @@
     } // End of MBean class
 
 
-    public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
+    public synchronized void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
     {
         assert queue != null;
         assert routingKey != null;
+        _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey);
         // we need to use putIfAbsent, which is an atomic operation, to avoid a race condition
         List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
         // if we got null back, no previous value was associated with the specified routing key hence
@@ -159,6 +160,8 @@
         // TODO: add support for the immediate flag
         if (queues == null)
         {
+            _logger.warn("No queues found for routing key " + routingKey);
+            _logger.warn("Routing map contains: " + _routingKey2queues);
             //todo Check for valid topic - mritchie
             return;
         }
@@ -172,7 +175,37 @@
         }
     }
 
-    public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
+    public boolean isBound(String routingKey, AMQQueue queue) throws AMQException
+    {
+        List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+        return queues != null && queues.contains(queue);
+    }
+
+
+    public boolean isBound(String routingKey) throws AMQException
+    {
+        List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+        return queues != null && !queues.isEmpty();
+    }
+
+    public boolean isBound(AMQQueue queue) throws AMQException
+    {
+        for (List<AMQQueue> queues : _routingKey2queues.values())
+        {
+            if (queues.contains(queue))
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean hasBindings() throws AMQException
+    {
+        return !_routingKey2queues.isEmpty();
+    }
+
+    public synchronized void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
     {
         assert queue != null;
         assert routingKey != null;
@@ -189,6 +222,10 @@
         {
             throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
                                    " with routing key " + routingKey);
+        }
+        if (queues.isEmpty())
+        {
+            _routingKey2queues.remove(queues);
         }
     }
 

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?view=diff&rev=488264&r1=488263&r2=488264
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Mon Dec 18 05:09:22 2006
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -47,4 +47,30 @@
     void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException;
 
     void route(AMQMessage message) throws AMQException;
+
+    /**
+     * Determines whether a message would be routed to a particular queue using a specific routing key
+     * @param routingKey
+     * @param queue
+     * @return
+     * @throws AMQException
+     */
+    boolean isBound(String routingKey, AMQQueue queue) throws AMQException;
+
+    /**
+     * Determines whether a message would be routed to any queue using a specific routing key
+     * @param routingKey
+     * @return
+     * @throws AMQException
+     */
+    boolean isBound(String routingKey) throws AMQException;
+
+    boolean isBound(AMQQueue queue) throws AMQException;
+
+    /**
+     * Returns true if this exchange has at least one binding associated with it.
+     * @return
+     * @throws AMQException
+     */
+    boolean hasBindings() throws AMQException;
 }

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=488264&r1=488263&r2=488264
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Mon Dec 18 05:09:22 2006
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -206,8 +206,46 @@
         }
         if (!delivered)
         {
-            _logger.warn("Exchange " + getName() + ": message not routable.");
+
+            String msg = "Exchange " + getName() + ": message not routable.";
+
+            if (payload.getPublishBody().mandatory)
+            {
+                throw new NoRouteException(msg, payload);
+            }
+            else
+            {
+                _logger.warn(msg);
+            }
+
         }
+    }
+
+    public boolean isBound(String routingKey, AMQQueue queue) throws AMQException
+    {
+        return isBound(queue);
+    }
+
+    public boolean isBound(String routingKey) throws AMQException
+    {
+        return hasBindings();
+    }
+
+    public boolean isBound(AMQQueue queue) throws AMQException
+    {
+        for (Registration r : _bindings)
+        {
+            if (r.queue.equals(queue))
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean hasBindings() throws AMQException
+    {
+        return !_bindings.isEmpty();
     }
 
     protected Map getHeaders(ContentHeaderBody contentHeaderFrame)

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java?view=diff&rev=488264&r1=488263&r2=488264
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java Mon Dec 18 05:09:22 2006
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,7 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -37,7 +38,7 @@
     private ConcurrentMap<String, List<AMQQueue>> _index
             = new ConcurrentHashMap<String, List<AMQQueue>>();
 
-    boolean add(String key, AMQQueue queue)
+    synchronized boolean add(String key, AMQQueue queue)
     {
         List<AMQQueue> queues = _index.get(key);
         if(queues == null)
@@ -61,7 +62,7 @@
         }
     }
 
-    boolean remove(String key, AMQQueue queue)
+    synchronized boolean remove(String key, AMQQueue queue)
     {
         List<AMQQueue> queues = _index.get(key);
         if (queues != null)
@@ -83,6 +84,6 @@
 
     Map<String, List<AMQQueue>> getBindingsMap()
     {
-        return _index;
+        return new HashMap<String, List<AMQQueue>>(_index);
     }
 }

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java?view=diff&rev=488264&r1=488263&r2=488264
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java Mon Dec 18 05:09:22 2006
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.filter;
 
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
 //import org.slf4j.Logger;
 //import org.slf4j.LoggerFactory;
 
@@ -35,7 +36,7 @@
     //fixme move to a common class so it can be refered to from client code.
     private static String JMS_SELECTOR_FILTER = "x-filter-jms-selector";
 
-    public static FilterManager createManager(FieldTable filters)
+    public static FilterManager createManager(FieldTable filters) throws AMQException
     {
         FilterManager manager = null;
 

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java?view=diff&rev=488264&r1=488263&r2=488264
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java Mon Dec 18 05:09:22 2006
@@ -23,6 +23,9 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.filter.jms.selector.SelectorParser;
 import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.log4j.Logger;
 //import org.slf4j.Logger;
 //import org.slf4j.LoggerFactory;
@@ -33,13 +36,13 @@
 public class JMSSelectorFilter implements MessageFilter
 {
 
-    private final static Logger _logger =  org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
+    private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
 // LoggerFactory.getLogger(JMSSelectorFilter.class);
 
     private String _selector;
     private BooleanExpression _matcher;
 
-    public JMSSelectorFilter(String selector)
+    public JMSSelectorFilter(String selector) throws AMQException
     {
         _selector = selector;
         _logger.info("Created JMSSelectorFilter with selector:" + _selector);
@@ -53,8 +56,8 @@
         catch (InvalidSelectorException e)
         {
             // fixme
-            // Will have to throw this back to the client... in the future
-            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            // Is this the correct way of throwing exception
+            throw new AMQInvalidSelectorException(e.getMessage());
         }
 
     }
@@ -64,7 +67,7 @@
         try
         {
             boolean match = _matcher.matches(message);
-            _logger.info(message + " match(" + match + ") selector:" + _selector);
+            _logger.info(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector);
             return match;
         }
         catch (JMSException e)

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=488264&r1=488263&r2=488264
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Mon Dec 18 05:09:22 2006
@@ -21,10 +21,12 @@
 package org.apache.qpid.server.handler;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.framing.BasicConsumeBody;
 import org.apache.qpid.framing.BasicConsumeOkBody;
 import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.ConsumerTagNotUniqueException;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -32,6 +34,7 @@
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.log4j.Logger;
@@ -68,14 +71,14 @@
         {
             AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
 
-            if(queue == null)
+            if (queue == null)
             {
                 _log.info("No queue for '" + body.queue + "'");
             }
             try
             {
-                String consumerTag = channel.subscribeToQueue(body.consumerTag, queue,  session, !body.noAck, body.filter);
-                if(!body.nowait)
+                String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, body.arguments);
+                if (!body.nowait)
                 {
                     session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
                 }
@@ -83,10 +86,19 @@
                 //now allow queue to start async processing of any backlog of messages
                 queue.deliverAsync();
             }
-            catch(ConsumerTagNotUniqueException e)
+            catch (AMQInvalidSelectorException ise)
+            {
+                _log.info("Closing connection due to invalid selector");
+                session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(),
+                                                                      ise.getMessage(), BasicConsumeBody.CLASS_ID,
+                                                                      BasicConsumeBody.METHOD_ID));
+            }
+            catch (ConsumerTagNotUniqueException e)
             {
                 String msg = "Non-unique consumer tag, '" + body.consumerTag + "'";
-                session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg, BasicConsumeBody.CLASS_ID, BasicConsumeBody.METHOD_ID));
+                session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg,
+                                                                      BasicConsumeBody.CLASS_ID,
+                                                                      BasicConsumeBody.METHOD_ID));
             }
         }
     }

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=488264&r1=488263&r2=488264
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Dec 18 05:09:22 2006
@@ -40,9 +40,6 @@
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.queue.QueueRegistry;
@@ -50,24 +47,12 @@
 import org.apache.qpid.server.state.AMQStateManager;
 
 import javax.management.JMException;
-import javax.management.MBeanException;
-import javax.management.MBeanNotificationInfo;
-import javax.management.Notification;
-import javax.management.monitor.MonitorNotification;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
-import javax.management.openmbean.TabularData;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
 import javax.security.sasl.SaslServer;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.Date;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArraySet;
 
@@ -93,7 +78,7 @@
 
     private AMQCodecFactory _codecFactory;
 
-    private ManagedAMQProtocolSession _managedObject;
+    private AMQProtocolSessionMBean _managedObject;
 
     private SaslServer _saslServer;
 
@@ -102,11 +87,10 @@
     private Object _lastSent;
 
     private boolean _closed;
-
+    // maximum number of channels this session should have
     private long _maxNoOfChannels = 1000;
     
     /* AMQP Version for this session */
-    
     private byte _major;
     private byte _minor;
 
@@ -115,190 +99,6 @@
         return _managedObject;
     }
 
-    /**
-     * This class implements the management interface (is an MBean). In order to
-     * make more attributes, operations and notifications available over JMX simply
-     * augment the ManagedConnection interface and add the appropriate implementation here.
-     */
-    @MBeanDescription("Management Bean for an AMQ Broker Connection")
-    private final class ManagedAMQProtocolSession extends AMQManagedObject implements ManagedConnection
-    {
-        private String _name = null;
-        //openmbean data types for representing the channel attributes
-        private String[] _channelAtttibuteNames = { "Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"};
-        private String[] _indexNames = {_channelAtttibuteNames[0]};
-        private OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER};
-        private CompositeType _channelType = null;      // represents the data type for channel data
-        private TabularType _channelsType = null;       // Data type for list of channels type
-        private TabularDataSupport _channelsList = null;
-
-        @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
-        public ManagedAMQProtocolSession() throws JMException
-        {
-            super(ManagedConnection.class, ManagedConnection.TYPE);
-            init();
-        }
-
-        /**
-         * initialises the openmbean data types
-         */
-        private void init() throws OpenDataException
-        {
-            String remote = getRemoteAddress();
-            remote = "anonymous".equals(remote) ? remote + hashCode() : remote;
-            _name = jmxEncode(new StringBuffer(remote), 0).toString();
-            _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames,
-                                             _channelAtttibuteNames, _channelAttributeTypes);
-            _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames);
-        }
-
-        public Date getLastIoTime()
-        {
-            return new Date(_minaProtocolSession.getLastIoTime());
-        }
-
-        public String getRemoteAddress()
-        {
-            return _minaProtocolSession.getRemoteAddress().toString();
-        }
-
-        public Long getWrittenBytes()
-        {
-            return _minaProtocolSession.getWrittenBytes();
-        }
-
-        public Long getReadBytes()
-        {
-            return _minaProtocolSession.getReadBytes();
-        }
-
-        public Long getMaximumNumberOfChannels()
-        {
-            return _maxNoOfChannels;
-        }
-
-        public void setMaximumNumberOfChannels(Long value)
-        {
-            _maxNoOfChannels = value;
-        }
-
-        public String getObjectInstanceName()
-        {
-            return _name;
-        }
-
-        public void commitTransactions(int channelId) throws JMException
-        {
-            try
-            {
-                AMQChannel channel = _channelMap.get(channelId);
-                if (channel == null)
-                {
-                    throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
-                }
-                if (channel.isTransactional())
-                {
-                    channel.commit();
-                }
-            }
-            catch(AMQException ex)
-            {
-                throw new MBeanException(ex, ex.toString());
-            }
-        }
-
-        public void rollbackTransactions(int channelId) throws JMException
-        {
-            try
-            {
-                AMQChannel channel = _channelMap.get(channelId);
-                if (channel == null)
-                {
-                    throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
-                }
-                if (channel.isTransactional())
-                {
-                    channel.rollback();
-                }
-            }
-            catch(AMQException ex)
-            {
-                throw new MBeanException(ex, ex.toString());
-            }
-        }
-
-        /**
-         * Creates the list of channels in tabular form from the _channelMap.
-         * @return  list of channels in tabular form.
-         * @throws OpenDataException
-         */
-        public TabularData channels() throws OpenDataException
-        {
-            _channelsList = new TabularDataSupport(_channelsType);
-
-            for (Map.Entry<Integer, AMQChannel> entry : _channelMap.entrySet())
-            {
-                AMQChannel channel = entry.getValue();
-                Object[] itemValues = {channel.getChannelId(), channel.isTransactional(),
-                                       (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName() : null,
-                                       channel.getUnacknowledgedMessageMap().size()};
-
-                CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues);
-                _channelsList.put(channelData);
-            }
-
-            return _channelsList;
-        }
-        
-        public void closeChannel(int id) throws Exception
-        {
-            try
-            {
-                AMQMinaProtocolSession.this.closeChannel(id);
-            }
-            catch (AMQException ex)
-            {
-                throw new Exception(ex.toString());
-            }
-        }
-
-        public void closeConnection() throws Exception
-        {
-            try
-            {
-                AMQMinaProtocolSession.this.closeSession();
-            }
-            catch (AMQException ex)
-            {
-                throw new Exception(ex.toString());
-            }
-        }
-
-        @Override
-        public MBeanNotificationInfo[] getNotificationInfo()
-        {
-            String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
-            String name = MonitorNotification.class.getName();
-            String description = "Channel count has reached threshold value";
-            MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
-
-            return new MBeanNotificationInfo[] {info1};
-        }
-
-        private void checkForNotification()
-        {
-            int channelsCount = _channelMap.size();
-            if (channelsCount >= getMaximumNumberOfChannels())
-            {
-                Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
-                                     ++_notificationSequenceNumber, System.currentTimeMillis(),
-                                     "Channel count (" + channelsCount + ") has reached the threshold value");
-
-                _broadcaster.sendNotification(n);
-            }
-        }
-
-    } // End of MBean class
 
     public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
                                   AMQCodecFactory codecFactory)
@@ -322,11 +122,11 @@
         _managedObject.register();
     }
 
-    private ManagedAMQProtocolSession createMBean() throws AMQException
+    private AMQProtocolSessionMBean createMBean() throws AMQException
     {
         try
         {
-            return new ManagedAMQProtocolSession();
+            return new AMQProtocolSessionMBean(this);
         }
         catch(JMException ex)
         {
@@ -335,6 +135,11 @@
         }
     }
 
+    public IoSession getIOSession()
+    {
+        return _minaProtocolSession;
+    }
+
     public static AMQProtocolSession getAMQProtocolSession(IoSession minaProtocolSession)
     {
         return (AMQProtocolSession) minaProtocolSession.getAttachment();
@@ -495,6 +300,11 @@
         _contextKey = contextKey;
     }
 
+    public List<AMQChannel> getChannels()
+    {
+        return new ArrayList<AMQChannel>(_channelMap.values());
+    }
+
     public AMQChannel getChannel(int channelId) throws AMQException
     {
         return _channelMap.get(channelId);
@@ -503,7 +313,42 @@
     public void addChannel(AMQChannel channel)
     {
         _channelMap.put(channel.getChannelId(), channel);
-        _managedObject.checkForNotification();
+        checkForNotification();
+    }
+
+    private void checkForNotification()
+    {
+        int channelsCount = _channelMap.size();
+        if (channelsCount >= _maxNoOfChannels)
+        {
+            _managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
+        }
+    }
+
+    public Long getMaximumNumberOfChannels()
+    {
+        return _maxNoOfChannels;
+    }
+
+    public void setMaximumNumberOfChannels(Long value)
+    {
+        _maxNoOfChannels = value;
+    }
+
+    public void commitTransactions(AMQChannel channel) throws AMQException
+    {
+        if (channel != null && channel.isTransactional())
+        {
+            channel.commit();
+        }
+    }
+
+    public void rollbackTransactions(AMQChannel channel) throws AMQException
+    {
+        if (channel != null && channel.isTransactional())
+        {
+            channel.rollback();
+        }
     }
 
     /**

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=488264&r1=488263&r2=488264
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Dec 18 05:09:22 2006
@@ -21,16 +21,9 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -38,15 +31,8 @@
 import org.apache.qpid.server.txn.TxnOp;
 
 import javax.management.JMException;
-import javax.management.MBeanException;
-import javax.management.MBeanNotificationInfo;
-import javax.management.Notification;
-import javax.management.monitor.MonitorNotification;
-import javax.management.openmbean.*;
 import java.text.MessageFormat;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Properties;
 import java.util.concurrent.Executor;
 
 /**
@@ -110,7 +96,7 @@
      * max allowed number of messages on a queue.
      */
     private Integer _maxMessageCount = 10000;
-
+    
     /**
      * max queue depth(KB) for the queue
      */
@@ -126,322 +112,6 @@
         return _name.compareTo(((AMQQueue) o).getName());
     }
 
-    /**
-     * MBean class for AMQQueue. It implements all the management features exposed
-     * for an AMQQueue.
-     */
-    @MBeanDescription("Management Interface for AMQQueue")
-    private final class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
-    {
-        private String _queueName = null;
-        // OpenMBean data types for viewMessages method
-        private String[] _msgAttributeNames = {"Message Id", "Header", "Size(bytes)", "Redelivered"};
-        private String[] _msgAttributeIndex = {_msgAttributeNames[0]};
-        private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
-        private CompositeType _messageDataType = null;           // Composite type for representing AMQ Message data.
-        private TabularType _messagelistDataType = null;         // Datatype for representing AMQ messages list.
-
-        // OpenMBean data types for viewMessageContent method
-        private CompositeType _msgContentType = null;
-        private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"};
-        private OpenType[] _msgContentAttributeTypes = new OpenType[4];
-
-        @MBeanConstructor("Creates an MBean exposing an AMQQueue")
-        public AMQQueueMBean() throws JMException
-        {
-            super(ManagedQueue.class, ManagedQueue.TYPE);
-            init();
-        }
-
-        /**
-         * initialises the openmbean data types
-         */
-        private void init() throws OpenDataException
-        {
-            _queueName = jmxEncode(new StringBuffer(_name), 0).toString();
-            _msgContentAttributeTypes[0] = SimpleType.LONG;                    // For message id
-            _msgContentAttributeTypes[1] = SimpleType.STRING;                  // For MimeType
-            _msgContentAttributeTypes[2] = SimpleType.STRING;                  // For Encoding
-            _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE);  // For message content
-            _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
-                                                _msgContentAttributes, _msgContentAttributeTypes);
-
-            _msgAttributeTypes[0] = SimpleType.LONG;                      // For message id
-            _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING);  // For header attributes
-            _msgAttributeTypes[2] = SimpleType.LONG;                      // For size
-            _msgAttributeTypes[3] = SimpleType.BOOLEAN;                   // For redelivered
-
-            _messageDataType = new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
-            _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
-        }
-
-        public String getObjectInstanceName()
-        {
-            return _queueName;
-        }
-
-        public String getName()
-        {
-            return _name;
-        }
-
-        public boolean isDurable()
-        {
-            return _durable;
-        }
-
-        public String getOwner()
-        {
-            return _owner;
-        }
-
-        public boolean isAutoDelete()
-        {
-            return _autoDelete;
-        }
-
-        public Integer getMessageCount()
-        {
-            return _deliveryMgr.getQueueMessageCount();
-        }
-
-        public Long getMaximumMessageSize()
-        {
-            return _maxMessageSize;
-        }
-
-        public void setMaximumMessageSize(Long value)
-        {
-            _maxMessageSize = value;
-        }
-
-        public Integer getConsumerCount()
-        {
-            return _subscribers.size();
-        }
-
-        public Integer getActiveConsumerCount()
-        {
-            return _subscribers.getWeight();
-        }
-
-        public Long getReceivedMessageCount()
-        {
-            return _totalMessagesReceived;
-        }
-
-        public Integer getMaximumMessageCount()
-        {
-            return _maxMessageCount;
-        }
-
-        public void setMaximumMessageCount(Integer value)
-        {
-            _maxMessageCount = value;
-        }
-
-        public Long getMaximumQueueDepth()
-        {
-            return _maxQueueDepth;
-        }
-
-        // Sets the queue depth, the max queue size
-        public void setMaximumQueueDepth(Long value)
-        {
-            _maxQueueDepth = value;
-        }
-
-        /**
-         * returns the size of messages(KB) in the queue.
-         */
-        public Long getQueueDepth()
-        {
-            List<AMQMessage> list = _deliveryMgr.getMessages();
-            if (list.size() == 0)
-            {
-                return 0l;
-            }
-
-            long queueDepth = 0;
-            for (AMQMessage message : list)
-            {
-                queueDepth = queueDepth + getMessageSize(message);
-            }
-            return (long) Math.round(queueDepth / 1000);
-        }
-
-        /**
-         * returns size of message in bytes
-         */
-        private long getMessageSize(AMQMessage msg)
-        {
-            if (msg == null)
-            {
-                return 0l;
-            }
-
-            return msg.getContentHeaderBody().bodySize;
-        }
-
-        /**
-         * Checks if there is any notification to be send to the listeners
-         */
-        private void checkForNotification(AMQMessage msg)
-        {
-            // Check for threshold message count
-            Integer msgCount = getMessageCount();
-            if (msgCount >= getMaximumMessageCount())
-            {
-                notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value");
-            }
-
-            // Check for threshold message size
-            long messageSize = getMessageSize(msg);
-            if (messageSize >= _maxMessageSize)
-            {
-                notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value");
-            }
-
-            // Check for threshold queue depth in bytes
-            long queueDepth = getQueueDepth();
-            if (queueDepth >= _maxQueueDepth)
-            {
-                notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value");
-            }
-        }
-
-        /**
-         * Sends the notification to the listeners
-         */
-        private void notifyClients(String notificationMsg)
-        {
-            Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
-                                              ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
-
-            _broadcaster.sendNotification(n);
-        }
-
-        public void deleteMessageFromTop() throws JMException
-        {
-            try
-            {
-                _deliveryMgr.removeAMessageFromTop();
-            }
-            catch (AMQException ex)
-            {
-                throw new MBeanException(ex, ex.toString());
-            }
-        }
-
-        public void clearQueue() throws JMException
-        {
-            try
-            {
-                _deliveryMgr.clearAllMessages();
-            }
-            catch (AMQException ex)
-            {
-                throw new MBeanException(ex, ex.toString());
-            }
-        }
-
-        /**
-         * returns message content as byte array and related attributes for the given message id.
-         */
-        public CompositeData viewMessageContent(long msgId) throws JMException
-        {
-            List<AMQMessage> list = _deliveryMgr.getMessages();
-            AMQMessage msg = null;
-            for (AMQMessage message : list)
-            {
-                if (message.getMessageId() == msgId)
-                {
-                    msg = message;
-                    break;
-                }
-            }
-
-            if (msg == null)
-            {
-                throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
-            }
-            // get message content
-            List<ContentBody> cBodies = msg.getContentBodies();
-            List<Byte> msgContent = new ArrayList<Byte>();
-            for (ContentBody body : cBodies)
-            {
-                if (body.getSize() != 0)
-                {
-                    ByteBuffer slice = body.payload.slice();
-                    for (int j = 0; j < slice.limit(); j++)
-                    {
-                        msgContent.add(slice.get());
-                    }
-                }
-            }
-
-            // Create header attributes list
-            BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
-            String mimeType = headerProperties.getContentType();
-            String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
-            Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
-
-            return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
-        }
-
-        /**
-         * Returns the header contents of the messages stored in this queue in tabular form.
-         */
-        public TabularData viewMessages(int beginIndex, int endIndex) throws JMException
-        {
-            if ((beginIndex > endIndex) || (beginIndex < 1))
-            {
-                throw new JMException("From Index = " + beginIndex + ", To Index = " + endIndex +
-                                      "\nFrom Index should be greater than 0 and less than To Index");
-            }
-
-            List<AMQMessage> list = _deliveryMgr.getMessages();
-            TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
-
-            // Create the tabular list of message header contents
-            for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
-            {
-                AMQMessage msg = list.get(i - 1);
-                ContentHeaderBody headerBody = msg.getContentHeaderBody();
-                // Create header attributes list
-                BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
-                List<String> headerAttribsList = new ArrayList<String>();
-                headerAttribsList.add("App Id=" + headerProperties.getAppId());
-                headerAttribsList.add("MimeType=" + headerProperties.getContentType());
-                headerAttribsList.add("Correlation Id=" + headerProperties.getCorrelationId());
-                headerAttribsList.add("Encoding=" + headerProperties.getEncoding());
-                headerAttribsList.add(headerProperties.toString());
-
-                Object[] itemValues = {msg.getMessageId(), headerAttribsList.toArray(new String[0]),
-                                       headerBody.bodySize, msg.isRedelivered()};
-                CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
-                _messageList.put(messageData);
-            }
-
-            return _messageList;
-        }
-
-        /**
-         * returns Notifications sent by this MBean.
-         */
-        @Override
-        public MBeanNotificationInfo[] getNotificationInfo()
-        {
-            String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
-            String name = MonitorNotification.class.getName();
-            String description = "Either Message count or Queue depth or Message size has reached threshold high value";
-            MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
-
-            return new MBeanNotificationInfo[]{info1};
-        }
-
-    } // End of AMQMBean class
-
     public AMQQueue(String name, boolean durable, String owner,
                     boolean autoDelete, QueueRegistry queueRegistry)
             throws AMQException
@@ -523,33 +193,32 @@
         {
             if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager"))
             {
-                _logger.warn("Using ConcurrentSelectorDeliveryManager");
+                _logger.info("Using ConcurrentSelectorDeliveryManager");
                 _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
             }
             else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager"))
             {
-                _logger.warn("Using ConcurrentDeliveryManager");
+                _logger.info("Using ConcurrentDeliveryManager");
                 _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
             }
             else
             {
-                _logger.warn("Using SynchronizedDeliveryManager");
+                _logger.info("Using SynchronizedDeliveryManager");
                 _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
             }
         }
         else
         {
-            _logger.warn("Using SynchronizedDeliveryManager");
+            _logger.info("Using SynchronizedDeliveryManager");
             _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
         }
-
     }
 
     private AMQQueueMBean createMBean() throws AMQException
     {
         try
         {
-            return new AMQQueueMBean();
+            return new AMQQueueMBean(this);
         }
         catch (JMException ex)
         {
@@ -582,14 +251,110 @@
         return _autoDelete;
     }
 
+    /**
+     * @return number of undelivered messages on the queue.
+     */
     public int getMessageCount()
     {
         return _deliveryMgr.getQueueMessageCount();
     }
 
+    /**
+     * @return list of undelivered messages on the queue.
+     */
+    public List<AMQMessage> getMessagesOnTheQueue()
+    {
+        return _deliveryMgr.getMessages();
+    }
+
+    /**
+     * @param messageId id of the message to look for on the queue
+     * @return the AMQMessage with the given id if it exists; null if no AMQMessage with the given id exists.
+     */
+    public AMQMessage getMessageOnTheQueue(long messageId)
+    {
+        List<AMQMessage> list = getMessagesOnTheQueue();
+        AMQMessage msg = null;
+        for (AMQMessage message : list)
+        {
+            if (message.getMessageId() == messageId)
+            {
+                msg = message;
+                break;
+            }
+        }
+
+        return msg;
+    }
+
+    /**
+     * @return MBean object associated with this Queue
+     */
     public ManagedObject getManagedObject()
     {
         return _managedObject;
+    }
+
+    public Long getMaximumMessageSize()
+    {
+        return _maxMessageSize;
+    }
+
+    public void setMaximumMessageSize(Long value)
+    {
+        _maxMessageSize = value;
+    }
+
+    public Integer getConsumerCount()
+    {
+        return _subscribers.size();
+    }
+
+    public Integer getActiveConsumerCount()
+    {
+        return _subscribers.getWeight();
+    }
+
+    public Long getReceivedMessageCount()
+    {
+        return _totalMessagesReceived;
+    }
+
+    public Integer getMaximumMessageCount()
+    {
+        return _maxMessageCount;
+    }
+
+    public void setMaximumMessageCount(Integer value)
+    {
+        _maxMessageCount = value;
+    }
+
+    public Long getMaximumQueueDepth()
+    {
+        return _maxQueueDepth;
+    }
+
+    // Sets the maximum queue depth, i.e. the max queue size threshold
+    public void setMaximumQueueDepth(Long value)
+    {
+        _maxQueueDepth = value;
+    }
+
+    /**
+     * Removes the AMQMessage from the top of the queue.
+     */
+    public void deleteMessageFromTop() throws AMQException
+    {
+        _deliveryMgr.removeAMessageFromTop();
+    }
+
+    /**
+     * Removes all the messages from the queue.
+     */
+    public void clearQueue() throws AMQException
+    {
+        _deliveryMgr.clearAllMessages();
     }
 
     public void bind(String routingKey, Exchange exchange)

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java?view=diff&rev=488264&r1=488263&r2=488264
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java Mon Dec 18 05:09:22 2006
@@ -281,7 +281,12 @@
             //are we already running? if so, don't re-run
             if (_processing.compareAndSet(false, true))
             {
-                executor.execute(asyncDelivery);
+                // Do we need this?
+                // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok.
+                //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown())
+                {
+                    executor.execute(asyncDelivery);
+                }
             }
         }
     }

Added: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=auto&rev=488264
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (added)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Mon Dec 18 05:09:22 2006
@@ -0,0 +1,445 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
+import org.apache.qpid.configuration.Configured;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.configuration.Configurator;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * Manages delivery of messages on behalf of a queue.
+ * <p/>
+ * Messages that no subscriber can take immediately are held on an internal queue and are also
+ * offered to the pre-delivery queue of every subscription that has filters and reports interest.
+ * At most one asynchronous {@link Runner} drains these queues at any time.
+ */
+public class ConcurrentSelectorDeliveryManager implements DeliveryManager
+{
+    private static final Logger _log = Logger.getLogger(ConcurrentSelectorDeliveryManager.class);
+
+    /**
+     * When true, content bodies are shrunk to fit before a message is queued. Set from configuration
+     * (see the Configurator.configure(this) call in the constructor).
+     */
+    @Configured(path = "advanced.compressBufferOnQueue",
+                defaultValue = "false")
+    public boolean compressBufferOnQueue;
+    /**
+     * Holds any queued messages
+     */
+    private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+    //private int _messageCount;
+    /**
+     * Ensures that only one asynchronous task is running for this manager at
+     * any time.
+     */
+    private final AtomicBoolean _processing = new AtomicBoolean();
+    /**
+     * The subscriptions on the queue to whom messages are delivered
+     */
+    private final SubscriptionManager _subscriptions;
+
+    /**
+     * A reference to the queue we are delivering messages for. We need this to be able
+     * to pass the code that handles acknowledgements a handle on the queue.
+     */
+    private final AMQQueue _queue;
+
+
+    /**
+     * Lock used to ensure that a channel that becomes unsuspended during the start of the queueing process is forced
+     * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered
+     * via the async thread.
+     * <p/>
+     * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue.
+     */
+    private final ReentrantLock _lock = new ReentrantLock();
+
+
+    /**
+     * Creates a delivery manager for the given queue.
+     *
+     * @param subscriptions the subscriptions to which messages are delivered
+     * @param queue         the queue on whose behalf messages are delivered
+     */
+    ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
+    {
+        //Set values from configuration
+        Configurator.configure(this);
+
+        if (compressBufferOnQueue)
+        {
+            _log.warn("Compressing Buffers on queue.");
+        }
+
+        _subscriptions = subscriptions;
+        _queue = queue;
+    }
+
+
+    /**
+     * Appends the message to the internal queue, first shrinking its content bodies when
+     * compressBufferOnQueue is set.
+     *
+     * @param msg the message to queue
+     * @return always true
+     */
+    private boolean addMessageToQueue(AMQMessage msg)
+    {
+        // Shrink the ContentBodies to their actual size to save memory.
+        if (compressBufferOnQueue)
+        {
+            Iterator it = msg.getContentBodies().iterator();
+            while (it.hasNext())
+            {
+                ContentBody cb = (ContentBody) it.next();
+                cb.reduceBufferToFit();
+            }
+        }
+
+        _messages.offer(msg);
+
+        return true;
+    }
+
+
+    /**
+     * Reports whether the internal queue currently holds any messages. The check is made while
+     * holding the same lock that deliver() holds when it adds a message to the queue.
+     *
+     * @return true if at least one message is queued
+     */
+    public boolean hasQueuedMessages()
+    {
+        _lock.lock();
+        try
+        {
+            return !_messages.isEmpty();
+        }
+        finally
+        {
+            _lock.unlock();
+        }
+    }
+
+    /**
+     * @return the number of messages on the internal queue
+     */
+    public int getQueueMessageCount()
+    {
+        return getMessageCount();
+    }
+
+    /**
+     * This is an EXPENSIVE operation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size.
+     * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue.
+     *
+     * @return int the number of messages in the delivery queue.
+     */
+    private int getMessageCount()
+    {
+        return _messages.size();
+    }
+
+
+    /**
+     * @return a new list holding a copy of the messages currently on the internal queue
+     */
+    public synchronized List<AMQMessage> getMessages()
+    {
+        return new ArrayList<AMQMessage>(_messages);
+    }
+
+    /**
+     * Removes the message at the head of the internal queue, if any, and calls dequeue(_queue) on it.
+     *
+     * @throws AMQException propagated from AMQMessage.dequeue()
+     */
+    public synchronized void removeAMessageFromTop() throws AMQException
+    {
+        AMQMessage msg = poll();
+        if (msg != null)
+        {
+            msg.dequeue(_queue);
+        }
+    }
+
+    /**
+     * Removes every message from the internal queue, calling dequeue(_queue) on each one.
+     *
+     * @throws AMQException propagated from AMQMessage.dequeue()
+     */
+    public synchronized void clearAllMessages() throws AMQException
+    {
+        AMQMessage msg = poll();
+        while (msg != null)
+        {
+            msg.dequeue(_queue);
+            msg = poll();
+        }
+    }
+
+
+    /**
+     * Discards messages for which taken() returns true from the head of the given queue and returns
+     * the first remaining message, leaving it on the queue.
+     *
+     * @param messages the queue to inspect
+     * @return the first message at the head that is not taken, or null if there is none
+     */
+    private AMQMessage getNextMessage(Queue<AMQMessage> messages)
+    {
+        AMQMessage message = messages.peek();
+
+        while (message != null && message.taken())
+        {
+            //remove the already taken message
+            messages.poll();
+            // try the next message
+            message = messages.peek();
+        }
+        return message;
+    }
+
+    /**
+     * Sends the next untaken message on messageQueue to the subscription and then removes the head
+     * of messageQueue. Does nothing if there is no such message.
+     *
+     * @param sub          the subscription to send to
+     * @param messageQueue the queue to take the message from
+     * @param queue        the queue passed through to sub.send()
+     */
+    public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue, AMQQueue queue)
+    {
+        AMQMessage message = null;
+        try
+        {
+            message = getNextMessage(messageQueue);
+
+            // message will be null if we have no messages in the messageQueue.
+            if (message == null)
+            {
+                return;
+            }
+            _log.info("Async Delivery Message:" + message + " to :" + sub);
+
+            sub.send(message, queue);
+            message.setDeliveredToConsumer();
+
+            //remove sent message from our queue.
+            messageQueue.poll();
+        }
+        catch (FailedDequeueException e)
+        {
+            message.release();
+            _log.error("Unable to deliver message as dequeue failed: " + e, e);
+        }
+    }
+
+    /**
+     * Only one thread should ever execute this method concurrently, but
+     * it can do so while other threads invoke deliver().
+     * <p/>
+     * Each pass offers one message to every non-suspended subscription: from its pre-delivery
+     * queue when it has filters, otherwise from the shared queue.
+     */
+    private void processQueue()
+    {
+        // Continue to process delivery while we haveSubscribers and messages
+        boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
+
+        while (hasSubscribers && hasQueuedMessages())
+        {
+            for (Subscription sub : _subscriptions.getSubscriptions())
+            {
+                if (!sub.isSuspended())
+                {
+                    if (sub.hasFilters())
+                    {
+                        sendNextMessage(sub, sub.getPreDeliveryQueue(), _queue);
+                    }
+                    else
+                    {
+                        sendNextMessage(sub, _messages, _queue);
+                    }
+
+                    hasSubscribers = true;
+                }
+                else
+                {
+                    // NOTE: the flag reflects only the last subscription visited; Runner.run()
+                    // re-checks for active subscribers before giving up.
+                    hasSubscribers = false;
+                }
+            }
+        }
+    }
+
+    /**
+     * @return the head of the internal queue, removed from it, or null if the queue is empty
+     */
+    private AMQMessage poll()
+    {
+        return _messages.poll();
+    }
+
+    /**
+     * Hands the message to a subscriber if one can take it now. Otherwise, unless the message is
+     * immediate, queues it and offers it to the pre-delivery queue of each interested subscription
+     * that has filters.
+     *
+     * @param name not used by this implementation
+     * @param msg  the message to deliver
+     * @throws FailedDequeueException presumably raised by the subscription's send(); sendNextMessage()
+     *                                catches the same type around its own send() call
+     */
+    public void deliver(String name, AMQMessage msg) throws FailedDequeueException
+    {
+        _log.info("deliver :" + msg);
+
+        //Check if we have someone to deliver the message to.
+        _lock.lock();
+        try
+        {
+            Subscription s = _subscriptions.nextSubscriber(msg);
+
+            if (s == null) //no-one can take the message right now.
+            {
+                _log.info("Testing Message(" + msg + ") for Queued Delivery");
+                if (!msg.isImmediate())
+                {
+                    addMessageToQueue(msg);
+
+                    //release lock now message is on queue.
+                    _lock.unlock();
+
+                    //Pre Deliver to all subscriptions
+                    _log.info("We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to.");
+                    for (Subscription sub : _subscriptions.getSubscriptions())
+                    {
+
+                        // stop if the message gets delivered whilst PreDelivering if we have a shared queue.
+                        if (_queue.isShared() && msg.getDeliveredToConsumer())
+                        {
+                            _log.info("Stopping PreDelivery as message(" + msg + ") is already delivered.");
+                            continue;
+                        }
+
+                        // Only give the message to those that want them.
+                        if (sub.hasFilters() && sub.hasInterest(msg))
+                        {
+                            sub.enqueueForPreDelivery(msg);
+                        }
+                    }
+                }
+            }
+            else
+            {
+                //release lock now
+                _lock.unlock();
+
+                _log.info("Delivering Message:" + msg + " to(" + System.identityHashCode(s) + ") :" + s);
+                //Deliver the message
+                s.send(msg, _queue);
+                msg.setDeliveredToConsumer();
+            }
+        }
+        finally
+        {
+            // Release the lock only if this thread still owns it. isLocked() would also be true while
+            // another thread holds the lock, and unlock() would then throw IllegalMonitorStateException.
+            if (_lock.isHeldByCurrentThread())
+            {
+                _lock.unlock();
+            }
+        }
+    }
+
+    /**
+     * The task instance submitted to the executor by processAsync().
+     */
+    Runner asyncDelivery = new Runner();
+
+    /**
+     * Repeatedly processes the queue until there are no queued messages or no active subscribers,
+     * then clears the processing flag so that processAsync() can schedule a new run.
+     */
+    private class Runner implements Runnable
+    {
+        public void run()
+        {
+            boolean running = true;
+            while (running)
+            {
+                processQueue();
+
+                //Check that messages have not been added since we did our last peek();
+                // Synchronize with the thread that adds to the queue.
+                // If the queue is still empty then we can exit
+
+                if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers()))
+                {
+                    running = false;
+                    _processing.set(false);
+                }
+            }
+        }
+    }
+
+    /**
+     * Schedules the asynchronous delivery task on the executor when there are queued messages and
+     * active subscribers and no such task is already running.
+     *
+     * @param executor the executor on which to run the delivery task
+     */
+    public void processAsync(Executor executor)
+    {
+        // Guarded so the lock is not taken and the string not built when debug logging is off.
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
+                       " Active:" + _subscriptions.hasActiveSubscribers() +
+                       " Processing:" + _processing.get());
+        }
+
+        if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
+        {
+            //are we already running? if so, don't re-run
+            if (_processing.compareAndSet(false, true))
+            {
+                executor.execute(asyncDelivery);
+            }
+        }
+    }
+
+}

Propchange: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=488264&r1=488263&r2=488264
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Mon Dec 18 05:09:22 2006
@@ -118,6 +118,12 @@
         }
     }
 
+    public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
+                            String consumerTag)
+            throws AMQException
+    {
+        this(channel, protocolSession, consumerTag, false); // delegates to the four-argument constructor, passing false as its last argument
+    }
 
     public boolean equals(Object o)
     {

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java?view=diff&rev=488264&r1=488263&r2=488264
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java Mon Dec 18 05:09:22 2006
@@ -206,7 +206,12 @@
             //are we already running? if so, don't re-run
             if (_processing.compareAndSet(false, true))
             {
-                executor.execute(new Runner());
+                // Do we need this?
+                // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok.
+                //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown())
+                {
+                    executor.execute(new Runner());
+                }
             }
         }
     }

Modified: incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=488264&r1=488263&r2=488264
==============================================================================
--- incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Mon Dec 18 05:09:22 2006
@@ -109,6 +109,7 @@
         frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
         frame2handlerMap.put(ExchangeDeclareBody.class, ExchangeDeclareHandler.getInstance());
         frame2handlerMap.put(ExchangeDeleteBody.class, ExchangeDeleteHandler.getInstance());
+        frame2handlerMap.put(ExchangeBoundBody.class, ExchangeBoundHandler.getInstance());
         frame2handlerMap.put(BasicAckBody.class, BasicAckMethodHandler.getInstance());
         frame2handlerMap.put(BasicRecoverBody.class, BasicRecoverMethodHandler.getInstance());
         frame2handlerMap.put(BasicConsumeBody.class, BasicConsumeMethodHandler.getInstance());