You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/11/24 22:14:23 UTC

svn commit: r597918 [9/12] - in /incubator/qpid/branches/M2.1: gentools/ gentools/lib/ gentools/src/org/apache/qpid/gentools/ java/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/handler/ java/broker/...

Added: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java?rev=597918&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java (added)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java Sat Nov 24 13:14:14 2007
@@ -0,0 +1,566 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQException;
+
+/**
+ * Broker-side {@link MethodDispatcher} holding the dispatch behaviour shared by the
+ * version-specific dispatchers that {@link #createMethodDispatcher} hands out.
+ * <p>
+ * Every dispatch method in this class does exactly one of three things:
+ * <ul>
+ * <li>passes the body to the matching handler singleton together with the state manager
+ *     and channel id, then returns <code>true</code>;</li>
+ * <li>throws {@link UnexpectedMethodException} for the received body;</li>
+ * <li>returns <code>false</code> without invoking any handler.</li>
+ * </ul>
+ */
+public class ServerMethodDispatcherImpl implements MethodDispatcher
+{
+    private final AMQStateManager _stateManager;
+
+    /** Creates the dispatcher implementation for one protocol version. */
+    private static interface DispatcherFactory
+    {
+        public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager);
+    }
+
+    /** Dispatcher factories keyed by protocol version; filled in by the static initialiser below. */
+    private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
+            new HashMap<ProtocolVersion, DispatcherFactory>();
+
+    static
+    {
+        _dispatcherFactories.put(ProtocolVersion.v8_0,
+                                 new DispatcherFactory()
+                                 {
+                                     public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager)
+                                     {
+                                         return new ServerMethodDispatcherImpl_8_0(stateManager);
+                                     }
+                                 });
+
+        _dispatcherFactories.put(ProtocolVersion.v0_9,
+                                 new DispatcherFactory()
+                                 {
+                                     public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager)
+                                     {
+                                         return new ServerMethodDispatcherImpl_0_9(stateManager);
+                                     }
+                                 });
+    }
+
+    // Handler singletons shared by every dispatcher instance.
+    private static final AccessRequestHandler _accessRequestHandler = AccessRequestHandler.getInstance();
+    private static final ChannelCloseHandler _channelCloseHandler = ChannelCloseHandler.getInstance();
+    private static final ChannelOpenHandler _channelOpenHandler = ChannelOpenHandler.getInstance();
+    private static final ChannelCloseOkHandler _channelCloseOkHandler = ChannelCloseOkHandler.getInstance();
+    private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
+    private static final ConnectionCloseOkMethodHandler _connectionCloseOkMethodHandler = ConnectionCloseOkMethodHandler.getInstance();
+    private static final ConnectionOpenMethodHandler _connectionOpenMethodHandler = ConnectionOpenMethodHandler.getInstance();
+    private static final ConnectionTuneOkMethodHandler _connectionTuneOkMethodHandler = ConnectionTuneOkMethodHandler.getInstance();
+    private static final ConnectionSecureOkMethodHandler _connectionSecureOkMethodHandler = ConnectionSecureOkMethodHandler.getInstance();
+    private static final ConnectionStartOkMethodHandler _connectionStartOkMethodHandler = ConnectionStartOkMethodHandler.getInstance();
+    private static final ExchangeDeclareHandler _exchangeDeclareHandler = ExchangeDeclareHandler.getInstance();
+    private static final ExchangeDeleteHandler _exchangeDeleteHandler = ExchangeDeleteHandler.getInstance();
+    private static final ExchangeBoundHandler _exchangeBoundHandler = ExchangeBoundHandler.getInstance();
+    private static final BasicAckMethodHandler _basicAckMethodHandler = BasicAckMethodHandler.getInstance();
+    private static final BasicRecoverMethodHandler _basicRecoverMethodHandler = BasicRecoverMethodHandler.getInstance();
+    private static final BasicConsumeMethodHandler _basicConsumeMethodHandler = BasicConsumeMethodHandler.getInstance();
+    private static final BasicGetMethodHandler _basicGetMethodHandler = BasicGetMethodHandler.getInstance();
+    private static final BasicCancelMethodHandler _basicCancelMethodHandler = BasicCancelMethodHandler.getInstance();
+    private static final BasicPublishMethodHandler _basicPublishMethodHandler = BasicPublishMethodHandler.getInstance();
+    private static final BasicQosHandler _basicQosHandler = BasicQosHandler.getInstance();
+    private static final QueueBindHandler _queueBindHandler = QueueBindHandler.getInstance();
+    private static final QueueDeclareHandler _queueDeclareHandler = QueueDeclareHandler.getInstance();
+    private static final QueueDeleteHandler _queueDeleteHandler = QueueDeleteHandler.getInstance();
+    private static final QueuePurgeHandler _queuePurgeHandler = QueuePurgeHandler.getInstance();
+    private static final ChannelFlowHandler _channelFlowHandler = ChannelFlowHandler.getInstance();
+    private static final TxSelectHandler _txSelectHandler = TxSelectHandler.getInstance();
+    private static final TxCommitHandler _txCommitHandler = TxCommitHandler.getInstance();
+    private static final TxRollbackHandler _txRollbackHandler = TxRollbackHandler.getInstance();
+    private static final BasicRejectMethodHandler _basicRejectMethodHandler = BasicRejectMethodHandler.getInstance();
+
+    /**
+     * Creates the dispatcher registered for the given protocol version.
+     *
+     * @param stateManager    state manager passed to the new dispatcher
+     * @param protocolVersion version whose dispatcher is required
+     * @return a new dispatcher for <code>protocolVersion</code>
+     * @throws IllegalArgumentException if no dispatcher is registered for <code>protocolVersion</code>
+     */
+    public static MethodDispatcher createMethodDispatcher(AMQStateManager stateManager, ProtocolVersion protocolVersion)
+    {
+        DispatcherFactory factory = _dispatcherFactories.get(protocolVersion);
+        if (factory == null)
+        {
+            // Map.get returns null for an unregistered key; report which version was asked for
+            // instead of failing with a bare NullPointerException on the call below.
+            throw new IllegalArgumentException("No method dispatcher registered for protocol version: " + protocolVersion);
+        }
+
+        return factory.createMethodDispatcher(stateManager);
+    }
+
+    /**
+     * @param stateManager state manager passed to every handler this dispatcher invokes
+     */
+    public ServerMethodDispatcherImpl(AMQStateManager stateManager)
+    {
+        _stateManager = stateManager;
+    }
+
+    /**
+     * @return the state manager supplied at construction, for use by subclasses
+     */
+    protected AMQStateManager getStateManager()
+    {
+        return _stateManager;
+    }
+
+    public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException
+    {
+        _accessRequestHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException
+    {
+        _basicAckMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException
+    {
+        _basicCancelMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException
+    {
+        _basicConsumeMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException
+    {
+        _basicGetMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException
+    {
+        _basicPublishMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException
+    {
+        _basicQosHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException
+    {
+        _basicRecoverMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException
+    {
+        _basicRejectMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException
+    {
+        _channelOpenHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
+    {
+        _channelCloseHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
+    {
+        _channelCloseOkHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
+    {
+        _channelFlowHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException
+    {
+        _connectionOpenMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
+    {
+        _connectionCloseMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException
+    {
+        _connectionCloseOkMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException
+    {
+        _connectionSecureOkMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException
+    {
+        _connectionStartOkMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
+    {
+        _connectionTuneOkMethodHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException
+    {
+        _exchangeBoundHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException
+    {
+        _exchangeDeclareHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException
+    {
+        _exchangeDeleteHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException
+    {
+        _queueBindHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException
+    {
+        _queueDeclareHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException
+    {
+        _queueDeleteHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException
+    {
+        _queuePurgeHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException
+    {
+        _txCommitHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException
+    {
+        _txRollbackHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+
+    public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException
+    {
+        _txSelectHandler.methodReceived(_stateManager, body, channelId);
+        return true;
+    }
+}

Added: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java?rev=597918&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java (added)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java Sat Nov 24 13:14:14 2007
@@ -0,0 +1,160 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+
+import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.AMQException;
+
+
+
+/**
+ * Extends {@link ServerMethodDispatcherImpl} with the additional dispatch methods needed to
+ * implement {@link MethodDispatcher_0_9}.
+ * <p>
+ * Only <code>BasicRecoverSync</code> is passed to a handler. The two <code>*Ok</code> methods
+ * throw {@link UnexpectedMethodException}; every other method returns <code>false</code>
+ * without invoking a handler.
+ */
+public class ServerMethodDispatcherImpl_0_9 extends ServerMethodDispatcherImpl implements MethodDispatcher_0_9
+{
+    private static final BasicRecoverSyncMethodHandler _recoverSyncHandler = BasicRecoverSyncMethodHandler.getInstance();
+
+    /**
+     * @param stateManager state manager forwarded to the superclass
+     */
+    public ServerMethodDispatcherImpl_0_9(AMQStateManager stateManager)
+    {
+        super(stateManager);
+    }
+
+    // ---- passed to a handler --------------------------------------------------------------
+
+    public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
+    {
+        final AMQStateManager stateManager = getStateManager();
+        _recoverSyncHandler.methodReceived(stateManager, body, channelId);
+        return true;
+    }
+
+    // ---- rejected with UnexpectedMethodException ------------------------------------------
+
+    public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    // ---- no handler invoked: channel methods ----------------------------------------------
+
+    public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    // ---- no handler invoked: message methods ----------------------------------------------
+
+    public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    // ---- no handler invoked: queue methods ------------------------------------------------
+
+    public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+}

Added: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java?rev=597918&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java (added)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_8_0.java Sat Nov 24 13:14:14 2007
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.AMQException;
+
+/**
+ * Extends {@link ServerMethodDispatcherImpl} with the additional dispatch methods needed to
+ * implement {@link MethodDispatcher_8_0}.
+ * <p>
+ * None of the methods added here invokes a handler: the <code>Test*</code> methods return
+ * <code>false</code>, while <code>BasicRecoverOk</code> and <code>ChannelAlert</code> throw
+ * {@link UnexpectedMethodException}.
+ */
+public class ServerMethodDispatcherImpl_8_0 extends ServerMethodDispatcherImpl implements MethodDispatcher_8_0
+{
+    /**
+     * @param stateManager state manager forwarded to the superclass
+     */
+    public ServerMethodDispatcherImpl_8_0(AMQStateManager stateManager)
+    {
+        super(stateManager);
+    }
+
+    // ---- no handler invoked: test methods -------------------------------------------------
+
+    public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    // ---- rejected with UnexpectedMethodException ------------------------------------------
+
+    public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+
+    public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException
+    {
+        throw new UnexpectedMethodException(body);
+    }
+}

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Sat Nov 24 13:14:14 2007
@@ -24,6 +24,8 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.TxCommitBody;
 import org.apache.qpid.framing.TxCommitOkBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -45,7 +47,7 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxCommitBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, TxCommitBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
 
@@ -53,25 +55,26 @@
         {
             if (_log.isDebugEnabled())
             {
-                _log.debug("Commit received on channel " + evt.getChannelId());
+                _log.debug("Commit received on channel " + channelId);
             }
-            AMQChannel channel = session.getChannel(evt.getChannelId());
+            AMQChannel channel = session.getChannel(channelId);
 
             if (channel == null)
             {
-                throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
+                throw body.getChannelNotFoundException(channelId);
             }
 
             channel.commit();
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            session.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
+
+            MethodRegistry methodRegistry = session.getMethodRegistry();
+            AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
+            session.writeFrame(responseBody.generateFrame(channelId));
+            
             channel.processReturns(session);
         }
         catch (AMQException e)
         {
-            throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
+            throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
         }
     }
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Sat Nov 24 13:14:14 2007
@@ -23,6 +23,8 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.TxRollbackBody;
 import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -42,24 +44,26 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxRollbackBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, TxRollbackBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
 
         try
         {
-            AMQChannel channel = session.getChannel(evt.getChannelId());
+            AMQChannel channel = session.getChannel(channelId);
 
             if (channel == null)
             {
-                throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
+                throw body.getChannelNotFoundException(channelId);
             }
 
             channel.rollback();
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
+
+            MethodRegistry methodRegistry = session.getMethodRegistry();
+            AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
+            session.writeFrame(responseBody.generateFrame(channelId));
+
+            
             //Now resend all the unacknowledged messages back to the original subscribers.
             //(Must be done after the TxnRollback-ok response).
             // Why, are we not allowed to send messages back to client before the ok method?
@@ -67,7 +71,7 @@
         }
         catch (AMQException e)
         {
-            throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
+            throw body.getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
         }
     }
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java Sat Nov 24 13:14:14 2007
@@ -23,6 +23,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.TxSelectBody;
 import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.state.AMQStateManager;
@@ -42,22 +43,21 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxSelectBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, TxSelectBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
 
-        AMQChannel channel = session.getChannel(evt.getChannelId());
+        AMQChannel channel = session.getChannel(channelId);
 
         if (channel == null)
         {
-            throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
+            throw body.getChannelNotFoundException(channelId);
         }
 
         channel.setLocalTransactional();
 
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
+        MethodRegistry methodRegistry = session.getMethodRegistry();
+        TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody();
+        session.writeFrame(responseBody.generateFrame(channelId));
     }
 }

Added: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java?rev=597918&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java (added)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java Sat Nov 24 13:14:14 2007
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.AMQException;
+/** AMQException for an unexpected method body; the message names the body's concrete class. */
+public class UnexpectedMethodException extends AMQException
+{
+    public UnexpectedMethodException(AMQMethodBody body)
+    {
+        super("Unexpected method received: " + body.getClass().getName()); // message spelling corrected ("recevied")
+    }
+}

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java Sat Nov 24 13:14:14 2007
@@ -27,8 +27,8 @@
 package org.apache.qpid.server.output;
 
 import org.apache.qpid.server.output.ProtocolOutputConverter.Factory;
-import org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.framing.ProtocolVersion;
 
 import java.util.Map;
 import java.util.HashMap;
@@ -36,27 +36,26 @@
 public class ProtocolOutputConverterRegistry
 {
 
-    private static final Map<Byte, Map<Byte, Factory>> _registry =
-            new HashMap<Byte, Map<Byte, Factory>>();
+    private static final Map<ProtocolVersion, Factory> _registry =
+            new HashMap<ProtocolVersion, Factory>();
 
 
     static
     {
-        register((byte) 8, (byte) 0, ProtocolOutputConverterImpl.getInstanceFactory());
+        register(ProtocolVersion.v8_0, org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl.getInstanceFactory());
+        register(ProtocolVersion.v0_9, org.apache.qpid.server.output.amqp0_9.ProtocolOutputConverterImpl.getInstanceFactory());
+
     }
 
-    private static void register(byte major, byte minor, Factory converter)
+    private static void register(ProtocolVersion version, Factory converter)
     {
-        if(!_registry.containsKey(major))
-        {
-            _registry.put(major, new HashMap<Byte, Factory>());
-        }
-        _registry.get(major).put(minor, converter);
+
+        _registry.put(version,converter);
     }
 
 
     public static ProtocolOutputConverter getConverter(AMQProtocolSession session)
     {
-        return _registry.get(session.getProtocolMajorVersion()).get(session.getProtocolMinorVersion()).newInstance(session);
+        return _registry.get(session.getProtocolVersion()).newInstance(session);
     }
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Sat Nov 24 13:14:14 2007
@@ -43,6 +43,7 @@
 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
 {
 
+
     public static Factory getInstanceFactory()
     {
         return new Factory()
@@ -98,7 +99,7 @@
             //
             ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
 
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
             AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
             CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
             writeFrame(compositeBlock);
@@ -109,7 +110,7 @@
             for(int i = 1; i < bodyCount; i++)
             {
                 cb = messageHandle.getContentChunk(storeContext,messageId, i);
-                writeFrame(new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+                writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
             }
 
 
@@ -149,7 +150,7 @@
             //
             ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
 
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
             AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
             CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
             writeFrame(compositeBlock);
@@ -160,7 +161,7 @@
             for(int i = 1; i < bodyCount; i++)
             {
                 cb = messageHandle.getContentChunk(storeContext, messageId, i);
-                writeFrame(new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+                writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
             }
 
 
@@ -176,11 +177,14 @@
         final MessagePublishInfo pb = message.getMessagePublishInfo();
         final AMQMessageHandle messageHandle = message.getMessageHandle();
 
-        AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, getProtocolMajorVersion(),
-                                                                getProtocolMinorVersion(),
-                                                                consumerTag,
-                                                                deliveryTag, pb.getExchange(), messageHandle.isRedelivered(),
-                                                                pb.getRoutingKey());
+        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+        BasicDeliverBody deliverBody =
+                methodRegistry.createBasicDeliverBody(consumerTag,
+                                                      deliveryTag,
+                                                      messageHandle.isRedelivered(),
+                                                      pb.getExchange(),
+                                                      pb.getRoutingKey());
+        AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
 
 
         return deliverFrame.toByteBuffer();
@@ -192,13 +196,14 @@
         final MessagePublishInfo pb = message.getMessagePublishInfo();
         final AMQMessageHandle messageHandle = message.getMessageHandle();
 
-        AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
-                                                            getProtocolMajorVersion(),
-                                                            getProtocolMinorVersion(),
-                                                                deliveryTag, pb.getExchange(),
-                                                                queueSize,
-                                                                messageHandle.isRedelivered(),
-                                                                pb.getRoutingKey());
+        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+        BasicGetOkBody getOkBody =
+                methodRegistry.createBasicGetOkBody(deliveryTag,
+                                                    messageHandle.isRedelivered(),
+                                                    pb.getExchange(),
+                                                    pb.getRoutingKey(),
+                                                    queueSize);
+        AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
 
         return getOkFrame.toByteBuffer();
     }
@@ -215,12 +220,13 @@
 
     private ByteBuffer createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
     {
-        AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
-                                                              getProtocolMajorVersion(),
-                                                              getProtocolMinorVersion(),
-                                                              message.getMessagePublishInfo().getExchange(),
-                                                              replyCode, replyText,
-                                                              message.getMessagePublishInfo().getRoutingKey());
+        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+        BasicReturnBody basicReturnBody =
+                methodRegistry.createBasicReturnBody(replyCode,
+                                                     replyText,
+                                                     message.getMessagePublishInfo().getExchange(),
+                                                     message.getMessagePublishInfo().getRoutingKey());
+        AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
 
         return returnFrame.toByteBuffer();
     }
@@ -272,11 +278,9 @@
 
     public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
     {
+        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+        BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
+        writeFrame(basicCancelOkBody.generateFrame(channelId));
 
-        writeFrame(BasicCancelOkBody.createAMQFrame(channelId,
-                   getProtocolMajorVersion(),
-                   getProtocolMinorVersion(),
-                   consumerTag    // consumerTag
-        ));
     }
 }

Added: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=597918&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (added)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Sat Nov 24 13:14:14 2007
@@ -0,0 +1,260 @@
+package org.apache.qpid.server.output.amqp0_9;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.util.Iterator;
+
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+
+    /** Returns a new {@link Factory} whose {@code newInstance} builds a converter bound to the supplied session. */
+    public static Factory getInstanceFactory()
+    {
+        return new Factory()
+        {
+
+            public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+            {
+                return new ProtocolOutputConverterImpl(session);
+            }
+        };
+    }
+
+    private final AMQProtocolSession _protocolSession;
+    /** Private constructor: stores the session; instances are created by the factory from getInstanceFactory(). */
+    private ProtocolOutputConverterImpl(AMQProtocolSession session)
+    {
+        _protocolSession = session;
+    }
+
+    /** Returns the protocol session this converter was constructed with. */
+    public AMQProtocolSession getProtocolSession()
+    {
+        return _protocolSession;
+    }
+    /** Writes {@code message} to the session as a deliver frame, a content header frame and one frame per content chunk. */
+    public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+            throws AMQException
+    {
+        ByteBuffer deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                      message.getContentHeaderBody());
+
+        final AMQMessageHandle messageHandle = message.getMessageHandle();
+        final StoreContext storeContext = message.getStoreContext();
+        final Long messageId = message.getMessageId(); // NOTE(review): boxed Long here, primitive long in writeGetOk - confirm which is intended
+
+        final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
+        // No content chunks: the deliver and header frames go out together as one small composite block.
+        if(bodyCount == 0)
+        {
+            SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+                                                                             contentHeader);
+
+            writeFrame(compositeBlock);
+        }
+        else
+        {
+
+
+            //
+            // Bundle the first content body with the deliver and header frames in one composite block, so that
+            // a message with a single content body is written out with a single network write.
+            //
+            ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
+
+            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+            AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+            writeFrame(compositeBlock);
+
+            //
+            // Now start writing out the other content bodies
+            //
+            for(int i = 1; i < bodyCount; i++)
+            {
+                cb = messageHandle.getContentChunk(storeContext,messageId, i);
+                writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+            }
+
+
+        }
+
+
+    }
+
+    /** Writes {@code message} to the session as a get-ok frame, a content header frame and one frame per content chunk. */
+    public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+    {
+
+        final AMQMessageHandle messageHandle = message.getMessageHandle();
+        final StoreContext storeContext = message.getStoreContext();
+        final long messageId = message.getMessageId();
+
+        ByteBuffer deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+        // (the local is named "deliver" but holds the get-ok frame)
+
+        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                      message.getContentHeaderBody());
+        // No content chunks: the get-ok and header frames go out together as one small composite block.
+        final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
+        if(bodyCount == 0)
+        {
+            SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+                                                                             contentHeader);
+            writeFrame(compositeBlock);
+        }
+        else
+        {
+
+
+            //
+            // Bundle the first content body with the get-ok and header frames in one composite block, so that
+            // a message with a single content body is written out with a single network write.
+            //
+            ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
+
+            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+            AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+            writeFrame(compositeBlock);
+
+            //
+            // Now start writing out the other content bodies
+            //
+            for(int i = 1; i < bodyCount; i++)
+            {
+                cb = messageHandle.getContentChunk(storeContext, messageId, i);
+                writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+            }
+
+
+        }
+
+
+    }
+
+    /** Builds a BasicDeliverBody from the v0_9 method registry and returns its frame for {@code channelId} as a ByteBuffer. */
+    private ByteBuffer createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+            throws AMQException
+    {
+        final MessagePublishInfo pb = message.getMessagePublishInfo();
+        final AMQMessageHandle messageHandle = message.getMessageHandle();
+
+        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+        BasicDeliverBody deliverBody =
+                methodRegistry.createBasicDeliverBody(consumerTag,
+                                                      deliveryTag,
+                                                      messageHandle.isRedelivered(),
+                                                      pb.getExchange(),
+                                                      pb.getRoutingKey());
+        AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
+
+
+        return deliverFrame.toByteBuffer();
+    }
+    /** Builds a BasicGetOkBody from the v0_9 method registry and returns its frame for {@code channelId} as a ByteBuffer. */
+    private ByteBuffer createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+            throws AMQException
+    {
+        final MessagePublishInfo pb = message.getMessagePublishInfo();
+        final AMQMessageHandle messageHandle = message.getMessageHandle();
+
+        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+        BasicGetOkBody getOkBody =
+                methodRegistry.createBasicGetOkBody(deliveryTag,
+                                                    messageHandle.isRedelivered(),
+                                                    pb.getExchange(),
+                                                    pb.getRoutingKey(),
+                                                    queueSize);
+        AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
+
+        return getOkFrame.toByteBuffer();
+    }
+    /** Returns the minor protocol version reported by the protocol session. */
+    public byte getProtocolMinorVersion()
+    {
+        return getProtocolSession().getProtocolMinorVersion();
+    }
+    /** Returns the major protocol version reported by the protocol session. */
+    public byte getProtocolMajorVersion()
+    {
+        return getProtocolSession().getProtocolMajorVersion();
+    }
+    /** Builds a BasicReturnBody from the v0_9 method registry and returns its frame for {@code channelId} as a ByteBuffer. */
+    private ByteBuffer createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+    {
+        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+        BasicReturnBody basicReturnBody =
+                methodRegistry.createBasicReturnBody(replyCode,
+                                                     replyText,
+                                                     message.getMessagePublishInfo().getExchange(),
+                                                     message.getMessagePublishInfo().getRoutingKey());
+        AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
+
+        return returnFrame.toByteBuffer();
+    }
+    /** Writes {@code message} back to the session as a return frame, a content header frame and the message's body frames. */
+    public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
+            throws AMQException
+    {
+        ByteBuffer returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
+
+        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                      message.getContentHeaderBody());
+
+        Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
+        //
+        // Bundle the first body frame (if any) with the return and header frames in one composite block, so that
+        // a message with a single content body is written out with a single network write.
+        //
+        if (bodyFrameIterator.hasNext())
+        {
+            AMQDataBlock firstContentBody = bodyFrameIterator.next();
+            AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+            writeFrame(compositeBlock);
+        }
+        else
+        {
+            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
+                                                                             new AMQDataBlock[]{contentHeader});
+
+            writeFrame(compositeBlock);
+        }
+
+        //
+        // Now start writing out the other content bodies
+        // TODO: MINA needs to be fixed so that the pending writes buffer is not unbounded
+        //
+        while (bodyFrameIterator.hasNext())
+        {
+            writeFrame(bodyFrameIterator.next());
+        }
+    }
+
+    /** Passes {@code block} to the protocol session's {@code writeFrame}. */
+    public void writeFrame(AMQDataBlock block)
+    {
+        getProtocolSession().writeFrame(block);
+    }
+
+    /** Writes a BasicCancelOkBody frame (built from the v0_9 method registry) for {@code consumerTag} on {@code channelId}. */
+    public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+    {
+        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+        BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
+        writeFrame(basicCancelOkBody.generateFrame(channelId));
+
+    }
+}

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Sat Nov 24 13:14:14 2007
@@ -39,6 +39,7 @@
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -107,10 +108,11 @@
 
     private FieldTable _clientProperties;
     private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
-    private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(_protocolVersion);
+
     private List<Integer> _closingChannelsList = new ArrayList<Integer>();
     private ProtocolOutputConverter _protocolOutputConverter;
     private Principal _authorizedID;
+    private MethodDispatcher _dispatcher;
 
     public ManagedObject getManagedObject()
     {
@@ -235,24 +237,24 @@
         ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
         try
         {
-            pi.checkVersion(); // Fails if not correct
+            ProtocolVersion pv = pi.checkVersion(); // Fails if not correct
 
             // This sets the protocol version (and hence framing classes) for this session.
-            setProtocolVersion(pi._protocolMajor, pi._protocolMinor);
+            setProtocolVersion(pv);
 
             String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
 
             String locales = "en_US";
 
-            // Interfacing with generated code - be aware of possible changes to parameter order as versions change.
-            AMQFrame response =
-                ConnectionStartBody.createAMQFrame((short) 0, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
-                    locales.getBytes(), // locales
-                    mechanisms.getBytes(), // mechanisms
-                    null, // serverProperties
-                    (short) getProtocolMajorVersion(), // versionMajor
-                    (short) getProtocolMinorVersion()); // versionMinor
-            _minaProtocolSession.write(response);
+
+            AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(),
+                                                                                       (short) getProtocolMinorVersion(),
+                                                                                       null,
+                                                                                       mechanisms.getBytes(),
+                                                                                       locales.getBytes());
+            _minaProtocolSession.write(responseBody.generateFrame(0));
+
+
         }
         catch (AMQException e)
         {
@@ -548,6 +550,7 @@
 
     public void closeChannelOk(int channelId)
     {
+        removeChannel(channelId); // added by this commit: the channel is removed before its id leaves the closing list
         _closingChannelsList.remove(new Integer(channelId));
     }
 
@@ -695,13 +698,12 @@
         }
     }
 
-    private void setProtocolVersion(byte major, byte minor)
+    private void setProtocolVersion(ProtocolVersion pv) // now takes the ProtocolVersion object instead of major/minor bytes
     {
-        _protocolVersion = new ProtocolVersion(major, minor);
-
-        _registry = MainRegistry.getVersionSpecificRegistry(_protocolVersion);
+        _protocolVersion = pv;
 
         _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this);
+        _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion); // dispatcher is created from the state manager and the version just stored
     }
 
     public byte getProtocolMajorVersion()
@@ -709,6 +711,11 @@
         return _protocolVersion.getMajorVersion();
     }
 
+    public ProtocolVersion getProtocolVersion() // returns the session's current _protocolVersion field as-is
+    {
+        return _protocolVersion;
+    }
+
     public byte getProtocolMinorVersion()
     {
         return _protocolVersion.getMinorVersion();
@@ -719,9 +726,9 @@
         return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor);
     }
 
-    public VersionSpecificRegistry getRegistry()
+    public MethodRegistry getRegistry() // return type changes from VersionSpecificRegistry to MethodRegistry
     {
-        return _registry;
+        return getMethodRegistry(); // no cached field any more; delegates to getMethodRegistry()
     }
 
     public Object getClientIdentifier()
@@ -764,6 +771,16 @@
     public Principal getAuthorizedID()
     {
         return _authorizedID;
+    }
+
+    public MethodRegistry getMethodRegistry() // looks the registry up from the current protocol version on every call
+    {
+        return MethodRegistry.getMethodRegistry(getProtocolVersion());
+    }
+
+    public MethodDispatcher getMethodDispatcher() // NOTE(review): _dispatcher appears to be unset until setProtocolVersion runs - confirm no other assignment
+    {
+        return _dispatcher;
     }
 
     public String getClientVersion()

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Sat Nov 24 13:14:14 2007
@@ -30,13 +30,7 @@
 import org.apache.mina.util.SessionUtil;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQProtocolHeaderException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.transport.ConnectorConfiguration;
@@ -177,15 +171,12 @@
         {
             _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
 
-            // Be aware of possible changes to parameter order as versions change.
-            protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
-                                                                     session.getProtocolMajorVersion(),
-                                                                     session.getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                                     0,    // classId
-                                                                     0,    // methodId
-                                                                     200,    // replyCode
-                                                                     new AMQShortString(throwable.getMessage())    // replyText
-            ));
+
+            MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(session.getProtocolVersion());
+            ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
+                        
+            protocolSession.write(closeBody.generateFrame(0));
+
             protocolSession.close();
         }
     }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Sat Nov 24 13:14:14 2007
@@ -23,9 +23,7 @@
 import javax.security.sasl.SaslServer;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -37,6 +35,8 @@
 public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
 {
 
+
+
     public static interface Task
     {
         public void doTask(AMQProtocolSession session) throws AMQException;
@@ -172,4 +172,8 @@
     /** @return a Principal that was used to authorized this session */
     Principal getAuthorizedID();
 
+    public MethodRegistry getMethodRegistry();
+
+    public MethodDispatcher getMethodDispatcher();
+    
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Sat Nov 24 13:14:14 2007
@@ -61,6 +61,7 @@
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.management.AMQManagedObject;
@@ -261,17 +262,17 @@
      */
     public void closeConnection() throws JMException
     {
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        final AMQFrame response =
-            ConnectionCloseBody.createAMQFrame(0, _session.getProtocolMajorVersion(), _session.getProtocolMinorVersion(), // AMQP version (major, minor)
-                0, // classId
-                0, // methodId
-                AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
-                BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION // replyText
-            );
-        _session.writeFrame(response);
+
+        MethodRegistry methodRegistry = _session.getMethodRegistry();
+        ConnectionCloseBody responseBody =
+                methodRegistry.createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
+                                                         // replyCode
+                                                         BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION,
+                                                         // replyText,
+                                                         0,
+                                                         0);
+
+        _session.writeFrame(responseBody.generateFrame(0));
 
         try
         {

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Sat Nov 24 13:14:14 2007
@@ -187,7 +187,7 @@
 
         private ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
         {
-            return _protocolSession.getRegistry().getProtocolVersionMethodConverter();
+            return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter(); // converter now obtained via getMethodRegistry() rather than getRegistry()
         }
 
         public void remove()

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Sat Nov 24 13:14:14 2007
@@ -28,37 +28,11 @@
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicGetBody;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.BasicRejectBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeleteBody;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueuePurgeBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxSelectBody;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.handler.BasicAckMethodHandler;
 import org.apache.qpid.server.handler.BasicCancelMethodHandler;
 import org.apache.qpid.server.handler.BasicConsumeMethodHandler;
@@ -107,43 +81,35 @@
      * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
      * AMQFrame.
      */
-    private final EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap =
+/*    private final EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap =
         new EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>(
             AMQState.class);
+  */
+
 
     private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
 
     public AMQStateManager(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
     {
-        this(AMQState.CONNECTION_NOT_STARTED, true, virtualHostRegistry, protocolSession);
-    }
 
-    protected AMQStateManager(AMQState initial, boolean register, VirtualHostRegistry virtualHostRegistry,
-        AMQProtocolSession protocolSession)
-    {
         _virtualHostRegistry = virtualHostRegistry;
         _protocolSession = protocolSession;
-        _currentState = initial;
-        if (register)
-        {
-            registerListeners();
-        }
+        _currentState = AMQState.CONNECTION_NOT_STARTED; // initial state is hard-wired now that the protected constructor is folded in; registerListeners() is no longer called
+
     }
 
+    /*
     protected void registerListeners()
     {
         Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> frame2handlerMap;
 
         frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
-        frame2handlerMap.put(ConnectionStartOkBody.class, ConnectionStartOkMethodHandler.getInstance());
         _state2HandlersMap.put(AMQState.CONNECTION_NOT_STARTED, frame2handlerMap);
 
         frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
-        frame2handlerMap.put(ConnectionSecureOkBody.class, ConnectionSecureOkMethodHandler.getInstance());
         _state2HandlersMap.put(AMQState.CONNECTION_NOT_AUTH, frame2handlerMap);
 
         frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
-        frame2handlerMap.put(ConnectionTuneOkBody.class, ConnectionTuneOkMethodHandler.getInstance());
         _state2HandlersMap.put(AMQState.CONNECTION_NOT_TUNED, frame2handlerMap);
 
         frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
@@ -154,37 +120,41 @@
         // ConnectionOpen handlers
         //
         frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
-        frame2handlerMap.put(ChannelOpenBody.class, ChannelOpenHandler.getInstance());
-        frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseHandler.getInstance());
-        frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkHandler.getInstance());
-        frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
-        frame2handlerMap.put(ExchangeDeclareBody.class, ExchangeDeclareHandler.getInstance());
-        frame2handlerMap.put(ExchangeDeleteBody.class, ExchangeDeleteHandler.getInstance());
-        frame2handlerMap.put(ExchangeBoundBody.class, ExchangeBoundHandler.getInstance());
-        frame2handlerMap.put(BasicAckBody.class, BasicAckMethodHandler.getInstance());
-        frame2handlerMap.put(BasicRecoverBody.class, BasicRecoverMethodHandler.getInstance());
-        frame2handlerMap.put(BasicConsumeBody.class, BasicConsumeMethodHandler.getInstance());
-        frame2handlerMap.put(BasicGetBody.class, BasicGetMethodHandler.getInstance());
-        frame2handlerMap.put(BasicCancelBody.class, BasicCancelMethodHandler.getInstance());
-        frame2handlerMap.put(BasicPublishBody.class, BasicPublishMethodHandler.getInstance());
-        frame2handlerMap.put(BasicQosBody.class, BasicQosHandler.getInstance());
-        frame2handlerMap.put(QueueBindBody.class, QueueBindHandler.getInstance());
-        frame2handlerMap.put(QueueDeclareBody.class, QueueDeclareHandler.getInstance());
-        frame2handlerMap.put(QueueDeleteBody.class, QueueDeleteHandler.getInstance());
-        frame2handlerMap.put(QueuePurgeBody.class, QueuePurgeHandler.getInstance());
-        frame2handlerMap.put(ChannelFlowBody.class, ChannelFlowHandler.getInstance());
-        frame2handlerMap.put(TxSelectBody.class, TxSelectHandler.getInstance());
-        frame2handlerMap.put(TxCommitBody.class, TxCommitHandler.getInstance());
-        frame2handlerMap.put(TxRollbackBody.class, TxRollbackHandler.getInstance());
-        frame2handlerMap.put(BasicRejectBody.class, BasicRejectMethodHandler.getInstance());
+        ChannelOpenHandler.getInstance();
+        ChannelCloseHandler.getInstance();
+        ChannelCloseOkHandler.getInstance();
+        ConnectionCloseMethodHandler.getInstance();
+        ConnectionCloseOkMethodHandler.getInstance();
+        ConnectionTuneOkMethodHandler.getInstance();
+        ConnectionSecureOkMethodHandler.getInstance();
+        ConnectionStartOkMethodHandler.getInstance();
+        ExchangeDeclareHandler.getInstance();
+        ExchangeDeleteHandler.getInstance();
+        ExchangeBoundHandler.getInstance();
+        BasicAckMethodHandler.getInstance();
+        BasicRecoverMethodHandler.getInstance();
+        BasicConsumeMethodHandler.getInstance();
+        BasicGetMethodHandler.getInstance();
+        BasicCancelMethodHandler.getInstance();
+        BasicPublishMethodHandler.getInstance();
+        BasicQosHandler.getInstance();
+        QueueBindHandler.getInstance();
+        QueueDeclareHandler.getInstance();
+        QueueDeleteHandler.getInstance();
+        QueuePurgeHandler.getInstance();
+        ChannelFlowHandler.getInstance();
+        TxSelectHandler.getInstance();
+        TxCommitHandler.getInstance();
+        TxRollbackHandler.getInstance();
+        BasicRejectMethodHandler.getInstance();
 
         _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
 
         frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
-        frame2handlerMap.put(ConnectionCloseOkBody.class, ConnectionCloseOkMethodHandler.getInstance());
+
         _state2HandlersMap.put(AMQState.CONNECTION_CLOSING, frame2handlerMap);
 
-    }
+    } */
 
     public AMQState getCurrentState()
     {
@@ -214,18 +184,25 @@
 
     public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
     {
-        StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod());
-        if (handler != null)
-        {
+        MethodDispatcher dispatcher = _protocolSession.getMethodDispatcher(); // the per-state handler lookup is replaced by the session's dispatcher
 
-            checkChannel(evt, _protocolSession);
+        final int channelId = evt.getChannelId();
+        B body = evt.getMethod();
 
-            handler.methodReceived(this, evt);
+        if(channelId != 0 && _protocolSession.getChannel(channelId)== null) // non-zero channel id for which the session returns no channel
+        {
+
+            if(! ((body instanceof ChannelOpenBody) // only channel open / close-ok / close bodies are let through in that case
+                  || (body instanceof ChannelCloseOkBody)
+                  || (body instanceof ChannelCloseBody)))
+            {
+                throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed");
+            }
 
-            return true;
         }
 
-        return false;
+        return body.execute(dispatcher, channelId); // the boolean result of body.execute is returned as-is
+
     }
 
     private <B extends AMQMethodBody> void checkChannel(AMQMethodEvent<B> evt, AMQProtocolSession protocolSession)
@@ -239,6 +216,7 @@
         }
     }
 
+/*
     protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState currentState,
         B frame)
     // throws IllegalStateTransitionException
@@ -260,6 +238,7 @@
             return handler;
         }
     }
+*/
 
     public void addStateListener(StateListener listener)
     {

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java Sat Nov 24 13:14:14 2007
@@ -29,7 +29,7 @@
  * the opportunity to update state.
  *
  */
-public interface StateAwareMethodListener <B extends AMQMethodBody>
+public interface StateAwareMethodListener<B extends AMQMethodBody>
 {
-    void methodReceived(AMQStateManager stateManager,  AMQMethodEvent<B> evt) throws AMQException;
+    void methodReceived(AMQStateManager stateManager,  B evt, int channelId) throws AMQException; // NOTE(review): evt is now the method body (type B), not an AMQMethodEvent; channelId is passed separately
 }