You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/08/09 00:54:37 UTC

svn commit: r1155136 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/main/java/org/apache/qpid/server/transport/ broker/src/test/java/org/apache/qpid/server/proto...

Author: robbie
Date: Mon Aug  8 22:54:37 2011
New Revision: 1155136

URL: http://svn.apache.org/viewvc?rev=1155136&view=rev
Log:
QPID-3385: assign IDs from a generator within the MultiVersionProtocolEngineFactory, which is shared across all protocol versions

Added:
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
Removed:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java Mon Aug  8 22:54:37 2011
@@ -45,7 +45,6 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.actors.GenericActor;
 import org.apache.qpid.server.logging.management.LoggingManagementMBean;
 import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.protocol.AMQProtocolEngineFactory;
 import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
 import org.apache.qpid.server.protocol.AmqpProtocolVersion;
 import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -241,7 +240,7 @@ public class Broker
 
                     IncomingNetworkTransport transport = new MinaNetworkTransport();
 
-                    transport.accept(settings, new AMQProtocolEngineFactory(), sslFactory);
+                    transport.accept(settings, new MultiVersionProtocolEngineFactory(), sslFactory);
 
                     ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, sslPort),
                             new QpidAcceptor(transport,"TCP"));

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Mon Aug  8 22:54:37 2011
@@ -70,7 +70,7 @@ import org.apache.qpid.pool.ReferenceCou
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.configuration.ConfigStore;
 import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -98,7 +98,7 @@ import org.apache.qpid.server.virtualhos
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.NetworkConnection;
 
-public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
+public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
 {
     private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
 
@@ -151,8 +151,7 @@ public class AMQProtocolEngine implement
     private MethodDispatcher _dispatcher;
     private ProtocolSessionIdentifier _sessionIdentifier;
 
-    // Create a simple ID that increments for ever new Session
-    private final long _sessionID = idGenerator.getAndIncrement();
+    private final long _sessionID;
 
     private AMQPConnectionActor _actor;
     private LogSubject _logSubject;
@@ -184,7 +183,7 @@ public class AMQProtocolEngine implement
         return _managedObject;
     }
 
-    public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network)
+    public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId)
     {
         _stateManager = new AMQStateManager(virtualHostRegistry, this);
         _codecFactory = new AMQCodecFactory(true, this);
@@ -193,6 +192,7 @@ public class AMQProtocolEngine implement
         _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
         _network = network;
         _sender = _network.getSender();
+        _sessionID = connectionId;
 
         _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Mon Aug  8 22:54:37 2011
@@ -22,7 +22,7 @@ package org.apache.qpid.server.protocol;
 
 
 import org.apache.log4j.Logger;
-import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.transport.ServerConnection;
 import org.apache.qpid.transport.ConnectionDelegate;
@@ -33,23 +33,27 @@ import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Set;
 
-public class MultiVersionProtocolEngine implements ProtocolEngine
+public class MultiVersionProtocolEngine implements ServerProtocolEngine
 {
     private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
 
+    private final long _id;
+
     private Set<AmqpProtocolVersion> _supported;
     private String _fqdn;
     private IApplicationRegistry _appRegistry;
     private NetworkConnection _network;
     private Sender<ByteBuffer> _sender;
     
-    private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine();
+    private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
 
     public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
                                       String fqdn,
                                       Set<AmqpProtocolVersion> supported,
-                                      NetworkConnection network)
+                                      NetworkConnection network,
+                                      long id)
     {
+        _id = id;
         _appRegistry = appRegistry;
         _fqdn = fqdn;
         _supported = supported;
@@ -102,6 +106,11 @@ public class MultiVersionProtocolEngine 
         _delegate.exception(t);
     }
 
+    public long getConnectionId()
+    {
+        return _delegate.getConnectionId();
+    }
+
     private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
 
     private static final byte[] AMQP_0_8_HEADER =
@@ -126,7 +135,7 @@ public class MultiVersionProtocolEngine 
                          (byte) 9
             };
 
