You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/08 11:10:11 UTC
svn commit: r619823 [11/19] - in
/incubator/qpid/branches/thegreatmerge/qpid: ./ cpp/ dotnet/
dotnet/Qpid.Buffer.Tests/Properties/ dotnet/Qpid.Buffer/Properties/
dotnet/Qpid.Client.Tests/ dotnet/Qpid.Client.Tests/Channel/
dotnet/Qpid.Client.Tests/Commo...
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Fri Feb 8 02:09:37 2008
@@ -36,11 +36,18 @@
import javax.jms.TextMessage;
import javax.jms.Topic;
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.CompositeAMQDataBlock;
import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ExchangeDeclareBody;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +58,7 @@
private AMQConnection _connection;
/**
- * If true, messages will not get a timestamp.
+ * If true, messages will not get a timestamp.
*/
protected boolean _disableTimestamps;
@@ -103,7 +110,7 @@
private long _producerId;
/**
- * The session used to create this producer
+ * The session used to create this producer
*/
protected AMQSession _session;
@@ -118,8 +125,8 @@
private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
- boolean waitUntilSent)
+ AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
+ boolean waitUntilSent)
{
_connection = connection;
_destination = destination;
@@ -146,7 +153,24 @@
}
}
- public abstract void declareDestination(AMQDestination destination);
+ private void declareDestination(AMQDestination destination)
+ {
+ ExchangeDeclareBody body = getSession().getMethodRegistry().createExchangeDeclareBody(_session.getTicket(),
+ destination.getExchangeName(),
+ destination.getExchangeClass(),
+ false,
+ false,
+ false,
+ false,
+ true,
+ null);
+ // Declare the exchange
+ // Note that the durable and internal arguments are ignored since passive is set to false
+
+ AMQFrame declare = body.generateFrame(_channelId);
+
+ _protocolHandler.writeFrame(declare);
+ }
public void setDisableMessageID(boolean b) throws JMSException
{
@@ -181,7 +205,7 @@
if ((i != DeliveryMode.NON_PERSISTENT) && (i != DeliveryMode.PERSISTENT))
{
throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i
- + " is illegal");
+ + " is illegal");
}
_deliveryMode = i;
@@ -293,7 +317,7 @@
{
validateDestination(destination);
sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory,
- _immediate);
+ _immediate);
}
}
@@ -310,7 +334,7 @@
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
- boolean mandatory) throws JMSException
+ boolean mandatory) throws JMSException
{
checkPreConditions();
checkDestination(destination);
@@ -322,7 +346,7 @@
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate) throws JMSException
+ boolean mandatory, boolean immediate) throws JMSException
{
checkPreConditions();
checkDestination(destination);
@@ -334,7 +358,7 @@
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException
+ boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException
{
checkPreConditions();
checkDestination(destination);
@@ -342,7 +366,7 @@
{
validateDestination(destination);
sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate,
- waitUntilSent);
+ waitUntilSent);
}
}
@@ -388,7 +412,7 @@
else
{
throw new JMSException("Unable to send message, due to class conversion error: "
- + message.getClass().getName());
+ + message.getClass().getName());
}
}
}
@@ -398,14 +422,14 @@
if (!(destination instanceof AMQDestination))
{
throw new JMSException("Unsupported destination class: "
- + ((destination != null) ? destination.getClass() : null));
+ + ((destination != null) ? destination.getClass() : null));
}
declareDestination((AMQDestination) destination);
}
protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate) throws JMSException
+ boolean mandatory, boolean immediate) throws JMSException
{
sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
}
@@ -420,16 +444,27 @@
* @param timeToLive
* @param mandatory
* @param immediate
+ *
* @throws JMSException
*/
protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate, boolean wait) throws JMSException
+ boolean mandatory, boolean immediate, boolean wait) throws JMSException
{
checkTemporaryDestination(destination);
origMessage.setJMSDestination(destination);
AbstractJMSMessage message = convertToNativeMessage(origMessage);
+ if (_transacted)
+ {
+ if (_session.hasFailedOver() && _session.isDirty())
+ {
+ throw new JMSAMQException("Failover has occurred and session is dirty so unable to send.",
+ new AMQSessionDirtyException("Failover has occurred and session is dirty " +
+ "so unable to send."));
+ }
+ }
+
if (_disableMessageId)
{
message.setJMSMessageID(null);
@@ -458,8 +493,82 @@
// message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
- sendMessage(destination, origMessage, message, deliveryMode, priority, timeToLive,
- mandatory, immediate, wait);
+
+ BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
+ destination.getExchangeName(),
+ destination.getRoutingKey(),
+ mandatory,
+ immediate);
+
+ AMQFrame publishFrame = body.generateFrame(_channelId);
+
+ message.prepareForSending();
+ ByteBuffer payload = message.getData();
+ BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
+
+ if (!_disableTimestamps)
+ {
+ final long currentTime = System.currentTimeMillis();
+ contentHeaderProperties.setTimestamp(currentTime);
+
+ if (timeToLive > 0)
+ {
+ contentHeaderProperties.setExpiration(currentTime + timeToLive);
+ }
+ else
+ {
+ contentHeaderProperties.setExpiration(0);
+ }
+ }
+
+ contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
+ contentHeaderProperties.setPriority((byte) priority);
+
+ final int size = (payload != null) ? payload.limit() : 0;
+ final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
+ final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
+
+ if (payload != null)
+ {
+ createContentBodies(payload, frames, 2, _channelId);
+ }
+
+ if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
+ {
+ _logger.debug("Sending content body frames to " + destination);
+ }
+
+
+ // TODO: This is a hacky way of getting the AMQP class-id for the Basic class
+ int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
+
+ AMQFrame contentHeaderFrame =
+ ContentHeaderBody.createAMQFrame(_channelId,
+ classIfForBasic, 0, contentHeaderProperties, size);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Sending content header frame to " + destination);
+ }
+
+ frames[0] = publishFrame;
+ frames[1] = contentHeaderFrame;
+ CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+ _protocolHandler.writeFrame(compositeFrame, wait);
+
+ if (message != origMessage)
+ {
+ _logger.debug("Updating original message");
+ origMessage.setJMSPriority(message.getJMSPriority());
+ origMessage.setJMSTimestamp(message.getJMSTimestamp());
+ _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
+ origMessage.setJMSExpiration(message.getJMSExpiration());
+ origMessage.setJMSMessageID(message.getJMSMessageID());
+ }
+
+ if (_transacted)
+ {
+ _session.markDirty();
+ }
}
public abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, int deliveryMode,
@@ -485,6 +594,60 @@
}
}
+ /**
+ * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
+ * maximum frame size.
+ *
+ * @param payload
+ * @param frames
+ * @param offset
+ * @param channelId @return the array of content bodies
+ */
+ private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
+ {
+
+ if (frames.length == (offset + 1))
+ {
+ frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
+ }
+ else
+ {
+
+ final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ long remaining = payload.remaining();
+ for (int i = offset; i < frames.length; i++)
+ {
+ payload.position((int) framePayloadMax * (i - offset));
+ int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
+ payload.limit(payload.position() + length);
+ frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
+
+ remaining -= length;
+ }
+ }
+
+ }
+
+ private int calculateContentBodyFrameCount(ByteBuffer payload)
+ {
+ // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
+ // (0xCE byte).
+ int frameCount;
+ if ((payload == null) || (payload.remaining() == 0))
+ {
+ frameCount = 0;
+ }
+ else
+ {
+ int dataLength = payload.remaining();
+ final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
+ frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
+ }
+
+ return frameCount;
+ }
+
public void setMimeType(String mimeType) throws JMSException
{
checkNotClosed();
@@ -520,7 +683,7 @@
if ((_destination != null) && (suppliedDestination != null))
{
throw new UnsupportedOperationException(
- "This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
+ "This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
}
if (suppliedDestination == null)
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Fri Feb 8 02:09:37 2008
@@ -47,20 +47,20 @@
public void declareDestination(AMQDestination destination)
{
+ ExchangeDeclareBody body = getSession().getMethodRegistry().createExchangeDeclareBody(_session.getTicket(),
+ destination.getExchangeName(),
+ destination.getExchangeClass(),
+ false,
+ false,
+ false,
+ false,
+ true,
+ null);
// Declare the exchange
// Note that the durable and internal arguments are ignored since passive is set to false
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame declare =
- ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), null, // arguments
- false, // autoDelete
- false, // durable
- destination.getExchangeName(), // exchange
- false, // internal
- true, // nowait
- false, // passive
- _session.getTicket(), // ticket
- destination.getExchangeClass()); // type
+
+ AMQFrame declare = body.generateFrame(_channelId);
+
_protocolHandler.writeFrame(declare);
}
@@ -70,13 +70,13 @@
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame publishFrame =
- BasicPublishBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), destination.getExchangeName(), // exchange
- immediate, // immediate
- mandatory, // mandatory
- destination.getRoutingKey(), // routingKey
- _session.getTicket()); // ticket
+ BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
+ destination.getExchangeName(),
+ destination.getRoutingKey(),
+ mandatory,
+ immediate);
+
+ AMQFrame publishFrame = body.generateFrame(_channelId);
message.prepareForSending();
ByteBuffer payload = message.getData();
@@ -114,13 +114,17 @@
_logger.debug("Sending content body frames to " + destination);
}
+ // TODO: This is a hacky way of getting the AMQP class-id for the Basic class
+ int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
+
// weight argument of zero indicates no child content headers, just bodies
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+
AMQFrame contentHeaderFrame =
ContentHeaderBody.createAMQFrame(_channelId,
- BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion()), 0, contentHeaderProperties, size);
+ classIfForBasic, 0, contentHeaderProperties, size);
+
if (_logger.isDebugEnabled())
{
_logger.debug("Sending content header frame to " + destination);
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java Fri Feb 8 02:09:37 2008
@@ -29,9 +29,10 @@
public enum CustomJMSXProperty
{
JMS_AMQP_NULL,
- JMS_QPID_DESTTYPE,
+ JMS_QPID_DESTTYPE,
JMSXGroupID,
- JMSXGroupSeq;
+ JMSXGroupSeq,
+ JMSXUserID;
private final AMQShortString _nameAsShortString;
@@ -47,7 +48,7 @@
}
private static Enumeration _names;
-
+
public static synchronized Enumeration asEnumeration()
{
if(_names == null)
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java Fri Feb 8 02:09:37 2008
@@ -21,18 +21,18 @@
package org.apache.qpid.client;
import org.apache.qpid.AMQException;
-import org.apache.qpid.jms.TopicSubscriber;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
/**
* Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract
*
*/
-class TopicSubscriberAdaptor implements org.apache.qpid.jms.TopicSubscriber
+class TopicSubscriberAdaptor implements TopicSubscriber
{
private final Topic _topic;
private final BasicMessageConsumer _consumer;
Added: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java Fri Feb 8 02:09:37 2008
@@ -0,0 +1,36 @@
+package org.apache.qpid.client.handler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.protocol.AMQConstant;
+
+public class AccessRequestOkMethodHandler implements StateAwareMethodListener<AccessRequestOkBody>
+{
+ private static final Logger _logger = LoggerFactory.getLogger(AccessRequestOkMethodHandler.class);
+
+ private static AccessRequestOkMethodHandler _handler = new AccessRequestOkMethodHandler();
+
+ public static AccessRequestOkMethodHandler getInstance()
+ {
+ return _handler;
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AccessRequestOkBody method, int channelId)
+ throws AMQException
+ {
+ _logger.debug("AccessRequestOk method received");
+ final AMQProtocolSession session = stateManager.getProtocolSession();
+ session.setTicket(method.getTicket(), channelId);
+
+ }
+}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java Fri Feb 8 02:09:37 2008
@@ -25,12 +25,13 @@
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BasicCancelOkMethodHandler implements StateAwareMethodListener
+public class BasicCancelOkMethodHandler implements StateAwareMethodListener<BasicCancelOkBody>
{
private static final Logger _logger = LoggerFactory.getLogger(BasicCancelOkMethodHandler.class);
@@ -44,16 +45,18 @@
private BasicCancelOkMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+ public void methodReceived(AMQStateManager stateManager, BasicCancelOkBody body, int channelId)
throws AMQException
{
- BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod();
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+
if (_logger.isInfoEnabled())
{
- _logger.info("New BasicCancelOk method received for consumer:" + body.consumerTag);
+ _logger.info("New BasicCancelOk method received for consumer:" + body.getConsumerTag());
}
- protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag);
+ session.confirmConsumerCancelled(channelId, body.getConsumerTag());
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java Fri Feb 8 02:09:37 2008
@@ -30,7 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BasicDeliverMethodHandler implements StateAwareMethodListener
+public class BasicDeliverMethodHandler implements StateAwareMethodListener<BasicDeliverBody>
{
private static final Logger _logger = LoggerFactory.getLogger(BasicDeliverMethodHandler.class);
@@ -41,18 +41,12 @@
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+ public void methodReceived(AMQStateManager stateManager, BasicDeliverBody body, int channelId)
throws AMQException
{
- BasicDeliverBody deliveryBody = (BasicDeliverBody) evt.getMethod();
- final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(
- evt.getChannelId(),
- deliveryBody.deliveryTag,
- deliveryBody.consumerTag.asString(),
- deliveryBody.getExchange(),
- deliveryBody.getRoutingKey(),
- deliveryBody.getRedelivered());
+ final AMQProtocolSession session = stateManager.getProtocolSession();
+ final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(channelId, body);
_logger.debug("New JmsDeliver method received");
- protocolSession.unprocessedMessageReceived(msg);
+ session.unprocessedMessageReceived(msg);
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java Fri Feb 8 02:09:37 2008
@@ -21,16 +21,15 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.ReturnMessage;
+import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BasicReturnMethodHandler implements StateAwareMethodListener
+public class BasicReturnMethodHandler implements StateAwareMethodListener<BasicReturnBody>
{
private static final Logger _logger = LoggerFactory.getLogger(BasicReturnMethodHandler.class);
@@ -41,18 +40,16 @@
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+ public void methodReceived(AMQStateManager stateManager, BasicReturnBody body, int channelId)
throws AMQException
{
- BasicReturnBody returnBody = (BasicReturnBody)evt.getMethod();
_logger.debug("New JmsBounce method received");
- final ReturnMessage msg = new ReturnMessage(evt.getChannelId(),
- returnBody.getExchange(),
- returnBody.getRoutingKey(),
- returnBody.getReplyText(),
- returnBody.getReplyCode()
- );
+ final AMQProtocolSession session = stateManager.getProtocolSession();
+ /** FIXME: TGM AS SRSLY 4RL */
+ final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(channelId, body);
- protocolSession.unprocessedMessageReceived(msg);
+ session.unprocessedMessageReceived(msg);
}
+
+
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Fri Feb 8 02:09:37 2008
@@ -38,7 +38,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ChannelCloseMethodHandler implements StateAwareMethodListener
+public class ChannelCloseMethodHandler implements StateAwareMethodListener<ChannelCloseBody>
{
private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseMethodHandler.class);
@@ -49,22 +49,26 @@
return _handler;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+ public void methodReceived(AMQStateManager stateManager, ChannelCloseBody method, int channelId)
throws AMQException
{
_logger.debug("ChannelClose method received");
- ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
- AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
- AMQShortString reason = method.replyText;
+
+ AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
+ AMQShortString reason = method.getReplyText();
if (_logger.isDebugEnabled())
{
_logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
}
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor());
- protocolSession.writeFrame(frame);
+
+
+ ChannelCloseOkBody body = session.getMethodRegistry().createChannelCloseOkBody();
+ AMQFrame frame = body.generateFrame(channelId);
+ session.writeFrame(frame);
+
if (errorCode != AMQConstant.REPLY_SUCCESS)
{
if (_logger.isDebugEnabled())
@@ -100,6 +104,6 @@
}
// fixme why is this only done when the close is expected...
// should the above forced closes not also cause a close?
- protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason));
+ session.channelClosed(channelId, errorCode, String.valueOf(reason));
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java Fri Feb 8 02:09:37 2008
@@ -21,6 +21,7 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
@@ -29,7 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ChannelCloseOkMethodHandler implements StateAwareMethodListener
+public class ChannelCloseOkMethodHandler implements StateAwareMethodListener<ChannelCloseOkBody>
{
private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseOkMethodHandler.class);
@@ -40,11 +41,12 @@
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+ public void methodReceived(AMQStateManager stateManager, ChannelCloseOkBody method, int channelId)
throws AMQException
{
- _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
+ _logger.info("Received channel-close-ok for channel-id " + channelId);
+ final AMQProtocolSession session = stateManager.getProtocolSession();
// todo this should do the local closure
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java Fri Feb 8 02:09:37 2008
@@ -30,7 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ChannelFlowOkMethodHandler implements StateAwareMethodListener
+public class ChannelFlowOkMethodHandler implements StateAwareMethodListener<ChannelFlowOkBody>
{
private static final Logger _logger = LoggerFactory.getLogger(ChannelFlowOkMethodHandler.class);
private static final ChannelFlowOkMethodHandler _instance = new ChannelFlowOkMethodHandler();
@@ -43,10 +43,12 @@
private ChannelFlowOkMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
- throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ChannelFlowOkBody body, int channelId)
+ throws AMQException
{
- ChannelFlowOkBody method = (ChannelFlowOkBody) evt.getMethod();
- _logger.debug("Received Channel.Flow-Ok message, active = " + method.active);
+
+ _logger.debug("Received Channel.Flow-Ok message, active = " + body.getActive());
}
+
+
}
Added: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java Fri Feb 8 02:09:37 2008
@@ -0,0 +1,528 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQMethodNotImplementedException;
+
+
+public class ClientMethodDispatcherImpl implements MethodDispatcher
+{
+
+
+ private static final BasicCancelOkMethodHandler _basicCancelOkMethodHandler = BasicCancelOkMethodHandler.getInstance();
+ private static final BasicDeliverMethodHandler _basicDeliverMethodHandler = BasicDeliverMethodHandler.getInstance();
+ private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance();
+ private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance();
+ private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance();
+ private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
+ private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance();
+ private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance();
+ private static final ConnectionSecureMethodHandler _connectionSecureMethodHandler = ConnectionSecureMethodHandler.getInstance();
+ private static final ConnectionStartMethodHandler _connectionStartMethodHandler = ConnectionStartMethodHandler.getInstance();
+ private static final ConnectionTuneMethodHandler _connectionTuneMethodHandler = ConnectionTuneMethodHandler.getInstance();
+ private static final ExchangeBoundOkMethodHandler _exchangeBoundOkMethodHandler = ExchangeBoundOkMethodHandler.getInstance();
+ private static final QueueDeleteOkMethodHandler _queueDeleteOkMethodHandler = QueueDeleteOkMethodHandler.getInstance();
+
+
+
+ private static interface DispatcherFactory
+ {
+ public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager);
+ }
+
+ private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
+ new HashMap<ProtocolVersion, DispatcherFactory>();
+
+ static
+ {
+ _dispatcherFactories.put(ProtocolVersion.v8_0,
+ new DispatcherFactory()
+ {
+ public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
+ {
+ return new ClientMethodDispatcherImpl_8_0(stateManager);
+ }
+ });
+
+ _dispatcherFactories.put(ProtocolVersion.v0_9,
+ new DispatcherFactory()
+ {
+ public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
+ {
+ return new ClientMethodDispatcherImpl_0_9(stateManager);
+ }
+ });
+
+ }
+
+
+ public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQStateManager stateManager)
+ {
+ DispatcherFactory factory = _dispatcherFactories.get(version);
+ return factory.createMethodDispatcher(stateManager);
+ }
+
+
+
+
+ private AMQStateManager _stateManager;
+
+ public ClientMethodDispatcherImpl(AMQStateManager stateManager)
+ {
+ _stateManager = stateManager;
+ }
+
+
+ public AMQStateManager getStateManager()
+ {
+ return _stateManager;
+ }
+
+ public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
+ {
+ _basicCancelOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException
+ {
+ _basicDeliverMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException
+ {
+ _basicReturnMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
+ {
+ _channelCloseMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
+ {
+ _channelFlowOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
+ {
+ _connectionCloseMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException
+ {
+ _connectionOpenOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException
+ {
+ _connectionRedirectMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException
+ {
+ _connectionSecureMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException
+ {
+ _connectionStartMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException
+ {
+ _connectionTuneMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException
+ {
+ _queueDeleteOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
+ {
+ _exchangeBoundOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ return true;
+ }
+
+ public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+}
Added: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java Fri Feb 8 02:09:37 2008
@@ -0,0 +1,155 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQMethodNotImplementedException;
+
+
+public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9
+{
+ public ClientMethodDispatcherImpl_0_9(AMQStateManager stateManager)
+ {
+ super(stateManager);
+ }
+
+
+ public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
+ {
+ throw new AMQMethodNotImplementedException(body);
+ }
+
+ public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+
+}
Added: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java Fri Feb 8 02:09:37 2008
@@ -0,0 +1,65 @@
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.state.AMQStateManager;
+
+public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0
+{
+ public ClientMethodDispatcherImpl_8_0(AMQStateManager stateManager)
+ {
+ super(stateManager);
+ }
+
+ public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Fri Feb 8 02:09:37 2008
@@ -36,7 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ConnectionCloseMethodHandler implements StateAwareMethodListener
+public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
{
private static final Logger _logger = LoggerFactory.getLogger(ConnectionCloseMethodHandler.class);
@@ -50,24 +50,26 @@
private ConnectionCloseMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
- throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody method, int channelId)
+ throws AMQException
{
_logger.info("ConnectionClose frame received");
- ConnectionCloseBody method = (ConnectionCloseBody) evt.getMethod();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
+
// does it matter
// stateManager.changeState(AMQState.CONNECTION_CLOSING);
- AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
- AMQShortString reason = method.replyText;
+ AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
+ AMQShortString reason = method.getReplyText();
try
{
+
+ ConnectionCloseOkBody closeOkBody = session.getMethodRegistry().createConnectionCloseOkBody();
// TODO: check whether channel id of zero is appropriate
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short) 0, method.getMajor(),
- method.getMinor()));
+ session.writeFrame(closeOkBody.generateFrame(0));
if (errorCode != AMQConstant.REPLY_SUCCESS)
{
@@ -75,7 +77,7 @@
{
_logger.info("Authentication Error:" + Thread.currentThread().getName());
- protocolSession.closeProtocolSession();
+ session.closeProtocolSession();
// todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
stateManager.changeState(AMQState.CONNECTION_NOT_STARTED);
@@ -94,9 +96,11 @@
{
// this actually closes the connection in the case where it is not an error.
- protocolSession.closeProtocolSession();
+ session.closeProtocolSession();
stateManager.changeState(AMQState.CONNECTION_CLOSED);
}
}
+
+
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java Fri Feb 8 02:09:37 2008
@@ -21,13 +21,14 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.protocol.AMQMethodEvent;
-public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener
+public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener<ConnectionOpenOkBody>
{
private static final ConnectionOpenOkMethodHandler _instance = new ConnectionOpenOkMethodHandler();
@@ -40,9 +41,11 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionOpenOkBody body, int channelId)
+ throws AMQException
{
stateManager.changeState(AMQState.CONNECTION_OPEN);
}
+
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java Fri Feb 8 02:09:37 2008
@@ -30,7 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ConnectionRedirectMethodHandler implements StateAwareMethodListener
+public class ConnectionRedirectMethodHandler implements StateAwareMethodListener<ConnectionRedirectBody>
{
private static final Logger _logger = LoggerFactory.getLogger(ConnectionRedirectMethodHandler.class);
@@ -46,13 +46,13 @@
private ConnectionRedirectMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
- throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionRedirectBody method, int channelId)
+ throws AMQException
{
_logger.info("ConnectionRedirect frame received");
- ConnectionRedirectBody method = (ConnectionRedirectBody) evt.getMethod();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
- String host = method.host.toString();
+ String host = method.getHost().toString();
// the host is in the form hostname:port with the port being optional
int portIndex = host.indexOf(':');
@@ -68,6 +68,7 @@
}
- protocolSession.failover(host, port);
+ session.failover(host, port);
}
+
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java Fri Feb 8 02:09:37 2008
@@ -32,7 +32,7 @@
import org.apache.qpid.framing.ConnectionSecureOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-public class ConnectionSecureMethodHandler implements StateAwareMethodListener
+public class ConnectionSecureMethodHandler implements StateAwareMethodListener<ConnectionSecureBody>
{
private static final ConnectionSecureMethodHandler _instance = new ConnectionSecureMethodHandler();
@@ -41,27 +41,26 @@
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionSecureBody body, int channelId)
+ throws AMQException
{
- SaslClient client = protocolSession.getSaslClient();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
+ SaslClient client = session.getSaslClient();
if (client == null)
{
throw new AMQException(null, "No SASL client set up - cannot proceed with authentication", null);
}
- ConnectionSecureBody body = (ConnectionSecureBody) evt.getMethod();
+
try
{
// Evaluate server challenge
- byte[] response = client.evaluateChallenge(body.challenge);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(),
- body.getMajor(), body.getMinor(),
- response); // response
- protocolSession.writeFrame(responseFrame);
+ byte[] response = client.evaluateChallenge(body.getChallenge());
+
+ ConnectionSecureOkBody secureOkBody = session.getMethodRegistry().createConnectionSecureOkBody(response);
+
+ session.writeFrame(secureOkBody.generateFrame(channelId));
}
catch (SaslException e)
{
@@ -70,4 +69,6 @@
}
+
+
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Fri Feb 8 02:09:37 2008
@@ -48,7 +48,7 @@
import java.util.HashSet;
import java.util.StringTokenizer;
-public class ConnectionStartMethodHandler implements StateAwareMethodListener
+public class ConnectionStartMethodHandler implements StateAwareMethodListener<ConnectionStartBody>
{
private static final Logger _log = LoggerFactory.getLogger(ConnectionStartMethodHandler.class);
@@ -62,15 +62,16 @@
private ConnectionStartMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
- throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionStartBody body, int channelId)
+ throws AMQException
{
_log.debug("public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, "
+ "AMQMethodEvent evt): called");
- ConnectionStartBody body = (ConnectionStartBody) evt.getMethod();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
+
- ProtocolVersion pv = new ProtocolVersion((byte) body.versionMajor, (byte) body.versionMinor);
+ ProtocolVersion pv = new ProtocolVersion((byte) body.getVersionMajor(), (byte) body.getVersionMinor());
// For the purposes of interop, we can make the client accept the broker's version string.
// If it does, it then internally records the version as being the latest one that it understands.
@@ -83,26 +84,26 @@
if (pv.isSupported())
{
- protocolSession.setProtocolVersion(pv.getMajorVersion(), pv.getMinorVersion());
+ session.setProtocolVersion(pv);
try
{
// Used to hold the SASL mechanism to authenticate with.
String mechanism;
- if (body.mechanisms == null)
+ if (body.getMechanisms()== null)
{
throw new AMQException(null, "mechanism not specified in ConnectionStart method frame", null);
}
else
{
- mechanism = chooseMechanism(body.mechanisms);
+ mechanism = chooseMechanism(body.getMechanisms());
_log.debug("mechanism = " + mechanism);
}
if (mechanism == null)
{
- throw new AMQException(null, "No supported security mechanism found, passed: " + new String(body.mechanisms), null);
+ throw new AMQException(null, "No supported security mechanism found, passed: " + new String(body.getMechanisms()), null);
}
byte[] saslResponse;
@@ -110,7 +111,7 @@
{
SaslClient sc =
Sasl.createSaslClient(new String[] { mechanism }, null, "AMQP", "localhost", null,
- createCallbackHandler(mechanism, protocolSession));
+ createCallbackHandler(mechanism, session));
if (sc == null)
{
throw new AMQException(null, "Client SASL configuration error: no SaslClient could be created for mechanism " + mechanism
@@ -118,21 +119,21 @@
+ " details of how to register non-standard SASL client providers.", null);
}
- protocolSession.setSaslClient(sc);
+ session.setSaslClient(sc);
saslResponse = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null);
}
catch (SaslException e)
{
- protocolSession.setSaslClient(null);
+ session.setSaslClient(null);
throw new AMQException(null, "Unable to create SASL client: " + e, e);
}
- if (body.locales == null)
+ if (body.getLocales() == null)
{
throw new AMQException(null, "Locales is not defined in Connection Start method", null);
}
- final String locales = new String(body.locales, "utf8");
+ final String locales = new String(body.getLocales(), "utf8");
final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
String selectedLocale = null;
if (tokenizer.hasMoreTokens())
@@ -148,23 +149,20 @@
FieldTable clientProperties = FieldTableFactory.newFieldTable();
clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()),
- protocolSession.getClientID());
+ session.getClientID());
clientProperties.setString(new AMQShortString(ClientProperties.product.toString()),
QpidProperties.getProductName());
clientProperties.setString(new AMQShortString(ClientProperties.version.toString()),
QpidProperties.getReleaseVersion());
clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo());
+
+ ConnectionStartOkBody connectionStartOkBody = session.getMethodRegistry().createConnectionStartOkBody(clientProperties,new AMQShortString(mechanism),saslResponse,new AMQShortString(locales));
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
- protocolSession.getProtocolMajorVersion(), protocolSession.getProtocolMinorVersion(),
- clientProperties, // clientProperties
- new AMQShortString(selectedLocale), // locale
- new AMQShortString(mechanism), // mechanism
- saslResponse)); // response
-
+ session.writeFrame(connectionStartOkBody.generateFrame(channelId));
+
}
catch (UnsupportedEncodingException e)
{
@@ -173,10 +171,10 @@
}
else
{
- _log.error("Broker requested Protocol [" + body.versionMajor + "-" + body.versionMinor
+ _log.error("Broker requested Protocol [" + body.getVersionMajor() + "-" + body.getVersionMinor()
+ "] which is not supported by this version of the client library");
- protocolSession.closeProtocolSession();
+ session.closeProtocolSession();
}
}
@@ -235,4 +233,5 @@
throw new AMQException(null, "Unable to create callback handler: " + e, e);
}
}
+
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java Fri Feb 8 02:09:37 2008
@@ -26,17 +26,13 @@
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ConnectionTuneMethodHandler implements StateAwareMethodListener
+public class ConnectionTuneMethodHandler implements StateAwareMethodListener<ConnectionTuneBody>
{
private static final Logger _logger = LoggerFactory.getLogger(ConnectionTuneMethodHandler.class);
@@ -50,48 +46,41 @@
protected ConnectionTuneMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
- throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionTuneBody frame, int channelId)
+ throws AMQException
{
_logger.debug("ConnectionTune frame received");
- ConnectionTuneBody frame = (ConnectionTuneBody) evt.getMethod();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
+ final MethodRegistry methodRegistry = session.getMethodRegistry();
- ConnectionTuneParameters params = protocolSession.getConnectionTuneParameters();
+
+ ConnectionTuneParameters params = session.getConnectionTuneParameters();
if (params == null)
{
params = new ConnectionTuneParameters();
}
- params.setFrameMax(frame.frameMax);
- params.setChannelMax(frame.channelMax);
- params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat));
- protocolSession.setConnectionTuneParameters(params);
+ params.setFrameMax(frame.getFrameMax());
+ params.setChannelMax(frame.getChannelMax());
+ params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat()));
+ session.setConnectionTuneParameters(params);
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
- protocolSession.writeFrame(createTuneOkFrame(evt.getChannelId(), params, frame.getMajor(), frame.getMinor()));
- String host = protocolSession.getAMQConnection().getVirtualHost();
+ ConnectionTuneOkBody tuneOkBody = methodRegistry.createConnectionTuneOkBody(params.getChannelMax(),
+ params.getFrameMax(),
+ params.getHeartbeat());
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(tuneOkBody.generateFrame(channelId));
+
+ String host = session.getAMQConnection().getVirtualHost();
AMQShortString virtualHost = new AMQShortString("/" + host);
- protocolSession.writeFrame(createConnectionOpenFrame(evt.getChannelId(), virtualHost, null, true, frame.getMajor(),
- frame.getMinor()));
- }
+ ConnectionOpenBody openBody = methodRegistry.createConnectionOpenBody(virtualHost,null,true);
- protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities,
- boolean insist, byte major, byte minor)
- {
// Be aware of possible changes to parameter order as versions change.
- return ConnectionOpenBody.createAMQFrame(channel, major, minor, // AMQP version (major, minor)
- capabilities, // capabilities
- insist, // insist
- path); // virtualHost
+ session.writeFrame(openBody.generateFrame(channelId));
}
- protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params, byte major, byte minor)
- {
- // Be aware of possible changes to parameter order as versions change.
- return ConnectionTuneOkBody.createAMQFrame(channel, major, minor, params.getChannelMax(), // channelMax
- params.getFrameMax(), // frameMax
- params.getHeartbeat()); // heartbeat
- }
+
}