You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by re...@apache.org on 2019/05/01 13:13:32 UTC

[tomcat] branch master updated: Add classic NIO2 style read and write

This is an automated email from the ASF dual-hosted git repository.

remm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/master by this push:
     new 2abb1b9  Add classic NIO2 style read and write
2abb1b9 is described below

commit 2abb1b9d4d5b1d2e3aa9b0a5907c4c81b61ac367
Author: remm <re...@apache.org>
AuthorDate: Wed May 1 15:13:20 2019 +0200

    Add classic NIO2 style read and write
    
    Possible use with CompletableFuture which would need exceptions or
    completion handler failed call to proceed.
---
 java/org/apache/tomcat/util/net/Nio2Endpoint.java  | 20 ++++++--
 java/org/apache/tomcat/util/net/NioEndpoint.java   | 20 ++++++--
 .../apache/tomcat/util/net/SocketWrapperBase.java  | 54 +++++++++++++++++++++-
 webapps/docs/changelog.xml                         |  5 ++
 4 files changed, 90 insertions(+), 9 deletions(-)

diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
index 6d3e5d0..815713a 100644
--- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java
+++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java
@@ -31,6 +31,8 @@ import java.nio.channels.CompletionHandler;
 import java.nio.channels.FileChannel;
 import java.nio.channels.InterruptedByTimeoutException;
 import java.nio.channels.NetworkChannel;
+import java.nio.channels.ReadPendingException;
+import java.nio.channels.WritePendingException;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.concurrent.ExecutionException;
@@ -1007,7 +1009,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
             }
             // Disable any regular read notifications caused by registerReadInterest
             readNotify = true;
