You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2009/03/11 00:11:10 UTC

svn commit: r752300 [9/12] - in /qpid/branches/qpid-1673/qpid: cpp/ cpp/examples/ cpp/examples/direct/ cpp/examples/failover/ cpp/examples/fanout/ cpp/examples/pub-sub/ cpp/examples/qmf-console/ cpp/examples/request-response/ cpp/examples/tradedemo/ cp...

Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue Mar 10 23:10:57 2009
@@ -135,7 +135,7 @@
                 {
                     try
                     {
-                        flushAcknowledgments();
+                        flushAcknowledgments(true);
                     }
                     catch (Throwable t)
                     {
@@ -236,12 +236,17 @@
 
     void flushAcknowledgments()
     {
+        flushAcknowledgments(false);
+    }
+    
+    void flushAcknowledgments(boolean setSyncBit)
+    {
         synchronized (unacked)
         {
             if (unackedCount > 0)
             {
                 messageAcknowledge
-                    (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+                    (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit);
                 clearUnacked();
             }
         }
@@ -249,6 +254,11 @@
 
     void messageAcknowledge(RangeSet ranges, boolean accept)
     {
+        messageAcknowledge(ranges,accept,false);
+    }
+    
+    void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit)
+    {
         Session ssn = getQpidSession();
         for (Range range : ranges)
         {
@@ -257,7 +267,7 @@
         ssn.flushProcessed(accept ? BATCH : NONE);
         if (accept)
         {
-            ssn.messageAccept(ranges, UNRELIABLE);
+            ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE);
         }
     }
 
@@ -272,7 +282,8 @@
      * @param arguments    0_8 specific
      */
     public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey,
-                              final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination)
+                              final FieldTable arguments, final AMQShortString exchangeName,
+                              final AMQDestination destination, final boolean nowait)
             throws AMQException, FailoverException
     {
         Map args = FiledTableSupport.convertToMap(arguments);
@@ -287,9 +298,12 @@
             _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString());
             getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args);
         }
-        // We need to sync so that we get notify of an error.
-        getQpidSession().sync();
-        getCurrentException();
+        if (!nowait)
+        {
+            // We need to sync so that we get notify of an error.
+            getQpidSession().sync();
+            getCurrentException();
+        }
     }
 
 
@@ -501,18 +515,24 @@
         {
             getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW);
         }
-        getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
+        getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
+                                     Option.UNRELIABLE);
         // We need to sync so that we get notify of an error.
         // only if not immediat prefetch
-        if(prefetch() && (consumer.isStrated() || _immediatePrefetch))
+        if(prefetch() && (isStarted() || _immediatePrefetch))
         {
             // set the flow
             getQpidSession().messageFlow(consumerTag,
                                          MessageCreditUnit.MESSAGE,
-                                         getAMQConnection().getMaxPrefetch());
+                                         getAMQConnection().getMaxPrefetch(),
+                                         Option.UNRELIABLE);
+        }
+
+        if (!nowait)
+        {
+            getQpidSession().sync();
+            getCurrentException();
         }
-        getQpidSession().sync();
-        getCurrentException();
     }
 
     /**
@@ -540,14 +560,18 @@
                                         null,
                                         name.toString().startsWith("amq.")? Option.PASSIVE:Option.NONE);
         // We need to sync so that we get notify of an error.
-        getQpidSession().sync();
-        getCurrentException();
+        if (!nowait)
+        {
+            getQpidSession().sync();
+            getCurrentException();
+        }
     }
 
     /**
      * Declare a queue with the given queueName
      */
-    public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)
+    public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+                                 final boolean nowait)
             throws AMQException, FailoverException
     {
         // do nothing this is only used by 0_8
@@ -557,7 +581,7 @@
      * Declare a queue with the given queueName
      */
     public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
-                                               final boolean noLocal)
+                                               final boolean noLocal, final boolean nowait)
             throws AMQException, FailoverException
     {
         AMQShortString res;
@@ -581,9 +605,12 @@
                                       amqd.isDurable() ? Option.DURABLE : Option.NONE,
                                       !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
         // passive --> false
-        // We need to sync so that we get notify of an error.
-        getQpidSession().sync();
-        getCurrentException();
+        if (!nowait)
+        {
+            // We need to sync so that we get notify of an error.
+            getQpidSession().sync();
+            getCurrentException();
+        }
         return res;
     }
 
@@ -609,7 +636,8 @@
         {
             for (BasicMessageConsumer consumer : _consumers.values())
             {
-                getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()));
+                getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
+                                             Option.UNRELIABLE);
             }
         }
         else
@@ -625,17 +653,20 @@
                         if (consumer.getMessageListener() != null)
                         {
                             getQpidSession().messageFlow(consumerTag,
-                                                         MessageCreditUnit.MESSAGE, 1);
+                                                         MessageCreditUnit.MESSAGE, 1,
+                                                         Option.UNRELIABLE);
                         }
                     }
                     else
                     {
                         getQpidSession()
                             .messageFlow(consumerTag, MessageCreditUnit.MESSAGE,
-                                         getAMQConnection().getMaxPrefetch());
+                                         getAMQConnection().getMaxPrefetch(),
+                                         Option.UNRELIABLE);
                     }
                     getQpidSession()
-                        .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
+                        .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
+                                     Option.UNRELIABLE);
                 }
                 catch (Exception e)
                 {
@@ -700,6 +731,19 @@
 
     public void opened(Session ssn) {}
 
+    public void resumed(Session ssn)
+    {
+        _qpidConnection = ssn.getConnection();
+        try
+        {
+            resubscribe();
+        }
+        catch (AMQException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
     public void message(Session ssn, MessageTransfer xfr)
     {
         messageReceived(new UnprocessedMessage_0_10(xfr));
@@ -716,7 +760,7 @@
     public void closed(Session ssn) {}
 
     protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
-                                          final boolean noLocal)
+                                          final boolean noLocal, final boolean nowait)
             throws AMQException
     {
         /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
@@ -736,34 +780,11 @@
                             amqd.setQueueName(new AMQShortString( binddingKey + "@"
                                     + amqd.getExchangeName().toString() + "_" + UUID.randomUUID()));
                         }
-                        return send0_10QueueDeclare(amqd, protocolHandler, noLocal);
+                        return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait);
                     }
                 }, _connection).execute();
     }
 
-
-    void start() throws AMQException
-    {
-        super.start();
-        for(BasicMessageConsumer  c:  _consumers.values())
-        {
-              c.start();
-        }
-    }
-
-
-    void stop() throws AMQException
-    {
-        super.stop();
-        for(BasicMessageConsumer  c:  _consumers.values())
-        {
-              c.stop();
-        }
-    }
-
-
-
-
     public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
     {
 

Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Tue Mar 10 23:10:57 2009
@@ -106,7 +106,8 @@
     }
 
     public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
-            final AMQShortString exchangeName, final AMQDestination dest) throws AMQException, FailoverException
+                              final AMQShortString exchangeName, final AMQDestination dest,
+                              final boolean nowait) throws AMQException, FailoverException
     {
         getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
                                         (getTicket(),queueName,exchangeName,routingKey,false,arguments).
@@ -300,13 +301,14 @@
     {
         ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,
                                                                                  name.toString().startsWith("amq."),
-                                                                                 false,false,false,nowait,null);
+                                                                                 false,false,false,false,null);
         AMQFrame exchangeDeclare = body.generateFrame(_channelId);
 
         protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
     }
 