-private static final byte[] AMQP_0_9_1_HEADER =
+    private static final byte[] AMQP_0_9_1_HEADER =
             new byte[] { (byte) 'A',
                          (byte) 'M',
                          (byte) 'Q',
@@ -153,7 +162,7 @@ private static final byte[] AMQP_0_9_1_H
     {
         AmqpProtocolVersion getVersion();
         byte[] getHeaderIdentifier();
-        ProtocolEngine getProtocolEngine();
+        ServerProtocolEngine getProtocolEngine();
     }
 
     private DelegateCreator creator_0_8 = new DelegateCreator()
@@ -169,9 +178,9 @@ private static final byte[] AMQP_0_9_1_H
             return AMQP_0_8_HEADER;
         }
 
-        public ProtocolEngine getProtocolEngine()
+        public ServerProtocolEngine getProtocolEngine()
         {
-            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network);
+            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
         }
     };
 
@@ -189,9 +198,9 @@ private static final byte[] AMQP_0_9_1_H
             return AMQP_0_9_HEADER;
         }
 
-        public ProtocolEngine getProtocolEngine()
+        public ServerProtocolEngine getProtocolEngine()
         {
-            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network);
+            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
         }
     };
 
@@ -209,9 +218,9 @@ private static final byte[] AMQP_0_9_1_H
             return AMQP_0_9_1_HEADER;
         }
 
-        public ProtocolEngine getProtocolEngine()
+        public ServerProtocolEngine getProtocolEngine()
         {
-            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network);
+            return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
         }
     };
 
@@ -230,12 +239,12 @@ private static final byte[] AMQP_0_9_1_H
             return AMQP_0_10_HEADER;
         }
 
-        public ProtocolEngine getProtocolEngine()
+        public ServerProtocolEngine getProtocolEngine()
         {
             final ConnectionDelegate connDelegate =
                     new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn);
 
-            ServerConnection conn = new ServerConnection();
+            ServerConnection conn = new ServerConnection(_id);
             conn.setConnectionDelegate(connDelegate);
 
             return new ProtocolEngine_0_10( conn, _network, _appRegistry);
@@ -246,7 +255,7 @@ private static final byte[] AMQP_0_9_1_H
             new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 };
 
 
-    private class ClosedDelegateProtocolEngine implements ProtocolEngine
+    private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
     {
         public SocketAddress getRemoteAddress()
         {
@@ -292,9 +301,14 @@ private static final byte[] AMQP_0_9_1_H
         {
 
         }
+
+        public long getConnectionId()
+        {
+            return _id;
+        }
     }
 
-    private class SelfDelegateProtocolEngine implements ProtocolEngine
+    private class SelfDelegateProtocolEngine implements ServerProtocolEngine
     {
         private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
 
@@ -340,7 +354,7 @@ private static final byte[] AMQP_0_9_1_H
                 _header.get(headerBytes);
 
 
-                ProtocolEngine newDelegate = null;
+                ServerProtocolEngine newDelegate = null;
                 byte[] newestSupported = null;
 
                 for(int i = 0; newDelegate == null && i < _creators.length; i++)
@@ -385,6 +399,11 @@ private static final byte[] AMQP_0_9_1_H
 
         }
 
+        public long getConnectionId()
+        {
+            return _id;
+        }
+
         public void exception(Throwable t)
         {
             _logger.error("Error establishing session", t);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Mon Aug  8 22:54:37 2011
@@ -22,9 +22,10 @@ package org.apache.qpid.server.protocol;
 
 import java.util.EnumSet;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.transport.network.NetworkConnection;
@@ -32,38 +33,31 @@ import org.apache.qpid.transport.network
 public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
 {
     private static final Set<AmqpProtocolVersion> ALL_VERSIONS = EnumSet.allOf(AmqpProtocolVersion.class);
+    private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
 
     private final IApplicationRegistry _appRegistry;
     private final String _fqdn;
     private final Set<AmqpProtocolVersion> _supported;
 
-
     public MultiVersionProtocolEngineFactory()
     {
-        this(1, "localhost", ALL_VERSIONS);
-    }
-
-    public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> versions)
-    {
-        this(1, fqdn, versions);
+        this("localhost", ALL_VERSIONS);
     }
 
-
     public MultiVersionProtocolEngineFactory(String fqdn)
     {
-        this(1, fqdn, ALL_VERSIONS);
+        this(fqdn, ALL_VERSIONS);
     }
 
-    public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<AmqpProtocolVersion> supportedVersions)
+    public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> supportedVersions)
     {
         _appRegistry = ApplicationRegistry.getInstance();
         _fqdn = fqdn;
         _supported = supportedVersions;
     }
 
