You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/17 21:19:38 UTC
svn commit: r1569102 [4/5] - in
/qpid/branches/java-broker-amqp-1-0-management/java:
amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/
bdbstore/src/test/java/org/ap...
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java Mon Feb 17 20:19:36 2014
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.times;
import java.io.File;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
@@ -40,6 +41,7 @@ import org.apache.qpid.common.AMQPFilter
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
@@ -143,7 +145,7 @@ public abstract class AbstractDurableCon
verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(EXCHANGE),
eq(map( org.apache.qpid.server.model.Exchange.NAME, getName(),
org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type",
- org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.AUTO_DELETE.toString())));
+ org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name())));
}
private Map<String,Object> map(Object... vals)
@@ -220,7 +222,7 @@ public abstract class AbstractDurableCon
Map<String, Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
- queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
+ queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
}
@@ -240,7 +242,7 @@ public abstract class AbstractDurableCon
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
- queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
+ queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
queueAttributes.putAll(attributes);
verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
@@ -258,7 +260,7 @@ public abstract class AbstractDurableCon
Map<String, Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
- queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
+ queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
@@ -292,8 +294,6 @@ public abstract class AbstractDurableCon
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.NAME, getName());
- queueAttributes.put(Queue.OWNER, getName()+"Owner");
- queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE);
queueAttributes.putAll(attributes);
verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
@@ -320,8 +320,6 @@ public abstract class AbstractDurableCon
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.NAME, getName());
- queueAttributes.put(Queue.OWNER, getName()+"Owner");
- queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE);
queueAttributes.putAll(attributes);
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
@@ -361,13 +359,19 @@ public abstract class AbstractDurableCon
{
AMQQueue queue = mock(AMQQueue.class);
when(queue.getName()).thenReturn(queueName);
- when(queue.getOwner()).thenReturn(queueOwner);
when(queue.isExclusive()).thenReturn(exclusive);
when(queue.getId()).thenReturn(_queueId);
when(queue.getAlternateExchange()).thenReturn(alternateExchange);
- if(arguments != null && !arguments.isEmpty())
+ final Map<String,Object> attributes = arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments);
+ attributes.put(Queue.NAME, queueName);
+ if(exclusive)
{
- when(queue.getAvailableAttributes()).thenReturn(arguments.keySet());
+ when(queue.getOwner()).thenReturn(queueOwner);
+
+ attributes.put(Queue.OWNER, queueOwner);
+ attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER);
+ }
+ when(queue.getAvailableAttributes()).thenReturn(attributes.keySet());
final ArgumentCaptor<String> requestedAttribute = ArgumentCaptor.forClass(String.class);
when(queue.getAttribute(requestedAttribute.capture())).then(
new Answer()
@@ -377,10 +381,9 @@ public abstract class AbstractDurableCon
public Object answer(final InvocationOnMock invocation) throws Throwable
{
String attrName = requestedAttribute.getValue();
- return arguments.get(attrName);
+ return attributes.get(attrName);
}
});
- }
return queue;
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java Mon Feb 17 20:19:36 2014
@@ -25,11 +25,13 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.net.SocketAddress;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -182,8 +184,10 @@ public class BrokerTestHelper
public static AMQQueue createQueue(String queueName, VirtualHost virtualHost)
throws QpidSecurityException, QueueExistsException
{
- AMQQueue queue = virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, null,
- false, false, false, Collections.<String, Object>emptyMap());
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
+ attributes.put(Queue.NAME, queueName);
+ AMQQueue queue = virtualHost.createQueue(null, attributes);
return queue;
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java Mon Feb 17 20:19:36 2014
@@ -46,6 +46,7 @@ import org.apache.qpid.server.store.Conf
import org.apache.qpid.server.store.DurableConfigurationRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
+import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.test.utils.QpidTestCase;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
@@ -117,14 +118,11 @@ public class DurableConfigurationRecover
- final ArgumentCaptor<UUID> idArg = ArgumentCaptor.forClass(UUID.class);
- final ArgumentCaptor<String> queueArg = ArgumentCaptor.forClass(String.class);
- final ArgumentCaptor<Map> argsArg = ArgumentCaptor.forClass(Map.class);
+ final ArgumentCaptor<Map> attributesArg = ArgumentCaptor.forClass(Map.class);
_queueFactory = mock(QueueFactory.class);
- when(_queueFactory.restoreQueue(idArg.capture(), queueArg.capture(),
- anyString(), anyBoolean(), anyBoolean(), anyBoolean(), argsArg.capture())).then(
+ when(_queueFactory.restoreQueue(attributesArg.capture())).then(
new Answer()
{
@@ -133,8 +131,9 @@ public class DurableConfigurationRecover
{
final AMQQueue queue = mock(AMQQueue.class);
- final String queueName = queueArg.getValue();
- final UUID queueId = idArg.getValue();
+ final Map attributes = attributesArg.getValue();
+ final String queueName = (String) attributes.get(Queue.NAME);
+ final UUID queueId = MapValueConverter.getUUIDAttribute(Queue.ID, attributes);
when(queue.getName()).thenReturn(queueName);
when(queue.getId()).thenReturn(queueId);
@@ -153,10 +152,10 @@ public class DurableConfigurationRecover
return null;
}
}
- ).when(queue).setAlternateExchange(altExchangeArg.capture());
+ ).when(queue).setAlternateExchange(altExchangeArg.capture());
- Map args = argsArg.getValue();
- if(args.containsKey(Queue.ALTERNATE_EXCHANGE))
+ Map args = attributes;
+ if (args.containsKey(Queue.ALTERNATE_EXCHANGE))
{
final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString());
final Exchange exchange = _exchangeRegistry.getExchange(exchangeId);
@@ -470,7 +469,6 @@ public class DurableConfigurationRecover
{
queue.put(Queue.ALTERNATE_EXCHANGE, alternateExchangeId.toString());
}
- queue.put(Queue.EXCLUSIVE, false);
return queue;
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java Mon Feb 17 20:19:36 2014
@@ -29,6 +29,7 @@ import org.apache.qpid.server.exchange.E
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -152,14 +153,7 @@ public class MockVirtualHost implements
}
@Override
- public AMQQueue createQueue(UUID id,
- String queueName,
- boolean durable,
- String owner,
- boolean autoDelete,
- boolean exclusive,
- boolean deleteOnNoConsumer,
- Map<String, Object> arguments)
+ public AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> arguments)
{
return null;
}
@@ -314,4 +308,52 @@ public class MockVirtualHost implements
public void unblock()
{
}
+
+ @Override
+ public long getDefaultAlertThresholdMessageAge()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getDefaultAlertThresholdMessageSize()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getDefaultAlertThresholdQueueDepthMessages()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getDefaultAlertThresholdQueueDepthBytes()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getDefaultAlertRepeatGap()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getDefaultQueueFlowControlSizeBytes()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getDefaultQueueFlowResumeSizeBytes()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getDefaultMaximumDeliveryAttempts()
+ {
+ return 0;
+ }
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Mon Feb 17 20:19:36 2014
@@ -25,6 +25,7 @@ import java.security.Principal;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
@@ -42,6 +43,7 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Connection;
@@ -56,7 +58,8 @@ import static org.apache.qpid.server.log
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
-public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder
+public class ServerConnection extends Connection implements AMQConnectionModel<ServerConnection, ServerSession>,
+ LogSubject, AuthorizationHolder
{
private Runnable _onOpenTask;
private AtomicBoolean _logClosed = new AtomicBoolean(false);
@@ -72,6 +75,10 @@ public class ServerConnection extends Co
private AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
private Transport _transport;
+
+ private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList =
+ new CopyOnWriteArrayList<Action<? super ServerConnection>>();
+
private volatile boolean _stopped;
public ServerConnection(final long connectionId, Broker broker)
@@ -197,7 +204,7 @@ public class ServerConnection extends Co
_onOpenTask = task;
}
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
+ public void closeSession(ServerSession session, AMQConstant cause, String message)
{
ExecutionException ex = new ExecutionException();
ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR;
@@ -211,7 +218,7 @@ public class ServerConnection extends Co
}
ex.setErrorCode(code);
ex.setDescription(message);
- ((ServerSession)session).invoke(ex);
+ session.invoke(ex);
session.close(cause, message);
}
@@ -315,6 +322,7 @@ public class ServerConnection extends Co
public void close(AMQConstant cause, String message)
{
closeSubscriptions();
+ performDeleteTasks();
ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
try
{
@@ -327,6 +335,14 @@ public class ServerConnection extends Co
close(replyCode, message);
}
+ protected void performDeleteTasks()
+ {
+ for(Action<? super ServerConnection> task : _taskList)
+ {
+ task.performAction(this);
+ }
+ }
+
public synchronized void block()
{
if(!_blocking)
@@ -367,12 +383,12 @@ public class ServerConnection extends Co
super.removeSession(ssn);
}
- public List<AMQSessionModel> getSessionModels()
+ public List<ServerSession> getSessionModels()
{
- List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
+ List<ServerSession> sessions = new ArrayList<ServerSession>();
for (Session ssn : getChannels())
{
- sessions.add((AMQSessionModel) ssn);
+ sessions.add((ServerSession) ssn);
}
return sessions;
}
@@ -475,14 +491,10 @@ public class ServerConnection extends Co
return String.valueOf(getRemoteAddress());
}
- public String getUserName()
- {
- return _authorizedPrincipal.getName();
- }
-
@Override
public void closed()
{
+ performDeleteTasks();
closeSubscriptions();
super.closed();
}
@@ -522,6 +534,12 @@ public class ServerConnection extends Co
}
@Override
+ public String getRemoteContainerName()
+ {
+ return getConnectionDelegate().getClientId();
+ }
+
+ @Override
public String getClientVersion()
{
return getConnectionDelegate().getClientVersion();
@@ -533,11 +551,6 @@ public class ServerConnection extends Co
return getConnectionDelegate().getClientProduct();
}
- public String getPrincipalAsString()
- {
- return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
- }
-
public long getSessionCountLimit()
{
return getChannelMax();
@@ -565,4 +578,16 @@ public class ServerConnection extends Co
super.doHeartBeat();
}
+
+ @Override
+ public void addDeleteTask(final Action<? super ServerConnection> task)
+ {
+ _taskList.add(task);
+ }
+
+ @Override
+ public void removeDeleteTask(final Action<? super ServerConnection> task)
+ {
+ _taskList.remove(task);
+ }
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Mon Feb 17 20:19:36 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.security.Principal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -310,14 +311,18 @@ public class ServerConnectionDelegate ex
private boolean isSessionNameUnique(final byte[] name, final Connection conn)
{
final ServerConnection sconn = (ServerConnection) conn;
- final String userId = sconn.getUserName();
+ final Principal authorizedPrincipal = sconn.getAuthorizedPrincipal();
+ final String userId = authorizedPrincipal == null ? "" : authorizedPrincipal.getName();
final Iterator<AMQConnectionModel> connections =
((ServerConnection)conn).getVirtualHost().getConnectionRegistry().getConnections().iterator();
while(connections.hasNext())
{
- final AMQConnectionModel amqConnectionModel = (AMQConnectionModel) connections.next();
- if (userId.equals(amqConnectionModel.getUserName()) && !amqConnectionModel.isSessionNameUnique(name))
+ final AMQConnectionModel amqConnectionModel = connections.next();
+ final String userName = amqConnectionModel.getAuthorizedPrincipal() == null
+ ? ""
+ : amqConnectionModel.getAuthorizedPrincipal().getName();
+ if (userId.equals(userName) && !amqConnectionModel.isSessionNameUnique(name))
{
return false;
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Mon Feb 17 20:19:36 2014
@@ -78,6 +78,7 @@ import org.apache.qpid.server.txn.Suspen
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
import org.slf4j.Logger;
@@ -88,7 +89,9 @@ import static org.apache.qpid.util.Seria
public class ServerSession extends Session
implements AuthorizationHolder,
- AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder
+ AMQSessionModel<ServerSession,ServerConnection>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder,
+ Deletable<ServerSession>
+
{
private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
@@ -132,7 +135,7 @@ public class ServerSession extends Sessi
private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
- private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>();
+ private final List<Action<? super ServerSession>> _taskList = new CopyOnWriteArrayList<Action<? super ServerSession>>();
private final TransactionTimeoutHelper _transactionTimeoutHelper;
@@ -374,7 +377,7 @@ public class ServerSession extends Sessi
}
_messageDispositionListenerMap.clear();
- for (Action<ServerSession> task : _taskList)
+ for (Action<? super ServerSession> task : _taskList)
{
task.performAction(this);
}
@@ -610,12 +613,12 @@ public class ServerSession extends Sessi
return getConnection().getAuthorizedSubject();
}
- public void addSessionCloseTask(Action<ServerSession> task)
+ public void addDeleteTask(Action<? super ServerSession> task)
{
_taskList.add(task);
}
- public void removeSessionCloseTask(Action<ServerSession> task)
+ public void removeDeleteTask(Action<? super ServerSession> task)
{
_taskList.remove(task);
}
@@ -652,7 +655,7 @@ public class ServerSession extends Sessi
return _id;
}
- public AMQConnectionModel getConnectionModel()
+ public ServerConnection getConnectionModel()
{
return getConnection();
}
@@ -922,7 +925,7 @@ public class ServerSession extends Sessi
}
@Override
- public int compareTo(AMQSessionModel o)
+ public int compareTo(ServerSession o)
{
return getId().compareTo(o.getId());
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Mon Feb 17 20:19:36 2014
@@ -25,6 +25,8 @@ import java.util.LinkedHashMap;
import java.util.UUID;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
import org.apache.qpid.server.exchange.Exchange;
@@ -204,47 +206,12 @@ public class ServerSessionDelegate exten
{
exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
}
- else if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
- {
- exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
- }
- else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session)
+ else if(!queue.verifySessionAccess((ServerSession)session))
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
else
{
- if(queue.isExclusive())
- {
- ServerSession s = (ServerSession) session;
- queue.setExclusiveOwningSession(s);
-
- ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
- {
- public void performAction(ServerSession session)
- {
- if(queue.getExclusiveOwningSession() == session)
- {
- queue.setExclusiveOwningSession(null);
- }
- }
- });
-
- if(queue.getAuthorizationHolder() == null)
- {
- queue.setAuthorizationHolder(s);
- ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
- {
- public void performAction(ServerSession session)
- {
- if(queue.getAuthorizationHolder() == session)
- {
- queue.setAuthorizationHolder(null);
- }
- }
- });
- }
- }
FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
@@ -302,6 +269,10 @@ public class ServerSessionDelegate exten
{
exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
}
+ catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+ {
+ exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an incompatible exclusivity policy");
+ }
}
}
}
@@ -1197,7 +1168,7 @@ public class ServerSessionDelegate exten
exception(session, method, errorCode, description);
}
- else if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+ else if (!queue.verifySessionAccess((ServerSession)session))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -1214,7 +1185,6 @@ public class ServerSessionDelegate exten
try
{
- String owner = method.getExclusive() ? ((ServerSession)session).getClientID() : null;
final String alternateExchangeName = method.getAlternateExchange();
@@ -1227,66 +1197,36 @@ public class ServerSessionDelegate exten
final UUID id = UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName());
- final boolean deleteOnNoConsumer = !exclusive && autoDelete;
+ arguments.put(Queue.ID, id);
+ arguments.put(Queue.NAME, queueName);
- queue = virtualHost.createQueue(id, queueName, method.getDurable(), owner,
- autoDelete, exclusive, deleteOnNoConsumer,
- arguments);
-
- if (autoDelete && exclusive)
+ LifetimePolicy lifetime;
+ if(autoDelete)
{
- final AMQQueue q = queue;
- final Action<ServerSession> deleteQueueTask = new Action<ServerSession>()
- {
- public void performAction(ServerSession session)
- {
- try
- {
- virtualHost.removeQueue(q);
- }
- catch (QpidSecurityException e)
- {
- exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
- }
- }
- };
- final ServerSession s = (ServerSession) session;
- s.addSessionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new Action<AMQQueue>()
- {
- public void performAction(AMQQueue queue)
- {
- s.removeSessionCloseTask(deleteQueueTask);
- }
- });
+ lifetime = exclusive ? LifetimePolicy.DELETE_ON_SESSION_END
+ : LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS;
}
- if (exclusive)
+ else
{
- final AMQQueue q = queue;
- final Action<ServerSession> removeExclusive = new Action<ServerSession>()
- {
- public void performAction(ServerSession session)
- {
- q.setAuthorizationHolder(null);
- q.setExclusiveOwningSession(null);
- }
- };
- final ServerSession s = (ServerSession) session;
- q.setExclusiveOwningSession(s);
- s.addSessionCloseTask(removeExclusive);
- queue.addQueueDeleteTask(new Action<AMQQueue>()
- {
- public void performAction(AMQQueue queue)
- {
- s.removeSessionCloseTask(removeExclusive);
- }
- });
+ lifetime = LifetimePolicy.PERMANENT;
}
+
+ arguments.put(Queue.LIFETIME_POLICY, lifetime);
+
+ ExclusivityPolicy exclusivityPolicy = exclusive ? ExclusivityPolicy.SESSION : ExclusivityPolicy.NONE;
+
+
+ arguments.put(Queue.DURABLE, method.getDurable());
+
+ arguments.put(Queue.EXCLUSIVE, exclusivityPolicy);
+
+ queue = virtualHost.createQueue((ServerSession)session, arguments);
+
}
catch(QueueExistsException qe)
{
queue = qe.getExistingQueue();
- if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+ if (!queue.verifySessionAccess((ServerSession)session))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -1347,11 +1287,7 @@ public class ServerSessionDelegate exten
}
else
{
- if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
- {
- exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
- }
- else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session)
+ if(!queue.verifySessionAccess((ServerSession)session))
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
@@ -1424,7 +1360,7 @@ public class ServerSessionDelegate exten
result.setQueue(queue.getName());
result.setDurable(queue.isDurable());
result.setExclusive(queue.isExclusive());
- result.setAutoDelete(queue.isAutoDelete());
+ result.setAutoDelete(queue.getLifetimePolicy() != LifetimePolicy.PERMANENT);
Map<String, Object> arguments = new LinkedHashMap<String, Object>();
Collection<String> availableAttrs = queue.getAvailableAttributes();
@@ -1500,7 +1436,6 @@ public class ServerSessionDelegate exten
public void closed(Session session)
{
setThreadSubject(session);
-
ServerSession serverSession = (ServerSession)session;
serverSession.stopSubscriptions();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Mon Feb 17 20:19:36 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
@@ -30,6 +31,8 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQMethodBody;
@@ -86,7 +89,9 @@ import org.apache.qpid.server.util.Conne
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TransportException;
-public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder
+public class AMQChannel<T extends AMQProtocolSession<T>>
+ implements AMQSessionModel<AMQChannel<T>,T>,
+ AsyncAutoCommitTransaction.FutureRecorder
{
public static final int DEFAULT_PREFETCH = 4096;
@@ -140,7 +145,7 @@ public class AMQChannel implements AMQSe
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private final AMQProtocolSession _session;
+ private final T _session;
private AtomicBoolean _closing = new AtomicBoolean(false);
private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
@@ -163,12 +168,15 @@ public class AMQChannel implements AMQSe
private final TransactionTimeoutHelper _transactionTimeoutHelper;
private final UUID _id = UUID.randomUUID();
+ private final List<Action<? super AMQChannel<T>>> _taskList =
+ new CopyOnWriteArrayList<Action<? super AMQChannel<T>>>();
+
private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
private final ImmediateAction _immediateAction = new ImmediateAction();
- public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
+ public AMQChannel(T session, int channelId, MessageStore messageStore)
throws AMQException
{
_session = session;
@@ -526,7 +534,8 @@ public class AMQChannel implements AMQSe
public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
FieldTable filters, boolean exclusive, boolean noLocal)
throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive,
- MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException
+ MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException,
+ MessageSource.ConsumerAccessRefused
{
if (tag == null)
{
@@ -580,7 +589,15 @@ public class AMQChannel implements AMQSe
{
filterManager = new SimpleFilterManager();
}
- filterManager.add(new FilterSupport.NoLocalFilter(source));
+ final Object connectionReference = getConnectionReference();
+ filterManager.add(new MessageFilter()
+ {
+ @Override
+ public boolean matches(final Filterable message)
+ {
+ return message.getConnectionReference() != connectionReference;
+ }
+ });
}
Consumer sub =
source.addConsumer(target,
@@ -609,6 +626,11 @@ public class AMQChannel implements AMQSe
_tag2SubscriptionTargetMap.remove(tag);
throw e;
}
+ catch (MessageSource.ConsumerAccessRefused e)
+ {
+ _tag2SubscriptionTargetMap.remove(tag);
+ throw e;
+ }
return tag;
}
@@ -657,6 +679,13 @@ public class AMQChannel implements AMQSe
CurrentActor.get().message(_logSubject, operationalLogMessage);
unsubscribeAllConsumers();
+
+ for (Action<? super AMQChannel<T>> task : _taskList)
+ {
+ task.performAction(this);
+ }
+
+
_transaction.rollback();
try
@@ -692,9 +721,10 @@ public class AMQChannel implements AMQSe
Consumer sub = me.getValue().getConsumer();
-
- sub.close();
-
+ if(sub != null)
+ {
+ sub.close();
+ }
}
_tag2SubscriptionTargetMap.clear();
@@ -1192,7 +1222,8 @@ public class AMQChannel implements AMQSe
return _id;
}
- public AMQConnectionModel getConnectionModel()
+ @Override
+ public T getConnectionModel()
{
return _session;
}
@@ -1208,11 +1239,23 @@ public class AMQChannel implements AMQSe
}
@Override
- public int compareTo(AMQSessionModel o)
+ public int compareTo(AMQChannel o)
{
return getId().compareTo(o.getId());
}
+ @Override
+ public void addDeleteTask(final Action<? super AMQChannel<T>> task)
+ {
+ _taskList.add(task);
+ }
+
+ @Override
+ public void removeDeleteTask(final Action<? super AMQChannel<T>> task)
+ {
+ _taskList.remove(task);
+ }
+
private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<?,C>>
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Mon Feb 17 20:19:36 2014
@@ -33,7 +33,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
@@ -47,18 +46,15 @@ import org.apache.qpid.AMQChannelExcepti
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
-import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.logging.LogActor;
@@ -78,6 +74,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -86,7 +83,7 @@ import org.apache.qpid.transport.Transpo
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.util.BytesDataOutput;
-public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession
+public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine>
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
@@ -103,9 +100,11 @@ public class AMQProtocolEngine implement
private VirtualHost _virtualHost;
- private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
+ private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap =
+ new HashMap<Integer, AMQChannel<AMQProtocolEngine>>();
- private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
+ @SuppressWarnings("unchecked")
+ private final AMQChannel<AMQProtocolEngine>[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
/**
* The channels that the latest call to {@link #received(ByteBuffer)} applied to.
@@ -114,9 +113,8 @@ public class AMQProtocolEngine implement
*
* Thread-safety: guarded by {@link #_receivedLock}.
*/
- private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<AMQChannel>();
-
- private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
+ private final Set<AMQChannel<AMQProtocolEngine>> _channelsForCurrentMessage =
+ new HashSet<AMQChannel<AMQProtocolEngine>>();
private final AMQStateManager _stateManager;
@@ -124,10 +122,6 @@ public class AMQProtocolEngine implement
private SaslServer _saslServer;
- private Object _lastReceived;
-
- private Object _lastSent;
-
private volatile boolean _closed;
// maximum number of channels this session should have
@@ -136,8 +130,8 @@ public class AMQProtocolEngine implement
/* AMQP Version for this session */
private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
- private FieldTable _clientProperties;
- private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+ private final List<Action<? super AMQProtocolEngine>> _taskList =
+ new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>();
private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
private ProtocolOutputConverter _protocolOutputConverter;
@@ -153,12 +147,9 @@ public class AMQProtocolEngine implement
private long _lastIoTime;
private long _writtenBytes;
- private long _readBytes;
-
private long _maxFrameSize;
private final AtomicBoolean _closing = new AtomicBoolean(false);
- private long _createTime = System.currentTimeMillis();
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
@@ -176,6 +167,7 @@ public class AMQProtocolEngine implement
private volatile boolean _closeWhenNoRoute;
private volatile boolean _stopped;
+ private long _readBytes;
public AMQProtocolEngine(Broker broker,
NetworkConnection network,
@@ -258,15 +250,14 @@ public class AMQProtocolEngine implement
final long arrivalTime = System.currentTimeMillis();
_lastReceivedTime = arrivalTime;
_lastIoTime = arrivalTime;
+ _readBytes += msg.remaining();
_receivedLock.lock();
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- final int len = dataBlocks.size();
- for (int i = 0; i < len; i++)
+ for (AMQDataBlock dataBlock : dataBlocks)
{
- AMQDataBlock dataBlock = dataBlocks.get(i);
try
{
dataBlockReceived(dataBlock);
@@ -316,7 +307,7 @@ public class AMQProtocolEngine implement
private void receivedComplete()
{
- for (AMQChannel channel : _channelsForCurrentMessage)
+ for (AMQChannel<AMQProtocolEngine> channel : _channelsForCurrentMessage)
{
channel.receivedComplete();
}
@@ -334,7 +325,6 @@ public class AMQProtocolEngine implement
*/
private void dataBlockReceived(AMQDataBlock message) throws Exception
{
- _lastReceived = message;
if (message instanceof ProtocolInitiation)
{
protocolInitiationReceived((ProtocolInitiation) message);
@@ -363,7 +353,7 @@ public class AMQProtocolEngine implement
private void frameReceived(AMQFrame frame) throws AMQException
{
int channelId = frame.getChannel();
- AMQChannel amqChannel = _channelMap.get(channelId);
+ AMQChannel<AMQProtocolEngine> amqChannel = _channelMap.get(channelId);
if(amqChannel != null)
{
// The _receivedLock is already acquired in the caller
@@ -558,14 +548,6 @@ public class AMQProtocolEngine implement
{
boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
- if (!_frameListeners.isEmpty())
- {
- for (AMQMethodListener listener : _frameListeners)
- {
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
- }
- }
-
if (!wasAnyoneInterested)
{
throw new AMQNoMethodHandlerException(evt);
@@ -611,11 +593,6 @@ public class AMQProtocolEngine implement
}
catch (Exception e)
{
- for (AMQMethodListener listener : _frameListeners)
- {
- listener.error(e);
- }
-
_logger.error("Unexpected exception while processing frame. Closing connection.", e);
closeProtocolSession();
@@ -625,7 +602,7 @@ public class AMQProtocolEngine implement
public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
{
- AMQChannel channel = getAndAssertChannel(channelId);
+ AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId);
channel.publishContentHeader(body);
@@ -633,7 +610,7 @@ public class AMQProtocolEngine implement
public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
{
- AMQChannel channel = getAndAssertChannel(channelId);
+ AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId);
channel.publishContentBody(body);
}
@@ -681,17 +658,17 @@ public class AMQProtocolEngine implement
_contextKey = contextKey;
}
- public List<AMQChannel> getChannels()
+ public List<AMQChannel<AMQProtocolEngine>> getChannels()
{
synchronized (_channelMap)
{
- return new ArrayList<AMQChannel>(_channelMap.values());
+ return new ArrayList<AMQChannel<AMQProtocolEngine>>(_channelMap.values());
}
}
- public AMQChannel getAndAssertChannel(int channelId) throws AMQException
+ public AMQChannel<AMQProtocolEngine> getAndAssertChannel(int channelId) throws AMQException
{
- AMQChannel channel = getChannel(channelId);
+ AMQChannel<AMQProtocolEngine> channel = getChannel(channelId);
if (channel == null)
{
throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
@@ -700,9 +677,9 @@ public class AMQProtocolEngine implement
return channel;
}
- public AMQChannel getChannel(int channelId)
+ public AMQChannel<AMQProtocolEngine> getChannel(int channelId)
{
- final AMQChannel channel =
+ final AMQChannel<AMQProtocolEngine> channel =
((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
if ((channel == null) || channel.isClosing())
{
@@ -719,7 +696,7 @@ public class AMQProtocolEngine implement
return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId);
}
- public void addChannel(AMQChannel channel) throws AMQException
+ public void addChannel(AMQChannel<AMQProtocolEngine> channel) throws AMQException
{
if (_closed)
{
@@ -770,7 +747,7 @@ public class AMQProtocolEngine implement
_maxNoOfChannels = value;
}
- public void commitTransactions(AMQChannel channel) throws AMQException
+ public void commitTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException
{
if ((channel != null) && channel.isTransactional())
{
@@ -778,7 +755,7 @@ public class AMQProtocolEngine implement
}
}
- public void rollbackTransactions(AMQChannel channel) throws AMQException
+ public void rollbackTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException
{
if ((channel != null) && channel.isTransactional())
{
@@ -802,7 +779,7 @@ public class AMQProtocolEngine implement
public void closeChannel(int channelId, AMQConstant cause, String message)
{
- final AMQChannel channel = getChannel(channelId);
+ final AMQChannel<AMQProtocolEngine> channel = getChannel(channelId);
if (channel == null)
{
throw new IllegalArgumentException("Unknown channel id");
@@ -879,12 +856,10 @@ public class AMQProtocolEngine implement
/**
* Closes all channels that were opened by this protocol session. This frees up all resources used by the channel.
- *
- * @throws AMQException if an error occurs while closing any channel
*/
private void closeAllChannels()
{
- for (AMQChannel channel : getChannels())
+ for (AMQChannel<AMQProtocolEngine> channel : getChannels())
{
channel.close();
}
@@ -929,9 +904,9 @@ public class AMQProtocolEngine implement
closeAllChannels();
- for (Task task : _taskList)
+ for (Action<? super AMQProtocolEngine> task : _taskList)
{
- task.doTask(this);
+ task.performAction(this);
}
synchronized(this)
@@ -961,7 +936,7 @@ public class AMQProtocolEngine implement
}
catch (InterruptedException e)
{
-
+ // do nothing
}
finally
{
@@ -1027,11 +1002,6 @@ public class AMQProtocolEngine implement
return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")");
}
- public String dump()
- {
- return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived;
- }
-
/** @return an object that can be used to identity */
public Object getKey()
{
@@ -1069,10 +1039,9 @@ public class AMQProtocolEngine implement
public void setClientProperties(FieldTable clientProperties)
{
- _clientProperties = clientProperties;
- if (_clientProperties != null)
+ if (clientProperties != null)
{
- String closeWhenNoRoute = _clientProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE);
+ String closeWhenNoRoute = clientProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE);
if (closeWhenNoRoute != null)
{
_closeWhenNoRoute = Boolean.parseBoolean(closeWhenNoRoute);
@@ -1082,10 +1051,10 @@ public class AMQProtocolEngine implement
}
}
- _clientVersion = _clientProperties.getString(ConnectionStartProperties.VERSION_0_8);
- _clientProduct = _clientProperties.getString(ConnectionStartProperties.PRODUCT);
+ _clientVersion = clientProperties.getString(ConnectionStartProperties.VERSION_0_8);
+ _clientProduct = clientProperties.getString(ConnectionStartProperties.PRODUCT);
- String clientId = _clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8);
+ String clientId = clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8);
if (clientId != null)
{
setContextKey(new AMQShortString(clientId));
@@ -1118,11 +1087,6 @@ public class AMQProtocolEngine implement
return _protocolVersion.getMinorVersion();
}
- public boolean isProtocolVersion(byte major, byte minor)
- {
- return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor);
- }
-
public MethodRegistry getRegistry()
{
return getMethodRegistry();
@@ -1141,12 +1105,12 @@ public class AMQProtocolEngine implement
}
- public void addSessionCloseTask(Task task)
+ public void addDeleteTask(Action<? super AMQProtocolEngine> task)
{
_taskList.add(task);
}
- public void removeSessionCloseTask(Task task)
+ public void removeDeleteTask(Action<? super AMQProtocolEngine> task)
{
_taskList.remove(task);
}
@@ -1341,51 +1305,11 @@ public class AMQProtocolEngine implement
return _clientProduct;
}
- public String getPrincipalAsString()
- {
- return getAuthId();
- }
-
public long getSessionCountLimit()
{
return getMaximumNumberOfChannels();
}
- public Boolean isIncoming()
- {
- return true;
- }
-
- public Boolean isSystemConnection()
- {
- return false;
- }
-
- public Boolean isFederationLink()
- {
- return false;
- }
-
- public String getAuthId()
- {
- return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
- }
-
- public Integer getRemotePID()
- {
- return null;
- }
-
- public String getRemoteProcessName()
- {
- return null;
- }
-
- public Integer getRemoteParentPID()
- {
- return null;
- }
-
public boolean isDurable()
{
return false;
@@ -1401,52 +1325,6 @@ public class AMQProtocolEngine implement
return String.valueOf(getRemoteAddress());
}
- public long getCreateTime()
- {
- return _createTime;
- }
-
- public Boolean isShadow()
- {
- return false;
- }
-
- public void mgmtClose()
- {
- MethodRegistry methodRegistry = getMethodRegistry();
- ConnectionCloseBody responseBody =
- methodRegistry.createConnectionCloseBody(
- AMQConstant.REPLY_SUCCESS.getCode(),
- new AMQShortString("The connection was closed using the broker's management interface."),
- 0,0);
-
- // This seems ugly but because we use closeConnection in both normal
- // broker operation and as part of the management interface it cannot
- // be avoided. The Current Actor will be null when this method is
- // called via the QMF management interface. As such we need to set one.
- boolean removeActor = false;
- if (CurrentActor.get() == null)
- {
- removeActor = true;
- CurrentActor.set(new ManagementActor(_actor.getRootMessageLogger()));
- }
-
- try
- {
- writeFrame(responseBody.generateFrame(0));
-
- closeSession();
-
- }
- finally
- {
- if (removeActor)
- {
- CurrentActor.remove();
- }
- }
- }
-
public void mgmtCloseChannel(int channelId)
{
MethodRegistry methodRegistry = getMethodRegistry();
@@ -1481,14 +1359,9 @@ public class AMQProtocolEngine implement
}
}
- public String getClientID()
- {
- return getContextKey().toString();
- }
-
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
+ public void closeSession(AMQChannel<AMQProtocolEngine> session, AMQConstant cause, String message)
{
- int channelId = ((AMQChannel)session).getChannelId();
+ int channelId = session.getChannelId();
closeChannel(channelId, cause, message);
MethodRegistry methodRegistry = getMethodRegistry();
@@ -1506,7 +1379,7 @@ public class AMQProtocolEngine implement
closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
getProtocolOutputConverter().getProtocolMajorVersion(),
getProtocolOutputConverter().getProtocolMinorVersion(),
- (Throwable) null));
+ null));
}
public void block()
@@ -1516,7 +1389,7 @@ public class AMQProtocolEngine implement
if(!_blocking)
{
_blocking = true;
- for(AMQChannel channel : _channelMap.values())
+ for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values())
{
channel.block();
}
@@ -1531,7 +1404,7 @@ public class AMQProtocolEngine implement
if(_blocking)
{
_blocking = false;
- for(AMQChannel channel : _channelMap.values())
+ for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values())
{
channel.unblock();
}
@@ -1544,9 +1417,9 @@ public class AMQProtocolEngine implement
return _closed;
}
- public List<AMQSessionModel> getSessionModels()
+ public List<AMQChannel<AMQProtocolEngine>> getSessionModels()
{
- return new ArrayList<AMQSessionModel>(getChannels());
+ return new ArrayList<AMQChannel<AMQProtocolEngine>>(getChannels());
}
public LogSubject getLogSubject()
@@ -1620,14 +1493,15 @@ public class AMQProtocolEngine implement
return String.valueOf(getContextKey());
}
- public void setDeferFlush(boolean deferFlush)
+ @Override
+ public String getRemoteContainerName()
{
- _deferFlush = deferFlush;
+ return String.valueOf(getContextKey());
}
- public String getUserName()
+ public void setDeferFlush(boolean deferFlush)
{
- return getAuthorizedPrincipal().getName();
+ _deferFlush = deferFlush;
}
public final class WriteDeliverMethod
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java Mon Feb 17 20:19:36 2014
@@ -37,12 +37,14 @@ import org.apache.qpid.protocol.AMQConst
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel
+public interface AMQProtocolSession<T extends AMQProtocolSession<T>>
+ extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel<T,AMQChannel<T>>
{
long getSessionID();
@@ -69,11 +71,6 @@ public interface AMQProtocolSession exte
*/
SocketAddress getLocalAddress();
- public static interface Task
- {
- public void doTask(AMQProtocolSession session);
- }
-
/**
* Get the context key associated with this session. Context key is described in the AMQ protocol specification (RFC
* 6).
@@ -98,7 +95,7 @@ public interface AMQProtocolSession exte
*
* @return null if no channel exists, the channel otherwise
*/
- AMQChannel getChannel(int channelId);
+ AMQChannel<T> getChannel(int channelId);
/**
* Associate a channel with this session.
@@ -106,7 +103,7 @@ public interface AMQProtocolSession exte
* @param channel the channel to associate with this session. It is an error to associate the same channel with more
* than one session but this is not validated.
*/
- void addChannel(AMQChannel channel) throws AMQException;
+ void addChannel(AMQChannel<T> channel) throws AMQException;
/**
* Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
@@ -185,10 +182,6 @@ public interface AMQProtocolSession exte
void setVirtualHost(VirtualHost virtualHost) throws AMQException;
- void addSessionCloseTask(Task task);
-
- void removeSessionCloseTask(Task task);
-
public ProtocolOutputConverter getProtocolOutputConverter();
void setAuthorizedSubject(Subject authorizedSubject);
@@ -209,11 +202,11 @@ public interface AMQProtocolSession exte
void setMaximumNumberOfChannels(Long value);
- void commitTransactions(AMQChannel channel) throws AMQException;
+ void commitTransactions(AMQChannel<T> channel) throws AMQException;
- void rollbackTransactions(AMQChannel channel) throws AMQException;
+ void rollbackTransactions(AMQChannel<T> channel) throws AMQException;
- List<AMQChannel> getChannels();
+ List<AMQChannel<T>> getChannels();
void mgmtCloseChannel(int channelId);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java Mon Feb 17 20:19:36 2014
@@ -99,16 +99,6 @@ public class BasicConsumeMethodHandler i
{
final AMQShortString consumerTagName;
- if (queue.isExclusive() && !queue.isDurable())
- {
- AMQSessionModel session = queue.getExclusiveOwningSession();
- if (session == null || session.getConnectionModel() != protocolConnection)
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
- }
- }
-
if (body.getConsumerTag() != null)
{
consumerTagName = body.getConsumerTag().intern(false);
@@ -184,6 +174,13 @@ public class BasicConsumeMethodHandler i
+ queue.getName()
+ " permission denied");
}
+ catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+ {
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " as it already has an incompatible exclusivity policy");
+ }
}
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Mon Feb 17 20:19:36 2014
@@ -98,15 +98,6 @@ public class BasicGetMethodHandler imple
}
else
{
- if (queue.isExclusive())
- {
- AMQSessionModel session = queue.getExclusiveOwningSession();
- if (session == null || session.getConnectionModel() != protocolConnection)
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue is exclusive, but not created on this Connection.");
- }
- }
try
{
@@ -136,6 +127,11 @@ public class BasicGetMethodHandler imple
"The GET request has been evaluated as an exclusive consumer, " +
"this is likely due to a programming error in the Qpid broker");
}
+ catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue has an incompatible exclusivit policy");
+ }
}
}
}
@@ -145,7 +141,7 @@ public class BasicGetMethodHandler imple
final AMQChannel channel,
final boolean acks)
throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive,
- MessageSource.ExistingExclusiveConsumer
+ MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused
{
final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java Mon Feb 17 20:19:36 2014
@@ -116,15 +116,6 @@ public class QueueBindHandler implements
try
{
- if (queue.isExclusive() && !queue.isDurable())
- {
- AMQSessionModel session = queue.getExclusiveOwningSession();
- if (session == null || session.getConnectionModel() != protocolConnection)
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
- }
- }
Map<String,Object> arguments = FieldTable.convertToMap(body.getArguments());
String bindingKey = String.valueOf(routingKey);
@@ -144,10 +135,6 @@ public class QueueBindHandler implements
}
}
}
- catch (AMQException e)
- {
- throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
- }
catch (QpidSecurityException e)
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java Mon Feb 17 20:19:36 2014
@@ -29,6 +29,9 @@ import org.apache.qpid.framing.MethodReg
import org.apache.qpid.framing.QueueDeclareBody;
import org.apache.qpid.framing.QueueDeclareOkBody;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -39,7 +42,6 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
@@ -96,9 +98,7 @@ public class QueueDeclareHandler impleme
}
else
{
- AMQSessionModel owningSession = queue.getExclusiveOwningSession();
- if (queue.isExclusive() && !queue.isDurable()
- && (owningSession == null || owningSession.getConnectionModel() != protocolConnection))
+ if (!queue.verifySessionAccess(channel))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
@@ -114,42 +114,15 @@ public class QueueDeclareHandler impleme
try
{
- queue = createQueue(queueName, body, virtualHost, protocolConnection);
- queue.setAuthorizationHolder(protocolConnection);
-
- if (body.getExclusive())
- {
- queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId));
- queue.setAuthorizationHolder(protocolConnection);
-
- if(!body.getDurable())
- {
- final AMQQueue q = queue;
- final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
- {
- public void doTask(AMQProtocolSession session)
- {
- q.setExclusiveOwningSession(null);
- }
- };
- protocolConnection.addSessionCloseTask(sessionCloseTask);
- queue.addQueueDeleteTask(new Action<AMQQueue>() {
- public void performAction(AMQQueue queue)
- {
- protocolConnection.removeSessionCloseTask(sessionCloseTask);
- }
- });
- }
- }
+ queue = createQueue(channel, queueName, body, virtualHost, protocolConnection);
}
catch(QueueExistsException qe)
{
queue = qe.getExistingQueue();
- AMQSessionModel owningSession = queue.getExclusiveOwningSession();
- if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection))
+ if (!queue.verifySessionAccess(channel))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
@@ -161,19 +134,12 @@ public class QueueDeclareHandler impleme
"Cannot re-declare queue '" + queue.getName() + "' with different exclusivity (was: "
+ queue.isExclusive() + " requested " + body.getExclusive() + ")");
}
- else if (body.getExclusive() && !(queue.isDurable() ? String.valueOf(queue.getOwner()).equals(session.getClientID()) : (owningSession == null || owningSession.getConnectionModel() == protocolConnection)))
- {
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), "
- + "as exclusive queue with same name "
- + "declared on another client ID('"
- + queue.getOwner() + "') your clientID('" + session.getClientID() + "')");
-
- }
- else if(queue.isAutoDelete() != body.getAutoDelete())
+ else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS)
+ || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT)))
{
throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
- "Cannot re-declare queue '" + queue.getName() + "' with different auto-delete (was: "
- + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")");
+ "Cannot re-declare queue '" + queue.getName() + "' with different lifetime policy (was: "
+ + queue.getLifetimePolicy() + " requested autodelete: " + body.getAutoDelete() + ")");
}
else if(queue.isDurable() != body.getDurable())
{
@@ -211,7 +177,7 @@ public class QueueDeclareHandler impleme
return new AMQShortString("tmp_" + UUID.randomUUID());
}
- protected AMQQueue createQueue(final AMQShortString queueName,
+ protected AMQQueue createQueue(final AMQChannel channel, final AMQShortString queueName,
QueueDeclareBody body,
final VirtualHost virtualHost,
final AMQProtocolSession session)
@@ -222,47 +188,35 @@ public class QueueDeclareHandler impleme
final boolean autoDelete = body.getAutoDelete();
final boolean exclusive = body.getExclusive();
- String owner = exclusive ? AMQShortString.toString(session.getContextKey()) : null;
- Map<String, Object> arguments =
+ Map<String, Object> attributes =
QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments()));
- String queueNameString = AMQShortString.toString(queueName);
- final UUID id = UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName());
-
- final AMQQueue queue = virtualHost.createQueue(id, queueNameString, durable, owner, autoDelete,
- exclusive, autoDelete, arguments);
-
- if (exclusive && !durable)
+ final String queueNameString = AMQShortString.toString(queueName);
+ attributes.put(Queue.NAME, queueNameString);
+ attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName()));
+ attributes.put(Queue.DURABLE, durable);
+
+ LifetimePolicy lifetimePolicy;
+ ExclusivityPolicy exclusivityPolicy;
+
+ if(exclusive)
+ {
+ lifetimePolicy = autoDelete
+ ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
+ : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
+ exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION;
+ }
+ else
{
- final AMQProtocolSession.Task deleteQueueTask =
- new AMQProtocolSession.Task()
- {
- public void doTask(AMQProtocolSession session)
- {
- if (virtualHost.getQueue(queueName.toString()) == queue)
- {
- try
- {
- virtualHost.removeQueue(queue);
- }
- catch (QpidSecurityException e)
- {
- throw new ConnectionScopedRuntimeException("Permission exception: Unable to remove a temporary queue created by a session which has now removed itself", e);
- }
- }
- }
- };
+ lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
+ exclusivityPolicy = ExclusivityPolicy.NONE;
+ }
- session.addSessionCloseTask(deleteQueueTask);
+ attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
+ attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
- queue.addQueueDeleteTask(new Action<AMQQueue>()
- {
- public void performAction(AMQQueue queue)
- {
- session.removeSessionCloseTask(deleteQueueTask);
- }
- });
- }
+
+ final AMQQueue queue = virtualHost.createQueue(channel, attributes);
return queue;
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java Mon Feb 17 20:19:36 2014
@@ -105,8 +105,7 @@ public class QueueDeleteHandler implemen
}
else
{
- AMQSessionModel session = queue.getExclusiveOwningSession();
- if (queue.isExclusive() && !queue.isDurable() && (session == null || session.getConnectionModel() != protocolConnection))
+ if (!queue.verifySessionAccess(channel))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org