You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/10/12 01:22:20 UTC

svn commit: r824198 [7/9] - in /qpid/branches/java-network-refactor: ./ qpid/cpp/ qpid/cpp/bindings/qmf/ qpid/cpp/bindings/qmf/python/ qpid/cpp/bindings/qmf/ruby/ qpid/cpp/bindings/qmf/tests/ qpid/cpp/boost-1.32-support/ qpid/cpp/etc/ qpid/cpp/examples...

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Sun Oct 11 23:22:08 2009
@@ -100,6 +100,18 @@
     }
 
     /**
+     * Create an XASession with default prefetch values of:
+     * High = MaxPrefetch
+     * Low  = MaxPrefetch / 2
+     * @return XASession
+     * @throws JMSException
+     */
+    public XASession createXASession() throws JMSException
+    {
+        return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2);
+    }
+
+    /**
      * create an XA Session and start it if required.
      */
     public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
@@ -285,7 +297,6 @@
         _qpidConnection.setIdleTimeout(l);
     }
 
-    @Override
     public int getMaxChannelID()
     {
        return Integer.MAX_VALUE;

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Sun Oct 11 23:22:08 2009
@@ -191,6 +191,18 @@
                 }, _conn).execute();
     }
 
+    /**
+     * Create an XASession with default prefetch values of:
+     * High = MaxPrefetch
+     * Low  = MaxPrefetch / 2
+     * @return XASession
+     * @throws JMSException thrown if there is a problem creating the session.
+     */        
+    public XASession createXASession() throws JMSException
+    {
+        return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2);
+    }
+
     private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
             throws AMQException, FailoverException
     {
@@ -290,7 +302,6 @@
     
     public void setIdleTimeout(long l){}
 
-    @Override
     public int getMaxChannelID()
     {
         return (int) (Math.pow(2, 16)-1);

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Sun Oct 11 23:22:08 2009
@@ -21,7 +21,6 @@
 package org.apache.qpid.client;
 
 import java.io.Serializable;
-import java.io.IOException;
 import java.net.URISyntaxException;
 import java.text.MessageFormat;
 import java.util.ArrayList;
@@ -60,6 +59,7 @@
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
 
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
@@ -113,7 +113,6 @@
 public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
 {
 
-
     public static final class IdToConsumerMap<C extends BasicMessageConsumer>
     {
         private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
@@ -198,16 +197,32 @@
      * The default value for immediate flag used by producers created by this session is false. That is, a consumer does
      * not need to be attached to a queue.
      */
-    protected static final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
+    protected final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
 
     /**
      * The default value for mandatory flag used by producers created by this session is true. That is, server will not
      * silently drop messages where no queue is connected to the exchange for the message.
      */
-    protected static final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+    protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+
+    protected final boolean DEFAULT_WAIT_ON_SEND = Boolean.parseBoolean(System.getProperty("qpid.default_wait_on_send", "false"));
+
+    /**
+     * The period to wait while flow controlled before sending a log message confirming that the session is still
+     * waiting on flow control being revoked
+     */
+    protected final long FLOW_CONTROL_WAIT_PERIOD = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
+
+    /**
+     * The period to wait while flow controlled before declaring a failure
+     */
+    public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
+    protected final long FLOW_CONTROL_WAIT_FAILURE = Long.getLong("qpid.flow_control_wait_failure",
+                                                                  DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
 
     protected final boolean DECLARE_QUEUES =
         Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
+
     protected final boolean DECLARE_EXCHANGES =
         Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
 
@@ -244,10 +259,10 @@
     private int _ticket;
 
     /** Holds the high mark for prefetched message, at which the session is suspended. */
-    private int _defaultPrefetchHighMark;
+    private int _prefetchHighMark;
 
     /** Holds the low mark for prefetched messages, below which the session is resumed. */
-    private int _defaultPrefetchLowMark;
+    private int _prefetchLowMark;
 
     /** Holds the message listener, if any, which is attached to this session. */
     private MessageListener _messageListener = null;
@@ -428,13 +443,13 @@
 
         _channelId = channelId;
         _messageFactoryRegistry = messageFactoryRegistry;
-        _defaultPrefetchHighMark = defaultPrefetchHighMark;
-        _defaultPrefetchLowMark = defaultPrefetchLowMark;
+        _prefetchHighMark = defaultPrefetchHighMark;
+        _prefetchLowMark = defaultPrefetchLowMark;
 
         if (_acknowledgeMode == NO_ACKNOWLEDGE)
         {
             _queue =
-                    new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
+                    new FlowControllingBlockingQueue(_prefetchHighMark, _prefetchLowMark,
                                                      new FlowControllingBlockingQueue.ThresholdListener()
                                                      {
                                                          private final AtomicBoolean _suspendState = new AtomicBoolean();
@@ -442,7 +457,7 @@
                                                          public void aboveThreshold(int currentValue)
                                                          {
                                                              _logger.debug(
-                                                                     "Above threshold(" + _defaultPrefetchHighMark
+                                                                     "Above threshold(" + _prefetchHighMark
                                                                      + ") so suspending channel. Current value is " + currentValue);
                                                              _suspendState.set(true);
                                                              new Thread(new SuspenderRunner(_suspendState)).start();
@@ -452,7 +467,7 @@
                                                          public void underThreshold(int currentValue)
                                                          {
                                                              _logger.debug(
-                                                                     "Below threshold(" + _defaultPrefetchLowMark
+                                                                     "Below threshold(" + _prefetchLowMark
                                                                      + ") so unsuspending channel. Current value is " + currentValue);
                                                              _suspendState.set(false);
                                                              new Thread(new SuspenderRunner(_suspendState)).start();
@@ -462,7 +477,7 @@
         }
         else
         {
-            _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null);
+            _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
         }
     }
 
@@ -759,8 +774,16 @@
 
         try
         {
+            //Check that we are clean to commit.
+            if (_failedOverDirty)
+            {
+                rollback();
+
+                throw new TransactionRolledBackException("Connection failover has occured since last send. " +
+                                                         "Forced rollback");
+            }
+
 
-            // TGM FIXME: what about failover?
             // Acknowledge all delivered messages
             while (true)
             {
@@ -778,7 +801,7 @@
         }
         catch (AMQException e)
         {
-            throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
+            throw new JMSAMQException("Failed to commit: " + e.getMessage() + ":" + e.getCause(), e);
         }
         catch (FailoverException e)
         {
@@ -870,7 +893,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
+        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, false,
                                   messageSelector, null, true, true);
     }
 
@@ -878,7 +901,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic), null, null,
+        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
                                   false, false);
     }
 
@@ -886,7 +909,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null,
+        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null,
                                   false, false);
     }
 
@@ -894,7 +917,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, (destination instanceof Topic),
+        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
                                   messageSelector, null, false, false);
     }
 
