You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/11/17 19:28:47 UTC
[3/7] incubator-brooklyn git commit: Adds ReachableSocketFinder
Adds ReachableSocketFinder
Used by JcloudsLocation to determine which IP of the node is reachable.
Previously it used the jclouds call:
managementContext.utils().sshForNode().apply(node)
but that fails with certain user/login configurations - e.g. when “node”
doesn’t have login-credentials, but the JcloudsLocation.obtain knows
about the credentials through some other configuration.
Code is based on the Jclouds ConcurrentOpenSocketFinder. However, it
is much simplified because we don’t abort-early if the VM’s status
goes to failed (we don’t concurrently poll the cloud-provider to find
out the state, as jclouds was doing).
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/4cc09b0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/4cc09b0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/4cc09b0a
Branch: refs/heads/master
Commit: 4cc09b0ae049d6f0c78ec691993f80fd8e82765c
Parents: af39886
Author: Aled Sage <al...@gmail.com>
Authored: Mon Nov 9 08:13:58 2015 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Nov 17 17:49:26 2015 +0000
----------------------------------------------------------------------
.../brooklyn/location/jclouds/JcloudsUtil.java | 50 ++++--
.../util/net/ReachableSocketFinder.java | 154 +++++++++++++++++
.../util/net/ReachableSocketFinderTest.java | 165 +++++++++++++++++++
3 files changed, 357 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4cc09b0a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsUtil.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsUtil.java b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsUtil.java
index 1101b34..685f81d 100644
--- a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsUtil.java
+++ b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsUtil.java
@@ -31,6 +31,7 @@ import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -41,10 +42,12 @@ import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.net.Protocol;
+import org.apache.brooklyn.util.net.ReachableSocketFinder;
import org.apache.brooklyn.util.ssh.BashCommands;
import org.apache.brooklyn.util.ssh.IptablesCommands;
import org.apache.brooklyn.util.ssh.IptablesCommands.Chain;
import org.apache.brooklyn.util.ssh.IptablesCommands.Policy;
+import org.apache.brooklyn.util.time.Duration;
import org.jclouds.Constants;
import org.jclouds.ContextBuilder;
import org.jclouds.aws.ec2.AWSEC2Api;
@@ -68,7 +71,6 @@ import org.jclouds.encryption.bouncycastle.config.BouncyCastleCryptoModule;
import org.jclouds.logging.slf4j.config.SLF4JLoggingModule;
import org.jclouds.scriptbuilder.domain.Statement;
import org.jclouds.scriptbuilder.domain.Statements;
-import org.jclouds.ssh.SshClient;
import org.jclouds.sshj.config.SshjSshClientModule;
import org.jclouds.util.Predicates2;
import org.slf4j.Logger;
@@ -80,11 +82,15 @@ import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
+import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
+import com.google.common.net.HostAndPort;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Module;
public class JcloudsUtil implements JcloudsLocationConfig {
@@ -319,8 +325,11 @@ public class JcloudsUtil implements JcloudsLocationConfig {
interpret("chmod 600 /root/.ssh/authorized_keys"));
}
+ /**
+ * @deprecated since 0.9.0; use {@link #getFirstReachableAddress(NodeMetadata, Duration)}
+ */
public static String getFirstReachableAddress(ComputeServiceContext context, NodeMetadata node) {
- // To pick the address, it relies on jclouds `sshForNode().apply(Node)` to check all IPs of node (private+public),
+ // Previously this called jclouds `sshForNode().apply(Node)` to check all IPs of node (private+public),
// to find one that is reachable. It does `openSocketFinder.findOpenSocketOnNode(node, node.getLoginPort(), ...)`.
// This keeps trying for time org.jclouds.compute.reference.ComputeServiceConstants.Timeouts.portOpen.
// TODO Want to configure this timeout here.
@@ -333,21 +342,38 @@ public class JcloudsUtil implements JcloudsLocationConfig {
// https://issues.apache.org/jira/browse/WHIRR-420
// jclouds.ssh.max-retries
// jclouds.ssh.retry-auth
-
- SshClient client;
+ //
+ // With `sshForNode`, we'd seen exceptions:
+ // java.lang.IllegalStateException: Optional.get() cannot be called on an absent value
+ // from org.jclouds.crypto.ASN1Codec.createASN1Sequence(ASN1Codec.java:86), if the ssh key has a passphrase, against AWS.
+ // And others reported:
+ // java.lang.IllegalArgumentException: DER length more than 4 bytes
+ // when using a key with a passphrase (perhaps from other clouds?); not sure if that's this callpath or a different one.
+
+ return getFirstReachableAddress(node, Duration.FIVE_MINUTES);
+ }
+
+ public static String getFirstReachableAddress(NodeMetadata node, Duration timeout) {
+ final int port = node.getLoginPort();
+ List<HostAndPort> sockets = FluentIterable
+ .from(Iterables.concat(node.getPublicAddresses(), node.getPrivateAddresses()))
+ .transform(new Function<String, HostAndPort>() {
+ @Override public HostAndPort apply(String input) {
+ return HostAndPort.fromParts(input, port);
+ }})
+ .toList();
+
+ ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
try {
- client = context.utils().sshForNode().apply(node);
+ ReachableSocketFinder finder = new ReachableSocketFinder(executor);
+ HostAndPort result = finder.findOpenSocketOnNode(sockets, timeout);
+ return result.getHostText();
} catch (Exception e) {
Exceptions.propagateIfFatal(e);
- /* i've seen: java.lang.IllegalStateException: Optional.get() cannot be called on an absent value
- * from org.jclouds.crypto.ASN1Codec.createASN1Sequence(ASN1Codec.java:86), if the ssh key has a passphrase, against AWS.
- *
- * others have reported: java.lang.IllegalArgumentException: DER length more than 4 bytes
- * when using a key with a passphrase (perhaps from other clouds?); not sure if that's this callpath or a different one.
- */
throw new IllegalStateException("Unable to connect SshClient to "+node+"; check that the node is accessible and that the SSH key exists and is correctly configured, including any passphrase defined", e);
+ } finally {
+ executor.shutdownNow();
}
- return client.getHostAddress();
}
// Suggest at least 15 minutes for timeout
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4cc09b0a/utils/common/src/main/java/org/apache/brooklyn/util/net/ReachableSocketFinder.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/net/ReachableSocketFinder.java b/utils/common/src/main/java/org/apache/brooklyn/util/net/ReachableSocketFinder.java
new file mode 100644
index 0000000..716ad72
--- /dev/null
+++ b/utils/common/src/main/java/org/apache/brooklyn/util/net/ReachableSocketFinder.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.brooklyn.util.net;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
+import org.apache.brooklyn.util.repeat.Repeater;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.net.HostAndPort;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+/**
+ * For finding an open/reachable ip:port for a node: the candidate sockets are tested concurrently
+ * on the supplied executor, round after round, until one is reachable or the time limit is hit.
+ */
+public class ReachableSocketFinder {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReachableSocketFinder.class);
+
+    private final Predicate<HostAndPort> socketTester;
+    private final ListeningExecutorService userExecutor;
+
+    /** Creates a finder that tests each socket with {@link Networking#isReachable(HostAndPort)}. */
+    public ReachableSocketFinder(ListeningExecutorService userExecutor) {
+        this(new Predicate<HostAndPort>() {
+            @Override public boolean apply(HostAndPort input) {
+                return Networking.isReachable(input);
+            }}, userExecutor);
+    }
+
+    /** Creates a finder with a custom tester; checks run on userExecutor, which is not shut down here. */
+    public ReachableSocketFinder(Predicate<HostAndPort> socketTester, ListeningExecutorService userExecutor) {
+        this.socketTester = checkNotNull(socketTester, "socketTester");
+        this.userExecutor = checkNotNull(userExecutor, "userExecutor");
+    }
+
+    /**
+     * Blocks until one of the given sockets is reachable, re-testing them all through a
+     * {@link Repeater} until the timeout expires.
+     *
+     * @param sockets The host-and-ports to test
+     * @param timeout Max time to try to connect to the ip:port
+     * @return The reachable ip:port
+     * @throws NoSuchElementException If no ports accessible within the given time
+     * @throws NullPointerException If the sockets or timeout is null
+     * @throws IllegalStateException If the sockets to test is empty
+     */
+    public HostAndPort findOpenSocketOnNode(final Collection<? extends HostAndPort> sockets, Duration timeout) {
+        checkNotNull(sockets, "sockets");
+        checkNotNull(timeout, "timeout"); // enforce the documented NullPointerException up front
+        checkState(sockets.size() > 0, "No hostAndPort sockets supplied");
+
+        LOG.debug("blocking on any reachable socket in {} for {}", sockets, timeout);
+
+        final AtomicReference<HostAndPort> result = new AtomicReference<HostAndPort>();
+        boolean passed = Repeater.create("socket-reachable")
+                .limitTimeTo(timeout)
+                .backoffTo(Duration.FIVE_SECONDS)
+                .until(new Callable<Boolean>() {
+                    @Override public Boolean call() {
+                        // Each round waits at most two seconds for the concurrent checks.
+                        Optional<HostAndPort> reachableSocket = tryReachable(sockets, Duration.seconds(2));
+                        if (reachableSocket.isPresent()) {
+                            result.compareAndSet(null, reachableSocket.get());
+                        }
+                        return reachableSocket.isPresent();
+                    }})
+                .run();
+
+        if (!passed) {
+            LOG.warn("No sockets in {} reachable after {}", sockets, timeout);
+            throw new NoSuchElementException("could not connect to any socket in " + sockets);
+        }
+        LOG.debug("<< socket {} opened", result);
+        assert result.get() != null;
+        return result.get();
+    }
+
+    /**
+     * Checks if any of the given HostAndPorts are reachable. It checks them all concurrently, and returns
+     * the first that is reachable (or Optional.absent); all the checks are cancelled before returning.
+     */
+    private Optional<HostAndPort> tryReachable(Collection<? extends HostAndPort> sockets, Duration timeout) {
+        final AtomicReference<HostAndPort> reachableSocket = new AtomicReference<HostAndPort>();
+        final CountDownLatch latch = new CountDownLatch(1); // released by the first successful check
+        List<ListenableFuture<?>> futures = Lists.newArrayList();
+        for (final HostAndPort socket : sockets) {
+            futures.add(userExecutor.submit(new Runnable() {
+                @Override public void run() {
+                    try {
+                        if (socketTester.apply(socket)) {
+                            reachableSocket.compareAndSet(null, socket); // only the first winner is kept
+                            latch.countDown();
+                        }
+                    } catch (RuntimeInterruptedException e) {
+                        throw e; // interruption is not a reachability error, so it is not logged as one
+                    } catch (RuntimeException e) {
+                        LOG.warn("Error checking reachability of ip:port "+socket, e);
+                    }
+                }}));
+        }
+
+        ListenableFuture<List<Object>> compoundFuture = Futures.successfulAsList(futures);
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        try {
+            while (reachableSocket.get() == null && !compoundFuture.isDone() && timeout.isLongerThan(stopwatch)) {
+                latch.await(50, TimeUnit.MILLISECONDS); // wake regularly to re-check the exit conditions
+            }
+            return Optional.fromNullable(reachableSocket.get());
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        } finally {
+            for (Future<?> future : futures) {
+                future.cancel(true);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4cc09b0a/utils/common/src/test/java/org/apache/brooklyn/util/net/ReachableSocketFinderTest.java
----------------------------------------------------------------------
diff --git a/utils/common/src/test/java/org/apache/brooklyn/util/net/ReachableSocketFinderTest.java b/utils/common/src/test/java/org/apache/brooklyn/util/net/ReachableSocketFinderTest.java
new file mode 100644
index 0000000..16228ef
--- /dev/null
+++ b/utils/common/src/test/java/org/apache/brooklyn/util/net/ReachableSocketFinderTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.brooklyn.util.net;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.javalang.JavaClassNames;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.net.HostAndPort;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public class ReachableSocketFinderTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReachableSocketFinderTest.class);
+
+    private HostAndPort socket1;
+    private HostAndPort socket2;
+    private Map<HostAndPort, Boolean> reachabilityResults;
+    private ListeningExecutorService executor;
+    private Predicate<HostAndPort> socketTester;
+    private ReachableSocketFinder finder;
+
+    /** Builds a finder whose reachability answers are scripted through {@code reachabilityResults}. */
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        reachabilityResults = Maps.newConcurrentMap();
+        socketTester = new Predicate<HostAndPort>() {
+            @Override public boolean apply(HostAndPort candidate) {
+                Boolean scripted = reachabilityResults.get(candidate);
+                return Boolean.TRUE.equals(scripted);
+            }};
+        executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+        finder = new ReachableSocketFinder(socketTester, executor);
+        socket1 = HostAndPort.fromParts("1.1.1.1", 1111);
+        socket2 = HostAndPort.fromParts("1.1.1.2", 1112);
+    }
+
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        if (executor != null) executor.shutdownNow();
+    }
+
+    @Test(expectedExceptions=IllegalStateException.class)
+    public void testWhenNoSocketsThrowsIllegalState() throws Exception {
+        finder.findOpenSocketOnNode(ImmutableList.<HostAndPort>of(), Duration.TEN_SECONDS);
+    }
+
+    @Test
+    public void testReturnsReachableSocket() throws Exception {
+        ImmutableList<HostAndPort> candidates = ImmutableList.of(socket1, socket2);
+        reachabilityResults.put(socket1, true);
+        reachabilityResults.put(socket2, false);
+        assertEquals(finder.findOpenSocketOnNode(candidates, Duration.TEN_SECONDS), socket1);
+        // Swap which candidate answers; the finder must follow.
+        reachabilityResults.put(socket1, false);
+        reachabilityResults.put(socket2, true);
+        assertEquals(finder.findOpenSocketOnNode(candidates, Duration.TEN_SECONDS), socket2);
+    }
+
+    @Test
+    public void testPollsUntilPortReachable() throws Exception {
+        reachabilityResults.put(socket1, false);
+        reachabilityResults.put(socket2, false);
+        Callable<HostAndPort> search = new Callable<HostAndPort>() {
+            @Override public HostAndPort call() throws Exception {
+                return finder.findOpenSocketOnNode(ImmutableList.<HostAndPort>of(socket1, socket2), Duration.TEN_SECONDS);
+            }};
+        final ListenableFuture<HostAndPort> pending = executor.submit(search);
+
+        // While nothing is reachable, the search must still be in progress.
+        Asserts.succeedsContinually(new Runnable() {
+            @Override public void run() {
+                assertFalse(pending.isDone());
+            }});
+        // Once a socket becomes reachable, the search completes with that socket.
+        reachabilityResults.put(socket1, true);
+        assertEquals(pending.get(30, TimeUnit.SECONDS), socket1);
+    }
+
+    // Integration group: can't rely (in Apache infra) on a port staying unused during the test!
+    @Test(groups="Integration")
+    public void testReturnsRealReachableSocket() throws Exception {
+        ReachableSocketFinder realFinder = new ReachableSocketFinder(executor);
+        ServerSocket listener = connectToPort();
+        try {
+            String host = listener.getInetAddress().getHostAddress();
+            HostAndPort openAddr = HostAndPort.fromParts(host, listener.getLocalPort());
+            HostAndPort closedAddr = HostAndPort.fromParts(host, findAvailablePort());
+            HostAndPort found = realFinder.findOpenSocketOnNode(ImmutableList.of(openAddr, closedAddr), Duration.ONE_MINUTE);
+            assertEquals(found, openAddr);
+        } finally {
+            listener.close();
+        }
+    }
+
+    // Integration group: can't rely (in Apache infra) on a port staying unused during the test!
+    // Also a slow test - takes 5 seconds.
+    @Test(groups="Integration")
+    public void testFailsIfRealSocketUnreachable() throws Exception {
+        ReachableSocketFinder realFinder = new ReachableSocketFinder(executor);
+        String localHost = Networking.getLocalHost().getHostAddress();
+        HostAndPort closedAddr = HostAndPort.fromParts(localHost, findAvailablePort());
+        HostAndPort unexpected;
+        try {
+            unexpected = realFinder.findOpenSocketOnNode(ImmutableList.of(closedAddr), Duration.FIVE_SECONDS);
+        } catch (NoSuchElementException expected) {
+            return; // success: nothing was listening, so the finder had to give up
+        }
+        fail("Expected failure, but got "+unexpected);
+    }
+
+    /** Opens a listening server socket on an automatically allocated port (port 0) and logs it. */
+    private ServerSocket connectToPort() throws Exception {
+        ServerSocket listener = new ServerSocket(0);
+        LOG.info("Acquired port "+listener+" for test "+JavaClassNames.niceClassAndMethod());
+        return listener;
+    }
+
+    /** Returns the first port in 58767-60000 that {@link Networking#isPortAvailable(int)} accepts. */
+    private int findAvailablePort() throws Exception {
+        final int startPort = 58767;
+        final int endPort = 60000;
+        for (int port = startPort; port <= endPort; port++) {
+            if (Networking.isPortAvailable(port)) {
+                return port;
+            }
+        }
+        throw new Exception("could not get a port in range "+startPort+"-"+endPort);
+    }
+}