You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2017/05/09 19:58:09 UTC

[17/50] [abbrv] httpcomponents-core git commit: Minor refactoring of I/O reactor code

Minor refactoring of I/O reactor code

git-svn-id: https://svn.apache.org/repos/asf/httpcomponents/httpcore/trunk@1792526 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/af44438d
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/af44438d
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/af44438d

Branch: refs/heads/trunk
Commit: af44438d73091a7e9ba8351cf521f95f79b28825
Parents: 61a8518
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Mon Apr 24 18:10:55 2017 +0000
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Mon Apr 24 18:10:55 2017 +0000

----------------------------------------------------------------------
 .../hc/core5/testing/nio/AsyncRequester.java    |  5 +-
 .../hc/core5/testing/nio/AsyncServer.java       |  5 +-
 .../hc/core5/testing/nio/IOReactorExecutor.java |  3 +-
 .../hc/core5/testing/nio/ThreadFactoryImpl.java | 61 --------------
 .../nio/TestDefaultListeningIOReactor.java      | 88 ++++++++++----------
 .../core5/concurrent/DefaultThreadFactory.java  | 64 ++++++++++++++
 .../http/impl/bootstrap/AsyncRequester.java     |  5 +-
 .../core5/http/impl/bootstrap/AsyncServer.java  |  5 +-
 .../core5/http/impl/bootstrap/HttpServer.java   |  5 +-
 .../http/impl/bootstrap/ThreadFactoryImpl.java  | 64 --------------
 .../reactor/AbstractMultiworkerIOReactor.java   | 54 ++++--------
 .../reactor/DefaultConnectingIOReactor.java     | 15 ++--
 .../reactor/DefaultListeningIOReactor.java      | 39 ++++-----
 .../reactor/IOReactorExceptionHandler.java      | 12 ---
 .../apache/hc/core5/reactor/IOReactorImpl.java  | 21 +----
 15 files changed, 174 insertions(+), 272 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java
index 6b864b1..436fdf0 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java
@@ -32,6 +32,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.concurrent.ThreadFactory;
 
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
 import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.net.NamedEndpoint;
@@ -49,8 +50,8 @@ public class AsyncRequester extends IOReactorExecutor<DefaultConnectingIOReactor
 
     public AsyncRequester(final IOReactorConfig ioReactorConfig) {
         super(ioReactorConfig,
-                new ThreadFactoryImpl("connector", true),
-                new ThreadFactoryImpl("requester-dispatch", true));
+                new DefaultThreadFactory("connector", true),
+                new DefaultThreadFactory("requester-dispatch", true));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java
index 798cf1f..8698134 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java
@@ -32,6 +32,7 @@ import java.net.InetSocketAddress;
 import java.util.Set;
 import java.util.concurrent.ThreadFactory;
 
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
 import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.reactor.DefaultListeningIOReactor;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
