You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/21 02:15:31 UTC
svn commit: r1570411 - in /qpid/trunk/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/binding/
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/exchange/
broker-core/src/main/java/o...
Author: rgodfrey
Date: Fri Feb 21 01:15:30 2014
New Revision: 1570411
URL: http://svn.apache.org/r1570411
Log:
QPID-5567 : Move acl checks into the objects being created
Added:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
- copied, changed from r1569102, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
Removed:
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java
qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java Fri Feb 21 01:15:30 2014
@@ -48,6 +48,10 @@ public class Binding
_queue = queue;
_exchange = exchange;
_arguments = arguments == null ? Collections.EMPTY_MAP : Collections.unmodifiableMap(arguments);
+
+ //Perform ACLs
+ queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this);
+
}
public UUID getId()
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java Fri Feb 21 01:15:30 2014
@@ -21,6 +21,8 @@
package org.apache.qpid.server.consumer;
import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.AMQSessionModel;
public interface Consumer
@@ -48,7 +50,7 @@ public interface Consumer
AMQSessionModel getSessionModel();
- void setNoLocal(boolean noLocal);
+ MessageSource getMessageSource();
long getId();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Fri Feb 21 01:15:30 2014
@@ -119,6 +119,9 @@ public abstract class AbstractExchange i
_id = id;
_logSubject = new ExchangeLogSubject(this, this.getVirtualHost());
+ // check ACL
+ host.getSecurityManager().authoriseCreateExchange(this);
+
// Log Exchange creation
CurrentActor.get().message(ExchangeMessages.CREATED(getType().getType(), name, durable));
}
@@ -624,9 +627,6 @@ public abstract class AbstractExchange i
arguments = Collections.emptyMap();
}
- //Perform ACLs
- _virtualHost.getSecurityManager().authoriseBind(AbstractExchange.this, queue, bindingKey);
-
if (id == null)
{
id = UUIDGenerator.generateBindingUUID(getName(),
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Fri Feb 21 01:15:30 2014
@@ -116,8 +116,6 @@ public class DefaultExchangeFactory impl
public Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete)
throws AMQUnknownExchangeType
{
- // Check access
- _host.getSecurityManager().authoriseCreateExchange(autoDelete, durable, exchange, null, null, null, type);
ExchangeType<? extends Exchange> exchType = _exchangeClassMap.get(type);
if (exchType == null)
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java Fri Feb 21 01:15:30 2014
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1570411&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Fri Feb 21 01:15:30 2014
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.message.MessageInstance;
+
+interface QueueConsumer<T extends ConsumerTarget, E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends Consumer
+{
+
+ void flushBatched();
+
+ void queueEmpty();
+
+ boolean hasInterest(E node);
+
+ boolean wouldSuspend(E entry);
+
+ void restoreCredit(E entry);
+
+ void send(E entry, boolean batch);
+
+ void queueDeleted();
+
+ SubFlushRunner getRunner();
+
+ Q getQueue();
+
+ boolean resend(E e);
+
+ public static enum State
+ {
+ ACTIVE,
+ SUSPENDED,
+ CLOSED
+ }
+
+ MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> getOwningState();
+
+ QueueContext<E,Q,L> getQueueContext();
+}
Copied: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (from r1569102, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?p2=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java&p1=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java&r1=1569102&r2=1570411&rev=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Fri Feb 21 01:15:30 2014
@@ -29,6 +29,7 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
@@ -38,6 +39,7 @@ import org.apache.qpid.server.util.State
import java.text.MessageFormat;
import java.util.EnumMap;
+import java.util.EnumSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
@@ -45,17 +47,11 @@ import java.util.concurrent.locks.Reentr
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
-class QueueConsumer<T extends ConsumerTarget, E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements Consumer
+class QueueConsumerImpl<T extends ConsumerTarget, E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements QueueConsumer<T,E,Q,L>
{
- public static enum State
- {
- ACTIVE,
- SUSPENDED,
- CLOSED
- }
- private static final Logger _logger = Logger.getLogger(QueueConsumer.class);
+ private static final Logger _logger = Logger.getLogger(QueueConsumerImpl.class);
private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final long _id;
@@ -71,7 +67,7 @@ class QueueConsumer<T extends ConsumerTa
private final FilterManager _filters;
private final Class<? extends ServerMessage> _messageClass;
private final Object _sessionReference;
- private Q _queue;
+ private final Q _queue;
private GenericActor _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId())
+ "(UNKNOWN)"
+ "] ");
@@ -89,32 +85,39 @@ class QueueConsumer<T extends ConsumerTa
private final T _target;
private final SubFlushRunner _runner = new SubFlushRunner(this);
private volatile QueueContext<E,Q,L> _queueContext;
- private StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> _stateListener = new StateChangeListener<QueueConsumer<T,E,Q,L>, State>()
+ private StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> _stateListener = new StateChangeListener<QueueConsumerImpl<T,E,Q,L>, State>()
{
- public void stateChanged(QueueConsumer sub, State oldState, State newState)
+ public void stateChanged(QueueConsumerImpl sub, State oldState, State newState)
{
CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
}
};
- private boolean _noLocal;
+ private final boolean _noLocal;
- QueueConsumer(final FilterManager filters,
- final Class<? extends ServerMessage> messageClass,
- final boolean acquires,
- final boolean seesRequeues,
- final String consumerName,
- final boolean isTransient,
- T target)
+ QueueConsumerImpl(final Q queue,
+ T target, final String consumerName,
+ final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ EnumSet<Option> optionSet)
{
+
_messageClass = messageClass;
_sessionReference = target.getSessionModel().getConnectionReference();
_id = SUB_ID_GENERATOR.getAndIncrement();
_filters = filters;
- _acquires = acquires;
- _seesRequeues = seesRequeues;
+ _acquires = optionSet.contains(Option.ACQUIRES);
+ _seesRequeues = optionSet.contains(Option.SEES_REQUEUES);
_consumerName = consumerName;
- _isTransient = isTransient;
+ _isTransient = optionSet.contains(Option.TRANSIENT);
_target = target;
+ _queue = queue;
+ _noLocal = optionSet.contains(Option.NO_LOCAL);
+ setupLogging(optionSet.contains(Option.EXCLUSIVE));
+
+ // Access control
+ _queue.getVirtualHost().getSecurityManager().authoriseCreateConsumer(this);
+
+
_target.setStateListener(
new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
{
@@ -149,7 +152,7 @@ class QueueConsumer<T extends ConsumerTa
{
close();
}
- final StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> stateListener = getStateListener();
+ final StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> stateListener = getStateListener();
if(stateListener != null)
{
stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState));
@@ -186,6 +189,12 @@ class QueueConsumer<T extends ConsumerTa
}
@Override
+ public MessageSource getMessageSource()
+ {
+ return _queue;
+ }
+
+ @Override
public boolean isSuspended()
{
return _target.isSuspended();
@@ -211,27 +220,27 @@ class QueueConsumer<T extends ConsumerTa
}
}
- void flushBatched()
+ public void flushBatched()
{
_target.flushBatched();
}
- void queueDeleted()
+ public void queueDeleted()
{
_target.queueDeleted();
}
- boolean wouldSuspend(final MessageInstance msg)
+ public boolean wouldSuspend(final E msg)
{
return !_target.allocateCredit(msg.getMessage());
}
- void restoreCredit(final MessageInstance queueEntry)
+ public void restoreCredit(final E queueEntry)
{
_target.restoreCredit(queueEntry.getMessage());
}
- void queueEmpty()
+ public void queueEmpty()
{
_target.queueEmpty();
}
@@ -246,14 +255,8 @@ class QueueConsumer<T extends ConsumerTa
return _queue;
}
- final void setQueue(Q queue, boolean exclusive)
+ private void setupLogging(final boolean exclusive)
{
- if(getQueue() != null)
- {
- throw new IllegalStateException("Attempt to set queue for consumer " + this + " to " + queue + "when already set to " + getQueue());
- }
- _queue = queue;
-
String queueString = new QueueLogSubject(_queue).toLogString();
_logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId())
@@ -268,7 +271,7 @@ class QueueConsumer<T extends ConsumerTa
if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, _logActor.getLogSubject(), SubscriptionMessages.CREATE_LOG_HIERARCHY))
{
final String filterLogString = getFilterLogString();
- CurrentActor.get().message(_logActor.getLogSubject(), SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive,
+ CurrentActor.get().message(_logActor.getLogSubject(), SubscriptionMessages.CREATE(filterLogString, _queue.isDurable() && exclusive,
filterLogString.length() > 0));
}
}
@@ -290,12 +293,12 @@ class QueueConsumer<T extends ConsumerTa
getQueue().flushConsumer(this);
}
- boolean resend(final E entry)
+ public boolean resend(final E entry)
{
return getQueue().resend(entry, this);
}
- final SubFlushRunner getRunner()
+ public final SubFlushRunner getRunner()
{
return _runner;
}
@@ -305,17 +308,17 @@ class QueueConsumer<T extends ConsumerTa
return _id;
}
- public final StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> getStateListener()
+ public final StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> getStateListener()
{
return _stateListener;
}
- public final void setStateListener(StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> listener)
+ public final void setStateListener(StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> listener)
{
_stateListener = listener;
}
- final QueueContext<E,Q,L> getQueueContext()
+ public final QueueContext<E,Q,L> getQueueContext()
{
return _queueContext;
}
@@ -335,11 +338,6 @@ class QueueConsumer<T extends ConsumerTa
return getState() == State.CLOSED;
}
- public final void setNoLocal(boolean noLocal)
- {
- _noLocal = noLocal;
- }
-
public final boolean hasInterest(E entry)
{
//check that the message hasn't been rejected
@@ -415,7 +413,7 @@ class QueueConsumer<T extends ConsumerTa
return _createTime;
}
- final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> getOwningState()
+ public final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> getOwningState()
{
return _owningState;
}
@@ -450,7 +448,7 @@ class QueueConsumer<T extends ConsumerTa
return _deliveredCount.longValue();
}
- final void send(final E entry, final boolean batch)
+ public final void send(final E entry, final boolean batch)
{
_deliveredCount.incrementAndGet();
ServerMessage message = entry.getMessage();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Fri Feb 21 01:15:30 2014
@@ -18,7 +18,6 @@
*/
package org.apache.qpid.server.queue;
-import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.Principal;
import java.util.*;
@@ -192,8 +191,19 @@ abstract class SimpleAMQQueue<E extends
Map<String, Object> attributes,
QueueEntryListFactory<E, Q, L> entryListFactory)
{
+ if (virtualHost == null)
+ {
+ throw new IllegalArgumentException("Virtual Host must not be null");
+ }
+
UUID id = MapValueConverter.getUUIDAttribute(Queue.ID, attributes);
String name = MapValueConverter.getStringAttribute(Queue.NAME, attributes);
+
+ if (name == null)
+ {
+ throw new IllegalArgumentException("Queue name must not be null");
+ }
+
boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE,attributes,false);
@@ -206,6 +216,30 @@ abstract class SimpleAMQQueue<E extends
attributes,
LifetimePolicy.PERMANENT);
+
+ _name = name;
+ _durable = durable;
+ _virtualHost = virtualHost;
+ _entries = entryListFactory.createQueueEntryList((Q) this);
+ final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes);
+
+ arguments.put(Queue.EXCLUSIVE, _exclusivityPolicy);
+ arguments.put(Queue.LIFETIME_POLICY, _lifetimePolicy);
+
+ _arguments = Collections.synchronizedMap(arguments);
+ _description = MapValueConverter.getStringAttribute(Queue.DESCRIPTION, attributes, null);
+
+ _noLocal = MapValueConverter.getBooleanAttribute(Queue.NO_LOCAL, attributes, false);
+
+
+ _id = id;
+ _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
+
+ _logSubject = new QueueLogSubject(this);
+ _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger());
+
+ virtualHost.getSecurityManager().authoriseCreateQueue(this);
+
Subject activeSubject = Subject.getSubject(AccessController.getContext());
Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
AMQSessionModel<?,?> sessionModel;
@@ -294,36 +328,7 @@ abstract class SimpleAMQQueue<E extends
}
}
- if (name == null)
- {
- throw new IllegalArgumentException("Queue name must not be null");
- }
-
- if (virtualHost == null)
- {
- throw new IllegalArgumentException("Virtual Host must not be null");
- }
-
- _name = name;
- _durable = durable;
- _virtualHost = virtualHost;
- _entries = entryListFactory.createQueueEntryList((Q) this);
- final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes);
- arguments.put(Queue.EXCLUSIVE, _exclusivityPolicy);
- arguments.put(Queue.LIFETIME_POLICY, _lifetimePolicy);
-
- _arguments = Collections.synchronizedMap(arguments);
- _description = MapValueConverter.getStringAttribute(Queue.DESCRIPTION, attributes, null);
-
- _noLocal = MapValueConverter.getBooleanAttribute(Queue.NO_LOCAL, attributes, false);
-
-
- _id = id;
- _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
-
- _logSubject = new QueueLogSubject(this);
- _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger());
if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_AGE))
@@ -593,40 +598,38 @@ abstract class SimpleAMQQueue<E extends
ConsumerAccessRefused
{
- // Access control
- getVirtualHost().getSecurityManager().authoriseConsume(this);
-
if (hasExclusiveConsumer())
{
throw new ExistingExclusiveConsumer();
}
+ Object exclusiveOwner = _exclusiveOwner;
switch(_exclusivityPolicy)
{
case CONNECTION:
- if(_exclusiveOwner == null)
+ if(exclusiveOwner == null)
{
- _exclusiveOwner = target.getSessionModel().getConnectionModel();
+ exclusiveOwner = target.getSessionModel().getConnectionModel();
addExclusivityConstraint(target.getSessionModel().getConnectionModel());
}
else
{
- if(_exclusiveOwner != target.getSessionModel().getConnectionModel())
+ if(exclusiveOwner != target.getSessionModel().getConnectionModel())
{
throw new ConsumerAccessRefused();
}
}
break;
case SESSION:
- if(_exclusiveOwner == null)
+ if(exclusiveOwner == null)
{
- _exclusiveOwner = target.getSessionModel();
+ exclusiveOwner = target.getSessionModel();
addExclusivityConstraint(target.getSessionModel());
}
else
{
- if(_exclusiveOwner != target.getSessionModel())
+ if(exclusiveOwner != target.getSessionModel())
{
throw new ConsumerAccessRefused();
}
@@ -639,26 +642,26 @@ abstract class SimpleAMQQueue<E extends
}
break;
case PRINCIPAL:
- if(_exclusiveOwner == null)
+ if(exclusiveOwner == null)
{
- _exclusiveOwner = target.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
+ exclusiveOwner = target.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
}
else
{
- if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getAuthorizedPrincipal()))
+ if(!exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getAuthorizedPrincipal()))
{
throw new ConsumerAccessRefused();
}
}
break;
case CONTAINER:
- if(_exclusiveOwner == null)
+ if(exclusiveOwner == null)
{
- _exclusiveOwner = target.getSessionModel().getConnectionModel().getRemoteContainerName();
+ exclusiveOwner = target.getSessionModel().getConnectionModel().getRemoteContainerName();
}
else
{
- if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getRemoteContainerName()))
+ if(!exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getRemoteContainerName()))
{
throw new ConsumerAccessRefused();
}
@@ -673,15 +676,24 @@ abstract class SimpleAMQQueue<E extends
boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE);
boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT);
+ if(_noLocal && !optionSet.contains(Consumer.Option.NO_LOCAL))
+ {
+ optionSet = EnumSet.copyOf(optionSet);
+ optionSet.add(Consumer.Option.NO_LOCAL);
+ }
+
if(exclusive && getConsumerCount() != 0)
{
throw new ExistingConsumerPreventsExclusive();
}
- QueueConsumer<T,E,Q,L> consumer = new QueueConsumer<T,E,Q,L>(filters, messageClass,
- optionSet.contains(Consumer.Option.ACQUIRES),
- optionSet.contains(Consumer.Option.SEES_REQUEUES),
- consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target);
+ QueueConsumerImpl<T,E,Q,L> consumer = new QueueConsumerImpl<T,E,Q,L>((Q)this,
+ target,
+ consumerName,
+ filters, messageClass,
+ optionSet);
+
+ _exclusiveOwner = exclusiveOwner;
target.consumerAdded(consumer);
@@ -700,12 +712,6 @@ abstract class SimpleAMQQueue<E extends
if (!isDeleted())
{
- consumer.setQueue((Q)this, exclusive);
- if(_noLocal)
- {
- consumer.setNoLocal(true);
- }
-
synchronized (_consumerListeners)
{
for(ConsumerRegistrationListener<Q> listener : _consumerListeners)
@@ -732,7 +738,7 @@ abstract class SimpleAMQQueue<E extends
}
- synchronized void unregisterConsumer(final QueueConsumer<?,E,Q,L> consumer)
+ synchronized void unregisterConsumer(final QueueConsumerImpl<?,E,Q,L> consumer)
{
if (consumer == null)
{
@@ -1021,7 +1027,7 @@ abstract class SimpleAMQQueue<E extends
{
return true;
}
- QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry);
+ QueueConsumer<?,E,Q,L> assigned = _messageGroupManager.getAssignedConsumer(entry);
return (assigned == null) || (assigned == sub);
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Fri Feb 21 01:15:30 2014
@@ -22,7 +22,6 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Fri Feb 21 01:15:30 2014
@@ -36,7 +36,7 @@ class SubFlushRunner implements Runnable
private static final Logger _logger = Logger.getLogger(SubFlushRunner.class);
- private final QueueConsumer _sub;
+ private final QueueConsumerImpl _sub;
private static int IDLE = 0;
private static int SCHEDULED = 1;
@@ -49,7 +49,7 @@ class SubFlushRunner implements Runnable
private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
private final AtomicBoolean _stateChange = new AtomicBoolean();
- public SubFlushRunner(QueueConsumer sub)
+ public SubFlushRunner(QueueConsumerImpl sub)
{
_sub = sub;
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java Fri Feb 21 01:15:30 2014
@@ -20,15 +20,14 @@ package org.apache.qpid.server.security;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.model.AccessControlProvider;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.ConfigurationChangeListener;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.*;
import org.apache.qpid.server.plugin.AccessControlFactory;
import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.access.FileAccessControlProviderConstants;
import org.apache.qpid.server.security.access.ObjectProperties;
@@ -253,20 +252,24 @@ public class SecurityManager implements
return true;
}
- public void authoriseBind(final Exchange exch, final AMQQueue queue, final String routingKey)
+ public void authoriseCreateBinding(Binding binding)
{
+ final Exchange exch = binding.getExchange();
+ final AMQQueue queue = binding.getQueue();
+ final String bindingKey = binding.getBindingKey();
+
boolean allowed =
checkAllPlugins(new AccessCheck()
{
Result allowed(AccessControl plugin)
{
- return plugin.authorise(BIND, EXCHANGE, new ObjectProperties(exch, queue, routingKey));
+ return plugin.authorise(BIND, EXCHANGE, new ObjectProperties(exch, queue, bindingKey));
}
});
if(!allowed)
{
- throw new AccessControlException("Permission denied: binding " + routingKey);
+ throw new AccessControlException("Permission denied: binding " + bindingKey);
}
}
@@ -306,7 +309,7 @@ public class SecurityManager implements
}
}
- public void accessVirtualhost(final String vhostname)
+ public void authoriseCreateConnection(final AMQConnectionModel connection)
{
if(!checkAllPlugins(new AccessCheck()
{
@@ -316,12 +319,15 @@ public class SecurityManager implements
}
}))
{
- throw new AccessControlException("Permission denied: " + vhostname);
+ throw new AccessControlException("Permission denied: " + connection.getVirtualHostName());
}
}
- public void authoriseConsume(final AMQQueue queue)
+ public void authoriseCreateConsumer(final Consumer consumer)
{
+ // TODO
+ final AMQQueue queue = (AMQQueue) consumer.getMessageSource();
+
if(!checkAllPlugins(new AccessCheck()
{
Result allowed(AccessControl plugin)
@@ -334,20 +340,17 @@ public class SecurityManager implements
}
}
- public void authoriseCreateExchange(final Boolean autoDelete,
- final Boolean durable,
- final String exchangeName,
- final Boolean internal,
- final Boolean nowait,
- final Boolean passive,
- final String exchangeType)
+ public void authoriseCreateExchange(final Exchange exchange)
{
+ final String exchangeName = exchange.getName();
if(!checkAllPlugins(new AccessCheck()
{
Result allowed(AccessControl plugin)
{
- return plugin.authorise(CREATE, EXCHANGE, new ObjectProperties(autoDelete, durable, exchangeName,
- internal, nowait, passive, exchangeType));
+ return plugin.authorise(CREATE, EXCHANGE, new ObjectProperties(exchange.isAutoDelete(),
+ exchange.isDurable(),
+ exchangeName,
+ exchange.getTypeName()));
}
}))
{
@@ -355,14 +358,18 @@ public class SecurityManager implements
}
}
- public void authoriseCreateQueue(final Boolean autoDelete, final Boolean durable, final Boolean exclusive,
- final Boolean nowait, final Boolean passive, final String queueName, final String owner)
+ public void authoriseCreateQueue(final AMQQueue queue)
{
+ final String queueName = queue.getName();
if(! checkAllPlugins(new AccessCheck()
{
Result allowed(AccessControl plugin)
{
- return plugin.authorise(CREATE, QUEUE, new ObjectProperties(autoDelete, durable, exclusive, nowait, passive, queueName, owner));
+ return plugin.authorise(CREATE, QUEUE, new ObjectProperties(queue.getAttribute(Queue.LIFETIME_POLICY) != LifetimePolicy.PERMANENT,
+ Boolean.TRUE.equals(queue.getAttribute(Queue.DURABLE)),
+ queue.getAttribute(Queue.EXCLUSIVE) != ExclusivityPolicy.NONE,
+ queueName,
+ queue.getOwner()));
}
}))
{
@@ -370,6 +377,7 @@ public class SecurityManager implements
}
}
+
public void authoriseDelete(final AMQQueue queue)
{
if(!checkAllPlugins(new AccessCheck()
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java Fri Feb 21 01:15:30 2014
@@ -182,7 +182,7 @@ public class ObjectProperties
}
public ObjectProperties(Boolean autoDelete, Boolean durable, String exchangeName,
- Boolean internal, Boolean nowait, Boolean passive, String exchangeType)
+ String exchangeType)
{
super();
@@ -191,14 +191,11 @@ public class ObjectProperties
put(Property.AUTO_DELETE, autoDelete);
put(Property.TEMPORARY, autoDelete);
put(Property.DURABLE, durable);
- put(Property.INTERNAL, internal);
- put(Property.NO_WAIT, nowait);
- put(Property.PASSIVE, passive);
put(Property.TYPE, exchangeType);
}
- public ObjectProperties(Boolean autoDelete, Boolean durable, Boolean exclusive, Boolean nowait, Boolean passive,
- String queueName, String owner)
+ public ObjectProperties(Boolean autoDelete, Boolean durable, Boolean exclusive,
+ String queueName, String owner)
{
super();
@@ -208,8 +205,6 @@ public class ObjectProperties
put(Property.TEMPORARY, autoDelete);
put(Property.DURABLE, durable);
put(Property.EXCLUSIVE, exclusive);
- put(Property.NO_WAIT, nowait);
- put(Property.PASSIVE, passive);
put(Property.OWNER, owner);
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Fri Feb 21 01:15:30 2014
@@ -550,15 +550,6 @@ public abstract class AbstractVirtualHos
ExclusivityPolicy exclusive = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class,Queue.EXCLUSIVE, attributes, ExclusivityPolicy.NONE);
String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null);
- // Access check
- getSecurityManager().authoriseCreateQueue(autoDelete,
- durable,
- exclusive != null && exclusive != ExclusivityPolicy.NONE,
- null,
- null,
- queueName,
- owner);
-
synchronized (_queueRegistry)
{
if(_queueRegistry.getQueue(queueName) != null)
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java Fri Feb 21 01:15:30 2014
@@ -25,12 +25,17 @@ import junit.framework.TestCase;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.security.*;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
*/
public class HeadersBindingTest extends TestCase
@@ -73,7 +78,7 @@ public class HeadersBindingTest extends
public String getEncoding()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public byte getPriority()
@@ -131,12 +136,15 @@ public class HeadersBindingTest extends
private Map<String,Object> bindHeaders = new HashMap<String,Object>();
private MockHeader matchHeaders = new MockHeader();
private int _count = 0;
- private MockAMQQueue _queue;
+ private AMQQueue _queue;
protected void setUp()
{
_count++;
- _queue = new MockAMQQueue(getQueueName());
+ _queue = mock(AMQQueue.class);
+ VirtualHost vhost = mock(VirtualHost.class);
+ when(_queue.getVirtualHost()).thenReturn(vhost);
+ when(vhost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
}
protected String getQueueName()
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java Fri Feb 21 01:15:30 2014
@@ -22,10 +22,12 @@ package org.apache.qpid.server.logging.s
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* Validate BindingLogSubjects are logged as expected
*/
@@ -45,8 +47,9 @@ public class BindingLogSubjectTest exten
_testVhost = BrokerTestHelper.createVirtualHost("test");
_routingKey = "RoutingKey";
_exchange = _testVhost.getExchange("amq.direct");
- _queue = new MockAMQQueue("BindingLogSubjectTest");
- ((MockAMQQueue) _queue).setVirtualHost(_testVhost);
+ _queue = mock(AMQQueue.class);
+ when(_queue.getName()).thenReturn("BindingLogSubjectTest");
+ when(_queue.getVirtualHost()).thenReturn(_testVhost);
_subject = new BindingLogSubject(_routingKey, _exchange, _queue);
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java Fri Feb 21 01:15:30 2014
@@ -21,10 +21,12 @@
package org.apache.qpid.server.logging.subjects;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* Validate QueueLogSubjects are logged as expected
*/
@@ -41,8 +43,9 @@ public class QueueLogSubjectTest extends
_testVhost = BrokerTestHelper.createVirtualHost("test");
- _queue = new MockAMQQueue("QueueLogSubjectTest");
- ((MockAMQQueue) _queue).setVirtualHost(_testVhost);
+ _queue = mock(AMQQueue.class);
+ when(_queue.getName()).thenReturn("QueueLogSubjectTest");
+ when(_queue.getVirtualHost()).thenReturn(_testVhost);
_subject = new QueueLogSubject(_queue);
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java Fri Feb 21 01:15:30 2014
@@ -43,6 +43,8 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.security.*;
+import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -66,6 +68,7 @@ public class AMQQueueFactoryTest extends
_queues = new ArrayList<AMQQueue>();
_virtualHost = mock(VirtualHost.class);
+ when(_virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class));
VirtualHostConfiguration vhostConfig = mock(VirtualHostConfiguration.class);
when(_virtualHost.getConfiguration()).thenReturn(vhostConfig);
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java Fri Feb 21 01:15:30 2014
@@ -27,6 +27,8 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.security.*;
+import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.HashMap;
@@ -52,7 +54,10 @@ public class ConflationQueueListTest ext
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.LVQ_KEY, CONFLATION_KEY);
- _queue = new ConflationQueue(mock(VirtualHost.class), queueAttributes);
+ final VirtualHost virtualHost = mock(VirtualHost.class);
+ when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class));
+
+ _queue = new ConflationQueue(virtualHost, queueAttributes);
_list = _queue.getEntries();
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java Fri Feb 21 01:15:30 2014
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -55,9 +56,11 @@ public class ConsumerListTest extends Qp
private QueueConsumer newMockConsumer()
{
- ConsumerTarget target = mock(ConsumerTarget.class);
- when(target.getSessionModel()).thenReturn(mock(AMQSessionModel.class));
- return new QueueConsumer(null,null,true,true,"sub",false,target);
+ QueueConsumer consumer = mock(QueueConsumer.class);
+ MessageInstance.ConsumerAcquiredState owningState = new QueueEntryImpl.ConsumerAcquiredState(consumer);
+ when(consumer.getOwningState()).thenReturn(owningState);
+
+ return consumer;
}
/**
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java Fri Feb 21 01:15:30 2014
@@ -27,6 +27,8 @@ import org.apache.qpid.server.message.AM
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
+
+import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -51,7 +53,9 @@ public class PriorityQueueListTest exten
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.PRIORITIES, 10);
- PriorityQueue queue = new PriorityQueue(mock(VirtualHost.class), queueAttributes);
+ final VirtualHost virtualHost = mock(VirtualHost.class);
+ when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class));
+ PriorityQueue queue = new PriorityQueue(virtualHost, queueAttributes);
_list = queue.getEntries();
for (int i = 0; i < PRIORITIES.length; i++)
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Fri Feb 21 01:15:30 2014
@@ -24,11 +24,13 @@ import org.apache.qpid.server.consumer.C
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageInstance.EntryState;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.security.*;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.lang.reflect.Field;
@@ -48,6 +50,7 @@ public abstract class QueueEntryImplTest
protected QueueEntryImpl _queueEntry;
protected QueueEntryImpl _queueEntry2;
protected QueueEntryImpl _queueEntry3;
+ private long _consumerId;
public abstract QueueEntryImpl getQueueEntryImpl(int msgId);
@@ -136,9 +139,11 @@ public abstract class QueueEntryImplTest
private QueueConsumer newConsumer()
{
- final ConsumerTarget target = mock(ConsumerTarget.class);
- when(target.getSessionModel()).thenReturn(mock(AMQSessionModel.class));
- final QueueConsumer consumer = new QueueConsumer(null,null,true,true,"mock",false,target);
+ final QueueConsumer consumer = mock(QueueConsumer.class);
+
+ MessageInstance.ConsumerAcquiredState owningState = new QueueEntryImpl.ConsumerAcquiredState(consumer);
+ when(consumer.getOwningState()).thenReturn(owningState);
+ when(consumer.getId()).thenReturn(_consumerId++);
return consumer;
}
@@ -204,7 +209,10 @@ public abstract class QueueEntryImplTest
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getName());
- StandardQueue queue = new StandardQueue(mock(VirtualHost.class), queueAttributes);
+ final VirtualHost virtualHost = mock(VirtualHost.class);
+ when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
+
+ StandardQueue queue = new StandardQueue(virtualHost, queueAttributes);
OrderedQueueEntryList queueEntryList = queue.getEntries();
// create test entries
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java Fri Feb 21 01:15:30 2014
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.security.*;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.HashMap;
@@ -44,7 +45,10 @@ public class SimpleQueueEntryImplTest ex
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, "SimpleQueueEntryImplTest");
- StandardQueue queue = new StandardQueue(mock(VirtualHost.class), queueAttributes);
+ final VirtualHost virtualHost = mock(VirtualHost.class);
+ when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
+
+ StandardQueue queue = new StandardQueue(virtualHost, queueAttributes);
queueEntryList = queue.getEntries();
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java Fri Feb 21 01:15:30 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.security.*;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Arrays;
@@ -86,7 +87,10 @@ public class SortedQueueEntryListTest ex
attributes.put(Queue.SORT_KEY, "KEY");
// Create test list
- _testQueue = new SortedQueue(mock(VirtualHost.class), attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
+ final VirtualHost virtualHost = mock(VirtualHost.class);
+ when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
+
+ _testQueue = new SortedQueue(virtualHost, attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
{
@Override
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java Fri Feb 21 01:15:30 2014
@@ -29,6 +29,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.security.*;
import org.apache.qpid.server.virtualhost.VirtualHost;
import static org.mockito.Matchers.eq;
@@ -54,7 +55,10 @@ public class SortedQueueEntryTest extend
attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
attributes.put(Queue.SORT_KEY, "KEY");
- SortedQueue queue = new SortedQueue(mock(VirtualHost.class), attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
+ final VirtualHost virtualHost = mock(VirtualHost.class);
+ when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
+
+ SortedQueue queue = new SortedQueue(virtualHost, attributes, new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
{
@Override
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java Fri Feb 21 01:15:30 2014
@@ -20,9 +20,14 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.security.*;
+import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.HashMap;
@@ -50,7 +55,12 @@ public class StandardQueueEntryListTest
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getName());
- _testQueue = new StandardQueue(mock(VirtualHost.class), queueAttributes);
+ final VirtualHost virtualHost = mock(VirtualHost.class);
+ when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
+ final LogActor logActor = mock(LogActor.class);
+ when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class));
+ CurrentActor.set(logActor);
+ _testQueue = new StandardQueue(virtualHost, queueAttributes);
_sqel = _testQueue.getEntries();
for(int i = 1; i <= 100; i++)
@@ -93,7 +103,9 @@ public class StandardQueueEntryListTest
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getName());
- StandardQueue queue = new StandardQueue(mock(VirtualHost.class), queueAttributes);
+ final VirtualHost virtualHost = mock(VirtualHost.class);
+ when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class));
+ StandardQueue queue = new StandardQueue(virtualHost, queueAttributes);
return queue.getEntries();
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java Fri Feb 21 01:15:30 2014
@@ -47,6 +47,7 @@ import org.apache.qpid.server.model.Queu
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.server.store.Transaction.Record;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -103,6 +104,7 @@ public abstract class AbstractDurableCon
_queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class);
_dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class);
_virtualHost = mock(VirtualHost.class);
+ when(_virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler);
when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler);
@@ -362,6 +364,9 @@ public abstract class AbstractDurableCon
when(queue.isExclusive()).thenReturn(exclusive);
when(queue.getId()).thenReturn(_queueId);
when(queue.getAlternateExchange()).thenReturn(alternateExchange);
+ final org.apache.qpid.server.virtualhost.VirtualHost vh = mock(org.apache.qpid.server.virtualhost.VirtualHost.class);
+ when(vh.getSecurityManager()).thenReturn(mock(SecurityManager.class));
+ when(queue.getVirtualHost()).thenReturn(vh);
final Map<String,Object> attributes = arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments);
attributes.put(Queue.NAME, queueName);
if(exclusive)
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java Fri Feb 21 01:15:30 2014
@@ -22,8 +22,7 @@ package org.apache.qpid.server.txn;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.MockMessageInstance;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -34,6 +33,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* A unit test ensuring that AutoCommitTransaction creates a separate transaction for
* each dequeue/enqueue operation that involves enlistable messages. Verifies
@@ -46,8 +48,8 @@ public class AutoCommitTransactionTest e
private ServerTransaction _transaction = null; // Class under test
private MessageStore _transactionLog;
- private AMQQueue _queue;
- private List<AMQQueue> _queues;
+ private BaseQueue _queue;
+ private List<BaseQueue> _queues;
private Collection<MessageInstance> _queueEntries;
private ServerMessage _message;
private MockAction _action;
@@ -382,7 +384,7 @@ public class AutoCommitTransactionTest e
for(int i = 0; i < queueDurableFlags.length; i++)
{
- final AMQQueue queue = createTestAMQQueue(queueDurableFlags[i]);
+ final BaseQueue queue = createTestAMQQueue(queueDurableFlags[i]);
final ServerMessage message = createTestMessage(messagePersistentFlags[i]);
queueEntries.add(new MockMessageInstance()
@@ -411,9 +413,9 @@ public class AutoCommitTransactionTest e
return new MockStoreTransaction(throwException);
}
- private List<AMQQueue> createTestBaseQueues(boolean[] durableFlags)
+ private List<BaseQueue> createTestBaseQueues(boolean[] durableFlags)
{
- List<AMQQueue> queues = new ArrayList<AMQQueue>();
+ List<BaseQueue> queues = new ArrayList<BaseQueue>();
for (boolean b: durableFlags)
{
queues.add(createTestAMQQueue(b));
@@ -422,17 +424,11 @@ public class AutoCommitTransactionTest e
return queues;
}
- private AMQQueue createTestAMQQueue(final boolean durable)
+ private BaseQueue createTestAMQQueue(final boolean durable)
{
- return new MockAMQQueue("mockQueue")
- {
- @Override
- public boolean isDurable()
- {
- return durable;
- }
-
- };
+ BaseQueue queue = mock(BaseQueue.class);
+ when(queue.isDurable()).thenReturn(durable);
+ return queue;
}
private ServerMessage createTestMessage(final boolean persistent)
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java Fri Feb 21 01:15:30 2014
@@ -22,8 +22,7 @@ package org.apache.qpid.server.txn;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MockAMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.MockMessageInstance;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -34,6 +33,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* A unit test ensuring that LocalTransactionTest creates a long-lived store transaction
* that spans many dequeue/enqueue operations of enlistable messages. Verifies
@@ -45,8 +47,8 @@ public class LocalTransactionTest extend
{
private ServerTransaction _transaction = null; // Class under test
- private AMQQueue _queue;
- private List<AMQQueue> _queues;
+ private BaseQueue _queue;
+ private List<BaseQueue> _queues;
private Collection<MessageInstance> _queueEntries;
private ServerMessage _message;
private MockAction _action1;
@@ -77,7 +79,7 @@ public class LocalTransactionTest extend
public void testEnqueueToNonDurableQueueOfNonPersistentMessage() throws Exception
{
_message = createTestMessage(false);
- _queue = createTestAMQQueue(false);
+ _queue = createQueue(false);
_transaction.enqueue(_queue, _message, _action1);
@@ -93,7 +95,7 @@ public class LocalTransactionTest extend
public void testEnqueueToDurableQueueOfPersistentMessage() throws Exception
{
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
_transaction.enqueue(_queue, _message, _action1);
@@ -109,7 +111,7 @@ public class LocalTransactionTest extend
public void testStoreEnqueueCausesException() throws Exception
{
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
_storeTransaction = createTestStoreTransaction(true);
_transactionLog = MockStoreTransaction.createTestTransactionLog(_storeTransaction);
@@ -217,7 +219,7 @@ public class LocalTransactionTest extend
public void testDequeueFromNonDurableQueueOfNonPersistentMessage() throws Exception
{
_message = createTestMessage(false);
- _queue = createTestAMQQueue(false);
+ _queue = createQueue(false);
_transaction.dequeue(_queue, _message, _action1);
@@ -234,7 +236,7 @@ public class LocalTransactionTest extend
public void testDequeueFromDurableQueueOfPersistentMessage() throws Exception
{
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
_transaction.dequeue(_queue, _message, _action1);
@@ -250,7 +252,7 @@ public class LocalTransactionTest extend
public void testStoreDequeueCausesException() throws Exception
{
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
_storeTransaction = createTestStoreTransaction(true);
_transactionLog = MockStoreTransaction.createTestTransactionLog(_storeTransaction);
@@ -396,7 +398,7 @@ public class LocalTransactionTest extend
{
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
assertFalse("Post commit action must not be fired yet", _action1.isPostCommitActionFired());
@@ -419,7 +421,7 @@ public class LocalTransactionTest extend
{
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -445,7 +447,7 @@ public class LocalTransactionTest extend
{
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
_transaction.addPostTransactionAction(_action1);
_transaction.dequeue(_queue, _message, _action2);
@@ -467,7 +469,7 @@ public class LocalTransactionTest extend
public void testRollbackWorkWithAdditionalPostAction() throws Exception
{
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
_transaction.addPostTransactionAction(_action1);
_transaction.dequeue(_queue, _message, _action2);
@@ -488,7 +490,7 @@ public class LocalTransactionTest extend
assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
long startTime = System.currentTimeMillis();
_transaction.enqueue(_queue, _message, _action1);
@@ -503,7 +505,7 @@ public class LocalTransactionTest extend
assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
_transaction.enqueue(_queue, _message, _action1);
@@ -526,7 +528,7 @@ public class LocalTransactionTest extend
assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
long startTime = System.currentTimeMillis();
_transaction.dequeue(_queue, _message, _action1);
@@ -541,7 +543,7 @@ public class LocalTransactionTest extend
assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
_transaction.enqueue(_queue, _message, _action1);
@@ -564,7 +566,7 @@ public class LocalTransactionTest extend
assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
long startTime = System.currentTimeMillis();
_transaction.enqueue(_queue, _message, _action1);
@@ -584,7 +586,7 @@ public class LocalTransactionTest extend
assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
_message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
+ _queue = createQueue(true);
long startTime = System.currentTimeMillis();
_transaction.enqueue(_queue, _message, _action1);
@@ -606,7 +608,7 @@ public class LocalTransactionTest extend
for(int i = 0; i < queueDurableFlags.length; i++)
{
- final AMQQueue queue = createTestAMQQueue(queueDurableFlags[i]);
+ final TransactionLogResource queue = createQueue(queueDurableFlags[i]);
final ServerMessage message = createTestMessage(messagePersistentFlags[i]);
queueEntries.add(new MockMessageInstance()
@@ -635,28 +637,22 @@ public class LocalTransactionTest extend
return new MockStoreTransaction(throwException);
}
- private List<AMQQueue> createTestBaseQueues(boolean[] durableFlags)
+ private List<BaseQueue> createTestBaseQueues(boolean[] durableFlags)
{
- List<AMQQueue> queues = new ArrayList<AMQQueue>();
+ List<BaseQueue> queues = new ArrayList<BaseQueue>();
for (boolean b: durableFlags)
{
- queues.add(createTestAMQQueue(b));
+ queues.add(createQueue(b));
}
return queues;
}
- private AMQQueue createTestAMQQueue(final boolean durable)
+ private BaseQueue createQueue(final boolean durable)
{
- return new MockAMQQueue("mockQueue")
- {
- @Override
- public boolean isDurable()
- {
- return durable;
- }
-
- };
+ BaseQueue queue = mock(BaseQueue.class);
+ when(queue.isDurable()).thenReturn(durable);
+ return queue;
}
private ServerMessage createTestMessage(final boolean persistent)
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Fri Feb 21 01:15:30 2014
@@ -197,7 +197,7 @@ public class ServerConnectionDelegate ex
try
{
- vhost.getSecurityManager().accessVirtualhost(vhostName);
+ vhost.getSecurityManager().authoriseCreateConnection(sconn);
}
catch (AccessControlException e)
{
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java Fri Feb 21 01:15:30 2014
@@ -83,7 +83,7 @@ public class ConnectionOpenMethodHandler
// Check virtualhost access
try
{
- virtualHost.getSecurityManager().accessVirtualhost(virtualHostName);
+ virtualHost.getSecurityManager().authoriseCreateConnection(session);
}
catch (AccessControlException e)
{
Modified: qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1570411&r1=1570410&r2=1570411&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Fri Feb 21 01:15:30 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.server.managemen
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.util.StateChangeListener;
@@ -88,8 +89,9 @@ class ManagementNodeConsumer implements
}
@Override
- public void setNoLocal(final boolean noLocal)
+ public MessageSource getMessageSource()
{
+ return _managementNode;
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org