You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/03/22 14:15:14 UTC
svn commit: r521253 [9/10] - in /incubator/qpid/branches/java.multi_version:
./ gentools/src/org/apache/qpid/gentools/ gentools/templ.cpp/
gentools/templ.cpp/class/ gentools/templ.cpp/field/
gentools/templ.cpp/method/ gentools/templ.cpp/model/ gentools...
Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (from r511389, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java&r1=511389&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Mar 22 06:14:42 2007
@@ -43,10 +43,11 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MainRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.VersionSpecificRegistry;
import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.VersionSpecificRegistry;
+import org.apache.qpid.framing.MainRegistry;
+import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.protocol.AMQConstant;
@@ -81,7 +82,7 @@
* The handler from which this session was created and which is used to handle protocol events.
* We send failover events to the handler.
*/
- protected final AMQProtocolHandler _protocolHandler;
+ protected final AMQProtocolHandlerImpl _protocolHandler;
/**
* Maps from the channel id to the AMQSession that it represents.
@@ -102,9 +103,10 @@
protected int _queueId = 1;
protected final Object _queueIdLock = new Object();
- private byte _protocolMinorVersion;
- private byte _protocolMajorVersion;
- private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
+ private MethodRegistry _registry = MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion());
+ private ProtocolVersion _protocolVersion;
+ private ProtocolOutputHandler _outputHandler = ProtocolOutputHandlerFactory.createOutputHandler(ProtocolVersion.getLatestSupportedVersion(), this);
+ ;
/**
@@ -118,7 +120,7 @@
_stateManager = new AMQStateManager(this);
}
- public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
+ public AMQProtocolSession(AMQProtocolHandlerImpl protocolHandler, IoSession protocolSession, AMQConnection connection)
{
_protocolHandler = protocolHandler;
_minaProtocolSession = protocolSession;
@@ -129,7 +131,7 @@
_stateManager = new AMQStateManager(this);
}
- public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager)
+ public AMQProtocolSession(AMQProtocolHandlerImpl protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager)
{
_protocolHandler = protocolHandler;
_minaProtocolSession = protocolSession;
@@ -471,26 +473,27 @@
session.confirmConsumerCancelled(consumerTag);
}
- public void setProtocolVersion(final byte versionMajor, final byte versionMinor)
+ public void setProtocolVersion(ProtocolVersion pv)
{
- _protocolMajorVersion = versionMajor;
- _protocolMinorVersion = versionMinor;
- _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
+ _protocolVersion = pv;
+
+ _registry = MethodRegistry.getMethodRegistry(pv);
}
- public byte getProtocolMinorVersion()
+ public ProtocolVersion getProtocolVersion()
{
- return _protocolMinorVersion;
+ return _protocolVersion;
}
- public byte getProtocolMajorVersion()
+ public MethodRegistry getRegistry()
{
- return _protocolMajorVersion;
+ return _registry;
}
- public VersionSpecificRegistry getRegistry()
+ public ProtocolOutputHandler getOutputHandler()
{
- return _registry;
+ return _outputHandler;
}
+
}
Modified: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Thu Mar 22 06:14:42 2007
@@ -23,11 +23,12 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-public abstract class BlockingMethodFrameListener implements AMQMethodListener
+public abstract class BlockingMethodFrameListener<T extends AMQMethodBody> implements AMQMethodListener
{
private volatile boolean _ready = false;
@@ -43,7 +44,7 @@
protected int _channelId;
- protected AMQMethodEvent _doneEvt = null;
+ protected AMQMethodEvent<T> _doneEvt = null;
public BlockingMethodFrameListener(int channelId)
{
@@ -91,7 +92,7 @@
/**
* This method is called by the thread that wants to wait for a frame.
*/
- public AMQMethodEvent blockForFrame(long timeout) throws AMQException
+ public AMQMethodEvent<T> blockForFrame(long timeout) throws AMQException
{
synchronized (_lock)
{
Added: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandler.java?view=auto&rev=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandler.java (added)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandler.java Thu Mar 22 06:14:42 2007
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.client.protocol;
+
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodFactory;
+import org.apache.qpid.framing.CommonContentHeaderProperties;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
+import org.apache.qpid.AMQTimeoutException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.util.Map;
+import java.util.HashMap;
+
+
+public interface ProtocolOutputHandler
+{
+
+ void sendCommand(int channelId, AMQMethodBody command);
+
+ AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command) throws AMQException;
+ AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command, long timeout) throws AMQException;
+ <T extends AMQMethodBody> T sendCommandReceiveResponse(int channelId, AMQMethodBody command, Class<T> responseClass, long timeout) throws AMQException;
+ <T extends AMQMethodBody> T sendCommandReceiveResponse(int channelId, AMQMethodBody command, Class<T> responseClass) throws AMQException;
+
+ AMQMethodFactory getAMQMethodFactory();
+
+ void publishMessage(int channelId, AMQShortString exchangeName, AMQShortString routingKey, boolean immediate, boolean mandatory, ByteBuffer payload, CommonContentHeaderProperties contentHeaderProperties, int ticket);
+
+ <M extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<M> evt) throws Exception;
+
+ void error(Exception e);
+}
Added: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandlerFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandlerFactory.java?view=auto&rev=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandlerFactory.java (added)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandlerFactory.java Thu Mar 22 06:14:42 2007
@@ -0,0 +1,50 @@
+package org.apache.qpid.client.protocol;
+
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.client.protocol.amqp_8_0.ProtocolOutputHandler_8_0;
+
+import java.util.Map;
+import java.util.HashMap;
+
+public abstract class ProtocolOutputHandlerFactory
+{
+ private static final Map<ProtocolVersion, ProtocolOutputHandlerFactory> _handlers =
+ new HashMap<ProtocolVersion, ProtocolOutputHandlerFactory>();
+
+ public ProtocolOutputHandlerFactory(ProtocolVersion pv)
+ {
+ _handlers.put(pv,this);
+ }
+
+ public abstract ProtocolOutputHandler newInstance(AMQProtocolSession amqProtocolSession);
+
+ public static ProtocolOutputHandler createOutputHandler(ProtocolVersion version, AMQProtocolSession amqProtocolSession)
+ {
+ return _handlers.get(version).newInstance(amqProtocolSession);
+ }
+
+ private static final ProtocolOutputHandlerFactory VERSION_8_0 =
+ new ProtocolOutputHandlerFactory(new ProtocolVersion((byte)8,(byte)0))
+ {
+
+ public ProtocolOutputHandler newInstance(AMQProtocolSession amqProtocolSession)
+ {
+ return new ProtocolOutputHandler_8_0(amqProtocolSession);
+ }
+ };
+
+ // TODO - HACK
+
+ private static final ProtocolOutputHandlerFactory VERSION_0_9 =
+ new ProtocolOutputHandlerFactory(new ProtocolVersion((byte)0,(byte)9))
+ {
+
+ public ProtocolOutputHandler newInstance(AMQProtocolSession amqProtocolSession)
+ {
+ return new ProtocolOutputHandler_8_0(amqProtocolSession);
+ }
+ };
+
+
+
+}
Added: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/amqp_8_0/ProtocolOutputHandler_8_0.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/amqp_8_0/ProtocolOutputHandler_8_0.java?view=auto&rev=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/amqp_8_0/ProtocolOutputHandler_8_0.java (added)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/amqp_8_0/ProtocolOutputHandler_8_0.java Thu Mar 22 06:14:42 2007
@@ -0,0 +1,278 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.protocol.amqp_8_0;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_8_0.*;
+import org.apache.qpid.client.protocol.ProtocolOutputHandler;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
+import org.apache.qpid.AMQTimeoutException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * {@link ProtocolOutputHandler} implementation built on the amqp_8_0 method bodies.
+ *
+ * Writes frames through the owning {@link AMQProtocolSession} and, for request/response
+ * commands, registers a blocking listener that is fed from {@link #methodReceived} and
+ * {@link #error}.
+ */
+public class ProtocolOutputHandler_8_0 implements ProtocolOutputHandler
+{
+    private static final AMQMethodFactory METHOD_FACTORY = new AMQMethodFactory_8_0();
+
+    /** Maps each request method type to the response method type awaited for it. */
+    private static final Map<Class<? extends AMQMethodBody>, Class<? extends AMQMethodBody>> REQUEST_RESPONSE_METHODBODY_MAP =
+            new HashMap<Class<? extends AMQMethodBody>, Class<? extends AMQMethodBody>>();
+
+    static
+    {
+        // Basic Class
+        REQUEST_RESPONSE_METHODBODY_MAP.put(BasicCancelBody.class, BasicCancelOkBody.class);
+        REQUEST_RESPONSE_METHODBODY_MAP.put(BasicConsumeBody.class, BasicConsumeOkBody.class);
+        REQUEST_RESPONSE_METHODBODY_MAP.put(BasicQosBody.class, BasicQosOkBody.class);
+        // GET ???
+        REQUEST_RESPONSE_METHODBODY_MAP.put(BasicRecoverBody.class, BasicRecoverOkBody.class);
+
+        // Channel Class
+        REQUEST_RESPONSE_METHODBODY_MAP.put(ChannelCloseBody.class, ChannelCloseOkBody.class);
+        REQUEST_RESPONSE_METHODBODY_MAP.put(ChannelFlowBody.class, ChannelFlowOkBody.class);
+        REQUEST_RESPONSE_METHODBODY_MAP.put(ChannelOpenBody.class, ChannelOpenOkBody.class);
+
+        // Connection Class
+        REQUEST_RESPONSE_METHODBODY_MAP.put(ConnectionOpenBody.class, ConnectionOpenOkBody.class);
+        REQUEST_RESPONSE_METHODBODY_MAP.put(ConnectionSecureBody.class, ConnectionSecureOkBody.class);
+        REQUEST_RESPONSE_METHODBODY_MAP.put(ConnectionStartBody.class, ConnectionStartOkBody.class);
+        REQUEST_RESPONSE_METHODBODY_MAP.put(ConnectionTuneBody.class, ConnectionTuneOkBody.class);
+
+        // Exchange Class
+        REQUEST_RESPONSE_METHODBODY_MAP.put(ExchangeBoundBody.class, ExchangeBoundOkBody.class);
+        REQUEST_RESPONSE_METHODBODY_MAP.put(ExchangeDeclareBody.class, ExchangeDeclareOkBody.class);
+        REQUEST_RESPONSE_METHODBODY_MAP.put(ExchangeDeleteBody.class, ExchangeDeleteOkBody.class);
+
+        // Queue Class
+        REQUEST_RESPONSE_METHODBODY_MAP.put(QueueBindBody.class, QueueBindOkBody.class);
+        REQUEST_RESPONSE_METHODBODY_MAP.put(QueueDeclareBody.class, QueueDeclareOkBody.class);
+        REQUEST_RESPONSE_METHODBODY_MAP.put(QueueDeleteBody.class, QueueDeleteOkBody.class);
+        REQUEST_RESPONSE_METHODBODY_MAP.put(QueuePurgeBody.class, QueuePurgeOkBody.class);
+
+        // Tx Class
+        REQUEST_RESPONSE_METHODBODY_MAP.put(TxCommitBody.class, TxCommitOkBody.class);
+        REQUEST_RESPONSE_METHODBODY_MAP.put(TxRollbackBody.class, TxRollbackOkBody.class);
+        REQUEST_RESPONSE_METHODBODY_MAP.put(TxSelectBody.class, TxSelectOkBody.class);
+    }
+
+    /** Timeout used when the caller does not supply one (presumably milliseconds - TODO confirm). */
+    private static final long DEFAULT_TIMEOUT = 30000;
+
+    private final AMQProtocolSession _session;
+
+    /** Listeners currently blocked waiting for a reply; fed by methodReceived() and error(). */
+    private final CopyOnWriteArraySet<SpecificMethodFrameListener> _frameListeners =
+            new CopyOnWriteArraySet<SpecificMethodFrameListener>();
+
+    /**
+     * @param amqProtocolSession the session all frames are written through
+     */
+    public ProtocolOutputHandler_8_0(AMQProtocolSession amqProtocolSession)
+    {
+        _session = amqProtocolSession;
+    }
+
+    /** Single point through which every outgoing data block reaches the session. */
+    private void writeFrame(AMQDataBlock frame)
+    {
+        _session.writeFrame(frame);
+    }
+
+    /**
+     * Finds the response type registered for a command.
+     *
+     * The registry is keyed by method-body types such as {@code BasicCancelBody}, while a
+     * command instance may be of a subtype (this class itself builds a
+     * {@code BasicPublishBodyImpl}), so an exact lookup on {@code command.getClass()} can miss.
+     * The exact class is tried first; otherwise the first registered key the command is an
+     * instance of is used.
+     * NOTE(review): assumes a command matches at most one registered key - confirm.
+     *
+     * @param command the command about to be sent
+     * @return the response type to wait for
+     * @throws IllegalArgumentException if no response type is registered for the command
+     */
+    private static Class<? extends AMQMethodBody> responseClassFor(AMQMethodBody command)
+    {
+        final Class<? extends AMQMethodBody> commandClass = command.getClass();
+        Class<? extends AMQMethodBody> responseClass = REQUEST_RESPONSE_METHODBODY_MAP.get(commandClass);
+        if (responseClass == null)
+        {
+            for (Map.Entry<Class<? extends AMQMethodBody>, Class<? extends AMQMethodBody>> entry
+                    : REQUEST_RESPONSE_METHODBODY_MAP.entrySet())
+            {
+                if (entry.getKey().isAssignableFrom(commandClass))
+                {
+                    responseClass = entry.getValue();
+                    break;
+                }
+            }
+        }
+        if (responseClass == null)
+        {
+            // Previously a null response class was silently handed to the listener.
+            throw new IllegalArgumentException("No response method registered for " + commandClass.getName());
+        }
+        return responseClass;
+    }
+
+    /**
+     * Writes the command as a single frame on the given channel without waiting for a reply.
+     */
+    public void sendCommand(int channelId, AMQMethodBody command)
+    {
+        writeFrame(new AMQFrame(channelId, command));
+    }
+
+    /**
+     * Sends the command and blocks for its registered response type using DEFAULT_TIMEOUT.
+     *
+     * @throws IllegalArgumentException if no response type is registered for the command
+     */
+    public AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command) throws AMQException
+    {
+        return sendCommandReceiveResponse(channelId, command, responseClassFor(command));
+    }
+
+    /**
+     * Sends the command and blocks for its registered response type using the given timeout.
+     *
+     * @throws IllegalArgumentException if no response type is registered for the command
+     */
+    public AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command, long timeout) throws AMQException
+    {
+        return sendCommandReceiveResponse(channelId, command, responseClassFor(command), timeout);
+    }
+
+    /**
+     * Sends the command and blocks until a method of {@code responseClass} is delivered to a
+     * listener created for {@code channelId}, or the listener's wait ends.
+     */
+    public <T extends AMQMethodBody> T sendCommandReceiveResponse(int channelId, AMQMethodBody command, Class<T> responseClass, long timeout) throws AMQException
+    {
+        final AMQFrame frame = new AMQFrame(channelId, command);
+        final SpecificMethodFrameListener<T> listener = new SpecificMethodFrameListener<T>(channelId, responseClass);
+        return writeCommandFrameAndWaitForReply(frame, listener, timeout);
+    }
+
+    /**
+     * Registers the listener, writes the frame, and blocks on the listener for the reply.
+     * The listener is registered before the write so the reply cannot be missed, and is
+     * always deregistered afterwards.
+     */
+    private <T extends AMQMethodBody> T writeCommandFrameAndWaitForReply(AMQFrame frame, SpecificMethodFrameListener<T> listener, long timeout) throws AMQException
+    {
+        try
+        {
+            _frameListeners.add(listener);
+            writeFrame(frame);
+
+            // When blockForFrame returns normally, a reply has been received that matches
+            // the criteria defined in the blocking listener.
+            AMQMethodEvent<T> e = listener.blockForFrame(timeout);
+            return e.getMethod();
+        }
+        finally
+        {
+            // If we don't remove the listener then no-one will
+            _frameListeners.remove(listener);
+        }
+    }
+
+    /**
+     * Same as the four-argument overload, using DEFAULT_TIMEOUT.
+     */
+    public <T extends AMQMethodBody> T sendCommandReceiveResponse(int channelId, AMQMethodBody command, Class<T> responseClass) throws AMQException
+    {
+        return sendCommandReceiveResponse(channelId, command, responseClass, DEFAULT_TIMEOUT);
+    }
+
+    public AMQMethodFactory getAMQMethodFactory()
+    {
+        return METHOD_FACTORY;
+    }
+
+    /**
+     * Publishes a message as one composite block: the publish method frame, the content
+     * header frame, then zero or more content body frames.
+     *
+     * The body size in the content header is the payload's remaining byte count, the same
+     * quantity used to count and split the body frames, so the header agrees with the bytes
+     * framed even when the buffer's position is not zero.
+     *
+     * @param payload message content; may be null or empty, in which case no body frames are sent
+     */
+    public void publishMessage(int channelId, AMQShortString exchangeName, AMQShortString routingKey, boolean immediate, boolean mandatory, ByteBuffer payload, CommonContentHeaderProperties contentHeaderProperties, int ticket)
+    {
+        final int size = (payload != null) ? payload.remaining() : 0;
+        // Argument order (mandatory before immediate) is kept exactly as in the original call.
+        BasicPublishBodyImpl publishBody = new BasicPublishBodyImpl(ticket, exchangeName, routingKey, mandatory, immediate);
+
+        final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
+        // Slots 0 and 1 hold the method and header frames; body frames start at index 2.
+        final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
+
+        if (payload != null)
+        {
+            createContentBodies(payload, frames, 2, channelId);
+        }
+
+        AMQFrame contentHeaderFrame =
+                ContentHeaderBody.createAMQFrame(channelId,
+                                                 publishBody.CLASS_ID,
+                                                 0, // weight
+                                                 contentHeaderProperties,
+                                                 size);
+
+        frames[0] = new AMQFrame(channelId, publishBody);
+        frames[1] = contentHeaderFrame;
+        writeFrame(new CompositeAMQDataBlock(frames));
+    }
+
+    /**
+     * Offers the event to every waiting listener.
+     *
+     * @return true if at least one listener reported interest in the event
+     */
+    public <M extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<M> evt) throws AMQException
+    {
+        boolean wasAnyoneInterested = false;
+        for (SpecificMethodFrameListener listener : _frameListeners)
+        {
+            // The listener call comes first so every listener is offered the event,
+            // even after one has already claimed it.
+            wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+        }
+        return wasAnyoneInterested;
+    }
+
+    /**
+     * Passes the error to every waiting listener.
+     */
+    public void error(Exception e)
+    {
+        for (SpecificMethodFrameListener listener : _frameListeners)
+        {
+            listener.error(e);
+        }
+    }
+
+    /**
+     * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
+     * maximum frame size. Slicing starts at the payload's current position; the payload's
+     * position and limit are modified while slicing.
+     *
+     * @param payload   the message content to split
+     * @param frames    the array whose slots from {@code offset} onwards receive the content body frames
+     * @param offset    index of the first slot in {@code frames} to fill
+     * @param channelId the channel the frames are created for
+     */
+    private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
+    {
+        if (frames.length == (offset + 1))
+        {
+            // Exactly one body frame: wrap the payload as-is.
+            frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
+        }
+        else
+        {
+            final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+            // Slice relative to where the readable data starts rather than assuming position 0.
+            final int base = payload.position();
+            long remaining = payload.remaining();
+            for (int i = offset; i < frames.length; i++)
+            {
+                payload.position(base + (int) framePayloadMax * (i - offset));
+                int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
+                payload.limit(payload.position() + length);
+                frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
+
+                remaining -= length;
+            }
+        }
+    }
+
+    /**
+     * @return the number of content body frames needed for the payload's remaining bytes;
+     *         0 for a null or empty payload
+     */
+    private int calculateContentBodyFrameCount(ByteBuffer payload)
+    {
+        // we subtract one from the total frame maximum size to account for the end of frame marker in a body frame
+        // (0xCE byte).
+        int frameCount;
+        if ((payload == null) || (payload.remaining() == 0))
+        {
+            frameCount = 0;
+        }
+        else
+        {
+            int dataLength = payload.remaining();
+            final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+            // One extra frame for a trailing partial chunk.
+            int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
+            frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
+        }
+
+        return frameCount;
+    }
+
+}
Modified: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Thu Mar 22 06:14:42 2007
@@ -27,34 +27,20 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.handler.BasicCancelOkMethodHandler;
-import org.apache.qpid.client.handler.BasicDeliverMethodHandler;
-import org.apache.qpid.client.handler.BasicReturnMethodHandler;
-import org.apache.qpid.client.handler.ChannelCloseMethodHandler;
-import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler;
-import org.apache.qpid.client.handler.ChannelFlowOkMethodHandler;
-import org.apache.qpid.client.handler.ConnectionCloseMethodHandler;
-import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler;
-import org.apache.qpid.client.handler.ConnectionSecureMethodHandler;
-import org.apache.qpid.client.handler.ConnectionStartMethodHandler;
-import org.apache.qpid.client.handler.ConnectionTuneMethodHandler;
-import org.apache.qpid.client.handler.ExchangeBoundOkMethodHandler;
-import org.apache.qpid.client.handler.QueueDeleteOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.BasicCancelOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.BasicDeliverMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ChannelCloseOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ChannelFlowOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionOpenOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionTuneMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ExchangeBoundOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.QueueDeleteOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionCloseMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionSecureMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.BasicReturnMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.*;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionOpenOkBody;
-import org.apache.qpid.framing.ConnectionSecureBody;
-import org.apache.qpid.framing.ConnectionStartBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
@@ -62,7 +48,7 @@
* The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
* there is a separate state manager.
*/
-public class AMQStateManager implements AMQMethodListener
+public class AMQStateManager
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
private AMQProtocolSession _protocolSession;
@@ -178,7 +164,7 @@
StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
if (handler != null)
{
- handler.methodReceived(this, _protocolSession, evt);
+ handler.methodReceived(this, evt);
return true;
}
return false;
Modified: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java Thu Mar 22 06:14:42 2007
@@ -31,6 +31,6 @@
*/
public interface StateAwareMethodListener
{
- void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession,
- AMQMethodEvent evt) throws AMQException;
+ void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException;
+
}
Modified: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java Thu Mar 22 06:14:42 2007
@@ -22,13 +22,14 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.framing.AMQMethodBody;
-public class SpecificMethodFrameListener extends BlockingMethodFrameListener
+public class SpecificMethodFrameListener<T extends AMQMethodBody> extends BlockingMethodFrameListener
{
private final Class _expectedClass;
- public SpecificMethodFrameListener(int channelId, Class expectedClass)
+ public SpecificMethodFrameListener(int channelId, Class<T> expectedClass)
{
super(channelId);
_expectedClass = expectedClass;
Modified: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java Thu Mar 22 06:14:42 2007
@@ -22,11 +22,11 @@
import java.io.IOException;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl;
import org.apache.qpid.jms.BrokerDetails;
public interface ITransportConnection
{
- void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail)
+ void connect(AMQProtocolHandlerImpl protocolHandler, BrokerDetails brokerDetail)
throws IOException;
}
Modified: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java Thu Mar 22 06:14:42 2007
@@ -30,7 +30,7 @@
import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
@@ -50,7 +50,7 @@
_socketConnectorFactory = socketConnectorFactory;
}
- public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail)
+ public void connect(AMQProtocolHandlerImpl protocolHandler, BrokerDetails brokerDetail)
throws IOException
{
ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
Modified: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java Thu Mar 22 06:14:42 2007
@@ -27,7 +27,7 @@
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.mina.transport.vmpipe.VmPipeConnector;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.PoolingFilter;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
@@ -43,7 +43,7 @@
_port = port;
}
- public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
+ public void connect(AMQProtocolHandlerImpl protocolHandler, BrokerDetails brokerDetail) throws IOException
{
final VmPipeConnector ioConnector = new VmPipeConnector();
final IoServiceConfig cfg = ioConnector.getDefaultConfig();
Modified: incubator/qpid/branches/java.multi_version/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java Thu Mar 22 06:14:42 2007
@@ -21,6 +21,8 @@
package org.apache.qpid.codec;
import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_8_0.BasicDeliverBodyImpl;
+
import org.apache.mina.common.*;
import org.apache.mina.common.support.BaseIoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
@@ -238,7 +240,7 @@
return new CompositeAMQDataBlock(frames);
}
- static AMQFrame wrapBody(AMQBody body)
+ static AMQFrame wrapBody(AMQBodyImpl body)
{
AMQFrame frame = new AMQFrame(1, body);
return frame;
@@ -264,13 +266,11 @@
return body;
}
- static BasicDeliverBody createBasicDeliverBody()
+ static BasicDeliverBodyImpl createBasicDeliverBody()
{
- BasicDeliverBody body = new BasicDeliverBody((byte) 8, (byte) 0,
- BasicDeliverBody.getClazz((byte) 8, (byte) 0),
- BasicDeliverBody.getMethod((byte) 8, (byte) 0),
- new AMQShortString("myConsumerTag"), 1,
- new AMQShortString("myExchange"), false,
+ BasicDeliverBodyImpl body = new BasicDeliverBodyImpl(
+ new AMQShortString("myConsumerTag"), 1,false,
+ new AMQShortString("myExchange"),
new AMQShortString("myRoutingKey"));
return body;
}
Modified: incubator/qpid/branches/java.multi_version/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java Thu Mar 22 06:14:42 2007
@@ -118,8 +118,8 @@
//decode
buffer.flip();
- header = new ContentHeaderBody();
- header.populateFromBuffer(buffer, size);
+ header = new ContentHeaderBody(buffer, size);
+
return ((BasicContentHeaderProperties) header.properties).getHeaders();
}
Modified: incubator/qpid/branches/java.multi_version/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java Thu Mar 22 06:14:42 2007
@@ -46,13 +46,14 @@
return _handler;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
_logger.debug("ChannelClose method received");
+ final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
- AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
- AMQShortString reason = method.replyText;
+ AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
+ AMQShortString reason = method.getReplyText();
if (_logger.isDebugEnabled())
{
_logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
Modified: incubator/qpid/branches/java.multi_version/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java Thu Mar 22 06:14:42 2007
@@ -44,6 +44,9 @@
import org.apache.qpid.framing.ExchangeDeclareOkBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.amqp_8_0.ChannelCloseOkBodyImpl;
+import org.apache.qpid.framing.amqp_8_0.ExchangeDeclareBodyImpl;
+import org.apache.qpid.framing.amqp_8_0.ChannelOpenBodyImpl;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.url.URLSyntaxException;
@@ -270,9 +273,7 @@
private void sendClose(int channel)
{
- AMQFrame frame = ChannelCloseOkBody.createAMQFrame(channel,
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion());
+ AMQFrame frame = new AMQFrame(channel, new ChannelCloseOkBodyImpl());
((AMQConnection) _connection).getProtocolHandler().writeFrame(frame);
}
@@ -332,37 +333,29 @@
private void declareExchange(int channelId, String _type, String _name, boolean nowait) throws AMQException
{
- AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(channelId,
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(),
- null, // arguments
- false, // autoDelete
- false, // durable
- new AMQShortString(_name), // exchange
- false, // internal
- nowait, // nowait
- true, // passive
- 0, // ticket
- new AMQShortString(_type)); // type
+ ExchangeDeclareBody exchangeDeclareBody =
+ ((AMQConnection) _connection).getProtocolOutputHandler().getAMQMethodFactory().createExchangeDeclare(new AMQShortString(_name),new AMQShortString(_type),0);
+// new ExchangeDeclareBodyImpl(0,new AMQShortString(_name),new AMQShortString(_type),true,false,false,false,nowait,null);
+
+ //AMQFrame exchangeDeclare = new AMQFrame(channelId, exchangeDeclareBody);
if (nowait)
{
- ((AMQConnection) _connection).getProtocolHandler().writeFrame(exchangeDeclare);
+ ((AMQConnection) _connection).getProtocolOutputHandler().sendCommand(channelId, exchangeDeclareBody);
+
}
else
{
- ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT);
+ ((AMQConnection) _connection).getProtocolOutputHandler().sendCommandReceiveResponse(channelId, exchangeDeclareBody, SYNC_TIMEOUT);
}
}
private void createChannel(int channelId) throws AMQException
{
- ((AMQConnection) _connection).getProtocolHandler().syncWrite(
- ChannelOpenBody.createAMQFrame(channelId,
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(),
- null), // outOfBand
- ChannelOpenOkBody.class);
+
+ ChannelOpenBody openBody =
+ ((AMQConnection) _connection).getProtocolOutputHandler().getAMQMethodFactory().createChannelOpen();
+ ((AMQConnection) _connection).getProtocolOutputHandler().sendCommandReceiveResponse(channelId, openBody);
}
Modified: incubator/qpid/branches/java.multi_version/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java Thu Mar 22 06:14:42 2007
@@ -22,18 +22,14 @@
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.handler.ConnectionStartMethodHandler;
-import org.apache.qpid.client.handler.ConnectionCloseMethodHandler;
-import org.apache.qpid.client.handler.ConnectionTuneMethodHandler;
-import org.apache.qpid.client.handler.ConnectionSecureMethodHandler;
-import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler;
-import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler;
-import org.apache.qpid.client.handler.BasicDeliverMethodHandler;
-import org.apache.qpid.client.handler.BasicReturnMethodHandler;
-import org.apache.qpid.client.handler.BasicCancelOkMethodHandler;
-import org.apache.qpid.client.handler.ChannelFlowOkMethodHandler;
-import org.apache.qpid.client.handler.QueueDeleteOkMethodHandler;
-import org.apache.qpid.client.handler.ExchangeBoundOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionTuneMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ChannelCloseOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.BasicDeliverMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ChannelFlowOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ExchangeBoundOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.BasicReturnMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionStartMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.*;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.ConnectionCloseBody;
Modified: incubator/qpid/branches/java.multi_version/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java Thu Mar 22 06:14:42 2007
@@ -24,7 +24,7 @@
import org.apache.mina.common.IoSession;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.AMQShortString;
@@ -36,7 +36,7 @@
{
}
- public AMQProtSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
+ public AMQProtSession(AMQProtocolHandlerImpl protocolHandler, IoSession protocolSession, AMQConnection connection)
{
super(protocolHandler,protocolSession,connection);
}
Copied: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java (from r511389, incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java&r1=511389&p2=incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java Thu Mar 22 06:14:42 2007
@@ -21,12 +21,12 @@
package org.apache.qpid.server.cluster;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQBodyImpl;
public class AMQUnexpectedBodyTypeException extends AMQException
{
- public AMQUnexpectedBodyTypeException(Class<? extends AMQBody> expectedClass, AMQBody body)
+ public AMQUnexpectedBodyTypeException(Class<? extends AMQBodyImpl> expectedClass, AMQBodyImpl body)
{
super("Unexpected body type. Expected: " + expectedClass.getName() + "; got: " + body.getClass().getName());
}
Modified: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java Thu Mar 22 06:14:42 2007
@@ -20,26 +20,26 @@
*/
package org.apache.qpid.server.cluster;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
public class BlockingHandler implements ResponseHandler
{
private final Class _expected;
private boolean _completed;
- private AMQMethodBody _response;
+ private AMQMethodBodyImpl _response;
public BlockingHandler()
{
- this(AMQMethodBody.class);
+ this(AMQMethodBodyImpl.class);
}
- public BlockingHandler(Class<? extends AMQMethodBody> expected)
+ public BlockingHandler(Class<? extends AMQMethodBodyImpl> expected)
{
_expected = expected;
}
- public void responded(AMQMethodBody response)
+ public void responded(AMQMethodBodyImpl response)
{
if (_expected.isInstance(response))
{
@@ -74,7 +74,7 @@
}
}
- AMQMethodBody getResponse()
+ AMQMethodBodyImpl getResponse()
{
return _response;
}
Modified: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java Thu Mar 22 06:14:42 2007
@@ -22,7 +22,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.log4j.Logger;
import java.io.IOException;
@@ -88,7 +88,7 @@
* @param response the response received
* @return true if the response matched an outstanding request
*/
- protected synchronized boolean handleResponse(int channel, AMQMethodBody response)
+ protected synchronized boolean handleResponse(int channel, AMQMethodBodyImpl response)
{
ResponseHandler request = _requests.get(channel);
if (request == null)
@@ -174,7 +174,7 @@
/**
* Start connection process, including replay
*/
- abstract void connectAsynch(Iterable<AMQMethodBody> msgs);
+ abstract void connectAsynch(Iterable<AMQMethodBodyImpl> msgs);
/**
* Replay messages to the remote peer this instance represents. These messages
@@ -182,7 +182,7 @@
*
* @param msgs
*/
- abstract void replay(Iterable<AMQMethodBody> msgs);
+ abstract void replay(Iterable<AMQMethodBodyImpl> msgs);
/**
* establish connection, handling redirect if required...
@@ -200,7 +200,7 @@
this.channel = channel;
}
- public void responded(AMQMethodBody response)
+ public void responded(AMQMethodBodyImpl response)
{
request.responseReceived(Broker.this, response);
_requests.remove(channel);
@@ -228,7 +228,7 @@
this.channel = channel;
}
- public void responded(AMQMethodBody response)
+ public void responded(AMQMethodBodyImpl response)
{
handler.responded(response);
_requests.remove(channel);
Modified: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java Thu Mar 22 06:14:42 2007
@@ -24,7 +24,7 @@
import org.apache.qpid.server.cluster.replay.ReplayManager;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.cluster.util.InvokeMultiple;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import java.io.IOException;
import java.util.ArrayList;
@@ -225,7 +225,7 @@
if (create)
{
Broker b = _factory.create(handle);
- List<AMQMethodBody> msgs = _replayMgr.replay(isLeader(_local));
+ List<AMQMethodBodyImpl> msgs = _replayMgr.replay(isLeader(_local));
_logger.info(new LogMessage("Replaying {0} from {1} to {2}", msgs, _local, b));
b.connectAsynch(msgs);
Modified: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java Thu Mar 22 06:14:42 2007
@@ -26,7 +26,7 @@
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
/**
* Hack to assist with reuse of the client handlers for connection setup in
@@ -49,7 +49,7 @@
_stateMgr = stateMgr;
}
- public void handle(int channel, AMQMethodBody method) throws AMQException
+ public void handle(int channel, AMQMethodBodyImpl method) throws AMQException
{
AMQMethodEvent evt = new AMQMethodEvent(channel, method);
_stateMgr.methodReceived(evt);
Modified: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java Thu Mar 22 06:14:42 2007
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.server.cluster;
-import org.apache.qpid.client.handler.ConnectionCloseMethodHandler;
-import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler;
-import org.apache.qpid.client.handler.ConnectionSecureMethodHandler;
-import org.apache.qpid.client.handler.ConnectionStartMethodHandler;
-import org.apache.qpid.client.handler.ConnectionTuneMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionCloseMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionOpenOkMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionSecureMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionTuneMethodHandler;
+import org.apache.qpid.client.handler.amqp_8_0.ConnectionStartMethodHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.IllegalStateTransitionException;
@@ -78,14 +78,14 @@
return registry;
}
- protected StateAwareMethodListener findStateTransitionHandler(AMQState state, AMQMethodBody frame) throws IllegalStateTransitionException
+ protected StateAwareMethodListener findStateTransitionHandler(AMQState state, AMQMethodBodyImpl frame) throws IllegalStateTransitionException
{
ClientRegistry registry = _handlers.get(state);
return registry == null ? null : registry.getHandler(frame);
}
- <A extends Class<AMQMethodBody>> void addHandlers(Class type, StateAwareMethodListener handler, AMQState... states)
+ <A extends Class<AMQMethodBodyImpl>> void addHandlers(Class type, StateAwareMethodListener handler, AMQState... states)
{
for (AMQState state : states)
{
@@ -93,7 +93,7 @@
}
}
- <A extends Class<AMQMethodBody>> void addHandler(Class type, StateAwareMethodListener handler, AMQState state)
+ <A extends Class<AMQMethodBodyImpl>> void addHandler(Class type, StateAwareMethodListener handler, AMQState state)
{
ClientRegistry registry = _handlers.get(state);
if (registry == null)
@@ -106,15 +106,15 @@
static class ClientRegistry
{
- private final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener> registry
- = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener>();
+ private final Map<Class<? extends AMQMethodBodyImpl>, StateAwareMethodListener> registry
+ = new HashMap<Class<? extends AMQMethodBodyImpl>, StateAwareMethodListener>();
- <A extends Class<AMQMethodBody>> void add(A type, StateAwareMethodListener handler)
+ <A extends Class<AMQMethodBodyImpl>> void add(A type, StateAwareMethodListener handler)
{
registry.put(type, handler);
}
- StateAwareMethodListener getHandler(AMQMethodBody frame)
+ StateAwareMethodListener getHandler(AMQMethodBodyImpl frame)
{
return registry.get(frame.getClass());
}
@@ -122,9 +122,9 @@
class ConnectionTuneHandler extends ConnectionTuneMethodHandler
{
- protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities, boolean insist, byte major, byte minor)
+ protected AMQFrame createConnectionOpenBody(int channel, AMQShortString path, AMQShortString capabilities, boolean insist, byte major, byte minor)
{
- return super.createConnectionOpenFrame(channel, path, new AMQShortString(ClusterCapability.add(capabilities, _identity)), insist, major, minor);
+ return super.createConnectionOpenBody(channel, path, new AMQShortString(ClusterCapability.add(capabilities, _identity)), insist, major, minor);
}
}
}
Modified: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java Thu Mar 22 06:14:42 2007
@@ -24,17 +24,14 @@
import org.apache.mina.common.IoSession;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ConnectionOpenBody;
import org.apache.qpid.framing.ConnectionSecureOkBody;
import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.ConnectionTuneOkBody;
import org.apache.qpid.framing.ClusterMembershipBody;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.cluster.util.LogMessage;
Modified: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java Thu Mar 22 06:14:42 2007
@@ -230,7 +230,7 @@
//connect to the host and port specified:
Broker prospect = connectToProspect(member);
announceMembership();
- List<AMQMethodBody> msgs = _replayMgr.replay(true);
+ List<AMQMethodBodyImpl> msgs = _replayMgr.replay(true);
_logger.info(new LogMessage("Replaying {0} from leader to {1}", msgs, prospect));
prospect.replay(msgs);
}
Modified: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java Thu Mar 22 06:14:42 2007
@@ -21,7 +21,7 @@
package org.apache.qpid.server.cluster;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import java.util.ArrayList;
import java.util.HashMap;
@@ -35,7 +35,7 @@
*/
class GroupRequest
{
- private final Map<Member, AMQMethodBody> _responses = new HashMap<Member, AMQMethodBody>();
+ private final Map<Member, AMQMethodBodyImpl> _responses = new HashMap<Member, AMQMethodBodyImpl>();
private final List<Member> _brokers = new ArrayList<Member>();
private boolean _sent;
@@ -62,7 +62,7 @@
return checkCompletion();
}
- public boolean responseReceived(Member broker, AMQMethodBody response)
+ public boolean responseReceived(Member broker, AMQMethodBodyImpl response)
{
_responses.put(broker, response);
return checkCompletion();
@@ -90,9 +90,9 @@
return true;
}
- List<AMQMethodBody> getResults()
+ List<AMQMethodBodyImpl> getResults()
{
- List<AMQMethodBody> results = new ArrayList<AMQMethodBody>(_brokers.size());
+ List<AMQMethodBodyImpl> results = new ArrayList<AMQMethodBodyImpl>(_brokers.size());
for (Member b : _brokers)
{
results.add(_responses.get(b));
Modified: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java Thu Mar 22 06:14:42 2007
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.server.cluster;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import java.util.List;
public interface GroupResponseHandler
{
//Note: this implies that the response to a group request will always be a method body...
- public void response(List<AMQMethodBody> responses, List<Member> members);
+ public void response(List<AMQMethodBodyImpl> responses, List<Member> members);
}
Modified: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java Thu Mar 22 06:14:42 2007
@@ -21,9 +21,9 @@
package org.apache.qpid.server.cluster;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
interface MethodHandler
{
- public void handle(int channel, AMQMethodBody method) throws AMQException;
+ public void handle(int channel, AMQMethodBodyImpl method) throws AMQException;
}
Modified: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java Thu Mar 22 06:14:42 2007
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.cluster;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.server.state.StateAwareMethodListener;
import java.util.HashMap;
@@ -28,16 +28,16 @@
public class MethodHandlerRegistry
{
- private final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> registry =
- new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
+ private final Map<Class<? extends AMQMethodBodyImpl>, StateAwareMethodListener<? extends AMQMethodBodyImpl>> registry =
+ new HashMap<Class<? extends AMQMethodBodyImpl>, StateAwareMethodListener<? extends AMQMethodBodyImpl>>();
- public <A extends AMQMethodBody, B extends Class<A>> MethodHandlerRegistry addHandler(B type, StateAwareMethodListener<A> handler)
+ public <A extends AMQMethodBodyImpl, B extends Class<A>> MethodHandlerRegistry addHandler(B type, StateAwareMethodListener<A> handler)
{
registry.put(type, handler);
return this;
}
- public <B extends AMQMethodBody> StateAwareMethodListener<B> getHandler(B frame)
+ public <B extends AMQMethodBodyImpl> StateAwareMethodListener<B> getHandler(B frame)
{
return (StateAwareMethodListener<B>) registry.get(frame.getClass());
}
Copied: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java (from r511389, incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java&r1=511389&p2=incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java Thu Mar 22 06:14:42 2007
@@ -33,10 +33,10 @@
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQBodyImpl;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.framing.ConnectionRedirectBody;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
@@ -57,7 +57,7 @@
private final MemberHandle _local;
private IoSession _session;
private MethodHandler _handler;
- private Iterable<AMQMethodBody> _replay;
+ private Iterable<AMQMethodBodyImpl> _replay;
MinaBrokerProxy(String host, int port, MemberHandle local)
{
@@ -106,13 +106,13 @@
return _connectionMonitor.waitUntilOpen();
}
- void connectAsynch(Iterable<AMQMethodBody> msgs)
+ void connectAsynch(Iterable<AMQMethodBodyImpl> msgs)
{
_replay = msgs;
connectImpl();
}
- void replay(Iterable<AMQMethodBody> msgs)
+ void replay(Iterable<AMQMethodBodyImpl> msgs)
{
_replay = msgs;
if(_connectionMonitor.isOpened())
@@ -158,14 +158,14 @@
{
if(_replay != null)
{
- for(AMQMethodBody b : _replay)
+ for(AMQMethodBodyImpl b : _replay)
{
_session.write(new AMQFrame(0, b));
}
}
}
- public void handle(int channel, AMQMethodBody method) throws AMQException
+ public void handle(int channel, AMQMethodBodyImpl method) throws AMQException
{
_logger.info(new LogMessage("Handling method: {0} for channel {1}", method, channel));
if (!handleResponse(channel, method))
@@ -174,7 +174,7 @@
}
}
- private void handleMethod(int channel, AMQMethodBody method) throws AMQException
+ private void handleMethod(int channel, AMQMethodBodyImpl method) throws AMQException
{
if (method instanceof ConnectionRedirectBody)
{
@@ -200,14 +200,14 @@
private void handleFrame(AMQFrame frame) throws AMQException
{
- AMQBody body = frame.getBodyFrame();
- if (body instanceof AMQMethodBody)
+ AMQBodyImpl body = frame.getBodyFrame();
+ if (body instanceof AMQMethodBodyImpl)
{
- handleMethod(frame.getChannel(), (AMQMethodBody) body);
+ handleMethod(frame.getChannel(), (AMQMethodBodyImpl) body);
}
else
{
- throw new AMQUnexpectedBodyTypeException(AMQMethodBody.class, body);
+ throw new AMQUnexpectedBodyTypeException(AMQMethodBodyImpl.class, body);
}
}
Modified: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java Thu Mar 22 06:14:42 2007
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.server.cluster;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
public interface ResponseHandler
{
- public void responded(AMQMethodBody response);
+ public void responded(AMQMethodBodyImpl response);
public void removed();
}
Modified: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java Thu Mar 22 06:14:42 2007
@@ -21,14 +21,12 @@
package org.apache.qpid.server.cluster;
import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.IllegalStateTransitionException;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -74,7 +72,7 @@
}
}
- protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState state, B frame) throws IllegalStateTransitionException
+ protected <B extends AMQMethodBodyImpl> StateAwareMethodListener<B> findStateTransitionHandler(AMQState state, B frame) throws IllegalStateTransitionException
{
MethodHandlerRegistry registry = _handlers.get(state);
StateAwareMethodListener<B> handler = (registry == null) ? null : registry.getHandler(frame);
@@ -85,7 +83,7 @@
return handler;
}
- <A extends AMQMethodBody, B extends Class<A>> void addHandler(AMQState state, B type, StateAwareMethodListener<A> handler)
+ <A extends AMQMethodBodyImpl, B extends Class<A>> void addHandler(AMQState state, B type, StateAwareMethodListener<A> handler)
{
MethodHandlerRegistry registry = _handlers.get(state);
if (registry == null)
Modified: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java Thu Mar 22 06:14:42 2007
@@ -18,16 +18,16 @@
package org.apache.qpid.server.cluster;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQBodyImpl;
import org.apache.qpid.framing.AMQFrame;
/**
*/
public class SimpleBodySendable implements Sendable
{
- private final AMQBody _body;
+ private final AMQBodyImpl _body;
- public SimpleBodySendable(AMQBody body)
+ public SimpleBodySendable(AMQBodyImpl body)
{
_body = body;
}
Modified: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -21,18 +21,14 @@
package org.apache.qpid.server.cluster.handler;
import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import java.util.List;
import java.util.ArrayList;
-public class ChainedClusterMethodHandler <A extends AMQMethodBody> extends ClusterMethodHandler<A>
+public class ChainedClusterMethodHandler <A extends AMQMethodBodyImpl> extends ClusterMethodHandler<A>
{
private final List<ClusterMethodHandler<A>> _handlers;
Modified: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -20,17 +20,15 @@
*/
package org.apache.qpid.server.cluster.handler;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.AMQException;
-public abstract class ClusterMethodHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A>
+public abstract class ClusterMethodHandler<A extends AMQMethodBodyImpl> implements StateAwareMethodListener<A>
{
public final void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
Modified: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java Thu Mar 22 06:14:42 2007
@@ -222,17 +222,17 @@
}
}
- private <B extends AMQMethodBody> ReplicatingHandler<B> replicated(StateAwareMethodListener<B> handler)
+ private <B extends AMQMethodBodyImpl> ReplicatingHandler<B> replicated(StateAwareMethodListener<B> handler)
{
return new ReplicatingHandler<B>(_groupMgr, handler);
}
- private <B extends AMQMethodBody> StateAwareMethodListener<B> alternate(StateAwareMethodListener<B> peer, StateAwareMethodListener<B> client)
+ private <B extends AMQMethodBodyImpl> StateAwareMethodListener<B> alternate(StateAwareMethodListener<B> peer, StateAwareMethodListener<B> client)
{
return new PeerHandler<B>(peer, client);
}
- private <B extends AMQMethodBody> StateAwareMethodListener<B> chain(ClusterMethodHandler<B>... h)
+ private <B extends AMQMethodBodyImpl> StateAwareMethodListener<B> chain(ClusterMethodHandler<B>... h)
{
return new ChainedClusterMethodHandler<B>(h);
}
Modified: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java Thu Mar 22 06:14:42 2007
@@ -21,15 +21,12 @@
package org.apache.qpid.server.cluster.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-class ExtendedHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A>
+class ExtendedHandler<A extends AMQMethodBodyImpl> implements StateAwareMethodListener<A>
{
private final StateAwareMethodListener<A> _base;
Modified: incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java (original)
+++ incubator/qpid/branches/java.multi_version/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java Thu Mar 22 06:14:42 2007
@@ -21,15 +21,12 @@
package org.apache.qpid.server.cluster.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.framing.AMQMethodBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-public class NullListener<T extends AMQMethodBody> implements StateAwareMethodListener<T>
+public class NullListener<T extends AMQMethodBodyImpl> implements StateAwareMethodListener<T>
{
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<T> evt) throws AMQException
{