You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/10/10 11:54:37 UTC

svn commit: r1630745 [2/2] - in /qpid/branches/QPID-6125-ProtocolRefactoring/java: bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ client/src/main/java...

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java Fri Oct 10 09:54:36 2014
@@ -119,12 +119,12 @@ public class ConnectionTuneOkBody extend
         return buf.toString();
     }
 
-    public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException
+    public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException
     {
 
         int channelMax = buffer.readUnsignedShort();
         long frameMax = EncodingUtils.readUnsignedInteger(buffer);
         int heartbeat = buffer.readUnsignedShort();
-        return dispatcher.connectionTuneOk(channelMax, frameMax, heartbeat);
+        dispatcher.receiveConnectionTuneOk(channelMax, frameMax, heartbeat);
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java Fri Oct 10 09:54:36 2014
@@ -92,14 +92,14 @@ public class ContentBody implements AMQB
         return _payload;
     }
 
-    public static <T> T process(final int channel,
+    public static void process(final int channel,
                                   final MarkableDataInput in,
-                                  final MethodProcessor<T> methodProcessor, final long bodySize)
+                                  final MethodProcessor methodProcessor, final long bodySize)
             throws IOException
     {
         byte[] payload = new byte[(int)bodySize];
         in.readFully(payload);
-        return methodProcessor.messageContent(channel, payload);
+        methodProcessor.receiveMessageContent(channel, payload);
     }
 
     private static class BufferContentBody implements AMQBody

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Fri Oct 10 09:54:36 2014
@@ -155,9 +155,9 @@ public class ContentHeaderBody implement
         _bodySize = bodySize;
     }
 
-    public static <T> T process(final int channelId,
+    public static void process(final int channelId,
                                 final MarkableDataInput buffer,
-                                final MethodProcessor<T> methodProcessor, final long size)
+                                final MethodProcessor methodProcessor, final long size)
             throws IOException, AMQFrameDecodingException
     {
 
@@ -175,6 +175,6 @@ public class ContentHeaderBody implement
             properties = new BasicContentHeaderProperties();
         properties.populatePropertiesFromBuffer(buffer, propertyFlags, (int)(size-14));
 
-        return methodProcessor.messageHeader(channelId, properties, bodySize);
+        methodProcessor.receiveMessageHeader(channelId, properties, bodySize);
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java Fri Oct 10 09:54:36 2014
@@ -122,13 +122,13 @@ public class ExchangeBoundBody extends A
         return buf.toString();
     }
 
-    public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+    public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
             throws IOException
     {
 
         AMQShortString exchange = buffer.readAMQShortString();
         AMQShortString routingKey = buffer.readAMQShortString();
         AMQShortString queue = buffer.readAMQShortString();
-        return dispatcher.exchangeBound(channelId, exchange, routingKey, queue);
+        dispatcher.receiveExchangeBound(channelId, exchange, routingKey, queue);
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java Fri Oct 10 09:54:36 2014
@@ -108,12 +108,12 @@ public class ExchangeBoundOkBody extends
         return buf.toString();
     }
 
-    public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+    public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
             throws IOException
     {
 
         int replyCode = buffer.readUnsignedShort();
         AMQShortString replyText = buffer.readAMQShortString();
-        return dispatcher.exchangeBoundOk(channelId, replyCode, replyText);
+        dispatcher.receiveExchangeBoundOk(channelId, replyCode, replyText);
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java Fri Oct 10 09:54:36 2014
@@ -204,9 +204,9 @@ public class ExchangeDeclareBody extends
         return buf.toString();
     }
 
