You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2016/07/16 11:27:18 UTC

svn commit: r1752927 [2/3] - in /httpcomponents/httpcore/trunk/httpcore5/src: examples/org/apache/hc/core5/http/examples/ main/java/org/apache/hc/core5/http/bootstrap/nio/ main/java/org/apache/hc/core5/http/impl/nio/ main/java/org/apache/hc/core5/http/...

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/BaseIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/BaseIOReactor.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/BaseIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/BaseIOReactor.java Sat Jul 16 11:27:17 2016
@@ -34,14 +34,9 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 
-import org.apache.hc.core5.util.Args;
-
 /**
  * Default implementation of {@link AbstractIOReactor} that serves as a base
- * for more advanced {@link IOReactor}
- * implementations. This class adds support for the I/O event dispatching
- * using {@link IOEventDispatch}, management of buffering sessions, and
- * session timeout handling.
+ * for more advanced {@link IOReactor} implementations.
  *
  * @since 4.0
  */
@@ -53,35 +48,34 @@ public class BaseIOReactor extends Abstr
     private long lastTimeoutCheck;
 
     private IOReactorExceptionHandler exceptionHandler = null;
-    private IOEventDispatch eventDispatch = null;
 
     /**
      * Creates new BaseIOReactor instance.
      *
-     * @param selectTimeout the select timeout.
+     * @param eventHandlerFactory the event handler factory.
+     * @param reactorConfig the reactor configuration.
      * @throws IOReactorException in case if a non-recoverable I/O error.
      */
