You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/10/28 17:52:18 UTC
svn commit: r1190406 [4/5] - in /qpid/branches/qpid-3346/qpid: ./ bin/
cpp/design_docs/ cpp/docs/api/ cpp/docs/man/
cpp/examples/old_api/tradedemo/ cpp/include/qmf/engine/ cpp/src/
cpp/src/posix/ cpp/src/qmf/engine/ cpp/src/qpid/ cpp/src/qpid/acl/ cpp/...
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Fri Oct 28 15:52:13 2011
@@ -50,6 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Session
@@ -125,6 +126,8 @@ public class Session extends SessionInvo
private SessionDetachCode detachCode;
private final Object stateLock = new Object();
+ private final AtomicBoolean _failoverRequired = new AtomicBoolean(false);
+
protected Session(Connection connection, Binary name, long expiry)
{
this(connection, new SessionDelegate(), name, expiry);
@@ -257,6 +260,7 @@ public class Session extends SessionInvo
void resume()
{
+ _failoverRequired.set(false);
synchronized (commands)
{
attach();
@@ -459,7 +463,7 @@ public class Session extends SessionInvo
synchronized (commands)
{
- if (state == DETACHED || state == CLOSING)
+ if (state == DETACHED || state == CLOSING || state == CLOSED)
{
return;
}
@@ -583,30 +587,25 @@ public class Session extends SessionInvo
synchronized (commands)
{
- //allow the txSelect operation to be invoked during resume
- boolean skipWait = m instanceof TxSelect && state == RESUMING;
-
- if(!skipWait)
+ if (state == DETACHED && m.isUnreliable())
{
- if (state == DETACHED && m.isUnreliable())
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
{
- Thread current = Thread.currentThread();
- if (!current.equals(resumer))
- {
- return;
- }
+ return;
}
+ }
- if (state != OPEN && state != CLOSED && state != CLOSING)
+ if (state != OPEN && state != CLOSED && state != CLOSING)
+ {
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer) )
{
- Thread current = Thread.currentThread();
- if (!current.equals(resumer))
+ Waiter w = new Waiter(commands, timeout);
+ while (w.hasTime() && (state != OPEN && state != CLOSED))
{
- Waiter w = new Waiter(commands, timeout);
- while (w.hasTime() && (state != OPEN && state != CLOSED))
- {
- w.await();
- }
+ checkFailoverRequired("Command was interrupted because of failover, before being sent");
+ w.await();
}
}
}
@@ -674,6 +673,7 @@ public class Session extends SessionInvo
}
}
}
+ checkFailoverRequired("Command was interrupted because of failover, before being sent");
w.await();
}
}
@@ -768,6 +768,14 @@ public class Session extends SessionInvo
}
}
+ private void checkFailoverRequired(String message)
+ {
+ if (_failoverRequired.get())
+ {
+ throw new SessionException(message);
+ }
+ }
+
protected boolean shouldIssueFlush(int next)
{
return (next % 65536) == 0;
@@ -793,6 +801,7 @@ public class Session extends SessionInvo
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
{
+ checkFailoverRequired("Session sync was interrupted by failover.");
log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands);
w.await();
}
@@ -853,13 +862,6 @@ public class Session extends SessionInvo
}
}
- private ConnectionClose close = null;
-
- void closeCode(ConnectionClose close)
- {
- this.close = close;
- }
-
ExecutionException getException()
{
synchronized (results)
@@ -910,6 +912,7 @@ public class Session extends SessionInvo
Waiter w = new Waiter(this, timeout);
while (w.hasTime() && state != CLOSED && !isDone())
{
+ checkFailoverRequired("Operation was interrupted by failover.");
log.debug("%s waiting for result: %s", Session.this, this);
w.await();
}
@@ -921,7 +924,12 @@ public class Session extends SessionInvo
}
else if (state == CLOSED)
{
- throw new SessionException(getException());
+ ExecutionException ex = getException();
+ if(ex == null)
+ {
+ throw new SessionClosedException();
+ }
+ throw new SessionException(ex);
}
else
{
@@ -1001,6 +1009,7 @@ public class Session extends SessionInvo
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && state != CLOSED)
{
+ checkFailoverRequired("close() was interrupted by failover.");
w.await();
}
@@ -1095,6 +1104,7 @@ public class Session extends SessionInvo
Waiter w = new Waiter(stateLock, timeout);
while (w.hasTime() && state == NEW)
{
+ checkFailoverRequired("Session opening was interrupted by failover.");
w.await();
}
}
@@ -1117,4 +1127,26 @@ public class Session extends SessionInvo
{
return stateLock;
}
+
+ protected void notifyFailoverRequired()
+ {
+ //ensure any operations waiting are aborted to
+ //prevent them waiting for timeout for 60 seconds
+ //and possibly preventing failover proceeding
+ _failoverRequired.set(true);
+ synchronized (commands)
+ {
+ commands.notifyAll();
+ }
+ synchronized (results)
+ {
+ for (ResultFuture<?> result : results.values())
+ {
+ synchronized(result)
+ {
+ result.notifyAll();
+ }
+ }
+ }
+ }
}
Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/management/ConfigurationManagement.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/management/LoggingManagement.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747869,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -3,4 +3,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -3,4 +3,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -3,4 +3,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanAttribute.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -3,4 +3,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanConstructor.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -3,4 +3,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanDescription.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -3,4 +3,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperation.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -3,4 +3,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:1061302-1072333
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanOperationParameter.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:1144319-1179750
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/management/eclipse-plugin/src/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/management/eclipse-plugin/src:788755
/qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src:1144319-1179750
+/qpid/trunk/qpid/java/management/eclipse-plugin/src:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:1144319-1179750
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:1144319-1179750
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:1144319-1179750
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:1144319-1179750
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:805429-821809
/qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:1061302-1072333
-/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:1144319-1179750
+/qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc:1144319-1190375
Modified: qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-firewall-2.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-firewall-2.xml?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-firewall-2.xml (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-firewall-2.xml Fri Oct 28 15:52:13 2011
@@ -32,8 +32,8 @@
<ssl>
<enabled>false</enabled>
<sslOnly>false</sslOnly>
- <keystorePath>/path/to/keystore.ks</keystorePath>
- <keystorePassword>keystorepass</keystorePassword>
+ <keyStorePath>/path/to/keystore.ks</keyStorePath>
+ <keyStorePassword>keystorepass</keyStorePassword>
</ssl>
<port>5672</port>
<sslport>8672</sslport>
Modified: qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-firewall-3.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-firewall-3.xml?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-firewall-3.xml (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-firewall-3.xml Fri Oct 28 15:52:13 2011
@@ -28,12 +28,12 @@
<connector>
<!-- To enable SSL edit the keystorePath and keystorePassword
and set enabled to true.
- To disasble Non-SSL port set sslOnly to true -->
+ To disable Non-SSL port set sslOnly to true -->
<ssl>
<enabled>false</enabled>
<sslOnly>false</sslOnly>
- <keystorePath>/path/to/keystore.ks</keystorePath>
- <keystorePassword>keystorepass</keystorePassword>
+ <keyStorePath>/path/to/keystore.ks</keyStorePath>
+ <keyStorePassword>keystorepass</keyStorePassword>
</ssl>
<port>5672</port>
<sslport>8672</sslport>
Modified: qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-settings.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-settings.xml?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-settings.xml (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/etc/config-systests-settings.xml Fri Oct 28 15:52:13 2011
@@ -25,8 +25,8 @@
<port>15671</port>
<enabled>false</enabled>
<sslOnly>false</sslOnly>
- <keystorePath>${QPID_HOME}/../test-profiles/test_resources/ssl/java_broker_keystore.jks</keystorePath>
- <keystorePassword>password</keystorePassword>
+ <keyStorePath>${QPID_HOME}/../test-profiles/test_resources/ssl/java_broker_keystore.jks</keyStorePath>
+ <keyStorePassword>password</keyStorePassword>
</ssl>
</connector>
<management>
Modified: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java Fri Oct 28 15:52:13 2011
@@ -35,13 +35,11 @@ import org.slf4j.LoggerFactory;
public class AMQQueueDeferredOrderingTest extends QpidBrokerTestCase
{
-
- private static final int NUM_MESSAGES = 1000;
-
private Connection con;
private Session session;
private AMQQueue queue;
private MessageConsumer consumer;
+ private int numMessages;
private static final Logger _logger = LoggerFactory.getLogger(AMQQueueDeferredOrderingTest.class);
@@ -87,6 +85,8 @@ public class AMQQueueDeferredOrderingTes
{
super.setUp();
+ numMessages = isBrokerStorePersistent() ? 300 : 1000;
+
_logger.info("Create Connection");
con = getConnection();
_logger.info("Create Session");
@@ -105,19 +105,19 @@ public class AMQQueueDeferredOrderingTes
// Setup initial messages
_logger.info("Creating first producer thread");
- producerThread = new ASyncProducer(queue, 0, NUM_MESSAGES / 2);
+ producerThread = new ASyncProducer(queue, 0, numMessages / 2);
producerThread.start();
// Wait for them to be done
producerThread.join();
// Setup second set of messages to produce while we consume
_logger.info("Creating second producer thread");
- producerThread = new ASyncProducer(queue, NUM_MESSAGES / 2, NUM_MESSAGES);
+ producerThread = new ASyncProducer(queue, numMessages / 2, numMessages);
producerThread.start();
// Start consuming and checking they're in order
_logger.info("Consuming messages");
- for (int i = 0; i < NUM_MESSAGES; i++)
+ for (int i = 0; i < numMessages; i++)
{
Message msg = consumer.receive(3000);
assertNotNull("Message should not be null", msg);
Modified: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java Fri Oct 28 15:52:13 2011
@@ -135,7 +135,7 @@ public class ResetMessageListenerTest ex
try
{
assertTrue("Did not receive all first batch of messages",
- _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS));
+ _allFirstMessagesSent.await(MSG_COUNT, TimeUnit.SECONDS));
_logger.info("Received first batch of messages");
}
catch (InterruptedException e)
@@ -212,7 +212,7 @@ public class ResetMessageListenerTest ex
try
{
- assertTrue(_allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS));
+ assertTrue(_allSecondMessagesSent.await(MSG_COUNT, TimeUnit.SECONDS));
}
catch (InterruptedException e)
{
Modified: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java Fri Oct 28 15:52:13 2011
@@ -20,6 +20,7 @@ package org.apache.qpid.client.failover;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -33,12 +34,15 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.test.utils.FailoverBaseCase;
/**
@@ -96,6 +100,11 @@ public class FailoverBehaviourTest exten
*/
private JMSException _exceptionListenerException;
+ /**
+ * Latch to check that the failover mutex is held by a failover thread
+ */
+ private CountDownLatch _failoverStarted;
+
@Override
protected void setUp() throws Exception
{
@@ -105,6 +114,7 @@ public class FailoverBehaviourTest exten
_connection.setExceptionListener(this);
((AMQConnection) _connection).setConnectionListener(this);
_failoverComplete = new CountDownLatch(1);
+ _failoverStarted = new CountDownLatch(1);
}
/**
@@ -463,6 +473,31 @@ public class FailoverBehaviourTest exten
}
/**
+ * Test that calling acknowledge before failover leaves the session
+ * clean for use after failover.
+ */
+ public void testAcknowledgeBeforeFailover() throws Exception
+ {
+ init(Session.CLIENT_ACKNOWLEDGE, true);
+
+ produceMessages();
+
+ // consume messages and acknowledge them
+ Message lastMessage = consumeMessages();
+ lastMessage.acknowledge();
+
+ causeFailure();
+
+ assertFailoverException();
+
+ produceMessages();
+
+ // tests whether receiving and acknowledgment are working after recovery
+ lastMessage = consumeMessages();
+ lastMessage.acknowledge();
+ }
+
+ /**
* Test that receiving of messages after failover prior to calling
* {@link Message#acknowledge()} still results in acknowledge throwing an exception.
*/
@@ -600,8 +635,134 @@ public class FailoverBehaviourTest exten
sessionCloseAfterFailoverImpl(Session.AUTO_ACKNOWLEDGE);
}
+ public void testPublishAutoAcknowledgedWhileFailover() throws Exception
+ {
+ publishWhileFailingOver(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testPublishClientAcknowledgedWhileFailover() throws Exception
+ {
+ Message receivedMessage = publishWhileFailingOver(Session.CLIENT_ACKNOWLEDGE);
+ receivedMessage.acknowledge();
+ }
+
+ public void testPublishTransactedAcknowledgedWhileFailover() throws Exception
+ {
+ publishWhileFailingOver(Session.SESSION_TRANSACTED);
+ _consumerSession.commit();
+ }
+
+ public void testPublishAutoAcknowledgedWithFailoverMutex() throws Exception
+ {
+ publishWithFailoverMutex(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testPublishClientAcknowledgedWithFailoverMutex() throws Exception
+ {
+ publishWithFailoverMutex(Session.CLIENT_ACKNOWLEDGE);
+
+ }
+
+ public void testPublishTransactedAcknowledgedWithFailoverMutex() throws Exception
+ {
+ publishWithFailoverMutex(Session.SESSION_TRANSACTED);
+ }
+
+ public void testClientAcknowledgedSessionCloseWhileFailover() throws Exception
+ {
+ sessionCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void testTransactedSessionCloseWhileFailover() throws Exception
+ {
+ sessionCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
+ }
+
+ public void testAutoAcknowledgedSessionCloseWhileFailover() throws Exception
+ {
+ sessionCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testClientAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
+ {
+ browserCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void testTransactedQueueBrowserCloseWhileFailover() throws Exception
+ {
+ browserCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
+ }
+
+ public void testAutoAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
+ {
+ browserCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException
+ {
+ setDelayedFailoverPolicy(5);
+ init(autoAcknowledge, true);
+
+ String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
+ Message message = _producerSession.createTextMessage(text);
+
+ failBroker(getFailingPort());
+
+ if(!_failoverStarted.await(5, TimeUnit.SECONDS))
+ {
+ fail("Did not receieve notification failover had started");
+ }
+
+ _producer.send(message);
+
+ if (_producerSession.getTransacted())
+ {
+ _producerSession.commit();
+ }
+
+ Message receivedMessage = _consumer.receive(1000l);
+ assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
+ return receivedMessage;
+ }
+
+ private void publishWithFailoverMutex(int autoAcknowledge) throws JMSException, InterruptedException
+ {
+ setDelayedFailoverPolicy(5);
+ init(autoAcknowledge, true);
+
+ String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
+ Message message = _producerSession.createTextMessage(text);
+
+ AMQConnection connection = (AMQConnection)_connection;
+
+ // holding failover mutex should prevent the failover from
+ // proceeding before we try to send the message
+ synchronized(connection.getFailoverMutex())
+ {
+ failBroker(getFailingPort());
+
+ // wait to make sure that connection is lost
+ while(!connection.isFailingOver())
+ {
+ Thread.sleep(25l);
+ }
+
+ try
+ {
+ _producer.send(message);
+ fail("Sending should fail because connection was lost and failover has not yet completed");
+ }
+ catch(JMSException e)
+ {
+ // JMSException is expected
+ }
+ }
+ // wait for failover completion, thus ensuring it actually
+ //got started, before allowing the test to tear down
+ awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
+ }
/**
- * Tests {@link Session#close()} for session with given acknowledge mode
+ * Tests {@link Session#close()} for session with given acknowledge mode
* to ensure that close works after failover.
*
* @param acknowledgeMode session acknowledge mode
@@ -646,7 +807,7 @@ public class FailoverBehaviourTest exten
boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED ? true : false;
_consumerSession = _connection.createSession(isTransacted, acknowledgeMode);
- _destination = _consumerSession.createQueue(getTestQueueName() + "_" + System.currentTimeMillis());
+ _destination = createDestination(_consumerSession);
_consumer = _consumerSession.createConsumer(_destination);
if (startConnection)
@@ -659,6 +820,11 @@ public class FailoverBehaviourTest exten
}
+ protected Destination createDestination(Session session) throws JMSException
+ {
+ return session.createQueue(getTestQueueName() + "_" + System.currentTimeMillis());
+ }
+
/**
* Resends messages if reconnected to a non-clustered broker
*
@@ -854,6 +1020,7 @@ public class FailoverBehaviourTest exten
@Override
public boolean preFailover(boolean redirect)
{
+ _failoverStarted.countDown();
return true;
}
@@ -875,6 +1042,39 @@ public class FailoverBehaviourTest exten
_exceptionListenerException = e;
}
+ /**
+ * Delays reconnection by a configurable number of seconds in order to test whether JMS
+ * methods block while failover is in progress
+ */
+ private static class DelayingFailoverPolicy extends FailoverPolicy
+ {
+
+ private CountDownLatch _suspendLatch;
+ private long _delay;
+
+ public DelayingFailoverPolicy(AMQConnection connection, long delay)
+ {
+ super(connection.getConnectionURL(), connection);
+ _suspendLatch = new CountDownLatch(1);
+ _delay = delay;
+ }
+
+ public void attainedConnection()
+ {
+ try
+ {
+ _suspendLatch.await(_delay, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ // continue
+ }
+ super.attainedConnection();
+ }
+
+ }
+
+
private class FailoverTestMessageListener implements MessageListener
{
// message counter
@@ -921,4 +1121,101 @@ public class FailoverBehaviourTest exten
return _counter.get();
}
}
+
+ /**
+ * Tests {@link Session#close()} for session with given acknowledge mode
+ * to ensure that it blocks until failover implementation restores connection.
+ *
+ * @param acknowledgeMode session acknowledge mode
+ * @throws JMSException
+ */
+ private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException
+ {
+ initDelayedFailover(acknowledgeMode);
+
+ // intentionally receive message but not commit or acknowledge it in
+ // case of transacted or CLIENT_ACK session
+ Message receivedMessage = _consumer.receive(1000l);
+ assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
+
+ failBroker(getFailingPort());
+
+ // test whether session#close blocks while failover is in progress
+ _consumerSession.close();
+
+ assertFailoverException();
+ }
+
+ /**
+ * A helper method to instantiate {@link QueueBrowser} and publish test messages on a test queue for further browsing.
+ *
+ * @param acknowledgeMode session acknowledge mode
+ * @return queue browser
+ * @throws JMSException
+ */
+ private QueueBrowser prepareQueueBrowser(int acknowledgeMode) throws JMSException
+ {
+ init(acknowledgeMode, false);
+ _consumer.close();
+ QueueBrowser browser = _consumerSession.createBrowser((Queue) _destination);
+ _connection.start();
+
+ produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
+ if (acknowledgeMode == Session.SESSION_TRANSACTED)
+ {
+ _producerSession.commit();
+ }
+ return browser;
+ }
+
+ /**
+ * Tests {@link QueueBrowser#close()} for session with given acknowledge mode
+ * to ensure that it blocks until failover implementation restores connection.
+ *
+ * @param acknowledgeMode session acknowledge mode
+ * @throws JMSException
+ */
+ private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException
+ {
+ setDelayedFailoverPolicy();
+
+ QueueBrowser browser = prepareQueueBrowser(acknowledgeMode);
+
+ @SuppressWarnings("unchecked")
+ Enumeration<Message> messages = browser.getEnumeration();
+ Message receivedMessage = (Message) messages.nextElement();
+ assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
+
+ failBroker(getFailingPort());
+
+ browser.close();
+
+ assertFailoverException();
+ }
+
+ private DelayingFailoverPolicy initDelayedFailover(int acknowledgeMode) throws JMSException
+ {
+ DelayingFailoverPolicy failoverPolicy = setDelayedFailoverPolicy();
+ init(acknowledgeMode, true);
+ produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
+ if (acknowledgeMode == Session.SESSION_TRANSACTED)
+ {
+ _producerSession.commit();
+ }
+ return failoverPolicy;
+ }
+
+ private DelayingFailoverPolicy setDelayedFailoverPolicy()
+ {
+ return setDelayedFailoverPolicy(2);
+ }
+
+ private DelayingFailoverPolicy setDelayedFailoverPolicy(long delay)
+ {
+ AMQConnection amqConnection = (AMQConnection) _connection;
+ DelayingFailoverPolicy failoverPolicy = new DelayingFailoverPolicy(amqConnection, delay);
+ ((AMQConnection) _connection).setFailoverPolicy(failoverPolicy);
+ return failoverPolicy;
+ }
+
}
Modified: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MultipleTransactedBatchProducerTest.java Fri Oct 28 15:52:13 2011
@@ -122,7 +122,8 @@ public class MultipleTransactedBatchProd
producer2.start();
//await delivery of the messages
- boolean result = _receivedLatch.await(75, TimeUnit.SECONDS);
+ int timeout = isBrokerStorePersistent() ? 300 : 75;
+ boolean result = _receivedLatch.await(timeout, TimeUnit.SECONDS);
assertNull("Test failed because: " + String.valueOf(_failMsg), _failMsg);
assertTrue("Some of the messages were not all recieved in the given timeframe, remaining count was: "+_receivedLatch.getCount(),
Modified: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java Fri Oct 28 15:52:13 2011
@@ -110,18 +110,13 @@ public class QueueDepthWithSelectorTest
try
{
Connection connection = getConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQSession session = (AMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Thread.sleep(2000);
- long queueDepth = ((AMQSession) session).getQueueDepth((AMQDestination) _queue);
+ long queueDepth = session.getQueueDepth((AMQDestination) _queue);
assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth);
connection.close();
}
- catch (InterruptedException e)
- {
- fail(e.getMessage());
- }
catch (AMQException e)
{
fail(e.getMessage());
@@ -158,6 +153,10 @@ public class QueueDepthWithSelectorTest
{
assertTrue("Message " + msgId + " not received.", msgIdRecevied[msgId]);
}
+
+ //do a synchronous op to ensure the acks are processed
+ //on the broker before proceeding
+ ((AMQSession)_clientSession).sync();
}
/**
Modified: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java Fri Oct 28 15:52:13 2011
@@ -94,7 +94,7 @@ public class CancelTest extends QpidBrok
browser.close();
MessageConsumer consumer = _clientSession.createConsumer(_queue);
- assertNotNull( consumer.receive() );
+ assertNotNull( consumer.receive(2000l) );
consumer.close();
}
}
Modified: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java Fri Oct 28 15:52:13 2011
@@ -22,237 +22,145 @@
package org.apache.qpid.test.unit.client.temporaryqueue;
import javax.jms.Connection;
-import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
-import junit.framework.Assert;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.jms.ConnectionListener;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.LinkedList;
-
-public class TemporaryQueueTest extends QpidBrokerTestCase implements ExceptionListener
+/**
+ * Tests the behaviour of {@link TemporaryQueue}.
+ */
+public class TemporaryQueueTest extends QpidBrokerTestCase
{
- private List<Exception> _exceptions = new ArrayList<Exception>();
-
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- protected Connection createConnection() throws Exception
- {
- return getConnection("guest", "guest");
- }
-
- public void testTemporaryQueue() throws Exception
- {
- Connection conn = createConnection();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- TemporaryQueue queue = session.createTemporaryQueue();
+ /**
+ * Tests the basic produce/consume behaviour of a temporary queue.
+ */
+ public void testMessageDeliveryUsingTemporaryQueue() throws Exception
+ {
+ final Connection conn = getConnection();
+ final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final TemporaryQueue queue = session.createTemporaryQueue();
assertNotNull(queue);
- MessageProducer producer = session.createProducer(queue);
- MessageConsumer consumer = session.createConsumer(queue);
+ final MessageProducer producer = session.createProducer(queue);
+ final MessageConsumer consumer = session.createConsumer(queue);
conn.start();
producer.send(session.createTextMessage("hello"));
TextMessage tm = (TextMessage) consumer.receive(2000);
- assertNotNull(tm);
+ assertNotNull("Message not received", tm);
assertEquals("hello", tm.getText());
+ }
- try
- {
- queue.delete();
- fail("Expected JMSException : should not be able to delete while there are active consumers");
- }
- catch (JMSException je)
- {
- ; //pass
- }
-
- consumer.close();
+ /**
+ * Tests that a temporary queue cannot be used by another {@link Session}.
+ */
+ public void testUseFromAnotherSessionProhibited() throws Exception
+ {
+ final Connection conn = getConnection();
+ final Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final TemporaryQueue queue = session1.createTemporaryQueue();
+ assertNotNull(queue);
try
{
- queue.delete();
+ session2.createConsumer(queue);
+ fail("Expected a JMSException when subscribing to a temporary queue created on a different session");
}
catch (JMSException je)
{
- fail("Unexpected Exception: " + je.getMessage());
+ //pass
+ assertEquals("Cannot consume from a temporary destination created on another session", je.getMessage());
}
-
- conn.close();
- }
-
- public void tUniqueness() throws Exception
- {
- int numProcs = Runtime.getRuntime().availableProcessors();
- final int threadsProc = 5;
-
- runUniqueness(1, 10);
- runUniqueness(numProcs * threadsProc, 10);
- runUniqueness(numProcs * threadsProc, 100);
- runUniqueness(numProcs * threadsProc, 500);
}
- void runUniqueness(int makers, int queues) throws Exception
- {
- Connection connection = createConnection();
-
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ /**
+ * Tests that the client is able to explicitly delete a temporary queue using
+ * {@link TemporaryQueue#delete()} and is prevented from deleting one that
+ * still has consumers.
+ *
+ * Note: Under versions earlier than 0-10, {@link TemporaryQueue#delete()} only marks the queue as deleted
+ * on the client. Under 0-10 the queue is deleted from the Broker.
+ */
+ public void testExplictTemporaryQueueDeletion() throws Exception
+ {
+ final Connection conn = getConnection();
+ final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session; // Required to observe the queue binding on the Broker
+ final TemporaryQueue queue = session.createTemporaryQueue();
+ assertNotNull(queue);
+ final MessageConsumer consumer = session.createConsumer(queue);
+ conn.start();
- List<TempQueueMaker> tqList = new LinkedList<TempQueueMaker>();
+ assertTrue("Queue should be bound", amqSession.isQueueBound((AMQDestination)queue));
- //Create Makers
- for (int m = 0; m < makers; m++)
+ try
{
- tqList.add(new TempQueueMaker(session, queues));
+ queue.delete();
+ fail("Expected JMSException : should not be able to delete while there are active consumers");
}
-
-
- List<Thread> threadList = new LinkedList<Thread>();
-
- //Create Makers
- for (TempQueueMaker maker : tqList)
+ catch (JMSException je)
{
- threadList.add(new Thread(maker));
+ //pass
+ assertEquals("Temporary Queue has consumers so cannot be deleted", je.getMessage());
}
+ consumer.close();
- //Start threads
- for (Thread thread : threadList)
- {
- thread.start();
- }
+ // Now deletion should succeed.
+ queue.delete();
- // Join Threads
- for (Thread thread : threadList)
+ try
{
- try
- {
- thread.join();
- }
- catch (InterruptedException e)
- {
- fail("Couldn't correctly join threads");
- }
+ session.createConsumer(queue);
+ fail("Exception not thrown");
}
-
-
- List<AMQQueue> list = new LinkedList<AMQQueue>();
-
- // Test values
- for (TempQueueMaker maker : tqList)
+ catch (JMSException je)
{
- check(maker, list);
+ //pass
+ assertEquals("Cannot consume from a deleted destination", je.getMessage());
}
- Assert.assertEquals("Not enough queues made.", makers * queues, list.size());
-
- connection.close();
- }
-
- private void check(TempQueueMaker tq, List<AMQQueue> list)
- {
- for (AMQQueue q : tq.getList())
+ if (isBroker010())
{
- if (list.contains(q))
- {
- fail(q + " already exists.");
- }
- else
- {
- list.add(q);
- }
+ assertFalse("Queue should no longer be bound", amqSession.isQueueBound((AMQDestination)queue));
}
}
-
- class TempQueueMaker implements Runnable
+ /**
+ * Tests that a temporary queue remains available for reuse even after its initial
+ * consumer has disconnected.
+ *
+ * This test would fail under versions earlier than 0-10, as their temporary queues are deleted automatically
+ * (broker side) after the last consumer disconnects (so message2 would be lost). For this
+ * reason this test is excluded from those profiles.
+ */
+ public void testTemporaryQueueReused() throws Exception
{
- List<AMQQueue> _queues;
- Session _session;
- private int _count;
-
-
- TempQueueMaker(Session session, int queues) throws JMSException
- {
- _queues = new LinkedList<AMQQueue>();
-
- _count = queues;
-
- _session = session;
- }
-
- public void run()
- {
- int i = 0;
- try
- {
- for (; i < _count; i++)
- {
- _queues.add((AMQQueue) _session.createTemporaryQueue());
- }
- }
- catch (JMSException jmse)
- {
- //stop
- }
- }
-
- List<AMQQueue> getList()
- {
- return _queues;
- }
- }
+ final Connection conn = getConnection();
+ final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final TemporaryQueue queue = session.createTemporaryQueue();
+ assertNotNull(queue);
- public void testQPID1217() throws Exception
- {
- Connection conA = getConnection();
- conA.setExceptionListener(this);
- Session sessA = conA.createSession(false, Session.AUTO_ACKNOWLEDGE);
- TemporaryQueue temp = sessA.createTemporaryQueue();
-
- MessageProducer prod = sessA.createProducer(temp);
- prod.send(sessA.createTextMessage("hi"));
-
- Thread.sleep(500);
- assertTrue("Exception received", _exceptions.isEmpty());
-
- Connection conB = getConnection();
- Session sessB = conB.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- JMSException ex = null;
- try
- {
- MessageConsumer consB = sessB.createConsumer(temp);
- }
- catch (JMSException e)
- {
- ex = e;
- }
- assertNotNull(ex);
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(TemporaryQueueTest.class);
- }
+ final MessageProducer producer1 = session.createProducer(queue);
+ final MessageConsumer consumer1 = session.createConsumer(queue);
+ conn.start();
+ producer1.send(session.createTextMessage("message1"));
+ producer1.send(session.createTextMessage("message2"));
+ TextMessage tm = (TextMessage) consumer1.receive(2000);
+ assertNotNull("Message not received by first consumer", tm);
+ assertEquals("message1", tm.getText());
+ consumer1.close();
- public void onException(JMSException arg0)
- {
- _exceptions.add(arg0);
+ final MessageConsumer consumer2 = session.createConsumer(queue);
+ conn.start();
+ tm = (TextMessage) consumer2.receive(2000);
+ assertNotNull("Message not received by second consumer", tm);
+ assertEquals("message2", tm.getText());
+ consumer2.close();
}
-
}
Modified: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Fri Oct 28 15:52:13 2011
@@ -262,7 +262,7 @@ public class DurableSubscriptionTest ext
producer.send(session1.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive(500);
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Consumer 1 should get message 'B'.", msg);
assertEquals("Incorrect Message received on consumer1.", "B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
@@ -287,13 +287,13 @@ public class DurableSubscriptionTest ext
else
{
_logger.info("Receive message on consumer 3 :expecting B");
- msg = consumer3.receive(500);
+ msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Consumer 3 should get message 'B'.", msg);
assertEquals("Incorrect Message received on consumer3.", "B", ((TextMessage) msg).getText());
}
_logger.info("Receive message on consumer 1 :expecting C");
- msg = consumer1.receive(500);
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Consumer 1 should get message 'C'.", msg);
assertEquals("Incorrect Message received on consumer1.", "C", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
@@ -301,7 +301,7 @@ public class DurableSubscriptionTest ext
assertNull("There should be no more messages for consumption on consumer1.", msg);
_logger.info("Receive message on consumer 3 :expecting C");
- msg = consumer3.receive(500);
+ msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Consumer 3 should get message 'C'.", msg);
assertEquals("Incorrect Message received on consumer3.", "C", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 3 :expecting null");
@@ -358,7 +358,7 @@ public class DurableSubscriptionTest ext
// Send message and check that both consumers get it and only it.
producer.send(session0.createTextMessage("A"));
- msg = consumer1.receive(500);
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Message should be available", msg);
assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText());
msg = consumer1.receive(500);
@@ -729,7 +729,7 @@ public class DurableSubscriptionTest ext
conn.start();
- Message rMsg = subB.receive(1000);
+ Message rMsg = subB.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull(rMsg);
assertEquals("Content was wrong",
"testResubscribeWithChangedSelectorAndRestart2",
@@ -797,7 +797,7 @@ public class DurableSubscriptionTest ext
conn.start();
- Message rMsg = subTwo.receive(1000);
+ Message rMsg = subTwo.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull(rMsg);
assertEquals("Content was wrong",
"testResubscribeWithChangedSelectorAndRestart1",
Modified: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Fri Oct 28 15:52:13 2011
@@ -20,38 +20,28 @@
*/
package org.apache.qpid.test.unit.topic;
+import javax.jms.Connection;
import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.Session;
-import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
+import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQTopicSessionAdaptor;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
/** @author Apache Software Foundation */
public class TopicSessionTest extends QpidBrokerTestCase
{
-
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
-
public void testTopicSubscriptionUnsubscription() throws Exception
{
@@ -228,83 +218,6 @@ public class TopicSessionTest extends Qp
con.close();
}
- public void testSendingSameMessage() throws Exception
- {
- AMQConnection conn = (AMQConnection) getConnection("guest", "guest");
- TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
- TemporaryTopic topic = session.createTemporaryTopic();
- assertNotNull(topic);
- TopicPublisher producer = session.createPublisher(topic);
- MessageConsumer consumer = session.createConsumer(topic);
- conn.start();
- TextMessage sentMessage = session.createTextMessage("Test Message");
- producer.send(sentMessage);
- session.commit();
- TextMessage receivedMessage = (TextMessage) consumer.receive(2000);
- assertNotNull(receivedMessage);
- assertEquals(sentMessage.getText(), receivedMessage.getText());
- producer.send(sentMessage);
- session.commit();
- receivedMessage = (TextMessage) consumer.receive(2000);
- assertNotNull(receivedMessage);
- assertEquals(sentMessage.getText(), receivedMessage.getText());
- session.commit();
- conn.close();
-
- }
-
- public void testTemporaryTopic() throws Exception
- {
- AMQConnection conn = (AMQConnection) getConnection("guest", "guest");
- TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
- TemporaryTopic topic = session.createTemporaryTopic();
- assertNotNull(topic);
- TopicPublisher producer = session.createPublisher(topic);
- MessageConsumer consumer = session.createConsumer(topic);
- conn.start();
- producer.send(session.createTextMessage("hello"));
- session.commit();
- TextMessage tm = (TextMessage) consumer.receive(2000);
- assertNotNull(tm);
- assertEquals("hello", tm.getText());
- session.commit();
- try
- {
- topic.delete();
- fail("Expected JMSException : should not be able to delete while there are active consumers");
- }
- catch (JMSException je)
- {
- ; //pass
- }
-
- consumer.close();
-
- try
- {
- topic.delete();
- }
- catch (JMSException je)
- {
- fail("Unexpected Exception: " + je.getMessage());
- }
-
- TopicSession session2 = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- try
- {
- session2.createConsumer(topic);
- fail("Expected a JMSException when subscribing to a temporary topic created on adifferent session");
- }
- catch (JMSException je)
- {
- ; // pass
- }
-
-
- conn.close();
- }
-
-
public void testNoLocal() throws Exception
{
@@ -398,56 +311,39 @@ public class TopicSessionTest extends Qp
}
/**
- * This tests QPID-1191, where messages which are sent to a topic but are not consumed by a subscriber
- * due to a selector can be leaked.
- * @throws Exception
+ * This test was added to demonstrate QPID-3542. The Java Client when used with the CPP Broker was failing to
+ * ack messages received that did not match the selector. This meant the messages remained indefinitely on the Broker.
*/
- public void testNonMatchingMessagesDoNotFillQueue() throws Exception
+ public void testNonMatchingMessagesHandledCorrectly() throws Exception
{
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
-
- // Setup Topic
- AMQTopic topic = new AMQTopic(con, "testNoLocal");
-
- TopicSession session = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
+ final String topicName = getName();
+ final String clientId = "clientId" + topicName;
+ final Connection con1 = getConnection();
+ final Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Topic topic1 = session1.createTopic(topicName);
+ final AMQQueue internalNameOnBroker = new AMQQueue("amq.topic", "clientid" + ":" + clientId);
// Setup subscriber with selector
- TopicSubscriber selector = session.createSubscriber(topic, "Selector = 'select'", false);
- TopicPublisher publisher = session.createPublisher(topic);
+ final TopicSubscriber subscriberWithSelector = session1.createDurableSubscriber(topic1, clientId, "Selector = 'select'", false);
+ final MessageProducer publisher = session1.createProducer(topic1);
- con.start();
- TextMessage m;
- TextMessage message;
+ con1.start();
// Send non-matching message
- message = session.createTextMessage("non-matching 1");
- publisher.publish(message);
- session.commit();
-
- // Send and consume matching message
- message = session.createTextMessage("hello");
- message.setStringProperty("Selector", "select");
-
- publisher.publish(message);
- session.commit();
-
- m = (TextMessage) selector.receive(1000);
- assertNotNull("should have received message", m);
- assertEquals("Message contents were wrong", "hello", m.getText());
-
- // Send non-matching message
- message = session.createTextMessage("non-matching 2");
- publisher.publish(message);
- session.commit();
-
- // Assert queue count is 0
- long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic);
- assertEquals("Queue depth was wrong", 0, depth);
-
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(TopicSessionTest.class);
+ final Message sentMessage = session1.createTextMessage("hello");
+ sentMessage.setStringProperty("Selector", "nonMatch");
+ publisher.send(sentMessage);
+
+ // Try to consume the non-matching message; expect nothing to be received.
+ final Message message1 = subscriberWithSelector.receive(1000);
+ assertNull("should not have received message", message1);
+ subscriberWithSelector.close();
+
+ session1.close();
+
+ // Now verify queue depth on broker.
+ final Session session2 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final long depth = ((AMQSession) session2).getQueueDepth(internalNameOnBroker);
+ assertEquals("Expected queue depth of zero", 0, depth);
}
}
Propchange: qpid/branches/qpid-3346/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -3,5 +3,5 @@
/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java:805429-821809
/qpid/branches/qpid-2935/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java:1061302-1072333
-/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java:1144319-1179750
+/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java:1144319-1190375
/qpid/trunk/qpid/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java:796646-796653
Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -1,3 +1,3 @@
/qpid/branches/0.5.x-dev/qpid/java/test-profiles:931179
/qpid/branches/qpid-2935/qpid/java/test-profiles:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles:1144319-1190375
Modified: qpid/branches/qpid-3346/qpid/java/test-profiles/CPPExcludes
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/test-profiles/CPPExcludes?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/test-profiles/CPPExcludes (original)
+++ qpid/branches/qpid-3346/qpid/java/test-profiles/CPPExcludes Fri Oct 28 15:52:13 2011
@@ -64,9 +64,6 @@ org.apache.qpid.test.unit.client.connect
// 0-10 c++ broker in cpp.testprofile is started with no auth so won't pass this test
org.apache.qpid.test.unit.client.connection.ConnectionTest#testPasswordFailureConnection
-// c++ broker doesn't do selectors, so this will fail
-org.apache.qpid.test.unit.topic.TopicSessionTest#testNonMatchingMessagesDoNotFillQueue
-
// InVM Broker tests
org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#*
Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/CPPExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/010Excludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/CPPExcludes:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/CPPExcludes:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/CPPExcludes:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/Excludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/Excludes:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/Excludes:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/Excludes:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/Excludes:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/JavaBDBExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,3 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/08TransientExcludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/08TransientExcludes:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/JavaTransientExcludes:1061302-1072333
+/qpid/trunk/qpid/java/test-profiles/JavaBDBExcludes:1175236-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/JavaExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/08Excludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/08Excludes:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/JavaExcludes:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/JavaExcludes:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/JavaExcludes:1144319-1190375
Modified: qpid/branches/qpid-3346/qpid/java/test-profiles/JavaPre010Excludes
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/test-profiles/JavaPre010Excludes?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/test-profiles/JavaPre010Excludes (original)
+++ qpid/branches/qpid-3346/qpid/java/test-profiles/JavaPre010Excludes Fri Oct 28 15:52:13 2011
@@ -28,6 +28,7 @@ org.apache.qpid.test.client.message.JMSD
org.apache.qpid.test.client.message.JMSDestinationTest#testGetDestinationWithCustomExchange
// The new addressing based syntax is not supported for AMQP 0-8/0-9 versions
+org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#*
org.apache.qpid.test.client.destination.AddressBasedDestinationTest#*
org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy
org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy
@@ -43,20 +44,14 @@ org.apache.qpid.test.unit.client.connect
// uses AMQP 0-10 related properties
org.apache.qpid.test.unit.message.JMSPropertiesTest#testQpidExtensionProperties
-// QPID-3034: tests are passing on 0.10 profiles but failing on 0.9.1 profiles
-org.apache.qpid.test.unit.ack.RecoverTest#testRecoverResendsMsgs
-org.apache.qpid.test.unit.ack.RecoverTest#testRecoverResendsMsgsAckOnEarlier
-org.apache.qpid.test.unit.ack.RecoverTest#testAcknowledgePerConsumer
-org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testRecoverResendsMsgs
-org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testRecoverResendsMsgsAckOnEarlier
-org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testAcknowledgePerConsumer
-org.apache.qpid.test.unit.ack.RecoverTest#testOderingWithSyncConsumer
-org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testOderingWithSyncConsumer
-
-
// LVQ tests use new address syntax and can not be run on 0.9.1 profiles
org.apache.qpid.test.client.queue.LVQTest#*
// Verification of unique client id is 0-10 specific
org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerificationForSameUser
org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerificationForDifferentUsers
+
+// Under AMQP 0-8..0-9-1 temporary queues are deleted on consumer close, rather than connection close
+// and for this reason this test would fail.
+org.apache.qpid.test.unit.client.temporaryqueue.TemporaryQueueTest#testTemporaryQueueReused
+
Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/JavaPre010Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/08StandaloneExcludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/08StandaloneExcludes:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/08StandaloneExcludes:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/JavaTransientExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/08TransientExcludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/08TransientExcludes:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/JavaTransientExcludes:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/XAExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/XAExcludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/XAExcludes:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/XAExcludes:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/XAExcludes:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/XAExcludes:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/cpp.async.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.async.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.async.testprofile:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.async.testprofile:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/cpp.cluster.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.cluster.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.cluster.testprofile:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.cluster.testprofile:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/cpp.cluster.testprofile:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/cpp.cluster.testprofile:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/cpp.noprefetch.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.noprefetch.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.noprefetch.testprofile:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.noprefetch.testprofile:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/cpp.ssl.excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.ssl.excludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.ssl.excludes:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.ssl.excludes:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/cpp.ssl.excludes:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/cpp.ssl.excludes:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/cpp.ssl.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.ssl.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.ssl.testprofile:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.ssl.testprofile:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/cpp.ssl.testprofile:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/cpp.ssl.testprofile:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/test-profiles/cpp.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.testprofile:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.testprofile:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:1144319-1179750
+/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:1144319-1190375
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org