You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/01 13:51:52 UTC

svn commit: r1295541 [3/10] - in /qpid/branches/rg-amqp-1-0-sandbox/qpid/java: ./ bdbstore/bin/ bdbstore/etc/scripts/ bdbstore/src/main/java/ bdbstore/src/resources/ bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ broker-plugins/access-...

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java Thu Mar  1 12:51:40 2012
@@ -21,6 +21,8 @@
 package org.apache.qpid.server.management;
 
 import javax.management.JMException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanNotificationInfo;
 import javax.management.MalformedObjectNameException;
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -28,7 +30,6 @@ import javax.management.StandardMBean;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
 
 /**
  * Provides implementation of the boilerplate ManagedObject interface. Most managed objects should find it useful
@@ -37,11 +38,13 @@ import org.apache.qpid.server.registry.I
  */
 public abstract class DefaultManagedObject extends StandardMBean implements ManagedObject
 {
-    private static final Logger LOGGER = Logger.getLogger(ApplicationRegistry.class);
+    private static final Logger LOGGER = Logger.getLogger(DefaultManagedObject.class);
     
-    private Class<?> _managementInterface;
+    private final Class<?> _managementInterface;
 
-    private String _typeName;
+    private final String _typeName;
+
+    private final MBeanInfo _mbeanInfo;
 
     private ManagedObjectRegistry _registry;
 
@@ -51,6 +54,13 @@ public abstract class DefaultManagedObje
         super(managementInterface);
         _managementInterface = managementInterface;
         _typeName = typeName;
+        _mbeanInfo = buildMBeanInfo();
+    }
+
+    @Override
+    public MBeanInfo getMBeanInfo()
+    {
+        return _mbeanInfo;
     }
 
     public String getType()
@@ -98,7 +108,6 @@ public abstract class DefaultManagedObje
         return getObjectInstanceName() + "[" + getType() + "]";
     }
 
-
     /**
      * Created the ObjectName as per the JMX Specs
      * @return ObjectName
@@ -161,4 +170,18 @@ public abstract class DefaultManagedObje
             return "";
     }
 
+    private MBeanInfo buildMBeanInfo() throws NotCompliantMBeanException
+    {
+        return new MBeanInfo(this.getClass().getName(),
+                      MBeanIntrospector.getMBeanDescription(this.getClass()),
+                      MBeanIntrospector.getMBeanAttributesInfo(getManagementInterface()),
+                      MBeanIntrospector.getMBeanConstructorsInfo(this.getClass()),
+                      MBeanIntrospector.getMBeanOperationsInfo(getManagementInterface()),
+                      this.getNotificationInfo());
+    }
+
+    public MBeanNotificationInfo[] getNotificationInfo()
+    {
+        return null;
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java Thu Mar  1 12:51:40 2012
@@ -38,10 +38,13 @@ import java.rmi.server.RMIClientSocketFa
 import java.rmi.server.RMIServerSocketFactory;
 import java.rmi.server.UnicastRemoteObject;
 import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.management.JMException;
 import javax.management.MBeanServer;
 import javax.management.MBeanServerFactory;
+import javax.management.Notification;
 import javax.management.NotificationFilterSupport;
 import javax.management.NotificationListener;
 import javax.management.ObjectName;
@@ -49,11 +52,13 @@ import javax.management.remote.JMXConnec
 import javax.management.remote.JMXConnectorServer;
 import javax.management.remote.JMXServiceURL;
 import javax.management.remote.MBeanServerForwarder;
+import javax.management.remote.rmi.RMIConnection;
 import javax.management.remote.rmi.RMIConnectorServer;
 import javax.management.remote.rmi.RMIJRMPServerImpl;
 import javax.management.remote.rmi.RMIServerImpl;
 import javax.rmi.ssl.SslRMIClientSocketFactory;
 import javax.rmi.ssl.SslRMIServerSocketFactory;
+import javax.security.auth.Subject;
 
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
@@ -63,6 +68,7 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.auth.rmi.RMIPasswordAuthenticator;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
 
 /**
  * This class starts up an MBeanserver. If out of the box agent has been enabled then there are no 
@@ -223,7 +229,40 @@ public class JMXManagedObjectRegistry im
          * The registry is exported on the defined management port 'port'. We will export the RMIConnectorServer
          * on 'port +1'. Use of these two well-defined ports will ease any navigation through firewall's. 
          */
