You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by dr...@apache.org on 2017/09/19 15:17:04 UTC
[1/2] brooklyn-server git commit: BROOKLYN-533: adds
maxConcurrentMachineDeletions
Repository: brooklyn-server
Updated Branches:
refs/heads/master d389e40d5 -> 7bdae1bae
BROOKLYN-533: adds maxConcurrentMachineDeletions
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/c8f470c0
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/c8f470c0
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/c8f470c0
Branch: refs/heads/master
Commit: c8f470c097db6ae8892c39ce2aeb304789bf8ddd
Parents: e85d264
Author: Aled Sage <al...@gmail.com>
Authored: Mon Sep 18 12:11:32 2017 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Mon Sep 18 13:23:35 2017 +0100
----------------------------------------------------------------------
.../location/jclouds/JcloudsLocation.java | 73 +++++-
.../api/JcloudsLocationConfigPublic.java | 6 +
.../JcloudsMaxConcurrencyStubbedTest.java | 223 +++++++++++++++++++
3 files changed, 292 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c8f470c0/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java
index a914847..8c01e24 100644
--- a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java
+++ b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java
@@ -23,7 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.brooklyn.util.JavaGroovyEquivalents.elvis;
import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth;
import static org.apache.brooklyn.util.ssh.BashCommands.sbinPath;
-import static org.jclouds.compute.predicates.NodePredicates.*;
+import static org.jclouds.compute.predicates.NodePredicates.withIds;
import static org.jclouds.util.Throwables2.getFirstThrowableOfType;
import java.io.ByteArrayOutputStream;
@@ -48,7 +48,6 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import javax.xml.ws.WebServiceException;
-import com.google.common.primitives.Ints;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.location.LocationSpec;
import org.apache.brooklyn.api.location.MachineLocation;
@@ -146,7 +145,6 @@ import org.jclouds.compute.domain.OsFamily;
import org.jclouds.compute.domain.Template;
import org.jclouds.compute.domain.TemplateBuilder;
import org.jclouds.compute.options.TemplateOptions;
-import org.jclouds.compute.predicates.NodePredicates;
import org.jclouds.domain.Credentials;
import org.jclouds.domain.LocationScope;
import org.jclouds.domain.LoginCredentials;
@@ -183,6 +181,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
import com.google.common.net.HostAndPort;
+import com.google.common.primitives.Ints;
/**
* For provisioning and managing VMs in a particular provider/region, using jclouds.
@@ -252,6 +251,14 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
}
config().set(MACHINE_CREATION_SEMAPHORE, new Semaphore(maxConcurrent, true));
}
+
+ if (getConfig(MACHINE_DELETION_SEMAPHORE) == null) {
+ Integer maxConcurrent = getConfig(MAX_CONCURRENT_MACHINE_DELETIONS);
+ if (maxConcurrent == null || maxConcurrent < 1) {
+ throw new IllegalStateException(MAX_CONCURRENT_MACHINE_DELETIONS.getName() + " must be >= 1, but was "+maxConcurrent);
+ }
+ config().set(MACHINE_DELETION_SEMAPHORE, new Semaphore(maxConcurrent, true));
+ }
return this;
}
@@ -275,6 +282,7 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
.parent(this)
.configure(config().getLocalBag().getAllConfig()) // FIXME Should this just be inherited?
.configure(MACHINE_CREATION_SEMAPHORE, getMachineCreationSemaphore())
+ .configure(MACHINE_DELETION_SEMAPHORE, getMachineDeletionSemaphore())
.configure(newFlags));
}
@@ -380,6 +388,10 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
return checkNotNull(getConfig(MACHINE_CREATION_SEMAPHORE), MACHINE_CREATION_SEMAPHORE.getName());
}
+ protected Semaphore getMachineDeletionSemaphore() {
+ return checkNotNull(getConfig(MACHINE_DELETION_SEMAPHORE), MACHINE_DELETION_SEMAPHORE.getName());
+ }
+
protected CloudMachineNamer getCloudMachineNamer(ConfigBag config) {
String namerClass = config.get(LocationConfigKeys.CLOUD_MACHINE_NAMER_CLASS);
if (Strings.isNonBlank(namerClass)) {
@@ -2105,6 +2117,11 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
@Override
public void release(MachineLocation rawMachine) {
+ Duration preSemaphoreTimestamp = null;
+ Duration semaphoreTimestamp = null;
+ Duration destroyTimestamp = null;
+ Stopwatch destroyingStopwatch = Stopwatch.createStarted();
+
String instanceId = vmInstanceIds.remove(rawMachine);
if (instanceId == null) {
LOG.info("Attempted release of unknown machine "+rawMachine+" in "+toString());
@@ -2142,14 +2159,34 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
}
try {
- releaseNode(instanceId);
- } catch (Exception e) {
- LOG.error("Problem releasing machine "+machine+" in "+this+", instance id "+instanceId+
- "; ignoring and continuing, "
- + (tothrow==null ? "will throw subsequently" : "swallowing due to previous error")+": "+e, e);
- if (tothrow==null) tothrow = e;
- }
+ preSemaphoreTimestamp = Duration.of(destroyingStopwatch);
+ Semaphore machineDeletionSemaphore = getMachineDeletionSemaphore();
+ boolean acquired = machineDeletionSemaphore.tryAcquire(0, TimeUnit.SECONDS);
+ if (!acquired) {
+ LOG.info("Waiting in {} for machine-deletion permit ({} other queuing requests already)", new Object[] {this, machineDeletionSemaphore.getQueueLength()});
+ Stopwatch blockStopwatch = Stopwatch.createStarted();
+ machineDeletionSemaphore.acquire();
+ LOG.info("Acquired in {} machine-deletion permit, after waiting {}", this, Time.makeTimeStringRounded(blockStopwatch));
+ } else {
+ LOG.debug("Acquired in {} machine-deletion permit immediately", this);
+ }
+ semaphoreTimestamp = Duration.of(destroyingStopwatch);
+ try {
+ releaseNode(instanceId);
+ destroyTimestamp = Duration.of(destroyingStopwatch);
+ } catch (Exception e) {
+ LOG.error("Problem releasing machine "+machine+" in "+this+", instance id "+instanceId+
+ "; ignoring and continuing, "
+ + (tothrow==null ? "will throw subsequently" : "swallowing due to previous error")+": "+e, e);
+ if (tothrow==null) tothrow = e;
+ } finally {
+ machineDeletionSemaphore.release();
+ }
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+
removeChild(machine);
try {
@@ -2162,8 +2199,24 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
}
if (tothrow != null) {
+ LOG.error("Problem releasing machine " + machine + " (propagating) "
+ + " after "+Duration.of(destroyingStopwatch).toStringRounded()
+ + (semaphoreTimestamp != null ? " ("
+ + "semaphore obtained in "+Duration.of(semaphoreTimestamp).subtract(preSemaphoreTimestamp).toStringRounded()+";"
+ + (destroyTimestamp != null ? " node destroyed in "+Duration.of(destroyTimestamp).subtract(semaphoreTimestamp).toStringRounded() : "")
+ + ")"
+ : "")
+ + ": "+tothrow.getMessage());
+
throw Exceptions.propagate(tothrow);
}
+
+ String logMessage = "Released machine " + machine +":"
+ + " total time "+Duration.of(destroyingStopwatch).toStringRounded()
+ + " ("
+ + "semaphore obtained in "+Duration.of(semaphoreTimestamp).subtract(preSemaphoreTimestamp).toStringRounded()+";"
+ + " node destroyed in "+Duration.of(destroyTimestamp).subtract(semaphoreTimestamp).toStringRounded()+")";
+ LOG.info(logMessage);
}
protected void releaseSafely(MachineLocation machine) {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c8f470c0/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/api/JcloudsLocationConfigPublic.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/api/JcloudsLocationConfigPublic.java b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/api/JcloudsLocationConfigPublic.java
index 55b0e64..8c9a3c4 100644
--- a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/api/JcloudsLocationConfigPublic.java
+++ b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/api/JcloudsLocationConfigPublic.java
@@ -228,9 +228,15 @@ public interface JcloudsLocationConfigPublic extends CloudLocationConfig {
public static final ConfigKey<Integer> MAX_CONCURRENT_MACHINE_CREATIONS = ConfigKeys.newIntegerConfigKey(
"maxConcurrentMachineCreations", "Maximum number of concurrent machine-creations", Integer.MAX_VALUE);
+ public static final ConfigKey<Integer> MAX_CONCURRENT_MACHINE_DELETIONS = ConfigKeys.newIntegerConfigKey(
+ "maxConcurrentMachineDeletions", "Maximum number of concurrent machine-deletions", Integer.MAX_VALUE);
+
public static final ConfigKey<Semaphore> MACHINE_CREATION_SEMAPHORE = ConfigKeys.newConfigKey(
Semaphore.class, "machineCreationSemaphore", "Semaphore for controlling concurrent machine creation", null);
+ public static final ConfigKey<Semaphore> MACHINE_DELETION_SEMAPHORE = ConfigKeys.newConfigKey(
+ Semaphore.class, "machineDeletionSemaphore", "Semaphore for controlling concurrent machine deletion", null);
+
@SuppressWarnings("serial")
public static final ConfigKey<Map<String,Object>> TEMPLATE_OPTIONS = ConfigKeys.newConfigKey(
new TypeToken<Map<String, Object>>() {}, "templateOptions", "Additional jclouds template options");
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c8f470c0/locations/jclouds/src/test/java/org/apache/brooklyn/location/jclouds/JcloudsMaxConcurrencyStubbedTest.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/test/java/org/apache/brooklyn/location/jclouds/JcloudsMaxConcurrencyStubbedTest.java b/locations/jclouds/src/test/java/org/apache/brooklyn/location/jclouds/JcloudsMaxConcurrencyStubbedTest.java
new file mode 100644
index 0000000..880048a
--- /dev/null
+++ b/locations/jclouds/src/test/java/org/apache/brooklyn/location/jclouds/JcloudsMaxConcurrencyStubbedTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.location.jclouds;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.location.jclouds.StubbedComputeServiceRegistry.AbstractNodeCreator;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.time.Duration;
+import org.jclouds.compute.domain.NodeMetadata;
+import org.jclouds.compute.domain.NodeMetadata.Status;
+import org.jclouds.compute.domain.NodeMetadataBuilder;
+import org.jclouds.compute.domain.Template;
+import org.jclouds.domain.LoginCredentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * Simulates the creation of a VM that has multiple IPs. Checks that we choose the right address.
+ */
+public class JcloudsMaxConcurrencyStubbedTest extends AbstractJcloudsStubbedUnitTest {
+
+ private static class ConcurrencyMonitor {
+ private final Object mutex = new Object();
+ private final AtomicInteger concurrentCalls = new AtomicInteger();
+ private final AtomicInteger maxConcurrentCalls = new AtomicInteger();
+ private CountDownLatch latch;
+
+ public void setLatch(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ public int getMaxConcurrentCalls() {
+ return maxConcurrentCalls.get();
+ }
+
+ public void onStart() {
+ synchronized (mutex) {
+ int concurrentCallCount = concurrentCalls.incrementAndGet();
+ if (concurrentCallCount > maxConcurrentCalls.get()) {
+ maxConcurrentCalls.set(concurrentCallCount);
+ }
+ }
+ if (latch != null) {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+ }
+
+ public void onEnd() {
+ synchronized (mutex) {
+ concurrentCalls.decrementAndGet();
+ }
+ }
+ }
+
+ @SuppressWarnings("unused")
+ private static final Logger LOG = LoggerFactory.getLogger(JcloudsMaxConcurrencyStubbedTest.class);
+
+ private ListeningExecutorService executor;
+ private ConcurrencyMonitor creationConcurrencyMonitor;
+ private ConcurrencyMonitor deletionConcurrencyMonitor;
+
+ @BeforeMethod(alwaysRun=true)
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+ creationConcurrencyMonitor = new ConcurrencyMonitor();
+ deletionConcurrencyMonitor = new ConcurrencyMonitor();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ try {
+ super.tearDown();
+ } finally {
+ if (executor != null) executor.shutdownNow();
+ }
+ }
+
+ protected AbstractNodeCreator newNodeCreator() {
+ return new AbstractNodeCreator() {
+ @Override protected NodeMetadata newNode(String group, Template template) {
+ try {
+ creationConcurrencyMonitor.onStart();
+
+ NodeMetadata result = new NodeMetadataBuilder()
+ .id("myid")
+ .credentials(LoginCredentials.builder().identity("myuser").credential("mypassword").build())
+ .loginPort(22)
+ .status(Status.RUNNING)
+ .publicAddresses(ImmutableList.of("173.194.32.123"))
+ .privateAddresses(ImmutableList.of("172.168.10.11"))
+ .build();
+ return result;
+ } finally {
+ creationConcurrencyMonitor.onEnd();
+ }
+ }
+ @Override public void destroyNode(String id) {
+ try {
+ deletionConcurrencyMonitor.onStart();
+
+ super.destroyNode(id);
+ } finally {
+ deletionConcurrencyMonitor.onEnd();
+ }
+ }
+ @Override public Set<? extends NodeMetadata> destroyNodesMatching(Predicate<? super NodeMetadata> filter) {
+ try {
+ deletionConcurrencyMonitor.onStart();
+
+ return super.destroyNodesMatching(filter);
+ } finally {
+ deletionConcurrencyMonitor.onEnd();
+ }
+ }
+
+ };
+ }
+
+ @Test
+ public void testConcurrentCreateCalls() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ creationConcurrencyMonitor.setLatch(latch);
+
+ initNodeCreatorAndJcloudsLocation(newNodeCreator(), ImmutableMap.of(JcloudsLocation.MAX_CONCURRENT_MACHINE_CREATIONS, 2));
+
+ List<ListenableFuture<?>> futures = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ futures.add(executor.submit(new Callable<JcloudsSshMachineLocation>() {
+ public JcloudsSshMachineLocation call() throws Exception {
+ return obtainMachine();
+ }}));
+ }
+
+ assertMaxConcurrentCallsEventually(creationConcurrencyMonitor, 2);
+ assertMaxConcurrentCallsContinually(creationConcurrencyMonitor, 2);
+ latch.countDown();
+ Futures.allAsList(futures).get();
+ }
+
+ @Test
+ public void testConcurrentDeletionCalls() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ deletionConcurrencyMonitor.setLatch(latch);
+
+ initNodeCreatorAndJcloudsLocation(newNodeCreator(), ImmutableMap.of(JcloudsLocation.MAX_CONCURRENT_MACHINE_DELETIONS, 2));
+
+ List<JcloudsSshMachineLocation> machines = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ machines.add(obtainMachine());
+ }
+
+ List<ListenableFuture<?>> futures = new ArrayList<>();
+ for (final JcloudsSshMachineLocation machine : machines) {
+ futures.add(executor.submit(new Callable<Void>() {
+ public Void call() throws Exception {
+ jcloudsLocation.release(machine);
+ return null;
+ }}));
+ }
+
+ assertMaxConcurrentCallsEventually(deletionConcurrencyMonitor, 2);
+ assertMaxConcurrentCallsContinually(deletionConcurrencyMonitor, 2);
+ latch.countDown();
+ Futures.allAsList(futures).get();
+ }
+
+ void assertMaxConcurrentCallsEventually(ConcurrencyMonitor monitor, int expected) {
+ Asserts.succeedsEventually(new Runnable() {
+ public void run() {
+ assertEquals(monitor.getMaxConcurrentCalls(), expected);
+ }});
+ }
+
+ void assertMaxConcurrentCallsContinually(ConcurrencyMonitor monitor, int expected) {
+ Asserts.succeedsContinually(MutableMap.of("timeout", Duration.millis(100)), new Runnable() {
+ public void run() {
+ assertEquals(monitor.getMaxConcurrentCalls(), expected);
+ }});
+ }
+}
[2/2] brooklyn-server git commit: This closes #828
Posted by dr...@apache.org.
This closes #828
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/7bdae1ba
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/7bdae1ba
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/7bdae1ba
Branch: refs/heads/master
Commit: 7bdae1baeb6784d9cb8c379d4fee337ea900a46c
Parents: d389e40 c8f470c
Author: Duncan Godwin <dr...@googlemail.com>
Authored: Tue Sep 19 16:16:27 2017 +0100
Committer: Duncan Godwin <dr...@googlemail.com>
Committed: Tue Sep 19 16:16:27 2017 +0100
----------------------------------------------------------------------
.../location/jclouds/JcloudsLocation.java | 73 +++++-
.../api/JcloudsLocationConfigPublic.java | 6 +
.../JcloudsMaxConcurrencyStubbedTest.java | 223 +++++++++++++++++++
3 files changed, 292 insertions(+), 10 deletions(-)
----------------------------------------------------------------------