-            if (block != BlockingMode.NON_BLOCK) {
+            if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
                 try {
                     if (!readPending.tryAcquire(timeout, unit)) {
                         handler.failed(new SocketTimeoutException(), attachment);
@@ -1019,7 +1021,12 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
                 }
             } else {
                 if (!readPending.tryAcquire()) {
-                    return CompletionState.NOT_DONE;
+                    if (block == BlockingMode.NON_BLOCK) {
+                        return CompletionState.NOT_DONE;
+                    } else {
+                        handler.failed(new ReadPendingException(), attachment);
+                        return CompletionState.ERROR;
+                    }
                 }
             }
             OperationState<A> state = new OperationState<>(true, dsts, offset, length, block,
@@ -1076,7 +1083,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
             }
             // Disable any regular write notifications caused by registerWriteInterest
             writeNotify = true;
-            if (block != BlockingMode.NON_BLOCK) {
+            if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
                 try {
                     if (!writePending.tryAcquire(timeout, unit)) {
                         handler.failed(new SocketTimeoutException(), attachment);
@@ -1088,7 +1095,12 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS
                 }
             } else {
                 if (!writePending.tryAcquire()) {
-                    return CompletionState.NOT_DONE;
+                    if (block == BlockingMode.NON_BLOCK) {
+                        return CompletionState.NOT_DONE;
+                    } else {
+                        handler.failed(new WritePendingException(), attachment);
+                        return CompletionState.ERROR;
+                    }
                 }
             }
             if (!socketBufferHandler.isWriteBufferEmpty()) {
diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java
index eb9b87a..259dc64 100644
--- a/java/org/apache/tomcat/util/net/NioEndpoint.java
+++ b/java/org/apache/tomcat/util/net/NioEndpoint.java
@@ -31,11 +31,13 @@ import java.nio.channels.CompletionHandler;
 import java.nio.channels.FileChannel;
 import java.nio.channels.InterruptedByTimeoutException;
 import java.nio.channels.NetworkChannel;
+import java.nio.channels.ReadPendingException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.WritableByteChannel;
+import java.nio.channels.WritePendingException;
 import java.util.ConcurrentModificationException;
 import java.util.Iterator;
 import java.util.concurrent.CountDownLatch;
@@ -1568,7 +1570,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
             } else if (unit.toMillis(timeout) != getReadTimeout()) {
                 setReadTimeout(unit.toMillis(timeout));
             }
-            if (block != BlockingMode.NON_BLOCK) {
+            if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
                 try {
                     if (!readPending.tryAcquire(timeout, unit)) {
                         handler.failed(new SocketTimeoutException(), attachment);
@@ -1580,7 +1582,12 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
                 }
             } else {
                 if (!readPending.tryAcquire()) {
-                    return CompletionState.NOT_DONE;
+                    if (block == BlockingMode.NON_BLOCK) {
+                        return CompletionState.NOT_DONE;
+                    } else {
+                        handler.failed(new ReadPendingException(), attachment);
+                        return CompletionState.ERROR;
+                    }
                 }
             }
             VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
@@ -1634,7 +1641,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
             } else if (unit.toMillis(timeout) != getWriteTimeout()) {
                 setWriteTimeout(unit.toMillis(timeout));
             }
-            if (block != BlockingMode.NON_BLOCK) {
+            if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
                 try {
                     if (!writePending.tryAcquire(timeout, unit)) {
                         handler.failed(new SocketTimeoutException(), attachment);
@@ -1646,7 +1653,12 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
                 }
             } else {
                 if (!writePending.tryAcquire()) {
-                    return CompletionState.NOT_DONE;
+                    if (block == BlockingMode.NON_BLOCK) {
+                        return CompletionState.NOT_DONE;
+                    } else {
+                        handler.failed(new WritePendingException(), attachment);
+                        return CompletionState.ERROR;
+                    }
                 }
             }
             if (!socketBufferHandler.isWriteBufferEmpty()) {
diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
index cd2e619..b09284a 100644
--- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java
+++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java
@@ -808,7 +808,12 @@ public abstract class SocketWrapperBase<E> {
 
     public enum BlockingMode {
         /**
-         * The operation will now block. If there are pending operations,
+         * The operation will not block. If there are pending operations,
+         * the operation will throw a pending exception.
+         */
+        CLASSIC,
+        /**
+         * The operation will not block. If there are pending operations,
          * the operation will return CompletionState.NOT_DONE.
          */
         NON_BLOCK,
@@ -1006,6 +1011,29 @@ public abstract class SocketWrapperBase<E> {
 
     /**
      * Scatter read. The completion handler will be called once some
+     * data has been read or an error occurred. The default NIO2
+     * behavior is used: the completion handler will be called as soon
+     * as some data has been read, even if the read has completed inline.
+     *
+     * @param timeout timeout duration for the read
+     * @param unit units for the timeout duration
+     * @param attachment an object to attach to the I/O operation that will be
+     *        used when calling the completion handler
+     * @param handler to call when the IO is complete
+     * @param dsts buffers
+     * @param <A> The attachment type
+     * @return the completion state (done, done inline, or still pending)
+     */
+    public final <A> CompletionState read(long timeout, TimeUnit unit, A attachment,
+            CompletionHandler<Long, ? super A> handler, ByteBuffer... dsts) {
+        if (dsts == null) {
+            throw new IllegalArgumentException();
+        }
+        return read(dsts, 0, dsts.length, BlockingMode.CLASSIC, timeout, unit, attachment, null, handler);
+    }
+
+    /**
+     * Scatter read. The completion handler will be called once some
      * data has been read or an error occurred. If a CompletionCheck
      * object has been provided, the completion handler will only be
      * called if the callHandler method returned true. If no
@@ -1063,6 +1091,30 @@ public abstract class SocketWrapperBase<E> {
 
     /**
      * Gather write. The completion handler will be called once some
+     * data has been written or an error occurred. The default NIO2
+     * behavior is used: the completion handler will be called, even
+     * if the write is incomplete and data remains in the buffers, or
+     * if the write completed inline.
+     *
+     * @param timeout timeout duration for the write
+     * @param unit units for the timeout duration
+     * @param attachment an object to attach to the I/O operation that will be
+     *        used when calling the completion handler
+     * @param handler to call when the IO is complete
+     * @param srcs buffers
+     * @param <A> The attachment type
+     * @return the completion state (done, done inline, or still pending)
+     */
+    public final <A> CompletionState write(long timeout, TimeUnit unit, A attachment,
+            CompletionHandler<Long, ? super A> handler, ByteBuffer... srcs) {
+        if (srcs == null) {
+            throw new IllegalArgumentException();
+        }
+        return write(srcs, 0, srcs.length, BlockingMode.CLASSIC, timeout, unit, attachment, null, handler);
+    }
+
+    /**
+     * Gather write. The completion handler will be called once some
      * data has been written or an error occurred. If a CompletionCheck
      * object has been provided, the completion handler will only be
      * called if the callHandler method returned true. If no
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index c65b93b..21d2af2 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -166,6 +166,11 @@
         Refactor Hostname validation to improve performance. Patch provided by
         Uwe Hees. (markt)
       </scode>
+      <update>
+        Add additional NIO2 style read and write methods closer to core NIO2,
+        for possible use with an asynchronous workflow like CompletableFuture.
+        (remm)
+      </update>
     </changelog>
   </subsection>
   <subsection name="Other">


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org