You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/11/28 09:30:17 UTC
svn commit: r349399 - in /directory/network:
branches/0.8/src/examples/org/apache/mina/examples/httpserver/
branches/0.8/src/java/org/apache/mina/io/handler/
trunk/src/java/org/apache/mina/handler/
trunk/src/java/org/apache/mina/handler/support/
Author: trustin
Date: Mon Nov 28 00:28:41 2005
New Revision: 349399
URL: http://svn.apache.org/viewcvs?rev=349399&view=rev
Log:
Resolved issue: DIRMINA-129 "Pipe broken" IOExceptions countered on non-active stream connections
* Replaced PipeI/OStream with a brand new IoSessionInputStream
* Pulled ServiceOutputStream out to a new IoSessionOutputStream
* Changed the default settings og HTTP server example in branches/0.8
Added:
directory/network/branches/0.8/src/java/org/apache/mina/io/handler/IoSessionInputStream.java
directory/network/branches/0.8/src/java/org/apache/mina/io/handler/IoSessionOutputStream.java
directory/network/trunk/src/java/org/apache/mina/handler/support/
directory/network/trunk/src/java/org/apache/mina/handler/support/IoSessionInputStream.java (with props)
directory/network/trunk/src/java/org/apache/mina/handler/support/IoSessionOutputStream.java (with props)
Removed:
directory/network/trunk/src/java/org/apache/mina/handler/StreamIoHandler.java
Modified:
directory/network/branches/0.8/src/examples/org/apache/mina/examples/httpserver/Main.java
directory/network/branches/0.8/src/java/org/apache/mina/io/handler/StreamIoHandler.java
Modified: directory/network/branches/0.8/src/examples/org/apache/mina/examples/httpserver/Main.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/0.8/src/examples/org/apache/mina/examples/httpserver/Main.java?rev=349399&r1=349398&r2=349399&view=diff
==============================================================================
--- directory/network/branches/0.8/src/examples/org/apache/mina/examples/httpserver/Main.java (original)
+++ directory/network/branches/0.8/src/examples/org/apache/mina/examples/httpserver/Main.java Mon Nov 28 00:28:41 2005
@@ -35,9 +35,9 @@
public class Main
{
/** Choose your favorite port number. */
- private static final int PORT = 8081;
+ private static final int PORT = 8080;
- private static final boolean USE_SSL = true;
+ private static final boolean USE_SSL = false;
public static void main( String[] args ) throws Exception
{
Added: directory/network/branches/0.8/src/java/org/apache/mina/io/handler/IoSessionInputStream.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/0.8/src/java/org/apache/mina/io/handler/IoSessionInputStream.java?rev=349399&view=auto
==============================================================================
--- directory/network/branches/0.8/src/java/org/apache/mina/io/handler/IoSessionInputStream.java (added)
+++ directory/network/branches/0.8/src/java/org/apache/mina/io/handler/IoSessionInputStream.java Mon Nov 28 00:28:41 2005
@@ -0,0 +1,204 @@
+/*
+ * @(#) $Id: AbstractIoFilterChain.java 330415 2005-11-03 02:19:03Z trustin $
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.io.handler;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.io.IoHandler;
+import org.apache.mina.io.IoSession;
+
+/**
+ * An {@link InputStream} that buffers data read from
+ * {@link IoHandler#dataRead(IoSession, ByteBuffer)} events.
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ *
+ */
+class IoSessionInputStream extends InputStream
+{
+ private final ByteBuffer buf;
+ private boolean closed;
+ private boolean released;
+ private IOException exception;
+ private int waiters;
+
+ IoSessionInputStream()
+ {
+ buf = ByteBuffer.allocate( 16 );
+ buf.setAutoExpand( true );
+ buf.limit( 0 );
+ }
+
+ public synchronized int available()
+ {
+ if( released )
+ {
+ return 0;
+ }
+ else
+ {
+ return buf.remaining();
+ }
+ }
+
+ public synchronized void close()
+ {
+ if( closed )
+ {
+ return;
+ }
+
+ closed = true;
+ releaseBuffer();
+
+ if( waiters != 0 )
+ {
+ this.notifyAll();
+ }
+ }
+
+ public void mark( int readlimit )
+ {
+ }
+
+ public boolean markSupported()
+ {
+ return false;
+ }
+
+ public synchronized int read() throws IOException
+ {
+ waitForData();
+ if( released )
+ {
+ return -1;
+ }
+
+ int ret = buf.get() & 0xff;
+ return ret;
+ }
+
+ public synchronized int read( byte[] b, int off, int len ) throws IOException
+ {
+ waitForData();
+ if( released )
+ {
+ return -1;
+ }
+
+ int readBytes;
+ if( len > buf.remaining() )
+ {
+ readBytes = buf.remaining();
+ }
+ else
+ {
+ readBytes = len;
+ }
+ buf.get( b, off, readBytes );
+
+ return readBytes;
+ }
+
+ public synchronized void reset() throws IOException
+ {
+ throw new IOException( "Mark is not supported." );
+ }
+
+ private void waitForData() throws IOException
+ {
+ if( released )
+ {
+ throw new IOException( "Stream is closed." );
+ }
+
+ waiters ++;
+ while( !released && buf.remaining() == 0 && exception == null )
+ {
+ try
+ {
+ this.wait();
+ }
+ catch( InterruptedException e )
+ {
+ }
+ }
+ waiters --;
+
+ if( exception != null )
+ {
+ releaseBuffer();
+ throw exception;
+ }
+
+ if( closed && buf.remaining() == 0 )
+ {
+ releaseBuffer();
+ }
+ }
+
+ private void releaseBuffer()
+ {
+ if( released )
+ {
+ return;
+ }
+
+ released = true;
+ buf.release();
+ }
+
+ synchronized void write( ByteBuffer src )
+ {
+ if( closed )
+ {
+ return;
+ }
+
+ if( buf.hasRemaining() )
+ {
+ this.buf.compact();
+ this.buf.put( src );
+ this.buf.flip();
+ }
+ else
+ {
+ this.buf.clear();
+ this.buf.put( src );
+ this.buf.flip();
+ this.notify();
+ }
+ }
+
+ synchronized void throwException( IOException e )
+ {
+ if( exception == null )
+ {
+ exception = e;
+
+ if( waiters != 0 )
+ {
+ this.notifyAll();
+ }
+ }
+ }
+}
\ No newline at end of file
Added: directory/network/branches/0.8/src/java/org/apache/mina/io/handler/IoSessionOutputStream.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/0.8/src/java/org/apache/mina/io/handler/IoSessionOutputStream.java?rev=349399&view=auto
==============================================================================
--- directory/network/branches/0.8/src/java/org/apache/mina/io/handler/IoSessionOutputStream.java (added)
+++ directory/network/branches/0.8/src/java/org/apache/mina/io/handler/IoSessionOutputStream.java Mon Nov 28 00:28:41 2005
@@ -0,0 +1,73 @@
+/*
+ * @(#) $Id: AbstractIoFilterChain.java 330415 2005-11-03 02:19:03Z trustin $
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.io.handler;
+
+import java.io.OutputStream;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.io.IoSession;
+
+/**
+ * An {@link OutputStream} that forwards all write operations to
+ * the associated {@link IoSession}.
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ *
+ */
+class IoSessionOutputStream extends OutputStream
+{
+ private final IoSession session;
+
+ IoSessionOutputStream( IoSession session )
+ {
+ this.session = session;
+ }
+
+ public void close()
+ {
+ session.close( true );
+ }
+
+ public void flush()
+ {
+ }
+
+ public void write( byte[] b, int off, int len )
+ {
+ ByteBuffer buf = ByteBuffer.wrap( b, off, len );
+ buf.acquire(); // prevent from being pooled.
+ session.write( buf, null );
+ }
+
+ public void write( byte[] b )
+ {
+ ByteBuffer buf = ByteBuffer.wrap( b );
+ buf.acquire(); // prevent from being pooled.
+ session.write( buf, null );
+ }
+
+ public void write( int b )
+ {
+ ByteBuffer buf = ByteBuffer.allocate( 1 );
+ buf.put( ( byte ) b );
+ buf.flip();
+ session.write( buf, null );
+ }
+}
Modified: directory/network/branches/0.8/src/java/org/apache/mina/io/handler/StreamIoHandler.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/0.8/src/java/org/apache/mina/io/handler/StreamIoHandler.java?rev=349399&r1=349398&r2=349399&view=diff
==============================================================================
--- directory/network/branches/0.8/src/java/org/apache/mina/io/handler/StreamIoHandler.java (original)
+++ directory/network/branches/0.8/src/java/org/apache/mina/io/handler/StreamIoHandler.java Mon Nov 28 00:28:41 2005
@@ -1,9 +1,26 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
package org.apache.mina.io.handler;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.PipedOutputStream;
import java.net.SocketTimeoutException;
import org.apache.mina.common.ByteBuffer;
@@ -26,12 +43,10 @@
*/
public abstract class StreamIoHandler extends IoHandlerAdapter
{
- private static final String KEY_IN = "BlockingIoHandler.in";
- private static final String KEY_OUT = "BlockingIoHandler.out";
- private static final String KEY_STARTED = "BlockingIoHandler.started";
+ private static final String KEY_IN = StreamIoHandler.class.getName() + ".in";
+ private static final String KEY_OUT = StreamIoHandler.class.getName() + ".out";
private int readTimeout;
-
private int writeTimeout;
protected StreamIoHandler()
@@ -92,16 +107,11 @@
session.getConfig().setIdleTime( IdleStatus.READER_IDLE, readTimeout );
// Create streams
- PipedOutputStream out = new PipedOutputStream();
+ InputStream in = new IoSessionInputStream();
+ OutputStream out = new IoSessionOutputStream( session );
+ session.setAttribute( KEY_IN, in );
session.setAttribute( KEY_OUT, out );
- try
- {
- session.setAttribute( KEY_IN, new PipedInputStream( out ) );
- }
- catch( IOException e )
- {
- throw new StreamIoException( e );
- }
+ processStreamIo( session, in, out );
}
/**
@@ -109,14 +119,10 @@
*/
public void sessionClosed( IoSession session )
{
- final PipedOutputStream out = ( PipedOutputStream ) session.getAttribute( KEY_OUT );
- try {
- out.close();
- }
- catch( IOException e )
- {
- throw new StreamIoException( e );
- }
+ IoSessionInputStream in = ( IoSessionInputStream ) session.getAttribute( KEY_IN );
+ IoSessionOutputStream out = ( IoSessionOutputStream ) session.getAttribute( KEY_OUT );
+ in.close();
+ out.close();
}
/**
@@ -124,33 +130,8 @@
*/
public void dataRead( IoSession session, ByteBuffer buf )
{
- final PipedInputStream in = ( PipedInputStream ) session.getAttribute( KEY_IN );
- final PipedOutputStream out = ( PipedOutputStream ) session.getAttribute( KEY_OUT );
-
- java.nio.ByteBuffer nioBuf = buf.buf();
- int offset = nioBuf.position();
- int length = nioBuf.limit() - offset;
- if( !nioBuf.hasArray() )
- {
- ByteBuffer heapBuf = ByteBuffer.allocate( length, false );
- heapBuf.put( buf );
- heapBuf.flip();
- nioBuf = heapBuf.buf();
- offset = 0;
- }
-
- try
- {
- out.write( nioBuf.array(), offset, length );
- }
- catch( IOException e )
- {
- throw new StreamIoException( e );
- }
- finally
- {
- beginService( session, in );
- }
+ final IoSessionInputStream in = ( IoSessionInputStream ) session.getAttribute( KEY_IN );
+ in.write( buf );
}
/**
@@ -158,7 +139,7 @@
*/
public void exceptionCaught( IoSession session, Throwable cause )
{
- final PipedInputStream in = ( PipedInputStream ) session.getAttribute( KEY_IN );
+ final IoSessionInputStream in = ( IoSessionInputStream ) session.getAttribute( KEY_IN );
IOException e = null;
if( cause instanceof StreamIoException )
@@ -172,8 +153,7 @@
if( e != null && in != null )
{
- in.setException( e );
- beginService( session, in );
+ in.throwException( e );
}
else
{
@@ -194,91 +174,6 @@
}
}
- private void beginService( IoSession session, PipedInputStream in )
- {
- if( session.getAttribute( KEY_STARTED ) == null )
- {
- session.setAttribute( KEY_STARTED, Boolean.TRUE );
- processStreamIo( session, in, new ServiceOutputStream( session ) );
- }
- }
-
- private static class PipedInputStream extends java.io.PipedInputStream
- {
- private IOException exception;
-
- public PipedInputStream(PipedOutputStream src) throws IOException
- {
- super( src );
- }
-
- public void setException( IOException e )
- {
- this.exception = e;
- }
-
- public synchronized int read() throws IOException
- {
- throwException();
- return super.read();
- }
-
- public synchronized int read( byte[] b, int off, int len ) throws IOException
- {
- throwException();
- return super.read( b, off, len );
- }
-
- private void throwException() throws IOException
- {
- if( exception != null )
- {
- throw exception;
- }
- }
- }
-
- private static class ServiceOutputStream extends OutputStream
- {
- private final IoSession session;
-
- public ServiceOutputStream( IoSession session )
- {
- this.session = session;
- }
-
- public void close()
- {
- session.close( true );
- }
-
- public void flush()
- {
- }
-
- public void write( byte[] b, int off, int len )
- {
- ByteBuffer buf = ByteBuffer.wrap( b, off, len );
- buf.acquire(); // prevent from being pooled.
- session.write( buf, null );
- }
-
- public void write( byte[] b )
- {
- ByteBuffer buf = ByteBuffer.wrap( b );
- buf.acquire(); // prevent from being pooled.
- session.write( buf, null );
- }
-
- public void write( int b )
- {
- ByteBuffer buf = ByteBuffer.allocate( 1 );
- buf.put( ( byte ) b );
- buf.flip();
- session.write( buf, null );
- }
- }
-
private static class StreamIoException extends RuntimeException
{
private static final long serialVersionUID = 3976736960742503222L;
Added: directory/network/trunk/src/java/org/apache/mina/handler/support/IoSessionInputStream.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/handler/support/IoSessionInputStream.java?rev=349399&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/handler/support/IoSessionInputStream.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/handler/support/IoSessionInputStream.java Mon Nov 28 00:28:41 2005
@@ -0,0 +1,204 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.handler.support;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+
+/**
+ * An {@link InputStream} that buffers data read from
+ * {@link IoHandler#messageReceived(IoSession, Object)} events.
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ *
+ */
+public class IoSessionInputStream extends InputStream
+{
+ private final ByteBuffer buf;
+ private boolean closed;
+ private boolean released;
+ private IOException exception;
+ private int waiters;
+
+ public IoSessionInputStream()
+ {
+ buf = ByteBuffer.allocate( 16 );
+ buf.setAutoExpand( true );
+ buf.limit( 0 );
+ }
+
+ public synchronized int available()
+ {
+ if( released )
+ {
+ return 0;
+ }
+ else
+ {
+ return buf.remaining();
+ }
+ }
+
+ public synchronized void close()
+ {
+ if( closed )
+ {
+ return;
+ }
+
+ closed = true;
+ releaseBuffer();
+
+ if( waiters != 0 )
+ {
+ this.notifyAll();
+ }
+ }
+
+ public void mark( int readlimit )
+ {
+ }
+
+ public boolean markSupported()
+ {
+ return false;
+ }
+
+ public synchronized int read() throws IOException
+ {
+ waitForData();
+ if( released )
+ {
+ return -1;
+ }
+
+ int ret = buf.get() & 0xff;
+ return ret;
+ }
+
+ public synchronized int read( byte[] b, int off, int len ) throws IOException
+ {
+ waitForData();
+ if( released )
+ {
+ return -1;
+ }
+
+ int readBytes;
+ if( len > buf.remaining() )
+ {
+ readBytes = buf.remaining();
+ }
+ else
+ {
+ readBytes = len;
+ }
+ buf.get( b, off, readBytes );
+
+ return readBytes;
+ }
+
+ public synchronized void reset() throws IOException
+ {
+ throw new IOException( "Mark is not supported." );
+ }
+
+ private void waitForData() throws IOException
+ {
+ if( released )
+ {
+ throw new IOException( "Stream is closed." );
+ }
+
+ waiters ++;
+ while( !released && buf.remaining() == 0 && exception == null )
+ {
+ try
+ {
+ this.wait();
+ }
+ catch( InterruptedException e )
+ {
+ }
+ }
+ waiters --;
+
+ if( exception != null )
+ {
+ releaseBuffer();
+ throw exception;
+ }
+
+ if( closed && buf.remaining() == 0 )
+ {
+ releaseBuffer();
+ }
+ }
+
+ private void releaseBuffer()
+ {
+ if( released )
+ {
+ return;
+ }
+
+ released = true;
+ buf.release();
+ }
+
+ public synchronized void write( ByteBuffer src )
+ {
+ if( closed )
+ {
+ return;
+ }
+
+ if( buf.hasRemaining() )
+ {
+ this.buf.compact();
+ this.buf.put( src );
+ this.buf.flip();
+ }
+ else
+ {
+ this.buf.clear();
+ this.buf.put( src );
+ this.buf.flip();
+ this.notify();
+ }
+ }
+
+ public synchronized void throwException( IOException e )
+ {
+ if( exception == null )
+ {
+ exception = e;
+
+ if( waiters != 0 )
+ {
+ this.notifyAll();
+ }
+ }
+ }
+}
\ No newline at end of file
Propchange: directory/network/trunk/src/java/org/apache/mina/handler/support/IoSessionInputStream.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Added: directory/network/trunk/src/java/org/apache/mina/handler/support/IoSessionOutputStream.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/handler/support/IoSessionOutputStream.java?rev=349399&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/handler/support/IoSessionOutputStream.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/handler/support/IoSessionOutputStream.java Mon Nov 28 00:28:41 2005
@@ -0,0 +1,73 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.handler.support;
+
+import java.io.OutputStream;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+
+/**
+ * An {@link OutputStream} that forwards all write operations to
+ * the associated {@link IoSession}.
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ *
+ */
+public class IoSessionOutputStream extends OutputStream
+{
+ private final IoSession session;
+
+ public IoSessionOutputStream( IoSession session )
+ {
+ this.session = session;
+ }
+
+ public void close()
+ {
+ session.close().join();
+ }
+
+ public void flush()
+ {
+ }
+
+ public void write( byte[] b, int off, int len )
+ {
+ ByteBuffer buf = ByteBuffer.wrap( b, off, len );
+ buf.acquire(); // prevent from being pooled.
+ session.write( buf );
+ }
+
+ public void write( byte[] b )
+ {
+ ByteBuffer buf = ByteBuffer.wrap( b );
+ buf.acquire(); // prevent from being pooled.
+ session.write( buf );
+ }
+
+ public void write( int b )
+ {
+ ByteBuffer buf = ByteBuffer.allocate( 1 );
+ buf.put( ( byte ) b );
+ buf.flip();
+ session.write( buf );
+ }
+}
Propchange: directory/network/trunk/src/java/org/apache/mina/handler/support/IoSessionOutputStream.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision