You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/01 13:51:52 UTC
svn commit: r1295541 [3/10] - in
/qpid/branches/rg-amqp-1-0-sandbox/qpid/java: ./ bdbstore/bin/
bdbstore/etc/scripts/ bdbstore/src/main/java/ bdbstore/src/resources/
bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/
broker-plugins/access-...
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java Thu Mar 1 12:51:40 2012
@@ -21,6 +21,8 @@
package org.apache.qpid.server.management;
import javax.management.JMException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanNotificationInfo;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
@@ -28,7 +30,6 @@ import javax.management.StandardMBean;
import org.apache.log4j.Logger;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
/**
* Provides implementation of the boilerplate ManagedObject interface. Most managed objects should find it useful
@@ -37,11 +38,13 @@ import org.apache.qpid.server.registry.I
*/
public abstract class DefaultManagedObject extends StandardMBean implements ManagedObject
{
- private static final Logger LOGGER = Logger.getLogger(ApplicationRegistry.class);
+ private static final Logger LOGGER = Logger.getLogger(DefaultManagedObject.class);
- private Class<?> _managementInterface;
+ private final Class<?> _managementInterface;
- private String _typeName;
+ private final String _typeName;
+
+ private final MBeanInfo _mbeanInfo;
private ManagedObjectRegistry _registry;
@@ -51,6 +54,13 @@ public abstract class DefaultManagedObje
super(managementInterface);
_managementInterface = managementInterface;
_typeName = typeName;
+ _mbeanInfo = buildMBeanInfo();
+ }
+
+ @Override
+ public MBeanInfo getMBeanInfo()
+ {
+ return _mbeanInfo;
}
public String getType()
@@ -98,7 +108,6 @@ public abstract class DefaultManagedObje
return getObjectInstanceName() + "[" + getType() + "]";
}
-
/**
* Created the ObjectName as per the JMX Specs
* @return ObjectName
@@ -161,4 +170,18 @@ public abstract class DefaultManagedObje
return "";
}
+ private MBeanInfo buildMBeanInfo() throws NotCompliantMBeanException
+ {
+ return new MBeanInfo(this.getClass().getName(),
+ MBeanIntrospector.getMBeanDescription(this.getClass()),
+ MBeanIntrospector.getMBeanAttributesInfo(getManagementInterface()),
+ MBeanIntrospector.getMBeanConstructorsInfo(this.getClass()),
+ MBeanIntrospector.getMBeanOperationsInfo(getManagementInterface()),
+ this.getNotificationInfo());
+ }
+
+ public MBeanNotificationInfo[] getNotificationInfo()
+ {
+ return null;
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java Thu Mar 1 12:51:40 2012
@@ -38,10 +38,13 @@ import java.rmi.server.RMIClientSocketFa
import java.rmi.server.RMIServerSocketFactory;
import java.rmi.server.UnicastRemoteObject;
import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
+import javax.management.Notification;
import javax.management.NotificationFilterSupport;
import javax.management.NotificationListener;
import javax.management.ObjectName;
@@ -49,11 +52,13 @@ import javax.management.remote.JMXConnec
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXServiceURL;
import javax.management.remote.MBeanServerForwarder;
+import javax.management.remote.rmi.RMIConnection;
import javax.management.remote.rmi.RMIConnectorServer;
import javax.management.remote.rmi.RMIJRMPServerImpl;
import javax.management.remote.rmi.RMIServerImpl;
import javax.rmi.ssl.SslRMIClientSocketFactory;
import javax.rmi.ssl.SslRMIServerSocketFactory;
+import javax.security.auth.Subject;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
@@ -63,6 +68,7 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.auth.rmi.RMIPasswordAuthenticator;
+import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
/**
* This class starts up an MBeanserver. If out of the box agent has been enabled then there are no
@@ -223,7 +229,40 @@ public class JMXManagedObjectRegistry im
* The registry is exported on the defined management port 'port'. We will export the RMIConnectorServer
* on 'port +1'. Use of these two well-defined ports will ease any navigation through firewall's.
*/
- final RMIServerImpl rmiConnectorServerStub = new RMIJRMPServerImpl(_jmxPortConnectorServer, csf, ssf, env);
+ final Map<String, String> connectionIdUsernameMap = new ConcurrentHashMap<String, String>();
+ final RMIServerImpl rmiConnectorServerStub = new RMIJRMPServerImpl(_jmxPortConnectorServer, csf, ssf, env)
+ {
+
+ /**
+ * Override makeClient so we can cache the username of the client in a Map keyed by connectionId.
+ * ConnectionId is guaranteed to be unique per client connection, according to the JMS spec.
+ * An instance of NotificationListener (mapCleanupListener) will be responsible for removing these Map
+ * entries.
+ *
+ * @see javax.management.remote.rmi.RMIJRMPServerImpl#makeClient(java.lang.String, javax.security.auth.Subject)
+ */
+ @Override
+ protected RMIConnection makeClient(String connectionId, Subject subject) throws IOException
+ {
+ final RMIConnection makeClient = super.makeClient(connectionId, subject);
+ final UsernamePrincipal usernamePrincipalFromSubject = UsernamePrincipal.getUsernamePrincipalFromSubject(subject);
+ connectionIdUsernameMap.put(connectionId, usernamePrincipalFromSubject.getName());
+ return makeClient;
+ }
+ };
+
+ // Create a Listener responsible for removing the map entries add by the #makeClient entry above.
+ final NotificationListener mapCleanupListener = new NotificationListener()
+ {
+
+ @Override
+ public void handleNotification(Notification notification, Object handback)
+ {
+ final String connectionId = ((JMXConnectionNotification) notification).getConnectionId();
+ connectionIdUsernameMap.remove(connectionId);
+ }
+ };
+
String localHost;
try
{
@@ -295,13 +334,26 @@ public class JMXManagedObjectRegistry im
MBeanServerForwarder mbsf = MBeanInvocationHandlerImpl.newProxyInstance();
_cs.setMBeanServerForwarder(mbsf);
- NotificationFilterSupport filter = new NotificationFilterSupport();
- filter.enableType(JMXConnectionNotification.OPENED);
- filter.enableType(JMXConnectionNotification.CLOSED);
- filter.enableType(JMXConnectionNotification.FAILED);
+
// Get the handler that is used by the above MBInvocationHandler Proxy.
- // which is the MBeanInvocationHandlerImpl and so also a NotificationListener
- _cs.addNotificationListener((NotificationListener) Proxy.getInvocationHandler(mbsf), filter, null);
+ // which is the MBeanInvocationHandlerImpl and so also a NotificationListener.
+ final NotificationListener invocationHandler = (NotificationListener) Proxy.getInvocationHandler(mbsf);
+
+ // Install a notification listener on OPENED, CLOSED, and FAILED,
+ // passing the map of connection-ids to usernames as hand-back data.
+ final NotificationFilterSupport invocationHandlerFilter = new NotificationFilterSupport();
+ invocationHandlerFilter.enableType(JMXConnectionNotification.OPENED);
+ invocationHandlerFilter.enableType(JMXConnectionNotification.CLOSED);
+ invocationHandlerFilter.enableType(JMXConnectionNotification.FAILED);
+ _cs.addNotificationListener(invocationHandler, invocationHandlerFilter, connectionIdUsernameMap);
+
+ // Install a second notification listener on CLOSED AND FAILED only to remove the entry from the
+ // Map. Here we rely on the fact that JMX will call the listeners in the order in which they are
+ // installed.
+ final NotificationFilterSupport mapCleanupHandlerFilter = new NotificationFilterSupport();
+ mapCleanupHandlerFilter.enableType(JMXConnectionNotification.CLOSED);
+ mapCleanupHandlerFilter.enableType(JMXConnectionNotification.FAILED);
+ _cs.addNotificationListener(mapCleanupListener, mapCleanupHandlerFilter, null);
_cs.start();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java Thu Mar 1 12:51:40 2012
@@ -26,6 +26,7 @@ import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.security.AccessControlContext;
import java.security.AccessController;
+import java.util.Map;
import java.util.Set;
import javax.management.Attribute;
@@ -45,6 +46,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.logging.actors.ManagementActor;
import org.apache.qpid.server.logging.messages.ManagementConsoleMessages;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
@@ -56,22 +58,54 @@ public class MBeanInvocationHandlerImpl
{
private static final Logger _logger = Logger.getLogger(MBeanInvocationHandlerImpl.class);
+ private final IApplicationRegistry _appRegistry = ApplicationRegistry.getInstance();
private final static String DELEGATE = "JMImplementation:type=MBeanServerDelegate";
private MBeanServer _mbs;
- private static ManagementActor _logActor;
-
+ private final ManagementActor _logActor = new ManagementActor(_appRegistry.getRootMessageLogger());
+ private final boolean _managementRightsInferAllAccess =
+ _appRegistry.getConfiguration().getManagementRightsInferAllAccess();
+
public static MBeanServerForwarder newProxyInstance()
{
final InvocationHandler handler = new MBeanInvocationHandlerImpl();
final Class<?>[] interfaces = new Class[] { MBeanServerForwarder.class };
-
- _logActor = new ManagementActor(ApplicationRegistry.getInstance().getRootMessageLogger());
-
Object proxy = Proxy.newProxyInstance(MBeanServerForwarder.class.getClassLoader(), interfaces, handler);
return MBeanServerForwarder.class.cast(proxy);
}
+ private boolean invokeDirectly(String methodName, Object[] args, Subject subject)
+ {
+ // Allow operations performed locally on behalf of the connector server itself
+ if (subject == null)
+ {
+ return true;
+ }
+
+ if (args == null || DELEGATE.equals(args[0]))
+ {
+ return true;
+ }
+
+ // Allow querying available object names
+ if (methodName.equals("queryNames"))
+ {
+ return true;
+ }
+
+ if (args[0] instanceof ObjectName)
+ {
+ ObjectName mbean = (ObjectName) args[0];
+
+ if(!DefaultManagedObject.DOMAIN.equalsIgnoreCase(mbean.getDomain()))
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
{
final String methodName = getMethodName(method, args);
@@ -95,36 +129,24 @@ public class MBeanInvocationHandlerImpl
return null;
}
+ // Restrict access to "createMBean" and "unregisterMBean" to any user
+ if (methodName.equals("createMBean") || methodName.equals("unregisterMBean"))
+ {
+ _logger.debug("User trying to create or unregister an MBean");
+ throw new SecurityException("Access denied: " + methodName);
+ }
+
// Retrieve Subject from current AccessControlContext
AccessControlContext acc = AccessController.getContext();
Subject subject = Subject.getSubject(acc);
try
{
- // Allow operations performed locally on behalf of the connector server itself
- if (subject == null)
+ if(invokeDirectly(methodName, args, subject))
{
return method.invoke(_mbs, args);
}
-
- if (args == null || DELEGATE.equals(args[0]))
- {
- return method.invoke(_mbs, args);
- }
-
- // Restrict access to "createMBean" and "unregisterMBean" to any user
- if (methodName.equals("createMBean") || methodName.equals("unregisterMBean"))
- {
- _logger.debug("User trying to create or unregister an MBean");
- throw new SecurityException("Access denied: " + methodName);
- }
-
- // Allow querying available object names
- if (methodName.equals("queryNames"))
- {
- return method.invoke(_mbs, args);
- }
-
+
// Retrieve JMXPrincipal from Subject
Set<JMXPrincipal> principals = subject.getPrincipals(JMXPrincipal.class);
if (principals == null || principals.isEmpty())
@@ -134,23 +156,23 @@ public class MBeanInvocationHandlerImpl
// Save the subject
SecurityManager.setThreadSubject(subject);
-
+
// Get the component, type and impact, which may be null
String type = getType(method, args);
String vhost = getVirtualHost(method, args);
int impact = getImpact(method, args);
-
+
// Get the security manager for the virtual host (if set)
SecurityManager security;
if (vhost == null)
{
- security = ApplicationRegistry.getInstance().getSecurityManager();
+ security = _appRegistry.getSecurityManager();
}
else
{
- security = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(vhost).getSecurityManager();
+ security = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhost).getSecurityManager();
}
-
+
if (isAccessMethod(methodName) || impact == MBeanOperationInfo.INFO)
{
// Check for read-only method invocation permission
@@ -159,25 +181,33 @@ public class MBeanInvocationHandlerImpl
throw new SecurityException("Permission denied: Access " + methodName);
}
}
- else if (isUpdateMethod(methodName))
- {
- // Check for setting properties permission
- if (!security.authoriseMethod(Operation.UPDATE, type, methodName))
- {
- throw new SecurityException("Permission denied: Update " + methodName);
- }
- }
- else
- {
- // Check for invoking/executing method action/operation permission
- if (!security.authoriseMethod(Operation.EXECUTE, type, methodName))
- {
- throw new SecurityException("Permission denied: Execute " + methodName);
- }
- }
-
- // Actually invoke the method
- return method.invoke(_mbs, args);
+ else
+ {
+ // Check for setting properties permission
+ if (!security.authoriseMethod(Operation.UPDATE, type, methodName))
+ {
+ throw new SecurityException("Permission denied: Update " + methodName);
+ }
+ }
+
+ boolean oldAccessChecksDisabled = false;
+ if(_managementRightsInferAllAccess)
+ {
+ oldAccessChecksDisabled = SecurityManager.setAccessChecksDisabled(true);
+ }
+
+ try
+ {
+ // Actually invoke the method
+ return method.invoke(_mbs, args);
+ }
+ finally
+ {
+ if(_managementRightsInferAllAccess)
+ {
+ SecurityManager.setAccessChecksDisabled(oldAccessChecksDisabled);
+ }
+ }
}
catch (InvocationTargetException e)
{
@@ -290,28 +320,44 @@ public class MBeanInvocationHandlerImpl
return (methodName.startsWith("query") || methodName.startsWith("get") || methodName.startsWith("is"));
}
-
- private boolean isUpdateMethod(String methodName)
- {
- //handle standard set methods from MBeanServer
- return methodName.startsWith("set");
- }
-
- public void handleNotification(Notification notification, Object handback)
+ /**
+ * Receives notifications from the MBeanServer.
+ */
+ public void handleNotification(final Notification notification, final Object handback)
{
assert notification instanceof JMXConnectionNotification;
- // only RMI Connections are serviced here, Local API atta
- // rmi://169.24.29.116 guest 3
- String[] connectionData = ((JMXConnectionNotification) notification).getConnectionId().split(" ");
- String user = connectionData[1];
+ final String connectionId = ((JMXConnectionNotification) notification).getConnectionId();
+ final String type = notification.getType();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Notification connectionId : " + connectionId + " type : " + type
+ + " Notification handback : " + handback);
+ }
+
+ // Normally JMXManagedObjectRegistry provides a Map as handback data containing a map
+ // between connection id and username.
+ String user = null;
+ if (handback != null && handback instanceof Map)
+ {
+ final Map<String, String> connectionIdUsernameMap = (Map<String, String>) handback;
+ user = connectionIdUsernameMap.get(connectionId);
+ }
+
+ // If user is still null, fallback to an unordered list of Principals from the connection id.
+ if (user == null)
+ {
+ final String[] splitConnectionId = connectionId.split(" ");
+ user = splitConnectionId[1];
+ }
- if (notification.getType().equals(JMXConnectionNotification.OPENED))
+ if (JMXConnectionNotification.OPENED.equals(type))
{
_logActor.message(ManagementConsoleMessages.OPEN(user));
}
- else if (notification.getType().equals(JMXConnectionNotification.CLOSED) ||
- notification.getType().equals(JMXConnectionNotification.FAILED))
+ else if (JMXConnectionNotification.CLOSED.equals(type) ||
+ JMXConnectionNotification.FAILED.equals(type))
{
_logActor.message(ManagementConsoleMessages.CLOSE(user));
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java Thu Mar 1 12:51:40 2012
@@ -22,6 +22,7 @@ package org.apache.qpid.server.message;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
@@ -65,7 +66,6 @@ public class AMQMessage extends Abstract
WeakReference<AMQChannel> _channelRef;
-
public AMQMessage(StoredMessage<MessageMetaData> handle)
{
this(handle, null);
@@ -122,7 +122,15 @@ public class AMQMessage extends Abstract
public String getRoutingKey()
{
- // TODO
+ MessageMetaData messageMetaData = getMessageMetaData();
+ if (messageMetaData != null)
+ {
+ AMQShortString routingKey = messageMetaData.getMessagePublishInfo().getRoutingKey();
+ if (routingKey != null)
+ {
+ return routingKey.asString();
+ }
+ }
return null;
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java Thu Mar 1 12:51:40 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.message;
import java.util.concurrent.atomic.AtomicInteger;
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java Thu Mar 1 12:51:40 2012
@@ -79,7 +79,7 @@ class MessageTransferHeader implements A
public byte getPriority()
{
- MessageDeliveryPriority priority = _deliveryProps == null
+ MessageDeliveryPriority priority = _deliveryProps == null || !_deliveryProps.hasPriority()
? MessageDeliveryPriority.MEDIUM
: _deliveryProps.getPriority();
return (byte) priority.getValue();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackages.properties?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackages.properties (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackages.properties Thu Mar 1 12:51:40 2012
@@ -75,6 +75,7 @@ org.apache.qpid.server.exchange=0.0.0
org.apache.qpid.server.logging=0.0.0
org.apache.qpid.server.logging.actors=0.0.0
org.apache.qpid.server.logging.subjects=0.0.0
+org.apache.qpid.server.message=0.0.0
org.apache.qpid.server.management=0.0.0
org.apache.qpid.server.persistent=0.0.0
org.apache.qpid.server.plugins=0.0.0
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Thu Mar 1 12:51:40 2012
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -36,36 +37,17 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
-
import javax.management.JMException;
import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.ClientProperties;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQProtocolHeaderException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
@@ -87,11 +69,15 @@ import org.apache.qpid.server.management
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.transport.Sender;
@@ -139,7 +125,7 @@ public class AMQProtocolEngine implement
/* AMQP Version for this session */
private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
-
+ private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
private FieldTable _clientProperties;
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
@@ -173,6 +159,9 @@ public class AMQProtocolEngine implement
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
+ private volatile boolean _deferFlush;
+ private long _lastReceivedTime;
+
public ManagedObject getManagedObject()
{
return _managedObject;
@@ -240,14 +229,29 @@ public class AMQProtocolEngine implement
return _closing.get();
}
+ public synchronized void flushBatched()
+ {
+ _sender.flush();
+ }
+
+
+ public ClientDeliveryMethod createDeliveryMethod(int channelId)
+ {
+ return new WriteDeliverMethod(channelId);
+ }
+
public void received(final ByteBuffer msg)
{
- _lastIoTime = System.currentTimeMillis();
+ final long arrivalTime = System.currentTimeMillis();
+ _lastReceivedTime = arrivalTime;
+ _lastIoTime = arrivalTime;
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- for (AMQDataBlock dataBlock : dataBlocks)
+ final int len = dataBlocks.size();
+ for (int i = 0; i < len; i++)
{
+ AMQDataBlock dataBlock = dataBlocks.get(i);
try
{
dataBlockReceived(dataBlock);
@@ -347,7 +351,7 @@ public class AMQProtocolEngine implement
}
}
- private void protocolInitiationReceived(ProtocolInitiation pi)
+ private synchronized void protocolInitiationReceived(ProtocolInitiation pi)
{
// this ensures the codec never checks for a PI message again
(_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
@@ -524,12 +528,15 @@ public class AMQProtocolEngine implement
*/
public synchronized void writeFrame(AMQDataBlock frame)
{
- _lastSent = frame;
+
final ByteBuffer buf = asByteBuffer(frame);
- _lastIoTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
_sender.send(buf);
- _sender.flush();
+ _lastIoTime = System.currentTimeMillis();
+ if(!_deferFlush)
+ {
+ _sender.flush();
+ }
}
public AMQShortString getContextKey()
@@ -918,7 +925,7 @@ public class AMQProtocolEngine implement
private void setProtocolVersion(ProtocolVersion pv)
{
_protocolVersion = pv;
-
+ _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
_protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this);
_dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion);
}
@@ -1023,7 +1030,7 @@ public class AMQProtocolEngine implement
public MethodRegistry getMethodRegistry()
{
- return MethodRegistry.getMethodRegistry(getProtocolVersion());
+ return _methodRegistry;
}
public MethodDispatcher getMethodDispatcher()
@@ -1052,7 +1059,7 @@ public class AMQProtocolEngine implement
// Nothing
}
- public void writerIdle()
+ public synchronized void writerIdle()
{
_sender.send(asByteBuffer(HeartbeatBody.FRAME));
}
@@ -1109,6 +1116,11 @@ public class AMQProtocolEngine implement
return _lastIoTime;
}
+ public long getLastReceivedTime()
+ {
+ return _lastReceivedTime;
+ }
+
public ProtocolSessionIdentifier getSessionIdentifier()
{
return _sessionIdentifier;
@@ -1395,16 +1407,220 @@ public class AMQProtocolEngine implement
_statisticsEnabled = enabled;
}
- @Override
public boolean isSessionNameUnique(byte[] name)
{
// 0-8/0-9/0-9-1 sessions don't have names
return true;
}
- @Override
+ public void setDeferFlush(boolean deferFlush)
+ {
+ _deferFlush = deferFlush;
+ }
+
+
+
public String getUserName()
{
return getAuthorizedPrincipal().getName();
}
+
+ private static class ByteBufferOutputStream extends OutputStream
+ {
+
+
+ private final ByteBuffer _buf;
+
+ public ByteBufferOutputStream(ByteBuffer buf)
+ {
+ _buf = buf;
+ }
+
+ @Override
+ public void write(int b) throws IOException
+ {
+ _buf.put((byte) b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ _buf.put(b, off, len);
+ }
+ }
+
+ public final class WriteDeliverMethod
+ implements ClientDeliveryMethod
+ {
+ private final int _channelId;
+
+ public WriteDeliverMethod(int channelId)
+ {
+ _channelId = channelId;
+ }
+
+ public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ throws AMQException
+ {
+ registerMessageDelivered(entry.getMessage().getSize());
+ _protocolOutputConverter.writeDeliver(entry, _channelId, deliveryTag, ((SubscriptionImpl)sub).getConsumerTag());
+ entry.incrementDeliveryCount();
+ }
+
+ }
+
+ private static class BytesDataOutput implements DataOutput
+ {
+ int _pos = 0;
+ byte[] _buf;
+
+ public BytesDataOutput(byte[] buf)
+ {
+ _buf = buf;
+ }
+
+ public void setBuffer(byte[] buf)
+ {
+ _buf = buf;
+ _pos = 0;
+ }
+
+ public void reset()
+ {
+ _pos = 0;
+ }
+
+ public int length()
+ {
+ return _pos;
+ }
+
+ public void write(int b)
+ {
+ _buf[_pos++] = (byte) b;
+ }
+
+ public void write(byte[] b)
+ {
+ System.arraycopy(b, 0, _buf, _pos, b.length);
+ _pos+=b.length;
+ }
+
+
+ public void write(byte[] b, int off, int len)
+ {
+ System.arraycopy(b, off, _buf, _pos, len);
+ _pos+=len;
+
+ }
+
+ public void writeBoolean(boolean v)
+ {
+ _buf[_pos++] = v ? (byte) 1 : (byte) 0;
+ }
+
+ public void writeByte(int v)
+ {
+ _buf[_pos++] = (byte) v;
+ }
+
+ public void writeShort(int v)
+ {
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+
+ public void writeChar(int v)
+ {
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+
+ public void writeInt(int v)
+ {
+ _buf[_pos++] = (byte) (v >>> 24);
+ _buf[_pos++] = (byte) (v >>> 16);
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+
+ public void writeLong(long v)
+ {
+ _buf[_pos++] = (byte) (v >>> 56);
+ _buf[_pos++] = (byte) (v >>> 48);
+ _buf[_pos++] = (byte) (v >>> 40);
+ _buf[_pos++] = (byte) (v >>> 32);
+ _buf[_pos++] = (byte) (v >>> 24);
+ _buf[_pos++] = (byte) (v >>> 16);
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte)v;
+ }
+
+ public void writeFloat(float v)
+ {
+ writeInt(Float.floatToIntBits(v));
+ }
+
+ public void writeDouble(double v)
+ {
+ writeLong(Double.doubleToLongBits(v));
+ }
+
+ public void writeBytes(String s)
+ {
+ int len = s.length();
+ for (int i = 0 ; i < len ; i++)
+ {
+ _buf[_pos++] = ((byte)s.charAt(i));
+ }
+ }
+
+ public void writeChars(String s)
+ {
+ int len = s.length();
+ for (int i = 0 ; i < len ; i++)
+ {
+ int v = s.charAt(i);
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+ }
+
+ public void writeUTF(String s)
+ {
+ int strlen = s.length();
+
+ int pos = _pos;
+ _pos+=2;
+
+
+ for (int i = 0; i < strlen; i++)
+ {
+ int c = s.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F))
+ {
+ c = s.charAt(i);
+ _buf[_pos++] = (byte) c;
+
+ }
+ else if (c > 0x07FF)
+ {
+ _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+ _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+ _buf[_pos++] = (byte) (0x80 | (c & 0x3F));
+ }
+ else
+ {
+ _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+ _buf[_pos++] = (byte) (0x80 | (c & 0x3F));
+ }
+ }
+
+ int len = _pos - (pos + 2);
+
+ _buf[pos++] = (byte) (len >>> 8);
+ _buf[pos] = (byte) len;
+ }
+
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Thu Mar 1 12:51:40 2012
@@ -32,6 +32,7 @@ import org.apache.qpid.server.AMQChannel
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.List;
@@ -49,6 +50,14 @@ public interface AMQProtocolSession exte
boolean isClosing();
+ void flushBatched();
+
+ void setDeferFlush(boolean defer);
+
+ ClientDeliveryMethod createDeliveryMethod(int channelId);
+
+ long getLastReceivedTime();
+
public static final class ProtocolSessionIdentifier
{
private final Object _sessionIdentifier;
@@ -77,15 +86,6 @@ public interface AMQProtocolSession exte
}
/**
- * Called when a protocol data block is received
- *
- * @param message the data block that has been received
- *
- * @throws Exception if processing the datablock fails
- */
- void dataBlockReceived(AMQDataBlock message) throws Exception;
-
- /**
* Get the context key associated with this session. Context key is described in the AMQ protocol specification (RFC
* 6).
*
@@ -234,4 +234,5 @@ public interface AMQProtocolSession exte
List<AMQChannel> getChannels();
void mgmtCloseChannel(int channelId);
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Thu Mar 1 12:51:40 2012
@@ -39,89 +39,44 @@ package org.apache.qpid.server.protocol;
import java.util.Date;
import java.util.List;
-
import javax.management.JMException;
import javax.management.MBeanException;
-import javax.management.MBeanNotificationInfo;
import javax.management.NotCompliantMBeanException;
-import javax.management.Notification;
-import javax.management.ObjectName;
-import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.ManagementActor;
-import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.AbstractAMQManagedConnectionObject;
import org.apache.qpid.server.management.ManagedObject;
/**
* This MBean class implements the management interface. In order to make more attributes, operations and notifications
* available over JMX simply augment the ManagedConnection interface and add the appropriate implementation here.
*/
-@MBeanDescription("Management Bean for an AMQ Broker Connection")
-public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection
+@MBeanDescription("Management Bean for an AMQ Broker 0-9-1/0-9/0-8 Connections")
+public class AMQProtocolSessionMBean extends AbstractAMQManagedConnectionObject
{
private AMQProtocolSession _protocolSession = null;
- private String _name = null;
- // openmbean data types for representing the channel attributes
-
- private static final OpenType[] _channelAttributeTypes =
- { SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER, SimpleType.BOOLEAN };
- private static CompositeType _channelType = null; // represents the data type for channel data
- private static TabularType _channelsType = null; // Data type for list of channels type
private static final AMQShortString BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION =
- new AMQShortString("Broker Management Console has closed the connection.");
+ new AMQShortString(BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION_STR);
- @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
+ @MBeanConstructor("Creates an MBean exposing an AMQ Broker 0-9-1/0-9/0-8 Connection")
public AMQProtocolSessionMBean(AMQProtocolSession amqProtocolSession) throws NotCompliantMBeanException, OpenDataException
{
- super(ManagedConnection.class, ManagedConnection.TYPE);
+ super(amqProtocolSession.getRemoteAddress().toString());
_protocolSession = amqProtocolSession;
- String remote = getRemoteAddress();
- _name = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
- init();
- }
-
- static
- {
- try
- {
- init();
- }
- catch (JMException ex)
- {
- // This is not expected to ever occur.
- throw new RuntimeException("Got JMException in static initializer.", ex);
- }
- }
-
- /**
- * initialises the openmbean data types
- */
- private static void init() throws OpenDataException
- {
- _channelType =
- new CompositeType("Channel", "Channel Details", COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]),
- COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]), _channelAttributeTypes);
- _channelsType = new TabularType("Channels", "Channels", _channelType, TABULAR_UNIQUE_INDEX.toArray(new String[TABULAR_UNIQUE_INDEX.size()]));
}
public String getClientId()
@@ -169,16 +124,6 @@ public class AMQProtocolSessionMBean ext
return _protocolSession.getMaximumNumberOfChannels();
}
- public void setMaximumNumberOfChannels(Long value)
- {
- _protocolSession.setMaximumNumberOfChannels(value);
- }
-
- public String getObjectInstanceName()
- {
- return ObjectName.quote(_name);
- }
-
/**
* commits transactions for a transactional channel
*
@@ -321,25 +266,6 @@ public class AMQProtocolSessionMBean ext
}
}
- @Override
- public MBeanNotificationInfo[] getNotificationInfo()
- {
- String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
- String name = MonitorNotification.class.getName();
- String description = "Channel count has reached threshold value";
- MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
-
- return new MBeanNotificationInfo[] { info1 };
- }
-
- public void notifyClients(String notificationMsg)
- {
- Notification n =
- new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
- System.currentTimeMillis(), notificationMsg);
- _broadcaster.sendNotification(n);
- }
-
public void resetStatistics() throws Exception
{
_protocolSession.resetStatistics();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Thu Mar 1 12:51:40 2012
@@ -100,6 +100,12 @@ public class ProtocolEngine_0_10 extend
return _network.getLocalAddress();
}
+ public void received(final ByteBuffer buf)
+ {
+ super.received(buf);
+ _connection.receivedComplete();
+ }
+
public long getReadBytes()
{
return _readBytes;
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Thu Mar 1 12:51:40 2012
@@ -20,6 +20,14 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQSecurityException;
@@ -27,16 +35,28 @@ import org.apache.qpid.amqp_1_0.transpor
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
-import org.apache.qpid.amqp_1_0.type.*;
-
-import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.Released;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+import org.apache.qpid.amqp_1_0.type.transport.Detach;
import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeType;
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.filter.JMSSelectorMessageFilter;
import org.apache.qpid.server.filter.SimpleFilterManager;
@@ -47,9 +67,6 @@ import org.apache.qpid.server.txn.AutoCo
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler
{
private VirtualHost _vhost;
@@ -140,7 +157,7 @@ public class SendingLink_1_0 implements
try
{
queue = AMQQueueFactory.createAMQQueueImpl(UUID.randomUUID().toString(), false, null, true,
- false, _vhost, Collections.EMPTY_MAP);
+ false, _vhost, Collections.EMPTY_MAP);
Exchange exchange = ((ExchangeDestination) destination).getExchange();
String binding = "";
@@ -183,6 +200,9 @@ public class SendingLink_1_0 implements
catch (AMQInternalException e)
{
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ } catch (AMQException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Thu Mar 1 12:51:40 2012
@@ -365,6 +365,9 @@ public class Session_1_0 implements Sess
catch (AMQSecurityException e)
{
e.printStackTrace(); //TODO.
+ } catch (AMQException e)
+ {
+ e.printStackTrace(); //TODO
}
return queue;
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java Thu Mar 1 12:51:40 2012
@@ -20,6 +20,13 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.qpid.AMQException;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.DeliveryState;
@@ -31,24 +38,14 @@ import org.apache.qpid.amqp_1_0.type.mes
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-
-import org.apache.qpid.AMQException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.ServerTransaction;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-
class Subscription_1_0 implements Subscription
{
private SendingLink_1_0 _link;
@@ -171,6 +168,17 @@ class Subscription_1_0 implements Subscr
getEndpoint().detach();
}
+ public void send(QueueEntry entry, boolean batch) throws AMQException
+ {
+ // TODO
+ send(entry);
+ }
+
+ public void flushBatched()
+ {
+ // TODO
+ }
+
public void send(final QueueEntry queueEntry) throws AMQException
{
//TODO
@@ -296,6 +304,11 @@ class Subscription_1_0 implements Subscr
return !hasCredit;
}
+ public boolean trySendLock()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void suspend()
{
if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
@@ -314,6 +327,11 @@ class Subscription_1_0 implements Subscr
_stateChangeLock.unlock();
}
+ public void releaseQueueEntry(QueueEntry queueEntryImpl)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void onDequeue(final QueueEntry queueEntry)
{
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 1 12:51:40 2012
@@ -4,4 +4,4 @@
/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1200000
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1157765,1160415-1162726,1162729-1166086,1166089-1225000
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Thu Mar 1 12:51:40 2012
@@ -20,71 +20,25 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionList;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
import java.util.Map;
+import org.apache.qpid.server.virtualhost.VirtualHost;
-public class AMQPriorityQueue extends SimpleAMQQueue
+public class AMQPriorityQueue extends OutOfOrderQueue
{
- protected AMQPriorityQueue(final AMQShortString name,
- final boolean durable,
- final AMQShortString owner,
- final boolean autoDelete,
- boolean exclusive,
- final VirtualHost virtualHost,
- int priorities, Map<String, Object> arguments)
- {
- super(name, durable, owner, autoDelete, exclusive, virtualHost,new PriorityQueueList.Factory(priorities), arguments);
- }
-
- public AMQPriorityQueue(String queueName,
- boolean durable,
- String owner,
- boolean autoDelete,
- boolean exclusive, VirtualHost virtualHost, int priorities, Map<String,Object> arguments)
+ protected AMQPriorityQueue(final String name,
+ final boolean durable,
+ final String owner,
+ final boolean autoDelete,
+ boolean exclusive,
+ final VirtualHost virtualHost,
+ Map<String, Object> arguments,
+ int priorities)
{
- this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner),
- autoDelete, exclusive,virtualHost, priorities, arguments);
+ super(name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments);
}
public int getPriorities()
{
return ((PriorityQueueList) _entries).getPriorities();
}
-
- @Override
- protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
- {
- // check that all subscriptions are not in advance of the entry
- SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
- while(subIter.advance() && entry.isAvailable())
- {
- final Subscription subscription = subIter.getNode().getSubscription();
- if(!subscription.isClosed())
- {
- QueueContext context = (QueueContext) subscription.getQueueContext();
- if(context != null)
- {
- QueueEntry subnode = context._lastSeenEntry;
- QueueEntry released = context._releasedEntry;
- while(subnode != null && entry.compareTo(subnode) < 0 && entry.isAvailable() && (released == null || released.compareTo(entry) < 0))
- {
- if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
- {
- break;
- }
- else
- {
- subnode = context._lastSeenEntry;
- released = context._releasedEntry;
- }
- }
- }
- }
-
- }
- }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Mar 1 12:51:40 2012
@@ -213,6 +213,8 @@ public interface AMQQueue extends Managa
void setAlternateExchange(Exchange exchange);
+ void setAlternateExchange(String exchangeName);
+
Map<String, Object> getArguments();
void checkCapacity(AMQChannel channel);
@@ -272,4 +274,22 @@ public interface AMQQueue extends Managa
ManagedObject getManagedObject();
void setExclusive(boolean exclusive) throws AMQException;
+
+ /**
+ * Gets the maximum delivery count. If a message on this queue
+ * is delivered more than maximumDeliveryCount, the message will be
+ * routed to the {@link #getAlternateExchange()} (if set), or otherwise
+ * discarded. 0 indicates that maximum deliver count should not be enforced.
+ *
+ * @return maximum delivery count
+ */
+ int getMaximumDeliveryCount();
+
+ /**
+ * Sets the maximum delivery count.
+ *
+ * @param maximumDeliveryCount maximum delivery count
+ */
+ public void setMaximumDeliveryCount(final int maximumDeliveryCount);
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Thu Mar 1 12:51:40 2012
@@ -20,22 +20,33 @@
*/
package org.apache.qpid.server.queue;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.configuration.QueueConfiguration;
-
-import java.util.Map;
-import java.util.HashMap;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class AMQQueueFactory
{
- public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
+ public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
+ public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
+
+ public static final String DLQ_ROUTING_KEY = "dlq";
+ public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled";
+ public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count";
+ public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
private abstract static class QueueProperty
{
@@ -80,6 +91,24 @@ public class AMQQueueFactory
}
+ private abstract static class QueueIntegerProperty extends QueueProperty
+ {
+ public QueueIntegerProperty(String argumentName)
+ {
+ super(argumentName);
+ }
+
+ public void setPropertyValue(AMQQueue queue, Object value)
+ {
+ if(value instanceof Number)
+ {
+ setPropertyValue(queue, ((Number)value).intValue());
+ }
+
+ }
+ abstract void setPropertyValue(AMQQueue queue, int value);
+ }
+
private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
new QueueLongProperty("x-qpid-maximum-message-age")
{
@@ -122,8 +151,14 @@ public class AMQQueueFactory
{
queue.setFlowResumeCapacity(value);
}
+ },
+ new QueueIntegerProperty(X_QPID_MAXIMUM_DELIVERY_COUNT)
+ {
+ public void setPropertyValue(AMQQueue queue, int value)
+ {
+ queue.setMaximumDeliveryCount(value);
+ }
}
-
};
@@ -149,17 +184,31 @@ public class AMQQueueFactory
String owner,
boolean autoDelete,
boolean exclusive,
- VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException
+ VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException
{
+ if (queueName == null)
+ {
+ throw new IllegalArgumentException("Queue name must not be null");
+ }
+
// Access check
if (!virtualHost.getSecurityManager().authoriseCreateQueue(autoDelete, durable, exclusive, null, null, new AMQShortString(queueName), owner))
{
String description = "Permission denied: queue-name '" + queueName + "'";
throw new AMQSecurityException(description);
}
-
+
+ QueueConfiguration queueConfiguration = virtualHost.getConfiguration().getQueueConfiguration(queueName);
+ boolean isDLQEnabled = isDLQEnabled(autoDelete, arguments, queueConfiguration);
+ if (isDLQEnabled)
+ {
+ validateDLNames(queueName);
+ }
+
int priorities = 1;
String conflationKey = null;
+ String sortingKey = null;
+
if(arguments != null)
{
if(arguments.containsKey(QPID_LAST_VALUE_QUEUE) || arguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
@@ -170,24 +219,32 @@ public class AMQQueueFactory
conflationKey = QPID_LVQ_KEY;
}
}
- else if(arguments.containsKey(X_QPID_PRIORITIES.toString()))
+ else if(arguments.containsKey(X_QPID_PRIORITIES))
{
- Object prioritiesObj = arguments.get(X_QPID_PRIORITIES.toString());
+ Object prioritiesObj = arguments.get(X_QPID_PRIORITIES);
if(prioritiesObj instanceof Number)
{
priorities = ((Number)prioritiesObj).intValue();
}
}
+ else if(arguments.containsKey(QPID_QUEUE_SORT_KEY))
+ {
+ sortingKey = (String)arguments.get(QPID_QUEUE_SORT_KEY);
+ }
}
AMQQueue q;
- if(conflationKey != null)
+ if(sortingKey != null)
+ {
+ q = new SortedQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey);
+ }
+ else if(conflationKey != null)
{
q = new ConflationQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey);
}
else if(priorities > 1)
{
- q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, priorities, arguments);
+ q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities);
}
else
{
@@ -209,10 +266,63 @@ public class AMQQueueFactory
}
}
- return q;
+ if(isDLQEnabled)
+ {
+ final String dlExchangeName = getDeadLetterExchangeName(queueName);
+ final String dlQueueName = getDeadLetterQueueName(queueName);
- }
+ final ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ final ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+ final QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ Exchange dlExchange = null;
+ synchronized(exchangeRegistry)
+ {
+ dlExchange = exchangeRegistry.getExchange(dlExchangeName);
+
+ if(dlExchange == null)
+ {
+ dlExchange = exchangeFactory.createExchange(new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0);
+
+ exchangeRegistry.registerExchange(dlExchange);
+
+ //enter the dle in the persistent store
+ virtualHost.getDurableConfigurationStore().createExchange(dlExchange);
+ }
+ }
+
+ AMQQueue dlQueue = null;
+
+ synchronized(queueRegistry)
+ {
+ dlQueue = queueRegistry.getQueue(dlQueueName);
+
+ if(dlQueue == null)
+ {
+ //set args to disable DLQ'ing/MDC from the DLQ itself, preventing loops etc
+ final Map<String, Object> args = new HashMap<String, Object>();
+ args.put(X_QPID_DLQ_ENABLED, false);
+ args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0);
+
+ dlQueue = createAMQQueueImpl(dlQueueName, true, owner, false, exclusive, virtualHost, args);
+ //enter the dlq in the persistent store
+ virtualHost.getDurableConfigurationStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args));
+ }
+ }
+
+ //ensure the queue is bound to the exchange
+ if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue))
+ {
+ //actual routing key used does not matter due to use of fanout exchange,
+ //but we will make the key 'dlq' as it can be logged at creation.
+ virtualHost.getBindingFactory().addBinding(DLQ_ROUTING_KEY, dlQueue, dlExchange, null);
+ }
+ q.setAlternateExchange(dlExchange);
+ }
+
+ return q;
+ }
public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException
{
@@ -223,26 +333,30 @@ public class AMQQueueFactory
boolean exclusive = config.getExclusive();
String owner = config.getOwner();
Map<String,Object> arguments = null;
+
if(config.isLVQ() || config.getLVQKey() != null)
{
-
arguments = new HashMap<String,Object>();
arguments.put(QPID_LAST_VALUE_QUEUE, 1);
arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey());
}
- else
+ else if (config.getPriority() || config.getPriorities() > 0)
+ {
+ arguments = new HashMap<String,Object>();
+ arguments.put(X_QPID_PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities());
+ }
+ else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey()))
+ {
+ arguments = new HashMap<String,Object>();
+ arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey());
+ }
+ if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled())
{
- boolean priority = config.getPriority();
- int priorities = config.getPriorities();
- if(priority || priorities > 0)
+ if (arguments == null)
{
arguments = new HashMap<String,Object>();
- if (priorities < 0)
- {
- priorities = 10;
- }
- arguments.put("x-qpid-priorities", priorities);
}
+ arguments.put(X_QPID_DLQ_ENABLED, true);
}
if(config.isTopic())
@@ -259,4 +373,94 @@ public class AMQQueueFactory
return q;
}
+
+ /**
+ * Validates DLQ and DLE names
+ * <p>
+ * DLQ name and DLQ exchange name need to be validated in order to keep
+ * integrity in cases when queue name passes validation check but DLQ name
+ * or DL exchange name fails to pass it. Otherwise, we might have situations
+ * when queue is created but DL exchange or/and DLQ creation fail.
+ * <p>
+ *
+ * @param name
+ * queue name
+ * @throws IllegalArgumentException
+ * thrown if length of queue name or exchange name exceed 255
+ */
+ protected static void validateDLNames(String name)
+ {
+ // check if DLQ name and DLQ exchange name do not exceed 255
+ String exchangeName = getDeadLetterExchangeName(name);
+ if (exchangeName.length() > AMQShortString.MAX_LENGTH)
+ {
+ throw new IllegalArgumentException("DL exchange name '" + exchangeName
+ + "' length exceeds limit of " + AMQShortString.MAX_LENGTH + " characters for queue " + name);
+ }
+ String queueName = getDeadLetterQueueName(name);
+ if (queueName.length() > AMQShortString.MAX_LENGTH)
+ {
+ throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of "
+ + AMQShortString.MAX_LENGTH + " characters for queue " + name);
+ }
+ }
+
+ /**
+ * Checks if DLQ is enabled for the queue.
+ *
+ * @param autoDelete
+ * queue auto-delete flag
+ * @param arguments
+ * queue arguments
+ * @param qConfig
+ * queue configuration
+ * @return true if DLQ enabled
+ */
+ protected static boolean isDLQEnabled(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig)
+ {
+ //feature is not to be enabled for temporary queues or when explicitly disabled by argument
+ if (!autoDelete)
+ {
+ boolean dlqArgumentPresent = arguments != null && arguments.containsKey(X_QPID_DLQ_ENABLED);
+ if (dlqArgumentPresent || qConfig.isDeadLetterQueueEnabled())
+ {
+ boolean dlqEnabled = true;
+ if (dlqArgumentPresent)
+ {
+ Object argument = arguments.get(X_QPID_DLQ_ENABLED);
+ dlqEnabled = argument instanceof Boolean && ((Boolean)argument).booleanValue();
+ }
+ return dlqEnabled;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Generates a dead letter queue name for a given queue name
+ *
+ * @param name
+ * queue name
+ * @return DLQ name
+ */
+ protected static String getDeadLetterQueueName(String name)
+ {
+ ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration();
+ String dlQueueName = name + serverConfig.getDeadLetterQueueSuffix();
+ return dlQueueName;
+ }
+
+ /**
+ * Generates a dead letter exchange name for a given queue name
+ *
+ * @param name
+ * queue name
+ * @return DL exchange name
+ */
+ protected static String getDeadLetterExchangeName(String name)
+ {
+ ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration();
+ String dlExchangeName = name + serverConfig.getDeadLetterExchangeSuffix();
+ return dlExchangeName;
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=1295541&r1=1295540&r2=1295541&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Thu Mar 1 12:51:40 2012
@@ -28,7 +28,7 @@ import org.apache.qpid.framing.ContentHe
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
-import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
@@ -63,23 +63,25 @@ import java.util.*;
/**
* AMQQueueMBean is the management bean for an {@link AMQQueue}.
*
- * <p/><tablse id="crc"><caption>CRC Caption</caption>
+ * <p/><table id="crc"><caption>CRC Caption</caption>
* <tr><th> Responsibilities <th> Collaborations
* </table>
*/
@MBeanDescription("Management Interface for AMQQueue")
public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
{
+
/** Used for debugging purposes. */
private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
- private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z");
+ /** Date/time format used for message expiration and message timestamp formatting */
+ public static final String JMSTIMESTAMP_DATETIME_FORMAT = "MM-dd-yy HH:mm:ss.SSS z";
- private AMQQueue _queue = null;
- private String _queueName = null;
+ private final AMQQueue _queue;
+ private final String _queueName;
// OpenMBean data types for viewMessages method
- private static OpenType[] _msgAttributeTypes = new OpenType[5]; // AMQ message attribute types.
+ private static OpenType[] _msgAttributeTypes = new OpenType[6]; // AMQ message attribute types.
private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
@@ -138,6 +140,7 @@ public class AMQQueueMBean extends AMQMa
_msgAttributeTypes[2] = SimpleType.LONG; // For size
_msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
_msgAttributeTypes[4] = SimpleType.LONG; // For queue position
+ _msgAttributeTypes[5] = SimpleType.INTEGER; // For delivery count
_messageDataType = new CompositeType("Message", "AMQ Message",
VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]),
@@ -176,6 +179,11 @@ public class AMQQueueMBean extends AMQMa
return _queue.getMessageCount();
}
+ public Integer getMaximumDeliveryCount()
+ {
+ return _queue.getMaximumDeliveryCount();
+ }
+
public Long getMaximumMessageSize()
{
return _queue.getMaximumMessageSize();
@@ -294,6 +302,18 @@ public class AMQQueueMBean extends AMQMa
}
}
+ public void setAlternateExchange(String exchangeName)
+ {
+ _queue.setAlternateExchange(exchangeName);
+ }
+
+ public String getAlternateExchange()
+ {
+ Exchange exchange = _queue.getAlternateExchange();
+ String name = exchange == null ? null : exchange.getName();
+ return name == null ? null : name;
+ }
+
/**
* Checks if there is any notification to be send to the listeners
*/
@@ -471,7 +491,7 @@ public class AMQQueueMBean extends AMQMa
ContentHeaderBody headerBody = msg.getContentHeaderBody();
// Create header attributes list
headerAttributes = getMessageHeaderProperties(headerBody);
- itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position};
+ itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()};
}
else if(serverMsg instanceof MessageTransferMessage)
{
@@ -480,13 +500,13 @@ public class AMQQueueMBean extends AMQMa
// Create header attributes list
headerAttributes = getMessageTransferMessageHeaderProps(msg);
- itemValues = new Object[]{msg.getMessageNumber(), headerAttributes, msg.getSize(), queueEntry.isRedelivered(), position};
+ itemValues = new Object[]{msg.getMessageNumber(), headerAttributes, msg.getSize(), queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()};
}
else
{
//unknown message
headerAttributes = new String[]{"N/A"};
- itemValues = new Object[]{serverMsg.getMessageNumber(), headerAttributes, serverMsg.getSize(), queueEntry.isRedelivered(), position};
+ itemValues = new Object[]{serverMsg.getMessageNumber(), headerAttributes, serverMsg.getSize(), queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()};
}
CompositeData messageData = new CompositeDataSupport(_messageDataType,
@@ -523,13 +543,11 @@ public class AMQQueueMBean extends AMQMa
list.add("JMSPriority = " + headerProperties.getPriority());
list.add("JMSType = " + headerProperties.getType());
- long longDate = headerProperties.getExpiration();
- String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
- list.add("JMSExpiration = " + strDate);
-
- longDate = headerProperties.getTimestamp();
- strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
- list.add("JMSTimestamp = " + strDate);
+ final long expirationDate = headerProperties.getExpiration();
+ final long timestampDate = headerProperties.getTimestamp();
+
+ addStringifiedJMSTimestamoAndJMSExpiration(list, expirationDate,
+ timestampDate);
return list.toArray(new String[list.size()]);
}
@@ -561,17 +579,32 @@ public class AMQQueueMBean extends AMQMa
list.add("JMSPriority = " + header.getPriority());
list.add("JMSType = " + header.getType());
- long longDate = header.getExpiration();
- String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
- list.add("JMSExpiration = " + strDate);
-
- longDate = header.getTimestamp();
- strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
- list.add("JMSTimestamp = " + strDate);
+ final long expirationDate = header.getExpiration();
+ final long timestampDate = header.getTimestamp();
+ addStringifiedJMSTimestamoAndJMSExpiration(list, expirationDate, timestampDate);
return list.toArray(new String[list.size()]);
}
+ private void addStringifiedJMSTimestamoAndJMSExpiration(final List<String> list,
+ final long expirationDate, final long timestampDate)
+ {
+ final SimpleDateFormat dateFormat;
+ if (expirationDate != 0 || timestampDate != 0)
+ {
+ dateFormat = new SimpleDateFormat(JMSTIMESTAMP_DATETIME_FORMAT);
+ }
+ else
+ {
+ dateFormat = null;
+ }
+
+ final String formattedExpirationDate = (expirationDate != 0) ? dateFormat.format(new Date(expirationDate)) : null;
+ final String formattedTimestampDate = (timestampDate != 0) ? dateFormat.format(new Date(timestampDate)) : null;
+ list.add("JMSExpiration = " + formattedExpirationDate);
+ list.add("JMSTimestamp = " + formattedTimestampDate);
+ }
+
/**
* @see ManagedQueue#moveMessages
* @param fromMessageId
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org