-    public static <T> T process(final int channelId,
+    public static void process(final int channelId,
                                 final MarkableDataInput buffer,
-                                final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException
+                                final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
     {
 
         int ticket = buffer.readUnsignedShort();
@@ -219,6 +219,14 @@ public class ExchangeDeclareBody extends
         boolean internal = (bitfield & 0x8) == 0x8;
         boolean nowait = (bitfield & 0x10) == 0x10;
         FieldTable arguments = EncodingUtils.readFieldTable(buffer);
-        return dispatcher.exchangeDeclare(channelId, exchange, type, passive, durable, autoDelete, internal, nowait, arguments);
+        dispatcher.receiveExchangeDeclare(channelId,
+                                          exchange,
+                                          type,
+                                          passive,
+                                          durable,
+                                          autoDelete,
+                                          internal,
+                                          nowait,
+                                          arguments);
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java Fri Oct 10 09:54:36 2014
@@ -138,7 +138,7 @@ public class ExchangeDeleteBody extends 
         return buf.toString();
     }
 
-    public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+    public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
             throws IOException
     {
 
@@ -147,6 +147,6 @@ public class ExchangeDeleteBody extends 
         byte bitfield = buffer.readByte();
         boolean ifUnused = (bitfield & 0x01) == 0x01;
         boolean nowait = (bitfield & 0x02) == 0x02;
-        return dispatcher.exchangeDelete(channelId, exchange, ifUnused, nowait);
+        dispatcher.receiveExchangeDelete(channelId, exchange, ifUnused, nowait);
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java Fri Oct 10 09:54:36 2014
@@ -20,484 +20,506 @@
  */
 package org.apache.qpid.framing;
 
-public class FrameCreatingMethodProcessor implements MethodProcessor<AMQFrame>
+import java.util.ArrayList;
+import java.util.List;
+
+public class FrameCreatingMethodProcessor implements MethodProcessor
 {
-    private final MethodRegistry _methodRegistry;
+    private ProtocolVersion _protocolVersion;
+    
+    private final List<AMQDataBlock> _processedMethods = new ArrayList<>();
 
-    FrameCreatingMethodProcessor(final MethodRegistry methodRegistry)
+    public FrameCreatingMethodProcessor(final ProtocolVersion protocolVersion)
     {
-        _methodRegistry = methodRegistry;
+        _protocolVersion = protocolVersion;
     }
 
+    public List<AMQDataBlock> getProcessedMethods()
+    {
+        return _processedMethods;
+    }
+    
     @Override
-    public AMQFrame connectionStart(final short versionMajor,
-                                         final short versionMinor,
-                                         final FieldTable serverProperties,
-                                         final byte[] mechanisms,
-                                         final byte[] locales)
+    public void receiveConnectionStart(final short versionMajor,
+                                       final short versionMinor,
+                                       final FieldTable serverProperties,
+                                       final byte[] mechanisms,
+                                       final byte[] locales)
     {
-        return new AMQFrame(0, new ConnectionStartBody(versionMajor, versionMinor, serverProperties, mechanisms, locales));
+        _processedMethods.add(new AMQFrame(0, new ConnectionStartBody(versionMajor, versionMinor, serverProperties, mechanisms, locales)));
     }
 
     @Override
-    public AMQFrame connectionStartOk(final FieldTable clientProperties,
-                                           final AMQShortString mechanism,
-                                           final byte[] response,
-                                           final AMQShortString locale)
+    public void receiveConnectionStartOk(final FieldTable clientProperties,
+                                         final AMQShortString mechanism,
+                                         final byte[] response,
+                                         final AMQShortString locale)
     {
-        return new AMQFrame(0, new ConnectionStartOkBody(clientProperties, mechanism, response, locale));
+        _processedMethods.add(new AMQFrame(0, new ConnectionStartOkBody(clientProperties, mechanism, response, locale)));
     }
 
     @Override
-    public AMQFrame txSelect(final int channelId)
+    public void receiveTxSelect(final int channelId)
     {
-        return new AMQFrame(channelId, TxSelectBody.INSTANCE);
+        _processedMethods.add(new AMQFrame(channelId, TxSelectBody.INSTANCE));
     }
 
     @Override
-    public AMQFrame txSelectOk(final int channelId)
+    public void receiveTxSelectOk(final int channelId)
     {
-        return new AMQFrame(channelId, TxSelectOkBody.INSTANCE);
+        _processedMethods.add(new AMQFrame(channelId, TxSelectOkBody.INSTANCE));
     }
 
     @Override
-    public AMQFrame txCommit(final int channelId)
+    public void receiveTxCommit(final int channelId)
     {
-        return new AMQFrame(channelId, TxCommitBody.INSTANCE);
+        _processedMethods.add(new AMQFrame(channelId, TxCommitBody.INSTANCE));
     }
 
     @Override
-    public AMQFrame txCommitOk(final int channelId)
+    public void receiveTxCommitOk(final int channelId)
     {
-        return new AMQFrame(channelId, TxCommitOkBody.INSTANCE);
+        _processedMethods.add(new AMQFrame(channelId, TxCommitOkBody.INSTANCE));
     }
 
     @Override
-    public AMQFrame txRollback(final int channelId)
+    public void receiveTxRollback(final int channelId)
     {
-        return new AMQFrame(channelId, TxRollbackBody.INSTANCE);
+        _processedMethods.add(new AMQFrame(channelId, TxRollbackBody.INSTANCE));
     }
 
     @Override
-    public AMQFrame txRollbackOk(final int channelId)
+    public void receiveTxRollbackOk(final int channelId)
     {
-        return new AMQFrame(channelId, TxRollbackOkBody.INSTANCE);
+        _processedMethods.add(new AMQFrame(channelId, TxRollbackOkBody.INSTANCE));
     }
 
     @Override
-    public AMQFrame connectionSecure(final byte[] challenge)
+    public void receiveConnectionSecure(final byte[] challenge)
     {
-        return new AMQFrame(0, new ConnectionSecureBody(challenge));
+        _processedMethods.add(new AMQFrame(0, new ConnectionSecureBody(challenge)));
     }
 
     @Override
-    public AMQFrame connectionSecureOk(final byte[] response)
+    public void receiveConnectionSecureOk(final byte[] response)
     {
-        return new AMQFrame(0, new ConnectionSecureOkBody(response));
+        _processedMethods.add(new AMQFrame(0, new ConnectionSecureOkBody(response)));
     }
 
     @Override
-    public AMQFrame connectionTune(final int channelMax, final long frameMax, final int heartbeat)
+    public void receiveConnectionTune(final int channelMax, final long frameMax, final int heartbeat)
     {
-        return new AMQFrame(0, new ConnectionTuneBody(channelMax, frameMax, heartbeat));
+        _processedMethods.add(new AMQFrame(0, new ConnectionTuneBody(channelMax, frameMax, heartbeat)));
     }
 
     @Override
-    public AMQFrame connectionTuneOk(final int channelMax, final long frameMax, final int heartbeat)
+    public void receiveConnectionTuneOk(final int channelMax, final long frameMax, final int heartbeat)
     {
-        return new AMQFrame(0, new ConnectionTuneOkBody(channelMax, frameMax, heartbeat));
+        _processedMethods.add(new AMQFrame(0, new ConnectionTuneOkBody(channelMax, frameMax, heartbeat)));
     }
 
     @Override
-    public AMQFrame connectionOpen(final AMQShortString virtualHost,
-                                        final AMQShortString capabilities,
-                                        final boolean insist)
+    public void receiveConnectionOpen(final AMQShortString virtualHost,
+                                      final AMQShortString capabilities,
+                                      final boolean insist)
     {
-        return new AMQFrame(0, new ConnectionOpenBody(virtualHost, capabilities, insist));
+        _processedMethods.add(new AMQFrame(0, new ConnectionOpenBody(virtualHost, capabilities, insist)));
     }
 
     @Override
-    public AMQFrame connectionOpenOk(final AMQShortString knownHosts)
+    public void receiveConnectionOpenOk(final AMQShortString knownHosts)
     {
-        return new AMQFrame(0, new ConnectionOpenOkBody(knownHosts));
+        _processedMethods.add(new AMQFrame(0, new ConnectionOpenOkBody(knownHosts)));
     }
 
     @Override
-    public AMQFrame connectionRedirect(final AMQShortString host, final AMQShortString knownHosts)
+    public void receiveConnectionRedirect(final AMQShortString host, final AMQShortString knownHosts)
     {
-        return new AMQFrame(0, new ConnectionRedirectBody(getProtocolVersion(), host, knownHosts));
+        _processedMethods.add(new AMQFrame(0, new ConnectionRedirectBody(getProtocolVersion(), host, knownHosts)));
     }
 
     @Override
-    public AMQFrame connectionClose(final int replyCode,
-                                         final AMQShortString replyText,
-                                         final int classId,
-                                         final int methodId)
+    public void receiveConnectionClose(final int replyCode,
+                                       final AMQShortString replyText,
+                                       final int classId,
+                                       final int methodId)
     {
-        return new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), replyCode, replyText, classId, methodId));
+        _processedMethods.add(new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), replyCode, replyText, classId, methodId)));
     }
 
     @Override
-    public AMQFrame connectionCloseOk()
+    public void receiveConnectionCloseOk()
     {
-        return new AMQFrame(0, ProtocolVersion.v8_0.equals(getProtocolVersion())
+        _processedMethods.add(new AMQFrame(0, ProtocolVersion.v8_0.equals(getProtocolVersion())
                 ? ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_8
-                : ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9);
+                : ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9));
     }
 
     @Override
-    public AMQFrame channelOpen(final int channelId)
+    public void receiveChannelOpen(final int channelId)
     {
-        return new AMQFrame(channelId, new ChannelOpenBody());
+        _processedMethods.add(new AMQFrame(channelId, new ChannelOpenBody()));
     }
 
     @Override
-    public AMQFrame channelOpenOk(final int channelId)
+    public void receiveChannelOpenOk(final int channelId)
     {
-        return new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion())
+        _processedMethods.add(new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion())
                 ? ChannelOpenOkBody.INSTANCE_0_8
-                : ChannelOpenOkBody.INSTANCE_0_9);
+                : ChannelOpenOkBody.INSTANCE_0_9));
     }
 
     @Override
-    public AMQFrame channelFlow(final int channelId, final boolean active)
+    public void receiveChannelFlow(final int channelId, final boolean active)
     {
-        return new AMQFrame(channelId, new ChannelFlowBody(active));
+        _processedMethods.add(new AMQFrame(channelId, new ChannelFlowBody(active)));
     }
 
     @Override
