You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC
svn commit: r1187150 [30/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Fri Oct 21 01:19:00 2011
@@ -19,11 +19,10 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
-import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
@@ -66,13 +65,19 @@ public class BasicMessageConsumer_0_10 e
private boolean _preAcquire = true;
/**
+ * Indicate whether this consumer is started.
+ */
+ private boolean _isStarted = false;
+
+ /**
* Specify whether this consumer is performing a sync receive
*/
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
private String _consumerTagString;
private long capacity = 0;
-
+
+ //--- constructor
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
@@ -98,6 +103,7 @@ public class BasicMessageConsumer_0_10 e
_preAcquire = false;
}
}
+ _isStarted = connection.started();
// Destination setting overrides connection defaults
if (destination.getDestSyntax() == DestSyntax.ADDR &&
@@ -150,20 +156,13 @@ public class BasicMessageConsumer_0_10 e
{
if (isMessageListenerSet() && capacity == 0)
{
- messageFlow();
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
_logger.debug("messageOk, trying to notify");
super.notifyMessage(jmsMessage);
}
- else
- {
- // if we are synchronously waiting for a message
- // and messages are not pre-fetched we then need to request another one
- if(capacity == 0)
- {
- messageFlow();
- }
- }
}
catch (AMQException e)
{
@@ -172,6 +171,8 @@ public class BasicMessageConsumer_0_10 e
}
}
+ //----- overridden methods
+
/**
* This method is invoked when this consumer is stopped.
* It tells the broker to stop delivering messages to this consumer.
@@ -201,18 +202,11 @@ public class BasicMessageConsumer_0_10 e
super.notifyMessage(messageFrame);
}
- @Override
- protected void preDeliver(AbstractJMSMessage jmsMsg)
+ @Override protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
{
- super.preDeliver(jmsMsg);
-
- if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
+ super.preApplicationProcessing(jmsMsg);
+ if (!_session.getTransacted() && _session.getAcknowledgeMode() != org.apache.qpid.jms.Session.CLIENT_ACKNOWLEDGE)
{
- //For 0-10 we need to ensure that all messages are indicated processed in some way to
- //ensure their AMQP command-id is marked completed, and so we must send a completion
- //even for no-ack messages even though there isnt actually an 'acknowledgement' occurring.
- //Add message to the unacked message list to ensure we dont lose record of it before
- //sending a completion of some sort.
_session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
}
}
@@ -224,6 +218,7 @@ public class BasicMessageConsumer_0_10 e
return _messageFactory.createMessage(msg.getMessageTransfer());
}
+ // private methods
/**
* Check whether a message can be delivered to this consumer.
*
@@ -252,7 +247,6 @@ public class BasicMessageConsumer_0_10 e
_logger.debug("messageOk " + messageOk);
_logger.debug("_preAcquire " + _preAcquire);
}
-
if (!messageOk)
{
if (_preAcquire)
@@ -269,12 +263,19 @@ public class BasicMessageConsumer_0_10 e
{
if (_logger.isDebugEnabled())
{
- _logger.debug("filterMessage - not ack'ing message as not acquired");
+ _logger.debug("Message not OK, releasing");
}
- flushUnwantedMessage(message);
+ releaseMessage(message);
+ }
+ // if we are synchronously waiting for a message
+ // and messages are not prefetched we then need to request another one
+ if(capacity == 0)
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
}
-
// now we need to acquire this message if needed
// this is the case of queue with a message selector set
if (!_preAcquire && messageOk && !isNoConsume())
@@ -286,7 +287,6 @@ public class BasicMessageConsumer_0_10 e
messageOk = acquireMessage(message);
_logger.debug("filterMessage - message acquire status : " + messageOk);
}
-
return messageOk;
}
@@ -297,38 +297,38 @@ public class BasicMessageConsumer_0_10 e
* @param message The message to be acknowledged
* @throws AMQException If the message cannot be acquired due to some internal error.
*/
- private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException
+ private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException
{
- final RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
- _0_10session.messageAcknowledge
- (ranges,
- _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
-
- final AMQException amqe = _0_10session.getCurrentException();
- if (amqe != null)
+ if (!_preAcquire)
{
- throw amqe;
+ RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
+ _0_10session.messageAcknowledge
+ (ranges,
+ _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+
+ AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
}
}
/**
- * Flush an unwanted message. For 0-10 we need to ensure that all messages are indicated
- * processed to ensure their AMQP command-id is marked completed.
+ * Release a message
*
- * @param message The unwanted message to be flushed
- * @throws AMQException If the unwanted message cannot be flushed due to some internal error.
+ * @param message The message to be released
+ * @throws AMQException If the message cannot be released due to some internal error.
*/
- private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException
+ private void releaseMessage(AbstractJMSMessage message) throws AMQException
{
- final RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
- _0_10session.flushProcessed(ranges,false);
-
- final AMQException amqe = _0_10session.getCurrentException();
- if (amqe != null)
+ if (_preAcquire)
{
- throw amqe;
+ RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
+ _0_10session.getQpidSession().messageRelease(ranges);
+ _0_10session.sync();
}
}
@@ -339,52 +339,44 @@ public class BasicMessageConsumer_0_10 e
* @return true if the message has been acquired, false otherwise.
* @throws AMQException If the message cannot be acquired due to some internal error.
*/
- private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException
+ private boolean acquireMessage(AbstractJMSMessage message) throws AMQException
{
boolean result = false;
- final RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
+ if (!_preAcquire)
+ {
+ RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
- final Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
+ Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
- final RangeSet acquired = acq.getTransfers();
- if (acquired != null && acquired.size() > 0)
- {
- result = true;
+ RangeSet acquired = acq.getTransfers();
+ if (acquired != null && acquired.size() > 0)
+ {
+ result = true;
+ }
}
return result;
}
- private void messageFlow()
- {
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
- }
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
super.setMessageListener(messageListener);
- try
+ if (messageListener != null && capacity == 0)
{
- if (messageListener != null && capacity == 0)
- {
- messageFlow();
- }
- if (messageListener != null && !_synchronousQueue.isEmpty())
- {
- Iterator messages=_synchronousQueue.iterator();
- while (messages.hasNext())
- {
- AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
- messages.remove();
- _session.rejectMessage(message, true);
- }
- }
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
- catch(TransportException e)
+ if (messageListener != null && !_synchronousQueue.isEmpty())
{
- throw _session.toJMSException("Exception while setting message listener:"+ e.getMessage(), e);
+ Iterator messages=_synchronousQueue.iterator();
+ while (messages.hasNext())
+ {
+ AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
+ messages.remove();
+ _session.rejectMessage(message, true);
+ }
}
}
@@ -392,7 +384,9 @@ public class BasicMessageConsumer_0_10 e
{
if (_0_10session.isStarted() && _syncReceive.get())
{
- messageFlow();
+ _0_10session.getQpidSession().messageFlow
+ (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
}
@@ -413,7 +407,9 @@ public class BasicMessageConsumer_0_10 e
}
if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty())
{
- messageFlow();
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
Object o = super.getMessageFromQueue(l);
if (o == null && _0_10session.isStarted())
@@ -444,7 +440,7 @@ public class BasicMessageConsumer_0_10 e
return o;
}
- void postDeliver(AbstractJMSMessage msg)
+ void postDeliver(AbstractJMSMessage msg) throws JMSException
{
super.postDeliver(msg);
if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
@@ -453,8 +449,10 @@ public class BasicMessageConsumer_0_10 e
}
if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE &&
- !_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
+ !_session.isInRecovery() &&
+ _session.getAMQConnection().getSyncAck())
{
+ ((AMQSession_0_10) getSession()).flushAcknowledgments();
((AMQSession_0_10) getSession()).getQpidSession().sync();
}
}
@@ -511,18 +509,4 @@ public class BasicMessageConsumer_0_10 e
return _exclusive;
}
}
-
- void cleanupQueue() throws AMQException, FailoverException
- {
- AMQDestination dest = this.getDestination();
- if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
- {
- if (dest.getDelete() == AddressOption.ALWAYS ||
- dest.getDelete() == AddressOption.RECEIVER )
- {
- ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
- this.getDestination().getQueueName());
- }
- }
- }
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Fri Oct 21 01:19:00 2011
@@ -88,8 +88,4 @@ public class BasicMessageConsumer_0_8 ex
return receive();
}
- void cleanupQueue() throws AMQException, FailoverException
- {
-
- }
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Fri Oct 21 01:19:00 2011
@@ -39,7 +39,6 @@ import org.apache.qpid.client.message.Ab
import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
import org.slf4j.Logger;
@@ -114,6 +113,8 @@ public abstract class BasicMessageProduc
private final boolean _mandatory;
+ private final boolean _waitUntilSent;
+
private boolean _disableMessageId;
private UUIDGen _messageIdGenerator = UUIDs.newGenerator();
@@ -125,7 +126,8 @@ public abstract class BasicMessageProduc
protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
+ AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
+ boolean waitUntilSent) throws AMQException
{
_connection = connection;
_destination = destination;
@@ -141,6 +143,7 @@ public abstract class BasicMessageProduc
_immediate = immediate;
_mandatory = mandatory;
+ _waitUntilSent = waitUntilSent;
_userID = connection.getUsername();
setPublishMode();
}
@@ -263,7 +266,7 @@ public abstract class BasicMessageProduc
return _destination;
}
- public void close() throws JMSException
+ public void close()
{
_closed.set(true);
_session.deregisterProducer(_producerId);
@@ -360,6 +363,19 @@ public abstract class BasicMessageProduc
}
}
+ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
+ boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException
+ {
+ checkPreConditions();
+ checkDestination(destination);
+ synchronized (_connection.getFailoverMutex())
+ {
+ validateDestination(destination);
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate,
+ waitUntilSent);
+ }
+ }
+
private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
{
if (message instanceof AbstractJMSMessage)
@@ -434,6 +450,12 @@ public abstract class BasicMessageProduc
}
}
+ protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
+ boolean mandatory, boolean immediate) throws JMSException
+ {
+ sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
+ }
+
/**
* The caller of this method must hold the failover mutex.
*
@@ -448,13 +470,23 @@ public abstract class BasicMessageProduc
* @throws JMSException
*/
protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate) throws JMSException
+ boolean mandatory, boolean immediate, boolean wait) throws JMSException
{
checkTemporaryDestination(destination);
origMessage.setJMSDestination(destination);
AbstractJMSMessage message = convertToNativeMessage(origMessage);
+ if (_transacted)
+ {
+ if (_session.hasFailedOver() && _session.isDirty())
+ {
+ throw new JMSAMQException("Failover has occurred and session is dirty so unable to send.",
+ new AMQSessionDirtyException("Failover has occurred and session is dirty " +
+ "so unable to send."));
+ }
+ }
+
UUID messageId = null;
if (_disableMessageId)
{
@@ -466,14 +498,7 @@ public abstract class BasicMessageProduc
message.setJMSMessageID(messageId);
}
- try
- {
- sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate);
- }
- catch (TransportException e)
- {
- throw getSession().toJMSException("Exception whilst sending:" + e.getMessage(), e);
- }
+ sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
if (message != origMessage)
{
@@ -493,7 +518,7 @@ public abstract class BasicMessageProduc
abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
- boolean immediate) throws JMSException;
+ boolean immediate, boolean wait) throws JMSException;
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
{
@@ -571,13 +596,6 @@ public abstract class BasicMessageProduc
public boolean isBound(AMQDestination destination) throws JMSException
{
- try
- {
- return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey());
- }
- catch (TransportException e)
- {
- throw getSession().toJMSException("Exception whilst checking destination binding:" + e.getMessage(), e);
- }
+ return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey());
}
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Fri Oct 21 01:19:00 2011
@@ -19,7 +19,6 @@ package org.apache.qpid.client;
import static org.apache.qpid.transport.Option.NONE;
import static org.apache.qpid.transport.Option.SYNC;
-import static org.apache.qpid.transport.Option.UNRELIABLE;
import java.nio.ByteBuffer;
import java.util.HashMap;
@@ -31,12 +30,9 @@ import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.QpidMessageProperties;
-import org.apache.qpid.client.messaging.address.Link.Reliability;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
@@ -46,7 +42,6 @@ import org.apache.qpid.transport.Message
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,9 +56,10 @@ public class BasicMessageProducer_0_10 e
BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
- boolean immediate, boolean mandatory) throws AMQException
+ boolean immediate, boolean mandatory, boolean waitUntilSent) throws AMQException
{
- super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory);
+ super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate,
+ mandatory, waitUntilSent);
userIDBytes = Strings.toUTF8(_userID);
}
@@ -72,15 +68,12 @@ public class BasicMessageProducer_0_10 e
{
if (destination.getDestSyntax() == DestSyntax.BURL)
{
- if (getSession().isDeclareExchanges())
- {
- String name = destination.getExchangeName().toString();
- ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare
- (name,
- destination.getExchangeClass().toString(),
- null, null,
- name.startsWith("amq.") ? Option.PASSIVE : Option.NONE);
- }
+ String name = destination.getExchangeName().toString();
+ ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare
+ (name,
+ destination.getExchangeClass().toString(),
+ null, null,
+ name.startsWith("amq.") ? Option.PASSIVE : Option.NONE);
}
else
{
@@ -103,7 +96,7 @@ public class BasicMessageProducer_0_10 e
*/
void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
- boolean immediate) throws JMSException
+ boolean immediate, boolean wait) throws JMSException
{
message.prepareForSending();
@@ -178,7 +171,7 @@ public class BasicMessageProducer_0_10 e
if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR &&
(destination.getSubject() != null ||
- (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT) != null))
+ (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get("qpid.subject") != null))
)
{
Map<String,Object> appProps = messageProps.getApplicationHeaders();
@@ -188,21 +181,20 @@ public class BasicMessageProducer_0_10 e
messageProps.setApplicationHeaders(appProps);
}
- if (appProps.get(QpidMessageProperties.QPID_SUBJECT) == null)
+ if (appProps.get("qpid.subject") == null)
{
// use default subject in address string
- appProps.put(QpidMessageProperties.QPID_SUBJECT,destination.getSubject());
+ appProps.put("qpid.subject",destination.getSubject());
}
- if (destination.getAddressType() == AMQDestination.TOPIC_TYPE)
+ if (destination.getTargetNode().getType() == AMQDestination.TOPIC_TYPE)
{
deliveryProp.setRoutingKey((String)
- messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT));
+ messageProps.getApplicationHeaders().get("qpid.subject"));
}
}
-
- ByteBuffer data = message.getData();
- messageProps.setContentLength(data.remaining());
+
+ messageProps.setContentLength(message.getContentLength());
// send the message
try
@@ -218,17 +210,14 @@ public class BasicMessageProducer_0_10 e
deliveryMode == DeliveryMode.PERSISTENT)
);
- boolean unreliable = (destination.getDestSyntax() == DestSyntax.ADDR) &&
- (destination.getLink().getReliability() == Reliability.UNRELIABLE);
-
-
- ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.slice();
+ org.apache.mina.common.ByteBuffer data = message.getData();
+ ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
ssn.messageTransfer(destination.getExchangeName() == null ? "" : destination.getExchangeName().toString(),
MessageAcceptMode.NONE,
MessageAcquireMode.PRE_ACQUIRED,
new Header(deliveryProp, messageProps),
- buffer, sync ? SYNC : NONE, unreliable ? UNRELIABLE : NONE);
+ buffer, sync ? SYNC : NONE);
if (sync)
{
ssn.sync();
@@ -245,34 +234,10 @@ public class BasicMessageProducer_0_10 e
}
}
- @Override
+
public boolean isBound(AMQDestination destination) throws JMSException
{
return _session.isQueueBound(destination);
}
-
- @Override
- public void close() throws JMSException
- {
- super.close();
- AMQDestination dest = _destination;
- if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
- {
- if (dest.getDelete() == AddressOption.ALWAYS ||
- dest.getDelete() == AddressOption.SENDER )
- {
- try
- {
- ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
- _destination.getQueueName());
- }
- catch(TransportException e)
- {
- throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
- }
- }
- }
- }
-
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Fri Oct 21 01:19:00 2011
@@ -27,13 +27,14 @@ import javax.jms.Message;
import javax.jms.Topic;
import javax.jms.Queue;
-import java.nio.ByteBuffer;
-
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.AMQMessageDelegate;
import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.CompositeAMQDataBlock;
@@ -45,9 +46,10 @@ public class BasicMessageProducer_0_8 ex
{
BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
+ AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
+ boolean waitUntilSent) throws AMQException
{
- super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory);
+ super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory,waitUntilSent);
}
void declareDestination(AMQDestination destination)
@@ -72,7 +74,7 @@ public class BasicMessageProducer_0_8 ex
void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
- boolean immediate) throws JMSException
+ boolean immediate, boolean wait) throws JMSException
{
BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
destination.getExchangeName(),
@@ -167,7 +169,7 @@ public class BasicMessageProducer_0_8 ex
throw jmse;
}
- _protocolHandler.writeFrame(compositeFrame);
+ _protocolHandler.writeFrame(compositeFrame, wait);
}
/**
@@ -184,9 +186,7 @@ public class BasicMessageProducer_0_8 ex
if (frames.length == (offset + 1))
{
- byte[] data = new byte[payload.remaining()];
- payload.get(data);
- frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(data));
+ frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
}
else
{
@@ -198,10 +198,7 @@ public class BasicMessageProducer_0_8 ex
payload.position((int) framePayloadMax * (i - offset));
int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
payload.limit(payload.position() + length);
- byte[] data = new byte[payload.remaining()];
- payload.get(data);
-
- frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(data));
+ frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
remaining -= length;
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java Fri Oct 21 01:19:00 2011
@@ -1,23 +1,3 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
package org.apache.qpid.client;
import java.util.ArrayList;
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java Fri Oct 21 01:19:00 2011
@@ -23,7 +23,6 @@ package org.apache.qpid.client;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
-import java.util.List;
import org.apache.qpid.framing.AMQShortString;
@@ -35,18 +34,6 @@ public enum CustomJMSXProperty
JMSXGroupSeq,
JMSXUserID;
- private static List<String> _names;
-
- static
- {
- CustomJMSXProperty[] properties = values();
- _names = new ArrayList<String>(properties.length);
- for(CustomJMSXProperty property : properties)
- {
- _names.add(property.toString());
- }
-
- }
private final AMQShortString _nameAsShortString;
@@ -60,8 +47,20 @@ public enum CustomJMSXProperty
return _nameAsShortString;
}
- public static Enumeration asEnumeration()
+ private static Enumeration _names;
+
+ public static synchronized Enumeration asEnumeration()
{
- return Collections.enumeration(_names);
+ if(_names == null)
+ {
+ CustomJMSXProperty[] properties = values();
+ ArrayList<String> nameList = new ArrayList<String>(properties.length);
+ for(CustomJMSXProperty property : properties)
+ {
+ nameList.add(property.toString());
+ }
+ _names = Collections.enumeration(nameList);
+ }
+ return _names;
}
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java Fri Oct 21 01:19:00 2011
@@ -30,11 +30,9 @@ import org.apache.qpid.common.QpidProper
public class QpidConnectionMetaData implements ConnectionMetaData
{
- private AMQConnection con;
QpidConnectionMetaData(AMQConnection conn)
{
- this.con = conn;
}
public int getJMSMajorVersion() throws JMSException
@@ -64,12 +62,12 @@ public class QpidConnectionMetaData impl
public int getProviderMajorVersion() throws JMSException
{
- return con.getProtocolVersion().getMajorVersion();
+ return 0;
}
public int getProviderMinorVersion() throws JMSException
{
- return con.getProtocolVersion().getMinorVersion();
+ return 8;
}
public String getProviderVersion() throws JMSException
@@ -80,7 +78,8 @@ public class QpidConnectionMetaData impl
private String getProtocolVersion()
{
- return con.getProtocolVersion().toString();
+ // TODO - Implement based on connection negotiated protocol
+ return "0.8";
}
public String getBrokerVersion()
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java Fri Oct 21 01:19:00 2011
@@ -50,25 +50,25 @@ public class QueueSenderAdapter implemen
public void send(Message msg) throws JMSException
{
- checkQueuePreConditions(_queue);
+ checkPreConditions();
_delegate.send(msg);
}
public void send(Queue queue, Message msg) throws JMSException
{
- checkQueuePreConditions(queue);
+ checkPreConditions(queue);
_delegate.send(queue, msg);
}
public void publish(Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException
{
- checkQueuePreConditions(_queue);
+ checkPreConditions();
_delegate.send(msg, deliveryMode, priority, timeToLive);
}
public void send(Queue queue, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException
{
- checkQueuePreConditions(queue);
+ checkPreConditions(queue);
_delegate.send(queue, msg, deliveryMode, priority, timeToLive);
}
@@ -122,19 +122,19 @@ public class QueueSenderAdapter implemen
public void send(Destination dest, Message msg) throws JMSException
{
- checkQueuePreConditions((Queue) dest);
+ checkPreConditions((Queue) dest);
_delegate.send(dest, msg);
}
public void send(Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException
{
- checkQueuePreConditions(_queue);
+ checkPreConditions();
_delegate.send(msg, deliveryMode, priority, timeToLive);
}
public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException
{
- checkQueuePreConditions((Queue) dest);
+ checkPreConditions((Queue) dest);
_delegate.send(dest, msg, deliveryMode, priority, timeToLive);
}
@@ -170,6 +170,11 @@ public class QueueSenderAdapter implemen
private void checkPreConditions() throws JMSException
{
+ checkPreConditions(_queue);
+ }
+
+ private void checkPreConditions(Queue queue) throws JMSException
+ {
if (closed)
{
throw new javax.jms.IllegalStateException("Publisher is closed");
@@ -181,43 +186,39 @@ public class QueueSenderAdapter implemen
{
throw new javax.jms.IllegalStateException("Invalid Session");
}
- }
- private void checkQueuePreConditions(Queue queue) throws JMSException
- {
- checkPreConditions() ;
-
- if (queue == null)
- {
- throw new UnsupportedOperationException("Queue is null.");
- }
-
- if (!(queue instanceof AMQDestination))
- {
- throw new InvalidDestinationException("Queue: " + queue + " is not a valid Qpid queue");
- }
-
- AMQDestination destination = (AMQDestination) queue;
- if (!destination.isCheckedForQueueBinding() && checkQueueBeforePublish())
- {
- if (_delegate.getSession().isStrictAMQP())
- {
- _delegate._logger.warn("AMQP does not support destination validation before publish, ");
- destination.setCheckedForQueueBinding(true);
- }
- else
- {
- if (_delegate.isBound(destination))
- {
- destination.setCheckedForQueueBinding(true);
- }
- else
- {
- throw new InvalidDestinationException("Queue: " + queue
- + " is not a valid destination (no bindings on server");
- }
- }
- }
+ if (queue == null)
+ {
+ throw new UnsupportedOperationException("Queue is null.");
+ }
+
+ if (!(queue instanceof AMQDestination))
+ {
+ throw new InvalidDestinationException("Queue: " + queue + " is not a valid Qpid queue");
+ }
+
+ AMQDestination destination = (AMQDestination) queue;
+ if (!destination.isCheckedForQueueBinding() && checkQueueBeforePublish())
+ {
+
+ if (_delegate.getSession().isStrictAMQP())
+ {
+ _delegate._logger.warn("AMQP does not support destination validation before publish, ");
+ destination.setCheckedForQueueBinding(true);
+ }
+ else
+ {
+ if (_delegate.isBound(destination))
+ {
+ destination.setCheckedForQueueBinding(true);
+ }
+ else
+ {
+ throw new InvalidDestinationException("Queue: " + queue
+ + " is not a valid destination (no bindings on server");
+ }
+ }
+ }
}
private boolean checkQueueBeforePublish()
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java Fri Oct 21 01:19:00 2011
@@ -24,16 +24,13 @@ package org.apache.qpid.client;
import javax.jms.Destination;
import javax.jms.JMSException;
-import org.apache.qpid.framing.AMQShortString;
-
/**
- * Provides support for convenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
+ * Provides support for covenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
* so that operations related to their "temporary-ness" can be abstracted out.
*/
interface TemporaryDestination extends Destination
{
- public AMQShortString getAMQQueueName();
public void delete() throws JMSException;
public AMQSession getSession();
public boolean isDeleted();
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java Fri Oct 21 01:19:00 2011
@@ -31,9 +31,9 @@ public class XAConnectionImpl extends AM
/**
* Create a XAConnection from a connectionURL
*/
- public XAConnectionImpl(ConnectionURL connectionURL) throws AMQException
+ public XAConnectionImpl(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
- super(connectionURL);
+ super(connectionURL, sslConfig);
}
//-- interface XAConnection
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java Fri Oct 21 01:19:00 2011
@@ -21,14 +21,10 @@ import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
+import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.dtx.XidImpl;
-import org.apache.qpid.transport.DtxXaStatus;
-import org.apache.qpid.transport.ExecutionErrorCode;
-import org.apache.qpid.transport.Future;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.RecoverResult;
-import org.apache.qpid.transport.SessionException;
-import org.apache.qpid.transport.XaResult;
+import org.apache.qpid.transport.*;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -215,28 +211,9 @@ public class XAResourceImpl implements X
* @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
*/
public boolean isSameRM(XAResource xaResource) throws XAException
- {
- if(this == xaResource)
- {
- return true;
- }
- if(!(xaResource instanceof XAResourceImpl))
- {
- return false;
- }
-
- XAResourceImpl other = (XAResourceImpl)xaResource;
-
- String myUUID = ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID();
- String otherUUID = ((AMQSession_0_10)other._xaSession).getAMQConnection().getBrokerUUID();
-
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Comparing my UUID " + myUUID + " with other UUID " + otherUUID);
- }
-
- return (myUUID != null && otherUUID != null && myUUID.equals(otherUUID));
-
+ {
+ // TODO : get the server identity of xaResource and compare it with our own one
+ return false;
}
/**
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java Fri Oct 21 01:19:00 2011
@@ -52,7 +52,7 @@ public class XASessionImpl extends AMQSe
{
super(qpidConnection, con, channelId, false, // this is not a transacted session
Session.AUTO_ACKNOWLEDGE, // the ack mode is transacted
- MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow,null);
+ MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
createSession();
_xaResource = new XAResourceImpl(this);
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java Fri Oct 21 01:19:00 2011
@@ -59,8 +59,8 @@ import org.slf4j.LoggerFactory;
* <tr><td> Automatically retry the continuation accross fail-overs until it succeeds, or raises an exception.
* </table>
*
- * @todo Another continuation. Could use an interface Continuation (as described in other todos)
- * Then have a wrapping continuation (this), which blocks on an arbitrary
+ * @todo Another continuation. Could use an interface Continuation (as described in other todos, for example, see
+ * {@link org.apache.qpid.pool.Job}). Then have a wrapping continuation (this), which blocks on an arbitrary
* Condition or Latch (specified in constructor call), that this blocks on before calling the wrapped Continuation.
* Must work on Java 1.4, so check retrotranslator works on Lock/Condition or latch first. Argument and return type
* to match wrapped condition as type parameters. Rename to AsyncConditionalContinuation or something like that.
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Fri Oct 21 01:19:00 2011
@@ -78,7 +78,7 @@ public class ChannelCloseMethodHandler i
{
throw new AMQNoRouteException("Error: " + reason, null, null);
}
- else if (errorCode == AMQConstant.ARGUMENT_INVALID)
+ else if (errorCode == AMQConstant.INVALID_ARGUMENT)
{
_logger.debug("Broker responded with Invalid Argument.");
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Fri Oct 21 01:19:00 2011
@@ -20,13 +20,6 @@
*/
package org.apache.qpid.client.handler;
-import java.io.UnsupportedEncodingException;
-import java.util.StringTokenizer;
-
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.security.AMQCallbackHandler;
@@ -41,9 +34,18 @@ import org.apache.qpid.framing.Connectio
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.ProtocolVersion;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.StringTokenizer;
+
public class ConnectionStartMethodHandler implements StateAwareMethodListener<ConnectionStartBody>
{
private static final Logger _log = LoggerFactory.getLogger(ConnectionStartMethodHandler.class);
@@ -195,20 +197,40 @@ public class ConnectionStartMethodHandle
private String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException
{
final String mechanisms = new String(availableMechanisms, "utf8");
- return CallbackHandlerRegistry.getInstance().selectMechanism(mechanisms);
+ StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
+ HashSet mechanismSet = new HashSet();
+ while (tokenizer.hasMoreTokens())
+ {
+ mechanismSet.add(tokenizer.nextToken());
+ }
+
+ String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms();
+ StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " ");
+ while (prefTokenizer.hasMoreTokens())
+ {
+ String mech = prefTokenizer.nextToken();
+ if (mechanismSet.contains(mech))
+ {
+ return mech;
+ }
+ }
+
+ return null;
}
private AMQCallbackHandler createCallbackHandler(String mechanism, AMQProtocolSession protocolSession)
throws AMQException
{
+ Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism);
try
{
- AMQCallbackHandler instance = CallbackHandlerRegistry.getInstance().createCallbackHandler(mechanism);
- instance.initialise(protocolSession.getAMQConnection().getConnectionURL());
+ Object instance = mechanismClass.newInstance();
+ AMQCallbackHandler cbh = (AMQCallbackHandler) instance;
+ cbh.initialise(protocolSession);
- return instance;
+ return cbh;
}
- catch (IllegalArgumentException e)
+ catch (Exception e)
{
throw new AMQException(null, "Unable to create callback handler: " + e, e);
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java Fri Oct 21 01:19:00 2011
@@ -26,7 +26,9 @@ import org.apache.qpid.client.AMQSession
import javax.jms.Destination;
import javax.jms.JMSException;
+import java.nio.ByteBuffer;
import java.util.Enumeration;
+import java.util.Map;
import java.util.UUID;
public interface AMQMessageDelegate
@@ -128,9 +130,9 @@ public interface AMQMessageDelegate
void removeProperty(final String propertyName) throws JMSException;
- void setAMQSession(final AMQSession<?,?> s);
+ void setAMQSession(final AMQSession s);
- AMQSession<?,?> getAMQSession();
+ AMQSession getAMQSession();
long getDeliveryTag();
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java Fri Oct 21 01:19:00 2011
@@ -21,6 +21,11 @@
package org.apache.qpid.client.message;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+
public interface AMQMessageDelegateFactory<D extends AMQMessageDelegate>
{
public static AMQMessageDelegateFactory DEFAULT_FACTORY = null;
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Fri Oct 21 01:19:00 2011
@@ -22,12 +22,10 @@
package org.apache.qpid.client.message;
import java.lang.ref.SoftReference;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -37,10 +35,12 @@ import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotWriteableException;
+import javax.jms.Session;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQPInvalidClassException;
import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.framing.AMQShortString;
@@ -53,9 +53,6 @@ import org.apache.qpid.transport.Message
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.ReplyTo;
-import org.apache.qpid.transport.TransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This extends AbstractAMQMessageDelegate which contains common code between
@@ -64,7 +61,6 @@ import org.slf4j.LoggerFactory;
*/
public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
{
- private static final Logger _logger = LoggerFactory.getLogger(AMQMessageDelegate_0_10.class);
private static final Map<ReplyTo, SoftReference<Destination>> _destinationCache = Collections.synchronizedMap(new HashMap<ReplyTo, SoftReference<Destination>>());
public static final String JMS_TYPE = "x-jms-type";
@@ -74,8 +70,13 @@ public class AMQMessageDelegate_0_10 ext
private Destination _destination;
+
private MessageProperties _messageProps;
private DeliveryProperties _deliveryProps;
+ /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
+ private AMQSession _session;
+ private final long _deliveryTag;
+
protected AMQMessageDelegate_0_10()
{
@@ -85,29 +86,15 @@ public class AMQMessageDelegate_0_10 ext
protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag)
{
- super(deliveryTag);
_messageProps = messageProps;
_deliveryProps = deliveryProps;
+ _deliveryTag = deliveryTag;
_readableProperties = (_messageProps != null);
AMQDestination dest;
- if (AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL)
- {
- dest = generateDestination(new AMQShortString(_deliveryProps.getExchange()),
+ dest = generateDestination(new AMQShortString(_deliveryProps.getExchange()),
new AMQShortString(_deliveryProps.getRoutingKey()));
- }
- else
- {
- String subject = null;
- if (messageProps != null && messageProps.getApplicationHeaders() != null)
- {
- subject = (String)messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT);
- }
- dest = (AMQDestination) convertToAddressBasedDestination(_deliveryProps.getExchange(),
- _deliveryProps.getRoutingKey(), subject);
- }
-
setJMSDestination(dest);
}
@@ -198,6 +185,7 @@ public class AMQMessageDelegate_0_10 ext
}
}
+
public long getJMSTimestamp() throws JMSException
{
return _deliveryProps.getTimestamp();
@@ -252,50 +240,13 @@ public class AMQMessageDelegate_0_10 ext
String exchange = replyTo.getExchange();
String routingKey = replyTo.getRoutingKey();
- if (AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL)
- {
-
- dest = generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
- }
- else
- {
- dest = convertToAddressBasedDestination(exchange,routingKey,null);
- }
+ dest = generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
_destinationCache.put(replyTo, new SoftReference<Destination>(dest));
}
return dest;
}
}
-
- private Destination convertToAddressBasedDestination(String exchange, String routingKey, String subject)
- {
- String addr;
- if ("".equals(exchange)) // type Queue
- {
- subject = (subject == null) ? "" : "/" + subject;
- addr = routingKey + subject;
- }
- else
- {
- addr = exchange + "/" + routingKey;
- }
-
- try
- {
- return AMQDestination.createDestination("ADDR:" + addr);
- }
- catch(Exception e)
- {
- // An exception is only thrown here if the address syntax is invalid.
- // Logging the exception, but not throwing as this is only important to Qpid developers.
- // An exception here means a bug in the code.
- _logger.error("Exception when constructing an address string from the ReplyTo struct");
-
- // falling back to the old way of doing it to ensure the application continues.
- return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
- }
- }
public void setJMSReplyTo(Destination destination) throws JMSException
{
@@ -317,14 +268,14 @@ public class AMQMessageDelegate_0_10 ext
{
try
{
- int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd);
+ int type = ((AMQSession_0_10)_session).resolveAddressType(amqd);
if (type == AMQDestination.QUEUE_TYPE)
{
- ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForQueueType(amqd);
+ ((AMQSession_0_10)_session).setLegacyFiledsForQueueType(amqd);
}
else
{
- ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForTopicType(amqd);
+ ((AMQSession_0_10)_session).setLegacyFiledsForTopicType(amqd);
}
}
catch(AMQException ex)
@@ -334,14 +285,6 @@ public class AMQMessageDelegate_0_10 ext
e.setLinkedException(ex);
throw e;
}
- catch (TransportException e)
- {
- JMSException jmse = new JMSException("Exception occured while figuring out the node type:" + e.getMessage());
- jmse.initCause(e);
- jmse.setLinkedException(e);
- throw jmse;
- }
-
}
final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString());
@@ -392,7 +335,7 @@ public class AMQMessageDelegate_0_10 ext
Destination replyTo = getJMSReplyTo();
if(replyTo != null)
{
- return ((AMQDestination)replyTo).toString();
+ return ((AMQDestination)replyTo).toURL();
}
else
{
@@ -689,16 +632,6 @@ public class AMQMessageDelegate_0_10 ext
{
return new String(_messageProps.getUserId());
}
- else if (QpidMessageProperties.AMQP_0_10_APP_ID.equals(propertyName) &&
- _messageProps.getAppId() != null)
- {
- return new String(_messageProps.getAppId());
- }
- else if (QpidMessageProperties.AMQP_0_10_ROUTING_KEY.equals(propertyName) &&
- _deliveryProps.getRoutingKey() != null)
- {
- return _deliveryProps.getRoutingKey();
- }
else
{
checkPropertyName(propertyName);
@@ -737,19 +670,7 @@ public class AMQMessageDelegate_0_10 ext
public Enumeration getPropertyNames() throws JMSException
{
- List<String> props = new ArrayList<String>();
- Map<String, Object> propertyMap = getApplicationHeaders();
- for (String prop: getApplicationHeaders().keySet())
- {
- Object value = propertyMap.get(prop);
- if (value instanceof Boolean || value instanceof Number
- || value instanceof String)
- {
- props.add(prop);
- }
- }
-
- return java.util.Collections.enumeration(props);
+ return java.util.Collections.enumeration(getApplicationHeaders().keySet());
}
public void setBooleanProperty(String propertyName, boolean b) throws JMSException
@@ -805,14 +726,7 @@ public class AMQMessageDelegate_0_10 ext
{
checkPropertyName(propertyName);
checkWritableProperties();
- if (QpidMessageProperties.AMQP_0_10_APP_ID.equals(propertyName))
- {
- _messageProps.setAppId(value.getBytes());
- }
- else
- {
- setApplicationHeader(propertyName, value);
- }
+ setApplicationHeader(propertyName, value);
}
private static final Set<Class> ALLOWED = new HashSet();
@@ -897,6 +811,64 @@ public class AMQMessageDelegate_0_10 ext
_readableProperties = false;
}
+
+ public void acknowledgeThis() throws JMSException
+ {
+ // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
+ // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
+ if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ if (_session.getAMQConnection().isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Connection is already closed");
+ }
+
+ // we set multiple to true here since acknowledgment implies acknowledge of all previous messages
+ // received on the session
+ _session.acknowledgeMessage(_deliveryTag, true);
+ }
+ }
+
+ public void acknowledge() throws JMSException
+ {
+ if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ _session.acknowledge();
+ }
+ }
+
+
+ /**
+ * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
+ * acknowledge()
+ *
+ * @param s the AMQ session that delivered this message
+ */
+ public void setAMQSession(AMQSession s)
+ {
+ _session = s;
+ }
+
+ public AMQSession getAMQSession()
+ {
+ return _session;
+ }
+
+ /**
+ * Get the AMQ message number assigned to this message
+ *
+ * @return the message number
+ */
+ public long getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+
+
+
+
+
+
protected void checkPropertyName(CharSequence propertyName)
{
if (propertyName == null)
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java Fri Oct 21 01:19:00 2011
@@ -30,6 +30,7 @@ import java.util.UUID;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageNotWriteableException;
+import javax.jms.Session;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
@@ -59,12 +60,15 @@ public class AMQMessageDelegate_0_8 exte
Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
private ContentHeaderProperties _contentHeaderProperties;
+ /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
+ private AMQSession _session;
+ private final long _deliveryTag;
// The base set of items that needs to be set.
private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag)
{
- super(deliveryTag);
_contentHeaderProperties = properties;
+ _deliveryTag = deliveryTag;
_readableProperties = (_contentHeaderProperties != null);
_headerAdapter = new JMSHeaderAdapter(_readableProperties ? ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()
: (new BasicContentHeaderProperties()).getHeaders() );
@@ -495,6 +499,7 @@ public class AMQMessageDelegate_0_8 exte
{
throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable");
}
+ _contentHeaderProperties.updated();
}
@@ -514,4 +519,58 @@ public class AMQMessageDelegate_0_8 exte
_readableProperties = false;
}
+
+
+ public void acknowledgeThis() throws JMSException
+ {
+ // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
+ // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
+ if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ if (_session.getAMQConnection().isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Connection is already closed");
+ }
+
+ // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
+ // received on the session
+ _session.acknowledgeMessage(_deliveryTag, true);
+ }
+ }
+
+ public void acknowledge() throws JMSException
+ {
+ if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ _session.acknowledge();
+ }
+ }
+
+
+ /**
+ * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
+ * acknowledge()
+ *
+ * @param s the AMQ session that delivered this message
+ */
+ public void setAMQSession(AMQSession s)
+ {
+ _session = s;
+ }
+
+ public AMQSession getAMQSession()
+ {
+ return _session;
+ }
+
+ /**
+ * Get the AMQ message number assigned to this message
+ *
+ * @return the message number
+ */
+ public long getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java Fri Oct 21 01:19:00 2011
@@ -23,12 +23,11 @@ package org.apache.qpid.client.message;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
-import java.nio.ByteBuffer;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.transport.codec.BBEncoder;
@@ -66,7 +65,7 @@ public class AMQPEncodedMapMessage exten
if ((value instanceof Boolean) || (value instanceof Byte) || (value instanceof Short) || (value instanceof Integer)
|| (value instanceof Long) || (value instanceof Character) || (value instanceof Float)
|| (value instanceof Double) || (value instanceof String) || (value instanceof byte[])
- || (value instanceof List) || (value instanceof Map) || (value instanceof UUID) || (value == null))
+ || (value instanceof List) || (value instanceof Map) || (value == null))
{
_map.put(propName, value);
}
@@ -81,19 +80,18 @@ public class AMQPEncodedMapMessage exten
@ Override
public ByteBuffer getData()
{
- BBEncoder encoder = new BBEncoder(1024);
- encoder.writeMap(_map);
- return encoder.segment();
+ writeMapToData();
+ return _data;
}
@ Override
- protected void populateMapFromData(ByteBuffer data) throws JMSException
+ protected void populateMapFromData() throws JMSException
{
- if (data != null)
+ if (_data != null)
{
- data.rewind();
+ _data.rewind();
BBDecoder decoder = new BBDecoder();
- decoder.init(data);
+ decoder.init(_data.buf());
_map = decoder.readMap();
}
else
@@ -102,8 +100,16 @@ public class AMQPEncodedMapMessage exten
}
}
+ @ Override
+ protected void writeMapToData()
+ {
+ BBEncoder encoder = new BBEncoder(1024);
+ encoder.writeMap(_map);
+ _data = ByteBuffer.wrap(encoder.segment());
+ }
+
// for testing
- public Map<String,Object> getMap()
+ Map<String,Object> getMap()
{
return _map;
}
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java Fri Oct 21 01:19:00 2011
@@ -1,6 +1,6 @@
package org.apache.qpid.client.message;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -8,23 +8,22 @@ package org.apache.qpid.client.message;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
import javax.jms.JMSException;
-import java.nio.ByteBuffer;
-
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
public class AMQPEncodedMapMessageFactory extends AbstractJMSMessageFactory
@@ -37,7 +36,7 @@ public class AMQPEncodedMapMessageFactor
return new AMQPEncodedMapMessage(delegate,data);
}
-
+ @Override
public AbstractJMSMessage createMessage(
AMQMessageDelegateFactory delegateFactory) throws JMSException
{
Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java Fri Oct 21 01:19:00 2011
@@ -23,13 +23,9 @@ package org.apache.qpid.client.message;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import javax.jms.JMSException;
-import javax.jms.Session;
-
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -82,25 +78,7 @@ public abstract class AbstractAMQMessage
new ExchangeInfo(ExchangeDefaults.HEADERS_EXCHANGE_NAME.toString(),
ExchangeDefaults.HEADERS_EXCHANGE_CLASS.toString(),
AMQDestination.QUEUE_TYPE));
- }
-
- /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
- private AMQSession<?,?> _session;
- private final long _deliveryTag;
-
- protected AbstractAMQMessageDelegate(long deliveryTag)
- {
- _deliveryTag = deliveryTag;
- }
-
- /**
- * Get the AMQ message number assigned to this message
- *
- * @return the message number
- */
- public long getDeliveryTag()
- {
- return _deliveryTag;
+
}
/**
@@ -179,47 +157,6 @@ public abstract class AbstractAMQMessage
{
return _exchangeMap.containsKey(exchange);
}
-
- public void acknowledgeThis() throws JMSException
- {
- // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
- // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
- if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- if (_session.getAMQConnection().isClosed())
- {
- throw new javax.jms.IllegalStateException("Connection is already closed");
- }
-
- // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
- // received on the session
- _session.acknowledgeMessage(getDeliveryTag(), true);
- }
- }
-
- public void acknowledge() throws JMSException
- {
- if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- _session.acknowledge();
- }
- }
-
- /**
- * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
- * acknowledge()
- *
- * @param s the AMQ session that delivered this message
- */
- public void setAMQSession(AMQSession<?,?> s)
- {
- _session = s;
- }
-
- public AMQSession<?,?> getAMQSession()
- {
- return _session;
- }
}
class ExchangeInfo
@@ -265,5 +202,5 @@ class ExchangeInfo
public void setDestType(int destType)
{
this.destType = destType;
- }
+ }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org