You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by pr...@apache.org on 2006/07/16 21:23:30 UTC

svn commit: r422506 - /directory/trunks/mina/core/src/main/java/org/apache/mina/handler/support/IoSessionInputStream.java

Author: proyal
Date: Sun Jul 16 12:23:30 2006
New Revision: 422506

URL: http://svn.apache.org/viewvc?rev=422506&view=rev
Log:
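DIRMINA-226 - Change synchronization so that it no longer depends on 'this' and instead uses a separate private mutex object. Also modify the behavior of read() so that it returns -1 once the stream has been closed or released and no buffered data remains, instead of throwing an IOException ("Stream is closed."); read() still blocks while the stream is open and no data is available yet.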

Modified:
    directory/trunks/mina/core/src/main/java/org/apache/mina/handler/support/IoSessionInputStream.java

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/handler/support/IoSessionInputStream.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/handler/support/IoSessionInputStream.java?rev=422506&r1=422505&r2=422506&view=diff
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/handler/support/IoSessionInputStream.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/handler/support/IoSessionInputStream.java Sun Jul 16 12:23:30 2006
@@ -31,16 +31,16 @@
  *
  * @author The Apache Directory Project (mina-dev@directory.apache.org)
  * @version $Rev$, $Date$
- *
  */
 public class IoSessionInputStream extends InputStream
 {
+    private final Object mutex = new Object();
     private final ByteBuffer buf;
-    private boolean closed;
-    private boolean released;
+
+    private volatile boolean closed;
+    private volatile boolean released;
     private IOException exception;
-    private int waiters;
-    
+
     public IoSessionInputStream()
     {
         buf = ByteBuffer.allocate( 16 );
@@ -48,7 +48,7 @@
         buf.limit( 0 );
     }
 
-    public synchronized int available()
+    public int available()
     {
         if( released )
         {
@@ -56,106 +56,108 @@
         }
         else
         {
-            return buf.remaining();
+            synchronized( mutex )
+            {
+                return buf.remaining();
+            }
         }
     }
 
-    public synchronized void close()
+    public void close()
     {
         if( closed )
         {
             return;
         }
 
-        closed = true;
-        releaseBuffer();
-        
-        if( waiters != 0 )
+        synchronized( mutex )
         {
-            this.notifyAll();
+            closed = true;
+            releaseBuffer();
+
+            mutex.notifyAll();
         }
     }
 
-    public void mark( int readlimit )
+    public int read() throws IOException
     {
-    }
+        synchronized( mutex )
+        {
+            if( !waitForData() )
+            {
+                return -1;
+            }
 
-    public boolean markSupported()
-    {
-        return false;
+            return buf.get() & 0xff;
+        }
     }
 
-    public synchronized int read() throws IOException
+    public int read( byte[] b, int off, int len ) throws IOException
     {
-        waitForData();
-        if( released )
+        synchronized( mutex )
         {
-            return -1;
-        }
+            if( !waitForData() )
+            {
+                return -1;
+            }
 
-        int ret = buf.get() & 0xff;
-        return ret;
-    }
+            int readBytes;
 
-    public synchronized int read( byte[] b, int off, int len ) throws IOException
-    {
-        waitForData();
-        if( released )
-        {
-            return -1;
-        }
+            if( len > buf.remaining() )
+            {
+                readBytes = buf.remaining();
+            }
+            else
+            {
+                readBytes = len;
+            }
 
-        int readBytes;
-        if( len > buf.remaining() )
-        {
-            readBytes = buf.remaining();
-        }
-        else
-        {
-            readBytes = len;
-        }
-        buf.get( b, off, readBytes );
-        
-        return readBytes;
-    }
+            buf.get( b, off, readBytes );
 
-    public synchronized void reset() throws IOException
-    {
-        throw new IOException( "Mark is not supported." );
+            return readBytes;
+        }
     }
 
-    private void waitForData() throws IOException
+    private boolean waitForData() throws IOException
     {
         if( released )
         {
-            throw new IOException( "Stream is closed." );
+            return false;
         }
 
-        waiters ++;
-        while( !released && buf.remaining() == 0 && exception == null )
+        synchronized( mutex )
         {
-            try
-            {
-                this.wait();
-            }
-            catch( InterruptedException e )
+            while( !released && buf.remaining() == 0 && exception == null )
             {
+                try
+                {
+                    mutex.wait();
+                }
+                catch( InterruptedException e )
+                {
+                    IOException ioe = new IOException( "Interrupted while waiting for more data" );
+                    ioe.initCause( e );
+                    throw ioe;
+                }
             }
         }
-        waiters --;
-        
+
         if( exception != null )
         {
             releaseBuffer();
             throw exception;
         }
-        
+
         if( closed && buf.remaining() == 0 )
         {
             releaseBuffer();
+
+            return false;
         }
+
+        return true;
     }
-    
+
     private void releaseBuffer()
     {
         if( released )
@@ -167,37 +169,40 @@
         buf.release();
     }
 
-    public synchronized void write( ByteBuffer src )
+    public void write( ByteBuffer src )
     {
-        if( closed )
-        {
-            return;
-        }
-        
-        if( buf.hasRemaining() )
+        synchronized( mutex )
         {
-            this.buf.compact();
-            this.buf.put( src );
-            this.buf.flip();
-        }
-        else
-        {
-            this.buf.clear();
-            this.buf.put( src );
-            this.buf.flip();
-            this.notify();
+            if( closed )
+            {
+                return;
+            }
+
+            if( buf.hasRemaining() )
+            {
+                this.buf.compact();
+                this.buf.put( src );
+                this.buf.flip();
+            }
+            else
+            {
+                this.buf.clear();
+                this.buf.put( src );
+                this.buf.flip();
+                mutex.notifyAll();
+            }
         }
     }
-    
-    public synchronized void throwException( IOException e )
+
+    public void throwException( IOException e )
     {
-        if( exception == null )
+        synchronized( mutex )
         {
-            exception = e;
-            
-            if( waiters != 0 )
+            if( exception == null )
             {
-                this.notifyAll();
+                exception = e;
+
+                mutex.notifyAll();
             }
         }
     }