You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/03/22 14:15:14 UTC
svn commit: r521253 [8/10] - in /incubator/qpid/branches/java.multi_version:
./ gentools/src/org/apache/qpid/gentools/ gentools/templ.cpp/
gentools/templ.cpp/class/ gentools/templ.cpp/field/
gentools/templ.cpp/method/ gentools/templ.cpp/model/ gentools...
Modified: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Mar 22 06:14:42 2007
@@ -33,6 +33,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
@@ -42,6 +43,8 @@
import org.apache.qpid.framing.BasicCancelBody;
import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQMethodFactory;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
@@ -438,16 +441,10 @@
{
if (sendClose)
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
- _consumerTag, // consumerTag
- false); // nowait
-
try
{
- _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+ AMQMethodBody cancelBody = getAMQMethodFactory().createConsumerCancel(_consumerTag);
+ sendCommandReceiveResponse(cancelBody);
}
catch (AMQException e)
{
@@ -467,6 +464,16 @@
}
}
+ private AMQMethodBody sendCommandReceiveResponse(AMQMethodBody cancelBody) throws AMQException
+ {
+ return getSession().sendCommandReceiveResponse(cancelBody);
+ }
+
+ private AMQMethodFactory getAMQMethodFactory()
+ {
+ return getSession().getAMQMethodFactory();
+ }
+
/**
* Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has
* vetoed automatic resubscription. The caller must hold the failover mutex.
@@ -490,14 +497,14 @@
if (debug)
{
- _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().deliveryTag);
+ _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().getDeliveryTag());
}
try
{
- AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
- messageFrame.getDeliverBody().redelivered,
- messageFrame.getDeliverBody().exchange,
- messageFrame.getDeliverBody().routingKey,
+ AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliverBody().getDeliveryTag(),
+ messageFrame.getDeliverBody().getRedelivered(),
+ messageFrame.getDeliverBody().getExchange(),
+ messageFrame.getDeliverBody().getRoutingKey(),
messageFrame.getContentHeader(),
messageFrame.getBodies());
Modified: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Thu Mar 22 06:14:42 2007
@@ -38,17 +38,11 @@
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.CompositeAMQDataBlock;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl;
+import org.apache.qpid.framing.*;
public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
@@ -91,7 +85,7 @@
*/
private String _mimeType;
- private AMQProtocolHandler _protocolHandler;
+ private AMQProtocolHandlerImpl _protocolHandler;
/**
* True if this producer was created from a transacted session
@@ -121,7 +115,7 @@
private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
+ AMQSession session, AMQProtocolHandlerImpl protocolHandler, long producerId,
boolean immediate, boolean mandatory, boolean waitUntilSent)
{
_connection = connection;
@@ -152,20 +146,28 @@
private void declareDestination(AMQDestination destination)
{
// Declare the exchange
- // Note that the durable and internal arguments are ignored since passive is set to false
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame declare =
- ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), null, // arguments
- false, // autoDelete
- false, // durable
- destination.getExchangeName(), // exchange
- false, // internal
- true, // nowait
- false, // passive
- _session.getTicket(), // ticket
- destination.getExchangeClass()); // type
- _protocolHandler.writeFrame(declare);
+
+ ExchangeDeclareBody exchangeDeclareBody =
+ getAMQMethodFactory().createExchangeDeclare(destination.getExchangeName(),
+ destination.getExchangeClass(),
+ _session.getTicket());
+ sendCommand(exchangeDeclareBody);
+
+ }
+
+ private void sendCommand(AMQMethodBody command)
+ {
+ getSession().sendCommand(command);
+ }
+
+ private AMQMethodBody sendCommandReceiveResponse(AMQMethodBody command) throws AMQException
+ {
+ return getSession().sendCommandReceiveResponse(command);
+ }
+
+ private AMQMethodFactory getAMQMethodFactory()
+ {
+ return getSession().getAMQMethodFactory();
}
public void setDisableMessageID(boolean b) throws JMSException
@@ -467,21 +469,9 @@
message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame publishFrame =
- BasicPublishBody.createAMQFrame(
- _channelId, _protocolHandler.getProtocolMajorVersion(), _protocolHandler.getProtocolMinorVersion(),
- destination.getExchangeName(), // exchange
- immediate, // immediate
- mandatory, // mandatory
- destination.getRoutingKey(), // routingKey
- _session.getTicket()); // ticket
-
message.prepareForSending();
ByteBuffer payload = message.getData();
- BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
+ CommonContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
if (!_disableTimestamps)
{
@@ -501,37 +491,15 @@
contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
contentHeaderProperties.setPriority((byte) priority);
- final int size = (payload != null) ? payload.limit() : 0;
- final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
- final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
+ getSession().getProtocolOutputHandler().publishMessage(getSession().getChannelId(),
+ destination.getExchangeName(),
+ destination.getRoutingKey(),
+ immediate,
+ mandatory,
+ payload,
+ contentHeaderProperties,
+ getSession().getTicket());
- if (payload != null)
- {
- createContentBodies(payload, frames, 2, _channelId);
- }
-
- if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
- {
- _logger.debug("Sending content body frames to " + destination);
- }
-
- // weight argument of zero indicates no child content headers, just bodies
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- AMQFrame contentHeaderFrame =
- ContentHeaderBody.createAMQFrame(_channelId,
- BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion()), 0,
- contentHeaderProperties, size);
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Sending content header frame to " + destination);
- }
-
- frames[0] = publishFrame;
- frames[1] = contentHeaderFrame;
- CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
- _protocolHandler.writeFrame(compositeFrame, wait);
if (message != origMessage)
{
@@ -544,6 +512,8 @@
}
}
+
+
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
{
if (destination instanceof TemporaryDestination)
@@ -564,59 +534,6 @@
}
}
- /**
- * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
- * maximum frame size.
- *
- * @param payload
- * @param frames
- * @param offset
- * @param channelId @return the array of content bodies
- */
- private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
- {
-
- if (frames.length == (offset + 1))
- {
- frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
- }
- else
- {
-
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
- long remaining = payload.remaining();
- for (int i = offset; i < frames.length; i++)
- {
- payload.position((int) framePayloadMax * (i - offset));
- int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
- payload.limit(payload.position() + length);
- frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
-
- remaining -= length;
- }
- }
-
- }
-
- private int calculateContentBodyFrameCount(ByteBuffer payload)
- {
- // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
- // (0xCE byte).
- int frameCount;
- if ((payload == null) || (payload.remaining() == 0))
- {
- frameCount = 0;
- }
- else
- {
- int dataLength = payload.remaining();
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
- int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
- frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
- }
-
- return frameCount;
- }
public void setMimeType(String mimeType) throws JMSException
{
Modified: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Thu Mar 22 06:14:42 2007
@@ -25,7 +25,7 @@
import org.apache.log4j.Logger;
import org.apache.mina.common.IoSession;
import org.apache.qpid.AMQDisconnectedException;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl;
import org.apache.qpid.client.state.AMQStateManager;
/**
@@ -42,7 +42,7 @@
private static final Logger _logger = Logger.getLogger(FailoverHandler.class);
private final IoSession _session;
- private AMQProtocolHandler _amqProtocolHandler;
+ private AMQProtocolHandlerImpl _amqProtocolHandler;
/**
* Used where forcing the failover host
@@ -54,7 +54,7 @@
*/
private int _port;
- public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session)
+ public FailoverHandler(AMQProtocolHandlerImpl amqProtocolHandler, IoSession session)
{
_amqProtocolHandler = amqProtocolHandler;
_session = session;
Added: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ChannelCloseMethodHandler.java?view=auto&rev=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ChannelCloseMethodHandler.java (added)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ChannelCloseMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -0,0 +1,38 @@
+package org.apache.qpid.client.handler.amqp_0_9;
+
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.amqp_0_9.ChannelCloseOkBodyImpl;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Channel-close handler for the amqp_0_9 package. It extends the amqp_8_0 handler of the same name and
+ * replaces only the factory method, so the close-ok reply body is the amqp_0_9 implementation class.
+ */
+public class ChannelCloseMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ChannelCloseMethodHandler
+{
+    private static final Logger _logger = Logger.getLogger(ChannelCloseMethodHandler.class);
+
+    /** Shared instance handed out by getInstance(); final so it cannot be reassigned after class initialisation. */
+    private static final ChannelCloseMethodHandler _handler = new ChannelCloseMethodHandler();
+
+    /**
+     * Returns the shared handler instance.
+     *
+     * @return the single instance created when this class is initialised
+     */
+    public static ChannelCloseMethodHandler getInstance()
+    {
+        return _handler;
+    }
+
+    /**
+     * Creates the body used for the channel close-ok reply.
+     *
+     * @return a new amqp_0_9 ChannelCloseOkBodyImpl
+     */
+    protected ChannelCloseOkBody createChannelCloseOkBody()
+    {
+        return new ChannelCloseOkBodyImpl();
+    }
+}
Added: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionCloseMethodHandler.java?view=auto&rev=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionCloseMethodHandler.java (added)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionCloseMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -0,0 +1,45 @@
+package org.apache.qpid.client.handler.amqp_0_9;
+
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.amqp_0_9.ConnectionCloseOkBodyImpl;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Connection-close handler for the amqp_0_9 package. It extends the amqp_8_0 handler of the same name and
+ * supplies the amqp_0_9 close-ok body through createConnectionCloseOkBody().
+ */
+public class ConnectionCloseMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ConnectionCloseMethodHandler
+{
+    private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class);
+
+    /** Shared instance handed out by getInstance(); final so it cannot be reassigned after class initialisation. */
+    private static final ConnectionCloseMethodHandler _handler = new ConnectionCloseMethodHandler();
+
+    /**
+     * Returns the shared handler instance.
+     *
+     * @return the single instance created when this class is initialised
+     */
+    public static ConnectionCloseMethodHandler getInstance()
+    {
+        return _handler;
+    }
+
+    /**
+     * Protected rather than public: outside this package only subclasses can construct a handler.
+     */
+    protected ConnectionCloseMethodHandler()
+    {
+    }
+
+    /**
+     * Creates the body used for the connection close-ok reply.
+     *
+     * @return a new amqp_0_9 ConnectionCloseOkBodyImpl
+     */
+    protected ConnectionCloseOkBody createConnectionCloseOkBody()
+    {
+        return new ConnectionCloseOkBodyImpl();
+    }
+}
Added: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionSecureMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionSecureMethodHandler.java?view=auto&rev=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionSecureMethodHandler.java (added)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionSecureMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -0,0 +1,44 @@
+package org.apache.qpid.client.handler.amqp_0_9;
+
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.amqp_0_9.ConnectionSecureOkBodyImpl;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+/**
+ * Connection-secure handler for the amqp_0_9 package. It extends the amqp_8_0 handler of the same name and
+ * supplies the secure-ok body through createConnectionSecureOkBody().
+ */
+public class ConnectionSecureMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ConnectionSecureMethodHandler
+{
+    /** Shared instance handed out by getInstance(). */
+    private static final ConnectionSecureMethodHandler _instance = new ConnectionSecureMethodHandler();
+
+    /**
+     * Returns the shared handler instance.
+     *
+     * @return the single instance created when this class is initialised
+     */
+    public static ConnectionSecureMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    /**
+     * Creates the secure-ok body from the amqp_0_9 framing package, matching the other handlers in this package.
+     *
+     * @param response bytes handed unchanged to the body constructor
+     * @return a new amqp_0_9 ConnectionSecureOkBodyImpl wrapping the response
+     */
+    protected ConnectionSecureOkBody createConnectionSecureOkBody(byte[] response)
+    {
+        return new ConnectionSecureOkBodyImpl(response);
+    }
+}
Added: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionTuneMethodHandler.java?view=auto&rev=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionTuneMethodHandler.java (added)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionTuneMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -0,0 +1,59 @@
+package org.apache.qpid.client.handler.amqp_0_9;
+
+import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.amqp_0_9.ConnectionOpenBodyImpl;
+import org.apache.qpid.framing.amqp_0_9.ConnectionTuneOkBodyImpl;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Connection-tune handler for the amqp_0_9 package. It extends the amqp_8_0 handler of the same name and
+ * builds the connection-open and tune-ok bodies from the amqp_0_9 framing classes.
+ */
+public class ConnectionTuneMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ConnectionTuneMethodHandler
+{
+    private static final Logger _logger = Logger.getLogger(ConnectionTuneMethodHandler.class);
+
+    private static final ConnectionTuneMethodHandler _instance = new ConnectionTuneMethodHandler();
+
+    /**
+     * Returns the shared handler instance.
+     *
+     * @return the single instance created when this class is initialised
+     */
+    public static ConnectionTuneMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    /**
+     * Builds the connection-open body.
+     *
+     * @param path         passed as the virtualHost argument
+     * @param capabilities passed as the capabilities argument
+     * @param insist       passed as the insist argument
+     * @return a new amqp_0_9 ConnectionOpenBodyImpl
+     */
+    protected ConnectionOpenBody createConnectionOpenBody(AMQShortString path, AMQShortString capabilities, boolean insist)
+    {
+        final ConnectionOpenBody openBody = new ConnectionOpenBodyImpl(path, capabilities, insist);
+        return openBody;
+    }
+
+    /**
+     * Builds the tune-ok body from the given tune parameters.
+     *
+     * @param params source of the channelMax, frameMax and heartbeat values
+     * @return a new amqp_0_9 ConnectionTuneOkBodyImpl
+     */
+    protected ConnectionTuneOkBody createTuneOkBody(ConnectionTuneParameters params)
+    {
+        // Arguments are channelMax, frameMax, heartbeat; be aware the order may change between versions.
+        final ConnectionTuneOkBody tuneOkBody =
+            new ConnectionTuneOkBodyImpl(params.getChannelMax(), params.getFrameMax(), params.getHeartbeat());
+        return tuneOkBody;
+    }
+}
Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -45,10 +45,12 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
+
_logger.debug("New BasicCancelOk method received");
+ final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod();
- protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag);
+ protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.getConsumerTag());
}
}
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -40,8 +40,9 @@
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
+ final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), (BasicDeliverBody) evt.getMethod());
_logger.debug("New JmsDeliver method received");
protocolSession.unprocessedMessageReceived(msg);
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -40,9 +40,10 @@
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
_logger.debug("New JmsBounce method received");
+ final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(),(BasicReturnBody)evt.getMethod());
protocolSession.unprocessedMessageReceived(msg);
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQChannelClosedException;
@@ -29,10 +29,10 @@
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.amqp_8_0.ChannelCloseOkBodyImpl;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -47,21 +47,23 @@
return _handler;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
_logger.debug("ChannelClose method received");
+ final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
- AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
- AMQShortString reason = method.replyText;
+ AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
+ AMQShortString reason = method.getReplyText();
if (_logger.isDebugEnabled())
{
_logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
}
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor());
- protocolSession.writeFrame(frame);
+ protocolSession.getOutputHandler().sendCommand(evt.getChannelId(), createChannelCloseOkBody());
+
+
+
if (errorCode != AMQConstant.REPLY_SUCCESS)
{
if (_logger.isDebugEnabled())
@@ -95,5 +97,10 @@
}
protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason));
+ }
+
+ protected ChannelCloseOkBody createChannelCloseOkBody()
+ {
+ return new ChannelCloseOkBodyImpl();
}
}
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,11 +18,10 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -38,7 +37,7 @@
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
_logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,11 +18,10 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ChannelFlowOkBody;
@@ -42,9 +41,9 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
ChannelFlowOkBody method = (ChannelFlowOkBody) evt.getMethod();
- _logger.debug("Received Channel.Flow-Ok message, active = " + method.active);
+ _logger.debug("Received Channel.Flow-Ok message, active = " + method.getActive());
}
}
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionClosedException;
@@ -31,6 +31,7 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.amqp_8_0.ConnectionCloseOkBodyImpl;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -45,26 +46,31 @@
return _handler;
}
- private ConnectionCloseMethodHandler()
+ protected ConnectionCloseMethodHandler()
{
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
_logger.info("ConnectionClose frame received");
+ final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
ConnectionCloseBody method = (ConnectionCloseBody) evt.getMethod();
// does it matter
//stateManager.changeState(AMQState.CONNECTION_CLOSING);
- AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
- AMQShortString reason = method.replyText;
+ AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
+ AMQShortString reason = method.getReplyText();
try
{
- // TODO: check whether channel id of zero is appropriate
- // Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short) 0, method.getMajor(), method.getMinor()));
+
+
+
+ ConnectionCloseOkBody closeOkBody = createConnectionCloseOkBody();
+ protocolSession.getOutputHandler().sendCommand(0, closeOkBody);
+
+
if (errorCode != AMQConstant.REPLY_SUCCESS)
{
@@ -97,4 +103,10 @@
stateManager.changeState(AMQState.CONNECTION_CLOSED);
}
}
+
+ protected ConnectionCloseOkBody createConnectionCloseOkBody()
+ {
+ return new ConnectionCloseOkBodyImpl();
+ }
+
}
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,10 +18,9 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
@@ -40,7 +39,7 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
stateManager.changeState(AMQState.CONNECTION_OPEN);
}
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -45,12 +45,13 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
_logger.info("ConnectionRedirect frame received");
+ final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
ConnectionRedirectBody method = (ConnectionRedirectBody) evt.getMethod();
- String host = method.host.toString();
+ String host = method.getHost().toString();
// the host is in the form hostname:port with the port being optional
int portIndex = host.indexOf(':');
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
@@ -27,9 +27,9 @@
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ConnectionSecureBody;
import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.amqp_8_0.ConnectionSecureOkBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
public class ConnectionSecureMethodHandler implements StateAwareMethodListener
@@ -41,8 +41,9 @@
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
+ final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
SaslClient client = protocolSession.getSaslClient();
if (client == null)
{
@@ -54,14 +55,10 @@
try
{
// Evaluate server challenge
- byte[] response = client.evaluateChallenge(body.challenge);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(),
- body.getMajor(), body.getMinor(),
- response); // response
- protocolSession.writeFrame(responseFrame);
+ byte[] response = client.evaluateChallenge(body.getChallenge());
+
+ ConnectionSecureOkBody secureOkBody = createConnectionSecureOkBody(response);
+ protocolSession.getOutputHandler().sendCommand(evt.getChannelId(),secureOkBody);
}
catch (SaslException e)
{
@@ -69,5 +66,10 @@
}
+ }
+
+ protected ConnectionSecureOkBody createConnectionSecureOkBody(byte[] response)
+ {
+ return new ConnectionSecureOkBodyImpl(response);
}
}
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java (from r511389, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java&r1=511389&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import java.io.UnsupportedEncodingException;
import java.util.HashSet;
@@ -45,6 +45,7 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.amqp_8_0.ConnectionStartOkBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
public class ConnectionStartMethodHandler implements StateAwareMethodListener
@@ -61,15 +62,16 @@
private ConnectionStartMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
throws AMQException
{
_log.debug("public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, "
+ "AMQMethodEvent evt): called");
+ final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
ConnectionStartBody body = (ConnectionStartBody) evt.getMethod();
- ProtocolVersion pv = new ProtocolVersion((byte) body.versionMajor,(byte) body.versionMinor);
+ ProtocolVersion pv = new ProtocolVersion((byte) body.getVersionMajor(),(byte) body.getVersionMinor());
// For the purposes of interop, we can make the client accept the broker's version string.
@@ -83,26 +85,26 @@
if (pv.isSupported())
{
- protocolSession.setProtocolVersion(pv.getMajorVersion(), pv.getMinorVersion());
+ protocolSession.setProtocolVersion(pv);
try
{
// Used to hold the SASL mechanism to authenticate with.
String mechanism;
- if (body.mechanisms == null)
+ if (body.getMechanisms() == null)
{
throw new AMQException("mechanism not specified in ConnectionStart method frame");
}
else
{
- mechanism = chooseMechanism(body.mechanisms);
+ mechanism = chooseMechanism(body.getMechanisms());
_log.debug("mechanism = " + mechanism);
}
if (mechanism == null)
{
- throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms));
+ throw new AMQException("No supported security mechanism found, passed: " + new String(body.getMechanisms()));
}
byte[] saslResponse;
@@ -128,12 +130,12 @@
throw new AMQException("Unable to create SASL client: " + e, e);
}
- if (body.locales == null)
+ if (body.getLocales() == null)
{
throw new AMQException("Locales is not defined in Connection Start method");
}
- final String locales = new String(body.locales, "utf8");
+ final String locales = new String(body.getLocales(), "utf8");
final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
String selectedLocale = null;
if (tokenizer.hasMoreTokens())
@@ -148,24 +150,19 @@
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
FieldTable clientProperties = FieldTableFactory.newFieldTable();
- clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()),
+ clientProperties.setString(ClientProperties.instance.getName(),
protocolSession.getClientID());
- clientProperties.setString(new AMQShortString(ClientProperties.product.toString()),
+ clientProperties.setString(ClientProperties.product.getName(),
QpidProperties.getProductName());
- clientProperties.setString(new AMQShortString(ClientProperties.version.toString()),
+ clientProperties.setString(ClientProperties.version.getName(),
QpidProperties.getReleaseVersion());
- clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo());
+ clientProperties.setString(ClientProperties.platform.getName(), getFullSystemInfo());
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
- protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
- clientProperties, // clientProperties
+ ConnectionStartOkBody startOkBody = createConnectionStartOkBody(clientProperties, // clientProperties
new AMQShortString(selectedLocale), // locale
new AMQShortString(mechanism), // mechanism
- saslResponse)); // response
+ saslResponse); // response
+ protocolSession.getOutputHandler().sendCommand(0, startOkBody);
}
catch (UnsupportedEncodingException e)
@@ -175,14 +172,19 @@
}
else
{
- _log.error("Broker requested Protocol [" + body.versionMajor + "-" + body.versionMinor
+ _log.error("Broker requested Protocol [" + body.getVersionMajor() + "-" + body.getVersionMinor()
+ "] which is not supported by this version of the client library");
protocolSession.closeProtocolSession();
}
}
-
+ private ConnectionStartOkBody createConnectionStartOkBody(FieldTable clientProperties, AMQShortString locale, AMQShortString mechanism, byte[] saslResponse)
+ {
+ return new ConnectionStartOkBodyImpl(clientProperties,mechanism,saslResponse,locale);
+ }
+
+
private String getFullSystemInfo()
{
StringBuffer fullSystemInfo = new StringBuffer();
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -27,11 +27,12 @@
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionOpenBody;
import org.apache.qpid.framing.ConnectionTuneBody;
import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.amqp_8_0.ConnectionOpenBodyImpl;
+import org.apache.qpid.framing.amqp_8_0.ConnectionTuneOkBodyImpl;
import org.apache.qpid.protocol.AMQMethodEvent;
public class ConnectionTuneMethodHandler implements StateAwareMethodListener
@@ -49,9 +50,10 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
_logger.debug("ConnectionTune frame received");
+ final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
ConnectionTuneBody frame = (ConnectionTuneBody) evt.getMethod();
ConnectionTuneParameters params = protocolSession.getConnectionTuneParameters();
@@ -60,36 +62,36 @@
params = new ConnectionTuneParameters();
}
- params.setFrameMax(frame.frameMax);
- params.setChannelMax(frame.channelMax);
- params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat));
+ params.setFrameMax(frame.getFrameMax());
+ params.setChannelMax(frame.getChannelMax());
+ params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat()));
protocolSession.setConnectionTuneParameters(params);
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
- protocolSession.writeFrame(createTuneOkFrame(evt.getChannelId(), params,frame.getMajor(), frame.getMinor()));
+ protocolSession.getOutputHandler().sendCommand(evt.getChannelId(),
+ createTuneOkBody(params));
String host = protocolSession.getAMQConnection().getVirtualHost();
AMQShortString virtualHost = new AMQShortString("/" + host);
- protocolSession.writeFrame(createConnectionOpenFrame(evt.getChannelId(), virtualHost, null, true,frame.getMajor(), frame.getMinor()));
+ protocolSession.getOutputHandler().sendCommand(evt.getChannelId(),
+ createConnectionOpenBody( virtualHost, null, true));
}
- protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities, boolean insist, byte major, byte minor)
+ protected ConnectionOpenBody createConnectionOpenBody(AMQShortString path, AMQShortString capabilities, boolean insist)
{
- // Be aware of possible changes to parameter order as versions change.
- return ConnectionOpenBody.createAMQFrame(channel,
- major, minor, // AMQP version (major, minor)
+
+ return new ConnectionOpenBodyImpl(path,// virtualHost
capabilities, // capabilities
- insist, // insist
- path); // virtualHost
+ insist); // insist
+
}
- protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params, byte major, byte minor)
+ protected ConnectionTuneOkBody createTuneOkBody(ConnectionTuneParameters params)
{
// Be aware of possible changes to parameter order as versions change.
- return ConnectionTuneOkBody.createAMQFrame(channel,
- major, minor,
+ return new ConnectionTuneOkBodyImpl(
params.getChannelMax(), // channelMax
params.getFrameMax(), // frameMax
params.getHeartbeat()); // heartbeat
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ExchangeBoundOkMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ExchangeBoundOkMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ExchangeBoundOkMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ExchangeBoundOkMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -15,11 +15,10 @@
* limitations under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ExchangeBoundOkBody;
@@ -42,13 +41,13 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
if (_logger.isDebugEnabled())
{
ExchangeBoundOkBody body = (ExchangeBoundOkBody) evt.getMethod();
- _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.replyCode + " text: " +
- body.replyText);
+ _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.getReplyCode() + " text: " +
+ body.getReplyText());
}
}
}
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ExchangeBoundOkMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/QueueDeleteOkMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/QueueDeleteOkMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/QueueDeleteOkMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/QueueDeleteOkMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -15,11 +15,10 @@
* limitations under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.QueueDeleteOkBody;
@@ -42,12 +41,12 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
if (_logger.isDebugEnabled())
{
QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod();
- _logger.debug("Received Queue.Delete-Ok message, message count: " + body.messageCount);
+ _logger.debug("Received Queue.Delete-Ok message, message count: " + body.getMessageCount());
}
}
}
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/QueueDeleteOkMethodHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=auto&rev=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (added)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Mar 22 06:14:42 2007
@@ -0,0 +1,36 @@
+package org.apache.qpid.client.protocol;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.state.AMQStateManager;
+
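+/**
+ * Client-side protocol handler contract: declares operations to write a
+ * frame, close a session and close the connection, plus accessors for the
+ * associated connection, state manager, protocol session, output handler
+ * and protocol version.
+ * Implemented by AMQProtocolHandlerImpl.
+ */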
+public interface AMQProtocolHandler
+{
+ void writeFrame(AMQDataBlock frame);
+
+ void closeSession(AMQSession session) throws AMQException;
+
+ void closeConnection() throws AMQException;
+
+ AMQConnection getConnection();
+
+ AMQStateManager getStateManager();
+
+ AMQProtocolSession getProtocolSession();
+
+ ProtocolOutputHandler getOutputHandler();
+
+ ProtocolVersion getProtocolVersion();
+}
Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java Thu Mar 22 06:14:42 2007
@@ -43,26 +43,16 @@
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.pool.ReadWriteThreadModel;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.ssl.SSLContextFactory;
-public class AMQProtocolHandler extends IoHandlerAdapter
+public class AMQProtocolHandlerImpl extends IoHandlerAdapter implements AMQProtocolHandler
{
- private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
+ private static final Logger _logger = Logger.getLogger(AMQProtocolHandlerImpl.class);
/**
* The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances
@@ -75,7 +65,7 @@
private AMQStateManager _stateManager = new AMQStateManager();
- private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
+
/**
* We create the failover handler when the session is created since it needs a reference to the IoSession in order
@@ -93,8 +83,9 @@
private CountDownLatch _failoverLatch;
private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
+ private static final int CONTROL_CHANNEL = 0;
- public AMQProtocolHandler(AMQConnection con)
+ public AMQProtocolHandlerImpl(AMQConnection con)
{
_connection = con;
}
@@ -125,25 +116,21 @@
}
- try
- {
- ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
- threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
- threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
- }
- catch (RuntimeException e)
- {
- e.printStackTrace();
- }
+ ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
+ threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
+ threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
+
_protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
+
+ // This starts the AMQP initiation by sending the AMQP Header
_protocolSession.init();
}
public void sessionOpened(IoSession session) throws Exception
{
- //System.setProperty("foo", "bar");
+
}
/**
@@ -272,15 +259,8 @@
public void propagateExceptionToWaiters(Exception e)
{
getStateManager().error(e);
- if (!_frameListeners.isEmpty())
- {
- final Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener ml = (AMQMethodListener) it.next();
- ml.error(e);
- }
- }
+ getProtocolSession().getOutputHandler().error(e);
+
}
private static int _messageReceivedCount;
@@ -290,7 +270,7 @@
final boolean debug = _logger.isDebugEnabled();
final long msgNumber = ++_messageReceivedCount;
if (debug && (msgNumber % 1000 == 0))
{
_logger.debug("Received " + _messageReceivedCount + " protocol messages");
}
@@ -303,45 +283,30 @@
switch (bodyFrame.getFrameType())
{
- case AMQMethodBody.TYPE:
+ case AMQMethodBodyImpl.TYPE:
if (debug)
{
_logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
}
- final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
+ final AMQMethodEvent<? extends AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
try
{
boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
- if (!_frameListeners.isEmpty())
- {
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
- }
- }
+ wasAnyoneInterested = getProtocolSession().getOutputHandler().methodReceived(evt) || wasAnyoneInterested;
+
if (!wasAnyoneInterested)
{
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners);
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
}
}
catch (AMQException e)
{
getStateManager().error(e);
- if (!_frameListeners.isEmpty())
- {
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- listener.error(e);
- }
- }
+ getProtocolSession().getOutputHandler().error(e);
exceptionCaught(session, e);
}
break;
@@ -380,7 +345,7 @@
final boolean debug = _logger.isDebugEnabled();
if (debug && (sentMessages % 1000 == 0))
{
_logger.debug("Sent " + _messagesOut + " protocol messages");
}
@@ -418,70 +383,7 @@
_protocolSession.writeFrame(frame);
}
- public void writeFrame(AMQDataBlock frame, boolean wait)
- {
- _protocolSession.writeFrame(frame, wait);
- }
-
- /**
- * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
- * calling getProtocolSession().write() then waiting for the response.
- *
- * @param frame
- * @param listener the blocking listener. Note the calling thread will block.
- */
- public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
- BlockingMethodFrameListener listener)
- throws AMQException
- {
- return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT);
- }
-
- /**
- * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
- * calling getProtocolSession().write() then waiting for the response.
- *
- * @param frame
- * @param listener the blocking listener. Note the calling thread will block.
- */
- public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
- BlockingMethodFrameListener listener, long timeout)
- throws AMQException
- {
- try
- {
- _frameListeners.add(listener);
- _protocolSession.writeFrame(frame);
-
- AMQMethodEvent e = listener.blockForFrame(timeout);
- return e;
- // When control resumes before this line, a reply will have been received
- // that matches the criteria defined in the blocking listener
- }
- catch (AMQException e)
- {
- throw e;
- }
- finally
- {
- // If we don't removeKey the listener then no-one will
- _frameListeners.remove(listener);
- }
-
- }
-
- /** More convenient method to write a frame and wait for it's response. */
- public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException
- {
- return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT);
- }
- /** More convenient method to write a frame and wait for it's response. */
- public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException
- {
- return writeCommandFrameAndWaitForReply(frame,
- new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout);
- }
/**
* Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol
@@ -519,20 +421,13 @@
{
getStateManager().changeState(AMQState.CONNECTION_CLOSING);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0,
- _protocolSession.getProtocolMajorVersion(),
- _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection.")); // replyText
try
{
- syncWrite(frame, ConnectionCloseOkBody.class, timeout);
+ ConnectionCloseBody closeBody = getAMQMethodFactory().createConnectionClose();
+ sendCommandReceiveResponse(CONTROL_CHANNEL,closeBody);
+
+
_protocolSession.closeProtocolSession();
}
catch (AMQTimeoutException e)
@@ -543,6 +438,16 @@
}
+ private void sendCommandReceiveResponse(int channelId, AMQMethodBody command) throws AMQException
+ {
+ getOutputHandler().sendCommandReceiveResponse(channelId, command);
+ }
+
+ private AMQMethodFactory getAMQMethodFactory()
+ {
+ return getOutputHandler().getAMQMethodFactory();
+ }
+
/** @return the number of bytes read from this protocol session */
public long getReadBytes()
{
@@ -607,6 +512,11 @@
return _protocolSession;
}
+ public ProtocolOutputHandler getOutputHandler()
+ {
+ return getProtocolSession().getOutputHandler();
+ }
+
FailoverState getFailoverState()
{
return _failoverState;
@@ -617,14 +527,11 @@
_failoverState = failoverState;
}
- public byte getProtocolMajorVersion()
+ public ProtocolVersion getProtocolVersion()
{
- return _protocolSession.getProtocolMajorVersion();
+ return _protocolSession.getProtocolVersion();
}
- public byte getProtocolMinorVersion()
- {
- return _protocolSession.getProtocolMinorVersion();
- }
+
}
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java
------------------------------------------------------------------------------
svn:keywords = Rev Date