You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/09/05 21:54:25 UTC
svn commit: r573039 -
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/
Author: rajith
Date: Wed Sep 5 12:54:24 2007
New Revision: 573039
URL: http://svn.apache.org/viewvc?rev=573039&view=rev
Log:
Separated 0-8 functionality from the AMQSession,/BasicMessageProducer and BasicMessageConsumer and made them abstract
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=573039&r1=573038&r2=573039&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Wed Sep 5 12:54:24 2007
@@ -535,7 +535,7 @@
// open it, so that there is no window where we could receive data on the channel and not be set
// up to handle it appropriately.
AMQSession session =
- new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
+ new AMQSession_0_8(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
prefetchLow);
// _protocolHandler.addSessionByChannel(channelId, session);
registerSession(channelId, session);
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=573039&r1=573038&r2=573039&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Sep 5 12:54:24 2007
@@ -21,6 +21,41 @@
package org.apache.qpid.client;
+import java.io.Serializable;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
@@ -129,7 +164,7 @@
* @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this,
* after looking at worse bottlenecks first.
*/
-public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
+public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -174,16 +209,16 @@
public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
/** The connection to which this session belongs. */
- private AMQConnection _connection;
+ protected AMQConnection _connection;
/** Used to indicate whether or not this is a transactional session. */
- private boolean _transacted;
+ protected boolean _transacted;
/** Holds the sessions acknowledgement mode. */
- private int _acknowledgeMode;
+ protected int _acknowledgeMode;
/** Holds this session unique identifier, used to distinguish it from other sessions. */
- private int _channelId;
+ protected int _channelId;
/** @todo This does not appear to be set? */
private int _ticket;
@@ -231,7 +266,7 @@
private Dispatcher _dispatcher;
/** Holds the message factory factory for this session. */
- private MessageFactoryRegistry _messageFactoryRegistry;
+ protected MessageFactoryRegistry _messageFactoryRegistry;
/** Holds all of the producers created by this session, keyed by their unique identifiers. */
private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
@@ -246,7 +281,7 @@
* Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
* consumer.
*/
- private Map<AMQShortString, BasicMessageConsumer> _consumers =
+ protected Map<AMQShortString, BasicMessageConsumer> _consumers =
new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
/** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
@@ -428,19 +463,7 @@
*
* @todo Be aware of possible changes to parameter order as versions change.
*/
- public void acknowledgeMessage(long deliveryTag, boolean multiple)
- {
- final AMQFrame ackFrame =
- BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
- multiple);
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
- }
-
- getProtocolHandler().writeFrame(ackFrame);
- }
+ public abstract void acknowledgeMessage(long deliveryTag, boolean multiple);
/**
* Binds the named queue, with the specified routing key, to the named exchange.
@@ -466,22 +489,15 @@
{
public Object execute() throws AMQException, FailoverException
{
- AMQFrame queueBind =
- QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
- arguments, // arguments
- exchangeName, // exchange
- false, // nowait
- queueName, // queue
- routingKey, // routingKey
- getTicket()); // ticket
-
- getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
-
+ sendQueueBind(queueName,routingKey,arguments,exchangeName);
return null;
}
}, _connection).execute();
}
+ public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
+ final AMQShortString exchangeName) throws AMQException, FailoverException;
+
/**
* Closes the session.
@@ -525,20 +541,7 @@
try
{
-
- getProtocolHandler().closeSession(this);
-
- final AMQFrame frame =
- ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(),
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client closing channel")); // replyText
-
- getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
-
- // When control resumes at this point, a reply will have been received that
- // indicates the broker has closed the channel successfully.
+ sendClose(timeout);
}
catch (AMQException e)
{
@@ -562,6 +565,8 @@
}
}
+ public abstract void sendClose(long timeout) throws AMQException, FailoverException;
+
/**
* Called when the server initiates the closure of the session unilaterally.
*
@@ -620,10 +625,7 @@
}
// Commits outstanding messages sent and outstanding acknowledgements.
- final AMQProtocolHandler handler = getProtocolHandler();
-
- handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()),
- TxCommitOkBody.class);
+ sendCommit();
}
catch (AMQException e)
{
@@ -635,6 +637,8 @@
}
}
+ public abstract void sendCommit() throws AMQException, FailoverException;
+
public void confirmConsumerCancelled(AMQShortString consumerTag)
{
@@ -968,24 +972,14 @@
{
public Object execute() throws AMQException, FailoverException
{
- AMQFrame queueDeclare =
- QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
- null, // arguments
- autoDelete, // autoDelete
- durable, // durable
- exclusive, // exclusive
- false, // nowait
- false, // passive
- name, // queue
- getTicket()); // ticket
-
- getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
-
+ sendCreateQueue(name, autoDelete,durable,exclusive);
return null;
}
}, _connection).execute();
}
+ public abstract void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
+ final boolean exclusive)throws AMQException, FailoverException;
/**
* Creates a QueueReceiver
*
@@ -1356,20 +1350,7 @@
_dispatcher.rollback();
}
- if (isStrictAMQP())
- {
- // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
- _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue
- _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
- }
- else
- {
-
- _connection.getProtocolHandler().syncWrite(
- BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue
- , BasicRecoverOkBody.class);
- }
+ sendRecover();
if (!isSuspended)
{
@@ -1386,6 +1367,8 @@
}
}
+ public abstract void sendRecover() throws AMQException, FailoverException;
+
public void rejectMessage(UnprocessedMessage message, boolean requeue)
{
@@ -1408,22 +1391,7 @@
}
- public void rejectMessage(long deliveryTag, boolean requeue)
- {
- if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED))
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Rejecting delivery tag:" + deliveryTag);
- }
-
- AMQFrame basicRejectBody =
- BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
- requeue);
-
- _connection.getProtocolHandler().writeFrame(basicRejectBody);
- }
- }
+ public abstract void rejectMessage(long deliveryTag, boolean requeue);
/**
* Commits all messages done in this transaction and releases any locks currently held.
@@ -1458,8 +1426,7 @@
_dispatcher.rollback();
}
- _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+ sendRollback();
if (!isSuspended)
{
@@ -1477,6 +1444,13 @@
}
}
+ public void sendRollback() throws AMQException, FailoverException
+ {
+ _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+
+ }
+
public void run()
{
throw new java.lang.UnsupportedOperationException();
@@ -1591,7 +1565,6 @@
AMQDestination amqd = (AMQDestination) destination;
- final AMQProtocolHandler protocolHandler = getProtocolHandler();
// TODO: Define selectors in AMQP
// TODO: construct the rawSelector from the selector string if rawSelector == null
final FieldTable ft = FieldTableFactory.newFieldTable();
@@ -1602,10 +1575,8 @@
ft.addAll(rawSelector);
}
- BasicMessageConsumer consumer =
- new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal,
- _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow,
- exclusive, _acknowledgeMode, noConsume, autoClose);
+ BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh,prefetchLow,
+ noLocal,exclusive, messageSelector, ft, noConsume, autoClose);
if (_messageListener != null)
{
@@ -1653,6 +1624,10 @@
}, _connection).execute();
}
+ public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
+ final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
+ final boolean noConsume, final boolean autoClose);
+
/**
* Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer
* instance.
@@ -1710,38 +1685,8 @@
*
* @todo Be aware of possible changes to parameter order as versions change.
*/
- boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
- throws JMSException
- {
- try
- {
- AMQMethodEvent response =
- new FailoverRetrySupport<AMQMethodEvent, AMQException>(
- new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
- {
- public AMQMethodEvent execute() throws AMQException, FailoverException
- {
- AMQFrame boundFrame =
- ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(),
- getProtocolMinorVersion(), exchangeName, // exchange
- queueName, // queue
- routingKey); // routingKey
-
- return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
-
- }
- }, _connection).execute();
-
- // Extract and return the response code from the query.
- ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
-
- return (responseBody.replyCode == 0);
- }
- catch (AMQException e)
- {
- throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
- }
- }
+ public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
+ throws JMSException;
/**
* Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
@@ -2048,50 +1993,15 @@
private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
{
- // need to generate a consumer tag on the client so we can exploit the nowait flag
+ //need to generate a consumer tag on the client so we can exploit the nowait flag
AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
-
- FieldTable arguments = FieldTableFactory.newFieldTable();
- if ((messageSelector != null) && !messageSelector.equals(""))
- {
- arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
- }
-
- if (consumer.isAutoClose())
- {
- arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
- }
-
- if (consumer.isNoConsume())
- {
- arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
- }
-
consumer.setConsumerTag(tag);
// we must register the consumer in the map before we actually start listening
_consumers.put(tag, consumer);
try
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame jmsConsume =
- BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments
- tag, // consumerTag
- consumer.isExclusive(), // exclusive
- consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
- consumer.isNoLocal(), // noLocal
- nowait, // nowait
- queueName, // queue
- getTicket()); // ticket
-
- if (nowait)
- {
- protocolHandler.writeFrame(jmsConsume);
- }
- else
- {
- protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
- }
+ sendConsume(consumer,queueName,protocolHandler,nowait,messageSelector,tag);
}
catch (AMQException e)
{
@@ -2101,6 +2011,9 @@
}
}
+ public abstract void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName,
+ AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector,AMQShortString tag) throws AMQException, FailoverException;
+
private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
throws JMSException
{
@@ -2117,9 +2030,8 @@
{
checkNotClosed();
long producerId = getNextProducerId();
- BasicMessageProducer producer =
- new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId,
- AMQSession.this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
+ BasicMessageProducer producer = createMessageProducer(destination, mandatory,
+ immediate, waitUntilSent, producerId);
registerProducer(producerId, producer);
return producer;
@@ -2127,6 +2039,9 @@
}, _connection).execute();
}
+ public abstract BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+ final boolean immediate, final boolean waitUntilSent, long producerId);
+
private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
{
declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
@@ -2147,31 +2062,22 @@
* @todo Be aware of possible changes to parameter order as versions change.
*/
private void declareExchange(final AMQShortString name, final AMQShortString type,
- final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
+ final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
{
new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
{
public Object execute() throws AMQException, FailoverException
{
- AMQFrame exchangeDeclare =
- ExchangeDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
- null, // arguments
- false, // autoDelete
- false, // durable
- name, // exchange
- false, // internal
- nowait, // nowait
- false, // passive
- getTicket(), // ticket
- type); // type
-
- protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
-
+ sendExchangeDeclare(name, type, protocolHandler, nowait);
return null;
}
}, _connection).execute();
}
+ public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
+ final boolean nowait) throws AMQException, FailoverException;
+
+
/**
* Declares a queue for a JMS destination.
*
@@ -2208,24 +2114,15 @@
amqd.setQueueName(protocolHandler.generateQueueName());
}
- AMQFrame queueDeclare =
- QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
- null, // arguments
- amqd.isAutoDelete(), // autoDelete
- amqd.isDurable(), // durable
- amqd.isExclusive(), // exclusive
- false, // nowait
- false, // passive
- amqd.getAMQQueueName(), // queue
- getTicket()); // ticket
-
- protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
+ sendQueueDeclare(amqd,protocolHandler);
return amqd.getAMQQueueName();
}
}, _connection).execute();
}
+ public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)throws AMQException, FailoverException;
+
/**
* Undeclares the specified queue.
*
@@ -2245,16 +2142,7 @@
{
public Object execute() throws AMQException, FailoverException
{
- AMQFrame queueDeleteFrame =
- QueueDeleteBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
- false, // ifEmpty
- false, // ifUnused
- true, // nowait
- queueName, // queue
- getTicket()); // ticket
-
- getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
-
+ sendQueueDelete(queueName);
return null;
}
}, _connection).execute();
@@ -2265,22 +2153,24 @@
}
}
+ public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException;
+
private long getNextProducerId()
{
return ++_nextProducerId;
}
- private AMQProtocolHandler getProtocolHandler()
+ protected AMQProtocolHandler getProtocolHandler()
{
return _connection.getProtocolHandler();
}
- private byte getProtocolMajorVersion()
+ protected byte getProtocolMajorVersion()
{
return getProtocolHandler().getProtocolMajorVersion();
}
- private byte getProtocolMinorVersion()
+ protected byte getProtocolMinorVersion()
{
return getProtocolHandler().getProtocolMinorVersion();
}
@@ -2538,12 +2428,7 @@
}
_suspended = suspend;
-
- AMQFrame channelFlowFrame =
- ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
- !suspend);
-
- _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
+ sendSuspendChannel(suspend);
}
catch (FailoverException e)
{
@@ -2551,6 +2436,8 @@
}
}
}
+
+ public abstract void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException;
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=573039&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Wed Sep 5 12:54:24 2007
@@ -0,0 +1,347 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicAckBody;
+import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.BasicConsumeOkBody;
+import org.apache.qpid.framing.BasicRecoverBody;
+import org.apache.qpid.framing.BasicRecoverOkBody;
+import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ExchangeBoundBody;
+import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.TxCommitBody;
+import org.apache.qpid.framing.TxCommitOkBody;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQSession_0_8 extends AMQSession
+{
+
+ /** Used for debugging. */
+ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
+
+ /**
+ * Creates a new session on a connection.
+ *
+ * @param con The connection on which to create the session.
+ * @param channelId The unique identifier for the session.
+ * @param transacted Indicates whether or not the session is transactional.
+ * @param acknowledgeMode The acknoledgement mode for the session.
+ * @param messageFactoryRegistry The message factory factory for the session.
+ * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
+ * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
+ */
+ AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
+ MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+ {
+
+ super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark);
+ }
+
+ /**
+ * Creates a new session on a connection with the default message factory factory.
+ *
+ * @param con The connection on which to create the session.
+ * @param channelId The unique identifier for the session.
+ * @param transacted Indicates whether or not the session is transactional.
+ * @param acknowledgeMode The acknoledgement mode for the session.
+ * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
+ * @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
+ */
+ AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
+ int defaultPrefetchLow)
+ {
+ this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
+ defaultPrefetchLow);
+ }
+
+ public void acknowledgeMessage(long deliveryTag, boolean multiple)
+ {
+ final AMQFrame ackFrame =
+ BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
+ multiple);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
+ }
+
+ getProtocolHandler().writeFrame(ackFrame);
+ }
+
+ public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
+ final AMQShortString exchangeName) throws AMQException, FailoverException
+ {
+ AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments
+ exchangeName, // exchange
+ false, // nowait
+ queueName, // queue
+ routingKey, // routingKey
+ getTicket()); // ticket
+
+ getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
+ }
+
+ public void sendClose(long timeout) throws AMQException, FailoverException
+ {
+ getProtocolHandler().closeSession(this);
+
+ final AMQFrame frame = ChannelCloseBody.createAMQFrame
+ (getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(),
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ new AMQShortString("JMS client closing channel")); // replyText
+
+ getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
+ // When control resumes at this point, a reply will have been received that
+ // indicates the broker has closed the channel successfully.
+ }
+
+ public void sendCommit() throws AMQException, FailoverException
+ {
+ final AMQProtocolHandler handler = getProtocolHandler();
+
+ handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxCommitOkBody.class);
+ }
+
+ public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive) throws AMQException,
+ FailoverException
+ {
+ AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), null, // arguments
+ autoDelete, // autoDelete
+ durable, // durable
+ exclusive, // exclusive
+ false, // nowait
+ false, // passive
+ name, // queue
+ getTicket()); // ticket
+
+ getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+ }
+
+ public void sendRecover() throws AMQException, FailoverException
+ {
+ if (isStrictAMQP())
+ {
+ // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
+ _connection.getProtocolHandler().writeFrame(
+ BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue
+ _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
+ }
+ else
+ {
+
+ _connection.getProtocolHandler().syncWrite(
+ BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue
+ , BasicRecoverOkBody.class);
+ }
+ }
+
+ public void rejectMessage(long deliveryTag, boolean requeue)
+ {
+ if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED))
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting delivery tag:" + deliveryTag);
+ }
+
+ AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
+ requeue);
+
+ _connection.getProtocolHandler().writeFrame(basicRejectBody);
+ }
+ }
+
+ public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
+ throws JMSException
+ {
+ try
+ {
+ AMQMethodEvent response = new FailoverRetrySupport<AMQMethodEvent, AMQException>(
+ new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
+ {
+ public AMQMethodEvent execute() throws AMQException, FailoverException
+ {
+ AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+ exchangeName, // exchange
+ queueName, // queue
+ routingKey); // routingKey
+
+ return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+
+ }
+ }, _connection).execute();
+
+ // Extract and return the response code from the query.
+ ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
+
+ return (responseBody.replyCode == 0);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
+ }
+ }
+
+ public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait,
+ String messageSelector, AMQShortString tag) throws AMQException, FailoverException
+ {
+
+ FieldTable arguments = FieldTableFactory.newFieldTable();
+ if ((messageSelector != null) && !messageSelector.equals(""))
+ {
+ arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
+ }
+
+ if (consumer.isAutoClose())
+ {
+ arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
+ }
+
+ if (consumer.isNoConsume())
+ {
+ arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
+ }
+
+ consumer.setConsumerTag(tag);
+ // we must register the consumer in the map before we actually start listening
+ _consumers.put(tag, consumer);
+ // TODO: Be aware of possible changes to parameter order as versions change.
+ AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments
+ tag, // consumerTag
+ consumer.isExclusive(), // exclusive
+ consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
+ consumer.isNoLocal(), // noLocal
+ nowait, // nowait
+ queueName, // queue
+ getTicket()); // ticket
+
+ if (nowait)
+ {
+ protocolHandler.writeFrame(jmsConsume);
+ }
+ else
+ {
+ protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
+ }
+ }
+
+ public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
+ final boolean nowait) throws AMQException, FailoverException
+ {
+ AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), null, // arguments
+ false, // autoDelete
+ false, // durable
+ name, // exchange
+ false, // internal
+ nowait, // nowait
+ false, // passive
+ getTicket(), // ticket
+ type); // type
+
+ protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+ }
+
+ public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException
+ {
+ AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), null, // arguments
+ amqd.isAutoDelete(), // autoDelete
+ amqd.isDurable(), // durable
+ amqd.isExclusive(), // exclusive
+ false, // nowait
+ false, // passive
+ amqd.getAMQQueueName(), // queue
+ getTicket()); // ticket
+
+ protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
+ }
+
+ public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException
+ {
+ AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false, // ifEmpty
+ false, // ifUnused
+ true, // nowait
+ queueName, // queue
+ getTicket()); // ticket
+
+ getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
+ }
+
+ public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException
+ {
+ AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), !suspend);
+
+ _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
+ }
+
+ public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
+ final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable ft,
+ final boolean noConsume, final boolean autoClose)
+ {
+
+ final AMQProtocolHandler protocolHandler = getProtocolHandler();
+ return new BasicMessageConsumer_0_8(_channelId, _connection, destination, messageSelector, noLocal,
+ _messageFactoryRegistry,this, protocolHandler, ft, prefetchHigh, prefetchLow,
+ exclusive, _acknowledgeMode, noConsume, autoClose);
+ }
+
+
+ public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+ final boolean immediate, final boolean waitUntilSent, long producerId)
+ {
+
+ return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId,
+ this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
+ }
+}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=573039&r1=573038&r2=573039&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Wed Sep 5 12:54:24 2007
@@ -20,37 +20,31 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.failover.FailoverException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class BasicMessageConsumer extends Closeable implements MessageConsumer
+public abstract class BasicMessageConsumer extends Closeable implements MessageConsumer
{
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
@@ -69,10 +63,10 @@
private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
/** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
- private AMQShortString _consumerTag;
+ protected AMQShortString _consumerTag;
/** We need to know the channel id when constructing frames */
- private int _channelId;
+ protected int _channelId;
/**
* Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
@@ -84,7 +78,7 @@
private final AMQSession _session;
- private AMQProtocolHandler _protocolHandler;
+ protected AMQProtocolHandler _protocolHandler;
/** We need to store the "raw" field table so that we can resubscribe in the event of failover being required */
private FieldTable _rawSelectorFieldTable;
@@ -482,29 +476,7 @@
if (sendClose)
{
// TODO: Be aware of possible changes to parameter order as versions change.
- final AMQFrame cancelFrame =
- BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag
- false); // nowait
-
- try
- {
- _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("CancelOk'd for consumer:" + debugIdentity());
- }
-
- }
- catch (AMQException e)
- {
- throw new JMSAMQException("Error closing consumer: " + e, e);
- }
- catch (FailoverException e)
- {
- throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
- }
+ sendCancel();
}
else
{
@@ -527,6 +499,8 @@
}
}
}
+
+ public abstract void sendCancel() throws JMSAMQException;
/**
* Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=573039&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Wed Sep 5 12:54:24 2007
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.FieldTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BasicMessageConsumer_0_8 extends BasicMessageConsumer
+{
+ protected final Logger _logger = LoggerFactory.getLogger(getClass());
+
+ protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
+ String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+ AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ {
+ super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
+ protocolHandler, rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive,
+ acknowledgeMode, noConsume, autoClose);
+ }
+
+ public void sendCancel() throws JMSAMQException
+ {
+ final AMQFrame cancelFrame =
+ BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag
+ false); // nowait
+
+ try
+ {
+ _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("CancelOk'd for consumer:" + debugIdentity());
+ }
+
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException("Error closing consumer: " + e, e);
+ }
+ catch (FailoverException e)
+ {
+ throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
+ }
+ }
+}
\ No newline at end of file
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=573039&r1=573038&r2=573039&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Wed Sep 5 12:54:24 2007
@@ -20,23 +20,8 @@
*/
package org.apache.qpid.client;
-import org.apache.mina.common.ByteBuffer;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.MessageConverter;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.CompositeAMQDataBlock;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.UnsupportedEncodingException;
+import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
@@ -51,10 +36,15 @@
import javax.jms.TextMessage;
import javax.jms.Topic;
-import java.io.UnsupportedEncodingException;
-import java.util.UUID;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.MessageConverter;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.ContentBody;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
+public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
protected final Logger _logger = LoggerFactory.getLogger(getClass());
@@ -63,7 +53,7 @@
/**
* If true, messages will not get a timestamp.
*/
- private boolean _disableTimestamps;
+ protected boolean _disableTimestamps;
/**
* Priority of messages created by this producer.
@@ -95,14 +85,14 @@
*/
private String _mimeType;
- private AMQProtocolHandler _protocolHandler;
+ protected AMQProtocolHandler _protocolHandler;
/**
* True if this producer was created from a transacted session
*/
private boolean _transacted;
- private int _channelId;
+ protected int _channelId;
/**
* This is an id generated by the session and is used to tie individual producers to the session. This means we
@@ -115,7 +105,7 @@
/**
* The session used to create this producer
*/
- private AMQSession _session;
+ protected AMQSession _session;
private final boolean _immediate;
@@ -156,24 +146,7 @@
}
}
- private void declareDestination(AMQDestination destination)
- {
- // Declare the exchange
- // Note that the durable and internal arguments are ignored since passive is set to false
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame declare =
- ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), null, // arguments
- false, // autoDelete
- false, // durable
- destination.getExchangeName(), // exchange
- false, // internal
- true, // nowait
- false, // passive
- _session.getTicket(), // ticket
- destination.getExchangeClass()); // type
- _protocolHandler.writeFrame(declare);
- }
+ public abstract void declareDestination(AMQDestination destination);
public void setDisableMessageID(boolean b) throws JMSException
{
@@ -485,81 +458,13 @@
message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame publishFrame =
- BasicPublishBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), destination.getExchangeName(), // exchange
- immediate, // immediate
- mandatory, // mandatory
- destination.getRoutingKey(), // routingKey
- _session.getTicket()); // ticket
-
- message.prepareForSending();
- ByteBuffer payload = message.getData();
- BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
-
- if (!_disableTimestamps)
- {
- final long currentTime = System.currentTimeMillis();
- contentHeaderProperties.setTimestamp(currentTime);
-
- if (timeToLive > 0)
- {
- contentHeaderProperties.setExpiration(currentTime + timeToLive);
- }
- else
- {
- contentHeaderProperties.setExpiration(0);
- }
- }
-
- contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
- contentHeaderProperties.setPriority((byte) priority);
-
- final int size = (payload != null) ? payload.limit() : 0;
- final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
- final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
-
- if (payload != null)
- {
- createContentBodies(payload, frames, 2, _channelId);
- }
-
- if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
- {
- _logger.debug("Sending content body frames to " + destination);
- }
-
- // weight argument of zero indicates no child content headers, just bodies
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- AMQFrame contentHeaderFrame =
- ContentHeaderBody.createAMQFrame(_channelId,
- BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion()), 0, contentHeaderProperties, size);
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Sending content header frame to " + destination);
- }
-
- frames[0] = publishFrame;
- frames[1] = contentHeaderFrame;
- CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
- _protocolHandler.writeFrame(compositeFrame, wait);
-
- if (message != origMessage)
- {
- _logger.debug("Updating original message");
- origMessage.setJMSPriority(message.getJMSPriority());
- origMessage.setJMSTimestamp(message.getJMSTimestamp());
- _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
- origMessage.setJMSExpiration(message.getJMSExpiration());
- origMessage.setJMSMessageID(message.getJMSMessageID());
- }
+ sendMessage(destination, origMessage, message, deliveryMode, priority, timeToLive,
+ mandatory, immediate, wait);
}
+ public abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, int deliveryMode,
+ int priority, long timeToLive, boolean mandatory, boolean immediate, boolean wait)throws JMSException;
+
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
{
if (destination instanceof TemporaryDestination)
@@ -578,60 +483,6 @@
throw new JMSException("Cannot send to a deleted temporary destination");
}
}
- }
-
- /**
- * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
- * maximum frame size.
- *
- * @param payload
- * @param frames
- * @param offset
- * @param channelId @return the array of content bodies
- */
- private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
- {
-
- if (frames.length == (offset + 1))
- {
- frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
- }
- else
- {
-
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
- long remaining = payload.remaining();
- for (int i = offset; i < frames.length; i++)
- {
- payload.position((int) framePayloadMax * (i - offset));
- int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
- payload.limit(payload.position() + length);
- frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
-
- remaining -= length;
- }
- }
-
- }
-
- private int calculateContentBodyFrameCount(ByteBuffer payload)
- {
- // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
- // (0xCE byte).
- int frameCount;
- if ((payload == null) || (payload.remaining() == 0))
- {
- frameCount = 0;
- }
- else
- {
- int dataLength = payload.remaining();
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
- int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
- frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
- }
-
- return frameCount;
}
public void setMimeType(String mimeType) throws JMSException
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=573039&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Wed Sep 5 12:54:24 2007
@@ -0,0 +1,199 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.CompositeAMQDataBlock;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+
+public class BasicMessageProducer_0_8 extends BasicMessageProducer
+{
+
+ BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
+ AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
+ boolean waitUntilSent)
+ {
+ super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory,waitUntilSent);
+ }
+
+ public void declareDestination(AMQDestination destination)
+ {
+ // Declare the exchange
+ // Note that the durable and internal arguments are ignored since passive is set to false
+ // TODO: Be aware of possible changes to parameter order as versions change.
+ AMQFrame declare =
+ ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), null, // arguments
+ false, // autoDelete
+ false, // durable
+ destination.getExchangeName(), // exchange
+ false, // internal
+ true, // nowait
+ false, // passive
+ _session.getTicket(), // ticket
+ destination.getExchangeClass()); // type
+ _protocolHandler.writeFrame(declare);
+ }
+
+ public void sendMessage(AMQDestination destination, Message origMessage,AbstractJMSMessage message,
+ int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
+ {
+// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame publishFrame =
+ BasicPublishBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), destination.getExchangeName(), // exchange
+ immediate, // immediate
+ mandatory, // mandatory
+ destination.getRoutingKey(), // routingKey
+ _session.getTicket()); // ticket
+
+ message.prepareForSending();
+ ByteBuffer payload = message.getData();
+ BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
+
+ if (!_disableTimestamps)
+ {
+ final long currentTime = System.currentTimeMillis();
+ contentHeaderProperties.setTimestamp(currentTime);
+
+ if (timeToLive > 0)
+ {
+ contentHeaderProperties.setExpiration(currentTime + timeToLive);
+ }
+ else
+ {
+ contentHeaderProperties.setExpiration(0);
+ }
+ }
+
+ contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
+ contentHeaderProperties.setPriority((byte) priority);
+
+ final int size = (payload != null) ? payload.limit() : 0;
+ final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
+ final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
+
+ if (payload != null)
+ {
+ createContentBodies(payload, frames, 2, _channelId);
+ }
+
+ if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
+ {
+ _logger.debug("Sending content body frames to " + destination);
+ }
+
+ // weight argument of zero indicates no child content headers, just bodies
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ AMQFrame contentHeaderFrame =
+ ContentHeaderBody.createAMQFrame(_channelId,
+ BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion()), 0, contentHeaderProperties, size);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Sending content header frame to " + destination);
+ }
+
+ frames[0] = publishFrame;
+ frames[1] = contentHeaderFrame;
+ CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+ _protocolHandler.writeFrame(compositeFrame, wait);
+
+ if (message != origMessage)
+ {
+ _logger.debug("Updating original message");
+ origMessage.setJMSPriority(message.getJMSPriority());
+ origMessage.setJMSTimestamp(message.getJMSTimestamp());
+ _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
+ origMessage.setJMSExpiration(message.getJMSExpiration());
+ origMessage.setJMSMessageID(message.getJMSMessageID());
+ }
+ }
+
+ /**
+ * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
+ * maximum frame size.
+ *
+ * @param payload
+ * @param frames
+ * @param offset
+ * @param channelId @return the array of content bodies
+ */
+ private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
+ {
+
+ if (frames.length == (offset + 1))
+ {
+ frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
+ }
+ else
+ {
+
+ final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ long remaining = payload.remaining();
+ for (int i = offset; i < frames.length; i++)
+ {
+ payload.position((int) framePayloadMax * (i - offset));
+ int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
+ payload.limit(payload.position() + length);
+ frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
+
+ remaining -= length;
+ }
+ }
+
+ }
+
+ private int calculateContentBodyFrameCount(ByteBuffer payload)
+ {
+ // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
+ // (0xCE byte).
+ int frameCount;
+ if ((payload == null) || (payload.remaining() == 0))
+ {
+ frameCount = 0;
+ }
+ else
+ {
+ int dataLength = payload.remaining();
+ final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
+ frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
+ }
+
+ return frameCount;
+ }
+
+}