-    public AMQFrame channelFlowOk(final int channelId, final boolean active)
+    public void receiveChannelFlowOk(final int channelId, final boolean active)
     {
-        return new AMQFrame(channelId, new ChannelFlowOkBody(active));
+        _processedMethods.add(new AMQFrame(channelId, new ChannelFlowOkBody(active)));
     }
 
     @Override
-    public AMQFrame channelAlert(final int channelId,
-                                      final int replyCode,
-                                      final AMQShortString replyText,
-                                      final FieldTable details)
+    public void receiveChannelAlert(final int channelId,
+                                    final int replyCode,
+                                    final AMQShortString replyText,
+                                    final FieldTable details)
     {
-        return new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details));
+        _processedMethods.add(new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details)));
     }
 
     @Override
-    public AMQFrame channelClose(final int channelId,
-                                      final int replyCode,
-                                      final AMQShortString replyText,
-                                      final int classId,
-                                      final int methodId)
+    public void receiveChannelClose(final int channelId,
+                                    final int replyCode,
+                                    final AMQShortString replyText,
+                                    final int classId,
+                                    final int methodId)
     {
-        return new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId));
+        _processedMethods.add(new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId)));
     }
 
     @Override
-    public AMQFrame channelCloseOk(final int channelId)
+    public void receiveChannelCloseOk(final int channelId)
     {
-        return new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE);
+        _processedMethods.add(new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE));
     }
 
     @Override
-    public AMQFrame accessRequest(final int channelId,
-                                       final AMQShortString realm,
-                                       final boolean exclusive,
-                                       final boolean passive,
-                                       final boolean active,
-                                       final boolean write,
-                                       final boolean read)
+    public void receiveAccessRequest(final int channelId,
+                                     final AMQShortString realm,
+                                     final boolean exclusive,
+                                     final boolean passive,
+                                     final boolean active,
+                                     final boolean write,
+                                     final boolean read)
     {
-        return new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read));
+        _processedMethods.add(new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read)));
     }
 
     @Override
-    public AMQFrame accessRequestOk(final int channelId, final int ticket)
+    public void receiveAccessRequestOk(final int channelId, final int ticket)
     {
-        return new AMQFrame(channelId, new AccessRequestOkBody(ticket));
+        _processedMethods.add(new AMQFrame(channelId, new AccessRequestOkBody(ticket)));
     }
 
     @Override
-    public AMQFrame exchangeDeclare(final int channelId,
-                                         final AMQShortString exchange,
-                                         final AMQShortString type,
-                                         final boolean passive,
-                                         final boolean durable,
-                                         final boolean autoDelete,
-                                         final boolean internal,
-                                         final boolean nowait, final FieldTable arguments)
+    public void receiveExchangeDeclare(final int channelId,
+                                       final AMQShortString exchange,
+                                       final AMQShortString type,
+                                       final boolean passive,
+                                       final boolean durable,
+                                       final boolean autoDelete,
+                                       final boolean internal,
+                                       final boolean nowait, final FieldTable arguments)
     {
-        return new AMQFrame(channelId, new ExchangeDeclareBody(0, exchange, type, passive, durable, autoDelete, internal, nowait, arguments));
+        _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareBody(0, exchange, type, passive, durable, autoDelete, internal, nowait, arguments)));
     }
 
     @Override
-    public AMQFrame exchangeDeclareOk(final int channelId)
+    public void receiveExchangeDeclareOk(final int channelId)
     {
-        return new AMQFrame(channelId, new ExchangeDeclareOkBody());
+        _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareOkBody()));
     }
 
     @Override
-    public AMQFrame exchangeDelete(final int channelId,
-                                        final AMQShortString exchange,
-                                        final boolean ifUnused,
-                                        final boolean nowait)
+    public void receiveExchangeDelete(final int channelId,
+                                      final AMQShortString exchange,
+                                      final boolean ifUnused,
+                                      final boolean nowait)
     {
-        return new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait));
+        _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait)));
     }
 
     @Override
-    public AMQFrame exchangeDeleteOk(final int channelId)
+    public void receiveExchangeDeleteOk(final int channelId)
     {
-        return new AMQFrame(channelId, new ExchangeDeleteOkBody());
+        _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteOkBody()));
     }
 
     @Override
-    public AMQFrame exchangeBound(final int channelId,
-                                       final AMQShortString exchange,
-                                       final AMQShortString routingKey,
-                                       final AMQShortString queue)
+    public void receiveExchangeBound(final int channelId,
+                                     final AMQShortString exchange,
+                                     final AMQShortString routingKey,
+                                     final AMQShortString queue)
     {
-        return new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue));
+        _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue)));
     }
 
     @Override
-    public AMQFrame exchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText)
+    public void receiveExchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText)
     {
-        return new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText));
+        _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText)));
     }
 
     @Override
-    public AMQFrame queueBindOk(final int channelId)
+    public void receiveQueueBindOk(final int channelId)
     {
-        return new AMQFrame(channelId, new QueueBindOkBody());
+        _processedMethods.add(new AMQFrame(channelId, new QueueBindOkBody()));
     }
 
     @Override
-    public AMQFrame queueUnbindOk(final int channelId)
+    public void receiveQueueUnbindOk(final int channelId)
     {
-        return new AMQFrame(channelId, new QueueUnbindOkBody());
+        _processedMethods.add(new AMQFrame(channelId, new QueueUnbindOkBody()));
     }
 
     @Override
-    public AMQFrame queueDeclare(final int channelId,
-                                      final AMQShortString queue,
-                                      final boolean passive,
-                                      final boolean durable,
-                                      final boolean exclusive,
-                                      final boolean autoDelete,
-                                      final boolean nowait,
-                                      final FieldTable arguments)
+    public void receiveQueueDeclare(final int channelId,
+                                    final AMQShortString queue,
+                                    final boolean passive,
+                                    final boolean durable,
+                                    final boolean exclusive,
+                                    final boolean autoDelete,
+                                    final boolean nowait,
+                                    final FieldTable arguments)
     {
-        return new AMQFrame(channelId, new QueueDeclareBody(0, queue, passive, durable, exclusive, autoDelete, nowait, arguments));
+        _processedMethods.add(new AMQFrame(channelId, new QueueDeclareBody(0, queue, passive, durable, exclusive, autoDelete, nowait, arguments)));
     }
 
     @Override
-    public AMQFrame queueDeclareOk(final int channelId,
-                                        final AMQShortString queue,
-                                        final long messageCount,
-                                        final long consumerCount)
+    public void receiveQueueDeclareOk(final int channelId,
+                                      final AMQShortString queue,
+                                      final long messageCount,
+                                      final long consumerCount)
     {
-        return new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount));
+        _processedMethods.add(new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount)));
     }
 
     @Override
-    public AMQFrame queueBind(final int channelId,
-                                   final AMQShortString queue,
-                                   final AMQShortString exchange,
-                                   final AMQShortString bindingKey,
-                                   final boolean nowait,
-                                   final FieldTable arguments)
+    public void receiveQueueBind(final int channelId,
+                                 final AMQShortString queue,
+                                 final AMQShortString exchange,
+                                 final AMQShortString bindingKey,
+                                 final boolean nowait,
+                                 final FieldTable arguments)
     {
-        return new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments));
+        _processedMethods.add(new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments)));
     }
 
     @Override
