You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/10/12 17:43:41 UTC

svn commit: r1631192 - in /qpid/branches/QPID-6125-ProtocolRefactoring/java: broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ commo...

Author: rgodfrey
Date: Sun Oct 12 15:43:40 2014
New Revision: 1631192

URL: http://svn.apache.org/r1631192
Log:
Move connection methods into AMQProtocolEngine

Added:
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java   (with props)
Modified:
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1631192&r1=1631191&r2=1631192&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sun Oct 12 15:43:40 2014
@@ -202,7 +202,6 @@ public class AMQChannel
 
 
     public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
-            throws AMQException
     {
         _connection = connection;
         _channelId = channelId;

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1631192&r1=1631191&r2=1631192&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Sun Oct 12 15:43:40 2014
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
+import java.security.AccessControlException;
 import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
@@ -43,6 +44,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.security.auth.Subject;
+import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
 import org.apache.log4j.Logger;
@@ -70,12 +72,15 @@ import org.apache.qpid.server.message.In
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.SessionModelListener;
 import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -88,7 +93,8 @@ import org.apache.qpid.util.BytesDataOut
 
 public class AMQProtocolEngine implements ServerProtocolEngine,
                                           AMQConnectionModel<AMQProtocolEngine, AMQChannel>,
-                                          AMQVersionAwareProtocolSession
+                                          AMQVersionAwareProtocolSession,
+                                          ConnectionMethodProcessor
 {
     private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
 
@@ -836,38 +842,17 @@ public class AMQProtocolEngine implement
         return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId);
     }
 
-    public void addChannel(AMQChannel channel) throws AMQException
+    public void addChannel(AMQChannel channel)
     {
-        if (_closed)
-        {
-            throw new AMQException("Session is closed");
-        }
-
         final int channelId = channel.getChannelId();
 
-        if (_closingChannelsList.containsKey(channelId))
-        {
-            throw new AMQException("Session is marked awaiting channel close");
-        }
-
-        if (_channelMap.size() == _maxNoOfChannels)
-        {
-            String errorMessage =
-                    toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
-                    + "); can't create channel";
-            _logger.error(errorMessage);
-            throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage);
-        }
-        else
+        synchronized (_channelMap)
         {
-            synchronized (_channelMap)
+            _channelMap.put(channel.getChannelId(), channel);
+            sessionAdded(channel);
+            if(_blocking)
             {
-                _channelMap.put(channel.getChannelId(), channel);
-                sessionAdded(channel);
-                if(_blocking)
-                {
-                    channel.block();
-                }
+                channel.block();
             }
         }
 
@@ -893,7 +878,7 @@ public class AMQProtocolEngine implement
         }
     }
 
-    public Long getMaximumNumberOfChannels()
+    public long getMaximumNumberOfChannels()
     {
         return _maxNoOfChannels;
     }
@@ -1269,7 +1254,7 @@ public class AMQProtocolEngine implement
         return _virtualHost;
     }
 
-    public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) throws AMQException
+    public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost)
     {
         _virtualHost = virtualHost;
 
@@ -1676,6 +1661,336 @@ public class AMQProtocolEngine implement
         _deferFlush = deferFlush;
     }
 
