You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/16 23:10:08 UTC
svn commit: r496873 - in /incubator/qpid/branches/qpid.0-9/java:
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/handler/
client/src/main/java/org/apache/qpid/client/protocol/
common/src/main/java/org/apache/qpi...
Author: kpvdr
Date: Tue Jan 16 14:10:07 2007
New Revision: 496873
URL: http://svn.apache.org/viewvc?view=rev&rev=496873
Log:
Created wiring to client RequestManagers and ResponseManagers, refactored all frame write code to use new write mechanisms.
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=496873&r1=496872&r2=496873
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Jan 16 14:10:07 2007
@@ -59,6 +59,7 @@
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.framing.MessageOkBody;
@@ -481,9 +482,8 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(
- ChannelOpenBody.createAMQFrame(channelId,
- (byte)0, (byte)9, // AMQP version (major, minor)
+ _protocolHandler.syncWrite(channelId,
+ ChannelOpenBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor)
null), // outOfBand
ChannelOpenOkBody.class);
@@ -491,9 +491,8 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(
- MessageQosBody.createAMQFrame(channelId,
- (byte)0, (byte)9, // AMQP version (major, minor)
+ _protocolHandler.syncWrite(channelId,
+ MessageQosBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor)
false, // global
prefetchHigh, // prefetchCount
0), // prefetchSize
@@ -508,7 +507,7 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte)0, (byte)9), TxSelectOkBody.class);
+ _protocolHandler.syncWrite(channelId, TxSelectBody.createMethodBody((byte)0, (byte)9), TxSelectOkBody.class);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=496873&r1=496872&r2=496873
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Jan 16 14:10:07 2007
@@ -68,6 +68,7 @@
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.ChannelFlowBody;
@@ -523,7 +524,7 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId, (byte)0, (byte)9), TxCommitOkBody.class);
+ _connection.getProtocolHandler().syncWrite(_channelId, TxCommitBody.createMethodBody((byte)0, (byte)9), TxCommitOkBody.class);
}
catch (AMQException e)
{
@@ -541,8 +542,8 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId, (byte)0, (byte)9), TxRollbackOkBody.class);
+ _connection.getProtocolHandler().syncWrite(_channelId,
+ TxRollbackBody.createMethodBody((byte)0, (byte)9), TxRollbackOkBody.class);
}
catch (AMQException e)
{
@@ -568,13 +569,13 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(),
+ final AMQMethodBody methodBody = ChannelCloseBody.createMethodBody(
(byte)0, (byte)9, // AMQP version (major, minor)
0, // classId
0, // methodId
AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
"JMS client closing channel"); // replyText
- _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
+ _connection.getProtocolHandler().syncWrite(getChannelId(), methodBody, ChannelCloseOkBody.class);
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully
@@ -766,9 +767,9 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().writeFrame(MessageRecoverBody.createAMQFrame(_channelId,
- (byte)0, (byte)9, // AMQP version (major, minor)
- false)); // requeue
+ _connection.getProtocolHandler().writeRequest(_channelId,
+ MessageRecoverBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor)
+ false)); // requeue
}
boolean isInRecovery()
@@ -1103,7 +1104,7 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId,
+ AMQMethodBody methodBody = ExchangeDeclareBody.createMethodBody(
(byte)0, (byte)9, // AMQP version (major, minor)
null, // arguments
false, // autoDelete
@@ -1114,7 +1115,7 @@
false, // passive
0, // ticket
type); // type
- _connection.getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
+ _connection.getProtocolHandler().syncWrite(_channelId, methodBody, ExchangeDeclareOkBody.class);
}
private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler)
@@ -1127,7 +1128,7 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
+ AMQMethodBody methodBody = ExchangeDeclareBody.createMethodBody(
(byte)0, (byte)9, // AMQP version (major, minor)
null, // arguments
false, // autoDelete
@@ -1138,7 +1139,7 @@
false, // passive
0, // ticket
type); // type
- protocolHandler.writeFrame(exchangeDeclare);
+ protocolHandler.writeRequest(_channelId, methodBody);
}
/**
@@ -1162,7 +1163,7 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
+ AMQMethodBody methodBody = QueueDeclareBody.createMethodBody(
(byte)0, (byte)9, // AMQP version (major, minor)
null, // arguments
amqd.isAutoDelete(), // autoDelete
@@ -1173,7 +1174,7 @@
amqd.getQueueName(), // queue
0); // ticket
- protocolHandler.writeFrame(queueDeclare);
+ protocolHandler.writeRequest(_channelId, methodBody);
return amqd.getQueueName();
}
@@ -1182,7 +1183,7 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
+ AMQMethodBody methodBody = QueueBindBody.createMethodBody(
(byte)0, (byte)9, // AMQP version (major, minor)
ft, // arguments
amqd.getExchangeName(), // exchange
@@ -1191,7 +1192,7 @@
amqd.getRoutingKey(), // routingKey
0); // ticket
- protocolHandler.writeFrame(queueBind);
+ protocolHandler.writeRequest(_channelId, methodBody);
}
/**
@@ -1241,7 +1242,7 @@
queueName, // queue
0); // ticket */
- AMQFrame jmsConsume = MessageConsumeBody.createAMQFrame(_channelId,
+ AMQMethodBody methodBody = MessageConsumeBody.createMethodBody(
(byte)0, (byte)9, // AMQP version (major, minor)
tag, // consumerTag
consumer.isExclusive(), // exclusive
@@ -1260,7 +1261,7 @@
protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
}*/
- protocolHandler.syncWrite(jmsConsume,MessageOkBody.class);
+ protocolHandler.syncWrite(_channelId, methodBody, MessageOkBody.class);
}
catch (AMQException e)
{
@@ -1432,14 +1433,14 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId,
+ AMQMethodBody methodBody = QueueDeleteBody.createMethodBody(
(byte)0, (byte)9, // AMQP version (major, minor)
false, // ifEmpty
false, // ifUnused
true, // nowait
queueName, // queue
0); // ticket
- _connection.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
+ _connection.getProtocolHandler().syncWrite(_channelId, methodBody, QueueDeleteOkBody.class);
}
catch (AMQException e)
{
@@ -1527,7 +1528,7 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
+ AMQMethodBody methodBody = ExchangeBoundBody.createMethodBody(
(byte)0, (byte)9, // AMQP version (major, minor)
ExchangeDefaults.TOPIC_EXCHANGE_NAME, // exchange
queueName, // queue
@@ -1535,7 +1536,7 @@
AMQMethodEvent response = null;
try
{
- response = _connection.getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+ response = _connection.getProtocolHandler().syncWrite(_channelId, methodBody, ExchangeBoundOkBody.class);
}
catch (AMQException e)
{
@@ -1586,19 +1587,19 @@
* @param multiple if true will acknowledge all messages up to and including the one specified by the
* delivery tag
*/
- public void acknowledgeMessage(long deliveryTag, boolean multiple)
+ public void acknowledgeMessage(long requestId, boolean multiple)
{
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- final AMQFrame ackFrame = MessageOkBody.createAMQFrame(_channelId,(byte)0, (byte)9); // AMQP version (major, minor)
+ final AMQMethodBody methodBody = MessageOkBody.createMethodBody((byte)0, (byte)9); // AMQP version (major, minor)
//deliveryTag, // deliveryTag
//multiple); // multiple
if (_logger.isDebugEnabled())
{
- _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
+ _logger.debug("Sending ack for request ID " + requestId + " on channel " + _channelId);
}
- _connection.getProtocolHandler().writeFrame(ackFrame);
+ _connection.getProtocolHandler().writeResponse(_channelId, requestId, methodBody);
}
public int getDefaultPrefetch()
@@ -1755,10 +1756,10 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
+ AMQMethodBody methodBody = ChannelFlowBody.createMethodBody(
(byte)0, (byte)9, // AMQP version (major, minor)
false); // active
- _connection.getProtocolHandler().writeFrame(channelFlowFrame);
+ _connection.getProtocolHandler().writeRequest(_channelId, methodBody);
}
private void unsuspendChannel()
@@ -1767,10 +1768,10 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
+ AMQMethodBody methodBody = ChannelFlowBody.createMethodBody(
(byte)0, (byte)9, // AMQP version (major, minor)
true); // active
- _connection.getProtocolHandler().writeFrame(channelFlowFrame);
+ _connection.getProtocolHandler().writeRequest(_channelId, methodBody);
}
public void confirmConsumerCancelled(String consumerTag)
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=496873&r1=496872&r2=496873
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Jan 16 14:10:07 2007
@@ -39,6 +39,7 @@
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MessageCancelBody;
import org.apache.qpid.framing.MessageOkBody;
@@ -452,13 +453,13 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- final AMQFrame cancelFrame = MessageCancelBody.createAMQFrame(_channelId,
+ final AMQMethodBody cancelBody = MessageCancelBody.createMethodBody(
(byte)0, (byte)9, // AMQP version (major, minor)
_consumerTag); // consumerTag
try
{
- _protocolHandler.syncWrite(cancelFrame, MessageOkBody.class);
+ _protocolHandler.syncWrite(_channelId, cancelBody, MessageOkBody.class);
}
catch (AMQException e)
{
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=496873&r1=496872&r2=496873
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Tue Jan 16 14:10:07 2007
@@ -137,7 +137,7 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId,
+ AMQPMethodBody methodBody = ExchangeDeclareBody.createMethodBody(
(byte)0, (byte)9, // AMQP version (major, minor)
null, // arguments
false, // autoDelete
@@ -148,7 +148,7 @@
false, // passive
0, // ticket
destination.getExchangeClass()); // type
- _protocolHandler.writeFrame(declare);
+ _protocolHandler.writeRequest(_channelId, methodBody);
}
public void setDisableMessageID(boolean b) throws JMSException
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?view=diff&rev=496873&r1=496872&r2=496873
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Tue Jan 16 14:10:07 2007
@@ -30,6 +30,7 @@
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.protocol.AMQConstant;
@@ -61,8 +62,8 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9);
- protocolSession.writeFrame(frame);
+ AMQMethodBody methodBody = ChannelCloseOkBody.createMethodBody((byte)0, (byte)9);
+ protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
if (errorCode != AMQConstant.REPLY_SUCCESS.getCode())
{
_logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason);
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?view=diff&rev=496873&r1=496872&r2=496873
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Tue Jan 16 14:10:07 2007
@@ -63,7 +63,7 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, (byte)0, (byte)9));
+ protocolSession.writeResponse(0, evt.getRequestId(), ConnectionCloseOkBody.createMethodBody((byte)0, (byte)9));
if (errorCode != 200)
{
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java?view=diff&rev=496873&r1=496872&r2=496873
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java Tue Jan 16 14:10:07 2007
@@ -22,6 +22,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ConnectionSecureOkBody;
import org.apache.qpid.framing.ConnectionSecureBody;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -58,10 +59,9 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(),
- (byte)0, (byte)9, // AMQP version (major, minor)
+ AMQMethodBody methodBody = ConnectionSecureOkBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor)
response); // response
- protocolSession.writeFrame(responseFrame);
+ protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody);
}
catch (SaslException e)
{
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=diff&rev=496873&r1=496872&r2=496873
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Tue Jan 16 14:10:07 2007
@@ -128,12 +128,12 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
- (byte)0, (byte)9, // AMQP version (major, minor)
- clientProperties, // clientProperties
- selectedLocale, // locale
- mechanism, // mechanism
- saslResponse)); // response
+ protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(),
+ ConnectionStartOkBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor)
+ clientProperties, // clientProperties
+ selectedLocale, // locale
+ mechanism, // mechanism
+ saslResponse)); // response
}
catch (UnsupportedEncodingException e)
{
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java?view=diff&rev=496873&r1=496872&r2=496873
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java Tue Jan 16 14:10:07 2007
@@ -31,6 +31,7 @@
import org.apache.qpid.framing.ConnectionTuneBody;
import org.apache.qpid.framing.ConnectionTuneOkBody;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
public class ConnectionTuneMethodHandler implements StateAwareMethodListener
@@ -65,29 +66,29 @@
protocolSession.setConnectionTuneParameters(params);
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
- protocolSession.writeFrame(createTuneOkFrame(evt.getChannelId(), params));
- protocolSession.writeFrame(createConnectionOpenFrame(evt.getChannelId(), session.getAMQConnection().getVirtualHost(), null, true));
+ protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), createTuneOkMethodBody(params));
+ protocolSession.writeRequest(evt.getChannelId(),
+ createConnectionOpenMethodBody(protocolSession.getAMQConnection().getVirtualHost(), null, true),
+ protocolSession.getStateManager());
}
- protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist)
+ protected AMQMethodBody createConnectionOpenMethodBody(String path, String capabilities, boolean insist)
{
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- return ConnectionOpenBody.createAMQFrame(channel,
- (byte)0, (byte)9, // AMQP version (major, minor)
+ return ConnectionOpenBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor)
capabilities, // capabilities
insist, // insist
path); // virtualHost
}
- protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params)
+ protected AMQMethodBody createTuneOkMethodBody(ConnectionTuneParameters params)
{
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- return ConnectionTuneOkBody.createAMQFrame(channel,
- (byte)0, (byte)9, // AMQP version (major, minor)
+ return ConnectionTuneOkBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor)
params.getChannelMax(), // channelMax
params.getFrameMax(), // frameMax
params.getHeartbeat()); // heartbeat
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=496873&r1=496872&r2=496873
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Tue Jan 16 14:10:07 2007
@@ -40,10 +40,10 @@
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQRequestBody;
+import org.apache.qpid.framing.AMQResponseBody;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodListener;
@@ -308,48 +308,56 @@
HeartbeatDiagnostics.received(frame.bodyFrame instanceof HeartbeatBody);
- if (frame.bodyFrame instanceof AMQMethodBody)
+ if (frame.bodyFrame instanceof AMQRequestBody)
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Method frame received: " + frame);
- }
-
- final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, (AMQMethodBody) frame.bodyFrame);
- try
- {
- boolean wasAnyoneInterested = false;
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
- }
- if (!wasAnyoneInterested)
- {
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners);
- }
- }
- catch (AMQException e)
- {
- it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- listener.error(e);
- }
- exceptionCaught(session, e);
- }
- }
- else if (frame.bodyFrame instanceof ContentHeaderBody)
- {
- _protocolSession.messageContentHeaderReceived(frame.channel,
- (ContentHeaderBody) frame.bodyFrame);
+ _protocolSession.messageRequestBodyReceived(frame.channel, (AMQRequestBody)frame.bodyFrame);
}
- else if (frame.bodyFrame instanceof ContentBody)
+ else if (frame.bodyFrame instanceof AMQResponseBody)
{
- _protocolSession.messageContentBodyReceived(frame.channel,
- (ContentBody) frame.bodyFrame);
+ _protocolSession.messageResponseBodyReceived(frame.channel, (AMQResponseBody)frame.bodyFrame);
}
+// if (frame.bodyFrame instanceof AMQMethodBody)
+// {
+// if (_logger.isDebugEnabled())
+// {
+// _logger.debug("Method frame received: " + frame);
+// }
+//
+// final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, (AMQMethodBody) frame.bodyFrame);
+// try
+// {
+// boolean wasAnyoneInterested = false;
+// while (it.hasNext())
+// {
+// final AMQMethodListener listener = (AMQMethodListener) it.next();
+// wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+// }
+// if (!wasAnyoneInterested)
+// {
+// throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners);
+// }
+// }
+// catch (AMQException e)
+// {
+// it = _frameListeners.iterator();
+// while (it.hasNext())
+// {
+// final AMQMethodListener listener = (AMQMethodListener) it.next();
+// listener.error(e);
+// }
+// exceptionCaught(session, e);
+// }
+// }
+// else if (frame.bodyFrame instanceof ContentHeaderBody)
+// {
+// _protocolSession.messageContentHeaderReceived(frame.channel,
+// (ContentHeaderBody) frame.bodyFrame);
+// }
+// else if (frame.bodyFrame instanceof ContentBody)
+// {
+// _protocolSession.messageContentBodyReceived(frame.channel,
+// (ContentBody) frame.bodyFrame);
+// }
else if (frame.bodyFrame instanceof HeartbeatBody)
{
_logger.debug("Received heartbeat");
@@ -402,23 +410,33 @@
{
_protocolSession.writeFrame(frame, wait);
}
+
+ public long writeRequest(int channelNum, AMQMethodBody methodBody)
+ {
+ return _protocolSession.writeRequest(channelNum, methodBody, _protocolSession.getStateManager());
+ }
+
+ public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody)
+ {
+ _protocolSession.writeResponse(channelNum, requestId, methodBody);
+ }
/**
* Convenience method that writes a frame to the protocol session and waits for
* a particular response. Equivalent to calling getProtocolSession().write() then
* waiting for the response.
*
- * @param frame
+ * @param methodBody
* @param listener the blocking listener. Note the calling thread will block.
*/
- private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
+ private AMQMethodEvent writeCommandFrameAndWaitForReply(int channelNum, AMQMethodBody methodBody,
BlockingMethodFrameListener listener)
throws AMQException
{
try
{
_frameListeners.add(listener);
- _protocolSession.writeFrame(frame);
+ _protocolSession.writeRequest(channelNum, methodBody, listener);
AMQMethodEvent e = listener.blockForFrame();
return e;
@@ -436,10 +454,10 @@
/**
* More convenient method to write a frame and wait for it's response.
*/
- public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException
+ public AMQMethodEvent syncWrite(int channelNum, AMQMethodBody methodBody, Class responseClass) throws AMQException
{
- return writeCommandFrameAndWaitForReply(frame,
- new SpecificMethodFrameListener(frame.channel, responseClass));
+ return writeCommandFrameAndWaitForReply(channelNum, methodBody,
+ new SpecificMethodFrameListener(channelNum, responseClass));
}
/**
@@ -477,13 +495,14 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0,
+ AMQMethodBody methodBody = ConnectionCloseBody.createMethodBody(
(byte)0, (byte)9, // AMQP version (major, minor)
0, // classId
0, // methodId
AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
"JMS client is closing the connection."); // replyText
- syncWrite(frame, ConnectionCloseOkBody.class);
+
+ syncWrite(0, methodBody, ConnectionCloseOkBody.class);
_protocolSession.closeProtocolSession();
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=496873&r1=496872&r2=496873
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Tue Jan 16 14:10:07 2007
@@ -33,10 +33,15 @@
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQRequestBody;
+import org.apache.qpid.framing.AMQResponseBody;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.RequestManager;
+import org.apache.qpid.framing.ResponseManager;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.AMQProtocolWriter;
import org.apache.commons.lang.StringUtils;
@@ -91,6 +96,9 @@
*/
protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
+ protected ConcurrentMap _channelId2RequestMgrMap = new ConcurrentHashMap();
+ protected ConcurrentMap _channelId2ResponseMgrMap = new ConcurrentHashMap();
+
/**
* Counter to ensure unique queue names
*/
@@ -234,52 +242,76 @@
{
_channelId2UnprocessedMsgMap.put(message.channelId, message);
}
-
- public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader)
- throws AMQException
+
+ public void messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception
{
- UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
- if (msg == null)
- {
- throw new AMQException("Error: received content header without having received a BasicDeliver frame first");
- }
- if (msg.contentHeader != null)
- {
- throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames");
- }
- msg.contentHeader = contentHeader;
- if (contentHeader.bodySize == 0)
+ if (_logger.isDebugEnabled())
{
- deliverMessageToAMQSession(channelId, msg);
+ _logger.debug("Request frame received: " + requestBody);
}
+ ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(channelId);
+ if (responseManager == null)
+ throw new AMQException("Unable to find ResponseManager for channel " + channelId);
+ responseManager.requestReceived(requestBody);
}
-
- public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException
+
+ public void messageResponseBodyReceived(int channelId, AMQResponseBody responseBody) throws Exception
{
- UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
- if (msg == null)
- {
- throw new AMQException("Error: received content body without having received a JMSDeliver frame first");
- }
- if (msg.contentHeader == null)
- {
- _channelId2UnprocessedMsgMap.remove(channelId);
- throw new AMQException("Error: received content body without having received a ContentHeader frame first");
- }
- try
- {
- msg.receiveBody(contentBody);
- }
- catch (UnexpectedBodyReceivedException e)
+ if (_logger.isDebugEnabled())
{
- _channelId2UnprocessedMsgMap.remove(channelId);
- throw e;
+ _logger.debug("Response frame received: " + responseBody);
}
- if (msg.isAllBodyDataReceived())
- {
- deliverMessageToAMQSession(channelId, msg);
- }
- }
+ RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(channelId);
+ if (requestManager == null)
+ throw new AMQException("Unable to find RequestManager for channel " + channelId);
+ requestManager.responseReceived(responseBody);
+ }
+
+// public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader)
+// throws AMQException
+// {
+// UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
+// if (msg == null)
+// {
+// throw new AMQException("Error: received content header without having received a BasicDeliver frame first");
+// }
+// if (msg.contentHeader != null)
+// {
+// throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames");
+// }
+// msg.contentHeader = contentHeader;
+// if (contentHeader.bodySize == 0)
+// {
+// deliverMessageToAMQSession(channelId, msg);
+// }
+// }
+//
+// public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException
+// {
+// UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
+// if (msg == null)
+// {
+// throw new AMQException("Error: received content body without having received a JMSDeliver frame first");
+// }
+// if (msg.contentHeader == null)
+// {
+// _channelId2UnprocessedMsgMap.remove(channelId);
+// throw new AMQException("Error: received content body without having received a ContentHeader frame first");
+// }
+// try
+// {
+// msg.receiveBody(contentBody);
+// }
+// catch (UnexpectedBodyReceivedException e)
+// {
+// _channelId2UnprocessedMsgMap.remove(channelId);
+// throw e;
+// }
+// if (msg.isAllBodyDataReceived())
+// {
+// deliverMessageToAMQSession(channelId, msg);
+// }
+// }
/**
* Deliver a message to the appropriate session, removing the unprocessed message
@@ -293,6 +325,31 @@
session.messageReceived(msg);
_channelId2UnprocessedMsgMap.remove(channelId);
}
+
+ public long writeRequest(int channelNum, AMQMethodBody methodBody,
+ AMQMethodListener methodListener)
+ throws AMQException
+ {
+ RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(channelNum);
+ if (requestManager == null)
+ throw new AMQException("Unable to find RequestManager for channel " + channelNum);
+ requestManager.sendRequest(methodBody, methodListener);
+ }
+
+ public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody)
+ throws AMQException
+ {
+ ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(channelNum);
+ if (responseManager == null)
+ throw new AMQException("Unable to find ResponseManager for channel " + channelNum);
+ responseManager.sendResponse(requestId, methodBody);
+ }
+
+ public void writeResponse(AMQMethodEvent evt, AMQMethodBody response)
+ throws AMQException
+ {
+ writeResponse(evt.getChannelId(), evt.getRequestId(), response);
+ }
/**
* Convenience method that writes a frame to the protocol session. Equivalent
@@ -330,6 +387,17 @@
}
_logger.debug("Add session with channel id " + channelId);
_channelId2SessionMap.put(channelId, session);
+
+ // Add request and response handlers, one per channel, if they do not already exist
+ if (_channelId2RequestMgrMap.get(channelId) == null)
+ {
+ _channelId2RequestMgrMap.put(channelId, new RequestManager(channelId, this));
+ }
+ if (_channelId2ResponseMgrMap.get(channelId) == null)
+ {
+
+ _channelId2ResponseMgrMap.put(channelId, new ResponseManager(channelId, _stateManager, this));
+ }
}
public void removeSessionByChannel(int channelId)
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java?view=diff&rev=496873&r1=496872&r2=496873
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java Tue Jan 16 14:10:07 2007
@@ -24,7 +24,7 @@
public class AMQRequestBody extends AMQBody
{
- public static final byte TYPE = 9;
+ public static final byte TYPE = (byte)AmqpConstants.frameRequestAsInt();
// Fields declared in specification
protected long requestId;
@@ -51,7 +51,7 @@
protected byte getFrameType()
{
- return (byte)AmqpConstants.frameRequestAsInt();
+ return TYPE;
}
protected int getSize()
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java?view=diff&rev=496873&r1=496872&r2=496873
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java Tue Jan 16 14:10:07 2007
@@ -24,7 +24,7 @@
public class AMQResponseBody extends AMQBody
{
- public static final byte TYPE = 10;
+ public static final byte TYPE = (byte)AmqpConstants.frameResponseAsInt();
// Fields declared in specification
protected long responseId;
@@ -51,7 +51,7 @@
protected byte getFrameType()
{
- return (byte)AmqpConstants.frameResponseAsInt();
+ return TYPE;
}
protected int getSize()