You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by dr...@apache.org on 2017/09/19 15:17:04 UTC

[1/2] brooklyn-server git commit: BROOKLYN-533: adds maxConcurrentMachineDeletions

Repository: brooklyn-server
Updated Branches:
  refs/heads/master d389e40d5 -> 7bdae1bae


BROOKLYN-533: adds maxConcurrentMachineDeletions


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/c8f470c0
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/c8f470c0
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/c8f470c0

Branch: refs/heads/master
Commit: c8f470c097db6ae8892c39ce2aeb304789bf8ddd
Parents: e85d264
Author: Aled Sage <al...@gmail.com>
Authored: Mon Sep 18 12:11:32 2017 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Mon Sep 18 13:23:35 2017 +0100

----------------------------------------------------------------------
 .../location/jclouds/JcloudsLocation.java       |  73 +++++-
 .../api/JcloudsLocationConfigPublic.java        |   6 +
 .../JcloudsMaxConcurrencyStubbedTest.java       | 223 +++++++++++++++++++
 3 files changed, 292 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c8f470c0/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java
index a914847..8c01e24 100644
--- a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java
+++ b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java
@@ -23,7 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.brooklyn.util.JavaGroovyEquivalents.elvis;
 import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth;
 import static org.apache.brooklyn.util.ssh.BashCommands.sbinPath;
-import static org.jclouds.compute.predicates.NodePredicates.*;
+import static org.jclouds.compute.predicates.NodePredicates.withIds;
 import static org.jclouds.util.Throwables2.getFirstThrowableOfType;
 
 import java.io.ByteArrayOutputStream;
@@ -48,7 +48,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import javax.xml.ws.WebServiceException;
 
-import com.google.common.primitives.Ints;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.location.LocationSpec;
 import org.apache.brooklyn.api.location.MachineLocation;
@@ -146,7 +145,6 @@ import org.jclouds.compute.domain.OsFamily;
 import org.jclouds.compute.domain.Template;
 import org.jclouds.compute.domain.TemplateBuilder;
 import org.jclouds.compute.options.TemplateOptions;
-import org.jclouds.compute.predicates.NodePredicates;
 import org.jclouds.domain.Credentials;
 import org.jclouds.domain.LocationScope;
 import org.jclouds.domain.LoginCredentials;
@@ -183,6 +181,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Sets.SetView;
 import com.google.common.net.HostAndPort;
