You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2017/11/01 09:19:14 UTC
[3/4] httpcomponents-core git commit: Shared I/O session and HTTP/2
connection pool implementations
Shared I/O session and HTTP/2 connection pool implementations
Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/221f695c
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/221f695c
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/221f695c
Branch: refs/heads/master
Commit: 221f695c3660518eb941c9b8ed95ad69fc85048d
Parents: 3d80bec
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Sat Oct 28 13:02:25 2017 +0200
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Wed Nov 1 09:59:59 2017 +0100
----------------------------------------------------------------------
.../hc/core5/http2/nio/pool/H2ConnPool.java | 156 ++++++++++
.../hc/core5/concurrent/ComplexFuture.java | 14 +
.../hc/core5/reactor/AbstractIOSessionPool.java | 300 +++++++++++++++++++
.../reactor/TestAbstractIOSessionPool.java | 289 ++++++++++++++++++
4 files changed, 759 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/221f695c/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/pool/H2ConnPool.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/pool/H2ConnPool.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/pool/H2ConnPool.java
new file mode 100644
index 0000000..700698a
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/pool/H2ConnPool.java
@@ -0,0 +1,156 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http2.nio.pool;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Future;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.function.Resolver;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.http.impl.DefaultAddressResolver;
+import org.apache.hc.core5.http.nio.command.ShutdownCommand;
+import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
+import org.apache.hc.core5.http2.nio.command.PingCommand;
+import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.reactor.AbstractIOSessionPool;
+import org.apache.hc.core5.reactor.ConnectionInitiator;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * {@link AbstractIOSessionPool} specialization keyed by {@link HttpHost}.
+ * <p>
+ * Sessions are opened through the supplied {@link ConnectionInitiator} after the
+ * host has been mapped to a socket address by the address resolver. When a
+ * {@link TlsStrategy} is configured, sessions to {@code https} endpoints that
+ * implement {@link TransportSecurityLayer} are passed to it for upgrade as soon as
+ * they are connected. A session whose older of last read / last write time lies at
+ * least {@link #getValidateAfterInactivity()} in the past is probed with a
+ * {@link PingCommand} when it is validated.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public final class H2ConnPool extends AbstractIOSessionPool<HttpHost> {
+
+    private final ConnectionInitiator connectionInitiator;
+    private final Resolver<HttpHost, InetSocketAddress> addressResolver;
+    private final TlsStrategy tlsStrategy;
+
+    private volatile TimeValue validateAfterInactivity;
+
+    /**
+     * @param connectionInitiator used to open new sessions; must not be {@code null}.
+     * @param addressResolver maps a host to a socket address; when {@code null}
+     *                        {@link DefaultAddressResolver#INSTANCE} is used.
+     * @param tlsStrategy TLS upgrade strategy; may be {@code null}, in which case
+     *                    no upgrade is attempted.
+     */
+    public H2ConnPool(
+            final ConnectionInitiator connectionInitiator,
+            final Resolver<HttpHost, InetSocketAddress> addressResolver,
+            final TlsStrategy tlsStrategy) {
+        super();
+        this.connectionInitiator = Args.notNull(connectionInitiator, "Connection initiator");
+        this.addressResolver = addressResolver != null ? addressResolver : DefaultAddressResolver.INSTANCE;
+        this.tlsStrategy = tlsStrategy;
+    }
+
+    /**
+     * @return the inactivity period after which a session is pinged during
+     *         validation; may be {@code null} if it was never set.
+     */
+    public TimeValue getValidateAfterInactivity() {
+        return validateAfterInactivity;
+    }
+
+    /**
+     * @param timeValue the inactivity period after which a session is pinged
+     *                  during validation; non-positive values disable the ping.
+     */
+    public void setValidateAfterInactivity(final TimeValue timeValue) {
+        this.validateAfterInactivity = timeValue;
+    }
+
+    /**
+     * Closes the given session. A graceful close is requested by putting a
+     * {@link ShutdownCommand} at the head of the session's command queue; any other
+     * shutdown type is applied to the session directly.
+     */
+    @Override
+    protected void closeSession(
+            final IOSession ioSession,
+            final ShutdownType shutdownType) {
+        if (shutdownType != ShutdownType.GRACEFUL) {
+            ioSession.shutdown(shutdownType);
+            return;
+        }
+        ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
+    }
+
+    /**
+     * Resolves the endpoint and asks the connection initiator for a new session.
+     * The outcome is relayed to {@code callback}; on success the TLS upgrade (if
+     * applicable) is triggered before the callback is notified.
+     */
+    @Override
+    protected Future<IOSession> connectSession(
+            final HttpHost namedEndpoint,
+            final Timeout requestTimeout,
+            final FutureCallback<IOSession> callback) {
+        final InetSocketAddress remoteAddress = addressResolver.resolve(namedEndpoint);
+        final FutureCallback<IOSession> connectCallback = new FutureCallback<IOSession>() {
+
+            @Override
+            public void completed(final IOSession ioSession) {
+                final boolean upgradeToTls = tlsStrategy != null
+                        && URIScheme.HTTPS.same(namedEndpoint.getSchemeName())
+                        && ioSession instanceof TransportSecurityLayer;
+                if (upgradeToTls) {
+                    tlsStrategy.upgrade(
+                            (TransportSecurityLayer) ioSession,
+                            namedEndpoint,
+                            ioSession.getLocalAddress(),
+                            ioSession.getRemoteAddress(),
+                            null);
+                    // The socket timeout is only adjusted for sessions that were upgraded.
+                    ioSession.setSocketTimeout(requestTimeout.toMillisIntBound());
+                }
+                callback.completed(ioSession);
+            }
+
+            @Override
+            public void failed(final Exception ex) {
+                callback.failed(ex);
+            }
+
+            @Override
+            public void cancelled() {
+                callback.cancelled();
+            }
+
+        };
+        return connectionInitiator.connect(namedEndpoint, remoteAddress, null, requestTimeout, null, connectCallback);
+    }
+
+    /**
+     * Reports the session as valid straight away unless a positive
+     * validate-after-inactivity period is set and has elapsed, in which case a
+     * {@link PingCommand} is appended to the session's command queue and its result
+     * is forwarded to {@code callback}.
+     */
+    @Override
+    protected void validateSession(
+            final IOSession ioSession,
+            final Callback<Boolean> callback) {
+        final TimeValue inactivity = validateAfterInactivity;
+        if (!TimeValue.isPositive(inactivity)) {
+            callback.execute(true);
+            return;
+        }
+        final long lastRead = ioSession.getLastReadTime();
+        final long lastWrite = ioSession.getLastWriteTime();
+        // The older of the two timestamps is taken as the reference point.
+        final long deadline = Math.min(lastRead, lastWrite) + inactivity.toMillis();
+        if (deadline > System.currentTimeMillis()) {
+            callback.execute(true);
+            return;
+        }
+        final int originalSocketTimeout = ioSession.getSocketTimeout();
+        ioSession.addLast(new PingCommand(new BasicPingHandler(new Callback<Boolean>() {
+
+            @Override
+            public void execute(final Boolean result) {
+                // Re-apply the timeout captured before the ping was queued.
+                ioSession.setSocketTimeout(originalSocketTimeout);
+                callback.execute(result);
+            }
+
+        })));
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/221f695c/httpcore5/src/main/java/org/apache/hc/core5/concurrent/ComplexFuture.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/concurrent/ComplexFuture.java b/httpcore5/src/main/java/org/apache/hc/core5/concurrent/ComplexFuture.java
index 3b94528..c0c8adc 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/concurrent/ComplexFuture.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/concurrent/ComplexFuture.java
@@ -74,6 +74,20 @@ public final class ComplexFuture<T> extends BasicFuture<T> {
}
@Override
+    public boolean completed(final T result) {
+        // Record the result in the base future first, then call clearDependency()
+        // (declared outside this hunk); the base class' verdict is passed through.
+        final boolean outcome = super.completed(result);
+        clearDependency();
+        return outcome;
+    }
+
+ @Override
+    public boolean failed(final Exception exception) {
+        // Record the failure in the base future first, then call clearDependency()
+        // (declared outside this hunk); the base class' verdict is passed through.
+        final boolean outcome = super.failed(exception);
+        clearDependency();
+        return outcome;
+    }
+
+ @Override
public boolean cancel(final boolean mayInterruptIfRunning) {
final boolean cancelled = super.cancel(mayInterruptIfRunning);
final Cancellable dependency = dependencyRef.getAndSet(null);
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/221f695c/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOSessionPool.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOSessionPool.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOSessionPool.java
new file mode 100644
index 0000000..adfa360
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOSessionPool.java
@@ -0,0 +1,300 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactor;
+
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.ComplexFuture;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.http.ConnectionClosedException;
+import org.apache.hc.core5.io.GracefullyCloseable;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.Asserts;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * Base class for pools that share a single {@link IOSession} per named endpoint.
+ * <p>
+ * The pool keeps one {@link PoolEntry} per endpoint. An entry holds the current
+ * session (if any), the future of a connect operation in progress (if any) and
+ * the queue of callbacks waiting for that connect to finish. All mutations of an
+ * entry happen while holding the entry's monitor.
+ * <p>
+ * Subclasses provide the actual connect, validate and close operations.
+ *
+ * @param <T> type of the key identifying a named endpoint.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public abstract class AbstractIOSessionPool<T> implements GracefullyCloseable {
+
+    private final ConcurrentMap<T, PoolEntry> sessionPool;
+    private final AtomicBoolean closed;
+
+    public AbstractIOSessionPool() {
+        super();
+        this.sessionPool = new ConcurrentHashMap<>();
+        this.closed = new AtomicBoolean(false);
+    }
+
+    /**
+     * Initiates a new session to the given endpoint.
+     *
+     * @param namedEndpoint the endpoint to connect to.
+     * @param requestTimeout the timeout passed to {@link #getSession}.
+     * @param callback notified with the outcome of the connect operation.
+     * @return future representing the pending connect operation; it is cancelled
+     *         when the pool is shut down while the connect is still pending.
+     */
+    protected abstract Future<IOSession> connectSession(
+            T namedEndpoint,
+            Timeout requestTimeout,
+            FutureCallback<IOSession> callback);
+
+    /**
+     * Checks whether the given session is still usable.
+     *
+     * @param ioSession the session to check.
+     * @param callback receives {@code true} if the session may be handed out and
+     *                 {@code false} if it must be replaced.
+     */
+    protected abstract void validateSession(
+            IOSession ioSession,
+            Callback<Boolean> callback);
+
+    /**
+     * Closes the given session using the given shutdown type.
+     */
+    protected abstract void closeSession(
+            IOSession ioSession,
+            ShutdownType shutdownType);
+
+    /**
+     * Shuts the pool down. Only the first invocation has an effect: for every
+     * entry the current session is closed, a pending connect is cancelled and all
+     * callbacks still queued are notified through
+     * {@link FutureCallback#cancelled()}; afterwards all entries are dropped.
+     */
+    @Override
+    public final void shutdown(final ShutdownType shutdownType) {
+        if (closed.compareAndSet(false, true)) {
+            for (final PoolEntry poolEntry : sessionPool.values()) {
+                synchronized (poolEntry) {
+                    if (poolEntry.session != null) {
+                        closeSession(poolEntry.session, shutdownType);
+                        poolEntry.session = null;
+                    }
+                    if (poolEntry.sessionFuture != null) {
+                        poolEntry.sessionFuture.cancel(true);
+                        poolEntry.sessionFuture = null;
+                    }
+                    for (;;) {
+                        final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
+                        if (callback != null) {
+                            callback.cancelled();
+                        } else {
+                            break;
+                        }
+                    }
+                }
+            }
+            sessionPool.clear();
+        }
+    }
+
+    /**
+     * Equivalent to {@code shutdown(ShutdownType.GRACEFUL)}.
+     */
+    @Override
+    public final void close() {
+        shutdown(ShutdownType.GRACEFUL);
+    }
+
+    /**
+     * Returns the entry for the given endpoint, creating and registering it when
+     * there is none yet. If several threads race to create the entry, all of them
+     * end up with the instance that was registered first.
+     */
+    PoolEntry getPoolEntry(final T endpoint) {
+        PoolEntry poolEntry = sessionPool.get(endpoint);
+        if (poolEntry == null) {
+            final PoolEntry newPoolEntry = new PoolEntry();
+            poolEntry = sessionPool.putIfAbsent(endpoint, newPoolEntry);
+            if (poolEntry == null) {
+                poolEntry = newPoolEntry;
+            }
+        }
+        return poolEntry;
+    }
+
+    /**
+     * Requests the shared session for the given endpoint.
+     * <p>
+     * An existing open session is validated through {@link #validateSession};
+     * if validation fails that session is closed and a replacement is requested.
+     * If there is no session a connect is started, or joined when one is already
+     * in progress. A replacement session is handed out without being validated
+     * again.
+     *
+     * @param endpoint the endpoint; checked with {@code Args.notNull}.
+     * @param requestTimeout timeout passed on to {@link #connectSession}.
+     * @param callback optional callback handed to the returned future.
+     * @return future that is completed, failed or cancelled once the outcome of
+     *         the request is known.
+     */
+    public final Future<IOSession> getSession(
+            final T endpoint,
+            final Timeout requestTimeout,
+            final FutureCallback<IOSession> callback) {
+        Args.notNull(endpoint, "Endpoint");
+        Asserts.check(!closed.get(), "Connection pool shut down");
+        final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
+        final PoolEntry poolEntry = getPoolEntry(endpoint);
+        getSessionInternal(poolEntry, null, endpoint, requestTimeout, new FutureCallback<IOSession>() {
+
+            @Override
+            public void completed(final IOSession ioSession) {
+                validateSession(ioSession, new Callback<Boolean>() {
+
+                    @Override
+                    public void execute(final Boolean result) {
+                        if (result) {
+                            future.completed(ioSession);
+                        } else {
+                            // Name the session that failed validation so that only that
+                            // very session is discarded; a replacement that another request
+                            // may have established in the meantime must be left alone.
+                            getSessionInternal(poolEntry, ioSession, endpoint, requestTimeout, new FutureCallback<IOSession>() {
+
+                                @Override
+                                public void completed(final IOSession ioSession) {
+                                    future.completed(ioSession);
+                                }
+
+                                @Override
+                                public void failed(final Exception ex) {
+                                    future.failed(ex);
+                                }
+
+                                @Override
+                                public void cancelled() {
+                                    future.cancel();
+                                }
+
+                            });
+                        }
+                    }
+
+                });
+            }
+
+            @Override
+            public void failed(final Exception ex) {
+                future.failed(ex);
+            }
+
+            @Override
+            public void cancelled() {
+                future.cancel();
+            }
+
+        });
+        return future;
+    }
+
+    /**
+     * Hands the entry's session to {@code callback}, connecting first if needed.
+     *
+     * @param poolEntry the entry of the endpoint.
+     * @param staleSession a session that failed validation and must not be handed
+     *                     out again, or {@code null} for a regular request. The
+     *                     entry's session is closed and dropped only if it is this
+     *                     very instance.
+     * @param namedEndpoint the endpoint to connect to when no session is available.
+     * @param requestTimeout timeout passed on to {@link #connectSession}.
+     * @param callback notified with the session or with the connect failure.
+     */
+    private void getSessionInternal(
+            final PoolEntry poolEntry,
+            final IOSession staleSession,
+            final T namedEndpoint,
+            final Timeout requestTimeout,
+            final FutureCallback<IOSession> callback) {
+        synchronized (poolEntry) {
+            if (staleSession != null && poolEntry.session == staleSession) {
+                closeSession(poolEntry.session, ShutdownType.GRACEFUL);
+                poolEntry.session = null;
+            }
+            if (poolEntry.session != null && poolEntry.session.isClosed()) {
+                poolEntry.session = null;
+            }
+            if (poolEntry.session != null) {
+                callback.completed(poolEntry.session);
+            } else {
+                poolEntry.requestQueue.add(callback);
+                // Only one connect per entry is in flight at any time; later requests
+                // just wait in the queue for its outcome.
+                if (poolEntry.sessionFuture == null) {
+                    poolEntry.sessionFuture = connectSession(
+                            namedEndpoint,
+                            requestTimeout,
+                            new FutureCallback<IOSession>() {
+
+                                @Override
+                                public void completed(final IOSession result) {
+                                    synchronized (poolEntry) {
+                                        poolEntry.session = result;
+                                        poolEntry.sessionFuture = null;
+                                        for (;;) {
+                                            final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
+                                            if (callback != null) {
+                                                callback.completed(result);
+                                            } else {
+                                                break;
+                                            }
+                                        }
+                                    }
+                                }
+
+                                @Override
+                                public void failed(final Exception ex) {
+                                    synchronized (poolEntry) {
+                                        poolEntry.session = null;
+                                        poolEntry.sessionFuture = null;
+                                        for (;;) {
+                                            final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
+                                            if (callback != null) {
+                                                callback.failed(ex);
+                                            } else {
+                                                break;
+                                            }
+                                        }
+                                    }
+                                }
+
+                                @Override
+                                public void cancelled() {
+                                    // A cancelled connect is reported to waiting requests as a failure.
+                                    failed(new ConnectionClosedException("Connection request cancelled"));
+                                }
+
+                            });
+                }
+            }
+        }
+    }
+
+    /**
+     * Passes every session currently held by the pool to the given callback.
+     * A session found closed after the callback returns is dropped from its entry.
+     */
+    public final void enumAvailable(final Callback<IOSession> callback) {
+        for (final PoolEntry poolEntry: sessionPool.values()) {
+            // Unsynchronized pre-check to skip empty entries; re-checked under the lock.
+            if (poolEntry.session != null) {
+                synchronized (poolEntry) {
+                    if (poolEntry.session != null) {
+                        callback.execute(poolEntry.session);
+                        if (poolEntry.session.isClosed()) {
+                            poolEntry.session = null;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Gracefully closes and drops all sessions whose last read time is at least
+     * {@code idleTime} in the past. A {@code null} or non-positive {@code idleTime}
+     * is treated as zero.
+     */
+    public final void closeIdle(final TimeValue idleTime) {
+        final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMillis() : 0);
+        for (final PoolEntry poolEntry: sessionPool.values()) {
+            if (poolEntry.session != null) {
+                synchronized (poolEntry) {
+                    // NOTE(review): only the last read time is consulted here; write
+                    // activity is not taken into account - confirm this is intended.
+                    if (poolEntry.session != null && poolEntry.session.getLastReadTime() <= deadline) {
+                        closeSession(poolEntry.session, ShutdownType.GRACEFUL);
+                        poolEntry.session = null;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @return a snapshot copy of the endpoints the pool currently has entries for.
+     */
+    public final Set<T> getRoutes() {
+        return new HashSet<>(sessionPool.keySet());
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder buffer = new StringBuilder();
+        buffer.append("I/O sessions: ");
+        buffer.append(sessionPool.size());
+        return buffer.toString();
+    }
+
+    /**
+     * Per-endpoint state. The pool accesses {@code requestQueue} and writes the
+     * two volatile fields only while holding this entry's monitor.
+     */
+    static class PoolEntry {
+
+        /** Callbacks waiting for the pending connect operation to finish. */
+        final Queue<FutureCallback<IOSession>> requestQueue;
+        /** Connect operation in progress, or {@code null} if there is none. */
+        volatile Future<IOSession> sessionFuture;
+        /** Session currently shared by this entry, or {@code null} if there is none. */
+        volatile IOSession session;
+
+        PoolEntry() {
+            this.requestQueue = new ArrayDeque<>();
+        }
+
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/221f695c/httpcore5/src/test/java/org/apache/hc/core5/reactor/TestAbstractIOSessionPool.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/reactor/TestAbstractIOSessionPool.java b/httpcore5/src/test/java/org/apache/hc/core5/reactor/TestAbstractIOSessionPool.java
new file mode 100644
index 0000000..1085bf2
--- /dev/null
+++ b/httpcore5/src/test/java/org/apache/hc/core5/reactor/TestAbstractIOSessionPool.java
@@ -0,0 +1,289 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactor;
+
+import java.util.concurrent.Future;
+
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.util.Timeout;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Unit tests for {@link AbstractIOSessionPool}. The pool under test is a Mockito
+ * mock that calls the real methods, so the abstract connect / validate / close
+ * operations can be stubbed and verified.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestAbstractIOSessionPool {
+
+    @Mock
+    private Future<IOSession> connectFuture;
+    @Mock
+    private FutureCallback<IOSession> callback1;
+    @Mock
+    private FutureCallback<IOSession> callback2;
+    @Mock
+    private IOSession ioSession1;
+    @Mock
+    private IOSession ioSession2;
+
+    private AbstractIOSessionPool<String> impl;
+
+    @Before
+    @SuppressWarnings("unchecked")
+    public void setup() {
+        impl = Mockito.mock(AbstractIOSessionPool.class, Mockito.withSettings()
+                .defaultAnswer(Answers.CALLS_REAL_METHODS)
+                .useConstructor());
+    }
+
+    /**
+     * Asserts that the given future has failed with an exception carrying the
+     * expected message.
+     */
+    private static void assertFailedWith(
+            final Future<IOSession> future,
+            final String expectedMessage) throws InterruptedException {
+        try {
+            future.get();
+            Assert.fail("ExecutionException expected");
+        } catch (final java.util.concurrent.ExecutionException ex) {
+            Assert.assertThat(ex.getCause(), CoreMatchers.notNullValue());
+            Assert.assertThat(ex.getCause().getMessage(), CoreMatchers.equalTo(expectedMessage));
+        }
+    }
+
+    @Test
+    public void testGetSessions() throws Exception {
+
+        Mockito.when(impl.connectSession(
+                Mockito.anyString(),
+                Mockito.<Timeout>any(),
+                Mockito.<FutureCallback<IOSession>>any())).thenReturn(connectFuture);
+
+        // Every session is reported as valid.
+        Mockito.doAnswer(new Answer<Void>() {
+
+            @Override
+            public Void answer(final InvocationOnMock invocation) throws Throwable {
+                final Callback<Boolean> callback = invocation.getArgument(1);
+                callback.execute(true);
+                return null;
+            }
+
+        }).when(impl).validateSession(Mockito.<IOSession>any(), Mockito.<Callback<Boolean>>any());
+
+        final Future<IOSession> future1 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
+        Assert.assertThat(future1, CoreMatchers.notNullValue());
+        Assert.assertThat(future1.isDone(), CoreMatchers.equalTo(false));
+        Assert.assertThat(impl.getRoutes(), CoreMatchers.hasItem("somehost"));
+
+        Mockito.verify(impl).connectSession(
+                Mockito.eq("somehost"),
+                Mockito.eq(Timeout.ofSeconds(123L)),
+                Mockito.<FutureCallback<IOSession>>any());
+
+        final Future<IOSession> future2 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
+        Assert.assertThat(future2, CoreMatchers.notNullValue());
+        Assert.assertThat(future2.isDone(), CoreMatchers.equalTo(false));
+        Assert.assertThat(impl.getRoutes(), CoreMatchers.hasItem("somehost"));
+
+        // The matcher doubles as the hook that completes the captured connect callback.
+        Mockito.verify(impl, Mockito.times(1)).connectSession(
+                Mockito.eq("somehost"),
+                Mockito.<Timeout>any(),
+                Mockito.argThat(new ArgumentMatcher<FutureCallback<IOSession>>() {
+
+                    @Override
+                    public boolean matches(final FutureCallback<IOSession> callback) {
+                        callback.completed(ioSession1);
+                        return true;
+                    }
+
+                }));
+
+        Assert.assertThat(future1.isDone(), CoreMatchers.equalTo(true));
+        Assert.assertThat(future1.get(), CoreMatchers.sameInstance(ioSession1));
+
+        Assert.assertThat(future2.isDone(), CoreMatchers.equalTo(true));
+        Assert.assertThat(future2.get(), CoreMatchers.sameInstance(ioSession1));
+
+        Mockito.verify(impl, Mockito.times(2)).validateSession(Mockito.<IOSession>any(), Mockito.<Callback<Boolean>>any());
+
+        final Future<IOSession> future3 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
+
+        Mockito.verify(impl, Mockito.times(1)).connectSession(
+                Mockito.eq("somehost"),
+                Mockito.<Timeout>any(),
+                Mockito.<FutureCallback<IOSession>>any());
+
+        Mockito.verify(impl, Mockito.times(3)).validateSession(Mockito.<IOSession>any(), Mockito.<Callback<Boolean>>any());
+
+        Assert.assertThat(future3.isDone(), CoreMatchers.equalTo(true));
+        Assert.assertThat(future3.get(), CoreMatchers.sameInstance(ioSession1));
+    }
+
+    @Test
+    public void testGetSessionFailure() throws Exception {
+
+        Mockito.when(impl.connectSession(
+                Mockito.anyString(),
+                Mockito.<Timeout>any(),
+                Mockito.<FutureCallback<IOSession>>any())).thenReturn(connectFuture);
+
+        final Future<IOSession> future1 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
+        Assert.assertThat(future1, CoreMatchers.notNullValue());
+        Assert.assertThat(future1.isDone(), CoreMatchers.equalTo(false));
+        Assert.assertThat(impl.getRoutes(), CoreMatchers.hasItem("somehost"));
+
+        Mockito.verify(impl).connectSession(
+                Mockito.eq("somehost"),
+                Mockito.eq(Timeout.ofSeconds(123L)),
+                Mockito.<FutureCallback<IOSession>>any());
+
+        final Future<IOSession> future2 = impl.getSession("somehost", Timeout.ofSeconds(123L), null);
+        Assert.assertThat(future2, CoreMatchers.notNullValue());
+        Assert.assertThat(future2.isDone(), CoreMatchers.equalTo(false));
+        Assert.assertThat(impl.getRoutes(), CoreMatchers.hasItem("somehost"));
+
+        // The matcher doubles as the hook that fails the captured connect callback.
+        Mockito.verify(impl, Mockito.times(1)).connectSession(
+                Mockito.eq("somehost"),
+                Mockito.<Timeout>any(),
+                Mockito.argThat(new ArgumentMatcher<FutureCallback<IOSession>>() {
+
+                    @Override
+                    public boolean matches(final FutureCallback<IOSession> callback) {
+                        callback.failed(new Exception("Boom"));
+                        return true;
+                    }
+
+                }));
+
+        Assert.assertThat(future1.isDone(), CoreMatchers.equalTo(true));
+        Assert.assertThat(future2.isDone(), CoreMatchers.equalTo(true));
+
+        // Being done is not enough: both requests must actually report the failure.
+        assertFailedWith(future1, "Boom");
+        assertFailedWith(future2, "Boom");
+    }
+
+    @Test
+    public void testShutdownPool() throws Exception {
+        final AbstractIOSessionPool.PoolEntry entry1 = impl.getPoolEntry("host1");
+        Assert.assertThat(entry1, CoreMatchers.notNullValue());
+        entry1.session = ioSession1;
+
+        final AbstractIOSessionPool.PoolEntry entry2 = impl.getPoolEntry("host2");
+        Assert.assertThat(entry2, CoreMatchers.notNullValue());
+        entry2.session = ioSession2;
+
+        final AbstractIOSessionPool.PoolEntry entry3 = impl.getPoolEntry("host3");
+        Assert.assertThat(entry3, CoreMatchers.notNullValue());
+        entry3.sessionFuture = connectFuture;
+        entry3.requestQueue.add(callback1);
+        entry3.requestQueue.add(callback2);
+
+        impl.shutdown(ShutdownType.GRACEFUL);
+
+        Mockito.verify(impl).closeSession(ioSession1, ShutdownType.GRACEFUL);
+        Mockito.verify(impl).closeSession(ioSession2, ShutdownType.GRACEFUL);
+        Mockito.verify(connectFuture).cancel(Mockito.anyBoolean());
+        Mockito.verify(callback1).cancelled();
+        Mockito.verify(callback2).cancelled();
+    }
+
+    @Test
+    public void testCloseIdleSessions() throws Exception {
+        final AbstractIOSessionPool.PoolEntry entry1 = impl.getPoolEntry("host1");
+        Assert.assertThat(entry1, CoreMatchers.notNullValue());
+        entry1.session = ioSession1;
+
+        final AbstractIOSessionPool.PoolEntry entry2 = impl.getPoolEntry("host2");
+        Assert.assertThat(entry2, CoreMatchers.notNullValue());
+        entry2.session = ioSession2;
+
+        impl.closeIdle(TimeValue.ofMillis(0L));
+
+        Mockito.verify(impl).closeSession(ioSession1, ShutdownType.GRACEFUL);
+        Mockito.verify(impl).closeSession(ioSession2, ShutdownType.GRACEFUL);
+
+        Assert.assertThat(entry1.session, CoreMatchers.nullValue());
+        Assert.assertThat(entry2.session, CoreMatchers.nullValue());
+    }
+
+    @Test
+    public void testEnumSessions() throws Exception {
+        final AbstractIOSessionPool.PoolEntry entry1 = impl.getPoolEntry("host1");
+        Assert.assertThat(entry1, CoreMatchers.notNullValue());
+        entry1.session = ioSession1;
+
+        final AbstractIOSessionPool.PoolEntry entry2 = impl.getPoolEntry("host2");
+        Assert.assertThat(entry2, CoreMatchers.notNullValue());
+        entry2.session = ioSession2;
+
+        impl.enumAvailable(new Callback<IOSession>() {
+
+            @Override
+            public void execute(final IOSession ioSession) {
+                ioSession.shutdown(ShutdownType.GRACEFUL);
+            }
+
+        });
+        Mockito.verify(ioSession1).shutdown(ShutdownType.GRACEFUL);
+        Mockito.verify(ioSession2).shutdown(ShutdownType.GRACEFUL);
+    }
+
+    @Test
+    public void testGetSessionReconnectAfterValidate() throws Exception {
+        final AbstractIOSessionPool.PoolEntry entry1 = impl.getPoolEntry("somehost");
+        Assert.assertThat(entry1, CoreMatchers.notNullValue());
+        entry1.session = ioSession1;
+
+        Mockito.when(ioSession1.isClosed()).thenReturn(false);
+        // Every session is reported as invalid, which must trigger a reconnect.
+        Mockito.doAnswer(new Answer<Void>() {
+
+            @Override
+            public Void answer(final InvocationOnMock invocation) throws Throwable {
+                final Callback<Boolean> callback = invocation.getArgument(1);
+                callback.execute(false);
+                return null;
+            }
+
+        }).when(impl).validateSession(Mockito.<IOSession>any(), Mockito.<Callback<Boolean>>any());
+
+        impl.getSession("somehost", Timeout.ofSeconds(123L), null);
+
+        Mockito.verify(impl, Mockito.times(1)).connectSession(
+                Mockito.eq("somehost"),
+                Mockito.eq(Timeout.ofSeconds(123L)),
+                Mockito.<FutureCallback<IOSession>>any());
+    }
+
+    @Test
+    public void testGetSessionReconnectIfClosed() throws Exception {
+        final AbstractIOSessionPool.PoolEntry entry1 = impl.getPoolEntry("somehost");
+        Assert.assertThat(entry1, CoreMatchers.notNullValue());
+        entry1.session = ioSession1;
+
+        Mockito.when(ioSession1.isClosed()).thenReturn(true);
+
+        impl.getSession("somehost", Timeout.ofSeconds(123L), null);
+
+        Mockito.verify(impl).connectSession(
+                Mockito.eq("somehost"),
+                Mockito.eq(Timeout.ofSeconds(123L)),
+                Mockito.<FutureCallback<IOSession>>any());
+    }
+
+}