You are viewing a plain text version of this content. (The canonical link to the original was not preserved in this plain-text copy.)
Posted to commits@mina.apache.org by tr...@apache.org on 2007/11/30 14:54:12 UTC

svn commit: r599822 - in /mina/branches: 1.0/core/src/main/java/org/apache/mina/common/support/ 1.1/core/src/main/java/org/apache/mina/common/ 1.1/core/src/main/java/org/apache/mina/common/support/

Author: trustin
Date: Fri Nov 30 05:54:07 2007
New Revision: 599822

URL: http://svn.apache.org/viewvc?rev=599822&view=rev
Log:
Fixed issue: DIRMINA-486 (Deadlock in SocketConnectorIoProcessor & AnonymousIoService)


Modified:
    mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/common/IoFilter.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/common/IoFuture.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultCloseFuture.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultConnectFuture.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultWriteFuture.java

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java?rev=599822&r1=599821&r2=599822&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java Fri Nov 30 05:54:07 2007
@@ -35,21 +35,18 @@
  * @version $Rev$, $Date$
  */
 public class DefaultIoFuture implements IoFuture {
-    private final IoSession session;
 
+    private final IoSession session;
     private final Object lock;
-
     private IoFutureListener firstListener;
-
     private List otherListeners;
-
     private Object result;
-
     private boolean ready;
+    private int waiters;
 
     /**
      * Creates a new instance.
-     * 
+     *
      * @param session an {@link IoSession} which is associated with this future
      */
     public DefaultIoFuture(IoSession session) {
@@ -58,12 +55,11 @@
     }
 
     /**
-     * Creates a new instance which uses the specified object as a lock.
+     * Creates a new instance.
+     *
+     * @param session an {@link IoSession} which is associated with this future
      */
     public DefaultIoFuture(IoSession session, Object lock) {
-        if (lock == null) {
-            throw new NullPointerException("lock");
-        }
         this.session = session;
         this.lock = lock;
     }
@@ -71,26 +67,46 @@
     public IoSession getSession() {
         return session;
     }
-
+    
     public Object getLock() {
         return lock;
     }
 
     public void join() {
+        awaitUninterruptibly();
+    }
+
+    public boolean join(long timeoutMillis) {
+        return awaitUninterruptibly(timeoutMillis);
+    }
+
+    private IoFuture awaitUninterruptibly() {
         synchronized (lock) {
             while (!ready) {
+                waiters++;
                 try {
                     lock.wait();
                 } catch (InterruptedException e) {
+                } finally {
+                    waiters--;
                 }
             }
         }
+
+        return this;
+    }
+
+    private boolean awaitUninterruptibly(long timeoutMillis) {
+        try {
+            return await0(timeoutMillis, false);
+        } catch (InterruptedException e) {
+            throw new InternalError();
+        }
     }
 
-    public boolean join(long timeoutInMillis) {
-        long startTime = (timeoutInMillis <= 0) ? 0 : System
-                .currentTimeMillis();
-        long waitTime = timeoutInMillis;
+    private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
+        long startTime = timeoutMillis <= 0 ? 0 : System.currentTimeMillis();
+        long waitTime = timeoutMillis;
 
         synchronized (lock) {
             if (ready) {
@@ -99,21 +115,29 @@
                 return ready;
             }
 
-            for (;;) {
-                try {
-                    lock.wait(waitTime);
-                } catch (InterruptedException e) {
-                }
+            waiters++;
+            try {
+                for (;;) {
+                    try {
+                        lock.wait(waitTime);
+                    } catch (InterruptedException e) {
+                        if (interruptable) {
+                            throw e;
+                        }
+                    }
 
-                if (ready)
-                    return true;
-                else {
-                    waitTime = timeoutInMillis
-                            - (System.currentTimeMillis() - startTime);
-                    if (waitTime <= 0) {
-                        return ready;
+                    if (ready) {
+                        return true;
+                    } else {
+                        waitTime = timeoutMillis
+                                - (System.currentTimeMillis() - startTime);
+                        if (waitTime <= 0) {
+                            return ready;
+                        }
                     }
                 }
+            } finally {
+                waiters--;
             }
         }
     }
@@ -136,10 +160,12 @@
 
             result = newValue;
             ready = true;
-            lock.notifyAll();
-
-            notifyListeners();
+            if (waiters > 0) {
+                lock.notifyAll();
+            }
         }
+
+        notifyListeners();
     }
 
     /**
@@ -156,8 +182,11 @@
             throw new NullPointerException("listener");
         }
 
+        boolean notifyNow = false;
         synchronized (lock) {
-            if (!ready) {
+            if (ready) {
+                notifyNow = true;
+            } else {
                 if (firstListener == null) {
                     firstListener = listener;
                 } else {
@@ -166,10 +195,12 @@
                     }
                     otherListeners.add(listener);
                 }
-            } else {
-                notifyListener(listener);
             }
         }
+
+        if (notifyNow) {
+            notifyListener(listener);
+        }
     }
 
     public void removeListener(IoFutureListener listener) {
@@ -178,27 +209,33 @@
         }
 
         synchronized (lock) {
-            if (listener == firstListener) {
-                if (otherListeners != null && !otherListeners.isEmpty()) {
-                    firstListener = (IoFutureListener) otherListeners.remove(0);
-                } else {
-                    firstListener = null;
+            if (!ready) {
+                if (listener == firstListener) {
+                    if (otherListeners != null && !otherListeners.isEmpty()) {
+                        firstListener = (IoFutureListener) otherListeners.remove(0);
+                    } else {
+                        firstListener = null;
+                    }
+                } else if (otherListeners != null) {
+                    otherListeners.remove(listener);
                 }
-            } else if (otherListeners != null) {
-                otherListeners.remove(listener);
             }
         }
     }
 
     private void notifyListeners() {
-        synchronized (lock) {
-            if (firstListener != null) {
-                notifyListener(firstListener);
-                if (otherListeners != null) {
-                    for (Iterator i = otherListeners.iterator(); i.hasNext();) {
-                        notifyListener((IoFutureListener) i.next());
-                    }
+        // There won't be any visibility problem or concurrent modification
+        // because 'ready' flag will be checked against both addListener and
+        // removeListener calls.
+        if (firstListener != null) {
+            notifyListener(firstListener);
+            firstListener = null;
+
+            if (otherListeners != null) {
+                for (Iterator i = otherListeners.iterator(); i.hasNext(); ) {
+                    notifyListener((IoFutureListener) i.next());
                 }
+                otherListeners = null;
             }
         }
     }
@@ -210,4 +247,4 @@
             ExceptionMonitor.getInstance().exceptionCaught(t);
         }
     }
-}
+}
\ No newline at end of file

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/common/IoFilter.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/common/IoFilter.java?rev=599822&r1=599821&r2=599822&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/common/IoFilter.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/common/IoFilter.java Fri Nov 30 05:54:07 2007
@@ -291,6 +291,10 @@
                 throw new IllegalStateException(
                         "You can't add a listener to a dummy future.");
             }
+
+            public Object getLock() {
+                return this;
+            }
         };
 
         private final Object message;
@@ -361,6 +365,7 @@
             return destination;
         }
 
+        @Override
         public String toString() {
             return message.toString();
         }

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/common/IoFuture.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/common/IoFuture.java?rev=599822&r1=599821&r2=599822&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/common/IoFuture.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/common/IoFuture.java Fri Nov 30 05:54:07 2007
@@ -32,6 +32,11 @@
     IoSession getSession();
 
     /**
+     * Returns the lock object this future acquires.
+     */
+    Object getLock();
+
+    /**
      * Wait for the asynchronous operation to end.
      */
     void join();

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultCloseFuture.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultCloseFuture.java?rev=599822&r1=599821&r2=599822&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultCloseFuture.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultCloseFuture.java Fri Nov 30 05:54:07 2007
@@ -36,6 +36,13 @@
         super(session);
     }
 
+    /**
+     * Creates a new instance which uses the specified object as a lock.
+     */
+    public DefaultCloseFuture(IoSession session, Object lock) {
+        super(session, lock);
+    }
+
     public boolean isClosed() {
         if (isReady()) {
             return (Boolean) getValue();

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultConnectFuture.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultConnectFuture.java?rev=599822&r1=599821&r2=599822&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultConnectFuture.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultConnectFuture.java Fri Nov 30 05:54:07 2007
@@ -47,6 +47,13 @@
         super(null);
     }
 
+    /**
+     * Creates a new instance which uses the specified object as a lock.
+     */
+    public DefaultConnectFuture(Object lock) {
+        super(null, lock);
+    }
+
     @Override
     public IoSession getSession() throws RuntimeIOException {
         Object v = getValue();

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java?rev=599822&r1=599821&r2=599822&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java Fri Nov 30 05:54:07 2007
@@ -34,21 +34,18 @@
  * @version $Rev$, $Date$
  */
 public class DefaultIoFuture implements IoFuture {
-    private final IoSession session;
 
+    private final IoSession session;
     private final Object lock;
-
     private IoFutureListener firstListener;
-
     private List<IoFutureListener> otherListeners;
-
     private Object result;
-
     private boolean ready;
+    private int waiters;
 
     /**
      * Creates a new instance.
-     * 
+     *
      * @param session an {@link IoSession} which is associated with this future
      */
     public DefaultIoFuture(IoSession session) {
@@ -56,23 +53,58 @@
         this.lock = this;
     }
 
+    /**
+     * Creates a new instance.
+     *
+     * @param session an {@link IoSession} which is associated with this future
+     */
+    public DefaultIoFuture(IoSession session, Object lock) {
+        this.session = session;
+        this.lock = lock;
+    }
+
     public IoSession getSession() {
         return session;
     }
 
+    public Object getLock() {
+        return lock;
+    }
+
     public void join() {
+        awaitUninterruptibly();
+    }
+
+    public boolean join(long timeoutMillis) {
+        return awaitUninterruptibly(timeoutMillis);
+    }
+
+    private IoFuture awaitUninterruptibly() {
         synchronized (lock) {
             while (!ready) {
+                waiters++;
                 try {
                     lock.wait();
                 } catch (InterruptedException e) {
+                } finally {
+                    waiters--;
                 }
             }
         }
+
+        return this;
     }
 
-    public boolean join(long timeoutMillis) {
-        long startTime = (timeoutMillis <= 0) ? 0 : System.currentTimeMillis();
+    private boolean awaitUninterruptibly(long timeoutMillis) {
+        try {
+            return await0(timeoutMillis, false);
+        } catch (InterruptedException e) {
+            throw new InternalError();
+        }
+    }
+
+    private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
+        long startTime = timeoutMillis <= 0 ? 0 : System.currentTimeMillis();
         long waitTime = timeoutMillis;
 
         synchronized (lock) {
@@ -82,21 +114,29 @@
                 return ready;
             }
 
-            for (;;) {
-                try {
-                    lock.wait(waitTime);
-                } catch (InterruptedException e) {
-                }
+            waiters++;
+            try {
+                for (;;) {
+                    try {
+                        lock.wait(waitTime);
+                    } catch (InterruptedException e) {
+                        if (interruptable) {
+                            throw e;
+                        }
+                    }
 
-                if (ready) {
-                    return true;
-                } else {
-                    waitTime = timeoutMillis
-                            - (System.currentTimeMillis() - startTime);
-                    if (waitTime <= 0) {
-                        return ready;
+                    if (ready) {
+                        return true;
+                    } else {
+                        waitTime = timeoutMillis
+                                - (System.currentTimeMillis() - startTime);
+                        if (waitTime <= 0) {
+                            return ready;
+                        }
                     }
                 }
+            } finally {
+                waiters--;
             }
         }
     }
@@ -119,10 +159,12 @@
 
             result = newValue;
             ready = true;
-            lock.notifyAll();
-
-            notifyListeners();
+            if (waiters > 0) {
+                lock.notifyAll();
+            }
         }
+
+        notifyListeners();
     }
 
     /**
@@ -139,8 +181,11 @@
             throw new NullPointerException("listener");
         }
 
+        boolean notifyNow = false;
         synchronized (lock) {
-            if (!ready) {
+            if (ready) {
+                notifyNow = true;
+            } else {
                 if (firstListener == null) {
                     firstListener = listener;
                 } else {
@@ -149,10 +194,12 @@
                     }
                     otherListeners.add(listener);
                 }
-            } else {
-                notifyListener(listener);
             }
         }
+
+        if (notifyNow) {
+            notifyListener(listener);
+        }
     }
 
     public void removeListener(IoFutureListener listener) {
@@ -161,31 +208,38 @@
         }
 
         synchronized (lock) {
-            if (listener == firstListener) {
-                if (otherListeners != null && !otherListeners.isEmpty()) {
-                    firstListener = otherListeners.remove(0);
-                } else {
-                    firstListener = null;
+            if (!ready) {
+                if (listener == firstListener) {
+                    if (otherListeners != null && !otherListeners.isEmpty()) {
+                        firstListener = otherListeners.remove(0);
+                    } else {
+                        firstListener = null;
+                    }
+                } else if (otherListeners != null) {
+                    otherListeners.remove(listener);
                 }
-            } else if (otherListeners != null) {
-                otherListeners.remove(listener);
             }
         }
     }
 
     private void notifyListeners() {
-        synchronized (lock) {
-            if (firstListener != null) {
-                notifyListener(firstListener);
-                if (otherListeners != null) {
-                    for (IoFutureListener l : otherListeners) {
-                        notifyListener(l);
-                    }
+        // There won't be any visibility problem or concurrent modification
+        // because 'ready' flag will be checked against both addListener and
+        // removeListener calls.
+        if (firstListener != null) {
+            notifyListener(firstListener);
+            firstListener = null;
+
+            if (otherListeners != null) {
+                for (IoFutureListener l : otherListeners) {
+                    notifyListener(l);
                 }
+                otherListeners = null;
             }
         }
     }
-    
+
+    @SuppressWarnings("unchecked")
     private void notifyListener(IoFutureListener l) {
         try {
             l.operationComplete(this);
@@ -193,4 +247,4 @@
             ExceptionMonitor.getInstance().exceptionCaught(t);
         }
     }
-}
+}
\ No newline at end of file

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultWriteFuture.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultWriteFuture.java?rev=599822&r1=599821&r2=599822&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultWriteFuture.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/DefaultWriteFuture.java Fri Nov 30 05:54:07 2007
@@ -54,6 +54,13 @@
         super(session);
     }
 
+    /**
+     * Creates a new instance which uses the specified object as a lock.
+     */
+    public DefaultWriteFuture(IoSession session, Object lock) {
+        super(session, lock);
+    }
+
     public boolean isWritten() {
         if (isReady()) {
             return (Boolean) getValue();