You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by pr...@apache.org on 2006/07/16 21:23:30 UTC
svn commit: r422506 -
/directory/trunks/mina/core/src/main/java/org/apache/mina/handler/support/IoSessionInputStream.java
Author: proyal
Date: Sun Jul 16 12:23:30 2006
New Revision: 422506
URL: http://svn.apache.org/viewvc?rev=422506&view=rev
Log:
DIRMINA-226 - Change synchronization so that it no longer depends upon 'this' and instead uses a separate mutex object. Also modify the behavior of read() to always return -1 (rather than throwing an IOException) when the stream has been released and there is no data available
Modified:
directory/trunks/mina/core/src/main/java/org/apache/mina/handler/support/IoSessionInputStream.java
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/handler/support/IoSessionInputStream.java
URL: http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/handler/support/IoSessionInputStream.java?rev=422506&r1=422505&r2=422506&view=diff
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/handler/support/IoSessionInputStream.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/handler/support/IoSessionInputStream.java Sun Jul 16 12:23:30 2006
@@ -31,16 +31,16 @@
*
* @author The Apache Directory Project (mina-dev@directory.apache.org)
* @version $Rev$, $Date$
- *
*/
public class IoSessionInputStream extends InputStream
{
+ private final Object mutex = new Object();
private final ByteBuffer buf;
- private boolean closed;
- private boolean released;
+
+ private volatile boolean closed;
+ private volatile boolean released;
private IOException exception;
- private int waiters;
-
+
public IoSessionInputStream()
{
buf = ByteBuffer.allocate( 16 );
@@ -48,7 +48,7 @@
buf.limit( 0 );
}
- public synchronized int available()
+ public int available()
{
if( released )
{
@@ -56,106 +56,108 @@
}
else
{
- return buf.remaining();
+ synchronized( mutex )
+ {
+ return buf.remaining();
+ }
}
}
- public synchronized void close()
+ public void close()
{
if( closed )
{
return;
}
- closed = true;
- releaseBuffer();
-
- if( waiters != 0 )
+ synchronized( mutex )
{
- this.notifyAll();
+ closed = true;
+ releaseBuffer();
+
+ mutex.notifyAll();
}
}
- public void mark( int readlimit )
+ public int read() throws IOException
{
- }
+ synchronized( mutex )
+ {
+ if( !waitForData() )
+ {
+ return -1;
+ }
- public boolean markSupported()
- {
- return false;
+ return buf.get() & 0xff;
+ }
}
- public synchronized int read() throws IOException
+ public int read( byte[] b, int off, int len ) throws IOException
{
- waitForData();
- if( released )
+ synchronized( mutex )
{
- return -1;
- }
+ if( !waitForData() )
+ {
+ return -1;
+ }
- int ret = buf.get() & 0xff;
- return ret;
- }
+ int readBytes;
- public synchronized int read( byte[] b, int off, int len ) throws IOException
- {
- waitForData();
- if( released )
- {
- return -1;
- }
+ if( len > buf.remaining() )
+ {
+ readBytes = buf.remaining();
+ }
+ else
+ {
+ readBytes = len;
+ }
- int readBytes;
- if( len > buf.remaining() )
- {
- readBytes = buf.remaining();
- }
- else
- {
- readBytes = len;
- }
- buf.get( b, off, readBytes );
-
- return readBytes;
- }
+ buf.get( b, off, readBytes );
- public synchronized void reset() throws IOException
- {
- throw new IOException( "Mark is not supported." );
+ return readBytes;
+ }
}
- private void waitForData() throws IOException
+ private boolean waitForData() throws IOException
{
if( released )
{
- throw new IOException( "Stream is closed." );
+ return false;
}
- waiters ++;
- while( !released && buf.remaining() == 0 && exception == null )
+ synchronized( mutex )
{
- try
- {
- this.wait();
- }
- catch( InterruptedException e )
+ while( !released && buf.remaining() == 0 && exception == null )
{
+ try
+ {
+ mutex.wait();
+ }
+ catch( InterruptedException e )
+ {
+ IOException ioe = new IOException( "Interrupted while waiting for more data" );
+ ioe.initCause( e );
+ throw ioe;
+ }
}
}
- waiters --;
-
+
if( exception != null )
{
releaseBuffer();
throw exception;
}
-
+
if( closed && buf.remaining() == 0 )
{
releaseBuffer();
+
+ return false;
}
+
+ return true;
}
-
+
private void releaseBuffer()
{
if( released )
@@ -167,37 +169,40 @@
buf.release();
}
- public synchronized void write( ByteBuffer src )
+ public void write( ByteBuffer src )
{
- if( closed )
- {
- return;
- }
-
- if( buf.hasRemaining() )
+ synchronized( mutex )
{
- this.buf.compact();
- this.buf.put( src );
- this.buf.flip();
- }
- else
- {
- this.buf.clear();
- this.buf.put( src );
- this.buf.flip();
- this.notify();
+ if( closed )
+ {
+ return;
+ }
+
+ if( buf.hasRemaining() )
+ {
+ this.buf.compact();
+ this.buf.put( src );
+ this.buf.flip();
+ }
+ else
+ {
+ this.buf.clear();
+ this.buf.put( src );
+ this.buf.flip();
+ mutex.notifyAll();
+ }
}
}
-
- public synchronized void throwException( IOException e )
+
+ public void throwException( IOException e )
{
- if( exception == null )
+ synchronized( mutex )
{
- exception = e;
-
- if( waiters != 0 )
+ if( exception == null )
{
- this.notifyAll();
+ exception = e;
+
+ mutex.notifyAll();
}
}
}