You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/07/15 00:55:54 UTC
svn commit: r1503076 - in /qpid/trunk/qpid/java/broker/src:
main/java/org/apache/qpid/server/logging/actors/
main/java/org/apache/qpid/server/logging/subjects/
main/java/org/apache/qpid/server/protocol/
main/java/org/apache/qpid/server/protocol/v0_10/ ...
Author: rgodfrey
Date: Sun Jul 14 22:55:54 2013
New Revision: 1503076
URL: http://svn.apache.org/r1503076
Log:
QPID-4659 : [Java Broker] fix protocol version specific code in logging, subscriptions
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java Sun Jul 14 22:55:54 2013
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.logging.actors;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
@@ -45,7 +46,7 @@ public class AMQPChannelActor extends Ab
* @param channel The Channel for this LogActor
* @param rootLogger The root Logger that this LogActor should use
*/
- public AMQPChannelActor(AMQChannel channel, RootMessageLogger rootLogger)
+ public AMQPChannelActor(AMQSessionModel channel, RootMessageLogger rootLogger)
{
super(rootLogger);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java Sun Jul 14 22:55:54 2013
@@ -14,14 +14,15 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.server.logging.actors;
import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -39,7 +40,7 @@ public class AMQPConnectionActor extends
{
private ConnectionLogSubject _logSubject;
- public AMQPConnectionActor(AMQProtocolSession session, RootMessageLogger rootLogger)
+ public AMQPConnectionActor(AMQConnectionModel session, RootMessageLogger rootLogger)
{
super(rootLogger);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java Sun Jul 14 22:55:54 2013
@@ -20,20 +20,16 @@
*/
package org.apache.qpid.server.logging.subjects;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_10.ServerConnection;
-import org.apache.qpid.server.protocol.v0_10.ServerSession;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
public class ChannelLogSubject extends AbstractLogSubject
{
- public ChannelLogSubject(AMQChannel channel)
+ public ChannelLogSubject(AMQSessionModel session)
{
- AMQProtocolSession session = channel.getProtocolSession();
-
/**
* LOG FORMAT used by the AMQPConnectorActor follows
* ChannelLogSubject.CHANNEL_FORMAT : con:{0}({1}@{2}/{3})/ch:{4}.
@@ -47,39 +43,14 @@ public class ChannelLogSubject extends A
* 3 - Virtualhost
* 4 - Channel ID
*/
+ AMQConnectionModel connection = session.getConnectionModel();
setLogStringWithFormat(CHANNEL_FORMAT,
- session.getSessionID(),
- session.getAuthorizedPrincipal().getName(),
- session.getRemoteAddress(),
- session.getVirtualHost().getName(),
- channel.getChannelId());
- }
+ connection == null ? -1L : connection.getConnectionId(),
+ (connection == null || connection.getPrincipalAsString() == null) ? "?" : connection.getPrincipalAsString(),
+ (connection == null || connection.getRemoteAddressString() == null) ? "?" : connection.getRemoteAddressString(),
+ (connection == null || connection.getVirtualHostName() == null) ? "?" : connection.getVirtualHostName(),
+ session.getChannelId());
- public ChannelLogSubject(ServerSession session)
- {
- /**
- * LOG FORMAT used by the AMQPConnectorActor follows
- * ChannelLogSubject.CHANNEL_FORMAT : con:{0}({1}@{2}/{3})/ch:{4}.
- *
- * Uses a MessageFormat call to insert the required values according to
- * these indices:
- *
- * 0 - Connection ID
- * 1 - User ID
- * 2 - IP
- * 3 - Virtualhost
- * 4 - Channel ID
- */
- if(session.getConnection() instanceof ServerConnection)
- {
- ServerConnection connection = (ServerConnection) session.getConnection();
- setLogStringWithFormat(CHANNEL_FORMAT,
- connection == null ? -1L : connection.getConnectionId(),
- session.getAuthorizedPrincipal() == null ? "?" : session.getAuthorizedPrincipal().getName(),
- (connection == null || connection.getRemoteAddressString() == null) ? "?" : connection.getRemoteAddressString(),
- session.getVirtualHost().getName(),
- session.getChannel());
- }
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java Sun Jul 14 22:55:54 2013
@@ -20,34 +20,33 @@
*/
package org.apache.qpid.server.logging.subjects;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
+import java.text.MessageFormat;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
-import java.text.MessageFormat;
-
/** The Connection LogSubject */
public class ConnectionLogSubject extends AbstractLogSubject
{
- public ConnectionLogSubject(AMQProtocolSession session)
+ // The Session this Actor is representing
+ private AMQConnectionModel _session;
+
+ public ConnectionLogSubject(AMQConnectionModel session)
{
_session = session;
}
- // The Session this Actor is representing
- private AMQProtocolSession _session;
-
// Used to stop re-creating the _logString when we reach our final format
private boolean _upToDate = false;
/**
* Update the LogString as the Connection process proceeds.
- *
+ *
* When the Session has an authorized ID add that to the string.
- *
+ *
* When the Session then gains a Vhost add that to the string, at this point
* we can set upToDate = true as the _logString will not need to be updated
* from this point onwards.
@@ -56,44 +55,44 @@ public class ConnectionLogSubject extend
{
if (!_upToDate)
{
- if (_session.getAuthorizedPrincipal() != null)
+ if (_session.getPrincipalAsString() != null)
{
- if (_session.getVirtualHost() != null)
+ if (_session.getVirtualHostName() != null)
{
/**
* LOG FORMAT used by the AMQPConnectorActor follows
* ConnectionLogSubject.CONNECTION_FORMAT :
* con:{0}({1}@{2}/{3})
- *
+ *
* Uses a MessageFormat call to insert the required values
* according to these indices:
- *
+ *
* 0 - Connection ID 1 - User ID 2 - IP 3 - Virtualhost
*/
setLogString("[" + MessageFormat.format(CONNECTION_FORMAT,
- _session.getSessionID(),
- _session.getAuthorizedPrincipal().getName(),
- _session.getRemoteAddress(),
- _session.getVirtualHost().getName())
+ _session.getConnectionId(),
+ _session.getPrincipalAsString(),
+ _session.getRemoteAddressString(),
+ _session.getVirtualHostName())
+ "] ");
_upToDate = true;
- }
+ }
else
{
setLogString("[" + MessageFormat.format(USER_FORMAT,
- _session.getSessionID(),
- _session.getAuthorizedPrincipal().getName(),
- _session.getRemoteAddress())
+ _session.getConnectionId(),
+ _session.getPrincipalAsString(),
+ _session.getRemoteAddressString())
+ "] ");
}
- }
+ }
else
{
setLogString("[" + MessageFormat.format(SOCKET_FORMAT,
- _session.getSessionID(),
- _session.getRemoteAddress())
+ _session.getConnectionId(),
+ _session.getRemoteAddressString())
+ "] ");
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Sun Jul 14 22:55:54 2013
@@ -92,4 +92,6 @@ public interface AMQConnectionModel exte
void stop();
boolean isStopped();
+
+ String getVirtualHostName();
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java Sun Jul 14 22:55:54 2013
@@ -76,7 +76,7 @@ public class ProtocolEngineCreator_0_10
final ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker,
fqdn, broker.getSubjectCreator(address));
- ServerConnection conn = new ServerConnection(id);
+ ServerConnection conn = new ServerConnection(id,broker);
conn.setConnectionDelegate(connDelegate);
conn.setRemoteAddress(network.getRemoteAddress());
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Sun Jul 14 22:55:54 2013
@@ -32,9 +32,11 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -60,7 +62,7 @@ public class ServerConnection extends Co
{
private Runnable _onOpenTask;
private AtomicBoolean _logClosed = new AtomicBoolean(false);
- private LogActor _actor = GenericActor.getInstance(this);
+ private LogActor _actor;
private Subject _authorizedSubject = null;
private Principal _authorizedPrincipal = null;
@@ -76,9 +78,10 @@ public class ServerConnection extends Co
private Transport _transport;
private volatile boolean _stopped;
- public ServerConnection(final long connectionId)
+ public ServerConnection(final long connectionId, Broker broker)
{
_connectionId = connectionId;
+ _actor = new AMQPConnectionActor(this, broker.getRootMessageLogger());
}
public Object getReference()
@@ -154,6 +157,12 @@ public class ServerConnection extends Co
}
@Override
+ public String getVirtualHostName()
+ {
+ return _virtualHost == null ? null : _virtualHost.getName();
+ }
+
+ @Override
public Port getPort()
{
return _port;
@@ -503,7 +512,7 @@ public class ServerConnection extends Co
public String getPrincipalAsString()
{
- return getAuthorizedPrincipal().getName();
+ return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
}
public long getSessionCountLimit()
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Sun Jul 14 22:55:54 2013
@@ -45,7 +45,6 @@ import org.apache.qpid.server.store.Dura
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.DtxNotSelectedException;
import org.apache.qpid.server.txn.IncorrectDtxStateException;
@@ -250,7 +249,7 @@ public class ServerSessionDelegate exten
return;
}
- Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
+ Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
destination,
method.getAcceptMode(),
method.getAcquireMode(),
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java Sun Jul 14 22:55:54 2013
@@ -132,9 +132,9 @@ public class Subscription_0_10 implement
MessageAcquireMode acquireMode,
MessageFlowMode flowMode,
FlowCreditManager_0_10 creditManager,
- FilterManager filters,Map<String, Object> arguments, long subscriptionId)
+ FilterManager filters,Map<String, Object> arguments)
{
- _subscriptionID = subscriptionId;
+ _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
_session = session;
_postIdSettingAction = new AddMessageDispositionListenerAction(session);
_destination = destination;
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Sun Jul 14 22:55:54 2013
@@ -1322,6 +1322,12 @@ public class AMQProtocolEngine implement
return _stopped;
}
+ @Override
+ public String getVirtualHostName()
+ {
+ return _virtualHost == null ? null : _virtualHost.getName();
+ }
+
public long getLastReceivedTime()
{
return _lastReceivedTime;
@@ -1359,7 +1365,7 @@ public class AMQProtocolEngine implement
public String getAuthId()
{
- return getAuthorizedPrincipal().getName();
+ return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
}
public Integer getRemotePID()
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java Sun Jul 14 22:55:54 2013
@@ -56,33 +56,23 @@ public interface SubscriptionFactory
Subscription createSubscription(AMQChannel channel,
- AMQProtocolSession protocolSession,
- AMQShortString consumerTag,
- boolean acks,
- FieldTable filters,
- boolean noLocal,
- FlowCreditManager creditManager,
- ClientDeliveryMethod clientMethod,
- RecordDeliveryMethod recordMethod
- )
- throws AMQException;
+ AMQProtocolSession protocolSession,
+ AMQShortString consumerTag,
+ boolean acks,
+ FieldTable filters,
+ boolean noLocal,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod clientMethod,
+ RecordDeliveryMethod recordMethod) throws AMQException;
- SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(AMQChannel channel,
- AMQProtocolSession session,
- AMQShortString consumerTag,
- FieldTable filters,
- boolean noLocal,
- FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod) throws AMQException;
+ Subscription createBasicGetNoAckSubscription(AMQChannel channel,
+ AMQProtocolSession session,
+ AMQShortString consumerTag,
+ FieldTable filters,
+ boolean noLocal,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod) throws AMQException;
- Subscription_0_10 createSubscription(final ServerSession session,
- final String destination,
- final MessageAcceptMode acceptMode,
- final MessageAcquireMode acquireMode,
- final MessageFlowMode flowMode,
- final FlowCreditManager_0_10 creditManager,
- final FilterManager filterManager,
- final Map<String,Object> arguments);
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java Sun Jul 14 22:55:54 2013
@@ -25,25 +25,14 @@ import org.apache.qpid.common.AMQPFilter
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.protocol.v0_10.FlowCreditManager_0_10;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_10.Subscription_0_10;
-import org.apache.qpid.server.protocol.v0_10.ServerSession;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageFlowMode;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
public class SubscriptionFactoryImpl implements SubscriptionFactory
{
- private static final AtomicLong SUB_ID_GENERATOR = new AtomicLong(0);
public Subscription createSubscription(int channelId, AMQProtocolSession protocolSession,
AMQShortString consumerTag, boolean acks, FieldTable filters,
@@ -92,15 +81,15 @@ public class SubscriptionFactoryImpl imp
if(isBrowser)
{
- return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
+ return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
}
else if(acks)
{
- return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
+ return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
}
else
{
- return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
+ return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
}
}
@@ -113,26 +102,9 @@ public class SubscriptionFactoryImpl imp
final ClientDeliveryMethod deliveryMethod,
final RecordDeliveryMethod recordMethod) throws AMQException
{
- return new SubscriptionImpl.GetNoAckSubscription(channel, session, null, null, false, creditManager, deliveryMethod, recordMethod, getNextSubscriptionId());
- }
-
- public Subscription_0_10 createSubscription(final ServerSession session,
- final String destination,
- final MessageAcceptMode acceptMode,
- final MessageAcquireMode acquireMode,
- final MessageFlowMode flowMode,
- final FlowCreditManager_0_10 creditManager,
- final FilterManager filterManager,
- final Map<String,Object> arguments)
- {
- return new Subscription_0_10(session, destination, acceptMode, acquireMode,
- flowMode, creditManager, filterManager, arguments, getNextSubscriptionId());
+ return new SubscriptionImpl.GetNoAckSubscription(channel, session, null, null, false, creditManager, deliveryMethod, recordMethod);
}
public static final SubscriptionFactoryImpl INSTANCE = new SubscriptionFactoryImpl();
- private static long getNextSubscriptionId()
- {
- return SUB_ID_GENERATOR.getAndIncrement();
- }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java Sun Jul 14 22:55:54 2013
@@ -101,11 +101,10 @@ public abstract class SubscriptionImpl i
AMQShortString consumerTag, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
@@ -152,12 +151,11 @@ public abstract class SubscriptionImpl i
public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
AMQShortString consumerTag, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
@@ -241,14 +239,13 @@ public abstract class SubscriptionImpl i
public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
{
public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
+ AMQShortString consumerTag, FieldTable filters,
+ boolean noLocal, FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
public boolean isTransient()
@@ -268,12 +265,11 @@ public abstract class SubscriptionImpl i
public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
AMQShortString consumerTag, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
@@ -336,15 +332,14 @@ public abstract class SubscriptionImpl i
- public SubscriptionImpl(AMQChannel channel , AMQProtocolSession protocolSession,
+ public SubscriptionImpl(AMQChannel channel, AMQProtocolSession protocolSession,
AMQShortString consumerTag, FieldTable arguments,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
- _subscriptionID = subscriptionID;
+ _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
_channel = channel;
_consumerTag = consumerTag;
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Sun Jul 14 22:55:54 2013
@@ -242,6 +242,12 @@ public class Connection_1_0 implements C
}
@Override
+ public String getVirtualHostName()
+ {
+ return _vhost == null ? null : _vhost.getName();
+ }
+
+ @Override
public Port getPort()
{
return _port;
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Sun Jul 14 22:55:54 2013
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.subscription;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.logging.LogActor;
@@ -29,6 +30,8 @@ import org.apache.qpid.server.queue.Queu
public interface Subscription
{
+ AtomicLong SUB_ID_GENERATOR = new AtomicLong(0);
+
LogActor getLogActor();
boolean isTransient();
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Sun Jul 14 22:55:54 2013
@@ -18,16 +18,18 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.protocol.v0_10.ServerConnection;
-import org.apache.qpid.server.protocol.v0_10.ServerSession;
-import org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.Binary;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class ServerSessionTest extends QpidTestCase
{
@@ -61,13 +63,15 @@ public class ServerSessionTest extends Q
public void testCompareTo() throws Exception
{
- ServerConnection connection = new ServerConnection(1);
+ final Broker broker = mock(Broker.class);
+ when(broker.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class));
+ ServerConnection connection = new ServerConnection(1, broker);
connection.setVirtualHost(_virtualHost);
ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(),
new Binary(getName().getBytes()), 0);
// create a session with the same name but on a different connection
- ServerConnection connection2 = new ServerConnection(2);
+ ServerConnection connection2 = new ServerConnection(2, broker);
connection2.setVirtualHost(_virtualHost);
ServerSession session2 = new ServerSession(connection2, new ServerSessionDelegate(),
new Binary(getName().getBytes()), 0);
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Sun Jul 14 22:55:54 2013
@@ -569,5 +569,11 @@ public class MockSubscription implements
{
return false;
}
+
+ @Override
+ public String getVirtualHostName()
+ {
+ return null;
+ }
}
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java?rev=1503076&r1=1503075&r2=1503076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java Sun Jul 14 22:55:54 2013
@@ -23,6 +23,9 @@ package org.apache.qpid.server.subscript
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.protocol.v0_10.Subscription_0_10;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_10.WindowCreditManager;
import org.apache.qpid.server.logging.UnitTestMessageLogger;
@@ -41,6 +44,9 @@ import org.apache.qpid.transport.Message
import org.apache.qpid.transport.MessageFlowMode;
import org.apache.qpid.transport.TestNetworkConnection;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class SubscriptionFactoryImplTest extends QpidTestCase
{
private AMQChannel _channel;
@@ -104,14 +110,17 @@ public class SubscriptionFactoryImplTest
previousId = getNoAckSub.getSubscriptionID();
//create a 0-10 subscription
- ServerConnection conn = new ServerConnection(1);
+ final Broker broker = mock(Broker.class);
+ when(broker.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class));
+
+ ServerConnection conn = new ServerConnection(1, broker);
ProtocolEngine_0_10 engine = new ProtocolEngine_0_10(conn, new TestNetworkConnection(), null, null);
conn.setVirtualHost(_session.getVirtualHost());
ServerSessionDelegate sesDel = new ServerSessionDelegate();
Binary name = new Binary(new byte[]{new Byte("1")});
ServerSession session = new ServerSession(conn, sesDel, name, 0);
- Subscription sub_0_10 = SubscriptionFactoryImpl.INSTANCE.createSubscription(session, "1", MessageAcceptMode.EXPLICIT,
+ Subscription sub_0_10 = new Subscription_0_10(session, "1", MessageAcceptMode.EXPLICIT,
MessageAcquireMode.PRE_ACQUIRED, MessageFlowMode.WINDOW, new WindowCreditManager(), null, null);
assertEquals("Unexpected Subscription ID allocated", previousId + 1, sub_0_10.getSubscriptionID());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org