+import com.google.common.primitives.Ints;
 
 /**
  * For provisioning and managing VMs in a particular provider/region, using jclouds.
@@ -252,6 +251,14 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
             }
             config().set(MACHINE_CREATION_SEMAPHORE, new Semaphore(maxConcurrent, true));
         }
+
+        if (getConfig(MACHINE_DELETION_SEMAPHORE) == null) {
+            Integer maxConcurrent = getConfig(MAX_CONCURRENT_MACHINE_DELETIONS);
+            if (maxConcurrent == null || maxConcurrent < 1) {
+                throw new IllegalStateException(MAX_CONCURRENT_MACHINE_DELETIONS.getName() + " must be >= 1, but was "+maxConcurrent);
+            }
+            config().set(MACHINE_DELETION_SEMAPHORE, new Semaphore(maxConcurrent, true));
+        }
         return this;
     }
 
@@ -275,6 +282,7 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
                 .parent(this)
                 .configure(config().getLocalBag().getAllConfig())  // FIXME Should this just be inherited?
                 .configure(MACHINE_CREATION_SEMAPHORE, getMachineCreationSemaphore())
+                .configure(MACHINE_DELETION_SEMAPHORE, getMachineDeletionSemaphore())
                 .configure(newFlags));
     }
 
@@ -380,6 +388,10 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
         return checkNotNull(getConfig(MACHINE_CREATION_SEMAPHORE), MACHINE_CREATION_SEMAPHORE.getName());
     }
 
+    protected Semaphore getMachineDeletionSemaphore() {
+        return checkNotNull(getConfig(MACHINE_DELETION_SEMAPHORE), MACHINE_DELETION_SEMAPHORE.getName());
+    }
+
     protected CloudMachineNamer getCloudMachineNamer(ConfigBag config) {
         String namerClass = config.get(LocationConfigKeys.CLOUD_MACHINE_NAMER_CLASS);
         if (Strings.isNonBlank(namerClass)) {
@@ -2105,6 +2117,11 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
 
     @Override
     public void release(MachineLocation rawMachine) {
+        Duration preSemaphoreTimestamp = null;
+        Duration semaphoreTimestamp = null;
+        Duration destroyTimestamp = null;
+        Stopwatch destroyingStopwatch = Stopwatch.createStarted();
+
         String instanceId = vmInstanceIds.remove(rawMachine);
         if (instanceId == null) {
             LOG.info("Attempted release of unknown machine "+rawMachine+" in "+toString());
@@ -2142,14 +2159,34 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
         }
 
         try {
-            releaseNode(instanceId);
-        } catch (Exception e) {
-            LOG.error("Problem releasing machine "+machine+" in "+this+", instance id "+instanceId+
-                    "; ignoring and continuing, "
-                    + (tothrow==null ? "will throw subsequently" : "swallowing due to previous error")+": "+e, e);
-            if (tothrow==null) tothrow = e;
-        }
+            preSemaphoreTimestamp = Duration.of(destroyingStopwatch);
+            Semaphore machineDeletionSemaphore = getMachineDeletionSemaphore();
+            boolean acquired = machineDeletionSemaphore.tryAcquire(0, TimeUnit.SECONDS);
+            if (!acquired) {
+                LOG.info("Waiting in {} for machine-deletion permit ({} other queuing requests already)", new Object[] {this, machineDeletionSemaphore.getQueueLength()});
+                Stopwatch blockStopwatch = Stopwatch.createStarted();
+                machineDeletionSemaphore.acquire();
+                LOG.info("Acquired in {} machine-deletion permit, after waiting {}", this, Time.makeTimeStringRounded(blockStopwatch));
+            } else {
+                LOG.debug("Acquired in {} machine-deletion permit immediately", this);
+            }
+            semaphoreTimestamp = Duration.of(destroyingStopwatch);
 
+            try {
+                releaseNode(instanceId);
+                destroyTimestamp = Duration.of(destroyingStopwatch);
+            } catch (Exception e) {
+                LOG.error("Problem releasing machine "+machine+" in "+this+", instance id "+instanceId+
+                        "; ignoring and continuing, "
+                        + (tothrow==null ? "will throw subsequently" : "swallowing due to previous error")+": "+e, e);
+                if (tothrow==null) tothrow = e;
+            } finally {
+                machineDeletionSemaphore.release();
+            }
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        }
+        
         removeChild(machine);
 
         try {
@@ -2162,8 +2199,24 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
         }
         
         if (tothrow != null) {
+            LOG.error("Problem releasing machine " + machine + " (propagating) " 
+                    + " after "+Duration.of(destroyingStopwatch).toStringRounded()
+                    + (semaphoreTimestamp != null ? " ("
+                            + "semaphore obtained in "+Duration.of(semaphoreTimestamp).subtract(preSemaphoreTimestamp).toStringRounded()+";"
+                            + (destroyTimestamp != null ? " node destroyed in "+Duration.of(destroyTimestamp).subtract(semaphoreTimestamp).toStringRounded() : "")
+                            + ")"
+                            : "")
+                    + ": "+tothrow.getMessage());
+            
             throw Exceptions.propagate(tothrow);
         }
+        
+        String logMessage = "Released machine " + machine +":"
+                + " total time "+Duration.of(destroyingStopwatch).toStringRounded()
+                + " ("
+                + "semaphore obtained in "+Duration.of(semaphoreTimestamp).subtract(preSemaphoreTimestamp).toStringRounded()+";"
+                + " node destroyed in "+Duration.of(destroyTimestamp).subtract(semaphoreTimestamp).toStringRounded()+")";
+        LOG.info(logMessage);
     }
 
     protected void releaseSafely(MachineLocation machine) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c8f470c0/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/api/JcloudsLocationConfigPublic.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/api/JcloudsLocationConfigPublic.java b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/api/JcloudsLocationConfigPublic.java
index 55b0e64..8c9a3c4 100644
--- a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/api/JcloudsLocationConfigPublic.java
+++ b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/api/JcloudsLocationConfigPublic.java
@@ -228,9 +228,15 @@ public interface JcloudsLocationConfigPublic extends CloudLocationConfig {
     public static final ConfigKey<Integer> MAX_CONCURRENT_MACHINE_CREATIONS = ConfigKeys.newIntegerConfigKey(
             "maxConcurrentMachineCreations", "Maximum number of concurrent machine-creations", Integer.MAX_VALUE);
 
+    public static final ConfigKey<Integer> MAX_CONCURRENT_MACHINE_DELETIONS = ConfigKeys.newIntegerConfigKey(
+            "maxConcurrentMachineDeletions", "Maximum number of concurrent machine-deletions", Integer.MAX_VALUE);
+
     public static final ConfigKey<Semaphore> MACHINE_CREATION_SEMAPHORE = ConfigKeys.newConfigKey(
             Semaphore.class, "machineCreationSemaphore", "Semaphore for controlling concurrent machine creation", null);
     
+    public static final ConfigKey<Semaphore> MACHINE_DELETION_SEMAPHORE = ConfigKeys.newConfigKey(
+            Semaphore.class, "machineDeletionSemaphore", "Semaphore for controlling concurrent machine deletion", null);
+    
     @SuppressWarnings("serial")
     public static final ConfigKey<Map<String,Object>> TEMPLATE_OPTIONS = ConfigKeys.newConfigKey(
             new TypeToken<Map<String, Object>>() {}, "templateOptions", "Additional jclouds template options");

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c8f470c0/locations/jclouds/src/test/java/org/apache/brooklyn/location/jclouds/JcloudsMaxConcurrencyStubbedTest.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/test/java/org/apache/brooklyn/location/jclouds/JcloudsMaxConcurrencyStubbedTest.java b/locations/jclouds/src/test/java/org/apache/brooklyn/location/jclouds/JcloudsMaxConcurrencyStubbedTest.java
new file mode 100644
index 0000000..880048a
--- /dev/null
+++ b/locations/jclouds/src/test/java/org/apache/brooklyn/location/jclouds/JcloudsMaxConcurrencyStubbedTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.location.jclouds;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.location.jclouds.StubbedComputeServiceRegistry.AbstractNodeCreator;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.time.Duration;
+import org.jclouds.compute.domain.NodeMetadata;
+import org.jclouds.compute.domain.NodeMetadata.Status;
+import org.jclouds.compute.domain.NodeMetadataBuilder;
+import org.jclouds.compute.domain.Template;
+import org.jclouds.domain.LoginCredentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * Checks that maxConcurrentMachineCreations and maxConcurrentMachineDeletions cap the number of concurrent create/destroy calls.
+ */
+public class JcloudsMaxConcurrencyStubbedTest extends AbstractJcloudsStubbedUnitTest {
+
+    private static class ConcurrencyMonitor {
+        private final Object mutex = new Object();
+        private final AtomicInteger concurrentCalls = new AtomicInteger();
+        private final AtomicInteger maxConcurrentCalls = new AtomicInteger();
+        private CountDownLatch latch;
+        
+        public void setLatch(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        public int getMaxConcurrentCalls() {
+            return maxConcurrentCalls.get();
+        }
+        
+        public void onStart() {
+            synchronized (mutex) {
+                int concurrentCallCount = concurrentCalls.incrementAndGet();
+                if (concurrentCallCount > maxConcurrentCalls.get()) {
+                    maxConcurrentCalls.set(concurrentCallCount);
+                }
+            }
+            if (latch != null) {
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                    throw Exceptions.propagate(e);
+                }
+            }
+        }
+        
+        public void onEnd() {
+            synchronized (mutex) {
+                concurrentCalls.decrementAndGet();
+            }
+        }
+    }
+    
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(JcloudsMaxConcurrencyStubbedTest.class);
+    
+    private ListeningExecutorService executor;
+    private ConcurrencyMonitor creationConcurrencyMonitor;
+    private ConcurrencyMonitor deletionConcurrencyMonitor;
+    
+    @BeforeMethod(alwaysRun=true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+        creationConcurrencyMonitor = new ConcurrencyMonitor();
+        deletionConcurrencyMonitor = new ConcurrencyMonitor();
+    }
+    
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            if (executor != null) executor.shutdownNow();
+        }
+    }
+    
+    protected AbstractNodeCreator newNodeCreator() {
+        return new AbstractNodeCreator() {
+            @Override protected NodeMetadata newNode(String group, Template template) {
+                try {
+                    creationConcurrencyMonitor.onStart();
+                    
+                    NodeMetadata result = new NodeMetadataBuilder()
+                            .id("myid")
+                            .credentials(LoginCredentials.builder().identity("myuser").credential("mypassword").build())
+                            .loginPort(22)
+                            .status(Status.RUNNING)
+                            .publicAddresses(ImmutableList.of("173.194.32.123"))
+                            .privateAddresses(ImmutableList.of("172.168.10.11"))
+                            .build();
+                    return result;
+                } finally {
+                    creationConcurrencyMonitor.onEnd();
+                }
+            }
+            @Override public void destroyNode(String id) {
+                try {
+                    deletionConcurrencyMonitor.onStart();
+
+                    super.destroyNode(id);
+                } finally {
+                    deletionConcurrencyMonitor.onEnd();
+                }
+            }
+            @Override public Set<? extends NodeMetadata> destroyNodesMatching(Predicate<? super NodeMetadata> filter) {
+                try {
+                    deletionConcurrencyMonitor.onStart();
+
+                    return super.destroyNodesMatching(filter);
+                } finally {
+                    deletionConcurrencyMonitor.onEnd();
+                }
+            }
+            
+        };
+    }
+
+    @Test
+    public void testConcurrentCreateCalls() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        creationConcurrencyMonitor.setLatch(latch);
+        
+        initNodeCreatorAndJcloudsLocation(newNodeCreator(), ImmutableMap.of(JcloudsLocation.MAX_CONCURRENT_MACHINE_CREATIONS, 2));
+        
+        List<ListenableFuture<?>> futures = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            futures.add(executor.submit(new Callable<JcloudsSshMachineLocation>() {
+                public JcloudsSshMachineLocation call() throws Exception {
+                    return obtainMachine();
+                }}));
+        }
+        
+        assertMaxConcurrentCallsEventually(creationConcurrencyMonitor, 2);
+        assertMaxConcurrentCallsContinually(creationConcurrencyMonitor, 2);
+        latch.countDown();
+        Futures.allAsList(futures).get();
+    }
+    
+    @Test
+    public void testConcurrentDeletionCalls() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        deletionConcurrencyMonitor.setLatch(latch);
+        
+        initNodeCreatorAndJcloudsLocation(newNodeCreator(), ImmutableMap.of(JcloudsLocation.MAX_CONCURRENT_MACHINE_DELETIONS, 2));
+
+        List<JcloudsSshMachineLocation> machines = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            machines.add(obtainMachine());
+        }
+
+        List<ListenableFuture<?>> futures = new ArrayList<>();
+        for (final JcloudsSshMachineLocation machine : machines) {
+            futures.add(executor.submit(new Callable<Void>() {
+                public Void call() throws Exception {
+                    jcloudsLocation.release(machine);
+                    return null;
+                }}));
+        }
+        
+        assertMaxConcurrentCallsEventually(deletionConcurrencyMonitor, 2);
+        assertMaxConcurrentCallsContinually(deletionConcurrencyMonitor, 2);
+        latch.countDown();
+        Futures.allAsList(futures).get();
+    }
+    
+    void assertMaxConcurrentCallsEventually(ConcurrencyMonitor monitor, int expected) {
+        Asserts.succeedsEventually(new Runnable() {
+            public void run() {
+                assertEquals(monitor.getMaxConcurrentCalls(), expected);
+            }});
+    }
+    
+    void assertMaxConcurrentCallsContinually(ConcurrencyMonitor monitor, int expected) {
+        Asserts.succeedsContinually(MutableMap.of("timeout", Duration.millis(100)), new Runnable() {
+            public void run() {
+                assertEquals(monitor.getMaxConcurrentCalls(), expected);
+            }});
+    }
+}


[2/2] brooklyn-server git commit: This closes #828

Posted by dr...@apache.org.
This closes #828


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/7bdae1ba
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/7bdae1ba
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/7bdae1ba

Branch: refs/heads/master
Commit: 7bdae1baeb6784d9cb8c379d4fee337ea900a46c
Parents: d389e40 c8f470c
Author: Duncan Godwin <dr...@googlemail.com>
Authored: Tue Sep 19 16:16:27 2017 +0100
Committer: Duncan Godwin <dr...@googlemail.com>
Committed: Tue Sep 19 16:16:27 2017 +0100

----------------------------------------------------------------------
 .../location/jclouds/JcloudsLocation.java       |  73 +++++-
 .../api/JcloudsLocationConfigPublic.java        |   6 +
 .../JcloudsMaxConcurrencyStubbedTest.java       | 223 +++++++++++++++++++
 3 files changed, 292 insertions(+), 10 deletions(-)
----------------------------------------------------------------------