You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/11/17 19:28:47 UTC

[3/7] incubator-brooklyn git commit: Adds ReachableSocketFinder

Adds ReachableSocketFinder

Used by JcloudsLocation to determine which IP of the node is reachable.
Previously it used the jclouds call:
    managementContext.utils().sshForNode().apply(node)
but that fails with certain user/login configurations - e.g. when “node”
doesn’t have login-credentials, but the JcloudsLocation.obtain knows
about the credentials through some other configuration.

Code is based on the Jclouds ConcurrentOpenSocketFinder. However, it
is much simplified because we don’t abort-early if the VM’s status
goes to failed (we don’t concurrently poll the cloud-provider to find
out the state, as jclouds was doing).


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/4cc09b0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/4cc09b0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/4cc09b0a

Branch: refs/heads/master
Commit: 4cc09b0ae049d6f0c78ec691993f80fd8e82765c
Parents: af39886
Author: Aled Sage <al...@gmail.com>
Authored: Mon Nov 9 08:13:58 2015 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Nov 17 17:49:26 2015 +0000

----------------------------------------------------------------------
 .../brooklyn/location/jclouds/JcloudsUtil.java  |  50 ++++--
 .../util/net/ReachableSocketFinder.java         | 154 +++++++++++++++++
 .../util/net/ReachableSocketFinderTest.java     | 165 +++++++++++++++++++
 3 files changed, 357 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4cc09b0a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsUtil.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsUtil.java b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsUtil.java
index 1101b34..685f81d 100644
--- a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsUtil.java
+++ b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsUtil.java
@@ -31,6 +31,7 @@ import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -41,10 +42,12 @@ import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.net.Protocol;
+import org.apache.brooklyn.util.net.ReachableSocketFinder;
 import org.apache.brooklyn.util.ssh.BashCommands;
 import org.apache.brooklyn.util.ssh.IptablesCommands;
 import org.apache.brooklyn.util.ssh.IptablesCommands.Chain;
 import org.apache.brooklyn.util.ssh.IptablesCommands.Policy;
+import org.apache.brooklyn.util.time.Duration;
 import org.jclouds.Constants;
 import org.jclouds.ContextBuilder;
 import org.jclouds.aws.ec2.AWSEC2Api;
@@ -68,7 +71,6 @@ import org.jclouds.encryption.bouncycastle.config.BouncyCastleCryptoModule;
 import org.jclouds.logging.slf4j.config.SLF4JLoggingModule;
 import org.jclouds.scriptbuilder.domain.Statement;
 import org.jclouds.scriptbuilder.domain.Statements;
-import org.jclouds.ssh.SshClient;
 import org.jclouds.sshj.config.SshjSshClientModule;
 import org.jclouds.util.Predicates2;
 import org.slf4j.Logger;
@@ -80,11 +82,15 @@ import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
+import com.google.common.net.HostAndPort;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Module;
 
 public class JcloudsUtil implements JcloudsLocationConfig {
@@ -319,8 +325,11 @@ public class JcloudsUtil implements JcloudsLocationConfig {
                 interpret("chmod 600 /root/.ssh/authorized_keys"));
     }
 