+    @Override
+    public void receiveChannelOpen(final int channelId)
+    {
+        // Protect the broker against out of order frame request.
+        if (_virtualHost == null)
+        {
+            closeConnection(AMQConstant.COMMAND_INVALID,
+                            "Virtualhost has not yet been set. ConnectionOpen has not been called.", channelId);
+        }
+        else if(getChannel(channelId) != null || channelAwaitingClosure(channelId))
+        {
+            closeConnection(AMQConstant.CHANNEL_ERROR, "Channel " + channelId + " already exists", channelId);
+        }
+        else if(channelId > getMaximumNumberOfChannels())
+        {
+            closeConnection(AMQConstant.CHANNEL_ERROR,
+                            "Channel " + channelId + " cannot be created as the max allowed channel id is "
+                            + getMaximumNumberOfChannels(),
+                            channelId);
+        }
+        else
+        {
+            _logger.info("Connecting to: " + _virtualHost.getName());
+
+            final AMQChannel channel = new AMQChannel(this, channelId, _virtualHost.getMessageStore());
+
+            addChannel(channel);
+
+            ChannelOpenOkBody response;
+
+
+            response = getMethodRegistry().createChannelOpenOkBody();
+
+
+            writeFrame(response.generateFrame(channelId));
+        }
+    }
+
+    @Override
+    public void receiveConnectionOpen(AMQShortString virtualHostName,
+                                      AMQShortString capabilities,
+                                      boolean insist)
+    {
+        // Handles Connection.Open: resolves the virtual host, checks it is ACTIVE and authorised, then
+        // replies with open-ok. The length check keeps a zero-length name away from charAt(0).
+        String virtualHostStr;
+        if ((virtualHostName != null) && virtualHostName.length() > 0 && virtualHostName.charAt(0) == '/')
+        {
+            virtualHostStr = virtualHostName.toString().substring(1);
+        }
+        else
+        {
+            virtualHostStr = virtualHostName == null ? null : virtualHostName.toString();
+        }
+
+        VirtualHostImpl virtualHost = ((AmqpPort)getPort()).getVirtualHost(virtualHostStr);
+
+        if (virtualHost == null)
+        {
+            closeConnection(AMQConstant.NOT_FOUND,
+                            "Unknown virtual host: '" + virtualHostName + "'",0);
+        }
+        else
+        {
+            // Check virtualhost access
+            if (virtualHost.getState() != State.ACTIVE)
+            {
+                closeConnection(AMQConstant.CONNECTION_FORCED,
+                                "Virtual host '" + virtualHost.getName() + "' is not active",0);
+            }
+            else
+            {
+                setVirtualHost(virtualHost);
+                try
+                {
+                    virtualHost.getSecurityManager().authoriseCreateConnection(this);
+                    if (getContextKey() == null)
+                    {
+                        setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis())));
+                    }
+
+                    MethodRegistry methodRegistry = getMethodRegistry();
+                    AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(virtualHostName);
+
+                    writeFrame(responseBody.generateFrame(0));
+                }
+                catch (AccessControlException e)
+                {
+                    closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(),0);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void receiveConnectionClose(final int replyCode,
+                                       final AMQShortString replyText,
+                                       final int classId,
+                                       final int methodId)
+    {
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("ConnectionClose received with reply code/reply text " + replyCode + "/" +
+                         replyText + " for " + this);
+        }
+        try
+        {
+            closeSession();
+        }
+        catch (Exception e)
+        {
+            _logger.error("Error closing protocol session: " + e, e);
+        }
+
+        MethodRegistry methodRegistry = getMethodRegistry();
+        ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
+        writeFrame(responseBody.generateFrame(0));
+
+        closeProtocolSession();
+
+    }
+
+    @Override
+    public void receiveConnectionCloseOk()
+    {
+
+        _logger.info("Received Connection-close-ok");
+
+        try
+        {
+            closeSession();
+        }
+        catch (Exception e)
+        {
+            _logger.error("Error closing protocol session: " + e, e);
+        }
+    }
+
+    @Override
+    public void receiveConnectionSecureOk(final byte[] response)
+    {
+        // Handles Connection.Secure-Ok: passes the client's SASL response on for authentication and acts on the result.
+        Broker<?> broker = getBroker();
+        SubjectCreator subjectCreator = getSubjectCreator();
+
+        SaslServer ss = getSaslServer();
+        if (ss == null)
+        {
+            closeConnection(AMQConstant.INTERNAL_ERROR, "No SASL context set up in session",0 );
+            return; // no SASL context to authenticate against; do not fall through with a null SaslServer
+        }
+        MethodRegistry methodRegistry = getMethodRegistry();
+        SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response);
+        switch (authResult.getStatus())
+        {
+            case ERROR:
+                Exception cause = authResult.getCause();
+
+                _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
+
+                closeConnection(AMQConstant.NOT_ALLOWED, "Authentication failed",0);
+
+                disposeSaslServer();
+                break;
+            case SUCCESS:
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Connected as: " + authResult.getSubject());
+                }
+
+                int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+
+                if (frameMax <= 0)
+                {
+                    frameMax = Integer.MAX_VALUE;
+                }
+
+                ConnectionTuneBody tuneBody =
+                        methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
+                                                                frameMax,
+                                                                broker.getConnection_heartBeatDelay());
+                writeFrame(tuneBody.generateFrame(0));
+                setAuthorizedSubject(authResult.getSubject());
+                disposeSaslServer();
+                break;
+            case CONTINUE:
+                ConnectionSecureBody
+                        secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
+                writeFrame(secureBody.generateFrame(0));
+                break;
+        }
+    }
+
+    /** Clears the stored SaslServer, if one is set, and disposes of it; a failure to dispose is logged, not thrown. */
+    private void disposeSaslServer()
+    {
+        SaslServer ss = getSaslServer();
+        if (ss != null)
+        {
+            setSaslServer(null);
+            try
+            {
+                ss.dispose();
+            }
+            catch (SaslException e)
+            {
+                _logger.error("Error disposing of Sasl server: " + e, e);
+            }
+        }
+    }
+
+    @Override
+    public void receiveConnectionStartOk(final FieldTable clientProperties,
+                                         final AMQShortString mechanism,
+                                         final byte[] response,
+                                         final AMQShortString locale)
+    {
+        Broker<?> broker = getBroker();
+
+        _logger.info("SASL Mechanism selected: " + mechanism);
+        _logger.info("Locale selected: " + locale);
+
+        SubjectCreator subjectCreator = getSubjectCreator();
+        SaslServer ss = null;
+        try
+        {
+            ss = subjectCreator.createSaslServer(String.valueOf(mechanism),
+                                                 getLocalFQDN(),
+                                                 getPeerPrincipal());
+
+            if (ss == null)
+            {
+                closeConnection(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + mechanism, 0);
+
+            }
+            else
+            {
+                //save clientProperties
+                setClientProperties(clientProperties);
+
+                setSaslServer(ss);
+
+                final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response);
+
+                MethodRegistry methodRegistry = getMethodRegistry();
+
+                switch (authResult.getStatus())
+                {
+                    case ERROR:
+                        Exception cause = authResult.getCause();
+
+                        _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
+
+                        closeConnection(AMQConstant.NOT_ALLOWED, "Authentication failed", 0);
+
+                        disposeSaslServer();
+                        break;
+
+                    case SUCCESS:
+                        if (_logger.isInfoEnabled())
+                        {
+                            _logger.info("Connected as: " + authResult.getSubject());
+                        }
+                        setAuthorizedSubject(authResult.getSubject());
+
+                        int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+
+                        if (frameMax <= 0)
+                        {
+                            frameMax = Integer.MAX_VALUE;
+                        }
+
+                        ConnectionTuneBody
+                                tuneBody =
+                                methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
+                                                                        frameMax,
+                                                                        broker.getConnection_heartBeatDelay());
+                        writeFrame(tuneBody.generateFrame(0));
+                        break;
+                    case CONTINUE:
+                        ConnectionSecureBody
+                                secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
+                        writeFrame(secureBody.generateFrame(0));
+                }
+            }
+        }
+        catch (SaslException e)
+        {
+            disposeSaslServer();
+            closeConnection(AMQConstant.INTERNAL_ERROR, "SASL error: " + e, 0);
+        }
+    }
+
+    @Override
+    public void receiveConnectionTuneOk(final int channelMax, final long frameMax, final int heartbeat)
+    {
+        initHeartbeats(heartbeat);
+
+        int brokerFrameMax = getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+        if (brokerFrameMax <= 0)
+        {
+            brokerFrameMax = Integer.MAX_VALUE;
+        }
+
+        if (frameMax > (long) brokerFrameMax)
+        {
+            closeConnection(AMQConstant.SYNTAX_ERROR,
+                            "Attempt to set max frame size to " + frameMax
+                            + " greater than the broker will allow: "
+                            + brokerFrameMax, 0);
+        }
+        else if (frameMax > 0 && frameMax < AMQConstant.FRAME_MIN_SIZE.getCode())
+        {
+            closeConnection(AMQConstant.SYNTAX_ERROR,
+                            "Attempt to set max frame size to " + frameMax
+                            + " which is smaller than the specification defined minimum: "
+                            + AMQConstant.FRAME_MIN_SIZE.getCode(), 0);
+        }
+        else
+        {
+            int calculatedFrameMax = frameMax == 0 ? brokerFrameMax : (int) frameMax;
+            setMaxFrameSize(calculatedFrameMax);
+
+            //0 means no implied limit, except that forced by protocol limitations (0xFFFF)
+            setMaximumNumberOfChannels( ((channelMax == 0l) || (channelMax > 0xFFFFL))
+                                               ? 0xFFFFL
+                                               : channelMax);
+        }
+    }
+
     public final class WriteDeliverMethod
             implements ClientDeliveryMethod
     {

Added: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java?rev=1631192&view=auto
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java (added)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java Sun Oct 12 15:43:40 2014
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+public interface ConnectionMethodProcessor
+{
+    void receiveChannelOpen(int channelId);
+
+    void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist);
+
+    void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId);
+
+    void receiveConnectionCloseOk();
+
+    void receiveConnectionSecureOk(byte[] response);
+
+    void receiveConnectionStartOk(FieldTable clientProperties,
+                                  AMQShortString mechanism,
+                                  byte[] response,
+                                  AMQShortString locale);
+
+    void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat);
+}

