You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/01/27 23:59:46 UTC

svn commit: r1655186 - in /qpid/trunk/qpid/java: broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ common/src/main/java/org/apache/q...

Author: rgodfrey
Date: Tue Jan 27 22:59:46 2015
New Revision: 1655186

URL: http://svn.apache.org/r1655186
Log:
QPID-6342 : Fail fast when commands sent in wrong order

Modified:
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1655186&r1=1655185&r2=1655186&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Tue Jan 27 22:59:46 2015
@@ -96,6 +96,16 @@ public class AMQProtocolEngine implement
                                           AMQConnectionModel<AMQProtocolEngine, AMQChannel>,
                                           ServerMethodProcessor<ServerChannelMethodProcessor>
 {
+    enum ConnectionState
+    {
+        INIT,
+        AWAIT_START_OK,
+        AWAIT_SECURE_OK,
+        AWAIT_TUNE_OK,
+        AWAIT_OPEN,
+        OPEN
+    }
+
     private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
 
     // to save boxing the channelId and looking up in a map... cache in an array the low numbered
@@ -123,6 +133,8 @@ public class AMQProtocolEngine implement
 
     private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
 
+    private ConnectionState _state = ConnectionState.INIT;
+
     /**
      * The channels that the latest call to {@link #received(ByteBuffer)} applied to.
      * Used so we know which channels we need to call {@link AMQChannel#receivedComplete()}
@@ -469,16 +481,15 @@ public class AMQProtocolEngine implement
                                                                                        serverProperties,
                                                                                        mechanisms.getBytes(),
                                                                                        locales.getBytes());
-            _sender.send(asByteBuffer(responseBody.generateFrame(0)));
-            _sender.flush();
+            writeFrame(responseBody.generateFrame(0));
+            _state = ConnectionState.AWAIT_START_OK;
 
         }
         catch (AMQException e)
         {
             _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
 
-            _sender.send(asByteBuffer(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())));
-            _sender.flush();
+            writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
         }
     }
 
@@ -1467,6 +1478,7 @@ public class AMQProtocolEngine implement
         {
             _logger.debug("RECV[" + channelId + "] ChannelOpen");
         }
+        assertState(ConnectionState.OPEN);
 
         // Protect the broker against out of order frame request.
         if (_virtualHost == null)
@@ -1503,6 +1515,15 @@ public class AMQProtocolEngine implement
         }
     }
 
+    void assertState(final ConnectionState requiredState)
+    {
+        if(_state != requiredState)
+        {
+            closeConnection(AMQConstant.COMMAND_INVALID, "Command Invalid", 0);
+
+        }
+    }
+
     @Override
     public void receiveConnectionOpen(AMQShortString virtualHostName,
                                       AMQShortString capabilities,
@@ -1555,6 +1576,7 @@ public class AMQProtocolEngine implement
                     AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(virtualHostName);
 
                     writeFrame(responseBody.generateFrame(0));
+                    _state = ConnectionState.OPEN;
                 }
                 catch (AccessControlException e)
                 {
@@ -1625,6 +1647,8 @@ public class AMQProtocolEngine implement
             _logger.debug("RECV ConnectionSecureOk[ response: ******** ] ");
         }
 
+        assertState(ConnectionState.AWAIT_SECURE_OK);
+
         Broker<?> broker = getBroker();
 
         SubjectCreator subjectCreator = getSubjectCreator();
@@ -1665,6 +1689,7 @@ public class AMQProtocolEngine implement
                                                                 frameMax,
                                                                 broker.getConnection_heartBeatDelay());
                 writeFrame(tuneBody.generateFrame(0));
+                _state = ConnectionState.AWAIT_TUNE_OK;
                 setAuthorizedSubject(authResult.getSubject());
                 disposeSaslServer();
                 break;
@@ -1713,6 +1738,8 @@ public class AMQProtocolEngine implement
                           + " ]");
         }
 
+        assertState(ConnectionState.AWAIT_START_OK);
+
         Broker<?> broker = getBroker();
 
         _logger.info("SASL Mechanism selected: " + mechanism);
@@ -1774,11 +1801,14 @@ public class AMQProtocolEngine implement
                                                                         frameMax,
                                                                         broker.getConnection_heartBeatDelay());
                         writeFrame(tuneBody.generateFrame(0));
+                        _state = ConnectionState.AWAIT_TUNE_OK;
                         break;
                     case CONTINUE:
                         ConnectionSecureBody
                                 secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
                         writeFrame(secureBody.generateFrame(0));
+
+                        _state = ConnectionState.AWAIT_SECURE_OK;
                 }
             }
         }
@@ -1797,6 +1827,8 @@ public class AMQProtocolEngine implement
             _logger.debug("RECV ConnectionTuneOk[" +" channelMax: " + channelMax + " frameMax: " + frameMax + " heartbeat: " + heartbeat + " ]");
         }
 
+        assertState(ConnectionState.AWAIT_TUNE_OK);
+
         initHeartbeats(heartbeat);
 
         int brokerFrameMax = getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
@@ -1828,7 +1860,10 @@ public class AMQProtocolEngine implement
             setMaximumNumberOfChannels( ((channelMax == 0l) || (channelMax > 0xFFFFL))
                                                ? 0xFFFFL
                                                : channelMax);
+
         }
+        _state = ConnectionState.AWAIT_OPEN;
+
     }
 
     public int getBinaryDataLimit()
@@ -1928,6 +1963,8 @@ public class AMQProtocolEngine implement
     @Override
     public ServerChannelMethodProcessor getChannelMethodProcessor(final int channelId)
     {
+        assertState(ConnectionState.OPEN);
+
         ServerChannelMethodProcessor channelMethodProcessor = getChannel(channelId);
         if(channelMethodProcessor == null)
         {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1655186&r1=1655185&r2=1655186&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Tue Jan 27 22:59:46 2015
@@ -275,6 +275,12 @@ public class InternalTestProtocolSession
         }
     }
 
+    void assertState(final ConnectionState requiredState)
+    {
+        // no-op
+    }
+
+
     private static final AtomicInteger portNumber = new AtomicInteger(0);
     
     private static class TestNetworkConnection implements NetworkConnection

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java?rev=1655186&r1=1655185&r2=1655186&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java Tue Jan 27 22:59:46 2015
@@ -42,7 +42,6 @@ public class ServerDecoder extends AMQDe
             throws AMQFrameDecodingException, IOException
     {
         ServerMethodProcessor<? extends ServerChannelMethodProcessor> methodProcessor = getMethodProcessor();
-        ServerChannelMethodProcessor channelMethodProcessor = methodProcessor.getChannelMethodProcessor(channelId);
         final int classAndMethod = in.readInt();
         int classId = classAndMethod >> 16;
         int methodId = classAndMethod & 0xFFFF;
@@ -115,116 +114,117 @@ public class ServerDecoder extends AMQDe
                     ChannelOpenBody.process(channelId, in, methodProcessor);
                     break;
                 case 0x00140014:
-                    ChannelFlowBody.process(in, channelMethodProcessor);
+                    ChannelFlowBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
                 case 0x00140015:
-                    ChannelFlowOkBody.process(in, channelMethodProcessor);
+                    ChannelFlowOkBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
                 case 0x00140028:
-                    ChannelCloseBody.process(in, channelMethodProcessor);
+                    ChannelCloseBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
                 case 0x00140029:
-                    channelMethodProcessor.receiveChannelCloseOk();
+                    methodProcessor.getChannelMethodProcessor(channelId).receiveChannelCloseOk();
                     break;
 
                 // ACCESS_CLASS:
 
                 case 0x001e000a:
-                    AccessRequestBody.process(in, channelMethodProcessor);
+                    AccessRequestBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
 
                 // EXCHANGE_CLASS:
 
                 case 0x0028000a:
-                    ExchangeDeclareBody.process(in, channelMethodProcessor);
+                    ExchangeDeclareBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
                 case 0x00280014:
-                    ExchangeDeleteBody.process(in, channelMethodProcessor);
+                    ExchangeDeleteBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
                 case 0x00280016:
-                    ExchangeBoundBody.process(in, channelMethodProcessor);
+                    ExchangeBoundBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
 
 
                 // QUEUE_CLASS:
 
                 case 0x0032000a:
-                    QueueDeclareBody.process(in, channelMethodProcessor);
+                    QueueDeclareBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
                 case 0x00320014:
-                    QueueBindBody.process(in, channelMethodProcessor);
+                    QueueBindBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
                 case 0x0032001e:
-                    QueuePurgeBody.process(in, channelMethodProcessor);
+                    QueuePurgeBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
                 case 0x00320028:
-                    QueueDeleteBody.process(in, channelMethodProcessor);
+                    QueueDeleteBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
                 case 0x00320032:
-                    QueueUnbindBody.process(in, channelMethodProcessor);
+                    QueueUnbindBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
 
 
                 // BASIC_CLASS:
 
                 case 0x003c000a:
-                    BasicQosBody.process(in, channelMethodProcessor);
+                    BasicQosBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
                 case 0x003c0014:
-                    BasicConsumeBody.process(in, channelMethodProcessor);
+                    BasicConsumeBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
                 case 0x003c001e:
-                    BasicCancelBody.process(in, channelMethodProcessor);
+                    BasicCancelBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
                 case 0x003c0028:
-                    BasicPublishBody.process(in, channelMethodProcessor);
+                    BasicPublishBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
                 case 0x003c0046:
-                    BasicGetBody.process(in, channelMethodProcessor);
+                    BasicGetBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
                 case 0x003c0050:
-                    BasicAckBody.process(in, channelMethodProcessor);
+                    BasicAckBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
                 case 0x003c005a:
-                    BasicRejectBody.process(in, channelMethodProcessor);
+                    BasicRejectBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
                 case 0x003c0064:
-                    BasicRecoverBody.process(in, methodProcessor.getProtocolVersion(), channelMethodProcessor);
+                    BasicRecoverBody.process(in, methodProcessor.getProtocolVersion(),
+                                             methodProcessor.getChannelMethodProcessor(channelId));
                     break;
                 case 0x003c0066:
-                    BasicRecoverSyncBody.process(in, channelMethodProcessor);
+                    BasicRecoverSyncBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
                 case 0x003c006e:
-                    BasicRecoverSyncBody.process(in, channelMethodProcessor);
+                    BasicRecoverSyncBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
                 case 0x003c0078:
-                    BasicNackBody.process(in, channelMethodProcessor);
+                    BasicNackBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
 
                 // CONFIRM CLASS:
 
                 case 0x0055000a:
-                    ConfirmSelectBody.process(in, channelMethodProcessor);
+                    ConfirmSelectBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
                     break;
 
                 // TX_CLASS:
 
                 case 0x005a000a:
-                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    if(!methodProcessor.getChannelMethodProcessor(channelId).ignoreAllButCloseOk())
                     {
-                        channelMethodProcessor.receiveTxSelect();
+                        methodProcessor.getChannelMethodProcessor(channelId).receiveTxSelect();
                     }
                     break;
                 case 0x005a0014:
-                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    if(!methodProcessor.getChannelMethodProcessor(channelId).ignoreAllButCloseOk())
                     {
-                        channelMethodProcessor.receiveTxCommit();
+                        methodProcessor.getChannelMethodProcessor(channelId).receiveTxCommit();
                     }
                     break;
                 case 0x005a001e:
-                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    if(!methodProcessor.getChannelMethodProcessor(channelId).ignoreAllButCloseOk())
                     {
-                        channelMethodProcessor.receiveTxRollback();
+                        methodProcessor.getChannelMethodProcessor(channelId).receiveTxRollback();
                     }
                     break;
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org