You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:49:20 UTC
[16/50] brooklyn-server git commit: Add
JcloudsLocation.MAX_CONCURRENT_MACHINE_CREATIONS
Add JcloudsLocation.MAX_CONCURRENT_MACHINE_CREATIONS
- Prevent more than the given number of concurrent calls to create
VMs for a given JcloudsLocation instance.
- Required in various clouds (sometimes, at least!) such as aws-ec2
where one gets http response 503 RequestLimitExceeded when trying to
provision 18 machines concurrently.
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/4265e664
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/4265e664
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/4265e664
Branch: refs/heads/0.6.0
Commit: 4265e66496e379f1d0a367cdcd5f5bcc4dfe7a12
Parents: 2fc1caf
Author: Aled Sage <al...@gmail.com>
Authored: Wed Oct 30 11:06:03 2013 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Wed Nov 6 16:12:02 2013 +0000
----------------------------------------------------------------------
.../location/jclouds/JcloudsLocation.java | 56 ++++--
.../location/jclouds/JcloudsLocationConfig.java | 7 +
.../location/jclouds/JcloudsLocationTest.java | 191 +++++++++++++++++--
3 files changed, 224 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4265e664/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocation.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocation.java b/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocation.java
index 135cbcd..736a461 100644
--- a/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocation.java
+++ b/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocation.java
@@ -19,6 +19,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -162,6 +164,14 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
}
setCreationString(getConfigBag());
+
+ if (getConfig(MACHINE_CREATION_SEMAPHORE) == null) {
+ Integer maxConcurrent = getConfig(MAX_CONCURRENT_MACHINE_CREATIONS);
+ if (maxConcurrent == null || maxConcurrent < 1) {
+ throw new IllegalStateException(MAX_CONCURRENT_MACHINE_CREATIONS.getName() + " must be >= 1, but was "+maxConcurrent);
+ }
+ setConfig(MACHINE_CREATION_SEMAPHORE, new Semaphore(maxConcurrent, true));
+ }
}
@Override
@@ -178,7 +188,8 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
return getManagementContext().getLocationManager().createLocation(LocationSpec.create(getClass())
.parent(this)
.configure(getRawLocalConfigBag().getAllConfig())
- .configure(newFlags));
+ .configure(newFlags)
+ .configure(MACHINE_CREATION_SEMAPHORE, getMachineCreationSemaphore()));
}
@Override
@@ -232,6 +243,10 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
USER, JCLOUDS_KEY_USERNAME);
}
+ protected Semaphore getMachineCreationSemaphore() {
+ return checkNotNull(getConfig(MACHINE_CREATION_SEMAPHORE), MACHINE_CREATION_SEMAPHORE.getName());
+ }
+
protected Collection<JcloudsLocationCustomizer> getCustomizers(ConfigBag setup) {
JcloudsLocationCustomizer customizer = setup.get(JCLOUDS_LOCATION_CUSTOMIZER);
Collection<JcloudsLocationCustomizer> customizers = setup.get(JCLOUDS_LOCATION_CUSTOMIZERS);
@@ -386,19 +401,36 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
try {
LOG.info("Creating VM in "+setup.getDescription()+" for "+this);
- Template template = buildTemplate(computeService, setup);
- LoginCredentials initialCredentials = initUserTemplateOptions(template, setup);
- for (JcloudsLocationCustomizer customizer : getCustomizers(setup)) {
- customizer.customize(this, computeService, template.getOptions());
+ LoginCredentials initialCredentials;
+ Set<? extends NodeMetadata> nodes;
+ Semaphore machineCreationSemaphore = getMachineCreationSemaphore();
+ boolean acquired = machineCreationSemaphore.tryAcquire(0, TimeUnit.SECONDS);
+ if (!acquired) {
+ LOG.info("Waiting in {} for machine-creation permit ({} other queuing requests already)", new Object[] {this, machineCreationSemaphore.getQueueLength()});
+ Stopwatch stopwatch = new Stopwatch().start();
+ machineCreationSemaphore.acquire();
+ LOG.info("Acquired in {} machine-creation permit, after waiting {}", this, Time.makeTimeStringRounded(stopwatch));
+ } else {
+ LOG.info("Acquired in {} machine-creation permit immediately", this);
+ }
+ try {
+ Template template = buildTemplate(computeService, setup);
+ initialCredentials = initUserTemplateOptions(template, setup);
+ for (JcloudsLocationCustomizer customizer : getCustomizers(setup)) {
+ customizer.customize(this, computeService, template.getOptions());
+ }
+ LOG.debug("jclouds using template {} / options {} to provision machine in {}", new Object[] {
+ template, template.getOptions(), setup.getDescription()});
+
+ if (!setup.getUnusedConfig().isEmpty())
+ LOG.debug("NOTE: unused flags passed to obtain VM in "+setup.getDescription()+": "+
+ setup.getUnusedConfig());
+
+ nodes = computeService.createNodesInGroup(groupId, 1, template);
+ } finally {
+ machineCreationSemaphore.release();
}
- LOG.debug("jclouds using template {} / options {} to provision machine in {}", new Object[] {
- template, template.getOptions(), setup.getDescription()});
-
- if (!setup.getUnusedConfig().isEmpty())
- LOG.debug("NOTE: unused flags passed to obtain VM in "+setup.getDescription()+": "+
- setup.getUnusedConfig());
- Set<? extends NodeMetadata> nodes = computeService.createNodesInGroup(groupId, 1, template);
node = Iterables.getOnlyElement(nodes, null);
LOG.debug("jclouds created {} for {}", node, setup.getDescription());
if (node == null)
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4265e664/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocationConfig.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocationConfig.java b/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocationConfig.java
index c634482..b01a355 100644
--- a/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocationConfig.java
+++ b/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocationConfig.java
@@ -2,6 +2,7 @@ package brooklyn.location.jclouds;
import java.io.File;
import java.util.Collection;
+import java.util.concurrent.Semaphore;
import org.jclouds.Constants;
import org.jclouds.compute.domain.TemplateBuilder;
@@ -143,6 +144,12 @@ public interface JcloudsLocationConfig extends CloudLocationConfig {
public static final ConfigKey<Integer> MACHINE_CREATE_ATTEMPTS = ConfigKeys.newIntegerConfigKey(
"machineCreateAttempts", "Number of times to retry if jclouds fails to create a VM", 1);
+ public static final ConfigKey<Integer> MAX_CONCURRENT_MACHINE_CREATIONS = ConfigKeys.newIntegerConfigKey(
+ "maxConcurrentMachineCreations", "Maximum number of concurrent machine-creations", Integer.MAX_VALUE);
+
+ public static final ConfigKey<Semaphore> MACHINE_CREATION_SEMAPHORE = ConfigKeys.newConfigKey(
+ Semaphore.class, "machineCreationSemaphore", "Semaphore for controlling concurrent machine creation", null);
+
// TODO
// "noDefaultSshKeys" - hints that local ssh keys should not be read as defaults
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4265e664/locations/jclouds/src/test/java/brooklyn/location/jclouds/JcloudsLocationTest.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/test/java/brooklyn/location/jclouds/JcloudsLocationTest.java b/locations/jclouds/src/test/java/brooklyn/location/jclouds/JcloudsLocationTest.java
index b859a23..aaf9734 100644
--- a/locations/jclouds/src/test/java/brooklyn/location/jclouds/JcloudsLocationTest.java
+++ b/locations/jclouds/src/test/java/brooklyn/location/jclouds/JcloudsLocationTest.java
@@ -1,6 +1,11 @@
package brooklyn.location.jclouds;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
@@ -13,18 +18,22 @@ import org.testng.annotations.Test;
import brooklyn.config.BrooklynProperties;
import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
import brooklyn.entity.basic.Entities;
import brooklyn.location.LocationSpec;
import brooklyn.location.NoMachinesAvailableException;
import brooklyn.management.internal.LocalManagementContext;
+import brooklyn.test.Asserts;
import brooklyn.util.collections.MutableMap;
import brooklyn.util.config.ConfigBag;
import brooklyn.util.exceptions.Exceptions;
+import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import com.google.common.reflect.TypeToken;
/**
* @author Shane Witbeck
@@ -35,6 +44,8 @@ public class JcloudsLocationTest implements JcloudsLocationConfig {
new RuntimeException("early termination for test");
public static class BailOutJcloudsLocation extends JcloudsLocation {
+ public static final ConfigKey<Function<ConfigBag,Void>> BUILD_TEMPLATE_INTERCEPTOR = ConfigKeys.newConfigKey(new TypeToken<Function<ConfigBag,Void>>() {}, "buildtemplateinterceptor");
+
ConfigBag lastConfigBag;
public BailOutJcloudsLocation() {
@@ -48,9 +59,10 @@ public class JcloudsLocationTest implements JcloudsLocationConfig {
@Override
protected Template buildTemplate(ComputeService computeService, ConfigBag config) {
lastConfigBag = config;
+ if (getConfig(BUILD_TEMPLATE_INTERCEPTOR) != null) getConfig(BUILD_TEMPLATE_INTERCEPTOR).apply(config);
throw BAIL_OUT_FOR_TESTING;
}
- protected synchronized void tryObtainAndCheck(Map<?,?> flags, Predicate<ConfigBag> test) {
+ protected void tryObtainAndCheck(Map<?,?> flags, Predicate<? super ConfigBag> test) {
try {
obtain(flags);
} catch (NoMachinesAvailableException e) {
@@ -110,34 +122,49 @@ public class JcloudsLocationTest implements JcloudsLocationConfig {
}
protected BailOutJcloudsLocation newSampleBailOutJcloudsLocationForTesting() {
+ return newSampleBailOutJcloudsLocationForTesting(ImmutableMap.<ConfigKey<?>,Object>of());
+ }
+
+ protected BailOutJcloudsLocation newSampleBailOutJcloudsLocationForTesting(Map<?,?> config) {
+ Map<ConfigKey<?>,?> allConfig = MutableMap.<ConfigKey<?>,Object>builder()
+ .put(IMAGE_ID, "bogus")
+ .put(CLOUD_PROVIDER, "aws-ec2")
+ .put(ACCESS_IDENTITY, "bogus")
+ .put(CLOUD_REGION_ID, "bogus")
+ .put(ACCESS_CREDENTIAL, "bogus")
+ .put(USER, "fred")
+ .put(MIN_RAM, 16)
+ .putAll((Map)config)
+ .build();
return managementContext.getLocationManager().createLocation(LocationSpec.create(BailOutJcloudsLocation.class)
- .configure(MutableMap.of(
- IMAGE_ID, "bogus",
- CLOUD_PROVIDER, "aws-ec2",
- ACCESS_IDENTITY, "bogus",
- CLOUD_REGION_ID, "bogus",
- ACCESS_CREDENTIAL, "bogus",
- USER, "fred",
- MIN_RAM, 16)));
+ .configure(allConfig));
}
protected BailOutWithTemplateJcloudsLocation newSampleBailOutWithTemplateJcloudsLocation() {
+ return newSampleBailOutWithTemplateJcloudsLocation(ImmutableMap.<ConfigKey<?>,Object>of());
+ }
+
+ protected BailOutWithTemplateJcloudsLocation newSampleBailOutWithTemplateJcloudsLocation(Map<?,?> config) {
String identity = (String) brooklynProperties.get("brooklyn.location.jclouds.aws-ec2.identity");
if (identity == null) identity = (String) brooklynProperties.get("brooklyn.jclouds.aws-ec2.identity");
String credential = (String) brooklynProperties.get("brooklyn.location.jclouds.aws-ec2.credential");
if (credential == null) identity = (String) brooklynProperties.get("brooklyn.jclouds.aws-ec2.credential");
+ Map<ConfigKey<?>,?> allConfig = MutableMap.<ConfigKey<?>,Object>builder()
+ .put(CLOUD_PROVIDER, "aws-ec2")
+ .put(CLOUD_REGION_ID, "eu-west-1")
+ .put(IMAGE_ID, "us-east-1/ami-7d7bfc14") // so it runs faster, without loading all EC2 images
+ .put(ACCESS_IDENTITY, identity)
+ .put(ACCESS_CREDENTIAL, credential)
+ .put(USER, "fred")
+ .put(INBOUND_PORTS, "[22, 80, 9999]")
+ .putAll((Map)config)
+ .build();
+
return managementContext.getLocationManager().createLocation(LocationSpec.create(BailOutWithTemplateJcloudsLocation.class)
- .configure(MutableMap.of(
- CLOUD_PROVIDER, "aws-ec2",
- CLOUD_REGION_ID, "eu-west-1",
- IMAGE_ID, "us-east-1/ami-7d7bfc14", // so it runs faster, without loading all EC2 images
- ACCESS_IDENTITY, identity,
- ACCESS_CREDENTIAL, credential,
- USER, "fred",
- INBOUND_PORTS, "[22, 80, 9999]")));
+ .configure(allConfig));
}
-
+
public static Predicate<ConfigBag> checkerFor(final String user, final Integer minRam, final Integer minCores) {
return new Predicate<ConfigBag>() {
@Override
@@ -266,5 +293,133 @@ public class JcloudsLocationTest implements JcloudsLocationConfig {
Assert.assertEquals(jcloudsLocation.template.getOptions().getInboundPorts(), ports);
}
+ @Test
+ public void testCreateWithMaxConcurrentCallsUnboundedByDefault() throws Exception {
+ final int numCalls = 20;
+ ConcurrencyTracker interceptor = new ConcurrencyTracker();
+ ExecutorService executor = Executors.newCachedThreadPool();
+
+ try {
+ final BailOutJcloudsLocation jcloudsLocation = newSampleBailOutJcloudsLocationForTesting(ImmutableMap.of(BailOutJcloudsLocation.BUILD_TEMPLATE_INTERCEPTOR, interceptor));
+
+ for (int i = 0; i < numCalls; i++) {
+ executor.execute(new Runnable() {
+ @Override public void run() {
+ jcloudsLocation.tryObtainAndCheck(MutableMap.of(), Predicates.alwaysTrue());
+ }});
+ }
+
+ interceptor.assertCallCountEventually(numCalls);
+
+ interceptor.unblock();
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test(groups="Integration") // because takes 1 sec
+ public void testCreateWithMaxConcurrentCallsRespectsConfig() throws Exception {
+ final int numCalls = 4;
+ final int maxConcurrentCreations = 2;
+ ConcurrencyTracker interceptor = new ConcurrencyTracker();
+ ExecutorService executor = Executors.newCachedThreadPool();
+
+ try {
+ final BailOutJcloudsLocation jcloudsLocation = newSampleBailOutJcloudsLocationForTesting(ImmutableMap.of(
+ BailOutJcloudsLocation.BUILD_TEMPLATE_INTERCEPTOR, interceptor,
+ JcloudsLocation.MAX_CONCURRENT_MACHINE_CREATIONS, maxConcurrentCreations));
+
+ for (int i = 0; i < numCalls; i++) {
+ executor.execute(new Runnable() {
+ @Override public void run() {
+ jcloudsLocation.tryObtainAndCheck(MutableMap.of(), Predicates.alwaysTrue());
+ }});
+ }
+
+ interceptor.assertCallCountEventually(maxConcurrentCreations);
+ interceptor.assertCallCountContinually(maxConcurrentCreations);
+
+ interceptor.unblock();
+ interceptor.assertCallCountEventually(numCalls);
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test(groups="Integration") // because takes 1 sec
+ public void testCreateWithMaxConcurrentCallsAppliesToSubLocations() throws Exception {
+ final int numCalls = 4;
+ final int maxConcurrentCreations = 2;
+ ConcurrencyTracker interceptor = new ConcurrencyTracker();
+ ExecutorService executor = Executors.newCachedThreadPool();
+
+ try {
+ final BailOutJcloudsLocation jcloudsLocation = newSampleBailOutJcloudsLocationForTesting(ImmutableMap.of(
+ BailOutJcloudsLocation.BUILD_TEMPLATE_INTERCEPTOR, interceptor,
+ JcloudsLocation.MAX_CONCURRENT_MACHINE_CREATIONS, maxConcurrentCreations));
+
+
+ for (int i = 0; i < numCalls; i++) {
+ final BailOutJcloudsLocation subLocation = (BailOutJcloudsLocation) jcloudsLocation.newSubLocation(MutableMap.of());
+ executor.execute(new Runnable() {
+ @Override public void run() {
+ subLocation.tryObtainAndCheck(MutableMap.of(), Predicates.alwaysTrue());
+ }});
+ }
+
+ interceptor.assertCallCountEventually(maxConcurrentCreations);
+ interceptor.assertCallCountContinually(maxConcurrentCreations);
+
+ interceptor.unblock();
+ interceptor.assertCallCountEventually(numCalls);
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ public static class ConcurrencyTracker implements Function<ConfigBag,Void> {
+ final AtomicInteger concurrentCallsCounter = new AtomicInteger();
+ final CountDownLatch continuationLatch = new CountDownLatch(1);
+
+ @Override public Void apply(ConfigBag input) {
+ concurrentCallsCounter.incrementAndGet();
+ try {
+ continuationLatch.await();
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ return null;
+ }
+
+ public void unblock() {
+ continuationLatch.countDown();
+ }
+
+ public void assertCallCountEventually(final int expected) {
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ Assert.assertEquals(concurrentCallsCounter.get(), expected);
+ }
+ });
+ }
+
+ public void assertCallCountContinually(final int expected) {
+ Asserts.succeedsContinually(new Runnable() {
+ @Override public void run() {
+ Assert.assertEquals(concurrentCallsCounter.get(), expected);
+ }
+ });
+ }
+ }
+
// TODO more tests, where flags come in from resolver, named locations, etc
}