-    public AMQFrame queuePurge(final int channelId, final AMQShortString queue, final boolean nowait)
+    public void receiveQueuePurge(final int channelId, final AMQShortString queue, final boolean nowait)
     {
-        return new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait));
+        _processedMethods.add(new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait)));
     }
 
     @Override
-    public AMQFrame queuePurgeOk(final int channelId, final long messageCount)
+    public void receiveQueuePurgeOk(final int channelId, final long messageCount)
     {
-        return new AMQFrame(channelId, new QueuePurgeOkBody(messageCount));
+        _processedMethods.add(new AMQFrame(channelId, new QueuePurgeOkBody(messageCount)));
     }
 
     @Override
-    public AMQFrame queueDelete(final int channelId,
-                                     final AMQShortString queue,
-                                     final boolean ifUnused,
-                                     final boolean ifEmpty,
-                                     final boolean nowait)
+    public void receiveQueueDelete(final int channelId,
+                                   final AMQShortString queue,
+                                   final boolean ifUnused,
+                                   final boolean ifEmpty,
+                                   final boolean nowait)
     {
-        return new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait));
+        _processedMethods.add(new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait)));
     }
 
     @Override
-    public AMQFrame queueDeleteOk(final int channelId, final long messageCount)
+    public void receiveQueueDeleteOk(final int channelId, final long messageCount)
     {
-        return new AMQFrame(channelId, new QueueDeleteOkBody(messageCount));
+        _processedMethods.add(new AMQFrame(channelId, new QueueDeleteOkBody(messageCount)));
     }
 
     @Override
-    public AMQFrame queueUnbind(final int channelId,
-                                     final AMQShortString queue,
-                                     final AMQShortString exchange,
-                                     final AMQShortString bindingKey,
-                                     final FieldTable arguments)
+    public void receiveQueueUnbind(final int channelId,
+                                   final AMQShortString queue,
+                                   final AMQShortString exchange,
+                                   final AMQShortString bindingKey,
+                                   final FieldTable arguments)
     {
-        return new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments));
+        _processedMethods.add(new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments)));
     }
 
     @Override
-    public AMQFrame basicRecoverSyncOk(final int channelId)
+    public void receiveBasicRecoverSyncOk(final int channelId)
     {
-        return new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion()));
+        _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion())));
     }
 
     @Override
-    public AMQFrame basicRecover(final int channelId, final boolean requeue, final boolean sync)
+    public void receiveBasicRecover(final int channelId, final boolean requeue, final boolean sync)
     {
         if(ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync)
         {
-            return new AMQFrame(channelId, new BasicRecoverBody(requeue));
+            _processedMethods.add(new AMQFrame(channelId, new BasicRecoverBody(requeue)));
         }
         else
         {
-            return new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue));
+            _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue)));
         }
     }
 
     @Override
-    public AMQFrame basicQos(final int channelId,
-                                  final long prefetchSize,
-                                  final int prefetchCount,
-                                  final boolean global)
+    public void receiveBasicQos(final int channelId,
+                                final long prefetchSize,
+                                final int prefetchCount,
+                                final boolean global)
     {
-        return new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global));
+        _processedMethods.add(new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global)));
     }
 
     @Override
-    public AMQFrame basicQosOk(final int channelId)
+    public void receiveBasicQosOk(final int channelId)
     {
-        return new AMQFrame(channelId, new BasicQosOkBody());
+        _processedMethods.add(new AMQFrame(channelId, new BasicQosOkBody()));
     }
 
     @Override
-    public AMQFrame basicConsume(final int channelId,
-                                      final AMQShortString queue,
-                                      final AMQShortString consumerTag,
-                                      final boolean noLocal,
-                                      final boolean noAck,
-                                      final boolean exclusive,
-                                      final boolean nowait,
-                                      final FieldTable arguments)
+    public void receiveBasicConsume(final int channelId,
+                                    final AMQShortString queue,
+                                    final AMQShortString consumerTag,
+                                    final boolean noLocal,
+                                    final boolean noAck,
+                                    final boolean exclusive,
+                                    final boolean nowait,
+                                    final FieldTable arguments)
     {
-        return new AMQFrame(channelId, new BasicConsumeBody(0, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments));
+        _processedMethods.add(new AMQFrame(channelId, new BasicConsumeBody(0, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments)));
     }
 
     @Override
-    public AMQFrame basicConsumeOk(final int channelId, final AMQShortString consumerTag)
+    public void receiveBasicConsumeOk(final int channelId, final AMQShortString consumerTag)
     {
-        return new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag));
+        _processedMethods.add(new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag)));
     }
 
     @Override
-    public AMQFrame basicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait)
+    public void receiveBasicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait)
     {
-        return new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait));
+        _processedMethods.add(new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait)));
     }
 
     @Override
-    public AMQFrame basicCancelOk(final int channelId, final AMQShortString consumerTag)
+    public void receiveBasicCancelOk(final int channelId, final AMQShortString consumerTag)
     {
-        return new AMQFrame(channelId, new BasicCancelOkBody(consumerTag));
+        _processedMethods.add(new AMQFrame(channelId, new BasicCancelOkBody(consumerTag)));
     }
 
     @Override
-    public AMQFrame basicPublish(final int channelId,
-                                      final AMQShortString exchange,
-                                      final AMQShortString routingKey,
-                                      final boolean mandatory,
-                                      final boolean immediate)
+    public void receiveBasicPublish(final int channelId,
+                                    final AMQShortString exchange,
+                                    final AMQShortString routingKey,
+                                    final boolean mandatory,
+                                    final boolean immediate)
     {
-        return new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate));
+        _processedMethods.add(new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate)));
     }
 
     @Override
-    public AMQFrame basicReturn(final int channelId, final int replyCode,
-                                final AMQShortString replyText,
-                                final AMQShortString exchange,
-                                final AMQShortString routingKey)
+    public void receiveBasicReturn(final int channelId, final int replyCode,
+                                   final AMQShortString replyText,
+                                   final AMQShortString exchange,
+                                   final AMQShortString routingKey)
     {
-        return new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey));
+        _processedMethods.add(new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey)));
     }
 
     @Override
-    public AMQFrame basicDeliver(final int channelId,
-                                      final AMQShortString consumerTag,
-                                      final long deliveryTag,
-                                      final boolean redelivered,
-                                      final AMQShortString exchange,
-                                      final AMQShortString routingKey)
+    public void receiveBasicDeliver(final int channelId,
+                                    final AMQShortString consumerTag,
+                                    final long deliveryTag,
+                                    final boolean redelivered,
+                                    final AMQShortString exchange,
+                                    final AMQShortString routingKey)
     {
-        return new AMQFrame(channelId, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey));
+        _processedMethods.add(new AMQFrame(channelId, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey)));
     }
 
     @Override
