You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/10/10 11:54:37 UTC
svn commit: r1630745 [2/2] - in
/qpid/branches/QPID-6125-ProtocolRefactoring/java:
bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/
broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/
client/src/main/java...
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java Fri Oct 10 09:54:36 2014
@@ -119,12 +119,12 @@ public class ConnectionTuneOkBody extend
return buf.toString();
}
- public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException
{
int channelMax = buffer.readUnsignedShort();
long frameMax = EncodingUtils.readUnsignedInteger(buffer);
int heartbeat = buffer.readUnsignedShort();
- return dispatcher.connectionTuneOk(channelMax, frameMax, heartbeat);
+ dispatcher.receiveConnectionTuneOk(channelMax, frameMax, heartbeat);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java Fri Oct 10 09:54:36 2014
@@ -92,14 +92,14 @@ public class ContentBody implements AMQB
return _payload;
}
- public static <T> T process(final int channel,
+ public static void process(final int channel,
final MarkableDataInput in,
- final MethodProcessor<T> methodProcessor, final long bodySize)
+ final MethodProcessor methodProcessor, final long bodySize)
throws IOException
{
byte[] payload = new byte[(int)bodySize];
in.readFully(payload);
- return methodProcessor.messageContent(channel, payload);
+ methodProcessor.receiveMessageContent(channel, payload);
}
private static class BufferContentBody implements AMQBody
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Fri Oct 10 09:54:36 2014
@@ -155,9 +155,9 @@ public class ContentHeaderBody implement
_bodySize = bodySize;
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> methodProcessor, final long size)
+ final MethodProcessor methodProcessor, final long size)
throws IOException, AMQFrameDecodingException
{
@@ -175,6 +175,6 @@ public class ContentHeaderBody implement
properties = new BasicContentHeaderProperties();
properties.populatePropertiesFromBuffer(buffer, propertyFlags, (int)(size-14));
- return methodProcessor.messageHeader(channelId, properties, bodySize);
+ methodProcessor.receiveMessageHeader(channelId, properties, bodySize);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java Fri Oct 10 09:54:36 2014
@@ -122,13 +122,13 @@ public class ExchangeBoundBody extends A
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
throws IOException
{
AMQShortString exchange = buffer.readAMQShortString();
AMQShortString routingKey = buffer.readAMQShortString();
AMQShortString queue = buffer.readAMQShortString();
- return dispatcher.exchangeBound(channelId, exchange, routingKey, queue);
+ dispatcher.receiveExchangeBound(channelId, exchange, routingKey, queue);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java Fri Oct 10 09:54:36 2014
@@ -108,12 +108,12 @@ public class ExchangeBoundOkBody extends
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
throws IOException
{
int replyCode = buffer.readUnsignedShort();
AMQShortString replyText = buffer.readAMQShortString();
- return dispatcher.exchangeBoundOk(channelId, replyCode, replyText);
+ dispatcher.receiveExchangeBoundOk(channelId, replyCode, replyText);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java Fri Oct 10 09:54:36 2014
@@ -204,9 +204,9 @@ public class ExchangeDeclareBody extends
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException
+ final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
{
int ticket = buffer.readUnsignedShort();
@@ -219,6 +219,14 @@ public class ExchangeDeclareBody extends
boolean internal = (bitfield & 0x8) == 0x8;
boolean nowait = (bitfield & 0x10) == 0x10;
FieldTable arguments = EncodingUtils.readFieldTable(buffer);
- return dispatcher.exchangeDeclare(channelId, exchange, type, passive, durable, autoDelete, internal, nowait, arguments);
+ dispatcher.receiveExchangeDeclare(channelId,
+ exchange,
+ type,
+ passive,
+ durable,
+ autoDelete,
+ internal,
+ nowait,
+ arguments);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java Fri Oct 10 09:54:36 2014
@@ -138,7 +138,7 @@ public class ExchangeDeleteBody extends
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
throws IOException
{
@@ -147,6 +147,6 @@ public class ExchangeDeleteBody extends
byte bitfield = buffer.readByte();
boolean ifUnused = (bitfield & 0x01) == 0x01;
boolean nowait = (bitfield & 0x02) == 0x02;
- return dispatcher.exchangeDelete(channelId, exchange, ifUnused, nowait);
+ dispatcher.receiveExchangeDelete(channelId, exchange, ifUnused, nowait);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java Fri Oct 10 09:54:36 2014
@@ -20,484 +20,506 @@
*/
package org.apache.qpid.framing;
-public class FrameCreatingMethodProcessor implements MethodProcessor<AMQFrame>
+import java.util.ArrayList;
+import java.util.List;
+
+public class FrameCreatingMethodProcessor implements MethodProcessor
{
- private final MethodRegistry _methodRegistry;
+ private ProtocolVersion _protocolVersion;
+
+ private final List<AMQDataBlock> _processedMethods = new ArrayList<>();
- FrameCreatingMethodProcessor(final MethodRegistry methodRegistry)
+ public FrameCreatingMethodProcessor(final ProtocolVersion protocolVersion)
{
- _methodRegistry = methodRegistry;
+ _protocolVersion = protocolVersion;
}
+ public List<AMQDataBlock> getProcessedMethods()
+ {
+ return _processedMethods;
+ }
+
@Override
- public AMQFrame connectionStart(final short versionMajor,
- final short versionMinor,
- final FieldTable serverProperties,
- final byte[] mechanisms,
- final byte[] locales)
+ public void receiveConnectionStart(final short versionMajor,
+ final short versionMinor,
+ final FieldTable serverProperties,
+ final byte[] mechanisms,
+ final byte[] locales)
{
- return new AMQFrame(0, new ConnectionStartBody(versionMajor, versionMinor, serverProperties, mechanisms, locales));
+ _processedMethods.add(new AMQFrame(0, new ConnectionStartBody(versionMajor, versionMinor, serverProperties, mechanisms, locales)));
}
@Override
- public AMQFrame connectionStartOk(final FieldTable clientProperties,
- final AMQShortString mechanism,
- final byte[] response,
- final AMQShortString locale)
+ public void receiveConnectionStartOk(final FieldTable clientProperties,
+ final AMQShortString mechanism,
+ final byte[] response,
+ final AMQShortString locale)
{
- return new AMQFrame(0, new ConnectionStartOkBody(clientProperties, mechanism, response, locale));
+ _processedMethods.add(new AMQFrame(0, new ConnectionStartOkBody(clientProperties, mechanism, response, locale)));
}
@Override
- public AMQFrame txSelect(final int channelId)
+ public void receiveTxSelect(final int channelId)
{
- return new AMQFrame(channelId, TxSelectBody.INSTANCE);
+ _processedMethods.add(new AMQFrame(channelId, TxSelectBody.INSTANCE));
}
@Override
- public AMQFrame txSelectOk(final int channelId)
+ public void receiveTxSelectOk(final int channelId)
{
- return new AMQFrame(channelId, TxSelectOkBody.INSTANCE);
+ _processedMethods.add(new AMQFrame(channelId, TxSelectOkBody.INSTANCE));
}
@Override
- public AMQFrame txCommit(final int channelId)
+ public void receiveTxCommit(final int channelId)
{
- return new AMQFrame(channelId, TxCommitBody.INSTANCE);
+ _processedMethods.add(new AMQFrame(channelId, TxCommitBody.INSTANCE));
}
@Override
- public AMQFrame txCommitOk(final int channelId)
+ public void receiveTxCommitOk(final int channelId)
{
- return new AMQFrame(channelId, TxCommitOkBody.INSTANCE);
+ _processedMethods.add(new AMQFrame(channelId, TxCommitOkBody.INSTANCE));
}
@Override
- public AMQFrame txRollback(final int channelId)
+ public void receiveTxRollback(final int channelId)
{
- return new AMQFrame(channelId, TxRollbackBody.INSTANCE);
+ _processedMethods.add(new AMQFrame(channelId, TxRollbackBody.INSTANCE));
}
@Override
- public AMQFrame txRollbackOk(final int channelId)
+ public void receiveTxRollbackOk(final int channelId)
{
- return new AMQFrame(channelId, TxRollbackOkBody.INSTANCE);
+ _processedMethods.add(new AMQFrame(channelId, TxRollbackOkBody.INSTANCE));
}
@Override
- public AMQFrame connectionSecure(final byte[] challenge)
+ public void receiveConnectionSecure(final byte[] challenge)
{
- return new AMQFrame(0, new ConnectionSecureBody(challenge));
+ _processedMethods.add(new AMQFrame(0, new ConnectionSecureBody(challenge)));
}
@Override
- public AMQFrame connectionSecureOk(final byte[] response)
+ public void receiveConnectionSecureOk(final byte[] response)
{
- return new AMQFrame(0, new ConnectionSecureOkBody(response));
+ _processedMethods.add(new AMQFrame(0, new ConnectionSecureOkBody(response)));
}
@Override
- public AMQFrame connectionTune(final int channelMax, final long frameMax, final int heartbeat)
+ public void receiveConnectionTune(final int channelMax, final long frameMax, final int heartbeat)
{
- return new AMQFrame(0, new ConnectionTuneBody(channelMax, frameMax, heartbeat));
+ _processedMethods.add(new AMQFrame(0, new ConnectionTuneBody(channelMax, frameMax, heartbeat)));
}
@Override
- public AMQFrame connectionTuneOk(final int channelMax, final long frameMax, final int heartbeat)
+ public void receiveConnectionTuneOk(final int channelMax, final long frameMax, final int heartbeat)
{
- return new AMQFrame(0, new ConnectionTuneOkBody(channelMax, frameMax, heartbeat));
+ _processedMethods.add(new AMQFrame(0, new ConnectionTuneOkBody(channelMax, frameMax, heartbeat)));
}
@Override
- public AMQFrame connectionOpen(final AMQShortString virtualHost,
- final AMQShortString capabilities,
- final boolean insist)
+ public void receiveConnectionOpen(final AMQShortString virtualHost,
+ final AMQShortString capabilities,
+ final boolean insist)
{
- return new AMQFrame(0, new ConnectionOpenBody(virtualHost, capabilities, insist));
+ _processedMethods.add(new AMQFrame(0, new ConnectionOpenBody(virtualHost, capabilities, insist)));
}
@Override
- public AMQFrame connectionOpenOk(final AMQShortString knownHosts)
+ public void receiveConnectionOpenOk(final AMQShortString knownHosts)
{
- return new AMQFrame(0, new ConnectionOpenOkBody(knownHosts));
+ _processedMethods.add(new AMQFrame(0, new ConnectionOpenOkBody(knownHosts)));
}
@Override
- public AMQFrame connectionRedirect(final AMQShortString host, final AMQShortString knownHosts)
+ public void receiveConnectionRedirect(final AMQShortString host, final AMQShortString knownHosts)
{
- return new AMQFrame(0, new ConnectionRedirectBody(getProtocolVersion(), host, knownHosts));
+ _processedMethods.add(new AMQFrame(0, new ConnectionRedirectBody(getProtocolVersion(), host, knownHosts)));
}
@Override
- public AMQFrame connectionClose(final int replyCode,
- final AMQShortString replyText,
- final int classId,
- final int methodId)
+ public void receiveConnectionClose(final int replyCode,
+ final AMQShortString replyText,
+ final int classId,
+ final int methodId)
{
- return new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), replyCode, replyText, classId, methodId));
+ _processedMethods.add(new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), replyCode, replyText, classId, methodId)));
}
@Override
- public AMQFrame connectionCloseOk()
+ public void receiveConnectionCloseOk()
{
- return new AMQFrame(0, ProtocolVersion.v8_0.equals(getProtocolVersion())
+ _processedMethods.add(new AMQFrame(0, ProtocolVersion.v8_0.equals(getProtocolVersion())
? ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_8
- : ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9);
+ : ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9));
}
@Override
- public AMQFrame channelOpen(final int channelId)
+ public void receiveChannelOpen(final int channelId)
{
- return new AMQFrame(channelId, new ChannelOpenBody());
+ _processedMethods.add(new AMQFrame(channelId, new ChannelOpenBody()));
}
@Override
- public AMQFrame channelOpenOk(final int channelId)
+ public void receiveChannelOpenOk(final int channelId)
{
- return new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion())
+ _processedMethods.add(new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion())
? ChannelOpenOkBody.INSTANCE_0_8
- : ChannelOpenOkBody.INSTANCE_0_9);
+ : ChannelOpenOkBody.INSTANCE_0_9));
}
@Override
- public AMQFrame channelFlow(final int channelId, final boolean active)
+ public void receiveChannelFlow(final int channelId, final boolean active)
{
- return new AMQFrame(channelId, new ChannelFlowBody(active));
+ _processedMethods.add(new AMQFrame(channelId, new ChannelFlowBody(active)));
}
@Override
- public AMQFrame channelFlowOk(final int channelId, final boolean active)
+ public void receiveChannelFlowOk(final int channelId, final boolean active)
{
- return new AMQFrame(channelId, new ChannelFlowOkBody(active));
+ _processedMethods.add(new AMQFrame(channelId, new ChannelFlowOkBody(active)));
}
@Override
- public AMQFrame channelAlert(final int channelId,
- final int replyCode,
- final AMQShortString replyText,
- final FieldTable details)
+ public void receiveChannelAlert(final int channelId,
+ final int replyCode,
+ final AMQShortString replyText,
+ final FieldTable details)
{
- return new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details));
+ _processedMethods.add(new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details)));
}
@Override
- public AMQFrame channelClose(final int channelId,
- final int replyCode,
- final AMQShortString replyText,
- final int classId,
- final int methodId)
+ public void receiveChannelClose(final int channelId,
+ final int replyCode,
+ final AMQShortString replyText,
+ final int classId,
+ final int methodId)
{
- return new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId));
+ _processedMethods.add(new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId)));
}
@Override
- public AMQFrame channelCloseOk(final int channelId)
+ public void receiveChannelCloseOk(final int channelId)
{
- return new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE);
+ _processedMethods.add(new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE));
}
@Override
- public AMQFrame accessRequest(final int channelId,
- final AMQShortString realm,
- final boolean exclusive,
- final boolean passive,
- final boolean active,
- final boolean write,
- final boolean read)
+ public void receiveAccessRequest(final int channelId,
+ final AMQShortString realm,
+ final boolean exclusive,
+ final boolean passive,
+ final boolean active,
+ final boolean write,
+ final boolean read)
{
- return new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read));
+ _processedMethods.add(new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read)));
}
@Override
- public AMQFrame accessRequestOk(final int channelId, final int ticket)
+ public void receiveAccessRequestOk(final int channelId, final int ticket)
{
- return new AMQFrame(channelId, new AccessRequestOkBody(ticket));
+ _processedMethods.add(new AMQFrame(channelId, new AccessRequestOkBody(ticket)));
}
@Override
- public AMQFrame exchangeDeclare(final int channelId,
- final AMQShortString exchange,
- final AMQShortString type,
- final boolean passive,
- final boolean durable,
- final boolean autoDelete,
- final boolean internal,
- final boolean nowait, final FieldTable arguments)
+ public void receiveExchangeDeclare(final int channelId,
+ final AMQShortString exchange,
+ final AMQShortString type,
+ final boolean passive,
+ final boolean durable,
+ final boolean autoDelete,
+ final boolean internal,
+ final boolean nowait, final FieldTable arguments)
{
- return new AMQFrame(channelId, new ExchangeDeclareBody(0, exchange, type, passive, durable, autoDelete, internal, nowait, arguments));
+ _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareBody(0, exchange, type, passive, durable, autoDelete, internal, nowait, arguments)));
}
@Override
- public AMQFrame exchangeDeclareOk(final int channelId)
+ public void receiveExchangeDeclareOk(final int channelId)
{
- return new AMQFrame(channelId, new ExchangeDeclareOkBody());
+ _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareOkBody()));
}
@Override
- public AMQFrame exchangeDelete(final int channelId,
- final AMQShortString exchange,
- final boolean ifUnused,
- final boolean nowait)
+ public void receiveExchangeDelete(final int channelId,
+ final AMQShortString exchange,
+ final boolean ifUnused,
+ final boolean nowait)
{
- return new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait));
+ _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait)));
}
@Override
- public AMQFrame exchangeDeleteOk(final int channelId)
+ public void receiveExchangeDeleteOk(final int channelId)
{
- return new AMQFrame(channelId, new ExchangeDeleteOkBody());
+ _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteOkBody()));
}
@Override
- public AMQFrame exchangeBound(final int channelId,
- final AMQShortString exchange,
- final AMQShortString routingKey,
- final AMQShortString queue)
+ public void receiveExchangeBound(final int channelId,
+ final AMQShortString exchange,
+ final AMQShortString routingKey,
+ final AMQShortString queue)
{
- return new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue));
+ _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue)));
}
@Override
- public AMQFrame exchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText)
+ public void receiveExchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText)
{
- return new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText));
+ _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText)));
}
@Override
- public AMQFrame queueBindOk(final int channelId)
+ public void receiveQueueBindOk(final int channelId)
{
- return new AMQFrame(channelId, new QueueBindOkBody());
+ _processedMethods.add(new AMQFrame(channelId, new QueueBindOkBody()));
}
@Override
- public AMQFrame queueUnbindOk(final int channelId)
+ public void receiveQueueUnbindOk(final int channelId)
{
- return new AMQFrame(channelId, new QueueUnbindOkBody());
+ _processedMethods.add(new AMQFrame(channelId, new QueueUnbindOkBody()));
}
@Override
- public AMQFrame queueDeclare(final int channelId,
- final AMQShortString queue,
- final boolean passive,
- final boolean durable,
- final boolean exclusive,
- final boolean autoDelete,
- final boolean nowait,
- final FieldTable arguments)
+ public void receiveQueueDeclare(final int channelId,
+ final AMQShortString queue,
+ final boolean passive,
+ final boolean durable,
+ final boolean exclusive,
+ final boolean autoDelete,
+ final boolean nowait,
+ final FieldTable arguments)
{
- return new AMQFrame(channelId, new QueueDeclareBody(0, queue, passive, durable, exclusive, autoDelete, nowait, arguments));
+ _processedMethods.add(new AMQFrame(channelId, new QueueDeclareBody(0, queue, passive, durable, exclusive, autoDelete, nowait, arguments)));
}
@Override
- public AMQFrame queueDeclareOk(final int channelId,
- final AMQShortString queue,
- final long messageCount,
- final long consumerCount)
+ public void receiveQueueDeclareOk(final int channelId,
+ final AMQShortString queue,
+ final long messageCount,
+ final long consumerCount)
{
- return new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount));
+ _processedMethods.add(new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount)));
}
@Override
- public AMQFrame queueBind(final int channelId,
- final AMQShortString queue,
- final AMQShortString exchange,
- final AMQShortString bindingKey,
- final boolean nowait,
- final FieldTable arguments)
+ public void receiveQueueBind(final int channelId,
+ final AMQShortString queue,
+ final AMQShortString exchange,
+ final AMQShortString bindingKey,
+ final boolean nowait,
+ final FieldTable arguments)
{
- return new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments));
+ _processedMethods.add(new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments)));
}
@Override
- public AMQFrame queuePurge(final int channelId, final AMQShortString queue, final boolean nowait)
+ public void receiveQueuePurge(final int channelId, final AMQShortString queue, final boolean nowait)
{
- return new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait));
+ _processedMethods.add(new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait)));
}
@Override
- public AMQFrame queuePurgeOk(final int channelId, final long messageCount)
+ public void receiveQueuePurgeOk(final int channelId, final long messageCount)
{
- return new AMQFrame(channelId, new QueuePurgeOkBody(messageCount));
+ _processedMethods.add(new AMQFrame(channelId, new QueuePurgeOkBody(messageCount)));
}
@Override
- public AMQFrame queueDelete(final int channelId,
- final AMQShortString queue,
- final boolean ifUnused,
- final boolean ifEmpty,
- final boolean nowait)
+ public void receiveQueueDelete(final int channelId,
+ final AMQShortString queue,
+ final boolean ifUnused,
+ final boolean ifEmpty,
+ final boolean nowait)
{
- return new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait));
+ _processedMethods.add(new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait)));
}
@Override
- public AMQFrame queueDeleteOk(final int channelId, final long messageCount)
+ public void receiveQueueDeleteOk(final int channelId, final long messageCount)
{
- return new AMQFrame(channelId, new QueueDeleteOkBody(messageCount));
+ _processedMethods.add(new AMQFrame(channelId, new QueueDeleteOkBody(messageCount)));
}
@Override
- public AMQFrame queueUnbind(final int channelId,
- final AMQShortString queue,
- final AMQShortString exchange,
- final AMQShortString bindingKey,
- final FieldTable arguments)
+ public void receiveQueueUnbind(final int channelId,
+ final AMQShortString queue,
+ final AMQShortString exchange,
+ final AMQShortString bindingKey,
+ final FieldTable arguments)
{
- return new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments));
+ _processedMethods.add(new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments)));
}
@Override
- public AMQFrame basicRecoverSyncOk(final int channelId)
+ public void receiveBasicRecoverSyncOk(final int channelId)
{
- return new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion()));
+ _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion())));
}
@Override
- public AMQFrame basicRecover(final int channelId, final boolean requeue, final boolean sync)
+ public void receiveBasicRecover(final int channelId, final boolean requeue, final boolean sync)
{
if(ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync)
{
- return new AMQFrame(channelId, new BasicRecoverBody(requeue));
+ _processedMethods.add(new AMQFrame(channelId, new BasicRecoverBody(requeue)));
}
else
{
- return new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue));
+ _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue)));
}
}
@Override
- public AMQFrame basicQos(final int channelId,
- final long prefetchSize,
- final int prefetchCount,
- final boolean global)
+ public void receiveBasicQos(final int channelId,
+ final long prefetchSize,
+ final int prefetchCount,
+ final boolean global)
{
- return new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global));
+ _processedMethods.add(new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global)));
}
@Override
- public AMQFrame basicQosOk(final int channelId)
+ public void receiveBasicQosOk(final int channelId)
{
- return new AMQFrame(channelId, new BasicQosOkBody());
+ _processedMethods.add(new AMQFrame(channelId, new BasicQosOkBody()));
}
@Override
- public AMQFrame basicConsume(final int channelId,
- final AMQShortString queue,
- final AMQShortString consumerTag,
- final boolean noLocal,
- final boolean noAck,
- final boolean exclusive,
- final boolean nowait,
- final FieldTable arguments)
+ public void receiveBasicConsume(final int channelId,
+ final AMQShortString queue,
+ final AMQShortString consumerTag,
+ final boolean noLocal,
+ final boolean noAck,
+ final boolean exclusive,
+ final boolean nowait,
+ final FieldTable arguments)
{
- return new AMQFrame(channelId, new BasicConsumeBody(0, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments));
+ _processedMethods.add(new AMQFrame(channelId, new BasicConsumeBody(0, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments)));
}
@Override
- public AMQFrame basicConsumeOk(final int channelId, final AMQShortString consumerTag)
+ public void receiveBasicConsumeOk(final int channelId, final AMQShortString consumerTag)
{
- return new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag));
+ _processedMethods.add(new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag)));
}
@Override
- public AMQFrame basicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait)
+ public void receiveBasicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait)
{
- return new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait));
+ _processedMethods.add(new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait)));
}
@Override
- public AMQFrame basicCancelOk(final int channelId, final AMQShortString consumerTag)
+ public void receiveBasicCancelOk(final int channelId, final AMQShortString consumerTag)
{
- return new AMQFrame(channelId, new BasicCancelOkBody(consumerTag));
+ _processedMethods.add(new AMQFrame(channelId, new BasicCancelOkBody(consumerTag)));
}
@Override
- public AMQFrame basicPublish(final int channelId,
- final AMQShortString exchange,
- final AMQShortString routingKey,
- final boolean mandatory,
- final boolean immediate)
+ public void receiveBasicPublish(final int channelId,
+ final AMQShortString exchange,
+ final AMQShortString routingKey,
+ final boolean mandatory,
+ final boolean immediate)
{
- return new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate));
+ _processedMethods.add(new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate)));
}
@Override
- public AMQFrame basicReturn(final int channelId, final int replyCode,
- final AMQShortString replyText,
- final AMQShortString exchange,
- final AMQShortString routingKey)
+ public void receiveBasicReturn(final int channelId, final int replyCode,
+ final AMQShortString replyText,
+ final AMQShortString exchange,
+ final AMQShortString routingKey)
{
- return new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey));
+ _processedMethods.add(new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey)));
}
@Override
- public AMQFrame basicDeliver(final int channelId,
- final AMQShortString consumerTag,
- final long deliveryTag,
- final boolean redelivered,
- final AMQShortString exchange,
- final AMQShortString routingKey)
+ public void receiveBasicDeliver(final int channelId,
+ final AMQShortString consumerTag,
+ final long deliveryTag,
+ final boolean redelivered,
+ final AMQShortString exchange,
+ final AMQShortString routingKey)
{
- return new AMQFrame(channelId, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey));
+ _processedMethods.add(new AMQFrame(channelId, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey)));
}
@Override
- public AMQFrame basicGet(final int channelId, final AMQShortString queue, final boolean noAck)
+ public void receiveBasicGet(final int channelId, final AMQShortString queue, final boolean noAck)
{
- return new AMQFrame(channelId, new BasicGetBody(0, queue, noAck));
+ _processedMethods.add(new AMQFrame(channelId, new BasicGetBody(0, queue, noAck)));
}
@Override
- public AMQFrame basicGetOk(final int channelId,
- final long deliveryTag,
- final boolean redelivered,
- final AMQShortString exchange,
- final AMQShortString routingKey,
- final long messageCount)
+ public void receiveBasicGetOk(final int channelId,
+ final long deliveryTag,
+ final boolean redelivered,
+ final AMQShortString exchange,
+ final AMQShortString routingKey,
+ final long messageCount)
+ {
+ _processedMethods.add(new AMQFrame(channelId, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount)));
+ }
+
+ @Override
+ public void receiveBasicGetEmpty(final int channelId)
{
- return new AMQFrame(channelId, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount));
+ _processedMethods.add(new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString)null)));
}
@Override
- public AMQFrame basicGetEmpty(final int channelId)
+ public void receiveBasicAck(final int channelId, final long deliveryTag, final boolean multiple)
{
- return new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString)null));
+ _processedMethods.add(new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple)));
}
@Override
- public AMQFrame basicAck(final int channelId, final long deliveryTag, final boolean multiple)
+ public void receiveBasicReject(final int channelId, final long deliveryTag, final boolean requeue)
{
- return new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple));
+ _processedMethods.add(new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue)));
}
@Override
- public AMQFrame basicReject(final int channelId, final long deliveryTag, final boolean requeue)
+ public void receiveHeartbeat()
{
- return new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue));
+ _processedMethods.add(new AMQFrame(0, new HeartbeatBody()));
}
@Override
- public AMQFrame heartbeat()
+ public ProtocolVersion getProtocolVersion()
{
- return new AMQFrame(0, new HeartbeatBody());
+ return _protocolVersion;
}
- private ProtocolVersion getProtocolVersion()
+ public void setProtocolVersion(final ProtocolVersion protocolVersion)
+ {
+ _protocolVersion = protocolVersion;
+ }
+
+ @Override
+ public void receiveMessageContent(final int channelId, final byte[] data)
{
- return _methodRegistry.getProtocolVersion();
+ _processedMethods.add(new AMQFrame(channelId, new ContentBody(data)));
}
@Override
- public AMQFrame messageContent(final int channelId, final byte[] data)
+ public void receiveMessageHeader(final int channelId,
+ final BasicContentHeaderProperties properties,
+ final long bodySize)
{
- return new AMQFrame(channelId, new ContentBody(data));
+ _processedMethods.add(new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize)));
}
@Override
- public AMQFrame messageHeader(final int channelId,
- final BasicContentHeaderProperties properties,
- final long bodySize)
+ public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation)
{
- return new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize));
+ _processedMethods.add(protocolInitiation);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java Fri Oct 10 09:54:36 2014
@@ -81,9 +81,9 @@ public class HeartbeatBody implements AM
return new AMQFrame(0, this);
}
- public static <T> T process(final int channel,
+ public static void process(final int channel,
final MarkableDataInput in,
- final MethodProcessor<T> processor,
+ final MethodProcessor processor,
final long bodySize) throws IOException
{
@@ -91,6 +91,6 @@ public class HeartbeatBody implements AM
{
in.skip(bodySize);
}
- return processor.heartbeat();
+ processor.receiveHeartbeat();
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java Fri Oct 10 09:54:36 2014
@@ -20,178 +20,182 @@
*/
package org.apache.qpid.framing;
-public interface MethodProcessor<T>
+public interface MethodProcessor
{
- T connectionStart(short versionMajor,
- short versionMinor,
- FieldTable serverProperties,
- byte[] mechanisms,
- byte[] locales);
+ ProtocolVersion getProtocolVersion();
- T connectionStartOk(FieldTable clientProperties,
- AMQShortString mechanism,
- byte[] response,
- AMQShortString locale);
+ void receiveConnectionStart(short versionMajor,
+ short versionMinor,
+ FieldTable serverProperties,
+ byte[] mechanisms,
+ byte[] locales);
- T txSelect(int channelId);
+ void receiveConnectionStartOk(FieldTable clientProperties,
+ AMQShortString mechanism,
+ byte[] response,
+ AMQShortString locale);
- T txSelectOk(int channelId);
+ void receiveTxSelect(int channelId);
- T txCommit(int channelId);
+ void receiveTxSelectOk(int channelId);
- T txCommitOk(int channelId);
+ void receiveTxCommit(int channelId);
- T txRollback(int channelId);
+ void receiveTxCommitOk(int channelId);
- T txRollbackOk(int channelId);
+ void receiveTxRollback(int channelId);
- T connectionSecure(byte[] challenge);
+ void receiveTxRollbackOk(int channelId);
- T connectionSecureOk(byte[] response);
+ void receiveConnectionSecure(byte[] challenge);
- T connectionTune(int channelMax, long frameMax, int heartbeat);
+ void receiveConnectionSecureOk(byte[] response);
- T connectionTuneOk(int channelMax, long frameMax, int heartbeat);
+ void receiveConnectionTune(int channelMax, long frameMax, int heartbeat);
- T connectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist);
+ void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat);
- T connectionOpenOk(AMQShortString knownHosts);
+ void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist);
- T connectionRedirect(AMQShortString host, AMQShortString knownHosts);
+ void receiveConnectionOpenOk(AMQShortString knownHosts);
- T connectionClose(int replyCode, AMQShortString replyText, int classId, int methodId);
+ void receiveConnectionRedirect(AMQShortString host, AMQShortString knownHosts);
- T connectionCloseOk();
+ void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId);
- T channelOpen(int channelId);
+ void receiveConnectionCloseOk();
- T channelOpenOk(int channelId);
+ void receiveChannelOpen(int channelId);
- T channelFlow(int channelId, boolean active);
+ void receiveChannelOpenOk(int channelId);
- T channelFlowOk(int channelId, boolean active);
+ void receiveChannelFlow(int channelId, boolean active);
- T channelAlert(int channelId, int replyCode, final AMQShortString replyText, FieldTable details);
+ void receiveChannelFlowOk(int channelId, boolean active);
- T channelClose(int channelId, int replyCode, AMQShortString replyText, int classId, int methodId);
+ void receiveChannelAlert(int channelId, int replyCode, final AMQShortString replyText, FieldTable details);
- T channelCloseOk(int channelId);
+ void receiveChannelClose(int channelId, int replyCode, AMQShortString replyText, int classId, int methodId);
- T accessRequest(int channelId,
- AMQShortString realm,
- boolean exclusive,
- boolean passive,
- boolean active,
- boolean write, boolean read);
+ void receiveChannelCloseOk(int channelId);
- T accessRequestOk(int channelId, int ticket);
+ void receiveAccessRequest(int channelId,
+ AMQShortString realm,
+ boolean exclusive,
+ boolean passive,
+ boolean active,
+ boolean write, boolean read);
- T exchangeDeclare(int channelId,
- AMQShortString exchange,
- AMQShortString type,
- boolean passive,
- boolean durable,
- boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments);
+ void receiveAccessRequestOk(int channelId, int ticket);
- T exchangeDeclareOk(int channelId);
+ void receiveExchangeDeclare(int channelId,
+ AMQShortString exchange,
+ AMQShortString type,
+ boolean passive,
+ boolean durable,
+ boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments);
- T exchangeDelete(int channelId, AMQShortString exchange, boolean ifUnused, boolean nowait);
+ void receiveExchangeDeclareOk(int channelId);
- T exchangeDeleteOk(int channelId);
+ void receiveExchangeDelete(int channelId, AMQShortString exchange, boolean ifUnused, boolean nowait);
- T exchangeBound(int channelId, AMQShortString exchange, AMQShortString routingKey, AMQShortString queue);
+ void receiveExchangeDeleteOk(int channelId);
- T exchangeBoundOk(int channelId, int replyCode, AMQShortString replyText);
+ void receiveExchangeBound(int channelId, AMQShortString exchange, AMQShortString routingKey, AMQShortString queue);
- T queueBindOk(int channelId);
+ void receiveExchangeBoundOk(int channelId, int replyCode, AMQShortString replyText);
- T queueUnbindOk(final int channelId);
+ void receiveQueueBindOk(int channelId);
- T queueDeclare(int channelId,
- AMQShortString queue,
- boolean passive,
- boolean durable,
- boolean exclusive,
- boolean autoDelete, boolean nowait, FieldTable arguments);
+ void receiveQueueUnbindOk(final int channelId);
- T queueDeclareOk(int channelId, final AMQShortString queue, long messageCount, long consumerCount);
+ void receiveQueueDeclare(int channelId,
+ AMQShortString queue,
+ boolean passive,
+ boolean durable,
+ boolean exclusive,
+ boolean autoDelete, boolean nowait, FieldTable arguments);
- T queueBind(int channelId,
- AMQShortString queue,
- AMQShortString exchange,
- AMQShortString bindingKey,
- boolean nowait, FieldTable arguments);
+ void receiveQueueDeclareOk(int channelId, final AMQShortString queue, long messageCount, long consumerCount);
- T queuePurge(int channelId, AMQShortString queue, boolean nowait);
+ void receiveQueueBind(int channelId,
+ AMQShortString queue,
+ AMQShortString exchange,
+ AMQShortString bindingKey,
+ boolean nowait, FieldTable arguments);
- T queuePurgeOk(int channelId, long messageCount);
+ void receiveQueuePurge(int channelId, AMQShortString queue, boolean nowait);
- T queueDelete(int channelId, AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait);
+ void receiveQueuePurgeOk(int channelId, long messageCount);
- T queueDeleteOk(int channelId, long messageCount);
+ void receiveQueueDelete(int channelId, AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait);
- T queueUnbind(int channelId,
- AMQShortString queue,
- AMQShortString exchange,
- AMQShortString bindingKey,
- FieldTable arguments);
+ void receiveQueueDeleteOk(int channelId, long messageCount);
- T basicRecoverSyncOk(int channelId);
+ void receiveQueueUnbind(int channelId,
+ AMQShortString queue,
+ AMQShortString exchange,
+ AMQShortString bindingKey,
+ FieldTable arguments);
- T basicRecover(int channelId, final boolean requeue, boolean sync);
+ void receiveBasicRecoverSyncOk(int channelId);
- T basicQos(int channelId, long prefetchSize, int prefetchCount, boolean global);
+ void receiveBasicRecover(int channelId, final boolean requeue, boolean sync);
- T basicQosOk(int channelId);
+ void receiveBasicQos(int channelId, long prefetchSize, int prefetchCount, boolean global);
- T basicConsume(int channelId,
- AMQShortString queue,
- AMQShortString consumerTag,
- boolean noLocal,
- boolean noAck,
- boolean exclusive, boolean nowait, FieldTable arguments);
+ void receiveBasicQosOk(int channelId);
- T basicConsumeOk(int channelId, AMQShortString consumerTag);
+ void receiveBasicConsume(int channelId,
+ AMQShortString queue,
+ AMQShortString consumerTag,
+ boolean noLocal,
+ boolean noAck,
+ boolean exclusive, boolean nowait, FieldTable arguments);
- T basicCancel(int channelId, AMQShortString consumerTag, boolean noWait);
+ void receiveBasicConsumeOk(int channelId, AMQShortString consumerTag);
- T basicCancelOk(int channelId, AMQShortString consumerTag);
+ void receiveBasicCancel(int channelId, AMQShortString consumerTag, boolean noWait);
- T basicPublish(int channelId,
- AMQShortString exchange,
- AMQShortString routingKey,
- boolean mandatory,
- boolean immediate);
+ void receiveBasicCancelOk(int channelId, AMQShortString consumerTag);
- T basicReturn(final int channelId,
- int replyCode,
- AMQShortString replyText,
- AMQShortString exchange,
- AMQShortString routingKey);
+ void receiveBasicPublish(int channelId,
+ AMQShortString exchange,
+ AMQShortString routingKey,
+ boolean mandatory,
+ boolean immediate);
- T basicDeliver(int channelId,
- AMQShortString consumerTag,
- long deliveryTag,
- boolean redelivered,
- AMQShortString exchange, AMQShortString routingKey);
+ void receiveBasicReturn(final int channelId,
+ int replyCode,
+ AMQShortString replyText,
+ AMQShortString exchange,
+ AMQShortString routingKey);
- T basicGet(int channelId, AMQShortString queue, boolean noAck);
+ void receiveBasicDeliver(int channelId,
+ AMQShortString consumerTag,
+ long deliveryTag,
+ boolean redelivered,
+ AMQShortString exchange, AMQShortString routingKey);
- T basicGetOk(int channelId,
- long deliveryTag,
- boolean redelivered,
- AMQShortString exchange,
- AMQShortString routingKey, long messageCount);
+ void receiveBasicGet(int channelId, AMQShortString queue, boolean noAck);
- T basicGetEmpty(int channelId);
+ void receiveBasicGetOk(int channelId,
+ long deliveryTag,
+ boolean redelivered,
+ AMQShortString exchange,
+ AMQShortString routingKey, long messageCount);
- T basicAck(int channelId, long deliveryTag, boolean multiple);
+ void receiveBasicGetEmpty(int channelId);
- T basicReject(int channelId, long deliveryTag, boolean requeue);
+ void receiveBasicAck(int channelId, long deliveryTag, boolean multiple);
- T heartbeat();
+ void receiveBasicReject(int channelId, long deliveryTag, boolean requeue);
- T messageContent(int channelId, byte[] data);
+ void receiveHeartbeat();
- T messageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize);
+ void receiveMessageContent(int channelId, byte[] data);
+
+ void receiveMessageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize);
+
+ void receiveProtocolHeader(ProtocolInitiation protocolInitiation);
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java Fri Oct 10 09:54:36 2014
@@ -31,14 +31,12 @@ package org.apache.qpid.framing;
public final class MethodRegistry
{
- private final FrameCreatingMethodProcessor _methodProcessor;
private ProtocolVersion _protocolVersion;
public MethodRegistry(ProtocolVersion pv)
{
_protocolVersion = pv;
- _methodProcessor = new FrameCreatingMethodProcessor(this);
}
public void setProtocolVersion(final ProtocolVersion protocolVersion)
@@ -555,10 +553,5 @@ public final class MethodRegistry
return _protocolVersion;
}
- public FrameCreatingMethodProcessor getMethodProcessor()
- {
- return _methodProcessor;
- }
-
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java Fri Oct 10 09:54:36 2014
@@ -165,9 +165,9 @@ public class QueueBindBody extends AMQMe
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException
+ final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
{
int ticket = buffer.readUnsignedShort();
@@ -176,6 +176,6 @@ public class QueueBindBody extends AMQMe
AMQShortString bindingKey = buffer.readAMQShortString();
boolean nowait = (buffer.readByte() & 0x01) == 0x01;
FieldTable arguments = EncodingUtils.readFieldTable(buffer);
- return dispatcher.queueBind(channelId, queue, exchange, bindingKey, nowait, arguments);
+ dispatcher.receiveQueueBind(channelId, queue, exchange, bindingKey, nowait, arguments);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java Fri Oct 10 09:54:36 2014
@@ -191,9 +191,9 @@ public class QueueDeclareBody extends AM
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException
+ final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
{
int ticket = buffer.readUnsignedShort();
@@ -206,6 +206,6 @@ public class QueueDeclareBody extends AM
boolean autoDelete = (bitfield & 0x08 ) == 0x08;
boolean nowait = (bitfield & 0x010 ) == 0x010;
FieldTable arguments = EncodingUtils.readFieldTable(buffer);
- return dispatcher.queueDeclare(channelId, queue, passive, durable, exclusive, autoDelete, nowait, arguments);
+ dispatcher.receiveQueueDeclare(channelId, queue, passive, durable, exclusive, autoDelete, nowait, arguments);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java Fri Oct 10 09:54:36 2014
@@ -120,13 +120,13 @@ public class QueueDeclareOkBody extends
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
AMQShortString queue = buffer.readAMQShortString();
long messageCount = EncodingUtils.readUnsignedInteger(buffer);
long consumerCount = EncodingUtils.readUnsignedInteger(buffer);
- return dispatcher.queueDeclareOk(channelId, queue, messageCount, consumerCount);
+ dispatcher.receiveQueueDeclareOk(channelId, queue, messageCount, consumerCount);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java Fri Oct 10 09:54:36 2014
@@ -151,9 +151,9 @@ public class QueueDeleteBody extends AMQ
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
int ticket = buffer.readUnsignedShort();
@@ -163,6 +163,6 @@ public class QueueDeleteBody extends AMQ
boolean ifUnused = (bitfield & 0x01) == 0x01;
boolean ifEmpty = (bitfield & 0x02) == 0x02;
boolean nowait = (bitfield & 0x04) == 0x04;
- return dispatcher.queueDelete(channelId, queue, ifUnused, ifEmpty, nowait);
+ dispatcher.receiveQueueDelete(channelId, queue, ifUnused, ifEmpty, nowait);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java Fri Oct 10 09:54:36 2014
@@ -95,11 +95,11 @@ public class QueueDeleteOkBody extends A
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
long messageCount = EncodingUtils.readUnsignedInteger(buffer);
- return dispatcher.queueDeleteOk(channelId, messageCount);
+ dispatcher.receiveQueueDeleteOk(channelId, messageCount);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java Fri Oct 10 09:54:36 2014
@@ -125,14 +125,14 @@ public class QueuePurgeBody extends AMQM
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
int ticket = buffer.readUnsignedShort();
AMQShortString queue = buffer.readAMQShortString();
boolean nowait = (buffer.readByte() & 0x01) == 0x01;
- return dispatcher.queuePurge(channelId, queue, nowait);
+ dispatcher.receiveQueuePurge(channelId, queue, nowait);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java Fri Oct 10 09:54:36 2014
@@ -95,11 +95,11 @@ public class QueuePurgeOkBody extends AM
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
long messageCount = EncodingUtils.readUnsignedInteger(buffer);
- return dispatcher.queuePurgeOk(channelId, messageCount);
+ dispatcher.receiveQueuePurgeOk(channelId, messageCount);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java Fri Oct 10 09:54:36 2014
@@ -147,9 +147,9 @@ public class QueueUnbindBody extends AMQ
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException
+ final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException
{
int ticket = buffer.readUnsignedShort();
@@ -157,6 +157,6 @@ public class QueueUnbindBody extends AMQ
AMQShortString exchange = buffer.readAMQShortString();
AMQShortString routingKey = buffer.readAMQShortString();
FieldTable arguments = EncodingUtils.readFieldTable(buffer);
- return dispatcher.queueUnbind(channelId, queue, exchange, routingKey, arguments);
+ dispatcher.receiveQueueUnbind(channelId, queue, exchange, routingKey, arguments);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java Fri Oct 10 09:54:36 2014
@@ -25,7 +25,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.List;
import junit.framework.TestCase;
@@ -33,19 +33,21 @@ import org.apache.qpid.framing.AMQDataBl
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.framing.FrameCreatingMethodProcessor;
import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolVersion;
public class AMQDecoderTest extends TestCase
{
private AMQDecoder _decoder;
+ private FrameCreatingMethodProcessor _methodProcessor;
public void setUp()
{
- _decoder = new AMQDecoder(false, new MethodRegistry(ProtocolVersion.v0_91));
+ _methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91);
+ _decoder = new AMQDecoder(false, _methodProcessor);
}
@@ -59,7 +61,8 @@ public class AMQDecoderTest extends Test
public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException
{
ByteBuffer msg = getHeartbeatBodyBuffer();
- ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg);
+ _decoder.decodeBuffer(msg);
+ List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods();
if (frames.get(0) instanceof AMQFrame)
{
assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType());
@@ -79,9 +82,12 @@ public class AMQDecoderTest extends Test
msgA.limit(msgaLimit);
msg.position(msgbPos);
ByteBuffer msgB = msg.slice();
- ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msgA);
+
+ _decoder.decodeBuffer(msgA);
+ List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods();
assertEquals(0, frames.size());
- frames = _decoder.decodeBuffer(msgB);
+
+ _decoder.decodeBuffer(msgB);
assertEquals(1, frames.size());
if (frames.get(0) instanceof AMQFrame)
{
@@ -101,7 +107,8 @@ public class AMQDecoderTest extends Test
msg.put(msgA);
msg.put(msgB);
msg.flip();
- ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg);
+ _decoder.decodeBuffer(msg);
+ List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods();
assertEquals(2, frames.size());
for (AMQDataBlock frame : frames)
{
@@ -138,12 +145,15 @@ public class AMQDecoderTest extends Test
sliceB.put(msgC);
sliceB.flip();
msgC.limit(limit);
-
- ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(sliceA);
+
+ _decoder.decodeBuffer(sliceA);
+ List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods();
assertEquals(1, frames.size());
- frames = _decoder.decodeBuffer(sliceB);
+ frames.clear();
+ _decoder.decodeBuffer(sliceB);
assertEquals(1, frames.size());
- frames = _decoder.decodeBuffer(msgC);
+ frames.clear();
+ _decoder.decodeBuffer(msgC);
assertEquals(1, frames.size());
for (AMQDataBlock frame : frames)
{
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java Fri Oct 10 09:54:36 2014
@@ -27,7 +27,6 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -41,6 +40,7 @@ import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
+import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQDataBlockDecoder;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQFrameDecodingException;
@@ -51,7 +51,7 @@ import org.apache.qpid.framing.Connectio
import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.ConnectionTuneOkBody;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.FrameCreatingMethodProcessor;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.server.model.AuthenticationProvider;
@@ -110,11 +110,11 @@ public class MaxFrameSizeTest extends Qp
{
@Override
- public void evaluate(final Socket socket, final List<AMQFrame> frames)
+ public void evaluate(final Socket socket, final List<AMQDataBlock> frames)
{
if(!socket.isClosed())
{
- AMQFrame lastFrame = frames.get(frames.size() - 1);
+ AMQFrame lastFrame = (AMQFrame) frames.get(frames.size() - 1);
assertTrue("Connection should not be possible with a frame size < " + Constant.MIN_MAX_FRAME_SIZE, lastFrame.getBodyFrame() instanceof ConnectionCloseBody);
}
}
@@ -159,11 +159,11 @@ public class MaxFrameSizeTest extends Qp
{
@Override
- public void evaluate(final Socket socket, final List<AMQFrame> frames)
+ public void evaluate(final Socket socket, final List<AMQDataBlock> frames)
{
if(!socket.isClosed())
{
- AMQFrame lastFrame = frames.get(frames.size() - 1);
+ AMQFrame lastFrame = (AMQFrame) frames.get(frames.size() - 1);
assertTrue("Connection should not be possible with a frame size larger than the broker requested", lastFrame.getBodyFrame() instanceof ConnectionCloseBody);
}
}
@@ -173,7 +173,7 @@ public class MaxFrameSizeTest extends Qp
private static interface ResultEvaluator
{
- void evaluate(Socket socket, List<AMQFrame> frames);
+ void evaluate(Socket socket, List<AMQDataBlock> frames);
}
private void doAMQP08test(int frameSize, ResultEvaluator evaluator)
@@ -236,17 +236,14 @@ public class MaxFrameSizeTest extends Qp
byte[] serverData = baos.toByteArray();
ByteArrayDataInput badi = new ByteArrayDataInput(serverData);
AMQDataBlockDecoder datablockDecoder = new AMQDataBlockDecoder();
- final MethodRegistry methodRegistry_0_91 = new MethodRegistry(ProtocolVersion.v0_91);
+ final FrameCreatingMethodProcessor methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91);
- List<AMQFrame> frames = new ArrayList<>();
while (datablockDecoder.decodable(badi))
{
- frames.add(datablockDecoder.createAndPopulateFrame(methodRegistry_0_91.getProtocolVersion(),
- methodRegistry_0_91.getMethodProcessor(),
- badi));
+ datablockDecoder.processInput(methodProcessor, badi);
}
- evaluator.evaluate(socket, frames);
+ evaluator.evaluate(socket, methodProcessor.getProcessedMethods());
}
private static class TestClientDelegate extends ClientDelegate
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org