Propchange: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConnectionMethodProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java?rev=1631192&r1=1631191&r2=1631192&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java Sun Oct 12 15:43:40 2014
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
-import java.security.AccessControlException;
 import java.security.PrivilegedAction;
 
 import javax.security.auth.Subject;
@@ -29,16 +28,10 @@ import javax.security.sasl.SaslServer;
 
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.security.SubjectCreator;
-import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 public class ServerMethodDispatcherImpl implements MethodDispatcher
 {
@@ -52,6 +45,13 @@ public class ServerMethodDispatcherImpl 
         void onChannel(ChannelMethodProcessor channel);
     }
 
+
+    private static interface ConnectionAction
+    {
+        void onConnection(ConnectionMethodProcessor connection);
+    }
+
+
     public static MethodDispatcher createMethodDispatcher(AMQProtocolEngine connection)
     {
         return new ServerMethodDispatcherImpl(connection);
@@ -91,6 +91,21 @@ public class ServerMethodDispatcherImpl 
 
     }
 
+    private void processConnectionMethod(final ConnectionAction action)
+    {
+            Subject.doAs(_connection.getSubject(), new PrivilegedAction<Void>()
+            {
+                @Override
+                public Void run()
+                {
+                    action.onConnection(_connection);
+                    return null;
+                }
+            });
+
+
+    }
+
     public boolean dispatchAccessRequest(final AccessRequestBody body, int channelId)
     {
         processChannelMethod(channelId,
@@ -240,7 +255,7 @@ public class ServerMethodDispatcherImpl 
         return true;
     }
 
-    public boolean dispatchBasicReject(final BasicRejectBody body, int channelId) throws AMQException
+    public boolean dispatchBasicReject(final BasicRejectBody body, int channelId)
     {
 
         processChannelMethod(channelId,
@@ -257,30 +272,16 @@ public class ServerMethodDispatcherImpl 
         return true;
     }
 
-    public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException
+    public boolean dispatchChannelOpen(ChannelOpenBody body, final int channelId)
     {
-        VirtualHostImpl virtualHost = _connection.getVirtualHost();
-
-        // Protect the broker against out of order frame request.
-        if (virtualHost == null)
+        processConnectionMethod(new ConnectionAction()
         {
-            throw new AMQException(AMQConstant.COMMAND_INVALID,
-                                   "Virtualhost has not yet been set. ConnectionOpen has not been called.",
-                                   null);
-        }
-        _logger.info("Connecting to: " + virtualHost.getName());
-
-        final AMQChannel channel = new AMQChannel(_connection, channelId, virtualHost.getMessageStore());
-
-        _connection.addChannel(channel);
-
-        ChannelOpenOkBody response;
-
-
-        response = _connection.getMethodRegistry().createChannelOpenOkBody();
-
-
-        _connection.writeFrame(response.generateFrame(channelId));
+            @Override
+            public void onConnection(final ConnectionMethodProcessor connection)
+            {
+                connection.receiveChannelOpen(channelId);
+            }
+        });
         return true;
     }
 
@@ -326,7 +327,7 @@ public class ServerMethodDispatcherImpl 
         throw new UnexpectedMethodException(body);
     }
 