-
-    public ProtocolEngine newProtocolEngine(NetworkConnection network)
+    public ServerProtocolEngine newProtocolEngine(NetworkConnection network)
     {
-        return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network);
+        return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement());
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Mon Aug  8 22:54:37 2011
@@ -20,7 +20,7 @@
  */
 package org.apache.qpid.server.protocol;
 
-import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.transport.network.InputHandler;
 import org.apache.qpid.transport.network.Assembler;
 import org.apache.qpid.transport.network.Disassembler;
@@ -33,7 +33,7 @@ import org.apache.qpid.server.registry.I
 import java.net.SocketAddress;
 import java.util.UUID;
 
-public class ProtocolEngine_0_10  extends InputHandler implements ProtocolEngine, ConnectionConfig
+public class ProtocolEngine_0_10  extends InputHandler implements ServerProtocolEngine, ConnectionConfig
 {
     public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
 
@@ -191,4 +191,9 @@ public class ProtocolEngine_0_10  extend
     {
         _connection.mgmtClose();
     }
+
+    public long getConnectionId()
+    {
+        return _connection.getConnectionId();
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Mon Aug  8 22:54:37 2011
@@ -64,10 +64,11 @@ public class ServerConnection extends Co
     private Principal _authorizedPrincipal = null;
     private boolean _statisticsEnabled = false;
     private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+    private final long _connectionId;
     
-    public ServerConnection()
+    public ServerConnection(final long connectionId)
     {
-
+        _connectionId = connectionId;
     }
 
     public UUID getId()
@@ -379,4 +380,9 @@ public class ServerConnection extends Co
     {
         return _authorizedPrincipal;
     }
+
+    public long getConnectionId()
+    {
+        return _connectionId;
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Mon Aug  8 22:54:37 2011
@@ -161,6 +161,26 @@ public class ServerConnectionDelegate ex
         }
         
     }
+
+    @Override
+    public void connectionTuneOk(final Connection conn, final ConnectionTuneOk ok)
+    {
+        ServerConnection sconn = (ServerConnection) conn;
+        int okChannelMax = ok.getChannelMax();
+
+        if (okChannelMax > getChannelMax())
+        {
+            _logger.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
+                    "client connectionTuneOk returned a channelMax (" + okChannelMax +
+                    ") above the servers offered limit (" + getChannelMax() +")");
+
+            //Due to the error we must forcefully close the connection without negotiation
+            sconn.getSender().close();
+            return;
+        }
+
+        setConnectionTuneOkChannelMax(sconn, okChannelMax);
+    }
     
     @Override
     protected int getHeartbeatMax()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Mon Aug  8 22:54:37 2011