+    /**
+     * @deprecated since 0.9.0; use {@link #getFirstReachableAddress(NodeMetadata, Duration)}
+     */
     public static String getFirstReachableAddress(ComputeServiceContext context, NodeMetadata node) {
-        // To pick the address, it relies on jclouds `sshForNode().apply(Node)` to check all IPs of node (private+public),
+        // Previously this called jclouds `sshForNode().apply(Node)` to check all IPs of node (private+public),
         // to find one that is reachable. It does `openSocketFinder.findOpenSocketOnNode(node, node.getLoginPort(), ...)`.
         // This keeps trying for time org.jclouds.compute.reference.ComputeServiceConstants.Timeouts.portOpen.
         // TODO Want to configure this timeout here.
@@ -333,21 +342,38 @@ public class JcloudsUtil implements JcloudsLocationConfig {
         //     https://issues.apache.org/jira/browse/WHIRR-420
         //     jclouds.ssh.max-retries
         //     jclouds.ssh.retry-auth
-
-        SshClient client;
+        //
+        // With `sshForNode`, we'd seen exceptions:
+        //     java.lang.IllegalStateException: Optional.get() cannot be called on an absent value
+        //     from org.jclouds.crypto.ASN1Codec.createASN1Sequence(ASN1Codec.java:86), if the ssh key has a passphrase, against AWS.
+        // And others reported:
+        //     java.lang.IllegalArgumentException: DER length more than 4 bytes
+        //     when using a key with a passphrase (perhaps from other clouds?); not sure if that's this callpath or a different one.
+
+        return getFirstReachableAddress(node, Duration.FIVE_MINUTES);
+    }
+    
+    public static String getFirstReachableAddress(NodeMetadata node, Duration timeout) {
+        final int port = node.getLoginPort();
+        List<HostAndPort> sockets = FluentIterable
+                .from(Iterables.concat(node.getPublicAddresses(), node.getPrivateAddresses()))
+                .transform(new Function<String, HostAndPort>() {
+                        @Override public HostAndPort apply(String input) {
+                            return HostAndPort.fromParts(input, port);
+                        }})
+                .toList();
+        
+        ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
         try {
-            client = context.utils().sshForNode().apply(node);
+            ReachableSocketFinder finder = new ReachableSocketFinder(executor);
+            HostAndPort result = finder.findOpenSocketOnNode(sockets, timeout);
+            return result.getHostText();
         } catch (Exception e) {
             Exceptions.propagateIfFatal(e);
-            /* i've seen: java.lang.IllegalStateException: Optional.get() cannot be called on an absent value
-             * from org.jclouds.crypto.ASN1Codec.createASN1Sequence(ASN1Codec.java:86), if the ssh key has a passphrase, against AWS.
-             *
-             * others have reported: java.lang.IllegalArgumentException: DER length more than 4 bytes
-             * when using a key with a passphrase (perhaps from other clouds?); not sure if that's this callpath or a different one.
-             */
             throw new IllegalStateException("Unable to connect SshClient to "+node+"; check that the node is accessible and that the SSH key exists and is correctly configured, including any passphrase defined", e);
+        } finally {
+            executor.shutdownNow();
         }
-        return client.getHostAddress();
     }
 
     // Suggest at least 15 minutes for timeout

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4cc09b0a/utils/common/src/main/java/org/apache/brooklyn/util/net/ReachableSocketFinder.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/net/ReachableSocketFinder.java b/utils/common/src/main/java/org/apache/brooklyn/util/net/ReachableSocketFinder.java
new file mode 100644
index 0000000..716ad72
--- /dev/null
+++ b/utils/common/src/main/java/org/apache/brooklyn/util/net/ReachableSocketFinder.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.brooklyn.util.net;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
+import org.apache.brooklyn.util.repeat.Repeater;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.net.HostAndPort;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+/**
+ * For finding an open/reachable ip:port for a node.
+ */
+public class ReachableSocketFinder {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReachableSocketFinder.class);
+
+    private final Predicate<HostAndPort> socketTester;
+    private final ListeningExecutorService userExecutor;
+
+    public ReachableSocketFinder(ListeningExecutorService userExecutor) {
+        this(
+                new Predicate<HostAndPort>() {
+                    @Override public boolean apply(HostAndPort input) {
+                        return Networking.isReachable(input);
+                    }}, 
+                userExecutor);
+    }
+
+    public ReachableSocketFinder(Predicate<HostAndPort> socketTester, ListeningExecutorService userExecutor) {
+        this.socketTester = checkNotNull(socketTester, "socketTester");
+        this.userExecutor = checkNotNull(userExecutor, "userExecutor");
+    }
+
+    /**
+     * Repeatedly tests the given sockets until one is reachable or the timeout expires.
+     * @param sockets The host-and-ports to test
+     * @param timeout Max time to try to connect to the ip:port
+     * 
+     * @return The reachable ip:port
+     * @throws NoSuchElementException If no ports accessible within the given time
+     * @throws NullPointerException  If the sockets or duration is null
+     * @throws IllegalStateException  If the sockets to test is empty
+     */
+    public HostAndPort findOpenSocketOnNode(final Collection<? extends HostAndPort> sockets, Duration timeout) {
+        checkNotNull(sockets, "sockets");
+        checkState(sockets.size() > 0, "No hostAndPort sockets supplied");
+        
+        LOG.debug("blocking on any reachable socket in {} for {}", sockets, timeout);
+
+        final AtomicReference<HostAndPort> result = new AtomicReference<HostAndPort>();
+        boolean passed = Repeater.create("socket-reachable")
+                .limitTimeTo(timeout)
+                .backoffTo(Duration.FIVE_SECONDS)
+                .until(new Callable<Boolean>() {
+                        public Boolean call() {
+                            Optional<HostAndPort> reachableSocket = tryReachable(sockets, Duration.seconds(2));
+                            if (reachableSocket.isPresent()) {
+                                result.compareAndSet(null, reachableSocket.get());
+                                return true;
+                            }
+                            return false;
+                        }})
+                .run();
+
+        if (passed) {
+            LOG.debug("<< socket {} opened", result);
+            assert result.get() != null;
+            return result.get();
+        } else {
+            LOG.warn("No sockets in {} reachable after {}", sockets, timeout);
+            throw new NoSuchElementException("could not connect to any socket in " + sockets);
+        }
+    }
+
+    /**
+     * Checks if any of the given HostAndPorts are reachable. It checks them all concurrently, and
+     * returns the first that is reachable (or Optional.absent).
+     */
+    private Optional<HostAndPort> tryReachable(Collection<? extends HostAndPort> sockets, Duration timeout) {
+        final AtomicReference<HostAndPort> reachableSocket = new AtomicReference<HostAndPort>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        List<ListenableFuture<?>> futures = Lists.newArrayList();
+        for (final HostAndPort socket : sockets) {
+            futures.add(userExecutor.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            if (socketTester.apply(socket)) {
+                                reachableSocket.compareAndSet(null, socket);
+                                latch.countDown();
+                            }
+                        } catch (RuntimeInterruptedException e) {
+                            throw e;
+                        } catch (RuntimeException e) {
+                            LOG.warn("Error checking reachability of ip:port "+socket, e);
+                        }
+                    }}));
+        }
+        
+        ListenableFuture<List<Object>> compoundFuture = Futures.successfulAsList(futures);
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        try {
+            while (reachableSocket.get() == null && !compoundFuture.isDone() && timeout.isLongerThan(stopwatch)) {
+                latch.await(50, TimeUnit.MILLISECONDS);
+            }            
+            return Optional.fromNullable(reachableSocket.get());
+            
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        } finally {
+            for (Future<?> future : futures) {
+                future.cancel(true);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4cc09b0a/utils/common/src/test/java/org/apache/brooklyn/util/net/ReachableSocketFinderTest.java
----------------------------------------------------------------------
diff --git a/utils/common/src/test/java/org/apache/brooklyn/util/net/ReachableSocketFinderTest.java b/utils/common/src/test/java/org/apache/brooklyn/util/net/ReachableSocketFinderTest.java
new file mode 100644
index 0000000..16228ef
--- /dev/null
+++ b/utils/common/src/test/java/org/apache/brooklyn/util/net/ReachableSocketFinderTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.brooklyn.util.net;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.javalang.JavaClassNames;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.net.HostAndPort;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public class ReachableSocketFinderTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReachableSocketFinderTest.class);
+
+    private HostAndPort socket1;
+    private HostAndPort socket2;
+    private Map<HostAndPort, Boolean> reachabilityResults;
+    private ListeningExecutorService executor;
+    private Predicate<HostAndPort> socketTester;
+    private ReachableSocketFinder finder;
+
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        socket1 = HostAndPort.fromParts("1.1.1.1", 1111);
+        socket2 = HostAndPort.fromParts("1.1.1.2", 1112);
+        reachabilityResults = Maps.newConcurrentMap();
+        executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+        socketTester = new Predicate<HostAndPort>() {
+            @Override public boolean apply(HostAndPort input) {
+                return Boolean.TRUE.equals(reachabilityResults.get(input));
+            }
+        };
+        
+        finder = new ReachableSocketFinder(socketTester, executor);
+    }
+
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        if (executor != null) executor.shutdownNow();
+    }
+    
+    @Test(expectedExceptions=IllegalStateException.class)
+    public void testWhenNoSocketsThrowsIllegalState() throws Exception {
+        finder.findOpenSocketOnNode(ImmutableList.<HostAndPort>of(), Duration.TEN_SECONDS);
+    }
+    
+    @Test
+    public void testReturnsReachableSocket() throws Exception {
+        reachabilityResults.put(socket1, true);
+        reachabilityResults.put(socket2, false);
+        assertEquals(finder.findOpenSocketOnNode(ImmutableList.<HostAndPort>of(socket1, socket2), Duration.TEN_SECONDS), socket1);
+        
+        reachabilityResults.put(socket1, false);
+        reachabilityResults.put(socket2, true);
+        assertEquals(finder.findOpenSocketOnNode(ImmutableList.<HostAndPort>of(socket1, socket2), Duration.TEN_SECONDS), socket2);
+    }
+    
+    @Test
+    public void testPollsUntilPortReachable() throws Exception {
+        reachabilityResults.put(socket1, false);
+        reachabilityResults.put(socket2, false);
+        final ListenableFuture<HostAndPort> future = executor.submit(new Callable<HostAndPort>() {
+                @Override public HostAndPort call() throws Exception {
+                    return finder.findOpenSocketOnNode(ImmutableList.<HostAndPort>of(socket1, socket2), Duration.TEN_SECONDS);
+                }});
+
+        // Should keep trying
+        Asserts.succeedsContinually(new Runnable() {
+            @Override public void run() {
+                assertFalse(future.isDone());
+            }});
+
+        // When port is reached, it completes
+        reachabilityResults.put(socket1, true);
+        assertEquals(future.get(30, TimeUnit.SECONDS), socket1);
+    }
+    
+    // Mark as integration, as we can't rely (in Apache infra) on a port staying unused during the test!
+    @Test(groups="Integration")
+    public void testReturnsRealReachableSocket() throws Exception {
+        ReachableSocketFinder realFinder = new ReachableSocketFinder(executor);
+        ServerSocket socket = connectToPort();
+        try {
+            HostAndPort addr = HostAndPort.fromParts(socket.getInetAddress().getHostAddress(), socket.getLocalPort());
+            HostAndPort wrongAddr = HostAndPort.fromParts(socket.getInetAddress().getHostAddress(), findAvailablePort());
+            
+            assertEquals(realFinder.findOpenSocketOnNode(ImmutableList.of(addr, wrongAddr), Duration.ONE_MINUTE), addr);
+        } finally {
+            if (socket != null) {
+                socket.close();
+            }
+        }
+    }
+
+    // Mark as integration, as we can't rely (in Apache infra) on a port staying unused during the test!
+    // And slow test - takes 5 seconds.
+    @Test(groups="Integration")
+    public void testFailsIfRealSocketUnreachable() throws Exception {
+        ReachableSocketFinder realFinder = new ReachableSocketFinder(executor);
+        HostAndPort wrongAddr = HostAndPort.fromParts(Networking.getLocalHost().getHostAddress(), findAvailablePort());
+        
+        try {
+            HostAndPort result = realFinder.findOpenSocketOnNode(ImmutableList.of(wrongAddr), Duration.FIVE_SECONDS);
+            fail("Expected failure, but got "+result);
+        } catch (NoSuchElementException e) {
+            // success
+        }
+    }
+
+    private ServerSocket connectToPort() throws Exception {
+        ServerSocket result = new ServerSocket(0);
+        LOG.info("Acquired port "+result+" for test "+JavaClassNames.niceClassAndMethod());
+        return result;
+    }
+    
+    private int findAvailablePort() throws Exception {
+        final int startPort = 58767;
+        final int endPort = 60000;
+        int port = startPort;
+        do {
+            if (Networking.isPortAvailable(port)) {
+                return port;
+            }
+            port++;
+            // repeat until we can get a port
+        } while (port <= endPort);
+        throw new Exception("could not get a port in range "+startPort+"-"+endPort);
+    }
+}