-    public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
+    public boolean dispatchChannelClose(ChannelCloseBody body, int channelId)
     {
 
         processChannelMethod(channelId,
@@ -344,7 +345,7 @@ public class ServerMethodDispatcherImpl 
     }
 
 
-    public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
+    public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId)
     {
 
         processChannelMethod(channelId,
@@ -362,7 +363,7 @@ public class ServerMethodDispatcherImpl 
     }
 
 
-    public boolean dispatchChannelFlow(final ChannelFlowBody body, int channelId) throws AMQException
+    public boolean dispatchChannelFlow(final ChannelFlowBody body, int channelId)
     {
 
         processChannelMethod(channelId,
@@ -389,103 +390,52 @@ public class ServerMethodDispatcherImpl 
     }
 
 
-    public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException
+    public boolean dispatchConnectionOpen(final ConnectionOpenBody body, int channelId)
     {
-
-        //ignore leading '/'
-        String virtualHostName;
-        if ((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/')
-        {
-            virtualHostName =
-                    new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString();
-        }
-        else
-        {
-            virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost());
-        }
-
-        VirtualHostImpl virtualHost = ((AmqpPort) _connection.getPort()).getVirtualHost(virtualHostName);
-
-        if (virtualHost == null)
+        processConnectionMethod(new ConnectionAction()
         {
-            closeConnection(AMQConstant.NOT_FOUND,
-                            "Unknown virtual host: '" + virtualHostName + "'");
-
-        }
-        else
-        {
-            // Check virtualhost access
-            if (virtualHost.getState() != State.ACTIVE)
+            @Override
+            public void onConnection(final ConnectionMethodProcessor connection)
             {
-                closeConnection(AMQConstant.CONNECTION_FORCED,
-                                "Virtual host '" + virtualHost.getName() + "' is not active"
-                               );
-
+                connection.receiveConnectionOpen(body.getVirtualHost(), body.getCapabilities(), body.getInsist());
             }
-            else
-            {
-                _connection.setVirtualHost(virtualHost);
-                try
-                {
-                    virtualHost.getSecurityManager().authoriseCreateConnection(_connection);
-                    if (_connection.getContextKey() == null)
-                    {
-                        _connection.setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis())));
-                    }
+        });
 
-                    MethodRegistry methodRegistry = _connection.getMethodRegistry();
-                    AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
-
-                    _connection.writeFrame(responseBody.generateFrame(channelId));
-                }
-                catch (AccessControlException e)
-                {
-                    closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
-                }
-            }
-        }
         return true;
     }
 
 
