You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2021/12/28 18:22:29 UTC
[tinkerpop] 01/01: Revert "Changes to address re-try failures on initially dead host"
This is an automated email from the ASF dual-hosted git repository.
spmallette pushed a commit to branch ci-fix
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 477926fe3168abe776adcfb07b0ba2cdc8be904a
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Tue Dec 28 13:20:26 2021 -0500
Revert "Changes to address re-try failures on initially dead host"
This reverts commit 586ec3dda858119961b8f0c23a88131f47314515.
---
CHANGELOG.asciidoc | 1 -
.../console/jsr223/DriverRemoteAcceptorTest.java | 12 ++--
.../apache/tinkerpop/gremlin/driver/Client.java | 79 +++++-----------------
.../apache/tinkerpop/gremlin/driver/Cluster.java | 2 +
.../gremlin/server/GremlinDriverIntegrateTest.java | 62 -----------------
5 files changed, 27 insertions(+), 129 deletions(-)
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index ed87ae9..a00b118 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -27,7 +27,6 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
* Prevented XML External Entity (XXE) style attacks via `GraphMLReader` by disabling DTD and external entities by default.
* Improved error message for failed serialization for HTTP-based requests.
* Fixed a `NullPointerException` that could occur during a failed `Connection` initialization due to uninstantiated `AtomicInteger`.
-* Minor changes to the initialization of Java driver `Cluster` and `Client` such that hosts are marked as available only after successfully initializing connection pools.
* `NoHostAvailableException` now contains a cause for the failure.
* Bumped to Netty 4.1.72.
diff --git a/gremlin-console/src/test/java/org/apache/tinkerpop/gremlin/console/jsr223/DriverRemoteAcceptorTest.java b/gremlin-console/src/test/java/org/apache/tinkerpop/gremlin/console/jsr223/DriverRemoteAcceptorTest.java
index 81c996e..ab11d2e 100644
--- a/gremlin-console/src/test/java/org/apache/tinkerpop/gremlin/console/jsr223/DriverRemoteAcceptorTest.java
+++ b/gremlin-console/src/test/java/org/apache/tinkerpop/gremlin/console/jsr223/DriverRemoteAcceptorTest.java
@@ -30,7 +30,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
+import static org.hamcrest.CoreMatchers.startsWith;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
/**
* @author Stephen Mallette (http://stephen.genoprime.com)
@@ -124,9 +126,11 @@ public class DriverRemoteAcceptorTest {
acceptor.configure(Arrays.asList("timeout", "-1"));
}
- @Test(expected = RemoteException.class)
- public void shouldNotConnectWhenNoHostIsAvailable() throws Exception {
- // there is no gremlin server running for this test, so this remote should throw due to NoHostAvailable exception thrown by the driver
- acceptor.connect(Arrays.asList(Storage.toPath(TestHelper.generateTempFileFromResource(this.getClass(), "remote.yaml", ".tmp"))));
+ @Test
+ public void shouldConnect() throws Exception {
+ // there is no gremlin server running for this test, but gremlin-driver lazily connects so this should
+ // be ok to just validate that a connection is created
+ assertThat(acceptor.connect(Arrays.asList(Storage.toPath(TestHelper.generateTempFileFromResource(this.getClass(), "remote.yaml", ".tmp")))).toString(),
+ startsWith("Configured "));
}
}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index a816286..09de29d 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -32,7 +32,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -542,23 +541,16 @@ public abstract class Client {
.toArray(CompletableFuture[]::new))
.join();
} catch (CompletionException ex) {
- this.initializationFailure = ExceptionUtils.getRootCause(ex) != null ? ExceptionUtils.getRootCause(ex) : ex;
- logger.error("", this.initializationFailure);
+ Throwable cause;
+ Throwable result = ex;
+ if (null != (cause = ex.getCause())) {
+ result = cause;
+ }
+
+ logger.error("", result);
} finally {
hostExecutor.shutdown();
}
-
- // throw an error if there is no host available after initializing connection pool.
- if (cluster.availableHosts().isEmpty()) {
- throw new NoHostAvailableException();
- }
-
- // try to re-initiate any unavailable hosts in the background.
- final List<Host> unavailableHosts = cluster.allHosts()
- .stream().filter(host -> !host.isAvailable()).collect(Collectors.toList());
- if (!unavailableHosts.isEmpty()) {
- CompletableFuture.runAsync(() -> handleUnavailableHosts(unavailableHosts));
- }
}
/**
@@ -580,47 +572,16 @@ public abstract class Client {
private Consumer<Host> initializeConnectionSetupForHost = host -> {
try {
- // hosts that don't initialize connection pools will come up as a dead host.
+ // hosts that don't initialize connection pools will come up as a dead host
hostConnectionPools.put(host, new ConnectionPool(host, ClusteredClient.this));
- // hosts are not marked as available at cluster initialization, and are made available here instead.
- host.makeAvailable();
-
- // added a new host to the cluster so let the load-balancer know.
+ // added a new host to the cluster so let the load-balancer know
ClusteredClient.this.cluster.loadBalancingStrategy().onNew(host);
} catch (RuntimeException ex) {
- throw new RuntimeException(String.format("Could not initialize client for %s.", host), ex);
+ final String errMsg = "Could not initialize client for " + host;
+ throw new RuntimeException(errMsg, ex);
}
};
-
- private void handleUnavailableHosts(List<Host> unavailableHosts) {
- // start the re-initialization attempt for each of the unavailable hosts through Host.makeUnavailable().
- try {
- CompletableFuture.allOf(unavailableHosts.stream()
- .map(host -> CompletableFuture.runAsync(() -> host.makeUnavailable(this::tryReInitializeHost)))
- .toArray(CompletableFuture[]::new))
- .join();
- } catch (CompletionException ex) {
- logger.error("", (ex.getCause() == null) ? ex : ex.getCause());
- }
- }
-
- /**
- * Attempt to re-initialize the {@link Host} that was previously marked as unavailable. This method gets called
- * as part of a schedule in {@link Host} to periodically try to re-initialize.
- */
- public boolean tryReInitializeHost(final Host host) {
- logger.debug("Trying to re-initiate host connection pool on {}", host);
-
- try {
- initializeConnectionSetupForHost.accept(host);
- return true;
- } catch (Exception ex) {
- logger.debug("Failed re-initialization attempt on {}", host, ex);
- return false;
- }
- }
-
}
/**
@@ -792,21 +753,15 @@ public abstract class Client {
*/
@Override
protected void initializeImplementation() {
- // chooses a host at random from all hosts
- if (cluster.allHosts().isEmpty()) {
- throw new IllegalStateException("No available host in the cluster");
- }
-
- final List<Host> hosts = new ArrayList<>(cluster.allHosts());
+ // chooses an available host at random
+ final List<Host> hosts = cluster.allHosts()
+ .stream().filter(Host::isAvailable).collect(Collectors.toList());
+ if (hosts.isEmpty()) throw new IllegalStateException("No available host in the cluster");
Collections.shuffle(hosts);
- // if a host has been marked as available, use it instead
- Optional<Host> host = hosts.stream().filter(Host::isAvailable).findFirst();
- final Host selectedHost = host.orElse(hosts.get(0));
+ final Host host = hosts.get(0);
- // only mark host as available if we can initialize the connection pool successfully
try {
- connectionPool = new ConnectionPool(selectedHost, this, Optional.of(1), Optional.of(1));
- selectedHost.makeAvailable();
+ connectionPool = new ConnectionPool(host, this, Optional.of(1), Optional.of(1));
} catch (RuntimeException ex) {
logger.error("Could not initialize client for {}", host, ex);
this.initializationFailure = ex;
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index 1d73343..eaa5e99 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -1283,6 +1283,8 @@ public final class Cluster {
contactPoints.forEach(address -> {
final Host host = add(address);
+ if (host != null)
+ host.makeAvailable();
});
}
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 2eb6aa7..f4afa9d 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -1877,66 +1877,4 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
assertEquals(100, result1.get().one().getInt());
}
-
- /**
- * Client created on an initially dead host should fail initially, and recover after the dead host has restarted
- * @param testClusterClient - boolean flag set to test clustered client if true and sessioned client if false
- * @throws Exception
- */
- public void testShouldFailOnInitiallyDeadHost(final boolean testClusterClient) throws Exception {
- logger.info("Stopping server.");
- this.stopServer();
-
- final Cluster cluster = TestClientFactory.build().create();
- final Client client = testClusterClient? cluster.connect() : cluster.connect("sessionClient");
-
- try {
- // try to re-issue a request now that the server is down
- logger.info("Verifying driver cannot connect to server.");
- client.submit("g").all().get(500, TimeUnit.MILLISECONDS);
- fail("Should throw an exception.");
- } catch (RuntimeException re) {
- // Client would have no active connections to the host, hence it would encounter a timeout
- // trying to find an alive connection to the host.
- assertThat(re.getCause(), instanceOf(NoHostAvailableException.class));
-
- //
- // should recover when the server comes back
- //
-
- // restart server
- logger.info("Restarting server.");
- this.startServer();
-
- // try a bunch of times to reconnect. on slower systems this may simply take longer...looking at you travis
- for (int ix = 1; ix < 11; ix++) {
- // the retry interval is 1 second, wait a bit longer
- TimeUnit.MILLISECONDS.sleep(1250);
-
- try {
- logger.info(String.format("Connecting driver to server - attempt # %s. ", 1 + ix));
- final List<Result> results = client.submit("1+1").all().get(3000, TimeUnit.MILLISECONDS);
- assertEquals(1, results.size());
- assertEquals(2, results.get(0).getInt());
- logger.info("Connection successful.");
- break;
- } catch (Exception ex) {
- if (ix == 10)
- fail("Should have eventually succeeded");
- }
- }
- } finally {
- cluster.close();
- }
- }
-
- @Test
- public void shouldFailOnInitiallyDeadHostForClusterClient() throws Exception {
- testShouldFailOnInitiallyDeadHost(true);
- }
-
- @Test
- public void shouldFailOnInitiallyDeadHostForSessionClient() throws Exception {
- testShouldFailOnInitiallyDeadHost(false);
- }
}