-        final RMIServerImpl rmiConnectorServerStub = new RMIJRMPServerImpl(_jmxPortConnectorServer, csf, ssf, env);
+        final Map<String, String> connectionIdUsernameMap = new ConcurrentHashMap<String, String>();
+        final RMIServerImpl rmiConnectorServerStub = new RMIJRMPServerImpl(_jmxPortConnectorServer, csf, ssf, env)
+        {
+
+            /**
+             * Override makeClient so we can cache the username of the client in a Map keyed by connectionId.
+             * ConnectionId is guaranteed to be unique per client connection, according to the JMS spec.
+             * An instance of NotificationListener (mapCleanupListener) will be responsible for removing these Map
+             * entries.
+             *
+             * @see javax.management.remote.rmi.RMIJRMPServerImpl#makeClient(java.lang.String, javax.security.auth.Subject)
+             */
+            @Override
+            protected RMIConnection makeClient(String connectionId, Subject subject) throws IOException
+            {
+                final RMIConnection makeClient = super.makeClient(connectionId, subject);
+                final UsernamePrincipal usernamePrincipalFromSubject = UsernamePrincipal.getUsernamePrincipalFromSubject(subject);
+                connectionIdUsernameMap.put(connectionId, usernamePrincipalFromSubject.getName());
+                return makeClient;
+            }
+        };
+
+        // Create a Listener responsible for removing the map entries add by the #makeClient entry above.
+        final NotificationListener mapCleanupListener = new NotificationListener()
+        {
+
+            @Override
+            public void handleNotification(Notification notification, Object handback)
+            {
+                final String connectionId = ((JMXConnectionNotification) notification).getConnectionId();
+                connectionIdUsernameMap.remove(connectionId);
+            }
+        };
+
         String localHost;
         try
         {
@@ -295,13 +334,26 @@ public class JMXManagedObjectRegistry im
         MBeanServerForwarder mbsf = MBeanInvocationHandlerImpl.newProxyInstance();
         _cs.setMBeanServerForwarder(mbsf);
 
-        NotificationFilterSupport filter = new NotificationFilterSupport();
-        filter.enableType(JMXConnectionNotification.OPENED);
-        filter.enableType(JMXConnectionNotification.CLOSED);
-        filter.enableType(JMXConnectionNotification.FAILED);
+
         // Get the handler that is used by the above MBInvocationHandler Proxy.
-        // which is the MBeanInvocationHandlerImpl and so also a NotificationListener
-        _cs.addNotificationListener((NotificationListener) Proxy.getInvocationHandler(mbsf), filter, null);
+        // which is the MBeanInvocationHandlerImpl and so also a NotificationListener.
+        final NotificationListener invocationHandler = (NotificationListener) Proxy.getInvocationHandler(mbsf);
+
+        // Install a notification listener on OPENED, CLOSED, and FAILED,
+        // passing the map of connection-ids to usernames as hand-back data.
+        final NotificationFilterSupport invocationHandlerFilter = new NotificationFilterSupport();
+        invocationHandlerFilter.enableType(JMXConnectionNotification.OPENED);
+        invocationHandlerFilter.enableType(JMXConnectionNotification.CLOSED);
+        invocationHandlerFilter.enableType(JMXConnectionNotification.FAILED);
+        _cs.addNotificationListener(invocationHandler, invocationHandlerFilter, connectionIdUsernameMap);
+
+        // Install a second notification listener on CLOSED AND FAILED only to remove the entry from the
+        // Map.  Here we rely on the fact that JMX will call the listeners in the order in which they are
+        // installed.
+        final NotificationFilterSupport mapCleanupHandlerFilter = new NotificationFilterSupport();
+        mapCleanupHandlerFilter.enableType(JMXConnectionNotification.CLOSED);
+        mapCleanupHandlerFilter.enableType(JMXConnectionNotification.FAILED);
+        _cs.addNotificationListener(mapCleanupListener, mapCleanupHandlerFilter, null);
 
         _cs.start();
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java Thu Mar  1 12:51:40 2012
@@ -26,6 +26,7 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.security.AccessControlContext;
 import java.security.AccessController;
+import java.util.Map;
 import java.util.Set;
 
 import javax.management.Attribute;
@@ -45,6 +46,7 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.server.logging.actors.ManagementActor;
 import org.apache.qpid.server.logging.messages.ManagementConsoleMessages;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.access.Operation;
 
@@ -56,22 +58,54 @@ public class MBeanInvocationHandlerImpl 
 {
     private static final Logger _logger = Logger.getLogger(MBeanInvocationHandlerImpl.class);
 
+    private final IApplicationRegistry _appRegistry = ApplicationRegistry.getInstance();
     private final static String DELEGATE = "JMImplementation:type=MBeanServerDelegate";
     private MBeanServer _mbs;
-    private static ManagementActor  _logActor;
-    
+    private final ManagementActor _logActor = new ManagementActor(_appRegistry.getRootMessageLogger());
+    private final boolean _managementRightsInferAllAccess =
+        _appRegistry.getConfiguration().getManagementRightsInferAllAccess();
+
     public static MBeanServerForwarder newProxyInstance()
     {
         final InvocationHandler handler = new MBeanInvocationHandlerImpl();
         final Class<?>[] interfaces = new Class[] { MBeanServerForwarder.class };
 
-
-        _logActor = new ManagementActor(ApplicationRegistry.getInstance().getRootMessageLogger());
-
         Object proxy = Proxy.newProxyInstance(MBeanServerForwarder.class.getClassLoader(), interfaces, handler);
         return MBeanServerForwarder.class.cast(proxy);
     }
 
+    private boolean invokeDirectly(String methodName, Object[] args, Subject subject)
+    {
+        // Allow operations performed locally on behalf of the connector server itself
+        if (subject == null)
+        {
+            return true;
+        }
+
+        if (args == null || DELEGATE.equals(args[0]))
+        {
+            return true;
+        }
+
+        // Allow querying available object names
+        if (methodName.equals("queryNames"))
+        {
+            return true;
+        }
+
+        if (args[0] instanceof ObjectName)
+        {
+            ObjectName mbean = (ObjectName) args[0];
+
+            if(!DefaultManagedObject.DOMAIN.equalsIgnoreCase(mbean.getDomain()))
+            {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
     {
         final String methodName = getMethodName(method, args);
@@ -95,36 +129,24 @@ public class MBeanInvocationHandlerImpl 
             return null;
         }
 
+        // Restrict access to "createMBean" and "unregisterMBean" to any user
+        if (methodName.equals("createMBean") || methodName.equals("unregisterMBean"))
+        {
+            _logger.debug("User trying to create or unregister an MBean");
+            throw new SecurityException("Access denied: " + methodName);
+        }
+
         // Retrieve Subject from current AccessControlContext
         AccessControlContext acc = AccessController.getContext();
         Subject subject = Subject.getSubject(acc);
 
         try
         {
-            // Allow operations performed locally on behalf of the connector server itself
-            if (subject == null)
+            if(invokeDirectly(methodName, args, subject))
             {
                 return method.invoke(_mbs, args);
             }
-    
-            if (args == null || DELEGATE.equals(args[0]))
-            {
-                return method.invoke(_mbs, args);
-            }
-    
-            // Restrict access to "createMBean" and "unregisterMBean" to any user
-            if (methodName.equals("createMBean") || methodName.equals("unregisterMBean"))
-            {
-                _logger.debug("User trying to create or unregister an MBean");
-                throw new SecurityException("Access denied: " + methodName);
-            }
-    
-            // Allow querying available object names
-            if (methodName.equals("queryNames"))
-            {
-                return method.invoke(_mbs, args);
-            }
-    
+
             // Retrieve JMXPrincipal from Subject
             Set<JMXPrincipal> principals = subject.getPrincipals(JMXPrincipal.class);
             if (principals == null || principals.isEmpty())
@@ -134,23 +156,23 @@ public class MBeanInvocationHandlerImpl 
 
             // Save the subject
             SecurityManager.setThreadSubject(subject);
-   
+
             // Get the component, type and impact, which may be null
             String type = getType(method, args);
             String vhost = getVirtualHost(method, args);
             int impact = getImpact(method, args);
-            
+
             // Get the security manager for the virtual host (if set)
             SecurityManager security;
             if (vhost == null)
             {
-                security = ApplicationRegistry.getInstance().getSecurityManager();
+                security = _appRegistry.getSecurityManager();
             }
             else
             {
-                security = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(vhost).getSecurityManager();
+                security = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhost).getSecurityManager();
             }
-            
+
 			if (isAccessMethod(methodName) || impact == MBeanOperationInfo.INFO)
 			{
 				// Check for read-only method invocation permission
@@ -159,25 +181,33 @@ public class MBeanInvocationHandlerImpl 
                     throw new SecurityException("Permission denied: Access " + methodName);
                 }
 			}
-			else if (isUpdateMethod(methodName))
-            {
-	            // Check for setting properties permission
-                if (!security.authoriseMethod(Operation.UPDATE, type, methodName))
-                {
-                    throw new SecurityException("Permission denied: Update " + methodName);
-                }
-            }
-			else 
-            {
-	            // Check for invoking/executing method action/operation permission
-                if (!security.authoriseMethod(Operation.EXECUTE, type, methodName))
-                {
-                    throw new SecurityException("Permission denied: Execute " + methodName);
-                }
-            }
-			
-			// Actually invoke the method
-			return method.invoke(_mbs, args);
+			else
+			{
+			    // Check for setting properties permission
+			    if (!security.authoriseMethod(Operation.UPDATE, type, methodName))
+			    {
+			        throw new SecurityException("Permission denied: Update " + methodName);
+			    }
+			}
+
+			boolean oldAccessChecksDisabled = false;
+			if(_managementRightsInferAllAccess)
+			{
+			    oldAccessChecksDisabled = SecurityManager.setAccessChecksDisabled(true);
+			}
+
+			try
+			{
+			    // Actually invoke the method
+			    return method.invoke(_mbs, args);
+			}
+			finally
+			{
+			    if(_managementRightsInferAllAccess)
+			    {
+			        SecurityManager.setAccessChecksDisabled(oldAccessChecksDisabled);
+			    }
+			}
         }
         catch (InvocationTargetException e)
         {
@@ -290,28 +320,44 @@ public class MBeanInvocationHandlerImpl 
         return (methodName.startsWith("query") || methodName.startsWith("get") || methodName.startsWith("is"));
     }
 
-
-    private boolean isUpdateMethod(String methodName)
-    {
-        //handle standard set methods from MBeanServer
-        return methodName.startsWith("set");
-    }
-
-    public void handleNotification(Notification notification, Object handback)
+    /**
+     * Receives notifications from the MBeanServer.
+     */
+    public void handleNotification(final Notification notification, final Object handback)
     {
         assert notification instanceof JMXConnectionNotification;
 
-        // only RMI Connections are serviced here, Local API atta
-        // rmi://169.24.29.116 guest 3
-        String[] connectionData = ((JMXConnectionNotification) notification).getConnectionId().split(" ");
-        String user = connectionData[1];
+        final String connectionId = ((JMXConnectionNotification) notification).getConnectionId();
+        final String type = notification.getType();
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Notification connectionId : " + connectionId + " type : " + type
+                    + " Notification handback : " + handback);
+        }
+
+        // Normally JMXManagedObjectRegistry provides a Map as handback data containing a map
+        // between connection id and username.
+        String user = null;
+        if (handback != null && handback instanceof Map)
+        {
+            final Map<String, String> connectionIdUsernameMap = (Map<String, String>) handback;
+            user = connectionIdUsernameMap.get(connectionId);
+        }
+
+        // If user is still null, fallback to an unordered list of Principals from the connection id.
+        if (user == null)
+        {
+            final String[] splitConnectionId = connectionId.split(" ");
+            user = splitConnectionId[1];
+        }
 
-        if (notification.getType().equals(JMXConnectionNotification.OPENED))
+        if (JMXConnectionNotification.OPENED.equals(type))
         {
             _logActor.message(ManagementConsoleMessages.OPEN(user));
         }
-        else if (notification.getType().equals(JMXConnectionNotification.CLOSED) ||
-                 notification.getType().equals(JMXConnectionNotification.FAILED))
+        else if (JMXConnectionNotification.CLOSED.equals(type) ||
+                 JMXConnectionNotification.FAILED.equals(type))
         {
             _logActor.message(ManagementConsoleMessages.CLOSE(user));
         }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java Thu Mar  1 12:51:40 2012
@@ -22,6 +22,7 @@ package org.apache.qpid.server.message;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.AMQChannel;
@@ -65,7 +66,6 @@ public class AMQMessage extends Abstract
 
     WeakReference<AMQChannel> _channelRef;
 
-
     public AMQMessage(StoredMessage<MessageMetaData> handle)
     {
         this(handle, null);
@@ -122,7 +122,15 @@ public class AMQMessage extends Abstract
 
     public String getRoutingKey()
     {
-        // TODO
+        MessageMetaData messageMetaData = getMessageMetaData();
+        if (messageMetaData != null)
+        {
+            AMQShortString routingKey = messageMetaData.getMessagePublishInfo().getRoutingKey();
+            if (routingKey != null)
+            {
+                return routingKey.asString();
+            }
+        }
         return null;
     }
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java Thu Mar  1 12:51:40 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.message;
 
 import java.util.concurrent.atomic.AtomicInteger;

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java Thu Mar  1 12:51:40 2012
@@ -79,7 +79,7 @@ class MessageTransferHeader implements A
 
     public byte getPriority()
     {
-        MessageDeliveryPriority priority = _deliveryProps == null
+        MessageDeliveryPriority priority = _deliveryProps == null || !_deliveryProps.hasPriority()
                                            ? MessageDeliveryPriority.MEDIUM
                                            : _deliveryProps.getPriority();
         return (byte) priority.getValue();

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackages.properties?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackages.properties (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackages.properties Thu Mar  1 12:51:40 2012
@@ -75,6 +75,7 @@ org.apache.qpid.server.exchange=0.0.0
 org.apache.qpid.server.logging=0.0.0
 org.apache.qpid.server.logging.actors=0.0.0
 org.apache.qpid.server.logging.subjects=0.0.0
+org.apache.qpid.server.message=0.0.0
 org.apache.qpid.server.management=0.0.0
 org.apache.qpid.server.persistent=0.0.0
 org.apache.qpid.server.plugins=0.0.0

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Thu Mar  1 12:51:40 2012
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol;
 
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -36,36 +37,17 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import javax.management.JMException;
 import javax.security.auth.Subject;
 import javax.security.sasl.SaslServer;
-
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.codec.AMQDecoder;
 import org.apache.qpid.common.ClientProperties;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQProtocolHeaderException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
@@ -87,11 +69,15 @@ import org.apache.qpid.server.management
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionImpl;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 import org.apache.qpid.transport.Sender;
@@ -139,7 +125,7 @@ public class AMQProtocolEngine implement
 
     /* AMQP Version for this session */
     private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
-
+    private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
     private FieldTable _clientProperties;
     private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
 
@@ -173,6 +159,9 @@ public class AMQProtocolEngine implement
     private NetworkConnection _network;
     private Sender<ByteBuffer> _sender;
 
+    private volatile boolean _deferFlush;
+    private long _lastReceivedTime;
+
     public ManagedObject getManagedObject()
     {
         return _managedObject;
@@ -240,14 +229,29 @@ public class AMQProtocolEngine implement
         return _closing.get();
     }
 
+    public synchronized void flushBatched()
+    {
+        _sender.flush();
+    }
+
+
+    public ClientDeliveryMethod createDeliveryMethod(int channelId)
+    {
+        return new WriteDeliverMethod(channelId);
+    }
+
     public void received(final ByteBuffer msg)
     {
-        _lastIoTime = System.currentTimeMillis();
+        final long arrivalTime = System.currentTimeMillis();
+        _lastReceivedTime = arrivalTime;
+        _lastIoTime = arrivalTime;
         try
         {
             final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
-            for (AMQDataBlock dataBlock : dataBlocks)
+            final int len = dataBlocks.size();
+            for (int i = 0; i < len; i++)
             {
+                AMQDataBlock dataBlock = dataBlocks.get(i);
                 try
                 {
                     dataBlockReceived(dataBlock);
@@ -347,7 +351,7 @@ public class AMQProtocolEngine implement
         }
     }
 
-    private void protocolInitiationReceived(ProtocolInitiation pi)
+    private synchronized void protocolInitiationReceived(ProtocolInitiation pi)
     {
         // this ensures the codec never checks for a PI message again
         (_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
@@ -524,12 +528,15 @@ public class AMQProtocolEngine implement
      */
     public synchronized void writeFrame(AMQDataBlock frame)
     {
-        _lastSent = frame;
+
         final ByteBuffer buf = asByteBuffer(frame);
-        _lastIoTime = System.currentTimeMillis();
         _writtenBytes += buf.remaining();
         _sender.send(buf);
-        _sender.flush();
+        _lastIoTime = System.currentTimeMillis();
+        if(!_deferFlush)
+        {
+            _sender.flush();
+        }
     }
 
     public AMQShortString getContextKey()
@@ -918,7 +925,7 @@ public class AMQProtocolEngine implement
     private void setProtocolVersion(ProtocolVersion pv)
     {
         _protocolVersion = pv;
-
+        _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
         _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this);
         _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion);
     }
@@ -1023,7 +1030,7 @@ public class AMQProtocolEngine implement
 
     public MethodRegistry getMethodRegistry()
     {
-        return MethodRegistry.getMethodRegistry(getProtocolVersion());
+        return _methodRegistry;
     }
 
     public MethodDispatcher getMethodDispatcher()
@@ -1052,7 +1059,7 @@ public class AMQProtocolEngine implement
         // Nothing
     }
 
-    public void writerIdle()
+    public synchronized void writerIdle()
     {
         _sender.send(asByteBuffer(HeartbeatBody.FRAME));
     }
@@ -1109,6 +1116,11 @@ public class AMQProtocolEngine implement
         return _lastIoTime;
     }
 
+    public long getLastReceivedTime()
+    {
+        return _lastReceivedTime;
+    }
+
     public ProtocolSessionIdentifier getSessionIdentifier()
     {
         return _sessionIdentifier;
@@ -1395,16 +1407,220 @@ public class AMQProtocolEngine implement
         _statisticsEnabled = enabled;
     }
 
-    @Override
     public boolean isSessionNameUnique(byte[] name)
     {
         // 0-8/0-9/0-9-1 sessions don't have names
         return true;
     }
 
-    @Override
+    public void setDeferFlush(boolean deferFlush)
+    {
+        _deferFlush = deferFlush;
+    }
+
+
+
     public String getUserName()
     {
         return getAuthorizedPrincipal().getName();
     }
+
+    private static class ByteBufferOutputStream extends OutputStream
+    {
+
+
+        private final ByteBuffer _buf;
+
+        public ByteBufferOutputStream(ByteBuffer buf)
+        {
+            _buf = buf;
+        }
+
+        @Override
+        public void write(int b) throws IOException
+        {
+            _buf.put((byte) b);
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException
+        {
+            _buf.put(b, off, len);
+        }
+    }
+
+    public final class WriteDeliverMethod
+            implements ClientDeliveryMethod
+    {
+        private final int _channelId;
+
+        public WriteDeliverMethod(int channelId)
+        {
+            _channelId = channelId;
+        }
+
+        public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+                throws AMQException
+        {
+            registerMessageDelivered(entry.getMessage().getSize());
+            _protocolOutputConverter.writeDeliver(entry, _channelId, deliveryTag, ((SubscriptionImpl)sub).getConsumerTag());
+            entry.incrementDeliveryCount();
+        }
+
+    }
+
+    private static class BytesDataOutput implements DataOutput
+    {
+        int _pos = 0;
+        byte[] _buf;
+
+        public BytesDataOutput(byte[] buf)
+        {
+            _buf = buf;
+        }
+
+        public void setBuffer(byte[] buf)
+        {
+            _buf = buf;
+            _pos = 0;
+        }
+
+        public void reset()
+        {
+            _pos = 0;
+        }
+
+        public int length()
+        {
+            return _pos;
+        }
+
+        public void write(int b)
+        {
+            _buf[_pos++] = (byte) b;
+        }
+
+        public void write(byte[] b)
+        {
+            System.arraycopy(b, 0, _buf, _pos, b.length);
+            _pos+=b.length;
+        }
+
+
+        public void write(byte[] b, int off, int len)
+        {
+            System.arraycopy(b, off, _buf, _pos, len);
+            _pos+=len;
+
+        }
+
+        public void writeBoolean(boolean v)
+        {
+            _buf[_pos++] = v ? (byte) 1 : (byte) 0;
+        }
+
+        public void writeByte(int v)
+        {
+            _buf[_pos++] = (byte) v;
+        }
+
+        public void writeShort(int v)
+        {
+            _buf[_pos++] = (byte) (v >>> 8);
+            _buf[_pos++] = (byte) v;
+        }
+
+        public void writeChar(int v)
+        {
+            _buf[_pos++] = (byte) (v >>> 8);
+            _buf[_pos++] = (byte) v;
+        }
+
+        public void writeInt(int v)
+        {
+            _buf[_pos++] = (byte) (v >>> 24);
+            _buf[_pos++] = (byte) (v >>> 16);
+            _buf[_pos++] = (byte) (v >>> 8);
+            _buf[_pos++] = (byte) v;
+        }
+
+        public void writeLong(long v)
+        {
+            _buf[_pos++] = (byte) (v >>> 56);
+            _buf[_pos++] = (byte) (v >>> 48);
+            _buf[_pos++] = (byte) (v >>> 40);
+            _buf[_pos++] = (byte) (v >>> 32);
+            _buf[_pos++] = (byte) (v >>> 24);
+            _buf[_pos++] = (byte) (v >>> 16);
+            _buf[_pos++] = (byte) (v >>> 8);
+            _buf[_pos++] = (byte)v;
+        }
+
+        public void writeFloat(float v)
+        {
+            writeInt(Float.floatToIntBits(v));
+        }
+
+        public void writeDouble(double v)
+        {
+            writeLong(Double.doubleToLongBits(v));
+        }
+
+        public void writeBytes(String s)
+        {
+            int len = s.length();
+            for (int i = 0 ; i < len ; i++)
+            {
+                _buf[_pos++] = ((byte)s.charAt(i));
+            }
+        }
+
+        public void writeChars(String s)
+        {
+            int len = s.length();
+            for (int i = 0 ; i < len ; i++)
+            {
+                int v = s.charAt(i);
+                _buf[_pos++] = (byte) (v >>> 8);
+                _buf[_pos++] = (byte) v;
+            }
+        }
+
+        public void writeUTF(String s)
+        {
+            int strlen = s.length();
+
+            int pos = _pos;
+            _pos+=2;
+
+
+            for (int i = 0; i < strlen; i++)
+            {
+                int c = s.charAt(i);
+                if ((c >= 0x0001) && (c <= 0x007F))
+                {
+                    c = s.charAt(i);
+                    _buf[_pos++] = (byte) c;
+
+                }
+                else if (c > 0x07FF)
+                {
+                    _buf[_pos++]  = (byte) (0xE0 | ((c >> 12) & 0x0F));
+                    _buf[_pos++]  = (byte) (0x80 | ((c >>  6) & 0x3F));
+                    _buf[_pos++]  = (byte) (0x80 | (c & 0x3F));
+                }
+                else
+                {
+                    _buf[_pos++] = (byte) (0xC0 | ((c >>  6) & 0x1F));
+                    _buf[_pos++]  = (byte) (0x80 | (c & 0x3F));
+                }
+            }
+
+            int len = _pos - (pos + 2);
+
+            _buf[pos++] = (byte) (len >>> 8);
+            _buf[pos] = (byte) len;
+        }
+
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Thu Mar  1 12:51:40 2012
@@ -32,6 +32,7 @@ import org.apache.qpid.server.AMQChannel
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.List;
@@ -49,6 +50,14 @@ public interface AMQProtocolSession exte
 
     boolean isClosing();
 
+    void flushBatched();
+
+    void setDeferFlush(boolean defer);
+
+    ClientDeliveryMethod createDeliveryMethod(int channelId);
+
+    long getLastReceivedTime();
+
     public static final class ProtocolSessionIdentifier
     {
         private final Object _sessionIdentifier;
@@ -77,15 +86,6 @@ public interface AMQProtocolSession exte
     }
 
     /**
-     * Called when a protocol data block is received
-     *
-     * @param message the data block that has been received
-     *
-     * @throws Exception if processing the datablock fails
-     */
-    void dataBlockReceived(AMQDataBlock message) throws Exception;
-
-    /**
      * Get the context key associated with this session. Context key is described in the AMQ protocol specification (RFC
      * 6).
      *
@@ -234,4 +234,5 @@ public interface AMQProtocolSession exte
     List<AMQChannel> getChannels();
 
     void mgmtCloseChannel(int channelId);
+
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Thu Mar  1 12:51:40 2012
@@ -39,89 +39,44 @@ package org.apache.qpid.server.protocol;
 
 import java.util.Date;
 import java.util.List;
-
 import javax.management.JMException;
 import javax.management.MBeanException;
-import javax.management.MBeanNotificationInfo;
 import javax.management.NotCompliantMBeanException;
-import javax.management.Notification;
-import javax.management.ObjectName;
-import javax.management.monitor.MonitorNotification;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
 import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ConnectionCloseBody;
 import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.ManagementActor;
-import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.AbstractAMQManagedConnectionObject;
 import org.apache.qpid.server.management.ManagedObject;
 
 /**
  * This MBean class implements the management interface. In order to make more attributes, operations and notifications
  * available over JMX simply augment the ManagedConnection interface and add the appropriate implementation here.
  */
-@MBeanDescription("Management Bean for an AMQ Broker Connection")
-public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection
+@MBeanDescription("Management Bean for an AMQ Broker 0-9-1/0-9/0-8 Connections")
+public class AMQProtocolSessionMBean extends AbstractAMQManagedConnectionObject
 {
     private AMQProtocolSession _protocolSession = null;
-    private String _name = null;
 
-    // openmbean data types for representing the channel attributes
-
-    private static final OpenType[] _channelAttributeTypes =
-        { SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER, SimpleType.BOOLEAN };
-    private static CompositeType _channelType = null; // represents the data type for channel data
-    private static TabularType _channelsType = null; // Data type for list of channels type
     private static final AMQShortString BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION =
-        new AMQShortString("Broker Management Console has closed the connection.");
+                                        new AMQShortString(BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION_STR);
 
-    @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
+    @MBeanConstructor("Creates an MBean exposing an AMQ Broker 0-9-1/0-9/0-8 Connection")
     public AMQProtocolSessionMBean(AMQProtocolSession amqProtocolSession) throws NotCompliantMBeanException, OpenDataException
     {
-        super(ManagedConnection.class, ManagedConnection.TYPE);
+        super(amqProtocolSession.getRemoteAddress().toString());
         _protocolSession = amqProtocolSession;
-        String remote = getRemoteAddress();
-        _name = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
-        init();
-    }
-
-    static
-    {
-        try
-        {
-            init();
-        }
-        catch (JMException ex)
-        {
-            // This is not expected to ever occur.
-            throw new RuntimeException("Got JMException in static initializer.", ex);
-        }
-    }
-
-    /**
-     * initialises the openmbean data types
-     */
-    private static void init() throws OpenDataException
-    {
-        _channelType =
-            new CompositeType("Channel", "Channel Details", COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]),
-                    COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]), _channelAttributeTypes);
-        _channelsType = new TabularType("Channels", "Channels", _channelType, TABULAR_UNIQUE_INDEX.toArray(new String[TABULAR_UNIQUE_INDEX.size()]));
     }
 
     public String getClientId()
@@ -169,16 +124,6 @@ public class AMQProtocolSessionMBean ext
         return _protocolSession.getMaximumNumberOfChannels();
     }
 
-    public void setMaximumNumberOfChannels(Long value)
-    {
-        _protocolSession.setMaximumNumberOfChannels(value);
-    }
-
-    public String getObjectInstanceName()
-    {
-        return ObjectName.quote(_name);
-    }
-
     /**
      * commits transactions for a transactional channel
      *
@@ -321,25 +266,6 @@ public class AMQProtocolSessionMBean ext
         }
     }
 
-    @Override
-    public MBeanNotificationInfo[] getNotificationInfo()
-    {
-        String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
-        String name = MonitorNotification.class.getName();
-        String description = "Channel count has reached threshold value";
-        MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
-
-        return new MBeanNotificationInfo[] { info1 };
-    }
-
-    public void notifyClients(String notificationMsg)
-    {
-        Notification n =
-            new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
-                System.currentTimeMillis(), notificationMsg);
-        _broadcaster.sendNotification(n);
-    }
-
     public void resetStatistics() throws Exception
     {
         _protocolSession.resetStatistics();

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Thu Mar  1 12:51:40 2012
@@ -100,6 +100,12 @@ public class ProtocolEngine_0_10  extend
         return _network.getLocalAddress();
     }
 
+    public void received(final ByteBuffer buf)
+    {
+        super.received(buf);
+        _connection.receivedComplete();
+    }
+
     public long getReadBytes()
     {
         return _readBytes;

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Thu Mar  1 12:51:40 2012
@@ -20,6 +20,14 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.AMQSecurityException;
@@ -27,16 +35,28 @@ import org.apache.qpid.amqp_1_0.transpor
 import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
-import org.apache.qpid.amqp_1_0.type.*;
-
-import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.Released;
 import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+import org.apache.qpid.amqp_1_0.type.transport.Detach;
 import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
 import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeType;
 import org.apache.qpid.server.exchange.TopicExchange;
 import org.apache.qpid.server.filter.JMSSelectorMessageFilter;
 import org.apache.qpid.server.filter.SimpleFilterManager;
@@ -47,9 +67,6 @@ import org.apache.qpid.server.txn.AutoCo
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
 public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler
 {
     private VirtualHost _vhost;
@@ -140,7 +157,7 @@ public class SendingLink_1_0 implements 
             try
             {
                 queue = AMQQueueFactory.createAMQQueueImpl(UUID.randomUUID().toString(), false, null, true,
-                        false, _vhost, Collections.EMPTY_MAP);
+                            false, _vhost, Collections.EMPTY_MAP);
                 Exchange exchange = ((ExchangeDestination) destination).getExchange();
 
                 String binding = "";
@@ -183,6 +200,9 @@ public class SendingLink_1_0 implements 
             catch (AMQInternalException e)
             {
                 e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            } catch (AMQException e)
+            {
+                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
             }
 
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Thu Mar  1 12:51:40 2012
@@ -365,6 +365,9 @@ public class Session_1_0 implements Sess
         catch (AMQSecurityException e)
         {
             e.printStackTrace();  //TODO.
+        } catch (AMQException e)
+        {
+            e.printStackTrace();  //TODO
         }
 
         return queue;

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java Thu Mar  1 12:51:40 2012
@@ -20,6 +20,13 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
 import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.DeliveryState;
@@ -31,24 +38,14 @@ import org.apache.qpid.amqp_1_0.type.mes
 import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
 import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-
-import org.apache.qpid.AMQException;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.ServerTransaction;
 
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-
 class Subscription_1_0 implements Subscription
 {
     private SendingLink_1_0 _link;
@@ -171,6 +168,17 @@ class Subscription_1_0 implements Subscr
         getEndpoint().detach();
     }
 
+    public void send(QueueEntry entry, boolean batch) throws AMQException
+    {
+        // TODO
+        send(entry);
+    }
+
+    public void flushBatched()
+    {
+        // TODO
+    }
+
     public void send(final QueueEntry queueEntry) throws AMQException
     {
         //TODO
@@ -296,6 +304,11 @@ class Subscription_1_0 implements Subscr
         return !hasCredit;
     }
 
+    public boolean trySendLock()
+    {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public void suspend()
     {
         if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
@@ -314,6 +327,11 @@ class Subscription_1_0 implements Subscr
         _stateChangeLock.unlock();
     }
 
+    public void releaseQueueEntry(QueueEntry queueEntryImpl)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
 
     public void onDequeue(final QueueEntry queueEntry)
     {

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar  1 12:51:40 2012
@@ -4,4 +4,4 @@
 /qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
 /qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Thu Mar  1 12:51:40 2012
@@ -20,71 +20,25 @@
 */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionList;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
 import java.util.Map;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
-public class AMQPriorityQueue extends SimpleAMQQueue
+public class AMQPriorityQueue extends OutOfOrderQueue
 {
-    protected AMQPriorityQueue(final AMQShortString name,
-                               final boolean durable,
-                               final AMQShortString owner,
-                               final boolean autoDelete,
-                               boolean exclusive,
-                               final VirtualHost virtualHost, 
-                               int priorities, Map<String, Object> arguments)
-    {
-        super(name, durable, owner, autoDelete, exclusive, virtualHost,new PriorityQueueList.Factory(priorities), arguments);
-    }
-
-    public AMQPriorityQueue(String queueName,
-                            boolean durable,
-                            String owner,
-                            boolean autoDelete,
-                            boolean exclusive, VirtualHost virtualHost, int priorities, Map<String,Object> arguments)
+    protected AMQPriorityQueue(final String name,
+                                final boolean durable,
+                                final String owner,
+                                final boolean autoDelete,
+                                boolean exclusive,
+                                final VirtualHost virtualHost,
+                                Map<String, Object> arguments,
+                                int priorities)
     {
-        this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner),
-                autoDelete, exclusive,virtualHost, priorities, arguments);
+        super(name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments);
     }
 
     public int getPriorities()
     {
         return ((PriorityQueueList) _entries).getPriorities();
     }
-
-    @Override
-    protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
-    {
-        // check that all subscriptions are not in advance of the entry
-        SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
-        while(subIter.advance() && entry.isAvailable())
-        {
-            final Subscription subscription = subIter.getNode().getSubscription();
-            if(!subscription.isClosed())
-            {
-                QueueContext context = (QueueContext) subscription.getQueueContext();
-                if(context != null)
-                {
-                    QueueEntry subnode = context._lastSeenEntry;
-                    QueueEntry released = context._releasedEntry;
-                    while(subnode != null && entry.compareTo(subnode) < 0 && entry.isAvailable() && (released == null || released.compareTo(entry) < 0))
-                    {
-                        if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
-                        {
-                            break;
-                        }
-                        else
-                        {
-                            subnode = context._lastSeenEntry;
-                            released = context._releasedEntry;
-                        }
-                    }
-                }
-            }
-
-        }
-    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Mar  1 12:51:40 2012
@@ -213,6 +213,8 @@ public interface AMQQueue extends Managa
 
     void setAlternateExchange(Exchange exchange);
 
+    void setAlternateExchange(String exchangeName);
+
     Map<String, Object> getArguments();
 
     void checkCapacity(AMQChannel channel);
@@ -272,4 +274,22 @@ public interface AMQQueue extends Managa
     ManagedObject getManagedObject();
 
     void setExclusive(boolean exclusive) throws AMQException;
+
+    /**
+     * Gets the maximum delivery count.   If a message on this queue
+     * is delivered more than maximumDeliveryCount, the message will be
+     * routed to the {@link #getAlternateExchange()} (if set), or otherwise
+     * discarded. 0 indicates that maximum deliver count should not be enforced.
+     *
+     * @return maximum delivery count
+     */
+    int getMaximumDeliveryCount();
+
+    /**
+     * Sets the maximum delivery count.
+     *
+     * @param maximumDeliveryCount maximum delivery count
+     */
+    public void setMaximumDeliveryCount(final int maximumDeliveryCount);
+
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Thu Mar  1 12:51:40 2012
@@ -20,22 +20,33 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.configuration.QueueConfiguration;
-
-import java.util.Map;
-import java.util.HashMap;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class AMQQueueFactory
 {
-    public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
+    public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
     public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
     public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
     public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
+    public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
+
+    public static final String DLQ_ROUTING_KEY = "dlq";
+    public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled";
+    public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count";
+    public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
 
     private abstract static class QueueProperty
     {
@@ -80,6 +91,24 @@ public class AMQQueueFactory
 
     }
 
+    private abstract static class QueueIntegerProperty extends QueueProperty
+    {
+        public QueueIntegerProperty(String argumentName)
+        {
+            super(argumentName);
+        }
+
+        public void setPropertyValue(AMQQueue queue, Object value)
+        {
+            if(value instanceof Number)
+            {
+                setPropertyValue(queue, ((Number)value).intValue());
+            }
+
+        }
+        abstract void setPropertyValue(AMQQueue queue, int value);
+    }
+
     private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
             new QueueLongProperty("x-qpid-maximum-message-age")
             {
@@ -122,8 +151,14 @@ public class AMQQueueFactory
                 {
                     queue.setFlowResumeCapacity(value);
                 }
+            },
+            new QueueIntegerProperty(X_QPID_MAXIMUM_DELIVERY_COUNT)
+            {
+                public void setPropertyValue(AMQQueue queue, int value)
+                {
+                    queue.setMaximumDeliveryCount(value);
+                }
             }
-
     };
 
 
@@ -149,17 +184,31 @@ public class AMQQueueFactory
                                               String owner,
                                               boolean autoDelete,
                                               boolean exclusive,
-                                              VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException
+                                              VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException
     {
+        if (queueName == null)
+        {
+            throw new IllegalArgumentException("Queue name must not be null");
+        }
+
         // Access check
         if (!virtualHost.getSecurityManager().authoriseCreateQueue(autoDelete, durable, exclusive, null, null, new AMQShortString(queueName), owner))
         {
             String description = "Permission denied: queue-name '" + queueName + "'";
             throw new AMQSecurityException(description);
         }
-        
+
+        QueueConfiguration queueConfiguration = virtualHost.getConfiguration().getQueueConfiguration(queueName);
+        boolean isDLQEnabled = isDLQEnabled(autoDelete, arguments, queueConfiguration);
+        if (isDLQEnabled)
+        {
+            validateDLNames(queueName);
+        }
+
         int priorities = 1;
         String conflationKey = null;
+        String sortingKey = null;
+
         if(arguments != null)
         {
             if(arguments.containsKey(QPID_LAST_VALUE_QUEUE) || arguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
@@ -170,24 +219,32 @@ public class AMQQueueFactory
                     conflationKey = QPID_LVQ_KEY;
                 }
             }
-            else if(arguments.containsKey(X_QPID_PRIORITIES.toString()))
+            else if(arguments.containsKey(X_QPID_PRIORITIES))
             {
-                Object prioritiesObj = arguments.get(X_QPID_PRIORITIES.toString());
+                Object prioritiesObj = arguments.get(X_QPID_PRIORITIES);
                 if(prioritiesObj instanceof Number)
                 {
                     priorities = ((Number)prioritiesObj).intValue();
                 }
             }
+            else if(arguments.containsKey(QPID_QUEUE_SORT_KEY))
+            {
+                sortingKey = (String)arguments.get(QPID_QUEUE_SORT_KEY);
+            }
         }
 
         AMQQueue q;
-        if(conflationKey != null)
+        if(sortingKey != null)
+        {
+            q = new SortedQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey);
+        }
+        else if(conflationKey != null)
         {
             q = new ConflationQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey);
         }
         else if(priorities > 1)
         {
-            q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, priorities, arguments);
+            q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities);
         }
         else
         {
@@ -209,10 +266,63 @@ public class AMQQueueFactory
             }
         }
 
