You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2019/12/03 19:38:42 UTC

[tinkerpop] 05/08: Upgrade Netty containing fix for proper close of FixedChannelPool

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch driver-35
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 5e9aa7bb9986dbf923f5995a190094e7f516d33f
Author: Divij Vaidya <di...@gmail.com>
AuthorDate: Wed Oct 2 09:50:59 2019 -0700

    Upgrade Netty containing fix for proper close of FixedChannelPool
---
 .../gremlin/driver/ConnectionPoolImpl.java         |  21 +-
 .../gremlin/driver/TinkerpopFixedChannelPool.java  | 508 ---------------------
 2 files changed, 11 insertions(+), 518 deletions(-)

diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java
index 63cab7e..018faed 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java
@@ -24,6 +24,7 @@ import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.pool.ChannelHealthChecker;
 import io.netty.channel.pool.ChannelPoolHandler;
+import io.netty.channel.pool.FixedChannelPool;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.concurrent.GlobalEventExecutor;
@@ -52,7 +53,7 @@ public class ConnectionPoolImpl implements ConnectionPool {
      * Netty's implementation of channel management with an upper bound. This connection
      * pool is responsible for attaching a channel with each request.
      */
-    private TinkerpopFixedChannelPool channelPool;
+    private FixedChannelPool channelPool;
     /**
      * Channel initializer that is safe to be re-used across multiple channels.
      */
@@ -129,17 +130,17 @@ public class ConnectionPoolImpl implements ConnectionPool {
         logger.debug("Initialized {} successfully.", this);
     }
 
-    private TinkerpopFixedChannelPool createChannelPool(final Bootstrap b,
+    private FixedChannelPool createChannelPool(final Bootstrap b,
                                                         final Settings.ConnectionPoolSettings connectionPoolSettings,
                                                         final ChannelPoolHandler handler) {
-        return new TinkerpopFixedChannelPool(b,
-                                             handler,
-                                             ChannelHealthChecker.ACTIVE,
-                                             TinkerpopFixedChannelPool.AcquireTimeoutAction.FAIL, // throw an exception on acquire timeout
-                                             connectionPoolSettings.maxWaitForConnection,
-                                             calculateMaxPoolSize(connectionPoolSettings), /*maxConnections*/
-                                             1, /*maxPendingAcquires*/
-                                             true);/*releaseHealthCheck*/
+        return new FixedChannelPool(b,
+                                    handler,
+                                    ChannelHealthChecker.ACTIVE,
+                                    FixedChannelPool.AcquireTimeoutAction.FAIL, // throw an exception on acquire timeout
+                                    connectionPoolSettings.maxWaitForConnection,
+                                    calculateMaxPoolSize(connectionPoolSettings), /*maxConnections*/
+                  1, /*maxPendingAcquires*/
+                   true);/*releaseHealthCheck*/
     }
 
     @Override
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/TinkerpopFixedChannelPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/TinkerpopFixedChannelPool.java
deleted file mode 100644
index ac77b38..0000000
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/TinkerpopFixedChannelPool.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.driver;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.pool.*;
-import io.netty.util.concurrent.*;
-import io.netty.util.internal.ObjectUtil;
-import io.netty.util.internal.ThrowableUtil;
-
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * TODO
- * This class is a fork of Netty's {@link FixedChannelPool}. This should be removed once
- * https://github.com/netty/netty/pull/9226 is resolved and we start consuming the release containing
- * the fix.
- */
-public class TinkerpopFixedChannelPool extends SimpleChannelPool {
-    private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace(
-            new IllegalStateException("Too many outstanding acquire operations"),
-            TinkerpopFixedChannelPool.class, "acquire0(...)");
-    private static final TimeoutException TIMEOUT_EXCEPTION = ThrowableUtil.unknownStackTrace(
-            new TimeoutException("Acquire operation took longer then configured maximum time"),
-            TinkerpopFixedChannelPool.class, "<init>(...)");
-    static final IllegalStateException POOL_CLOSED_ON_RELEASE_EXCEPTION = ThrowableUtil.unknownStackTrace(
-            new IllegalStateException("FixedChannelPool was closed"),
-            TinkerpopFixedChannelPool.class, "release(...)");
-    static final IllegalStateException POOL_CLOSED_ON_ACQUIRE_EXCEPTION = ThrowableUtil.unknownStackTrace(
-            new IllegalStateException("FixedChannelPool was closed"),
-            TinkerpopFixedChannelPool.class, "acquire0(...)");
-    public enum AcquireTimeoutAction {
-        /**
-         * Create a new connection when the timeout is detected.
-         */
-        NEW,
-
-        /**
-         * Fail the {@link Future} of the acquire call with a {@link TimeoutException}.
-         */
-        FAIL
-    }
-
-    private final EventExecutor executor;
-    private final long acquireTimeoutNanos;
-    private final Runnable timeoutTask;
-
-    // There is no need to worry about synchronization as everything that modified the queue or counts is done
-    // by the above EventExecutor.
-    private final Queue<TinkerpopFixedChannelPool.AcquireTask> pendingAcquireQueue = new ArrayDeque<TinkerpopFixedChannelPool.AcquireTask>();
-    private final int maxConnections;
-    private final int maxPendingAcquires;
-    private final AtomicInteger acquiredChannelCount = new AtomicInteger();
-    private int pendingAcquireCount;
-    private boolean closed;
-
-    /**
-     * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
-     *
-     * @param bootstrap         the {@link Bootstrap} that is used for connections
-     * @param handler           the {@link ChannelPoolHandler} that will be notified for the different pool actions
-     * @param maxConnections    the number of maximal active connections, once this is reached new tries to acquire
-     *                          a {@link Channel} will be delayed until a connection is returned to the pool again.
-     */
-    public TinkerpopFixedChannelPool(Bootstrap bootstrap,
-                            ChannelPoolHandler handler, int maxConnections) {
-        this(bootstrap, handler, maxConnections, Integer.MAX_VALUE);
-    }
-
-    /**
-     * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
-     *
-     * @param bootstrap             the {@link Bootstrap} that is used for connections
-     * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
-     * @param maxConnections        the number of maximal active connections, once this is reached new tries to
-     *                              acquire a {@link Channel} will be delayed until a connection is returned to the
-     *                              pool again.
-     * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
-     *                              be failed.
-     */
-    public TinkerpopFixedChannelPool(Bootstrap bootstrap,
-                            ChannelPoolHandler handler, int maxConnections, int maxPendingAcquires) {
-        this(bootstrap, handler, ChannelHealthChecker.ACTIVE, null, -1, maxConnections, maxPendingAcquires);
-    }
-
-    /**
-     * Creates a new instance.
-     *
-     * @param bootstrap             the {@link Bootstrap} that is used for connections
-     * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
-     * @param healthCheck           the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
-     *                              still healthy when obtain from the {@link ChannelPool}
-     * @param action                the {@link TinkerpopFixedChannelPool.AcquireTimeoutAction} to use or {@code null} if non should be used.
-     *                              In this case {@param acquireTimeoutMillis} must be {@code -1}.
-     * @param acquireTimeoutMillis  the time (in milliseconds) after which an pending acquire must complete or
-     *                              the {@link TinkerpopFixedChannelPool.AcquireTimeoutAction} takes place.
-     * @param maxConnections        the number of maximal active connections, once this is reached new tries to
-     *                              acquire a {@link Channel} will be delayed until a connection is returned to the
-     *                              pool again.
-     * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
-     *                              be failed.
-     */
-    public TinkerpopFixedChannelPool(Bootstrap bootstrap,
-                            ChannelPoolHandler handler,
-                            ChannelHealthChecker healthCheck, TinkerpopFixedChannelPool.AcquireTimeoutAction action,
-                            final long acquireTimeoutMillis,
-                            int maxConnections, int maxPendingAcquires) {
-        this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, true);
-    }
-
-    /**
-     * Creates a new instance.
-     *
-     * @param bootstrap             the {@link Bootstrap} that is used for connections
-     * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
-     * @param healthCheck           the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
-     *                              still healthy when obtain from the {@link ChannelPool}
-     * @param action                the {@link TinkerpopFixedChannelPool.AcquireTimeoutAction} to use or {@code null} if non should be used.
-     *                              In this case {@param acquireTimeoutMillis} must be {@code -1}.
-     * @param acquireTimeoutMillis  the time (in milliseconds) after which an pending acquire must complete or
-     *                              the {@link TinkerpopFixedChannelPool.AcquireTimeoutAction} takes place.
-     * @param maxConnections        the number of maximal active connections, once this is reached new tries to
-     *                              acquire a {@link Channel} will be delayed until a connection is returned to the
-     *                              pool again.
-     * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
-     *                              be failed.
-     * @param releaseHealthCheck    will check channel health before offering back if this parameter set to
-     *                              {@code true}.
-     */
-    public TinkerpopFixedChannelPool(Bootstrap bootstrap,
-                            ChannelPoolHandler handler,
-                            ChannelHealthChecker healthCheck, TinkerpopFixedChannelPool.AcquireTimeoutAction action,
-                            final long acquireTimeoutMillis,
-                            int maxConnections, int maxPendingAcquires, final boolean releaseHealthCheck) {
-        this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires,
-                releaseHealthCheck, true);
-    }
-
-    /**
-     * Creates a new instance.
-     *
-     * @param bootstrap             the {@link Bootstrap} that is used for connections
-     * @param handler               the {@link ChannelPoolHandler} that will be notified for the different pool actions
-     * @param healthCheck           the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
-     *                              still healthy when obtain from the {@link ChannelPool}
-     * @param action                the {@link TinkerpopFixedChannelPool.AcquireTimeoutAction} to use or {@code null} if non should be used.
-     *                              In this case {@param acquireTimeoutMillis} must be {@code -1}.
-     * @param acquireTimeoutMillis  the time (in milliseconds) after which an pending acquire must complete or
-     *                              the {@link TinkerpopFixedChannelPool.AcquireTimeoutAction} takes place.
-     * @param maxConnections        the number of maximal active connections, once this is reached new tries to
-     *                              acquire a {@link Channel} will be delayed until a connection is returned to the
-     *                              pool again.
-     * @param maxPendingAcquires    the maximum number of pending acquires. Once this is exceed acquire tries will
-     *                              be failed.
-     * @param releaseHealthCheck    will check channel health before offering back if this parameter set to
-     *                              {@code true}.
-     * @param lastRecentUsed        {@code true} {@link Channel} selection will be LIFO, if {@code false} FIFO.
-     */
-    public TinkerpopFixedChannelPool(Bootstrap bootstrap,
-                            ChannelPoolHandler handler,
-                            ChannelHealthChecker healthCheck, TinkerpopFixedChannelPool.AcquireTimeoutAction action,
-                            final long acquireTimeoutMillis,
-                            int maxConnections, int maxPendingAcquires,
-                            boolean releaseHealthCheck, boolean lastRecentUsed) {
-        super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed);
-        if (maxConnections < 1) {
-            throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)");
-        }
-        if (maxPendingAcquires < 1) {
-            throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)");
-        }
-        if (action == null && acquireTimeoutMillis == -1) {
-            timeoutTask = null;
-            acquireTimeoutNanos = -1;
-        } else if (action == null && acquireTimeoutMillis != -1) {
-            throw new NullPointerException("action");
-        } else if (action != null && acquireTimeoutMillis < 0) {
-            throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)");
-        } else {
-            acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
-            switch (action) {
-                case FAIL:
-                    timeoutTask = new TinkerpopFixedChannelPool.TimeoutTask() {
-                        @Override
-                        public void onTimeout(TinkerpopFixedChannelPool.AcquireTask task) {
-                            // Fail the promise as we timed out.
-                            task.promise.setFailure(TIMEOUT_EXCEPTION);
-                        }
-                    };
-                    break;
-                case NEW:
-                    timeoutTask = new TinkerpopFixedChannelPool.TimeoutTask() {
-                        @Override
-                        public void onTimeout(TinkerpopFixedChannelPool.AcquireTask task) {
-                            // Increment the acquire count and delegate to super to actually acquire a Channel which will
-                            // create a new connection.
-                            task.acquired();
-
-                            TinkerpopFixedChannelPool.super.acquire(task.promise);
-                        }
-                    };
-                    break;
-                default:
-                    throw new Error();
-            }
-        }
-        executor = bootstrap.config().group().next();
-        this.maxConnections = maxConnections;
-        this.maxPendingAcquires = maxPendingAcquires;
-    }
-
-    /** Returns the number of acquired channels that this pool thinks it has. */
-    public int acquiredChannelCount() {
-        return acquiredChannelCount.get();
-    }
-
-    @Override
-    public Future<Channel> acquire(final Promise<Channel> promise) {
-        try {
-            if (executor.inEventLoop()) {
-                acquire0(promise);
-            } else {
-                executor.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        acquire0(promise);
-                    }
-                });
-            }
-        } catch (Throwable cause) {
-            promise.setFailure(cause);
-        }
-        return promise;
-    }
-
-    private void acquire0(final Promise<Channel> promise) {
-        assert executor.inEventLoop();
-
-        if (closed) {
-            promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
-            return;
-        }
-        if (acquiredChannelCount.get() < maxConnections) {
-            assert acquiredChannelCount.get() >= 0;
-
-            // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
-            // EventLoop
-            Promise<Channel> p = executor.newPromise();
-            TinkerpopFixedChannelPool.AcquireListener l = new TinkerpopFixedChannelPool.AcquireListener(promise);
-            l.acquired();
-            p.addListener(l);
-            super.acquire(p);
-        } else {
-            if (pendingAcquireCount >= maxPendingAcquires) {
-                promise.setFailure(FULL_EXCEPTION);
-            } else {
-                TinkerpopFixedChannelPool.AcquireTask task = new TinkerpopFixedChannelPool.AcquireTask(promise);
-                if (pendingAcquireQueue.offer(task)) {
-                    ++pendingAcquireCount;
-
-                    if (timeoutTask != null) {
-                        task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
-                    }
-                } else {
-                    promise.setFailure(FULL_EXCEPTION);
-                }
-            }
-
-            assert pendingAcquireCount > 0;
-        }
-    }
-
-    @Override
-    public Future<Void> release(final Channel channel, final Promise<Void> promise) {
-        ObjectUtil.checkNotNull(promise, "promise");
-        final Promise<Void> p = executor.newPromise();
-        super.release(channel, p.addListener(new FutureListener<Void>() {
-
-            @Override
-            public void operationComplete(Future<Void> future) throws Exception {
-                assert executor.inEventLoop();
-
-                if (closed) {
-                    // Since the pool is closed, we have no choice but to close the channel
-                    channel.close();
-                    promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION);
-                    return;
-                }
-
-                if (future.isSuccess()) {
-                    decrementAndRunTaskQueue();
-                    promise.setSuccess(null);
-                } else {
-                    Throwable cause = future.cause();
-                    // Check if the exception was not because of we passed the Channel to the wrong pool.
-                    if (!(cause instanceof IllegalArgumentException)) {
-                        decrementAndRunTaskQueue();
-                    }
-                    promise.setFailure(future.cause());
-                }
-            }
-        }));
-        return promise;
-    }
-
-    private void decrementAndRunTaskQueue() {
-        // We should never have a negative value.
-        int currentCount = acquiredChannelCount.decrementAndGet();
-        assert currentCount >= 0;
-
-        // Run the pending acquire tasks before notify the original promise so if the user would
-        // try to acquire again from the ChannelFutureListener and the pendingAcquireCount is >=
-        // maxPendingAcquires we may be able to run some pending tasks first and so allow to add
-        // more.
-        runTaskQueue();
-    }
-
-    private void runTaskQueue() {
-        while (acquiredChannelCount.get() < maxConnections) {
-            TinkerpopFixedChannelPool.AcquireTask task = pendingAcquireQueue.poll();
-            if (task == null) {
-                break;
-            }
-
-            // Cancel the timeout if one was scheduled
-            ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
-            if (timeoutFuture != null) {
-                timeoutFuture.cancel(false);
-            }
-
-            --pendingAcquireCount;
-            task.acquired();
-
-            super.acquire(task.promise);
-        }
-
-        // We should never have a negative value.
-        assert pendingAcquireCount >= 0;
-        assert acquiredChannelCount.get() >= 0;
-    }
-
-    // AcquireTask extends AcquireListener to reduce object creations and so GC pressure
-    private final class AcquireTask extends TinkerpopFixedChannelPool.AcquireListener {
-        final Promise<Channel> promise;
-        final long expireNanoTime = System.nanoTime() + acquireTimeoutNanos;
-        ScheduledFuture<?> timeoutFuture;
-
-        public AcquireTask(Promise<Channel> promise) {
-            super(promise);
-            // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
-            // EventLoop.
-            this.promise = executor.<Channel>newPromise().addListener(this);
-        }
-    }
-
-    private abstract class TimeoutTask implements Runnable {
-        @Override
-        public final void run() {
-            assert executor.inEventLoop();
-            long nanoTime = System.nanoTime();
-            for (;;) {
-                TinkerpopFixedChannelPool.AcquireTask task = pendingAcquireQueue.peek();
-                // Compare nanoTime as descripted in the javadocs of System.nanoTime()
-                //
-                // See https://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime()
-                // See https://github.com/netty/netty/issues/3705
-                if (task == null || nanoTime - task.expireNanoTime < 0) {
-                    break;
-                }
-                pendingAcquireQueue.remove();
-
-                --pendingAcquireCount;
-                onTimeout(task);
-            }
-        }
-
-        public abstract void onTimeout(TinkerpopFixedChannelPool.AcquireTask task);
-    }
-
-    private class AcquireListener implements FutureListener<Channel> {
-        private final Promise<Channel> originalPromise;
-        protected boolean acquired;
-
-        AcquireListener(Promise<Channel> originalPromise) {
-            this.originalPromise = originalPromise;
-        }
-
-        @Override
-        public void operationComplete(Future<Channel> future) throws Exception {
-            assert executor.inEventLoop();
-
-            if (closed) {
-                if (future.isSuccess()) {
-                    // Since the pool is closed, we have no choice but to close the channel
-                    future.getNow().close();
-                }
-                originalPromise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
-                return;
-            }
-
-            if (future.isSuccess()) {
-                originalPromise.setSuccess(future.getNow());
-            } else {
-                if (acquired) {
-                    decrementAndRunTaskQueue();
-                } else {
-                    runTaskQueue();
-                }
-
-                originalPromise.setFailure(future.cause());
-            }
-        }
-
-        public void acquired() {
-            if (acquired) {
-                return;
-            }
-            acquiredChannelCount.incrementAndGet();
-            acquired = true;
-        }
-    }
-
-    /**
-     * Closes the pool in an async manner.
-     *
-     * @return Future which represents completion of the close task
-     */
-    public Future<Void> closeAsync() {
-        if (executor.inEventLoop()) {
-            return close0();
-        } else {
-            final Promise<Void> closeComplete = executor.newPromise();
-            executor.execute(new Runnable() {
-                @Override
-                public void run() {
-                    close0().addListener(new FutureListener<Void>() {
-                        @Override
-                        public void operationComplete(Future<Void> f) throws Exception {
-                            if (f.isSuccess()) {
-                                closeComplete.setSuccess(null);
-                            } else {
-                                closeComplete.setFailure(f.cause());
-                            }
-                        }
-                    });
-                }
-            });
-            return closeComplete;
-        }
-    }
-
-    private Future<Void> close0() {
-        assert executor.inEventLoop();
-
-        if (!closed) {
-            closed = true;
-            for (;;) {
-                TinkerpopFixedChannelPool.AcquireTask task = pendingAcquireQueue.poll();
-                if (task == null) {
-                    break;
-                }
-                ScheduledFuture<?> f = task.timeoutFuture;
-                if (f != null) {
-                    f.cancel(false);
-                }
-                task.promise.setFailure(new ClosedChannelException());
-            }
-            acquiredChannelCount.set(0);
-            pendingAcquireCount = 0;
-
-            // Ensure we dispatch this on another Thread as close0 will be called from the EventExecutor and we need
-            // to ensure we will not block in a EventExecutor.
-            return GlobalEventExecutor.INSTANCE.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    TinkerpopFixedChannelPool.super.close();
-                    return null;
-                }
-            });
-        }
-
-        return GlobalEventExecutor.INSTANCE.newSucceededFuture(null);
-    }
-}