-    public BaseIOReactor(final long selectTimeout) throws IOReactorException {
-        super(selectTimeout);
+    public BaseIOReactor(
+            final IOEventHandlerFactory eventHandlerFactory,
+            final IOReactorConfig reactorConfig) throws IOReactorException {
+        super(eventHandlerFactory, reactorConfig);
         this.bufferingSessions = new HashSet<>();
-        this.timeoutCheckInterval = selectTimeout;
+        this.timeoutCheckInterval = reactorConfig.getSelectInterval();
         this.lastTimeoutCheck = System.currentTimeMillis();
     }
 
     /**
      * Activates the I/O reactor. The I/O reactor will start reacting to I/O
-     * events and dispatch I/O event notifications to the given
-     * {@link IOEventDispatch}.
+     * events and and dispatch I/O event notifications to the {@link IOEventHandler}
+     * associated with the given I/O session.
      *
      * @throws InterruptedIOException if the dispatch thread is interrupted.
      * @throws IOReactorException in case if a non-recoverable I/O error.
      */
     @Override
-    public void execute(
-            final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
-        Args.notNull(eventDispatch, "Event dispatcher");
-        this.eventDispatch = eventDispatch;
-        execute();
+    public void execute() throws InterruptedIOException, IOReactorException {
+        super.execute();
     }
 
     /**
@@ -128,18 +122,20 @@ public class BaseIOReactor extends Abstr
 
     /**
      * Processes {@link SelectionKey#OP_READ} event on the given selection key.
-     * This method dispatches the event notification to the
-     * {@link IOEventDispatch#inputReady(IOSession)} method.
+     * This method dispatches the event
+     * to the {@link IOEventHandler#inputReady(IOSession)} method of the event
+     * handler associated with the I/O session.
      */
     @Override
     protected void readable(final SelectionKey key) {
         final IOSession session = getSession(key);
         try {
+            final IOEventHandler eventHandler = ensureEventHandler(session);
             // Try to gently feed more data to the event dispatcher
             // if the session input buffer has not been fully exhausted
             // (the choice of 5 iterations is purely arbitrary)
             for (int i = 0; i < 5; i++) {
-                this.eventDispatch.inputReady(session);
+                eventHandler.inputReady(session);
                 if (!session.hasBufferedInput()
                         || (session.getEventMask() & SelectionKey.OP_READ) == 0) {
                     break;
@@ -149,7 +145,7 @@ public class BaseIOReactor extends Abstr
                 this.bufferingSessions.add(session);
             }
         } catch (final CancelledKeyException ex) {
-            queueClosedSession(session);
+            session.shutdown();
             key.attach(null);
         } catch (final RuntimeException ex) {
             handleRuntimeException(ex);
@@ -158,17 +154,18 @@ public class BaseIOReactor extends Abstr
 
     /**
      * Processes {@link SelectionKey#OP_WRITE} event on the given selection key.
-     * This method dispatches the event notification to the
-     * {@link IOEventDispatch#outputReady(IOSession)} method.
+     * This method dispatches the event to
+     * the {@link IOEventHandler#outputReady(IOSession)} method of the event
+     * handler associated with the I/O session.
      */
     @Override
     protected void writable(final SelectionKey key) {
         final IOSession session = getSession(key);
         try {
-            this.eventDispatch.outputReady(session);
+            final IOEventHandler eventHandler = ensureEventHandler(session);
+            eventHandler.outputReady(session);
         } catch (final CancelledKeyException ex) {
-            queueClosedSession(session);
-            key.attach(null);
+            session.shutdown();
         } catch (final RuntimeException ex) {
             handleRuntimeException(ex);
         }
@@ -178,10 +175,6 @@ public class BaseIOReactor extends Abstr
      * Verifies whether any of the sessions associated with the given selection
      * keys timed out by invoking the {@link #timeoutCheck(SelectionKey, long)}
      * method.
-     * <p>
-     * This method will also invoke the
-     * {@link IOEventDispatch#inputReady(IOSession)} method on all sessions
-     * that have buffered input data.
      */
     @Override
     protected void validate(final Set<SelectionKey> keys) {
@@ -203,14 +196,15 @@ public class BaseIOReactor extends Abstr
                 }
                 try {
                     if ((session.getEventMask() & EventMask.READ) > 0) {
-                        this.eventDispatch.inputReady(session);
+                        final IOEventHandler eventHandler = ensureEventHandler(session);
+                        eventHandler.inputReady(session);
                         if (!session.hasBufferedInput()) {
                             it.remove();
                         }
                     }
                 } catch (final CancelledKeyException ex) {
                     it.remove();
-                    queueClosedSession(session);
+                    session.shutdown();
                 } catch (final RuntimeException ex) {
                     handleRuntimeException(ex);
                 }
@@ -220,14 +214,16 @@ public class BaseIOReactor extends Abstr
 
     /**
      * Processes newly created I/O session. This method dispatches the event
-     * notification to the {@link IOEventDispatch#connected(IOSession)} method.
+     * to the {@link IOEventHandler#connected(IOSession)} method of the event
+     * handler associated with the I/O session.
      */
     @Override
-    protected void sessionCreated(final SelectionKey key, final IOSession session) {
+    protected void sessionCreated(final IOSession session) {
         try {
-            this.eventDispatch.connected(session);
+            final IOEventHandler eventHandler = ensureEventHandler(session);
+            eventHandler.connected(session);
         } catch (final CancelledKeyException ex) {
-            queueClosedSession(session);
+            session.shutdown();
         } catch (final RuntimeException ex) {
             handleRuntimeException(ex);
         }
@@ -235,14 +231,16 @@ public class BaseIOReactor extends Abstr
 
     /**
      * Processes timed out I/O session. This method dispatches the event
-     * notification to the {@link IOEventDispatch#timeout(IOSession)} method.
+     * to the {@link IOEventHandler#timeout(IOSession)} method of the event
+     * handler associated with the I/O session.
      */
     @Override
     protected void sessionTimedOut(final IOSession session) {
         try {
-            this.eventDispatch.timeout(session);
+            final IOEventHandler eventHandler = ensureEventHandler(session);
+            eventHandler.timeout(session);
         } catch (final CancelledKeyException ex) {
-            queueClosedSession(session);
+            session.shutdown();
         } catch (final RuntimeException ex) {
             handleRuntimeException(ex);
         }
@@ -250,13 +248,14 @@ public class BaseIOReactor extends Abstr
 
     /**
      * Processes closed I/O session. This method dispatches the event
-     * notification to the {@link IOEventDispatch#disconnected(IOSession)}
-     * method.
+     * to the {@link IOEventHandler#timeout(IOSession)} method of the event
+     * handler associated with the I/O session.
      */
     @Override
     protected void sessionClosed(final IOSession session) {
         try {
-            this.eventDispatch.disconnected(session);
+            final IOEventHandler eventHandler = ensureEventHandler(session);
+            eventHandler.disconnected(session);
         } catch (final CancelledKeyException ex) {
             // ignore
         } catch (final RuntimeException ex) {

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java Sat Jul 16 11:27:17 2016
@@ -54,37 +54,44 @@ public class DefaultConnectingIOReactor
 
     private final Queue<SessionRequestImpl> requestQueue;
 
+    private final long selectInterval;
     private long lastTimeoutCheck;
 
     /**
      * Creates an instance of DefaultConnectingIOReactor with the given configuration.
      *
-     * @param config I/O reactor configuration.
+     * @param eventHandlerFactory the factory to create I/O event handlers.
+     * @param reactorConfig I/O reactor configuration.
      * @param threadFactory the factory to create threads.
      *   Can be {@code null}.
      * @throws IOReactorException in case if a non-recoverable I/O error.
      *
-     * @since 4.2
+     * @since 5.0
      */
     public DefaultConnectingIOReactor(
-            final IOReactorConfig config,
+            final IOEventHandlerFactory eventHandlerFactory,
+            final IOReactorConfig reactorConfig,
             final ThreadFactory threadFactory) throws IOReactorException {
-        super(config, threadFactory);
+        super(eventHandlerFactory, reactorConfig, threadFactory);
         this.requestQueue = new ConcurrentLinkedQueue<>();
+        this.selectInterval = reactorConfig.getSelectInterval();
         this.lastTimeoutCheck = System.currentTimeMillis();
     }
 
     /**
      * Creates an instance of DefaultConnectingIOReactor with the given configuration.
      *
+     * @param eventHandlerFactory the factory to create I/O event handlers.
      * @param config I/O reactor configuration.
      *   Can be {@code null}.
      * @throws IOReactorException in case if a non-recoverable I/O error.
      *
-     * @since 4.2
+     * @since 5.0
      */
-    public DefaultConnectingIOReactor(final IOReactorConfig config) throws IOReactorException {
-        this(config, null);
+    public DefaultConnectingIOReactor(
+            final IOEventHandlerFactory eventHandlerFactory,
+            final IOReactorConfig config) throws IOReactorException {
+        this(eventHandlerFactory, config, null);
     }
 
     /**
@@ -92,10 +99,11 @@ public class DefaultConnectingIOReactor
      *
      * @throws IOReactorException in case if a non-recoverable I/O error.
      *
-     * @since 4.2
+     * @since 5.0
      */
-    public DefaultConnectingIOReactor() throws IOReactorException {
-        this(null, null);
+    public DefaultConnectingIOReactor(
+            final IOEventHandlerFactory eventHandlerFactory) throws IOReactorException {
+        this(eventHandlerFactory, null, null);
     }
 
     @Override
@@ -121,7 +129,7 @@ public class DefaultConnectingIOReactor
         }
 
         final long currentTime = System.currentTimeMillis();
-        if ((currentTime - this.lastTimeoutCheck) >= this.selectTimeout) {
+        if ((currentTime - this.lastTimeoutCheck) >= this.selectInterval) {
             this.lastTimeoutCheck = currentTime;
             final Set<SelectionKey> keys = this.selector.keys();
             processTimeouts(keys);
@@ -133,24 +141,24 @@ public class DefaultConnectingIOReactor
 
             if (key.isConnectable()) {
 
-                final SocketChannel channel = (SocketChannel) key.channel();
+                final SocketChannel socketChannel = (SocketChannel) key.channel();
                 // Get request handle
                 final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
                 final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
 
                 // Finish connection process
                 try {
-                    channel.finishConnect();
+                    socketChannel.finishConnect();
                 } catch (final IOException ex) {
                     sessionRequest.failed(ex);
                 }
                 key.cancel();
                 key.attach(null);
                 if (!sessionRequest.isCompleted()) {
-                    addChannel(new ChannelEntry(channel, sessionRequest));
+                    enqueuePendingSession(socketChannel, sessionRequest);
                 } else {
                     try {
-                        channel.close();
+                        socketChannel.close();
                     } catch (final IOException ignore) {
                     }
                 }
@@ -197,7 +205,7 @@ public class DefaultConnectingIOReactor
             "I/O reactor has been shut down");
         final SessionRequestImpl sessionRequest = new SessionRequestImpl(
                 remoteAddress, localAddress, attachment, callback);
-        sessionRequest.setConnectTimeout(this.config.getConnectTimeout());
+        sessionRequest.setConnectTimeout(this.reactorConfig.getConnectTimeout());
 
         this.requestQueue.add(sessionRequest);
         this.selector.wakeup();
@@ -239,13 +247,12 @@ public class DefaultConnectingIOReactor
 
                 if (request.getLocalAddress() != null) {
                     final Socket sock = socketChannel.socket();
-                    sock.setReuseAddress(this.config.isSoReuseAddress());
+                    sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
                     sock.bind(request.getLocalAddress());
                 }
                 final boolean connected = socketChannel.connect(request.getRemoteAddress());
                 if (connected) {
-                    final ChannelEntry entry = new ChannelEntry(socketChannel, request);
-                    addChannel(entry);
+                    enqueuePendingSession(socketChannel, request);
                     continue;
                 }
             } catch (final IOException ex) {

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java Sat Jul 16 11:27:17 2016
@@ -63,17 +63,19 @@ public class DefaultListeningIOReactor e
     /**
      * Creates an instance of DefaultListeningIOReactor with the given configuration.
      *
+     * @param eventHandlerFactory the factory to create I/O event handlers.
      * @param config I/O reactor configuration.
      * @param threadFactory the factory to create threads.
      *   Can be {@code null}.
      * @throws IOReactorException in case if a non-recoverable I/O error.
      *
-     * @since 4.2
+     * @since 5.0
      */
     public DefaultListeningIOReactor(
+            final IOEventHandlerFactory eventHandlerFactory,
             final IOReactorConfig config,
             final ThreadFactory threadFactory) throws IOReactorException {
-        super(config, threadFactory);
+        super(eventHandlerFactory, config, threadFactory);
         this.requestQueue = new ConcurrentLinkedQueue<>();
         this.endpoints = Collections.synchronizedSet(new HashSet<ListenerEndpointImpl>());
         this.pausedEndpoints = new HashSet<>();
@@ -82,25 +84,30 @@ public class DefaultListeningIOReactor e
     /**
      * Creates an instance of DefaultListeningIOReactor with the given configuration.
      *
+     * @param eventHandlerFactory the factory to create I/O event handlers.
      * @param config I/O reactor configuration.
      *   Can be {@code null}.
      * @throws IOReactorException in case if a non-recoverable I/O error.
      *
-     * @since 4.2
+     * @since 5.0
      */
-    public DefaultListeningIOReactor(final IOReactorConfig config) throws IOReactorException {
-        this(config, null);
+    public DefaultListeningIOReactor(
+            final IOEventHandlerFactory eventHandlerFactory,
+            final IOReactorConfig config) throws IOReactorException {
+        this(eventHandlerFactory, config, null);
     }
 
     /**
      * Creates an instance of DefaultListeningIOReactor with default configuration.
      *
+     * @param eventHandlerFactory the factory to create I/O event handlers.
      * @throws IOReactorException in case if a non-recoverable I/O error.
      *
-     * @since 4.2
+     * @since 5.0
      */
-    public DefaultListeningIOReactor() throws IOReactorException {
-        this(null, null);
+    public DefaultListeningIOReactor(
+            final IOEventHandlerFactory eventHandlerFactory) throws IOReactorException {
+        this(eventHandlerFactory, null, null);
     }
 
     @Override
@@ -158,8 +165,7 @@ public class DefaultListeningIOReactor e
                                     "Failure initalizing socket", ex);
                         }
                     }
-                    final ChannelEntry entry = new ChannelEntry(socketChannel);
-                    addChannel(entry);
+                    enqueuePendingSession(socketChannel, null);
                 }
             }
 
@@ -205,15 +211,15 @@ public class DefaultListeningIOReactor e
             }
             try {
                 final ServerSocket socket = serverChannel.socket();
-                socket.setReuseAddress(this.config.isSoReuseAddress());
-                if (this.config.getSoTimeout() > 0) {
-                    socket.setSoTimeout(this.config.getSoTimeout());
+                socket.setReuseAddress(this.reactorConfig.isSoReuseAddress());
+                if (this.reactorConfig.getSoTimeout() > 0) {
+                    socket.setSoTimeout(this.reactorConfig.getSoTimeout());
                 }
-                if (this.config.getRcvBufSize() > 0) {
-                    socket.setReceiveBufferSize(this.config.getRcvBufSize());
+                if (this.reactorConfig.getRcvBufSize() > 0) {
+                    socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
                 }
                 serverChannel.configureBlocking(false);
-                socket.bind(address, this.config.getBacklogSize());
+                socket.bind(address, this.reactorConfig.getBacklogSize());
             } catch (final IOException ex) {
                 closeChannel(serverChannel);
                 request.failed(ex);

Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java (from r1752926, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventDispatch.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventDispatch.java&r1=1752926&r2=1752927&rev=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventDispatch.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java Sat Jul 16 11:27:17 2016
@@ -28,21 +28,15 @@
 package org.apache.hc.core5.reactor;
 
 /**
- * IOEventDispatch interface is used by I/O reactors to notify clients of I/O
- * events pending for a particular session. All methods of this interface are
- * executed on a dispatch thread of the I/O reactor. Therefore, it is important
- * that processing that takes place in the event methods will not block the
- * dispatch thread for too long, as the I/O reactor will be unable to react to
+ * IOEventHandler interface is used by I/O reactors to handle I/O events for individual
+ * I/O sessions. All methods of this interface are executed on a single dispatch thread
+ * of the I/O reactor. Therefore, it is important that event processing does not not block
+ * the I/O dispatch thread for too long, thus making the I/O reactor unable to react to
  * other events.
  *
- * @since 4.0
+ * @since 5.0
  */
-public interface IOEventDispatch {
-
-    /**
-     * Attribute name of an object that represents a non-blocking connection.
-     */
-    public static final String CONNECTION_KEY = "http.connection";
+public interface IOEventHandler {
 
     /**
      * Triggered after the given session has been just created.

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandlerFactory.java (from r1752926, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionClosedCallback.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandlerFactory.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandlerFactory.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionClosedCallback.java&r1=1752926&r2=1752927&rev=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionClosedCallback.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandlerFactory.java Sat Jul 16 11:27:17 2016
@@ -28,12 +28,12 @@
 package org.apache.hc.core5.reactor;
 
 /**
- * Session callback interface used internally by I/O reactor implementations.
+ * Factory interface to create {@link IOEventHandler} instances for the given connected endpoints.
  *
- * @since 4.0
+ * @since 5.0
  */
-public interface SessionClosedCallback {
+public interface IOEventHandlerFactory {
 
-    void sessionClosed(IOSession session);
+    IOEventHandler createHandler(IOSession ioSession);
 
 }

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandlerFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandlerFactory.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandlerFactory.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactor.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactor.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactor.java Sat Jul 16 11:27:17 2016
@@ -56,13 +56,12 @@ public interface IOReactor {
     IOReactorStatus getStatus();
 
     /**
-     * Starts the reactor and initiates the dispatch of I/O event notifications
-     * to the given {@link IOEventDispatch}.
+     * Starts the reactor and initiates the dispatch of I/O event to I/O session
+     * event handlers.
      *
-     * @param eventDispatch the I/O event dispatch.
      * @throws IOException in case of an I/O error.
      */
-    void execute(IOEventDispatch eventDispatch)
+    void execute()
         throws IOException;
 
     /**

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorConfig.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorConfig.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorConfig.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorConfig.java Sat Jul 16 11:27:17 2016
@@ -45,7 +45,6 @@ public final class IOReactorConfig {
 
     private final long selectInterval;
     private final long shutdownGracePeriod;
-    private final boolean interestOpQueued;
     private final int ioThreadCount;
     private final int soTimeout;
     private final boolean soReuseAddress;
@@ -60,7 +59,6 @@ public final class IOReactorConfig {
     IOReactorConfig(
             final long selectInterval,
             final long shutdownGracePeriod,
-            final boolean interestOpQueued,
             final int ioThreadCount,
             final int soTimeout,
             final boolean soReuseAddress,
@@ -74,7 +72,6 @@ public final class IOReactorConfig {
         super();
         this.selectInterval = selectInterval;
         this.shutdownGracePeriod = shutdownGracePeriod;
-        this.interestOpQueued = interestOpQueued;
         this.ioThreadCount = ioThreadCount;
         this.soTimeout = soTimeout;
         this.soReuseAddress = soReuseAddress;
@@ -108,21 +105,6 @@ public final class IOReactorConfig {
     }
 
     /**
-     * Determines whether or not I/O interest operations are to be queued and executed
-     * asynchronously by the I/O reactor thread or to be applied to the underlying
-     * {@link java.nio.channels.SelectionKey} immediately.
-     * <p>
-     * Default: {@code false}
-     *
-     * @see java.nio.channels.SelectionKey
-     * @see java.nio.channels.SelectionKey#interestOps()
-     * @see java.nio.channels.SelectionKey#interestOps(int)
-     */
-    public boolean isInterestOpQueued() {
-        return this.interestOpQueued;
-    }
-
-    /**
      * Determines the number of I/O dispatch threads to be used by the I/O reactor.
      * <p>
      * Default: {@code 2}
@@ -243,7 +225,6 @@ public final class IOReactorConfig {
         return new Builder()
             .setSelectInterval(config.getSelectInterval())
             .setShutdownGracePeriod(config.getShutdownGracePeriod())
-            .setInterestOpQueued(config.isInterestOpQueued())
             .setIoThreadCount(config.getIoThreadCount())
             .setSoTimeout(config.getSoTimeout())
             .setSoReuseAddress(config.isSoReuseAddress())
@@ -260,7 +241,6 @@ public final class IOReactorConfig {
 
         private long selectInterval;
         private long shutdownGracePeriod;
-        private boolean interestOpQueued;
         private int ioThreadCount;
         private int soTimeout;
         private boolean soReuseAddress;
@@ -275,7 +255,6 @@ public final class IOReactorConfig {
         Builder() {
             this.selectInterval = 1000;
             this.shutdownGracePeriod = 500;
-            this.interestOpQueued = false;
             this.ioThreadCount = AVAIL_PROCS;
             this.soTimeout = 0;
             this.soReuseAddress = false;
@@ -298,11 +277,6 @@ public final class IOReactorConfig {
             return this;
         }
 
-        public Builder setInterestOpQueued(final boolean interestOpQueued) {
-            this.interestOpQueued = interestOpQueued;
-            return this;
-        }
-
         public Builder setIoThreadCount(final int ioThreadCount) {
             this.ioThreadCount = ioThreadCount;
             return this;
@@ -355,7 +329,7 @@ public final class IOReactorConfig {
 
         public IOReactorConfig build() {
             return new IOReactorConfig(
-                    selectInterval, shutdownGracePeriod, interestOpQueued, ioThreadCount,
+                    selectInterval, shutdownGracePeriod, ioThreadCount,
                     soTimeout, soReuseAddress, soLinger, soKeepAlive, tcpNoDelay,
                     connectTimeout, sndBufSize, rcvBufSize, backlogSize);
         }
@@ -367,7 +341,6 @@ public final class IOReactorConfig {
         final StringBuilder builder = new StringBuilder();
         builder.append("[selectInterval=").append(this.selectInterval)
                 .append(", shutdownGracePeriod=").append(this.shutdownGracePeriod)
-                .append(", interestOpQueued=").append(this.interestOpQueued)
                 .append(", ioThreadCount=").append(this.ioThreadCount)
                 .append(", soTimeout=").append(this.soTimeout)
                 .append(", soReuseAddress=").append(this.soReuseAddress)

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java Sat Jul 16 11:27:17 2016
@@ -52,11 +52,25 @@ public interface IOSession {
      * Name of the context attribute key, which can be used to obtain the
      * session attachment object.
      */
-    public static final String ATTACHMENT_KEY = "http.session.attachment";
+    String ATTACHMENT_KEY = "http.session.attachment";
 
-    public static final int ACTIVE       = 0;
-    public static final int CLOSING      = 1;
-    public static final int CLOSED       = Integer.MAX_VALUE;
+    int ACTIVE       = 0;
+    int CLOSING      = 1;
+    int CLOSED       = Integer.MAX_VALUE;
+
+    /**
+     * Returns the event handler for this session.
+     *
+     * @since 5.0
+     */
+    IOEventHandler getHandler();
+
+    /**
+     * Sets the event handler for this session.
+     *
+     * @since 5.0
+     */
+    void setHandler(IOEventHandler handler);
 
     /**
      * Returns the underlying I/O channel associated with this session.

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java Sat Jul 16 11:27:17 2016
@@ -35,6 +35,7 @@ import java.nio.channels.ByteChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -48,17 +49,18 @@ import org.apache.hc.core5.util.Args;
  * @since 4.0
  */
 @Contract(threading = ThreadingBehavior.SAFE)
-public class IOSessionImpl implements IOSession, SocketAccessor {
+class IOSessionImpl implements IOSession, SocketAccessor {
 
     private final SelectionKey key;
-    private final ByteChannel channel;
+    private final SocketChannel channel;
     private final Map<String, Object> attributes;
-    private final SessionClosedCallback sessionClosedCallback;
+    private final Queue<IOSession> closedSessions;
 
     private final long startedTime;
     private final AtomicInteger status;
     private final AtomicInteger eventMask;
 
+    private volatile IOEventHandler eventHandler;
     private volatile SessionBufferStatus bufferStatus;
     private volatile int socketTimeout;
 
@@ -70,18 +72,19 @@ public class IOSessionImpl implements IO
      * Creates new instance of IOSessionImpl.
      *
      * @param key the selection key.
-     * @param sessionClosedCallback session closed callback.
+     * @param socketChannel the socket channel
+     * @param closedSessions the queue containing closed sessions
      *
      * @since 4.1
      */
     public IOSessionImpl(
             final SelectionKey key,
-            final SessionClosedCallback sessionClosedCallback) {
+            final SocketChannel socketChannel,
+            final Queue<IOSession> closedSessions) {
         super();
-        Args.notNull(key, "Selection key");
-        this.key = key;
-        this.channel = (ByteChannel) this.key.channel();
-        this.sessionClosedCallback = sessionClosedCallback;
+        this.key = Args.notNull(key, "Selection key");
+        this.channel = Args.notNull(socketChannel, "Socket channel");
+        this.closedSessions = closedSessions;
         this.attributes = new ConcurrentHashMap<>();
         this.socketTimeout = 0;
         this.eventMask = new AtomicInteger(key.interestOps());
@@ -94,24 +97,28 @@ public class IOSessionImpl implements IO
     }
 
     @Override
+    public IOEventHandler getHandler() {
+        return this.eventHandler;
+    }
+
+    @Override
+    public void setHandler(final IOEventHandler handler) {
+        this.eventHandler = handler;
+    }
+
+    @Override
     public ByteChannel channel() {
         return this.channel;
     }
 
     @Override
     public SocketAddress getLocalAddress() {
-        if (this.channel instanceof SocketChannel) {
-            return ((SocketChannel)this.channel).socket().getLocalSocketAddress();
-        }
-        return null;
+        return this.channel.socket().getLocalSocketAddress();
     }
 
     @Override
     public SocketAddress getRemoteAddress() {
-        if (this.channel instanceof SocketChannel) {
-            return ((SocketChannel)this.channel).socket().getRemoteSocketAddress();
-        }
-        return null;
+        return this.channel.socket().getRemoteSocketAddress();
     }
 
     @Override
@@ -180,14 +187,15 @@ public class IOSessionImpl implements IO
     @Override
     public void close() {
         if (this.status.compareAndSet(ACTIVE, CLOSED)) {
+            if (this.closedSessions != null) {
+                this.closedSessions.add(this);
+            }
             this.key.cancel();
+            this.key.attach(null);
             try {
                 this.key.channel().close();
             } catch (final IOException ignore) {
             }
-            if (this.sessionClosedCallback != null) {
-                this.sessionClosedCallback.sessionClosed(this);
-            }
             if (this.key.selector().isOpen()) {
                 this.key.selector().wakeup();
             }

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java Sat Jul 16 11:27:17 2016
@@ -47,6 +47,7 @@ import org.apache.hc.core5.annotation.Co
 import org.apache.hc.core5.annotation.ThreadingBehavior;
 import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.reactor.EventMask;
+import org.apache.hc.core5.reactor.IOEventHandler;
 import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.reactor.SessionBufferStatus;
 import org.apache.hc.core5.reactor.SocketAccessor;
@@ -730,6 +731,16 @@ public class SSLIOSession implements IOS
         this.session.setAttribute(name, obj);
     }
 
+    @Override
+    public IOEventHandler getHandler() {
+        return this.session.getHandler();
+    }
+
+    @Override
+    public void setHandler(final IOEventHandler handler) {
+        this.session.setHandler(handler);
+    }
+
     private static void formatOps(final StringBuilder buffer, final int ops) {
         if ((ops & SelectionKey.OP_READ) > 0) {
             buffer.append('r');

Copied: httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestAsyncHttp.java (from r1752926, httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncHandlers.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestAsyncHttp.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestAsyncHttp.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncHandlers.java&r1=1752926&r2=1752927&rev=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncHandlers.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestAsyncHttp.java Sat Jul 16 11:27:17 2016
@@ -66,12 +66,14 @@ import org.apache.hc.core5.http.nio.enti
 import org.apache.hc.core5.http.protocol.BasicHttpContext;
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.http.protocol.ImmutableHttpProcessor;
-import org.apache.hc.core5.http.protocol.RequestTargetHost;
 import org.apache.hc.core5.http.protocol.RequestConnControl;
 import org.apache.hc.core5.http.protocol.RequestContent;
 import org.apache.hc.core5.http.protocol.RequestExpectContinue;
+import org.apache.hc.core5.http.protocol.RequestTargetHost;
 import org.apache.hc.core5.http.protocol.RequestUserAgent;
+import org.apache.hc.core5.http.testserver.nio.HttpClientNio;
 import org.apache.hc.core5.http.testserver.nio.HttpCoreNIOTestBase;
+import org.apache.hc.core5.http.testserver.nio.HttpServerNio;
 import org.apache.hc.core5.reactor.ListenerEndpoint;
 import org.junit.After;
 import org.junit.Assert;
@@ -84,7 +86,7 @@ import org.junit.runners.Parameterized;
  * HttpCore NIO integration tests for async handlers.
  */
 @RunWith(Parameterized.class)
-public class TestHttpAsyncHandlers extends HttpCoreNIOTestBase {
+public class TestAsyncHttp extends HttpCoreNIOTestBase {
 
     @Parameterized.Parameters(name = "{0}")
     public static Collection<Object[]> protocols() {
@@ -94,7 +96,7 @@ public class TestHttpAsyncHandlers exten
         });
     }
 
-    public TestHttpAsyncHandlers(final ProtocolScheme scheme) {
+    public TestAsyncHttp(final ProtocolScheme scheme) {
         super(scheme);
     }
 
@@ -337,11 +339,16 @@ public class TestHttpAsyncHandlers exten
     public void testHttpPostNoContentLength() throws Exception {
         this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
 
-        this.client.setHttpProcessor(new ImmutableHttpProcessor(
-                new RequestTargetHost(),
-                new RequestConnControl(),
-                new RequestUserAgent(),
-                new RequestExpectContinue()));
+        // Rewire client
+        this.client = new HttpClientNio(
+                new ImmutableHttpProcessor(
+                        new RequestTargetHost(),
+                        new RequestConnControl(),
+                        new RequestUserAgent(),
+                        new RequestExpectContinue()),
+                createHttpAsyncRequestExecutor(),
+                createClientConnectionFactory(),
+                createClientIOReactorConfig());
 
         final HttpHost target = start();
 
@@ -366,21 +373,26 @@ public class TestHttpAsyncHandlers exten
     public void testHttpPostIdentity() throws Exception {
         this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
 
-        this.client.setHttpProcessor(new ImmutableHttpProcessor(
-                new HttpRequestInterceptor() {
-
-                    @Override
-                    public void process(
-                            final HttpRequest request,
-                            final HttpContext context) throws HttpException, IOException {
-                        request.addHeader(HttpHeaders.TRANSFER_ENCODING, "identity");
-                    }
-
-                },
-                new RequestTargetHost(),
-                new RequestConnControl(),
-                new RequestUserAgent(),
-                new RequestExpectContinue()));
+        // Rewire client
+        this.client = new HttpClientNio(
+                new ImmutableHttpProcessor(
+                        new HttpRequestInterceptor() {
+
+                            @Override
+                            public void process(
+                                    final HttpRequest request,
+                                    final HttpContext context) throws HttpException, IOException {
+                                request.addHeader(HttpHeaders.TRANSFER_ENCODING, "identity");
+                            }
+
+                        },
+                        new RequestTargetHost(),
+                        new RequestConnControl(),
+                        new RequestUserAgent(),
+                        new RequestExpectContinue()),
+                createHttpAsyncRequestExecutor(),
+                createClientConnectionFactory(),
+                createClientIOReactorConfig());
 
         final HttpHost target = start();
 
@@ -436,8 +448,7 @@ public class TestHttpAsyncHandlers exten
 
     @Test
     public void testHttpPostsWithExpectationVerification() throws Exception {
-        this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
-        this.server.setExpectationVerifier(new HttpAsyncExpectationVerifier() {
+        final HttpAsyncExpectationVerifier expectationVerifier = new HttpAsyncExpectationVerifier() {
 
             @Override
             public void verify(
@@ -454,7 +465,15 @@ public class TestHttpAsyncHandlers exten
                 }
             }
 
-        });
+        };
+        // Rewire server
+        this.server = new HttpServerNio(
+                createServerHttpProcessor(),
+                createServerConnectionFactory(),
+                expectationVerifier,
+                createServerIOReactorConfig());
+
+        this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
 
         final HttpHost target = start();
 
@@ -559,8 +578,7 @@ public class TestHttpAsyncHandlers exten
 
     @Test
     public void testHttpPostsWithExpectationVerificationDelayedResponse() throws Exception {
-        this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
-        this.server.setExpectationVerifier(new HttpAsyncExpectationVerifier() {
+        final HttpAsyncExpectationVerifier expectationVerifier = new HttpAsyncExpectationVerifier() {
 
             @Override
             public void verify(
@@ -588,7 +606,15 @@ public class TestHttpAsyncHandlers exten
                 }.start();
             }
 
-        });
+        };
+        // Rewire server
+        this.server = new HttpServerNio(
+                createServerHttpProcessor(),
+                createServerConnectionFactory(),
+                expectationVerifier,
+                createServerIOReactorConfig());
+        this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
+
         final HttpHost target = start();
 
         final BasicHttpRequest request1 = new BasicHttpRequest(
@@ -625,13 +651,12 @@ public class TestHttpAsyncHandlers exten
 
     @Test
     public void testHttpPostsFailedExpectionContentLengthNonReusableConnection() throws Exception {
-        this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
-        this.server.setExpectationVerifier(new HttpAsyncExpectationVerifier() {
+        final HttpAsyncExpectationVerifier expectationVerifier = new HttpAsyncExpectationVerifier() {
 
             @Override
             public void verify(
-                    final HttpAsyncExchange httpexchange,
-                    final HttpContext context) throws HttpException {
+            final HttpAsyncExchange httpexchange,
+            final HttpContext context) throws HttpException {
                 new Thread() {
                     @Override
                     public void run() {
@@ -648,7 +673,14 @@ public class TestHttpAsyncHandlers exten
                 }.start();
             }
 
-        });
+        };
+        // Rewire server
+        this.server = new HttpServerNio(
+                createServerHttpProcessor(),
+                createServerConnectionFactory(),
+                expectationVerifier,
+                createServerIOReactorConfig());
+        this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
 
         this.client.setMaxPerRoute(1);
         this.client.setMaxTotal(1);
@@ -677,8 +709,7 @@ public class TestHttpAsyncHandlers exten
 
     @Test
     public void testHttpPostsFailedExpectionConnectionReuse() throws Exception {
-        this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
-        this.server.setExpectationVerifier(new HttpAsyncExpectationVerifier() {
+        final HttpAsyncExpectationVerifier expectationVerifier = new HttpAsyncExpectationVerifier() {
 
             @Override
             public void verify(
@@ -700,7 +731,14 @@ public class TestHttpAsyncHandlers exten
                 }.start();
             }
 
-        });
+        };
+        // Rewire server
+        this.server = new HttpServerNio(
+                createServerHttpProcessor(),
+                createServerConnectionFactory(),
+                expectationVerifier,
+                createServerIOReactorConfig());
+        this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
 
         this.client.setMaxPerRoute(1);
         this.client.setMaxTotal(1);
@@ -731,8 +769,7 @@ public class TestHttpAsyncHandlers exten
 
     @Test
     public void testHttpPostsFailedExpectionConnectionReuseLateResponseBody() throws Exception {
-        this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
-        this.server.setExpectationVerifier(new HttpAsyncExpectationVerifier() {
+        final HttpAsyncExpectationVerifier expectationVerifier = new HttpAsyncExpectationVerifier() {
 
             @Override
             public void verify(
@@ -768,7 +805,14 @@ public class TestHttpAsyncHandlers exten
                 }.start();
             }
 
-        });
+        };
+        // Rewire server
+        this.server = new HttpServerNio(
+                createServerHttpProcessor(),
+                createServerConnectionFactory(),
+                expectationVerifier,
+                createServerIOReactorConfig());
+        this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
 
         this.client.setMaxPerRoute(1);
         this.client.setMaxTotal(1);
@@ -919,7 +963,14 @@ public class TestHttpAsyncHandlers exten
             }
 
         }));
-        this.client.setHttpProcessor(new ImmutableHttpProcessor(new RequestContent(), new RequestConnControl()));
+
+        // Rewire client
+        this.client = new HttpClientNio(
+                new ImmutableHttpProcessor(new RequestContent(), new RequestConnControl()),
+                createHttpAsyncRequestExecutor(),
+                createClientConnectionFactory(),
+                createClientIOReactorConfig());
+
         final HttpHost target = start();
 
         this.client.setMaxPerRoute(3);

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestAsyncHttp.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestAsyncHttp.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestAsyncHttp.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestClientOutOfSequenceResponse.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestClientOutOfSequenceResponse.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestClientOutOfSequenceResponse.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestClientOutOfSequenceResponse.java Sat Jul 16 11:27:17 2016
@@ -37,10 +37,17 @@ import org.apache.hc.core5.http.HttpExce
 import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.HttpResponse;
-import org.apache.hc.core5.http.config.ConnectionConfig;
+import org.apache.hc.core5.http.impl.nio.DefaultNHttpClientConnectionFactory;
+import org.apache.hc.core5.http.impl.nio.HttpAsyncRequestExecutor;
 import org.apache.hc.core5.http.message.BasicHttpRequest;
-import org.apache.hc.core5.http.pool.nio.BasicNIOConnFactory;
+import org.apache.hc.core5.http.protocol.HttpProcessor;
+import org.apache.hc.core5.http.protocol.ImmutableHttpProcessor;
+import org.apache.hc.core5.http.protocol.RequestConnControl;
+import org.apache.hc.core5.http.protocol.RequestContent;
+import org.apache.hc.core5.http.protocol.RequestTargetHost;
+import org.apache.hc.core5.http.protocol.RequestUserAgent;
 import org.apache.hc.core5.http.testserver.nio.HttpClientNio;
+import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -57,7 +64,22 @@ public class TestClientOutOfSequenceResp
     @Before
     public void setup() throws Exception {
         server = new ServerSocket(0, 1);
-        client = new HttpClientNio(new BasicNIOConnFactory(ConnectionConfig.DEFAULT));
+        final HttpProcessor httpProcessor = new ImmutableHttpProcessor(
+                new RequestContent(),
+                new RequestTargetHost(),
+                new RequestConnControl(),
+                new RequestUserAgent("TEST-CLIENT/1.1"));
+
+        final IOReactorConfig reactorConfig = IOReactorConfig.custom()
+                .setConnectTimeout(5000)
+                .setSoTimeout(5000)
+                .build();
+
+        client = new HttpClientNio(
+                httpProcessor,
+                new HttpAsyncRequestExecutor(),
+                DefaultNHttpClientConnectionFactory.INSTANCE,
+                reactorConfig);
     }
 
     @After

Modified: httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestCustomSSL.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestCustomSSL.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestCustomSSL.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestCustomSSL.java Sat Jul 16 11:27:17 2016
@@ -46,12 +46,10 @@ import org.apache.hc.core5.http.impl.nio
 import org.apache.hc.core5.http.io.HttpRequestHandler;
 import org.apache.hc.core5.http.message.BasicHttpRequest;
 import org.apache.hc.core5.http.nio.NHttpConnection;
-import org.apache.hc.core5.http.pool.nio.BasicNIOConnFactory;
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.http.protocol.HttpCoreContext;
 import org.apache.hc.core5.http.testserver.nio.ClientConnectionFactory;
-import org.apache.hc.core5.http.testserver.nio.HttpClientNio;
-import org.apache.hc.core5.http.testserver.nio.HttpServerNio;
+import org.apache.hc.core5.http.testserver.nio.HttpCoreNIOTestBase;
 import org.apache.hc.core5.http.testserver.nio.ServerConnectionFactory;
 import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.reactor.ListenerEndpoint;
@@ -59,31 +57,17 @@ import org.apache.hc.core5.reactor.ssl.S
 import org.apache.hc.core5.ssl.SSLContextBuilder;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
-public class TestCustomSSL {
+public class TestCustomSSL extends HttpCoreNIOTestBase {
 
-    protected HttpServerNio server;
-    protected HttpClientNio client;
-
-    @After
-    public void shutDownClient() throws Exception {
-        if (this.client != null) {
-            this.client.shutdown();
-            this.client = null;
-        }
-    }
-
-    @After
-    public void shutDownServer() throws Exception {
-        if (this.server != null) {
-            this.server.shutdown();
-            this.server = null;
-        }
+    public TestCustomSSL() {
+        super(ProtocolScheme.https);
     }
 
-    @Test
-    public void testCustomSSLContext() throws Exception {
+    @Override
+    protected ServerConnectionFactory createServerConnectionFactory() throws Exception {
         final SSLSetupHandler sslSetupHandler = new SSLSetupHandler() {
 
             @Override
@@ -99,7 +83,40 @@ public class TestCustomSSL {
             }
 
         };
+        final URL keyStoreURL = getClass().getResource("/test.keystore");
+        final String storePassword = "nopassword";
+        final SSLContext serverSSLContext = SSLContextBuilder.create()
+                .loadTrustMaterial(keyStoreURL, storePassword.toCharArray())
+                .loadKeyMaterial(keyStoreURL, storePassword.toCharArray(), storePassword.toCharArray())
+                .build();
+        return new ServerConnectionFactory(serverSSLContext, sslSetupHandler);
+    }
+
+    @Override
+    protected ClientConnectionFactory createClientConnectionFactory() throws Exception {
+        final URL keyStoreURL = getClass().getResource("/test.keystore");
+        final String storePassword = "nopassword";
+        final SSLContext clientSSLContext = SSLContextBuilder.create()
+                .loadTrustMaterial(keyStoreURL, storePassword.toCharArray())
+                .build();
+
+        return new ClientConnectionFactory(clientSSLContext, null);
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        initServer();
+        initClient();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        shutDownClient();
+        shutDownServer();
+    }
 
+    @Test
+    public void testCustomSSLContext() throws Exception {
         final HttpRequestHandler requestHandler = new HttpRequestHandler() {
 
             @Override
@@ -115,24 +132,6 @@ public class TestCustomSSL {
             }
 
         };
-
-        final URL keyStoreURL = getClass().getResource("/test.keystore");
-        final String storePassword = "nopassword";
-        final SSLContext serverSSLContext = SSLContextBuilder.create()
-                .loadTrustMaterial(keyStoreURL, storePassword.toCharArray())
-                .loadKeyMaterial(keyStoreURL, storePassword.toCharArray(), storePassword.toCharArray())
-                .build();
-        this.server = new HttpServerNio();
-        this.server.setConnectionFactory(new ServerConnectionFactory(serverSSLContext, sslSetupHandler));
-        this.server.setTimeout(5000);
-
-        final SSLContext clientSSLContext = SSLContextBuilder.create()
-                .loadTrustMaterial(keyStoreURL, storePassword.toCharArray())
-                .build();
-
-        this.client = new HttpClientNio(new BasicNIOConnFactory(new ClientConnectionFactory(clientSSLContext), null));
-        this.client.setTimeout(5000);
-
         this.server.registerHandler("*", new BasicAsyncRequestHandler(requestHandler));
 
         this.server.start();

Modified: httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestDefaultListeningIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestDefaultListeningIOReactor.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestDefaultListeningIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestDefaultListeningIOReactor.java Sat Jul 16 11:27:17 2016
@@ -34,9 +34,8 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hc.core5.http.HttpResponseInterceptor;
 import org.apache.hc.core5.http.config.ConnectionConfig;
-import org.apache.hc.core5.http.impl.nio.DefaultHttpServerIODispatch;
+import org.apache.hc.core5.http.impl.nio.DefaultHttpServerIOEventHandlerFactory;
 import org.apache.hc.core5.http.impl.nio.HttpAsyncService;
 import org.apache.hc.core5.http.impl.nio.UriHttpAsyncRequestHandlerMapper;
 import org.apache.hc.core5.http.protocol.HttpProcessor;
@@ -46,13 +45,14 @@ import org.apache.hc.core5.http.protocol
 import org.apache.hc.core5.http.protocol.ResponseDate;
 import org.apache.hc.core5.http.protocol.ResponseServer;
 import org.apache.hc.core5.reactor.DefaultListeningIOReactor;
-import org.apache.hc.core5.reactor.IOEventDispatch;
+import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.apache.hc.core5.reactor.IOReactorExceptionHandler;
 import org.apache.hc.core5.reactor.IOReactorStatus;
 import org.apache.hc.core5.reactor.ListenerEndpoint;
-import org.apache.hc.core5.reactor.ListeningIOReactor;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 /**
@@ -60,30 +60,43 @@ import org.junit.Test;
  */
 public class TestDefaultListeningIOReactor {
 
-    private static IOEventDispatch createIOEventDispatch() {
-        final HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpResponseInterceptor[] {
-                new ResponseDate(),
+    protected DefaultListeningIOReactor ioreactor;
+
+    @Before
+    public void setup() throws Exception {
+        final HttpProcessor httpproc = new ImmutableHttpProcessor(new ResponseDate(),
                 new ResponseServer(),
                 new ResponseContent(),
-                new ResponseConnControl()
-        });
-        final HttpAsyncService serviceHandler = new HttpAsyncService(httpproc,
+                new ResponseConnControl());
+        final HttpAsyncService serviceHandler = new HttpAsyncService(
+                httpproc,
                 new UriHttpAsyncRequestHandlerMapper());
-        return new DefaultHttpServerIODispatch(serviceHandler, ConnectionConfig.DEFAULT);
+        final IOEventHandlerFactory eventHandlerFactory = new DefaultHttpServerIOEventHandlerFactory(
+                serviceHandler,
+                ConnectionConfig.DEFAULT);
+
+        final IOReactorConfig reactorConfig = IOReactorConfig.custom()
+                .setIoThreadCount(1)
+                .build();
+        this.ioreactor = new DefaultListeningIOReactor(eventHandlerFactory, reactorConfig);
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        if (this.ioreactor != null) {
+            this.ioreactor.shutdown();
+        }
     }
 
     @Test
     public void testEndpointUpAndDown() throws Exception {
-        final IOEventDispatch eventDispatch = createIOEventDispatch();
-        final IOReactorConfig config = IOReactorConfig.custom().setIoThreadCount(1).build();
-        final ListeningIOReactor ioreactor = new DefaultListeningIOReactor(config);
 
         final Thread t = new Thread(new Runnable() {
 
             @Override
             public void run() {
                 try {
-                    ioreactor.execute(eventDispatch);
+                    ioreactor.execute();
                 } catch (final IOException ex) {
                 }
             }
@@ -125,10 +138,6 @@ public class TestDefaultListeningIOReact
 
     @Test
     public void testEndpointAlreadyBoundFatal() throws Exception {
-        final IOEventDispatch eventDispatch = createIOEventDispatch();
-        final IOReactorConfig config = IOReactorConfig.custom().setIoThreadCount(1).build();
-        final ListeningIOReactor ioreactor = new DefaultListeningIOReactor(config);
-
         final CountDownLatch latch = new CountDownLatch(1);
 
         final Thread t = new Thread(new Runnable() {
@@ -136,7 +145,7 @@ public class TestDefaultListeningIOReact
             @Override
             public void run() {
                 try {
-                    ioreactor.execute(eventDispatch);
+                    ioreactor.execute();
                     Assert.fail("IOException should have been thrown");
                 } catch (final IOException ex) {
                     latch.countDown();
@@ -171,10 +180,6 @@ public class TestDefaultListeningIOReact
 
     @Test
     public void testEndpointAlreadyBoundNonFatal() throws Exception {
-        final IOEventDispatch eventDispatch = createIOEventDispatch();
-        final IOReactorConfig config = IOReactorConfig.custom().setIoThreadCount(1).build();
-        final DefaultListeningIOReactor ioreactor = new DefaultListeningIOReactor(config);
-
         ioreactor.setExceptionHandler(new IOReactorExceptionHandler() {
 
             @Override
@@ -194,7 +199,7 @@ public class TestDefaultListeningIOReact
             @Override
             public void run() {
                 try {
-                    ioreactor.execute(eventDispatch);
+                    ioreactor.execute();
                 } catch (final IOException ex) {
                 }
             }

Modified: httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncHandlersBrokenExpectContinue.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncHandlersBrokenExpectContinue.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncHandlersBrokenExpectContinue.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncHandlersBrokenExpectContinue.java Sat Jul 16 11:27:17 2016
@@ -39,6 +39,7 @@ import org.apache.hc.core5.http.HttpHost
 import org.apache.hc.core5.http.HttpResponse;
 import org.apache.hc.core5.http.HttpStatus;
 import org.apache.hc.core5.http.impl.nio.BasicAsyncRequestHandler;
+import org.apache.hc.core5.http.impl.nio.HttpAsyncRequestExecutor;
 import org.apache.hc.core5.http.message.BasicHttpRequest;
 import org.apache.hc.core5.http.nio.HttpAsyncExchange;
 import org.apache.hc.core5.http.nio.HttpAsyncExpectationVerifier;
@@ -68,6 +69,31 @@ public class TestHttpAsyncHandlersBroken
         });
     }
 
+    @Override
+    protected HttpAsyncExpectationVerifier createExpectationVerifier() {
+        return new HttpAsyncExpectationVerifier() {
+
+            @Override
+            public void verify(
+                    final HttpAsyncExchange httpexchange,
+                    final HttpContext context) throws HttpException {
+                try {
+                    Thread.sleep(1200);
+                } catch (final InterruptedException ignore) {
+                }
+
+                httpexchange.submitResponse();
+            }
+
+        };
+    }
+
+    @Override
+    protected HttpAsyncRequestExecutor createHttpAsyncRequestExecutor() throws Exception {
+        // Do not wait for continue
+        return new HttpAsyncRequestExecutor(1);
+    }
+
     public TestHttpAsyncHandlersBrokenExpectContinue(final ProtocolScheme scheme) {
         super(scheme);
     }
@@ -110,26 +136,9 @@ public class TestHttpAsyncHandlersBroken
     @Test
     public void testHttpPostsWithExpectationVerificationSendWithoutAck() throws Exception {
         this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
-        this.server.setExpectationVerifier(new HttpAsyncExpectationVerifier() {
-
-            @Override
-            public void verify(
-                    final HttpAsyncExchange httpexchange,
-                    final HttpContext context) throws HttpException {
-                try {
-                    Thread.sleep(1200);
-                } catch (final InterruptedException ignore) {
-                }
-
-                httpexchange.submitResponse();
-            }
-
-        });
 
         this.client.setMaxPerRoute(1);
         this.client.setMaxTotal(1);
-        // Do not wait for continue
-        this.client.setWaitForContinue(1);
 
         final HttpHost target = start();
 

Modified: httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncHandlersPipelining.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncHandlersPipelining.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncHandlersPipelining.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncHandlersPipelining.java Sat Jul 16 11:27:17 2016
@@ -64,7 +64,7 @@ import org.apache.hc.core5.http.nio.Http
 import org.apache.hc.core5.http.nio.entity.NStringEntity;
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.http.protocol.HttpProcessor;
-import org.apache.hc.core5.http.protocol.ImmutableHttpProcessor;
+import org.apache.hc.core5.http.protocol.HttpProcessorBuilder;
 import org.apache.hc.core5.http.protocol.RequestConnControl;
 import org.apache.hc.core5.http.protocol.RequestContent;
 import org.apache.hc.core5.http.protocol.RequestTargetHost;
@@ -92,16 +92,20 @@ public class TestHttpAsyncHandlersPipeli
         });
     }
 
+    @Override
+    protected HttpProcessor createClientHttpProcessor() {
+        return HttpProcessorBuilder.create().addAll(
+                new RequestContent(),
+                new RequestTargetHost(),
+                new RequestConnControl(),
+                new RequestUserAgent("TEST-CLIENT/1.1"))
+                .build();
+    }
+
     public TestHttpAsyncHandlersPipelining(final ProtocolScheme scheme) {
         super(scheme);
     }
 
-    public static final HttpProcessor DEFAULT_HTTP_PROC = new ImmutableHttpProcessor(
-            new RequestContent(),
-            new RequestTargetHost(),
-            new RequestConnControl(),
-            new RequestUserAgent("TEST-CLIENT/1.1"));
-
     @Before
     public void setUp() throws Exception {
         initServer();
@@ -116,7 +120,6 @@ public class TestHttpAsyncHandlersPipeli
 
     private HttpHost start() throws Exception {
         this.server.start();
-        this.client.setHttpProcessor(DEFAULT_HTTP_PROC);
         this.client.start();
 
         final ListenerEndpoint endpoint = this.server.getListenerEndpoint();

Modified: httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncTimeout.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncTimeout.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncTimeout.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncTimeout.java Sat Jul 16 11:27:17 2016
@@ -45,6 +45,7 @@ import org.apache.hc.core5.http.message.
 import org.apache.hc.core5.http.protocol.BasicHttpContext;
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.http.testserver.nio.HttpCoreNIOTestBase;
+import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -69,6 +70,15 @@ public class TestHttpAsyncTimeout extend
 
     private ServerSocket serverSocket;
 
+    @Override
+    protected IOReactorConfig createClientIOReactorConfig() {
+        return IOReactorConfig.custom()
+                .setIoThreadCount(1)
+                .setConnectTimeout(1000)
+                .setSoTimeout(1000)
+                .build();
+    }
+
     @Before
     public void setUp() throws Exception {
         initClient();
@@ -120,13 +130,12 @@ public class TestHttpAsyncTimeout extend
 
         final HttpRequest request = new BasicHttpRequest("GET", "/");
         final HttpContext context = new BasicHttpContext();
-        this.client.setTimeout(1000);
         this.client.execute(
                 new BasicAsyncRequestProducer(target, request),
                 new BasicAsyncResponseConsumer(),
                 context, callback);
         try (final Socket accepted = serverSocket.accept()) {
-            Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+            Assert.assertTrue(latch.await(10000, TimeUnit.SECONDS));
         }
     }
 

Modified: httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestServerSidePipelining.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestServerSidePipelining.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestServerSidePipelining.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestServerSidePipelining.java Sat Jul 16 11:27:17 2016
@@ -45,11 +45,11 @@ import org.apache.hc.core5.http.HttpResp
 import org.apache.hc.core5.http.entity.ContentType;
 import org.apache.hc.core5.http.entity.EntityUtils;
 import org.apache.hc.core5.http.impl.nio.BasicAsyncRequestHandler;
-import org.apache.hc.core5.http.impl.nio.UriHttpAsyncRequestHandlerMapper;
 import org.apache.hc.core5.http.io.HttpRequestHandler;
 import org.apache.hc.core5.http.nio.entity.NByteArrayEntity;
 import org.apache.hc.core5.http.nio.entity.NStringEntity;
 import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.protocol.HttpProcessor;
 import org.apache.hc.core5.http.protocol.ImmutableHttpProcessor;
 import org.apache.hc.core5.http.protocol.ResponseConnControl;
 import org.apache.hc.core5.http.protocol.ResponseContent;
@@ -66,12 +66,17 @@ import org.junit.Test;
  */
 public class TestServerSidePipelining extends HttpCoreNIOTestBase {
 
+    @Override
+    protected HttpProcessor createServerHttpProcessor() {
+        return new ImmutableHttpProcessor(
+                new ResponseServer("TEST-SERVER/1.1"),
+                new ResponseContent(),
+                new ResponseConnControl());
+    }
+
     @Before
     public void setUp() throws Exception {
         initServer();
-        this.server.setHttpProcessor(new ImmutableHttpProcessor(
-                new ResponseServer("TEST-SERVER/1.1"), new ResponseContent(), new ResponseConnControl()));
-        final UriHttpAsyncRequestHandlerMapper registry = new UriHttpAsyncRequestHandlerMapper();
         this.server.registerHandler("*", new BasicAsyncRequestHandler(new HttpRequestHandler() {
 
             @Override

Modified: httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/testserver/nio/HttpClientNio.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/testserver/nio/HttpClientNio.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/testserver/nio/HttpClientNio.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/testserver/nio/HttpClientNio.java Sat Jul 16 11:27:17 2016
@@ -28,7 +28,6 @@
 package org.apache.hc.core5.http.testserver.nio;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -40,81 +39,46 @@ import org.apache.hc.core5.http.HttpHost
 import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.HttpResponse;
 import org.apache.hc.core5.http.OoopsieRuntimeException;
-import org.apache.hc.core5.http.config.ConnectionConfig;
 import org.apache.hc.core5.http.impl.nio.BasicAsyncRequestProducer;
 import org.apache.hc.core5.http.impl.nio.BasicAsyncResponseConsumer;
-import org.apache.hc.core5.http.impl.nio.DefaultHttpClientIODispatch;
+import org.apache.hc.core5.http.impl.nio.DefaultHttpClientIOEventHandlerFactory;
 import org.apache.hc.core5.http.impl.nio.DefaultNHttpClientConnection;
-import org.apache.hc.core5.http.impl.nio.DefaultNHttpClientConnectionFactory;
-import org.apache.hc.core5.http.impl.nio.HttpAsyncRequestExecutor;
 import org.apache.hc.core5.http.impl.nio.HttpAsyncRequester;
 import org.apache.hc.core5.http.nio.HttpAsyncRequestProducer;
 import org.apache.hc.core5.http.nio.HttpAsyncResponseConsumer;
-import org.apache.hc.core5.http.nio.NHttpClientConnection;
 import org.apache.hc.core5.http.nio.NHttpClientEventHandler;
+import org.apache.hc.core5.http.nio.NHttpConnectionFactory;
 import org.apache.hc.core5.http.pool.nio.BasicNIOConnPool;
 import org.apache.hc.core5.http.pool.nio.BasicNIOPoolEntry;
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.http.protocol.HttpCoreContext;
 import org.apache.hc.core5.http.protocol.HttpProcessor;
-import org.apache.hc.core5.http.protocol.ImmutableHttpProcessor;
-import org.apache.hc.core5.http.protocol.RequestConnControl;
-import org.apache.hc.core5.http.protocol.RequestContent;
-import org.apache.hc.core5.http.protocol.RequestExpectContinue;
-import org.apache.hc.core5.http.protocol.RequestTargetHost;
-import org.apache.hc.core5.http.protocol.RequestUserAgent;
-import org.apache.hc.core5.pool.nio.NIOConnFactory;
 import org.apache.hc.core5.reactor.ConnectingIOReactor;
 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
 import org.apache.hc.core5.reactor.ExceptionEvent;
-import org.apache.hc.core5.reactor.IOEventDispatch;
+import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.apache.hc.core5.reactor.IOReactorExceptionHandler;
 import org.apache.hc.core5.reactor.IOReactorStatus;
-import org.apache.hc.core5.reactor.IOSession;
-import org.apache.hc.core5.reactor.SessionRequest;
 
 public class HttpClientNio {
 
-    public static final HttpProcessor DEFAULT_HTTP_PROC = new ImmutableHttpProcessor(
-            new RequestContent(),
-            new RequestTargetHost(),
-            new RequestConnControl(),
-            new RequestUserAgent("TEST-CLIENT/1.1"),
-            new RequestExpectContinue());
-
     private final DefaultConnectingIOReactor ioReactor;
     private final BasicNIOConnPool connpool;
-
-    private volatile HttpProcessor httpProcessor;
-    private volatile HttpAsyncRequester executor;
-    private volatile IOReactorThread thread;
-    private volatile int timeout;
-    private volatile int waitForContinue;
+    private final HttpAsyncRequester executor;
+    private final IOReactorThread thread;
 
     public HttpClientNio(
-            final NIOConnFactory<HttpHost, NHttpClientConnection> connFactory) throws IOException {
+            final HttpProcessor httpProcessor,
+            final NHttpClientEventHandler protoclHandler,
+            final NHttpConnectionFactory<DefaultNHttpClientConnection> connFactory,
+            final IOReactorConfig reactorConfig) throws IOException {
         super();
-        this.ioReactor = new DefaultConnectingIOReactor();
+        this.ioReactor = new DefaultConnectingIOReactor(
+                new DefaultHttpClientIOEventHandlerFactory(protoclHandler, connFactory), reactorConfig);
         this.ioReactor.setExceptionHandler(new SimpleIOReactorExceptionHandler());
-        this.connpool = new BasicNIOConnPool(this.ioReactor, new NIOConnFactory<HttpHost, NHttpClientConnection>() {
-
-            @Override
-            public NHttpClientConnection create(
-                final HttpHost route, final IOSession session) throws IOException {
-                final NHttpClientConnection conn = connFactory.create(route, session);
-                conn.setSocketTimeout(timeout);
-                return conn;
-            }
-
-        }, 0);
-    }
-
-    public int getTimeout() {
-        return this.timeout;
-    }
-
-    public void setTimeout(final int timeout) {
-        this.timeout = timeout;
+        this.connpool = new BasicNIOConnPool(this.ioReactor, reactorConfig.getConnectTimeout());
+        this.executor = new HttpAsyncRequester(httpProcessor);
+        this.thread = new IOReactorThread();
     }
 
     public void setMaxTotal(final int max) {
@@ -125,18 +89,11 @@ public class HttpClientNio {
         this.connpool.setDefaultMaxPerRoute(max);
     }
 
-    public void setHttpProcessor(final HttpProcessor httpProcessor) {
-        this.httpProcessor = httpProcessor;
-    }
-
-    public void setWaitForContinue(final int waitForContinue) {
-        this.waitForContinue = waitForContinue;
-    }
-
     public Future<BasicNIOPoolEntry> lease(
             final HttpHost host,
+            final int connectTimeout,
             final FutureCallback<BasicNIOPoolEntry> callback) {
-        return this.connpool.lease(host, null, this.timeout, TimeUnit.MILLISECONDS, callback);
+        return this.connpool.lease(host, null, connectTimeout, TimeUnit.MILLISECONDS, callback);
     }
 
     public void release(final BasicNIOPoolEntry poolEntry, final boolean reusable) {
@@ -217,31 +174,7 @@ public class HttpClientNio {
         return executePipelined(target, Arrays.asList(requests), null, null);
     }
 
-    private void execute(final NHttpClientEventHandler clientHandler) throws IOException {
-        final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(clientHandler,
-            new DefaultNHttpClientConnectionFactory(ConnectionConfig.DEFAULT)) {
-
-            @Override
-            protected DefaultNHttpClientConnection createConnection(final IOSession session) {
-                final DefaultNHttpClientConnection conn = super.createConnection(session);
-                conn.setSocketTimeout(timeout);
-                return conn;
-            }
-
-        };
-        this.ioReactor.execute(ioEventDispatch);
-    }
-
-    public SessionRequest openConnection(final InetSocketAddress address, final Object attachment) {
-        final SessionRequest sessionRequest = this.ioReactor.connect(address, null, attachment, null);
-        sessionRequest.setConnectTimeout(this.timeout);
-        return sessionRequest;
-    }
-
     public void start() {
-        this.executor = new HttpAsyncRequester(this.httpProcessor != null ? this.httpProcessor : DEFAULT_HTTP_PROC);
-        this.thread = new IOReactorThread(new HttpAsyncRequestExecutor(
-                this.waitForContinue > 0 ? this.waitForContinue : HttpAsyncRequestExecutor.DEFAULT_WAIT_FOR_CONTINUE));
         this.thread.start();
     }
 
@@ -281,19 +214,16 @@ public class HttpClientNio {
 
     private class IOReactorThread extends Thread {
 
-        private final NHttpClientEventHandler clientHandler;
-
         private volatile Exception ex;
 
-        public IOReactorThread(final NHttpClientEventHandler clientHandler) {
+        public IOReactorThread() {
             super();
-            this.clientHandler = clientHandler;
         }
 
         @Override
         public void run() {
             try {
-                execute(this.clientHandler);
+                ioReactor.execute();
             } catch (final Exception ex) {
                 this.ex = ex;
             }

Modified: httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/testserver/nio/HttpCoreNIOTestBase.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/testserver/nio/HttpCoreNIOTestBase.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/testserver/nio/HttpCoreNIOTestBase.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/testserver/nio/HttpCoreNIOTestBase.java Sat Jul 16 11:27:17 2016
@@ -31,7 +31,23 @@ import java.net.URL;
 
 import javax.net.ssl.SSLContext;
 
-import org.apache.hc.core5.http.pool.nio.BasicNIOConnFactory;
+import org.apache.hc.core5.http.HttpRequestInterceptor;
+import org.apache.hc.core5.http.HttpResponseInterceptor;
+import org.apache.hc.core5.http.impl.nio.HttpAsyncRequestExecutor;
+import org.apache.hc.core5.http.nio.HttpAsyncExpectationVerifier;
+import org.apache.hc.core5.http.protocol.HttpProcessor;
+import org.apache.hc.core5.http.protocol.ImmutableHttpProcessor;
+import org.apache.hc.core5.http.protocol.RequestConnControl;
+import org.apache.hc.core5.http.protocol.RequestContent;
+import org.apache.hc.core5.http.protocol.RequestExpectContinue;
+import org.apache.hc.core5.http.protocol.RequestTargetHost;
+import org.apache.hc.core5.http.protocol.RequestUserAgent;
+import org.apache.hc.core5.http.protocol.RequestValidateHost;
+import org.apache.hc.core5.http.protocol.ResponseConnControl;
+import org.apache.hc.core5.http.protocol.ResponseContent;
+import org.apache.hc.core5.http.protocol.ResponseDate;
+import org.apache.hc.core5.http.protocol.ResponseServer;
+import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.apache.hc.core5.ssl.SSLContextBuilder;
 import org.junit.After;
 
@@ -77,27 +93,73 @@ public abstract class HttpCoreNIOTestBas
                 .build();
     }
 
+    protected HttpProcessor createServerHttpProcessor() {
+        return new ImmutableHttpProcessor(
+                new HttpRequestInterceptor[] {
+                        new RequestValidateHost()
+                },
+                new HttpResponseInterceptor[]{
+                        new ResponseDate(),
+                        new ResponseServer("TEST-SERVER/1.1"),
+                        new ResponseContent(),
+                        new ResponseConnControl()
+                });
+    }
+
+    protected HttpAsyncExpectationVerifier createExpectationVerifier() {
+        return null;
+    }
+
+    protected HttpProcessor createClientHttpProcessor() {
+        return new ImmutableHttpProcessor(
+                new RequestContent(),
+                new RequestTargetHost(),
+                new RequestConnControl(),
+                new RequestUserAgent("TEST-CLIENT/1.1"),
+                new RequestExpectContinue());
+    }
+
     protected ServerConnectionFactory createServerConnectionFactory() throws Exception {
         return new ServerConnectionFactory(
                 this.scheme.equals(ProtocolScheme.https) ? createServerSSLContext() : null);
     }
 
-    protected BasicNIOConnFactory createClientConnectionFactory() throws Exception {
-        return new BasicNIOConnFactory(
-                new ClientConnectionFactory(),
-                this.scheme.equals(ProtocolScheme.https) ? new ClientConnectionFactory(createClientSSLContext()) : null);
+    protected IOReactorConfig createServerIOReactorConfig() {
+        return IOReactorConfig.custom()
+                .setSoTimeout(5000)
+                .build();
+    }
+
+    protected IOReactorConfig createClientIOReactorConfig() {
+        return IOReactorConfig.custom()
+                .setConnectTimeout(5000)
+                .setSoTimeout(5000)
+                .build();
+    }
+
+    protected HttpAsyncRequestExecutor createHttpAsyncRequestExecutor() throws Exception {
+        return new HttpAsyncRequestExecutor();
+    }
 
+    protected ClientConnectionFactory createClientConnectionFactory() throws Exception {
+        return new ClientConnectionFactory(
+                this.scheme.equals(ProtocolScheme.https) ? createClientSSLContext() : null);
     }
 
     public void initServer() throws Exception {
-        this.server = new HttpServerNio();
-        this.server.setConnectionFactory(createServerConnectionFactory());
-        this.server.setTimeout(5000);
+        this.server = new HttpServerNio(
+                createServerHttpProcessor(),
+                createServerConnectionFactory(),
+                createExpectationVerifier(),
+                createServerIOReactorConfig());
     }
 
     public void initClient() throws Exception {
-        this.client = new HttpClientNio(createClientConnectionFactory());
-        this.client.setTimeout(5000);
+        this.client = new HttpClientNio(
+                createClientHttpProcessor(),
+                createHttpAsyncRequestExecutor(),
+                createClientConnectionFactory(),
+                createClientIOReactorConfig());
     }
 
     @After