-    public AMQFrame basicGet(final int channelId, final AMQShortString queue, final boolean noAck)
+    public void receiveBasicGet(final int channelId, final AMQShortString queue, final boolean noAck)
     {
-        return new AMQFrame(channelId, new BasicGetBody(0, queue, noAck));
+        _processedMethods.add(new AMQFrame(channelId, new BasicGetBody(0, queue, noAck)));
     }
 
     @Override
-    public AMQFrame basicGetOk(final int channelId,
-                                    final long deliveryTag,
-                                    final boolean redelivered,
-                                    final AMQShortString exchange,
-                                    final AMQShortString routingKey,
-                                    final long messageCount)
+    public void receiveBasicGetOk(final int channelId,
+                                  final long deliveryTag,
+                                  final boolean redelivered,
+                                  final AMQShortString exchange,
+                                  final AMQShortString routingKey,
+                                  final long messageCount)
+    {
+        _processedMethods.add(new AMQFrame(channelId, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount)));
+    }
+
+    @Override
+    public void receiveBasicGetEmpty(final int channelId)
     {
-        return new AMQFrame(channelId, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount));
+        _processedMethods.add(new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString)null)));
     }
 
     @Override
-    public AMQFrame basicGetEmpty(final int channelId)
+    public void receiveBasicAck(final int channelId, final long deliveryTag, final boolean multiple)
     {
-        return new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString)null));
+        _processedMethods.add(new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple)));
     }
 
     @Override
-    public AMQFrame basicAck(final int channelId, final long deliveryTag, final boolean multiple)
+    public void receiveBasicReject(final int channelId, final long deliveryTag, final boolean requeue)
     {
-        return new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple));
+        _processedMethods.add(new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue)));
     }
 
     @Override
-    public AMQFrame basicReject(final int channelId, final long deliveryTag, final boolean requeue)
+    public void receiveHeartbeat()
     {
-        return new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue));
+        _processedMethods.add(new AMQFrame(0, new HeartbeatBody()));
     }
 
     @Override
-    public AMQFrame heartbeat()
+    public ProtocolVersion getProtocolVersion()
     {
-        return new AMQFrame(0, new HeartbeatBody());
+        return _protocolVersion;
     }
 
-    private ProtocolVersion getProtocolVersion()
+    public void setProtocolVersion(final ProtocolVersion protocolVersion)
+    {
+        _protocolVersion = protocolVersion;
+    }
+
+    @Override
+    public void receiveMessageContent(final int channelId, final byte[] data)
     {
-        return _methodRegistry.getProtocolVersion();
+        _processedMethods.add(new AMQFrame(channelId, new ContentBody(data)));
     }
 
     @Override
-    public AMQFrame messageContent(final int channelId, final byte[] data)
+    public void receiveMessageHeader(final int channelId,
+                                     final BasicContentHeaderProperties properties,
+                                     final long bodySize)
     {
-        return new AMQFrame(channelId, new ContentBody(data));
+        _processedMethods.add(new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize)));
     }
 
     @Override
-    public AMQFrame messageHeader(final int channelId,
-                                 final BasicContentHeaderProperties properties,
-                                 final long bodySize)
+    public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation)
     {
-        return new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize));
+        _processedMethods.add(protocolInitiation);
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java Fri Oct 10 09:54:36 2014
@@ -81,9 +81,9 @@ public class HeartbeatBody implements AM
         return new AMQFrame(0, this);
     }
 
-    public static <T> T process(final int channel,
+    public static void process(final int channel,
                             final MarkableDataInput in,
-                            final MethodProcessor<T> processor,
+                            final MethodProcessor processor,
                             final long bodySize) throws IOException
     {
 
@@ -91,6 +91,6 @@ public class HeartbeatBody implements AM
         {
             in.skip(bodySize);
         }
-        return processor.heartbeat();
+        processor.receiveHeartbeat();
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java Fri Oct 10 09:54:36 2014
@@ -20,178 +20,182 @@
  */
 package org.apache.qpid.framing;
 
-public interface MethodProcessor<T>
+public interface MethodProcessor
 {
-    T connectionStart(short versionMajor,
-                      short versionMinor,
-                      FieldTable serverProperties,
-                      byte[] mechanisms,
-                      byte[] locales);
+    ProtocolVersion getProtocolVersion();
 
-    T connectionStartOk(FieldTable clientProperties,
-                        AMQShortString mechanism,
-                        byte[] response,
-                        AMQShortString locale);
+    void receiveConnectionStart(short versionMajor,
+                                short versionMinor,
+                                FieldTable serverProperties,
+                                byte[] mechanisms,
+                                byte[] locales);
 
-    T txSelect(int channelId);
+    void receiveConnectionStartOk(FieldTable clientProperties,
+                                  AMQShortString mechanism,
+                                  byte[] response,
+                                  AMQShortString locale);
 
-    T txSelectOk(int channelId);
+    void receiveTxSelect(int channelId);
 
-    T txCommit(int channelId);
+    void receiveTxSelectOk(int channelId);
 
-    T txCommitOk(int channelId);
+    void receiveTxCommit(int channelId);
 
-    T txRollback(int channelId);
+    void receiveTxCommitOk(int channelId);
 
-    T txRollbackOk(int channelId);
+    void receiveTxRollback(int channelId);
 
-    T connectionSecure(byte[] challenge);
+    void receiveTxRollbackOk(int channelId);
 
-    T connectionSecureOk(byte[] response);
+    void receiveConnectionSecure(byte[] challenge);
 
-    T connectionTune(int channelMax, long frameMax, int heartbeat);
+    void receiveConnectionSecureOk(byte[] response);
 
-    T connectionTuneOk(int channelMax, long frameMax, int heartbeat);
+    void receiveConnectionTune(int channelMax, long frameMax, int heartbeat);
 
-    T connectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist);
+    void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat);
 
-    T connectionOpenOk(AMQShortString knownHosts);
+    void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist);
 
-    T connectionRedirect(AMQShortString host, AMQShortString knownHosts);
+    void receiveConnectionOpenOk(AMQShortString knownHosts);
 
-    T connectionClose(int replyCode, AMQShortString replyText, int classId, int methodId);
+    void receiveConnectionRedirect(AMQShortString host, AMQShortString knownHosts);
 
-    T connectionCloseOk();
+    void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId);
 
-    T channelOpen(int channelId);
+    void receiveConnectionCloseOk();
 
-    T channelOpenOk(int channelId);
+    void receiveChannelOpen(int channelId);
 
-    T channelFlow(int channelId, boolean active);
+    void receiveChannelOpenOk(int channelId);
 
-    T channelFlowOk(int channelId, boolean active);
+    void receiveChannelFlow(int channelId, boolean active);
 
-    T channelAlert(int channelId, int replyCode, final AMQShortString replyText, FieldTable details);
+    void receiveChannelFlowOk(int channelId, boolean active);
 
-    T channelClose(int channelId, int replyCode, AMQShortString replyText, int classId, int methodId);
+    void receiveChannelAlert(int channelId, int replyCode, final AMQShortString replyText, FieldTable details);
 
-    T channelCloseOk(int channelId);
+    void receiveChannelClose(int channelId, int replyCode, AMQShortString replyText, int classId, int methodId);
 