@@ -670,7 +670,7 @@ public class ServerSession extends Sessi
     {
        return "[" +
                MessageFormat.format(CHANNEL_FORMAT,
-                                   getConnection().getConnectionId(),
+                                   ((ServerConnection) getConnection()).getConnectionId(),
                                    getClientID(),
                                    ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(),
                                    getVirtualHost().getName(),

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java Mon Aug  8 22:54:37 2011
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.protocol;
 
-import java.security.Principal;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -28,6 +27,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.security.auth.Subject;
 
@@ -43,7 +43,6 @@ import org.apache.qpid.server.output.Pro
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
-import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.TestNetworkConnection;
 
@@ -52,10 +51,11 @@ public class InternalTestProtocolSession
     // ChannelID(LIST)  -> LinkedList<Pair>
     final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
     private AtomicInteger _deliveryCount = new AtomicInteger(0);
+    private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
 
     public InternalTestProtocolSession(VirtualHost virtualHost) throws AMQException
     {
-        super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkConnection());
+        super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkConnection(), ID_GENERATOR.getAndIncrement());
 
         _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
 

Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java?rev=1155136&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java (added)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java Mon Aug  8 22:54:37 2011
@@ -0,0 +1,146 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.protocol;
+
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+import java.util.Set;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.TestNetworkConnection;
+
+public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
+{
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+
+        //the factory needs a registry instance
+        ApplicationRegistry.initialise(new TestApplicationRegistry(new ServerConfiguration(new XMLConfiguration())));
+    }
+
+    protected void tearDown()
+    {
+        //the factory opens a registry instance
+        ApplicationRegistry.remove();
+    }
+    
+    private static final byte[] AMQP_0_8_HEADER =
+        new byte[] { (byte) 'A',
+                     (byte) 'M',
+                     (byte) 'Q',
+                     (byte) 'P',
+                     (byte) 1,
+                     (byte) 1,
+                     (byte) 8,
+                     (byte) 0
+        };
+
+    private static final byte[] AMQP_0_9_HEADER =
+        new byte[] { (byte) 'A',
+                     (byte) 'M',
+                     (byte) 'Q',
+                     (byte) 'P',
+                     (byte) 1,
+                     (byte) 1,
+                     (byte) 0,
+                     (byte) 9
+        };
+
+    private static final byte[] AMQP_0_9_1_HEADER =
+        new byte[] { (byte) 'A',
+                     (byte) 'M',
+                     (byte) 'Q',
+                     (byte) 'P',
+                     (byte) 0,
+                     (byte) 0,
+                     (byte) 9,
+                     (byte) 1
+        };
+
+
+    private static final byte[] AMQP_0_10_HEADER =
+        new byte[] { (byte) 'A',
+                     (byte) 'M',
+                     (byte) 'Q',
+                     (byte) 'P',
+                     (byte) 1,
+                     (byte) 1,
+                     (byte) 0,
+                     (byte) 10
+        };
+    
+    private byte[] getAmqpHeader(final AmqpProtocolVersion version)
+    {
+        switch(version)
+        {
+            case v0_8:
+                return AMQP_0_8_HEADER;
+            case v0_9:
+                return AMQP_0_9_HEADER;
+            case v0_9_1:
+                return AMQP_0_9_1_HEADER;
+            case v0_10:
+                return AMQP_0_10_HEADER;
+            default:
+                fail("unknown AMQP version, appropriate header must be added for new protocol version");
+                return null;
+        }
+    }
+
+    /**
+     * Test to verify that connections established using a MultiVersionProtocolEngine are assigned
+     * IDs from a common sequence, independent of the protocol version under use.
+     */
+    public void testDifferentProtocolVersionsShareCommonIDNumberingSequence()
+    {
+        Set<AmqpProtocolVersion> versions = EnumSet.allOf(AmqpProtocolVersion.class);
+
+        MultiVersionProtocolEngineFactory factory =
+            new MultiVersionProtocolEngineFactory("localhost", versions);
+
+        //create a dummy to retrieve the 'current' ID number
+        long previousId = factory.newProtocolEngine(new TestNetworkConnection()).getConnectionId();
+
+        //create a protocol engine and send the AMQP header for all supported AMQP verisons,
+        //ensuring the ID assigned increases as expected
+        for(AmqpProtocolVersion version : versions)
+        {
+            long expectedID = previousId + 1;
+            byte[] header = getAmqpHeader(version);
+            assertNotNull("protocol header should not be null", header);
+
+            ServerProtocolEngine engine = factory.newProtocolEngine(new TestNetworkConnection());
+            assertEquals("ID did not increment as expected", expectedID, engine.getConnectionId());
+
+            //actually feed in the AMQP header for this protocol version, and ensure the ID remains consistent
+            engine.received(ByteBuffer.wrap(header));
+            assertEquals("ID was not as expected following receipt of the AMQP version header", expectedID, engine.getConnectionId());
+
+            previousId = expectedID;
+        }
+    }
+}

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java Mon Aug  8 22:54:37 2011
@@ -27,6 +27,7 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.logging.NullRootMessageLogger;
 import org.apache.qpid.server.logging.actors.BrokerActor;
 import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
 import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabase;