-    public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
+    public boolean dispatchConnectionClose(final ConnectionCloseBody body, int channelId)
     {
-        if (_logger.isInfoEnabled())
-        {
-            _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" +
-                         body.getReplyText() + " for " + _connection);
-        }
-        try
-        {
-            _connection.closeSession();
-        }
-        catch (Exception e)
-        {
-            _logger.error("Error closing protocol session: " + e, e);
-        }
-
-        MethodRegistry methodRegistry = _connection.getMethodRegistry();
-        ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
-        _connection.writeFrame(responseBody.generateFrame(channelId));
 
-        _connection.closeProtocolSession();
+        processConnectionMethod(new ConnectionAction()
+        {
+            @Override
+            public void onConnection(final ConnectionMethodProcessor connection)
+            {
+                connection.receiveConnectionClose(body.getReplyCode(),
+                                                  body.getReplyText(),
+                                                  body.getClassId(),
+                                                  body.getMethodId());
+            }
+        });
 
         return true;
     }
 
 
-    public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException
+    public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId)
     {
-        _logger.info("Received Connection-close-ok");
 
-        try
-        {
-            _connection.closeSession();
-        }
-        catch (Exception e)
+        processConnectionMethod(new ConnectionAction()
         {
-            _logger.error("Error closing protocol session: " + e, e);
-        }
+            @Override
+            public void onConnection(final ConnectionMethodProcessor connection)
+            {
+                connection.receiveConnectionCloseOk();
+            }
+        });
+
         return true;
     }
 
