You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/08/09 00:55:35 UTC
svn commit: r1155137 - in /qpid/trunk/qpid/java/broker/src:
main/java/org/apache/qpid/server/federation/
main/java/org/apache/qpid/server/handler/
main/java/org/apache/qpid/server/subscription/
main/java/org/apache/qpid/server/transport/ test/java/org/...
Author: robbie
Date: Mon Aug 8 22:55:35 2011
New Revision: 1155137
URL: http://svn.apache.org/viewvc?rev=1155137&view=rev
Log:
QPID-3386: move all server Subscription creation into the SubscriptionFactoryImpl, ensure all Subscription implementations share a common ID sequence generator
Added:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java
Removed:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java Mon Aug 8 22:55:35 2011
@@ -38,6 +38,7 @@ import org.apache.qpid.server.queue.Base
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.transport.ServerSession;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -696,7 +697,7 @@ public class Bridge implements BridgeCon
//TODO Handle the passing of non-null Filters and Arguments here
- Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
+ Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
_destination,
MessageAcceptMode.NONE,
MessageAcquireMode.PRE_ACQUIRED,
@@ -768,7 +769,7 @@ public class Bridge implements BridgeCon
//TODO Handle the passing of non-null Filters and Arguments here
- Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
+ Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
_destination,
MessageAcceptMode.NONE,
MessageAcquireMode.PRE_ACQUIRED,
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Mon Aug 8 22:55:35 2011
@@ -162,14 +162,7 @@ public class BasicGetMethodHandler imple
}
else
{
- sub = new GetNoAckSubscription(channel,
- session,
- null,
- null,
- false,
- singleMessageCredit,
- getDeliveryMethod,
- getRecordMethod);
+ sub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(channel, session, null, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
queue.registerSubscription(sub,false);
@@ -180,27 +173,5 @@ public class BasicGetMethodHandler imple
}
- public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
- {
- public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
- throws AMQException
- {
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
- }
-
- public boolean isTransient()
- {
- return true;
- }
- public boolean wouldSuspend(QueueEntry msg)
- {
- return !getCreditManager().useCreditForMessage(msg.getMessage());
- }
-
- }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java Mon Aug 8 22:55:35 2011
@@ -20,13 +20,21 @@
*/
package org.apache.qpid.server.subscription;
+import java.util.Map;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager_0_10;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.transport.ServerSession;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageFlowMode;
/**
* Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory
@@ -56,4 +64,23 @@ public interface SubscriptionFactory
RecordDeliveryMethod recordMethod
)
throws AMQException;
+
+
+ SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(AMQChannel channel,
+ AMQProtocolSession session,
+ AMQShortString consumerTag,
+ FieldTable filters,
+ boolean noLocal,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod) throws AMQException;
+
+ Subscription_0_10 createSubscription(final ServerSession session,
+ final String destination,
+ final MessageAcceptMode acceptMode,
+ final MessageAcquireMode acquireMode,
+ final MessageFlowMode flowMode,
+ final FlowCreditManager_0_10 creditManager,
+ final FilterManager filterManager,
+ final Map<String,Object> arguments);
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java Mon Aug 8 22:55:35 2011
@@ -20,17 +20,28 @@
*/
package org.apache.qpid.server.subscription;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager_0_10;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageFlowMode;
public class SubscriptionFactoryImpl implements SubscriptionFactory
{
+ private static final AtomicLong SUB_ID_GENERATOR = new AtomicLong(0);
+
public Subscription createSubscription(int channelId, AMQProtocolSession protocolSession,
AMQShortString consumerTag, boolean acks, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager) throws AMQException
@@ -78,18 +89,47 @@ public class SubscriptionFactoryImpl imp
if(isBrowser)
{
- return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
+ return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
}
else if(acks)
{
- return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
+ return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
}
else
{
- return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
+ return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
}
}
+ public SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(final AMQChannel channel,
+ final AMQProtocolSession session,
+ final AMQShortString consumerTag,
+ final FieldTable filters,
+ final boolean noLocal,
+ final FlowCreditManager creditManager,
+ final ClientDeliveryMethod deliveryMethod,
+ final RecordDeliveryMethod recordMethod) throws AMQException
+ {
+ return new SubscriptionImpl.GetNoAckSubscription(channel, session, null, null, false, creditManager, deliveryMethod, recordMethod, getNextSubscriptionId());
+ }
+
+ public Subscription_0_10 createSubscription(final ServerSession session,
+ final String destination,
+ final MessageAcceptMode acceptMode,
+ final MessageAcquireMode acquireMode,
+ final MessageFlowMode flowMode,
+ final FlowCreditManager_0_10 creditManager,
+ final FilterManager filterManager,
+ final Map<String,Object> arguments)
+ {
+ return new Subscription_0_10(session, destination, acceptMode, acquireMode,
+ flowMode, creditManager, filterManager, arguments, getNextSubscriptionId());
+ }
public static final SubscriptionFactoryImpl INSTANCE = new SubscriptionFactoryImpl();
+
+ private static long getNextSubscriptionId()
+ {
+ return SUB_ID_GENERATOR.getAndIncrement();
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Mon Aug 8 22:55:35 2011
@@ -88,9 +88,7 @@ public abstract class SubscriptionImpl i
private final Lock _stateChangeLock;
- private static final AtomicLong idGenerator = new AtomicLong(0);
- // Create a simple ID that increments for ever new Subscription
- private final long _subscriptionID = idGenerator.getAndIncrement();
+ private final long _subscriptionID;
private LogSubject _logSubject;
private LogActor _logActor;
private UUID _id;
@@ -104,10 +102,11 @@ public abstract class SubscriptionImpl i
AMQShortString consumerTag, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ RecordDeliveryMethod recordMethod,
+ long subscriptionID)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
}
@@ -151,10 +150,11 @@ public abstract class SubscriptionImpl i
AMQShortString consumerTag, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ RecordDeliveryMethod recordMethod,
+ long subscriptionID)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
}
@@ -211,16 +211,45 @@ public abstract class SubscriptionImpl i
}
+ /**
+ * NoAck Subscription for use with BasicGet method.
+ */
+ public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
+ {
+ public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
+ AMQShortString consumerTag, FieldTable filters,
+ boolean noLocal, FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod,
+ long subscriptionID)
+ throws AMQException
+ {
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+ }
+
+ public boolean isTransient()
+ {
+ return true;
+ }
+
+ public boolean wouldSuspend(QueueEntry msg)
+ {
+ return !getCreditManager().useCreditForMessage(msg.getMessage());
+ }
+
+ }
+
static final class AckSubscription extends SubscriptionImpl
{
public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
AMQShortString consumerTag, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ RecordDeliveryMethod recordMethod,
+ long subscriptionID)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
}
@@ -296,10 +325,11 @@ public abstract class SubscriptionImpl i
AMQShortString consumerTag, FieldTable arguments,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ RecordDeliveryMethod recordMethod,
+ long subscriptionID)
throws AMQException
{
-
+ _subscriptionID = subscriptionID;
_channel = channel;
_consumerTag = consumerTag;
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Mon Aug 8 22:55:35 2011
@@ -40,7 +40,6 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.SubscriptionActor;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.AMQMessage;
@@ -80,10 +79,7 @@ import java.nio.ByteBuffer;
public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject
{
-
- private static final AtomicLong idGenerator = new AtomicLong(0);
- // Create a simple ID that increments for ever new Subscription
- private final long _subscriptionID = idGenerator.getAndIncrement();
+ private final long _subscriptionID;
private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
@@ -114,7 +110,6 @@ public class Subscription_0_10 implement
private MessageFlowMode _flowMode;
private final ServerSession _session;
private AtomicBoolean _stopped = new AtomicBoolean(true);
- private ConcurrentHashMap<Integer, QueueEntry> _sentMap = new ConcurrentHashMap<Integer, QueueEntry>();
private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0];
private LogActor _logActor;
@@ -131,8 +126,9 @@ public class Subscription_0_10 implement
MessageAcquireMode acquireMode,
MessageFlowMode flowMode,
FlowCreditManager_0_10 creditManager,
- FilterManager filters,Map<String, Object> arguments)
+ FilterManager filters,Map<String, Object> arguments, long subscriptionId)
{
+ _subscriptionID = subscriptionId;
_session = session;
_destination = destination;
_acceptMode = acceptMode;
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Mon Aug 8 22:55:35 2011
@@ -74,7 +74,7 @@ public class ServerConnectionDelegate ex
public ServerSession getSession(Connection conn, SessionAttach atc)
{
- SessionDelegate serverSessionDelegate = new ServerSessionDelegate(_appRegistry);
+ SessionDelegate serverSessionDelegate = new ServerSessionDelegate();
ServerSession ssn = new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Mon Aug 8 22:55:35 2011
@@ -129,16 +129,6 @@ public class ServerSession extends Sessi
this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
}
- protected void setState(State state)
- {
- super.setState(state);
-
- if (state == State.OPEN)
- {
- _actor.message(ChannelMessages.CREATE());
- }
- }
-
public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig)
{
super(connection, delegate, name, expiry);
@@ -150,6 +140,16 @@ public class ServerSession extends Sessi
getConfigStore().addConfiguredObject(this);
}
+ protected void setState(State state)
+ {
+ super.setState(state);
+
+ if (state == State.OPEN)
+ {
+ _actor.message(ChannelMessages.CREATE());
+ }
+ }
+
private ConfigStore getConfigStore()
{
return getConnectionConfig().getConfigStore();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Mon Aug 8 22:55:35 2011
@@ -47,11 +47,11 @@ import org.apache.qpid.server.queue.AMQQ
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Acquired;
@@ -98,11 +98,10 @@ import org.apache.qpid.transport.TxSelec
public class ServerSessionDelegate extends SessionDelegate
{
private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class);
- private final IApplicationRegistry _appRegistry;
- public ServerSessionDelegate(IApplicationRegistry appRegistry)
+ public ServerSessionDelegate()
{
- _appRegistry = appRegistry;
+
}
@Override
@@ -254,7 +253,7 @@ public class ServerSessionDelegate exten
return;
}
- Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
+ Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
destination,
method.getAcceptMode(),
method.getAcquireMode(),
Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java?rev=1155137&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java (added)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java Mon Aug 8 22:55:35 2011
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.flow.WindowCreditManager;
+import org.apache.qpid.server.protocol.ProtocolEngine_0_10;
+import org.apache.qpid.server.transport.ServerConnection;
+import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.server.transport.ServerSessionDelegate;
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.TestNetworkConnection;
+
+public class SubscriptionFactoryImplTest extends InternalBrokerBaseCase
+{
+ /**
+ * Tests that while creating Subscriptions of various types, the
+ * ID numbers assigned are allocated from a common sequence
+ * (in increasing order).
+ */
+ public void testDifferingSubscriptionTypesShareCommonIdNumberingSequence() throws Exception
+ {
+ //create a No-Ack subscription, get the first Subscription ID
+ long previousId = 0;
+ Subscription noAckSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, getSession(), new AMQShortString("1"), false, null, false, getChannel().getCreditManager());
+ previousId = noAckSub.getSubscriptionID();
+
+ //create an ack subscription, verify the next Subscription ID is used
+ Subscription ackSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, getSession(), new AMQShortString("1"), true, null, false, getChannel().getCreditManager());
+ assertEquals("Unexpected Subscription ID allocated", previousId + 1, ackSub.getSubscriptionID());
+ previousId = ackSub.getSubscriptionID();
+
+ //create a browser subscription
+ FieldTable filters = new FieldTable();
+ filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
+ Subscription browerSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, getSession(), new AMQShortString("1"), true, null, false, getChannel().getCreditManager());
+ assertEquals("Unexpected Subscription ID allocated", previousId + 1, browerSub.getSubscriptionID());
+ previousId = browerSub.getSubscriptionID();
+
+ //create an BasicGet NoAck subscription
+ Subscription getNoAckSub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(getChannel(), getSession(), new AMQShortString("1"), null, false,
+ getChannel().getCreditManager(),getChannel().getClientDeliveryMethod(), getChannel().getRecordDeliveryMethod());
+ assertEquals("Unexpected Subscription ID allocated", previousId + 1, getNoAckSub.getSubscriptionID());
+ previousId = getNoAckSub.getSubscriptionID();
+
+ //create a 0-10 subscription
+ ServerConnection conn = new ServerConnection(1);
+ ProtocolEngine_0_10 engine = new ProtocolEngine_0_10(conn, new TestNetworkConnection(), getRegistry());
+ conn.setVirtualHost(getVirtualHost());
+ conn.setConnectionConfig(engine);
+ ServerSessionDelegate sesDel = new ServerSessionDelegate();
+ Binary name = new Binary(new byte[]{new Byte("1")});
+ ServerSession session = new ServerSession(conn, sesDel, name, 0, engine);
+
+ Subscription sub_0_10 = SubscriptionFactoryImpl.INSTANCE.createSubscription(session, "1", MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED, MessageFlowMode.WINDOW, new WindowCreditManager(), null, null);
+ assertEquals("Unexpected Subscription ID allocated", previousId + 1, sub_0_10.getSubscriptionID());
+ }
+
+}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java?rev=1155137&r1=1155136&r2=1155137&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java Mon Aug 8 22:55:35 2011
@@ -44,14 +44,13 @@ import org.apache.qpid.server.store.Mess
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.util.MockChannel;
public class InternalBrokerBaseCase extends QpidTestCase
{
private IApplicationRegistry _registry;
private MessageStore _messageStore;
- private MockChannel _channel;
+ private AMQChannel _channel;
private InternalTestProtocolSession _session;
private VirtualHost _virtualHost;
private AMQQueue _queue;
@@ -111,7 +110,7 @@ public class InternalBrokerBaseCase exte
_session = new InternalTestProtocolSession(_virtualHost);
CurrentActor.set(_session.getLogActor());
- _channel = new MockChannel(_session, 1, _messageStore);
+ _channel = new AMQChannel(_session, 1, _messageStore);
_session.addChannel(_channel);
}
@@ -283,12 +282,12 @@ public class InternalBrokerBaseCase exte
_messageStore = messageStore;
}
- public MockChannel getChannel()
+ public AMQChannel getChannel()
{
return _channel;
}
- public void setChannel(MockChannel channel)
+ public void setChannel(AMQChannel channel)
{
_channel = channel;
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org