You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by re...@apache.org on 2019/05/14 14:21:04 UTC
[tomcat] branch master updated: Avoid blocking write of internal buffer
This is an automated email from the ASF dual-hosted git repository.
remm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/master by this push:
new 0b7f845 Avoid blocking write of internal buffer
0b7f845 is described below
commit 0b7f845e191e6e88fe364190e4adfa6900d8e468
Author: remm <re...@apache.org>
AuthorDate: Tue May 14 16:20:54 2019 +0200
Avoid blocking write of internal buffer
This introduces some seemingly "useless" code, but its purpose is to align
the NIO2 code with the NIO and APR code before moving the shared code into
SocketWrapperBase. There are a couple of slightly risky changes (the async
write of the write buffer, and delaying setting the read/writeNotify flags
until after acquiring the semaphores) that could cause CI instability again
(as usual, the test suite passes for me ...).
---
java/org/apache/tomcat/util/net/Nio2Endpoint.java | 215 +++++++++++-----------
webapps/docs/changelog.xml | 3 +-
2 files changed, 107 insertions(+), 111 deletions(-)
diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
index b56cbad..8f2de8d 100644
--- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java
+++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
@@ -37,6 +37,7 @@ import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -953,22 +954,23 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
/**
* Internal state tracker for scatter/gather operations.
*/
- private static class OperationState<A> {
- private final boolean read;
- private final ByteBuffer[] buffers;
- private final int offset;
- private final int length;
- private final A attachment;
- private final long timeout;
- private final TimeUnit unit;
- private final BlockingMode block;
- private final CompletionCheck check;
- private final CompletionHandler<Long, ? super A> handler;
- private final Semaphore semaphore;
- private OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
+ protected class OperationState<A> implements Runnable {
+ protected final boolean read;
+ protected final ByteBuffer[] buffers;
+ protected final int offset;
+ protected final int length;
+ protected final A attachment;
+ protected final long timeout;
+ protected final TimeUnit unit;
+ protected final BlockingMode block;
+ protected final CompletionCheck check;
+ protected final CompletionHandler<Long, ? super A> handler;
+ protected final Semaphore semaphore;
+ protected final VectoredIOCompletionHandler<A> completion;
+ protected OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
BlockingMode block, long timeout, TimeUnit unit, A attachment,
CompletionCheck check, CompletionHandler<Long, ? super A> handler,
- Semaphore semaphore) {
+ Semaphore semaphore, VectoredIOCompletionHandler<A> completion) {
this.read = read;
this.buffers = buffers;
this.offset = offset;
@@ -980,102 +982,115 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
this.check = check;
this.handler = handler;
this.semaphore = semaphore;
+ this.completion = completion;
}
- private volatile long nBytes = 0;
- private volatile CompletionState state = CompletionState.PENDING;
- }
+ protected volatile long nBytes = 0;
+ protected volatile CompletionState state = CompletionState.PENDING;
- @Override
- public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
- BlockingMode block, long timeout, TimeUnit unit, A attachment,
- CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
- IOException ioe = getError();
- if (ioe != null) {
- handler.failed(ioe, attachment);
- return CompletionState.ERROR;
- }
- if (timeout == -1) {
- timeout = toTimeout(getReadTimeout());
+ public boolean isInline() {
+ return Nio2Endpoint.isInline();
}
- // Disable any regular read notifications caused by registerReadInterest
- readNotify = true;
- if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
+
+ public boolean process() {
try {
- if (!readPending.tryAcquire(timeout, unit)) {
- handler.failed(new SocketTimeoutException(), attachment);
- return CompletionState.ERROR;
- }
- } catch (InterruptedException e) {
- handler.failed(e, attachment);
- return CompletionState.ERROR;
- }
- } else {
- if (!readPending.tryAcquire()) {
- if (block == BlockingMode.NON_BLOCK) {
- return CompletionState.NOT_DONE;
- } else {
- handler.failed(new ReadPendingException(), attachment);
- return CompletionState.ERROR;
- }
+ getEndpoint().getExecutor().execute(this);
+ } catch (RejectedExecutionException ree) {
+ log.warn(sm.getString("endpoint.executor.fail", Nio2SocketWrapper.this) , ree);
+ return false;
+ } catch (Throwable t) {
+ ExceptionUtils.handleThrowable(t);
+ // This means we got an OOM or similar creating a thread, or that
+ // the pool and its queue are full
+ log.error(sm.getString("endpoint.process.fail"), t);
+ return false;
}
+ return true;
}
- OperationState<A> state = new OperationState<>(true, dsts, offset, length, block,
- timeout, unit, attachment, check, handler, readPending);
- VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
- Nio2Endpoint.startInline();
- long nBytes = 0;
- if (!socketBufferHandler.isReadBufferEmpty()) {
- // There is still data inside the main read buffer, use it to fill out the destination buffers
- synchronized (readCompletionHandler) {
- // Note: It is not necessary to put this code in the completion handler
- socketBufferHandler.configureReadBufferForRead();
- for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) {
- nBytes += transfer(socketBufferHandler.getReadBuffer(), dsts[offset + i]);
- }
- }
- if (nBytes > 0) {
- completion.completed(Long.valueOf(nBytes), state);
+
+ public void start() {
+ if (read) {
+ // Disable any regular read notifications caused by registerReadInterest
+ readNotify = true;
+ } else {
+ // Disable any regular write notifications caused by registerWriteInterest
+ writeNotify = true;
}
+ Nio2Endpoint.startInline();
+ run();
+ Nio2Endpoint.endInline();
}
- if (nBytes == 0) {
- getSocket().read(dsts, offset, length, timeout, unit, state, completion);
- }
- Nio2Endpoint.endInline();
- if (block == BlockingMode.BLOCK) {
- synchronized (state) {
- if (state.state == CompletionState.PENDING) {
- try {
- state.wait(unit.toMillis(timeout));
- if (state.state == CompletionState.PENDING) {
- return CompletionState.ERROR;
+
+ @Override
+ public void run() {
+ if (read) {
+ long nBytes = 0;
+ if (!socketBufferHandler.isReadBufferEmpty()) {
+ // There is still data inside the main read buffer, use it to fill out the destination buffers
+ synchronized (readCompletionHandler) {
+ // Note: It is not necessary to put this code in the completion handler
+ socketBufferHandler.configureReadBufferForRead();
+ for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) {
+ nBytes += transfer(socketBufferHandler.getReadBuffer(), buffers[offset + i]);
}
- } catch (InterruptedException e) {
- handler.failed(new SocketTimeoutException(), attachment);
- return CompletionState.ERROR;
}
+ if (nBytes > 0) {
+ completion.completed(Long.valueOf(nBytes), this);
+ }
+ }
+ if (nBytes == 0) {
+ getSocket().read(buffers, offset, length, timeout, unit, this, completion);
}
+ } else {
+ if (!socketBufferHandler.isWriteBufferEmpty()) {
+ // First flush the main buffer as needed
+ socketBufferHandler.configureWriteBufferForRead();
+ getSocket().write(socketBufferHandler.getWriteBuffer(), null, new CompletionHandler<Integer, Void>() {
+ @Override
+ public void completed(Integer result, Void attachment) {
+ run();
+ }
+ @Override
+ public void failed(Throwable exc, Void attachment) {
+ handler.failed(exc, OperationState.this.attachment);
+ }
+ });
+ return;
+ }
+ // It should be less necessary to check the buffer state as it is easy to flush before
+ getSocket().write(buffers, offset, length, timeout, unit, this, completion);
}
}
- return state.state;
+ }
+
+ @Override
+ public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
+ BlockingMode block, long timeout, TimeUnit unit, A attachment,
+ CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
+ return readOrWrite(true, dsts, offset, length, block, timeout, unit, attachment, check, handler);
}
@Override
public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
BlockingMode block, long timeout, TimeUnit unit, A attachment,
CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
+ return readOrWrite(false, srcs, offset, length, block, timeout, unit, attachment, check, handler);
+ }
+
+ private <A> CompletionState readOrWrite(boolean read,
+ ByteBuffer[] buffers, int offset, int length,
+ BlockingMode block, long timeout, TimeUnit unit, A attachment,
+ CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
IOException ioe = getError();
if (ioe != null) {
handler.failed(ioe, attachment);
return CompletionState.ERROR;
}
if (timeout == -1) {
- timeout = toTimeout(getWriteTimeout());
+ timeout = toTimeout(read ? getReadTimeout() : getWriteTimeout());
}
- // Disable any regular write notifications caused by registerWriteInterest
- writeNotify = true;
if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
try {
- if (!writePending.tryAcquire(timeout, unit)) {
+ if (read ? !readPending.tryAcquire(timeout, unit) : !writePending.tryAcquire(timeout, unit)) {
handler.failed(new SocketTimeoutException(), attachment);
return CompletionState.ERROR;
}
@@ -1084,31 +1099,19 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
return CompletionState.ERROR;
}
} else {
- if (!writePending.tryAcquire()) {
+ if (read ? !readPending.tryAcquire() : !writePending.tryAcquire()) {
if (block == BlockingMode.NON_BLOCK) {
return CompletionState.NOT_DONE;
} else {
- handler.failed(new WritePendingException(), attachment);
+ handler.failed(read ? new ReadPendingException() : new WritePendingException(), attachment);
return CompletionState.ERROR;
}
}
}
- if (!socketBufferHandler.isWriteBufferEmpty()) {
- // First flush the main buffer as needed
- try {
- doWrite(true);
- } catch (IOException e) {
- handler.failed(e, attachment);
- return CompletionState.ERROR;
- }
- }
- OperationState<A> state = new OperationState<>(false, srcs, offset, length, block,
- timeout, unit, attachment, check, handler, writePending);
VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
- Nio2Endpoint.startInline();
- // It should be less necessary to check the buffer state as it is easy to flush before
- getSocket().write(srcs, offset, length, timeout, unit, state, completion);
- Nio2Endpoint.endInline();
+ OperationState<A> state = new OperationState<>(read, buffers, offset, length, block, timeout, unit,
+ attachment, check, handler, read ? readPending : writePending, completion);
+ state.start();
if (block == BlockingMode.BLOCK) {
synchronized (state) {
if (state.state == CompletionState.PENDING) {
@@ -1134,7 +1137,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
failed(new EOFException(), state);
} else {
state.nBytes += nBytes.longValue();
- CompletionState currentState = Nio2Endpoint.isInline() ? CompletionState.INLINE : CompletionState.DONE;
+ CompletionState currentState = state.isInline() ? CompletionState.INLINE : CompletionState.DONE;
boolean complete = true;
boolean completion = true;
if (state.check != null) {
@@ -1163,13 +1166,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
}
}
} else {
- if (state.read) {
- getSocket().read(state.buffers, state.offset, state.length,
- state.timeout, state.unit, state, this);
- } else {
- getSocket().write(state.buffers, state.offset, state.length,
- state.timeout, state.unit, state, this);
- }
+ state.run();
}
}
}
@@ -1188,14 +1185,14 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
if (state.block == BlockingMode.BLOCK) {
notify = true;
} else {
- state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE;
+ state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE;
}
if (state.handler != null) {
state.handler.failed(exc, state.attachment);
}
if (notify) {
synchronized (state) {
- state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR : CompletionState.DONE;
+ state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE;
state.notify();
}
}
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 4f83bb2..1bbaa2f 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -106,8 +106,7 @@
default due to low performance. (remm)
</add>
<fix>
- Avoid blocking write of internal buffer for NIO when using async IO.
- (remm)
+ Avoid blocking write of internal buffer when using async IO. (remm)
</fix>
</changelog>
</subsection>
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org