You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/07/31 22:31:28 UTC
svn commit: r681474 - in
/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io:
IoReceiver.java IoSender.java IoTransport.java
Author: rhs
Date: Thu Jul 31 13:31:28 2008
New Revision: 681474
URL: http://svn.apache.org/viewvc?rev=681474&view=rev
Log:
QPID-1207: fixed io transport close to ensure threads shutdown properly
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=681474&r1=681473&r2=681474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Thu Jul 31 13:31:28 2008
@@ -28,6 +28,7 @@
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* IoReceiver
@@ -43,18 +44,50 @@
private final Receiver<ByteBuffer> receiver;
private final int bufferSize;
private final Socket socket;
+ private final long timeout;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
- public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver, int bufferSize)
+ public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver,
+ int bufferSize, long timeout)
{
this.transport = transport;
this.receiver = receiver;
this.bufferSize = bufferSize;
this.socket = transport.getSocket();
+ this.timeout = timeout;
- setName(String.format("IoReceive - %s", socket.getRemoteSocketAddress()));
+ setDaemon(true);
+ setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
start();
}
+ void close()
+ {
+ if (!closed.getAndSet(true))
+ {
+ try
+ {
+ socket.shutdownInput();
+ if (Thread.currentThread() != this)
+ {
+ join(timeout);
+ if (isAlive())
+ {
+ throw new TransportException("join timed out");
+ }
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new TransportException(e);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException(e);
+ }
+ }
+ }
+
public void run()
{
final int threshold = bufferSize / 2;
@@ -67,7 +100,7 @@
InputStream in = socket.getInputStream();
int read = 0;
int offset = 0;
- while ((read = in.read(buffer, offset, bufferSize-offset)) != -1)
+ while (!closed.get() && (read = in.read(buffer, offset, bufferSize-offset)) != -1)
{
if (read > 0)
{
@@ -84,22 +117,12 @@
}
catch (Throwable t)
{
- receiver.exception(new TransportException("error in read thread", t));
+ receiver.exception(t);
}
finally
{
- try
- {
- transport.getSender().close();
- }
- catch (TransportException e)
- {
- log.error(e, "error closing");
- }
- finally
- {
- receiver.closed();
- }
+ receiver.closed();
+ transport.getSender().close();
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=681474&r1=681473&r2=681474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Thu Jul 31 13:31:28 2008
@@ -24,7 +24,6 @@
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
@@ -55,7 +54,7 @@
private final Object notEmpty = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
- private IOException exception = null;
+ private volatile Throwable exception = null;
public IoSender(IoTransport transport, int bufferSize, long timeout)
@@ -74,6 +73,7 @@
throw new TransportException("Error getting output stream for socket", e);
}
+ setDaemon(true);
setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
start();
}
@@ -159,6 +159,11 @@
public void close()
{
+ close(true);
+ }
+
+ void close(boolean reportException)
+ {
if (!closed.getAndSet(true))
{
synchronized (notEmpty)
@@ -168,11 +173,15 @@
try
{
- join(timeout);
- if (isAlive())
+ if (Thread.currentThread() != this)
{
- throw new TransportException("join timed out");
+ join(timeout);
+ if (isAlive())
+ {
+ throw new TransportException("join timed out");
+ }
}
+ transport.getReceiver().close();
socket.close();
}
catch (InterruptedException e)
@@ -184,7 +193,7 @@
throw new TransportException(e);
}
- if (exception != null)
+ if (reportException && exception != null)
{
throw new TransportException(exception);
}
@@ -246,6 +255,7 @@
{
log.error(e, "error in write thread");
exception = e;
+ close(false);
break;
}
tail.getAndAdd(length);
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java?rev=681474&r1=681473&r2=681474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java Thu Jul 31 13:31:28 2008
@@ -106,7 +106,8 @@
Connection conn = new Connection
(new Disassembler(new OutputHandler(sender), 64*1024 - 1),
delegate);
- receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)), 2*readBufferSize);
+ receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)),
+ 2*readBufferSize, timeout);
return conn;
}