-    public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException
+    public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+                                 final boolean nowait) throws AMQException, FailoverException
     {
         QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
 

Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Mar 10 23:10:57 2009
@@ -441,7 +441,9 @@
              o = _synchronousQueue.take();
          }
          return o;
-     }
+    }
+
+    abstract Message receiveBrowse() throws JMSException;
 
     public Message receiveNoWait() throws JMSException
     {
@@ -1037,23 +1039,6 @@
         _synchronousQueue.clear();
     }
 
-    public void start()
-    {
-        // do nothing as this is a 0_10 feature
-    }
-
-
-    public void stop()
-    {
-        // do nothing as this is a 0_10 feature
-    }
-
-    public boolean isStrated()
-    {
-        // do nothing as this is a 0_10 feature
-        return false;
-    }
-
     public AMQShortString getQueuename()
     {
         return _queuename;
@@ -1070,10 +1055,13 @@
     }
 
     /** to be called when a failover has occured */
-    public void failedOver()
+    public void failedOverPre()
     {
         clearReceiveQueue();
         // TGM FIXME: think this should just be removed
         // clearUnackedMessages();
     }
+
+    public void failedOverPost() {}
+
 }

Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Mar 10 23:10:57 2009
@@ -31,6 +31,7 @@
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageListener;
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -148,7 +149,8 @@
             if (isMessageListenerSet() && ! getSession().prefetch())
             {
                 _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                          MessageCreditUnit.MESSAGE, 1);
+                                                          MessageCreditUnit.MESSAGE, 1,
+                                                          Option.UNRELIABLE);
             }
             _logger.debug("messageOk, trying to notify");
             super.notifyMessage(jmsMessage);
@@ -246,7 +248,8 @@
             if(! getSession().prefetch())
             {
                _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                         MessageCreditUnit.MESSAGE, 1);
+                                                         MessageCreditUnit.MESSAGE, 1,
+                                                         Option.UNRELIABLE);
             }
         }
         // now we need to acquire this message if needed
@@ -258,9 +261,7 @@
                 _logger.debug("filterMessage - trying to acquire message");
             }
             messageOk = acquireMessage(message);
-            _logger.debug("filterMessage - *************************************");
             _logger.debug("filterMessage - message acquire status : " + messageOk);
-            _logger.debug("filterMessage - *************************************");
         }
         return messageOk;
     }
@@ -335,7 +336,8 @@
         if (messageListener != null && ! getSession().prefetch())
         {
             _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                      MessageCreditUnit.MESSAGE, 1);
+                                                      MessageCreditUnit.MESSAGE, 1,
+                                                      Option.UNRELIABLE);
         }
         if (messageListener != null && !_synchronousQueue.isEmpty())
         {
@@ -349,26 +351,16 @@
         }
     }
 
-    public boolean isStrated()
+    public void failedOverPost()
     {
-        return _isStarted;
-    }
-
-    public void start()
-    {
-        _isStarted = true;
-        if (_syncReceive.get())
+        if (_0_10session.isStarted() && _syncReceive.get())
         {
-            _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                      MessageCreditUnit.MESSAGE, 1);
+            _0_10session.getQpidSession().messageFlow
+                (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1,
+                 Option.UNRELIABLE);
         }
     }
 
-    public void stop()
-    {
-        _isStarted = false;
-    }
-
     /**
      * When messages are not prefetched we need to request a message from the
      * broker.
@@ -380,16 +372,35 @@
      */
     public Object getMessageFromQueue(long l) throws InterruptedException
     {
-        if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
-        {
-            _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                      MessageCreditUnit.MESSAGE, 1);
-        }
         if (! getSession().prefetch())
         {
             _syncReceive.set(true);
         }
+        if (_0_10session.isStarted() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
+        {
+            _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+                                                      MessageCreditUnit.MESSAGE, 1,
+                                                      Option.UNRELIABLE);
+        }
         Object o = super.getMessageFromQueue(l);
+        if (o == null && _0_10session.isStarted())
+        {
+            _0_10session.getQpidSession().messageFlush
+                (getConsumerTagString(), Option.UNRELIABLE, Option.SYNC);
+            _0_10session.getQpidSession().sync();
+            _0_10session.getQpidSession().messageFlow
+                (getConsumerTagString(), MessageCreditUnit.BYTE,
+                 0xFFFFFFFF, Option.UNRELIABLE);
+            if (getSession().prefetch())
+            {
+                _0_10session.getQpidSession().messageFlow
+                    (getConsumerTagString(), MessageCreditUnit.MESSAGE,
+                     _0_10session.getAMQConnection().getMaxPrefetch(),
+                     Option.UNRELIABLE);
+            }
+            _0_10session.syncDispatchQueue();
+            o = super.getMessageFromQueue(-1);
+        }
         if (! getSession().prefetch())
         {
             _syncReceive.set(false);
@@ -404,6 +415,19 @@
         {
           _session.acknowledgeMessage(msg.getDeliveryTag(), false);
         }
+        
+        if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE  &&
+             !_session.isInRecovery() &&   
+             _session.getAMQConnection().getSyncAck())
+        {
+            ((AMQSession_0_10) getSession()).flushAcknowledgments();
+            ((AMQSession_0_10) getSession()).getQpidSession().sync();
+        }
+    }
+
+    Message receiveBrowse() throws JMSException
+    {
+        return receiveNoWait();
     }
 
 }

Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Tue Mar 10 23:10:57 2009
@@ -22,6 +22,7 @@
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
+import javax.jms.Message;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.QpidException;
@@ -38,9 +39,9 @@
     protected final Logger _logger = LoggerFactory.getLogger(getClass());
 
     protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
-            String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
-            AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow,
-            boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException
+                                       String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+                                       AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow,
+                                       boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException
     {
         super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
               protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive,
@@ -73,13 +74,18 @@
         }
     }
 
-     public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
-     {
+    public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
+    {
 
         return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
-            messageFrame.isRedelivered(), messageFrame.getExchange(),
-            messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+                                             messageFrame.isRedelivered(), messageFrame.getExchange(),
+                                             messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+
+    }
 
+    Message receiveBrowse() throws JMSException
+    {
+        return receive();
     }
 
 }

Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Tue Mar 10 23:10:57 2009
@@ -46,6 +46,8 @@
 
 public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
 {
+    enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; 
+    
     protected final Logger _logger = LoggerFactory.getLogger(getClass());
 
     private AMQConnection _connection;
@@ -120,6 +122,8 @@
     protected String _userID;  // ref user id used in the connection.
 
     private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
+    
+    protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
 
     protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
                                    AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
@@ -141,6 +145,26 @@
         _mandatory = mandatory;
         _waitUntilSent = waitUntilSent;
         _userID = connection.getUsername();
+        setPublishMode();        
+    }
+    
+    void setPublishMode()
+    {
+        // Publish mode could be configured at destination level as well.
+        // Will add support for this when we provide a more robust binding URL
+        
+        String syncPub = _connection.getSyncPublish();
+        // Support for deprecated option sync_persistence
+        if (syncPub.equals("persistent") || _connection.getSyncPersistence())
+        {
+            publishMode = PublishMode.SYNC_PUBLISH_PERSISTENT;
+        }
+        else if (syncPub.equals("all"))
+        {
+            publishMode = PublishMode.SYNC_PUBLISH_ALL;
+        }
+        
+        _logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode);
     }
 
     void resubscribe() throws AMQException

Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Tue Mar 10 23:10:57 2009
@@ -151,9 +151,13 @@
                 ((AMQSession_0_10) getSession()).getQpidSession();
 
             // if true, we need to sync the delivery of this message
-            boolean sync = (deliveryMode == DeliveryMode.PERSISTENT &&
-                            getSession().getAMQConnection().getSyncPersistence());
+            boolean sync = false;
 
+            sync = ( (publishMode == PublishMode.SYNC_PUBLISH_ALL) ||
+                     (publishMode == PublishMode.SYNC_PUBLISH_PERSISTENT && 
+                         deliveryMode == DeliveryMode.PERSISTENT)
+                   );  
+            
             org.apache.mina.common.ByteBuffer data = message.getData();
             ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
             

Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java Tue Mar 10 23:10:57 2009
@@ -47,6 +47,19 @@
      */
     public static final String SYNC_PERSISTENT_PROP_NAME = "sync_persistence";
     
+    /**
+     * When true a sync command is sent after sending a message ack.
+     * type: boolean
+     */
+    public static final String SYNC_ACK_PROP_NAME = "sync_ack";
+        
+    /**
+     * sync_publish property - {persistent|all}
+     * If set to 'persistent',then persistent messages will be publish synchronously
+     * If set to 'all', then all messages regardless of the delivery mode will be
+     * published synchronously.
+     */
+    public static final String SYNC_PUBLISH_PROP_NAME = "sync_publish";
     
     /**
      * This value will be used in the following settings

Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Tue Mar 10 23:10:57 2009
@@ -25,8 +25,6 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
 
 import javax.jms.JMSException;
 import javax.jms.MessageFormatException;
@@ -35,8 +33,6 @@
 import org.apache.mina.common.ByteBuffer;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
 {
@@ -157,7 +153,7 @@
         }
         finally
         {
-            _data.rewind();
+          //  _data.rewind();
             close(in);
         }
     }

Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java Tue Mar 10 23:10:57 2009
@@ -58,22 +58,31 @@
 
             if ((connection.getHost() == null) || connection.getHost().equals(""))
             {
-                String uid = AMQConnectionFactory.getUniqueClientID();
-                if (uid == null)
-                {
-                    throw URLHelper.parseError(-1, "Client Name not specified", fullURL);
+                String tmp = connection.getAuthority();
+                // hack to read a clientid such as "my_clientID"
+                if (tmp != null && tmp.indexOf('@') < tmp.length()-1)
+                {                   
+                    _url.setClientName(tmp.substring(tmp.indexOf('@')+1,tmp.length()));
                 }
                 else
                 {
-                    _url.setClientName(uid);
+                    String uid = AMQConnectionFactory.getUniqueClientID();
+                    if (uid == null)
+                    {
+                        throw URLHelper.parseError(-1, "Client Name not specified", fullURL);
+                    }
+                    else
+                    {
+                        _url.setClientName(uid);
+                    }
                 }
 
-            }
+            }            
             else
             {
                 _url.setClientName(connection.getHost());
             }
-
+            
             String userInfo = connection.getUserInfo();
 
             if (userInfo == null)

Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java Tue Mar 10 23:10:57 2009
@@ -33,9 +33,11 @@
   */
 public interface ConnectionURL
 {
-    public static final String AMQ_SYNC_PERSISTENCE = "sync_persistence";
-    public static final String AMQ_MAXPREFETCH = "maxprefetch";
     public static final String AMQ_PROTOCOL = "amqp";
+    public static final String OPTIONS_SYNC_PERSISTENCE = "sync_persistence";
+    public static final String OPTIONS_MAXPREFETCH = "maxprefetch";
+    public static final String OPTIONS_SYNC_ACK = "sync_ack";    
+    public static final String OPTIONS_SYNC_PUBLISH = "sync_publish";
     public static final String OPTIONS_BROKERLIST = "brokerlist";
     public static final String OPTIONS_FAILOVER = "failover";
     public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";

Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java Tue Mar 10 23:10:57 2009
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.jms.failover;
 
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -52,7 +53,7 @@
  * from the list.   
  */
 
-public class FailoverExchangeMethod extends FailoverRoundRobinServers implements FailoverMethod, MessageListener
+public class FailoverExchangeMethod implements FailoverMethod, MessageListener
 {
     private static final Logger _logger = LoggerFactory.getLogger(FailoverExchangeMethod.class);
    
@@ -65,17 +66,29 @@
     /** The session used to subscribe to failover exchange */
     private Session _ssn;
     
-    private BrokerDetails _orginalBrokerDetail;
+    private BrokerDetails _originalBrokerDetail;
+    
+    /** The index into the hostDetails array of the broker to which we are connected */
+    private int _currentBrokerIndex = 0;
+    
+    /** The broker currently selected **/
+    private BrokerDetails _currentBrokerDetail;
+        
+    /** Array of BrokerDetail used to make connections. */
+    private ConnectionURL _connectionDetails;
+    
+    /** Denotes the number of failed attempts **/
+    private int _failedAttemps = 0;
     
     public FailoverExchangeMethod(ConnectionURL connectionDetails, AMQConnection conn)
     {
-        super(connectionDetails);        
-        _orginalBrokerDetail = _connectionDetails.getBrokerDetails(0);
+        _connectionDetails = connectionDetails;
+        _originalBrokerDetail = _connectionDetails.getBrokerDetails(0);
         
         // This is not safe to use until attainConnection is called, as this ref will not initialized fully.
         // The reason being this constructor is called inside the AMWConnection constructor.
         // It would be best if we find a way to pass this ref after AMQConnection is fully initialized.
-        _conn = conn; 
+        _conn = conn;
     }
 
     private void subscribeForUpdates() throws JMSException
@@ -96,6 +109,17 @@
     public void onMessage(Message m)
     {
         _logger.info("Failover exchange notified cluster membership change");
+        
+        String currentBrokerIP = ""; 
+        try
+        {
+            currentBrokerIP = InetAddress.getByName(_currentBrokerDetail.getHost()).getHostAddress();
+        }
+        catch(Exception e)
+        {
+            _logger.warn("Unable to resolve current broker host name",e);
+        }
+        
         List<BrokerDetails> brokerList = new ArrayList<BrokerDetails>();
         try
         {            
@@ -109,15 +133,22 @@
                 for (String url:urls)
                 {
                     String[] tokens = url.split(":");
-                    if (tokens[0].equalsIgnoreCase(_orginalBrokerDetail.getTransport()))
+                    if (tokens[0].equalsIgnoreCase(_originalBrokerDetail.getTransport()))
                     {
                         BrokerDetails broker = new AMQBrokerDetails();
                         broker.setTransport(tokens[0]);
                         broker.setHost(tokens[1]);
                         broker.setPort(Integer.parseInt(tokens[2]));
-                        broker.setProperties(_orginalBrokerDetail.getProperties());
-                        broker.setSSLConfiguration(_orginalBrokerDetail.getSSLConfiguration());
+                        broker.setProperties(_originalBrokerDetail.getProperties());
+                        broker.setSSLConfiguration(_originalBrokerDetail.getSSLConfiguration());
                         brokerList.add(broker);
+                        
+                        if (currentBrokerIP.equals(broker.getHost()) && 
+                            _currentBrokerDetail.getPort() == broker.getPort())
+                        {
+                            _currentBrokerIndex = brokerList.indexOf(broker);
+                        }
+                        
                         break;
                     }
                 }                
@@ -132,13 +163,20 @@
         {
             _connectionDetails.setBrokerDetails(brokerList);
         }
+        
+        _logger.info("============================================================");
+        _logger.info("Updated cluster membership details " + _connectionDetails);
+        _logger.info("============================================================");
     }
     
     public void attainedConnection()
     {
-        super.attainedConnection();
         try
         {
+            _failedAttemps = 0;
+            _logger.info("============================================================");
+            _logger.info("Attained connection ");
+            _logger.info("============================================================");
             subscribeForUpdates();
         }
         catch (JMSException e)
@@ -151,17 +189,92 @@
     {
         synchronized (_brokerListLock)
         {
-            return super.getCurrentBrokerDetails();
+            return _connectionDetails.getBrokerDetails(_currentBrokerIndex);
         }
-    }
-
+    }   
+    
     public BrokerDetails getNextBrokerDetails()
     {
         synchronized(_brokerListLock)
         {
-            return super.getNextBrokerDetails();
+            if (_currentBrokerIndex == (_connectionDetails.getBrokerCount() - 1))
+            {
+                _currentBrokerIndex = 0;
+            }
+            else
+            {
+                _currentBrokerIndex++;
+            }
+            
+            BrokerDetails broker = _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+            
+            // When the broker list is updated it will include the current broker as well
+            // There is no point trying it again, so trying the next one.
+            if (_currentBrokerDetail != null &&
+                broker.getHost().equals(_currentBrokerDetail.getHost()) &&
+                broker.getPort() == _currentBrokerDetail.getPort())
+            {
+                return getNextBrokerDetails();
+            }
+
+            String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY);
+            if (delayStr != null)
+            {
+                Long delay = Long.parseLong(delayStr);
+                _logger.info("Delay between connect retries:" + delay);
+                try
+                {
+                    Thread.sleep(delay);
+                }
+                catch (InterruptedException ie)
+                {
+                    return null;
+                }
+            }
+            else
+            {
+                _logger.info("No delay between connect retries, use tcp://host:port?connectdelay='value' to enable.");
+            }
+
+            _failedAttemps ++;
+            _currentBrokerDetail = broker;
+            return broker;            
         }
     }
+    
+    public boolean failoverAllowed()
+    {
+        // We allow to Failover provided 
+        // our broker list is not empty and
+        // we haven't gone through all of them  
+               
+        boolean b = _connectionDetails.getBrokerCount() > 0 &&
+               _failedAttemps <= _connectionDetails.getBrokerCount();
+        
+        
+        _logger.info("============================================================");
+        _logger.info(toString());
+        _logger.info("FailoverAllowed " + b);
+        _logger.info("============================================================");
+        
+        return b;
+    }
+    
+    public void reset()
+    {
+        _failedAttemps = 0;
+    }
+    
+    public void setBroker(BrokerDetails broker)
+    {
+        // not sure if this method is needed
+    }
+
+    public void setRetries(int maxRetries)
+    {
+        // no max retries we keep trying as long
+        // as we get updates
+    }
 
     public String methodName()
     {
@@ -172,7 +285,24 @@
     {
         StringBuffer sb = new StringBuffer();
         sb.append("FailoverExchange:\n");
-        sb.append(super.toString());
+        sb.append("\n Current Broker Index:");
+        sb.append(_currentBrokerIndex);
+        sb.append("\n Failed Attempts:");
+        sb.append(_failedAttemps);
+        sb.append("\n Orignal broker details:");
+        sb.append(_originalBrokerDetail).append("\n");
+        sb.append("\n -------- Broker List -----------\n");
+        for (int i = 0; i < _connectionDetails.getBrokerCount(); i++)
+        {
+            if (i == _currentBrokerIndex)
+            {
+                sb.append(">");
+            }
+
+            sb.append(_connectionDetails.getBrokerDetails(i));
+            sb.append("\n");
+        }
+        sb.append("--------------------------------\n");
         return sb.toString();
     }
 }

Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java Tue Mar 10 23:10:57 2009
@@ -30,7 +30,7 @@
     private static final Logger _logger = LoggerFactory.getLogger(FailoverRoundRobinServers.class);
 
     /** The default number of times to cycle through all servers */
-    public static final int DEFAULT_CYCLE_RETRIES = 0;
+    public static final int DEFAULT_CYCLE_RETRIES = 1;
     /** The default number of times to retry each server */
     public static final int DEFAULT_SERVER_RETRIES = 0;
 
@@ -66,6 +66,8 @@
 
         String cycleRetries = _connectionDetails.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE);
 
+        _cycleRetries = DEFAULT_CYCLE_RETRIES;
+        
         if (cycleRetries != null)
         {
             try
@@ -74,7 +76,7 @@
             }
             catch (NumberFormatException nfe)
             {
-                _cycleRetries = DEFAULT_CYCLE_RETRIES;
+                _logger.warn("Cannot set cycle Retries, " + cycleRetries + " is not a number. Using default: " + DEFAULT_CYCLE_RETRIES); 
             }
         }
 