@@ -566,62 +516,18 @@ public class ServerMethodDispatcherImpl 
     }
 
 
-    public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException
+    public boolean dispatchConnectionSecureOk(final ConnectionSecureOkBody body, int channelId)
     {
-        Broker<?> broker = _connection.getBroker();
 
-        SubjectCreator subjectCreator = _connection.getSubjectCreator();
-
-        SaslServer ss = _connection.getSaslServer();
-        if (ss == null)
-        {
-            throw new AMQException("No SASL context set up in session");
-        }
-        MethodRegistry methodRegistry = _connection.getMethodRegistry();
-        SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
-        switch (authResult.getStatus())
+        processConnectionMethod(new ConnectionAction()
         {
-            case ERROR:
-                Exception cause = authResult.getCause();
-
-                _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
-
-                ConnectionCloseBody connectionCloseBody =
-                        methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),
-                                                                 AMQConstant.NOT_ALLOWED.getName(),
-                                                                 body.getClazz(),
-                                                                 body.getMethod());
-
-                _connection.writeFrame(connectionCloseBody.generateFrame(0));
-                disposeSaslServer(_connection);
-                break;
-            case SUCCESS:
-                if (_logger.isInfoEnabled())
-                {
-                    _logger.info("Connected as: " + authResult.getSubject());
-                }
-
-                int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
-
-                if (frameMax <= 0)
-                {
-                    frameMax = Integer.MAX_VALUE;
-                }
+            @Override
+            public void onConnection(final ConnectionMethodProcessor connection)
+            {
+                connection.receiveConnectionSecureOk(body.getResponse());
+            }
+        });
 
-                ConnectionTuneBody tuneBody =
-                        methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
-                                                                frameMax,
-                                                                broker.getConnection_heartBeatDelay());
-                _connection.writeFrame(tuneBody.generateFrame(0));
-                _connection.setAuthorizedSubject(authResult.getSubject());
-                disposeSaslServer(_connection);
-                break;
-            case CONTINUE:
-
-                ConnectionSecureBody
-                        secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
-                _connection.writeFrame(secureBody.generateFrame(0));
-        }
         return true;
     }
 
@@ -642,129 +548,40 @@ public class ServerMethodDispatcherImpl 
         }
     }
 
