You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/01 11:04:39 UTC
svn commit: r1295492 [3/6] - in
/qpid/branches/rg-amqp-1-0-sandbox/qpid/java: ./ bdbstore/ bdbstore/bin/
bdbstore/etc/ bdbstore/etc/scripts/ bdbstore/src/ bdbstore/src/main/
bdbstore/src/main/java/ bdbstore/src/main/java/org/
bdbstore/src/main/java/org...
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Mar 1 10:04:31 2012
@@ -27,6 +27,7 @@ import org.apache.qpid.client.protocol.A
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,10 +37,7 @@ import javax.jms.MessageListener;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import java.util.SortedSet;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -117,29 +115,10 @@ public abstract class BasicMessageConsum
protected final int _acknowledgeMode;
/**
- * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
- */
- private int _outstanding;
-
- /**
- * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
- * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
- */
- private boolean _dups_ok_acknowledge_send;
-
- /**
* List of tags delievered, The last of which which should be acknowledged on commit in transaction mode.
*/
private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
- /** The last tag that was "multiple" acknowledged on this session (if transacted) */
- private long _lastAcked;
-
- /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */
- private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
-
- private final Object _commitLock = new Object();
-
/**
* The thread that was used to call receive(). This is important for being able to interrupt that thread if a
* receive() is in progress.
@@ -289,17 +268,6 @@ public abstract class BasicMessageConsum
}
}
- protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
- {
- if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
- }
-
- _session.setInRecovery(false);
- preDeliver(jmsMsg);
- }
-
/**
* @param immediate if true then return immediately if the connection is failing over
*
@@ -322,14 +290,14 @@ public abstract class BasicMessageConsum
}
}
- if (!_receiving.compareAndSet(false, true))
+ if (isMessageListenerSet())
{
- throw new javax.jms.IllegalStateException("Another thread is already receiving.");
+ throw new javax.jms.IllegalStateException("A listener has already been set.");
}
- if (isMessageListenerSet())
+ if (!_receiving.compareAndSet(false, true))
{
- throw new javax.jms.IllegalStateException("A listener has already been set.");
+ throw new javax.jms.IllegalStateException("Another thread is already receiving.");
}
_receivingThread = Thread.currentThread();
@@ -408,7 +376,7 @@ public abstract class BasicMessageConsum
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
- preApplicationProcessing(m);
+ preDeliver(m);
postDeliver(m);
}
return m;
@@ -419,6 +387,10 @@ public abstract class BasicMessageConsum
return null;
}
+ catch(TransportException e)
+ {
+ throw _session.toJMSException("Exception while receiving:" + e.getMessage(), e);
+ }
finally
{
releaseReceiving();
@@ -477,7 +449,7 @@ public abstract class BasicMessageConsum
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
- preApplicationProcessing(m);
+ preDeliver(m);
postDeliver(m);
}
@@ -489,6 +461,10 @@ public abstract class BasicMessageConsum
return null;
}
+ catch(TransportException e)
+ {
+ throw _session.toJMSException("Exception while receiving:" + e.getMessage(), e);
+ }
finally
{
releaseReceiving();
@@ -582,6 +558,10 @@ public abstract class BasicMessageConsum
{
throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
}
+ catch (TransportException e)
+ {
+ throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e);
+ }
}
}
else
@@ -721,7 +701,7 @@ public abstract class BasicMessageConsum
{
if (isMessageListenerSet())
{
- preApplicationProcessing(jmsMessage);
+ preDeliver(jmsMessage);
getMessageListener().onMessage(jmsMessage);
postDeliver(jmsMessage);
}
@@ -745,49 +725,42 @@ public abstract class BasicMessageConsum
}
}
- void preDeliver(AbstractJMSMessage msg)
+ protected void preDeliver(AbstractJMSMessage msg)
{
+ _session.setInRecovery(false);
+
switch (_acknowledgeMode)
{
-
case Session.PRE_ACKNOWLEDGE:
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
break;
-
+ case Session.AUTO_ACKNOWLEDGE:
+ //fall through
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ _session.addUnacknowledgedMessage(msg.getDeliveryTag());
+ break;
case Session.CLIENT_ACKNOWLEDGE:
// we set the session so that when the user calls acknowledge() it can call the method on session
// to send out the appropriate frame
msg.setAMQSession(_session);
+ _session.addUnacknowledgedMessage(msg.getDeliveryTag());
+ _session.markDirty();
break;
case Session.SESSION_TRANSACTED:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- else
- {
- _session.addDeliveredMessage(msg.getDeliveryTag());
- _session.markDirty();
- }
-
+ _session.addDeliveredMessage(msg.getDeliveryTag());
+ _session.markDirty();
+ break;
+ case Session.NO_ACKNOWLEDGE:
+ //do nothing.
+ //path used for NO-ACK consumers, and browsers (see constructor).
break;
}
-
}
- void postDeliver(AbstractJMSMessage msg) throws JMSException
+ void postDeliver(AbstractJMSMessage msg)
{
switch (_acknowledgeMode)
{
-
- case Session.CLIENT_ACKNOWLEDGE:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- _session.markDirty();
- break;
-
case Session.DUPS_OK_ACKNOWLEDGE:
case Session.AUTO_ACKNOWLEDGE:
// we do not auto ack a message if the application code called recover()
@@ -825,63 +798,6 @@ public abstract class BasicMessageConsum
return null;
}
- /**
- * Acknowledge up to last message delivered (if any). Used when commiting.
- */
- void acknowledgeDelivered()
- {
- synchronized(_commitLock)
- {
- ArrayList<Long> tagsToAck = new ArrayList<Long>();
-
- while (!_receivedDeliveryTags.isEmpty())
- {
- tagsToAck.add(_receivedDeliveryTags.poll());
- }
-
- Collections.sort(tagsToAck);
-
- long prevAcked = _lastAcked;
- long oldAckPoint = -1;
-
- while(oldAckPoint != prevAcked)
- {
- oldAckPoint = prevAcked;
-
- Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
-
- while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1)
- {
- tagsToAckIterator.remove();
- prevAcked++;
- }
-
- Iterator<Long> previousAckIterator = _previouslyAcked.iterator();
- while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1)
- {
- previousAckIterator.remove();
- prevAcked++;
- }
-
- }
- if(prevAcked != _lastAcked)
- {
- _session.acknowledgeMessage(prevAcked, true);
- _lastAcked = prevAcked;
- }
-
- Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
-
- while(tagsToAckIterator.hasNext())
- {
- Long tag = tagsToAckIterator.next();
- _session.acknowledgeMessage(tag, false);
- _previouslyAcked.add(tag);
- }
- }
- }
-
-
void notifyError(Throwable cause)
{
// synchronized (_closed)
@@ -960,7 +876,7 @@ public abstract class BasicMessageConsum
public boolean isNoConsume()
{
- return _noConsume || _destination.isBrowseOnly() ;
+ return _noConsume;
}
public void rollback()
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Mar 1 10:04:31 2012
@@ -23,9 +23,7 @@ import org.apache.qpid.client.AMQDestina
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
@@ -68,19 +66,13 @@ public class BasicMessageConsumer_0_10 e
private boolean _preAcquire = true;
/**
- * Indicate whether this consumer is started.
- */
- private boolean _isStarted = false;
-
- /**
* Specify whether this consumer is performing a sync receive
*/
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
private String _consumerTagString;
private long capacity = 0;
-
- //--- constructor
+
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
@@ -106,7 +98,6 @@ public class BasicMessageConsumer_0_10 e
_preAcquire = false;
}
}
- _isStarted = connection.started();
// Destination setting overrides connection defaults
if (destination.getDestSyntax() == DestSyntax.ADDR &&
@@ -174,8 +165,6 @@ public class BasicMessageConsumer_0_10 e
}
}
- //----- overwritten methods
-
/**
* This method is invoked when this consumer is stopped.
* It tells the broker to stop delivering messages to this consumer.
@@ -205,11 +194,18 @@ public class BasicMessageConsumer_0_10 e
super.notifyMessage(messageFrame);
}
- @Override protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+ @Override
+ protected void preDeliver(AbstractJMSMessage jmsMsg)
{
- super.preApplicationProcessing(jmsMsg);
- if (!_session.getTransacted() && _session.getAcknowledgeMode() != org.apache.qpid.jms.Session.CLIENT_ACKNOWLEDGE)
+ super.preDeliver(jmsMsg);
+
+ if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
{
+ //For 0-10 we need to ensure that all messages are indicated processed in some way to
+ //ensure their AMQP command-id is marked completed, and so we must send a completion
+ //even for no-ack messages even though there isnt actually an 'acknowledgement' occurring.
+ //Add message to the unacked message list to ensure we dont lose record of it before
+ //sending a completion of some sort.
_session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
}
}
@@ -221,7 +217,6 @@ public class BasicMessageConsumer_0_10 e
return _messageFactory.createMessage(msg.getMessageTransfer());
}
- // private methods
/**
* Check whether a message can be delivered to this consumer.
*
@@ -365,21 +360,28 @@ public class BasicMessageConsumer_0_10 e
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
super.setMessageListener(messageListener);
- if (messageListener != null && capacity == 0)
- {
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
- }
- if (messageListener != null && !_synchronousQueue.isEmpty())
+ try
{
- Iterator messages=_synchronousQueue.iterator();
- while (messages.hasNext())
+ if (messageListener != null && capacity == 0)
{
- AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
- messages.remove();
- _session.rejectMessage(message, true);
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
+ if (messageListener != null && !_synchronousQueue.isEmpty())
+ {
+ Iterator messages=_synchronousQueue.iterator();
+ while (messages.hasNext())
+ {
+ AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
+ messages.remove();
+ _session.rejectMessage(message, true);
+ }
+ }
+ }
+ catch(TransportException e)
+ {
+ throw _session.toJMSException("Exception while setting message listener:"+ e.getMessage(), e);
}
}
@@ -443,7 +445,7 @@ public class BasicMessageConsumer_0_10 e
return o;
}
- void postDeliver(AbstractJMSMessage msg) throws JMSException
+ void postDeliver(AbstractJMSMessage msg)
{
super.postDeliver(msg);
if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
@@ -452,10 +454,8 @@ public class BasicMessageConsumer_0_10 e
}
if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE &&
- !_session.isInRecovery() &&
- _session.getAMQConnection().getSyncAck())
+ !_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
{
- ((AMQSession_0_10) getSession()).flushAcknowledgments();
((AMQSession_0_10) getSession()).getQpidSession().sync();
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Thu Mar 1 10:04:31 2012
@@ -39,6 +39,7 @@ import org.apache.qpid.client.message.Ab
import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
import org.slf4j.Logger;
@@ -113,8 +114,6 @@ public abstract class BasicMessageProduc
private final boolean _mandatory;
- private final boolean _waitUntilSent;
-
private boolean _disableMessageId;
private UUIDGen _messageIdGenerator = UUIDs.newGenerator();
@@ -126,8 +125,7 @@ public abstract class BasicMessageProduc
protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
- boolean waitUntilSent) throws AMQException
+ AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
{
_connection = connection;
_destination = destination;
@@ -143,7 +141,6 @@ public abstract class BasicMessageProduc
_immediate = immediate;
_mandatory = mandatory;
- _waitUntilSent = waitUntilSent;
_userID = connection.getUsername();
setPublishMode();
}
@@ -266,7 +263,7 @@ public abstract class BasicMessageProduc
return _destination;
}
- public void close()
+ public void close() throws JMSException
{
_closed.set(true);
_session.deregisterProducer(_producerId);
@@ -363,19 +360,6 @@ public abstract class BasicMessageProduc
}
}
- public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException
- {
- checkPreConditions();
- checkDestination(destination);
- synchronized (_connection.getFailoverMutex())
- {
- validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate,
- waitUntilSent);
- }
- }
-
private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
{
if (message instanceof AbstractJMSMessage)
@@ -450,12 +434,6 @@ public abstract class BasicMessageProduc
}
}
- protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate) throws JMSException
- {
- sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
- }
-
/**
* The caller of this method must hold the failover mutex.
*
@@ -470,23 +448,13 @@ public abstract class BasicMessageProduc
* @throws JMSException
*/
protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate, boolean wait) throws JMSException
+ boolean mandatory, boolean immediate) throws JMSException
{
checkTemporaryDestination(destination);
origMessage.setJMSDestination(destination);
AbstractJMSMessage message = convertToNativeMessage(origMessage);
- if (_transacted)
- {
- if (_session.hasFailedOver() && _session.isDirty())
- {
- throw new JMSAMQException("Failover has occurred and session is dirty so unable to send.",
- new AMQSessionDirtyException("Failover has occurred and session is dirty " +
- "so unable to send."));
- }
- }
-
UUID messageId = null;
if (_disableMessageId)
{
@@ -498,7 +466,14 @@ public abstract class BasicMessageProduc
message.setJMSMessageID(messageId);
}
- sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
+ try
+ {
+ sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate);
+ }
+ catch (TransportException e)
+ {
+ throw getSession().toJMSException("Exception whilst sending:" + e.getMessage(), e);
+ }
if (message != origMessage)
{
@@ -518,7 +493,7 @@ public abstract class BasicMessageProduc
abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
- boolean immediate, boolean wait) throws JMSException;
+ boolean immediate) throws JMSException;
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
{
@@ -596,6 +571,13 @@ public abstract class BasicMessageProduc
public boolean isBound(AMQDestination destination) throws JMSException
{
- return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey());
+ try
+ {
+ return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey());
+ }
+ catch (TransportException e)
+ {
+ throw getSession().toJMSException("Exception whilst checking destination binding:" + e.getMessage(), e);
+ }
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Thu Mar 1 10:04:31 2012
@@ -37,7 +37,6 @@ import org.apache.qpid.client.message.AM
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.client.messaging.address.Link.Reliability;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
@@ -47,6 +46,7 @@ import org.apache.qpid.transport.Message
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,10 +61,9 @@ public class BasicMessageProducer_0_10 e
BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
- boolean immediate, boolean mandatory, boolean waitUntilSent) throws AMQException
+ boolean immediate, boolean mandatory) throws AMQException
{
- super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate,
- mandatory, waitUntilSent);
+ super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory);
userIDBytes = Strings.toUTF8(_userID);
}
@@ -104,7 +103,7 @@ public class BasicMessageProducer_0_10 e
*/
void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
- boolean immediate, boolean wait) throws JMSException
+ boolean immediate) throws JMSException
{
message.prepareForSending();
@@ -246,14 +245,14 @@ public class BasicMessageProducer_0_10 e
}
}
-
+ @Override
public boolean isBound(AMQDestination destination) throws JMSException
{
return _session.isQueueBound(destination);
}
@Override
- public void close()
+ public void close() throws JMSException
{
super.close();
AMQDestination dest = _destination;
@@ -262,10 +261,18 @@ public class BasicMessageProducer_0_10 e
if (dest.getDelete() == AddressOption.ALWAYS ||
dest.getDelete() == AddressOption.SENDER )
{
- ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
+ try
+ {
+ ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
_destination.getQueueName());
+ }
+ catch(TransportException e)
+ {
+ throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
+ }
}
}
}
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Thu Mar 1 10:04:31 2012
@@ -45,10 +45,9 @@ public class BasicMessageProducer_0_8 ex
{
BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
- boolean waitUntilSent) throws AMQException
+ AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
{
- super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory,waitUntilSent);
+ super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory);
}
void declareDestination(AMQDestination destination)
@@ -73,7 +72,7 @@ public class BasicMessageProducer_0_8 ex
void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
- boolean immediate, boolean wait) throws JMSException
+ boolean immediate) throws JMSException
{
BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
destination.getExchangeName(),
@@ -168,7 +167,7 @@ public class BasicMessageProducer_0_8 ex
throw jmse;
}
- _protocolHandler.writeFrame(compositeFrame, wait);
+ _protocolHandler.writeFrame(compositeFrame);
}
/**
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java Thu Mar 1 10:04:31 2012
@@ -24,13 +24,16 @@ package org.apache.qpid.client;
import javax.jms.Destination;
import javax.jms.JMSException;
+import org.apache.qpid.framing.AMQShortString;
+
/**
- * Provides support for covenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
+ * Provides support for convenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
* so that operations related to their "temporary-ness" can be abstracted out.
*/
interface TemporaryDestination extends Destination
{
+ public AMQShortString getAMQQueueName();
public void delete() throws JMSException;
public AMQSession getSession();
public boolean isDeleted();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Thu Mar 1 10:04:31 2012
@@ -78,7 +78,7 @@ public class ChannelCloseMethodHandler i
{
throw new AMQNoRouteException("Error: " + reason, null, null);
}
- else if (errorCode == AMQConstant.INVALID_ARGUMENT)
+ else if (errorCode == AMQConstant.ARGUMENT_INVALID)
{
_logger.debug("Broker responded with Invalid Argument.");
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Thu Mar 1 10:04:31 2012
@@ -20,6 +20,13 @@
*/
package org.apache.qpid.client.handler;
+import java.io.UnsupportedEncodingException;
+import java.util.StringTokenizer;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.security.AMQCallbackHandler;
@@ -34,18 +41,9 @@ import org.apache.qpid.framing.Connectio
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.ProtocolVersion;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
-import java.io.UnsupportedEncodingException;
-import java.util.HashSet;
-import java.util.StringTokenizer;
-
public class ConnectionStartMethodHandler implements StateAwareMethodListener<ConnectionStartBody>
{
private static final Logger _log = LoggerFactory.getLogger(ConnectionStartMethodHandler.class);
@@ -197,40 +195,20 @@ public class ConnectionStartMethodHandle
private String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException
{
final String mechanisms = new String(availableMechanisms, "utf8");
- StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
- HashSet mechanismSet = new HashSet();
- while (tokenizer.hasMoreTokens())
- {
- mechanismSet.add(tokenizer.nextToken());
- }
-
- String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms();
- StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " ");
- while (prefTokenizer.hasMoreTokens())
- {
- String mech = prefTokenizer.nextToken();
- if (mechanismSet.contains(mech))
- {
- return mech;
- }
- }
-
- return null;
+ return CallbackHandlerRegistry.getInstance().selectMechanism(mechanisms);
}
private AMQCallbackHandler createCallbackHandler(String mechanism, AMQProtocolSession protocolSession)
throws AMQException
{
- Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism);
try
{
- Object instance = mechanismClass.newInstance();
- AMQCallbackHandler cbh = (AMQCallbackHandler) instance;
- cbh.initialise(protocolSession.getAMQConnection().getConnectionURL());
+ AMQCallbackHandler instance = CallbackHandlerRegistry.getInstance().createCallbackHandler(mechanism);
+ instance.initialise(protocolSession.getAMQConnection().getConnectionURL());
- return cbh;
+ return instance;
}
- catch (Exception e)
+ catch (IllegalArgumentException e)
{
throw new AMQException(null, "Unable to create callback handler: " + e, e);
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java Thu Mar 1 10:04:31 2012
@@ -26,9 +26,7 @@ import org.apache.qpid.client.AMQSession
import javax.jms.Destination;
import javax.jms.JMSException;
-import java.nio.ByteBuffer;
import java.util.Enumeration;
-import java.util.Map;
import java.util.UUID;
public interface AMQMessageDelegate
@@ -130,9 +128,9 @@ public interface AMQMessageDelegate
void removeProperty(final String propertyName) throws JMSException;
- void setAMQSession(final AMQSession s);
+ void setAMQSession(final AMQSession<?,?> s);
- AMQSession getAMQSession();
+ AMQSession<?,?> getAMQSession();
long getDeliveryTag();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Thu Mar 1 10:04:31 2012
@@ -37,17 +37,14 @@ import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotWriteableException;
-import javax.jms.Session;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQPInvalidClassException;
import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.Message;
-import org.apache.qpid.messaging.Address;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.ExchangeQueryResult;
import org.apache.qpid.transport.Future;
@@ -56,6 +53,7 @@ import org.apache.qpid.transport.Message
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,13 +74,8 @@ public class AMQMessageDelegate_0_10 ext
private Destination _destination;
-
private MessageProperties _messageProps;
private DeliveryProperties _deliveryProps;
- /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
- private AMQSession _session;
- private final long _deliveryTag;
-
protected AMQMessageDelegate_0_10()
{
@@ -92,9 +85,9 @@ public class AMQMessageDelegate_0_10 ext
protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag)
{
+ super(deliveryTag);
_messageProps = messageProps;
_deliveryProps = deliveryProps;
- _deliveryTag = deliveryTag;
_readableProperties = (_messageProps != null);
AMQDestination dest;
@@ -205,7 +198,6 @@ public class AMQMessageDelegate_0_10 ext
}
}
-
public long getJMSTimestamp() throws JMSException
{
return _deliveryProps.getTimestamp();
@@ -291,7 +283,7 @@ public class AMQMessageDelegate_0_10 ext
try
{
- return AMQDestination.createDestination("ADDR:" + addr.toString());
+ return AMQDestination.createDestination("ADDR:" + addr);
}
catch(Exception e)
{
@@ -325,14 +317,14 @@ public class AMQMessageDelegate_0_10 ext
{
try
{
- int type = ((AMQSession_0_10)_session).resolveAddressType(amqd);
+ int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd);
if (type == AMQDestination.QUEUE_TYPE)
{
- ((AMQSession_0_10)_session).setLegacyFiledsForQueueType(amqd);
+ ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForQueueType(amqd);
}
else
{
- ((AMQSession_0_10)_session).setLegacyFiledsForTopicType(amqd);
+ ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForTopicType(amqd);
}
}
catch(AMQException ex)
@@ -342,6 +334,14 @@ public class AMQMessageDelegate_0_10 ext
e.setLinkedException(ex);
throw e;
}
+ catch (TransportException e)
+ {
+ JMSException jmse = new JMSException("Exception occured while figuring out the node type:" + e.getMessage());
+ jmse.initCause(e);
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
+
}
final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString());
@@ -897,64 +897,6 @@ public class AMQMessageDelegate_0_10 ext
_readableProperties = false;
}
-
- public void acknowledgeThis() throws JMSException
- {
- // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
- // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
- if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- if (_session.getAMQConnection().isClosed())
- {
- throw new javax.jms.IllegalStateException("Connection is already closed");
- }
-
- // we set multiple to true here since acknowledgment implies acknowledge of all previous messages
- // received on the session
- _session.acknowledgeMessage(_deliveryTag, true);
- }
- }
-
- public void acknowledge() throws JMSException
- {
- if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- _session.acknowledge();
- }
- }
-
-
- /**
- * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
- * acknowledge()
- *
- * @param s the AMQ session that delivered this message
- */
- public void setAMQSession(AMQSession s)
- {
- _session = s;
- }
-
- public AMQSession getAMQSession()
- {
- return _session;
- }
-
- /**
- * Get the AMQ message number assigned to this message
- *
- * @return the message number
- */
- public long getDeliveryTag()
- {
- return _deliveryTag;
- }
-
-
-
-
-
-
protected void checkPropertyName(CharSequence propertyName)
{
if (propertyName == null)
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java Thu Mar 1 10:04:31 2012
@@ -30,7 +30,6 @@ import java.util.UUID;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageNotWriteableException;
-import javax.jms.Session;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
@@ -60,15 +59,12 @@ public class AMQMessageDelegate_0_8 exte
Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
private ContentHeaderProperties _contentHeaderProperties;
- /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
- private AMQSession _session;
- private final long _deliveryTag;
// The base set of items that needs to be set.
private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag)
{
+ super(deliveryTag);
_contentHeaderProperties = properties;
- _deliveryTag = deliveryTag;
_readableProperties = (_contentHeaderProperties != null);
_headerAdapter = new JMSHeaderAdapter(_readableProperties ? ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()
: (new BasicContentHeaderProperties()).getHeaders() );
@@ -518,58 +514,4 @@ public class AMQMessageDelegate_0_8 exte
_readableProperties = false;
}
-
-
- public void acknowledgeThis() throws JMSException
- {
- // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
- // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
- if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- if (_session.getAMQConnection().isClosed())
- {
- throw new javax.jms.IllegalStateException("Connection is already closed");
- }
-
- // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
- // received on the session
- _session.acknowledgeMessage(_deliveryTag, true);
- }
- }
-
- public void acknowledge() throws JMSException
- {
- if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- _session.acknowledge();
- }
- }
-
-
- /**
- * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
- * acknowledge()
- *
- * @param s the AMQ session that delivered this message
- */
- public void setAMQSession(AMQSession s)
- {
- _session = s;
- }
-
- public AMQSession getAMQSession()
- {
- return _session;
- }
-
- /**
- * Get the AMQ message number assigned to this message
- *
- * @return the message number
- */
- public long getDeliveryTag()
- {
- return _deliveryTag;
- }
-
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java Thu Mar 1 10:04:31 2012
@@ -23,9 +23,13 @@ package org.apache.qpid.client.message;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -78,7 +82,25 @@ public abstract class AbstractAMQMessage
new ExchangeInfo(ExchangeDefaults.HEADERS_EXCHANGE_NAME.toString(),
ExchangeDefaults.HEADERS_EXCHANGE_CLASS.toString(),
AMQDestination.QUEUE_TYPE));
-
+ }
+
+ /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
+ private AMQSession<?,?> _session;
+ private final long _deliveryTag;
+
+ protected AbstractAMQMessageDelegate(long deliveryTag)
+ {
+ _deliveryTag = deliveryTag;
+ }
+
+ /**
+ * Get the AMQ message number assigned to this message
+ *
+ * @return the message number
+ */
+ public long getDeliveryTag()
+ {
+ return _deliveryTag;
}
/**
@@ -157,6 +179,47 @@ public abstract class AbstractAMQMessage
{
return _exchangeMap.containsKey(exchange);
}
+
+ public void acknowledgeThis() throws JMSException
+ {
+ // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
+ // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
+ if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ if (_session.getAMQConnection().isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Connection is already closed");
+ }
+
+ // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
+ // received on the session
+ _session.acknowledgeMessage(getDeliveryTag(), true);
+ }
+ }
+
+ public void acknowledge() throws JMSException
+ {
+ if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ _session.acknowledge();
+ }
+ }
+
+ /**
+ * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
+ * acknowledge()
+ *
+ * @param s the AMQ session that delivered this message
+ */
+ public void setAMQSession(AMQSession<?,?> s)
+ {
+ _session = s;
+ }
+
+ public AMQSession<?,?> getAMQSession()
+ {
+ return _session;
+ }
}
class ExchangeInfo
@@ -202,5 +265,5 @@ class ExchangeInfo
public void setDestType(int destType)
{
this.destType = destType;
- }
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Mar 1 10:04:31 2012
@@ -30,7 +30,6 @@ import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.AMQConnectionClosedException;
@@ -47,6 +46,7 @@ import org.apache.qpid.client.state.AMQS
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
@@ -65,7 +65,6 @@ import org.apache.qpid.protocol.Protocol
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.NetworkTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -163,7 +162,9 @@ public class AMQProtocolHandler implemen
private FailoverException _lastFailoverException;
/** Defines the default timeout to use for synchronous protocol commands. */
- private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30);
+ private final long DEFAULT_SYNC_TIMEOUT = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
+ Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
+ ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
/** Object to lock on when changing the latch */
private Object _failoverLatchChange = new Object();
@@ -513,18 +514,7 @@ public class AMQProtocolHandler implemen
return getStateManager().createWaiter(states);
}
- /**
- * Convenience method that writes a frame to the protocol session. Equivalent to calling
- * getProtocolSession().write().
- *
- * @param frame the frame to write
- */
- public void writeFrame(AMQDataBlock frame)
- {
- writeFrame(frame, false);
- }
-
- public synchronized void writeFrame(AMQDataBlock frame, boolean wait)
+ public synchronized void writeFrame(AMQDataBlock frame)
{
final ByteBuffer buf = asByteBuffer(frame);
_writtenBytes += buf.remaining();
@@ -675,22 +665,21 @@ public class AMQProtocolHandler implemen
* <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed
* anyway.
*
- * @param timeout The timeout to wait for an acknowledgement to the close request.
+ * @param timeout The timeout to wait for an acknowledgment to the close request.
*
* @throws AMQException If the close fails for any reason.
*/
public void closeConnection(long timeout) throws AMQException
{
- ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection."), 0, 0);
-
- final AMQFrame frame = body.generateFrame(0);
-
- //If the connection is already closed then don't do a syncWrite
if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
{
+ // Connection is already closed then don't do a syncWrite
try
{
+ final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ new AMQShortString("JMS client is closing the connection."), 0, 0);
+ final AMQFrame frame = body.generateFrame(0);
+
syncWrite(frame, ConnectionCloseOkBody.class, timeout);
_network.close();
closed();
@@ -701,10 +690,9 @@ public class AMQProtocolHandler implemen
}
catch (FailoverException e)
{
- _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
+ _logger.debug("FailoverException interrupted connection close, ignoring as connection closed anyway.");
}
}
-
}
/** @return the number of bytes read from this protocol session */
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Mar 1 10:04:31 2012
@@ -20,27 +20,35 @@
*/
package org.apache.qpid.client.protocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import javax.jms.JMSException;
import javax.security.sasl.SaslClient;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.Sender;
-import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
@@ -289,22 +297,11 @@ public class AMQProtocolSession implemen
return _connection.getSession(channelId);
}
- /**
- * Convenience method that writes a frame to the protocol session. Equivalent to calling
- * getProtocolSession().write().
- *
- * @param frame the frame to write
- */
public void writeFrame(AMQDataBlock frame)
{
_protocolHandler.writeFrame(frame);
}
- public void writeFrame(AMQDataBlock frame, boolean wait)
- {
- _protocolHandler.writeFrame(frame, wait);
- }
-
/**
* Starts the process of closing a session
*
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java Thu Mar 1 10:04:31 2012
@@ -20,17 +20,22 @@
*/
package org.apache.qpid.client.security;
-import org.apache.qpid.util.FileUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.io.InputStream;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.TreeMap;
+
+import org.apache.qpid.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* CallbackHandlerRegistry is a registry for call back handlers for user authentication and interaction during user
@@ -42,7 +47,7 @@ import java.util.Properties;
* "amp.callbackhandler.properties". The format of the properties file is:
*
* <p/><pre>
- * CallbackHanlder.mechanism=fully.qualified.class.name
+ * CallbackHanlder.n.mechanism=fully.qualified.class.name where n is an ordinal
* </pre>
*
* <p/>Where mechanism is an IANA-registered mechanism name and the fully qualified class name refers to a
@@ -66,51 +71,15 @@ public class CallbackHandlerRegistry
public static final String DEFAULT_RESOURCE_NAME = "org/apache/qpid/client/security/CallbackHandlerRegistry.properties";
/** A static reference to the singleton instance of this registry. */
- private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry();
+ private static final CallbackHandlerRegistry _instance;
/** Holds a map from SASL mechanism names to call back handlers. */
- private Map<String, Class> _mechanismToHandlerClassMap = new HashMap<String, Class>();
-
- /** Holds a space delimited list of mechanisms that callback handlers exist for. */
- private String _mechanisms;
-
- /**
- * Gets the singleton instance of this registry.
- *
- * @return The singleton instance of this registry.
- */
- public static CallbackHandlerRegistry getInstance()
- {
- return _instance;
- }
+ private Map<String, Class<AMQCallbackHandler>> _mechanismToHandlerClassMap = new HashMap<String, Class<AMQCallbackHandler>>();
- /**
- * Gets the callback handler class for a given SASL mechanism name.
- *
- * @param mechanism The SASL mechanism name.
- *
- * @return The callback handler class for the mechanism, or null if none is configured for that mechanism.
- */
- public Class getCallbackHandlerClass(String mechanism)
- {
- return (Class) _mechanismToHandlerClassMap.get(mechanism);
- }
+ /** Ordered collection of mechanisms for which callback handlers exist. */
+ private Collection<String> _mechanisms;
- /**
- * Gets a space delimited list of supported SASL mechanisms.
- *
- * @return A space delimited list of supported SASL mechanisms.
- */
- public String getMechanisms()
- {
- return _mechanisms;
- }
-
- /**
- * Creates the call back handler registry from its configuration resource or file. This also has the side effect
- * of configuring and registering the SASL client factory implementations using {@link DynamicSaslRegistrar}.
- */
- private CallbackHandlerRegistry()
+ static
{
// Register any configured SASL client factories.
DynamicSaslRegistrar.registerSaslProviders();
@@ -120,12 +89,12 @@ public class CallbackHandlerRegistry
FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME,
CallbackHandlerRegistry.class.getClassLoader());
+ final Properties props = new Properties();
+
try
{
- Properties props = new Properties();
+
props.load(is);
- parseProperties(props);
- _logger.info("Callback handlers available for SASL mechanisms: " + _mechanisms);
}
catch (IOException e)
{
@@ -146,32 +115,68 @@ public class CallbackHandlerRegistry
}
}
}
+
+ _instance = new CallbackHandlerRegistry(props);
+ _logger.info("Callback handlers available for SASL mechanisms: " + _instance._mechanisms);
+
}
- /*private InputStream openPropertiesInputStream(String filename)
+ /**
+ * Gets the singleton instance of this registry.
+ *
+ * @return The singleton instance of this registry.
+ */
+ public static CallbackHandlerRegistry getInstance()
+ {
+ return _instance;
+ }
+
+ public AMQCallbackHandler createCallbackHandler(final String mechanism)
{
- boolean useDefault = true;
- InputStream is = null;
- if (filename != null)
+ final Class<AMQCallbackHandler> mechanismClass = _mechanismToHandlerClassMap.get(mechanism);
+
+ if (mechanismClass == null)
{
- try
- {
- is = new BufferedInputStream(new FileInputStream(new File(filename)));
- useDefault = false;
- }
- catch (FileNotFoundException e)
- {
- _logger.error("Unable to read from file " + filename + ": " + e, e);
- }
+ throw new IllegalArgumentException("Mechanism " + mechanism + " not known");
}
- if (useDefault)
+ try
+ {
+ return mechanismClass.newInstance();
+ }
+ catch (InstantiationException e)
+ {
+ throw new IllegalArgumentException("Unable to create an instance of mechanism " + mechanism, e);
+ }
+ catch (IllegalAccessException e)
{
- is = CallbackHandlerRegistry.class.getResourceAsStream(DEFAULT_RESOURCE_NAME);
+ throw new IllegalArgumentException("Unable to create an instance of mechanism " + mechanism, e);
}
+ }
- return is;
- }*/
+ /**
+ * Gets collections of supported SASL mechanism names, ordered by preference
+ *
+ * @return collection of SASL mechanism names.
+ */
+ public Collection<String> getMechanisms()
+ {
+ return Collections.unmodifiableCollection(_mechanisms);
+ }
+
+ /**
+ * Creates the call back handler registry from its configuration resource or file.
+ *
+ * This also has the side effect of configuring and registering the SASL client factory
+ * implementations using {@link DynamicSaslRegistrar}.
+ *
+ * This constructor is default protection to allow for effective unit testing. Clients must use
+ * {@link #getInstance()} to obtain the singleton instance.
+ */
+ CallbackHandlerRegistry(final Properties props)
+ {
+ parseProperties(props);
+ }
/**
* Scans the specified properties as a mapping from IANA registered SASL mechanism to call back handler
@@ -183,20 +188,20 @@ public class CallbackHandlerRegistry
*/
private void parseProperties(Properties props)
{
+
+ final Map<Integer, String> mechanisms = new TreeMap<Integer, String>();
+
Enumeration e = props.propertyNames();
while (e.hasMoreElements())
{
- String propertyName = (String) e.nextElement();
- int period = propertyName.indexOf(".");
- if (period < 0)
- {
- _logger.warn("Unable to parse property " + propertyName + " when configuring SASL providers");
+ final String propertyName = (String) e.nextElement();
+ final String[] parts = propertyName.split("\\.", 2);
- continue;
- }
+ checkPropertyNameFormat(propertyName, parts);
- String mechanism = propertyName.substring(period + 1);
- String className = props.getProperty(propertyName);
+ final String mechanism = parts[0];
+ final int ordinal = getPropertyOrdinal(propertyName, parts);
+ final String className = props.getProperty(propertyName);
Class clazz = null;
try
{
@@ -205,20 +210,11 @@ public class CallbackHandlerRegistry
{
_logger.warn("SASL provider " + clazz + " does not implement " + AMQCallbackHandler.class
+ ". Skipping");
-
continue;
}
-
_mechanismToHandlerClassMap.put(mechanism, clazz);
- if (_mechanisms == null)
- {
- _mechanisms = mechanism;
- }
- else
- {
- // one time cost
- _mechanisms = _mechanisms + " " + mechanism;
- }
+
+ mechanisms.put(ordinal, mechanism);
}
catch (ClassNotFoundException ex)
{
@@ -227,5 +223,91 @@ public class CallbackHandlerRegistry
continue;
}
}
+
+ _mechanisms = mechanisms.values(); // order guaranteed by keys of treemap (i.e. our ordinals)
+
+
+ }
+
+ private void checkPropertyNameFormat(final String propertyName, final String[] parts)
+ {
+ if (parts.length != 2)
+ {
+ throw new IllegalArgumentException("Unable to parse property " + propertyName + " when configuring SASL providers");
+ }
+ }
+
+ private int getPropertyOrdinal(final String propertyName, final String[] parts)
+ {
+ try
+ {
+ return Integer.parseInt(parts[1]);
+ }
+ catch(NumberFormatException nfe)
+ {
+ throw new IllegalArgumentException("Unable to parse property " + propertyName + " when configuring SASL providers", nfe);
+ }
+ }
+
+ /**
+ * Selects a SASL mechanism that is mutually available to both parties. If more than one
+ * mechanism is mutually available the one appearing first (by ordinal) will be returned.
+ *
+ * @param peerMechanismList space separated list of mechanisms
+ * @return selected mechanism, or null if none available
+ */
+ public String selectMechanism(final String peerMechanismList)
+ {
+ final Set<String> peerList = mechListToSet(peerMechanismList);
+
+ return selectMechInternal(peerList, Collections.<String>emptySet());
+ }
+
+ /**
+ * Selects a SASL mechanism that is mutually available to both parties.
+ *
+ * @param peerMechanismList space separated list of mechanisms
+ * @param restrictionList space separated list of mechanisms
+ * @return selected mechanism, or null if none available
+ */
+ public String selectMechanism(final String peerMechanismList, final String restrictionList)
+ {
+ final Set<String> peerList = mechListToSet(peerMechanismList);
+ final Set<String> restrictionSet = mechListToSet(restrictionList);
+
+ return selectMechInternal(peerList, restrictionSet);
+ }
+
+ private String selectMechInternal(final Set<String> peerSet, final Set<String> restrictionSet)
+ {
+ for (final String mech : _mechanisms)
+ {
+ if (peerSet.contains(mech))
+ {
+ if (restrictionSet.isEmpty() || restrictionSet.contains(mech))
+ {
+ return mech;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ private Set<String> mechListToSet(final String mechanismList)
+ {
+ if (mechanismList == null)
+ {
+ return Collections.emptySet();
+ }
+
+ final StringTokenizer tokenizer = new StringTokenizer(mechanismList, " ");
+ final Set<String> mechanismSet = new HashSet<String>(tokenizer.countTokens());
+ while (tokenizer.hasMoreTokens())
+ {
+ mechanismSet.add(tokenizer.nextToken());
+ }
+ return Collections.unmodifiableSet(mechanismSet);
}
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties Thu Mar 1 10:04:31 2012
@@ -16,7 +16,17 @@
# specific language governing permissions and limitations
# under the License.
#
-CallbackHandler.CRAM-MD5-HASHED=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
-CallbackHandler.CRAM-MD5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
-CallbackHandler.AMQPLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
-CallbackHandler.PLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+
+#
+# Format:
+# <mechanism name>.ordinal=<implementation>
+#
+# @see CallbackHandlerRegistry
+#
+
+EXTERNAL.1=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+GSSAPI.2=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+CRAM-MD5-HASHED.3=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
+CRAM-MD5.4=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+AMQPLAIN.5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+PLAIN.6=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Thu Mar 1 10:04:31 2012
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.io.IOException;
/**
* The state manager is responsible for managing the state of the protocol session. <p/>
@@ -48,7 +47,7 @@ import java.io.IOException;
*
* The two step process is required as there is an inherit race condition between starting a process that will cause
* the state to change and then attempting to wait for that change. The interest in the change must be first set up so
- * that any asynchrous errors that occur can be delivered to the correct waiters.
+ * that any asynchronous errors that occur can be delivered to the correct waiters.
*/
public class AMQStateManager implements AMQMethodListener
{
@@ -84,7 +83,10 @@ public class AMQStateManager implements
public AMQState getCurrentState()
{
- return _currentState;
+ synchronized (_stateLock)
+ {
+ return _currentState;
+ }
}
public void changeState(AMQState newState)
@@ -114,7 +116,7 @@ public class AMQStateManager implements
}
/**
- * Setting of the ProtocolSession will be required when Failover has been successfuly compeleted.
+ * Setting of the ProtocolSession will be required when Failover has been successfully completed.
*
* The new {@link AMQProtocolSession} that has been re-established needs to be provided as that is now the
* connection to the network.
@@ -131,9 +133,9 @@ public class AMQStateManager implements
}
/**
- * Propogate error to waiters
+ * Propagate error to waiters
*
- * @param error The error to propogate.
+ * @param error The error to propagate.
*/
public void error(Exception error)
{
@@ -177,7 +179,7 @@ public class AMQStateManager implements
}
/**
- * Create and add a new waiter to the notifcation list.
+ * Create and add a new waiter to the notification list.
*
* @param states The waiter will attempt to wait for one of these desired set states to be achived.
*
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java Thu Mar 1 10:04:31 2012
@@ -34,7 +34,7 @@ import java.util.Set;
*
* On construction the current state and a set of States to await for is provided.
*
- * When await() is called the state at constuction is compared against the awaitStates. If the state at construction is
+ * When await() is called the state at construction is compared against the awaitStates. If the state at construction is
* a desired state then await() returns immediately.
*
* Otherwise it will block for the set timeout for a desired state to be achieved.
@@ -48,9 +48,9 @@ public class StateWaiter extends Blockin
{
private static final Logger _logger = LoggerFactory.getLogger(StateWaiter.class);
- Set<AMQState> _awaitStates;
- private AMQState _startState;
- private AMQStateManager _stateManager;
+ private final Set<AMQState> _awaitStates;
+ private final AMQState _startState;
+ private final AMQStateManager _stateManager;
/**
*
@@ -78,9 +78,9 @@ public class StateWaiter extends Blockin
}
/**
- * Await for the requried State to be achieved within the default timeout.
+ * Await for the required State to be achieved within the default timeout.
* @return The achieved state that was requested.
- * @throws AMQException The exception that prevented the required state from being achived.
+ * @throws AMQException The exception that prevented the required state from being achieved.
*/
public AMQState await() throws AMQException
{
@@ -88,13 +88,13 @@ public class StateWaiter extends Blockin
}
/**
- * Await for the requried State to be achieved.
+ * Await for the required State to be achieved.
*
* <b>It is the responsibility of this class to remove the waiter from the StateManager
*
- * @param timeout The time in milliseconds to wait for any of the states to be achived.
+ * @param timeout The time in milliseconds to wait for any of the states to be achieved.
* @return The achieved state that was requested.
- * @throws AMQException The exception that prevented the required state from being achived.
+ * @throws AMQException The exception that prevented the required state from being achieved.
*/
public AMQState await(long timeout) throws AMQException
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java Thu Mar 1 10:04:31 2012
@@ -28,9 +28,8 @@ import java.util.concurrent.locks.Reentr
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* BlockingWaiter is a 'rendezvous' which delegates handling of
@@ -64,6 +63,8 @@ import org.apache.qpid.protocol.AMQMetho
*/
public abstract class BlockingWaiter<T>
{
+ private static final Logger _logger = LoggerFactory.getLogger(BlockingWaiter.class);
+
/** This flag is used to indicate that the blocked for method has been received. */
private volatile boolean _ready = false;
@@ -180,7 +181,7 @@ public abstract class BlockingWaiter<T>
}
catch (InterruptedException e)
{
- System.err.println(e.getMessage());
+ _logger.error(e.getMessage(), e);
// IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
// if (!_ready && timeout != -1)
// {
@@ -228,12 +229,12 @@ public abstract class BlockingWaiter<T>
}
/**
- * This is a callback, called when an error has occured that should interupt any waiter.
+ * This is a callback, called when an error has occurred that should interrupt any waiter.
* It is also called from within this class to avoid code repetition but it should only be called by the MINA threads.
*
* Once closed any notification of an exception will be ignored.
*
- * @param e The exception being propogated.
+ * @param e The exception being propagated.
*/
public void error(Exception e)
{
@@ -255,7 +256,7 @@ public abstract class BlockingWaiter<T>
}
else
{
- System.err.println("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage());
+ _logger.error("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage());
}
if (_waiting.get())
@@ -272,7 +273,7 @@ public abstract class BlockingWaiter<T>
}
catch (InterruptedException e1)
{
- System.err.println(e.getMessage());
+ _logger.error(e1.getMessage(), e1);
}
}
_errorAck = false;
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java Thu Mar 1 10:04:31 2012
@@ -51,7 +51,4 @@ public interface MessageProducer extends
int priority, long timeToLive, boolean mandatory, boolean immediate)
throws JMSException;
- void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException;
-
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java Thu Mar 1 10:04:31 2012
@@ -73,7 +73,7 @@ public class ChannelCloseMethodHandlerNo
{
throw new AMQNoRouteException("Error: " + reason, null, null);
}
- else if (errorCode == AMQConstant.INVALID_ARGUMENT)
+ else if (errorCode == AMQConstant.ARGUMENT_INVALID)
{
_logger.debug("Broker responded with Invalid Argument.");
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Thu Mar 1 10:04:31 2012
@@ -20,17 +20,24 @@
*/
package org.apache.qpid.test.unit.message;
-import org.apache.qpid.client.*;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.TemporaryQueue;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.BasicMessageConsumer_0_8;
+import org.apache.qpid.client.BasicMessageProducer_0_8;
+import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-
-import javax.jms.*;
-
-import java.util.Map;
public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
@@ -57,7 +64,12 @@ public class TestAMQSession extends AMQS
}
- public void sendCommit() throws AMQException, FailoverException
+ public void commitImpl() throws AMQException, FailoverException
+ {
+
+ }
+
+ public void acknowledgeImpl()
{
}
@@ -117,7 +129,7 @@ public class TestAMQSession extends AMQS
}
- public BasicMessageProducer_0_8 createMessageProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent, long producerId)
+ public BasicMessageProducer_0_8 createMessageProducer(Destination destination, boolean mandatory, boolean immediate, long producerId)
{
return null;
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common.xml
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common.xml?rev=1295492&r1=1295491&r2=1295492&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common.xml (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common.xml Thu Mar 1 10:04:31 2012
@@ -42,7 +42,6 @@
<property name="build.report" location="${build}/report"/>
<property name="build.release" location="${build}/release"/>
<property name="build.release.prepare" location="${build.release}/prepare"/>
- <property name="build.data" location="${build.scratch}/data"/>
<property name="build.plugins" location="${build}/lib/plugins"/>
<property name="build.coveragereport" location="${build}/coverage"/>
<property name="build.findbugs" location="${build}/findbugs"/>
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org