@@ -93,8 +95,8 @@
 
     public boolean failoverAllowed()
     {
-        return ((_currentCycleRetries < _cycleRetries) || (_currentServerRetry < _serverRetries)
-                || (_currentBrokerIndex < (_connectionDetails.getBrokerCount() - 1)));
+        return ((_currentCycleRetries < _cycleRetries) || (_currentServerRetry < _serverRetries));
+                //|| (_currentBrokerIndex <= (_connectionDetails.getBrokerCount() - 1)));
     }
 
     public void attainedConnection()

Modified: qpid/branches/qpid-1673/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java Tue Mar 10 23:10:57 2009
@@ -375,6 +375,19 @@
         assertTrue(connectionurl.getBrokerCount() == 1);
     }
 
+    public void testClientIDWithUnderscore() throws URLSyntaxException
+    {
+        String url = "amqp://user:pass@client_id/test?brokerlist='tcp://localhost:5672'";
+
+        ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+        assertTrue(connectionurl.getUsername().equals("user"));
+        assertTrue(connectionurl.getPassword().equals("pass"));
+        assertTrue(connectionurl.getVirtualHost().equals("/test"));
+        assertTrue(connectionurl.getClientName().equals("client_id"));
+        
+        assertTrue(connectionurl.getBrokerCount() == 1);
+    }
 
     public void testWrongOptionSeparatorInOptions()
     {

Modified: qpid/branches/qpid-1673/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Tue Mar 10 23:10:57 2009
@@ -44,7 +44,9 @@
 
     }
 
-    public void sendQueueBind(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException
+    public void sendQueueBind(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments,
+                              AMQShortString exchangeName, AMQDestination destination,
+                              boolean nowait) throws AMQException, FailoverException
     {
 
     }
@@ -129,7 +131,8 @@
 
     }
 
-    public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException, FailoverException
+    public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler,
+                                 boolean nowait) throws AMQException, FailoverException
     {
 
     }