-    T accessRequest(int channelId,
-                    AMQShortString realm,
-                    boolean exclusive,
-                    boolean passive,
-                    boolean active,
-                    boolean write, boolean read);
+    void receiveChannelCloseOk(int channelId);
 
-    T accessRequestOk(int channelId, int ticket);
+    void receiveAccessRequest(int channelId,
+                              AMQShortString realm,
+                              boolean exclusive,
+                              boolean passive,
+                              boolean active,
+                              boolean write, boolean read);
 
-    T exchangeDeclare(int channelId,
-                      AMQShortString exchange,
-                      AMQShortString type,
-                      boolean passive,
-                      boolean durable,
-                      boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments);
+    void receiveAccessRequestOk(int channelId, int ticket);
 
-    T exchangeDeclareOk(int channelId);
+    void receiveExchangeDeclare(int channelId,
+                                AMQShortString exchange,
+                                AMQShortString type,
+                                boolean passive,
+                                boolean durable,
+                                boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments);
 
-    T exchangeDelete(int channelId, AMQShortString exchange, boolean ifUnused, boolean nowait);
+    void receiveExchangeDeclareOk(int channelId);
 
-    T exchangeDeleteOk(int channelId);
+    void receiveExchangeDelete(int channelId, AMQShortString exchange, boolean ifUnused, boolean nowait);
 
-    T exchangeBound(int channelId, AMQShortString exchange, AMQShortString routingKey, AMQShortString queue);
+    void receiveExchangeDeleteOk(int channelId);
 
-    T exchangeBoundOk(int channelId, int replyCode, AMQShortString replyText);
+    void receiveExchangeBound(int channelId, AMQShortString exchange, AMQShortString routingKey, AMQShortString queue);
 
-    T queueBindOk(int channelId);
+    void receiveExchangeBoundOk(int channelId, int replyCode, AMQShortString replyText);
 
-    T queueUnbindOk(final int channelId);
+    void receiveQueueBindOk(int channelId);
 
-    T queueDeclare(int channelId,
-                   AMQShortString queue,
-                   boolean passive,
-                   boolean durable,
-                   boolean exclusive,
-                   boolean autoDelete, boolean nowait, FieldTable arguments);
+    void receiveQueueUnbindOk(final int channelId);
 
-    T queueDeclareOk(int channelId, final AMQShortString queue, long messageCount, long consumerCount);
+    void receiveQueueDeclare(int channelId,
+                             AMQShortString queue,
+                             boolean passive,
+                             boolean durable,
+                             boolean exclusive,
+                             boolean autoDelete, boolean nowait, FieldTable arguments);
 
-    T queueBind(int channelId,
-                AMQShortString queue,
-                AMQShortString exchange,
-                AMQShortString bindingKey,
-                boolean nowait, FieldTable arguments);
+    void receiveQueueDeclareOk(int channelId, final AMQShortString queue, long messageCount, long consumerCount);
 
-    T queuePurge(int channelId, AMQShortString queue, boolean nowait);
+    void receiveQueueBind(int channelId,
+                          AMQShortString queue,
+                          AMQShortString exchange,
+                          AMQShortString bindingKey,
+                          boolean nowait, FieldTable arguments);
 
-    T queuePurgeOk(int channelId, long messageCount);
+    void receiveQueuePurge(int channelId, AMQShortString queue, boolean nowait);
 
-    T queueDelete(int channelId, AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait);
+    void receiveQueuePurgeOk(int channelId, long messageCount);
 
-    T queueDeleteOk(int channelId, long messageCount);
+    void receiveQueueDelete(int channelId, AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait);
 
-    T queueUnbind(int channelId,
-                  AMQShortString queue,
-                  AMQShortString exchange,
-                  AMQShortString bindingKey,
-                  FieldTable arguments);
+    void receiveQueueDeleteOk(int channelId, long messageCount);
 
-    T basicRecoverSyncOk(int channelId);
+    void receiveQueueUnbind(int channelId,
+                            AMQShortString queue,
+                            AMQShortString exchange,
+                            AMQShortString bindingKey,
+                            FieldTable arguments);
 
-    T basicRecover(int channelId, final boolean requeue, boolean sync);
+    void receiveBasicRecoverSyncOk(int channelId);
 
-    T basicQos(int channelId, long prefetchSize, int prefetchCount, boolean global);
+    void receiveBasicRecover(int channelId, final boolean requeue, boolean sync);
 
-    T basicQosOk(int channelId);
+    void receiveBasicQos(int channelId, long prefetchSize, int prefetchCount, boolean global);
 
-    T basicConsume(int channelId,
-                   AMQShortString queue,
-                   AMQShortString consumerTag,
-                   boolean noLocal,
-                   boolean noAck,
-                   boolean exclusive, boolean nowait, FieldTable arguments);
+    void receiveBasicQosOk(int channelId);
 
-    T basicConsumeOk(int channelId, AMQShortString consumerTag);
+    void receiveBasicConsume(int channelId,
+                             AMQShortString queue,
+                             AMQShortString consumerTag,
+                             boolean noLocal,
+                             boolean noAck,
+                             boolean exclusive, boolean nowait, FieldTable arguments);
 
-    T basicCancel(int channelId, AMQShortString consumerTag, boolean noWait);
+    void receiveBasicConsumeOk(int channelId, AMQShortString consumerTag);
 
-    T basicCancelOk(int channelId, AMQShortString consumerTag);
+    void receiveBasicCancel(int channelId, AMQShortString consumerTag, boolean noWait);
 
-    T basicPublish(int channelId,
-                   AMQShortString exchange,
-                   AMQShortString routingKey,
-                   boolean mandatory,
-                   boolean immediate);
+    void receiveBasicCancelOk(int channelId, AMQShortString consumerTag);
 
-    T basicReturn(final int channelId,
-                  int replyCode,
-                  AMQShortString replyText,
-                  AMQShortString exchange,
-                  AMQShortString routingKey);
+    void receiveBasicPublish(int channelId,
+                             AMQShortString exchange,
+                             AMQShortString routingKey,
+                             boolean mandatory,
+                             boolean immediate);
 
-    T basicDeliver(int channelId,
-                   AMQShortString consumerTag,
-                   long deliveryTag,
-                   boolean redelivered,
-                   AMQShortString exchange, AMQShortString routingKey);
+    void receiveBasicReturn(final int channelId,
+                            int replyCode,
+                            AMQShortString replyText,
+                            AMQShortString exchange,
+                            AMQShortString routingKey);
 
-    T basicGet(int channelId, AMQShortString queue, boolean noAck);
+    void receiveBasicDeliver(int channelId,
+                             AMQShortString consumerTag,
+                             long deliveryTag,
+                             boolean redelivered,
+                             AMQShortString exchange, AMQShortString routingKey);
 
-    T basicGetOk(int channelId,
-                 long deliveryTag,
-                 boolean redelivered,
-                 AMQShortString exchange,
-                 AMQShortString routingKey, long messageCount);
+    void receiveBasicGet(int channelId, AMQShortString queue, boolean noAck);
 