@@ -42,7 +43,7 @@ import org.apache.hc.core5.reactor.ListenerEndpoint;
 public class AsyncServer extends IOReactorExecutor<DefaultListeningIOReactor> {
 
     public AsyncServer(final IOReactorConfig ioReactorConfig) {
-        super(ioReactorConfig, new ThreadFactoryImpl("listener", true), new ThreadFactoryImpl("server-dispatch", true));
+        super(ioReactorConfig, new DefaultThreadFactory("listener", true), new DefaultThreadFactory("server-dispatch", true));
     }
 
     @Override
@@ -52,7 +53,7 @@ public class AsyncServer extends IOReactorExecutor<DefaultListeningIOReactor> {
             final ThreadFactory threadFactory,
             final Callback<IOSession> sessionShutdownCallback) throws IOException {
         return new DefaultListeningIOReactor(
-                ioEventHandlerFactory, ioReactorConfig, threadFactory, sessionShutdownCallback);
+                ioEventHandlerFactory, ioReactorConfig, threadFactory, null, sessionShutdownCallback);
     }
 
     public ListenerEndpoint listen(final InetSocketAddress address) {

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java
index 2af9092..e0ed7e4 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
 import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.http.ExceptionListener;
 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
@@ -94,7 +95,7 @@ abstract class IOReactorExecutor<T extends AbstractMultiworkerIOReactor> impleme
         if (ioReactorRef.compareAndSet(null, createIOReactor(
                 ioEventHandlerFactory,
                 ioReactorConfig,
-                workerThreadFactory != null ? workerThreadFactory : new ThreadFactoryImpl("i/o dispatch"),
+                workerThreadFactory != null ? workerThreadFactory : new DefaultThreadFactory("i/o dispatch"),
                 new Callback<IOSession>() {
 
                     @Override

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ThreadFactoryImpl.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ThreadFactoryImpl.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ThreadFactoryImpl.java
deleted file mode 100644
index 48d95dd..0000000
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ThreadFactoryImpl.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation.  For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.testing.nio;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicLong;
-
-class ThreadFactoryImpl implements ThreadFactory {
-
-    private final String namePrefix;
-    private final ThreadGroup group;
-    private final AtomicLong count;
-    private final boolean daemon;
-
-    ThreadFactoryImpl(final String namePrefix, final ThreadGroup group, final boolean daemon) {
-        this.namePrefix = namePrefix;
-        this.group = group;
-        this.daemon = daemon;
-        this.count = new AtomicLong();
-    }
-
-    ThreadFactoryImpl(final String namePrefix, final boolean daemon) {
-        this(namePrefix, null, daemon);
-    }
-
-    ThreadFactoryImpl(final String namePrefix) {
-        this(namePrefix, null, false);
-    }
-
-    @Override
-    public Thread newThread(final Runnable target) {
-        final Thread thread = new Thread(this.group, target, this.namePrefix + "-"  + this.count.incrementAndGet());
-        thread.setDaemon(daemon);
-        return thread;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
index cdc79df..6c7ff90 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
@@ -54,45 +54,47 @@ import org.junit.Test;
  */
 public class TestDefaultListeningIOReactor {
 
-    protected DefaultListeningIOReactor ioreactor;
+    private DefaultListeningIOReactor ioreactor;
 
-    @Before
-    public void setup() throws Exception {
-        final IOReactorConfig reactorConfig = IOReactorConfig.custom()
-                .setIoThreadCount(1)
-                .build();
-        this.ioreactor = new DefaultListeningIOReactor(new IOEventHandlerFactory() {
+    private static class NoopIOEventHandlerFactory implements IOEventHandlerFactory {
 
-            @Override
-            public IOEventHandler createHandler(final TlsCapableIOSession ioSession, final Object attachment) {
-                return new IOEventHandler() {
+        @Override
+        public IOEventHandler createHandler(final TlsCapableIOSession ioSession, final Object attachment) {
+            return new IOEventHandler() {
 
-                    @Override
-                    public void connected(final IOSession session) {
-                    }
+                @Override
+                public void connected(final IOSession session) {
+                }
 
-                    @Override
-                    public void inputReady(final IOSession session) {
-                    }
+                @Override
+                public void inputReady(final IOSession session) {
+                }
 
-                    @Override
-                    public void outputReady(final IOSession session) {
-                    }
+                @Override
+                public void outputReady(final IOSession session) {
+                }
 
-                    @Override
-                    public void timeout(final IOSession session) {
-                    }
+                @Override
+                public void timeout(final IOSession session) {
+                }
 
-                    @Override
-                    public void exception(final IOSession session, final Exception cause) {
-                    }
+                @Override
+                public void exception(final IOSession session, final Exception cause) {
+                }
 
-                    @Override
-                    public void disconnected(final IOSession session) {
-                    }
-                };
-            }
-        }, reactorConfig, null);
+                @Override
+                public void disconnected(final IOSession session) {
+                }
+            };
+        }
+    }
+
+    @Before
+    public void setup() throws Exception {
+        final IOReactorConfig reactorConfig = IOReactorConfig.custom()
+                .setIoThreadCount(1)
+                .build();
+        this.ioreactor = new DefaultListeningIOReactor(new NoopIOEventHandlerFactory(), reactorConfig, null, null);
     }
 
     @After
@@ -194,20 +196,20 @@ public class TestDefaultListeningIOReactor {
 
     @Test
     public void testEndpointAlreadyBoundNonFatal() throws Exception {
-        ioreactor.setExceptionHandler(new IOReactorExceptionHandler() {
-
-            @Override
-            public boolean handle(final IOException ex) {
-                return (ex instanceof BindException);
-            }
-
-            @Override
-            public boolean handle(final RuntimeException ex) {
-                return false;
-            }
+        final IOReactorConfig reactorConfig = IOReactorConfig.custom()
+                .setIoThreadCount(1)
+                .build();
+        ioreactor = new DefaultListeningIOReactor(
+                new NoopIOEventHandlerFactory(),
+                reactorConfig,
+                new IOReactorExceptionHandler() {
 
-        });
+                    @Override
+                    public boolean handle(final IOException ex) {
+                        return (ex instanceof BindException);
+                    }
 
+                }, null);
         final Thread t = new Thread(new Runnable() {
 
             @Override

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/concurrent/DefaultThreadFactory.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/concurrent/DefaultThreadFactory.java b/httpcore5/src/main/java/org/apache/hc/core5/concurrent/DefaultThreadFactory.java
new file mode 100644
index 0000000..ab589e9
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/concurrent/DefaultThreadFactory.java
@@ -0,0 +1,64 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.concurrent;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * @since 5.0
+ */
+public class DefaultThreadFactory implements ThreadFactory {
+
+    private final String namePrefix;
+    private final ThreadGroup group;
+    private final AtomicLong count;
+    private final boolean daemon;
+
+    public DefaultThreadFactory(final String namePrefix, final ThreadGroup group, final boolean daemon) {
+        this.namePrefix = namePrefix;
+        this.group = group;
+        this.daemon = daemon;
+        this.count = new AtomicLong();
+    }
+
+    public DefaultThreadFactory(final String namePrefix, final boolean daemon) {
+        this(namePrefix, null, daemon);
+    }
+
+    public DefaultThreadFactory(final String namePrefix) {
+        this(namePrefix, null, false);
+    }
+
+    @Override
+    public Thread newThread(final Runnable target) {
+        final Thread thread = new Thread(this.group, target, this.namePrefix + "-"  + this.count.incrementAndGet());
+        thread.setDaemon(daemon);
+        return thread;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java
index f0da106..181bd74 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java
@@ -30,6 +30,7 @@ package org.apache.hc.core5.http.impl.bootstrap;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
 import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.http.ExceptionListener;
 import org.apache.hc.core5.http.HttpHost;
@@ -53,9 +54,9 @@ public class AsyncRequester extends IOReactorExecutor<DefaultConnectingIOReactor
             final ExceptionListener exceptionListener,
             final Callback<IOSession> sessionShutdownCallback) throws IOReactorException {
         super(new DefaultConnectingIOReactor(
-                        eventHandlerFactory, ioReactorConfig, new ThreadFactoryImpl("requester-dispatch", true), sessionShutdownCallback),
+                        eventHandlerFactory, ioReactorConfig, new DefaultThreadFactory("requester-dispatch", true), sessionShutdownCallback),
                 exceptionListener,
-                new ThreadFactoryImpl("connector", true));
+                new DefaultThreadFactory("connector", true));
     }
 
     private InetSocketAddress toSocketAddress(final HttpHost host) {

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java
index 031223b..0e0ed5e 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java
@@ -30,6 +30,7 @@ package org.apache.hc.core5.http.impl.bootstrap;
 import java.net.InetSocketAddress;
 import java.util.Set;
 
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
 import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.http.ExceptionListener;
 import org.apache.hc.core5.reactor.DefaultListeningIOReactor;
@@ -47,9 +48,9 @@ public class AsyncServer extends IOReactorExecutor<DefaultListeningIOReactor> {
             final ExceptionListener exceptionListener,
             final Callback<IOSession> sessionShutdownCallback) throws IOReactorException {
         super(new DefaultListeningIOReactor(
-                    eventHandlerFactory, ioReactorConfig, new ThreadFactoryImpl("server-dispatch", true), sessionShutdownCallback),
+                    eventHandlerFactory, ioReactorConfig, new DefaultThreadFactory("server-dispatch", true), null, sessionShutdownCallback),
                 exceptionListener,
-                new ThreadFactoryImpl("listener", true));
+                new DefaultThreadFactory("listener", true));
     }
 
     public ListenerEndpoint listen(final InetSocketAddress address) {

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpServer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpServer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpServer.java
index 8d3da4a..258f926 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpServer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpServer.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.net.ServerSocketFactory;
 import javax.net.ssl.SSLServerSocket;
 
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
 import org.apache.hc.core5.http.ExceptionListener;
 import org.apache.hc.core5.http.config.SocketConfig;
 import org.apache.hc.core5.http.impl.io.DefaultBHttpServerConnection;
@@ -93,12 +94,12 @@ public class HttpServer implements GracefullyCloseable {
         this.listenerExecutorService = new ThreadPoolExecutor(
                 1, 1, 0L, TimeUnit.MILLISECONDS,
                 new SynchronousQueue<Runnable>(),
-                new ThreadFactoryImpl("HTTP-listener-" + this.port));
+                new DefaultThreadFactory("HTTP-listener-" + this.port));
         this.workerThreads = new ThreadGroup("HTTP-workers");
         this.workerExecutorService = new WorkerPoolExecutor(
                 0, Integer.MAX_VALUE, 1L, TimeUnit.SECONDS,
                 new SynchronousQueue<Runnable>(),
-                new ThreadFactoryImpl("HTTP-worker", this.workerThreads, false));
+                new DefaultThreadFactory("HTTP-worker", this.workerThreads, false));
         this.status = new AtomicReference<>(Status.READY);
     }
 

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/ThreadFactoryImpl.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/ThreadFactoryImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/ThreadFactoryImpl.java
deleted file mode 100644
index 32ce23e..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/ThreadFactoryImpl.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation.  For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.http.impl.bootstrap;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * @since 4.4
- */
-class ThreadFactoryImpl implements ThreadFactory {
-
-    private final String namePrefix;
-    private final ThreadGroup group;
-    private final AtomicLong count;
-    private final boolean daemon;
-
-    ThreadFactoryImpl(final String namePrefix, final ThreadGroup group, final boolean daemon) {
-        this.namePrefix = namePrefix;
-        this.group = group;
-        this.daemon = daemon;
-        this.count = new AtomicLong();
-    }
-
-    ThreadFactoryImpl(final String namePrefix, final boolean daemon) {
-        this(namePrefix, null, daemon);
-    }
-
-    ThreadFactoryImpl(final String namePrefix) {
-        this(namePrefix, null, false);
-    }
-
-    @Override
-    public Thread newThread(final Runnable target) {
-        final Thread thread = new Thread(this.group, target, this.namePrefix + "-"  + this.count.incrementAndGet());
-        thread.setDaemon(daemon);
-        return thread;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
index ebb9c77..1ea7634 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
@@ -42,9 +42,9 @@ import java.util.Date;
 import java.util.List;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
 import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.io.ShutdownType;
 import org.apache.hc.core5.util.Args;
@@ -92,22 +92,19 @@ import org.apache.hc.core5.util.TimeValue;
  */
 public abstract class AbstractMultiworkerIOReactor implements IOReactor {
 
-    protected final IOReactorConfig reactorConfig;
-    protected final Selector selector;
+    private final IOReactorConfig reactorConfig;
+    private final Selector selector;
     private final int workerCount;
     private final IOEventHandlerFactory eventHandlerFactory;
     private final ThreadFactory threadFactory;
     private final Callback<IOSession> sessionShutdownCallback;
     private final IOReactorImpl[] dispatchers;
-    private final Worker[] workers;
+    private final IODispatchWorker[] workers;
     private final Thread[] threads;
+    private final List<ExceptionEvent> auditLog;
     private final AtomicReference<IOReactorStatus> status;
     private final Object shutdownMutex;
 
-    //TODO: make final
-    protected IOReactorExceptionHandler exceptionHandler;
-    protected List<ExceptionEvent> auditLog;
-
     private int currentWorker = 0;
 
     /**
@@ -134,12 +131,12 @@ public abstract class AbstractMultiworkerIOReactor implements IOReactor {
         } catch (final IOException ex) {
             throw new IOReactorException("Failure opening selector", ex);
         }
-        this.threadFactory = threadFactory != null ? threadFactory : new DefaultThreadFactory();
+        this.threadFactory = threadFactory != null ? threadFactory : new DefaultThreadFactory("I/O dispatcher", true);
         this.sessionShutdownCallback = sessionShutdownCallback;
         this.auditLog = new ArrayList<>();
         this.workerCount = this.reactorConfig.getIoThreadCount();
         this.dispatchers = new IOReactorImpl[workerCount];
-        this.workers = new Worker[workerCount];
+        this.workers = new IODispatchWorker[workerCount];
         this.threads = new Thread[workerCount];
         this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
         this.shutdownMutex = new Object();
@@ -151,6 +148,10 @@ public abstract class AbstractMultiworkerIOReactor implements IOReactor {
         this(eventHandlerFactory, null, null, sessionShutdownCallback);
     }
 
+    Selector selector() {
+        return selector;
+    }
+
     @Override
     public IOReactorStatus getStatus() {
         return this.status.get();
@@ -176,7 +177,7 @@ public abstract class AbstractMultiworkerIOReactor implements IOReactor {
      * @param timestamp the time stamp of the exception. Can be
      * {@code null} in which case the current date / time will be used.
      */
-    protected synchronized void addExceptionEvent(final Throwable ex, final Date timestamp) {
+    void addExceptionEvent(final Throwable ex, final Date timestamp) {
         if (ex == null) {
             return;
         }
@@ -190,20 +191,11 @@ public abstract class AbstractMultiworkerIOReactor implements IOReactor {
      *
      * @param ex the exception thrown by the I/O reactor.
      */
-    protected void addExceptionEvent(final Throwable ex) {
+    void addExceptionEvent(final Throwable ex) {
         addExceptionEvent(ex, null);
     }
 
     /**
-     * Sets exception handler for this I/O reactor.
-     *
-     * @param exceptionHandler the exception handler.
-     */
-    public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
-        this.exceptionHandler = exceptionHandler;
-    }
-
-    /**
      * Triggered to process I/O events registered by the main {@link Selector}.
      * <p>
      * Super-classes can implement this method to react to the event.
@@ -256,13 +248,12 @@ public abstract class AbstractMultiworkerIOReactor implements IOReactor {
                 final IOReactorImpl dispatcher = new IOReactorImpl(
                         this.eventHandlerFactory,
                         this.reactorConfig,
-                        this.exceptionHandler,
                         this.sessionShutdownCallback);
                 this.dispatchers[i] = dispatcher;
             }
             for (int i = 0; i < this.workerCount; i++) {
                 final IOReactorImpl dispatcher = this.dispatchers[i];
-                this.workers[i] = new Worker(dispatcher);
+                this.workers[i] = new IODispatchWorker(dispatcher);
                 this.threads[i] = this.threadFactory.newThread(this.workers[i]);
             }
 
@@ -296,7 +287,7 @@ public abstract class AbstractMultiworkerIOReactor implements IOReactor {
 
                 // Verify I/O dispatchers
                 for (int i = 0; i < this.workerCount; i++) {
-                    final Worker worker = this.workers[i];
+                    final IODispatchWorker worker = this.workers[i];
                     final Throwable ex = worker.getThrowable();
                     if (ex != null) {
                         throw new IOReactorException("I/O dispatch worker terminated abnormally", ex);
@@ -487,13 +478,13 @@ public abstract class AbstractMultiworkerIOReactor implements IOReactor {
         }
     }
 
-    static class Worker implements Runnable {
+    static class IODispatchWorker implements Runnable {
 
         final IOReactorImpl dispatcher;
 
         private volatile Throwable throwable;
 
-        public Worker(final IOReactorImpl dispatcher) {
+        public IODispatchWorker(final IOReactorImpl dispatcher) {
             super();
             this.dispatcher = dispatcher;
         }
@@ -516,15 +507,4 @@ public abstract class AbstractMultiworkerIOReactor implements IOReactor {
 
     }
 
-    static class DefaultThreadFactory implements ThreadFactory {
-
-        private final static AtomicLong COUNT = new AtomicLong(1);
-
-        @Override
-        public Thread newThread(final Runnable r) {
-            return new Thread(r, "I/O dispatcher " + COUNT.getAndIncrement());
-        }
-
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java
index 5b5c9f2..bd914ae 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java
@@ -55,17 +55,18 @@ import org.apache.hc.core5.util.Asserts;
 public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
         implements ConnectingIOReactor {
 
+    private final IOReactorConfig reactorConfig;
     private final Queue<SessionRequestImpl> requestQueue;
-
     private final long selectInterval;
     private long lastTimeoutCheck;
 
     public DefaultConnectingIOReactor(
             final IOEventHandlerFactory eventHandlerFactory,
-            final IOReactorConfig reactorConfig,
+            final IOReactorConfig ioReactorConfig,
             final ThreadFactory threadFactory,
             final Callback<IOSession> sessionShutdownCallback) throws IOReactorException {
-        super(eventHandlerFactory, reactorConfig, threadFactory, sessionShutdownCallback);
+        super(eventHandlerFactory, ioReactorConfig, threadFactory, sessionShutdownCallback);
+        this.reactorConfig = ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT;
         this.requestQueue = new ConcurrentLinkedQueue<>();
         this.selectInterval = this.reactorConfig.getSelectInterval();
         this.lastTimeoutCheck = System.currentTimeMillis();
@@ -103,7 +104,7 @@ public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
         processSessionRequests();
 
         if (readyCount > 0) {
-            final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
+            final Set<SelectionKey> selectedKeys = selector().selectedKeys();
             for (final SelectionKey key : selectedKeys) {
 
                 processEvent(key);
@@ -115,7 +116,7 @@ public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
         final long currentTime = System.currentTimeMillis();
         if ((currentTime - this.lastTimeoutCheck) >= this.selectInterval) {
             this.lastTimeoutCheck = currentTime;
-            final Set<SelectionKey> keys = this.selector.keys();
+            final Set<SelectionKey> keys = selector().keys();
             processTimeouts(keys);
         }
     }
@@ -197,7 +198,7 @@ public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
                 callback);
 
         this.requestQueue.add(sessionRequest);
-        this.selector.wakeup();
+        selector().wakeup();
 
         return sessionRequest;
     }
@@ -252,7 +253,7 @@ public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
 
             final SessionRequestHandle requestHandle = new SessionRequestHandle(request);
             try {
-                final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT,
+                final SelectionKey key = socketChannel.register(selector(), SelectionKey.OP_CONNECT,
                         requestHandle);
                 request.setKey(key);
             } catch (final IOException ex) {

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
index 2a8c3d1..1276435 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
@@ -55,6 +55,8 @@ import org.apache.hc.core5.util.Asserts;
 public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
         implements ListeningIOReactor {
 
+    private final IOReactorConfig reactorConfig;
+    private final IOReactorExceptionHandler exceptionHandler;
     private final Queue<ListenerEndpointImpl> requestQueue;
     private final Set<ListenerEndpointImpl> endpoints;
     private final Set<SocketAddress> pausedEndpoints;
@@ -65,7 +67,7 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
      * Creates an instance of DefaultListeningIOReactor with the given configuration.
      *
      * @param eventHandlerFactory the factory to create I/O event handlers.
-     * @param config I/O reactor configuration.
+     * @param ioReactorConfig I/O reactor configuration.
      * @param threadFactory the factory to create threads.
      *   Can be {@code null}.
      * @throws IOReactorException in case of a non-recoverable I/O error.
@@ -74,10 +76,13 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
      */
     public DefaultListeningIOReactor(
             final IOEventHandlerFactory eventHandlerFactory,
-            final IOReactorConfig config,
+            final IOReactorConfig ioReactorConfig,
             final ThreadFactory threadFactory,
+            final IOReactorExceptionHandler exceptionHandler,
             final Callback<IOSession> sessionShutdownCallback) throws IOReactorException {
-        super(eventHandlerFactory, config, threadFactory, sessionShutdownCallback);
+        super(eventHandlerFactory, ioReactorConfig, threadFactory, sessionShutdownCallback);
+        this.reactorConfig = ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT;
+        this.exceptionHandler = exceptionHandler;
         this.requestQueue = new ConcurrentLinkedQueue<>();
         this.endpoints = Collections.synchronizedSet(new HashSet<ListenerEndpointImpl>());
         this.pausedEndpoints = new HashSet<>();
@@ -96,8 +101,9 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
     public DefaultListeningIOReactor(
             final IOEventHandlerFactory eventHandlerFactory,
             final IOReactorConfig config,
+            final IOReactorExceptionHandler exceptionHandler,
             final Callback<IOSession> sessionShutdownCallback) throws IOReactorException {
-        this(eventHandlerFactory, config, null, sessionShutdownCallback);
+        this(eventHandlerFactory, config, null, exceptionHandler, sessionShutdownCallback);
     }
 
     /**
@@ -108,9 +114,8 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
      *
      * @since 5.0
      */
-    public DefaultListeningIOReactor(
-            final IOEventHandlerFactory eventHandlerFactory) throws IOReactorException {
-        this(eventHandlerFactory, null, null);
+    public DefaultListeningIOReactor(final IOEventHandlerFactory eventHandlerFactory) throws IOReactorException {
+        this(eventHandlerFactory, null, null, null);
     }
 
     @Override
@@ -128,7 +133,7 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
         }
 
         if (readyCount > 0) {
-            final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
+            final Set<SelectionKey> selectedKeys = selector().selectedKeys();
             for (final SelectionKey key : selectedKeys) {
 
                 processEvent(key);
@@ -150,10 +155,8 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
                     try {
                         socketChannel = serverChannel.accept();
                     } catch (final IOException ex) {
-                        if (this.exceptionHandler == null ||
-                                !this.exceptionHandler.handle(ex)) {
-                            throw new IOReactorException(
-                                    "Failure accepting connection", ex);
+                        if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
+                            throw new IOReactorException("Failure accepting connection", ex);
                         }
                     }
                     if (socketChannel == null) {
@@ -162,10 +165,8 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
                     try {
                         prepareSocket(socketChannel.socket());
                     } catch (final IOException ex) {
-                        if (this.exceptionHandler == null ||
-                                !this.exceptionHandler.handle(ex)) {
-                            throw new IOReactorException(
-                                    "Failure initalizing socket", ex);
+                        if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
+                            throw new IOReactorException("Failure initalizing socket", ex);
                         }
                     }
                     enqueuePendingSession(socketChannel, null);
@@ -198,7 +199,7 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
         Asserts.check(status == IOReactorStatus.INACTIVE || status == IOReactorStatus.ACTIVE, "I/O reactor has been shut down");
         final ListenerEndpointImpl request = createEndpoint(address);
         this.requestQueue.add(request);
-        this.selector.wakeup();
+        selector().wakeup();
         return request;
     }
 
@@ -234,7 +235,7 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
                 return;
             }
             try {
-                final SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
+                final SelectionKey key = serverChannel.register(selector(), SelectionKey.OP_ACCEPT);
                 key.attach(request);
                 request.setKey(key);
             } catch (final IOException ex) {
@@ -293,7 +294,7 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
             this.requestQueue.add(request);
         }
         this.pausedEndpoints.clear();
-        this.selector.wakeup();
+        selector().wakeup();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorExceptionHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorExceptionHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorExceptionHandler.java
index 021cc21..3327641 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorExceptionHandler.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorExceptionHandler.java
@@ -49,16 +49,4 @@ public interface IOReactorExceptionHandler {
      */
     boolean handle(IOException ex);
 
-    /**
-     * This method is expected to examine the runtime exception passed as
-     * a parameter and decide whether it is safe to continue execution of
-     * the I/O reactor.
-     *
-     * @param ex potentially recoverable runtime exception
-     * @return {@code true} if it is safe to ignore the exception
-     * and continue execution of the I/O reactor; {@code false} if the
-     * I/O reactor must throw {@link RuntimeException} and terminate
-     */
-    boolean handle(RuntimeException ex);
-
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
index 34731af..dbdce05 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
@@ -63,7 +63,6 @@ class IOReactorImpl implements IOReactor {
     private final AtomicReference<IOReactorStatus> status;
     private final AtomicBoolean shutdownInitiated;
     private final Object shutdownMutex;
-    private final IOReactorExceptionHandler exceptionHandler;
     private final Callback<IOSession> sessionShutdownCallback;
 
     private volatile long lastTimeoutCheck;
@@ -71,12 +70,10 @@ class IOReactorImpl implements IOReactor {
     IOReactorImpl(
             final IOEventHandlerFactory eventHandlerFactory,
             final IOReactorConfig reactorConfig,
-            final IOReactorExceptionHandler exceptionHandler,
             final Callback<IOSession> sessionShutdownCallback) {
         super();
         this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
         this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
-        this.exceptionHandler = exceptionHandler;
         this.sessionShutdownCallback = sessionShutdownCallback;
         this.shutdownInitiated = new AtomicBoolean(false);
         this.closedSessions = new ConcurrentLinkedQueue<>();
@@ -232,12 +229,6 @@ class IOReactorImpl implements IOReactor {
         selectedKeys.clear();
     }
 
-    private void handleRuntimeException(final RuntimeException ex) {
-        if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
-            throw ex;
-        }
-    }
-
     private void processEvent(final SelectionKey key) {
         final InternalIOSession session = (InternalIOSession) key.attachment();
         try {
@@ -253,7 +244,7 @@ class IOReactorImpl implements IOReactor {
             session.shutdown(ShutdownType.GRACEFUL);
         } catch (final RuntimeException ex) {
             session.shutdown(ShutdownType.IMMEDIATE);
-            handleRuntimeException(ex);
+            throw ex;
         }
     }
 
@@ -288,11 +279,7 @@ class IOReactorImpl implements IOReactor {
                 if (sessionRequest != null) {
                     sessionRequest.completed(session);
                 }
-                try {
-                    session.onConnected();
-                } catch (final RuntimeException ex) {
-                    handleRuntimeException(ex);
-                }
+                session.onConnected();
             } catch (final CancelledKeyException ex) {
                 session.shutdown(ShutdownType.GRACEFUL);
             }
@@ -309,8 +296,6 @@ class IOReactorImpl implements IOReactor {
                 session.onDisconnected();
             } catch (final CancelledKeyException ex) {
                 // ignore and move on
-            } catch (final RuntimeException ex) {
-                handleRuntimeException(ex);
             }
         }
     }
@@ -329,7 +314,7 @@ class IOReactorImpl implements IOReactor {
                 session.shutdown(ShutdownType.GRACEFUL);
             } catch (final RuntimeException ex) {
                 session.shutdown(ShutdownType.IMMEDIATE);
-                handleRuntimeException(ex);
+                throw ex;
             }
         }
     }