You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/02/12 14:01:29 UTC

[03/77] [partial] incubator-tinkerpop git commit: moved com/tinkerpop directories to org/apache/tinkerpop

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
new file mode 100644
index 0000000..e01ecc0
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.util.iterator;
+
+import com.tinkerpop.gremlin.process.FastNoSuchElementException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class IteratorUtils {
+
+    private IteratorUtils() {
+
+    }
+
+    public static final <S> Iterator<S> of(final S a) {
+        return new SingleIterator<>(a);
+    }
+
+    public static final <S> Iterator<S> of(final S a, S b) {
+        return new DoubleIterator<>(a, b);
+    }
+
+    ///////////////
+
+    public static Iterator convertToIterator(final Object o) {
+        final Iterator itty;
+        if (o instanceof Iterable)
+            itty = ((Iterable) o).iterator();
+        else if (o instanceof Iterator)
+            itty = (Iterator) o;
+        else if (o instanceof Object[])
+            itty = new ArrayIterator<>((Object[]) o);
+        else if (o instanceof Stream)
+            itty = ((Stream) o).iterator();
+        else if (o instanceof Map)
+            itty = ((Map) o).entrySet().iterator();
+        else if (o instanceof Throwable)
+            itty = IteratorUtils.of(((Throwable) o).getMessage());
+        else
+            itty = IteratorUtils.of(o);
+        return itty;
+    }
+
+    public static List convertToList(final Object o) {
+        final Iterator iterator = IteratorUtils.convertToIterator(o);
+        return list(iterator);
+    }
+
+    public static final <S extends Collection<T>, T> S fill(final Iterator<T> iterator, final S collection) {
+        while (iterator.hasNext()) {
+            collection.add(iterator.next());
+        }
+        return collection;
+    }
+
+    public static final long count(final Iterator iterator) {
+        long ix = 0;
+        for (; iterator.hasNext(); ++ix) iterator.next();
+        return ix;
+    }
+
+    public static <S> List<S> list(final Iterator<S> iterator) {
+        return fill(iterator, new ArrayList<>());
+    }
+
+    public static <K, S> Map<K, S> collectMap(final Iterator<S> iterator, final Function<S, K> key) {
+        return collectMap(iterator, key, Function.identity());
+    }
+
+    public static <K, S, V> Map<K, V> collectMap(final Iterator<S> iterator, final Function<S, K> key, final Function<S, V> value) {
+        final Map<K, V> map = new HashMap<>();
+        while (iterator.hasNext()) {
+            final S obj = iterator.next();
+            map.put(key.apply(obj), value.apply(obj));
+        }
+        return map;
+    }
+
+    public static <K, S> Map<K, List<S>> groupBy(final Iterator<S> iterator, final Function<S, K> groupBy) {
+        final Map<K, List<S>> map = new HashMap<>();
+        while (iterator.hasNext()) {
+            final S obj = iterator.next();
+            map.computeIfAbsent(groupBy.apply(obj), k -> new ArrayList<>()).add(obj);
+        }
+        return map;
+    }
+
+    ///////////////
+
+    public static final <S, E> Iterator<E> map(final Iterator<S> iterator, final Function<S, E> function) {
+        return new Iterator<E>() {
+            @Override
+            public boolean hasNext() {
+                return iterator.hasNext();
+            }
+
+            @Override
+            public E next() {
+                return function.apply(iterator.next());
+            }
+        };
+    }
+
+    public static final <S, E> Iterable<E> map(final Iterable<S> iterable, final Function<S, E> function) {
+        return new Iterable<E>() {
+            @Override
+            public Iterator<E> iterator() {
+                return IteratorUtils.map(iterable.iterator(), function);
+            }
+        };
+    }
+
+    ///////////////
+
+    public static final <S> Iterator<S> filter(final Iterator<S> iterator, final Predicate<S> predicate) {
+
+
+        return new Iterator<S>() {
+            S nextResult = null;
+
+            @Override
+            public boolean hasNext() {
+                if (null != this.nextResult) {
+                    return true;
+                } else {
+                    advance();
+                    return null != this.nextResult;
+                }
+            }
+
+            @Override
+            public S next() {
+                try {
+                    if (null != this.nextResult) {
+                        return this.nextResult;
+                    } else {
+                        advance();
+                        if (null != this.nextResult)
+                            return this.nextResult;
+                        else
+                            throw FastNoSuchElementException.instance();
+                    }
+                } finally {
+                    this.nextResult = null;
+                }
+            }
+
+            private final void advance() {
+                this.nextResult = null;
+                while (iterator.hasNext()) {
+                    final S s = iterator.next();
+                    if (predicate.test(s)) {
+                        this.nextResult = s;
+                        return;
+                    }
+                }
+            }
+        };
+    }
+
+    public static final <S> Iterable<S> filter(final Iterable<S> iterable, final Predicate<S> predicate) {
+        return new Iterable<S>() {
+            @Override
+            public Iterator<S> iterator() {
+                return IteratorUtils.filter(iterable.iterator(), predicate);
+            }
+        };
+    }
+
+    ///////////////////
+
+    public static final <S> Iterator<S> concat(final Iterator<S>... iterators) {
+        final MultiIterator<S> iterator = new MultiIterator<>();
+        for (final Iterator<S> itty : iterators) {
+            iterator.addIterator(itty);
+        }
+        return iterator;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java
new file mode 100644
index 0000000..c5dca2d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.util.iterator;
+
+import com.tinkerpop.gremlin.process.FastNoSuchElementException;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MultiIterator<T> implements Iterator<T>, Serializable {
+
+    private final List<Iterator<T>> iterators = new ArrayList<>();
+    private int current = 0;
+
+    public void addIterator(final Iterator<T> iterator) {
+        this.iterators.add(iterator);
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (this.current >= this.iterators.size())
+            return false;
+
+        Iterator<T> currentIterator = iterators.get(this.current);
+
+        while (true) {
+            if (currentIterator.hasNext()) {
+                return true;
+            } else {
+                this.current++;
+                if (this.current >= iterators.size())
+                    break;
+                currentIterator = iterators.get(this.current);
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public T next() {
+        Iterator<T> currentIterator = iterators.get(this.current);
+        while (true) {
+            if (currentIterator.hasNext()) {
+                return currentIterator.next();
+            } else {
+                this.current++;
+                if (this.current >= iterators.size())
+                    break;
+                currentIterator = iterators.get(current);
+            }
+        }
+        throw FastNoSuchElementException.instance();
+    }
+
+    public void clear() {
+        this.iterators.clear();
+        this.current = 0;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/SingleIterator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/SingleIterator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/SingleIterator.java
new file mode 100644
index 0000000..178c6a9
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/SingleIterator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.util.iterator;
+
+import com.tinkerpop.gremlin.process.FastNoSuchElementException;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+final class SingleIterator<T> implements Iterator<T>,Serializable {
+
+    private final T t;
+    private boolean alive = true;
+
+    protected SingleIterator(final T t) {
+        this.t = t;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return this.alive;
+    }
+
+    @Override
+    public T next() {
+        if (!this.alive)
+            throw FastNoSuchElementException.instance();
+        else {
+            this.alive = false;
+            return t;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/tools/MultiMap.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/tools/MultiMap.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/tools/MultiMap.java
new file mode 100644
index 0000000..22d377d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/tools/MultiMap.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.util.tools;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A number of static methods to interact with a multi map, i.e. a map that maps keys to sets of values.
+ *
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+public class MultiMap {
+
+    public static <K, V> boolean putAll(final Map<K, Set<V>> map, final K key, final Collection<V> values) {
+        return getMapSet(map, key).addAll(values);
+    }
+
+    public static <K, V> boolean put(final Map<K, Set<V>> map, final K key, final V value) {
+        return getMapSet(map, key).add(value);
+    }
+
+    public static <K, V> boolean containsEntry(final Map<K, Set<V>> map, final K key, final V value) {
+        final Set<V> set = map.get(key);
+        return set != null && set.contains(value);
+    }
+
+    public static <K, V> Set<V> get(final Map<K, Set<V>> map, final K key) {
+        final Set<V> set = getMapSet(map, key);
+        return set == null ? Collections.emptySet() : set;
+    }
+
+    private static <K, V> Set<V> getMapSet(final Map<K, Set<V>> map, final K key) {
+        Set<V> set = map.get(key);
+        if (set == null) {
+            set = new HashSet<>();
+            map.put(key, set);
+        }
+        return set;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/tools/TimeUtils.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/tools/TimeUtils.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/tools/TimeUtils.java
new file mode 100644
index 0000000..030e615
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/tools/TimeUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.util.tools;
+
+import java.util.stream.IntStream;
+
+/**
+ * @author Daniel Kuppitz (http://gremlin.guru)
+ */
+public class TimeUtils {
+
+    public static double clock(final Runnable runnable) {
+        return clock(100, runnable);
+    }
+
+    public static double clock(final int loops, final Runnable runnable) {
+        runnable.run(); // warm-up
+        return IntStream.range(0, loops).mapToDouble(i -> {
+            long t = System.nanoTime();
+            runnable.run();
+            return (System.nanoTime() - t) * 0.000001;
+        }).sum() / loops;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Channelizer.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Channelizer.java
deleted file mode 100644
index 19e2cba..0000000
--- a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Channelizer.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.driver;
-
-import com.tinkerpop.gremlin.driver.handler.NioGremlinRequestEncoder;
-import com.tinkerpop.gremlin.driver.handler.NioGremlinResponseDecoder;
-import com.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
-import com.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder;
-import com.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.http.HttpClientCodec;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpObjectAggregator;
-import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
-import io.netty.handler.codec.http.websocketx.WebSocketVersion;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.util.SelfSignedCertificate;
-
-import java.util.Optional;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Client-side channel initializer interface.
- *
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-public interface Channelizer extends ChannelHandler {
-
-    /**
-     * Initializes the {@code Channelizer}. Called just after construction.
-     */
-    public void init(final Connection connection);
-
-    /**
-     * Called after the channel connects. The {@code Channelizer} may need to perform some functions, such as a
-     * handshake.
-     */
-    public default void connected() {
-    }
-
-    /**
-     * Base implementation of the client side {@link Channelizer}.
-     */
-    abstract class AbstractChannelizer extends ChannelInitializer<SocketChannel> implements Channelizer {
-        protected Connection connection;
-        protected Cluster cluster;
-        private ConcurrentMap<UUID, ResponseQueue> pending;
-
-        protected static final String PIPELINE_GREMLIN_HANDLER = "gremlin-handler";
-
-        public boolean supportsSsl() {
-            return cluster.connectionPoolSettings().enableSsl;
-        }
-
-        public abstract void configure(final ChannelPipeline pipeline);
-
-        public void finalize(final ChannelPipeline pipeline) {
-            // do nothing
-        }
-
-        @Override
-        public void init(final Connection connection) {
-            this.connection = connection;
-            this.cluster = connection.getCluster();
-            this.pending = connection.getPending();
-        }
-
-        @Override
-        protected void initChannel(final SocketChannel socketChannel) throws Exception {
-            final ChannelPipeline pipeline = socketChannel.pipeline();
-            final Optional<SslContext> sslCtx;
-            if (supportsSsl()) {
-                try {
-                    final SelfSignedCertificate ssc = new SelfSignedCertificate();
-                    sslCtx = Optional.of(SslContext.newServerContext(ssc.certificate(), ssc.privateKey()));
-                } catch (Exception ex) {
-                    throw new RuntimeException(ex);
-                }
-            } else {
-                sslCtx = Optional.empty();
-            }
-
-            if (sslCtx.isPresent()) {
-                pipeline.addLast(sslCtx.get().newHandler(socketChannel.alloc(), connection.getUri().getHost(), connection.getUri().getPort()));
-            }
-
-            configure(pipeline);
-            pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new Handler.GremlinResponseHandler(pending));
-        }
-    }
-
-    /**
-     * WebSocket {@link Channelizer} implementation.
-     */
-    class WebSocketChannelizer extends AbstractChannelizer {
-        private WebSocketClientHandler handler;
-
-        private WebSocketGremlinRequestEncoder webSocketGremlinRequestEncoder;
-        private WebSocketGremlinResponseDecoder webSocketGremlinResponseDecoder;
-
-        @Override
-        public void init(final Connection connection) {
-            super.init(connection);
-            webSocketGremlinRequestEncoder = new WebSocketGremlinRequestEncoder(true, cluster.getSerializer());
-            webSocketGremlinResponseDecoder = new WebSocketGremlinResponseDecoder(cluster.getSerializer());
-        }
-
-        @Override
-        public boolean supportsSsl() {
-            final String scheme = connection.getUri().getScheme();
-            return "wss".equalsIgnoreCase(scheme);
-        }
-
-        @Override
-        public void configure(final ChannelPipeline pipeline) {
-            final String scheme = connection.getUri().getScheme();
-            if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme))
-                throw new IllegalStateException("Unsupported scheme (only ws: or wss: supported): " + scheme);
-
-            if (!supportsSsl() && "wss".equalsIgnoreCase(scheme))
-                throw new IllegalStateException("To use wss scheme ensure that enableSsl is set to true in configuration");
-
-            final int maxContentLength = cluster.connectionPoolSettings().maxContentLength;
-            handler = new WebSocketClientHandler(
-                    WebSocketClientHandshakerFactory.newHandshaker(
-                            connection.getUri(), WebSocketVersion.V13, null, false, HttpHeaders.EMPTY_HEADERS, maxContentLength));
-
-            pipeline.addLast("http-codec", new HttpClientCodec());
-            pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength));
-            pipeline.addLast("ws-handler", handler);
-            pipeline.addLast("gremlin-encoder", webSocketGremlinRequestEncoder);
-            pipeline.addLast("gremlin-decoder", webSocketGremlinResponseDecoder);
-        }
-
-        @Override
-        public void connected() {
-            try {
-                handler.handshakeFuture().sync();
-            } catch (Exception ex) {
-                throw new RuntimeException(ex);
-            }
-        }
-    }
-
-    /**
-     * NIO {@link Channelizer} implementation.
-     */
-    class NioChannelizer extends AbstractChannelizer {
-        private NioGremlinRequestEncoder nioGremlinRequestEncoder;
-
-        @Override
-        public void init(final Connection connection) {
-            super.init(connection);
-            nioGremlinRequestEncoder = new NioGremlinRequestEncoder(true, cluster.getSerializer());
-        }
-
-        @Override
-        public void configure(ChannelPipeline pipeline) {
-            pipeline.addLast("gremlin-decoder", new NioGremlinResponseDecoder(cluster.getSerializer()));
-            pipeline.addLast("gremlin-encoder", nioGremlinRequestEncoder);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Client.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Client.java
deleted file mode 100644
index e6f14b6..0000000
--- a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Client.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.driver;
-
-import com.tinkerpop.gremlin.driver.exception.ConnectionException;
-import com.tinkerpop.gremlin.driver.message.RequestMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-
-/**
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-public abstract class Client {
-
-    private static final Logger logger = LoggerFactory.getLogger(Client.class);
-
-    protected final Cluster cluster;
-    protected volatile boolean initialized;
-
-    Client(final Cluster cluster) {
-        this.cluster = cluster;
-    }
-
-    /**
-     * Makes any final changes to the builder and returns the constructed {@link RequestMessage}.  Implementers
-     * may choose to override this message to append data to the request before sending.  By default, this method
-     * will simply call the {@link com.tinkerpop.gremlin.driver.message.RequestMessage.Builder#create()} and return
-     * the {@link RequestMessage}.
-     */
-    public RequestMessage buildMessage(final RequestMessage.Builder builder) {
-        return builder.create();
-    }
-
-    /**
-     * Called in the {@link #init} method.
-     */
-    protected abstract void initializeImplementation();
-
-    /**
-     * Chooses a {@link Connection} to write the message to.
-     */
-    protected abstract Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException;
-
-    /**
-     * Asynchronous close of the {@code Client}.
-     */
-    public abstract CompletableFuture<Void> closeAsync();
-
-    public synchronized Client init() {
-        if (initialized)
-            return this;
-
-        logger.debug("Initializing client on cluster [{}]", cluster);
-
-        cluster.init();
-        initializeImplementation();
-
-        initialized = true;
-        return this;
-    }
-
-    public ResultSet submit(final String gremlin) {
-        return submit(gremlin, null);
-    }
-
-    public ResultSet submit(final String gremlin, final Map<String, Object> parameters) {
-        try {
-            return submitAsync(gremlin, parameters).get();
-        } catch (Exception ex) {
-            throw new RuntimeException(ex);
-        }
-    }
-
-    public CompletableFuture<ResultSet> submitAsync(final String gremlin) {
-        return submitAsync(gremlin, null);
-    }
-
-    public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String, Object> parameters) {
-        final RequestMessage.Builder request = RequestMessage.build(Tokens.OPS_EVAL)
-                .add(Tokens.ARGS_GREMLIN, gremlin)
-                .add(Tokens.ARGS_BATCH_SIZE, cluster.connectionPoolSettings().resultIterationBatchSize);
-
-        Optional.ofNullable(parameters).ifPresent(params -> request.addArg(Tokens.ARGS_BINDINGS, parameters));
-        return submitAsync(buildMessage(request));
-    }
-
-    public CompletableFuture<ResultSet> submitAsync(final RequestMessage msg) {
-        if (!initialized)
-            init();
-
-        final CompletableFuture<ResultSet> future = new CompletableFuture<>();
-        Connection connection = null;
-        try {
-            // the connection is returned to the pool once the response has been completed...see Connection.write()
-            // the connection may be returned to the pool with the host being marked as "unavailable"
-            connection = chooseConnection(msg);
-            connection.write(msg, future);
-            return future;
-        } catch (TimeoutException toe) {
-            // there was a timeout borrowing a connection
-            throw new RuntimeException(toe);
-        } catch (ConnectionException ce) {
-            throw new RuntimeException(ce);
-        } catch (Exception ex) {
-            throw new RuntimeException(ex);
-        } finally {
-            logger.debug("Submitted {} to - {}", msg, null == connection ? "connection not initialized" : connection.toString());
-        }
-    }
-
-    public void close() {
-        closeAsync().join();
-    }
-
-    /**
-     * A {@code Client} implementation that does not operate in a session.  Requests are sent to multiple servers
-     * given a {@link com.tinkerpop.gremlin.driver.LoadBalancingStrategy}.  Transactions are automatically committed
-     * (or rolled-back on error) after each request.
-     */
-    public static class ClusteredClient extends Client {
-
-        private ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new ConcurrentHashMap<>();
-
-        ClusteredClient(final Cluster cluster) {
-            super(cluster);
-        }
-
-        @Override
-        protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException {
-            final Iterator<Host> possibleHosts = this.cluster.loadBalancingStrategy().select(msg);
-            if (!possibleHosts.hasNext()) throw new TimeoutException("Timed out waiting for an available host.");
-
-            final Host bestHost = this.cluster.loadBalancingStrategy().select(msg).next();
-            final ConnectionPool pool = hostConnectionPools.get(bestHost);
-            return pool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
-        }
-
-        @Override
-        protected void initializeImplementation() {
-            cluster.getClusterInfo().allHosts().forEach(host -> {
-                try {
-                    // hosts that don't initialize connection pools will come up as a dead host
-                    hostConnectionPools.put(host, new ConnectionPool(host, cluster));
-
-                    // added a new host to the cluster so let the load-balancer know
-                    this.cluster.loadBalancingStrategy().onNew(host);
-                } catch (Exception ex) {
-                    // catch connection errors and prevent them from failing the creation
-                    logger.warn("Could not initialize connection pool for {} - will try later", host);
-                }
-            });
-        }
-
-        @Override
-        public CompletableFuture<Void> closeAsync() {
-            final CompletableFuture[] poolCloseFutures = new CompletableFuture[hostConnectionPools.size()];
-            hostConnectionPools.values().stream().map(ConnectionPool::closeAsync).collect(Collectors.toList()).toArray(poolCloseFutures);
-            return CompletableFuture.allOf(poolCloseFutures);
-        }
-    }
-
-    /**
-     * A {@code Client} implementation that operates in the context of a session.  Requests are sent to a single
-     * server, where each request is bound to the same thread with the same set of bindings across requests.
-     * Transaction are not automatically committed. It is up the client to issue commit/rollback commands.
-     */
-    public static class SessionedClient extends Client {
-        private final String sessionId;
-
-        private ConnectionPool connectionPool;
-
-        SessionedClient(final Cluster cluster, final String sessionId) {
-            super(cluster);
-            this.sessionId = sessionId;
-        }
-
-        @Override
-        public RequestMessage buildMessage(final RequestMessage.Builder builder) {
-            builder.processor("session");
-            builder.addArg(Tokens.ARGS_SESSION, sessionId);
-            return builder.create();
-        }
-
-        @Override
-        protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException {
-            return connectionPool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
-        }
-
-        @Override
-        protected void initializeImplementation() {
-            // chooses an available host at random
-            final List<Host> hosts = cluster.getClusterInfo().allHosts()
-                    .stream().filter(Host::isAvailable).collect(Collectors.toList());
-            Collections.shuffle(hosts);
-            final Host host = hosts.get(0);
-            connectionPool = new ConnectionPool(host, cluster);
-        }
-
-        @Override
-        public CompletableFuture<Void> closeAsync() {
-            return connectionPool.closeAsync();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Cluster.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Cluster.java
deleted file mode 100644
index d1a1ee3..0000000
--- a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Cluster.java
+++ /dev/null
@@ -1,439 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.driver;
-
-import com.tinkerpop.gremlin.driver.ser.Serializers;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.stream.Collectors;
-
-/**
- * A bunch of Gremlin Server instances.
- *
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-public class Cluster {
-
-    private Manager manager;
-
-    private Cluster(final List<InetSocketAddress> contactPoints, final MessageSerializer serializer,
-                    final int nioPoolSize, final int workerPoolSize,
-                    final Settings.ConnectionPoolSettings connectionPoolSettings,
-                    final LoadBalancingStrategy loadBalancingStrategy) {
-        this.manager = new Manager(contactPoints, serializer, nioPoolSize, workerPoolSize, connectionPoolSettings, loadBalancingStrategy);
-    }
-
-    public synchronized void init() {
-        if (!manager.initialized)
-            manager.init();
-    }
-
-    /**
-     * Creates a {@link Client.ClusteredClient} instance to this {@code Cluster}, meaning requests will be routed to
-     * one or more servers (depending on the cluster configuration), where each request represents the entirety of a
-     * transaction.  A commit or rollback (in case of error) is automatically executed at the end of the request.
-     */
-    public Client connect() {
-        return new Client.ClusteredClient(this);
-    }
-
-    /**
-     * Creates a {@link Client.SessionedClient} instance to this {@code Cluster}, meaning requests will be routed to
-     * a single server (randomly selected from the cluster), where the same bindings will be available on each request.
-     * Requests are bound to the same thread on the server and thus transactions may extend beyond the bounds of a
-     * single request.  The transactions are managed by the user and must be committed or rolledback manually.
-     *
-     * @param sessionId user supplied id for the session which should be unique (a UUID is ideal).
-     */
-    public Client connect(final String sessionId) {
-        if (null == sessionId || sessionId.isEmpty())
-            throw new IllegalArgumentException("sessionId cannot be null or empty");
-        return new Client.SessionedClient(this, sessionId);
-    }
-
-    @Override
-    public String toString() {
-        return manager.toString();
-    }
-
-    public static Builder build() {
-        return new Builder();
-    }
-
-    public static Builder build(final String address) {
-        return new Builder(address);
-    }
-
-    public static Builder build(final File configurationFile) throws FileNotFoundException {
-        final Settings settings = Settings.read(new FileInputStream(configurationFile));
-        final List<String> addresses = settings.hosts;
-        if (addresses.size() == 0)
-            throw new IllegalStateException("At least one value must be specified to the hosts setting");
-
-        final Builder builder = new Builder(settings.hosts.get(0))
-                .port(settings.port)
-                .nioPoolSize(settings.nioPoolSize)
-                .workerPoolSize(settings.workerPoolSize)
-                .maxInProcessPerConnection(settings.connectionPool.maxInProcessPerConnection)
-                .maxSimultaneousRequestsPerConnection(settings.connectionPool.maxSimultaneousRequestsPerConnection)
-                .minSimultaneousRequestsPerConnection(settings.connectionPool.minSimultaneousRequestsPerConnection)
-                .maxConnectionPoolSize(settings.connectionPool.maxSize)
-                .minConnectionPoolSize(settings.connectionPool.minSize);
-
-        // the first address was added above in the constructor, so skip it if there are more
-        if (addresses.size() > 1)
-            addresses.stream().skip(1).forEach(builder::addContactPoint);
-
-        try {
-            builder.serializer(settings.serializer.create());
-        } catch (Exception ex) {
-            throw new IllegalStateException("Could not establish serializer - " + ex.getMessage());
-        }
-
-        return builder;
-    }
-
-    /**
-     * Create a {@code Cluster} with all default settings which will connect to one contact point at {@code localhost}.
-     */
-    public static Cluster open() {
-        return build("localhost").create();
-    }
-
-    /**
-     * Create a {@code Cluster} using a YAML-based configuration file.
-     */
-    public static Cluster open(final String configurationFile) throws Exception {
-        final File file = new File(configurationFile);
-        if (!file.exists())
-            throw new IllegalArgumentException(String.format("Configuration file at %s does not exist", configurationFile));
-
-        return build(file).create();
-    }
-
-    public void close() {
-        closeAsync().join();
-    }
-
-    public CompletableFuture<Void> closeAsync() {
-        return manager.close();
-    }
-
-    public List<URI> availableHosts() {
-        return Collections.unmodifiableList(getClusterInfo().allHosts().stream()
-                .filter(Host::isAvailable)
-                .map(Host::getHostUri)
-                .collect(Collectors.toList()));
-    }
-
-    Factory getFactory() {
-        return manager.factory;
-    }
-
-    MessageSerializer getSerializer() {
-        return manager.serializer;
-    }
-
-    ScheduledExecutorService executor() {
-        return manager.executor;
-    }
-
-    Settings.ConnectionPoolSettings connectionPoolSettings() {
-        return manager.connectionPoolSettings;
-    }
-
-    LoadBalancingStrategy loadBalancingStrategy() {
-        return manager.loadBalancingStrategy;
-    }
-
-    ClusterInfo getClusterInfo() {
-        return manager.clusterInfo;
-    }
-
-    public static class Builder {
-        private List<InetAddress> addresses = new ArrayList<>();
-        private int port = 8182;
-        private MessageSerializer serializer = Serializers.KRYO_V1D0.simpleInstance();
-        private int nioPoolSize = Runtime.getRuntime().availableProcessors();
-        private int workerPoolSize = Runtime.getRuntime().availableProcessors() * 2;
-        private int minConnectionPoolSize = ConnectionPool.MIN_POOL_SIZE;
-        private int maxConnectionPoolSize = ConnectionPool.MAX_POOL_SIZE;
-        private int minSimultaneousRequestsPerConnection = ConnectionPool.MIN_SIMULTANEOUS_REQUESTS_PER_CONNECTION;
-        private int maxSimultaneousRequestsPerConnection = ConnectionPool.MAX_SIMULTANEOUS_REQUESTS_PER_CONNECTION;
-        private int maxInProcessPerConnection = Connection.MAX_IN_PROCESS;
-        private int minInProcessPerConnection = Connection.MIN_IN_PROCESS;
-        private int maxWaitForConnection = Connection.MAX_WAIT_FOR_CONNECTION;
-        private int maxContentLength = Connection.MAX_CONTENT_LENGTH;
-        private int reconnectInitialDelay = Connection.RECONNECT_INITIAL_DELAY;
-        private int reconnectInterval = Connection.RECONNECT_INTERVAL;
-        private int resultIterationBatchSize = Connection.RESULT_ITERATION_BATCH_SIZE;
-        private boolean enableSsl = false;
-        private LoadBalancingStrategy loadBalancingStrategy = new LoadBalancingStrategy.RoundRobin();
-
-        private Builder() {
-            // empty to prevent direct instantiation
-        }
-
-        private Builder(final String address) {
-            addContactPoint(address);
-        }
-
-        /**
-         * Size of the pool for handling request/response operations.  Defaults to the number of available processors.
-         */
-        public Builder nioPoolSize(final int nioPoolSize) {
-            if (nioPoolSize < 1) throw new IllegalArgumentException("The workerPoolSize must be greater than zero");
-            this.nioPoolSize = nioPoolSize;
-            return this;
-        }
-
-        /**
-         * Size of the pool for handling background work.  Defaults to the number of available processors multiplied
-         * by 2
-         */
-        public Builder workerPoolSize(final int workerPoolSize) {
-            if (workerPoolSize < 1) throw new IllegalArgumentException("The workerPoolSize must be greater than zero");
-            this.workerPoolSize = workerPoolSize;
-            return this;
-        }
-
-        public Builder serializer(final String mimeType) {
-            serializer = Serializers.valueOf(mimeType).simpleInstance();
-            return this;
-        }
-
-        public Builder serializer(final Serializers mimeType) {
-            serializer = mimeType.simpleInstance();
-            return this;
-        }
-
-        public Builder serializer(final MessageSerializer serializer) {
-            this.serializer = serializer;
-            return this;
-        }
-
-        public Builder enableSsl(final boolean enable) {
-            this.enableSsl = enable;
-            return this;
-        }
-
-        public Builder minInProcessPerConnection(final int minInProcessPerConnection) {
-            this.minInProcessPerConnection = minInProcessPerConnection;
-            return this;
-        }
-
-        public Builder maxInProcessPerConnection(final int maxInProcessPerConnection) {
-            this.maxInProcessPerConnection = maxInProcessPerConnection;
-            return this;
-        }
-
-        public Builder maxSimultaneousRequestsPerConnection(final int maxSimultaneousRequestsPerConnection) {
-            this.maxSimultaneousRequestsPerConnection = maxSimultaneousRequestsPerConnection;
-            return this;
-        }
-
-        public Builder minSimultaneousRequestsPerConnection(final int minSimultaneousRequestsPerConnection) {
-            this.minSimultaneousRequestsPerConnection = minSimultaneousRequestsPerConnection;
-            return this;
-        }
-
-        public Builder maxConnectionPoolSize(final int maxSize) {
-            this.maxConnectionPoolSize = maxSize;
-            return this;
-        }
-
-        public Builder minConnectionPoolSize(final int minSize) {
-            this.minConnectionPoolSize = minSize;
-            return this;
-        }
-
-        /**
-         * Override the server setting that determines how many results are returned per batch.
-         */
-        public Builder resultIterationBatchSize(final int size) {
-            this.resultIterationBatchSize = size;
-            return this;
-        }
-
-        /**
-         * The maximum amount of time to wait for a connection to be borrowed from the connection pool.
-         */
-        public Builder maxWaitForConnection(final int maxWait) {
-            this.maxWaitForConnection = maxWait;
-            return this;
-        }
-
-        /**
-         * The maximum size in bytes of any request sent to the server.   This number should not exceed the same
-         * setting defined on the server.
-         */
-        public Builder maxContentLength(final int maxContentLength) {
-            this.maxContentLength = maxContentLength;
-            return this;
-        }
-
-        /**
-         * Time in milliseconds to wait before attempting to reconnect to a dead host after it has been marked dead.
-         */
-        public Builder reconnectIntialDelay(final int initialDelay) {
-            this.reconnectInitialDelay = initialDelay;
-            return this;
-        }
-
-        /**
-         * Time in milliseconds to wait between retries when attempting to reconnect to a dead host.
-         */
-        public Builder reconnectInterval(final int interval) {
-            this.reconnectInterval = interval;
-            return this;
-        }
-
-        public Builder loadBalancingStrategy(final LoadBalancingStrategy loadBalancingStrategy) {
-            this.loadBalancingStrategy = loadBalancingStrategy;
-            return this;
-        }
-
-        public Builder addContactPoint(final String address) {
-            try {
-                this.addresses.add(InetAddress.getByName(address));
-                return this;
-            } catch (UnknownHostException e) {
-                throw new IllegalArgumentException(e.getMessage());
-            }
-        }
-
-        public Builder addContactPoints(final String... addresses) {
-            for (String address : addresses)
-                addContactPoint(address);
-            return this;
-        }
-
-        public Builder port(final int port) {
-            this.port = port;
-            return this;
-        }
-
-        private List<InetSocketAddress> getContactPoints() {
-            return addresses.stream().map(addy -> new InetSocketAddress(addy, port)).collect(Collectors.toList());
-        }
-
-        public Cluster create() {
-            if (addresses.size() == 0) addContactPoint("localhost");
-            final Settings.ConnectionPoolSettings connectionPoolSettings = new Settings.ConnectionPoolSettings();
-            connectionPoolSettings.maxInProcessPerConnection = this.maxInProcessPerConnection;
-            connectionPoolSettings.minInProcessPerConnection = this.minInProcessPerConnection;
-            connectionPoolSettings.maxSimultaneousRequestsPerConnection = this.maxSimultaneousRequestsPerConnection;
-            connectionPoolSettings.minSimultaneousRequestsPerConnection = this.minSimultaneousRequestsPerConnection;
-            connectionPoolSettings.maxSize = this.maxConnectionPoolSize;
-            connectionPoolSettings.minSize = this.minConnectionPoolSize;
-            connectionPoolSettings.maxWaitForConnection = this.maxWaitForConnection;
-            connectionPoolSettings.maxContentLength = this.maxContentLength;
-            connectionPoolSettings.reconnectInitialDelay = this.reconnectInitialDelay;
-            connectionPoolSettings.reconnectInterval = this.reconnectInterval;
-            connectionPoolSettings.resultIterationBatchSize = this.resultIterationBatchSize;
-            connectionPoolSettings.enableSsl = this.enableSsl;
-            return new Cluster(getContactPoints(), serializer, this.nioPoolSize, this.workerPoolSize,
-                    connectionPoolSettings, loadBalancingStrategy);
-        }
-    }
-
-    static class Factory {
-        private final EventLoopGroup group;
-
-        public Factory(final int nioPoolSize) {
-            final BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("gremlin-driver-loop-%d").build();
-            group = new NioEventLoopGroup(nioPoolSize, threadFactory);
-        }
-
-        Bootstrap createBootstrap() {
-            return new Bootstrap().group(group);
-        }
-
-        void shutdown() {
-            group.shutdownGracefully().awaitUninterruptibly();
-        }
-    }
-
-    class Manager {
-        private ClusterInfo clusterInfo;
-        private boolean initialized;
-        private final List<InetSocketAddress> contactPoints;
-        private final Factory factory;
-        private final MessageSerializer serializer;
-        private final Settings.ConnectionPoolSettings connectionPoolSettings;
-        private final LoadBalancingStrategy loadBalancingStrategy;
-
-        private final ScheduledExecutorService executor;
-
-        private Manager(final List<InetSocketAddress> contactPoints, final MessageSerializer serializer,
-                        final int nioPoolSize, final int workerPoolSize, final Settings.ConnectionPoolSettings connectionPoolSettings,
-                        final LoadBalancingStrategy loadBalancingStrategy) {
-            this.loadBalancingStrategy = loadBalancingStrategy;
-            this.clusterInfo = new ClusterInfo(Cluster.this);
-            this.contactPoints = contactPoints;
-            this.connectionPoolSettings = connectionPoolSettings;
-            this.factory = new Factory(nioPoolSize);
-            this.serializer = serializer;
-            this.executor = Executors.newScheduledThreadPool(workerPoolSize,
-                    new BasicThreadFactory.Builder().namingPattern("gremlin-driver-worker-%d").build());
-        }
-
-        synchronized void init() {
-            if (initialized)
-                return;
-
-            initialized = true;
-
-            contactPoints.forEach(address -> {
-                final Host host = clusterInfo.add(address);
-                if (host != null)
-                    host.makeAvailable();
-            });
-        }
-
-        CompletableFuture<Void> close() {
-            // this method is exposed publicly in both blocking and non-blocking forms.
-            return CompletableFuture.supplyAsync(() -> {
-                this.factory.shutdown();
-                return null;
-            });
-        }
-
-        @Override
-        public String toString() {
-            return String.join(", ", contactPoints.stream().map(InetSocketAddress::toString).collect(Collectors.<String>toList()));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/ClusterInfo.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/ClusterInfo.java b/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/ClusterInfo.java
deleted file mode 100644
index b9dbb33..0000000
--- a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/ClusterInfo.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.driver;
-
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-class ClusterInfo {
-
-    private final Cluster cluster;
-    private final ConcurrentMap<InetSocketAddress, Host> hosts = new ConcurrentHashMap<>();
-
-    public ClusterInfo(final Cluster cluster) {
-        this.cluster = cluster;
-    }
-
-    public Host add(final InetSocketAddress address) {
-        final Host newHost = new Host(address, cluster);
-        final Host previous = hosts.putIfAbsent(address, newHost);
-        return previous == null ? newHost : null;
-    }
-
-    Collection<Host> allHosts() {
-        return hosts.values();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Connection.java
deleted file mode 100644
index 41458ae..0000000
--- a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Connection.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.driver;
-
-import com.tinkerpop.gremlin.driver.exception.ConnectionException;
-import com.tinkerpop.gremlin.driver.message.RequestMessage;
-import com.tinkerpop.gremlin.driver.message.ResponseMessage;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * A single connection to a Gremlin Server instance.
- *
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-class Connection {
-    private static final Logger logger = LoggerFactory.getLogger(Connection.class);
-
-    private final Channel channel;
-    private final URI uri;
-    private final ConcurrentMap<UUID, ResponseQueue> pending = new ConcurrentHashMap<>();
-    private final Cluster cluster;
-    private final ConnectionPool pool;
-
-    public static final int MAX_IN_PROCESS = 4;
-    public static final int MIN_IN_PROCESS = 1;
-    public static final int MAX_WAIT_FOR_CONNECTION = 3000;
-    public static final int MAX_CONTENT_LENGTH = 65536;
-    public static final int RECONNECT_INITIAL_DELAY = 1000;
-    public static final int RECONNECT_INTERVAL = 1000;
-    public static final int RESULT_ITERATION_BATCH_SIZE = 64;
-
-    public final AtomicInteger inFlight = new AtomicInteger(0);
-    private volatile boolean isDead = false;
-    private final int maxInProcess;
-
-    private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
-
-    public Connection(final URI uri, final ConnectionPool pool, final Cluster cluster, final int maxInProcess) throws ConnectionException {
-        this.uri = uri;
-        this.cluster = cluster;
-        this.pool = pool;
-        this.maxInProcess = maxInProcess;
-
-        final Bootstrap b = this.cluster.getFactory().createBootstrap();
-
-        // todo: dynamically instantiate the channelizer from settings
-        final Channelizer channelizer = new Channelizer.WebSocketChannelizer();
-        channelizer.init(this);
-        b.channel(NioSocketChannel.class).handler(channelizer);
-
-        try {
-            channel = b.connect(uri.getHost(), uri.getPort()).sync().channel();
-            channelizer.connected();
-
-            logger.info("Created new connection for {}", uri);
-        } catch (Exception ie) {
-            logger.debug("Error opening connection on {}", uri);
-            throw new ConnectionException(uri, "Could not open connection", ie);
-        }
-    }
-
-    /**
-     * A connection can only have so many things in process happening on it at once, where "in process" refers to
-     * the maximum number of in-process requests less the number of pending responses.
-     */
-    public int availableInProcess() {
-        return maxInProcess - pending.size();
-    }
-
-    public boolean isDead() {
-        return isDead;
-    }
-
-    public boolean isClosed() {
-        return closeFuture.get() != null;
-    }
-
-    URI getUri() {
-        return uri;
-    }
-
-    Cluster getCluster() {
-        return cluster;
-    }
-
-    ConcurrentMap<UUID, ResponseQueue> getPending() {
-        return pending;
-    }
-
-    public CompletableFuture<Void> closeAsync() {
-        final CompletableFuture<Void> future = new CompletableFuture<>();
-        if (!closeFuture.compareAndSet(null, future))
-            return closeFuture.get();
-
-        // make sure all requests in the queue are fully processed before killing.  if they are then shutdown
-        // can be immediate.  if not this method will signal the readCompleted future defined in the write()
-        // operation to check if it can close.  in this way the connection no longer receives writes, but
-        // can continue to read. If a request never comes back the future won't get fulfilled and the connection
-        // will maintain a "pending" request, that won't quite ever go away.  The build up of such a dead requests
-        // on a connection in the connection pool will force the pool to replace the connection for a fresh one
-        if (pending.isEmpty()) {
-            if (null == channel)
-                future.complete(null);
-            else
-                shutdown(future);
-        }
-
-        return future;
-    }
-
-    public void close() {
-        try {
-            closeAsync().get();
-        } catch (Exception ex) {
-            throw new RuntimeException(ex);
-        }
-    }
-
-    public ChannelPromise write(final RequestMessage requestMessage, final CompletableFuture<ResultSet> future) {
-        // once there is a completed write, then create a traverser for the result set and complete
-        // the promise so that the client knows that that it can start checking for results.
-        final Connection thisConnection = this;
-        final ChannelPromise promise = channel.newPromise()
-                .addListener(f -> {
-                    if (!f.isSuccess()) {
-                        logger.debug(String.format("Write on connection %s failed", thisConnection), f.cause());
-                        thisConnection.isDead = true;
-                        thisConnection.returnToPool();
-                        future.completeExceptionally(f.cause());
-                    } else {
-                        final LinkedBlockingQueue<ResponseMessage> responseQueue = new LinkedBlockingQueue<>();
-                        final CompletableFuture<Void> readCompleted = new CompletableFuture<>();
-                        readCompleted.thenAcceptAsync(v -> {
-                            thisConnection.returnToPool();
-                            if (isClosed() && pending.isEmpty())
-                                shutdown(closeFuture.get());
-                        });
-                        final ResponseQueue handler = new ResponseQueue(responseQueue, readCompleted);
-                        pending.put(requestMessage.getRequestId(), handler);
-                        final ResultSet resultSet = new ResultSet(handler, cluster.executor(), channel,
-                                () -> {
-                                    pending.remove(requestMessage.getRequestId());
-                                    return null;
-                                });
-                        future.complete(resultSet);
-                    }
-                });
-        channel.writeAndFlush(requestMessage, promise);
-
-        return promise;
-    }
-
-    public void returnToPool() {
-        try {
-            if (pool != null) pool.returnConnection(this);
-        } catch (ConnectionException ce) {
-            logger.debug("Returned {} connection to {} but an error occurred - {}", this, pool, ce.getMessage());
-        }
-    }
-
-    private void shutdown(final CompletableFuture<Void> future) {
-        channel.writeAndFlush(new CloseWebSocketFrame());
-        final ChannelPromise promise = channel.newPromise();
-        promise.addListener(f -> {
-            if (f.cause() != null)
-                future.completeExceptionally(f.cause());
-            else
-                future.complete(null);
-        });
-
-        channel.close(promise);
-    }
-
-    @Override
-    public String toString() {
-        return String.format("Connection{host=%s, isDead=%s, inFlight=%s, pending=%s}",
-                pool.host, isDead, inFlight, pending.size());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/ConnectionPool.java
deleted file mode 100644
index 11eecc6..0000000
--- a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/ConnectionPool.java
+++ /dev/null
@@ -1,442 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.driver;
-
-import com.tinkerpop.gremlin.driver.exception.ConnectionException;
-import com.tinkerpop.gremlin.util.TimeUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-class ConnectionPool {
-    private static final Logger logger = LoggerFactory.getLogger(ConnectionPool.class);
-
-    public static final int MIN_POOL_SIZE = 2;
-    public static final int MAX_POOL_SIZE = 8;
-    public static final int MIN_SIMULTANEOUS_REQUESTS_PER_CONNECTION = 8;
-    public static final int MAX_SIMULTANEOUS_REQUESTS_PER_CONNECTION = 16;
-
-    public final Host host;
-    private final Cluster cluster;
-    private final List<Connection> connections;
-    private final AtomicInteger open;
-    private final Set<Connection> bin = new CopyOnWriteArraySet<>();
-    private final int minPoolSize;
-    private final int maxPoolSize;
-    private final int minSimultaneousRequestsPerConnection;
-    private final int maxSimultaneousRequestsPerConnection;
-    private final int minInProcess;
-
-    private final AtomicInteger scheduledForCreation = new AtomicInteger();
-
-    private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
-
-    private volatile int waiter = 0;
-    private final Lock waitLock = new ReentrantLock(true);
-    private final Condition hasAvailableConnection = waitLock.newCondition();
-
-    public ConnectionPool(final Host host, final Cluster cluster) {
-        this.host = host;
-        this.cluster = cluster;
-
-        final Settings.ConnectionPoolSettings settings = settings();
-        this.minPoolSize = settings.minSize;
-        this.maxPoolSize = settings.maxSize;
-        this.minSimultaneousRequestsPerConnection = settings.minSimultaneousRequestsPerConnection;
-        this.maxSimultaneousRequestsPerConnection = settings.maxSimultaneousRequestsPerConnection;
-        this.minInProcess = settings.minInProcessPerConnection;
-
-        final List<Connection> l = new ArrayList<>(minPoolSize);
-
-        try {
-            for (int i = 0; i < minPoolSize; i++)
-                l.add(new Connection(host.getHostUri(), this, cluster, settings.maxInProcessPerConnection));
-        } catch (ConnectionException ce) {
-            // ok if we don't get it initialized here - when a request is attempted in a connection from the
-            // pool it will try to create new connections as needed.
-            logger.debug("Could not initialize connections in pool for {} - pool size at {}", host, l.size());
-            considerUnavailable();
-        }
-
-        this.connections = new CopyOnWriteArrayList<>(l);
-        this.open = new AtomicInteger(connections.size());
-
-        logger.info("Opening connection pool on {} with core size of {}", host, minPoolSize);
-    }
-
-    public Settings.ConnectionPoolSettings settings() {
-        return cluster.connectionPoolSettings();
-    }
-
-    public Connection borrowConnection(final long timeout, final TimeUnit unit) throws TimeoutException, ConnectionException {
-        logger.debug("Borrowing connection from pool on {} - timeout in {} {}", host, timeout, unit);
-
-        if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
-
-        final Connection leastUsedConn = selectLeastUsed();
-
-        if (connections.isEmpty()) {
-            logger.debug("Tried to borrow connection but the pool was empty for {} - scheduling pool creation and waiting for connection", host);
-            for (int i = 0; i < minPoolSize; i++) {
-                scheduledForCreation.incrementAndGet();
-                newConnection();
-            }
-
-            return waitForConnection(timeout, unit);
-        }
-
-        if (null == leastUsedConn) {
-            if (isClosed())
-                throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
-            logger.debug("Pool was initialized but a connection could not be selected earlier - waiting for connection on {}", host);
-            return waitForConnection(timeout, unit);
-        }
-
-        // if the number in flight on the least used connection exceeds the max allowed and the pool size is
-        // not at maximum then consider opening a connection
-        final int currentPoolSize = connections.size();
-        if (leastUsedConn.inFlight.get() >= maxSimultaneousRequestsPerConnection && currentPoolSize < maxPoolSize) {
-            logger.debug("Least used {} on {} exceeds maxSimultaneousRequestsPerConnection but pool size {} < maxPoolSize - consider new connection",
-                    leastUsedConn, host, currentPoolSize);
-            considerNewConnection();
-        }
-
-        while (true) {
-            final int inFlight = leastUsedConn.inFlight.get();
-            final int availableInProcess = leastUsedConn.availableInProcess();
-
-            // if the number in flight starts to exceed what's available for this connection, then we need
-            // to wait for a connection to become available.
-            if (inFlight >= leastUsedConn.availableInProcess()) {
-                logger.debug("Least used connection selected from pool for {} but inFlight [{}] >= availableInProcess [{}] - wait",
-                        host, inFlight, availableInProcess);
-                return waitForConnection(timeout, unit);
-            }
-
-            if (leastUsedConn.inFlight.compareAndSet(inFlight, inFlight + 1)) {
-                logger.debug("Return least used {} on {}", leastUsedConn, host);
-                return leastUsedConn;
-            }
-        }
-    }
-
-    public void returnConnection(final Connection connection) throws ConnectionException {
-        logger.debug("Attempting to return {} on {}", connection, host);
-        if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
-
-        int inFlight = connection.inFlight.decrementAndGet();
-        if (connection.isDead()) {
-            logger.debug("Marking {} as dead", this.host);
-            considerUnavailable();
-        } else {
-            if (bin.contains(connection) && inFlight == 0) {
-                logger.debug("{} is already in the bin and it has no inflight requests so it is safe to close", connection);
-                if (bin.remove(connection))
-                    connection.closeAsync();
-                return;
-            }
-
-            // destroy a connection that exceeds the minimum pool size - it does not have the right to live if it
-            // isn't busy. replace a connection that has a low available in process count which likely means that
-            // it's backing up with requests that might never have returned. if neither of these scenarios are met
-            // then let the world know the connection is available.
-            final int poolSize = connections.size();
-            final int availableInProcess = connection.availableInProcess();
-            if (poolSize > minPoolSize && inFlight <= minSimultaneousRequestsPerConnection) {
-                logger.debug("On {} pool size of {} > minPoolSize {} and inFlight of {} <= minSimultaneousRequestsPerConnection {} so destroy {}",
-                        host, poolSize, minPoolSize, inFlight, minSimultaneousRequestsPerConnection, connection);
-                destroyConnection(connection);
-            } else if (connection.availableInProcess() < minInProcess) {
-                logger.debug("On {} availableInProcess {} < minInProcess {} so replace {}", host, availableInProcess, minInProcess, connection);
-                replaceConnection(connection);
-            } else
-                announceAvailableConnection();
-        }
-    }
-
-    public boolean isClosed() {
-        return closeFuture.get() != null;
-    }
-
-    /**
-     * Permanently kills the pool.
-     */
-    public CompletableFuture<Void> closeAsync() {
-        logger.info("Signalled closing of connection pool on {} with core size of {}", host, minPoolSize);
-
-        CompletableFuture<Void> future = closeFuture.get();
-        if (future != null)
-            return future;
-
-        announceAllAvailableConnection();
-        future = CompletableFuture.allOf(killAvailableConnections());
-
-        return closeFuture.compareAndSet(null, future) ? future : closeFuture.get();
-    }
-
-    public int opened() {
-        return open.get();
-    }
-
-    private CompletableFuture[] killAvailableConnections() {
-        final List<CompletableFuture<Void>> futures = new ArrayList<>(connections.size());
-        for (Connection connection : connections) {
-            final CompletableFuture<Void> future = connection.closeAsync();
-            future.thenRunAsync(open::decrementAndGet);
-            futures.add(future);
-        }
-        return futures.toArray(new CompletableFuture[futures.size()]);
-    }
-
-    private void replaceConnection(final Connection connection) {
-        logger.debug("Replace {}", connection);
-
-        open.decrementAndGet();
-        considerNewConnection();
-        definitelyDestroyConnection(connection);
-    }
-
-    private void considerNewConnection() {
-        logger.debug("Considering new connection on {} where pool size is {}", host, connections.size());
-        while (true) {
-            int inCreation = scheduledForCreation.get();
-
-            logger.debug("There are {} connections scheduled for creation on {}", inCreation, host);
-
-            // don't create more than one at a time
-            if (inCreation >= 1)
-                return;
-            if (scheduledForCreation.compareAndSet(inCreation, inCreation + 1))
-                break;
-        }
-
-        newConnection();
-    }
-
-    private void newConnection() {
-        cluster.executor().submit(() -> {
-            addConnectionIfUnderMaximum();
-            scheduledForCreation.decrementAndGet();
-            return null;
-        });
-    }
-
-    private boolean addConnectionIfUnderMaximum() {
-        while (true) {
-            int opened = open.get();
-            if (opened >= maxPoolSize)
-                return false;
-
-            if (open.compareAndSet(opened, opened + 1))
-                break;
-        }
-
-        if (isClosed()) {
-            open.decrementAndGet();
-            return false;
-        }
-
-        try {
-            connections.add(new Connection(host.getHostUri(), this, cluster, settings().maxInProcessPerConnection));
-        } catch (ConnectionException ce) {
-            logger.debug("Connections were under max, but there was an error creating the connection.", ce);
-            considerUnavailable();
-            return false;
-        }
-
-        announceAvailableConnection();
-        return true;
-    }
-
-    private boolean destroyConnection(final Connection connection) {
-        while (true) {
-            int opened = open.get();
-            if (opened <= minPoolSize)
-                return false;
-
-            if (open.compareAndSet(opened, opened - 1))
-                break;
-        }
-
-        definitelyDestroyConnection(connection);
-        return true;
-    }
-
-    private void definitelyDestroyConnection(final Connection connection) {
-        bin.add(connection);
-        connections.remove(connection);
-
-        if (connection.inFlight.get() == 0 && bin.remove(connection))
-            connection.closeAsync();
-
-        logger.debug("{} destroyed", connection);
-    }
-
-    private Connection waitForConnection(final long timeout, final TimeUnit unit) throws TimeoutException, ConnectionException {
-        long start = System.nanoTime();
-        long remaining = timeout;
-        long to = timeout;
-        do {
-            try {
-                awaitAvailableConnection(remaining, unit);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                to = 0;
-            }
-
-            if (isClosed())
-                throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
-
-            final Connection leastUsed = selectLeastUsed();
-            if (leastUsed != null) {
-                while (true) {
-                    final int inFlight = leastUsed.inFlight.get();
-                    final int availableInProcess = leastUsed.availableInProcess();
-                    if (inFlight >= availableInProcess) {
-                        logger.debug("Least used {} on {} has requests inFlight [{}] >= availableInProcess [{}] - may timeout waiting for connection",
-                                leastUsed, host, inFlight, availableInProcess);
-                        break;
-                    }
-
-                    if (leastUsed.inFlight.compareAndSet(inFlight, inFlight + 1)) {
-                        logger.debug("Return least used {} on {} after waiting", leastUsed, host);
-                        return leastUsed;
-                    }
-                }
-            }
-
-            remaining = to - TimeUtil.timeSince(start, unit);
-            logger.debug("Continue to wait for connection on {} if {} > 0", remaining);
-        } while (remaining > 0);
-
-        logger.debug("Timed-out waiting for connection on {} - possibly unavailable", host);
-
-        // if we timeout borrowing a connection that might mean the host is dead (or the timeout was super short).
-        // either way supply a function to reconnect
-        this.considerUnavailable();
-
-        throw new TimeoutException();
-    }
-
-    private void considerUnavailable() {
-        // called when a connection is "dead" right now such that a "dead" connection means the host is basically
-        // "dead".  that's probably ok for now, but this decision should likely be more flexible.
-        host.makeUnavailable(this::tryReconnect);
-
-        // let the load-balancer know that the host is acting poorly
-        this.cluster.loadBalancingStrategy().onUnavailable(host);
-
-    }
-
-    private boolean tryReconnect(final Host h) {
-        logger.debug("Trying to re-establish connection on {}", host);
-
-        try {
-
-            connections.add(new Connection(host.getHostUri(), this, cluster, settings().maxInProcessPerConnection));
-            this.open.set(connections.size());
-
-            // host is reconnected and a connection is now available
-            this.cluster.loadBalancingStrategy().onAvailable(host);
-            return true;
-        } catch (Exception ex) {
-            return false;
-        }
-    }
-
-    private void announceAvailableConnection() {
-        logger.debug("Announce connection available on {}", host);
-
-        if (waiter == 0)
-            return;
-
-        waitLock.lock();
-        try {
-            hasAvailableConnection.signal();
-        } finally {
-            waitLock.unlock();
-        }
-    }
-
-    private Connection selectLeastUsed() {
-        int minInFlight = Integer.MAX_VALUE;
-        Connection leastBusy = null;
-        for (Connection connection : connections) {
-            int inFlight = connection.inFlight.get();
-            if (!connection.isDead() && inFlight < minInFlight) {
-                minInFlight = inFlight;
-                leastBusy = connection;
-            }
-        }
-        return leastBusy;
-    }
-
-    private void awaitAvailableConnection(long timeout, TimeUnit unit) throws InterruptedException {
-        logger.debug("Wait {} {} for an available connection on {} with {}", timeout, unit, host, Thread.currentThread());
-
-        waitLock.lock();
-        waiter++;
-        try {
-            hasAvailableConnection.await(timeout, unit);
-        } finally {
-            waiter--;
-            waitLock.unlock();
-        }
-    }
-
-    private void announceAllAvailableConnection() {
-        if (waiter == 0)
-            return;
-
-        waitLock.lock();
-        try {
-            hasAvailableConnection.signalAll();
-        } finally {
-            waitLock.unlock();
-        }
-    }
-
-    @Override
-    public String toString() {
-        final StringBuilder sb = new StringBuilder("ConnectionPool (");
-        sb.append(host);
-        sb.append(") - ");
-        connections.forEach(c -> {
-            sb.append(c);
-            sb.append(",");
-        });
-        return sb.toString().trim();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Handler.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Handler.java
deleted file mode 100644
index b54b04c..0000000
--- a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Handler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.driver;
-
-import com.tinkerpop.gremlin.driver.exception.ResponseException;
-import com.tinkerpop.gremlin.driver.message.ResponseMessage;
-import com.tinkerpop.gremlin.driver.message.ResponseStatusCode;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.util.ReferenceCountUtil;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Traverser for internal handler classes for constructing the Channel Pipeline.
- *
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-class Handler {
-
-    static class GremlinResponseHandler extends SimpleChannelInboundHandler<ResponseMessage> {
-        private final ConcurrentMap<UUID, ResponseQueue> pending;
-
-        public GremlinResponseHandler(final ConcurrentMap<UUID, ResponseQueue> pending) {
-            this.pending = pending;
-        }
-
-        @Override
-        protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) throws Exception {
-            try {
-                if (response.getStatus().getCode() == ResponseStatusCode.SUCCESS) {
-                    final Object data = response.getResult().getData();
-                    if (data instanceof List) {
-                        // unrolls the collection into individual response messages to be handled by the queue
-                        final List<Object> listToUnroll = (List<Object>) data;
-                        final ResponseQueue queue = pending.get(response.getRequestId());
-                        listToUnroll.forEach(item -> queue.add(
-                                ResponseMessage.build(response.getRequestId())
-                                        .result(item).create()));
-                    } else {
-                        // since this is not a list it can just be added to the queue
-                        pending.get(response.getRequestId()).add(response);
-                    }
-                } else if (response.getStatus().getCode() == ResponseStatusCode.SUCCESS_TERMINATOR)
-                    pending.remove(response.getRequestId()).markComplete();
-                else
-                    pending.get(response.getRequestId()).markError(new ResponseException(response.getStatus().getCode(), response.getStatus().getMessage()));
-            } finally {
-                ReferenceCountUtil.release(response);
-            }
-        }
-    }
-
-}