You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/10/28 17:52:18 UTC

svn commit: r1190406 [4/5] - in /qpid/branches/qpid-3346/qpid: ./ bin/ cpp/design_docs/ cpp/docs/api/ cpp/docs/man/ cpp/examples/old_api/tradedemo/ cpp/include/qmf/engine/ cpp/src/ cpp/src/posix/ cpp/src/qmf/engine/ cpp/src/qpid/ cpp/src/qpid/acl/ cpp/...

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Fri Oct 28 15:52:13 2011
@@ -50,6 +50,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Session
@@ -125,6 +126,8 @@ public class Session extends SessionInvo
     private SessionDetachCode detachCode;
     private final Object stateLock = new Object();
 
+    private final AtomicBoolean _failoverRequired = new AtomicBoolean(false);
+
     protected Session(Connection connection, Binary name, long expiry)
     {
         this(connection, new SessionDelegate(), name, expiry);
@@ -257,6 +260,7 @@ public class Session extends SessionInvo
 
     void resume()
     {
+        _failoverRequired.set(false);
         synchronized (commands)
         {
             attach();
@@ -459,7 +463,7 @@ public class Session extends SessionInvo
 
         synchronized (commands)
         {
-            if (state == DETACHED || state == CLOSING)
+            if (state == DETACHED || state == CLOSING || state == CLOSED)
             {
                 return;
             }
@@ -583,30 +587,25 @@ public class Session extends SessionInvo
             
             synchronized (commands)
             {
-                //allow the txSelect operation to be invoked during resume
-                boolean skipWait = m instanceof TxSelect && state == RESUMING;
-
-                if(!skipWait)
+                if (state == DETACHED && m.isUnreliable())
                 {
-                    if (state == DETACHED && m.isUnreliable())
+                    Thread current = Thread.currentThread();
+                    if (!current.equals(resumer))
                     {
-                        Thread current = Thread.currentThread();
-                        if (!current.equals(resumer))
-                        {
-                            return;
-                        }
+                        return;
                     }
+                }
 
-                    if (state != OPEN && state != CLOSED && state != CLOSING)
+                if (state != OPEN && state != CLOSED && state != CLOSING)
+                {
+                    Thread current = Thread.currentThread();
+                    if (!current.equals(resumer) )
                     {
-                        Thread current = Thread.currentThread();
-                        if (!current.equals(resumer))
+                        Waiter w = new Waiter(commands, timeout);
+                        while (w.hasTime() && (state != OPEN && state != CLOSED))
                         {
-                            Waiter w = new Waiter(commands, timeout);
-                            while (w.hasTime() && (state != OPEN && state != CLOSED))
-                            {
-                                w.await();
-                            }
+                            checkFailoverRequired("Command was interrupted because of failover, before being sent");
+                            w.await();
                         }
                     }
                 }
@@ -674,6 +673,7 @@ public class Session extends SessionInvo
                                 }
                             }
                         }
+                        checkFailoverRequired("Command was interrupted because of failover, before being sent");
                         w.await();
                     }
                 }
@@ -768,6 +768,14 @@ public class Session extends SessionInvo
         }
     }
 
+    private void checkFailoverRequired(String message)
+    {
+        if (_failoverRequired.get())
+        {
+            throw new SessionException(message);
+        }
+    }
+
     protected boolean shouldIssueFlush(int next)
     {
         return (next % 65536) == 0;
@@ -793,6 +801,7 @@ public class Session extends SessionInvo
             Waiter w = new Waiter(commands, timeout);
             while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
             {
+                checkFailoverRequired("Session sync was interrupted by failover.");
                 log.debug("%s   waiting for[%d]: %d, %s", this, point, maxComplete, commands);
                 w.await();
             }
@@ -853,13 +862,6 @@ public class Session extends SessionInvo
         }
     }
 
-    private ConnectionClose close = null;
-
-    void closeCode(ConnectionClose close)
-    {
-        this.close = close;
-    }
-
     ExecutionException getException()
     {
         synchronized (results)
@@ -910,6 +912,7 @@ public class Session extends SessionInvo
                 Waiter w = new Waiter(this, timeout);
                 while (w.hasTime() && state != CLOSED && !isDone())
                 {
+                    checkFailoverRequired("Operation was interrupted by failover.");
                     log.debug("%s waiting for result: %s", Session.this, this);
                     w.await();
                 }
@@ -921,7 +924,12 @@ public class Session extends SessionInvo
             }
             else if (state == CLOSED)
             {
-                throw new SessionException(getException());
+                ExecutionException ex = getException();
+                if(ex == null)
+                {
+                    throw new SessionClosedException();
+                }
+                throw new SessionException(ex);
             }
             else
             {
@@ -1001,6 +1009,7 @@ public class Session extends SessionInvo
         Waiter w = new Waiter(commands, timeout);
         while (w.hasTime() && state != CLOSED)
         {
+            checkFailoverRequired("close() was interrupted by failover.");
             w.await();
         }
 
@@ -1095,6 +1104,7 @@ public class Session extends SessionInvo
                 Waiter w = new Waiter(stateLock, timeout);
                 while (w.hasTime() && state == NEW)
                 {
+                    checkFailoverRequired("Session opening was interrupted by failover.");
                     w.await();
                 }
             }
@@ -1117,4 +1127,26 @@ public class Session extends SessionInvo
     {
         return stateLock;
     }
+
+    protected void notifyFailoverRequired()
+    {
+        //ensure any operations waiting are aborted to
+        //prevent them waiting for timeout for 60 seconds
+        //and possibly preventing failover proceeding
+        _failoverRequired.set(true);
+        synchronized (commands)
+        {
+            commands.notifyAll();
+        }
+        synchronized (results)
+        {
+            for (ResultFuture<?> result : results.values())
+            {
+                synchronized(result)
+                {
+                    result.notifyAll();
+                }
+            }
+        }
+    }
 }

Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/management/ConfigurationManagement.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/management/LoggingManagement.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747869,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -3,4 +3,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -3,4 +3,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -3,4 +3,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanAttribute.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -3,4 +3,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanConstructor.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -3,4 +3,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanDescription.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -3,4 +3,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperation.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -3,4 +3,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperationParameter.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/management/eclipse-plugin/src/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/management/eclipse-plugin/src:788755
 /qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src:1144319-1179750