-    T basicGetEmpty(int channelId);
+    void receiveBasicGetOk(int channelId,
+                           long deliveryTag,
+                           boolean redelivered,
+                           AMQShortString exchange,
+                           AMQShortString routingKey, long messageCount);
 
-    T basicAck(int channelId, long deliveryTag, boolean multiple);
+    void receiveBasicGetEmpty(int channelId);
 
-    T basicReject(int channelId, long deliveryTag, boolean requeue);
+    void receiveBasicAck(int channelId, long deliveryTag, boolean multiple);
 
-    T heartbeat();
+    void receiveBasicReject(int channelId, long deliveryTag, boolean requeue);
 
-    T messageContent(int channelId, byte[] data);
+    void receiveHeartbeat();
 
-    T messageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize);
+    void receiveMessageContent(int channelId, byte[] data);
+
+    void receiveMessageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize);
+
+    void receiveProtocolHeader(ProtocolInitiation protocolInitiation);
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java Fri Oct 10 09:54:36 2014
@@ -31,14 +31,12 @@ package org.apache.qpid.framing;
 
 public final class MethodRegistry
 {
-    private final FrameCreatingMethodProcessor _methodProcessor;
     private ProtocolVersion _protocolVersion;
 
 
     public MethodRegistry(ProtocolVersion pv)
     {
         _protocolVersion = pv;
-        _methodProcessor = new FrameCreatingMethodProcessor(this);
     }
 
     public void setProtocolVersion(final ProtocolVersion protocolVersion)
@@ -555,10 +553,5 @@ public final class MethodRegistry
         return _protocolVersion;
     }
 
-    public FrameCreatingMethodProcessor getMethodProcessor()
-    {
-        return _methodProcessor;
-    }
-
 
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java Fri Oct 10 09:54:36 2014
@@ -165,9 +165,9 @@ public class QueueBindBody extends AMQMe
         return buf.toString();
     }
 