@@ -903,7 +926,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, (destination instanceof Topic),
+        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
                                   messageSelector, null, false, false);
     }
 
@@ -912,7 +935,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true,
+        return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, true,
                                   messageSelector, null, false, false);
     }
 
@@ -1336,17 +1359,17 @@
 
     public int getDefaultPrefetch()
     {
-        return _defaultPrefetchHighMark;
+        return _prefetchHighMark;
     }
 
     public int getDefaultPrefetchHigh()
     {
-        return _defaultPrefetchHighMark;
+        return _prefetchHighMark;
     }
 
     public int getDefaultPrefetchLow()
     {
-        return _defaultPrefetchLowMark;
+        return _prefetchLowMark;
     }
 
     public AMQShortString getDefaultQueueExchangeName()
@@ -1491,6 +1514,8 @@
 
             sendRecover();
 
+            markClean();
+            
             if (!isSuspended)
             {
                 suspendChannel(false);
@@ -1559,6 +1584,14 @@
                     suspendChannel(true);
                 }
 
+                // Let the dispatcher know that all the incomming messages
+                // should be rolled back(reject/release)
+                _rollbackMark.set(_highestDeliveryTag.get());
+
+                syncDispatchQueue();      
+
+                _dispatcher.rollback();
+
                 releaseForRollback();
 
                 sendRollback();