-        return q;
+        if(isDLQEnabled)
+        {
+            final String dlExchangeName = getDeadLetterExchangeName(queueName);
+            final String dlQueueName = getDeadLetterQueueName(queueName);
 
-    }
+            final ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+            final ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+            final QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+            Exchange dlExchange = null;
+            synchronized(exchangeRegistry)
+            {
+                dlExchange = exchangeRegistry.getExchange(dlExchangeName);
+
+                if(dlExchange == null)
+                {
+                    dlExchange = exchangeFactory.createExchange(new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0);
+
+                    exchangeRegistry.registerExchange(dlExchange);
+
+                    //enter the dle in the persistent store
+                    virtualHost.getDurableConfigurationStore().createExchange(dlExchange);
+                }
+            }
+
+            AMQQueue dlQueue = null;
+
+            synchronized(queueRegistry)
+            {
+                dlQueue = queueRegistry.getQueue(dlQueueName);
+
+                if(dlQueue == null)
+                {
+                    //set args to disable DLQ'ing/MDC from the DLQ itself, preventing loops etc
+                    final Map<String, Object> args = new HashMap<String, Object>();
+                    args.put(X_QPID_DLQ_ENABLED, false);
+                    args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0);
+
+                    dlQueue = createAMQQueueImpl(dlQueueName, true, owner, false, exclusive, virtualHost, args);
 
+                    //enter the dlq in the persistent store
+                    virtualHost.getDurableConfigurationStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args));
+                }
+            }
+
+            //ensure the queue is bound to the exchange
+            if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue))
+            {
+                //actual routing key used does not matter due to use of fanout exchange,
+                //but we will make the key 'dlq' as it can be logged at creation.
+                virtualHost.getBindingFactory().addBinding(DLQ_ROUTING_KEY, dlQueue, dlExchange, null);
+            }
+            q.setAlternateExchange(dlExchange);
+        }
+
+        return q;
+    }
 
     public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException
     {
@@ -223,26 +333,30 @@ public class AMQQueueFactory
         boolean exclusive = config.getExclusive();
         String owner = config.getOwner();
         Map<String,Object> arguments = null;
+
         if(config.isLVQ() || config.getLVQKey() != null)
         {
-
             arguments = new HashMap<String,Object>();
             arguments.put(QPID_LAST_VALUE_QUEUE, 1);
             arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey());
         }