@@ -45,6 +46,7 @@ public class TestApplicationRegistry ext
     public void initialise() throws Exception
     {
         CurrentActor.setDefault(new BrokerActor(new NullRootMessageLogger()));
+        GenericActor.setDefaultMessageLogger(new NullRootMessageLogger());
         super.initialise();
     }
 

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java?rev=1155136&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java Mon Aug  8 22:54:37 2011
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+public interface ServerProtocolEngine extends ProtocolEngine
+{
+    /**
+     * Gets the connection ID associated with this ProtocolEngine
+     */
+    long getConnectionId();
+}

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Mon Aug  8 22:54:37 2011
@@ -126,8 +126,6 @@ public class Connection extends Connecti
     private SecurityLayer securityLayer;
     private String _clientId;
     
-    private static final AtomicLong idGenerator = new AtomicLong(0);
-    private final long _connectionId = idGenerator.incrementAndGet();
     private final AtomicBoolean connectionLost = new AtomicBoolean(false);
     
     public Connection() {}
@@ -360,11 +358,6 @@ public class Connection extends Connecti
         _sessionFactory = sessionFactory;
     }
 
-    public long getConnectionId()
-    {
-        return _connectionId;
-    }
-
     public ConnectionDelegate getConnectionDelegate()
     {
         return delegate;

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Mon Aug  8 22:54:37 2011
@@ -170,22 +170,7 @@ public class ServerDelegate extends Conn
     @Override
     public void connectionTuneOk(Connection conn, ConnectionTuneOk ok)
     {
-        int okChannelMax = ok.getChannelMax();
-        
-        if (okChannelMax > getChannelMax())
-        {
-            _logger.error("Connection '" + conn.getConnectionId() + "' being severed, " +
-                    "client connectionTuneOk returned a channelMax (" + okChannelMax +
-                    ") above the servers offered limit (" + getChannelMax() +")");
-
-            //Due to the error we must forcefully close the connection without negotiation
-            conn.getSender().close();
-            return;
-        }
 
-        //0 means no implied limit, except available server resources
-        //(or that forced by protocol limitations [0xFFFF])
-        conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax);
     }
 
     @Override
@@ -215,4 +200,11 @@ public class ServerDelegate extends Conn
         ssn.sessionAttached(atc.getName());
         ssn.setState(Session.State.OPEN);
     }
+
+    protected void setConnectionTuneOkChannelMax(final Connection conn, final int okChannelMax)
+    {
+        //0 means no implied limit, except available server resources
+        //(or that forced by protocol limitations [0xFFFF])
+        conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax);
+    }
 }

Modified: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java?rev=1155136&r1=1155135&r2=1155136&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java (original)
+++ qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java Mon Aug  8 22:54:37 2011
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.transport.NetworkTransportConfiguration;
@@ -333,7 +334,7 @@ public class MinaNetworkHandlerTest exte
         }
     }
     
-    public class CountingProtocolEngine implements ProtocolEngine
+    public class CountingProtocolEngine implements ServerProtocolEngine
     {
         public ArrayList<ByteBuffer> _receivedBytes = new ArrayList<ByteBuffer>();
         private int _readBytes;
@@ -447,6 +448,11 @@ public class MinaNetworkHandlerTest exte
             return _closed;
         }
 
+        public long getConnectionId()
+        {
+            return -1;
+        }
+
     }
 
     private class EchoProtocolEngine extends CountingProtocolEngine



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org