-    public static <T> T process(final int channelId,
+    public static void process(final int channelId,
                                 final MarkableDataInput buffer,
-                                final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException
+                                final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
     {
 
         int ticket = buffer.readUnsignedShort();
@@ -176,6 +176,6 @@ public class QueueBindBody extends AMQMe
         AMQShortString bindingKey = buffer.readAMQShortString();
         boolean nowait = (buffer.readByte() & 0x01) == 0x01;
         FieldTable arguments = EncodingUtils.readFieldTable(buffer);
-        return dispatcher.queueBind(channelId, queue, exchange, bindingKey, nowait, arguments);
+        dispatcher.receiveQueueBind(channelId, queue, exchange, bindingKey, nowait, arguments);
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java Fri Oct 10 09:54:36 2014
@@ -191,9 +191,9 @@ public class QueueDeclareBody extends AM
         return buf.toString();
     }
 
-    public static <T> T process(final int channelId,
+    public static void process(final int channelId,
                                 final MarkableDataInput buffer,
-                                final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException
+                                final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
     {
 
         int ticket = buffer.readUnsignedShort();
@@ -206,6 +206,6 @@ public class QueueDeclareBody extends AM
         boolean autoDelete = (bitfield & 0x08 ) == 0x08;
         boolean nowait = (bitfield & 0x010 ) == 0x010;
         FieldTable arguments = EncodingUtils.readFieldTable(buffer);
-        return dispatcher.queueDeclare(channelId, queue, passive, durable, exclusive, autoDelete, nowait, arguments);
+        dispatcher.receiveQueueDeclare(channelId, queue, passive, durable, exclusive, autoDelete, nowait, arguments);
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java Fri Oct 10 09:54:36 2014
@@ -120,13 +120,13 @@ public class QueueDeclareOkBody extends 
         return buf.toString();
     }
 
-    public static <T> T process(final int channelId,
+    public static void process(final int channelId,
                                 final MarkableDataInput buffer,
-                                final MethodProcessor<T> dispatcher) throws IOException
+                                final MethodProcessor dispatcher) throws IOException
     {
         AMQShortString queue = buffer.readAMQShortString();
         long messageCount = EncodingUtils.readUnsignedInteger(buffer);
         long consumerCount = EncodingUtils.readUnsignedInteger(buffer);
-        return dispatcher.queueDeclareOk(channelId, queue, messageCount, consumerCount);
+        dispatcher.receiveQueueDeclareOk(channelId, queue, messageCount, consumerCount);
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java Fri Oct 10 09:54:36 2014
@@ -151,9 +151,9 @@ public class QueueDeleteBody extends AMQ
         return buf.toString();
     }
 
-    public static <T> T process(final int channelId,
+    public static void process(final int channelId,
                                 final MarkableDataInput buffer,
-                                final MethodProcessor<T> dispatcher) throws IOException
+                                final MethodProcessor dispatcher) throws IOException
     {
 
         int ticket = buffer.readUnsignedShort();
@@ -163,6 +163,6 @@ public class QueueDeleteBody extends AMQ
         boolean ifUnused = (bitfield & 0x01) == 0x01;
         boolean ifEmpty = (bitfield & 0x02) == 0x02;
         boolean nowait = (bitfield & 0x04) == 0x04;
-        return dispatcher.queueDelete(channelId, queue, ifUnused, ifEmpty, nowait);
+        dispatcher.receiveQueueDelete(channelId, queue, ifUnused, ifEmpty, nowait);
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java Fri Oct 10 09:54:36 2014
@@ -95,11 +95,11 @@ public class QueueDeleteOkBody extends A
         return buf.toString();
     }
 
-    public static <T> T process(final int channelId,
+    public static void process(final int channelId,
                                 final MarkableDataInput buffer,
-                                final MethodProcessor<T> dispatcher) throws IOException
+                                final MethodProcessor dispatcher) throws IOException
     {
         long messageCount = EncodingUtils.readUnsignedInteger(buffer);
-        return dispatcher.queueDeleteOk(channelId, messageCount);
+        dispatcher.receiveQueueDeleteOk(channelId, messageCount);
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java Fri Oct 10 09:54:36 2014
@@ -125,14 +125,14 @@ public class QueuePurgeBody extends AMQM
         return buf.toString();
     }
 
-    public static <T> T process(final int channelId,
+    public static void process(final int channelId,
                                 final MarkableDataInput buffer,
-                                final MethodProcessor<T> dispatcher) throws IOException
+                                final MethodProcessor dispatcher) throws IOException
     {
 
         int ticket = buffer.readUnsignedShort();
         AMQShortString queue = buffer.readAMQShortString();
         boolean nowait = (buffer.readByte() & 0x01) == 0x01;
-        return dispatcher.queuePurge(channelId, queue, nowait);
+        dispatcher.receiveQueuePurge(channelId, queue, nowait);
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java Fri Oct 10 09:54:36 2014
@@ -95,11 +95,11 @@ public class QueuePurgeOkBody extends AM
         return buf.toString();
     }
 
-    public static <T> T process(final int channelId,
+    public static void process(final int channelId,
                                 final MarkableDataInput buffer,
-                                final MethodProcessor<T> dispatcher) throws IOException
+                                final MethodProcessor dispatcher) throws IOException
     {
         long messageCount = EncodingUtils.readUnsignedInteger(buffer);
-        return dispatcher.queuePurgeOk(channelId, messageCount);
+        dispatcher.receiveQueuePurgeOk(channelId, messageCount);
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java Fri Oct 10 09:54:36 2014
@@ -147,9 +147,9 @@ public class QueueUnbindBody extends AMQ
         return buf.toString();
     }
 
-    public static <T> T process(final int channelId,
+    public static void process(final int channelId,
                                 final MarkableDataInput buffer,
-                                final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException
+                                final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
     {
 
         int ticket = buffer.readUnsignedShort();
@@ -157,6 +157,6 @@ public class QueueUnbindBody extends AMQ
         AMQShortString exchange = buffer.readAMQShortString();
         AMQShortString routingKey = buffer.readAMQShortString();
         FieldTable arguments = EncodingUtils.readFieldTable(buffer);
-        return dispatcher.queueUnbind(channelId, queue, exchange, routingKey, arguments);
+        dispatcher.receiveQueueUnbind(channelId, queue, exchange, routingKey, arguments);
     }
 }

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java Fri Oct 10 09:54:36 2014
@@ -25,7 +25,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.List;
 
 import junit.framework.TestCase;
 
@@ -33,19 +33,21 @@ import org.apache.qpid.framing.AMQDataBl
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQFrameDecodingException;
 import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.framing.FrameCreatingMethodProcessor;
 import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.ProtocolVersion;
 
 public class AMQDecoderTest extends TestCase
 {
 
     private AMQDecoder _decoder;
+    private FrameCreatingMethodProcessor _methodProcessor;
 
 
     public void setUp()
     {
-        _decoder = new AMQDecoder(false, new MethodRegistry(ProtocolVersion.v0_91));
+        _methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91);
+        _decoder = new AMQDecoder(false, _methodProcessor);
     }
    
     
@@ -59,7 +61,8 @@ public class AMQDecoderTest extends Test
     public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException
     {
         ByteBuffer msg = getHeartbeatBodyBuffer();
-        ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg);
+        _decoder.decodeBuffer(msg);
+        List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods();
         if (frames.get(0) instanceof AMQFrame)
         {
             assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType());
@@ -79,9 +82,12 @@ public class AMQDecoderTest extends Test
         msgA.limit(msgaLimit);
         msg.position(msgbPos);
         ByteBuffer msgB = msg.slice();
-        ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msgA);
+
+        _decoder.decodeBuffer(msgA);
+        List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods();
         assertEquals(0, frames.size());
-        frames = _decoder.decodeBuffer(msgB);
+
+        _decoder.decodeBuffer(msgB);
         assertEquals(1, frames.size());
         if (frames.get(0) instanceof AMQFrame)
         {
@@ -101,7 +107,8 @@ public class AMQDecoderTest extends Test
         msg.put(msgA);
         msg.put(msgB);
         msg.flip();
-        ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg);
+        _decoder.decodeBuffer(msg);
+        List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods();
         assertEquals(2, frames.size());
         for (AMQDataBlock frame : frames)
         {
@@ -138,12 +145,15 @@ public class AMQDecoderTest extends Test
         sliceB.put(msgC);
         sliceB.flip();
         msgC.limit(limit);
-        
-        ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(sliceA);
+
+        _decoder.decodeBuffer(sliceA);
+        List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods();
         assertEquals(1, frames.size());
-        frames = _decoder.decodeBuffer(sliceB);
+        frames.clear();
+        _decoder.decodeBuffer(sliceB);
         assertEquals(1, frames.size());
-        frames = _decoder.decodeBuffer(msgC);
+        frames.clear();
+        _decoder.decodeBuffer(msgC);
         assertEquals(1, frames.size());
         for (AMQDataBlock frame : frames)
         {

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java Fri Oct 10 09:54:36 2014
@@ -27,7 +27,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
@@ -41,6 +40,7 @@ import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 
+import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQDataBlockDecoder;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQFrameDecodingException;
@@ -51,7 +51,7 @@ import org.apache.qpid.framing.Connectio
 import org.apache.qpid.framing.ConnectionStartOkBody;
 import org.apache.qpid.framing.ConnectionTuneOkBody;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.FrameCreatingMethodProcessor;
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.server.model.AuthenticationProvider;
@@ -110,11 +110,11 @@ public class MaxFrameSizeTest extends Qp
                                 {
 
                                     @Override
-                                    public void evaluate(final Socket socket, final List<AMQFrame> frames)
+                                    public void evaluate(final Socket socket, final List<AMQDataBlock> frames)
                                     {
                                         if(!socket.isClosed())
                                         {
-                                            AMQFrame lastFrame = frames.get(frames.size() - 1);
+                                            AMQFrame lastFrame = (AMQFrame) frames.get(frames.size() - 1);
                                             assertTrue("Connection should not be possible with a frame size < " + Constant.MIN_MAX_FRAME_SIZE, lastFrame.getBodyFrame() instanceof ConnectionCloseBody);
                                         }
                                     }
@@ -159,11 +159,11 @@ public class MaxFrameSizeTest extends Qp
                                 {
 
                                     @Override
-                                    public void evaluate(final Socket socket, final List<AMQFrame> frames)
+                                    public void evaluate(final Socket socket, final List<AMQDataBlock> frames)
                                     {
                                         if(!socket.isClosed())
                                         {
-                                            AMQFrame lastFrame = frames.get(frames.size() - 1);
+                                            AMQFrame lastFrame = (AMQFrame) frames.get(frames.size() - 1);
                                             assertTrue("Connection should not be possible with a frame size larger than the broker requested", lastFrame.getBodyFrame() instanceof ConnectionCloseBody);
                                         }
                                     }
@@ -173,7 +173,7 @@ public class MaxFrameSizeTest extends Qp
 
     private static interface ResultEvaluator
     {
-        void evaluate(Socket socket, List<AMQFrame> frames);
+        void evaluate(Socket socket, List<AMQDataBlock> frames);
     }
 
     private void doAMQP08test(int frameSize, ResultEvaluator evaluator)
@@ -236,17 +236,14 @@ public class MaxFrameSizeTest extends Qp
         byte[] serverData = baos.toByteArray();
         ByteArrayDataInput badi = new ByteArrayDataInput(serverData);
         AMQDataBlockDecoder datablockDecoder = new AMQDataBlockDecoder();
-        final MethodRegistry methodRegistry_0_91 = new MethodRegistry(ProtocolVersion.v0_91);
+        final FrameCreatingMethodProcessor methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91);
 
-        List<AMQFrame> frames = new ArrayList<>();
         while (datablockDecoder.decodable(badi))
         {
-            frames.add(datablockDecoder.createAndPopulateFrame(methodRegistry_0_91.getProtocolVersion(),
-                                                               methodRegistry_0_91.getMethodProcessor(),
-                                                               badi));
+            datablockDecoder.processInput(methodProcessor, badi);
         }
 
-        evaluator.evaluate(socket, frames);
+        evaluator.evaluate(socket, methodProcessor.getProcessedMethods());
     }
 
     private static class TestClientDelegate extends ClientDelegate



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org