You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2009/03/11 00:11:10 UTC
svn commit: r752300 [9/12] - in /qpid/branches/qpid-1673/qpid: cpp/
cpp/examples/ cpp/examples/direct/ cpp/examples/failover/
cpp/examples/fanout/ cpp/examples/pub-sub/ cpp/examples/qmf-console/
cpp/examples/request-response/ cpp/examples/tradedemo/ cp...
Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue Mar 10 23:10:57 2009
@@ -135,7 +135,7 @@
{
try
{
- flushAcknowledgments();
+ flushAcknowledgments(true);
}
catch (Throwable t)
{
@@ -236,12 +236,17 @@
void flushAcknowledgments()
{
+ flushAcknowledgments(false);
+ }
+
+ void flushAcknowledgments(boolean setSyncBit)
+ {
synchronized (unacked)
{
if (unackedCount > 0)
{
messageAcknowledge
- (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+ (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit);
clearUnacked();
}
}
@@ -249,6 +254,11 @@
void messageAcknowledge(RangeSet ranges, boolean accept)
{
+ messageAcknowledge(ranges,accept,false);
+ }
+
+ void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit)
+ {
Session ssn = getQpidSession();
for (Range range : ranges)
{
@@ -257,7 +267,7 @@
ssn.flushProcessed(accept ? BATCH : NONE);
if (accept)
{
- ssn.messageAccept(ranges, UNRELIABLE);
+ ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE);
}
}
@@ -272,7 +282,8 @@
* @param arguments 0_8 specific
*/
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey,
- final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination)
+ final FieldTable arguments, final AMQShortString exchangeName,
+ final AMQDestination destination, final boolean nowait)
throws AMQException, FailoverException
{
Map args = FiledTableSupport.convertToMap(arguments);
@@ -287,9 +298,12 @@
_logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString());
getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args);
}
- // We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ if (!nowait)
+ {
+ // We need to sync so that we get notify of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
}
@@ -501,18 +515,24 @@
{
getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW);
}
- getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
+ getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
+ Option.UNRELIABLE);
// We need to sync so that we get notify of an error.
// only if not immediat prefetch
- if(prefetch() && (consumer.isStrated() || _immediatePrefetch))
+ if(prefetch() && (isStarted() || _immediatePrefetch))
{
// set the flow
getQpidSession().messageFlow(consumerTag,
MessageCreditUnit.MESSAGE,
- getAMQConnection().getMaxPrefetch());
+ getAMQConnection().getMaxPrefetch(),
+ Option.UNRELIABLE);
+ }
+
+ if (!nowait)
+ {
+ getQpidSession().sync();
+ getCurrentException();
}
- getQpidSession().sync();
- getCurrentException();
}
/**
@@ -540,14 +560,18 @@
null,
name.toString().startsWith("amq.")? Option.PASSIVE:Option.NONE);
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ if (!nowait)
+ {
+ getQpidSession().sync();
+ getCurrentException();
+ }
}
/**
* Declare a queue with the given queueName
*/
- public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)
+ public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ final boolean nowait)
throws AMQException, FailoverException
{
// do nothing this is only used by 0_8
@@ -557,7 +581,7 @@
* Declare a queue with the given queueName
*/
public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal)
+ final boolean noLocal, final boolean nowait)
throws AMQException, FailoverException
{
AMQShortString res;
@@ -581,9 +605,12 @@
amqd.isDurable() ? Option.DURABLE : Option.NONE,
!amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
// passive --> false
- // We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ if (!nowait)
+ {
+ // We need to sync so that we get notify of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
return res;
}
@@ -609,7 +636,8 @@
{
for (BasicMessageConsumer consumer : _consumers.values())
{
- getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()));
+ getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
+ Option.UNRELIABLE);
}
}
else
@@ -625,17 +653,20 @@
if (consumer.getMessageListener() != null)
{
getQpidSession().messageFlow(consumerTag,
- MessageCreditUnit.MESSAGE, 1);
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
}
else
{
getQpidSession()
.messageFlow(consumerTag, MessageCreditUnit.MESSAGE,
- getAMQConnection().getMaxPrefetch());
+ getAMQConnection().getMaxPrefetch(),
+ Option.UNRELIABLE);
}
getQpidSession()
- .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
+ .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
+ Option.UNRELIABLE);
}
catch (Exception e)
{
@@ -700,6 +731,19 @@
public void opened(Session ssn) {}
+ public void resumed(Session ssn)
+ {
+ _qpidConnection = ssn.getConnection();
+ try
+ {
+ resubscribe();
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
public void message(Session ssn, MessageTransfer xfr)
{
messageReceived(new UnprocessedMessage_0_10(xfr));
@@ -716,7 +760,7 @@
public void closed(Session ssn) {}
protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal)
+ final boolean noLocal, final boolean nowait)
throws AMQException
{
/*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
@@ -736,34 +780,11 @@
amqd.setQueueName(new AMQShortString( binddingKey + "@"
+ amqd.getExchangeName().toString() + "_" + UUID.randomUUID()));
}
- return send0_10QueueDeclare(amqd, protocolHandler, noLocal);
+ return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait);
}
}, _connection).execute();
}
-
- void start() throws AMQException
- {
- super.start();
- for(BasicMessageConsumer c: _consumers.values())
- {
- c.start();
- }
- }
-
-
- void stop() throws AMQException
- {
- super.stop();
- for(BasicMessageConsumer c: _consumers.values())
- {
- c.stop();
- }
- }
-
-
-
-
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Tue Mar 10 23:10:57 2009
@@ -106,7 +106,8 @@
}
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName, final AMQDestination dest) throws AMQException, FailoverException
+ final AMQShortString exchangeName, final AMQDestination dest,
+ final boolean nowait) throws AMQException, FailoverException
{
getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
(getTicket(),queueName,exchangeName,routingKey,false,arguments).
@@ -300,13 +301,14 @@
{
ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,
name.toString().startsWith("amq."),
- false,false,false,nowait,null);
+ false,false,false,false,null);
AMQFrame exchangeDeclare = body.generateFrame(_channelId);
protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
- public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException
+ public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ final boolean nowait) throws AMQException, FailoverException
{
QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Mar 10 23:10:57 2009
@@ -441,7 +441,9 @@
o = _synchronousQueue.take();
}
return o;
- }
+ }
+
+ abstract Message receiveBrowse() throws JMSException;
public Message receiveNoWait() throws JMSException
{
@@ -1037,23 +1039,6 @@
_synchronousQueue.clear();
}
- public void start()
- {
- // do nothing as this is a 0_10 feature
- }
-
-
- public void stop()
- {
- // do nothing as this is a 0_10 feature
- }
-
- public boolean isStrated()
- {
- // do nothing as this is a 0_10 feature
- return false;
- }
-
public AMQShortString getQueuename()
{
return _queuename;
@@ -1070,10 +1055,13 @@
}
/** to be called when a failover has occured */
- public void failedOver()
+ public void failedOverPre()
{
clearReceiveQueue();
// TGM FIXME: think this should just be removed
// clearUnackedMessages();
}
+
+ public void failedOverPost() {}
+
}
Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Mar 10 23:10:57 2009
@@ -31,6 +31,7 @@
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -148,7 +149,8 @@
if (isMessageListenerSet() && ! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
_logger.debug("messageOk, trying to notify");
super.notifyMessage(jmsMessage);
@@ -246,7 +248,8 @@
if(! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
}
// now we need to acquire this message if needed
@@ -258,9 +261,7 @@
_logger.debug("filterMessage - trying to acquire message");
}
messageOk = acquireMessage(message);
- _logger.debug("filterMessage - *************************************");
_logger.debug("filterMessage - message acquire status : " + messageOk);
- _logger.debug("filterMessage - *************************************");
}
return messageOk;
}
@@ -335,7 +336,8 @@
if (messageListener != null && ! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
if (messageListener != null && !_synchronousQueue.isEmpty())
{
@@ -349,26 +351,16 @@
}
}
- public boolean isStrated()
+ public void failedOverPost()
{
- return _isStarted;
- }
-
- public void start()
- {
- _isStarted = true;
- if (_syncReceive.get())
+ if (_0_10session.isStarted() && _syncReceive.get())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ _0_10session.getQpidSession().messageFlow
+ (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
}
- public void stop()
- {
- _isStarted = false;
- }
-
/**
* When messages are not prefetched we need to request a message from the
* broker.
@@ -380,16 +372,35 @@
*/
public Object getMessageFromQueue(long l) throws InterruptedException
{
- if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
- {
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
- }
if (! getSession().prefetch())
{
_syncReceive.set(true);
}
+ if (_0_10session.isStarted() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
+ }
Object o = super.getMessageFromQueue(l);
+ if (o == null && _0_10session.isStarted())
+ {
+ _0_10session.getQpidSession().messageFlush
+ (getConsumerTagString(), Option.UNRELIABLE, Option.SYNC);
+ _0_10session.getQpidSession().sync();
+ _0_10session.getQpidSession().messageFlow
+ (getConsumerTagString(), MessageCreditUnit.BYTE,
+ 0xFFFFFFFF, Option.UNRELIABLE);
+ if (getSession().prefetch())
+ {
+ _0_10session.getQpidSession().messageFlow
+ (getConsumerTagString(), MessageCreditUnit.MESSAGE,
+ _0_10session.getAMQConnection().getMaxPrefetch(),
+ Option.UNRELIABLE);
+ }
+ _0_10session.syncDispatchQueue();
+ o = super.getMessageFromQueue(-1);
+ }
if (! getSession().prefetch())
{
_syncReceive.set(false);
@@ -404,6 +415,19 @@
{
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
}
+
+ if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE &&
+ !_session.isInRecovery() &&
+ _session.getAMQConnection().getSyncAck())
+ {
+ ((AMQSession_0_10) getSession()).flushAcknowledgments();
+ ((AMQSession_0_10) getSession()).getQpidSession().sync();
+ }
+ }
+
+ Message receiveBrowse() throws JMSException
+ {
+ return receiveNoWait();
}
}
Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Tue Mar 10 23:10:57 2009
@@ -22,6 +22,7 @@
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+import javax.jms.Message;
import org.apache.qpid.AMQException;
import org.apache.qpid.QpidException;
@@ -38,9 +39,9 @@
protected final Logger _logger = LoggerFactory.getLogger(getClass());
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
- String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException
+ String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+ AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException
{
super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive,
@@ -73,13 +74,18 @@
}
}
- public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
- {
+ public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
+ {
return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
- messageFrame.isRedelivered(), messageFrame.getExchange(),
- messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+ messageFrame.isRedelivered(), messageFrame.getExchange(),
+ messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+
+ }
+ Message receiveBrowse() throws JMSException
+ {
+ return receive();
}
}
Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Tue Mar 10 23:10:57 2009
@@ -46,6 +46,8 @@
public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
+ enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL };
+
protected final Logger _logger = LoggerFactory.getLogger(getClass());
private AMQConnection _connection;
@@ -120,6 +122,8 @@
protected String _userID; // ref user id used in the connection.
private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
+
+ protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
@@ -141,6 +145,26 @@
_mandatory = mandatory;
_waitUntilSent = waitUntilSent;
_userID = connection.getUsername();
+ setPublishMode();
+ }
+
+ void setPublishMode()
+ {
+ // Publish mode could be configured at destination level as well.
+ // Will add support for this when we provide a more robust binding URL
+
+ String syncPub = _connection.getSyncPublish();
+ // Support for deprecated option sync_persistence
+ if (syncPub.equals("persistent") || _connection.getSyncPersistence())
+ {
+ publishMode = PublishMode.SYNC_PUBLISH_PERSISTENT;
+ }
+ else if (syncPub.equals("all"))
+ {
+ publishMode = PublishMode.SYNC_PUBLISH_ALL;
+ }
+
+ _logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode);
}
void resubscribe() throws AMQException
Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Tue Mar 10 23:10:57 2009
@@ -151,9 +151,13 @@
((AMQSession_0_10) getSession()).getQpidSession();
// if true, we need to sync the delivery of this message
- boolean sync = (deliveryMode == DeliveryMode.PERSISTENT &&
- getSession().getAMQConnection().getSyncPersistence());
+ boolean sync = false;
+ sync = ( (publishMode == PublishMode.SYNC_PUBLISH_ALL) ||
+ (publishMode == PublishMode.SYNC_PUBLISH_PERSISTENT &&
+ deliveryMode == DeliveryMode.PERSISTENT)
+ );
+
org.apache.mina.common.ByteBuffer data = message.getData();
ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java Tue Mar 10 23:10:57 2009
@@ -47,6 +47,19 @@
*/
public static final String SYNC_PERSISTENT_PROP_NAME = "sync_persistence";
+ /**
+ * When true a sync command is sent after sending a message ack.
+ * type: boolean
+ */
+ public static final String SYNC_ACK_PROP_NAME = "sync_ack";
+
+ /**
+ * sync_publish property - {persistent|all}
+ * If set to 'persistent',then persistent messages will be publish synchronously
+ * If set to 'all', then all messages regardless of the delivery mode will be
+ * published synchronously.
+ */
+ public static final String SYNC_PUBLISH_PROP_NAME = "sync_publish";
/**
* This value will be used in the following settings
Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Tue Mar 10 23:10:57 2009
@@ -25,8 +25,6 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
@@ -35,8 +33,6 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
{
@@ -157,7 +153,7 @@
}
finally
{
- _data.rewind();
+ // _data.rewind();
close(in);
}
}
Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java Tue Mar 10 23:10:57 2009
@@ -58,22 +58,31 @@
if ((connection.getHost() == null) || connection.getHost().equals(""))
{
- String uid = AMQConnectionFactory.getUniqueClientID();
- if (uid == null)
- {
- throw URLHelper.parseError(-1, "Client Name not specified", fullURL);
+ String tmp = connection.getAuthority();
+ // hack to read a clientid such as "my_clientID"
+ if (tmp != null && tmp.indexOf('@') < tmp.length()-1)
+ {
+ _url.setClientName(tmp.substring(tmp.indexOf('@')+1,tmp.length()));
}
else
{
- _url.setClientName(uid);
+ String uid = AMQConnectionFactory.getUniqueClientID();
+ if (uid == null)
+ {
+ throw URLHelper.parseError(-1, "Client Name not specified", fullURL);
+ }
+ else
+ {
+ _url.setClientName(uid);
+ }
}
- }
+ }
else
{
_url.setClientName(connection.getHost());
}
-
+
String userInfo = connection.getUserInfo();
if (userInfo == null)
Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java Tue Mar 10 23:10:57 2009
@@ -33,9 +33,11 @@
*/
public interface ConnectionURL
{
- public static final String AMQ_SYNC_PERSISTENCE = "sync_persistence";
- public static final String AMQ_MAXPREFETCH = "maxprefetch";
public static final String AMQ_PROTOCOL = "amqp";
+ public static final String OPTIONS_SYNC_PERSISTENCE = "sync_persistence";
+ public static final String OPTIONS_MAXPREFETCH = "maxprefetch";
+ public static final String OPTIONS_SYNC_ACK = "sync_ack";
+ public static final String OPTIONS_SYNC_PUBLISH = "sync_publish";
public static final String OPTIONS_BROKERLIST = "brokerlist";
public static final String OPTIONS_FAILOVER = "failover";
public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java Tue Mar 10 23:10:57 2009
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.jms.failover;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
@@ -52,7 +53,7 @@
* from the list.
*/
-public class FailoverExchangeMethod extends FailoverRoundRobinServers implements FailoverMethod, MessageListener
+public class FailoverExchangeMethod implements FailoverMethod, MessageListener
{
private static final Logger _logger = LoggerFactory.getLogger(FailoverExchangeMethod.class);
@@ -65,17 +66,29 @@
/** The session used to subscribe to failover exchange */
private Session _ssn;
- private BrokerDetails _orginalBrokerDetail;
+ private BrokerDetails _originalBrokerDetail;
+
+ /** The index into the hostDetails array of the broker to which we are connected */
+ private int _currentBrokerIndex = 0;
+
+ /** The broker currently selected **/
+ private BrokerDetails _currentBrokerDetail;
+
+ /** Array of BrokerDetail used to make connections. */
+ private ConnectionURL _connectionDetails;
+
+ /** Denotes the number of failed attempts **/
+ private int _failedAttemps = 0;
public FailoverExchangeMethod(ConnectionURL connectionDetails, AMQConnection conn)
{
- super(connectionDetails);
- _orginalBrokerDetail = _connectionDetails.getBrokerDetails(0);
+ _connectionDetails = connectionDetails;
+ _originalBrokerDetail = _connectionDetails.getBrokerDetails(0);
// This is not safe to use until attainConnection is called, as this ref will not initialized fully.
// The reason being this constructor is called inside the AMWConnection constructor.
// It would be best if we find a way to pass this ref after AMQConnection is fully initialized.
- _conn = conn;
+ _conn = conn;
}
private void subscribeForUpdates() throws JMSException
@@ -96,6 +109,17 @@
public void onMessage(Message m)
{
_logger.info("Failover exchange notified cluster membership change");
+
+ String currentBrokerIP = "";
+ try
+ {
+ currentBrokerIP = InetAddress.getByName(_currentBrokerDetail.getHost()).getHostAddress();
+ }
+ catch(Exception e)
+ {
+ _logger.warn("Unable to resolve current broker host name",e);
+ }
+
List<BrokerDetails> brokerList = new ArrayList<BrokerDetails>();
try
{
@@ -109,15 +133,22 @@
for (String url:urls)
{
String[] tokens = url.split(":");
- if (tokens[0].equalsIgnoreCase(_orginalBrokerDetail.getTransport()))
+ if (tokens[0].equalsIgnoreCase(_originalBrokerDetail.getTransport()))
{
BrokerDetails broker = new AMQBrokerDetails();
broker.setTransport(tokens[0]);
broker.setHost(tokens[1]);
broker.setPort(Integer.parseInt(tokens[2]));
- broker.setProperties(_orginalBrokerDetail.getProperties());
- broker.setSSLConfiguration(_orginalBrokerDetail.getSSLConfiguration());
+ broker.setProperties(_originalBrokerDetail.getProperties());
+ broker.setSSLConfiguration(_originalBrokerDetail.getSSLConfiguration());
brokerList.add(broker);
+
+ if (currentBrokerIP.equals(broker.getHost()) &&
+ _currentBrokerDetail.getPort() == broker.getPort())
+ {
+ _currentBrokerIndex = brokerList.indexOf(broker);
+ }
+
break;
}
}
@@ -132,13 +163,20 @@
{
_connectionDetails.setBrokerDetails(brokerList);
}
+
+ _logger.info("============================================================");
+ _logger.info("Updated cluster membership details " + _connectionDetails);
+ _logger.info("============================================================");
}
public void attainedConnection()
{
- super.attainedConnection();
try
{
+ _failedAttemps = 0;
+ _logger.info("============================================================");
+ _logger.info("Attained connection ");
+ _logger.info("============================================================");
subscribeForUpdates();
}
catch (JMSException e)
@@ -151,17 +189,92 @@
{
synchronized (_brokerListLock)
{
- return super.getCurrentBrokerDetails();
+ return _connectionDetails.getBrokerDetails(_currentBrokerIndex);
}
- }
-
+ }
+
public BrokerDetails getNextBrokerDetails()
{
synchronized(_brokerListLock)
{
- return super.getNextBrokerDetails();
+ if (_currentBrokerIndex == (_connectionDetails.getBrokerCount() - 1))
+ {
+ _currentBrokerIndex = 0;
+ }
+ else
+ {
+ _currentBrokerIndex++;
+ }
+
+ BrokerDetails broker = _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+
+ // When the broker list is updated it will include the current broker as well
+ // There is no point trying it again, so trying the next one.
+ if (_currentBrokerDetail != null &&
+ broker.getHost().equals(_currentBrokerDetail.getHost()) &&
+ broker.getPort() == _currentBrokerDetail.getPort())
+ {
+ return getNextBrokerDetails();
+ }
+
+ String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY);
+ if (delayStr != null)
+ {
+ Long delay = Long.parseLong(delayStr);
+ _logger.info("Delay between connect retries:" + delay);
+ try
+ {
+ Thread.sleep(delay);
+ }
+ catch (InterruptedException ie)
+ {
+ return null;
+ }
+ }
+ else
+ {
+ _logger.info("No delay between connect retries, use tcp://host:port?connectdelay='value' to enable.");
+ }
+
+ _failedAttemps ++;
+ _currentBrokerDetail = broker;
+ return broker;
}
}
+
+ public boolean failoverAllowed()
+ {
+ // We allow to Failover provided
+ // our broker list is not empty and
+ // we haven't gone through all of them
+
+ boolean b = _connectionDetails.getBrokerCount() > 0 &&
+ _failedAttemps <= _connectionDetails.getBrokerCount();
+
+
+ _logger.info("============================================================");
+ _logger.info(toString());
+ _logger.info("FailoverAllowed " + b);
+ _logger.info("============================================================");
+
+ return b;
+ }
+
+ public void reset()
+ {
+ _failedAttemps = 0;
+ }
+
+ public void setBroker(BrokerDetails broker)
+ {
+ // not sure if this method is needed
+ }
+
+ public void setRetries(int maxRetries)
+ {
+ // no max retries we keep trying as long
+ // as we get updates
+ }
public String methodName()
{
@@ -172,7 +285,24 @@
{
StringBuffer sb = new StringBuffer();
sb.append("FailoverExchange:\n");
- sb.append(super.toString());
+ sb.append("\n Current Broker Index:");
+ sb.append(_currentBrokerIndex);
+ sb.append("\n Failed Attempts:");
+ sb.append(_failedAttemps);
+ sb.append("\n Orignal broker details:");
+ sb.append(_originalBrokerDetail).append("\n");
+ sb.append("\n -------- Broker List -----------\n");
+ for (int i = 0; i < _connectionDetails.getBrokerCount(); i++)
+ {
+ if (i == _currentBrokerIndex)
+ {
+ sb.append(">");
+ }
+
+ sb.append(_connectionDetails.getBrokerDetails(i));
+ sb.append("\n");
+ }
+ sb.append("--------------------------------\n");
return sb.toString();
}
}
Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java Tue Mar 10 23:10:57 2009
@@ -30,7 +30,7 @@
private static final Logger _logger = LoggerFactory.getLogger(FailoverRoundRobinServers.class);
/** The default number of times to cycle through all servers */
- public static final int DEFAULT_CYCLE_RETRIES = 0;
+ public static final int DEFAULT_CYCLE_RETRIES = 1;
/** The default number of times to retry each server */
public static final int DEFAULT_SERVER_RETRIES = 0;
@@ -66,6 +66,8 @@
String cycleRetries = _connectionDetails.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE);
+ _cycleRetries = DEFAULT_CYCLE_RETRIES;
+
if (cycleRetries != null)
{
try
@@ -74,7 +76,7 @@
}
catch (NumberFormatException nfe)
{
- _cycleRetries = DEFAULT_CYCLE_RETRIES;
+ _logger.warn("Cannot set cycle Retries, " + cycleRetries + " is not a number. Using default: " + DEFAULT_CYCLE_RETRIES);
}
}
@@ -93,8 +95,8 @@
public boolean failoverAllowed()
{
- return ((_currentCycleRetries < _cycleRetries) || (_currentServerRetry < _serverRetries)
- || (_currentBrokerIndex < (_connectionDetails.getBrokerCount() - 1)));
+ return ((_currentCycleRetries < _cycleRetries) || (_currentServerRetry < _serverRetries));
+ //|| (_currentBrokerIndex <= (_connectionDetails.getBrokerCount() - 1)));
}
public void attainedConnection()
Modified: qpid/branches/qpid-1673/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java Tue Mar 10 23:10:57 2009
@@ -375,6 +375,19 @@
assertTrue(connectionurl.getBrokerCount() == 1);
}
+ public void testClientIDWithUnderscore() throws URLSyntaxException
+ {
+ String url = "amqp://user:pass@client_id/test?brokerlist='tcp://localhost:5672'";
+
+ ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+ assertTrue(connectionurl.getUsername().equals("user"));
+ assertTrue(connectionurl.getPassword().equals("pass"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
+ assertTrue(connectionurl.getClientName().equals("client_id"));
+
+ assertTrue(connectionurl.getBrokerCount() == 1);
+ }
public void testWrongOptionSeparatorInOptions()
{
Modified: qpid/branches/qpid-1673/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Tue Mar 10 23:10:57 2009
@@ -44,7 +44,9 @@
}
- public void sendQueueBind(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException
+ public void sendQueueBind(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments,
+ AMQShortString exchangeName, AMQDestination destination,
+ boolean nowait) throws AMQException, FailoverException
{
}
@@ -129,7 +131,8 @@
}
- public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException, FailoverException
+ public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler,
+ boolean nowait) throws AMQException, FailoverException
{
}
Modified: qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java (original)
+++ qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java Tue Mar 10 23:10:57 2009
@@ -37,6 +37,8 @@
{
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void exception(Session ssn, SessionException exc)
{
exc.printStackTrace();
Modified: qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Tue Mar 10 23:10:57 2009
@@ -74,7 +74,7 @@
final private Map<Integer,Session> channels = new HashMap<Integer,Session>();
private State state = NEW;
- private Object lock = new Object();
+ final private Object lock = new Object();
private long timeout = 60000;
private ConnectionListener listener = new DefaultConnectionListener();
private ConnectionException error = null;
Modified: qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java (original)
+++ qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java Tue Mar 10 23:10:57 2009
@@ -37,6 +37,8 @@
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
int id = xfr.getId();
Modified: qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Tue Mar 10 23:10:57 2009
@@ -53,13 +53,15 @@
private static final Logger log = Logger.get(Session.class);
- enum State { NEW, DETACHED, OPEN, CLOSING, CLOSED }
+ enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED }
class DefaultSessionListener implements SessionListener
{
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
log.info("message: %s", xfr);
@@ -107,6 +109,8 @@
private volatile boolean flowControl = false;
private Semaphore credit = new Semaphore(0);
+ private Thread resumer = null;
+
Session(Connection connection, Binary name, long expiry)
{
this.connection = connection;
@@ -234,15 +238,21 @@
for (int i = maxComplete + 1; lt(i, commandsOut); i++)
{
Method m = commands[mod(i, commands.length)];
- if (m != null)
+ if (m == null)
{
- sessionCommandPoint(m.getId(), 0);
- send(m);
+ m = new ExecutionSync();
+ m.setId(i);
}
+ sessionCommandPoint(m.getId(), 0);
+ send(m);
}
sessionCommandPoint(commandsOut, 0);
sessionFlush(COMPLETED);
+ resumer = Thread.currentThread();
+ state = RESUMING;
+ listener.resumed(this);
+ resumer = null;
}
}
@@ -387,7 +397,7 @@
synchronized (commands)
{
- if (state == DETACHED)
+ if (state == DETACHED || state == CLOSING)
{
return;
}
@@ -494,15 +504,23 @@
{
if (state == DETACHED && m.isUnreliable())
{
- return;
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
+ {
+ return;
+ }
}
if (state != OPEN && state != CLOSED)
{
- Waiter w = new Waiter(commands, timeout);
- while (w.hasTime() && (state != OPEN && state != CLOSED))
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
{
- w.await();
+ Waiter w = new Waiter(commands, timeout);
+ while (w.hasTime() && (state != OPEN && state != CLOSED))
+ {
+ w.await();
+ }
}
}
@@ -510,8 +528,24 @@
{
case OPEN:
break;
+ case RESUMING:
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
+ {
+ throw new SessionException
+ ("timed out waiting for resume to finish");
+ }
+ break;
case CLOSED:
- throw new SessionClosedException();
+ ExecutionException exc = getException();
+ if (exc != null)
+ {
+ throw new SessionException(exc);
+ }
+ else
+ {
+ throw new SessionClosedException();
+ }
default:
throw new SessionException
(String.format
@@ -527,7 +561,7 @@
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && isFull(next))
{
- if (state == OPEN)
+ if (state == OPEN || state == RESUMING)
{
try
{
@@ -560,7 +594,7 @@
{
sessionCommandPoint(0, 0);
}
- if (expiry > 0)
+ if (expiry > 0 && !m.isUnreliable())
{
commands[mod(next, commands.length)] = m;
commandBytes += m.getBodySize();
@@ -828,9 +862,9 @@
{
throw new SessionException("close() timed out");
}
-
- connection.removeSession(this);
}
+
+ connection.removeSession(this);
}
public void exception(Throwable t)
@@ -842,7 +876,7 @@
{
synchronized (commands)
{
- if (expiry == 0)
+ if (expiry == 0 || getException() != null)
{
state = CLOSED;
}
Modified: qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java (original)
+++ qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java Tue Mar 10 23:10:57 2009
@@ -31,6 +31,8 @@
void opened(Session session);
+ void resumed(Session session);
+
void message(Session ssn, MessageTransfer xfr);
void exception(Session session, SessionException exception);
Modified: qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java (original)
+++ qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java Tue Mar 10 23:10:57 2009
@@ -87,6 +87,8 @@
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
count++;
Modified: qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java (original)
+++ qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java Tue Mar 10 23:10:57 2009
@@ -252,16 +252,16 @@
{
for (File subFile : file.listFiles())
{
- success = delete(subFile, true) & success ;
+ success = delete(subFile, true) && success;
}
- return file.delete();
+ return success && file.delete();
}
return false;
}
- return success && file.delete();
+ return file.delete();
}
Modified: qpid/branches/qpid-1673/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java Tue Mar 10 23:10:57 2009
@@ -74,6 +74,8 @@
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(final Session ssn, MessageTransfer xfr)
{
if (queue)
@@ -122,6 +124,13 @@
{
// do nothing
}
+ else if (body.startsWith("EXCP"))
+ {
+ ExecutionException exc = new ExecutionException();
+ exc.setDescription("intentional exception for testing");
+ ssn.invoke(exc);
+ ssn.close();
+ }
else
{
throw new IllegalArgumentException
@@ -138,9 +147,14 @@
private void send(Session ssn, String msg)
{
+ send(ssn, msg, false);
+ }
+
+ private void send(Session ssn, String msg, boolean sync)
+ {
ssn.messageTransfer
("xxx", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
- null, msg);
+ null, msg, sync ? SYNC : NONE);
}
private Connection connect(final Condition closed)
@@ -277,6 +291,7 @@
class TestSessionListener implements SessionListener
{
public void opened(Session s) {}
+ public void resumed(Session s) {}
public void exception(Session s, SessionException e) {}
public void message(Session s, MessageTransfer xfr)
{
@@ -391,4 +406,41 @@
conn.close();
}
+ public void testExecutionExceptionInvoke() throws Exception
+ {
+ startServer();
+
+ Connection conn = new Connection();
+ conn.connect("localhost", port, null, "guest", "guest");
+ Session ssn = conn.createSession();
+ send(ssn, "EXCP 0");
+ Thread.sleep(3000);
+ try
+ {
+ send(ssn, "SINK 1");
+ }
+ catch (SessionException exc)
+ {
+ assertNotNull(exc.getException());
+ }
+ }
+
+ public void testExecutionExceptionSync() throws Exception
+ {
+ startServer();
+
+ Connection conn = new Connection();
+ conn.connect("localhost", port, null, "guest", "guest");
+ Session ssn = conn.createSession();
+ send(ssn, "EXCP 0", true);
+ try
+ {
+ ssn.sync();
+ }
+ catch (SessionException exc)
+ {
+ assertNotNull(exc.getException());
+ }
+ }
+
}
Modified: qpid/branches/qpid-1673/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java Tue Mar 10 23:10:57 2009
@@ -280,6 +280,30 @@
checkFileLists(filesBefore, filesAfter);
}
+ public void testDeleteNonExistentFile()
+ {
+ File test = new File("FileUtilsTest-testDelete-"+System.currentTimeMillis());
+
+ assertTrue("File exists", !test.exists());
+ assertFalse("File is a directory", test.isDirectory());
+
+ assertTrue("Delete Succeeded ", !FileUtils.delete(test, true));
+ }
+
+ public void testDeleteNull()
+ {
+ try
+ {
+ FileUtils.delete(null, true);
+ fail("Delete with null value should throw NPE.");
+ }
+ catch (NullPointerException npe)
+ {
+ // expected path
+ }
+ }
+
+
/**
* Given two lists of File arrays ensure they are the same length and all entries in Before are in After
*
Modified: qpid/branches/qpid-1673/qpid/java/cpp.cluster.testprofile
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/cpp.cluster.testprofile?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/cpp.cluster.testprofile (original)
+++ qpid/branches/qpid-1673/qpid/java/cpp.cluster.testprofile Tue Mar 10 23:10:57 2009
@@ -3,3 +3,6 @@
test.excludesfile=${project.root}/ExcludeList ${project.root}/XAExcludeList ${project.root}/010ExcludeList
profile.clustered=true
+profile.failoverMsgCount=10
+profile.failoverIterations=10
+profile.failoverRandomSeed=20080921
Propchange: qpid/branches/qpid-1673/qpid/java/management/client/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Mar 10 23:10:57 2009
@@ -0,0 +1,2 @@
+compile
+release
Modified: qpid/branches/qpid-1673/qpid/java/management/client/bin/qman-wsdm-start.cmd
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/bin/qman-wsdm-start.cmd?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/bin/qman-wsdm-start.cmd (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/bin/qman-wsdm-start.cmd Tue Mar 10 23:10:57 2009
@@ -58,7 +58,6 @@
SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\start.jar
SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\jetty-6.1.14.jar
SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\jetty-util-6.1.14.jar
-SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\jetty-util-6.1.14.jar
SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\geronimo-servlet_2.5_spec-1.2.jar
SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\slf4j-api-1.4.0.jar
SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\slf4j-log4j12-1.4.0.jar
Modified: qpid/branches/qpid-1673/qpid/java/management/client/bin/qman-wsdm-start.sh
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/bin/qman-wsdm-start.sh?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/bin/qman-wsdm-start.sh (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/bin/qman-wsdm-start.sh Tue Mar 10 23:10:57 2009
@@ -58,7 +58,7 @@
QMAN_LIBS=$QMAN_HOME/lib
JETTY_CONFIG_FILE=$QMAN_HOME/etc/jetty.xml
-QMAN_CLASSPATH=$QMAN_HOME/etc:$QMAN_LIBS/start.jar:$QMAN_LIBS/jetty-6.1.14.jar:$QMAN_LIBS/jetty-util-6.1.14.jar:$QMAN_LIBS/jetty-util-6.1.14.jar:$QMAN_LIBS/geronimo-servlet_2.5_spec-1.2.jar:$QMAN_LIBS/slf4j-api-1.4.0.jar:$QMAN_LIBS/slf4j-log4j12-1.4.0.jar:$QMAN_LIBS/log4j-1.2.12.jar
+QMAN_CLASSPATH=$QMAN_HOME/etc:$QMAN_LIBS/start.jar:$QMAN_LIBS/jetty-6.1.14.jar:$QMAN_LIBS/jetty-util-6.1.14.jar:$QMAN_LIBS/geronimo-servlet_2.5_spec-1.2.jar:$QMAN_LIBS/slf4j-api-1.4.0.jar:$QMAN_LIBS/slf4j-log4j12-1.4.0.jar:$QMAN_LIBS/log4j-1.2.12.jar
echo "==============================================================================="
echo""
Modified: qpid/branches/qpid-1673/qpid/java/management/client/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/build.xml?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/build.xml (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/build.xml Tue Mar 10 23:10:57 2009
@@ -25,7 +25,6 @@
<import file="../../module.xml"/>
- <property name="war.name" value="qman.war"/>
<property name="build.root" value="${module.build}"/>
<property name="web.module" value="${module.build}${file.separator}wsdm-module"/>
<property name="web-inf.folder" value="${web.module}${file.separator}WEB-INF"/>
@@ -49,7 +48,6 @@
</copy>
</target>
-
<target name="libs-release" description="copy dependencies into module release">
<copy todir="${module.release}${file.separator}" failonerror="true" verbose="true">
<fileset dir="${build}" casesensitive="yes" includes="${module.libs}">
@@ -59,6 +57,8 @@
<not><filename name="**/*xalan*"/></not>
<not><filename name="**/*wsdl*"/></not>
<not><filename name="**/*muse*"/></not>
+ <not><filename name="**/*jsp*"/></not>
+ <not><filename name="**/*core-3.1.1.jar*"/></not>
</fileset>
</copy>
<copy todir="${module.release}${file.separator}lib" failonerror="true">
@@ -195,7 +195,7 @@
<batchtest fork="${test.fork}" todir="${module.results}">
<fileset dir="${module.test.src}" excludes="${module.test.excludes}">
- <include name="**/${test}.java"/>
+ <include name="**/${test}.java"/>
</fileset>
</batchtest>
</junit>
Modified: qpid/branches/qpid-1673/qpid/java/management/client/console/brokers_management.jsp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/console/brokers_management.jsp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/console/brokers_management.jsp (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/console/brokers_management.jsp Tue Mar 10 23:10:57 2009
@@ -97,7 +97,7 @@
Virtual Host :
</td>
<td>
- <input type="text" name="port"/>
+ <input type="text" name="virtualHost"/>
</td>
<td style="font-size: x-small;">
The virtual host name.
Modified: qpid/branches/qpid-1673/qpid/java/management/client/console/fragments/menu.jsp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/console/fragments/menu.jsp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/console/fragments/menu.jsp (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/console/fragments/menu.jsp Tue Mar 10 23:10:57 2009
@@ -3,8 +3,8 @@
<a href="<%=request.getContextPath()%>/console"> > System Overview</a>
<a href="<%=request.getContextPath()%>/brokers_management"> > Brokers Management</a>
<a href="<%=request.getContextPath()%>/resources_management"> > Resources Management</a>
- <a href="<%=request.getContextPath()%>/tbd.jsp"> > Subscriptions Management</a>
- <a href="<%=request.getContextPath()%>/tbd.jsp"> > System Health</a>
+ <a> > Subscriptions Management</a>
+ <a> > System Health</a>
<a href="<%=request.getContextPath()%>/logging_configuration"> > Logging Configuration</a>
</div>
</div>
Modified: qpid/branches/qpid-1673/qpid/java/management/client/console/wsdm_rmd_perspective.jsp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/console/wsdm_rmd_perspective.jsp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/console/wsdm_rmd_perspective.jsp (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/console/wsdm_rmd_perspective.jsp Tue Mar 10 23:10:57 2009
@@ -18,7 +18,7 @@
<body>
<div id="page" align="center">
<jsp:include page="/fragments/header.jsp">
- <jsp:param name="title" value="Resource Management - WS-DM WSDL Perspective"/>
+ <jsp:param name="title" value="Resource Management - WS-DM RMD Perspective"/>
</jsp:include>
<div id="content" align="center">
@@ -63,12 +63,8 @@
</tr>
<tr>
<td valign="top">
- <div class="panel" align="justify" style="height:500px; overflow-y:auto;">
- <c:set var="xml">
- ${wsdl}
- </c:set>
- <c:import var="xslt" url="wsdl-viewer.xsl" />
- <x:transform xml="${xml}" xslt="${xslt}" />
+ <div class="panel" align="left" style="height:500px; width=200px; overflow-y:auto; font-size: smaller; font-weight:bold;">
+ <pre> <c:out value="${rmd}" /> </pre>
</div>
</td>
</tr>
Modified: qpid/branches/qpid-1673/qpid/java/management/client/console/wsdm_wsdl_perspective.jsp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/console/wsdm_wsdl_perspective.jsp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/console/wsdm_wsdl_perspective.jsp (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/console/wsdm_wsdl_perspective.jsp Tue Mar 10 23:10:57 2009
@@ -63,12 +63,8 @@
</tr>
<tr>
<td valign="top">
- <div class="panel" align="justify" style="height:500px; overflow-y:auto;">
- <c:set var="xml">
- ${wsdl}
- </c:set>
- <c:import var="xslt" url="wsdl-viewer.xsl" />
- <x:transform xml="${xml}" xslt="${xslt}" />
+ <div class="panel" align="left" style="height:500px; width=200px; overflow-y:auto; font-size: smaller; font-weight:bold;">
+ <pre> <c:out value="${wsdl}" /> </pre>
</div>
</td>
</tr>
Modified: qpid/branches/qpid-1673/qpid/java/management/client/src/example/org/apache/qpid/management/example/ConsumerAndProducerExample.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/src/example/org/apache/qpid/management/example/ConsumerAndProducerExample.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/src/example/org/apache/qpid/management/example/ConsumerAndProducerExample.java (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/src/example/org/apache/qpid/management/example/ConsumerAndProducerExample.java Tue Mar 10 23:10:57 2009
@@ -26,7 +26,9 @@
import org.apache.muse.util.xml.XPathUtils;
import org.apache.muse.ws.addressing.EndpointReference;
import org.apache.muse.ws.addressing.soap.SoapFault;
+import org.apache.muse.ws.notification.impl.FilterCollection;
import org.apache.muse.ws.notification.impl.MessagePatternFilter;
+import org.apache.muse.ws.notification.impl.ProducerPropertiesFilter;
import org.apache.muse.ws.notification.impl.TopicFilter;
import org.apache.muse.ws.notification.remote.NotificationProducerClient;
import org.apache.qpid.management.Names;
@@ -65,11 +67,11 @@
void executeExample(String host, int port) throws Exception
{
// This is QMan...
- URI producerURI = URI.create("http://"+host+":"+port+"/qman/services/consumer");
+ URI producerURI = URI.create("http://"+host+":"+port+"/qman/services/adapter");
// ...and this is QMan too! Note that it has an hidden consumer capability that is used in
// order to run successfully this example...
- URI consumerURI = producerURI;
+ URI consumerURI = URI.create("http://"+host+":"+port+"/qman/services/consumer");
EndpointReference producerEPR = new EndpointReference(producerURI);
EndpointReference consumerEPR = new EndpointReference(consumerURI);
@@ -93,8 +95,11 @@
// Example 6: a MessageFilter is installed in order to listen only for connection events
// (connections created or removed). The subscription will expire in 10 seconds.
allMessagesWithMessageFilterAndTerminationTime(producerEPR,consumerEPR);
+
+ // Example 7 : a subscription with more than one filter.
+ complexSubscription(producerEPR, consumerEPR);
}
-
+
/**
* Makes a subscription on all topics / all messages without an expiry date.
*
@@ -223,6 +228,41 @@
new Date(System.currentTimeMillis() + 10000)); // Termination Time
}
+ /**
+ * Makes a subscription on a specifc topic with an expiry date.
+ * Only messages published on the given topic will be delivered to the given consumer.
+ * The subscription will end after 10 seconds
+ *
+ * @param producer the producer endpoint reference.
+ * @param consumer the consumer endpoint reference .
+ * @throws SoapFault when the subscription cannot be made.
+ */
+ private void complexSubscription(EndpointReference producer, EndpointReference consumer) throws SoapFault
+ {
+ NotificationProducerClient producerClient = new NotificationProducerClient(producer);
+ producerClient.setTrace(true);
+
+ FilterCollection filter = new FilterCollection();
+
+ TopicFilter topicFilter = new TopicFilter(Names.EVENTS_LIFECYLE_TOPIC_NAME);
+ MessagePatternFilter messageFilter= new MessagePatternFilter(
+ "/wsnt:NotificationMessage/wsnt:Message/qman:LifeCycleEvent/qman:Resource/qman:Name/text()='connection'", // expression (XPath)
+ XPathUtils.NAMESPACE_URI); // Dialect : the only supported dialect is XPath 1.0
+
+ ProducerPropertiesFilter producerFilter = new ProducerPropertiesFilter(
+ "boolean(/*/MgtPubInterval > 100 and /*/MsgTotalEnqueues > 56272)",
+ XPathUtils.NAMESPACE_URI);
+
+ filter.addFilter(topicFilter);
+ filter.addFilter(messageFilter);
+ filter.addFilter(producerFilter);
+
+ producerClient.subscribe(
+ consumer, // Consumer Endpoint reference
+ filter, // Topic Filter
+ new Date(System.currentTimeMillis() + 10000)); // Termination Time
+ }
+
@Override
void printOutExampleDescription()
{
@@ -245,4 +285,9 @@
System.out.println("A subscription with a termination time will have a predefined expiry");
System.out.println("date while if there's no termination the subscription will never expire.");
}
+
+ public static void main(String[] args)
+ {
+ new ConsumerAndProducerExample().execute(new String[]{"localhost","8080"});
+ }
}
Modified: qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/muse.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/muse.xml?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/muse.xml (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/muse.xml Tue Mar 10 23:10:57 2009
@@ -29,7 +29,7 @@
<java-serializer-class>org.apache.qpid.management.wsdm.muse.serializer.DateSerializer</java-serializer-class>
</custom-serializer>
<router>
- <java-router-class>org.apache.muse.core.routing.SimpleResourceRouter</java-router-class>
+ <java-router-class>org.apache.muse.ws.resource.impl.WsResourceRouter</java-router-class>
<logging>
<log-file>log/muse.log</log-file>
<log-level>SEVERE</log-level>
Modified: qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java Tue Mar 10 23:10:57 2009
@@ -170,6 +170,6 @@
String QMAN_100037_INVOKE_OPERATION_FAILURE = "<QMAN-100037> : Operation Invocation failure for operation.";
String QMAN_100038_UNABLE_TO_SEND_WS_NOTIFICATION = "<QMAN-100038> : Unable to send notification.";
String QMAN_100039_UNABLE_TO_CONFIGURE_PROPERLY_WORKER_MANAGER = "<QMAN-100039> : Unable to properly configure WorkManager. A malformed property (NaN) was given as input parameter.";
-
+ String QMAN_100040_UNABLE_TO_LOCATE_WSRP_PROPERTIES = "<QMAN-100040> : Unable to evaluate the WSRP XPath expression on resource WSDL.";
}
\ No newline at end of file
Modified: qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java Tue Mar 10 23:10:57 2009
@@ -45,7 +45,7 @@
public static String CLASS = "class";
public static String EVENT = "event";
public static String OBJECT_ID="objectId";
- public static String BROKER_ID = "brokerID";
+ public static String BROKER_ID = "brokerId";
public static String DOMAIN_NAME = "Q-MAN";
public static String ARG_COUNT_PARAM_NAME = "argCount";
@@ -86,7 +86,7 @@
new StringBuilder()
.append(DOMAIN_NAME)
.append(':')
- .append("Type=Service")
+ .append("Name=QMan,Type=Service")
.toString());
} catch(Exception exception)
{
Modified: qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java Tue Mar 10 23:10:57 2009
@@ -30,7 +30,16 @@
import org.apache.qpid.management.Names;
import org.apache.qpid.management.domain.handler.base.IMessageHandler;
import org.apache.qpid.management.domain.model.AccessMode;
+import org.apache.qpid.management.domain.model.type.AbsTime;
+import org.apache.qpid.management.domain.model.type.DeltaTime;
+import org.apache.qpid.management.domain.model.type.ObjectReference;
+import org.apache.qpid.management.domain.model.type.Str16;
+import org.apache.qpid.management.domain.model.type.Str8;
import org.apache.qpid.management.domain.model.type.Type;
+import org.apache.qpid.management.domain.model.type.Uint16;
+import org.apache.qpid.management.domain.model.type.Uint32;
+import org.apache.qpid.management.domain.model.type.Uint64;
+import org.apache.qpid.management.domain.model.type.Uint8;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageProperties;
@@ -71,7 +80,12 @@
private Configuration()
{
defineQueueNames();
+
createHeaderForCommandMessages();
+
+ addAccessModeMappings();
+
+ addTypeMappings();
}
void clean()
@@ -90,9 +104,11 @@
}
/**
- * Returns true if this configuration has at least one broker connection data.
+ * Returns true if this configuration has at least
+ * one broker configured.
*
- * @return true if this configuration has at least one broker connection data.
+ * @return true if this configuration has at least one
+ * broker configured.
*/
public boolean hasOneOrMoreBrokersDefined()
{
@@ -245,26 +261,46 @@
/**
* Adds a new type mapping to this configuration.
*
- * @param mapping the type mapping that will be added.
+ * @param code the code that will be associated with the declared type.
+ * @param type the type.
+ * @param vailidatorClassName the FQN of the validator class that will be
+ * associated with the given type.
*/
- void addTypeMapping(TypeMapping mapping) {
- int code = mapping.getCode();
- Type type = mapping.getType();
- String validatorClassName = mapping.getValidatorClassName();
- _typeMappings.put(code, type);
+ void addTypeMapping(int code, Type type, String validatorClassName) {
+ _typeMappings.put(code, type);
_validators.put(type, validatorClassName);
- LOGGER.info(Messages.QMAN_000005_TYPE_MAPPING_CONFIGURED, code,type,validatorClassName);
+ LOGGER.info(
+ Messages.QMAN_000005_TYPE_MAPPING_CONFIGURED,
+ code,
+ type,
+ validatorClassName);
}
-
+
+
+ /**
+ * Adds a new type mapping to this configuration.
+ *
+ * @param code the code that will be associated with the declared type.
+ * @param type the type.
+ */
+ void addTypeMapping(int code, Type type) {
+ _typeMappings.put(code, type);
+
+ LOGGER.info(
+ Messages.QMAN_000005_TYPE_MAPPING_CONFIGURED,
+ code,
+ type,
+ "not configured for this type.");
+ }
+
/**
* Adds a new access mode mapping to this configuration.
*
- * @param mapping the mapping that will be added.
+ * @param code the code that will be associated with the access mode,
+ * @param accessMode the accessMode.
*/
- void addAccessModeMapping(AccessModeMapping mapping){
- int code = mapping.getCode();
- AccessMode accessMode = mapping.getAccessMode();
+ void addAccessModeMapping(int code, AccessMode accessMode){
_accessModes.put(code, accessMode);
LOGGER.info(Messages.QMAN_000006_ACCESS_MODE_MAPPING_CONFIGURED, code,accessMode);
@@ -420,4 +456,34 @@
{
this._keepAliveTime = keepAliveTime;
}
+
+ /**
+ * Configures access mode mappings.
+ * An access mode mapping is an association between a code and an access mode.
+ */
+ private void addAccessModeMappings() {
+ addAccessModeMapping(1,AccessMode.RC);
+ addAccessModeMapping(2,AccessMode.RW);
+ addAccessModeMapping(3,AccessMode.RO);
+ }
+
+ /**
+ * Configures type mappings.
+ * A type mapping is an association between a code and a management type.
+ */
+ private void addTypeMappings()
+ {
+ addTypeMapping(1,new Uint8(),Names.NUMBER_VALIDATOR);
+ addTypeMapping(2,new Uint16(),Names.NUMBER_VALIDATOR);
+ addTypeMapping(3,new Uint32(),Names.NUMBER_VALIDATOR);
+ addTypeMapping(4,new Uint64(),Names.NUMBER_VALIDATOR);
+ addTypeMapping(6,new Str8(),Names.STRING_VALIDATOR);
+ addTypeMapping(7,new Str16(),Names.STRING_VALIDATOR);
+ addTypeMapping(8,new AbsTime());
+ addTypeMapping(9,new DeltaTime());
+ addTypeMapping(10,new ObjectReference());
+ addTypeMapping(11,new org.apache.qpid.management.domain.model.type.Boolean());
+ addTypeMapping(14,new org.apache.qpid.management.domain.model.type.Uuid());
+ addTypeMapping(15,new org.apache.qpid.management.domain.model.type.Map());
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org