@@ -1851,26 +1884,58 @@
 
     void failoverPrep()
     {
-        startDispatcherIfNecessary();
         syncDispatchQueue();
     }
 
     void syncDispatchQueue()
     {
-        final CountDownLatch signal = new CountDownLatch(1);
-        _queue.add(new Dispatchable() {
-            public void dispatch(AMQSession ssn)
+        if (Thread.currentThread() == _dispatcherThread)
+        {
+            while (!_closed.get() && !_queue.isEmpty())
             {
-                signal.countDown();
+                Dispatchable disp;
+                try
+                {
+                    disp = (Dispatchable) _queue.take();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new RuntimeException(e);
+                }
+
+                // Check just in case _queue becomes empty, it shouldn't but
+                // better than an NPE.
+                if (disp == null)
+                {
+                    _logger.debug("_queue became empty during sync.");
+                    break;
+                }
+
+                disp.dispatch(AMQSession.this);
             }
-        });
-        try
-        {
-            signal.await();
         }
-        catch (InterruptedException e)
+        else
         {
-            throw new RuntimeException(e);
+            startDispatcherIfNecessary();
+
+            final CountDownLatch signal = new CountDownLatch(1);
+
+            _queue.add(new Dispatchable()
+            {
+                public void dispatch(AMQSession ssn)
+                {
+                    signal.countDown();
+                }
+            });
+
+            try
+            {
+                signal.await();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
         }
     }
 
@@ -2233,7 +2298,7 @@
     private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
             throws JMSException
     {
-        return createProducerImpl(destination, mandatory, immediate, false);
+        return createProducerImpl(destination, mandatory, immediate, DEFAULT_WAIT_ON_SEND);
     }
 
     private P createProducerImpl(final Destination destination, final boolean mandatory,
@@ -2704,15 +2769,26 @@
     public void setFlowControl(final boolean active)
     {
         _flowControl.setFlowControl(active);
+        _logger.warn("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
     }
 
-    public void checkFlowControl() throws InterruptedException
+    public void checkFlowControl() throws InterruptedException, JMSException
     {
+        long expiryTime = 0L;
         synchronized (_flowControl)
         {
-            while (!_flowControl.getFlowControl())
+            while (!_flowControl.getFlowControl() &&
+                   (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE)
+                                     : expiryTime) >= System.currentTimeMillis() )
+            {
+
+                _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD);
+                _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control");
+            }
+            if(!_flowControl.getFlowControl())
             {
-                _flowControl.wait();
+                _logger.error("Message send failed due to timeout waiting on broker enforced flow control");
+                throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control");
             }
         }
 

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Sun Oct 11 23:22:08 2009
@@ -414,9 +414,6 @@
 
     public void releaseForRollback()
     {
-        startDispatcherIfNecessary();
-        syncDispatchQueue();
-        _dispatcher.rollback();
         getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
         _txRangeSet.clear();
         _txSize = 0;

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Sun Oct 11 23:22:08 2009
@@ -195,6 +195,12 @@
 
     public void releaseForRollback()
     {
+        // Reject all the messages that have been received in this session and
+        // have not yet been acknowledged. Should look to remove
+        // _deliveredMessageTags and use _txRangeSet as used by 0-10.
+        // Otherwise messages will be able to arrive out of order to a second
+        // consumer on the queue. Whilst this is within the JMS spec it is not
+        // user friendly and avoidable.
         while (true)
         {
             Long tag = _deliveredMessageTags.poll();
@@ -205,11 +211,6 @@
 
             rejectMessage(tag, true);
         }
-
-        if (_dispatcher != null)
-        {
-            _dispatcher.rollback();
-        }
     }
 
     public void rejectMessage(long deliveryTag, boolean requeue)

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Sun Oct 11 23:22:08 2009
@@ -779,6 +779,7 @@
                 else
                 {
                     _session.addDeliveredMessage(msg.getDeliveryTag());
+                    _session.markDirty();
                 }
 
                 break;

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Sun Oct 11 23:22:08 2009
@@ -60,7 +60,7 @@
     /**
      * Priority of messages created by this producer.
      */
-    private int _messagePriority;
+    private int _messagePriority = Message.DEFAULT_PRIORITY;
 
     /**
      * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java Sun Oct 11 23:22:08 2009
@@ -47,7 +47,7 @@
     public synchronized XASession createXASession() throws JMSException
     {
         checkNotClosed();
-        return _delegate.createXASession(_maxPrefetch, _maxPrefetch / 2);
+        return _delegate.createXASession();
     }
 
     //-- Interface  XAQueueConnection

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java Sun Oct 11 23:22:08 2009
@@ -39,7 +39,7 @@
      * type: long
      */
     public static final String MAX_PREFETCH_PROP_NAME = "max_prefetch";
-    public static final String MAX_PREFETCH_DEFAULT = "5000";
+    public static final String MAX_PREFETCH_DEFAULT = "500";
 
     /**
      * When true a sync command is sent after every persistent messages.

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Sun Oct 11 23:22:08 2009
@@ -21,6 +21,7 @@
 package org.apache.qpid.client.failover;
 
 import org.apache.qpid.AMQDisconnectedException;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.state.AMQStateManager;
 
@@ -134,6 +135,7 @@
             // a slightly more complex state model therefore I felt it was worthwhile doing this.
             AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
 
+            // Use a fresh new StateManager for the reconnection attempts
             _amqProtocolHandler.setStateManager(new AMQStateManager());
 
 

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java Sun Oct 11 23:22:08 2009
@@ -39,6 +39,7 @@
     private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance();
     private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance();
     private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance();
+    private static final ChannelFlowMethodHandler _channelFlowMethodHandler = ChannelFlowMethodHandler.getInstance();
     private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
     private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance();
     private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance();
@@ -159,7 +160,8 @@
 
     public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
     {
-        return false;
+        _channelFlowMethodHandler.methodReceived(_session, body, channelId);
+        return true;
     }
 
     public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Sun Oct 11 23:22:08 2009
@@ -308,7 +308,6 @@
      */
     public void exception(Throwable cause)
     {
-        _logger.info("AS: HELLO");
         if (_failoverState == FailoverState.NOT_STARTED)
         {
             // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java Sun Oct 11 23:22:08 2009
@@ -253,7 +253,7 @@
         }
         else
         {
-            System.err.println("WARNING: new error arrived while old one not yet processed");
+            System.err.println("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage());
         }
 
         try

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java Sun Oct 11 23:22:08 2009
@@ -189,7 +189,8 @@
     {
         synchronized (_brokerListLock)
         {
-            return _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+            _currentBrokerDetail = _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+            return _currentBrokerDetail;
         }
     }   
     
@@ -214,7 +215,15 @@
                 broker.getHost().equals(_currentBrokerDetail.getHost()) &&
                 broker.getPort() == _currentBrokerDetail.getPort())
             {
-                return getNextBrokerDetails();
+                if (_connectionDetails.getBrokerCount() > 1)
+                {
+                    return getNextBrokerDetails();
+                }
+                else
+                {
+                    _failedAttemps ++;
+                    return null;
+                }
             }
 
             String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY);

Modified: qpid/branches/java-network-refactor/qpid/java/common/bin/qpid-run
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/bin/qpid-run?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/bin/qpid-run (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/bin/qpid-run Sun Oct 11 23:22:08 2009
@@ -111,6 +111,7 @@
 fi
 
 log $INFO System Properties set to $SYSTEM_PROPS
+log $INFO QPID_OPTS set to $QPID_OPTS
 
 program=$(basename $0)
 sourced=${BASH_SOURCE[0]}

Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java Sun Oct 11 23:22:08 2009
@@ -51,7 +51,6 @@
         System.out.println("CLOSED");
     }
 
-    @Override
     public void setIdleTimeout(long l)
     {
         // TODO Auto-generated method stub

Propchange: qpid/branches/java-network-refactor/qpid/java/lib/org.osgi.core_1.0.0.jar
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
 /incubator/qpid/trunk/qpid/java/lib/org.osgi.core_1.0.0.jar:443187-720930
-/qpid/trunk/qpid/java/lib/org.osgi.core_1.0.0.jar:805429-816233
+/qpid/trunk/qpid/java/lib/org.osgi.core_1.0.0.jar:805429-824132

Propchange: qpid/branches/java-network-refactor/qpid/java/management/client/src/main/java/org/apache/qpid/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
 /incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management:443187-703176
-/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management:805429-816233
+/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management:805429-824132

Propchange: qpid/branches/java-network-refactor/qpid/java/management/client/src/test/java/org/apache/qpid/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
 /incubator/qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management:443187-703176
-/qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management:805429-816233
+/qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management:805429-824132

Modified: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java Sun Oct 11 23:22:08 2009
@@ -110,7 +110,7 @@
      * Reloads the log4j configuration file, applying any changes made. 
      * 
      * @throws IOException
-     * @since Qpid JMX API 1.3
+     * @since Qpid JMX API 1.4
      */
     @MBeanOperation(name = "reloadConfigFile", description = "Reload the log4j xml configuration file", impact = MBeanOperationInfo.ACTION)
     void reloadConfigFile() throws IOException;

Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,3 +1,3 @@
 /qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java:757268
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:805429-824132

Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:805429-824132

Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:805429-824132

Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,3 +1,3 @@
 /qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java:757257
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:805429-824132

Modified: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java Sun Oct 11 23:22:08 2009
@@ -43,7 +43,7 @@
      *  Qpid JMX API 1.1 can be assumed.
      */
     int QPID_JMX_API_MAJOR_VERSION = 1;
-    int QPID_JMX_API_MINOR_VERSION = 3;
+    int QPID_JMX_API_MINOR_VERSION = 4;
     
     
     /**

Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:805429-824132

Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,3 +1,3 @@
 /qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanAttribute.java:757268
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanAttribute.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:805429-824132

Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,3 +1,3 @@
 /qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanConstructor.java:757268
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanConstructor.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:805429-824132

Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,3 +1,3 @@
 /qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanDescription.java:757268
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanDescription.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:805429-824132

Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,3 +1,3 @@
 /qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperation.java:757268
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperation.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:805429-824132

Propchange: qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,3 +1,3 @@
 /qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperationParameter.java:757268
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperationParameter.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:805429-816233
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:805429-824132

Propchange: qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
 /qpid/branches/jmx_mc_gsoc09/qpid/java/management/eclipse-plugin/src:788755
-/qpid/trunk/qpid/java/management/eclipse-plugin/src:805429-816233
+/qpid/trunk/qpid/java/management/eclipse-plugin/src:805429-824132

Modified: qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java Sun Oct 11 23:22:08 2009
@@ -48,7 +48,7 @@
 
     //max supported broker management interface supported by this release of the management console
     public static final int SUPPORTED_QPID_JMX_API_MAJOR_VERSION = 1;
-    public static final int SUPPORTED_QPID_JMX_API_MINOR_VERSION = 3;
+    public static final int SUPPORTED_QPID_JMX_API_MINOR_VERSION = 4;
     
     public static final String DATA_DIR = System.getProperty("user.home") + File.separator + ".qpidmc";
     

Modified: qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/logging/ConfigurationFileTabControl.java Sun Oct 11 23:22:08 2009
@@ -349,7 +349,7 @@
         _logWatchIntervalLabel.setFont(ApplicationRegistry.getFont(FONT_BOLD));
         _logWatchIntervalLabel.setLayoutData(new GridData(SWT.LEFT, SWT.CENTER, false, true));
         
-        if(_ApiVersion.greaterThanOrEqualTo(1, 3))
+        if(_ApiVersion.greaterThanOrEqualTo(1, 4))
         {
             Group reloadConfigFileGroup = new Group(attributesComposite, SWT.SHADOW_NONE);
             reloadConfigFileGroup.setBackground(attributesComposite.getBackground());

Propchange: qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
 /incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:443187-726139
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:805429-816233
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:805429-824132

Propchange: qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
 /incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:443187-726139
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:805429-816233
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:805429-824132

Propchange: qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
 /incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:443187-726139
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:805429-816233
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:805429-824132

Propchange: qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
 /incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:443187-726139
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:805429-816233
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:805429-824132

Propchange: qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
 /incubator/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:443187-726139
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:805429-816233
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:805429-824132

Modified: qpid/branches/java-network-refactor/qpid/java/module.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/module.xml?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/module.xml (original)
+++ qpid/branches/java-network-refactor/qpid/java/module.xml Sun Oct 11 23:22:08 2009
@@ -261,6 +261,7 @@
       <jvmarg value="${jvm.args}"/>
 
       <sysproperty key="amqj.logging.level" value="${amqj.logging.level}"/>
+      <sysproperty key="amqj.server.logging.level" value="${amqj.server.logging.level}"/>
       <sysproperty key="amqj.protocol.logging.level" value="${amqj.protocol.logging.level}"/>
       <sysproperty key="log4j.debug" value="${log4j.debug}"/>
       <sysproperty key="root.logging.level" value="${root.logging.level}"/>
@@ -269,6 +270,7 @@
       <sysproperty key="java.naming.provider.url" value="${java.naming.provider.url}"/>
       <sysproperty key="broker" value="${broker}"/>
       <sysproperty key="broker.clean" value="${broker.clean}"/>
+      <sysproperty key="broker.clean.between.tests" value="${broker.clean.between.tests}"/>
       <sysproperty key="broker.version" value="${broker.version}"/>
       <sysproperty key="broker.ready" value="${broker.ready}" />
       <sysproperty key="broker.stopped" value="${broker.stopped}" />

Propchange: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,2 +1,2 @@
 /incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java:443187-707694
-/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java:805429-816233
+/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java:805429-824132

Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/BrokerStartupTest.java Sun Oct 11 23:22:08 2009
@@ -78,15 +78,23 @@
             // Add an invalid value
             _broker += " -l invalid";
 
-            // The release-bin build of the broker uses this log4j configuration
-            // so set up the broker environment to use it for this test.
-            // Also include -Dlog4j.debug so we can validate that it picked up this config
-            setBrokerEnvironment("QPID_OPTS", "-Dlog4j.debug -Dlog4j.configuration=file:" + System.getProperty(QPID_HOME) + "/../broker/src/main/java/log4j.properties");
+            // The  broker has a built in default log4j configuration set up
+            // so if the the broker cannot load the -l value it will use default
+            // use this default. Test that this is correctly loaded, by
+            // including -Dlog4j.debug so we can validate.
+            setBrokerEnvironment("QPID_OPTS", "-Dlog4j.debug");
 
             // Disable all client logging so we can test for broker DEBUG only.
-            Logger.getRootLogger().setLevel(Level.WARN);
-            Logger.getLogger("qpid.protocol").setLevel(Level.WARN);
-            Logger.getLogger("org.apache.qpid").setLevel(Level.WARN);
+            setLoggerLevel(Logger.getRootLogger(), Level.WARN);
+            setLoggerLevel(Logger.getLogger("qpid.protocol"), Level.WARN);
+            setLoggerLevel(Logger.getLogger("org.apache.qpid"), Level.WARN);
+
+            // Set the broker to use info level logging, which is the qpid-server
+            // default. Rather than debug which is the test default.
+            setBrokerOnlySystemProperty("amqj.server.logging.level", "info");
+            // Set the logging defaults to info for this test.
+            setBrokerOnlySystemProperty("amqj.logging.level", "info");
+            setBrokerOnlySystemProperty("root.logging.level", "info");
 
             startBroker();
 

Copied: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java (from r824132, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java?p2=qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java&p1=qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java&r1=824132&r2=824198&rev=824198&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java Sun Oct 11 23:22:08 2009
@@ -188,8 +188,7 @@
 
         // Send IO Exception - causing failover
         _connection.getProtocolHandler().
-                exceptionCaught(_connection.getProtocolHandler().getProtocolSession().getIoSession(),
-                                new WriteTimeoutException("WriteTimeoutException to cause failover."));
+                exception(new WriteTimeoutException("WriteTimeoutException to cause failover."));
 
         // Verify Failover occured through ConnectionListener
         assertTrue("Failover did not occur",

Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BrokerLoggingTest.java Sun Oct 11 23:22:08 2009
@@ -160,7 +160,7 @@
 
             // Set the broker.ready string to check for the _log4j default that
             // is still present on standard out. 
-            System.setProperty(BROKER_READY, "Qpid Broker Ready");
+            setTestClientSystemProperty(BROKER_READY, "Qpid Broker Ready");
 
             startBroker();
 

Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java Sun Oct 11 23:22:08 2009
@@ -63,7 +63,6 @@
  */
 public class DeepQueueConsumeWithSelector extends QpidTestCase implements MessageListener
 {
-    private static final String INDEX = "index";
 
     private static final int MESSAGE_COUNT = 10000;
     private static final int BATCH_SIZE = MESSAGE_COUNT / 10;
@@ -129,9 +128,7 @@
     @Override
     public Message createNextMessage(Session session, int msgCount) throws JMSException
     {
-        Message message = session.createTextMessage("Message :" + msgCount);
-
-        message.setIntProperty(INDEX, msgCount);
+        Message message = super.createNextMessage(session,msgCount);
 
         if ((msgCount % BATCH_SIZE) == 0 )
         {

Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java Sun Oct 11 23:22:08 2009
@@ -21,27 +21,34 @@
 
 package org.apache.qpid.server.security.acl;
 
-
-import junit.framework.TestCase;
-
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.Logger;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.client.*;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
-import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.commons.configuration.ConfigurationException;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.qpid.client.AMQAuthenticationException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+
 
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
 import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.naming.NamingException;
-
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -54,6 +61,14 @@
 
     public void setUp() throws Exception
     {
+    	//Performing setUp here would result in a broker with the default ACL test config
+    	
+    	//Each test now calls the private setUpACLTest to allow them to make 
+    	//individual customisations to the base ACL settings
+    }
+    
+    private void setUpACLTest() throws Exception
+    {
         final String QPID_HOME = System.getProperty("QPID_HOME");
 
         if (QPID_HOME == null)
@@ -73,8 +88,10 @@
         return "amqp://" + username + ":" + password + "@clientid/test?brokerlist='" + getBroker() + "?retries='0''";
     }
 
-    public void testAccessAuthorized() throws AMQException, URLSyntaxException
+    public void testAccessAuthorized() throws AMQException, URLSyntaxException, Exception
     {
+    	setUpACLTest();
+    	
         try
         {
             Connection conn = getConnection("client", "guest");
@@ -96,6 +113,8 @@
 
     public void testAccessNoRights() throws Exception
     {
+    	setUpACLTest();
+    	
         try
         {
             Connection conn = getConnection("guest", "guest");
@@ -120,8 +139,40 @@
         }
     }
 
-    public void testClientConsumeFromTempQueueValid() throws AMQException, URLSyntaxException
+    public void testGuestConsumeWithCreateRightsAndWithoutConsumeRights() throws NamingException, ConfigurationException, IOException, Exception
+    {
+        //Customise the ACL config to give the guest user some create (could be any, non-consume) rights to 
+        //force creation of a PrincipalPermissions instance to perform the consume rights check against.
+        setConfigurationProperty("virtualhosts.virtualhost.test.security.access_control_list.create.queues.queue.users.user", "guest");
+        
+        setUpACLTest();
+        
+        try
+        {
+            Connection conn = getConnection("guest", "guest");
+
+            Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            conn.start();
+
+            sesh.createConsumer(sesh.createQueue("example.RequestQueue"));
+
+            conn.close();
+        }
+        catch (JMSException e)
+        {
+            Throwable cause = e.getLinkedException();
+
+            assertNotNull("There was no liked exception", cause);
+            assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass());
+            assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
+        }
+    }
+
+    public void testClientConsumeFromTempQueueValid() throws AMQException, URLSyntaxException, Exception
     {
+    	setUpACLTest();
+    	
         try
         {
             Connection conn = getConnection("client", "guest");
@@ -142,6 +193,8 @@
 
     public void testClientConsumeFromNamedQueueInvalid() throws NamingException, Exception
     {
+    	setUpACLTest();
+    	
         try
         {
             Connection conn = getConnection("client", "guest");
@@ -167,8 +220,10 @@
         }
     }
 
-    public void testClientCreateTemporaryQueue() throws JMSException, URLSyntaxException
+    public void testClientCreateTemporaryQueue() throws JMSException, URLSyntaxException, Exception
     {
+    	setUpACLTest();
+    	
         try
         {
             Connection conn = getConnection("client", "guest");
@@ -191,6 +246,8 @@
 
     public void testClientCreateNamedQueue() throws NamingException, JMSException, AMQException, Exception
     {
+    	setUpACLTest();
+    	
         try
         {
             Connection conn = getConnection("client", "guest");
@@ -212,8 +269,10 @@
         }
     }
 
-    public void testClientPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException
+    public void testClientPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, Exception
     {
+    	setUpACLTest();
+    	
         try
         {
             Connection conn = getConnection("client", "guest");
@@ -239,8 +298,10 @@
         }
     }
 
-    public void testClientPublishValidQueueSuccess() throws AMQException, URLSyntaxException
+    public void testClientPublishValidQueueSuccess() throws AMQException, URLSyntaxException, Exception
     {
+    	setUpACLTest();
+    	
         try
         {
             Connection conn = getConnection("client", "guest");
@@ -271,6 +332,8 @@
 
     public void testClientPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
     {
+    	setUpACLTest();
+    	
         try
         {
             Connection conn = getConnection("client", "guest");
@@ -311,8 +374,10 @@
         assertTrue("Did not get AMQAuthenticationException thrown", foundCorrectException);
     }
 
-    public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException
+    public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException, Exception
     {
+    	setUpACLTest();
+    	
         try
         {
             Connection conn = getConnection("server", "guest");
@@ -333,6 +398,8 @@
 
     public void testServerConsumeFromNamedQueueInvalid() throws AMQException, URLSyntaxException, NamingException, Exception
     {
+    	setUpACLTest();
+    	
         try
         {
             Connection conn = getConnection("client", "guest");
@@ -358,6 +425,8 @@
 
     public void testServerConsumeFromTemporaryQueue() throws AMQException, URLSyntaxException, NamingException, Exception
     {
+    	setUpACLTest();
+    	
         try
         {
             Connection conn = getConnection("server", "guest");
@@ -391,8 +460,10 @@
         return (Connection) connection;
     }
 
-    public void testServerCreateNamedQueueValid() throws JMSException, URLSyntaxException
+    public void testServerCreateNamedQueueValid() throws JMSException, URLSyntaxException, Exception
     {
+    	setUpACLTest();
+    	
         try
         {
             Connection conn = getConnection("server", "guest");
@@ -414,6 +485,8 @@
 
     public void testServerCreateNamedQueueInvalid() throws JMSException, URLSyntaxException, AMQException, NamingException, Exception
     {
+    	setUpACLTest();
+    	
         try
         {
             Connection conn = getConnection("server", "guest");
@@ -436,6 +509,8 @@
 
     public void testServerCreateTemporaryQueueInvalid() throws NamingException, Exception
     {
+    	setUpACLTest();
+    	
         try
         {
             Connection conn = getConnection("server", "guest");
@@ -461,6 +536,8 @@
 
     public void testServerCreateAutoDeleteQueueInvalid() throws NamingException, JMSException, AMQException, Exception
     {
+    	setUpACLTest();
+    	
         Connection connection = null;
         try
         {
@@ -492,6 +569,8 @@
      */
     public void testServerPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
     {
+    	setUpACLTest();
+    	
         //Set up the Server
         Connection serverConnection = getConnection("server", "guest");
 
@@ -572,6 +651,8 @@
 
     public void testServerPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
     {
+    	setUpACLTest();
+    	
         try
         {
             Connection conn = getConnection("server", "guest");

Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java Sun Oct 11 23:22:08 2009
@@ -61,7 +61,7 @@
 
         setupSession();
 
-        _queue = _clientSession.createQueue(getName()+System.currentTimeMillis());
+        _queue = _clientSession.createQueue(getTestQueueName());
         _clientSession.createConsumer(_queue).close();
         
         //Ensure there are no messages on the queue to start with.
@@ -497,7 +497,7 @@
 
             if (msgCount == failPoint)
             {
-                failBroker();
+                failBroker(getFailingPort());
             }
         }
 
@@ -529,7 +529,7 @@
             sendMessages("connection2", messages);
         }
 
-        failBroker();
+        failBroker(getFailingPort());
 
         checkQueueDepth(messages);
 

Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java Sun Oct 11 23:22:08 2009
@@ -22,64 +22,172 @@
 
 import org.apache.qpid.test.utils.*;
 import javax.jms.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import junit.framework.ComparisonFailure;
+import junit.framework.AssertionFailedError;
 
 /**
- * RollbackOrderTest
+ * RollbackOrderTest, QPID-1864, QPID-1871
+ *
+ * Description:
+ *
+ * The problem that this test is exposing is that the dispatcher used to be capable
+ * of holding on to a message when stopped. This ment that when the rollback was
+ * called and the dispatcher stopped it may have hold of a message. So after all
+ * the local queues(preDeliveryQueue, SynchronousQueue, PostDeliveryTagQueue)
+ * have been cleared the client still had a single message, the one the
+ * dispatcher was holding on to.
+ *
+ * As a result the TxRollback operation would run and then release the dispatcher.
+ * Whilst the dispatcher would then proceed to reject the message it was holiding
+ * the Broker would already have resent that message so the rejection would silently
+ * fail.
+ *
+ * And the client would receieve that single message 'early', depending on the
+ * number of messages already recevied when rollback was called.
+ *
+ *
+ * Aims:
+ *
+ * The tests puts 50 messages on to the queue.
+ *
+ * The test then tries to cause the dispatcher to stop whilst it is in the process
+ * of moving a message from the preDeliveryQueue to a consumers sychronousQueue.
+ *
+ * To exercise this path we have 50 message flowing to the client to give the
+ * dispatcher a bit of work to do moving messages.
+ *
+ * Then we loop - 10 times
+ *  - Validating that the first message received is always message 1.
+ *  - Receive a few more so that there are a few messages to reject.
+ *  - call rollback, to try and catch the dispatcher mid process.
+ *
+ * Outcome:
+ *
+ * The hope is that we catch the dispatcher mid process and cause a BasicReject
+ * to fail. Which will be indicated in the log but will also cause that failed
+ * rejected message to be the next to be delivered which will not be message 1
+ * as expected.
+ *
+ * We are testing a race condition here but we can check through the log file if
+ * the race condition occured. However, performing that check will only validate
+ * the problem exists and will not be suitable as part of a system test.
  *
  */
-
 public class RollbackOrderTest extends QpidTestCase
 {
 
-    private Connection conn;
-    private Queue queue;
-    private Session ssn;
-    private MessageProducer prod;
-    private MessageConsumer cons;
+    private Connection _connection;
+    private Queue _queue;
+    private Session _session;
+    private MessageConsumer _consumer;
 
     @Override public void setUp() throws Exception
     {
         super.setUp();
-        conn = getConnection();
-        conn.start();
-        ssn = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        queue = ssn.createQueue("rollback-order-test-queue");
-        prod = ssn.createProducer(queue);
-        cons = ssn.createConsumer(queue);
-        for (int i = 0; i < 5; i++)
-        {
-            TextMessage msg = ssn.createTextMessage("message " + (i+1));
-            prod.send(msg);
-        }
-        ssn.commit();
+        _connection = getConnection();
+
+        _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+        _queue = _session.createQueue(getTestQueueName());
+        _consumer = _session.createConsumer(_queue);
+
+        //Send more messages so it is more likely that the dispatcher is
+        // processing on rollback.
+        sendMessage(_session, _queue, 50);
+        _session.commit();
+
     }
 
     public void testOrderingAfterRollback() throws Exception
     {
-        for (int i = 0; i < 10; i++)
+        //Start the session now so we
+        _connection.start();
+
+        for (int i = 0; i < 20; i++)
         {
-            TextMessage msg = (TextMessage) cons.receive();
-            assertEquals("message 1", msg.getText());
-            ssn.rollback();
+            Message msg = _consumer.receive();
+            assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
+
+            // Pull additional messages through so we have some reject work to do
+            for (int m=0; m < 5 ; m++)
+            {
+                _consumer.receive();
+            }
+
+            System.err.println("ROT-Rollback");
+            _logger.warn("ROT-Rollback");
+            _session.rollback();
         }
     }
 
-    @Override public void tearDown() throws Exception
+    public void testOrderingAfterRollbackOnMessage() throws Exception
     {
-        while (true)
+        final CountDownLatch count= new CountDownLatch(20);
+        final Exception exceptions[] = new Exception[20];
+        final AtomicBoolean failed = new AtomicBoolean(false);
+
+        _consumer.setMessageListener(new MessageListener()
         {
-            Message msg = cons.receiveNoWait();
-            if (msg == null)
+
+            public void onMessage(Message message)
             {
-                break;
+
+                Message msg = message;
+                try
+                {
+                    count.countDown();
+                    assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
+
+                    _session.rollback();
+                }
+                catch (JMSException e)
+                {
+                    System.out.println("Error:" + e.getMessage());
+                    exceptions[(int)count.getCount()] = e;
+                }
+                catch (AssertionFailedError cf)
+                {
+                    // End Test if Equality test fails
+                    while (count.getCount() != 0)
+                    {
+                        count.countDown();
+                    }
+
+                    System.out.println("Error:" + cf.getMessage());
+                    System.err.println(cf.getMessage());
+                    cf.printStackTrace();
+                    failed.set(true);
+                }
             }
-            else
+        });
+        //Start the session now so we
+        _connection.start();
+
+        count.await();
+
+        for (Exception e : exceptions)
+        {
+            if (e != null)
             {
-                msg.acknowledge();
+                System.err.println(e.getMessage());
+                e.printStackTrace();
+                failed.set(true);
             }
         }
-        ssn.commit();
+
+//        _consumer.close();
+        _connection.close();
+        
+        assertFalse("Exceptions thrown during test run, Check Std.err.", failed.get());
+    }
+
+    @Override public void tearDown() throws Exception
+    {
+
+        drainQueue(_queue);
+
         super.tearDown();
     }
 

Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java Sun Oct 11 23:22:08 2009
@@ -37,7 +37,6 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession_0_10;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ConnectionListener;
 import org.apache.qpid.jms.ConnectionURL;
@@ -58,13 +57,12 @@
     private Session consumerSession;
     private MessageConsumer consumer;
 
-    private static int usedBrokers = 0;
     private CountDownLatch failoverComplete;
-    private static final long DEFAULT_FAILOVER_TIME = 10000L;
     private boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
     private int seed;
     private Random rand;
-    
+    private int _currentPort = getFailingPort();
+
     @Override
     protected void setUp() throws Exception
     {
@@ -227,7 +225,7 @@
         
         _logger.info("Failing over");
 
-        causeFailure(DEFAULT_FAILOVER_TIME);
+        causeFailure(_currentPort, DEFAULT_FAILOVER_TIME);
 
         // Check that you produce and consume the rest of messages.
         _logger.debug("==================");
@@ -242,10 +240,10 @@
         _logger.debug("==================");
     }
 
-    private void causeFailure(long delay)
+    private void causeFailure(int port, long delay)
     {
 
-        failBroker();
+        failBroker(port);
 
         _logger.info("Awaiting Failover completion");
         try
@@ -268,7 +266,7 @@
         Message msg = consumer.receive();
         assertNotNull("Expected msgs not received", msg);
 
-        causeFailure(DEFAULT_FAILOVER_TIME);
+        causeFailure(getFailingPort(), DEFAULT_FAILOVER_TIME);
 
         Exception failure = null;
         try
@@ -314,7 +312,7 @@
         long failTime = System.nanoTime() + FAILOVER_DELAY * 1000000;
 
         //Fail the first broker
-        causeFailure(FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
+        causeFailure(getFailingPort(), FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
 
         //Reconnection should occur
         assertTrue("Failover did not take long enough", System.nanoTime() > failTime);
@@ -344,15 +342,15 @@
             _logger.debug("===================================================================");
             
             runP2PFailover(numMessages, false,false, false);
-            startBroker(getFailingPort());
+            startBroker(_currentPort);
             if (useAltPort)
             {
-            	setFailingPort(altPort);
+                _currentPort = altPort;
                 useAltPort = false;
             }
             else
             {
-            	setFailingPort(stdPort);
+            	_currentPort = stdPort;
             	useAltPort = true;
             }
             



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org