You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by re...@apache.org on 2015/04/08 11:53:45 UTC
svn commit: r1672050 - in /tomcat/trunk/java/org/apache/tomcat/util/net:
Nio2Channel.java Nio2Endpoint.java SecureNio2Channel.java
Author: remm
Date: Wed Apr 8 09:53:45 2015
New Revision: 1672050
URL: http://svn.apache.org/r1672050
Log:
Add read, fixes and cleanups.
Modified:
tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java
tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java?rev=1672050&r1=1672049&r2=1672050&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java Wed Apr 8 09:53:45 2015
@@ -135,6 +135,12 @@ public class Nio2Channel implements Asyn
sc.read(dst, timeout, unit, attachment, handler);
}
+ /**
+  * Scatter read: forwards the request unchanged to the wrapped
+  * channel {@code sc}.
+  *
+  * @param dsts       buffers that receive the data
+  * @param offset     index of the first buffer in {@code dsts} to use
+  * @param length     number of buffers to use, starting at {@code offset}
+  * @param timeout    maximum time allowed for the read
+  * @param unit       unit of {@code timeout}
+  * @param attachment object handed back to {@code handler}
+  * @param handler    callback notified with the result of the read
+  */
+ public <A> void read(ByteBuffer[] dsts,
+         int offset, int length, long timeout, TimeUnit unit,
+         A attachment, CompletionHandler<Long,? super A> handler) {
+     sc.read(dsts, offset, length, timeout, unit, attachment, handler);
+ }
+
@Override
public Future<Integer> write(ByteBuffer src) {
return sc.write(src);
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1672050&r1=1672049&r2=1672050&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Wed Apr 8 09:53:45 2015
@@ -1044,7 +1044,60 @@ public class Nio2Endpoint extends Abstra
}
// TODO: NIO2 style scatter/gather methods.
- // TODO: SecureNio2Channel gather would need to be improved
+ // TODO: SecureNio2Channel scatter/gather would need to be improved
+
+ /**
+  * Progress of a scatter/gather IO operation as seen by its caller.
+  */
+ public enum CompletionState {
+     /**
+      * The IO has not finished yet; the completion handler is going to
+      * be invoked at a later point.
+      */
+     PENDING,
+     /**
+      * The IO finished inline. In that case the completion handler is
+      * not invoked, except to report an error.
+      */
+     INLINE,
+     /**
+      * The IO finished, although not inline.
+      */
+     DONE
+ }
+
+ /**
+  * Verdict returned by a completion check: tells the IO completion
+  * code whether to keep going and whether to notify the handler.
+  */
+ public enum CompletionHandlerCall {
+     /**
+      * Keep the operation going; do not invoke the completion handler.
+      */
+     CONTINUE,
+     /**
+      * The operation is finished, yet the completion handler must not
+      * be invoked. Potentially useful when the operation finished
+      * inline.
+      */
+     NONE,
+     /**
+      * The operation is finished; invoke the completion handler.
+      */
+     DONE
+ }
+
+ /**
+  * Callback consulted each time an IO call of a scatter/gather
+  * operation completes, to decide what happens next.
+  */
+ public interface CompletionCheck {
+     /**
+      * Decide whether the operation is finished and whether the
+      * completion handler should be notified.
+      *
+      * @param state of the operation (done or done inline since the
+      *        IO call is done)
+      * @param buffers ByteBuffer[] that has been passed to the
+      *        original IO call
+      * @param offset that has been passed to the original IO call
+      * @param length that has been passed to the original IO call
+      * @return {@link CompletionHandlerCall#CONTINUE} if the IO is
+      *         incomplete (data has not been fully written while it
+      *         should, or more data read is needed for further
+      *         processing) and should be continued before the handler
+      *         is called; {@link CompletionHandlerCall#DONE} if enough
+      *         data has been read or written and the handler should be
+      *         notified; {@link CompletionHandlerCall#NONE} if the
+      *         operation is complete but the handler should not be
+      *         called
+      */
+     CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers, int offset, int length);
+ }
/**
* Internal state tracker for scatter/gather operations.
@@ -1069,13 +1122,64 @@ public class Nio2Endpoint extends Abstra
this.attachment = attachment;
this.check = check;
this.handler = handler;
- this.pos = offset;
}
private long nBytes = 0;
- private int pos;
private CompletionState state = CompletionState.PENDING;
}
+ /**
+  * Completion handler for scatter reads. After each read it adds the
+  * byte count to the operation state, asks the optional
+  * {@link CompletionCheck} what to do, and then either re-issues the
+  * read or releases {@code readPending} and, if requested, notifies
+  * the caller's handler.
+  */
+ private class ScatterReadCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> {
+     /**
+      * Called when a read finished without an exception.
+      *
+      * @param nBytes number of bytes read by this call; a negative
+      *        value is treated as end of stream
+      * @param state  tracker of the overall scatter read
+      */
+     @Override
+     public void completed(Long nBytes, OperationState<A> state) {
+         // Compare as long: narrowing to int could turn a very large
+         // positive count into a negative one and fake an EOF.
+         if (nBytes.longValue() < 0) {
+             failed(new EOFException(), state);
+         } else {
+             state.nBytes += nBytes.longValue();
+             CompletionState currentState = Nio2Endpoint.isInline() ? CompletionState.INLINE : CompletionState.DONE;
+             // Without a check, a single read completes the operation
+             // and the handler is always notified.
+             boolean complete = true;
+             boolean completion = true;
+             if (state.check != null) {
+                 switch (state.check.callHandler(currentState, state.buffers, state.offset, state.length)) {
+                 case CONTINUE:
+                     complete = false;
+                     break;
+                 case DONE:
+                     break;
+                 case NONE:
+                     completion = false;
+                     break;
+                 }
+             }
+             if (complete) {
+                 // Allow a new read before the caller's handler runs
+                 readPending.release();
+                 state.state = currentState;
+                 if (completion) {
+                     state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
+                 }
+             } else {
+                 // More data wanted: read again with the same arguments
+                 getSocket().read(state.buffers, state.offset, state.length,
+                         state.timeout, state.unit, state, this);
+             }
+         }
+     }
+     /**
+      * Called when a read failed. Records the error on the socket
+      * wrapper, releases {@code readPending} and notifies the caller's
+      * handler, unless the channel was closed asynchronously.
+      *
+      * @param exc   cause of the failure; wrapped in an IOException if
+      *        it is not one already
+      * @param state tracker of the overall scatter read
+      */
+     @Override
+     public void failed(Throwable exc, OperationState<A> state) {
+         IOException ioe;
+         if (exc instanceof IOException) {
+             ioe = (IOException) exc;
+         } else {
+             ioe = new IOException(exc);
+         }
+         Nio2SocketWrapper.this.setError(ioe);
+         readPending.release();
+         if (exc instanceof AsynchronousCloseException) {
+             // If already closed, don't call onError and close again
+             return;
+         }
+         state.state = Nio2Endpoint.isInline() ? CompletionState.INLINE : CompletionState.DONE;
+         state.handler.failed(ioe, state.attachment);
+     }
+ }
+
private class GatherWriteCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> {
@Override
public void completed(Long nBytes, OperationState<A> state) {
@@ -1083,38 +1187,30 @@ public class Nio2Endpoint extends Abstra
failed(new EOFException(), state);
} else {
state.nBytes += nBytes.longValue();
- if (state.pos == state.offset + state.length) {
+ CompletionState currentState = Nio2Endpoint.isInline() ? CompletionState.INLINE : CompletionState.DONE;
+ boolean complete = true;
+ boolean completion = true;
+ if (state.check != null) {
+ switch (state.check.callHandler(currentState, state.buffers, state.offset, state.length)) {
+ case CONTINUE:
+ complete = false;
+ break;
+ case DONE:
+ break;
+ case NONE:
+ completion = false;
+ break;
+ }
+ }
+ if (complete) {
writePending.release();
- state.state = Nio2Endpoint.isInline() ? CompletionState.INLINE : CompletionState.DONE;
- if (state.check == null
- || state.check.callHandler(state.state, state.buffers, state.offset, state.length)
- == CompletionHandlerCall.DONE) {
+ state.state = currentState;
+ if (completion) {
state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
}
} else {
- if (state.check == null) {
- // Call completion handler
- writePending.release();
- state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
- } else {
- boolean inline = Nio2Endpoint.isInline();
- switch (state.check.callHandler(inline ? CompletionState.INLINE : CompletionState.DONE,
- state.buffers, state.offset, state.length)) {
- case CONTINUE:
- getSocket().write(state.buffers, state.offset, state.length,
- state.timeout, state.unit, state, this);
- break;
- case DONE:
- writePending.release();
- state.state = inline ? CompletionState.INLINE : CompletionState.DONE;
- state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
- break;
- case NONE:
- writePending.release();
- state.state = inline ? CompletionState.INLINE : CompletionState.DONE;
- break;
- }
- }
+ getSocket().write(state.buffers, state.offset, state.length,
+ state.timeout, state.unit, state, this);
}
}
}
@@ -1133,62 +1229,10 @@ public class Nio2Endpoint extends Abstra
}
}
- public enum CompletionState {
- /**
- * Operation is pending and the completion handler will
- * be called later.
- */
- PENDING,
- /**
- * The operation completed inline, and the completion handler
- * will not be called unless an error occurred.
- */
- INLINE,
- /**
- * The operation completed, but not inline.
- */
- DONE
- }
-
- public enum CompletionHandlerCall {
- /**
- * Operation should continue, the completion handler shouldn't be
- * called.
- */
- CONTINUE,
- /**
- * The operation completed but the completion handler shouldn't be
- * called. This is possibly useful if the operation completed
- * inline.
- */
- NONE,
- /**
- * The operation is complete, call the completion handler.
- */
- DONE
- }
-
- public interface CompletionCheck {
- /**
- * Return true if enough data has been read or written and the
- * handler should be notified. Return false if the IO is
- * incomplete (data has not been fully written while it should,
- * or more data read is needed for further processing) and should
- * be continued before the completion handler is called.
- *
- * @param state of the operation (done or done inline since the
- * IO call is done)
- * @param buffers ByteBuffer[] that has been passed to the
- * original IO call
- * @param offset that has been passed to the original IO call
- * @param length that has been passed to the original IO call
- */
- public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers, int offset, int length);
- }
-
/**
* This utility CompletionCheck will cause the write to fully write
- * all remaining data.
+ * all remaining data. If the operation completes inline, the
+ * completion handler will not be called.
*/
public static final CompletionCheck COMPLETE_WRITE = new CompletionCheck() {
@Override
@@ -1204,7 +1248,8 @@ public class Nio2Endpoint extends Abstra
/**
* This utility CompletionCheck will cause the completion handler
- * to be called once some data has been read.
+ * to be called once some data has been read. If the operation
+ * completes inline, the completion handler will not be called.
*/
public static final CompletionCheck READ_DATA = new CompletionCheck() {
@Override
@@ -1240,8 +1285,7 @@ public class Nio2Endpoint extends Abstra
OperationState<A> state = new OperationState<>(dsts, offset, length, timeout, unit, attachment, check, handler);
if (readPending.tryAcquire()) {
Nio2Endpoint.startInline();
- // FIXME: Add scatter read to Nio2Channel and ScatterReadCompletionHandler class
- //getSocket().read(dsts, offset, length, timeout, unit, state, new ScatterReadCompletionHandler<>());
+ getSocket().read(dsts, offset, length, timeout, unit, state, new ScatterReadCompletionHandler<>());
Nio2Endpoint.endInline();
} else {
throw new ReadPendingException();
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java?rev=1672050&r1=1672049&r2=1672050&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java Wed Apr 8 09:53:45 2015
@@ -794,6 +794,37 @@ public class SecureNio2Channel extends N
}
@Override
+ public <A> void read(ByteBuffer[] dsts, int offset, int length,
+         long timeout, TimeUnit unit, A attachment,
+         final CompletionHandler<Long, ? super A> handler) {
+     // Scatter read emulated with one single-buffer read: the first
+     // buffer of the requested range that still has room is filled and
+     // the Integer result is widened to Long for the caller's handler.
+     //
+     // Range check uses a subtraction so that offset + length cannot
+     // overflow and slip past the bounds test.
+     if (dsts == null || offset < 0 || length < 0 || length > dsts.length - offset) {
+         throw new IllegalArgumentException();
+     }
+     ByteBuffer dst = null;
+     // Find the first buffer with space
+     for (int i = 0; i < length; i++) {
+         ByteBuffer current = dsts[offset + i];
+         if (current.hasRemaining()) {
+             dst = current;
+             // Stop here: later buffers must only be filled after this one
+             break;
+         }
+     }
+     if (dst == null) {
+         // No buffer in the range has any room left
+         throw new IllegalArgumentException();
+     }
+     // Adapts the Integer based single-buffer callback to the Long
+     // based scatter callback
+     CompletionHandler<Integer, ? super A> handlerWrapper = new CompletionHandler<Integer, A>() {
+         @Override
+         public void completed(Integer result, A attachment) {
+             handler.completed(Long.valueOf(result.longValue()), attachment);
+         }
+         @Override
+         public void failed(Throwable exc, A attachment) {
+             handler.failed(exc, attachment);
+         }
+     };
+     read(dst, timeout, unit, attachment, handlerWrapper);
+ }
+
+ @Override
public <A> void write(final ByteBuffer src, final long timeout, final TimeUnit unit,
final A attachment, final CompletionHandler<Integer, ? super A> handler) {
// Check state
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org