You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/02/12 14:01:29 UTC
[03/77] [partial] incubator-tinkerpop git commit: moved com/tinkerpop
directories to org/apache/tinkerpop
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
new file mode 100644
index 0000000..e01ecc0
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.util.iterator;
+
+import com.tinkerpop.gremlin.process.FastNoSuchElementException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class IteratorUtils {
+
+ private IteratorUtils() {
+
+ }
+
+ public static final <S> Iterator<S> of(final S a) {
+ return new SingleIterator<>(a);
+ }
+
+ public static final <S> Iterator<S> of(final S a, S b) {
+ return new DoubleIterator<>(a, b);
+ }
+
+ ///////////////
+
+ public static Iterator convertToIterator(final Object o) {
+ final Iterator itty;
+ if (o instanceof Iterable)
+ itty = ((Iterable) o).iterator();
+ else if (o instanceof Iterator)
+ itty = (Iterator) o;
+ else if (o instanceof Object[])
+ itty = new ArrayIterator<>((Object[]) o);
+ else if (o instanceof Stream)
+ itty = ((Stream) o).iterator();
+ else if (o instanceof Map)
+ itty = ((Map) o).entrySet().iterator();
+ else if (o instanceof Throwable)
+ itty = IteratorUtils.of(((Throwable) o).getMessage());
+ else
+ itty = IteratorUtils.of(o);
+ return itty;
+ }
+
+ public static List convertToList(final Object o) {
+ final Iterator iterator = IteratorUtils.convertToIterator(o);
+ return list(iterator);
+ }
+
+ public static final <S extends Collection<T>, T> S fill(final Iterator<T> iterator, final S collection) {
+ while (iterator.hasNext()) {
+ collection.add(iterator.next());
+ }
+ return collection;
+ }
+
+ public static final long count(final Iterator iterator) {
+ long ix = 0;
+ for (; iterator.hasNext(); ++ix) iterator.next();
+ return ix;
+ }
+
+ public static <S> List<S> list(final Iterator<S> iterator) {
+ return fill(iterator, new ArrayList<>());
+ }
+
+ public static <K, S> Map<K, S> collectMap(final Iterator<S> iterator, final Function<S, K> key) {
+ return collectMap(iterator, key, Function.identity());
+ }
+
+ public static <K, S, V> Map<K, V> collectMap(final Iterator<S> iterator, final Function<S, K> key, final Function<S, V> value) {
+ final Map<K, V> map = new HashMap<>();
+ while (iterator.hasNext()) {
+ final S obj = iterator.next();
+ map.put(key.apply(obj), value.apply(obj));
+ }
+ return map;
+ }
+
+ public static <K, S> Map<K, List<S>> groupBy(final Iterator<S> iterator, final Function<S, K> groupBy) {
+ final Map<K, List<S>> map = new HashMap<>();
+ while (iterator.hasNext()) {
+ final S obj = iterator.next();
+ map.computeIfAbsent(groupBy.apply(obj), k -> new ArrayList<>()).add(obj);
+ }
+ return map;
+ }
+
+ ///////////////
+
+ public static final <S, E> Iterator<E> map(final Iterator<S> iterator, final Function<S, E> function) {
+ return new Iterator<E>() {
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public E next() {
+ return function.apply(iterator.next());
+ }
+ };
+ }
+
+ public static final <S, E> Iterable<E> map(final Iterable<S> iterable, final Function<S, E> function) {
+ return new Iterable<E>() {
+ @Override
+ public Iterator<E> iterator() {
+ return IteratorUtils.map(iterable.iterator(), function);
+ }
+ };
+ }
+
+ ///////////////
+
+ public static final <S> Iterator<S> filter(final Iterator<S> iterator, final Predicate<S> predicate) {
+
+
+ return new Iterator<S>() {
+ S nextResult = null;
+
+ @Override
+ public boolean hasNext() {
+ if (null != this.nextResult) {
+ return true;
+ } else {
+ advance();
+ return null != this.nextResult;
+ }
+ }
+
+ @Override
+ public S next() {
+ try {
+ if (null != this.nextResult) {
+ return this.nextResult;
+ } else {
+ advance();
+ if (null != this.nextResult)
+ return this.nextResult;
+ else
+ throw FastNoSuchElementException.instance();
+ }
+ } finally {
+ this.nextResult = null;
+ }
+ }
+
+ private final void advance() {
+ this.nextResult = null;
+ while (iterator.hasNext()) {
+ final S s = iterator.next();
+ if (predicate.test(s)) {
+ this.nextResult = s;
+ return;
+ }
+ }
+ }
+ };
+ }
+
+ public static final <S> Iterable<S> filter(final Iterable<S> iterable, final Predicate<S> predicate) {
+ return new Iterable<S>() {
+ @Override
+ public Iterator<S> iterator() {
+ return IteratorUtils.filter(iterable.iterator(), predicate);
+ }
+ };
+ }
+
+ ///////////////////
+
+ public static final <S> Iterator<S> concat(final Iterator<S>... iterators) {
+ final MultiIterator<S> iterator = new MultiIterator<>();
+ for (final Iterator<S> itty : iterators) {
+ iterator.addIterator(itty);
+ }
+ return iterator;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java
new file mode 100644
index 0000000..c5dca2d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.util.iterator;
+
+import com.tinkerpop.gremlin.process.FastNoSuchElementException;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MultiIterator<T> implements Iterator<T>, Serializable {
+
+ private final List<Iterator<T>> iterators = new ArrayList<>();
+ private int current = 0;
+
+ public void addIterator(final Iterator<T> iterator) {
+ this.iterators.add(iterator);
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (this.current >= this.iterators.size())
+ return false;
+
+ Iterator<T> currentIterator = iterators.get(this.current);
+
+ while (true) {
+ if (currentIterator.hasNext()) {
+ return true;
+ } else {
+ this.current++;
+ if (this.current >= iterators.size())
+ break;
+ currentIterator = iterators.get(this.current);
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public T next() {
+ Iterator<T> currentIterator = iterators.get(this.current);
+ while (true) {
+ if (currentIterator.hasNext()) {
+ return currentIterator.next();
+ } else {
+ this.current++;
+ if (this.current >= iterators.size())
+ break;
+ currentIterator = iterators.get(current);
+ }
+ }
+ throw FastNoSuchElementException.instance();
+ }
+
+ public void clear() {
+ this.iterators.clear();
+ this.current = 0;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/SingleIterator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/SingleIterator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/SingleIterator.java
new file mode 100644
index 0000000..178c6a9
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/SingleIterator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.util.iterator;
+
+import com.tinkerpop.gremlin.process.FastNoSuchElementException;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+final class SingleIterator<T> implements Iterator<T>,Serializable {
+
+ private final T t;
+ private boolean alive = true;
+
+ protected SingleIterator(final T t) {
+ this.t = t;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return this.alive;
+ }
+
+ @Override
+ public T next() {
+ if (!this.alive)
+ throw FastNoSuchElementException.instance();
+ else {
+ this.alive = false;
+ return t;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/tools/MultiMap.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/tools/MultiMap.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/tools/MultiMap.java
new file mode 100644
index 0000000..22d377d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/tools/MultiMap.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.util.tools;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A number of static methods to interact with a multi map, i.e. a map that maps keys to sets of values.
+ *
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+public class MultiMap {
+
+ public static <K, V> boolean putAll(final Map<K, Set<V>> map, final K key, final Collection<V> values) {
+ return getMapSet(map, key).addAll(values);
+ }
+
+ public static <K, V> boolean put(final Map<K, Set<V>> map, final K key, final V value) {
+ return getMapSet(map, key).add(value);
+ }
+
+ public static <K, V> boolean containsEntry(final Map<K, Set<V>> map, final K key, final V value) {
+ final Set<V> set = map.get(key);
+ return set != null && set.contains(value);
+ }
+
+ public static <K, V> Set<V> get(final Map<K, Set<V>> map, final K key) {
+ final Set<V> set = getMapSet(map, key);
+ return set == null ? Collections.emptySet() : set;
+ }
+
+ private static <K, V> Set<V> getMapSet(final Map<K, Set<V>> map, final K key) {
+ Set<V> set = map.get(key);
+ if (set == null) {
+ set = new HashSet<>();
+ map.put(key, set);
+ }
+ return set;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/tools/TimeUtils.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/tools/TimeUtils.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/tools/TimeUtils.java
new file mode 100644
index 0000000..030e615
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/tools/TimeUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.util.tools;
+
+import java.util.stream.IntStream;
+
+/**
+ * @author Daniel Kuppitz (http://gremlin.guru)
+ */
+public class TimeUtils {
+
+ public static double clock(final Runnable runnable) {
+ return clock(100, runnable);
+ }
+
+ public static double clock(final int loops, final Runnable runnable) {
+ runnable.run(); // warm-up
+ return IntStream.range(0, loops).mapToDouble(i -> {
+ long t = System.nanoTime();
+ runnable.run();
+ return (System.nanoTime() - t) * 0.000001;
+ }).sum() / loops;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Channelizer.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Channelizer.java
deleted file mode 100644
index 19e2cba..0000000
--- a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Channelizer.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.driver;
-
-import com.tinkerpop.gremlin.driver.handler.NioGremlinRequestEncoder;
-import com.tinkerpop.gremlin.driver.handler.NioGremlinResponseDecoder;
-import com.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
-import com.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder;
-import com.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.http.HttpClientCodec;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpObjectAggregator;
-import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
-import io.netty.handler.codec.http.websocketx.WebSocketVersion;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.util.SelfSignedCertificate;
-
-import java.util.Optional;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Client-side channel initializer interface.
- *
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-public interface Channelizer extends ChannelHandler {
-
- /**
- * Initializes the {@code Channelizer}. Called just after construction.
- */
- public void init(final Connection connection);
-
- /**
- * Called after the channel connects. The {@code Channelizer} may need to perform some functions, such as a
- * handshake.
- */
- public default void connected() {
- }
-
- /**
- * Base implementation of the client side {@link Channelizer}.
- */
- abstract class AbstractChannelizer extends ChannelInitializer<SocketChannel> implements Channelizer {
- protected Connection connection;
- protected Cluster cluster;
- private ConcurrentMap<UUID, ResponseQueue> pending;
-
- protected static final String PIPELINE_GREMLIN_HANDLER = "gremlin-handler";
-
- public boolean supportsSsl() {
- return cluster.connectionPoolSettings().enableSsl;
- }
-
- public abstract void configure(final ChannelPipeline pipeline);
-
- public void finalize(final ChannelPipeline pipeline) {
- // do nothing
- }
-
- @Override
- public void init(final Connection connection) {
- this.connection = connection;
- this.cluster = connection.getCluster();
- this.pending = connection.getPending();
- }
-
- @Override
- protected void initChannel(final SocketChannel socketChannel) throws Exception {
- final ChannelPipeline pipeline = socketChannel.pipeline();
- final Optional<SslContext> sslCtx;
- if (supportsSsl()) {
- try {
- final SelfSignedCertificate ssc = new SelfSignedCertificate();
- sslCtx = Optional.of(SslContext.newServerContext(ssc.certificate(), ssc.privateKey()));
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- } else {
- sslCtx = Optional.empty();
- }
-
- if (sslCtx.isPresent()) {
- pipeline.addLast(sslCtx.get().newHandler(socketChannel.alloc(), connection.getUri().getHost(), connection.getUri().getPort()));
- }
-
- configure(pipeline);
- pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new Handler.GremlinResponseHandler(pending));
- }
- }
-
- /**
- * WebSocket {@link Channelizer} implementation.
- */
- class WebSocketChannelizer extends AbstractChannelizer {
- private WebSocketClientHandler handler;
-
- private WebSocketGremlinRequestEncoder webSocketGremlinRequestEncoder;
- private WebSocketGremlinResponseDecoder webSocketGremlinResponseDecoder;
-
- @Override
- public void init(final Connection connection) {
- super.init(connection);
- webSocketGremlinRequestEncoder = new WebSocketGremlinRequestEncoder(true, cluster.getSerializer());
- webSocketGremlinResponseDecoder = new WebSocketGremlinResponseDecoder(cluster.getSerializer());
- }
-
- @Override
- public boolean supportsSsl() {
- final String scheme = connection.getUri().getScheme();
- return "wss".equalsIgnoreCase(scheme);
- }
-
- @Override
- public void configure(final ChannelPipeline pipeline) {
- final String scheme = connection.getUri().getScheme();
- if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme))
- throw new IllegalStateException("Unsupported scheme (only ws: or wss: supported): " + scheme);
-
- if (!supportsSsl() && "wss".equalsIgnoreCase(scheme))
- throw new IllegalStateException("To use wss scheme ensure that enableSsl is set to true in configuration");
-
- final int maxContentLength = cluster.connectionPoolSettings().maxContentLength;
- handler = new WebSocketClientHandler(
- WebSocketClientHandshakerFactory.newHandshaker(
- connection.getUri(), WebSocketVersion.V13, null, false, HttpHeaders.EMPTY_HEADERS, maxContentLength));
-
- pipeline.addLast("http-codec", new HttpClientCodec());
- pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength));
- pipeline.addLast("ws-handler", handler);
- pipeline.addLast("gremlin-encoder", webSocketGremlinRequestEncoder);
- pipeline.addLast("gremlin-decoder", webSocketGremlinResponseDecoder);
- }
-
- @Override
- public void connected() {
- try {
- handler.handshakeFuture().sync();
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- }
-
- /**
- * NIO {@link Channelizer} implementation.
- */
- class NioChannelizer extends AbstractChannelizer {
- private NioGremlinRequestEncoder nioGremlinRequestEncoder;
-
- @Override
- public void init(final Connection connection) {
- super.init(connection);
- nioGremlinRequestEncoder = new NioGremlinRequestEncoder(true, cluster.getSerializer());
- }
-
- @Override
- public void configure(ChannelPipeline pipeline) {
- pipeline.addLast("gremlin-decoder", new NioGremlinResponseDecoder(cluster.getSerializer()));
- pipeline.addLast("gremlin-encoder", nioGremlinRequestEncoder);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Client.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Client.java
deleted file mode 100644
index e6f14b6..0000000
--- a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Client.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.driver;
-
-import com.tinkerpop.gremlin.driver.exception.ConnectionException;
-import com.tinkerpop.gremlin.driver.message.RequestMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-
-/**
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-public abstract class Client {
-
- private static final Logger logger = LoggerFactory.getLogger(Client.class);
-
- protected final Cluster cluster;
- protected volatile boolean initialized;
-
- Client(final Cluster cluster) {
- this.cluster = cluster;
- }
-
- /**
- * Makes any final changes to the builder and returns the constructed {@link RequestMessage}. Implementers
- * may choose to override this message to append data to the request before sending. By default, this method
- * will simply call the {@link com.tinkerpop.gremlin.driver.message.RequestMessage.Builder#create()} and return
- * the {@link RequestMessage}.
- */
- public RequestMessage buildMessage(final RequestMessage.Builder builder) {
- return builder.create();
- }
-
- /**
- * Called in the {@link #init} method.
- */
- protected abstract void initializeImplementation();
-
- /**
- * Chooses a {@link Connection} to write the message to.
- */
- protected abstract Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException;
-
- /**
- * Asynchronous close of the {@code Client}.
- */
- public abstract CompletableFuture<Void> closeAsync();
-
- public synchronized Client init() {
- if (initialized)
- return this;
-
- logger.debug("Initializing client on cluster [{}]", cluster);
-
- cluster.init();
- initializeImplementation();
-
- initialized = true;
- return this;
- }
-
- public ResultSet submit(final String gremlin) {
- return submit(gremlin, null);
- }
-
- public ResultSet submit(final String gremlin, final Map<String, Object> parameters) {
- try {
- return submitAsync(gremlin, parameters).get();
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
-
- public CompletableFuture<ResultSet> submitAsync(final String gremlin) {
- return submitAsync(gremlin, null);
- }
-
- public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String, Object> parameters) {
- final RequestMessage.Builder request = RequestMessage.build(Tokens.OPS_EVAL)
- .add(Tokens.ARGS_GREMLIN, gremlin)
- .add(Tokens.ARGS_BATCH_SIZE, cluster.connectionPoolSettings().resultIterationBatchSize);
-
- Optional.ofNullable(parameters).ifPresent(params -> request.addArg(Tokens.ARGS_BINDINGS, parameters));
- return submitAsync(buildMessage(request));
- }
-
- public CompletableFuture<ResultSet> submitAsync(final RequestMessage msg) {
- if (!initialized)
- init();
-
- final CompletableFuture<ResultSet> future = new CompletableFuture<>();
- Connection connection = null;
- try {
- // the connection is returned to the pool once the response has been completed...see Connection.write()
- // the connection may be returned to the pool with the host being marked as "unavailable"
- connection = chooseConnection(msg);
- connection.write(msg, future);
- return future;
- } catch (TimeoutException toe) {
- // there was a timeout borrowing a connection
- throw new RuntimeException(toe);
- } catch (ConnectionException ce) {
- throw new RuntimeException(ce);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- } finally {
- logger.debug("Submitted {} to - {}", msg, null == connection ? "connection not initialized" : connection.toString());
- }
- }
-
- public void close() {
- closeAsync().join();
- }
-
- /**
- * A {@code Client} implementation that does not operate in a session. Requests are sent to multiple servers
- * given a {@link com.tinkerpop.gremlin.driver.LoadBalancingStrategy}. Transactions are automatically committed
- * (or rolled-back on error) after each request.
- */
- public static class ClusteredClient extends Client {
-
- private ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new ConcurrentHashMap<>();
-
- ClusteredClient(final Cluster cluster) {
- super(cluster);
- }
-
- @Override
- protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException {
- final Iterator<Host> possibleHosts = this.cluster.loadBalancingStrategy().select(msg);
- if (!possibleHosts.hasNext()) throw new TimeoutException("Timed out waiting for an available host.");
-
- final Host bestHost = this.cluster.loadBalancingStrategy().select(msg).next();
- final ConnectionPool pool = hostConnectionPools.get(bestHost);
- return pool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
- }
-
- @Override
- protected void initializeImplementation() {
- cluster.getClusterInfo().allHosts().forEach(host -> {
- try {
- // hosts that don't initialize connection pools will come up as a dead host
- hostConnectionPools.put(host, new ConnectionPool(host, cluster));
-
- // added a new host to the cluster so let the load-balancer know
- this.cluster.loadBalancingStrategy().onNew(host);
- } catch (Exception ex) {
- // catch connection errors and prevent them from failing the creation
- logger.warn("Could not initialize connection pool for {} - will try later", host);
- }
- });
- }
-
- @Override
- public CompletableFuture<Void> closeAsync() {
- final CompletableFuture[] poolCloseFutures = new CompletableFuture[hostConnectionPools.size()];
- hostConnectionPools.values().stream().map(ConnectionPool::closeAsync).collect(Collectors.toList()).toArray(poolCloseFutures);
- return CompletableFuture.allOf(poolCloseFutures);
- }
- }
-
- /**
- * A {@code Client} implementation that operates in the context of a session. Requests are sent to a single
- * server, where each request is bound to the same thread with the same set of bindings across requests.
- * Transaction are not automatically committed. It is up the client to issue commit/rollback commands.
- */
- public static class SessionedClient extends Client {
- private final String sessionId;
-
- private ConnectionPool connectionPool;
-
- SessionedClient(final Cluster cluster, final String sessionId) {
- super(cluster);
- this.sessionId = sessionId;
- }
-
- @Override
- public RequestMessage buildMessage(final RequestMessage.Builder builder) {
- builder.processor("session");
- builder.addArg(Tokens.ARGS_SESSION, sessionId);
- return builder.create();
- }
-
- @Override
- protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException {
- return connectionPool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
- }
-
- @Override
- protected void initializeImplementation() {
- // chooses an available host at random
- final List<Host> hosts = cluster.getClusterInfo().allHosts()
- .stream().filter(Host::isAvailable).collect(Collectors.toList());
- Collections.shuffle(hosts);
- final Host host = hosts.get(0);
- connectionPool = new ConnectionPool(host, cluster);
- }
-
- @Override
- public CompletableFuture<Void> closeAsync() {
- return connectionPool.closeAsync();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Cluster.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Cluster.java
deleted file mode 100644
index d1a1ee3..0000000
--- a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Cluster.java
+++ /dev/null
@@ -1,439 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.driver;
-
-import com.tinkerpop.gremlin.driver.ser.Serializers;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.stream.Collectors;
-
-/**
- * A bunch of Gremlin Server instances.
- *
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-public class Cluster {
-
- private Manager manager;
-
- private Cluster(final List<InetSocketAddress> contactPoints, final MessageSerializer serializer,
- final int nioPoolSize, final int workerPoolSize,
- final Settings.ConnectionPoolSettings connectionPoolSettings,
- final LoadBalancingStrategy loadBalancingStrategy) {
- this.manager = new Manager(contactPoints, serializer, nioPoolSize, workerPoolSize, connectionPoolSettings, loadBalancingStrategy);
- }
-
- public synchronized void init() {
- if (!manager.initialized)
- manager.init();
- }
-
- /**
- * Creates a {@link Client.ClusteredClient} instance to this {@code Cluster}, meaning requests will be routed to
- * one or more servers (depending on the cluster configuration), where each request represents the entirety of a
- * transaction. A commit or rollback (in case of error) is automatically executed at the end of the request.
- */
- public Client connect() {
- return new Client.ClusteredClient(this);
- }
-
- /**
- * Creates a {@link Client.SessionedClient} instance to this {@code Cluster}, meaning requests will be routed to
- * a single server (randomly selected from the cluster), where the same bindings will be available on each request.
- * Requests are bound to the same thread on the server and thus transactions may extend beyond the bounds of a
- * single request. The transactions are managed by the user and must be committed or rolledback manually.
- *
- * @param sessionId user supplied id for the session which should be unique (a UUID is ideal).
- */
- public Client connect(final String sessionId) {
- if (null == sessionId || sessionId.isEmpty())
- throw new IllegalArgumentException("sessionId cannot be null or empty");
- return new Client.SessionedClient(this, sessionId);
- }
-
- @Override
- public String toString() {
- return manager.toString();
- }
-
- public static Builder build() {
- return new Builder();
- }
-
- public static Builder build(final String address) {
- return new Builder(address);
- }
-
- public static Builder build(final File configurationFile) throws FileNotFoundException {
- final Settings settings = Settings.read(new FileInputStream(configurationFile));
- final List<String> addresses = settings.hosts;
- if (addresses.size() == 0)
- throw new IllegalStateException("At least one value must be specified to the hosts setting");
-
- final Builder builder = new Builder(settings.hosts.get(0))
- .port(settings.port)
- .nioPoolSize(settings.nioPoolSize)
- .workerPoolSize(settings.workerPoolSize)
- .maxInProcessPerConnection(settings.connectionPool.maxInProcessPerConnection)
- .maxSimultaneousRequestsPerConnection(settings.connectionPool.maxSimultaneousRequestsPerConnection)
- .minSimultaneousRequestsPerConnection(settings.connectionPool.minSimultaneousRequestsPerConnection)
- .maxConnectionPoolSize(settings.connectionPool.maxSize)
- .minConnectionPoolSize(settings.connectionPool.minSize);
-
- // the first address was added above in the constructor, so skip it if there are more
- if (addresses.size() > 1)
- addresses.stream().skip(1).forEach(builder::addContactPoint);
-
- try {
- builder.serializer(settings.serializer.create());
- } catch (Exception ex) {
- throw new IllegalStateException("Could not establish serializer - " + ex.getMessage());
- }
-
- return builder;
- }
-
- /**
- * Create a {@code Cluster} with all default settings which will connect to one contact point at {@code localhost}.
- */
- public static Cluster open() {
- return build("localhost").create();
- }
-
- /**
- * Create a {@code Cluster} using a YAML-based configuration file.
- */
- public static Cluster open(final String configurationFile) throws Exception {
- final File file = new File(configurationFile);
- if (!file.exists())
- throw new IllegalArgumentException(String.format("Configuration file at %s does not exist", configurationFile));
-
- return build(file).create();
- }
-
- public void close() {
- closeAsync().join();
- }
-
- public CompletableFuture<Void> closeAsync() {
- return manager.close();
- }
-
- public List<URI> availableHosts() {
- return Collections.unmodifiableList(getClusterInfo().allHosts().stream()
- .filter(Host::isAvailable)
- .map(Host::getHostUri)
- .collect(Collectors.toList()));
- }
-
- Factory getFactory() {
- return manager.factory;
- }
-
- MessageSerializer getSerializer() {
- return manager.serializer;
- }
-
- ScheduledExecutorService executor() {
- return manager.executor;
- }
-
- Settings.ConnectionPoolSettings connectionPoolSettings() {
- return manager.connectionPoolSettings;
- }
-
- LoadBalancingStrategy loadBalancingStrategy() {
- return manager.loadBalancingStrategy;
- }
-
- ClusterInfo getClusterInfo() {
- return manager.clusterInfo;
- }
-
- public static class Builder {
- private List<InetAddress> addresses = new ArrayList<>();
- private int port = 8182;
- private MessageSerializer serializer = Serializers.KRYO_V1D0.simpleInstance();
- private int nioPoolSize = Runtime.getRuntime().availableProcessors();
- private int workerPoolSize = Runtime.getRuntime().availableProcessors() * 2;
- private int minConnectionPoolSize = ConnectionPool.MIN_POOL_SIZE;
- private int maxConnectionPoolSize = ConnectionPool.MAX_POOL_SIZE;
- private int minSimultaneousRequestsPerConnection = ConnectionPool.MIN_SIMULTANEOUS_REQUESTS_PER_CONNECTION;
- private int maxSimultaneousRequestsPerConnection = ConnectionPool.MAX_SIMULTANEOUS_REQUESTS_PER_CONNECTION;
- private int maxInProcessPerConnection = Connection.MAX_IN_PROCESS;
- private int minInProcessPerConnection = Connection.MIN_IN_PROCESS;
- private int maxWaitForConnection = Connection.MAX_WAIT_FOR_CONNECTION;
- private int maxContentLength = Connection.MAX_CONTENT_LENGTH;
- private int reconnectInitialDelay = Connection.RECONNECT_INITIAL_DELAY;
- private int reconnectInterval = Connection.RECONNECT_INTERVAL;
- private int resultIterationBatchSize = Connection.RESULT_ITERATION_BATCH_SIZE;
- private boolean enableSsl = false;
- private LoadBalancingStrategy loadBalancingStrategy = new LoadBalancingStrategy.RoundRobin();
-
- private Builder() {
- // empty to prevent direct instantiation
- }
-
- private Builder(final String address) {
- addContactPoint(address);
- }
-
- /**
- * Size of the pool for handling request/response operations. Defaults to the number of available processors.
- */
- public Builder nioPoolSize(final int nioPoolSize) {
- if (nioPoolSize < 1) throw new IllegalArgumentException("The workerPoolSize must be greater than zero");
- this.nioPoolSize = nioPoolSize;
- return this;
- }
-
- /**
- * Size of the pool for handling background work. Defaults to the number of available processors multiplied
- * by 2
- */
- public Builder workerPoolSize(final int workerPoolSize) {
- if (workerPoolSize < 1) throw new IllegalArgumentException("The workerPoolSize must be greater than zero");
- this.workerPoolSize = workerPoolSize;
- return this;
- }
-
- public Builder serializer(final String mimeType) {
- serializer = Serializers.valueOf(mimeType).simpleInstance();
- return this;
- }
-
- public Builder serializer(final Serializers mimeType) {
- serializer = mimeType.simpleInstance();
- return this;
- }
-
- public Builder serializer(final MessageSerializer serializer) {
- this.serializer = serializer;
- return this;
- }
-
- public Builder enableSsl(final boolean enable) {
- this.enableSsl = enable;
- return this;
- }
-
- public Builder minInProcessPerConnection(final int minInProcessPerConnection) {
- this.minInProcessPerConnection = minInProcessPerConnection;
- return this;
- }
-
- public Builder maxInProcessPerConnection(final int maxInProcessPerConnection) {
- this.maxInProcessPerConnection = maxInProcessPerConnection;
- return this;
- }
-
- public Builder maxSimultaneousRequestsPerConnection(final int maxSimultaneousRequestsPerConnection) {
- this.maxSimultaneousRequestsPerConnection = maxSimultaneousRequestsPerConnection;
- return this;
- }
-
- public Builder minSimultaneousRequestsPerConnection(final int minSimultaneousRequestsPerConnection) {
- this.minSimultaneousRequestsPerConnection = minSimultaneousRequestsPerConnection;
- return this;
- }
-
- public Builder maxConnectionPoolSize(final int maxSize) {
- this.maxConnectionPoolSize = maxSize;
- return this;
- }
-
- public Builder minConnectionPoolSize(final int minSize) {
- this.minConnectionPoolSize = minSize;
- return this;
- }
-
- /**
- * Override the server setting that determines how many results are returned per batch.
- */
- public Builder resultIterationBatchSize(final int size) {
- this.resultIterationBatchSize = size;
- return this;
- }
-
- /**
- * The maximum amount of time to wait for a connection to be borrowed from the connection pool.
- */
- public Builder maxWaitForConnection(final int maxWait) {
- this.maxWaitForConnection = maxWait;
- return this;
- }
-
- /**
- * The maximum size in bytes of any request sent to the server. This number should not exceed the same
- * setting defined on the server.
- */
- public Builder maxContentLength(final int maxContentLength) {
- this.maxContentLength = maxContentLength;
- return this;
- }
-
- /**
- * Time in milliseconds to wait before attempting to reconnect to a dead host after it has been marked dead.
- */
- public Builder reconnectIntialDelay(final int initialDelay) {
- this.reconnectInitialDelay = initialDelay;
- return this;
- }
-
- /**
- * Time in milliseconds to wait between retries when attempting to reconnect to a dead host.
- */
- public Builder reconnectInterval(final int interval) {
- this.reconnectInterval = interval;
- return this;
- }
-
- public Builder loadBalancingStrategy(final LoadBalancingStrategy loadBalancingStrategy) {
- this.loadBalancingStrategy = loadBalancingStrategy;
- return this;
- }
-
- public Builder addContactPoint(final String address) {
- try {
- this.addresses.add(InetAddress.getByName(address));
- return this;
- } catch (UnknownHostException e) {
- throw new IllegalArgumentException(e.getMessage());
- }
- }
-
- public Builder addContactPoints(final String... addresses) {
- for (String address : addresses)
- addContactPoint(address);
- return this;
- }
-
- public Builder port(final int port) {
- this.port = port;
- return this;
- }
-
- private List<InetSocketAddress> getContactPoints() {
- return addresses.stream().map(addy -> new InetSocketAddress(addy, port)).collect(Collectors.toList());
- }
-
- public Cluster create() {
- if (addresses.size() == 0) addContactPoint("localhost");
- final Settings.ConnectionPoolSettings connectionPoolSettings = new Settings.ConnectionPoolSettings();
- connectionPoolSettings.maxInProcessPerConnection = this.maxInProcessPerConnection;
- connectionPoolSettings.minInProcessPerConnection = this.minInProcessPerConnection;
- connectionPoolSettings.maxSimultaneousRequestsPerConnection = this.maxSimultaneousRequestsPerConnection;
- connectionPoolSettings.minSimultaneousRequestsPerConnection = this.minSimultaneousRequestsPerConnection;
- connectionPoolSettings.maxSize = this.maxConnectionPoolSize;
- connectionPoolSettings.minSize = this.minConnectionPoolSize;
- connectionPoolSettings.maxWaitForConnection = this.maxWaitForConnection;
- connectionPoolSettings.maxContentLength = this.maxContentLength;
- connectionPoolSettings.reconnectInitialDelay = this.reconnectInitialDelay;
- connectionPoolSettings.reconnectInterval = this.reconnectInterval;
- connectionPoolSettings.resultIterationBatchSize = this.resultIterationBatchSize;
- connectionPoolSettings.enableSsl = this.enableSsl;
- return new Cluster(getContactPoints(), serializer, this.nioPoolSize, this.workerPoolSize,
- connectionPoolSettings, loadBalancingStrategy);
- }
- }
-
- static class Factory {
- private final EventLoopGroup group;
-
- public Factory(final int nioPoolSize) {
- final BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("gremlin-driver-loop-%d").build();
- group = new NioEventLoopGroup(nioPoolSize, threadFactory);
- }
-
- Bootstrap createBootstrap() {
- return new Bootstrap().group(group);
- }
-
- void shutdown() {
- group.shutdownGracefully().awaitUninterruptibly();
- }
- }
-
- class Manager {
- private ClusterInfo clusterInfo;
- private boolean initialized;
- private final List<InetSocketAddress> contactPoints;
- private final Factory factory;
- private final MessageSerializer serializer;
- private final Settings.ConnectionPoolSettings connectionPoolSettings;
- private final LoadBalancingStrategy loadBalancingStrategy;
-
- private final ScheduledExecutorService executor;
-
- private Manager(final List<InetSocketAddress> contactPoints, final MessageSerializer serializer,
- final int nioPoolSize, final int workerPoolSize, final Settings.ConnectionPoolSettings connectionPoolSettings,
- final LoadBalancingStrategy loadBalancingStrategy) {
- this.loadBalancingStrategy = loadBalancingStrategy;
- this.clusterInfo = new ClusterInfo(Cluster.this);
- this.contactPoints = contactPoints;
- this.connectionPoolSettings = connectionPoolSettings;
- this.factory = new Factory(nioPoolSize);
- this.serializer = serializer;
- this.executor = Executors.newScheduledThreadPool(workerPoolSize,
- new BasicThreadFactory.Builder().namingPattern("gremlin-driver-worker-%d").build());
- }
-
- synchronized void init() {
- if (initialized)
- return;
-
- initialized = true;
-
- contactPoints.forEach(address -> {
- final Host host = clusterInfo.add(address);
- if (host != null)
- host.makeAvailable();
- });
- }
-
- CompletableFuture<Void> close() {
- // this method is exposed publicly in both blocking and non-blocking forms.
- return CompletableFuture.supplyAsync(() -> {
- this.factory.shutdown();
- return null;
- });
- }
-
- @Override
- public String toString() {
- return String.join(", ", contactPoints.stream().map(InetSocketAddress::toString).collect(Collectors.<String>toList()));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/ClusterInfo.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/ClusterInfo.java b/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/ClusterInfo.java
deleted file mode 100644
index b9dbb33..0000000
--- a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/ClusterInfo.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.driver;
-
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-class ClusterInfo {
-
- private final Cluster cluster;
- private final ConcurrentMap<InetSocketAddress, Host> hosts = new ConcurrentHashMap<>();
-
- public ClusterInfo(final Cluster cluster) {
- this.cluster = cluster;
- }
-
- public Host add(final InetSocketAddress address) {
- final Host newHost = new Host(address, cluster);
- final Host previous = hosts.putIfAbsent(address, newHost);
- return previous == null ? newHost : null;
- }
-
- Collection<Host> allHosts() {
- return hosts.values();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Connection.java
deleted file mode 100644
index 41458ae..0000000
--- a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Connection.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.driver;
-
-import com.tinkerpop.gremlin.driver.exception.ConnectionException;
-import com.tinkerpop.gremlin.driver.message.RequestMessage;
-import com.tinkerpop.gremlin.driver.message.ResponseMessage;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * A single connection to a Gremlin Server instance.
- *
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-class Connection {
- private static final Logger logger = LoggerFactory.getLogger(Connection.class);
-
- private final Channel channel;
- private final URI uri;
- private final ConcurrentMap<UUID, ResponseQueue> pending = new ConcurrentHashMap<>();
- private final Cluster cluster;
- private final ConnectionPool pool;
-
- public static final int MAX_IN_PROCESS = 4;
- public static final int MIN_IN_PROCESS = 1;
- public static final int MAX_WAIT_FOR_CONNECTION = 3000;
- public static final int MAX_CONTENT_LENGTH = 65536;
- public static final int RECONNECT_INITIAL_DELAY = 1000;
- public static final int RECONNECT_INTERVAL = 1000;
- public static final int RESULT_ITERATION_BATCH_SIZE = 64;
-
- public final AtomicInteger inFlight = new AtomicInteger(0);
- private volatile boolean isDead = false;
- private final int maxInProcess;
-
- private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
-
- public Connection(final URI uri, final ConnectionPool pool, final Cluster cluster, final int maxInProcess) throws ConnectionException {
- this.uri = uri;
- this.cluster = cluster;
- this.pool = pool;
- this.maxInProcess = maxInProcess;
-
- final Bootstrap b = this.cluster.getFactory().createBootstrap();
-
- // todo: dynamically instantiate the channelizer from settings
- final Channelizer channelizer = new Channelizer.WebSocketChannelizer();
- channelizer.init(this);
- b.channel(NioSocketChannel.class).handler(channelizer);
-
- try {
- channel = b.connect(uri.getHost(), uri.getPort()).sync().channel();
- channelizer.connected();
-
- logger.info("Created new connection for {}", uri);
- } catch (Exception ie) {
- logger.debug("Error opening connection on {}", uri);
- throw new ConnectionException(uri, "Could not open connection", ie);
- }
- }
-
- /**
- * A connection can only have so many things in process happening on it at once, where "in process" refers to
- * the maximum number of in-process requests less the number of pending responses.
- */
- public int availableInProcess() {
- return maxInProcess - pending.size();
- }
-
- public boolean isDead() {
- return isDead;
- }
-
- public boolean isClosed() {
- return closeFuture.get() != null;
- }
-
- URI getUri() {
- return uri;
- }
-
- Cluster getCluster() {
- return cluster;
- }
-
- ConcurrentMap<UUID, ResponseQueue> getPending() {
- return pending;
- }
-
- public CompletableFuture<Void> closeAsync() {
- final CompletableFuture<Void> future = new CompletableFuture<>();
- if (!closeFuture.compareAndSet(null, future))
- return closeFuture.get();
-
- // make sure all requests in the queue are fully processed before killing. if they are then shutdown
- // can be immediate. if not this method will signal the readCompleted future defined in the write()
- // operation to check if it can close. in this way the connection no longer receives writes, but
- // can continue to read. If a request never comes back the future won't get fulfilled and the connection
- // will maintain a "pending" request, that won't quite ever go away. The build up of such a dead requests
- // on a connection in the connection pool will force the pool to replace the connection for a fresh one
- if (pending.isEmpty()) {
- if (null == channel)
- future.complete(null);
- else
- shutdown(future);
- }
-
- return future;
- }
-
- public void close() {
- try {
- closeAsync().get();
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
-
- public ChannelPromise write(final RequestMessage requestMessage, final CompletableFuture<ResultSet> future) {
- // once there is a completed write, then create a traverser for the result set and complete
- // the promise so that the client knows that that it can start checking for results.
- final Connection thisConnection = this;
- final ChannelPromise promise = channel.newPromise()
- .addListener(f -> {
- if (!f.isSuccess()) {
- logger.debug(String.format("Write on connection %s failed", thisConnection), f.cause());
- thisConnection.isDead = true;
- thisConnection.returnToPool();
- future.completeExceptionally(f.cause());
- } else {
- final LinkedBlockingQueue<ResponseMessage> responseQueue = new LinkedBlockingQueue<>();
- final CompletableFuture<Void> readCompleted = new CompletableFuture<>();
- readCompleted.thenAcceptAsync(v -> {
- thisConnection.returnToPool();
- if (isClosed() && pending.isEmpty())
- shutdown(closeFuture.get());
- });
- final ResponseQueue handler = new ResponseQueue(responseQueue, readCompleted);
- pending.put(requestMessage.getRequestId(), handler);
- final ResultSet resultSet = new ResultSet(handler, cluster.executor(), channel,
- () -> {
- pending.remove(requestMessage.getRequestId());
- return null;
- });
- future.complete(resultSet);
- }
- });
- channel.writeAndFlush(requestMessage, promise);
-
- return promise;
- }
-
- public void returnToPool() {
- try {
- if (pool != null) pool.returnConnection(this);
- } catch (ConnectionException ce) {
- logger.debug("Returned {} connection to {} but an error occurred - {}", this, pool, ce.getMessage());
- }
- }
-
- private void shutdown(final CompletableFuture<Void> future) {
- channel.writeAndFlush(new CloseWebSocketFrame());
- final ChannelPromise promise = channel.newPromise();
- promise.addListener(f -> {
- if (f.cause() != null)
- future.completeExceptionally(f.cause());
- else
- future.complete(null);
- });
-
- channel.close(promise);
- }
-
- @Override
- public String toString() {
- return String.format("Connection{host=%s, isDead=%s, inFlight=%s, pending=%s}",
- pool.host, isDead, inFlight, pending.size());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/ConnectionPool.java
deleted file mode 100644
index 11eecc6..0000000
--- a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/ConnectionPool.java
+++ /dev/null
@@ -1,442 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.driver;
-
-import com.tinkerpop.gremlin.driver.exception.ConnectionException;
-import com.tinkerpop.gremlin.util.TimeUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-class ConnectionPool {
- private static final Logger logger = LoggerFactory.getLogger(ConnectionPool.class);
-
- public static final int MIN_POOL_SIZE = 2;
- public static final int MAX_POOL_SIZE = 8;
- public static final int MIN_SIMULTANEOUS_REQUESTS_PER_CONNECTION = 8;
- public static final int MAX_SIMULTANEOUS_REQUESTS_PER_CONNECTION = 16;
-
- public final Host host;
- private final Cluster cluster;
- private final List<Connection> connections;
- private final AtomicInteger open;
- private final Set<Connection> bin = new CopyOnWriteArraySet<>();
- private final int minPoolSize;
- private final int maxPoolSize;
- private final int minSimultaneousRequestsPerConnection;
- private final int maxSimultaneousRequestsPerConnection;
- private final int minInProcess;
-
- private final AtomicInteger scheduledForCreation = new AtomicInteger();
-
- private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
-
- private volatile int waiter = 0;
- private final Lock waitLock = new ReentrantLock(true);
- private final Condition hasAvailableConnection = waitLock.newCondition();
-
- public ConnectionPool(final Host host, final Cluster cluster) {
- this.host = host;
- this.cluster = cluster;
-
- final Settings.ConnectionPoolSettings settings = settings();
- this.minPoolSize = settings.minSize;
- this.maxPoolSize = settings.maxSize;
- this.minSimultaneousRequestsPerConnection = settings.minSimultaneousRequestsPerConnection;
- this.maxSimultaneousRequestsPerConnection = settings.maxSimultaneousRequestsPerConnection;
- this.minInProcess = settings.minInProcessPerConnection;
-
- final List<Connection> l = new ArrayList<>(minPoolSize);
-
- try {
- for (int i = 0; i < minPoolSize; i++)
- l.add(new Connection(host.getHostUri(), this, cluster, settings.maxInProcessPerConnection));
- } catch (ConnectionException ce) {
- // ok if we don't get it initialized here - when a request is attempted in a connection from the
- // pool it will try to create new connections as needed.
- logger.debug("Could not initialize connections in pool for {} - pool size at {}", host, l.size());
- considerUnavailable();
- }
-
- this.connections = new CopyOnWriteArrayList<>(l);
- this.open = new AtomicInteger(connections.size());
-
- logger.info("Opening connection pool on {} with core size of {}", host, minPoolSize);
- }
-
- public Settings.ConnectionPoolSettings settings() {
- return cluster.connectionPoolSettings();
- }
-
- public Connection borrowConnection(final long timeout, final TimeUnit unit) throws TimeoutException, ConnectionException {
- logger.debug("Borrowing connection from pool on {} - timeout in {} {}", host, timeout, unit);
-
- if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
-
- final Connection leastUsedConn = selectLeastUsed();
-
- if (connections.isEmpty()) {
- logger.debug("Tried to borrow connection but the pool was empty for {} - scheduling pool creation and waiting for connection", host);
- for (int i = 0; i < minPoolSize; i++) {
- scheduledForCreation.incrementAndGet();
- newConnection();
- }
-
- return waitForConnection(timeout, unit);
- }
-
- if (null == leastUsedConn) {
- if (isClosed())
- throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
- logger.debug("Pool was initialized but a connection could not be selected earlier - waiting for connection on {}", host);
- return waitForConnection(timeout, unit);
- }
-
- // if the number in flight on the least used connection exceeds the max allowed and the pool size is
- // not at maximum then consider opening a connection
- final int currentPoolSize = connections.size();
- if (leastUsedConn.inFlight.get() >= maxSimultaneousRequestsPerConnection && currentPoolSize < maxPoolSize) {
- logger.debug("Least used {} on {} exceeds maxSimultaneousRequestsPerConnection but pool size {} < maxPoolSize - consider new connection",
- leastUsedConn, host, currentPoolSize);
- considerNewConnection();
- }
-
- while (true) {
- final int inFlight = leastUsedConn.inFlight.get();
- final int availableInProcess = leastUsedConn.availableInProcess();
-
- // if the number in flight starts to exceed what's available for this connection, then we need
- // to wait for a connection to become available.
- if (inFlight >= leastUsedConn.availableInProcess()) {
- logger.debug("Least used connection selected from pool for {} but inFlight [{}] >= availableInProcess [{}] - wait",
- host, inFlight, availableInProcess);
- return waitForConnection(timeout, unit);
- }
-
- if (leastUsedConn.inFlight.compareAndSet(inFlight, inFlight + 1)) {
- logger.debug("Return least used {} on {}", leastUsedConn, host);
- return leastUsedConn;
- }
- }
- }
-
- public void returnConnection(final Connection connection) throws ConnectionException {
- logger.debug("Attempting to return {} on {}", connection, host);
- if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
-
- int inFlight = connection.inFlight.decrementAndGet();
- if (connection.isDead()) {
- logger.debug("Marking {} as dead", this.host);
- considerUnavailable();
- } else {
- if (bin.contains(connection) && inFlight == 0) {
- logger.debug("{} is already in the bin and it has no inflight requests so it is safe to close", connection);
- if (bin.remove(connection))
- connection.closeAsync();
- return;
- }
-
- // destroy a connection that exceeds the minimum pool size - it does not have the right to live if it
- // isn't busy. replace a connection that has a low available in process count which likely means that
- // it's backing up with requests that might never have returned. if neither of these scenarios are met
- // then let the world know the connection is available.
- final int poolSize = connections.size();
- final int availableInProcess = connection.availableInProcess();
- if (poolSize > minPoolSize && inFlight <= minSimultaneousRequestsPerConnection) {
- logger.debug("On {} pool size of {} > minPoolSize {} and inFlight of {} <= minSimultaneousRequestsPerConnection {} so destroy {}",
- host, poolSize, minPoolSize, inFlight, minSimultaneousRequestsPerConnection, connection);
- destroyConnection(connection);
- } else if (connection.availableInProcess() < minInProcess) {
- logger.debug("On {} availableInProcess {} < minInProcess {} so replace {}", host, availableInProcess, minInProcess, connection);
- replaceConnection(connection);
- } else
- announceAvailableConnection();
- }
- }
-
- public boolean isClosed() {
- return closeFuture.get() != null;
- }
-
- /**
- * Permanently kills the pool.
- */
- public CompletableFuture<Void> closeAsync() {
- logger.info("Signalled closing of connection pool on {} with core size of {}", host, minPoolSize);
-
- CompletableFuture<Void> future = closeFuture.get();
- if (future != null)
- return future;
-
- announceAllAvailableConnection();
- future = CompletableFuture.allOf(killAvailableConnections());
-
- return closeFuture.compareAndSet(null, future) ? future : closeFuture.get();
- }
-
- public int opened() {
- return open.get();
- }
-
- private CompletableFuture[] killAvailableConnections() {
- final List<CompletableFuture<Void>> futures = new ArrayList<>(connections.size());
- for (Connection connection : connections) {
- final CompletableFuture<Void> future = connection.closeAsync();
- future.thenRunAsync(open::decrementAndGet);
- futures.add(future);
- }
- return futures.toArray(new CompletableFuture[futures.size()]);
- }
-
- private void replaceConnection(final Connection connection) {
- logger.debug("Replace {}", connection);
-
- open.decrementAndGet();
- considerNewConnection();
- definitelyDestroyConnection(connection);
- }
-
- private void considerNewConnection() {
- logger.debug("Considering new connection on {} where pool size is {}", host, connections.size());
- while (true) {
- int inCreation = scheduledForCreation.get();
-
- logger.debug("There are {} connections scheduled for creation on {}", inCreation, host);
-
- // don't create more than one at a time
- if (inCreation >= 1)
- return;
- if (scheduledForCreation.compareAndSet(inCreation, inCreation + 1))
- break;
- }
-
- newConnection();
- }
-
- private void newConnection() {
- cluster.executor().submit(() -> {
- addConnectionIfUnderMaximum();
- scheduledForCreation.decrementAndGet();
- return null;
- });
- }
-
- private boolean addConnectionIfUnderMaximum() {
- while (true) {
- int opened = open.get();
- if (opened >= maxPoolSize)
- return false;
-
- if (open.compareAndSet(opened, opened + 1))
- break;
- }
-
- if (isClosed()) {
- open.decrementAndGet();
- return false;
- }
-
- try {
- connections.add(new Connection(host.getHostUri(), this, cluster, settings().maxInProcessPerConnection));
- } catch (ConnectionException ce) {
- logger.debug("Connections were under max, but there was an error creating the connection.", ce);
- considerUnavailable();
- return false;
- }
-
- announceAvailableConnection();
- return true;
- }
-
- private boolean destroyConnection(final Connection connection) {
- while (true) {
- int opened = open.get();
- if (opened <= minPoolSize)
- return false;
-
- if (open.compareAndSet(opened, opened - 1))
- break;
- }
-
- definitelyDestroyConnection(connection);
- return true;
- }
-
- private void definitelyDestroyConnection(final Connection connection) {
- bin.add(connection);
- connections.remove(connection);
-
- if (connection.inFlight.get() == 0 && bin.remove(connection))
- connection.closeAsync();
-
- logger.debug("{} destroyed", connection);
- }
-
- private Connection waitForConnection(final long timeout, final TimeUnit unit) throws TimeoutException, ConnectionException {
- long start = System.nanoTime();
- long remaining = timeout;
- long to = timeout;
- do {
- try {
- awaitAvailableConnection(remaining, unit);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- to = 0;
- }
-
- if (isClosed())
- throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
-
- final Connection leastUsed = selectLeastUsed();
- if (leastUsed != null) {
- while (true) {
- final int inFlight = leastUsed.inFlight.get();
- final int availableInProcess = leastUsed.availableInProcess();
- if (inFlight >= availableInProcess) {
- logger.debug("Least used {} on {} has requests inFlight [{}] >= availableInProcess [{}] - may timeout waiting for connection",
- leastUsed, host, inFlight, availableInProcess);
- break;
- }
-
- if (leastUsed.inFlight.compareAndSet(inFlight, inFlight + 1)) {
- logger.debug("Return least used {} on {} after waiting", leastUsed, host);
- return leastUsed;
- }
- }
- }
-
- remaining = to - TimeUtil.timeSince(start, unit);
- logger.debug("Continue to wait for connection on {} if {} > 0", remaining);
- } while (remaining > 0);
-
- logger.debug("Timed-out waiting for connection on {} - possibly unavailable", host);
-
- // if we timeout borrowing a connection that might mean the host is dead (or the timeout was super short).
- // either way supply a function to reconnect
- this.considerUnavailable();
-
- throw new TimeoutException();
- }
-
- private void considerUnavailable() {
- // called when a connection is "dead" right now such that a "dead" connection means the host is basically
- // "dead". that's probably ok for now, but this decision should likely be more flexible.
- host.makeUnavailable(this::tryReconnect);
-
- // let the load-balancer know that the host is acting poorly
- this.cluster.loadBalancingStrategy().onUnavailable(host);
-
- }
-
- private boolean tryReconnect(final Host h) {
- logger.debug("Trying to re-establish connection on {}", host);
-
- try {
-
- connections.add(new Connection(host.getHostUri(), this, cluster, settings().maxInProcessPerConnection));
- this.open.set(connections.size());
-
- // host is reconnected and a connection is now available
- this.cluster.loadBalancingStrategy().onAvailable(host);
- return true;
- } catch (Exception ex) {
- return false;
- }
- }
-
- private void announceAvailableConnection() {
- logger.debug("Announce connection available on {}", host);
-
- if (waiter == 0)
- return;
-
- waitLock.lock();
- try {
- hasAvailableConnection.signal();
- } finally {
- waitLock.unlock();
- }
- }
-
- private Connection selectLeastUsed() {
- int minInFlight = Integer.MAX_VALUE;
- Connection leastBusy = null;
- for (Connection connection : connections) {
- int inFlight = connection.inFlight.get();
- if (!connection.isDead() && inFlight < minInFlight) {
- minInFlight = inFlight;
- leastBusy = connection;
- }
- }
- return leastBusy;
- }
-
- private void awaitAvailableConnection(long timeout, TimeUnit unit) throws InterruptedException {
- logger.debug("Wait {} {} for an available connection on {} with {}", timeout, unit, host, Thread.currentThread());
-
- waitLock.lock();
- waiter++;
- try {
- hasAvailableConnection.await(timeout, unit);
- } finally {
- waiter--;
- waitLock.unlock();
- }
- }
-
- private void announceAllAvailableConnection() {
- if (waiter == 0)
- return;
-
- waitLock.lock();
- try {
- hasAvailableConnection.signalAll();
- } finally {
- waitLock.unlock();
- }
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder("ConnectionPool (");
- sb.append(host);
- sb.append(") - ");
- connections.forEach(c -> {
- sb.append(c);
- sb.append(",");
- });
- return sb.toString().trim();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Handler.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Handler.java
deleted file mode 100644
index b54b04c..0000000
--- a/gremlin-driver/src/main/java/com/tinkerpop/gremlin/driver/Handler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.tinkerpop.gremlin.driver;
-
-import com.tinkerpop.gremlin.driver.exception.ResponseException;
-import com.tinkerpop.gremlin.driver.message.ResponseMessage;
-import com.tinkerpop.gremlin.driver.message.ResponseStatusCode;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.util.ReferenceCountUtil;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Traverser for internal handler classes for constructing the Channel Pipeline.
- *
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-class Handler {
-
- static class GremlinResponseHandler extends SimpleChannelInboundHandler<ResponseMessage> {
- private final ConcurrentMap<UUID, ResponseQueue> pending;
-
- public GremlinResponseHandler(final ConcurrentMap<UUID, ResponseQueue> pending) {
- this.pending = pending;
- }
-
- @Override
- protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) throws Exception {
- try {
- if (response.getStatus().getCode() == ResponseStatusCode.SUCCESS) {
- final Object data = response.getResult().getData();
- if (data instanceof List) {
- // unrolls the collection into individual response messages to be handled by the queue
- final List<Object> listToUnroll = (List<Object>) data;
- final ResponseQueue queue = pending.get(response.getRequestId());
- listToUnroll.forEach(item -> queue.add(
- ResponseMessage.build(response.getRequestId())
- .result(item).create()));
- } else {
- // since this is not a list it can just be added to the queue
- pending.get(response.getRequestId()).add(response);
- }
- } else if (response.getStatus().getCode() == ResponseStatusCode.SUCCESS_TERMINATOR)
- pending.remove(response.getRequestId()).markComplete();
- else
- pending.get(response.getRequestId()).markError(new ResponseException(response.getStatus().getCode(), response.getStatus().getMessage()));
- } finally {
- ReferenceCountUtil.release(response);
- }
- }
- }
-
-}