-    public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException
+    public boolean dispatchConnectionStartOk(final ConnectionStartOkBody body, int channelId)
     {
-        Broker<?> broker = _connection.getBroker();
-
-        _logger.info("SASL Mechanism selected: " + body.getMechanism());
-        _logger.info("Locale selected: " + body.getLocale());
 
-        SubjectCreator subjectCreator = _connection.getSubjectCreator();
-        SaslServer ss = null;
-        try
+        processConnectionMethod(new ConnectionAction()
         {
-            ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()),
-                                                 _connection.getLocalFQDN(),
-                                                 _connection.getPeerPrincipal());
-
-            if (ss == null)
-            {
-                closeConnection(AMQConstant.RESOURCE_ERROR,
-                                "Unable to create SASL Server:" + body.getMechanism()
-                               );
-
-            }
-            else
+            @Override
+            public void onConnection(final ConnectionMethodProcessor connection)
             {
-
-                _connection.setSaslServer(ss);
-
-                final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
-                //save clientProperties
-                _connection.setClientProperties(body.getClientProperties());
-
-                MethodRegistry methodRegistry = _connection.getMethodRegistry();
-
-                switch (authResult.getStatus())
-                {
-                    case ERROR:
-                        Exception cause = authResult.getCause();
-
-                        _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
-
-                        ConnectionCloseBody closeBody =
-                                methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),
-                                                                         // replyCode
-                                                                         AMQConstant.NOT_ALLOWED.getName(),
-                                                                         body.getClazz(),
-                                                                         body.getMethod());
-
-                        _connection.writeFrame(closeBody.generateFrame(0));
-                        disposeSaslServer(_connection);
-                        break;
-
-                    case SUCCESS:
-                        if (_logger.isInfoEnabled())
-                        {
-                            _logger.info("Connected as: " + authResult.getSubject());
-                        }
-                        _connection.setAuthorizedSubject(authResult.getSubject());
-
-                        int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
-
-                        if (frameMax <= 0)
-                        {
-                            frameMax = Integer.MAX_VALUE;
-                        }
-
-                        ConnectionTuneBody
-                                tuneBody =
-                                methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
-                                                                        frameMax,
-                                                                        broker.getConnection_heartBeatDelay());
-                        _connection.writeFrame(tuneBody.generateFrame(0));
-                        break;
-                    case CONTINUE:
-                        ConnectionSecureBody
-                                secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
-                        _connection.writeFrame(secureBody.generateFrame(0));
-                }
+                connection.receiveConnectionStartOk(body.getClientProperties(),
+                                                    body.getMechanism(),
+                                                    body.getResponse(),
+                                                    body.getLocale());
             }
-        }
-        catch (SaslException e)
-        {
-            disposeSaslServer(_connection);
-            throw new AMQException("SASL error: " + e, e);
-        }
+        });
+
         return true;
     }
 
-    public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
+    public boolean dispatchConnectionTuneOk(final ConnectionTuneOkBody body, int channelId)
     {
-        final AMQProtocolEngine connection = getConnection();
-
-        connection.initHeartbeats(body.getHeartbeat());
 
-        int brokerFrameMax = connection.getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
-        if (brokerFrameMax <= 0)
+        processConnectionMethod(new ConnectionAction()
         {
-            brokerFrameMax = Integer.MAX_VALUE;
-        }
+            @Override
+            public void onConnection(final ConnectionMethodProcessor connection)
+            {
+                connection.receiveConnectionTuneOk(body.getChannelMax(),
+                                                    body.getFrameMax(),
+                                                    body.getHeartbeat());
+            }
+        });
+        final AMQProtocolEngine connection = getConnection();
 
-        if (body.getFrameMax() > (long) brokerFrameMax)
-        {
-            throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
-                                             "Attempt to set max frame size to " + body.getFrameMax()
-                                             + " greater than the broker will allow: "
-                                             + brokerFrameMax,
-                                             body.getClazz(), body.getMethod(),
-                                             connection.getMethodRegistry(), null);
-        }
-        else if (body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode())
-        {
-            throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
-                                             "Attempt to set max frame size to " + body.getFrameMax()
-                                             + " which is smaller than the specification definined minimum: "
-                                             + AMQConstant.FRAME_MIN_SIZE.getCode(),
-                                             body.getClazz(), body.getMethod(),
-                                             connection.getMethodRegistry(), null);
-        }
-        int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax();
-        connection.setMaxFrameSize(frameMax);
 
-        long maxChannelNumber = body.getChannelMax();
-        //0 means no implied limit, except that forced by protocol limitations (0xFFFF)
-        connection.setMaximumNumberOfChannels(maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber);
         return true;
     }
 
@@ -825,11 +642,6 @@ public class ServerMethodDispatcherImpl 
         return true;
     }
 
