You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2017/05/09 19:58:09 UTC
[17/50] [abbrv] httpcomponents-core git commit: Minor refactoring of
I/O reactor code
Minor refactoring of I/O reactor code
git-svn-id: https://svn.apache.org/repos/asf/httpcomponents/httpcore/trunk@1792526 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/af44438d
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/af44438d
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/af44438d
Branch: refs/heads/trunk
Commit: af44438d73091a7e9ba8351cf521f95f79b28825
Parents: 61a8518
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Mon Apr 24 18:10:55 2017 +0000
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Mon Apr 24 18:10:55 2017 +0000
----------------------------------------------------------------------
.../hc/core5/testing/nio/AsyncRequester.java | 5 +-
.../hc/core5/testing/nio/AsyncServer.java | 5 +-
.../hc/core5/testing/nio/IOReactorExecutor.java | 3 +-
.../hc/core5/testing/nio/ThreadFactoryImpl.java | 61 --------------
.../nio/TestDefaultListeningIOReactor.java | 88 ++++++++++----------
.../core5/concurrent/DefaultThreadFactory.java | 64 ++++++++++++++
.../http/impl/bootstrap/AsyncRequester.java | 5 +-
.../core5/http/impl/bootstrap/AsyncServer.java | 5 +-
.../core5/http/impl/bootstrap/HttpServer.java | 5 +-
.../http/impl/bootstrap/ThreadFactoryImpl.java | 64 --------------
.../reactor/AbstractMultiworkerIOReactor.java | 54 ++++--------
.../reactor/DefaultConnectingIOReactor.java | 15 ++--
.../reactor/DefaultListeningIOReactor.java | 39 ++++-----
.../reactor/IOReactorExceptionHandler.java | 12 ---
.../apache/hc/core5/reactor/IOReactorImpl.java | 21 +----
15 files changed, 174 insertions(+), 272 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java
index 6b864b1..436fdf0 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java
@@ -32,6 +32,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ThreadFactory;
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.net.NamedEndpoint;
@@ -49,8 +50,8 @@ public class AsyncRequester extends IOReactorExecutor<DefaultConnectingIOReactor
public AsyncRequester(final IOReactorConfig ioReactorConfig) {
super(ioReactorConfig,
- new ThreadFactoryImpl("connector", true),
- new ThreadFactoryImpl("requester-dispatch", true));
+ new DefaultThreadFactory("connector", true),
+ new DefaultThreadFactory("requester-dispatch", true));
}
@Override
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java
index 798cf1f..8698134 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java
@@ -32,6 +32,7 @@ import java.net.InetSocketAddress;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.reactor.DefaultListeningIOReactor;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
@@ -42,7 +43,7 @@ import org.apache.hc.core5.reactor.ListenerEndpoint;
public class AsyncServer extends IOReactorExecutor<DefaultListeningIOReactor> {
public AsyncServer(final IOReactorConfig ioReactorConfig) {
- super(ioReactorConfig, new ThreadFactoryImpl("listener", true), new ThreadFactoryImpl("server-dispatch", true));
+ super(ioReactorConfig, new DefaultThreadFactory("listener", true), new DefaultThreadFactory("server-dispatch", true));
}
@Override
@@ -52,7 +53,7 @@ public class AsyncServer extends IOReactorExecutor<DefaultListeningIOReactor> {
final ThreadFactory threadFactory,
final Callback<IOSession> sessionShutdownCallback) throws IOException {
return new DefaultListeningIOReactor(
- ioEventHandlerFactory, ioReactorConfig, threadFactory, sessionShutdownCallback);
+ ioEventHandlerFactory, ioReactorConfig, threadFactory, null, sessionShutdownCallback);
}
public ListenerEndpoint listen(final InetSocketAddress address) {
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java
index 2af9092..e0ed7e4 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/IOReactorExecutor.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.http.ExceptionListener;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
@@ -94,7 +95,7 @@ abstract class IOReactorExecutor<T extends AbstractMultiworkerIOReactor> impleme
if (ioReactorRef.compareAndSet(null, createIOReactor(
ioEventHandlerFactory,
ioReactorConfig,
- workerThreadFactory != null ? workerThreadFactory : new ThreadFactoryImpl("i/o dispatch"),
+ workerThreadFactory != null ? workerThreadFactory : new DefaultThreadFactory("i/o dispatch"),
new Callback<IOSession>() {
@Override
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ThreadFactoryImpl.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ThreadFactoryImpl.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ThreadFactoryImpl.java
deleted file mode 100644
index 48d95dd..0000000
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ThreadFactoryImpl.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.testing.nio;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicLong;
-
-class ThreadFactoryImpl implements ThreadFactory {
-
- private final String namePrefix;
- private final ThreadGroup group;
- private final AtomicLong count;
- private final boolean daemon;
-
- ThreadFactoryImpl(final String namePrefix, final ThreadGroup group, final boolean daemon) {
- this.namePrefix = namePrefix;
- this.group = group;
- this.daemon = daemon;
- this.count = new AtomicLong();
- }
-
- ThreadFactoryImpl(final String namePrefix, final boolean daemon) {
- this(namePrefix, null, daemon);
- }
-
- ThreadFactoryImpl(final String namePrefix) {
- this(namePrefix, null, false);
- }
-
- @Override
- public Thread newThread(final Runnable target) {
- final Thread thread = new Thread(this.group, target, this.namePrefix + "-" + this.count.incrementAndGet());
- thread.setDaemon(daemon);
- return thread;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
index cdc79df..6c7ff90 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
@@ -54,45 +54,47 @@ import org.junit.Test;
*/
public class TestDefaultListeningIOReactor {
- protected DefaultListeningIOReactor ioreactor;
+ private DefaultListeningIOReactor ioreactor;
- @Before
- public void setup() throws Exception {
- final IOReactorConfig reactorConfig = IOReactorConfig.custom()
- .setIoThreadCount(1)
- .build();
- this.ioreactor = new DefaultListeningIOReactor(new IOEventHandlerFactory() {
+ private static class NoopIOEventHandlerFactory implements IOEventHandlerFactory {
- @Override
- public IOEventHandler createHandler(final TlsCapableIOSession ioSession, final Object attachment) {
- return new IOEventHandler() {
+ @Override
+ public IOEventHandler createHandler(final TlsCapableIOSession ioSession, final Object attachment) {
+ return new IOEventHandler() {
- @Override
- public void connected(final IOSession session) {
- }
+ @Override
+ public void connected(final IOSession session) {
+ }
- @Override
- public void inputReady(final IOSession session) {
- }
+ @Override
+ public void inputReady(final IOSession session) {
+ }
- @Override
- public void outputReady(final IOSession session) {
- }
+ @Override
+ public void outputReady(final IOSession session) {
+ }
- @Override
- public void timeout(final IOSession session) {
- }
+ @Override
+ public void timeout(final IOSession session) {
+ }
- @Override
- public void exception(final IOSession session, final Exception cause) {
- }
+ @Override
+ public void exception(final IOSession session, final Exception cause) {
+ }
- @Override
- public void disconnected(final IOSession session) {
- }
- };
- }
- }, reactorConfig, null);
+ @Override
+ public void disconnected(final IOSession session) {
+ }
+ };
+ }
+ }
+
+ @Before
+ public void setup() throws Exception {
+ final IOReactorConfig reactorConfig = IOReactorConfig.custom()
+ .setIoThreadCount(1)
+ .build();
+ this.ioreactor = new DefaultListeningIOReactor(new NoopIOEventHandlerFactory(), reactorConfig, null, null);
}
@After
@@ -194,20 +196,20 @@ public class TestDefaultListeningIOReactor {
@Test
public void testEndpointAlreadyBoundNonFatal() throws Exception {
- ioreactor.setExceptionHandler(new IOReactorExceptionHandler() {
-
- @Override
- public boolean handle(final IOException ex) {
- return (ex instanceof BindException);
- }
-
- @Override
- public boolean handle(final RuntimeException ex) {
- return false;
- }
+ final IOReactorConfig reactorConfig = IOReactorConfig.custom()
+ .setIoThreadCount(1)
+ .build();
+ ioreactor = new DefaultListeningIOReactor(
+ new NoopIOEventHandlerFactory(),
+ reactorConfig,
+ new IOReactorExceptionHandler() {
- });
+ @Override
+ public boolean handle(final IOException ex) {
+ return (ex instanceof BindException);
+ }
+ }, null);
final Thread t = new Thread(new Runnable() {
@Override
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/concurrent/DefaultThreadFactory.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/concurrent/DefaultThreadFactory.java b/httpcore5/src/main/java/org/apache/hc/core5/concurrent/DefaultThreadFactory.java
new file mode 100644
index 0000000..ab589e9
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/concurrent/DefaultThreadFactory.java
@@ -0,0 +1,64 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.concurrent;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * @since 5.0
+ */
+public class DefaultThreadFactory implements ThreadFactory {
+
+ private final String namePrefix;
+ private final ThreadGroup group;
+ private final AtomicLong count;
+ private final boolean daemon;
+
+ public DefaultThreadFactory(final String namePrefix, final ThreadGroup group, final boolean daemon) {
+ this.namePrefix = namePrefix;
+ this.group = group;
+ this.daemon = daemon;
+ this.count = new AtomicLong();
+ }
+
+ public DefaultThreadFactory(final String namePrefix, final boolean daemon) {
+ this(namePrefix, null, daemon);
+ }
+
+ public DefaultThreadFactory(final String namePrefix) {
+ this(namePrefix, null, false);
+ }
+
+ @Override
+ public Thread newThread(final Runnable target) {
+ final Thread thread = new Thread(this.group, target, this.namePrefix + "-" + this.count.incrementAndGet());
+ thread.setDaemon(daemon);
+ return thread;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java
index f0da106..181bd74 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java
@@ -30,6 +30,7 @@ package org.apache.hc.core5.http.impl.bootstrap;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.http.ExceptionListener;
import org.apache.hc.core5.http.HttpHost;
@@ -53,9 +54,9 @@ public class AsyncRequester extends IOReactorExecutor<DefaultConnectingIOReactor
final ExceptionListener exceptionListener,
final Callback<IOSession> sessionShutdownCallback) throws IOReactorException {
super(new DefaultConnectingIOReactor(
- eventHandlerFactory, ioReactorConfig, new ThreadFactoryImpl("requester-dispatch", true), sessionShutdownCallback),
+ eventHandlerFactory, ioReactorConfig, new DefaultThreadFactory("requester-dispatch", true), sessionShutdownCallback),
exceptionListener,
- new ThreadFactoryImpl("connector", true));
+ new DefaultThreadFactory("connector", true));
}
private InetSocketAddress toSocketAddress(final HttpHost host) {
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java
index 031223b..0e0ed5e 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java
@@ -30,6 +30,7 @@ package org.apache.hc.core5.http.impl.bootstrap;
import java.net.InetSocketAddress;
import java.util.Set;
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.http.ExceptionListener;
import org.apache.hc.core5.reactor.DefaultListeningIOReactor;
@@ -47,9 +48,9 @@ public class AsyncServer extends IOReactorExecutor<DefaultListeningIOReactor> {
final ExceptionListener exceptionListener,
final Callback<IOSession> sessionShutdownCallback) throws IOReactorException {
super(new DefaultListeningIOReactor(
- eventHandlerFactory, ioReactorConfig, new ThreadFactoryImpl("server-dispatch", true), sessionShutdownCallback),
+ eventHandlerFactory, ioReactorConfig, new DefaultThreadFactory("server-dispatch", true), null, sessionShutdownCallback),
exceptionListener,
- new ThreadFactoryImpl("listener", true));
+ new DefaultThreadFactory("listener", true));
}
public ListenerEndpoint listen(final InetSocketAddress address) {
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpServer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpServer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpServer.java
index 8d3da4a..258f926 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpServer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpServer.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLServerSocket;
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.http.ExceptionListener;
import org.apache.hc.core5.http.config.SocketConfig;
import org.apache.hc.core5.http.impl.io.DefaultBHttpServerConnection;
@@ -93,12 +94,12 @@ public class HttpServer implements GracefullyCloseable {
this.listenerExecutorService = new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
- new ThreadFactoryImpl("HTTP-listener-" + this.port));
+ new DefaultThreadFactory("HTTP-listener-" + this.port));
this.workerThreads = new ThreadGroup("HTTP-workers");
this.workerExecutorService = new WorkerPoolExecutor(
0, Integer.MAX_VALUE, 1L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
- new ThreadFactoryImpl("HTTP-worker", this.workerThreads, false));
+ new DefaultThreadFactory("HTTP-worker", this.workerThreads, false));
this.status = new AtomicReference<>(Status.READY);
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/ThreadFactoryImpl.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/ThreadFactoryImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/ThreadFactoryImpl.java
deleted file mode 100644
index 32ce23e..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/ThreadFactoryImpl.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-package org.apache.hc.core5.http.impl.bootstrap;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * @since 4.4
- */
-class ThreadFactoryImpl implements ThreadFactory {
-
- private final String namePrefix;
- private final ThreadGroup group;
- private final AtomicLong count;
- private final boolean daemon;
-
- ThreadFactoryImpl(final String namePrefix, final ThreadGroup group, final boolean daemon) {
- this.namePrefix = namePrefix;
- this.group = group;
- this.daemon = daemon;
- this.count = new AtomicLong();
- }
-
- ThreadFactoryImpl(final String namePrefix, final boolean daemon) {
- this(namePrefix, null, daemon);
- }
-
- ThreadFactoryImpl(final String namePrefix) {
- this(namePrefix, null, false);
- }
-
- @Override
- public Thread newThread(final Runnable target) {
- final Thread thread = new Thread(this.group, target, this.namePrefix + "-" + this.count.incrementAndGet());
- thread.setDaemon(daemon);
- return thread;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
index ebb9c77..1ea7634 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
@@ -42,9 +42,9 @@ import java.util.Date;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.io.ShutdownType;
import org.apache.hc.core5.util.Args;
@@ -92,22 +92,19 @@ import org.apache.hc.core5.util.TimeValue;
*/
public abstract class AbstractMultiworkerIOReactor implements IOReactor {
- protected final IOReactorConfig reactorConfig;
- protected final Selector selector;
+ private final IOReactorConfig reactorConfig;
+ private final Selector selector;
private final int workerCount;
private final IOEventHandlerFactory eventHandlerFactory;
private final ThreadFactory threadFactory;
private final Callback<IOSession> sessionShutdownCallback;
private final IOReactorImpl[] dispatchers;
- private final Worker[] workers;
+ private final IODispatchWorker[] workers;
private final Thread[] threads;
+ private final List<ExceptionEvent> auditLog;
private final AtomicReference<IOReactorStatus> status;
private final Object shutdownMutex;
- //TODO: make final
- protected IOReactorExceptionHandler exceptionHandler;
- protected List<ExceptionEvent> auditLog;
-
private int currentWorker = 0;
/**
@@ -134,12 +131,12 @@ public abstract class AbstractMultiworkerIOReactor implements IOReactor {
} catch (final IOException ex) {
throw new IOReactorException("Failure opening selector", ex);
}
- this.threadFactory = threadFactory != null ? threadFactory : new DefaultThreadFactory();
+ this.threadFactory = threadFactory != null ? threadFactory : new DefaultThreadFactory("I/O dispatcher", true);
this.sessionShutdownCallback = sessionShutdownCallback;
this.auditLog = new ArrayList<>();
this.workerCount = this.reactorConfig.getIoThreadCount();
this.dispatchers = new IOReactorImpl[workerCount];
- this.workers = new Worker[workerCount];
+ this.workers = new IODispatchWorker[workerCount];
this.threads = new Thread[workerCount];
this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
this.shutdownMutex = new Object();
@@ -151,6 +148,10 @@ public abstract class AbstractMultiworkerIOReactor implements IOReactor {
this(eventHandlerFactory, null, null, sessionShutdownCallback);
}
+ Selector selector() {
+ return selector;
+ }
+
@Override
public IOReactorStatus getStatus() {
return this.status.get();
@@ -176,7 +177,7 @@ public abstract class AbstractMultiworkerIOReactor implements IOReactor {
* @param timestamp the time stamp of the exception. Can be
* {@code null} in which case the current date / time will be used.
*/
- protected synchronized void addExceptionEvent(final Throwable ex, final Date timestamp) {
+ void addExceptionEvent(final Throwable ex, final Date timestamp) {
if (ex == null) {
return;
}
@@ -190,20 +191,11 @@ public abstract class AbstractMultiworkerIOReactor implements IOReactor {
*
* @param ex the exception thrown by the I/O reactor.
*/
- protected void addExceptionEvent(final Throwable ex) {
+ void addExceptionEvent(final Throwable ex) {
addExceptionEvent(ex, null);
}
/**
- * Sets exception handler for this I/O reactor.
- *
- * @param exceptionHandler the exception handler.
- */
- public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
- this.exceptionHandler = exceptionHandler;
- }
-
- /**
* Triggered to process I/O events registered by the main {@link Selector}.
* <p>
* Super-classes can implement this method to react to the event.
@@ -256,13 +248,12 @@ public abstract class AbstractMultiworkerIOReactor implements IOReactor {
final IOReactorImpl dispatcher = new IOReactorImpl(
this.eventHandlerFactory,
this.reactorConfig,
- this.exceptionHandler,
this.sessionShutdownCallback);
this.dispatchers[i] = dispatcher;
}
for (int i = 0; i < this.workerCount; i++) {
final IOReactorImpl dispatcher = this.dispatchers[i];
- this.workers[i] = new Worker(dispatcher);
+ this.workers[i] = new IODispatchWorker(dispatcher);
this.threads[i] = this.threadFactory.newThread(this.workers[i]);
}
@@ -296,7 +287,7 @@ public abstract class AbstractMultiworkerIOReactor implements IOReactor {
// Verify I/O dispatchers
for (int i = 0; i < this.workerCount; i++) {
- final Worker worker = this.workers[i];
+ final IODispatchWorker worker = this.workers[i];
final Throwable ex = worker.getThrowable();
if (ex != null) {
throw new IOReactorException("I/O dispatch worker terminated abnormally", ex);
@@ -487,13 +478,13 @@ public abstract class AbstractMultiworkerIOReactor implements IOReactor {
}
}
- static class Worker implements Runnable {
+ static class IODispatchWorker implements Runnable {
final IOReactorImpl dispatcher;
private volatile Throwable throwable;
- public Worker(final IOReactorImpl dispatcher) {
+ public IODispatchWorker(final IOReactorImpl dispatcher) {
super();
this.dispatcher = dispatcher;
}
@@ -516,15 +507,4 @@ public abstract class AbstractMultiworkerIOReactor implements IOReactor {
}
- static class DefaultThreadFactory implements ThreadFactory {
-
- private final static AtomicLong COUNT = new AtomicLong(1);
-
- @Override
- public Thread newThread(final Runnable r) {
- return new Thread(r, "I/O dispatcher " + COUNT.getAndIncrement());
- }
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java
index 5b5c9f2..bd914ae 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java
@@ -55,17 +55,18 @@ import org.apache.hc.core5.util.Asserts;
public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
implements ConnectingIOReactor {
+ private final IOReactorConfig reactorConfig;
private final Queue<SessionRequestImpl> requestQueue;
-
private final long selectInterval;
private long lastTimeoutCheck;
public DefaultConnectingIOReactor(
final IOEventHandlerFactory eventHandlerFactory,
- final IOReactorConfig reactorConfig,
+ final IOReactorConfig ioReactorConfig,
final ThreadFactory threadFactory,
final Callback<IOSession> sessionShutdownCallback) throws IOReactorException {
- super(eventHandlerFactory, reactorConfig, threadFactory, sessionShutdownCallback);
+ super(eventHandlerFactory, ioReactorConfig, threadFactory, sessionShutdownCallback);
+ this.reactorConfig = ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT;
this.requestQueue = new ConcurrentLinkedQueue<>();
this.selectInterval = this.reactorConfig.getSelectInterval();
this.lastTimeoutCheck = System.currentTimeMillis();
@@ -103,7 +104,7 @@ public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
processSessionRequests();
if (readyCount > 0) {
- final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
+ final Set<SelectionKey> selectedKeys = selector().selectedKeys();
for (final SelectionKey key : selectedKeys) {
processEvent(key);
@@ -115,7 +116,7 @@ public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
final long currentTime = System.currentTimeMillis();
if ((currentTime - this.lastTimeoutCheck) >= this.selectInterval) {
this.lastTimeoutCheck = currentTime;
- final Set<SelectionKey> keys = this.selector.keys();
+ final Set<SelectionKey> keys = selector().keys();
processTimeouts(keys);
}
}
@@ -197,7 +198,7 @@ public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
callback);
this.requestQueue.add(sessionRequest);
- this.selector.wakeup();
+ selector().wakeup();
return sessionRequest;
}
@@ -252,7 +253,7 @@ public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
final SessionRequestHandle requestHandle = new SessionRequestHandle(request);
try {
- final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT,
+ final SelectionKey key = socketChannel.register(selector(), SelectionKey.OP_CONNECT,
requestHandle);
request.setKey(key);
} catch (final IOException ex) {
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
index 2a8c3d1..1276435 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
@@ -55,6 +55,8 @@ import org.apache.hc.core5.util.Asserts;
public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
implements ListeningIOReactor {
+ private final IOReactorConfig reactorConfig;
+ private final IOReactorExceptionHandler exceptionHandler;
private final Queue<ListenerEndpointImpl> requestQueue;
private final Set<ListenerEndpointImpl> endpoints;
private final Set<SocketAddress> pausedEndpoints;
@@ -65,7 +67,7 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
* Creates an instance of DefaultListeningIOReactor with the given configuration.
*
* @param eventHandlerFactory the factory to create I/O event handlers.
- * @param config I/O reactor configuration.
+ * @param ioReactorConfig I/O reactor configuration.
* @param threadFactory the factory to create threads.
* Can be {@code null}.
* @throws IOReactorException in case if a non-recoverable I/O error.
@@ -74,10 +76,13 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
*/
public DefaultListeningIOReactor(
final IOEventHandlerFactory eventHandlerFactory,
- final IOReactorConfig config,
+ final IOReactorConfig ioReactorConfig,
final ThreadFactory threadFactory,
+ final IOReactorExceptionHandler exceptionHandler,
final Callback<IOSession> sessionShutdownCallback) throws IOReactorException {
- super(eventHandlerFactory, config, threadFactory, sessionShutdownCallback);
+ super(eventHandlerFactory, ioReactorConfig, threadFactory, sessionShutdownCallback);
+ this.reactorConfig = ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT;
+ this.exceptionHandler = exceptionHandler;
this.requestQueue = new ConcurrentLinkedQueue<>();
this.endpoints = Collections.synchronizedSet(new HashSet<ListenerEndpointImpl>());
this.pausedEndpoints = new HashSet<>();
@@ -96,8 +101,9 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
public DefaultListeningIOReactor(
final IOEventHandlerFactory eventHandlerFactory,
final IOReactorConfig config,
+ final IOReactorExceptionHandler exceptionHandler,
final Callback<IOSession> sessionShutdownCallback) throws IOReactorException {
- this(eventHandlerFactory, config, null, sessionShutdownCallback);
+ this(eventHandlerFactory, config, null, exceptionHandler, sessionShutdownCallback);
}
/**
@@ -108,9 +114,8 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
*
* @since 5.0
*/
- public DefaultListeningIOReactor(
- final IOEventHandlerFactory eventHandlerFactory) throws IOReactorException {
- this(eventHandlerFactory, null, null);
+ public DefaultListeningIOReactor(final IOEventHandlerFactory eventHandlerFactory) throws IOReactorException {
+ this(eventHandlerFactory, null, null, null);
}
@Override
@@ -128,7 +133,7 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
}
if (readyCount > 0) {
- final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
+ final Set<SelectionKey> selectedKeys = selector().selectedKeys();
for (final SelectionKey key : selectedKeys) {
processEvent(key);
@@ -150,10 +155,8 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
try {
socketChannel = serverChannel.accept();
} catch (final IOException ex) {
- if (this.exceptionHandler == null ||
- !this.exceptionHandler.handle(ex)) {
- throw new IOReactorException(
- "Failure accepting connection", ex);
+ if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
+ throw new IOReactorException("Failure accepting connection", ex);
}
}
if (socketChannel == null) {
@@ -162,10 +165,8 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
try {
prepareSocket(socketChannel.socket());
} catch (final IOException ex) {
- if (this.exceptionHandler == null ||
- !this.exceptionHandler.handle(ex)) {
- throw new IOReactorException(
- "Failure initalizing socket", ex);
+ if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
+ throw new IOReactorException("Failure initalizing socket", ex);
}
}
enqueuePendingSession(socketChannel, null);
@@ -198,7 +199,7 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
Asserts.check(status == IOReactorStatus.INACTIVE || status == IOReactorStatus.ACTIVE, "I/O reactor has been shut down");
final ListenerEndpointImpl request = createEndpoint(address);
this.requestQueue.add(request);
- this.selector.wakeup();
+ selector().wakeup();
return request;
}
@@ -234,7 +235,7 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
return;
}
try {
- final SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
+ final SelectionKey key = serverChannel.register(selector(), SelectionKey.OP_ACCEPT);
key.attach(request);
request.setKey(key);
} catch (final IOException ex) {
@@ -293,7 +294,7 @@ public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
this.requestQueue.add(request);
}
this.pausedEndpoints.clear();
- this.selector.wakeup();
+ selector().wakeup();
}
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorExceptionHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorExceptionHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorExceptionHandler.java
index 021cc21..3327641 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorExceptionHandler.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorExceptionHandler.java
@@ -49,16 +49,4 @@ public interface IOReactorExceptionHandler {
*/
boolean handle(IOException ex);
- /**
- * This method is expected to examine the runtime exception passed as
- * a parameter and decide whether it is safe to continue execution of
- * the I/O reactor.
- *
- * @param ex potentially recoverable runtime exception
- * @return {@code true} if it is safe to ignore the exception
- * and continue execution of the I/O reactor; {@code false} if the
- * I/O reactor must throw {@link RuntimeException} and terminate
- */
- boolean handle(RuntimeException ex);
-
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/af44438d/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
index 34731af..dbdce05 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
@@ -63,7 +63,6 @@ class IOReactorImpl implements IOReactor {
private final AtomicReference<IOReactorStatus> status;
private final AtomicBoolean shutdownInitiated;
private final Object shutdownMutex;
- private final IOReactorExceptionHandler exceptionHandler;
private final Callback<IOSession> sessionShutdownCallback;
private volatile long lastTimeoutCheck;
@@ -71,12 +70,10 @@ class IOReactorImpl implements IOReactor {
IOReactorImpl(
final IOEventHandlerFactory eventHandlerFactory,
final IOReactorConfig reactorConfig,
- final IOReactorExceptionHandler exceptionHandler,
final Callback<IOSession> sessionShutdownCallback) {
super();
this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
- this.exceptionHandler = exceptionHandler;
this.sessionShutdownCallback = sessionShutdownCallback;
this.shutdownInitiated = new AtomicBoolean(false);
this.closedSessions = new ConcurrentLinkedQueue<>();
@@ -232,12 +229,6 @@ class IOReactorImpl implements IOReactor {
selectedKeys.clear();
}
- private void handleRuntimeException(final RuntimeException ex) {
- if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
- throw ex;
- }
- }
-
private void processEvent(final SelectionKey key) {
final InternalIOSession session = (InternalIOSession) key.attachment();
try {
@@ -253,7 +244,7 @@ class IOReactorImpl implements IOReactor {
session.shutdown(ShutdownType.GRACEFUL);
} catch (final RuntimeException ex) {
session.shutdown(ShutdownType.IMMEDIATE);
- handleRuntimeException(ex);
+ throw ex;
}
}
@@ -288,11 +279,7 @@ class IOReactorImpl implements IOReactor {
if (sessionRequest != null) {
sessionRequest.completed(session);
}
- try {
- session.onConnected();
- } catch (final RuntimeException ex) {
- handleRuntimeException(ex);
- }
+ session.onConnected();
} catch (final CancelledKeyException ex) {
session.shutdown(ShutdownType.GRACEFUL);
}
@@ -309,8 +296,6 @@ class IOReactorImpl implements IOReactor {
session.onDisconnected();
} catch (final CancelledKeyException ex) {
// ignore and move on
- } catch (final RuntimeException ex) {
- handleRuntimeException(ex);
}
}
}
@@ -329,7 +314,7 @@ class IOReactorImpl implements IOReactor {
session.shutdown(ShutdownType.GRACEFUL);
} catch (final RuntimeException ex) {
session.shutdown(ShutdownType.IMMEDIATE);
- handleRuntimeException(ex);
+ throw ex;
}
}
}