You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/08/09 00:54:37 UTC
svn commit: r1155136 - in /qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/protocol/
broker/src/main/java/org/apache/qpid/server/transport/
broker/src/test/java/org/apache/qpid/server/proto...
Author: robbie
Date: Mon Aug 8 22:54:37 2011
New Revision: 1155136
URL: http://svn.apache.org/viewvc?rev=1155136&view=rev
Log:
QPID-3385: assign IDs from a generator within the MultiVersionProtocolEngineFactory, which is shared across all protocol versions
Added:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
Removed:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java Mon Aug 8 22:54:37 2011
@@ -45,7 +45,6 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.management.LoggingManagementMBean;
import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.protocol.AMQProtocolEngineFactory;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.protocol.AmqpProtocolVersion;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -241,7 +240,7 @@ public class Broker
IncomingNetworkTransport transport = new MinaNetworkTransport();
- transport.accept(settings, new AMQProtocolEngineFactory(), sslFactory);
+ transport.accept(settings, new MultiVersionProtocolEngineFactory(), sslFactory);
ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, sslPort),
new QpidAcceptor(transport,"TCP"));
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Mon Aug 8 22:54:37 2011
@@ -70,7 +70,7 @@ import org.apache.qpid.pool.ReferenceCou
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -98,7 +98,7 @@ import org.apache.qpid.server.virtualhos
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
-public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
+public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
@@ -151,8 +151,7 @@ public class AMQProtocolEngine implement
private MethodDispatcher _dispatcher;
private ProtocolSessionIdentifier _sessionIdentifier;
- // Create a simple ID that increments for ever new Session
- private final long _sessionID = idGenerator.getAndIncrement();
+ private final long _sessionID;
private AMQPConnectionActor _actor;
private LogSubject _logSubject;
@@ -184,7 +183,7 @@ public class AMQProtocolEngine implement
return _managedObject;
}
- public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network)
+ public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId)
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
_codecFactory = new AMQCodecFactory(true, this);
@@ -193,6 +192,7 @@ public class AMQProtocolEngine implement
_writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
_network = network;
_sender = _network.getSender();
+ _sessionID = connectionId;
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Mon Aug 8 22:54:37 2011
@@ -22,7 +22,7 @@ package org.apache.qpid.server.protocol;
import org.apache.log4j.Logger;
-import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.transport.ConnectionDelegate;
@@ -33,23 +33,27 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Set;
-public class MultiVersionProtocolEngine implements ProtocolEngine
+public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
+ private final long _id;
+
private Set<AmqpProtocolVersion> _supported;
private String _fqdn;
private IApplicationRegistry _appRegistry;
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
- private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine();
+ private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
String fqdn,
Set<AmqpProtocolVersion> supported,
- NetworkConnection network)
+ NetworkConnection network,
+ long id)
{
+ _id = id;
_appRegistry = appRegistry;
_fqdn = fqdn;
_supported = supported;
@@ -102,6 +106,11 @@ public class MultiVersionProtocolEngine
_delegate.exception(t);
}
+ public long getConnectionId()
+ {
+ return _delegate.getConnectionId();
+ }
+
private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
private static final byte[] AMQP_0_8_HEADER =
@@ -126,7 +135,7 @@ public class MultiVersionProtocolEngine
(byte) 9
};
-private static final byte[] AMQP_0_9_1_HEADER =
+ private static final byte[] AMQP_0_9_1_HEADER =
new byte[] { (byte) 'A',
(byte) 'M',
(byte) 'Q',
@@ -153,7 +162,7 @@ private static final byte[] AMQP_0_9_1_H
{
AmqpProtocolVersion getVersion();
byte[] getHeaderIdentifier();
- ProtocolEngine getProtocolEngine();
+ ServerProtocolEngine getProtocolEngine();
}
private DelegateCreator creator_0_8 = new DelegateCreator()
@@ -169,9 +178,9 @@ private static final byte[] AMQP_0_9_1_H
return AMQP_0_8_HEADER;
}
- public ProtocolEngine getProtocolEngine()
+ public ServerProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network);
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
}
};
@@ -189,9 +198,9 @@ private static final byte[] AMQP_0_9_1_H
return AMQP_0_9_HEADER;
}
- public ProtocolEngine getProtocolEngine()
+ public ServerProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network);
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
}
};
@@ -209,9 +218,9 @@ private static final byte[] AMQP_0_9_1_H
return AMQP_0_9_1_HEADER;
}
- public ProtocolEngine getProtocolEngine()
+ public ServerProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network);
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
}
};
@@ -230,12 +239,12 @@ private static final byte[] AMQP_0_9_1_H
return AMQP_0_10_HEADER;
}
- public ProtocolEngine getProtocolEngine()
+ public ServerProtocolEngine getProtocolEngine()
{
final ConnectionDelegate connDelegate =
new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn);
- ServerConnection conn = new ServerConnection();
+ ServerConnection conn = new ServerConnection(_id);
conn.setConnectionDelegate(connDelegate);
return new ProtocolEngine_0_10( conn, _network, _appRegistry);
@@ -246,7 +255,7 @@ private static final byte[] AMQP_0_9_1_H
new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 };
- private class ClosedDelegateProtocolEngine implements ProtocolEngine
+ private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
{
public SocketAddress getRemoteAddress()
{
@@ -292,9 +301,14 @@ private static final byte[] AMQP_0_9_1_H
{
}
+
+ public long getConnectionId()
+ {
+ return _id;
+ }
}
- private class SelfDelegateProtocolEngine implements ProtocolEngine
+ private class SelfDelegateProtocolEngine implements ServerProtocolEngine
{
private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
@@ -340,7 +354,7 @@ private static final byte[] AMQP_0_9_1_H
_header.get(headerBytes);
- ProtocolEngine newDelegate = null;
+ ServerProtocolEngine newDelegate = null;
byte[] newestSupported = null;
for(int i = 0; newDelegate == null && i < _creators.length; i++)
@@ -385,6 +399,11 @@ private static final byte[] AMQP_0_9_1_H
}
+ public long getConnectionId()
+ {
+ return _id;
+ }
+
public void exception(Throwable t)
{
_logger.error("Error establishing session", t);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Mon Aug 8 22:54:37 2011
@@ -22,9 +22,10 @@ package org.apache.qpid.server.protocol;
import java.util.EnumSet;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
-import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -32,38 +33,31 @@ import org.apache.qpid.transport.network
public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
{
private static final Set<AmqpProtocolVersion> ALL_VERSIONS = EnumSet.allOf(AmqpProtocolVersion.class);
+ private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
private final IApplicationRegistry _appRegistry;
private final String _fqdn;
private final Set<AmqpProtocolVersion> _supported;
-
public MultiVersionProtocolEngineFactory()
{
- this(1, "localhost", ALL_VERSIONS);
- }
-
- public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> versions)
- {
- this(1, fqdn, versions);
+ this("localhost", ALL_VERSIONS);
}
-
public MultiVersionProtocolEngineFactory(String fqdn)
{
- this(1, fqdn, ALL_VERSIONS);
+ this(fqdn, ALL_VERSIONS);
}
- public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<AmqpProtocolVersion> supportedVersions)
+ public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> supportedVersions)
{
_appRegistry = ApplicationRegistry.getInstance();
_fqdn = fqdn;
_supported = supportedVersions;
}
-
- public ProtocolEngine newProtocolEngine(NetworkConnection network)
+ public ServerProtocolEngine newProtocolEngine(NetworkConnection network)
{
- return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network);
+ return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement());
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Mon Aug 8 22:54:37 2011
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
@@ -33,7 +33,7 @@ import org.apache.qpid.server.registry.I
import java.net.SocketAddress;
import java.util.UUID;
-public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine, ConnectionConfig
+public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine, ConnectionConfig
{
public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
@@ -191,4 +191,9 @@ public class ProtocolEngine_0_10 extend
{
_connection.mgmtClose();
}
+
+ public long getConnectionId()
+ {
+ return _connection.getConnectionId();
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Mon Aug 8 22:54:37 2011
@@ -64,10 +64,11 @@ public class ServerConnection extends Co
private Principal _authorizedPrincipal = null;
private boolean _statisticsEnabled = false;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+ private final long _connectionId;
- public ServerConnection()
+ public ServerConnection(final long connectionId)
{
-
+ _connectionId = connectionId;
}
public UUID getId()
@@ -379,4 +380,9 @@ public class ServerConnection extends Co
{
return _authorizedPrincipal;
}
+
+ public long getConnectionId()
+ {
+ return _connectionId;
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Mon Aug 8 22:54:37 2011
@@ -161,6 +161,26 @@ public class ServerConnectionDelegate ex
}
}
+
+ @Override
+ public void connectionTuneOk(final Connection conn, final ConnectionTuneOk ok)
+ {
+ ServerConnection sconn = (ServerConnection) conn;
+ int okChannelMax = ok.getChannelMax();
+
+ if (okChannelMax > getChannelMax())
+ {
+ _logger.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
+ "client connectionTuneOk returned a channelMax (" + okChannelMax +
+ ") above the servers offered limit (" + getChannelMax() +")");
+
+ //Due to the error we must forcefully close the connection without negotiation
+ sconn.getSender().close();
+ return;
+ }
+
+ setConnectionTuneOkChannelMax(sconn, okChannelMax);
+ }
@Override
protected int getHeartbeatMax()
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Mon Aug 8 22:54:37 2011
@@ -670,7 +670,7 @@ public class ServerSession extends Sessi
{
return "[" +
MessageFormat.format(CHANNEL_FORMAT,
- getConnection().getConnectionId(),
+ ((ServerConnection) getConnection()).getConnectionId(),
getClientID(),
((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(),
getVirtualHost().getName(),
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java Mon Aug 8 22:54:37 2011
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.protocol;
-import java.security.Principal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -28,6 +27,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
@@ -43,7 +43,6 @@ import org.apache.qpid.server.output.Pro
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
-import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TestNetworkConnection;
@@ -52,10 +51,11 @@ public class InternalTestProtocolSession
// ChannelID(LIST) -> LinkedList<Pair>
final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
private AtomicInteger _deliveryCount = new AtomicInteger(0);
+ private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
public InternalTestProtocolSession(VirtualHost virtualHost) throws AMQException
{
- super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkConnection());
+ super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkConnection(), ID_GENERATOR.getAndIncrement());
_channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java?rev=1155136&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java (added)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java Mon Aug 8 22:54:37 2011
@@ -0,0 +1,146 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.protocol;
+
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+import java.util.Set;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.TestNetworkConnection;
+
+public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
+{
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ //the factory needs a registry instance
+ ApplicationRegistry.initialise(new TestApplicationRegistry(new ServerConfiguration(new XMLConfiguration())));
+ }
+
+ protected void tearDown()
+ {
+ //the factory opens a registry instance
+ ApplicationRegistry.remove();
+ }
+
+ private static final byte[] AMQP_0_8_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 1,
+ (byte) 1,
+ (byte) 8,
+ (byte) 0
+ };
+
+ private static final byte[] AMQP_0_9_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 1,
+ (byte) 1,
+ (byte) 0,
+ (byte) 9
+ };
+
+ private static final byte[] AMQP_0_9_1_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 0,
+ (byte) 0,
+ (byte) 9,
+ (byte) 1
+ };
+
+
+ private static final byte[] AMQP_0_10_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 1,
+ (byte) 1,
+ (byte) 0,
+ (byte) 10
+ };
+
+ private byte[] getAmqpHeader(final AmqpProtocolVersion version)
+ {
+ switch(version)
+ {
+ case v0_8:
+ return AMQP_0_8_HEADER;
+ case v0_9:
+ return AMQP_0_9_HEADER;
+ case v0_9_1:
+ return AMQP_0_9_1_HEADER;
+ case v0_10:
+ return AMQP_0_10_HEADER;
+ default:
+ fail("unknown AMQP version, appropriate header must be added for new protocol version");
+ return null;
+ }
+ }
+
+ /**
+ * Test to verify that connections established using a MultiVersionProtocolEngine are assigned
+ * IDs from a common sequence, independent of the protocol version under use.
+ */
+ public void testDifferentProtocolVersionsShareCommonIDNumberingSequence()
+ {
+ Set<AmqpProtocolVersion> versions = EnumSet.allOf(AmqpProtocolVersion.class);
+
+ MultiVersionProtocolEngineFactory factory =
+ new MultiVersionProtocolEngineFactory("localhost", versions);
+
+ //create a dummy to retrieve the 'current' ID number
+ long previousId = factory.newProtocolEngine(new TestNetworkConnection()).getConnectionId();
+
+ //create a protocol engine and send the AMQP header for all supported AMQP verisons,
+ //ensuring the ID assigned increases as expected
+ for(AmqpProtocolVersion version : versions)
+ {
+ long expectedID = previousId + 1;
+ byte[] header = getAmqpHeader(version);
+ assertNotNull("protocol header should not be null", header);
+
+ ServerProtocolEngine engine = factory.newProtocolEngine(new TestNetworkConnection());
+ assertEquals("ID did not increment as expected", expectedID, engine.getConnectionId());
+
+ //actually feed in the AMQP header for this protocol version, and ensure the ID remains consistent
+ engine.received(ByteBuffer.wrap(header));
+ assertEquals("ID was not as expected following receipt of the AMQP version header", expectedID, engine.getConnectionId());
+
+ previousId = expectedID;
+ }
+ }
+}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java Mon Aug 8 22:54:37 2011
@@ -27,6 +27,7 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.logging.NullRootMessageLogger;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabase;
@@ -45,6 +46,7 @@ public class TestApplicationRegistry ext
public void initialise() throws Exception
{
CurrentActor.setDefault(new BrokerActor(new NullRootMessageLogger()));
+ GenericActor.setDefaultMessageLogger(new NullRootMessageLogger());
super.initialise();
}
Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java?rev=1155136&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java Mon Aug 8 22:54:37 2011
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+public interface ServerProtocolEngine extends ProtocolEngine
+{
+ /**
+ * Gets the connection ID associated with this ProtocolEngine
+ */
+ long getConnectionId();
+}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Mon Aug 8 22:54:37 2011
@@ -126,8 +126,6 @@ public class Connection extends Connecti
private SecurityLayer securityLayer;
private String _clientId;
- private static final AtomicLong idGenerator = new AtomicLong(0);
- private final long _connectionId = idGenerator.incrementAndGet();
private final AtomicBoolean connectionLost = new AtomicBoolean(false);
public Connection() {}
@@ -360,11 +358,6 @@ public class Connection extends Connecti
_sessionFactory = sessionFactory;
}
- public long getConnectionId()
- {
- return _connectionId;
- }
-
public ConnectionDelegate getConnectionDelegate()
{
return delegate;
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Mon Aug 8 22:54:37 2011
@@ -170,22 +170,7 @@ public class ServerDelegate extends Conn
@Override
public void connectionTuneOk(Connection conn, ConnectionTuneOk ok)
{
- int okChannelMax = ok.getChannelMax();
-
- if (okChannelMax > getChannelMax())
- {
- _logger.error("Connection '" + conn.getConnectionId() + "' being severed, " +
- "client connectionTuneOk returned a channelMax (" + okChannelMax +
- ") above the servers offered limit (" + getChannelMax() +")");
-
- //Due to the error we must forcefully close the connection without negotiation
- conn.getSender().close();
- return;
- }
- //0 means no implied limit, except available server resources
- //(or that forced by protocol limitations [0xFFFF])
- conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax);
}
@Override
@@ -215,4 +200,11 @@ public class ServerDelegate extends Conn
ssn.sessionAttached(atc.getName());
ssn.setState(Session.State.OPEN);
}
+
+ protected void setConnectionTuneOkChannelMax(final Connection conn, final int okChannelMax)
+ {
+ //0 means no implied limit, except available server resources
+ //(or that forced by protocol limitations [0xFFFF])
+ conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax);
+ }
}
Modified: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java (original)
+++ qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java Mon Aug 8 22:54:37 2011
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.NetworkTransportConfiguration;
@@ -333,7 +334,7 @@ public class MinaNetworkHandlerTest exte
}
}
- public class CountingProtocolEngine implements ProtocolEngine
+ public class CountingProtocolEngine implements ServerProtocolEngine
{
public ArrayList<ByteBuffer> _receivedBytes = new ArrayList<ByteBuffer>();
private int _readBytes;
@@ -447,6 +448,11 @@ public class MinaNetworkHandlerTest exte
return _closed;
}
+ public long getConnectionId()
+ {
+ return -1;
+ }
+
}
private class EchoProtocolEngine extends CountingProtocolEngine
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org