You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:49:20 UTC

[16/50] brooklyn-server git commit: Add JcloudsLocation.MAX_CONCURRENT_MACHINE_CREATIONS

Add JcloudsLocation.MAX_CONCURRENT_MACHINE_CREATIONS

- Prevent more than the given number of concurrent calls to create
  VMs for a given JcloudsLocation instance.
- Required in various clouds (sometimes, at least!) such as aws-ec2
  where one gets http response 503 RequestLimitExceeded when trying to
  provision 18 machines concurrently.


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/4265e664
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/4265e664
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/4265e664

Branch: refs/heads/0.6.0
Commit: 4265e66496e379f1d0a367cdcd5f5bcc4dfe7a12
Parents: 2fc1caf
Author: Aled Sage <al...@gmail.com>
Authored: Wed Oct 30 11:06:03 2013 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Wed Nov 6 16:12:02 2013 +0000

----------------------------------------------------------------------
 .../location/jclouds/JcloudsLocation.java       |  56 ++++--
 .../location/jclouds/JcloudsLocationConfig.java |   7 +
 .../location/jclouds/JcloudsLocationTest.java   | 191 +++++++++++++++++--
 3 files changed, 224 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4265e664/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocation.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocation.java b/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocation.java
index 135cbcd..736a461 100644
--- a/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocation.java
+++ b/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocation.java
@@ -19,6 +19,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -162,6 +164,14 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
         }
         
         setCreationString(getConfigBag());
+        
+        if (getConfig(MACHINE_CREATION_SEMAPHORE) == null) {
+            Integer maxConcurrent = getConfig(MAX_CONCURRENT_MACHINE_CREATIONS);
+            if (maxConcurrent == null || maxConcurrent < 1) {
+                throw new IllegalStateException(MAX_CONCURRENT_MACHINE_CREATIONS.getName() + " must be >= 1, but was "+maxConcurrent);
+            }
+            setConfig(MACHINE_CREATION_SEMAPHORE, new Semaphore(maxConcurrent, true));
+        }
     }
     
     @Override
@@ -178,7 +188,8 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
         return getManagementContext().getLocationManager().createLocation(LocationSpec.create(getClass())
                 .parent(this)
                 .configure(getRawLocalConfigBag().getAllConfig())
-                .configure(newFlags));
+                .configure(newFlags)
+                .configure(MACHINE_CREATION_SEMAPHORE, getMachineCreationSemaphore()));
     }
 
     @Override
@@ -232,6 +243,10 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
                 USER, JCLOUDS_KEY_USERNAME);
     }
     
