You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ro...@apache.org on 2022/09/16 10:14:41 UTC

[cloudstack] branch main updated: Resource reservation framework (#6694)

This is an automated email from the ASF dual-hosted git repository.

rohit pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudstack.git


The following commit(s) were added to refs/heads/main by this push:
     new bbc12605767 Resource reservation framework (#6694)
bbc12605767 is described below

commit bbc126057674a6cda047c2ea941d09af5c0e14a6
Author: dahn <da...@onecht.net>
AuthorDate: Fri Sep 16 12:14:35 2022 +0200

    Resource reservation framework (#6694)
    
    This PR addresses parallel resource allocation as a generalization of the problem and solution described in #6644. Instead of taking the global lock on the resources, a reservation record is created, which is added to the resource count that is checked in the ResourceLimitService/ResourceLimitManagerImpl. As a convenience a CheckedReservation is created. This is an implementation of AutoCloseable and can be used as a guard in a try-with-resources fashion. The close method of the CheckedReservation will del [...]
    
    Co-authored-by: Boris Stoyanov - a.k.a Bobby <bs...@gmail.com>
---
 .../java/com/cloud/user/ResourceLimitService.java  |  17 +-
 .../ResourceReservation.java}                      |  25 +-
 .../template/LocalTemplateDownloaderTest.java      |   6 +-
 .../cloudstack/reservation/ReservationVO.java      |  90 ++++
 .../cloudstack/reservation/dao/ReservationDao.java |  20 +-
 .../reservation/dao/ReservationDaoImpl.java        |  75 +++
 .../spring-engine-schema-core-daos-context.xml     |   1 +
 .../resources/META-INF/db/schema-41710to41800.sql  |  12 +
 .../cloud/resourcelimit/CheckedReservation.java    | 134 ++++++
 .../resourcelimit/ResourceLimitManagerImpl.java    |  65 +--
 .../main/java/com/cloud/vm/UserVmManagerImpl.java  | 508 +++++++++++----------
 .../resourcelimit/CheckedReservationTest.java      | 102 +++++
 .../cloud/vpc/MockResourceLimitManagerImpl.java    |   7 +
 .../smoke/test_deploy_vms_in_parallel.py           | 172 +++++++
 14 files changed, 946 insertions(+), 288 deletions(-)

diff --git a/api/src/main/java/com/cloud/user/ResourceLimitService.java b/api/src/main/java/com/cloud/user/ResourceLimitService.java
index 886cd23c639..41b2c8135cf 100644
--- a/api/src/main/java/com/cloud/user/ResourceLimitService.java
+++ b/api/src/main/java/com/cloud/user/ResourceLimitService.java
@@ -24,6 +24,7 @@ import com.cloud.configuration.ResourceLimit;
 import com.cloud.domain.Domain;
 import com.cloud.exception.ResourceAllocationException;
 import org.apache.cloudstack.framework.config.ConfigKey;
+import org.apache.cloudstack.user.ResourceReservation;
 
 public interface ResourceLimitService {
 
@@ -91,7 +92,7 @@ public interface ResourceLimitService {
     /**
      * This call should be used when we have already queried resource limit for an account. This is to handle
      * some corner cases where queried limit may be null.
-     * @param accountType
+     * @param accountId
      * @param limit
      * @param type
      * @return
@@ -102,7 +103,7 @@ public interface ResourceLimitService {
      * Finds the resource limit for a specified domain and type. If the domain has an infinite limit, will check
      * up the domain hierarchy
      *
-     * @param account
+     * @param domain
      * @param type
      * @return resource limit
      */
@@ -197,4 +198,16 @@ public interface ResourceLimitService {
      * @param delta
      */
     void decrementResourceCount(long accountId, ResourceType type, Boolean displayResource, Long... delta);
+
+    /**
+     * Adds a reservation that will be counted in subsequent calls to {@code getResourceCount} until {@code this}
+     * is closed. It will create a reservation record that will be counted when resource limits are checked.
+     * @param account The account for which the reservation is.
+     * @param displayResource whether this resource is shown to users at all (if not it is not counted to limits)
+     * @param type resource type
+     * @param delta amount to reserve (will not be {@literal <=} 0)
+     * @return a reservation representing the resource the user needs, intended to be used as an {@code AutoCloseable} (NOTE(review): {@code ResourceReservation} itself does not extend it)
+     */
+    ResourceReservation getReservation(Account account, Boolean displayResource, ResourceType type, Long delta) throws ResourceAllocationException;
+
 }
diff --git a/api/src/main/java/org/apache/cloudstack/acl/APILimitChecker.java b/api/src/main/java/org/apache/cloudstack/user/ResourceReservation.java
similarity index 64%
copy from api/src/main/java/org/apache/cloudstack/acl/APILimitChecker.java
copy to api/src/main/java/org/apache/cloudstack/user/ResourceReservation.java
index 110742c059d..170193570cf 100644
--- a/api/src/main/java/org/apache/cloudstack/acl/APILimitChecker.java
+++ b/api/src/main/java/org/apache/cloudstack/user/ResourceReservation.java
@@ -1,3 +1,4 @@
+//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -14,17 +15,23 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-package org.apache.cloudstack.acl;
-
-import org.apache.cloudstack.api.ServerApiException;
+//
+package org.apache.cloudstack.user;
 
-import com.cloud.user.Account;
-import com.cloud.utils.component.Adapter;
+import com.cloud.configuration.Resource;
+import org.apache.cloudstack.api.InternalIdentity;
 
 /**
- * APILimitChecker checks if we should block an API request based on pre-set account based api limit.
+ * an interface defining an {@code AutoCloseable} reservation object
  */
-public interface APILimitChecker extends Adapter {
-    // Interface for checking if the account is over its api limit
-    void checkLimit(Account account) throws ServerApiException;
+public interface
+ResourceReservation extends InternalIdentity {
+
+    Long getAccountId();
+
+    Long getDomainId();
+
+    Resource.ResourceType getResourceType();
+
+    Long getReservedAmount();
 }
diff --git a/core/src/test/java/com/cloud/storage/template/LocalTemplateDownloaderTest.java b/core/src/test/java/com/cloud/storage/template/LocalTemplateDownloaderTest.java
index 0a5d2f6657d..48253800801 100644
--- a/core/src/test/java/com/cloud/storage/template/LocalTemplateDownloaderTest.java
+++ b/core/src/test/java/com/cloud/storage/template/LocalTemplateDownloaderTest.java
@@ -19,7 +19,7 @@
 
 package com.cloud.storage.template;
 
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 
@@ -33,9 +33,7 @@ public class LocalTemplateDownloaderTest {
         String url = new File("pom.xml").toURI().toURL().toString();
         TemplateDownloader td = new LocalTemplateDownloader(null, url, System.getProperty("java.io.tmpdir"), TemplateDownloader.DEFAULT_MAX_TEMPLATE_SIZE_IN_BYTES, null);
         long bytes = td.download(true, null);
-        if (!(bytes > 0)) {
-            fail("Failed download");
-        }
+        assertTrue("Failed download", bytes > 0);
     }
 
 }
diff --git a/engine/schema/src/main/java/org/apache/cloudstack/reservation/ReservationVO.java b/engine/schema/src/main/java/org/apache/cloudstack/reservation/ReservationVO.java
new file mode 100644
index 00000000000..e5636f0bfc9
--- /dev/null
+++ b/engine/schema/src/main/java/org/apache/cloudstack/reservation/ReservationVO.java
@@ -0,0 +1,90 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package org.apache.cloudstack.reservation;
+
+import com.cloud.configuration.Resource;
+import org.apache.cloudstack.user.ResourceReservation;
+import com.cloud.utils.exception.CloudRuntimeException;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
+
+@Entity
+@Table(name = "resource_reservation")
+public class ReservationVO implements ResourceReservation {
+
+    @Id
+    @GeneratedValue(strategy = GenerationType.IDENTITY)
+    @Column(name = "id")
+    Long id;
+
+    @Column(name = "account_id")
+    long accountId;
+
+    @Column(name = "domain_id")
+    long domainId;
+
+    @Column(name = "resource_type", nullable = false)
+    Resource.ResourceType resourceType;
+
+    @Column(name = "amount")
+    long amount;
+
+    protected ReservationVO()
+    {}
+
+    public ReservationVO(Long accountId, Long domainId, Resource.ResourceType resourceType, Long delta) {
+        if (delta == null || delta <= 0) {
+            throw new CloudRuntimeException("resource reservations can not be made for no resources");
+        }
+        this.accountId = accountId;
+        this.domainId = domainId;
+        this.resourceType = resourceType;
+        this.amount = delta;
+    }
+
+    @Override
+    public long getId() {
+        return this.id;
+    }
+
+    @Override
+    public Long getAccountId() {
+        return accountId;
+    }
+
+    @Override
+    public Long getDomainId() {
+        return domainId;
+    }
+
+    @Override
+    public Resource.ResourceType getResourceType() {
+        return resourceType;
+    }
+
+    @Override
+    public Long getReservedAmount() {
+        return amount;
+    }
+}
diff --git a/api/src/main/java/org/apache/cloudstack/acl/APILimitChecker.java b/engine/schema/src/main/java/org/apache/cloudstack/reservation/dao/ReservationDao.java
similarity index 63%
rename from api/src/main/java/org/apache/cloudstack/acl/APILimitChecker.java
rename to engine/schema/src/main/java/org/apache/cloudstack/reservation/dao/ReservationDao.java
index 110742c059d..eead91c7b8e 100644
--- a/api/src/main/java/org/apache/cloudstack/acl/APILimitChecker.java
+++ b/engine/schema/src/main/java/org/apache/cloudstack/reservation/dao/ReservationDao.java
@@ -1,3 +1,4 @@
+//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -14,17 +15,14 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-package org.apache.cloudstack.acl;
-
-import org.apache.cloudstack.api.ServerApiException;
+//
+package org.apache.cloudstack.reservation.dao;
 
-import com.cloud.user.Account;
-import com.cloud.utils.component.Adapter;
+import com.cloud.configuration.Resource;
+import org.apache.cloudstack.reservation.ReservationVO;
+import com.cloud.utils.db.GenericDao;
 
-/**
- * APILimitChecker checks if we should block an API request based on pre-set account based api limit.
- */
-public interface APILimitChecker extends Adapter {
-    // Interface for checking if the account is over its api limit
-    void checkLimit(Account account) throws ServerApiException;
+public interface ReservationDao extends GenericDao<ReservationVO, Long> {
+    long getAccountReservation(Long account, Resource.ResourceType resourceType);
+    long getDomainReservation(Long domain, Resource.ResourceType resourceType);
 }
diff --git a/engine/schema/src/main/java/org/apache/cloudstack/reservation/dao/ReservationDaoImpl.java b/engine/schema/src/main/java/org/apache/cloudstack/reservation/dao/ReservationDaoImpl.java
new file mode 100644
index 00000000000..2b8b74224f3
--- /dev/null
+++ b/engine/schema/src/main/java/org/apache/cloudstack/reservation/dao/ReservationDaoImpl.java
@@ -0,0 +1,75 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package org.apache.cloudstack.reservation.dao;
+
+import com.cloud.configuration.Resource;
+import com.cloud.utils.db.GenericDaoBase;
+import com.cloud.utils.db.SearchBuilder;
+import com.cloud.utils.db.SearchCriteria;
+import org.apache.cloudstack.reservation.ReservationVO;
+
+import java.util.List;
+
+public class ReservationDaoImpl extends GenericDaoBase<ReservationVO, Long> implements ReservationDao {
+
+    private static final String RESOURCE_TYPE = "resourceType";
+    private static final String ACCOUNT_ID = "accountId";
+    private static final String DOMAIN_ID = "domainId";
+    private final SearchBuilder<ReservationVO> listAccountAndTypeSearch;
+
+    private final SearchBuilder<ReservationVO> listDomainAndTypeSearch;
+
+    public ReservationDaoImpl() {
+        listAccountAndTypeSearch = createSearchBuilder();
+        listAccountAndTypeSearch.and(ACCOUNT_ID, listAccountAndTypeSearch.entity().getAccountId(), SearchCriteria.Op.EQ);
+        listAccountAndTypeSearch.and(RESOURCE_TYPE, listAccountAndTypeSearch.entity().getResourceType(), SearchCriteria.Op.EQ);
+        listAccountAndTypeSearch.done();
+
+        listDomainAndTypeSearch = createSearchBuilder();
+        listDomainAndTypeSearch.and(DOMAIN_ID, listDomainAndTypeSearch.entity().getDomainId(), SearchCriteria.Op.EQ);
+        listDomainAndTypeSearch.and(RESOURCE_TYPE, listDomainAndTypeSearch.entity().getResourceType(), SearchCriteria.Op.EQ);
+        listDomainAndTypeSearch.done();
+    }
+
+    /** Adds up the reserved amounts of all reservation records of {@code resourceType} found for account {@code accountId}. */
+    @Override
+    public long getAccountReservation(Long accountId, Resource.ResourceType resourceType) {
+        SearchCriteria<ReservationVO> criteria = listAccountAndTypeSearch.create();
+        criteria.setParameters(ACCOUNT_ID, accountId);
+        criteria.setParameters(RESOURCE_TYPE, resourceType);
+        long reserved = 0;
+        for (ReservationVO entry : listBy(criteria)) {
+            reserved += entry.getReservedAmount();
+        }
+        return reserved;
+    }
+
+    /** Adds up the reserved amounts of all reservation records of {@code resourceType} found for domain {@code domainId}. */
+    @Override
+    public long getDomainReservation(Long domainId, Resource.ResourceType resourceType) {
+        long total = 0;
+        SearchCriteria<ReservationVO> sc = listDomainAndTypeSearch.create(); // must be the domain search: the account search defines no DOMAIN_ID parameter
+        sc.setParameters(DOMAIN_ID, domainId);
+        sc.setParameters(RESOURCE_TYPE, resourceType);
+        for (ReservationVO reservation : listBy(sc)) {
+            total += reservation.getReservedAmount();
+        }
+        return total;
+    }
+}
diff --git a/engine/schema/src/main/resources/META-INF/cloudstack/core/spring-engine-schema-core-daos-context.xml b/engine/schema/src/main/resources/META-INF/cloudstack/core/spring-engine-schema-core-daos-context.xml
index f13ae6e6803..fcd3be6c92e 100644
--- a/engine/schema/src/main/resources/META-INF/cloudstack/core/spring-engine-schema-core-daos-context.xml
+++ b/engine/schema/src/main/resources/META-INF/cloudstack/core/spring-engine-schema-core-daos-context.xml
@@ -175,6 +175,7 @@
   <bean id="projectJoinDaoImpl" class="com.cloud.api.query.dao.ProjectJoinDaoImpl" />
   <bean id="regionDaoImpl" class="org.apache.cloudstack.region.dao.RegionDaoImpl" />
   <bean id="remoteAccessVpnDaoImpl" class="com.cloud.network.dao.RemoteAccessVpnDaoImpl" />
+  <bean id="reservationDao" class="org.apache.cloudstack.reservation.dao.ReservationDaoImpl" />
   <bean id="resourceCountDaoImpl" class="com.cloud.configuration.dao.ResourceCountDaoImpl" />
   <bean id="resourceIconDaoImpl" class="com.cloud.resource.icon.dao.ResourceIconDaoImpl" />
   <bean id="resourceLimitDaoImpl" class="com.cloud.configuration.dao.ResourceLimitDaoImpl" />
diff --git a/engine/schema/src/main/resources/META-INF/db/schema-41710to41800.sql b/engine/schema/src/main/resources/META-INF/db/schema-41710to41800.sql
index f57b75c7376..f5d06a38117 100644
--- a/engine/schema/src/main/resources/META-INF/db/schema-41710to41800.sql
+++ b/engine/schema/src/main/resources/META-INF/db/schema-41710to41800.sql
@@ -27,6 +27,18 @@ WHERE so.default_use = 1 AND so.vm_type IN ('domainrouter', 'secondarystoragevm'
 ALTER TABLE `cloud`.`load_balancing_rules`
 ADD cidr_list VARCHAR(4096);
 
+-- safely add resources in parallel
+-- PR#6694 Create table to persist resource reservations.
+DROP TABLE IF EXISTS `cloud`.`resource_reservation`;
+CREATE TABLE `cloud`.`resource_reservation` (
+  `id` bigint unsigned NOT NULL auto_increment COMMENT 'id',
+  `account_id` bigint unsigned NOT NULL,
+  `domain_id` bigint unsigned NOT NULL,
+  `resource_type` varchar(255) NOT NULL,
+  `amount` bigint unsigned NOT NULL,
+  PRIMARY KEY (`id`)
+  ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
 -- Alter networks table to add ip6dns1 and ip6dns2
 ALTER TABLE `cloud`.`networks`
     ADD COLUMN `ip6dns1` varchar(255) DEFAULT NULL COMMENT 'first IPv6 DNS for the network' AFTER `dns2`,
diff --git a/server/src/main/java/com/cloud/resourcelimit/CheckedReservation.java b/server/src/main/java/com/cloud/resourcelimit/CheckedReservation.java
new file mode 100644
index 00000000000..0075ef1b4f1
--- /dev/null
+++ b/server/src/main/java/com/cloud/resourcelimit/CheckedReservation.java
@@ -0,0 +1,134 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package com.cloud.resourcelimit;
+
+import com.cloud.configuration.Resource.ResourceType;
+import com.cloud.exception.ResourceAllocationException;
+import com.cloud.user.Account;
+import com.cloud.user.ResourceLimitService;
+import com.cloud.utils.db.GlobalLock;
+import org.apache.cloudstack.user.ResourceReservation;
+import com.cloud.utils.exception.CloudRuntimeException;
+import org.apache.cloudstack.reservation.ReservationVO;
+import org.apache.cloudstack.reservation.dao.ReservationDao;
+import org.apache.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
+
+
+public class CheckedReservation  implements AutoCloseable, ResourceReservation {
+    private static final Logger LOG = Logger.getLogger(CheckedReservation.class);
+
+    private static final int TRY_TO_GET_LOCK_TIME = 120;
+    private GlobalLock quotaLimitLock;
+    ReservationDao reservationDao;
+    private final Account account;
+    private final ResourceType resourceType;
+    private Long amount;
+    private ResourceReservation reservation;
+
+    /**
+     * Checks, while holding a global lock, whether the limit allows {@code amount} more of {@code resourceType} and if so
+     * persists a reservation record through {@code reservationDao}; that record is removed again by {@link #close()}.
+     * A {@code null} or non-positive {@code amount} is only logged (at debug level); no record is created for it.
+     *
+     * @param amount positive number of the resource type to reserve
+     * @throws ResourceAllocationException if the lock is not obtained within {@code TRY_TO_GET_LOCK_TIME}; presumably also raised by the limit check itself
+     */
+    public CheckedReservation(Account account, ResourceType resourceType, Long amount, ReservationDao reservationDao, ResourceLimitService resourceLimitService) throws ResourceAllocationException {
+        this.reservationDao = reservationDao;
+        this.account = account;
+        this.resourceType = resourceType;
+        this.amount = amount;
+        this.reservation = null;
+        setGlobalLock(account, resourceType); // lock name is built from the account's domain id and the resource type
+        if (this.amount != null && this.amount <= 0) {
+            if(LOG.isDebugEnabled()){
+                LOG.debug(String.format("not reserving no amount of resources for %s in domain %d, type: %s, %s ", account.getAccountName(), account.getDomainId(), resourceType, amount));
+            }
+            this.amount = null; // from here on handled like "nothing to reserve"; the else branch below logs a second time
+        }
+
+        if (this.amount != null) {
+            if(quotaLimitLock.lock(TRY_TO_GET_LOCK_TIME)) {
+                try {
+                    resourceLimitService.checkResourceLimit(account,resourceType,amount);
+                    ReservationVO reservationVO = new ReservationVO(account.getAccountId(), account.getDomainId(), resourceType, amount);
+                    this.reservation = reservationDao.persist(reservationVO);
+                } catch (NullPointerException npe) { // NOTE(review): hides which collaborator was null; explicit null checks would be clearer
+                    throw new CloudRuntimeException("not enough means to check limits", npe);
+                } finally {
+                    quotaLimitLock.unlock(); // the lock only covers check + persist, not the lifetime of the reservation
+                }
+            } else {
+                throw new ResourceAllocationException(String.format("unable to acquire resource reservation \"%s\"", quotaLimitLock.getName()), resourceType);
+            }
+        } else {
+            if(LOG.isDebugEnabled()){
+                LOG.debug(String.format("not reserving no amount of resources for %s in domain %d, type: %s ", account.getAccountName(), account.getDomainId(), resourceType));
+            }
+        }
+    }
+
+    @NotNull
+    private void setGlobalLock(Account account, ResourceType resourceType) {
+        String lockName = String.format("CheckedReservation-%s/%d", account.getDomainId(), resourceType.getOrdinal());
+        setQuotaLimitLock(GlobalLock.getInternLock(lockName));
+    }
+
+    protected void setQuotaLimitLock(GlobalLock quotaLimitLock) {
+        this.quotaLimitLock = quotaLimitLock;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (this.reservation != null){
+            reservationDao.remove(reservation.getId());
+            reservation = null;
+        }
+    }
+
+    public Account getAccount() {
+        return account;
+    }
+
+    @Override
+    public Long getAccountId() {
+        return account.getId();
+    }
+
+    @Override
+    public Long getDomainId() {
+        return account.getDomainId();
+    }
+
+    @Override
+    public ResourceType getResourceType() {
+        return resourceType;
+    }
+
+    @Override
+    public Long getReservedAmount() {
+        return amount;
+    }
+
+    @Override
+    public long getId() {
+        return this.reservation.getId();
+    }
+}
diff --git a/server/src/main/java/com/cloud/resourcelimit/ResourceLimitManagerImpl.java b/server/src/main/java/com/cloud/resourcelimit/ResourceLimitManagerImpl.java
index 3afc47f418b..27c44f4df2f 100644
--- a/server/src/main/java/com/cloud/resourcelimit/ResourceLimitManagerImpl.java
+++ b/server/src/main/java/com/cloud/resourcelimit/ResourceLimitManagerImpl.java
@@ -36,10 +36,12 @@ import org.apache.cloudstack.framework.config.ConfigKey;
 import org.apache.cloudstack.framework.config.Configurable;
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
 import org.apache.cloudstack.managed.context.ManagedContextRunnable;
+import org.apache.cloudstack.reservation.dao.ReservationDao;
 import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
 import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
+import org.apache.cloudstack.user.ResourceReservation;
 import org.apache.log4j.Logger;
 import org.springframework.stereotype.Component;
 
@@ -71,7 +73,6 @@ import com.cloud.projects.Project;
 import com.cloud.projects.ProjectAccount.Role;
 import com.cloud.projects.dao.ProjectAccountDao;
 import com.cloud.projects.dao.ProjectDao;
-import com.cloud.service.dao.ServiceOfferingDao;
 import com.cloud.storage.DataStoreRole;
 import com.cloud.storage.SnapshotVO;
 import com.cloud.storage.VMTemplateStorageResourceAssoc.Status;
@@ -113,52 +114,54 @@ import static com.cloud.utils.NumbersUtil.toHumanReadableSize;
 public class ResourceLimitManagerImpl extends ManagerBase implements ResourceLimitService, Configurable {
     public static final Logger s_logger = Logger.getLogger(ResourceLimitManagerImpl.class);
 
-    @Inject
-    private DomainDao _domainDao;
     @Inject
     private AccountManager _accountMgr;
     @Inject
     private AlertManager _alertMgr;
     @Inject
-    private ResourceCountDao _resourceCountDao;
-    @Inject
-    private ResourceLimitDao _resourceLimitDao;
-    @Inject
-    private UserVmDao _userVmDao;
-    @Inject
     private AccountDao _accountDao;
     @Inject
-    protected SnapshotDao _snapshotDao;
+    private ConfigurationDao _configDao;
     @Inject
-    protected VMTemplateDao _vmTemplateDao;
+    private DomainDao _domainDao;
     @Inject
-    private VolumeDao _volumeDao;
+    private EntityManager _entityMgr;
     @Inject
     private IPAddressDao _ipAddressDao;
     @Inject
-    private VMInstanceDao _vmDao;
-    @Inject
-    private ConfigurationDao _configDao;
-    @Inject
-    private EntityManager _entityMgr;
+    private NetworkDao _networkDao;
     @Inject
     private ProjectDao _projectDao;
     @Inject
     private ProjectAccountDao _projectAccountDao;
     @Inject
-    private NetworkDao _networkDao;
+    private ResourceCountDao _resourceCountDao;
     @Inject
-    private VpcDao _vpcDao;
+    private ResourceLimitDao _resourceLimitDao;
     @Inject
-    private ServiceOfferingDao _serviceOfferingDao;
+    private ResourceLimitService resourceLimitService;
     @Inject
-    private TemplateDataStoreDao _vmTemplateStoreDao;
+    private ReservationDao reservationDao;
     @Inject
-    private VlanDao _vlanDao;
+    protected SnapshotDao _snapshotDao;
     @Inject
     private SnapshotDataStoreDao _snapshotDataStoreDao;
     @Inject
+    private TemplateDataStoreDao _vmTemplateStoreDao;
+    @Inject
+    private UserVmDao _userVmDao;
+    @Inject
     private UserVmJoinDao _userVmJoinDao;
+    @Inject
+    private VMInstanceDao _vmDao;
+    @Inject
+    protected VMTemplateDao _vmTemplateDao;
+    @Inject
+    private VolumeDao _volumeDao;
+    @Inject
+    private VpcDao _vpcDao;
+    @Inject
+    private VlanDao _vlanDao;
 
     protected GenericSearchBuilder<TemplateDataStoreVO, SumCount> templateSizeSearch;
     protected GenericSearchBuilder<SnapshotDataStoreVO, SumCount> snapshotSizeSearch;
@@ -428,7 +431,8 @@ public class ResourceLimitManagerImpl extends ManagerBase implements ResourceLim
             if (domainId != Domain.ROOT_DOMAIN) {
                 long domainResourceLimit = findCorrectResourceLimitForDomain(domain, type);
                 long currentDomainResourceCount = _resourceCountDao.getResourceCount(domainId, ResourceOwnerType.Domain, type);
-                long requestedDomainResourceCount = currentDomainResourceCount + numResources;
+                long currentResourceReservation = reservationDao.getDomainReservation(domainId, type);
+                long requestedDomainResourceCount = currentDomainResourceCount + currentResourceReservation + numResources;
                 String messageSuffix = " domain resource limits of Type '" + type + "'" + " for Domain Id = " + domainId + " is exceeded: Domain Resource Limit = " + toHumanReadableSize(domainResourceLimit)
                         + ", Current Domain Resource Amount = " + toHumanReadableSize(currentDomainResourceCount) + ", Requested Resource Amount = " + toHumanReadableSize(numResources) + ".";
 
@@ -451,7 +455,8 @@ public class ResourceLimitManagerImpl extends ManagerBase implements ResourceLim
         // Check account limits
         long accountResourceLimit = findCorrectResourceLimitForAccount(account, type);
         long currentResourceCount = _resourceCountDao.getResourceCount(account.getId(), ResourceOwnerType.Account, type);
-        long requestedResourceCount = currentResourceCount + numResources;
+        long currentResourceReservation = reservationDao.getAccountReservation(account.getId(), type);
+        long requestedResourceCount = currentResourceCount + currentResourceReservation + numResources;
 
         String convertedAccountResourceLimit = String.valueOf(accountResourceLimit);
         String convertedCurrentResourceCount = String.valueOf(currentResourceCount);
@@ -1060,7 +1065,7 @@ public class ResourceLimitManagerImpl extends ManagerBase implements ResourceLim
 
         // 1. If its null assume displayResource = 1
         // 2. If its not null then send true if displayResource = 1
-        return (displayResource == null) || (displayResource != null && displayResource);
+        return ! Boolean.FALSE.equals(displayResource);
     }
 
     @Override
@@ -1103,6 +1108,14 @@ public class ResourceLimitManagerImpl extends ManagerBase implements ResourceLim
         }
     }
 
+    @Override
+    public ResourceReservation getReservation(final Account account, final Boolean displayResource, final Resource.ResourceType type, final Long delta) throws ResourceAllocationException {
+        if (! Boolean.FALSE.equals(displayResource)) {
+            return new CheckedReservation(account, type, delta, reservationDao, resourceLimitService);
+        }
+        throw new CloudRuntimeException("no reservation needed for resources that display as false");
+    }
+
     @Override
     public String getConfigComponentName() {
         return ResourceLimitManagerImpl.class.getName();
@@ -1124,7 +1137,7 @@ public class ResourceLimitManagerImpl extends ManagerBase implements ResourceLim
             List<DomainVO> domains = _domainDao.findImmediateChildrenForParent(Domain.ROOT_DOMAIN);
             List<AccountVO> accounts = _accountDao.findActiveAccountsForDomain(Domain.ROOT_DOMAIN);
 
-            for (ResourceType type : ResourceCount.ResourceType.values()) {
+            for (ResourceType type : ResourceType.values()) {
                 if (type.supportsOwner(ResourceOwnerType.Domain)) {
                     recalculateDomainResourceCount(Domain.ROOT_DOMAIN, type);
                     for (Domain domain : domains) {
diff --git a/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java b/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
index 608448c8e47..3f0238aa8ff 100644
--- a/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
+++ b/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
@@ -51,6 +51,7 @@ import javax.naming.ConfigurationException;
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.ParserConfigurationException;
 
+import com.cloud.resourcelimit.CheckedReservation;
 import org.apache.cloudstack.acl.ControlledEntity;
 import org.apache.cloudstack.acl.ControlledEntity.ACLType;
 import org.apache.cloudstack.acl.SecurityChecker.AccessType;
@@ -112,6 +113,7 @@ import org.apache.cloudstack.framework.config.Configurable;
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
 import org.apache.cloudstack.managed.context.ManagedContextRunnable;
 import org.apache.cloudstack.query.QueryService;
+import org.apache.cloudstack.reservation.dao.ReservationDao;
 import org.apache.cloudstack.snapshot.SnapshotHelper;
 import org.apache.cloudstack.storage.command.DeleteCommand;
 import org.apache.cloudstack.storage.command.DettachCommand;
@@ -365,7 +367,7 @@ import com.cloud.vm.snapshot.VMSnapshotManager;
 import com.cloud.vm.snapshot.VMSnapshotVO;
 import com.cloud.vm.snapshot.dao.VMSnapshotDao;
 
-public class UserVmManagerImpl extends ManagerBase implements UserVmManager, VirtualMachineGuru, UserVmService, Configurable {
+public class UserVmManagerImpl extends ManagerBase implements UserVmManager, VirtualMachineGuru, Configurable {
     private static final Logger s_logger = Logger.getLogger(UserVmManagerImpl.class);
 
     /**
@@ -560,6 +562,11 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
     @Autowired
     @Qualifier("networkHelper")
     protected NetworkHelper nwHelper;
+    @Inject
+    ReservationDao reservationDao;
+    @Inject
+    ResourceLimitService resourceLimitService;
+
     @Inject
     private StatsCollector statsCollector;
 
@@ -3877,289 +3884,327 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
             DiskOfferingVO diskOffering = _diskOfferingDao.findById(diskOfferingId);
             volumesSize += verifyAndGetDiskSize(diskOffering, diskSize);
         }
-        if (! VirtualMachineManager.ResourceCountRunningVMsonly.value()) {
-            resourceLimitCheck(owner, isDisplayVm, new Long(offering.getCpu()), new Long(offering.getRamSize()));
-        }
+        UserVm vm = getCheckedUserVmResource(zone, hostName, displayName, owner, diskOfferingId, diskSize, networkList, securityGroupIdList, group, httpmethod, userData, sshKeyPairs, caller, requestedIps, defaultIps, isDisplayVm, keyboard, affinityGroupIdList, customParameters, customId, dhcpOptionMap, datadiskTemplateToDiskOfferringMap, userVmOVFPropertiesMap, dynamicScalingEnabled, vmType, template, hypervisorType, accountId, offering, isIso, rootDiskOfferingId, volumesSize);
 
-        _resourceLimitMgr.checkResourceLimit(owner, ResourceType.volume, (isIso || diskOfferingId == null ? 1 : 2));
-        _resourceLimitMgr.checkResourceLimit(owner, ResourceType.primary_storage, volumesSize);
+        _securityGroupMgr.addInstanceToGroups(vm.getId(), securityGroupIdList);
 
-        // verify security group ids
-        if (securityGroupIdList != null) {
-            for (Long securityGroupId : securityGroupIdList) {
-                SecurityGroup sg = _securityGroupDao.findById(securityGroupId);
-                if (sg == null) {
-                    throw new InvalidParameterValueException("Unable to find security group by id " + securityGroupId);
-                } else {
-                    // verify permissions
-                    _accountMgr.checkAccess(caller, null, true, owner, sg);
-                }
+        if (affinityGroupIdList != null && !affinityGroupIdList.isEmpty()) {
+            _affinityGroupVMMapDao.updateMap(vm.getId(), affinityGroupIdList);
+        }
+
+        CallContext.current().putContextParameter(VirtualMachine.class, vm.getUuid());
+        return vm;
+    }
+    private UserVm getCheckedUserVmResource(DataCenter zone, String hostName, String displayName, Account owner, Long diskOfferingId, Long diskSize, List<NetworkVO> networkList, List<Long> securityGroupIdList, String group, HTTPMethod httpmethod, String userData, List<String> sshKeyPairs, Account caller, Map<Long, IpAddresses> requestedIps, IpAddresses defaultIps, Boolean isDisplayVm, String keyboard, List<Long> affinityGroupIdList, Map<String, String> customParameters, String customId,  [...]
+        if (!VirtualMachineManager.ResourceCountRunningVMsonly.value()) {
+            try (CheckedReservation vmReservation = new CheckedReservation(owner, ResourceType.user_vm, 1l, reservationDao, resourceLimitService);
+                 CheckedReservation cpuReservation = new CheckedReservation(owner, ResourceType.cpu, Long.valueOf(offering.getCpu()), reservationDao, resourceLimitService);
+                 CheckedReservation memReservation = new CheckedReservation(owner, ResourceType.memory, Long.valueOf(offering.getRamSize()), reservationDao, resourceLimitService);
+            ) {
+                return getUncheckedUserVmResource(zone, hostName, displayName, owner, diskOfferingId, diskSize, networkList, securityGroupIdList, group, httpmethod, userData, sshKeyPairs, caller, requestedIps, defaultIps, isDisplayVm, keyboard, affinityGroupIdList, customParameters, customId, dhcpOptionMap, datadiskTemplateToDiskOfferringMap, userVmOVFPropertiesMap, dynamicScalingEnabled, vmType, template, hypervisorType, accountId, offering, isIso, rootDiskOfferingId, volumesSize);
+            } catch (ResourceAllocationException | CloudRuntimeException  e) {
+                throw e;
+            } catch (Exception e) {
+                s_logger.error("error during resource reservation and allocation", e);
+                throw new CloudRuntimeException(e);
             }
+
+        } else {
+            return getUncheckedUserVmResource(zone, hostName, displayName, owner, diskOfferingId, diskSize, networkList, securityGroupIdList, group, httpmethod, userData, sshKeyPairs, caller, requestedIps, defaultIps, isDisplayVm, keyboard, affinityGroupIdList, customParameters, customId, dhcpOptionMap, datadiskTemplateToDiskOfferringMap, userVmOVFPropertiesMap, dynamicScalingEnabled, vmType, template, hypervisorType, accountId, offering, isIso, rootDiskOfferingId, volumesSize);
         }
+    }
 
-        if (datadiskTemplateToDiskOfferringMap != null && !datadiskTemplateToDiskOfferringMap.isEmpty()) {
-            for (Entry<Long, DiskOffering> datadiskTemplateToDiskOffering : datadiskTemplateToDiskOfferringMap.entrySet()) {
-                VMTemplateVO dataDiskTemplate = _templateDao.findById(datadiskTemplateToDiskOffering.getKey());
-                DiskOffering dataDiskOffering = datadiskTemplateToDiskOffering.getValue();
+    private UserVm getUncheckedUserVmResource(DataCenter zone, String hostName, String displayName, Account owner, Long diskOfferingId, Long diskSize, List<NetworkVO> networkList, List<Long> securityGroupIdList, String group, HTTPMethod httpmethod, String userData, List<String> sshKeyPairs, Account caller, Map<Long, IpAddresses> requestedIps, IpAddresses defaultIps, Boolean isDisplayVm, String keyboard, List<Long> affinityGroupIdList, Map<String, String> customParameters, String customId [...]
+        try (CheckedReservation volumeReservation = new CheckedReservation(owner, ResourceType.volume, (isIso || diskOfferingId == null ? 1l : 2), reservationDao, resourceLimitService);
+             CheckedReservation primaryStorageReservation = new CheckedReservation(owner, ResourceType.primary_storage, volumesSize, reservationDao, resourceLimitService)) {
 
-                if (dataDiskTemplate == null
-                        || (!dataDiskTemplate.getTemplateType().equals(TemplateType.DATADISK)) && (dataDiskTemplate.getState().equals(VirtualMachineTemplate.State.Active))) {
-                    throw new InvalidParameterValueException("Invalid template id specified for Datadisk template" + datadiskTemplateToDiskOffering.getKey());
-                }
-                long dataDiskTemplateId = datadiskTemplateToDiskOffering.getKey();
-                if (!dataDiskTemplate.getParentTemplateId().equals(template.getId())) {
-                    throw new InvalidParameterValueException("Invalid Datadisk template. Specified Datadisk template" + dataDiskTemplateId
-                            + " doesn't belong to template " + template.getId());
-                }
-                if (dataDiskOffering == null) {
-                    throw new InvalidParameterValueException("Invalid disk offering id " + datadiskTemplateToDiskOffering.getValue().getId() +
-                            " specified for datadisk template " + dataDiskTemplateId);
-                }
-                if (dataDiskOffering.isCustomized()) {
-                    throw new InvalidParameterValueException("Invalid disk offering id " + dataDiskOffering.getId() + " specified for datadisk template " +
-                            dataDiskTemplateId + ". Custom Disk offerings are not supported for Datadisk templates");
+            // verify security group ids
+            if (securityGroupIdList != null) {
+                for (Long securityGroupId : securityGroupIdList) {
+                    SecurityGroup sg = _securityGroupDao.findById(securityGroupId);
+                    if (sg == null) {
+                        throw new InvalidParameterValueException("Unable to find security group by id " + securityGroupId);
+                    } else {
+                        // verify permissions
+                        _accountMgr.checkAccess(caller, null, true, owner, sg);
+                    }
                 }
-                if (dataDiskOffering.getDiskSize() < dataDiskTemplate.getSize()) {
-                    throw new InvalidParameterValueException("Invalid disk offering id " + dataDiskOffering.getId() + " specified for datadisk template " +
-                            dataDiskTemplateId + ". Disk offering size should be greater than or equal to the template size");
+            }
+
+            if (datadiskTemplateToDiskOfferringMap != null && !datadiskTemplateToDiskOfferringMap.isEmpty()) {
+                for (Entry<Long, DiskOffering> datadiskTemplateToDiskOffering : datadiskTemplateToDiskOfferringMap.entrySet()) {
+                    VMTemplateVO dataDiskTemplate = _templateDao.findById(datadiskTemplateToDiskOffering.getKey());
+                    DiskOffering dataDiskOffering = datadiskTemplateToDiskOffering.getValue();
+
+                    if (dataDiskTemplate == null
+                            || (!dataDiskTemplate.getTemplateType().equals(TemplateType.DATADISK)) && (dataDiskTemplate.getState().equals(VirtualMachineTemplate.State.Active))) {
+                        throw new InvalidParameterValueException("Invalid template id specified for Datadisk template" + datadiskTemplateToDiskOffering.getKey());
+                    }
+                    long dataDiskTemplateId = datadiskTemplateToDiskOffering.getKey();
+                    if (!dataDiskTemplate.getParentTemplateId().equals(template.getId())) {
+                        throw new InvalidParameterValueException("Invalid Datadisk template. Specified Datadisk template" + dataDiskTemplateId
+                                + " doesn't belong to template " + template.getId());
+                    }
+                    if (dataDiskOffering == null) {
+                        throw new InvalidParameterValueException("Invalid disk offering id " + datadiskTemplateToDiskOffering.getValue().getId() +
+                                " specified for datadisk template " + dataDiskTemplateId);
+                    }
+                    if (dataDiskOffering.isCustomized()) {
+                        throw new InvalidParameterValueException("Invalid disk offering id " + dataDiskOffering.getId() + " specified for datadisk template " +
+                                dataDiskTemplateId + ". Custom Disk offerings are not supported for Datadisk templates");
+                    }
+                    if (dataDiskOffering.getDiskSize() < dataDiskTemplate.getSize()) {
+                        throw new InvalidParameterValueException("Invalid disk offering id " + dataDiskOffering.getId() + " specified for datadisk template " +
+                                dataDiskTemplateId + ". Disk offering size should be greater than or equal to the template size");
+                    }
+                    _templateDao.loadDetails(dataDiskTemplate);
+                    _resourceLimitMgr.checkResourceLimit(owner, ResourceType.volume, 1);
+                    _resourceLimitMgr.checkResourceLimit(owner, ResourceType.primary_storage, dataDiskOffering.getDiskSize());
                 }
-                _templateDao.loadDetails(dataDiskTemplate);
-                _resourceLimitMgr.checkResourceLimit(owner, ResourceType.volume, 1);
-                _resourceLimitMgr.checkResourceLimit(owner, ResourceType.primary_storage, dataDiskOffering.getDiskSize());
             }
-        }
 
-        // check that the affinity groups exist
-        if (affinityGroupIdList != null) {
-            for (Long affinityGroupId : affinityGroupIdList) {
-                AffinityGroupVO ag = _affinityGroupDao.findById(affinityGroupId);
-                if (ag == null) {
-                    throw new InvalidParameterValueException("Unable to find affinity group " + ag);
-                } else if (!_affinityGroupService.isAffinityGroupProcessorAvailable(ag.getType())) {
-                    throw new InvalidParameterValueException("Affinity group type is not supported for group: " + ag + " ,type: " + ag.getType()
-                    + " , Please try again after removing the affinity group");
-                } else {
-                    // verify permissions
-                    if (ag.getAclType() == ACLType.Domain) {
-                        _accountMgr.checkAccess(caller, null, false, owner, ag);
-                        // Root admin has access to both VM and AG by default,
-                        // but
-                        // make sure the owner of these entities is same
-                        if (caller.getId() == Account.ACCOUNT_ID_SYSTEM || _accountMgr.isRootAdmin(caller.getId())) {
-                            if (!_affinityGroupService.isAffinityGroupAvailableInDomain(ag.getId(), owner.getDomainId())) {
-                                throw new PermissionDeniedException("Affinity Group " + ag + " does not belong to the VM's domain");
-                            }
-                        }
+            // check that the affinity groups exist
+            if (affinityGroupIdList != null) {
+                for (Long affinityGroupId : affinityGroupIdList) {
+                    AffinityGroupVO ag = _affinityGroupDao.findById(affinityGroupId);
+                    if (ag == null) {
+                        throw new InvalidParameterValueException("Unable to find affinity group " + ag);
+                    } else if (!_affinityGroupService.isAffinityGroupProcessorAvailable(ag.getType())) {
+                        throw new InvalidParameterValueException("Affinity group type is not supported for group: " + ag + " ,type: " + ag.getType()
+                                + " , Please try again after removing the affinity group");
                     } else {
-                        _accountMgr.checkAccess(caller, null, true, owner, ag);
-                        // Root admin has access to both VM and AG by default,
-                        // but
-                        // make sure the owner of these entities is same
-                        if (caller.getId() == Account.ACCOUNT_ID_SYSTEM || _accountMgr.isRootAdmin(caller.getId())) {
-                            if (ag.getAccountId() != owner.getAccountId()) {
-                                throw new PermissionDeniedException("Affinity Group " + ag + " does not belong to the VM's account");
+                        // verify permissions
+                        if (ag.getAclType() == ACLType.Domain) {
+                            _accountMgr.checkAccess(caller, null, false, owner, ag);
+                            // Root admin has access to both VM and AG by default,
+                            // but
+                            // make sure the owner of these entities is same
+                            if (caller.getId() == Account.ACCOUNT_ID_SYSTEM || _accountMgr.isRootAdmin(caller.getId())) {
+                                if (!_affinityGroupService.isAffinityGroupAvailableInDomain(ag.getId(), owner.getDomainId())) {
+                                    throw new PermissionDeniedException("Affinity Group " + ag + " does not belong to the VM's domain");
+                                }
+                            }
+                        } else {
+                            _accountMgr.checkAccess(caller, null, true, owner, ag);
+                            // Root admin has access to both VM and AG by default,
+                            // but
+                            // make sure the owner of these entities is same
+                            if (caller.getId() == Account.ACCOUNT_ID_SYSTEM || _accountMgr.isRootAdmin(caller.getId())) {
+                                if (ag.getAccountId() != owner.getAccountId()) {
+                                    throw new PermissionDeniedException("Affinity Group " + ag + " does not belong to the VM's account");
+                                }
                             }
                         }
                     }
                 }
             }
-        }
 
-        if (hypervisorType != HypervisorType.BareMetal) {
-            // check if we have available pools for vm deployment
-            long availablePools = _storagePoolDao.countPoolsByStatus(StoragePoolStatus.Up);
-            if (availablePools < 1) {
-                throw new StorageUnavailableException("There are no available pools in the UP state for vm deployment", -1);
+            if (hypervisorType != HypervisorType.BareMetal) {
+                // check if we have available pools for vm deployment
+                long availablePools = _storagePoolDao.countPoolsByStatus(StoragePoolStatus.Up);
+                if (availablePools < 1) {
+                    throw new StorageUnavailableException("There are no available pools in the UP state for vm deployment", -1);
+                }
             }
-        }
 
-        if (template.getTemplateType().equals(TemplateType.SYSTEM) && !CKS_NODE.equals(vmType)) {
-            throw new InvalidParameterValueException("Unable to use system template " + template.getId() + " to deploy a user vm");
-        }
-        List<VMTemplateZoneVO> listZoneTemplate = _templateZoneDao.listByZoneTemplate(zone.getId(), template.getId());
-        if (listZoneTemplate == null || listZoneTemplate.isEmpty()) {
-            throw new InvalidParameterValueException("The template " + template.getId() + " is not available for use");
-        }
+            if (template.getTemplateType().equals(TemplateType.SYSTEM) && !CKS_NODE.equals(vmType)) {
+                throw new InvalidParameterValueException("Unable to use system template " + template.getId() + " to deploy a user vm");
+            }
+            List<VMTemplateZoneVO> listZoneTemplate = _templateZoneDao.listByZoneTemplate(zone.getId(), template.getId());
+            if (listZoneTemplate == null || listZoneTemplate.isEmpty()) {
+                throw new InvalidParameterValueException("The template " + template.getId() + " is not available for use");
+            }
 
-        if (isIso && !template.isBootable()) {
-            throw new InvalidParameterValueException("Installing from ISO requires an ISO that is bootable: " + template.getId());
-        }
+            if (isIso && !template.isBootable()) {
+                throw new InvalidParameterValueException("Installing from ISO requires an ISO that is bootable: " + template.getId());
+            }
 
-        // Check templates permissions
-        _accountMgr.checkAccess(owner, AccessType.UseEntry, false, template);
+            // Check templates permissions
+            _accountMgr.checkAccess(owner, AccessType.UseEntry, false, template);
 
-        // check if the user data is correct
-        userData = validateUserData(userData, httpmethod);
+            // check if the user data is correct
+            userData = validateUserData(userData, httpmethod);
 
-        // Find an SSH public key corresponding to the key pair name, if one is
-        // given
-        String sshPublicKeys = "";
-        String keypairnames = "";
-        if (!sshKeyPairs.isEmpty()) {
-            List<SSHKeyPairVO> pairs = _sshKeyPairDao.findByNames(owner.getAccountId(), owner.getDomainId(), sshKeyPairs);
-            if (pairs == null || pairs.size() != sshKeyPairs.size()) {
-                throw new InvalidParameterValueException("Not all specified keyparis exist");
+            // Find an SSH public key corresponding to the key pair name, if one is
+            // given
+            String sshPublicKeys = "";
+            String keypairnames = "";
+            if (!sshKeyPairs.isEmpty()) {
+                List<SSHKeyPairVO> pairs = _sshKeyPairDao.findByNames(owner.getAccountId(), owner.getDomainId(), sshKeyPairs);
+                if (pairs == null || pairs.size() != sshKeyPairs.size()) {
+                    throw new InvalidParameterValueException("Not all specified keyparis exist");
+                }
+
+                sshPublicKeys = pairs.stream().map(p -> p.getPublicKey()).collect(Collectors.joining("\n"));
+                keypairnames = String.join(",", sshKeyPairs);
             }
 
-            sshPublicKeys = pairs.stream().map(p -> p.getPublicKey()).collect(Collectors.joining("\n"));
-            keypairnames = String.join(",", sshKeyPairs);
-        }
+            LinkedHashMap<String, List<NicProfile>> networkNicMap = new LinkedHashMap<>();
+
+            short defaultNetworkNumber = 0;
+            boolean securityGroupEnabled = false;
+            int networkIndex = 0;
+            for (NetworkVO network : networkList) {
+                if ((network.getDataCenterId() != zone.getId())) {
+                    if (!network.isStrechedL2Network()) {
+                        throw new InvalidParameterValueException("Network id=" + network.getId() +
+                                " doesn't belong to zone " + zone.getId());
+                    }
 
-        LinkedHashMap<String, List<NicProfile>> networkNicMap = new LinkedHashMap<>();
+                    NetworkOffering ntwkOffering = _networkOfferingDao.findById(network.getNetworkOfferingId());
+                    Long physicalNetworkId = _networkModel.findPhysicalNetworkId(zone.getId(), ntwkOffering.getTags(), ntwkOffering.getTrafficType());
 
-        short defaultNetworkNumber = 0;
-        boolean securityGroupEnabled = false;
-        int networkIndex = 0;
-        for (NetworkVO network : networkList) {
-            if ((network.getDataCenterId() != zone.getId())) {
-                if (!network.isStrechedL2Network()) {
-                    throw new InvalidParameterValueException("Network id=" + network.getId() +
-                            " doesn't belong to zone " + zone.getId());
+                    String provider = _ntwkSrvcDao.getProviderForServiceInNetwork(network.getId(), Service.Connectivity);
+                    if (!_networkModel.isProviderEnabledInPhysicalNetwork(physicalNetworkId, provider)) {
+                        throw new InvalidParameterValueException("Network in which is VM getting deployed could not be" +
+                                " streched to the zone, as we could not find a valid physical network");
+                    }
                 }
 
-                NetworkOffering ntwkOffering = _networkOfferingDao.findById(network.getNetworkOfferingId());
-                Long physicalNetworkId = _networkModel.findPhysicalNetworkId(zone.getId(), ntwkOffering.getTags(), ntwkOffering.getTrafficType());
+                _accountMgr.checkAccess(owner, AccessType.UseEntry, false, network);
 
-                String provider = _ntwkSrvcDao.getProviderForServiceInNetwork(network.getId(), Service.Connectivity);
-                if (!_networkModel.isProviderEnabledInPhysicalNetwork(physicalNetworkId, provider)) {
-                    throw new InvalidParameterValueException("Network in which is VM getting deployed could not be" +
-                            " streched to the zone, as we could not find a valid physical network");
+                IpAddresses requestedIpPair = null;
+                if (requestedIps != null && !requestedIps.isEmpty()) {
+                    requestedIpPair = requestedIps.get(network.getId());
                 }
-            }
-
-            _accountMgr.checkAccess(owner, AccessType.UseEntry, false, network);
 
-            IpAddresses requestedIpPair = null;
-            if (requestedIps != null && !requestedIps.isEmpty()) {
-                requestedIpPair = requestedIps.get(network.getId());
-            }
+                if (requestedIpPair == null) {
+                    requestedIpPair = new IpAddresses(null, null);
+                } else {
+                    _networkModel.checkRequestedIpAddresses(network.getId(), requestedIpPair);
+                }
+
+                NicProfile profile = new NicProfile(requestedIpPair.getIp4Address(), requestedIpPair.getIp6Address(), requestedIpPair.getMacAddress());
+                profile.setOrderIndex(networkIndex);
+                if (defaultNetworkNumber == 0) {
+                    defaultNetworkNumber++;
+                    // if user requested specific ip for default network, add it
+                    if (defaultIps.getIp4Address() != null || defaultIps.getIp6Address() != null) {
+                        _networkModel.checkRequestedIpAddresses(network.getId(), defaultIps);
+                        profile = new NicProfile(defaultIps.getIp4Address(), defaultIps.getIp6Address());
+                    } else if (defaultIps.getMacAddress() != null) {
+                        profile = new NicProfile(null, null, defaultIps.getMacAddress());
+                    }
 
-            if (requestedIpPair == null) {
-                requestedIpPair = new IpAddresses(null, null);
-            } else {
-                _networkModel.checkRequestedIpAddresses(network.getId(), requestedIpPair);
-            }
+                    profile.setDefaultNic(true);
+                    if (!_networkModel.areServicesSupportedInNetwork(network.getId(), new Service[]{Service.UserData})) {
+                        if ((userData != null) && (!userData.isEmpty())) {
+                            throw new InvalidParameterValueException(String.format("Unable to deploy VM as UserData is provided while deploying the VM, but there is no support for %s service in the default network %s/%s.", Service.UserData.getName(), network.getName(), network.getUuid()));
+                        }
 
-            NicProfile profile = new NicProfile(requestedIpPair.getIp4Address(), requestedIpPair.getIp6Address(), requestedIpPair.getMacAddress());
-            profile.setOrderIndex(networkIndex);
-            if (defaultNetworkNumber == 0) {
-                defaultNetworkNumber++;
-                // if user requested specific ip for default network, add it
-                if (defaultIps.getIp4Address() != null || defaultIps.getIp6Address() != null) {
-                    _networkModel.checkRequestedIpAddresses(network.getId(), defaultIps);
-                    profile = new NicProfile(defaultIps.getIp4Address(), defaultIps.getIp6Address());
-                } else if (defaultIps.getMacAddress() != null) {
-                    profile = new NicProfile(null, null, defaultIps.getMacAddress());
-                }
-
-                profile.setDefaultNic(true);
-                if (!_networkModel.areServicesSupportedInNetwork(network.getId(), new Service[]{Service.UserData})) {
-                    if ((userData != null) && (!userData.isEmpty())) {
-                        throw new InvalidParameterValueException("Unable to deploy VM as UserData is provided while deploying the VM, but there is no support for " + Network.Service.UserData.getName() + " service in the default network " + network.getId());
-                    }
+                        if ((sshPublicKeys != null) && (!sshPublicKeys.isEmpty())) {
+                            throw new InvalidParameterValueException(String.format("Unable to deploy VM as SSH keypair is provided while deploying the VM, but there is no support for %s service in the default network %s/%s", Service.UserData.getName(), network.getName(), network.getUuid()));
+                        }
 
-                    if ((sshPublicKeys != null) && (!sshPublicKeys.isEmpty())) {
-                        throw new InvalidParameterValueException("Unable to deploy VM as SSH keypair is provided while deploying the VM, but there is no support for " + Network.Service.UserData.getName() + " service in the default network " + network.getId());
+                        if (template.isEnablePassword()) {
+                            throw new InvalidParameterValueException(String.format("Unable to deploy VM as template %s is password enabled, but there is no support for %s service in the default network %s/%s", template.getId(), Service.UserData.getName(), network.getName(), network.getUuid()));
+                        }
                     }
+                }
 
-                    if (template.isEnablePassword()) {
-                        throw new InvalidParameterValueException("Unable to deploy VM as template " + template.getId() + " is password enabled, but there is no support for " + Network.Service.UserData.getName() + " service in the default network " + network.getId());
-                    }
+                if (_networkModel.isSecurityGroupSupportedInNetwork(network)) {
+                    securityGroupEnabled = true;
                 }
+                List<NicProfile> profiles = networkNicMap.get(network.getUuid());
+                if (CollectionUtils.isEmpty(profiles)) {
+                    profiles = new ArrayList<>();
+                }
+                profiles.add(profile);
+                networkNicMap.put(network.getUuid(), profiles);
+                networkIndex++;
             }
 
-            if (_networkModel.isSecurityGroupSupportedInNetwork(network)) {
-                securityGroupEnabled = true;
-            }
-            List<NicProfile> profiles = networkNicMap.get(network.getUuid());
-            if (CollectionUtils.isEmpty(profiles)) {
-                profiles = new ArrayList<>();
+            if (securityGroupIdList != null && !securityGroupIdList.isEmpty() && !securityGroupEnabled) {
+                throw new InvalidParameterValueException("Unable to deploy vm with security groups as SecurityGroup service is not enabled for the vm's network");
             }
-            profiles.add(profile);
-            networkNicMap.put(network.getUuid(), profiles);
-            networkIndex++;
-        }
 
-        if (securityGroupIdList != null && !securityGroupIdList.isEmpty() && !securityGroupEnabled) {
-            throw new InvalidParameterValueException("Unable to deploy vm with security groups as SecurityGroup service is not enabled for the vm's network");
-        }
-
-        // Verify network information - network default network has to be set;
-        // and vm can't have more than one default network
-        // This is a part of business logic because default network is required
-        // by Agent Manager in order to configure default
-        // gateway for the vm
-        if (defaultNetworkNumber == 0) {
-            throw new InvalidParameterValueException("At least 1 default network has to be specified for the vm");
-        } else if (defaultNetworkNumber > 1) {
-            throw new InvalidParameterValueException("Only 1 default network per vm is supported");
-        }
+            // Verify network information - a default network has to be set,
+            // and the vm can't have more than one default network.
+            // This is a part of business logic because the default network is required
+            // by Agent Manager in order to configure the default
+            // gateway for the vm
+            if (defaultNetworkNumber == 0) {
+                throw new InvalidParameterValueException("At least 1 default network has to be specified for the vm");
+            } else if (defaultNetworkNumber > 1) {
+                throw new InvalidParameterValueException("Only 1 default network per vm is supported");
+            }
 
-        long id = _vmDao.getNextInSequence(Long.class, "id");
+            long id = _vmDao.getNextInSequence(Long.class, "id");
 
-        if (hostName != null) {
-            // Check is hostName is RFC compliant
-            checkNameForRFCCompliance(hostName);
-        }
-
-        String instanceName = null;
-        String instanceSuffix = _instance;
-        String uuidName = _uuidMgr.generateUuid(UserVm.class, customId);
-        if (_instanceNameFlag && HypervisorType.VMware.equals(hypervisorType)) {
-            if (StringUtils.isNotEmpty(hostName)) {
-                instanceSuffix = hostName;
+            if (hostName != null) {
+                // Check if hostName is RFC compliant
+                checkNameForRFCCompliance(hostName);
             }
-            if (hostName == null) {
-                if (displayName != null) {
-                    hostName = displayName;
-                } else {
+
+            String instanceName = null;
+            String instanceSuffix = _instance;
+            String uuidName = _uuidMgr.generateUuid(UserVm.class, customId);
+            if (_instanceNameFlag && HypervisorType.VMware.equals(hypervisorType)) {
+                if (StringUtils.isNotEmpty(hostName)) {
+                    instanceSuffix = hostName;
+                }
+                if (hostName == null) {
+                    if (displayName != null) {
+                        hostName = displayName;
+                    } else {
+                        hostName = generateHostName(uuidName);
+                    }
+                }
+                // If global config vm.instancename.flag is set to true, then CS will set guest VM's name as it appears on the hypervisor, to its hostname.
+                // In case of VMware since VM name must be unique within a DC, check if VM with the same hostname already exists in the zone.
+                VMInstanceVO vmByHostName = _vmInstanceDao.findVMByHostNameInZone(hostName, zone.getId());
+                if (vmByHostName != null && vmByHostName.getState() != State.Expunging) {
+                    throw new InvalidParameterValueException("There already exists a VM by the name: " + hostName + ".");
+                }
+            } else {
+                if (hostName == null) {
+                    //Generate name using uuid and instance.name global config
                     hostName = generateHostName(uuidName);
                 }
             }
-            // If global config vm.instancename.flag is set to true, then CS will set guest VM's name as it appears on the hypervisor, to its hostname.
-            // In case of VMware since VM name must be unique within a DC, check if VM with the same hostname already exists in the zone.
-            VMInstanceVO vmByHostName = _vmInstanceDao.findVMByHostNameInZone(hostName, zone.getId());
-            if (vmByHostName != null && vmByHostName.getState() != VirtualMachine.State.Expunging) {
-                throw new InvalidParameterValueException("There already exists a VM by the name: " + hostName + ".");
+
+            if (hostName != null) {
+                // Check if hostName is RFC compliant
+                checkNameForRFCCompliance(hostName);
             }
-        } else {
-            if (hostName == null) {
-                //Generate name using uuid and instance.name global config
-                hostName = generateHostName(uuidName);
+            instanceName = VirtualMachineName.getVmName(id, owner.getId(), instanceSuffix);
+            if (_instanceNameFlag && HypervisorType.VMware.equals(hypervisorType) && !instanceSuffix.equals(_instance)) {
+                customParameters.put(VmDetailConstants.NAME_ON_HYPERVISOR, instanceName);
             }
-        }
 
-        if (hostName != null) {
-            // Check is hostName is RFC compliant
-            checkNameForRFCCompliance(hostName);
-        }
-        instanceName = VirtualMachineName.getVmName(id, owner.getId(), instanceSuffix);
-        if (_instanceNameFlag && HypervisorType.VMware.equals(hypervisorType) && !instanceSuffix.equals(_instance)) {
-            customParameters.put(VmDetailConstants.NAME_ON_HYPERVISOR, instanceName);
-        }
-
-        // Check if VM with instanceName already exists.
-        VMInstanceVO vmObj = _vmInstanceDao.findVMByInstanceName(instanceName);
-        if (vmObj != null && vmObj.getState() != VirtualMachine.State.Expunging) {
-            throw new InvalidParameterValueException("There already exists a VM by the display name supplied");
-        }
+            // Check if VM with instanceName already exists.
+            VMInstanceVO vmObj = _vmInstanceDao.findVMByInstanceName(instanceName);
+            if (vmObj != null && vmObj.getState() != State.Expunging) {
+                throw new InvalidParameterValueException("There already exists a VM by the display name supplied");
+            }
 
-        checkIfHostNameUniqueInNtwkDomain(hostName, networkList);
+            checkIfHostNameUniqueInNtwkDomain(hostName, networkList);
 
-        long userId = CallContext.current().getCallingUserId();
-        if (CallContext.current().getCallingAccount().getId() != owner.getId()) {
-            List<UserVO> userVOs = _userDao.listByAccount(owner.getAccountId());
-            if (!userVOs.isEmpty()) {
-                userId =  userVOs.get(0).getId();
+            long userId = CallContext.current().getCallingUserId();
+            if (CallContext.current().getCallingAccount().getId() != owner.getId()) {
+                List<UserVO> userVOs = _userDao.listByAccount(owner.getAccountId());
+                if (!userVOs.isEmpty()) {
+                    userId = userVOs.get(0).getId();
+                }
             }
-        }
 
-        dynamicScalingEnabled = dynamicScalingEnabled && checkIfDynamicScalingCanBeEnabled(null, offering, template, zone.getId());
+            dynamicScalingEnabled = dynamicScalingEnabled && checkIfDynamicScalingCanBeEnabled(null, offering, template, zone.getId());
 
-        UserVmVO vm = commitUserVm(zone, template, hostName, displayName, owner, diskOfferingId, diskSize, userData, caller, isDisplayVm, keyboard, accountId, userId, offering,
-                isIso, sshPublicKeys, networkNicMap, id, instanceName, uuidName, hypervisorType, customParameters, dhcpOptionMap,
-                datadiskTemplateToDiskOfferringMap, userVmOVFPropertiesMap, dynamicScalingEnabled, vmType, rootDiskOfferingId, keypairnames);
+            UserVmVO vm = commitUserVm(zone, template, hostName, displayName, owner, diskOfferingId, diskSize, userData, caller, isDisplayVm, keyboard, accountId, userId, offering,
+                    isIso, sshPublicKeys, networkNicMap, id, instanceName, uuidName, hypervisorType, customParameters, dhcpOptionMap,
+                    datadiskTemplateToDiskOfferringMap, userVmOVFPropertiesMap, dynamicScalingEnabled, vmType, rootDiskOfferingId, keypairnames);
 
+            assignInstanceToGroup(group, id);
+            return vm;
+        } catch (ResourceAllocationException | CloudRuntimeException  e) {
+            throw e;
+        } catch (Exception e) {
+            s_logger.error("error during resource reservation and allocation", e);
+            throw new CloudRuntimeException(e);
+        }
+    }
+
+    private void assignInstanceToGroup(String group, long id) {
         // Assign instance to the group
         try {
             if (group != null) {
@@ -4171,15 +4216,6 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
         } catch (Exception ex) {
             throw new CloudRuntimeException("Unable to assign Vm to the group " + group);
         }
-
-        _securityGroupMgr.addInstanceToGroups(vm.getId(), securityGroupIdList);
-
-        if (affinityGroupIdList != null && !affinityGroupIdList.isEmpty()) {
-            _affinityGroupVMMapDao.updateMap(vm.getId(), affinityGroupIdList);
-        }
-
-        CallContext.current().putContextParameter(VirtualMachine.class, vm.getUuid());
-        return vm;
     }
 
     private long verifyAndGetDiskSize(DiskOfferingVO diskOffering, Long diskSize) {
diff --git a/server/src/test/java/com/cloud/resourcelimit/CheckedReservationTest.java b/server/src/test/java/com/cloud/resourcelimit/CheckedReservationTest.java
new file mode 100644
index 00000000000..a5ec65e913c
--- /dev/null
+++ b/server/src/test/java/com/cloud/resourcelimit/CheckedReservationTest.java
@@ -0,0 +1,102 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package com.cloud.resourcelimit;
+
+import com.cloud.configuration.Resource;
+import com.cloud.exception.ResourceAllocationException;
+import com.cloud.user.Account;
+import com.cloud.user.ResourceLimitService;
+import com.cloud.utils.db.GlobalLock;
+import com.cloud.utils.exception.CloudRuntimeException;
+import org.apache.cloudstack.reservation.ReservationVO;
+import org.apache.cloudstack.reservation.dao.ReservationDao;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+@PrepareForTest(CheckedReservation.class)
+public class CheckedReservationTest {
+
+    @Mock
+    Account account;
+    @Mock
+    ReservationDao reservationDao;
+    @Mock
+    ResourceLimitService resourceLimitService;
+
+    @Mock
+    ReservationVO reservation = new ReservationVO(1l, 1l, Resource.ResourceType.user_vm, 1l);
+
+    @Mock
+    GlobalLock quotaLimitLock;
+
+    @Before
+    public void setup() {
+        initMocks(this);
+        when(reservation.getId()).thenReturn(1l);
+    }
+
+    @Test
+    public void getId() {
+        when(reservationDao.persist(any())).thenReturn(reservation);
+        when(account.getAccountId()).thenReturn(1l);
+        when(account.getDomainId()).thenReturn(4l);
+        when(quotaLimitLock.lock(anyInt())).thenReturn(true);
+        boolean fail = false;
+        try (CheckedReservation cr = new CheckedReservation(account, Resource.ResourceType.user_vm,1l, reservationDao, resourceLimitService); ) {
+            long id = cr.getId();
+            assertEquals(1l, id);
+        } catch (NullPointerException npe) {
+            fail("NPE caught");
+        } catch (ResourceAllocationException rae) {
+            // this does not work on all platforms because of the static methods being used in the global lock mechanism
+            // normally one would
+            // throw new CloudRuntimeException(rae);
+            // but we'll ignore this for platforms that can not humour the static bits of the system.
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void getNoAmount() {
+        when(reservationDao.persist(any())).thenReturn(reservation);
+        when(account.getAccountId()).thenReturn(1l);
+        boolean fail = false;
+        try (CheckedReservation cr = new CheckedReservation(account, Resource.ResourceType.cpu,-11l, reservationDao, resourceLimitService); ) {
+            Long amount = cr.getReservedAmount();
+            assertNull(amount);
+        } catch (NullPointerException npe) {
+            fail("NPE caught");
+        } catch (ResourceAllocationException rae) {
+            throw new CloudRuntimeException(rae);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/server/src/test/java/com/cloud/vpc/MockResourceLimitManagerImpl.java b/server/src/test/java/com/cloud/vpc/MockResourceLimitManagerImpl.java
index 8ad08eef653..1d1da5331d9 100644
--- a/server/src/test/java/com/cloud/vpc/MockResourceLimitManagerImpl.java
+++ b/server/src/test/java/com/cloud/vpc/MockResourceLimitManagerImpl.java
@@ -21,6 +21,8 @@ import java.util.Map;
 
 import javax.naming.ConfigurationException;
 
+import com.cloud.utils.exception.CloudRuntimeException;
+import org.apache.cloudstack.user.ResourceReservation;
 import org.springframework.stereotype.Component;
 
 import com.cloud.configuration.Resource.ResourceType;
@@ -171,6 +173,11 @@ public class MockResourceLimitManagerImpl extends ManagerBase implements Resourc
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    @Override
+    public ResourceReservation getReservation(Account account, Boolean displayResource, ResourceType type, Long delta) {
+        throw new CloudRuntimeException("no reservation implemented for mock resource management.");
+    }
+
     /* (non-Javadoc)
      * @see com.cloud.utils.component.Manager#configure(java.lang.String, java.util.Map)
      */
diff --git a/test/integration/smoke/test_deploy_vms_in_parallel.py b/test/integration/smoke/test_deploy_vms_in_parallel.py
new file mode 100644
index 00000000000..03049a29a91
--- /dev/null
+++ b/test/integration/smoke/test_deploy_vms_in_parallel.py
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+""" P1 for deploying VMs in parallel against a resource limit
+"""
+# Import Local Modules
+from nose.plugins.attrib import attr
+from marvin.cloudstackTestCase import cloudstackTestCase
+from marvin.lib.base import (Account,
+                             DiskOffering,
+                             Domain,
+                             Resources,
+                             ServiceOffering,
+                             VirtualMachine)
+from marvin.lib.common import (get_zone,
+                               get_domain,
+                               get_test_template)
+from marvin.cloudstackAPI import (deployVirtualMachine,
+                                  queryAsyncJobResult)
+
+
+class TestDeployVMsInParallel(cloudstackTestCase):
+
+    @classmethod
+    def setUpClass(cls):
+
+        cls.testClient = super(TestDeployVMsInParallel, cls).getClsTestClient()
+        cls.api_client = cls.testClient.getApiClient()
+
+        cls.testdata = cls.testClient.getParsedTestDataConfig()
+        # Get Zone, Domain and templates
+        cls.domain = get_domain(cls.api_client)
+        cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
+        cls.hypervisor = cls.testClient.getHypervisorInfo()
+
+        cls._cleanup = []
+
+        cls.template = get_test_template(
+            cls.api_client,
+            cls.zone.id,
+            cls.hypervisor
+        )
+
+        # Create service, disk offerings  etc
+        cls.service_offering = ServiceOffering.create(
+            cls.api_client,
+            cls.testdata["service_offering"]
+        )
+        cls._cleanup.append(cls.service_offering)
+
+        cls.disk_offering = DiskOffering.create(
+            cls.api_client,
+            cls.testdata["disk_offering"]
+        )
+        cls._cleanup.append(cls.disk_offering)
+
+        return
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestDeployVMsInParallel, cls).tearDownClass()
+
+    def setUp(self):
+
+        self.apiclient = self.testClient.getApiClient()
+
+        self.dbclient = self.testClient.getDbConnection()
+        self.cleanup = []
+        self.hypervisor = self.testClient.getHypervisorInfo()
+        self.testdata["virtual_machine"]["zoneid"] = self.zone.id
+        self.testdata["virtual_machine"]["template"] = self.template.id
+        self.testdata["iso"]["zoneid"] = self.zone.id
+        self.domain = Domain.create(
+            self.apiclient,
+            self.testdata["domain"]
+        )
+        self.cleanup.append(self.domain)
+
+        self.account = Account.create(
+            self.apiclient,
+            self.testdata["account"],
+            domainid=self.domain.id
+        )
+        self.cleanup.append(self.account)
+
+        self.userApiClient = self.testClient.getUserApiClient(UserName=self.account.name, DomainName=self.domain.name)
+        virtual_machine = VirtualMachine.create(
+            self.userApiClient,
+            self.testdata["virtual_machine"],
+            accountid=self.account.name,
+            domainid=self.account.domainid,
+            templateid=self.template.id,
+            serviceofferingid=self.service_offering.id,
+            diskofferingid=self.disk_offering.id,
+            hypervisor=self.hypervisor
+        )
+        self.cleanup.append(virtual_machine)
+        self.networkids = virtual_machine.nic[0].networkid
+        virtual_machine.delete(self.apiclient)
+        self.cleanup.remove(virtual_machine)
+        return
+
+    def update_resource_limit(self, max=1):
+        Resources.updateLimit(
+            self.apiclient,
+            domainid=self.domain.id,
+            resourcetype=0,
+            max=max
+        )
+
+    def tearDown(self):
+        super(TestDeployVMsInParallel, self).tearDown()
+
+    @attr(
+        tags=[
+            "advanced",
+            "basic",
+            "sg"],
+        required_hardware="false")
+    def test_deploy_more_vms_than_limit_allows(self):
+        """
+        Test Deploy Virtual Machines in parallel
+
+        Validate the following:
+        1. set limit to 2
+        2. deploy more than 2 VMs
+        """
+        self.test_limits(vm_limit=2)
+
+    def test_limits(self, vm_limit=1):
+        self.info(f"==== limit: {vm_limit} ====")
+
+        self.update_resource_limit(max=vm_limit)
+
+        cmd = deployVirtualMachine.deployVirtualMachineCmd()
+        cmd.serviceofferingid=self.service_offering.id
+        cmd.diskofferingid=self.disk_offering.id
+        cmd.templateid=self.template.id
+        cmd.accountid=self.account.id
+        cmd.domainid=self.account.domainid
+        cmd.zoneid=self.zone.id
+        cmd.networkids = self.networkids
+        cmd.isAsync = "false"
+
+        responses = []
+        failed = 0
+        for i in range(vm_limit+3):
+            try:
+                self.info(f"==== deploying instance #{i}")
+                response = self.userApiClient.deployVirtualMachine(cmd, method="GET")
+                responses.append(response)
+            except Exception as e:
+                failed += 1
+
+        self.info(f"==== failed deploys: {failed} ====")
+
+        self.assertEqual(failed, 3)
+        self.assertEqual(len(responses), vm_limit) # we don't care if the deploy succeeded or failed for some other reason