You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/10/10 11:54:37 UTC
svn commit: r1630745 [1/2] - in
/qpid/branches/QPID-6125-ProtocolRefactoring/java:
bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/
broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/
client/src/main/java...
Author: rgodfrey
Date: Fri Oct 10 09:54:36 2014
New Revision: 1630745
URL: http://svn.apache.org/r1630745
Log:
More refactoring
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Fri Oct 10 09:54:36 2014
@@ -30,8 +30,6 @@ import java.util.Arrays;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10;
@@ -241,8 +239,6 @@ public class BDBMessageStoreTest extends
private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length)
{
- MethodRegistry methodRegistry = new MethodRegistry(ProtocolVersion.v0_9);
- int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
return new ContentHeaderBody(props, length);
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Fri Oct 10 09:54:36 2014
@@ -133,6 +133,7 @@ public class AMQProtocolEngine implement
/* AMQP Version for this session */
private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion);
+ private final FrameCreatingMethodProcessor _methodProcessor = new FrameCreatingMethodProcessor(_protocolVersion);
private final List<Action<? super AMQProtocolEngine>> _taskList =
new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>();
@@ -185,7 +186,7 @@ public class AMQProtocolEngine implement
_transport = transport;
_maxNoOfChannels = broker.getConnection_sessionCountLimit();
_receivedLock = new ReentrantLock();
- _decoder = new AMQDecoder(true, _methodRegistry);
+ _decoder = new AMQDecoder(true, _methodProcessor);
_connectionID = connectionId;
_logSubject = new ConnectionLogSubject(this);
@@ -296,10 +297,11 @@ public class AMQProtocolEngine implement
_readBytes += msg.remaining();
_receivedLock.lock();
+ List<AMQDataBlock> processedMethods = _methodProcessor.getProcessedMethods();
try
{
- final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg);
- for (AMQDataBlock dataBlock : dataBlocks)
+ _decoder.decodeBuffer(msg);
+ for (AMQDataBlock dataBlock : processedMethods)
{
try
{
@@ -320,6 +322,7 @@ public class AMQProtocolEngine implement
break;
}
}
+ processedMethods.clear();
receivedComplete();
}
catch (ConnectionScopedRuntimeException e)
@@ -349,6 +352,7 @@ public class AMQProtocolEngine implement
}
finally
{
+ processedMethods.clear();
_receivedLock.unlock();
}
return null;
@@ -1089,13 +1093,32 @@ public class AMQProtocolEngine implement
private void closeConnection(int channelId, AMQConnectionException e)
{
- try
+
+ if (_logger.isInfoEnabled())
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Closing connection due to: " + e);
- }
+ _logger.info("Closing connection due to: " + e);
+ }
+ closeConnection(channelId, e.getCloseFrame());
+ }
+
+
+ void closeConnection(AMQConstant errorCode,
+ String message, int channelId,
+ int classId,
+ int methodId)
+ {
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing connection due to: " + message);
+ }
+ closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), classId, methodId)));
+ }
+ private void closeConnection(int channelId, AMQFrame frame)
+ {
+ try
+ {
markChannelAwaitingCloseOk(channelId);
closeSession();
}
@@ -1103,7 +1126,7 @@ public class AMQProtocolEngine implement
{
try
{
- writeFrame(e.getCloseFrame());
+ writeFrame(frame);
}
finally
{
@@ -1208,6 +1231,7 @@ public class AMQProtocolEngine implement
{
_protocolVersion = pv;
_methodRegistry.setProtocolVersion(_protocolVersion);
+ _methodProcessor.setProtocolVersion(_protocolVersion);
_protocolOutputConverter = new ProtocolOutputConverterImpl(this);
_dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(this);
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Oct 10 09:54:36 2014
@@ -23,8 +23,8 @@ package org.apache.qpid.client.protocol;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
@@ -193,7 +193,7 @@ public class AMQProtocolHandler implemen
_connection = con;
_protocolSession = new AMQProtocolSession(this, _connection);
_stateManager = new AMQStateManager(_protocolSession);
- _decoder = new AMQDecoder(false, _protocolSession.getMethodRegistry());
+ _decoder = new AMQDecoder(false, _protocolSession.getMethodProcessor());
_failoverHandler = new FailoverHandler(this);
}
@@ -459,9 +459,10 @@ public class AMQProtocolHandler implemen
{
_readBytes += msg.remaining();
_lastReadTime = System.currentTimeMillis();
+ final List<AMQDataBlock> dataBlocks = _protocolSession.getMethodProcessor().getProcessedMethods();
try
{
- final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg);
+ _decoder.decodeBuffer(msg);
// Decode buffer
int size = dataBlocks.size();
@@ -511,6 +512,10 @@ public class AMQProtocolHandler implemen
propagateExceptionToFrameListeners(e);
exception(e);
}
+ finally
+ {
+ dataBlocks.clear();
+ }
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Oct 10 09:54:36 2014
@@ -44,6 +44,7 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FrameCreatingMethodProcessor;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.MethodDispatcher;
import org.apache.qpid.framing.MethodRegistry;
@@ -91,6 +92,9 @@ public class AMQProtocolSession implemen
private final MethodRegistry _methodRegistry =
new MethodRegistry(ProtocolVersion.getLatestSupportedVersion());
+ private final FrameCreatingMethodProcessor _methodProcessor =
+ new FrameCreatingMethodProcessor(ProtocolVersion.getLatestSupportedVersion());
+
private MethodDispatcher _methodDispatcher;
private final AMQConnection _connection;
@@ -416,7 +420,8 @@ public class AMQProtocolSession implemen
_logger.debug("Setting ProtocolVersion to :" + pv);
}
_protocolVersion = pv;
- _methodRegistry.setProtocolVersion(pv);;
+ _methodRegistry.setProtocolVersion(pv);
+ _methodProcessor.setProtocolVersion(pv);
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this);
}
@@ -549,4 +554,9 @@ public class AMQProtocolSession implemen
{
_protocolHandler.setMaxFrameSize(frameMax);
}
+
+ public FrameCreatingMethodProcessor getMethodProcessor()
+ {
+ return _methodProcessor;
+ }
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Fri Oct 10 09:54:36 2014
@@ -30,14 +30,13 @@ import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
-import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQDataBlockDecoder;
import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ByteArrayDataInput;
import org.apache.qpid.framing.EncodingUtils;
-import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.MethodProcessor;
import org.apache.qpid.framing.ProtocolInitiation;
/**
@@ -54,7 +53,8 @@ import org.apache.qpid.framing.ProtocolI
*/
public class AMQDecoder
{
- private final MethodRegistry _registry;
+ private final MethodProcessor _methodProcessor;
+
/** Holds the 'normal' AMQP data decoder. */
private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
@@ -73,12 +73,12 @@ public class AMQDecoder
* Creates a new AMQP decoder.
*
* @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation.
- * @param registry method registry
+ * @param methodProcessor method processor
*/
- public AMQDecoder(boolean expectProtocolInitiation, MethodRegistry registry)
+ public AMQDecoder(boolean expectProtocolInitiation, MethodProcessor methodProcessor)
{
_expectProtocolInitiation = expectProtocolInitiation;
- _registry = registry;
+ _methodProcessor = methodProcessor;
}
@@ -217,14 +217,13 @@ public class AMQDecoder
}
- public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
+ public void decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
- // get prior remaining data from accumulator
- ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
MarkableDataInput msg;
+ // get prior remaining data from accumulator
ByteArrayInputStream bais;
DataInput di;
if(!_remainingBufs.isEmpty())
@@ -258,9 +257,7 @@ public class AMQDecoder
enoughData = _dataBlockDecoder.decodable(msg);
if (enoughData)
{
- dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_registry.getProtocolVersion(),
- _registry.getMethodProcessor(),
- msg));
+ _dataBlockDecoder.processInput(_methodProcessor, msg);
}
}
else
@@ -268,7 +265,7 @@ public class AMQDecoder
enoughData = _piDecoder.decodable(msg);
if (enoughData)
{
- dataBlocks.add(new ProtocolInitiation(msg));
+ _methodProcessor.receiveProtocolHeader(new ProtocolInitiation(msg));
}
}
@@ -305,6 +302,5 @@ public class AMQDecoder
}
}
}
- return dataBlocks;
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Fri Oct 10 09:54:36 2014
@@ -35,7 +35,8 @@ public class AMQDataBlockDecoder
private int _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode();
public AMQDataBlockDecoder()
- { }
+ {
+ }
public boolean decodable(MarkableDataInput in) throws AMQFrameDecodingException, IOException
{
@@ -52,9 +53,13 @@ public class AMQDataBlockDecoder
// Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt()
final long bodySize = in.readInt() & 0xffffffffL;
- if(bodySize > _maxFrameSize)
+ if (bodySize > _maxFrameSize)
{
- throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Incoming frame size of "+bodySize+" is larger than negotiated maximum of " + _maxFrameSize);
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
+ "Incoming frame size of "
+ + bodySize
+ + " is larger than negotiated maximum of "
+ + _maxFrameSize);
}
in.reset();
@@ -62,9 +67,8 @@ public class AMQDataBlockDecoder
}
- public <T> T createAndPopulateFrame(ProtocolVersion pv,
- MethodProcessor<T> processor,
- MarkableDataInput in)
+ public void processInput(MethodProcessor processor,
+ MarkableDataInput in)
throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
final byte type = in.readByte();
@@ -75,24 +79,24 @@ public class AMQDataBlockDecoder
// bodySize can be zero
if ((channel < 0) || (bodySize < 0))
{
- throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Undecodable frame: type = " + type + " channel = " + channel
- + " bodySize = " + bodySize);
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
+ "Undecodable frame: type = " + type + " channel = " + channel
+ + " bodySize = " + bodySize);
}
- T result;
- switch(type)
+ switch (type)
{
case 1:
- result = processMethod(channel, in, processor, pv);
+ processMethod(channel, in, processor);
break;
case 2:
- result = ContentHeaderBody.process(channel, in, processor, bodySize);
+ ContentHeaderBody.process(channel, in, processor, bodySize);
break;
case 3:
- result = ContentBody.process(channel, in, processor, bodySize);
+ ContentBody.process(channel, in, processor, bodySize);
break;
case 8:
- result = HeartbeatBody.process(channel, in, processor, bodySize);
+ HeartbeatBody.process(channel, in, processor, bodySize);
break;
default:
throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type);
@@ -101,11 +105,11 @@ public class AMQDataBlockDecoder
byte marker = in.readByte();
if ((marker & 0xFF) != 0xCE)
{
- throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "End of frame marker not found. Read " + marker + " length=" + bodySize
- + " type=" + type);
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR,
+ "End of frame marker not found. Read " + marker + " length=" + bodySize
+ + " type=" + type);
}
- return result;
}
public void setMaxFrameSize(final int maxFrameSize)
@@ -113,200 +117,277 @@ public class AMQDataBlockDecoder
_maxFrameSize = maxFrameSize;
}
- private <T> T processMethod(int channelId, MarkableDataInput in, MethodProcessor<T> dispatcher, ProtocolVersion protocolVersion)
+ private void processMethod(int channelId,
+ MarkableDataInput in,
+ MethodProcessor dispatcher)
throws AMQFrameDecodingException, IOException
{
final int classAndMethod = in.readInt();
-
switch (classAndMethod)
{
//CONNECTION_CLASS:
case 0x000a000a:
- return ConnectionStartBody.process(in, dispatcher);
+ ConnectionStartBody.process(in, dispatcher);
+ break;
case 0x000a000b:
- return ConnectionStartOkBody.process(in, dispatcher);
+ ConnectionStartOkBody.process(in, dispatcher);
+ break;
case 0x000a0014:
- return ConnectionSecureBody.process(in, dispatcher);
+ ConnectionSecureBody.process(in, dispatcher);
+ break;
case 0x000a0015:
- return ConnectionSecureOkBody.process(in, dispatcher);
+ ConnectionSecureOkBody.process(in, dispatcher);
+ break;
case 0x000a001e:
- return ConnectionTuneBody.process(in, dispatcher);
+ ConnectionTuneBody.process(in, dispatcher);
+ break;
case 0x000a001f:
- return ConnectionTuneOkBody.process(in, dispatcher);
+ ConnectionTuneOkBody.process(in, dispatcher);
+ break;
case 0x000a0028:
- return ConnectionOpenBody.process(in, dispatcher);
+ ConnectionOpenBody.process(in, dispatcher);
+ break;
case 0x000a0029:
- return ConnectionOpenOkBody.process(in, dispatcher);
+ ConnectionOpenOkBody.process(in, dispatcher);
+ break;
case 0x000a002a:
- return ConnectionRedirectBody.process(in, dispatcher);
+ ConnectionRedirectBody.process(in, dispatcher);
+ break;
case 0x000a0032:
- if (protocolVersion.equals(ProtocolVersion.v8_0))
+ if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
{
- return ConnectionRedirectBody.process(in, dispatcher);
+ ConnectionRedirectBody.process(in, dispatcher);
}
else
{
- return ConnectionCloseBody.process(in, dispatcher);
+ ConnectionCloseBody.process(in, dispatcher);
}
+ break;
case 0x000a0033:
- if (protocolVersion.equals(ProtocolVersion.v8_0))
+ if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
{
- throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion);
+ throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF),
+ dispatcher.getProtocolVersion());
}
else
{
- return dispatcher.connectionCloseOk();
+ dispatcher.receiveConnectionCloseOk();
}
+ break;
case 0x000a003c:
- if (protocolVersion.equals(ProtocolVersion.v8_0))
+ if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
{
- return ConnectionCloseBody.process(in, dispatcher);
+ ConnectionCloseBody.process(in, dispatcher);
}
else
{
- throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion);
+ throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF),
+ dispatcher.getProtocolVersion());
}
+ break;
case 0x000a003d:
- if (protocolVersion.equals(ProtocolVersion.v8_0))
+ if (dispatcher.getProtocolVersion().equals(ProtocolVersion.v8_0))
{
- return dispatcher.connectionCloseOk();
+ dispatcher.receiveConnectionCloseOk();
}
else
{
- throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion);
+ throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF),
+ dispatcher.getProtocolVersion());
}
+ break;
// CHANNEL_CLASS:
case 0x0014000a:
- return ChannelOpenBody.process(channelId, in, dispatcher);
+ ChannelOpenBody.process(channelId, in, dispatcher);
+ break;
case 0x0014000b:
- return ChannelOpenOkBody.process(channelId, in, protocolVersion, dispatcher);
+ ChannelOpenOkBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher);
+ break;
case 0x00140014:
- return ChannelFlowBody.process(channelId, in, dispatcher);
+ ChannelFlowBody.process(channelId, in, dispatcher);
+ break;
case 0x00140015:
- return ChannelFlowOkBody.process(channelId, in, dispatcher);
+ ChannelFlowOkBody.process(channelId, in, dispatcher);
+ break;
case 0x0014001e:
- return ChannelAlertBody.process(channelId, in, dispatcher);
+ ChannelAlertBody.process(channelId, in, dispatcher);
+ break;
case 0x00140028:
- return ChannelCloseBody.process(channelId, in, dispatcher);
+ ChannelCloseBody.process(channelId, in, dispatcher);
+ break;
case 0x00140029:
- return dispatcher.channelCloseOk(channelId);
+ dispatcher.receiveChannelCloseOk(channelId);
+ break;
// ACCESS_CLASS:
case 0x001e000a:
- return AccessRequestBody.process(channelId, in, dispatcher);
+ AccessRequestBody.process(channelId, in, dispatcher);
+ break;
case 0x001e000b:
- return AccessRequestOkBody.process(channelId, in, dispatcher);
+ AccessRequestOkBody.process(channelId, in, dispatcher);
+ break;
// EXCHANGE_CLASS:
case 0x0028000a:
- return ExchangeDeclareBody.process(channelId, in, dispatcher);
+ ExchangeDeclareBody.process(channelId, in, dispatcher);
+ break;
case 0x0028000b:
- return dispatcher.exchangeDeclareOk(channelId);
+ dispatcher.receiveExchangeDeclareOk(channelId);
+ break;
case 0x00280014:
- return ExchangeDeleteBody.process(channelId, in, dispatcher);
+ ExchangeDeleteBody.process(channelId, in, dispatcher);
+ break;
case 0x00280015:
- return dispatcher.exchangeDeleteOk(channelId);
+ dispatcher.receiveExchangeDeleteOk(channelId);
+ break;
case 0x00280016:
- return ExchangeBoundBody.process(channelId, in, dispatcher);
+ ExchangeBoundBody.process(channelId, in, dispatcher);
+ break;
case 0x00280017:
- return ExchangeBoundOkBody.process(channelId, in, dispatcher);
+ ExchangeBoundOkBody.process(channelId, in, dispatcher);
+ break;
// QUEUE_CLASS:
case 0x0032000a:
- return QueueDeclareBody.process(channelId, in, dispatcher);
+ QueueDeclareBody.process(channelId, in, dispatcher);
+ break;
case 0x0032000b:
- return QueueDeclareOkBody.process(channelId, in, dispatcher);
+ QueueDeclareOkBody.process(channelId, in, dispatcher);
+ break;
case 0x00320014:
- return QueueBindBody.process(channelId, in, dispatcher);
+ QueueBindBody.process(channelId, in, dispatcher);
+ break;
case 0x00320015:
- return dispatcher.queueBindOk(channelId);
+ dispatcher.receiveQueueBindOk(channelId);
+ break;
case 0x0032001e:
- return QueuePurgeBody.process(channelId, in, dispatcher);
+ QueuePurgeBody.process(channelId, in, dispatcher);
+ break;
case 0x0032001f:
- return QueuePurgeOkBody.process(channelId, in, dispatcher);
+ QueuePurgeOkBody.process(channelId, in, dispatcher);
+ break;
case 0x00320028:
- return QueueDeleteBody.process(channelId, in, dispatcher);
+ QueueDeleteBody.process(channelId, in, dispatcher);
+ break;
case 0x00320029:
- return QueueDeleteOkBody.process(channelId, in, dispatcher);
+ QueueDeleteOkBody.process(channelId, in, dispatcher);
+ break;
case 0x00320032:
- return QueueUnbindBody.process(channelId, in, dispatcher);
+ QueueUnbindBody.process(channelId, in, dispatcher);
+ break;
case 0x00320033:
- return dispatcher.queueUnbindOk(channelId);
+ dispatcher.receiveQueueUnbindOk(channelId);
+ break;
// BASIC_CLASS:
case 0x003c000a:
- return BasicQosBody.process(channelId, in, dispatcher);
+ BasicQosBody.process(channelId, in, dispatcher);
+ break;
case 0x003c000b:
- return dispatcher.basicQosOk(channelId);
+ dispatcher.receiveBasicQosOk(channelId);
+ break;
case 0x003c0014:
- return BasicConsumeBody.process(channelId, in, dispatcher);
+ BasicConsumeBody.process(channelId, in, dispatcher);
+ break;
case 0x003c0015:
- return BasicConsumeOkBody.process(channelId, in, dispatcher);
+ BasicConsumeOkBody.process(channelId, in, dispatcher);
+ break;
case 0x003c001e:
- return BasicCancelBody.process(channelId, in, dispatcher);
+ BasicCancelBody.process(channelId, in, dispatcher);
+ break;
case 0x003c001f:
- return BasicCancelOkBody.process(channelId, in, dispatcher);
+ BasicCancelOkBody.process(channelId, in, dispatcher);
+ break;
case 0x003c0028:
- return BasicPublishBody.process(channelId, in, dispatcher);
+ BasicPublishBody.process(channelId, in, dispatcher);
+ break;
case 0x003c0032:
- return BasicReturnBody.process(channelId, in, dispatcher);
+ BasicReturnBody.process(channelId, in, dispatcher);
+ break;
case 0x003c003c:
- return BasicDeliverBody.process(channelId, in, dispatcher);
+ BasicDeliverBody.process(channelId, in, dispatcher);
+ break;
case 0x003c0046:
- return BasicGetBody.process(channelId, in, dispatcher);
+ BasicGetBody.process(channelId, in, dispatcher);
+ break;
case 0x003c0047:
- return BasicGetOkBody.process(channelId, in, dispatcher);
+ BasicGetOkBody.process(channelId, in, dispatcher);
+ break;
case 0x003c0048:
- return BasicGetEmptyBody.process(channelId, in, dispatcher);
+ BasicGetEmptyBody.process(channelId, in, dispatcher);
+ break;
case 0x003c0050:
- return BasicAckBody.process(channelId, in, dispatcher);
+ BasicAckBody.process(channelId, in, dispatcher);
+ break;
case 0x003c005a:
- return BasicRejectBody.process(channelId, in, dispatcher);
+ BasicRejectBody.process(channelId, in, dispatcher);
+ break;
case 0x003c0064:
- return BasicRecoverBody.process(channelId, in, protocolVersion, dispatcher);
+ BasicRecoverBody.process(channelId, in, dispatcher.getProtocolVersion(), dispatcher);
+ break;
case 0x003c0065:
- return dispatcher.basicRecoverSyncOk(channelId);
+ dispatcher.receiveBasicRecoverSyncOk(channelId);
+ break;
case 0x003c0066:
- return BasicRecoverSyncBody.process(channelId, in, dispatcher);
+ BasicRecoverSyncBody.process(channelId, in, dispatcher);
+ break;
case 0x003c006e:
- return BasicRecoverSyncBody.process(channelId, in, dispatcher);
+ BasicRecoverSyncBody.process(channelId, in, dispatcher);
+ break;
case 0x003c006f:
- return dispatcher.basicRecoverSyncOk(channelId);
+ dispatcher.receiveBasicRecoverSyncOk(channelId);
+ break;
// TX_CLASS:
case 0x005a000a:
- return dispatcher.txSelect(channelId);
+ dispatcher.receiveTxSelect(channelId);
+ break;
case 0x005a000b:
- return dispatcher.txSelectOk(channelId);
+ dispatcher.receiveTxSelectOk(channelId);
+ break;
case 0x005a0014:
- return dispatcher.txCommit(channelId);
+ dispatcher.receiveTxCommit(channelId);
+ break;
case 0x005a0015:
- return dispatcher.txCommitOk(channelId);
+ dispatcher.receiveTxCommitOk(channelId);
+ break;
case 0x005a001e:
- return dispatcher.txRollback(channelId);
+ dispatcher.receiveTxRollback(channelId);
+ break;
case 0x005a001f:
- return dispatcher.txRollbackOk(channelId);
+ dispatcher.receiveTxRollbackOk(channelId);
+ break;
default:
- throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF), protocolVersion);
+ throw newUnknownMethodException((classAndMethod >> 16), (classAndMethod & 0xFFFF),
+ dispatcher.getProtocolVersion());
}
}
- private AMQFrameDecodingException newUnknownMethodException(final int classId, final int methodId, ProtocolVersion protocolVersion)
+ private AMQFrameDecodingException newUnknownMethodException(final int classId,
+ final int methodId,
+ ProtocolVersion protocolVersion)
{
return new AMQFrameDecodingException(AMQConstant.COMMAND_INVALID,
- "Method " + methodId + " unknown in AMQP version " + protocolVersion
- + " (while trying to decode class " + classId + " method " + methodId + ".");
+ "Method "
+ + methodId
+ + " unknown in AMQP version "
+ + protocolVersion
+ + " (while trying to decode class "
+ + classId
+ + " method "
+ + methodId
+ + ".");
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java Fri Oct 10 09:54:36 2014
@@ -165,9 +165,9 @@ public class AccessRequestBody extends A
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
AMQShortString realm = buffer.readAMQShortString();
byte bitfield = buffer.readByte();
@@ -176,6 +176,6 @@ public class AccessRequestBody extends A
boolean active = (bitfield & 0x04) == 0x4 ;
boolean write = (bitfield & 0x08) == 0x8 ;
boolean read = (bitfield & 0x10) == 0x10 ;
- return dispatcher.accessRequest(channelId, realm, exclusive, passive, active, write, read);
+ dispatcher.receiveAccessRequest(channelId, realm, exclusive, passive, active, write, read);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java Fri Oct 10 09:54:36 2014
@@ -95,10 +95,10 @@ public class AccessRequestOkBody extends
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
throws IOException
{
int ticket = buffer.readUnsignedShort();
- return dispatcher.accessRequestOk(channelId, ticket);
+ dispatcher.receiveAccessRequestOk(channelId, ticket);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java Fri Oct 10 09:54:36 2014
@@ -112,13 +112,13 @@ public class BasicAckBody extends AMQMet
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
long deliveryTag = buffer.readLong();
boolean multiple = (buffer.readByte() & 0x01) != 0;
- return dispatcher.basicAck(channelId, deliveryTag, multiple);
+ dispatcher.receiveBasicAck(channelId, deliveryTag, multiple);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java Fri Oct 10 09:54:36 2014
@@ -113,13 +113,13 @@ public class BasicCancelBody extends AMQ
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
AMQShortString consumerTag = buffer.readAMQShortString();
boolean noWait = (buffer.readByte() & 0x01) == 0x01;
- return dispatcher.basicCancel(channelId, consumerTag, noWait);
+ dispatcher.receiveBasicCancel(channelId, consumerTag, noWait);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java Fri Oct 10 09:54:36 2014
@@ -96,10 +96,10 @@ public class BasicCancelOkBody extends A
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput in, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput in, final MethodProcessor dispatcher)
throws IOException
{
AMQShortString consumerTag = in.readAMQShortString();
- return dispatcher.basicCancelOk(channelId, consumerTag);
+ dispatcher.receiveBasicCancelOk(channelId, consumerTag);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java Fri Oct 10 09:54:36 2014
@@ -191,7 +191,7 @@ public class BasicConsumeBody extends AM
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
throws IOException, AMQFrameDecodingException
{
@@ -205,6 +205,6 @@ public class BasicConsumeBody extends AM
boolean exclusive = (bitfield & 0x04) == 0x04;
boolean nowait = (bitfield & 0x08) == 0x08;
FieldTable arguments = EncodingUtils.readFieldTable(buffer);
- return dispatcher.basicConsume(channelId, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments);
+ dispatcher.receiveBasicConsume(channelId, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java Fri Oct 10 09:54:36 2014
@@ -96,10 +96,10 @@ public class BasicConsumeOkBody extends
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
throws IOException
{
AMQShortString consumerTag = buffer.readAMQShortString();
- return dispatcher.basicConsumeOk(channelId, consumerTag);
+ dispatcher.receiveBasicConsumeOk(channelId, consumerTag);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java Fri Oct 10 09:54:36 2014
@@ -152,9 +152,9 @@ public class BasicDeliverBody extends AM
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
AMQShortString consumerTag = buffer.readAMQShortString();
@@ -162,6 +162,6 @@ public class BasicDeliverBody extends AM
boolean redelivered = (buffer.readByte() & 0x01) != 0;
AMQShortString exchange = buffer.readAMQShortString();
AMQShortString routingKey = buffer.readAMQShortString();
- return dispatcher.basicDeliver(channelId, consumerTag, deliveryTag, redelivered, exchange, routingKey);
+ dispatcher.receiveBasicDeliver(channelId, consumerTag, deliveryTag, redelivered, exchange, routingKey);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java Fri Oct 10 09:54:36 2014
@@ -125,13 +125,13 @@ public class BasicGetBody extends AMQMet
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
throws IOException
{
int ticket = buffer.readUnsignedShort();
AMQShortString queue = buffer.readAMQShortString();
boolean noAck = (buffer.readByte() & 0x01) != 0;
- return dispatcher.basicGet(channelId, queue, noAck);
+ dispatcher.receiveBasicGet(channelId, queue, noAck);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java Fri Oct 10 09:54:36 2014
@@ -96,11 +96,11 @@ public class BasicGetEmptyBody extends A
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
AMQShortString clusterId = buffer.readAMQShortString();
- return dispatcher.basicGetEmpty(channelId);
+ dispatcher.receiveBasicGetEmpty(channelId);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java Fri Oct 10 09:54:36 2014
@@ -151,15 +151,15 @@ public class BasicGetOkBody extends AMQM
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
long deliveryTag = buffer.readLong();
boolean redelivered = (buffer.readByte() & 0x01) != 0;
AMQShortString exchange = buffer.readAMQShortString();
AMQShortString routingKey = buffer.readAMQShortString();
long messageCount = EncodingUtils.readUnsignedInteger(buffer);
- return dispatcher.basicGetOk(channelId, deliveryTag, redelivered, exchange, routingKey, messageCount);
+ dispatcher.receiveBasicGetOk(channelId, deliveryTag, redelivered, exchange, routingKey, messageCount);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java Fri Oct 10 09:54:36 2014
@@ -151,9 +151,9 @@ public class BasicPublishBody extends AM
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
int ticket = buffer.readUnsignedShort();
@@ -163,6 +163,6 @@ public class BasicPublishBody extends AM
boolean mandatory = (bitfield & 0x01) != 0;
boolean immediate = (bitfield & 0x02) != 0;
- return dispatcher.basicPublish(channelId, exchange, routingKey, mandatory, immediate);
+ dispatcher.receiveBasicPublish(channelId, exchange, routingKey, mandatory, immediate);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java Fri Oct 10 09:54:36 2014
@@ -124,14 +124,14 @@ public class BasicQosBody extends AMQMet
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
long prefetchSize = EncodingUtils.readUnsignedInteger(buffer);
int prefetchCount = buffer.readUnsignedShort();
boolean global = (buffer.readByte() & 0x01) == 0x01;
- return dispatcher.basicQos(channelId, prefetchSize, prefetchCount, global);
+ dispatcher.receiveBasicQos(channelId, prefetchSize, prefetchCount, global);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java Fri Oct 10 09:54:36 2014
@@ -100,14 +100,14 @@ public class BasicRecoverBody extends AM
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput in,
final ProtocolVersion protocolVersion,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
boolean requeue = (in.readByte() & 0x01) == 0x01;
boolean sync = (ProtocolVersion.v8_0.equals(protocolVersion));
- return dispatcher.basicRecover(channelId, requeue, sync);
+ dispatcher.receiveBasicRecover(channelId, requeue, sync);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java Fri Oct 10 09:54:36 2014
@@ -103,11 +103,11 @@ public class BasicRecoverSyncBody extend
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput in,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
boolean requeue = (in.readByte() & 0x01) == 0x01;
- return dispatcher.basicRecover(channelId, requeue, true);
+ dispatcher.receiveBasicRecover(channelId, requeue, true);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java Fri Oct 10 09:54:36 2014
@@ -112,13 +112,13 @@ public class BasicRejectBody extends AMQ
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
long deliveryTag = buffer.readLong();
boolean requeue = (buffer.readByte() & 0x01) != 0;
- return dispatcher.basicReject(channelId, deliveryTag, requeue);
+ dispatcher.receiveBasicReject(channelId, deliveryTag, requeue);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java Fri Oct 10 09:54:36 2014
@@ -134,15 +134,15 @@ public class BasicReturnBody extends AMQ
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
int replyCode = buffer.readUnsignedShort();
AMQShortString replyText = buffer.readAMQShortString();
AMQShortString exchange = buffer.readAMQShortString();
AMQShortString routingKey = buffer.readAMQShortString();
- return dispatcher.basicReturn(channelId, replyCode, replyText, exchange, routingKey);
+ dispatcher.receiveBasicReturn(channelId, replyCode, replyText, exchange, routingKey);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java Fri Oct 10 09:54:36 2014
@@ -121,13 +121,13 @@ public class ChannelAlertBody extends AM
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
throws IOException, AMQFrameDecodingException
{
int replyCode = buffer.readUnsignedShort();
AMQShortString replyText = buffer.readAMQShortString();
FieldTable details = EncodingUtils.readFieldTable(buffer);
- return dispatcher.channelAlert(channelId, replyCode, replyText, details);
+ dispatcher.receiveChannelAlert(channelId, replyCode, replyText, details);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java Fri Oct 10 09:54:36 2014
@@ -132,15 +132,15 @@ public class ChannelCloseBody extends AM
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
int replyCode = buffer.readUnsignedShort();
AMQShortString replyText = buffer.readAMQShortString();
int classId = buffer.readUnsignedShort();
int methodId = buffer.readUnsignedShort();
- return dispatcher.channelClose(channelId, replyCode, replyText, classId, methodId);
+ dispatcher.receiveChannelClose(channelId, replyCode, replyText, classId, methodId);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java Fri Oct 10 09:54:36 2014
@@ -92,11 +92,11 @@ public class ChannelFlowBody extends AMQ
return buf.toString();
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
boolean active = (buffer.readByte() & 0x01) == 0x01;
- return dispatcher.channelFlow(channelId, active);
+ dispatcher.receiveChannelFlow(channelId, active);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java Fri Oct 10 09:54:36 2014
@@ -93,10 +93,10 @@ public class ChannelFlowOkBody extends A
return buf.toString();
}
- public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher)
+ public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher)
throws IOException
{
boolean active = (buffer.readByte() & 0x01) == 0x01;
- return dispatcher.channelFlowOk(channelId, active);
+ dispatcher.receiveChannelFlowOk(channelId, active);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java Fri Oct 10 09:54:36 2014
@@ -82,11 +82,11 @@ public class ChannelOpenBody extends AMQ
return "[ChannelOpenBody] ";
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput buffer,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
buffer.readAMQShortString();
- return dispatcher.channelOpen(channelId);
+ dispatcher.receiveChannelOpen(channelId);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java Fri Oct 10 09:54:36 2014
@@ -96,16 +96,16 @@ public class ChannelOpenOkBody extends A
return "[ChannelOpenOkBody]";
}
- public static <T> T process(final int channelId,
+ public static void process(final int channelId,
final MarkableDataInput in,
final ProtocolVersion protocolVersion,
- final MethodProcessor<T> dispatcher) throws IOException
+ final MethodProcessor dispatcher) throws IOException
{
if(!ProtocolVersion.v8_0.equals(protocolVersion))
{
EncodingUtils.readBytes(in);
}
- return dispatcher.channelOpenOk(channelId);
+ dispatcher.receiveChannelOpenOk(channelId);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java Fri Oct 10 09:54:36 2014
@@ -134,12 +134,12 @@ public class ConnectionCloseBody extends
return buf.toString();
}
- public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException
{
int replyCode = buffer.readUnsignedShort();
AMQShortString replyText = buffer.readAMQShortString();
int classId = buffer.readUnsignedShort();
int methodId = buffer.readUnsignedShort();
- return dispatcher.connectionClose(replyCode, replyText, classId, methodId);
+ dispatcher.receiveConnectionClose(replyCode, replyText, classId, methodId);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java Fri Oct 10 09:54:36 2014
@@ -121,12 +121,12 @@ public class ConnectionOpenBody extends
return buf.toString();
}
- public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException
{
AMQShortString virtualHost = buffer.readAMQShortString();
AMQShortString capabilities = buffer.readAMQShortString();
boolean insist = (buffer.readByte() & 0x01) == 0x01;
- return dispatcher.connectionOpen(virtualHost, capabilities, insist);
+ dispatcher.receiveConnectionOpen(virtualHost, capabilities, insist);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java Fri Oct 10 09:54:36 2014
@@ -96,10 +96,10 @@ public class ConnectionOpenOkBody extend
return buf.toString();
}
- public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException
{
AMQShortString knownHosts = buffer.readAMQShortString();
- return dispatcher.connectionOpenOk(knownHosts);
+ dispatcher.receiveConnectionOpenOk(knownHosts);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java Fri Oct 10 09:54:36 2014
@@ -108,10 +108,10 @@ public class ConnectionRedirectBody exte
return buf.toString();
}
- public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException
{
AMQShortString host = buffer.readAMQShortString();
AMQShortString knownHosts = buffer.readAMQShortString();
- return dispatcher.connectionRedirect(host, knownHosts);
+ dispatcher.receiveConnectionRedirect(host, knownHosts);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java Fri Oct 10 09:54:36 2014
@@ -96,11 +96,11 @@ public class ConnectionSecureBody extend
return buf.toString();
}
- public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher)
+ public static void process(final MarkableDataInput in, final MethodProcessor dispatcher)
throws IOException, AMQFrameDecodingException
{
byte[] challenge = EncodingUtils.readBytes(in);
- return dispatcher.connectionSecure(challenge);
+ dispatcher.receiveConnectionSecure(challenge);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java Fri Oct 10 09:54:36 2014
@@ -96,9 +96,9 @@ public class ConnectionSecureOkBody exte
return buf.toString();
}
- public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher) throws IOException
+ public static void process(final MarkableDataInput in, final MethodProcessor dispatcher) throws IOException
{
byte[] response = EncodingUtils.readBytes(in);
- return dispatcher.connectionSecureOk(response);
+ dispatcher.receiveConnectionSecureOk(response);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java Fri Oct 10 09:54:36 2014
@@ -136,7 +136,7 @@ public class ConnectionStartBody extends
return buf.toString();
}
- public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher)
+ public static void process(final MarkableDataInput in, final MethodProcessor dispatcher)
throws IOException, AMQFrameDecodingException
{
short versionMajor = (short) in.readUnsignedByte();
@@ -146,6 +146,6 @@ public class ConnectionStartBody extends
byte[] locales = EncodingUtils.readBytes(in);
- return dispatcher.connectionStart(versionMajor, versionMinor, serverProperties, mechanisms, locales);
+ dispatcher.receiveConnectionStart(versionMajor, versionMinor, serverProperties, mechanisms, locales);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java Fri Oct 10 09:54:36 2014
@@ -126,7 +126,7 @@ public class ConnectionStartOkBody exten
return buf.toString();
}
- public static <T> T process(final MarkableDataInput in, final MethodProcessor<T> dispatcher)
+ public static void process(final MarkableDataInput in, final MethodProcessor dispatcher)
throws IOException, AMQFrameDecodingException
{
@@ -135,6 +135,6 @@ public class ConnectionStartOkBody exten
byte[] response = EncodingUtils.readBytes(in);
AMQShortString locale = in.readAMQShortString();
- return dispatcher.connectionStartOk(clientProperties, mechanism, response, locale);
+ dispatcher.receiveConnectionStartOk(clientProperties, mechanism, response, locale);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java Fri Oct 10 09:54:36 2014
@@ -119,12 +119,12 @@ public class ConnectionTuneBody extends
return buf.toString();
}
- public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException
+ public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException
{
int channelMax = buffer.readUnsignedShort();
long frameMax = EncodingUtils.readUnsignedInteger(buffer);
int heartbeat = buffer.readUnsignedShort();
- return dispatcher.connectionTune(channelMax, frameMax, heartbeat);
+ dispatcher.receiveConnectionTune(channelMax, frameMax, heartbeat);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org