+    protected Semaphore getMachineCreationSemaphore() {
+        return checkNotNull(getConfig(MACHINE_CREATION_SEMAPHORE), MACHINE_CREATION_SEMAPHORE.getName());
+    }
+    
     protected Collection<JcloudsLocationCustomizer> getCustomizers(ConfigBag setup) {
         JcloudsLocationCustomizer customizer = setup.get(JCLOUDS_LOCATION_CUSTOMIZER);
         Collection<JcloudsLocationCustomizer> customizers = setup.get(JCLOUDS_LOCATION_CUSTOMIZERS);
@@ -386,19 +401,36 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
         try {
             LOG.info("Creating VM in "+setup.getDescription()+" for "+this);
 
-            Template template = buildTemplate(computeService, setup);
-            LoginCredentials initialCredentials = initUserTemplateOptions(template, setup);
-            for (JcloudsLocationCustomizer customizer : getCustomizers(setup)) {
-                customizer.customize(this, computeService, template.getOptions());
+            LoginCredentials initialCredentials;
+            Set<? extends NodeMetadata> nodes;
+            Semaphore machineCreationSemaphore = getMachineCreationSemaphore();
+            boolean acquired = machineCreationSemaphore.tryAcquire(0, TimeUnit.SECONDS);
+            if (!acquired) {
+                LOG.info("Waiting in {} for machine-creation permit ({} other queuing requests already)", new Object[] {this, machineCreationSemaphore.getQueueLength()});
+                Stopwatch stopwatch = new Stopwatch().start();
+                machineCreationSemaphore.acquire();
+                LOG.info("Acquired in {} machine-creation permit, after waiting {}", this, Time.makeTimeStringRounded(stopwatch));
+            } else {
+                LOG.info("Acquired in {} machine-creation permit immediately", this);
+            }
+            try {
+                Template template = buildTemplate(computeService, setup);
+                initialCredentials = initUserTemplateOptions(template, setup);
+                for (JcloudsLocationCustomizer customizer : getCustomizers(setup)) {
+                    customizer.customize(this, computeService, template.getOptions());
+                }
+                LOG.debug("jclouds using template {} / options {} to provision machine in {}", new Object[] {
+                        template, template.getOptions(), setup.getDescription()});
+    
+                if (!setup.getUnusedConfig().isEmpty())
+                    LOG.debug("NOTE: unused flags passed to obtain VM in "+setup.getDescription()+": "+
+                            setup.getUnusedConfig());
+                
+                nodes = computeService.createNodesInGroup(groupId, 1, template);
+            } finally {
+                machineCreationSemaphore.release();
             }
-            LOG.debug("jclouds using template {} / options {} to provision machine in {}", new Object[] {
-                    template, template.getOptions(), setup.getDescription()});
-
-            if (!setup.getUnusedConfig().isEmpty())
-                LOG.debug("NOTE: unused flags passed to obtain VM in "+setup.getDescription()+": "+
-                        setup.getUnusedConfig());
             
-            Set<? extends NodeMetadata> nodes = computeService.createNodesInGroup(groupId, 1, template);
             node = Iterables.getOnlyElement(nodes, null);
             LOG.debug("jclouds created {} for {}", node, setup.getDescription());
             if (node == null)

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4265e664/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocationConfig.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocationConfig.java b/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocationConfig.java
index c634482..b01a355 100644
--- a/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocationConfig.java
+++ b/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocationConfig.java
@@ -2,6 +2,7 @@ package brooklyn.location.jclouds;
 
 import java.io.File;
 import java.util.Collection;
+import java.util.concurrent.Semaphore;
 
 import org.jclouds.Constants;
 import org.jclouds.compute.domain.TemplateBuilder;
@@ -143,6 +144,12 @@ public interface JcloudsLocationConfig extends CloudLocationConfig {
     public static final ConfigKey<Integer> MACHINE_CREATE_ATTEMPTS = ConfigKeys.newIntegerConfigKey(
             "machineCreateAttempts", "Number of times to retry if jclouds fails to create a VM", 1);
 
+    public static final ConfigKey<Integer> MAX_CONCURRENT_MACHINE_CREATIONS = ConfigKeys.newIntegerConfigKey(
+            "maxConcurrentMachineCreations", "Maximum number of concurrent machine-creations", Integer.MAX_VALUE);
+
+    public static final ConfigKey<Semaphore> MACHINE_CREATION_SEMAPHORE = ConfigKeys.newConfigKey(
+            Semaphore.class, "machineCreationSemaphore", "Semaphore for controlling concurrent machine creation", null);
+
     // TODO
     
 //  "noDefaultSshKeys" - hints that local ssh keys should not be read as defaults

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4265e664/locations/jclouds/src/test/java/brooklyn/location/jclouds/JcloudsLocationTest.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/test/java/brooklyn/location/jclouds/JcloudsLocationTest.java b/locations/jclouds/src/test/java/brooklyn/location/jclouds/JcloudsLocationTest.java
index b859a23..aaf9734 100644
--- a/locations/jclouds/src/test/java/brooklyn/location/jclouds/JcloudsLocationTest.java
+++ b/locations/jclouds/src/test/java/brooklyn/location/jclouds/JcloudsLocationTest.java
@@ -1,6 +1,11 @@
 package brooklyn.location.jclouds;
 
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.annotation.Nullable;
 
@@ -13,18 +18,22 @@ import org.testng.annotations.Test;
 
 import brooklyn.config.BrooklynProperties;
 import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
 import brooklyn.entity.basic.Entities;
 import brooklyn.location.LocationSpec;
 import brooklyn.location.NoMachinesAvailableException;
 import brooklyn.management.internal.LocalManagementContext;
+import brooklyn.test.Asserts;
 import brooklyn.util.collections.MutableMap;
 import brooklyn.util.config.ConfigBag;
 import brooklyn.util.exceptions.Exceptions;
 
+import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.google.common.reflect.TypeToken;
 
 /**
  * @author Shane Witbeck
@@ -35,6 +44,8 @@ public class JcloudsLocationTest implements JcloudsLocationConfig {
             new RuntimeException("early termination for test");
     
     public static class BailOutJcloudsLocation extends JcloudsLocation {
+        public static final ConfigKey<Function<ConfigBag,Void>> BUILD_TEMPLATE_INTERCEPTOR = ConfigKeys.newConfigKey(new TypeToken<Function<ConfigBag,Void>>() {}, "buildtemplateinterceptor");
+        
         ConfigBag lastConfigBag;
 
         public BailOutJcloudsLocation() {
@@ -48,9 +59,10 @@ public class JcloudsLocationTest implements JcloudsLocationConfig {
         @Override
         protected Template buildTemplate(ComputeService computeService, ConfigBag config) {
             lastConfigBag = config;
+            if (getConfig(BUILD_TEMPLATE_INTERCEPTOR) != null) getConfig(BUILD_TEMPLATE_INTERCEPTOR).apply(config);
             throw BAIL_OUT_FOR_TESTING;
         }
-        protected synchronized void tryObtainAndCheck(Map<?,?> flags, Predicate<ConfigBag> test) {
+        protected void tryObtainAndCheck(Map<?,?> flags, Predicate<? super ConfigBag> test) {
             try {
                 obtain(flags);
             } catch (NoMachinesAvailableException e) {
@@ -110,34 +122,49 @@ public class JcloudsLocationTest implements JcloudsLocationConfig {
     }
     
     protected BailOutJcloudsLocation newSampleBailOutJcloudsLocationForTesting() {
+        return newSampleBailOutJcloudsLocationForTesting(ImmutableMap.<ConfigKey<?>,Object>of());
+    }
+    
+    protected BailOutJcloudsLocation newSampleBailOutJcloudsLocationForTesting(Map<?,?> config) {
+        Map<ConfigKey<?>,?> allConfig = MutableMap.<ConfigKey<?>,Object>builder()
+                .put(IMAGE_ID, "bogus")
+                .put(CLOUD_PROVIDER, "aws-ec2")
+                .put(ACCESS_IDENTITY, "bogus")
+                .put(CLOUD_REGION_ID, "bogus")
+                .put(ACCESS_CREDENTIAL, "bogus")
+                .put(USER, "fred")
+                .put(MIN_RAM, 16)
+                .putAll((Map)config)
+                .build();
         return managementContext.getLocationManager().createLocation(LocationSpec.create(BailOutJcloudsLocation.class)
-                .configure(MutableMap.of(
-                        IMAGE_ID, "bogus",
-                        CLOUD_PROVIDER, "aws-ec2",
-                        ACCESS_IDENTITY, "bogus",
-                        CLOUD_REGION_ID, "bogus",
-                        ACCESS_CREDENTIAL, "bogus",
-                        USER, "fred",
-                        MIN_RAM, 16)));
+                .configure(allConfig));
     }
     
     protected BailOutWithTemplateJcloudsLocation newSampleBailOutWithTemplateJcloudsLocation() {
+        return newSampleBailOutWithTemplateJcloudsLocation(ImmutableMap.<ConfigKey<?>,Object>of());
+    }
+
+    protected BailOutWithTemplateJcloudsLocation newSampleBailOutWithTemplateJcloudsLocation(Map<?,?> config) {
         String identity = (String) brooklynProperties.get("brooklyn.location.jclouds.aws-ec2.identity");
         if (identity == null) identity = (String) brooklynProperties.get("brooklyn.jclouds.aws-ec2.identity");
         String credential = (String) brooklynProperties.get("brooklyn.location.jclouds.aws-ec2.credential");
         if (credential == null) identity = (String) brooklynProperties.get("brooklyn.jclouds.aws-ec2.credential");
         
+        Map<ConfigKey<?>,?> allConfig = MutableMap.<ConfigKey<?>,Object>builder()
+                .put(CLOUD_PROVIDER, "aws-ec2")
+                .put(CLOUD_REGION_ID, "eu-west-1")
+                .put(IMAGE_ID, "us-east-1/ami-7d7bfc14") // so it runs faster, without loading all EC2 images
+                .put(ACCESS_IDENTITY, identity)
+                .put(ACCESS_CREDENTIAL, credential)
+                .put(USER, "fred")
+                .put(INBOUND_PORTS, "[22, 80, 9999]")
+                .putAll((Map)config)
+                .build();
+        
         return managementContext.getLocationManager().createLocation(LocationSpec.create(BailOutWithTemplateJcloudsLocation.class)
-                .configure(MutableMap.of(
-                        CLOUD_PROVIDER, "aws-ec2",
-                        CLOUD_REGION_ID, "eu-west-1",
-                        IMAGE_ID, "us-east-1/ami-7d7bfc14", // so it runs faster, without loading all EC2 images
-                        ACCESS_IDENTITY, identity,
-                        ACCESS_CREDENTIAL, credential,
-                        USER, "fred",
-                        INBOUND_PORTS, "[22, 80, 9999]")));
+                .configure(allConfig));
     }
-    
+
     public static Predicate<ConfigBag> checkerFor(final String user, final Integer minRam, final Integer minCores) {
         return new Predicate<ConfigBag>() {
             @Override
@@ -266,5 +293,133 @@ public class JcloudsLocationTest implements JcloudsLocationConfig {
         Assert.assertEquals(jcloudsLocation.template.getOptions().getInboundPorts(), ports);
     }
 
+    @Test
+    public void testCreateWithMaxConcurrentCallsUnboundedByDefault() throws Exception {
+        final int numCalls = 20;
+        ConcurrencyTracker interceptor = new ConcurrencyTracker();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        try {
+            final BailOutJcloudsLocation jcloudsLocation = newSampleBailOutJcloudsLocationForTesting(ImmutableMap.of(BailOutJcloudsLocation.BUILD_TEMPLATE_INTERCEPTOR, interceptor));
+            
+            for (int i = 0; i < numCalls; i++) {
+                executor.execute(new Runnable() {
+                    @Override public void run() {
+                        jcloudsLocation.tryObtainAndCheck(MutableMap.of(), Predicates.alwaysTrue());
+                    }});
+            }
+            
+            interceptor.assertCallCountEventually(numCalls);
+            
+            interceptor.unblock();
+            executor.shutdown();
+            executor.awaitTermination(10, TimeUnit.SECONDS);
+            
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    @Test(groups="Integration") // because takes 1 sec
+    public void testCreateWithMaxConcurrentCallsRespectsConfig() throws Exception {
+        final int numCalls = 4;
+        final int maxConcurrentCreations = 2;
+        ConcurrencyTracker interceptor = new ConcurrencyTracker();
+        ExecutorService executor = Executors.newCachedThreadPool();
+        
+        try {
+            final BailOutJcloudsLocation jcloudsLocation = newSampleBailOutJcloudsLocationForTesting(ImmutableMap.of(
+                    BailOutJcloudsLocation.BUILD_TEMPLATE_INTERCEPTOR, interceptor,
+                    JcloudsLocation.MAX_CONCURRENT_MACHINE_CREATIONS, maxConcurrentCreations));
+            
+            for (int i = 0; i < numCalls; i++) {
+                executor.execute(new Runnable() {
+                    @Override public void run() {
+                        jcloudsLocation.tryObtainAndCheck(MutableMap.of(), Predicates.alwaysTrue());
+                    }});
+            }
+            
+            interceptor.assertCallCountEventually(maxConcurrentCreations);
+            interceptor.assertCallCountContinually(maxConcurrentCreations);
+            
+            interceptor.unblock();
+            interceptor.assertCallCountEventually(numCalls);
+            executor.shutdown();
+            executor.awaitTermination(10, TimeUnit.SECONDS);
+            
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    @Test(groups="Integration") // because takes 1 sec
+    public void testCreateWithMaxConcurrentCallsAppliesToSubLocations() throws Exception {
+        final int numCalls = 4;
+        final int maxConcurrentCreations = 2;
+        ConcurrencyTracker interceptor = new ConcurrencyTracker();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        try {
+            final BailOutJcloudsLocation jcloudsLocation = newSampleBailOutJcloudsLocationForTesting(ImmutableMap.of(
+                    BailOutJcloudsLocation.BUILD_TEMPLATE_INTERCEPTOR, interceptor,
+                    JcloudsLocation.MAX_CONCURRENT_MACHINE_CREATIONS, maxConcurrentCreations));
+            
+    
+            for (int i = 0; i < numCalls; i++) {
+                final BailOutJcloudsLocation subLocation = (BailOutJcloudsLocation) jcloudsLocation.newSubLocation(MutableMap.of());
+                executor.execute(new Runnable() {
+                    @Override public void run() {
+                        subLocation.tryObtainAndCheck(MutableMap.of(), Predicates.alwaysTrue());
+                    }});
+            }
+            
+            interceptor.assertCallCountEventually(maxConcurrentCreations);
+            interceptor.assertCallCountContinually(maxConcurrentCreations);
+            
+            interceptor.unblock();
+            interceptor.assertCallCountEventually(numCalls);
+            executor.shutdown();
+            executor.awaitTermination(10, TimeUnit.SECONDS);
+
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    public static class ConcurrencyTracker implements Function<ConfigBag,Void> {
+        final AtomicInteger concurrentCallsCounter = new AtomicInteger();
+        final CountDownLatch continuationLatch = new CountDownLatch(1);
+        
+        @Override public Void apply(ConfigBag input) {
+            concurrentCallsCounter.incrementAndGet();
+            try {
+                continuationLatch.await();
+            } catch (InterruptedException e) {
+                throw Exceptions.propagate(e);
+            }
+            return null;
+        }
+        
+        public void unblock() {
+            continuationLatch.countDown();
+        }
+
+        public void assertCallCountEventually(final int expected) {
+            Asserts.succeedsEventually(new Runnable() {
+                @Override public void run() {
+                    Assert.assertEquals(concurrentCallsCounter.get(), expected);
+                }
+            });
+        }
+        
+        public void assertCallCountContinually(final int expected) {
+            Asserts.succeedsContinually(new Runnable() {
+                @Override public void run() {
+                    Assert.assertEquals(concurrentCallsCounter.get(), expected);
+                }
+            });
+        }
+    }
+
     // TODO more tests, where flags come in from resolver, named locations, etc
 }