+/qpid/trunk/qpid/java/management/eclipse-plugin/src:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:1144319-1179750
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:1144319-1179750
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:1144319-1179750
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:1144319-1179750
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:805429-821809
 /qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:1144319-1179750
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:1144319-1190375

Modified: qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-firewall-2.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-firewall-2.xml?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-firewall-2.xml (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-firewall-2.xml Fri Oct 28 15:52:13 2011
@@ -32,8 +32,8 @@
         <ssl>
             <enabled>false</enabled>
             <sslOnly>false</sslOnly>
-            <keystorePath>/path/to/keystore.ks</keystorePath>
-            <keystorePassword>keystorepass</keystorePassword>
+            <keyStorePath>/path/to/keystore.ks</keyStorePath>
+            <keyStorePassword>keystorepass</keyStorePassword>
         </ssl>
         <port>5672</port>
         <sslport>8672</sslport>

Modified: qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-firewall-3.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-firewall-3.xml?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-firewall-3.xml (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-firewall-3.xml Fri Oct 28 15:52:13 2011
@@ -28,12 +28,12 @@
     <connector>
         <!-- To enable SSL edit the keystorePath and keystorePassword
 	     and set enabled to true. 
-             To disasble Non-SSL port set sslOnly to true -->
+             To disable Non-SSL port set sslOnly to true -->
         <ssl>
             <enabled>false</enabled>
             <sslOnly>false</sslOnly>
-            <keystorePath>/path/to/keystore.ks</keystorePath>
-            <keystorePassword>keystorepass</keystorePassword>
+            <keyStorePath>/path/to/keystore.ks</keyStorePath>
+            <keyStorePassword>keystorepass</keyStorePassword>
         </ssl>
         <port>5672</port>
         <sslport>8672</sslport>

Modified: qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-settings.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-settings.xml?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-settings.xml (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-settings.xml Fri Oct 28 15:52:13 2011
@@ -25,8 +25,8 @@
             <port>15671</port>
             <enabled>false</enabled>
             <sslOnly>false</sslOnly>
-            <keystorePath>${QPID_HOME}/../test-profiles/test_resources/ssl/java_broker_keystore.jks</keystorePath>
-            <keystorePassword>password</keystorePassword>
+            <keyStorePath>${QPID_HOME}/../test-profiles/test_resources/ssl/java_broker_keystore.jks</keyStorePath>
+            <keyStorePassword>password</keyStorePassword>
         </ssl>
     </connector>
     <management>

Modified: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java Fri Oct 28 15:52:13 2011
@@ -35,13 +35,11 @@ import org.slf4j.LoggerFactory;
 
 public class AMQQueueDeferredOrderingTest extends QpidBrokerTestCase
 {
-
-    private static final int NUM_MESSAGES = 1000;
-
     private Connection con;
     private Session session;
     private AMQQueue queue;
     private MessageConsumer consumer;
+    private int numMessages;
 
     private static final Logger _logger = LoggerFactory.getLogger(AMQQueueDeferredOrderingTest.class);
 
@@ -87,6 +85,8 @@ public class AMQQueueDeferredOrderingTes
     {
         super.setUp();
 
+        numMessages = isBrokerStorePersistent() ? 300 : 1000;
+
         _logger.info("Create Connection");
         con = getConnection();
         _logger.info("Create Session");
@@ -105,19 +105,19 @@ public class AMQQueueDeferredOrderingTes
 
         // Setup initial messages
         _logger.info("Creating first producer thread");
-        producerThread = new ASyncProducer(queue, 0, NUM_MESSAGES / 2);
+        producerThread = new ASyncProducer(queue, 0, numMessages / 2);
         producerThread.start();
         // Wait for them to be done
         producerThread.join();
 
         // Setup second set of messages to produce while we consume
         _logger.info("Creating second producer thread");
-        producerThread = new ASyncProducer(queue, NUM_MESSAGES / 2, NUM_MESSAGES);
+        producerThread = new ASyncProducer(queue, numMessages / 2, numMessages);
         producerThread.start();
 
         // Start consuming and checking they're in order
         _logger.info("Consuming messages");
-        for (int i = 0; i < NUM_MESSAGES; i++)
+        for (int i = 0; i < numMessages; i++)
         {
             Message msg = consumer.receive(3000);
             assertNotNull("Message should not be null", msg);

Modified: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java Fri Oct 28 15:52:13 2011
@@ -135,7 +135,7 @@ public class ResetMessageListenerTest ex
         try
         {
             assertTrue("Did not receive all first batch of messages", 
-                    _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS));
+                    _allFirstMessagesSent.await(MSG_COUNT, TimeUnit.SECONDS));
             _logger.info("Received first batch of messages");
         }
         catch (InterruptedException e)
@@ -212,7 +212,7 @@ public class ResetMessageListenerTest ex
 
         try
         {
-            assertTrue(_allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS));
+            assertTrue(_allSecondMessagesSent.await(MSG_COUNT, TimeUnit.SECONDS));
         }
         catch (InterruptedException e)
         {

Modified: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java Fri Oct 28 15:52:13 2011
@@ -20,6 +20,7 @@ package org.apache.qpid.client.failover;
 
 import java.text.MessageFormat;
 import java.util.ArrayList;
+import java.util.Enumeration;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -33,12 +34,15 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.TransactionRolledBackException;
 
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.FailoverPolicy;
 import org.apache.qpid.test.utils.FailoverBaseCase;
 
 /**
@@ -96,6 +100,11 @@ public class FailoverBehaviourTest exten
      */
     private JMSException _exceptionListenerException;
 
+    /**
+     * Latch to check that failover mutex is hold by a failover thread
+     */
+    private CountDownLatch _failoverStarted;
+
     @Override
     protected void setUp() throws Exception
     {
@@ -105,6 +114,7 @@ public class FailoverBehaviourTest exten
         _connection.setExceptionListener(this);
         ((AMQConnection) _connection).setConnectionListener(this);
         _failoverComplete = new CountDownLatch(1);
+        _failoverStarted = new CountDownLatch(1);
     }
 
     /**
@@ -463,6 +473,31 @@ public class FailoverBehaviourTest exten
     }
 
     /**
+     * Test that calling acknowledge before failover leaves the session
+     * clean for use after failover.
+     */
+    public void testAcknowledgeBeforeFailover() throws Exception
+    {
+        init(Session.CLIENT_ACKNOWLEDGE, true);
+
+        produceMessages();
+
+        // consume messages and acknowledge them
+        Message lastMessage = consumeMessages();
+        lastMessage.acknowledge();
+
+        causeFailure();
+
+        assertFailoverException();
+
+        produceMessages();
+
+        // tests whether receiving and acknowledgment is working after recover
+        lastMessage = consumeMessages();
+        lastMessage.acknowledge();
+    }
+
+    /**
      * Test that receiving of messages after failover prior to calling
      * {@link Message#acknowledge()} still results in acknowledge throwing an exception.
      */
@@ -600,8 +635,134 @@ public class FailoverBehaviourTest exten
         sessionCloseAfterFailoverImpl(Session.AUTO_ACKNOWLEDGE);
     }
 
+    public void testPublishAutoAcknowledgedWhileFailover() throws Exception
+    {
+        publishWhileFailingOver(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    public void testPublishClientAcknowledgedWhileFailover() throws Exception
+    {
+        Message receivedMessage = publishWhileFailingOver(Session.CLIENT_ACKNOWLEDGE);
+        receivedMessage.acknowledge();
+    }
+
+    public void testPublishTransactedAcknowledgedWhileFailover() throws Exception
+    {
+        publishWhileFailingOver(Session.SESSION_TRANSACTED);
+        _consumerSession.commit();
+    }
+
+    public void testPublishAutoAcknowledgedWithFailoverMutex() throws Exception
+    {
+        publishWithFailoverMutex(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    public void testPublishClientAcknowledgedWithFailoverMutex() throws Exception
+    {
+        publishWithFailoverMutex(Session.CLIENT_ACKNOWLEDGE);
+
+    }
+
+    public void testPublishTransactedAcknowledgedWithFailoverMutex() throws Exception
+    {
+        publishWithFailoverMutex(Session.SESSION_TRANSACTED);
+    }
+
+    public void testClientAcknowledgedSessionCloseWhileFailover() throws Exception
+    {
+        sessionCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    public void testTransactedSessionCloseWhileFailover() throws Exception
+    {
+        sessionCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
+    }
+
+    public void testAutoAcknowledgedSessionCloseWhileFailover() throws Exception
+    {
+        sessionCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    public void testClientAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
+    {
+        browserCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    public void testTransactedQueueBrowserCloseWhileFailover() throws Exception
+    {
+        browserCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
+    }
+
+    public void testAutoAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
+    {
+        browserCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException
+    {
+        setDelayedFailoverPolicy(5);
+        init(autoAcknowledge, true);
+
+        String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
+        Message message = _producerSession.createTextMessage(text);
+
+        failBroker(getFailingPort());
+
+        if(!_failoverStarted.await(5, TimeUnit.SECONDS))
+        {
+            fail("Did not receieve notification failover had started");
+        }
+
+        _producer.send(message);
+
+        if (_producerSession.getTransacted())
+        {
+            _producerSession.commit();
+        }
+
+        Message receivedMessage = _consumer.receive(1000l);
+        assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
+        return receivedMessage;
+    }
+
+    private void publishWithFailoverMutex(int autoAcknowledge) throws JMSException, InterruptedException
+    {
+        setDelayedFailoverPolicy(5);
+        init(autoAcknowledge, true);
+
+        String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
+        Message message = _producerSession.createTextMessage(text);
+
+        AMQConnection connection = (AMQConnection)_connection;
+
+        // holding failover mutex should prevent the failover from
+        // proceeding before we try to send the message
+        synchronized(connection.getFailoverMutex())
+        {
+            failBroker(getFailingPort());
+
+            // wait to make sure that connection is lost
+            while(!connection.isFailingOver())
+            {
+                Thread.sleep(25l);
+            }
+
+            try
+            {
+                _producer.send(message);
+                fail("Sending should fail because connection was lost and failover has not yet completed");
+            }
+            catch(JMSException e)
+            {
+                // JMSException is expected
+            }
+        }
+        // wait for failover completion, thus ensuring it actually
+        //got started, before allowing the test to tear down
+        awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
+     }
     /**
-     * Tests {@link Session#close()} for session with given acknowledge mode 
+     * Tests {@link Session#close()} for session with given acknowledge mode
      * to ensure that close works after failover.
      *
      * @param acknowledgeMode session acknowledge mode
@@ -646,7 +807,7 @@ public class FailoverBehaviourTest exten
         boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED ? true : false;
 
         _consumerSession = _connection.createSession(isTransacted, acknowledgeMode);
-        _destination = _consumerSession.createQueue(getTestQueueName() + "_" + System.currentTimeMillis());
+        _destination = createDestination(_consumerSession);
         _consumer = _consumerSession.createConsumer(_destination);
 
         if (startConnection)
@@ -659,6 +820,11 @@ public class FailoverBehaviourTest exten
 
     }
 
+    protected Destination createDestination(Session session) throws JMSException
+    {
+        return session.createQueue(getTestQueueName() + "_" + System.currentTimeMillis());
+    }
+
     /**
      * Resends messages if reconnected to a non-clustered broker
      *
@@ -854,6 +1020,7 @@ public class FailoverBehaviourTest exten
     @Override
     public boolean preFailover(boolean redirect)
     {
+        _failoverStarted.countDown();
         return true;
     }
 
@@ -875,6 +1042,39 @@ public class FailoverBehaviourTest exten
         _exceptionListenerException = e;
     }
 
+    /**
+     * Causes 1 second delay before reconnect in order to test whether JMS
+     * methods block while failover is in progress
+     */
+    private static class DelayingFailoverPolicy extends FailoverPolicy
+    {
+
+        private CountDownLatch _suspendLatch;
+        private long _delay;
+
+        public DelayingFailoverPolicy(AMQConnection connection, long delay)
+        {
+            super(connection.getConnectionURL(), connection);
+            _suspendLatch = new CountDownLatch(1);
+            _delay = delay;
+        }
+
+        public void attainedConnection()
+        {
+            try
+            {
+                _suspendLatch.await(_delay, TimeUnit.SECONDS);
+            }
+            catch (InterruptedException e)
+            {
+                // continue
+            }
+            super.attainedConnection();
+        }
+
+    }
+
+
     private class FailoverTestMessageListener implements MessageListener
     {
         // message counter
@@ -921,4 +1121,101 @@ public class FailoverBehaviourTest exten
             return _counter.get();
         }
     }
+
+    /**
+     * Tests {@link Session#close()} for session with given acknowledge mode
+     * to ensure that it blocks until failover implementation restores connection.
+     *
+     * @param acknowledgeMode session acknowledge mode
+     * @throws JMSException
+     */
+    private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException
+    {
+        initDelayedFailover(acknowledgeMode);
+
+        // intentionally receive message but not commit or acknowledge it in
+        // case of transacted or CLIENT_ACK session
+        Message receivedMessage = _consumer.receive(1000l);
+        assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
+
+        failBroker(getFailingPort());
+
+        // test whether session#close blocks while failover is in progress
+        _consumerSession.close();
+
+        assertFailoverException();
+    }
+
+    /**
+     * A helper method to instantiate {@link QueueBrowser} and publish test messages on a test queue for further browsing.
+     *
+     * @param acknowledgeMode session acknowledge mode
+     * @return queue browser
+     * @throws JMSException
+     */
+    private QueueBrowser prepareQueueBrowser(int acknowledgeMode) throws JMSException
+    {
+        init(acknowledgeMode, false);
+        _consumer.close();
+        QueueBrowser browser = _consumerSession.createBrowser((Queue) _destination);
+        _connection.start();
+
+        produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
+        if (acknowledgeMode == Session.SESSION_TRANSACTED)
+        {
+            _producerSession.commit();
+        }
+        return browser;
+    }
+
+    /**
+     * Tests {@link QueueBrowser#close()} for session with given acknowledge mode
+     * to ensure that it blocks until failover implementation restores connection.
+     *
+     * @param acknowledgeMode session acknowledge mode
+     * @throws JMSException
+     */
+    private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException
+    {
+        setDelayedFailoverPolicy();
+
+        QueueBrowser browser = prepareQueueBrowser(acknowledgeMode);
+
+        @SuppressWarnings("unchecked")
+        Enumeration<Message> messages = browser.getEnumeration();
+        Message receivedMessage = (Message) messages.nextElement();
+        assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
+
+        failBroker(getFailingPort());
+
+        browser.close();
+
+        assertFailoverException();
+    }
+
+    private DelayingFailoverPolicy initDelayedFailover(int acknowledgeMode) throws JMSException
+    {
+        DelayingFailoverPolicy failoverPolicy = setDelayedFailoverPolicy();
+        init(acknowledgeMode, true);
+        produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
+        if (acknowledgeMode == Session.SESSION_TRANSACTED)
+        {
+            _producerSession.commit();
+        }
+        return failoverPolicy;
+    }
+
+    private DelayingFailoverPolicy setDelayedFailoverPolicy()
+    {
+        return setDelayedFailoverPolicy(2);
+    }
+
+    private DelayingFailoverPolicy setDelayedFailoverPolicy(long delay)
+    {
+        AMQConnection amqConnection = (AMQConnection) _connection;
+        DelayingFailoverPolicy failoverPolicy = new DelayingFailoverPolicy(amqConnection, delay);
+        ((AMQConnection) _connection).setFailoverPolicy(failoverPolicy);
+        return failoverPolicy;
+    }
+    
 }

Modified: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java Fri Oct 28 15:52:13 2011
@@ -122,7 +122,8 @@ public class MultipleTransactedBatchProd
         producer2.start();
 
         //await delivery of the messages
-        boolean result = _receivedLatch.await(75, TimeUnit.SECONDS);
+        int timeout = isBrokerStorePersistent() ? 300 : 75;
+        boolean result = _receivedLatch.await(timeout, TimeUnit.SECONDS);
 
         assertNull("Test failed because: " + String.valueOf(_failMsg), _failMsg);
         assertTrue("Some of the messages were not all recieved in the given timeframe, remaining count was: "+_receivedLatch.getCount(),

Modified: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java Fri Oct 28 15:52:13 2011
@@ -110,18 +110,13 @@ public class QueueDepthWithSelectorTest 
         try
         {
             Connection connection = getConnection();
-            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            AMQSession session = (AMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-            Thread.sleep(2000);
-            long queueDepth = ((AMQSession) session).getQueueDepth((AMQDestination) _queue);
+            long queueDepth = session.getQueueDepth((AMQDestination) _queue);
             assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth);
             
             connection.close();
         }
-        catch (InterruptedException e)
-        {
-            fail(e.getMessage());
-        }
         catch (AMQException e)
         {
             fail(e.getMessage());
@@ -158,6 +153,10 @@ public class QueueDepthWithSelectorTest 
         {
             assertTrue("Message " + msgId + " not received.", msgIdRecevied[msgId]);
         }
+
+        //do a synchronous op to ensure the acks are processed
+        //on the broker before proceeding
+        ((AMQSession)_clientSession).sync();
     }
 
     /**

Modified: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java Fri Oct 28 15:52:13 2011
@@ -94,7 +94,7 @@ public class CancelTest extends QpidBrok
         browser.close();
 
         MessageConsumer consumer = _clientSession.createConsumer(_queue);
-        assertNotNull( consumer.receive() );
+        assertNotNull( consumer.receive(2000l) );
         consumer.close();
     }
 }

Modified: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java Fri Oct 28 15:52:13 2011
@@ -22,237 +22,145 @@
 package org.apache.qpid.test.unit.client.temporaryqueue;
 
 import javax.jms.Connection;
-import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 import javax.jms.TextMessage;
-import junit.framework.Assert;
 
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.jms.ConnectionListener;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.LinkedList;
-
-public class TemporaryQueueTest extends QpidBrokerTestCase implements ExceptionListener
+/**
+ * Tests the behaviour of {@link TemporaryQueue}.
+ */
+public class TemporaryQueueTest extends QpidBrokerTestCase
 {
-    private List<Exception> _exceptions = new ArrayList<Exception>();
-
-    protected void setUp() throws Exception
-    {
-        super.setUp();
-    }
-
-    protected void tearDown() throws Exception
-    {
-       super.tearDown();
-    }
-
-    protected Connection createConnection() throws Exception
-    {
-        return  getConnection("guest", "guest");
-    }
-
-    public void testTemporaryQueue() throws Exception
-    {
-        Connection conn = createConnection();
-        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        TemporaryQueue queue = session.createTemporaryQueue();
+    /**
+     * Tests the basic produce/consume behaviour of a temporary queue.
+     */
+    public void testMessageDeliveryUsingTemporaryQueue() throws Exception
+    {
+        final Connection conn = getConnection();
+        final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final TemporaryQueue queue = session.createTemporaryQueue();
         assertNotNull(queue);
-        MessageProducer producer = session.createProducer(queue);
-        MessageConsumer consumer = session.createConsumer(queue);
+        final MessageProducer producer = session.createProducer(queue);
+        final MessageConsumer consumer = session.createConsumer(queue);
         conn.start();
         producer.send(session.createTextMessage("hello"));
         TextMessage tm = (TextMessage) consumer.receive(2000);
-        assertNotNull(tm);
+        assertNotNull("Message not received", tm);
         assertEquals("hello", tm.getText());
+    }
 
-        try
-        {
-            queue.delete();
-            fail("Expected JMSException : should not be able to delete while there are active consumers");
-        }
-        catch (JMSException je)
-        {
-            ; //pass
-        }
-
-        consumer.close();
+    /**
+     * Tests that a temporary queue cannot be used by another {@link Session}.
+     */
+    public void testUseFromAnotherSessionProhibited() throws Exception
+    {
+        final Connection conn = getConnection();
+        final Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final TemporaryQueue queue = session1.createTemporaryQueue();
+        assertNotNull(queue);
 
         try
         {
-            queue.delete();
+            session2.createConsumer(queue);
+            fail("Expected a JMSException when subscribing to a temporary queue created on a different session");
         }
         catch (JMSException je)
         {
-            fail("Unexpected Exception: " + je.getMessage());
+            //pass
+            assertEquals("Cannot consume from a temporary destination created on another session", je.getMessage());
         }
-
-        conn.close();
-    }
-
-    public void tUniqueness() throws Exception
-    {
-        int numProcs = Runtime.getRuntime().availableProcessors();
-        final int threadsProc = 5;
-
-        runUniqueness(1, 10);
-        runUniqueness(numProcs * threadsProc, 10);
-        runUniqueness(numProcs * threadsProc, 100);
-        runUniqueness(numProcs * threadsProc, 500);
     }
 
-    void runUniqueness(int makers, int queues) throws Exception
-    {
-        Connection connection = createConnection();
-
-        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+    /**
+     * Tests that the client is able to explicitly delete a temporary queue using
+     * {@link TemporaryQueue#delete()} and is prevented from deleting one that
+     * still has consumers.
+     *
+     * Note: Under < 0-10 {@link TemporaryQueue#delete()} only marks the queue as deleted
+     * on the client. 0-10 causes the queue to be deleted from the Broker.
+     */
+    public void testExplictTemporaryQueueDeletion() throws Exception
+    {
+        final Connection conn = getConnection();
+        final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session; // Required to observe the queue binding on the Broker
+        final TemporaryQueue queue = session.createTemporaryQueue();
+        assertNotNull(queue);
+        final MessageConsumer consumer = session.createConsumer(queue);
+        conn.start();
 
-        List<TempQueueMaker> tqList = new LinkedList<TempQueueMaker>();
+        assertTrue("Queue should be bound", amqSession.isQueueBound((AMQDestination)queue));
 
-        //Create Makers
-        for (int m = 0; m < makers; m++)
+        try
         {
-            tqList.add(new TempQueueMaker(session, queues));
+            queue.delete();
+            fail("Expected JMSException : should not be able to delete while there are active consumers");
         }
-
-
-        List<Thread> threadList = new LinkedList<Thread>();
-
-        //Create Makers
-        for (TempQueueMaker maker : tqList)
+        catch (JMSException je)
         {
-            threadList.add(new Thread(maker));
+            //pass
+            assertEquals("Temporary Queue has consumers so cannot be deleted", je.getMessage());
         }
+        consumer.close();
 
-        //Start threads
-        for (Thread thread : threadList)
-        {
-            thread.start();
-        }
+        // Now deletion should succeed.
+        queue.delete();
 
-        // Join Threads
-        for (Thread thread : threadList)
+        try
         {
-            try
-            {
-                thread.join();
-            }
-            catch (InterruptedException e)
-            {
-                fail("Couldn't correctly join threads");
-            }
+            session.createConsumer(queue);
+            fail("Exception not thrown");
         }
-
-
-        List<AMQQueue> list = new LinkedList<AMQQueue>();
-
-        // Test values
-        for (TempQueueMaker maker : tqList)
+        catch (JMSException je)
         {
-            check(maker, list);
+            //pass
+            assertEquals("Cannot consume from a deleted destination", je.getMessage());
         }
 
-        Assert.assertEquals("Not enough queues made.", makers * queues, list.size());
-
-        connection.close();
-    }
-
-    private void check(TempQueueMaker tq, List<AMQQueue> list)
-    {
-        for (AMQQueue q : tq.getList())
+        if (isBroker010())
         {
-            if (list.contains(q))
-            {
-                fail(q + " already exists.");
-            }
-            else
-            {
-                list.add(q);
-            }
+            assertFalse("Queue should no longer be bound", amqSession.isQueueBound((AMQDestination)queue));
         }
     }
 
-
-    class TempQueueMaker implements Runnable
+    /**
+     * Tests that a temporary queue remains available for reuse even after its initial
+     * consumer has disconnected.
+     *
+     *  This test would fail under < 0-10 as their temporary queues are deleted automatically
+     *  (broker side) after the last consumer disconnects (so message2 would be lost). For this
+     *  reason this test is excluded from those profiles.
+     */
+    public void testTemporaryQueueReused() throws Exception
     {
-        List<AMQQueue> _queues;
-        Session _session;
-        private int _count;
-
-
-        TempQueueMaker(Session session, int queues) throws JMSException
-        {
-            _queues = new LinkedList<AMQQueue>();
-
-            _count = queues;
-
-            _session = session;
-        }
-
-        public void run()
-        {
-            int i = 0;
-            try
-            {
-                for (; i < _count; i++)
-                {
-                    _queues.add((AMQQueue) _session.createTemporaryQueue());
-                }
-            }
-            catch (JMSException jmse)
-            {
-                //stop
-            }
-        }
-
-        List<AMQQueue> getList()
-        {
-            return _queues;
-        }
-    }
+        final Connection conn = getConnection();
+        final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final TemporaryQueue queue = session.createTemporaryQueue();
+        assertNotNull(queue);
 
-    public void testQPID1217() throws Exception
-    {
-        Connection conA = getConnection();
-        conA.setExceptionListener(this);
-        Session sessA = conA.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        TemporaryQueue temp = sessA.createTemporaryQueue();
-        
-        MessageProducer prod = sessA.createProducer(temp);
-        prod.send(sessA.createTextMessage("hi"));
-
-        Thread.sleep(500);
-        assertTrue("Exception received", _exceptions.isEmpty());
-        
-        Connection conB = getConnection();
-        Session sessB = conB.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        
-        JMSException ex = null;
-        try
-        {
-            MessageConsumer consB = sessB.createConsumer(temp);
-        } 
-        catch (JMSException e)
-        {
-            ex = e; 
-        }
-        assertNotNull(ex);
-    }
-    
-    public static junit.framework.Test suite()
-    {
-        return new junit.framework.TestSuite(TemporaryQueueTest.class);
-    }
+        final MessageProducer producer1 = session.createProducer(queue);
+        final MessageConsumer consumer1 = session.createConsumer(queue);
+        conn.start();
+        producer1.send(session.createTextMessage("message1"));
+        producer1.send(session.createTextMessage("message2"));
+        TextMessage tm = (TextMessage) consumer1.receive(2000);
+        assertNotNull("Message not received by first consumer", tm);
+        assertEquals("message1", tm.getText());
+        consumer1.close();
 
-    public void onException(JMSException arg0)
-    {
-        _exceptions.add(arg0);
+        final MessageConsumer consumer2 = session.createConsumer(queue);
+        conn.start();
+        tm = (TextMessage) consumer2.receive(2000);
+        assertNotNull("Message not received by second consumer", tm);
+        assertEquals("message2", tm.getText());
+        consumer2.close();
     }
-
 }

Modified: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Fri Oct 28 15:52:13 2011
@@ -262,7 +262,7 @@ public class DurableSubscriptionTest ext
         producer.send(session1.createTextMessage("B"));
 
         _logger.info("Receive message on consumer 1 :expecting B");
-        msg = consumer1.receive(500);
+        msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull("Consumer 1 should get message 'B'.", msg);
         assertEquals("Incorrect Message received on consumer1.", "B", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 1 :expecting null");
@@ -287,13 +287,13 @@ public class DurableSubscriptionTest ext
         else
         {
             _logger.info("Receive message on consumer 3 :expecting B");
-            msg = consumer3.receive(500);
+            msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
             assertNotNull("Consumer 3 should get message 'B'.", msg);
             assertEquals("Incorrect Message received on consumer3.", "B", ((TextMessage) msg).getText());
         }
 
         _logger.info("Receive message on consumer 1 :expecting C");
-        msg = consumer1.receive(500);
+        msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull("Consumer 1 should get message 'C'.", msg);
         assertEquals("Incorrect Message received on consumer1.", "C", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 1 :expecting null");
@@ -301,7 +301,7 @@ public class DurableSubscriptionTest ext
         assertNull("There should be no more messages for consumption on consumer1.", msg);
 
         _logger.info("Receive message on consumer 3 :expecting C");
-        msg = consumer3.receive(500);
+        msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull("Consumer 3 should get message 'C'.", msg);
         assertEquals("Incorrect Message received on consumer3.", "C", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 3 :expecting null");
@@ -358,7 +358,7 @@ public class DurableSubscriptionTest ext
         // Send message and check that both consumers get it and only it.
         producer.send(session0.createTextMessage("A"));
 
-        msg = consumer1.receive(500);
+        msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull("Message should be available", msg);
         assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText());
         msg = consumer1.receive(500);
@@ -729,7 +729,7 @@ public class DurableSubscriptionTest ext
 
         conn.start();
         
-        Message rMsg = subB.receive(1000);
+        Message rMsg = subB.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull(rMsg);
         assertEquals("Content was wrong", 
                      "testResubscribeWithChangedSelectorAndRestart2",
@@ -797,7 +797,7 @@ public class DurableSubscriptionTest ext
         
         conn.start();
         
-        Message rMsg = subTwo.receive(1000);
+        Message rMsg = subTwo.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull(rMsg);
         assertEquals("Content was wrong", 
                      "testResubscribeWithChangedSelectorAndRestart1",

Modified: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Fri Oct 28 15:52:13 2011
@@ -20,38 +20,28 @@
  */
 package org.apache.qpid.test.unit.topic;
 
+import javax.jms.Connection;
 import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
-import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQTopicSessionAdaptor;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 
 /** @author Apache Software Foundation */
 public class TopicSessionTest extends QpidBrokerTestCase
 {
-
-    protected void setUp() throws Exception
-    {
-        super.setUp();
-    }
-
-    protected void tearDown() throws Exception
-    {
-        super.tearDown();
-    }
-
-
     public void testTopicSubscriptionUnsubscription() throws Exception
     {
 
@@ -228,83 +218,6 @@ public class TopicSessionTest extends Qp
         con.close();
     }
 
-    public void testSendingSameMessage() throws Exception
-    {
-        AMQConnection conn = (AMQConnection) getConnection("guest", "guest");
-        TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
-        TemporaryTopic topic = session.createTemporaryTopic();
-        assertNotNull(topic);
-        TopicPublisher producer = session.createPublisher(topic);
-        MessageConsumer consumer = session.createConsumer(topic);
-        conn.start();
-        TextMessage sentMessage = session.createTextMessage("Test Message");
-        producer.send(sentMessage);
-        session.commit();
-        TextMessage receivedMessage = (TextMessage) consumer.receive(2000);
-        assertNotNull(receivedMessage);
-        assertEquals(sentMessage.getText(), receivedMessage.getText());
-        producer.send(sentMessage);
-        session.commit();
-        receivedMessage = (TextMessage) consumer.receive(2000);
-        assertNotNull(receivedMessage);
-        assertEquals(sentMessage.getText(), receivedMessage.getText());
-        session.commit();
-        conn.close();
-
-    }
-
-    public void testTemporaryTopic() throws Exception
-    {
-        AMQConnection conn = (AMQConnection) getConnection("guest", "guest");
-        TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
-        TemporaryTopic topic = session.createTemporaryTopic();
-        assertNotNull(topic);
-        TopicPublisher producer = session.createPublisher(topic);
-        MessageConsumer consumer = session.createConsumer(topic);
-        conn.start();
-        producer.send(session.createTextMessage("hello"));
-        session.commit();
-        TextMessage tm = (TextMessage) consumer.receive(2000);
-        assertNotNull(tm);
-        assertEquals("hello", tm.getText());
-        session.commit();
-        try
-        {
-            topic.delete();
-            fail("Expected JMSException : should not be able to delete while there are active consumers");
-        }
-        catch (JMSException je)
-        {
-            ; //pass
-        }
-
-        consumer.close();
-
-        try
-        {
-            topic.delete();
-        }
-        catch (JMSException je)
-        {
-            fail("Unexpected Exception: " + je.getMessage());
-        }
-
-        TopicSession session2 = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-        try
-        {
-            session2.createConsumer(topic);
-            fail("Expected a JMSException when subscribing to a temporary topic created on adifferent session");
-        }
-        catch (JMSException je)
-        {
-            ; // pass
-        }
-
-
-        conn.close();
-    }
-
-
     public void testNoLocal() throws Exception
     {
 
@@ -398,56 +311,39 @@ public class TopicSessionTest extends Qp
     }
 
     /**
-     * This tests QPID-1191, where messages which are sent to a topic but are not consumed by a subscriber
-     * due to a selector can be leaked.
-     * @throws Exception
+     * This tests was added to demonstrate QPID-3542.  The Java Client when used with the CPP Broker was failing to
+     * ack messages received that did not match the selector.  This meant the messages remained indefinitely on the Broker.
      */
-    public void testNonMatchingMessagesDoNotFillQueue() throws Exception
+    public void testNonMatchingMessagesHandledCorrectly() throws Exception
     {
-        AMQConnection con = (AMQConnection) getConnection("guest", "guest");
-
-        // Setup Topic
-        AMQTopic topic = new AMQTopic(con, "testNoLocal");
-
-        TopicSession session = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
+        final String topicName = getName();
+        final String clientId = "clientId" + topicName;
+        final Connection con1 = getConnection();
+        final Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Topic topic1 = session1.createTopic(topicName);
+        final AMQQueue internalNameOnBroker = new AMQQueue("amq.topic", "clientid" + ":" + clientId);
 
         // Setup subscriber with selector
-        TopicSubscriber selector = session.createSubscriber(topic,  "Selector = 'select'", false);
-        TopicPublisher publisher = session.createPublisher(topic);
+        final TopicSubscriber subscriberWithSelector = session1.createDurableSubscriber(topic1, clientId, "Selector = 'select'", false);
+        final MessageProducer publisher = session1.createProducer(topic1);
 
-        con.start();
-        TextMessage m;
-        TextMessage message;
+        con1.start();
 
         // Send non-matching message
-        message = session.createTextMessage("non-matching 1");
-        publisher.publish(message);
-        session.commit();
-
-        // Send and consume matching message
-        message = session.createTextMessage("hello");
-        message.setStringProperty("Selector", "select");
-
-        publisher.publish(message);
-        session.commit();
-
-        m = (TextMessage) selector.receive(1000);
-        assertNotNull("should have received message", m);
-        assertEquals("Message contents were wrong", "hello", m.getText());
-
-        // Send non-matching message
-        message = session.createTextMessage("non-matching 2");
-        publisher.publish(message);
-        session.commit();
-
-        // Assert queue count is 0
-        long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic);
-        assertEquals("Queue depth was wrong", 0, depth);
-
-    }
-
-    public static junit.framework.Test suite()
-    {
-        return new junit.framework.TestSuite(TopicSessionTest.class);
+        final Message sentMessage = session1.createTextMessage("hello");
+        sentMessage.setStringProperty("Selector", "nonMatch");
+        publisher.send(sentMessage);
+
+        // Try to consume non-message, expect this to fail.
+        final Message message1 = subscriberWithSelector.receive(1000);
+        assertNull("should not have received message", message1);
+        subscriberWithSelector.close();
+
+        session1.close();
+
+        // Now verify queue depth on broker.
+        final Session session2 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final long depth = ((AMQSession) session2).getQueueDepth(internalNameOnBroker);
+        assertEquals("Expected queue depth of zero", 0, depth);
     }
 }

Propchange: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -3,5 +3,5 @@
 /qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java:805429-821809
 /qpid/branches/qpid-2935/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java:1061302-1072333
-/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java:1144319-1179750
+/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java:1144319-1190375
 /qpid/trunk/qpid/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java:796646-796653

Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -1,3 +1,3 @@
 /qpid/branches/0.5.x-dev/qpid/java/test-profiles:931179
 /qpid/branches/qpid-2935/qpid/java/test-profiles:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles:1144319-1190375

Modified: qpid/branches/qpid-3346/qpid/java/test-profiles/CPPExcludes
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/test-profiles/CPPExcludes?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/test-profiles/CPPExcludes (original)
+++ qpid/branches/qpid-3346/qpid/java/test-profiles/CPPExcludes Fri Oct 28 15:52:13 2011
@@ -64,9 +64,6 @@ org.apache.qpid.test.unit.client.connect
 // 0-10 c++ broker in cpp.testprofile is started with no auth so won't pass this test
 org.apache.qpid.test.unit.client.connection.ConnectionTest#testPasswordFailureConnection
 
-// c++ broker doesn't do selectors, so this will fail
-org.apache.qpid.test.unit.topic.TopicSessionTest#testNonMatchingMessagesDoNotFillQueue
-
 // InVM Broker tests
 org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#*
 

Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/CPPExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/010Excludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes:805429-821809
 /qpid/branches/qpid-2935/qpid/java/test-profiles/CPPExcludes:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/CPPExcludes:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/CPPExcludes:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/Excludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/Excludes:805429-821809
 /qpid/branches/qpid-2935/qpid/java/test-profiles/Excludes:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/Excludes:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/Excludes:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/JavaBDBExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,3 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/08TransientExcludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/08TransientExcludes:805429-821809
 /qpid/branches/qpid-2935/qpid/java/test-profiles/JavaTransientExcludes:1061302-1072333
+/qpid/trunk/qpid/java/test-profiles/JavaBDBExcludes:1175236-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/JavaExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/08Excludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/08Excludes:805429-821809
 /qpid/branches/qpid-2935/qpid/java/test-profiles/JavaExcludes:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/JavaExcludes:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/JavaExcludes:1144319-1190375

Modified: qpid/branches/qpid-3346/qpid/java/test-profiles/JavaPre010Excludes
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/test-profiles/JavaPre010Excludes?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/test-profiles/JavaPre010Excludes (original)
+++ qpid/branches/qpid-3346/qpid/java/test-profiles/JavaPre010Excludes Fri Oct 28 15:52:13 2011
@@ -28,6 +28,7 @@ org.apache.qpid.test.client.message.JMSD
 org.apache.qpid.test.client.message.JMSDestinationTest#testGetDestinationWithCustomExchange
 
 // The new addressing based syntax is not supported for AMQP 0-8/0-9 versions
+org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#*
 org.apache.qpid.test.client.destination.AddressBasedDestinationTest#*
 org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy
 org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy
@@ -43,20 +44,14 @@ org.apache.qpid.test.unit.client.connect
 // uses AMQP 0-10 related properties
 org.apache.qpid.test.unit.message.JMSPropertiesTest#testQpidExtensionProperties
 
-// QPID-3034: tests are passing on 0.10 profiles but failing on 0.9.1 profiles
-org.apache.qpid.test.unit.ack.RecoverTest#testRecoverResendsMsgs
-org.apache.qpid.test.unit.ack.RecoverTest#testRecoverResendsMsgsAckOnEarlier
-org.apache.qpid.test.unit.ack.RecoverTest#testAcknowledgePerConsumer
-org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testRecoverResendsMsgs
-org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testRecoverResendsMsgsAckOnEarlier
-org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testAcknowledgePerConsumer
-org.apache.qpid.test.unit.ack.RecoverTest#testOderingWithSyncConsumer
-org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testOderingWithSyncConsumer
-
-
 // LVQ tests use new address syntax and can not be run on 0.9.1 profiles
 org.apache.qpid.test.client.queue.LVQTest#*
 
 // Verification of unique client id is 0-10 specific
 org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerificationForSameUser
 org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerificationForDifferentUsers
+
+// Under AMQP 0-8..0-9-1 temporary queues are deleted on consumer close, rather than connection close
+// and for this reason this test would fail.
+org.apache.qpid.test.unit.client.temporaryqueue.TemporaryQueueTest#testTemporaryQueueReused
+

Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/JavaPre010Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/08StandaloneExcludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/08StandaloneExcludes:805429-821809
 /qpid/branches/qpid-2935/qpid/java/test-profiles/08StandaloneExcludes:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/JavaTransientExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/08TransientExcludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/08TransientExcludes:805429-821809
 /qpid/branches/qpid-2935/qpid/java/test-profiles/JavaTransientExcludes:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/XAExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/XAExcludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/XAExcludes:805429-821809
 /qpid/branches/qpid-2935/qpid/java/test-profiles/XAExcludes:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/XAExcludes:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/XAExcludes:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/cpp.async.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.async.testprofile:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.async.testprofile:805429-821809
 /qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.async.testprofile:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/cpp.cluster.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.cluster.testprofile:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.cluster.testprofile:805429-821809
 /qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.cluster.testprofile:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/cpp.cluster.testprofile:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/cpp.cluster.testprofile:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/cpp.noprefetch.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.noprefetch.testprofile:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.noprefetch.testprofile:805429-821809
 /qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.noprefetch.testprofile:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/cpp.ssl.excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.ssl.excludes:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.ssl.excludes:805429-821809
 /qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.ssl.excludes:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/cpp.ssl.excludes:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/cpp.ssl.excludes:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/cpp.ssl.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.ssl.testprofile:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.ssl.testprofile:805429-821809
 /qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.ssl.testprofile:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/cpp.ssl.testprofile:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/cpp.ssl.testprofile:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/cpp.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.testprofile:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.testprofile:805429-821809
 /qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.testprofile:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:1144319-1190375



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org