Modified: qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java (original)
+++ qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java Tue Mar 10 23:10:57 2009
@@ -37,6 +37,8 @@
 {
     public void opened(Session ssn) {}
 
+    public void resumed(Session ssn) {}
+
     public void exception(Session ssn, SessionException exc)
     {
         exc.printStackTrace();

Modified: qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Tue Mar 10 23:10:57 2009
@@ -74,7 +74,7 @@
     final private Map<Integer,Session> channels = new HashMap<Integer,Session>();
 
     private State state = NEW;
-    private Object lock = new Object();
+    final private Object lock = new Object();
     private long timeout = 60000;
     private ConnectionListener listener = new DefaultConnectionListener();
     private ConnectionException error = null;

Modified: qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java (original)
+++ qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java Tue Mar 10 23:10:57 2009
@@ -37,6 +37,8 @@
 
     public void opened(Session ssn) {}
 
+    public void resumed(Session ssn) {}
+
     public void message(Session ssn, MessageTransfer xfr)
     {
         int id = xfr.getId();

Modified: qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Tue Mar 10 23:10:57 2009
@@ -53,13 +53,15 @@
 
     private static final Logger log = Logger.get(Session.class);
 
-    enum State { NEW, DETACHED, OPEN, CLOSING, CLOSED }
+    enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED }
 
     class DefaultSessionListener implements SessionListener
     {
 
         public void opened(Session ssn) {}
 
+        public void resumed(Session ssn) {}
+
         public void message(Session ssn, MessageTransfer xfr)
         {
             log.info("message: %s", xfr);
@@ -107,6 +109,8 @@
     private volatile boolean flowControl = false;
     private Semaphore credit = new Semaphore(0);
 
+    private Thread resumer = null;
+
     Session(Connection connection, Binary name, long expiry)
     {
         this.connection = connection;
@@ -234,15 +238,21 @@
             for (int i = maxComplete + 1; lt(i, commandsOut); i++)
             {
                 Method m = commands[mod(i, commands.length)];
-                if (m != null)
+                if (m == null)
                 {
-                    sessionCommandPoint(m.getId(), 0);
-                    send(m);
+                    m = new ExecutionSync();
+                    m.setId(i);
                 }
+                sessionCommandPoint(m.getId(), 0);
+                send(m);
             }
 
             sessionCommandPoint(commandsOut, 0);
             sessionFlush(COMPLETED);
+            resumer = Thread.currentThread();
+            state = RESUMING;
+            listener.resumed(this);
+            resumer = null;
         }
     }
 
@@ -387,7 +397,7 @@
 
         synchronized (commands)
         {
-            if (state == DETACHED)
+            if (state == DETACHED || state == CLOSING)
             {
                 return;
             }
@@ -494,15 +504,23 @@
             {
                 if (state == DETACHED && m.isUnreliable())
                 {
-                    return;
+                    Thread current = Thread.currentThread();
+                    if (!current.equals(resumer))
+                    {
+                        return;
+                    }
                 }
 
                 if (state != OPEN && state != CLOSED)
                 {
-                    Waiter w = new Waiter(commands, timeout);
-                    while (w.hasTime() && (state != OPEN && state != CLOSED))
+                    Thread current = Thread.currentThread();
+                    if (!current.equals(resumer))
                     {
-                        w.await();
+                        Waiter w = new Waiter(commands, timeout);
+                        while (w.hasTime() && (state != OPEN && state != CLOSED))
+                        {
+                            w.await();
+                        }
                     }
                 }
 
@@ -510,8 +528,24 @@
                 {
                 case OPEN:
                     break;
+                case RESUMING:
+                    Thread current = Thread.currentThread();
+                    if (!current.equals(resumer))
+                    {
+                        throw new SessionException
+                            ("timed out waiting for resume to finish");
+                    }
+                    break;
                 case CLOSED:
-                    throw new SessionClosedException();
+                    ExecutionException exc = getException();
+                    if (exc != null)
+                    {
+                        throw new SessionException(exc);
+                    }
+                    else
+                    {
+                        throw new SessionClosedException();
+                    }
                 default:
                     throw new SessionException
                         (String.format
@@ -527,7 +561,7 @@
                     Waiter w = new Waiter(commands, timeout);
                     while (w.hasTime() && isFull(next))
                     {
-                        if (state == OPEN)
+                        if (state == OPEN || state == RESUMING)
                         {
                             try
                             {
@@ -560,7 +594,7 @@
                 {
                     sessionCommandPoint(0, 0);
                 }
-                if (expiry > 0)
+                if (expiry > 0 && !m.isUnreliable())
                 {
                     commands[mod(next, commands.length)] = m;
                     commandBytes += m.getBodySize();
@@ -828,9 +862,9 @@
             {
                 throw new SessionException("close() timed out");
             }
-
-            connection.removeSession(this);
         }
+
+        connection.removeSession(this);
     }
 
     public void exception(Throwable t)
@@ -842,7 +876,7 @@
     {
         synchronized (commands)
         {
-            if (expiry == 0)
+            if (expiry == 0 || getException() != null)
             {
                 state = CLOSED;
             }

Modified: qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java (original)
+++ qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java Tue Mar 10 23:10:57 2009
@@ -31,6 +31,8 @@
 
     void opened(Session session);
 
+    void resumed(Session session);
+
     void message(Session ssn, MessageTransfer xfr);
 
     void exception(Session session, SessionException exception);

Modified: qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java (original)
+++ qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java Tue Mar 10 23:10:57 2009
@@ -87,6 +87,8 @@
 
     public void opened(Session ssn) {}
 
+    public void resumed(Session ssn) {}
+
     public void message(Session ssn, MessageTransfer xfr)
     {
         count++;

Modified: qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java (original)
+++ qpid/branches/qpid-1673/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java Tue Mar 10 23:10:57 2009
@@ -252,16 +252,16 @@
              {
                  for (File subFile : file.listFiles())
                  {
-                     success = delete(subFile, true) & success ;
+                     success = delete(subFile, true) && success;
                  }
 
-                 return file.delete();
+                 return success && file.delete();
              }
 
              return false;
          }
 
-         return success && file.delete();
+         return file.delete();
      }
 
 

Modified: qpid/branches/qpid-1673/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java Tue Mar 10 23:10:57 2009
@@ -74,6 +74,8 @@
 
     public void opened(Session ssn) {}
 
+    public void resumed(Session ssn) {}
+
     public void message(final Session ssn, MessageTransfer xfr)
     {
         if (queue)
@@ -122,6 +124,13 @@
         {
             // do nothing
         }
+        else if (body.startsWith("EXCP"))
+        {
+            ExecutionException exc = new ExecutionException();
+            exc.setDescription("intentional exception for testing");
+            ssn.invoke(exc);
+            ssn.close();
+        }
         else
         {
             throw new IllegalArgumentException
@@ -138,9 +147,14 @@
 
     private void send(Session ssn, String msg)
     {
+        send(ssn, msg, false);
+    }
+
+    private void send(Session ssn, String msg, boolean sync)
+    {
         ssn.messageTransfer
             ("xxx", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
-             null, msg);
+             null, msg, sync ? SYNC : NONE);
     }
 
     private Connection connect(final Condition closed)
@@ -277,6 +291,7 @@
     class TestSessionListener implements SessionListener
     {
         public void opened(Session s) {}
+        public void resumed(Session s) {}
         public void exception(Session s, SessionException e) {}
         public void message(Session s, MessageTransfer xfr)
         {
@@ -391,4 +406,41 @@
         conn.close();
     }
 
+    public void testExecutionExceptionInvoke() throws Exception
+    {
+        startServer();
+
+        Connection conn = new Connection();
+        conn.connect("localhost", port, null, "guest", "guest");
+        Session ssn = conn.createSession();
+        send(ssn, "EXCP 0");
+        Thread.sleep(3000);
+        try
+        {
+            send(ssn, "SINK 1");
+        }
+        catch (SessionException exc)
+        {
+            assertNotNull(exc.getException());
+        }
+    }
+
+    public void testExecutionExceptionSync() throws Exception
+    {
+        startServer();
+
+        Connection conn = new Connection();
+        conn.connect("localhost", port, null, "guest", "guest");
+        Session ssn = conn.createSession();
+        send(ssn, "EXCP 0", true);
+        try
+        {
+            ssn.sync();
+        }
+        catch (SessionException exc)
+        {
+            assertNotNull(exc.getException());
+        }
+    }
+
 }

Modified: qpid/branches/qpid-1673/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java Tue Mar 10 23:10:57 2009
@@ -280,6 +280,30 @@
         checkFileLists(filesBefore, filesAfter);
     }
 
+    public void testDeleteNonExistentFile()
+    {
+        File test = new File("FileUtilsTest-testDelete-"+System.currentTimeMillis());
+
+        assertTrue("File exists", !test.exists());
+        assertFalse("File is a directory", test.isDirectory());
+
+        assertTrue("Delete Succeeded ", !FileUtils.delete(test, true));
+    }
+
+    public void testDeleteNull()
+    {
+        try
+        {
+            FileUtils.delete(null, true);
+            fail("Delete with null value should throw NPE.");
+        }
+        catch (NullPointerException npe)
+        {
+            // expected path
+        }
+    }
+
+
     /**
      * Given two lists of File arrays ensure they are the same length and all entries in Before are in After
      *

Modified: qpid/branches/qpid-1673/qpid/java/cpp.cluster.testprofile
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/cpp.cluster.testprofile?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/cpp.cluster.testprofile (original)
+++ qpid/branches/qpid-1673/qpid/java/cpp.cluster.testprofile Tue Mar 10 23:10:57 2009
@@ -3,3 +3,6 @@
 test.excludesfile=${project.root}/ExcludeList ${project.root}/XAExcludeList ${project.root}/010ExcludeList
 
 profile.clustered=true
+profile.failoverMsgCount=10
+profile.failoverIterations=10
+profile.failoverRandomSeed=20080921

Propchange: qpid/branches/qpid-1673/qpid/java/management/client/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Mar 10 23:10:57 2009
@@ -0,0 +1,2 @@
+compile
+release

Modified: qpid/branches/qpid-1673/qpid/java/management/client/bin/qman-wsdm-start.cmd
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/bin/qman-wsdm-start.cmd?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/bin/qman-wsdm-start.cmd (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/bin/qman-wsdm-start.cmd Tue Mar 10 23:10:57 2009
@@ -58,7 +58,6 @@
 SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\start.jar
 SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\jetty-6.1.14.jar
 SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\jetty-util-6.1.14.jar
-SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\jetty-util-6.1.14.jar
 SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\geronimo-servlet_2.5_spec-1.2.jar
 SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\slf4j-api-1.4.0.jar
 SET CLASSPATH=%CLASSPATH%;%QMAN_LIBS%\slf4j-log4j12-1.4.0.jar

Modified: qpid/branches/qpid-1673/qpid/java/management/client/bin/qman-wsdm-start.sh
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/bin/qman-wsdm-start.sh?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/bin/qman-wsdm-start.sh (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/bin/qman-wsdm-start.sh Tue Mar 10 23:10:57 2009
@@ -58,7 +58,7 @@
 QMAN_LIBS=$QMAN_HOME/lib
 JETTY_CONFIG_FILE=$QMAN_HOME/etc/jetty.xml
  
-QMAN_CLASSPATH=$QMAN_HOME/etc:$QMAN_LIBS/start.jar:$QMAN_LIBS/jetty-6.1.14.jar:$QMAN_LIBS/jetty-util-6.1.14.jar:$QMAN_LIBS/jetty-util-6.1.14.jar:$QMAN_LIBS/geronimo-servlet_2.5_spec-1.2.jar:$QMAN_LIBS/slf4j-api-1.4.0.jar:$QMAN_LIBS/slf4j-log4j12-1.4.0.jar:$QMAN_LIBS/log4j-1.2.12.jar
+QMAN_CLASSPATH=$QMAN_HOME/etc:$QMAN_LIBS/start.jar:$QMAN_LIBS/jetty-6.1.14.jar:$QMAN_LIBS/jetty-util-6.1.14.jar:$QMAN_LIBS/geronimo-servlet_2.5_spec-1.2.jar:$QMAN_LIBS/slf4j-api-1.4.0.jar:$QMAN_LIBS/slf4j-log4j12-1.4.0.jar:$QMAN_LIBS/log4j-1.2.12.jar
 
 echo "==============================================================================="
 echo""

Modified: qpid/branches/qpid-1673/qpid/java/management/client/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/build.xml?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/build.xml (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/build.xml Tue Mar 10 23:10:57 2009
@@ -25,7 +25,6 @@
 	
 	<import file="../../module.xml"/>
 
-	<property name="war.name" value="qman.war"/>
     <property name="build.root" value="${module.build}"/>
     <property name="web.module" value="${module.build}${file.separator}wsdm-module"/>	
     <property name="web-inf.folder" value="${web.module}${file.separator}WEB-INF"/>
@@ -49,7 +48,6 @@
 	    </copy>
 	</target>
 
-	
 	<target name="libs-release" description="copy dependencies into module release">
 	    <copy todir="${module.release}${file.separator}" failonerror="true" verbose="true">
 	    	<fileset dir="${build}" casesensitive="yes" includes="${module.libs}">
@@ -59,6 +57,8 @@
 	    		<not><filename name="**/*xalan*"/></not>	    		
 	    		<not><filename name="**/*wsdl*"/></not>	    		
 	    		<not><filename name="**/*muse*"/></not>
+	    		<not><filename name="**/*jsp*"/></not>
+		    		<not><filename name="**/*core-3.1.1.jar*"/></not>
 	    	</fileset>
 	    </copy>
 		<copy todir="${module.release}${file.separator}lib" failonerror="true">
@@ -195,7 +195,7 @@
 
 			<batchtest fork="${test.fork}" todir="${module.results}">
 	        	<fileset dir="${module.test.src}" excludes="${module.test.excludes}">
-					<include name="**/${test}.java"/>
+					<include name="**/${test}.java"/>	        		
 	        	</fileset>
 			</batchtest>
 		</junit>

Modified: qpid/branches/qpid-1673/qpid/java/management/client/console/brokers_management.jsp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/console/brokers_management.jsp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/console/brokers_management.jsp (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/console/brokers_management.jsp Tue Mar 10 23:10:57 2009
@@ -97,7 +97,7 @@
 	                        					Virtual Host : 
 	                        				</td>
 	                        				<td>
-	                        					<input type="text" name="port"/>
+	                        					<input type="text" name="virtualHost"/>
 	                        				</td>
 	                        				<td style="font-size: x-small;">
 	                        					The virtual host name.

Modified: qpid/branches/qpid-1673/qpid/java/management/client/console/fragments/menu.jsp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/console/fragments/menu.jsp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/console/fragments/menu.jsp (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/console/fragments/menu.jsp Tue Mar 10 23:10:57 2009
@@ -3,8 +3,8 @@
 		<a href="<%=request.getContextPath()%>/console"> &nbsp; &gt; System Overview</a>
 		<a href="<%=request.getContextPath()%>/brokers_management">&nbsp; &gt; Brokers Management</a>
 		<a href="<%=request.getContextPath()%>/resources_management">&nbsp; &gt; Resources Management</a>
-		<a href="<%=request.getContextPath()%>/tbd.jsp">&nbsp; &gt; Subscriptions Management</a>
-		<a href="<%=request.getContextPath()%>/tbd.jsp">&nbsp; &gt; System Health</a>
+		<a>&nbsp; &gt; Subscriptions Management</a>
+		<a>&nbsp; &gt; System Health</a>
 		<a href="<%=request.getContextPath()%>/logging_configuration">&nbsp; &gt; Logging Configuration</a>
 	</div>	
 </div>

Modified: qpid/branches/qpid-1673/qpid/java/management/client/console/wsdm_rmd_perspective.jsp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/console/wsdm_rmd_perspective.jsp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/console/wsdm_rmd_perspective.jsp (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/console/wsdm_rmd_perspective.jsp Tue Mar 10 23:10:57 2009
@@ -18,7 +18,7 @@
 	<body>
 		<div id="page" align="center">
 			<jsp:include page="/fragments/header.jsp">
-				<jsp:param name="title" value="Resource Management - WS-DM WSDL Perspective"/>
+				<jsp:param name="title" value="Resource Management - WS-DM RMD Perspective"/>
 			</jsp:include>
 				
 			<div id="content" align="center">
@@ -63,12 +63,8 @@
                         </tr>                                          
                         <tr>    
                         	<td valign="top">
-								<div class="panel" align="justify" style="height:500px; overflow-y:auto;">								
-									<c:set var="xml">
-										${wsdl} 	  									
-  									</c:set>
-  									<c:import var="xslt" url="wsdl-viewer.xsl" />
-									<x:transform xml="${xml}" xslt="${xslt}" />
+								<div class="panel" align="left" style="height:500px; width=200px; overflow-y:auto; font-size: smaller; font-weight:bold;">								
+									<pre>  <c:out value="${rmd}" /> </pre>
                             	</div>
                             </td>
                         </tr>

Modified: qpid/branches/qpid-1673/qpid/java/management/client/console/wsdm_wsdl_perspective.jsp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/console/wsdm_wsdl_perspective.jsp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/console/wsdm_wsdl_perspective.jsp (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/console/wsdm_wsdl_perspective.jsp Tue Mar 10 23:10:57 2009
@@ -63,12 +63,8 @@
                         </tr>                                          
                         <tr>    
                         	<td valign="top">
-								<div class="panel" align="justify" style="height:500px; overflow-y:auto;">								
-									<c:set var="xml">
-										${wsdl} 	  									
-  									</c:set>
-  									<c:import var="xslt" url="wsdl-viewer.xsl" />
-									<x:transform xml="${xml}" xslt="${xslt}" />
+								<div class="panel" align="left" style="height:500px; width=200px; overflow-y:auto; font-size: smaller; font-weight:bold;">								
+									<pre>  <c:out value="${wsdl}" /> </pre>
                             	</div>
                             </td>
                         </tr>

Modified: qpid/branches/qpid-1673/qpid/java/management/client/src/example/org/apache/qpid/management/example/ConsumerAndProducerExample.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/src/example/org/apache/qpid/management/example/ConsumerAndProducerExample.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/src/example/org/apache/qpid/management/example/ConsumerAndProducerExample.java (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/src/example/org/apache/qpid/management/example/ConsumerAndProducerExample.java Tue Mar 10 23:10:57 2009
@@ -26,7 +26,9 @@
 import org.apache.muse.util.xml.XPathUtils;
 import org.apache.muse.ws.addressing.EndpointReference;
 import org.apache.muse.ws.addressing.soap.SoapFault;
+import org.apache.muse.ws.notification.impl.FilterCollection;
 import org.apache.muse.ws.notification.impl.MessagePatternFilter;
+import org.apache.muse.ws.notification.impl.ProducerPropertiesFilter;
 import org.apache.muse.ws.notification.impl.TopicFilter;
 import org.apache.muse.ws.notification.remote.NotificationProducerClient;
 import org.apache.qpid.management.Names;
@@ -65,11 +67,11 @@
 	void executeExample(String host, int port) throws Exception
 	{ 
 		// This is QMan... 
-		URI producerURI = URI.create("http://"+host+":"+port+"/qman/services/consumer");
+		URI producerURI = URI.create("http://"+host+":"+port+"/qman/services/adapter");
 		
 		// ...and this is QMan too! Note that it has an hidden consumer capability that is used in 
 		// order to run successfully this example...
-		URI consumerURI = producerURI;
+		URI consumerURI = URI.create("http://"+host+":"+port+"/qman/services/consumer");
 		
 		EndpointReference producerEPR = new EndpointReference(producerURI);		
 		EndpointReference consumerEPR = new EndpointReference(consumerURI);
@@ -93,8 +95,11 @@
         // Example 6: a MessageFilter is installed in order to listen only for connection events 
         // (connections created or removed). The subscription will expire in 10 seconds.
         allMessagesWithMessageFilterAndTerminationTime(producerEPR,consumerEPR);
+		
+		// Example 7 : a subscription with more than one filter.
+		complexSubscription(producerEPR, consumerEPR);
 	}	
-	
+
 	/**
 	 * Makes a subscription on all topics / all messages without an expiry date.
 	 * 
@@ -223,6 +228,41 @@
         		new Date(System.currentTimeMillis() + 10000));	// Termination Time 
 	}
 	
+	/**
+	 * Makes a subscription on a specifc topic with an expiry date.
+	 * Only messages published on the given topic will be delivered to the given consumer.
+	 * The subscription will end after 10 seconds
+	 * 
+	 * @param producer the producer endpoint reference.
+	 * @param consumer the consumer endpoint reference .
+	 * @throws SoapFault when the subscription cannot be made.
+	 */
+	private void complexSubscription(EndpointReference producer, EndpointReference consumer) throws SoapFault
+	{
+		NotificationProducerClient producerClient = new NotificationProducerClient(producer);
+        producerClient.setTrace(true);
+
+        FilterCollection filter = new FilterCollection();
+        
+		TopicFilter topicFilter = new TopicFilter(Names.EVENTS_LIFECYLE_TOPIC_NAME);
+        MessagePatternFilter messageFilter= new MessagePatternFilter(
+        		"/wsnt:NotificationMessage/wsnt:Message/qman:LifeCycleEvent/qman:Resource/qman:Name/text()='connection'", // expression (XPath)
+        		XPathUtils.NAMESPACE_URI); // Dialect : the only supported dialect is XPath 1.0
+
+        ProducerPropertiesFilter producerFilter = new ProducerPropertiesFilter(
+        		"boolean(/*/MgtPubInterval > 100 and /*/MsgTotalEnqueues > 56272)",
+        		XPathUtils.NAMESPACE_URI);
+        
+        filter.addFilter(topicFilter);
+        filter.addFilter(messageFilter);
+        filter.addFilter(producerFilter);
+		
+        producerClient.subscribe(
+        		consumer,	// Consumer Endpoint reference
+        		filter,			// Topic Filter
+        		new Date(System.currentTimeMillis() + 10000));	// Termination Time 
+	}
+	
 	@Override
 	void printOutExampleDescription()
 	{
@@ -245,4 +285,9 @@
 		System.out.println("A subscription with a termination time will have a predefined expiry"); 
 		System.out.println("date while if there's no termination the subscription will never expire.");
 	}
+	
+	public static void main(String[] args)
+	{
+		new ConsumerAndProducerExample().execute(new String[]{"localhost","8080"});
+	}
 }

Modified: qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/muse.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/muse.xml?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/muse.xml (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/muse.xml Tue Mar 10 23:10:57 2009
@@ -29,7 +29,7 @@
    		<java-serializer-class>org.apache.qpid.management.wsdm.muse.serializer.DateSerializer</java-serializer-class>
 	</custom-serializer>
 	<router>
-		<java-router-class>org.apache.muse.core.routing.SimpleResourceRouter</java-router-class>
+		<java-router-class>org.apache.muse.ws.resource.impl.WsResourceRouter</java-router-class>
 		<logging>
 			<log-file>log/muse.log</log-file>
 			<log-level>SEVERE</log-level>

Modified: qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/Messages.java Tue Mar 10 23:10:57 2009
@@ -170,6 +170,6 @@
 	String QMAN_100037_INVOKE_OPERATION_FAILURE = "<QMAN-100037> : Operation Invocation failure for operation.";	
 	String QMAN_100038_UNABLE_TO_SEND_WS_NOTIFICATION = "<QMAN-100038> : Unable to send notification.";	
 	String QMAN_100039_UNABLE_TO_CONFIGURE_PROPERLY_WORKER_MANAGER = "<QMAN-100039> : Unable to properly configure WorkManager. A malformed property (NaN) was given as input parameter.";	
-	
+	String QMAN_100040_UNABLE_TO_LOCATE_WSRP_PROPERTIES = "<QMAN-100040> : Unable to evaluate the WSRP XPath expression on resource WSDL.";	
 	
 }
\ No newline at end of file

Modified: qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/Names.java Tue Mar 10 23:10:57 2009
@@ -45,7 +45,7 @@
     public static String CLASS = "class";
     public static String EVENT = "event";
     public static String OBJECT_ID="objectId";    
-    public static String BROKER_ID = "brokerID";
+    public static String BROKER_ID = "brokerId";
     public static String DOMAIN_NAME = "Q-MAN";
         
     public static String ARG_COUNT_PARAM_NAME = "argCount";
@@ -86,7 +86,7 @@
 	    			new StringBuilder()
 	    				.append(DOMAIN_NAME)
 	    				.append(':')
-	    				.append("Type=Service")
+	    				.append("Name=QMan,Type=Service")
 	    				.toString());
 	    } catch(Exception exception)
 	    {

Modified: qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java (original)
+++ qpid/branches/qpid-1673/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/Configuration.java Tue Mar 10 23:10:57 2009
@@ -30,7 +30,16 @@
 import org.apache.qpid.management.Names;
 import org.apache.qpid.management.domain.handler.base.IMessageHandler;
 import org.apache.qpid.management.domain.model.AccessMode;
+import org.apache.qpid.management.domain.model.type.AbsTime;
+import org.apache.qpid.management.domain.model.type.DeltaTime;
+import org.apache.qpid.management.domain.model.type.ObjectReference;
+import org.apache.qpid.management.domain.model.type.Str16;
+import org.apache.qpid.management.domain.model.type.Str8;
 import org.apache.qpid.management.domain.model.type.Type;
+import org.apache.qpid.management.domain.model.type.Uint16;
+import org.apache.qpid.management.domain.model.type.Uint32;
+import org.apache.qpid.management.domain.model.type.Uint64;
+import org.apache.qpid.management.domain.model.type.Uint8;
 import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.MessageProperties;
@@ -71,7 +80,12 @@
     private Configuration()
     {
         defineQueueNames();
+        
         createHeaderForCommandMessages();
+        
+        addAccessModeMappings();
+        
+        addTypeMappings();
     }
 
     void clean()
@@ -90,9 +104,11 @@
     }  
     
     /**
-     * Returns true if this configuration has at least one broker connection data.
+     * Returns true if this configuration has at least 
+     * one broker configured.
      * 
-     * @return true if this configuration has at least one broker connection data.
+     * @return true if this configuration has at least one 
+     * 				broker configured.
      */
     public boolean hasOneOrMoreBrokersDefined()
     {
@@ -245,26 +261,46 @@
     /**
      * Adds a new type mapping to this configuration.
      * 
-     * @param mapping the type mapping that will be added.
+     * @param code the code that will be associated with the declared type.
+     * @param type the type.
+     * @param vailidatorClassName the FQN of the validator class that will be 
+     * 				associated with the given type.
      */
-    void addTypeMapping(TypeMapping mapping) {
-        int code = mapping.getCode();
-        Type type = mapping.getType();
-        String validatorClassName = mapping.getValidatorClassName();
-        _typeMappings.put(code, type);
+    void addTypeMapping(int code, Type type, String validatorClassName) {
+    	_typeMappings.put(code, type);
         _validators.put(type, validatorClassName);
         
-        LOGGER.info(Messages.QMAN_000005_TYPE_MAPPING_CONFIGURED, code,type,validatorClassName);
+        LOGGER.info(
+        		Messages.QMAN_000005_TYPE_MAPPING_CONFIGURED, 
+        		code,
+        		type,
+        		validatorClassName);
     }
-    
+
+
+    /**
+     * Adds a new type mapping to this configuration.
+     * 
+     * @param code the code that will be associated with the declared type.
+     * @param type the type.
+     */
+    void addTypeMapping(int code, Type type) {
+        _typeMappings.put(code, type);
+        
+        LOGGER.info(
+        		Messages.QMAN_000005_TYPE_MAPPING_CONFIGURED, 
+        		code,
+        		type,
+        		"not configured for this type.");
+    }
+
     /**
      * Adds a new access mode mapping to this configuration.
      * 
-     * @param mapping the mapping that will be added.
+     * @param code the code that will be associated with the access mode,
+     * @param accessMode the accessMode.
      */
-    void addAccessModeMapping(AccessModeMapping mapping){
-        int code = mapping.getCode();
-        AccessMode accessMode = mapping.getAccessMode();
+    void addAccessModeMapping(int code, AccessMode accessMode){
         _accessModes.put(code, accessMode);
         
         LOGGER.info(Messages.QMAN_000006_ACCESS_MODE_MAPPING_CONFIGURED, code,accessMode);        
@@ -420,4 +456,34 @@
 	{
 		this._keepAliveTime = keepAliveTime;
 	}
+	
+	/**
+     * Configures access mode mappings.
+     * An access mode mapping is an association between a code and an access mode.
+     */
+    private void addAccessModeMappings() {
+    	addAccessModeMapping(1,AccessMode.RC);
+    	addAccessModeMapping(2,AccessMode.RW);
+    	addAccessModeMapping(3,AccessMode.RO);
+	}	
+    
+	/**
+     * Configures type mappings.
+     * A type mapping is an association between a code and a management type.
+     */
+    private void addTypeMappings()
+    {
+    	addTypeMapping(1,new Uint8(),Names.NUMBER_VALIDATOR);
+    	addTypeMapping(2,new Uint16(),Names.NUMBER_VALIDATOR);
+    	addTypeMapping(3,new Uint32(),Names.NUMBER_VALIDATOR);
+    	addTypeMapping(4,new Uint64(),Names.NUMBER_VALIDATOR);
+    	addTypeMapping(6,new Str8(),Names.STRING_VALIDATOR);
+    	addTypeMapping(7,new Str16(),Names.STRING_VALIDATOR);
+    	addTypeMapping(8,new AbsTime());
+    	addTypeMapping(9,new DeltaTime());
+    	addTypeMapping(10,new ObjectReference());
+    	addTypeMapping(11,new org.apache.qpid.management.domain.model.type.Boolean());
+    	addTypeMapping(14,new org.apache.qpid.management.domain.model.type.Uuid());
+    	addTypeMapping(15,new org.apache.qpid.management.domain.model.type.Map());
+    }        
 }
\ No newline at end of file



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org