You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/03 03:15:01 UTC
svn commit: r1563758 [1/3] - in
/qpid/branches/java-broker-amqp-1-0-management/java:
broker-core/src/main/java/org/apache/qpid/server/exchange/
broker-core/src/main/java/org/apache/qpid/server/model/adapter/
broker-core/src/main/java/org/apache/qpid/se...
Author: rgodfrey
Date: Mon Feb 3 02:14:59 2014
New Revision: 1563758
URL: http://svn.apache.org/r1563758
Log:
Updates to subscription
Added:
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java
- copied, changed from r1563433, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
- copied, changed from r1563431, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
Removed:
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
Modified:
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java
qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Mon Feb 3 02:14:59 2014
@@ -39,8 +39,10 @@ import org.apache.qpid.server.model.UUID
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -66,7 +68,7 @@ public abstract class AbstractExchange i
private VirtualHost _virtualHost;
- private final List<Task> _closeTaskList = new CopyOnWriteArrayList<Task>();
+ private final List<Action<Exchange>> _closeTaskList = new CopyOnWriteArrayList<Action<Exchange>>();
/**
* Whether the exchange is automatically deleted once all queues have detached from it
@@ -138,6 +140,12 @@ public abstract class AbstractExchange i
if(_closed.compareAndSet(false,true))
{
+ List<Binding> bindings = new ArrayList<Binding>(_bindings);
+ for(Binding binding : bindings)
+ {
+ removeBinding(binding);
+ }
+
if(_alternateExchange != null)
{
_alternateExchange.removeReference(this);
@@ -145,9 +153,9 @@ public abstract class AbstractExchange i
CurrentActor.get().message(_logSubject, ExchangeMessages.DELETED());
- for(Task task : _closeTaskList)
+ for(Action<Exchange> task : _closeTaskList)
{
- task.onClose(this);
+ task.performAction(this);
}
_closeTaskList.clear();
}
@@ -300,12 +308,12 @@ public abstract class AbstractExchange i
return !_referrers.isEmpty();
}
- public void addCloseTask(final Task task)
+ public void addCloseTask(final Action<Exchange> task)
{
_closeTaskList.add(task);
}
- public void removeCloseTask(final Task task)
+ public void removeCloseTask(final Action<Exchange> task)
{
_closeTaskList.remove(task);
}
@@ -421,7 +429,7 @@ public abstract class AbstractExchange i
public final int send(final ServerMessage message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final BaseQueue.PostEnqueueAction postEnqueueAction)
+ final Action<QueueEntry> postEnqueueAction)
{
List<? extends BaseQueue> queues = route(message, instanceProperties);
@@ -579,8 +587,6 @@ public abstract class AbstractExchange i
{
doRemoveBinding(b);
queue.removeBinding(b);
- removeCloseTask(b);
- queue.removeQueueDeleteTask(b);
if (b.isDurable())
{
@@ -659,8 +665,6 @@ public abstract class AbstractExchange i
DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b);
}
- queue.addQueueDeleteTask(b);
- addCloseTask(b);
queue.addBinding(b);
doAddBinding(b);
b.logCreation();
@@ -673,7 +677,7 @@ public abstract class AbstractExchange i
}
}
- private final class BindingImpl extends Binding implements AMQQueue.Task, Task
+ private final class BindingImpl extends Binding
{
private final BindingLogSubject _logSubject;
//TODO : persist creation time
@@ -689,12 +693,6 @@ public abstract class AbstractExchange i
}
-
- public void doTask(final AMQQueue queue) throws AMQException
- {
- removeBinding(this);
- }
-
public void onClose(final Exchange exchange) throws AMQSecurityException, AMQInternalException
{
removeBinding(this);
@@ -729,11 +727,4 @@ public abstract class AbstractExchange i
}
- public static interface Task
- {
- public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException;
- }
-
-
-
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java Mon Feb 3 02:14:59 2014
@@ -42,8 +42,10 @@ import org.apache.qpid.server.model.UUID
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class DefaultExchange implements Exchange
@@ -334,7 +336,7 @@ public class DefaultExchange implements
public final int send(final ServerMessage message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final BaseQueue.PostEnqueueAction postEnqueueAction)
+ final Action<QueueEntry> postEnqueueAction)
{
final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
if(q == null)
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java Mon Feb 3 02:14:59 2014
@@ -29,7 +29,9 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -105,7 +107,7 @@ public interface Exchange extends Exchan
int send(ServerMessage message,
InstanceProperties instanceProperties,
ServerTransaction txn,
- BaseQueue.PostEnqueueAction postEnqueueAction);
+ Action<QueueEntry> postEnqueueAction);
/**
* Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java Mon Feb 3 02:14:59 2014
@@ -47,7 +47,7 @@ public class ConsumerAdapter extends Abs
queueAdapter.getName(),
subscription.getSessionModel().getConnectionModel().getRemoteAddressString(),
String.valueOf(subscription.getSessionModel().getChannelId()),
- subscription.getConsumerName()), queueAdapter.getTaskExecutor());
+ subscription.getName()), queueAdapter.getTaskExecutor());
_subscription = subscription;
_queue = queueAdapter;
_session = sessionAdapter;
@@ -57,7 +57,7 @@ public class ConsumerAdapter extends Abs
public String getName()
{
- return _subscription.getConsumerName();
+ return _subscription.getName();
}
public String setName(final String currentName, final String desiredName)
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Feb 3 02:14:59 2014
@@ -30,6 +30,7 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -132,8 +133,8 @@ public interface AMQQueue extends Compar
boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
- void addQueueDeleteTask(final Task task);
- void removeQueueDeleteTask(final Task task);
+ void addQueueDeleteTask(Action<AMQQueue> task);
+ void removeQueueDeleteTask(Action<AMQQueue> task);
@@ -271,11 +272,6 @@ public interface AMQQueue extends Compar
}
}
- static interface Task
- {
- public void doTask(AMQQueue queue) throws AMQException;
- }
-
void configure(QueueConfiguration config);
void setExclusive(boolean exclusive);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Mon Feb 3 02:14:59 2014
@@ -24,17 +24,12 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.util.Action;
public interface BaseQueue extends TransactionLogResource
{
- public static interface PostEnqueueAction
- {
- public void onEnqueue(QueueEntry entry);
- }
-
void enqueue(ServerMessage message) throws AMQException;
- void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException;
- void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException;
+ void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException;
boolean isDurable();
boolean isDeleted();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Mon Feb 3 02:14:59 2014
@@ -25,6 +25,8 @@ import org.apache.qpid.server.filter.Fil
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
{
@@ -42,11 +44,6 @@ public interface QueueEntry extends Mess
}
- public static interface StateChangeListener
- {
- public void stateChanged(QueueEntry entry, State oldSate, State newState);
- }
-
public abstract class EntryState
{
private EntryState()
@@ -198,7 +195,7 @@ public interface QueueEntry extends Mess
boolean isRejectedBy(long subscriptionId);
- int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn);
+ int routeToAlternate(final Action<QueueEntry> action, ServerTransaction txn);
boolean isQueueDeleted();
@@ -206,8 +203,8 @@ public interface QueueEntry extends Mess
QueueEntry getNextValidEntry();
- void addStateChangeListener(StateChangeListener listener);
- boolean removeStateChangeListener(StateChangeListener listener);
+ void addStateChangeListener(StateChangeListener<QueueEntry, State> listener);
+ boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener);
/**
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Feb 3 02:14:59 2014
@@ -31,6 +31,8 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
import java.util.EnumMap;
import java.util.HashSet;
@@ -59,7 +61,7 @@ public abstract class QueueEntryImpl imp
(QueueEntryImpl.class, EntryState.class, "_state");
- private volatile Set<StateChangeListener> _stateChangeListeners;
+ private volatile Set<StateChangeListener<QueueEntry, State>> _stateChangeListeners;
private static final
AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
@@ -231,11 +233,6 @@ public abstract class QueueEntryImpl imp
if(state instanceof SubscriptionAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
- Subscription subscription = ((SubscriptionAcquiredState)state).getSubscription();
- if (subscription != null)
- {
- subscription.releaseQueueEntry(this);
- }
}
if(!getQueue().isDeleted())
@@ -320,8 +317,6 @@ public abstract class QueueEntryImpl imp
if (state instanceof SubscriptionAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
- s = ((SubscriptionAcquiredState) state).getSubscription();
- s.onDequeue(this);
}
getQueue().dequeue(this,s);
@@ -336,7 +331,7 @@ public abstract class QueueEntryImpl imp
private void notifyStateChange(final State oldState, final State newState)
{
- for(StateChangeListener l : _stateChangeListeners)
+ for(StateChangeListener<QueueEntry, State> l : _stateChangeListeners)
{
l.stateChanged(this, oldState, newState);
}
@@ -367,7 +362,7 @@ public abstract class QueueEntryImpl imp
dispose();
}
- public int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn)
+ public int routeToAlternate(final Action<QueueEntry> action, ServerTransaction txn)
{
final AMQQueue currentQueue = getQueue();
Exchange alternateExchange = currentQueue.getAlternateExchange();
@@ -412,21 +407,21 @@ public abstract class QueueEntryImpl imp
return getQueue().isDeleted();
}
- public void addStateChangeListener(StateChangeListener listener)
+ public void addStateChangeListener(StateChangeListener<QueueEntry, State> listener)
{
- Set<StateChangeListener> listeners = _stateChangeListeners;
+ Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners;
if(listeners == null)
{
- _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>());
+ _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<QueueEntry, State>>());
listeners = _stateChangeListeners;
}
listeners.add(listener);
}
- public boolean removeStateChangeListener(StateChangeListener listener)
+ public boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener)
{
- Set<StateChangeListener> listeners = _stateChangeListeners;
+ Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners;
if(listeners != null)
{
return listeners.remove(listener);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Mon Feb 3 02:14:59 2014
@@ -61,9 +61,13 @@ import org.apache.qpid.server.subscripti
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper
+public class SimpleAMQQueue implements AMQQueue,
+ StateChangeListener<Subscription, Subscription.State>,
+ MessageGroupManager.SubscriptionResetHelper
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
@@ -121,10 +125,6 @@ public class SimpleAMQQueue implements A
private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();
private final AtomicLong _persistentMessageDequeueCount = new AtomicLong();
private final AtomicInteger _consumerCountHigh = new AtomicInteger(0);
- private final AtomicLong _msgTxnEnqueues = new AtomicLong(0);
- private final AtomicLong _byteTxnEnqueues = new AtomicLong(0);
- private final AtomicLong _msgTxnDequeues = new AtomicLong(0);
- private final AtomicLong _byteTxnDequeues = new AtomicLong(0);
private final AtomicLong _unackedMsgCount = new AtomicLong(0);
private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0);
private final AtomicLong _unackedMsgBytes = new AtomicLong();
@@ -165,7 +165,7 @@ public class SimpleAMQQueue implements A
private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
private final AtomicBoolean _deleted = new AtomicBoolean(false);
- private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+ private final List<Action<AMQQueue>> _deleteTaskList = new CopyOnWriteArrayList<Action<AMQQueue>>();
private LogSubject _logSubject;
@@ -451,7 +451,7 @@ public class SimpleAMQQueue implements A
if (isDeleted())
{
- subscription.queueDeleted(this);
+ subscription.queueDeleted();
}
}
else
@@ -505,7 +505,7 @@ public class SimpleAMQQueue implements A
// we need to manually fire the event to the removed subscription (which was the last one left for this
// queue. This is because the delete method uses the subscription set which has just been cleared
- subscription.queueDeleted(this);
+ subscription.queueDeleted();
}
}
@@ -622,18 +622,8 @@ public class SimpleAMQQueue implements A
enqueue(message, null);
}
- public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
+ public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException
{
- enqueue(message, false, action);
- }
-
- public void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException
- {
-
- if(transactional)
- {
- incrementTxnEnqueueStats(message);
- }
incrementQueueCount();
incrementQueueSize(message);
@@ -715,7 +705,7 @@ public class SimpleAMQQueue implements A
if(action != null)
{
- action.onEnqueue(entry);
+ action.performAction(entry);
}
}
@@ -810,18 +800,6 @@ public class SimpleAMQQueue implements A
getAtomicQueueCount().incrementAndGet();
}
- private void incrementTxnEnqueueStats(final ServerMessage message)
- {
- _msgTxnEnqueues.incrementAndGet();
- _byteTxnEnqueues.addAndGet(message.getSize());
- }
-
- private void incrementTxnDequeueStats(QueueEntry entry)
- {
- _msgTxnDequeues.incrementAndGet();
- _byteTxnDequeues.addAndGet(entry.getSize());
- }
-
private void deliverMessage(final Subscription sub, final QueueEntry entry, boolean batch)
throws AMQException
{
@@ -900,11 +878,6 @@ public class SimpleAMQQueue implements A
_deliveredMessages.decrementAndGet();
}
- if(sub != null && sub.isSessionTransactional())
- {
- incrementTxnDequeueStats(entry);
- }
-
checkCapacity();
}
@@ -1039,7 +1012,7 @@ public class SimpleAMQQueue implements A
}
- public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState)
+ public void stateChanged(Subscription sub, Subscription.State oldState, Subscription.State newState)
{
if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
{
@@ -1300,12 +1273,12 @@ public class SimpleAMQQueue implements A
});
}
- public void addQueueDeleteTask(final Task task)
+ public void addQueueDeleteTask(final Action<AMQQueue> task)
{
_deleteTaskList.add(task);
}
- public void removeQueueDeleteTask(final Task task)
+ public void removeQueueDeleteTask(final Action<AMQQueue> task)
{
_deleteTaskList.remove(task);
}
@@ -1322,7 +1295,9 @@ public class SimpleAMQQueue implements A
if (!_deleted.getAndSet(true))
{
- for (Binding b : _bindings)
+ final ArrayList<Binding> bindingCopy = new ArrayList<Binding>(_bindings);
+
+ for (Binding b : bindingCopy)
{
b.getExchange().removeBinding(b);
}
@@ -1334,7 +1309,7 @@ public class SimpleAMQQueue implements A
Subscription s = subscriptionIter.getNode().getSubscription();
if (s != null)
{
- s.queueDeleted(this);
+ s.queueDeleted();
}
}
@@ -1375,9 +1350,9 @@ public class SimpleAMQQueue implements A
}
- for (Task task : _deleteTaskList)
+ for (Action<AMQQueue> task : _deleteTaskList)
{
- task.doTask(this);
+ task.performAction(this);
}
_deleteTaskList.clear();
@@ -1984,7 +1959,7 @@ public class SimpleAMQQueue implements A
return _notificationChecks;
}
- private final class QueueEntryListener implements QueueEntry.StateChangeListener
+ private final class QueueEntryListener implements StateChangeListener<QueueEntry, QueueEntry.State>
{
private final Subscription _sub;
@@ -2076,26 +2051,6 @@ public class SimpleAMQQueue implements A
return _dequeueSize.get();
}
- public long getByteTxnEnqueues()
- {
- return _byteTxnEnqueues.get();
- }
-
- public long getByteTxnDequeues()
- {
- return _byteTxnDequeues.get();
- }
-
- public long getMsgTxnEnqueues()
- {
- return _msgTxnEnqueues.get();
- }
-
- public long getMsgTxnDequeues()
- {
- return _msgTxnDequeues.get();
- }
-
public long getPersistentByteEnqueues()
{
return _persistentMessageEnqueueSize.get();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Mon Feb 3 02:14:59 2014
@@ -21,6 +21,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
@@ -48,7 +49,7 @@ public class SortedQueue extends OutOfOr
return _sortedPropertyName;
}
- public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
+ public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException
{
synchronized (_sortedQueueLock)
{
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java?rev=1563758&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java Mon Feb 3 02:14:59 2014
@@ -0,0 +1,344 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.SubscriptionActor;
+import org.apache.qpid.server.logging.messages.SubscriptionMessages;
+import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.util.StateChangeListener;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public abstract class AbstractSubscription implements Subscription
+{
+ private final long _subscriptionID;
+ private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
+ private final Lock _stateChangeLock = new ReentrantLock();
+ private final long _createTime = System.currentTimeMillis();
+ private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+ private final boolean _acquires;
+ private final boolean _seesRequeues;
+ private final String _consumerName;
+ private final boolean _isTransient;
+
+
+ private final AtomicLong _deliveredCount = new AtomicLong(0);
+ private final AtomicLong _deliveredBytes = new AtomicLong(0);
+
+ private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+
+
+ private final FilterManager _filters;
+
+ private volatile AMQQueue.Context _queueContext;
+
+
+ private StateChangeListener<Subscription, State> _stateListener = new StateChangeListener<Subscription, State>()
+ {
+ public void stateChanged(Subscription sub, State oldState, State newState)
+ {
+ CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
+ }
+ };
+
+ private SubscriptionLogSubject _logSubject;
+ private AMQQueue _queue;
+ private String _traceExclude;
+ private String _trace;
+ private SubscriptionActor _logActor;
+ private final Class<? extends ServerMessage> _messageClass;
+ private final Object _sessionReference;
+ private boolean _noLocal;
+
+ protected AbstractSubscription(FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final Object sessionReference,
+ final boolean acquires,
+ final boolean seesRequeues,
+ final String consumerName, final boolean isTransient)
+ {
+ _messageClass = messageClass;
+ _sessionReference = sessionReference;
+ _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
+ _filters = filters;
+ _acquires = acquires;
+ _seesRequeues = seesRequeues;
+ _consumerName = consumerName;
+ _isTransient = isTransient;
+ }
+
+ public final long getSubscriptionID()
+ {
+ return _subscriptionID;
+ }
+
+
+ public final StateChangeListener<Subscription, State> getStateListener()
+ {
+ return _stateListener;
+ }
+
+ public final void setStateListener(StateChangeListener<Subscription, State> listener)
+ {
+ _stateListener = listener;
+ }
+
+
+ public final AMQQueue.Context getQueueContext()
+ {
+ return _queueContext;
+ }
+
+ public final void setQueueContext(AMQQueue.Context queueContext)
+ {
+ _queueContext = queueContext;
+ }
+
+
+ public State getState()
+ {
+ return _state.get();
+ }
+
+ protected boolean updateState(State from, State to)
+ {
+ return _state.compareAndSet(from, to);
+ }
+
+ public final boolean isActive()
+ {
+ return getState() == State.ACTIVE;
+ }
+
+ public final boolean isClosed()
+ {
+ return getState() == State.CLOSED;
+ }
+
+
+ public final void setNoLocal(boolean noLocal)
+ {
+ _noLocal = noLocal;
+ }
+
+
+ public final boolean hasInterest(QueueEntry entry)
+ {
+ //check that the message hasn't been rejected
+ if (entry.isRejectedBy(getSubscriptionID()))
+ {
+
+ return false;
+ }
+
+ if (entry.getMessage().getClass() == _messageClass)
+ {
+ if(_noLocal)
+ {
+ Object connectionRef = entry.getMessage().getConnectionReference();
+ if (connectionRef != null && connectionRef == _sessionReference)
+ {
+ return false;
+ }
+ }
+ }
+ else
+ {
+ // no interest in messages we can't convert
+ if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), _messageClass)==null)
+ {
+ return false;
+ }
+ }
+ return (_filters == null) || _filters.allAllow(entry.asFilterable());
+ }
+
+
+ protected String getFilterLogString()
+ {
+ StringBuilder filterLogString = new StringBuilder();
+ String delimiter = ", ";
+ boolean hasEntries = false;
+ if (_filters != null && _filters.hasFilters())
+ {
+ filterLogString.append(_filters.toString());
+ hasEntries = true;
+ }
+
+ if (!acquires())
+ {
+ if (hasEntries)
+ {
+ filterLogString.append(delimiter);
+ }
+ filterLogString.append("Browser");
+ hasEntries = true;
+ }
+
+ return filterLogString.toString();
+ }
+
+
+ public final boolean trySendLock()
+ {
+ return _stateChangeLock.tryLock();
+ }
+
+ public final void getSendLock()
+ {
+ _stateChangeLock.lock();
+ }
+
+ public final void releaseSendLock()
+ {
+ _stateChangeLock.unlock();
+ }
+
+ public final AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+ public final void setQueue(AMQQueue queue, boolean exclusive)
+ {
+ if(getQueue() != null)
+ {
+ throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
+ }
+ _queue = queue;
+
+ _traceExclude = (String) queue.getAttribute(Queue.FEDERATION_EXCLUDES);
+ _trace = (String) queue.getAttribute(Queue.FEDERATION_ID);
+
+ _logSubject = new SubscriptionLogSubject(this);
+ _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
+
+ if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, _logSubject, SubscriptionMessages.CREATE_LOG_HIERARCHY))
+ {
+ final String filterLogString = getFilterLogString();
+ CurrentActor.get().message(_logSubject, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive,
+ filterLogString.length() > 0));
+ }
+ }
+
+ protected final String getTraceExclude()
+ {
+ return _traceExclude;
+ }
+
+ protected final String getTrace()
+ {
+ return _trace;
+ }
+
+ protected final LogSubject getLogSubject()
+ {
+ return _logSubject;
+ }
+
+ public final LogActor getLogActor()
+ {
+ return _logActor;
+ }
+
+ public final long getCreateTime()
+ {
+ return _createTime;
+ }
+
+
+ public final QueueEntry.SubscriptionAcquiredState getOwningState()
+ {
+ return _owningState;
+ }
+
+
+ public final void set(String key, Object value)
+ {
+ _properties.put(key, value);
+ }
+
+ public final Object get(String key)
+ {
+ return _properties.get(key);
+ }
+
+ public final boolean acquires()
+ {
+ return _acquires;
+ }
+
+ public final boolean seesRequeues()
+ {
+ return _seesRequeues;
+ }
+
+ public final String getName()
+ {
+ return _consumerName;
+ }
+
+ public final boolean isTransient()
+ {
+ return _isTransient;
+ }
+
+
+ public final long getBytesOut()
+ {
+ return _deliveredBytes.longValue();
+ }
+
+ public final long getMessagesOut()
+ {
+ return _deliveredCount.longValue();
+ }
+
+ public final void send(final QueueEntry entry, final boolean batch) throws AMQException
+ {
+ _deliveredCount.incrementAndGet();
+ _deliveredBytes.addAndGet(entry.getMessage().getSize());
+ doSend(entry, batch);
+ }
+
+ protected abstract void doSend(final QueueEntry entry, final boolean batch) throws AMQException;
+
+ @Override
+ public final void flush() throws AMQException
+ {
+ getQueue().flushSubscription(this);
+ }
+}
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java?rev=1563758&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java Mon Feb 3 02:14:59 2014
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.util.StateChangeListener;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractSubscriptionTarget implements SubscriptionTarget
+{
+
+ private final AtomicReference<State> _state;
+ private final AtomicReference<StateChangeListener<SubscriptionTarget, State>> _stateListener =
+ new AtomicReference<StateChangeListener<SubscriptionTarget, State>>();
+
+ protected AbstractSubscriptionTarget(final State initialState)
+ {
+ _state = new AtomicReference<State>(initialState);
+ }
+
+
+ public final State getState()
+ {
+ return _state.get();
+ }
+
+ protected final boolean updateState(State from, State to)
+ {
+ if(_state.compareAndSet(from, to))
+ {
+ StateChangeListener<SubscriptionTarget, State> listener = _stateListener.get();
+ if(listener != null)
+ {
+ listener.stateChanged(this, from, to);
+ }
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+
+ public final void setStateListener(StateChangeListener<SubscriptionTarget, State> listener)
+ {
+ _stateListener.set(listener);
+ }
+
+ public final StateChangeListener<SubscriptionTarget, State> getStateListener()
+ {
+ return _stateListener.get();
+ }
+
+}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java Mon Feb 3 02:14:59 2014
@@ -21,6 +21,7 @@
package org.apache.qpid.server.subscription;
import org.apache.qpid.server.queue.QueueEntryVisitor;
+import org.apache.qpid.server.util.StateChangeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -240,7 +241,7 @@ public class DefinedGroupMessageGroupMan
return groupVal;
}
- private class GroupStateChangeListener implements QueueEntry.StateChangeListener
+ private class GroupStateChangeListener implements StateChangeListener<QueueEntry, QueueEntry.State>
{
private final Group _group;
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java?rev=1563758&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java Mon Feb 3 02:14:59 2014
@@ -0,0 +1,189 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.SubscriptionMessages;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.util.StateChangeListener;
+
+import java.util.EnumMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class DelegatingSubscription<T extends SubscriptionTarget> extends AbstractSubscription
+{
+ private static final Logger _logger = Logger.getLogger(DelegatingSubscription.class);
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+
+ static final EnumMap<SubscriptionTarget.State, State> STATE_MAP =
+ new EnumMap<SubscriptionTarget.State, State>(SubscriptionTarget.State.class);
+
+ static
+ {
+ STATE_MAP.put(SubscriptionTarget.State.ACTIVE, State.ACTIVE);
+ STATE_MAP.put(SubscriptionTarget.State.SUSPENDED, State.SUSPENDED);
+ STATE_MAP.put(SubscriptionTarget.State.CLOSED, State.CLOSED);
+ }
+
+ private final T _target;
+
+ public DelegatingSubscription(final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final boolean acquires,
+ final boolean seesRequeues,
+ final String consumerName,
+ final boolean isTransient,
+ T target)
+ {
+ super(filters, messageClass, target.getSessionModel().getConnectionReference(),
+ acquires, seesRequeues, consumerName, isTransient);
+ _target = target;
+ _target.setStateListener(
+ new StateChangeListener<SubscriptionTarget, SubscriptionTarget.State>()
+ {
+ @Override
+ public void stateChanged(final SubscriptionTarget object,
+ final SubscriptionTarget.State oldState,
+ final SubscriptionTarget.State newState)
+ {
+ targetStateChanged(oldState, newState);
+ }
+ });
+ }
+
+ private void targetStateChanged(final SubscriptionTarget.State oldState, final SubscriptionTarget.State newState)
+ {
+ if(oldState != newState)
+ {
+ if(newState == SubscriptionTarget.State.CLOSED)
+ {
+ if(_closed.compareAndSet(false,true))
+ {
+ CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE());
+ }
+ }
+ else
+ {
+ CurrentActor.get().message(getLogSubject(),SubscriptionMessages.STATE(newState.toString()));
+ }
+ }
+
+ if(newState == SubscriptionTarget.State.CLOSED && oldState != newState)
+ {
+ try
+ {
+ getQueue().unregisterSubscription(this);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Unable to remove to remove subscription", e);
+ throw new RuntimeException(e);
+ }
+ }
+ final StateChangeListener<Subscription, State> stateListener = getStateListener();
+ if(stateListener != null)
+ {
+ stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState));
+ }
+ }
+
+ public T getTarget()
+ {
+ return _target;
+ }
+
+ @Override
+ public long getUnacknowledgedBytes()
+ {
+ return _target.getUnacknowledgedBytes();
+ }
+
+ @Override
+ public long getUnacknowledgedMessages()
+ {
+ return _target.getUnacknowledgedMessages();
+ }
+
+ @Override
+ public AMQSessionModel getSessionModel()
+ {
+ return _target.getSessionModel();
+ }
+
+ @Override
+ public boolean isSuspended()
+ {
+ return _target.isSuspended();
+ }
+
+ @Override
+ public void close()
+ {
+ _target.close();
+ }
+
+ @Override
+ protected void doSend(final QueueEntry entry, final boolean batch) throws AMQException
+ {
+ _target.send(entry, batch);
+ }
+
+ @Override
+ public void flushBatched()
+ {
+ _target.flushBatched();
+ }
+
+ @Override
+ public void queueDeleted()
+ {
+ _target.queueDeleted();
+ }
+
+ @Override
+ public boolean wouldSuspend(final QueueEntry msg)
+ {
+ return !_target.allocateCredit(msg);
+ }
+
+ @Override
+ public void restoreCredit(final QueueEntry queueEntry)
+ {
+ _target.restoreCredit(queueEntry);
+ }
+
+ @Override
+ public void queueEmpty() throws AMQException
+ {
+ _target.queueEmpty();
+ }
+
+ @Override
+ public State getState()
+ {
+ return STATE_MAP.get(_target.getState());
+ }
+}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java Mon Feb 3 02:14:59 2014
@@ -26,6 +26,7 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.util.StateChangeListener;
public interface Subscription
{
@@ -50,11 +51,6 @@ public interface Subscription
CLOSED
}
- public static interface StateListener
- {
- public void stateChange(Subscription sub, State oldState, State newState);
- }
-
AMQQueue getQueue();
AMQSessionModel getSessionModel();
@@ -82,7 +78,7 @@ public interface Subscription
void flushBatched();
- void queueDeleted(AMQQueue queue);
+ void queueDeleted();
boolean wouldSuspend(QueueEntry msg);
@@ -94,13 +90,9 @@ public interface Subscription
void releaseSendLock();
- void releaseQueueEntry(final QueueEntry queueEntryImpl);
-
- void onDequeue(final QueueEntry queueEntry);
-
void restoreCredit(final QueueEntry queueEntry);
- void setStateListener(final StateListener listener);
+ void setStateListener(final StateChangeListener<Subscription, State> listener);
public State getState();
@@ -115,9 +107,9 @@ public interface Subscription
public Object get(String key);
- boolean isSessionTransactional();
-
void queueEmpty() throws AMQException;
- String getConsumerName();
+ String getName();
+
+ void flush() throws AMQException;
}
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java?rev=1563758&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java Mon Feb 3 02:14:59 2014
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.util.StateChangeListener;
+
+public interface SubscriptionTarget
+{
+
+
+ enum State
+ {
+ ACTIVE, SUSPENDED, CLOSED
+ }
+
+ State getState();
+
+ void setStateListener(StateChangeListener<SubscriptionTarget, State> listener);
+
+ long getUnacknowledgedBytes();
+
+ long getUnacknowledgedMessages();
+
+ AMQSessionModel getSessionModel();
+
+ void send(QueueEntry entry, boolean batch) throws AMQException;
+
+ void flushBatched();
+
+ void queueDeleted();
+
+ void queueEmpty() throws AMQException;
+
+ boolean allocateCredit(QueueEntry msg);
+
+ void restoreCredit(QueueEntry queueEntry);
+
+ boolean isSuspended();
+
+ boolean close();
+}
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java?rev=1563758&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java Mon Feb 3 02:14:59 2014
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+public interface Action<T>
+{
+ void performAction(T object);
+}
Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java?rev=1563758&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java Mon Feb 3 02:14:59 2014
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+public interface StateChangeListener<T, E extends Enum>
+{
+ void stateChanged(T object, E oldState, E newState);
+}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Mon Feb 3 02:14:59 2014
@@ -141,7 +141,7 @@ public class VirtualHostConfigRecoveryHa
try
{
- queue.enqueue(message, true, null);
+ queue.enqueue(message, null);
ref.release();
}
catch (AMQException e)
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Mon Feb 3 02:14:59 2014
@@ -29,6 +29,7 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -297,23 +298,15 @@ public class MockAMQQueue implements AMQ
{
}
- public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
+ public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException
{
}
- public void enqueue(ServerMessage message, boolean sync, PostEnqueueAction action) throws AMQException
- {
- }
-
public void requeue(QueueEntry entry)
{
}
- public void requeue(QueueEntryImpl storeContext, Subscription subscription)
- {
- }
-
public void dequeue(QueueEntry entry, Subscription sub)
{
}
@@ -323,11 +316,11 @@ public class MockAMQQueue implements AMQ
return false;
}
- public void addQueueDeleteTask(Task task)
+ public void addQueueDeleteTask(Action<AMQQueue> task)
{
}
- public void removeQueueDeleteTask(final Task task)
+ public void removeQueueDeleteTask(final Action<AMQQueue> task)
{
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Mon Feb 3 02:14:59 2014
@@ -27,6 +27,8 @@ import org.apache.qpid.server.message.In
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
public class MockQueueEntry implements QueueEntry
{
@@ -53,7 +55,7 @@ public class MockQueueEntry implements Q
return false;
}
- public void addStateChangeListener(StateChangeListener listener)
+ public void addStateChangeListener(StateChangeListener<QueueEntry, State> listener)
{
}
@@ -63,7 +65,7 @@ public class MockQueueEntry implements Q
}
- public int routeToAlternate(final BaseQueue.PostEnqueueAction action, final ServerTransaction txn)
+ public int routeToAlternate(final Action<QueueEntry> action, final ServerTransaction txn)
{
return 0;
}
@@ -137,7 +139,7 @@ public class MockQueueEntry implements Q
}
- public boolean removeStateChangeListener(StateChangeListener listener)
+ public boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener)
{
return false;
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Mon Feb 3 02:14:59 2014
@@ -41,10 +41,10 @@ import org.apache.qpid.server.message.AM
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction;
import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -228,9 +228,9 @@ public class SimpleAMQQueueTest extends
_queue.registerSubscription(_subscription, false);
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+ Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
{
- public void onEnqueue(QueueEntry entry)
+ public void performAction(QueueEntry entry)
{
queueEntries.add(entry);
}
@@ -276,9 +276,9 @@ public class SimpleAMQQueueTest extends
_queue.registerSubscription(_subscription, false);
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+ Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
{
- public void onEnqueue(QueueEntry entry)
+ public void performAction(QueueEntry entry)
{
queueEntries.add(entry);
}
@@ -323,9 +323,9 @@ public class SimpleAMQQueueTest extends
_queue.registerSubscription(_subscription, false);
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+ Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
{
- public void onEnqueue(QueueEntry entry)
+ public void performAction(QueueEntry entry)
{
queueEntries.add(entry);
}
@@ -376,9 +376,9 @@ public class SimpleAMQQueueTest extends
_queue.registerSubscription(subscription2, false);
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
- PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+ Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
{
- public void onEnqueue(QueueEntry entry)
+ public void performAction(QueueEntry entry)
{
queueEntries.add(entry);
}
@@ -1011,37 +1011,37 @@ public class SimpleAMQQueueTest extends
//verify behaviour in face of expected state changes:
//verify a subscription going suspended->active increases the count
- queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.ACTIVE);
+ queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.ACTIVE);
assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount());
//verify a subscription going active->suspended decreases the count
- queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.SUSPENDED);
+ queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify a subscription going suspended->closed doesn't change the count
- queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.CLOSED);
+ queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.CLOSED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify a subscription going active->closed decreases the count
- queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.CLOSED);
+ queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.CLOSED);
assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
//verify behaviour in face of unexpected state changes:
//verify a subscription going closed->active increases the count
- queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.ACTIVE);
+ queue.stateChanged(subscription2, Subscription.State.CLOSED, Subscription.State.ACTIVE);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify a subscription going active->active doesn't change the count
- queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.ACTIVE);
+ queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.ACTIVE);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify a subscription going closed->suspended doesn't change the count
- queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.SUSPENDED);
+ queue.stateChanged(subscription2, Subscription.State.CLOSED, Subscription.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify a subscription going suspended->suspended doesn't change the count
- queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.SUSPENDED);
+ queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Mon Feb 3 02:14:59 2014
@@ -33,6 +33,7 @@ import org.apache.qpid.server.queue.AMQQ
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.StateChangeListener;
import java.util.ArrayList;
import java.util.List;
@@ -47,7 +48,7 @@ public class MockSubscription implements
private boolean _closed = false;
private String tag = "mocktag";
private AMQQueue queue = null;
- private StateListener _listener = null;
+ private StateChangeListener<Subscription, State> _listener = null;
private volatile AMQQueue.Context _queueContext = null;
private State _state = State.ACTIVE;
private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
@@ -75,16 +76,22 @@ public class MockSubscription implements
_closed = true;
if (_listener != null)
{
- _listener.stateChange(this, _state, State.CLOSED);
+ _listener.stateChanged(this, _state, State.CLOSED);
}
_state = State.CLOSED;
}
- public String getConsumerName()
+ public String getName()
{
return tag;
}
+ @Override
+ public void flush() throws AMQException
+ {
+
+ }
+
public long getSubscriptionID()
{
return _subscriptionID;
@@ -202,7 +209,7 @@ public class MockSubscription implements
return false;
}
- public void queueDeleted(AMQQueue queue)
+ public void queueDeleted()
{
}
@@ -211,18 +218,10 @@ public class MockSubscription implements
_stateChangeLock.unlock();
}
- public void onDequeue(QueueEntry queueEntry)
- {
- }
-
public void restoreCredit(QueueEntry queueEntry)
{
}
- public void releaseQueueEntry(QueueEntry queueEntry)
- {
- }
-
public void send(QueueEntry entry, boolean batch) throws AMQException
{
if (messages.contains(entry))
@@ -251,7 +250,7 @@ public class MockSubscription implements
{
}
- public void setStateListener(StateListener listener)
+ public void setStateListener(StateChangeListener<Subscription, State> listener)
{
this._listener = listener;
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Mon Feb 3 02:14:59 2014
@@ -31,20 +31,19 @@ class ExplicitAcceptDispositionChangeLis
private final QueueEntry _entry;
- private final Subscription_0_10 _sub;
+ private final SubscriptionTarget_0_10 _target;
- public ExplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+ public ExplicitAcceptDispositionChangeListener(QueueEntry entry, SubscriptionTarget_0_10 target)
{
_entry = entry;
- _sub = subscription_0_10;
+ _target = target;
}
public void onAccept()
{
- final Subscription_0_10 subscription = getSubscription();
- if(subscription != null && _entry.isAcquiredBy(_sub))
+ if(_target != null && _entry.isAcquiredBy(_target.getSubscription()))
{
- subscription.getSessionModel().acknowledge(subscription, _entry);
+ _target.getSessionModel().acknowledge(_target, _entry);
}
else
{
@@ -55,10 +54,9 @@ class ExplicitAcceptDispositionChangeLis
public void onRelease(boolean setRedelivered)
{
- final Subscription_0_10 subscription = getSubscription();
- if(subscription != null && _entry.isAcquiredBy(_sub))
+ if(_target != null && _entry.isAcquiredBy(_target.getSubscription()))
{
- subscription.release(_entry, setRedelivered);
+ _target.release(_entry, setRedelivered);
}
else
{
@@ -68,10 +66,9 @@ class ExplicitAcceptDispositionChangeLis
public void onReject()
{
- final Subscription_0_10 subscription = getSubscription();
- if(subscription != null && _entry.isAcquiredBy(_sub))
+ if(_target != null && _entry.isAcquiredBy(_target.getSubscription()))
{
- subscription.reject(_entry);
+ _target.reject(_entry);
}
else
{
@@ -82,12 +79,8 @@ class ExplicitAcceptDispositionChangeLis
public boolean acquire()
{
- return _entry.acquire(getSubscription());
+ return _entry.acquire(_target.getSubscription());
}
- private Subscription_0_10 getSubscription()
- {
- return _sub;
- }
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java Mon Feb 3 02:14:59 2014
@@ -30,12 +30,12 @@ class ImplicitAcceptDispositionChangeLis
private final QueueEntry _entry;
- private Subscription_0_10 _sub;
+ private SubscriptionTarget_0_10 _target;
- public ImplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+ public ImplicitAcceptDispositionChangeListener(QueueEntry entry, SubscriptionTarget_0_10 target)
{
_entry = entry;
- _sub = subscription_0_10;
+ _target = target;
}
public void onAccept()
@@ -45,9 +45,9 @@ class ImplicitAcceptDispositionChangeLis
public void onRelease(boolean setRedelivered)
{
- if(_entry.isAcquiredBy(_sub))
+ if(_entry.isAcquiredBy(_target.getSubscription()))
{
- getSubscription().release(_entry, setRedelivered);
+ _target.release(_entry, setRedelivered);
}
else
{
@@ -57,9 +57,9 @@ class ImplicitAcceptDispositionChangeLis
public void onReject()
{
- if(_entry.isAcquiredBy(_sub))
+ if(_entry.isAcquiredBy(_target.getSubscription()))
{
- getSubscription().reject(_entry);
+ _target.reject(_entry);
}
else
{
@@ -70,19 +70,15 @@ class ImplicitAcceptDispositionChangeLis
public boolean acquire()
{
- boolean acquired = _entry.acquire(getSubscription());
+ boolean acquired = _entry.acquire(_target.getSubscription());
if(acquired)
{
- getSubscription().recordUnacknowledged(_entry);
+ _target.recordUnacknowledged(_entry);
}
return acquired;
}
- public Subscription_0_10 getSubscription()
- {
- return _sub;
- }
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Mon Feb 3 02:14:59 2014
@@ -26,12 +26,12 @@ import org.apache.qpid.transport.Method;
public class MessageAcceptCompletionListener implements Method.CompletionListener
{
- private final Subscription_0_10 _sub;
+ private final SubscriptionTarget_0_10 _sub;
private final QueueEntry _entry;
private final ServerSession _session;
private boolean _restoreCredit;
- public MessageAcceptCompletionListener(Subscription_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit)
+ public MessageAcceptCompletionListener(SubscriptionTarget_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit)
{
super();
_sub = sub;
@@ -46,7 +46,7 @@ public class MessageAcceptCompletionList
{
_sub.restoreCredit(_entry);
}
- if(_entry.isAcquiredBy(_sub))
+ if(_entry.isAcquiredBy(_sub.getSubscription()))
{
_session.acknowledge(_sub, _entry);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org