You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2016/11/03 19:28:18 UTC
svn commit: r1767954 - in /httpcomponents/httpcore/trunk:
httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/
httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/
httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/ht...
Author: olegk
Date: Thu Nov 3 19:28:18 2016
New Revision: 1767954
URL: http://svn.apache.org/viewvc?rev=1767954&view=rev
Log:
Redesign of non-blocking I/O session APIs
Added:
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java (contents, props changed)
- copied, changed from r1767392, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactor.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ManagedIOSession.java (with props)
Removed:
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactor.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/BaseIOReactor.java
Modified:
httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2IOEventHandler.java
httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOEventHandler.java
httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java
httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestClient.java
httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestServer.java
httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestClient.java
httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestServer.java
httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ClientEndpointImpl.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncRequester.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncServer.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2IOEventHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2IOEventHandler.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2IOEventHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2IOEventHandler.java Thu Nov 3 19:28:18 2016
@@ -81,6 +81,11 @@ class AbstractHttp2IOEventHandler implem
}
@Override
+ public void exception(final IOSession session, final Exception cause) {
+ streamMultiplexer.onException(cause);
+ }
+
+ @Override
public void disconnected(final IOSession session) {
streamMultiplexer.onDisconnect();
}
Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java Thu Nov 3 19:28:18 2016
@@ -36,7 +36,6 @@ import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
@@ -506,9 +505,11 @@ abstract class AbstractHttp2StreamMultip
}
private void processPendingCommands() throws IOException, HttpException {
- final Queue<Command> commandQueue = ioSession.getCommandQueue();
- while (!commandQueue.isEmpty() && streamMap.size() < remoteConfig.getMaxConcurrentStreams()) {
- final Command command = commandQueue.remove();
+ while (streamMap.size() < remoteConfig.getMaxConcurrentStreams()) {
+ final Command command = ioSession.getCommand();
+ if (command == null) {
+ break;
+ }
if (command instanceof ShutdownCommand) {
final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
if (shutdownCommand.getType() == ShutdownType.IMMEDIATE) {
@@ -558,9 +559,8 @@ abstract class AbstractHttp2StreamMultip
}
private void cancelPendingCommands() {
- final Deque<Command> commandQueue = ioSession.getCommandQueue();
for (;;) {
- final Command command = commandQueue.poll();
+ final Command command = ioSession.getCommand();
if (command != null) {
command.cancel();
} else {
@@ -1063,14 +1063,12 @@ abstract class AbstractHttp2StreamMultip
@Override
public void close() throws IOException {
- ioSession.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
- ioSession.setEvent(SelectionKey.OP_WRITE);
+ ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
}
@Override
public void shutdown() throws IOException {
- ioSession.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.IMMEDIATE));
- ioSession.setEvent(SelectionKey.OP_WRITE);
+ ioSession.addFirst(new ShutdownCommand(ShutdownType.IMMEDIATE));
}
@Override
Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttpProtocolNegotiator.java Thu Nov 3 19:28:18 2016
@@ -29,6 +29,7 @@ package org.apache.hc.core5.http2.impl.n
import java.io.IOException;
import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.charset.Charset;
@@ -125,6 +126,15 @@ public class ClientHttpProtocolNegotiato
@Override
public void timeout(final IOSession session) {
+ exception(session, new SocketTimeoutException());
+ }
+
+ @Override
+ public void exception(final IOSession session, final Exception cause) {
+ session.close();
+ if (connectionListener != null) {
+ connectionListener.onError(this, new SocketTimeoutException());
+ }
}
@Override
Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttpProtocolNegotiator.java Thu Nov 3 19:28:18 2016
@@ -126,6 +126,11 @@ public class ServerHttpProtocolNegotiato
@Override
public void timeout(final IOSession session) {
+ exception(session, new SocketTimeoutException());
+ }
+
+ @Override
+ public void exception(final IOSession session, final Exception cause) {
session.close();
if (connectionListener != null) {
connectionListener.onError(this, new SocketTimeoutException());
Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOEventHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOEventHandler.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOEventHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOEventHandler.java Thu Nov 3 19:28:18 2016
@@ -84,6 +84,11 @@ public class LoggingIOEventHandler imple
}
@Override
+ public void exception(final IOSession session, final Exception cause) {
+ handler.exception(session, cause);
+ }
+
+ @Override
public void disconnected(final IOSession session) {
if (log.isDebugEnabled()) {
log.debug(id + " " + session + " disconnected");
Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java Thu Nov 3 19:28:18 2016
@@ -32,7 +32,6 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
-import java.util.Deque;
import org.apache.hc.core5.reactor.Command;
import org.apache.hc.core5.reactor.IOEventHandler;
@@ -61,8 +60,18 @@ public class LoggingIOSession implements
}
@Override
- public Deque<Command> getCommandQueue() {
- return this.session.getCommandQueue();
+ public void addLast(final Command command) {
+ this.session.addLast(command);
+ }
+
+ @Override
+ public void addFirst(final Command command) {
+ this.session.addFirst(command);
+ }
+
+ @Override
+ public Command getCommand() {
+ return this.session.getCommand();
}
@Override
Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestClient.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestClient.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestClient.java Thu Nov 3 19:28:18 2016
@@ -30,7 +30,6 @@ package org.apache.hc.core5.testing.nio.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
-import java.nio.channels.SelectionKey;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -69,8 +68,7 @@ public class Http1TestClient extends Asy
}, new IOSessionCallback() {
@Override
public void execute(final IOSession session) throws IOException {
- session.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
- session.setEvent(SelectionKey.OP_WRITE);
+ session.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
}
});
}
Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestServer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestServer.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestServer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http/Http1TestServer.java Thu Nov 3 19:28:18 2016
@@ -29,7 +29,6 @@ package org.apache.hc.core5.testing.nio.
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.channels.SelectionKey;
import org.apache.hc.core5.http.ExceptionListener;
import org.apache.hc.core5.http.config.ConnectionConfig;
@@ -71,8 +70,7 @@ public class Http1TestServer extends Asy
@Override
public void execute(final IOSession session) throws IOException {
- session.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
- session.setEvent(SelectionKey.OP_WRITE);
+ session.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
}
});
Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestClient.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestClient.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestClient.java Thu Nov 3 19:28:18 2016
@@ -30,7 +30,6 @@ package org.apache.hc.core5.testing.nio.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
-import java.nio.channels.SelectionKey;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -82,8 +81,7 @@ public class Http2TestClient extends Asy
@Override
public void execute(final IOSession session) throws IOException {
- session.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
- session.setEvent(SelectionKey.OP_WRITE);
+ session.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
}
});
Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestServer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestServer.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestServer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/http2/Http2TestServer.java Thu Nov 3 19:28:18 2016
@@ -29,7 +29,6 @@ package org.apache.hc.core5.testing.nio.
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.channels.SelectionKey;
import java.nio.charset.StandardCharsets;
import org.apache.hc.core5.http.ExceptionListener;
@@ -70,8 +69,7 @@ public class Http2TestServer extends Asy
@Override
public void execute(final IOSession session) throws IOException {
- session.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
- session.setEvent(SelectionKey.OP_WRITE);
+ session.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
}
});
Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java Thu Nov 3 19:28:18 2016
@@ -82,6 +82,10 @@ public class TestDefaultListeningIOReact
}
@Override
+ public void exception(final IOSession session, final Exception cause) {
+ }
+
+ @Override
public void disconnected(final IOSession session) {
}
};
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1IOEventHandler.java Thu Nov 3 19:28:18 2016
@@ -80,6 +80,11 @@ class AbstractHttp1IOEventHandler implem
}
@Override
+ public void exception(final IOSession session, final Exception cause) {
+ streamDuplexer.onException(cause);
+ }
+
+ @Override
public void disconnected(final IOSession session) {
streamDuplexer.onDisconnect();
}
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java Thu Nov 3 19:28:18 2016
@@ -35,7 +35,6 @@ import java.nio.channels.ClosedChannelEx
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;
-import java.util.Deque;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -164,7 +163,7 @@ abstract class AbstractHttp1StreamDuplex
private void processCommands() throws HttpException, IOException {
for (;;) {
- final Command command = ioSession.getCommandQueue().poll();
+ final Command command = ioSession.getCommand();
if (command == null) {
return;
}
@@ -320,9 +319,8 @@ abstract class AbstractHttp1StreamDuplex
}
private void cancelPendingCommands() {
- final Deque<Command> commandQueue = ioSession.getCommandQueue();
for (;;) {
- final Command command = commandQueue.poll();
+ final Command command = ioSession.getCommand();
if (command != null) {
command.cancel();
} else {
@@ -433,14 +431,12 @@ abstract class AbstractHttp1StreamDuplex
@Override
public void close() throws IOException {
- ioSession.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
- ioSession.setEvent(SelectionKey.OP_WRITE);
+ ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
}
@Override
public void shutdown() throws IOException {
- ioSession.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.IMMEDIATE));
- ioSession.setEvent(SelectionKey.OP_WRITE);
+ ioSession.addFirst(new ShutdownCommand(ShutdownType.IMMEDIATE));
}
@Override
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ClientEndpointImpl.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ClientEndpointImpl.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ClientEndpointImpl.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/ClientEndpointImpl.java Thu Nov 3 19:28:18 2016
@@ -27,7 +27,6 @@
package org.apache.hc.core5.http.impl.nio.bootstrap;
-import java.nio.channels.SelectionKey;
import java.util.concurrent.Future;
import org.apache.hc.core5.annotation.Contract;
@@ -68,8 +67,7 @@ public final class ClientEndpointImpl im
final Command executionCommand = new ExecutionCommand(
exchangeHandler,
context != null ? context : HttpCoreContext.create());
- ioSession.getCommandQueue().add(executionCommand);
- ioSession.setEvent(SelectionKey.OP_WRITE);
+ ioSession.addLast(executionCommand);
if (ioSession.isClosed()) {
executionCommand.cancel();
}
@@ -120,8 +118,7 @@ public final class ClientEndpointImpl im
@Override
public void shutdown(final ShutdownType type) {
- ioSession.getCommandQueue().addFirst(new ShutdownCommand(type));
- ioSession.setEvent(SelectionKey.OP_WRITE);
+ ioSession.addFirst(new ShutdownCommand(type));
}
}
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncRequester.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncRequester.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncRequester.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncRequester.java Thu Nov 3 19:28:18 2016
@@ -30,7 +30,6 @@ package org.apache.hc.core5.http.impl.ni
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
-import java.nio.channels.SelectionKey;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -63,8 +62,7 @@ public class HttpAsyncRequester extends
@Override
public void execute(final IOSession session) throws IOException {
- session.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
- session.setEvent(SelectionKey.OP_WRITE);
+ session.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
}
});
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncServer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncServer.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncServer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/bootstrap/HttpAsyncServer.java Thu Nov 3 19:28:18 2016
@@ -27,7 +27,6 @@
package org.apache.hc.core5.http.impl.nio.bootstrap;
import java.io.IOException;
-import java.nio.channels.SelectionKey;
import org.apache.hc.core5.http.ExceptionListener;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
@@ -53,8 +52,7 @@ public class HttpAsyncServer extends Asy
@Override
public void execute(final IOSession session) throws IOException {
- session.getCommandQueue().addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
- session.setEvent(SelectionKey.OP_WRITE);
+ session.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
}
});
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java Thu Nov 3 19:28:18 2016
@@ -49,7 +49,7 @@ import org.apache.hc.core5.util.Args;
/**
* Generic implementation of {@link IOReactor} that can run multiple
- * {@link BaseIOReactor} instance in separate worker threads and distribute
+ * {@link IOReactorImpl} instance in separate worker threads and distribute
* newly created I/O session equally across those I/O reactors for a more
* optimal resource utilization and a better I/O performance. Usually it is
* recommended to have one worker I/O reactor per physical CPU core.
@@ -94,7 +94,7 @@ public abstract class AbstractMultiworke
private final int workerCount;
private final IOEventHandlerFactory eventHandlerFactory;
private final ThreadFactory threadFactory;
- private final BaseIOReactor[] dispatchers;
+ private final IOReactorImpl[] dispatchers;
private final Worker[] workers;
private final Thread[] threads;
private final AtomicReference<IOReactorStatus> status;
@@ -136,7 +136,7 @@ public abstract class AbstractMultiworke
}
this.auditLog = new ArrayList<>();
this.workerCount = this.reactorConfig.getIoThreadCount();
- this.dispatchers = new BaseIOReactor[workerCount];
+ this.dispatchers = new IOReactorImpl[workerCount];
this.workers = new Worker[workerCount];
this.threads = new Thread[workerCount];
this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
@@ -257,11 +257,11 @@ public abstract class AbstractMultiworke
try {
// Start I/O dispatchers
for (int i = 0; i < this.dispatchers.length; i++) {
- final BaseIOReactor dispatcher = new BaseIOReactor(this.eventHandlerFactory, this.reactorConfig, this.exceptionHandler);
+ final IOReactorImpl dispatcher = new IOReactorImpl(this.eventHandlerFactory, this.reactorConfig, this.exceptionHandler);
this.dispatchers[i] = dispatcher;
}
for (int i = 0; i < this.workerCount; i++) {
- final BaseIOReactor dispatcher = this.dispatchers[i];
+ final IOReactorImpl dispatcher = this.dispatchers[i];
this.workers[i] = new Worker(dispatcher);
this.threads[i] = this.threadFactory.newThread(this.workers[i]);
}
@@ -400,7 +400,7 @@ public abstract class AbstractMultiworke
if (callback == null) {
return;
}
- for (BaseIOReactor dispatcher: dispatchers) {
+ for (IOReactorImpl dispatcher: dispatchers) {
if (dispatcher != null) {
dispatcher.enumSessions(callback);
}
@@ -412,7 +412,7 @@ public abstract class AbstractMultiworke
if (this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN)) {
selector.wakeup();
for (int i = 0; i < this.workerCount; i++) {
- final BaseIOReactor dispatcher = this.dispatchers[i];
+ final IOReactorImpl dispatcher = this.dispatchers[i];
if (dispatcher != null) {
dispatcher.initiateShutdown();
}
@@ -439,7 +439,7 @@ public abstract class AbstractMultiworke
}
}
for (int i = 0; i < this.dispatchers.length; i++) {
- final BaseIOReactor dispatcher = this.dispatchers[i];
+ final IOReactorImpl dispatcher = this.dispatchers[i];
if (dispatcher != null) {
if (dispatcher.getStatus().compareTo(IOReactorStatus.SHUT_DOWN) < 0) {
dispatcher.awaitShutdown(remaining, TimeUnit.MILLISECONDS);
@@ -464,7 +464,7 @@ public abstract class AbstractMultiworke
this.status.set(IOReactorStatus.SHUT_DOWN);
this.selector.wakeup();
for (int i = 0; i < this.dispatchers.length; i++) {
- final BaseIOReactor dispatcher = this.dispatchers[i];
+ final IOReactorImpl dispatcher = this.dispatchers[i];
if (dispatcher != null) {
dispatcher.forceShutdown();
}
@@ -503,11 +503,11 @@ public abstract class AbstractMultiworke
static class Worker implements Runnable {
- final BaseIOReactor dispatcher;
+ final IOReactorImpl dispatcher;
private volatile Exception exception;
- public Worker(final BaseIOReactor dispatcher) {
+ public Worker(final IOReactorImpl dispatcher) {
super();
this.dispatcher = dispatcher;
}
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java Thu Nov 3 19:28:18 2016
@@ -67,6 +67,13 @@ public interface IOEventHandler {
void timeout(IOSession session);
/**
+ * Triggered when the given session throws an exception.
+ *
+ * @param session the I/O session.
+ */
+ void exception(IOSession session, Exception cause);
+
+ /**
* Triggered when the given session has been terminated.
*
* @param session the I/O session.
Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java (from r1767392, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactor.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactor.java&r1=1767392&r2=1767954&rev=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java Thu Nov 3 19:28:18 2016
@@ -42,36 +42,33 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.core5.util.Args;
-import org.apache.hc.core5.util.Asserts;
/**
- * Generic implementation of {@link IOReactor} that can used as a subclass
- * for more specialized I/O reactors. It is based on a single {@link Selector}
- * instance.
+ * {@link IOReactor} implementation.
*
* @since 4.0
*/
-abstract class AbstractIOReactor implements IOReactor {
+class IOReactorImpl implements IOReactor {
private final IOReactorConfig reactorConfig;
private final IOEventHandlerFactory eventHandlerFactory;
private final Selector selector;
- private final Queue<IOSession> closedSessions;
+ private final Queue<ManagedIOSession> closedSessions;
private final Queue<PendingSession> pendingSessions;
+ private final AtomicReference<IOReactorStatus> status;
private final Object shutdownMutex;
+ private final IOReactorExceptionHandler exceptionHandler;
- private final AtomicReference<IOReactorStatus> status;
+ private volatile long lastTimeoutCheck;
- /**
- * Creates new AbstractIOReactor instance.
- *
- * @param eventHandlerFactory the event handler factory.
- * @param reactorConfig the reactor configuration.
- */
- public AbstractIOReactor(final IOEventHandlerFactory eventHandlerFactory, final IOReactorConfig reactorConfig) {
+ IOReactorImpl(
+ final IOEventHandlerFactory eventHandlerFactory,
+ final IOReactorConfig reactorConfig,
+ final IOReactorExceptionHandler exceptionHandler) {
super();
this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
+ this.exceptionHandler = exceptionHandler;
this.closedSessions = new ConcurrentLinkedQueue<>();
this.pendingSessions = new ConcurrentLinkedQueue<>();
try {
@@ -83,112 +80,12 @@ abstract class AbstractIOReactor impleme
this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
}
- /**
- * Triggered when the key signals {@link SelectionKey#OP_ACCEPT} readiness.
- * <p>
- * Super-classes can implement this method to react to the event.
- *
- * @param key the selection key.
- */
- protected abstract void acceptable(SelectionKey key);
-
- /**
- * Triggered when the key signals {@link SelectionKey#OP_CONNECT} readiness.
- * <p>
- * Super-classes can implement this method to react to the event.
- *
- * @param key the selection key.
- */
- protected abstract void connectable(SelectionKey key);
-
- /**
- * Triggered when the key signals {@link SelectionKey#OP_READ} readiness.
- * <p>
- * Super-classes can implement this method to react to the event.
- *
- * @param key the selection key.
- */
- protected abstract void readable(SelectionKey key);
-
- /**
- * Triggered when the key signals {@link SelectionKey#OP_WRITE} readiness.
- * <p>
- * Super-classes can implement this method to react to the event.
- *
- * @param key the selection key.
- */
- protected abstract void writable(SelectionKey key);
-
- /**
- * Triggered to validate keys currently registered with the selector. This
- * method is called after each I/O select loop.
- * <p>
- * Super-classes can implement this method to run validity checks on
- * active sessions and include additional processing that needs to be
- * executed after each I/O select loop.
- *
- * @param keys all selection keys registered with the selector.
- */
- protected abstract void validate(Set<SelectionKey> keys);
-
- /**
- * Triggered when new session has been created.
- * <p>
- * Super-classes can implement this method to react to the event.
- *
- * @param session new I/O session.
- */
- protected abstract void sessionCreated(final IOSession session);
-
- /**
- * Triggered when a session has been closed.
- * <p>
- * Super-classes can implement this method to react to the event.
- *
- * @param session closed I/O session.
- */
- protected abstract void sessionClosed(final IOSession session);
-
- /**
- * Triggered when a session has timed out.
- * <p>
- * Super-classes can implement this method to react to the event.
- *
- * @param session timed out I/O session.
- */
- protected abstract void sessionTimedOut(final IOSession session);
-
- /**
- * Obtains {@link IOSession} instance associated with the given selection
- * key.
- *
- * @param key the selection key.
- * @return I/O session.
- */
- protected IOSession getSession(final SelectionKey key) {
- return (IOSession) key.attachment();
- }
-
- protected IOEventHandler ensureEventHandler(final IOSession ioSession) {
- Asserts.notNull(ioSession, "IO session");
- final IOEventHandler handler = ioSession.getHandler();
- Asserts.notNull(handler, "IO event handler");
- return handler;
- }
-
@Override
public IOReactorStatus getStatus() {
return this.status.get();
}
- /**
- * Enqueues pending session. The socket channel will be asynchronously registered
- * with the selector.
- *
- * @param socketChannel the new socketChannel.
- * @param sessionRequest the session request if applicable.
- */
- public void enqueuePendingSession(final SocketChannel socketChannel, final SessionRequestImpl sessionRequest) {
+ void enqueuePendingSession(final SocketChannel socketChannel, final SessionRequestImpl sessionRequest) {
Args.notNull(socketChannel, "SocketChannel");
this.pendingSessions.add(new PendingSession(socketChannel, sessionRequest));
this.selector.wakeup();
@@ -205,15 +102,6 @@ abstract class AbstractIOReactor impleme
* The method will remain blocked until the I/O reactor is shut down or the
* execution thread is interrupted.
*
- * @see #acceptable(SelectionKey)
- * @see #connectable(SelectionKey)
- * @see #readable(SelectionKey)
- * @see #writable(SelectionKey)
- * @see #timeoutCheck(SelectionKey, long)
- * @see #validate(Set)
- * @see #sessionCreated(IOSession)
- * @see #sessionClosed(IOSession)
- *
* @throws InterruptedIOException if the dispatch thread is interrupted.
* @throws IOReactorException in case of a non-recoverable I/O error.
*/
@@ -284,51 +172,59 @@ abstract class AbstractIOReactor impleme
}
}
+ private void validate(final Set<SelectionKey> keys) {
+ final long currentTime = System.currentTimeMillis();
+ if( (currentTime - this.lastTimeoutCheck) >= this.reactorConfig.getSelectInterval()) {
+ this.lastTimeoutCheck = currentTime;
+ if (keys != null) {
+ for (final SelectionKey key : keys) {
+ timeoutCheck(key, currentTime);
+ }
+ }
+ }
+ }
+
private void processEvents(final Set<SelectionKey> selectedKeys) {
for (final SelectionKey key : selectedKeys) {
-
processEvent(key);
-
}
selectedKeys.clear();
}
- /**
- * Processes new event on the given selection key.
- *
- * @param key the selection key that triggered an event.
- */
- protected void processEvent(final SelectionKey key) {
- final IOSessionImpl session = (IOSessionImpl) key.attachment();
+ private void handleRuntimeException(final RuntimeException ex) {
+ if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
+ throw ex;
+ }
+ }
+
+ private void processEvent(final SelectionKey key) {
+ final ManagedIOSession session = (ManagedIOSession) key.attachment();
try {
- if (key.isAcceptable()) {
- acceptable(key);
- }
- if (key.isConnectable()) {
- connectable(key);
- }
if (key.isReadable()) {
- session.resetLastRead();
- readable(key);
+ session.updateAccessTime();
+ session.onInputReady();
}
if (key.isWritable()) {
- session.resetLastWrite();
- writable(key);
+ session.updateAccessTime();
+ session.onOutputReady();
}
} catch (final CancelledKeyException ex) {
session.shutdown();
+ } catch (final RuntimeException ex) {
+ session.shutdown();
+ handleRuntimeException(ex);
}
}
private void processPendingSessions() throws IOReactorException {
PendingSession pendingSession;
while ((pendingSession = this.pendingSessions.poll()) != null) {
- final IOSession session;
+ final ManagedIOSession session;
try {
final SocketChannel socketChannel = pendingSession.socketChannel;
socketChannel.configureBlocking(false);
final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_READ);
- session = new IOSessionImpl(key, socketChannel, this.closedSessions);
+ session = new ManagedIOSession(new IOSessionImpl(key, socketChannel), closedSessions);
session.setHandler(this.eventHandlerFactory.createHandler(session));
session.setSocketTimeout(this.reactorConfig.getSoTimeout());
key.attach(session);
@@ -346,7 +242,11 @@ abstract class AbstractIOReactor impleme
if (sessionRequest != null) {
sessionRequest.completed(session);
}
- sessionCreated(session);
+ try {
+ session.onConnected();
+ } catch (final RuntimeException ex) {
+ handleRuntimeException(ex);
+ }
} catch (final CancelledKeyException ex) {
session.shutdown();
}
@@ -354,33 +254,36 @@ abstract class AbstractIOReactor impleme
}
private void processClosedSessions() {
- IOSession session;
- while ((session = this.closedSessions.poll()) != null) {
+ for (;;) {
+ final ManagedIOSession session = this.closedSessions.poll();
+ if (session == null) {
+ break;
+ }
try {
- sessionClosed(session);
+ session.onDisconnected();
} catch (final CancelledKeyException ex) {
// ignore and move on
+ } catch (final RuntimeException ex) {
+ handleRuntimeException(ex);
}
}
}
- /**
- * Triggered to verify whether the I/O session associated with the
- * given selection key has not timed out.
- * <p>
- * Super-classes can implement this method to react to the event.
- *
- * @param key the selection key.
- * @param now current time as long value.
- */
- protected void timeoutCheck(final SelectionKey key, final long now) {
- final IOSessionImpl session = (IOSessionImpl) key.attachment();
+ private void timeoutCheck(final SelectionKey key, final long now) {
+ final ManagedIOSession session = (ManagedIOSession) key.attachment();
if (session != null) {
- final int timeout = session.getSocketTimeout();
- if (timeout > 0) {
- if (session.getLastAccessTime() + timeout < now) {
- sessionTimedOut(session);
+ try {
+ final int timeout = session.getSocketTimeout();
+ if (timeout > 0) {
+ if (session.getLastAccessTime() + timeout < now) {
+ session.onTimeout();
+ }
}
+ } catch (final CancelledKeyException ex) {
+ session.shutdown();
+ } catch (final RuntimeException ex) {
+ session.shutdown();
+ handleRuntimeException(ex);
}
}
}
@@ -400,15 +303,11 @@ abstract class AbstractIOReactor impleme
}
}
- /**
- * Closes out all active channels registered with the selector of
- * this I/O reactor.
- */
- protected void closeActiveChannels() {
+ private void closeActiveChannels() {
try {
final Set<SelectionKey> keys = this.selector.keys();
for (final SelectionKey key : keys) {
- final IOSession session = getSession(key);
+ final ManagedIOSession session = (ManagedIOSession) key.attachment();
if (session != null) {
session.close();
}
@@ -418,19 +317,18 @@ abstract class AbstractIOReactor impleme
}
}
- /**
- * Enumerates all active sessions
- *
- * @since 5.0
- */
- protected void enumSessions(final IOSessionCallback callback) throws IOException {
+ void enumSessions(final IOSessionCallback callback) throws IOException {
if (this.selector.isOpen()) {
try {
final Set<SelectionKey> keys = this.selector.keys();
for (final SelectionKey key : keys) {
- final IOSession session = getSession(key);
+ final ManagedIOSession session = (ManagedIOSession) key.attachment();
if (session != null) {
- callback.execute(session);
+ try {
+ callback.execute(session);
+ } catch (CancelledKeyException ex) {
+ session.close();
+ }
}
}
} catch (ClosedSelectorException ignore) {
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorImpl.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java Thu Nov 3 19:28:18 2016
@@ -29,7 +29,6 @@ package org.apache.hc.core5.reactor;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
-import java.util.Deque;
/**
* IOSession interface represents a sequence of logically related data exchanges
@@ -49,12 +48,6 @@ import java.util.Deque;
*/
public interface IOSession {
- /**
- * Name of the context attribute key, which can be used to obtain the
- * session attachment object.
- */
- String ATTACHMENT_KEY = "http.session.attachment";
-
int ACTIVE = 0;
int CLOSING = 1;
int CLOSED = Integer.MAX_VALUE;
@@ -74,11 +67,24 @@ public interface IOSession {
void setHandler(IOEventHandler handler);
/**
- * Returns the command queue for this session.
+ * Inserts {@link Command} at the end of the command queue.
+ *
+ * @since 5.0
+ */
+ void addLast(Command command);
+
+ /**
+ * Inserts {@link Command} at the front of the command queue.
+ *
+ * @since 5.0
+ */
+ void addFirst(Command command);
+ /**
+ * Retrieves and removes first {@link Command} from the command queue.
*
* @since 5.0
*/
- Deque<Command> getCommandQueue();
+ Command getCommand();
/**
* Returns the underlying I/O channel associated with this session.
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java Thu Nov 3 19:28:18 2016
@@ -34,9 +34,6 @@ import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Deque;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
@@ -54,10 +51,6 @@ class IOSessionImpl implements IOSession
private final SelectionKey key;
private final SocketChannel channel;
- private final Map<String, Object> attributes;
- private final Queue<IOSession> closedSessions;
-
- private final long startedTime;
private final AtomicInteger status;
private final AtomicInteger eventMask;
private final Deque<Command> commandQueue;
@@ -65,37 +58,22 @@ class IOSessionImpl implements IOSession
private volatile IOEventHandler eventHandler;
private volatile int socketTimeout;
- private volatile long lastReadTime;
- private volatile long lastWriteTime;
- private volatile long lastAccessTime;
-
/**
* Creates new instance of IOSessionImpl.
*
* @param key the selection key.
* @param socketChannel the socket channel
- * @param closedSessions the queue containing closed sessions
*
* @since 4.1
*/
- public IOSessionImpl(
- final SelectionKey key,
- final SocketChannel socketChannel,
- final Queue<IOSession> closedSessions) {
+ public IOSessionImpl(final SelectionKey key, final SocketChannel socketChannel) {
super();
this.key = Args.notNull(key, "Selection key");
this.channel = Args.notNull(socketChannel, "Socket channel");
- this.closedSessions = closedSessions;
- this.attributes = new ConcurrentHashMap<>();
this.commandQueue = new ConcurrentLinkedDeque<>();
this.socketTimeout = 0;
this.eventMask = new AtomicInteger(key.interestOps());
this.status = new AtomicInteger(ACTIVE);
- final long now = System.currentTimeMillis();
- this.startedTime = now;
- this.lastReadTime = now;
- this.lastWriteTime = now;
- this.lastAccessTime = now;
}
@Override
@@ -109,8 +87,20 @@ class IOSessionImpl implements IOSession
}
@Override
- public Deque<Command> getCommandQueue() {
- return commandQueue;
+ public void addLast(final Command command) {
+ commandQueue.addLast(command);
+ setEvent(SelectionKey.OP_WRITE);
+ }
+
+ @Override
+ public void addFirst(final Command command) {
+ commandQueue.addFirst(command);
+ setEvent(SelectionKey.OP_WRITE);
+ }
+
+ @Override
+ public Command getCommand() {
+ return commandQueue.poll();
}
@Override
@@ -188,15 +178,11 @@ class IOSessionImpl implements IOSession
@Override
public void setSocketTimeout(final int timeout) {
this.socketTimeout = timeout;
- this.lastAccessTime = System.currentTimeMillis();
}
@Override
public void close() {
if (this.status.compareAndSet(ACTIVE, CLOSED)) {
- if (this.closedSessions != null) {
- this.closedSessions.add(this);
- }
this.key.cancel();
this.key.attach(null);
try {
@@ -226,34 +212,6 @@ class IOSessionImpl implements IOSession
close();
}
- public long getStartedTime() {
- return this.startedTime;
- }
-
- public long getLastReadTime() {
- return this.lastReadTime;
- }
-
- public long getLastWriteTime() {
- return this.lastWriteTime;
- }
-
- public long getLastAccessTime() {
- return this.lastAccessTime;
- }
-
- void resetLastRead() {
- final long now = System.currentTimeMillis();
- this.lastReadTime = now;
- this.lastAccessTime = now;
- }
-
- void resetLastWrite() {
- final long now = System.currentTimeMillis();
- this.lastWriteTime = now;
- this.lastAccessTime = now;
- }
-
private static void formatOps(final StringBuilder buffer, final int ops) {
if ((ops & SelectionKey.OP_READ) > 0) {
buffer.append('r');
Added: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ManagedIOSession.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ManagedIOSession.java?rev=1767954&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ManagedIOSession.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ManagedIOSession.java Thu Nov 3 19:28:18 2016
@@ -0,0 +1,288 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactor;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.channels.ByteChannel;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.reactor.ssl.SSLIOSession;
+import org.apache.hc.core5.util.Asserts;
+
+/**
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+class ManagedIOSession implements IOSession {
+
+ private final IOSession ioSession;
+ private final AtomicReference<SSLIOSession> tlsSessionRef;
+ private final Queue<ManagedIOSession> closedSessions;
+ private final AtomicBoolean closed;
+
+ private volatile long lastAccessTime;
+
+ ManagedIOSession(final IOSession ioSession, final Queue<ManagedIOSession> closedSessions) {
+ this.ioSession = ioSession;
+ this.closedSessions = closedSessions;
+ this.tlsSessionRef = new AtomicReference<>(null);
+ this.closed = new AtomicBoolean(false);
+ updateAccessTime();
+ }
+
+ void updateAccessTime() {
+ this.lastAccessTime = System.currentTimeMillis();
+ }
+
+ long getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ private IOSession getSessionImpl() {
+ final SSLIOSession tlsSession = tlsSessionRef.get();
+ if (tlsSession != null) {
+ return tlsSession;
+ } else {
+ return ioSession;
+ }
+ }
+
+ IOEventHandler getEventHandler() {
+ final IOEventHandler handler = ioSession.getHandler();
+ Asserts.notNull(handler, "IO event handler");
+ return handler;
+ }
+
+ void onConnected() { // Delivers the "connected" event to the handler, initializing the TLS session first when one is present.
+ try {
+ final IOEventHandler handler = getEventHandler();
+ final SSLIOSession tlsSession = tlsSessionRef.get();
+ if (tlsSession != null) {
+ try {
+ if (!tlsSession.isInitialized()) { // initialize only when not yet done, same guard as onInputReady / onOutputReady
+ tlsSession.initialize();
+ }
+ handler.connected(this);
+ } catch (Exception ex) { // failures during TLS initialization or in connected() are reported to the handler
+ handler.exception(tlsSession, ex);
+ }
+ } else {
+ handler.connected(this);
+ }
+ } catch (RuntimeException ex) { // shut the session down before propagating the failure to the caller
+ shutdown();
+ throw ex;
+ }
+ }
+
+ void onInputReady() {
+ try {
+ final IOEventHandler handler = getEventHandler();
+ final SSLIOSession tlsSession = tlsSessionRef.get();
+ if (tlsSession != null) {
+ try {
+ if (!tlsSession.isInitialized()) {
+ tlsSession.initialize();
+ }
+ if (tlsSession.isAppInputReady()) {
+ handler.inputReady(this);
+ }
+ tlsSession.inboundTransport();
+ } catch (final IOException ex) {
+ handler.exception(tlsSession, ex);
+ tlsSession.shutdown();
+ }
+ } else {
+ handler.inputReady(this);
+ }
+ } catch (RuntimeException ex) {
+ shutdown();
+ throw ex;
+ }
+ }
+
+ void onOutputReady() {
+ try {
+ final IOEventHandler handler = getEventHandler();
+ final SSLIOSession tlsSession = tlsSessionRef.get();
+ if (tlsSession != null) {
+ try {
+ if (!tlsSession.isInitialized()) {
+ tlsSession.initialize();
+ }
+ if (tlsSession.isAppOutputReady()) {
+ handler.outputReady(this);
+ }
+ tlsSession.outboundTransport();
+ } catch (final IOException ex) {
+ handler.exception(tlsSession, ex);
+ tlsSession.shutdown();
+ }
+ } else {
+ handler.outputReady(this);
+ }
+ } catch (RuntimeException ex) {
+ shutdown();
+ throw ex;
+ }
+ }
+
+ void onTimeout() {
+ try {
+ final IOEventHandler handler = getEventHandler();
+ handler.timeout(this);
+ final SSLIOSession tlsSession = tlsSessionRef.get();
+ if (tlsSession != null) {
+ if (tlsSession.isOutboundDone() && !tlsSession.isInboundDone()) {
+ // The session failed to terminate cleanly
+ tlsSession.shutdown();
+ }
+ }
+ } catch (RuntimeException ex) {
+ shutdown();
+ throw ex;
+ }
+ }
+
+ void onDisconnected() {
+ final IOEventHandler handler = getEventHandler();
+ handler.disconnected(this);
+ }
+
+ @Override
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ try {
+ getSessionImpl().close();
+ } finally {
+ closedSessions.add(this);
+ }
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ if (closed.compareAndSet(false, true)) {
+ try {
+ getSessionImpl().shutdown();
+ } finally {
+ closedSessions.add(this);
+ }
+ }
+ }
+
+ @Override
+ public int getStatus() {
+ return getSessionImpl().getStatus();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return getSessionImpl().isClosed();
+ }
+
+ @Override
+ public IOEventHandler getHandler() {
+ return ioSession.getHandler();
+ }
+
+ public void setHandler(final IOEventHandler eventHandler) {
+ ioSession.setHandler(eventHandler);
+ }
+
+ @Override
+ public void addLast(final Command command) {
+ ioSession.addLast(command);
+ }
+
+ @Override
+ public void addFirst(final Command command) {
+ ioSession.addFirst(command);
+ }
+
+ @Override
+ public Command getCommand() {
+ return ioSession.getCommand();
+ }
+
+ @Override
+ public ByteChannel channel() {
+ return getSessionImpl().channel();
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress() {
+ return ioSession.getRemoteAddress();
+ }
+
+ @Override
+ public SocketAddress getLocalAddress() {
+ return ioSession.getLocalAddress();
+ }
+
+ @Override
+ public int getEventMask() {
+ return getSessionImpl().getEventMask();
+ }
+
+ @Override
+ public void setEventMask(final int ops) {
+ getSessionImpl().setEventMask(ops);
+ }
+
+ @Override
+ public void setEvent(final int op) {
+ getSessionImpl().setEvent(op);
+ }
+
+ @Override
+ public void clearEvent(final int op) {
+ getSessionImpl().clearEvent(op);
+ }
+
+ @Override
+ public int getSocketTimeout() {
+ return ioSession.getSocketTimeout();
+ }
+
+ @Override
+ public void setSocketTimeout(final int timeout) {
+ ioSession.setSocketTimeout(timeout);
+ }
+
+ @Override
+ public String toString() {
+ return getSessionImpl().toString();
+ }
+
+}
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ManagedIOSession.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ManagedIOSession.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ManagedIOSession.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java?rev=1767954&r1=1767953&r2=1767954&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java Thu Nov 3 19:28:18 2016
@@ -34,7 +34,6 @@ import java.nio.channels.ByteChannel;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
-import java.util.Deque;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
@@ -58,28 +57,12 @@ import org.apache.hc.core5.util.Asserts;
* {@code SSLIOSession} is a decorator class intended to transparently extend
* an {@link IOSession} with transport layer security capabilities based on
* the SSL/TLS protocol.
- * <p>
- * The resultant instance of {@code SSLIOSession} must be added to the original
- * I/O session as an attribute with the {@link #SESSION_KEY} key.
- * <pre>
- * SSLContext sslcontext = SSLContext.getInstance("SSL");
- * sslcontext.init(null, null, null);
- * SSLIOSession sslsession = new SSLIOSession(
- * iosession, SSLMode.CLIENT, sslcontext, null);
- * iosession.setAttribute(SSLIOSession.SESSION_KEY, sslsession);
- * </pre>
*
* @since 4.2
*/
@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public class SSLIOSession implements IOSession {
- /**
- * Name of the context attribute key, which can be used to obtain the
- * SSL session.
- */
- public static final String SESSION_KEY = "http.session.ssl";
-
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
private final IOSession session;
@@ -179,10 +162,6 @@ public class SSLIOSession implements IOS
this(session, sslMode, null, sslContext, handler);
}
- protected SSLSetupHandler getSSLSetupHandler() {
- return this.handler;
- }
-
/**
* Returns {@code true} if the session has been fully initialized,
* {@code false} otherwise.
@@ -644,8 +623,18 @@ public class SSLIOSession implements IOS
}
@Override
- public Deque<Command> getCommandQueue() {
- return this.session.getCommandQueue();
+ public void addLast(final Command command) {
+ this.session.addLast(command);
+ }
+
+ @Override
+ public void addFirst(final Command command) {
+ this.session.addFirst(command);
+ }
+
+ @Override
+ public Command getCommand() {
+ return this.session.getCommand();
}
@Override