-        else
+        else if (config.getPriority() || config.getPriorities() > 0)
+        {
+            arguments = new HashMap<String,Object>();
+            arguments.put(X_QPID_PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities());
+        }
+        else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey()))
+        {
+            arguments = new HashMap<String,Object>();
+            arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey());
+        }
+        if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled())
         {
-            boolean priority = config.getPriority();
-            int priorities = config.getPriorities();
-            if(priority || priorities > 0)
+            if (arguments == null)
             {
                 arguments = new HashMap<String,Object>();
-                if (priorities < 0)
-                {
-                    priorities = 10;
-                }
-                arguments.put("x-qpid-priorities", priorities);
             }
+            arguments.put(X_QPID_DLQ_ENABLED, true);
         }
 
         if(config.isTopic())
@@ -259,4 +373,94 @@ public class AMQQueueFactory
         return q;
     }
 
+
+    /**
+     * Validates DLQ and DLE names
+     * <p>
+     * DLQ name and DLQ exchange name need to be validated in order to keep
+     * integrity in cases when queue name passes validation check but DLQ name
+     * or DL exchange name fails to pass it. Otherwise, we might have situations
+     * when queue is created but DL exchange or/and DLQ creation fail.
+     * <p>
+     *
+     * @param name
+     *            queue name
+     * @throws IllegalArgumentException
+     *             thrown if length of queue name or exchange name exceed 255
+     */
+    protected static void validateDLNames(String name)
+    {
+        // check if DLQ name and DLQ exchange name do not exceed 255
+        String exchangeName = getDeadLetterExchangeName(name);
+        if (exchangeName.length() > AMQShortString.MAX_LENGTH)
+        {
+            throw new IllegalArgumentException("DL exchange name '" + exchangeName
+                    + "' length exceeds limit of " + AMQShortString.MAX_LENGTH + " characters for queue " + name);
+        }
+        String queueName = getDeadLetterQueueName(name);
+        if (queueName.length() > AMQShortString.MAX_LENGTH)
+        {
+            throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of "
+                    + AMQShortString.MAX_LENGTH + " characters for queue " + name);
+        }
+    }
+
+    /**
+     * Checks if DLQ is enabled for the queue.
+     *
+     * @param autoDelete
+     *            queue auto-delete flag
+     * @param arguments
+     *            queue arguments
+     * @param qConfig
+     *            queue configuration
+     * @return true if DLQ enabled
+     */
+    protected static boolean isDLQEnabled(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig)
+    {
+        //feature is not to be enabled for temporary queues or when explicitly disabled by argument
+        if (!autoDelete)
+        {
+            boolean dlqArgumentPresent = arguments != null && arguments.containsKey(X_QPID_DLQ_ENABLED);
+            if (dlqArgumentPresent || qConfig.isDeadLetterQueueEnabled())
+            {
+                boolean dlqEnabled = true;
+                if (dlqArgumentPresent)
+                {
+                    Object argument = arguments.get(X_QPID_DLQ_ENABLED);
+                    dlqEnabled = argument instanceof Boolean && ((Boolean)argument).booleanValue();
+                }
+                return dlqEnabled;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Generates a dead letter queue name for a given queue name
+     *
+     * @param name
+     *            queue name
+     * @return DLQ name
+     */
+    protected static String getDeadLetterQueueName(String name)
+    {
+        ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration();
+        String dlQueueName = name + serverConfig.getDeadLetterQueueSuffix();
+        return dlQueueName;
+    }
+
+    /**
+     * Generates a dead letter exchange name for a given queue name
+     *
+     * @param name
+     *            queue name
+     * @return DL exchange name
+     */
+    protected static String getDeadLetterExchangeName(String name)
+    {
+        ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration();
+        String dlExchangeName = name + serverConfig.getDeadLetterExchangeSuffix();
+        return dlExchangeName;
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Thu Mar  1 12:51:40 2012
@@ -28,7 +28,7 @@ import org.apache.qpid.framing.ContentHe
 import org.apache.qpid.management.common.mbeans.ManagedQueue;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.message.ServerMessage;
@@ -63,23 +63,25 @@ import java.util.*;
 /**
  * AMQQueueMBean is the management bean for an {@link AMQQueue}.
  *
- * <p/><tablse id="crc"><caption>CRC Caption</caption>
+ * <p/><table id="crc"><caption>CRC Caption</caption>
  * <tr><th> Responsibilities <th> Collaborations
  * </table>
  */
 @MBeanDescription("Management Interface for AMQQueue")
 public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
 {
+
     /** Used for debugging purposes. */
     private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
 
-    private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z");
+    /** Date/time format used for message expiration and message timestamp formatting */
+    public static final String JMSTIMESTAMP_DATETIME_FORMAT = "MM-dd-yy HH:mm:ss.SSS z";
 
-    private AMQQueue _queue = null;
-    private String _queueName = null;
+    private final AMQQueue _queue;
+    private final String _queueName;
     // OpenMBean data types for viewMessages method
 
-    private static OpenType[] _msgAttributeTypes = new OpenType[5]; // AMQ message attribute types.
+    private static OpenType[] _msgAttributeTypes = new OpenType[6]; // AMQ message attribute types.
     private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
     private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
 
@@ -138,6 +140,7 @@ public class AMQQueueMBean extends AMQMa
         _msgAttributeTypes[2] = SimpleType.LONG; // For size
         _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
         _msgAttributeTypes[4] = SimpleType.LONG; // For queue position
+        _msgAttributeTypes[5] = SimpleType.INTEGER; // For delivery count
 
         _messageDataType = new CompositeType("Message", "AMQ Message", 
                 VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]),
@@ -176,6 +179,11 @@ public class AMQQueueMBean extends AMQMa
         return _queue.getMessageCount();
     }
 
+    public Integer getMaximumDeliveryCount()
+    {
+        return _queue.getMaximumDeliveryCount();
+    }
+
     public Long getMaximumMessageSize()
     {
         return _queue.getMaximumMessageSize();
@@ -294,6 +302,18 @@ public class AMQQueueMBean extends AMQMa
         }
     }
 
+    public void setAlternateExchange(String exchangeName)
+    {
+        _queue.setAlternateExchange(exchangeName);
+    }
+
+    public String getAlternateExchange()
+    {
+        Exchange exchange = _queue.getAlternateExchange();
+        String name = exchange == null ? null : exchange.getName();
+        return name == null ? null : name;
+    }
+
     /**
      * Checks if there is any notification to be send to the listeners
      */
@@ -471,7 +491,7 @@ public class AMQQueueMBean extends AMQMa
                     ContentHeaderBody headerBody = msg.getContentHeaderBody();
                     // Create header attributes list
                     headerAttributes = getMessageHeaderProperties(headerBody);
-                    itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position};
+                    itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()};
                 }
                 else if(serverMsg instanceof MessageTransferMessage)
                 {
@@ -480,13 +500,13 @@ public class AMQQueueMBean extends AMQMa
 
                     // Create header attributes list
                     headerAttributes = getMessageTransferMessageHeaderProps(msg);
-                    itemValues = new Object[]{msg.getMessageNumber(), headerAttributes, msg.getSize(), queueEntry.isRedelivered(), position};
+                    itemValues = new Object[]{msg.getMessageNumber(), headerAttributes, msg.getSize(), queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()};
                 }
                 else
                 {
                     //unknown message
                     headerAttributes = new String[]{"N/A"};
-                    itemValues = new Object[]{serverMsg.getMessageNumber(), headerAttributes, serverMsg.getSize(), queueEntry.isRedelivered(), position};
+                    itemValues = new Object[]{serverMsg.getMessageNumber(), headerAttributes, serverMsg.getSize(), queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()};
                 }
                 
                 CompositeData messageData = new CompositeDataSupport(_messageDataType, 
@@ -523,13 +543,11 @@ public class AMQQueueMBean extends AMQMa
         list.add("JMSPriority = " + headerProperties.getPriority());
         list.add("JMSType = " + headerProperties.getType());
 
-        long longDate = headerProperties.getExpiration();
-        String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
-        list.add("JMSExpiration = " + strDate);
-
-        longDate = headerProperties.getTimestamp();
-        strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
-        list.add("JMSTimestamp = " + strDate);
+        final long expirationDate = headerProperties.getExpiration();
+        final long timestampDate = headerProperties.getTimestamp();
+
+        addStringifiedJMSTimestamoAndJMSExpiration(list, expirationDate,
+                timestampDate);
 
         return list.toArray(new String[list.size()]);
     }
