You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/09/29 22:42:05 UTC

[3/3] incubator-tinkerpop git commit: Adjustments to how hosts are marked unavailable.

Adjustments to how hosts are marked unavailable.

Connections are destroyed when a host is marked unavailable, and connection retry follows more standard patterns.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/4bcddd7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/4bcddd7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/4bcddd7a

Branch: refs/heads/tp30
Commit: 4bcddd7ad43799ccc22a12a240eed2626b17a5d8
Parents: 57c38a8
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Tue Sep 29 16:40:01 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Tue Sep 29 16:40:01 2015 -0400

----------------------------------------------------------------------
 .../tinkerpop/gremlin/driver/ConnectionPool.java   | 17 +++++++++++++++--
 .../apache/tinkerpop/gremlin/driver/Handler.java   | 10 +++++++---
 2 files changed, 22 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4bcddd7a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index 16a9796..96c151c 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -19,6 +19,7 @@
 package org.apache.tinkerpop.gremlin.driver;
 
 import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.util.TimeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -385,22 +386,34 @@ final class ConnectionPool {
         // "dead".  that's probably ok for now, but this decision should likely be more flexible.
         host.makeUnavailable(this::tryReconnect);
 
+        // if the host is unavailable then we should release the connections
+        connections.forEach(this::definitelyDestroyConnection);
+
         // let the load-balancer know that the host is acting poorly
         this.cluster.loadBalancingStrategy().onUnavailable(host);
 
     }
 
+    /**
+     * Attempt to reconnect to the {@link Host} that was previously marked as unavailable.  This method gets called
+     * as part of a schedule in {@link Host} to periodically try to create working connections.
+     */
     private boolean tryReconnect(final Host h) {
         logger.debug("Trying to re-establish connection on {}", host);
 
+        Connection connection = null;
         try {
-            connections.add(new Connection(host.getHostUri(), this, settings().maxInProcessPerConnection));
-            this.open.set(connections.size());
+            connection = borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
+            final RequestMessage ping = RequestMessage.build(Tokens.OPS_EVAL).add(Tokens.ARGS_GREMLIN, "''").create();
+            final CompletableFuture<ResultSet> f = new CompletableFuture<>();
+            connection.write(ping, f);
+            f.get().all().get();
 
             // host is reconnected and a connection is now available
             this.cluster.loadBalancingStrategy().onAvailable(host);
             return true;
         } catch (Exception ex) {
+            if (connection != null) definitelyDestroyConnection(connection);
             return false;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4bcddd7a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
index 8871f13..fce658f 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
@@ -27,6 +28,7 @@ import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.util.Attribute;
 import io.netty.util.AttributeKey;
 import io.netty.util.ReferenceCountUtil;
+import org.apache.tinkerpop.gremlin.driver.ser.SerializationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -207,13 +209,15 @@ final class Handler {
             // if this happens enough times (like the client is unable to deserialize a response) the pending
             // messages queue will not clear.  wonder if there is some way to cope with that.  of course, if
             // there are that many failures someone would take notice and hopefully stop the client.
-            logger.error("Could not process the response - correct the problem and restart the driver.", cause);
+            logger.error("Could not process the response", cause);
 
-            // the channel is getting closed because of something pretty bad so release all the completeable
+            // the channel took an error because of something pretty bad so release all the completeable
             // futures out there
             pending.entrySet().stream().forEach(kv -> kv.getValue().markError(cause));
 
-            ctx.close();
+            // serialization exceptions should not close the channel - that's worth a retry
+            if (!ExceptionUtils.getThrowableList(cause).stream().anyMatch(t -> t instanceof SerializationException))
+                ctx.close();
         }
     }