You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by dj...@apache.org on 2008/12/29 08:56:09 UTC

svn commit: r729835 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: thread/Valve.java transport/vm/VMTransport.java

Author: djencks
Date: Sun Dec 28 23:56:09 2008
New Revision: 729835

URL: http://svn.apache.org/viewvc?rev=729835&view=rev
Log:
AMQ-2045 Add consistency checks to Valve and balance usage in VMTransport

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java?rev=729835&r1=729834&r2=729835&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java Sun Dec 28 23:56:09 2008
@@ -36,7 +36,7 @@
     /**
      * Turns the valve on. This method blocks until the valve is off.
      * 
-     * @throws InterruptedException
+     * @throws InterruptedException if wait is interrupted
      */
     public void turnOn() throws InterruptedException {
         synchronized (mutex) {
@@ -58,10 +58,13 @@
      * Turns the valve off. This method blocks until the valve is on and the
      * valve is not in use.
      * 
-     * @throws InterruptedException
+     * @throws InterruptedException if wait is interrupted
      */
     public void turnOff() throws InterruptedException {
         synchronized (mutex) {
+            if (turningOff < 0) {
+                throw new IllegalStateException("Unbalanced turningOff: " + turningOff);
+            }
             try {
                 ++turningOff;
                 while (usage > 0 || !on) {
@@ -79,10 +82,16 @@
      * Increments the use counter of the valve. This method blocks if the valve
      * is off, or is being turned off.
      * 
-     * @throws InterruptedException
+     * @throws InterruptedException if wait is interrupted
      */
     public void increment() throws InterruptedException {
         synchronized (mutex) {
+            if (turningOff < 0) {
+                throw new IllegalStateException("Unbalanced turningOff: " + turningOff);
+            }
+            if (usage < 0) {
+                throw new IllegalStateException("Unbalanced usage: " + usage);
+            }
             // Do we have to wait for the valve to be on?
             while (turningOff > 0 || !on) {
                 mutex.wait();
@@ -97,6 +106,12 @@
     public void decrement() {
         synchronized (mutex) {
             usage--;
+            if (turningOff < 0) {
+                throw new IllegalStateException("Unbalanced turningOff: " + turningOff);
+            }
+            if (usage < 0) {
+                throw new IllegalStateException("Unbalanced usage: " + usage);
+            }
             if (turningOff > 0 && usage < 1) {
                 mutex.notifyAll();
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=729835&r1=729834&r2=729835&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Sun Dec 28 23:56:09 2008
@@ -94,7 +94,6 @@
                 } else {
                     transportListener = peer.transportListener;
                 }
-                enqueueValve.decrement();
             } else {
                 peer.getMessageQueue().put(command);
             }