-    private boolean isDefaultExchange(final AMQShortString exchangeName)
-    {
-        return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING);
-    }
-
     public boolean dispatchQueueBind(final QueueBindBody body, int channelId)
     {
         processChannelMethod(channelId,
@@ -891,7 +703,7 @@ public class ServerMethodDispatcherImpl 
         return true;
     }
 
-    public boolean dispatchQueuePurge(final QueuePurgeBody body, int channelId) throws AMQException
+    public boolean dispatchQueuePurge(final QueuePurgeBody body, int channelId)
     {
 
         processChannelMethod(channelId,
@@ -910,7 +722,7 @@ public class ServerMethodDispatcherImpl 
     }
 
 
-    public boolean dispatchTxCommit(TxCommitBody body, final int channelId) throws AMQException
+    public boolean dispatchTxCommit(TxCommitBody body, final int channelId)
     {
 
         processChannelMethod(channelId,
@@ -927,7 +739,7 @@ public class ServerMethodDispatcherImpl 
         return true;
     }
 
-    public boolean dispatchTxRollback(TxRollbackBody body, final int channelId) throws AMQException
+    public boolean dispatchTxRollback(TxRollbackBody body, final int channelId)
     {
 
         processChannelMethod(channelId,
@@ -943,7 +755,7 @@ public class ServerMethodDispatcherImpl 
         return true;
     }
 
-    public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException
+    public boolean dispatchTxSelect(TxSelectBody body, int channelId)
     {
         processChannelMethod(channelId,
                              new ChannelAction()
@@ -958,7 +770,7 @@ public class ServerMethodDispatcherImpl 
         return true;
     }
 
-    public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, int channelId) throws AMQException
+    public boolean dispatchBasicRecoverSync(final BasicRecoverSyncBody body, int channelId)
     {
         processChannelMethod(channelId,
                              new ChannelAction()
@@ -991,7 +803,7 @@ public class ServerMethodDispatcherImpl 
         throw new UnexpectedMethodException(body);
     }
 
-    public boolean dispatchQueueUnbind(final QueueUnbindBody body, int channelId) throws AMQException
+    public boolean dispatchQueueUnbind(final QueueUnbindBody body, int channelId)
     {
 
         processChannelMethod(channelId,

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java?rev=1631192&r1=1631191&r2=1631192&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java Sun Oct 12 15:43:40 2014
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.test.utils.QpidTestCase;
 
@@ -46,23 +44,16 @@ public class MaxChannelsTest extends Qpi
 
         long maxChannels = 10L;
         _session.setMaximumNumberOfChannels(maxChannels);
-        assertEquals("Number of channels not correctly set.", new Long(maxChannels), _session.getMaximumNumberOfChannels());
+        assertEquals("Number of channels not correctly set.", maxChannels, _session.getMaximumNumberOfChannels());
 
-        for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++)
+        for (long currentChannel = 1L; currentChannel <= maxChannels; currentChannel++)
         {
-            _session.addChannel(new AMQChannel(_session, (int) currentChannel, null));
+            _session.receiveChannelOpen( (int) currentChannel);
         }
-
-        try
-        {
-            _session.addChannel(new AMQChannel(_session, (int) maxChannels, null));
-            fail("Cannot create more channels then maximum");
-        }
-        catch (AMQException e)
-        {
-            assertEquals("Wrong exception received.", e.getErrorCode(), AMQConstant.NOT_ALLOWED);
-        }
-        assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_session.getChannels().size()));
+        assertFalse("Connection should not be closed after opening " + maxChannels + " channels",_session.isClosed());
+        assertEquals("Maximum number of channels not set.", maxChannels, _session.getChannels().size());
+        _session.receiveChannelOpen((int) maxChannels+1);
+        assertTrue("Connection should be closed after opening " + (maxChannels + 1) + " channels",_session.isClosed());
     }
 
     @Override

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=1631192&r1=1631191&r2=1631192&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Sun Oct 12 15:43:40 2014
@@ -389,6 +389,17 @@ public final class AMQShortString implem
         {
             return new CharSubSequence(start + _sequenceOffset, end + _sequenceOffset);
         }
+
+        @Override
+        public String toString()
+        {
+            char[] chars = new char[length()];
+            for(int i = 0; i < length(); i++)
+            {
+                chars[i] = charAt(i);
+            }
+            return new String(chars);
+        }
     }
 
     public char[] asChars()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org