@@ -561,17 +579,32 @@ public class AMQQueueMBean extends AMQMa
         list.add("JMSPriority = " + header.getPriority());
         list.add("JMSType = " + header.getType());
 
-        long longDate = header.getExpiration();
-        String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
-        list.add("JMSExpiration = " + strDate);
-
-        longDate = header.getTimestamp();
-        strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
-        list.add("JMSTimestamp = " + strDate);
+        final long expirationDate = header.getExpiration();
+        final long timestampDate = header.getTimestamp();
+        addStringifiedJMSTimestamoAndJMSExpiration(list, expirationDate, timestampDate);
 
         return list.toArray(new String[list.size()]);
     }
 
+    private void addStringifiedJMSTimestamoAndJMSExpiration(final List<String> list,
+            final long expirationDate, final long timestampDate)
+    {
+        final SimpleDateFormat dateFormat;
+        if (expirationDate != 0 || timestampDate != 0)
+        {
+            dateFormat = new SimpleDateFormat(JMSTIMESTAMP_DATETIME_FORMAT);
+        }
+        else
+        {
+            dateFormat = null;
+        }
+
+        final String formattedExpirationDate = (expirationDate != 0) ? dateFormat.format(new Date(expirationDate)) : null;
+        final String formattedTimestampDate = (timestampDate != 0) ? dateFormat.format(new Date(timestampDate)) : null;
+        list.add("JMSExpiration = " + formattedExpirationDate);
+        list.add("JMSTimestamp = " + formattedTimestampDate);
+    }
+
     /**
      * @see ManagedQueue#moveMessages
      * @param fromMessageId



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org