You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by dj...@apache.org on 2008/12/29 08:56:09 UTC
svn commit: r729835 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
thread/Valve.java transport/vm/VMTransport.java
Author: djencks
Date: Sun Dec 28 23:56:09 2008
New Revision: 729835
URL: http://svn.apache.org/viewvc?rev=729835&view=rev
Log:
AMQ-2045 Add consistency checks to Valve and balance usage in VMTransport
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java?rev=729835&r1=729834&r2=729835&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java Sun Dec 28 23:56:09 2008
@@ -36,7 +36,7 @@
/**
* Turns the valve on. This method blocks until the valve is off.
*
- * @throws InterruptedException
+ * @throws InterruptedException if wait is interrupted
*/
public void turnOn() throws InterruptedException {
synchronized (mutex) {
@@ -58,10 +58,13 @@
* Turns the valve off. This method blocks until the valve is on and the
* valve is not in use.
*
- * @throws InterruptedException
+ * @throws InterruptedException if wait is interrupted
*/
public void turnOff() throws InterruptedException {
synchronized (mutex) {
+ if (turningOff < 0) {
+ throw new IllegalStateException("Unbalanced turningOff: " + turningOff);
+ }
try {
++turningOff;
while (usage > 0 || !on) {
@@ -79,10 +82,16 @@
* Increments the use counter of the valve. This method blocks if the valve
* is off, or is being turned off.
*
- * @throws InterruptedException
+ * @throws InterruptedException if wait is interrupted
*/
public void increment() throws InterruptedException {
synchronized (mutex) {
+ if (turningOff < 0) {
+ throw new IllegalStateException("Unbalanced turningOff: " + turningOff);
+ }
+ if (usage < 0) {
+ throw new IllegalStateException("Unbalanced usage: " + usage);
+ }
// Do we have to wait for the value to be on?
while (turningOff > 0 || !on) {
mutex.wait();
@@ -97,6 +106,12 @@
public void decrement() {
synchronized (mutex) {
usage--;
+ if (turningOff < 0) {
+ throw new IllegalStateException("Unbalanced turningOff: " + turningOff);
+ }
+ if (usage < 0) {
+ throw new IllegalStateException("Unbalanced usage: " + usage);
+ }
if (turningOff > 0 && usage < 1) {
mutex.notifyAll();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=729835&r1=729834&r2=729835&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Sun Dec 28 23:56:09 2008
@@ -94,7 +94,6 @@
} else {
transportListener = peer.transportListener;
}
- enqueueValve.decrement();
} else {
peer.getMessageQueue().put(command);
}