You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by da...@apache.org on 2023/11/30 08:36:54 UTC

(cloudstack) branch main updated: Flexible tags for hosts and storage pools (#7489)

This is an automated email from the ASF dual-hosted git repository.

dahn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudstack.git


The following commit(s) were added to refs/heads/main by this push:
     new 26b01f6f3be Flexible tags for hosts and storage pools (#7489)
26b01f6f3be is described below

commit 26b01f6f3be3f68dc0fd7ab6bade5129e9905f58
Author: João Jandre <48...@users.noreply.github.com>
AuthorDate: Thu Nov 30 05:36:47 2023 -0300

    Flexible tags for hosts and storage pools (#7489)
    
    Co-authored-by: João Jandre <jo...@scclouds.com.br>
---
 .../org/apache/cloudstack/api/ApiConstants.java    |   4 +
 .../api/command/admin/host/UpdateHostCmd.java      |   7 +
 .../admin/storage/CreateStoragePoolCmd.java        |   7 +
 .../admin/storage/UpdateStoragePoolCmd.java        |   7 +
 .../cloudstack/api/response/HostResponse.java      |  12 ++
 .../api/response/StoragePoolResponse.java          |  12 ++
 .../api/storage/PrimaryDataStoreParameters.java    |  10 ++
 .../src/main/java/com/cloud/host/HostTagVO.java    |  16 +++
 .../src/main/java/com/cloud/host/HostVO.java       |  24 +++-
 .../src/main/java/com/cloud/host/dao/HostDao.java  |   6 +
 .../main/java/com/cloud/host/dao/HostDaoImpl.java  | 103 ++++++++++++---
 .../main/java/com/cloud/host/dao/HostTagsDao.java  |  10 +-
 .../java/com/cloud/host/dao/HostTagsDaoImpl.java   |  39 ++++--
 .../java/com/cloud/storage/StoragePoolTagVO.java   |  28 ++++
 .../com/cloud/storage/dao/StoragePoolTagsDao.java  |   6 +-
 .../cloud/storage/dao/StoragePoolTagsDaoImpl.java  |  25 +++-
 .../storage/datastore/db/PrimaryDataStoreDao.java  |   8 +-
 .../datastore/db/PrimaryDataStoreDaoImpl.java      |  46 +++++--
 .../resources/META-INF/db/schema-41810to41900.sql  |   9 ++
 .../META-INF/db/views/cloud.host_view.sql          | 111 ++++++++++++++++
 .../META-INF/db/views/cloud.storage_pool_view.sql  |  68 ++++++++++
 .../src/test/java/com/cloud/host/HostVOTest.java   | 145 ++++++++++++---------
 .../allocator/AbstractStoragePoolAllocator.java    |   4 +
 .../ClusterScopeStoragePoolAllocator.java          |   6 +-
 .../allocator/LocalStoragePoolAllocator.java       |   5 +-
 .../allocator/ZoneWideStoragePoolAllocator.java    |   8 +-
 .../volume/datastore/PrimaryDataStoreHelper.java   |   2 +-
 .../quota/activationrule/presetvariables/Host.java |  10 ++
 .../presetvariables/PresetVariableHelper.java      |  26 +++-
 .../activationrule/presetvariables/Storage.java    |  11 ++
 .../presetvariables/PresetVariableHelperTest.java  |  83 ++++++++++--
 .../manager/allocator/impl/RandomAllocator.java    |  15 ++-
 .../cloudstack/metrics/PrometheusExporterImpl.java |   9 +-
 .../ElastistorPrimaryDataStoreLifeCycle.java       |   2 +
 .../lifecycle/DateraPrimaryDataStoreLifeCycle.java |   2 +
 .../CloudStackPrimaryDataStoreLifeCycleImpl.java   |   1 +
 .../LinstorPrimaryDataStoreLifeCycleImpl.java      |   2 +
 .../NexentaPrimaryDataStoreLifeCycle.java          |   2 +
 .../ScaleIOPrimaryDataStoreLifeCycle.java          |   2 +
 .../SolidFirePrimaryDataStoreLifeCycle.java        |   2 +
 .../SolidFireSharedPrimaryDataStoreLifeCycle.java  |   2 +
 .../manager/allocator/impl/FirstFitAllocator.java  |  11 +-
 .../com/cloud/api/query/dao/HostJoinDaoImpl.java   |   1 +
 .../cloud/api/query/dao/StoragePoolJoinDao.java    |   3 +
 .../api/query/dao/StoragePoolJoinDaoImpl.java      |  58 +++++++++
 .../java/com/cloud/api/query/vo/HostJoinVO.java    |   7 +
 .../com/cloud/api/query/vo/StoragePoolJoinVO.java  |   7 +
 .../configuration/ConfigurationManagerImpl.java    |  45 +++++--
 .../deploy/DeploymentPlanningManagerImpl.java      |   4 +-
 .../java/com/cloud/deploy/FirstFitPlanner.java     |   7 +
 .../com/cloud/resource/ResourceManagerImpl.java    |  22 ++--
 .../resource/RollingMaintenanceManagerImpl.java    |   7 +-
 .../java/com/cloud/storage/StorageManagerImpl.java |  30 +++--
 .../com/cloud/storage/VolumeApiServiceImpl.java    |  44 +++++--
 .../cloud/storage/listener/StoragePoolMonitor.java |   2 +-
 .../configuration/ConfigurationManagerTest.java    |  10 +-
 .../cloud/storage/VolumeApiServiceImplTest.java    |  89 ++++++++++---
 .../storage/listener/StoragePoolMonitorTest.java   |   4 +-
 ui/public/locales/en.json                          |   1 +
 ui/public/locales/pt_BR.json                       |   1 +
 ui/src/config/section/infra/hosts.js               |   2 +-
 ui/src/config/section/infra/primaryStorages.js     |   2 +-
 .../utils/jsinterpreter/TagAsRuleHelper.java       |  51 ++++++++
 63 files changed, 1092 insertions(+), 213 deletions(-)

diff --git a/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java b/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
index 5b6647991ef..4b437516bd8 100644
--- a/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
+++ b/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
@@ -1066,6 +1066,10 @@ public class ApiConstants {
     public static final String CLIENT_ID = "clientid";
     public static final String REDIRECT_URI = "redirecturi";
 
+    public static final String IS_TAG_A_RULE = "istagarule";
+
+    public static final String PARAMETER_DESCRIPTION_IS_TAG_A_RULE = "Whether the informed tag is a JS interpretable rule or not.";
+
     /**
      * This enum specifies IO Drivers, each option controls specific policies on I/O.
      * Qemu guests support "threads" and "native" options Since 0.8.8 ; "io_uring" is supported Since 6.3.0 (QEMU 5.0).
diff --git a/api/src/main/java/org/apache/cloudstack/api/command/admin/host/UpdateHostCmd.java b/api/src/main/java/org/apache/cloudstack/api/command/admin/host/UpdateHostCmd.java
index e3ff130e2d4..9cf47a9c4b9 100644
--- a/api/src/main/java/org/apache/cloudstack/api/command/admin/host/UpdateHostCmd.java
+++ b/api/src/main/java/org/apache/cloudstack/api/command/admin/host/UpdateHostCmd.java
@@ -60,6 +60,9 @@ public class UpdateHostCmd extends BaseCmd {
     @Parameter(name = ApiConstants.HOST_TAGS, type = CommandType.LIST, collectionType = CommandType.STRING, description = "list of tags to be added to the host")
     private List<String> hostTags;
 
+    @Parameter(name = ApiConstants.IS_TAG_A_RULE, type = CommandType.BOOLEAN, description = ApiConstants.PARAMETER_DESCRIPTION_IS_TAG_A_RULE)
+    private Boolean isTagARule;
+
     @Parameter(name = ApiConstants.URL, type = CommandType.STRING, description = "the new uri for the secondary storage: nfs://host/path")
     private String url;
 
@@ -90,6 +93,10 @@ public class UpdateHostCmd extends BaseCmd {
         return hostTags;
     }
 
+    public Boolean getIsTagARule() {
+        return isTagARule;
+    }
+
     public String getUrl() {
         return url;
     }
diff --git a/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/CreateStoragePoolCmd.java b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/CreateStoragePoolCmd.java
index 0eacc5cda6b..477d7570dfa 100644
--- a/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/CreateStoragePoolCmd.java
+++ b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/CreateStoragePoolCmd.java
@@ -90,6 +90,9 @@ public class CreateStoragePoolCmd extends BaseCmd {
                description = "hypervisor type of the hosts in zone that will be attached to this storage pool. KVM, VMware supported as of now.")
     private String hypervisor;
 
+    @Parameter(name = ApiConstants.IS_TAG_A_RULE, type = CommandType.BOOLEAN, description = ApiConstants.PARAMETER_DESCRIPTION_IS_TAG_A_RULE)
+    private Boolean isTagARule;
+
     /////////////////////////////////////////////////////
     /////////////////// Accessors ///////////////////////
     /////////////////////////////////////////////////////
@@ -146,6 +149,10 @@ public class CreateStoragePoolCmd extends BaseCmd {
         return hypervisor;
     }
 
+    public Boolean isTagARule() {
+        return this.isTagARule;
+    }
+
     @Override
     public long getEntityOwnerId() {
         return Account.ACCOUNT_ID_SYSTEM;
diff --git a/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/UpdateStoragePoolCmd.java b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/UpdateStoragePoolCmd.java
index 9e34684a09d..09ec5394921 100644
--- a/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/UpdateStoragePoolCmd.java
+++ b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/UpdateStoragePoolCmd.java
@@ -61,6 +61,9 @@ public class UpdateStoragePoolCmd extends BaseCmd {
             " enable it back.")
     private Boolean enabled;
 
+    @Parameter(name = ApiConstants.IS_TAG_A_RULE, type = CommandType.BOOLEAN, description = ApiConstants.PARAMETER_DESCRIPTION_IS_TAG_A_RULE)
+    private Boolean isTagARule;
+
     /////////////////////////////////////////////////////
     /////////////////// Accessors ///////////////////////
     /////////////////////////////////////////////////////
@@ -89,6 +92,10 @@ public class UpdateStoragePoolCmd extends BaseCmd {
         return enabled;
     }
 
+    public Boolean isTagARule() {
+        return isTagARule;
+    }
+
     /////////////////////////////////////////////////////
     /////////////// API Implementation///////////////////
     /////////////////////////////////////////////////////
diff --git a/api/src/main/java/org/apache/cloudstack/api/response/HostResponse.java b/api/src/main/java/org/apache/cloudstack/api/response/HostResponse.java
index e1f1e5ee241..d72d23b99c9 100644
--- a/api/src/main/java/org/apache/cloudstack/api/response/HostResponse.java
+++ b/api/src/main/java/org/apache/cloudstack/api/response/HostResponse.java
@@ -221,6 +221,10 @@ public class HostResponse extends BaseResponseWithAnnotations {
     @Param(description = "comma-separated list of tags for the host")
     private String hostTags;
 
+    @SerializedName(ApiConstants.IS_TAG_A_RULE)
+    @Param(description = ApiConstants.PARAMETER_DESCRIPTION_IS_TAG_A_RULE)
+    private Boolean isTagARule;
+
     @SerializedName("hasenoughcapacity")
     @Param(description = "true if this host has enough CPU and RAM capacity to migrate a VM to it, false otherwise")
     private Boolean hasEnoughCapacity;
@@ -732,4 +736,12 @@ public class HostResponse extends BaseResponseWithAnnotations {
     public void setEncryptionSupported(Boolean encryptionSupported) {
         this.encryptionSupported = encryptionSupported;
     }
+
+    public Boolean getIsTagARule() {
+        return isTagARule;
+    }
+
+    public void setIsTagARule(Boolean tagARule) {
+        isTagARule = tagARule;
+    }
 }
diff --git a/api/src/main/java/org/apache/cloudstack/api/response/StoragePoolResponse.java b/api/src/main/java/org/apache/cloudstack/api/response/StoragePoolResponse.java
index 89256c26473..183290ec9eb 100644
--- a/api/src/main/java/org/apache/cloudstack/api/response/StoragePoolResponse.java
+++ b/api/src/main/java/org/apache/cloudstack/api/response/StoragePoolResponse.java
@@ -101,6 +101,10 @@ public class StoragePoolResponse extends BaseResponseWithAnnotations {
     @Param(description = "the tags for the storage pool")
     private String tags;
 
+    @SerializedName(ApiConstants.IS_TAG_A_RULE)
+    @Param(description = ApiConstants.PARAMETER_DESCRIPTION_IS_TAG_A_RULE)
+    private Boolean isTagARule;
+
     @SerializedName(ApiConstants.STATE)
     @Param(description = "the state of the storage pool")
     private StoragePoolStatus state;
@@ -304,6 +308,14 @@ public class StoragePoolResponse extends BaseResponseWithAnnotations {
         this.tags = tags;
     }
 
+    public Boolean getIsTagARule() {
+        return isTagARule;
+    }
+
+    public void setIsTagARule(Boolean tagARule) {
+        isTagARule = tagARule;
+    }
+
     public StoragePoolStatus getState() {
         return state;
     }
diff --git a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/PrimaryDataStoreParameters.java b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/PrimaryDataStoreParameters.java
index 1dbff59a891..1b18264df15 100644
--- a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/PrimaryDataStoreParameters.java
+++ b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/PrimaryDataStoreParameters.java
@@ -43,6 +43,8 @@ public class PrimaryDataStoreParameters {
     private boolean managed;
     private Long capacityIops;
 
+    private Boolean isTagARule;
+
     /**
      * @return the userInfo
      */
@@ -277,4 +279,12 @@ public class PrimaryDataStoreParameters {
     public void setUsedBytes(long usedBytes) {
         this.usedBytes = usedBytes;
     }
+
+    public Boolean isTagARule() {
+        return isTagARule;
+    }
+
+    public void setIsTagARule(Boolean isTagARule) {
+        this.isTagARule = isTagARule;
+    }
 }
diff --git a/engine/schema/src/main/java/com/cloud/host/HostTagVO.java b/engine/schema/src/main/java/com/cloud/host/HostTagVO.java
index 6b8b754849c..cd4ac29738d 100644
--- a/engine/schema/src/main/java/com/cloud/host/HostTagVO.java
+++ b/engine/schema/src/main/java/com/cloud/host/HostTagVO.java
@@ -24,6 +24,7 @@ import javax.persistence.Id;
 import javax.persistence.Table;
 
 import org.apache.cloudstack.api.InternalIdentity;
+import org.apache.commons.lang3.BooleanUtils;
 
 @Entity
 @Table(name = "host_tags")
@@ -39,12 +40,22 @@ public class HostTagVO implements InternalIdentity {
     @Column(name = "tag")
     private String tag;
 
+    @Column(name = "is_tag_a_rule")
+    private boolean isTagARule;
+
     protected HostTagVO() {
     }
 
     public HostTagVO(long hostId, String tag) {
         this.hostId = hostId;
         this.tag = tag;
+        this.isTagARule = false;
+    }
+
+    public HostTagVO(long hostId, String tag, Boolean isTagARule) {
+        this.hostId = hostId;
+        this.tag = tag;
+        this.isTagARule = BooleanUtils.toBooleanDefaultIfNull(isTagARule, false);
     }
 
     public long getHostId() {
@@ -59,6 +70,11 @@ public class HostTagVO implements InternalIdentity {
         this.tag = tag;
     }
 
+    public boolean getIsTagARule() {
+        return isTagARule;
+    }
+
+
     @Override
     public long getId() {
         return id;
diff --git a/engine/schema/src/main/java/com/cloud/host/HostVO.java b/engine/schema/src/main/java/com/cloud/host/HostVO.java
index d6e7ea1fd87..697401ad069 100644
--- a/engine/schema/src/main/java/com/cloud/host/HostVO.java
+++ b/engine/schema/src/main/java/com/cloud/host/HostVO.java
@@ -39,6 +39,7 @@ import javax.persistence.TemporalType;
 import javax.persistence.Transient;
 
 import com.cloud.agent.api.VgpuTypesInfo;
+import com.cloud.host.dao.HostTagsDao;
 import com.cloud.hypervisor.Hypervisor.HypervisorType;
 import com.cloud.offering.ServiceOffering;
 import com.cloud.resource.ResourceState;
@@ -46,7 +47,10 @@ import com.cloud.storage.Storage.StoragePoolType;
 import com.cloud.utils.NumbersUtil;
 import com.cloud.utils.db.GenericDao;
 import java.util.Arrays;
+
+import org.apache.cloudstack.utils.jsinterpreter.TagAsRuleHelper;
 import org.apache.cloudstack.utils.reflectiontostringbuilderutils.ReflectionToStringBuilderUtils;
+import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang3.StringUtils;
 
 @Entity
@@ -159,6 +163,14 @@ public class HostVO implements Host {
     @Transient
     List<String> hostTags;
 
+    /**
+     * This is a delayed load value.
+     * If the value is null, then this field has not been loaded yet.
+     * Call host dao to load it.
+     */
+    @Transient
+    Boolean isTagARule;
+
     // This value is only for saving and current cannot be loaded.
     @Transient
     HashMap<String, HashMap<String, VgpuTypesInfo>> groupDetails = new HashMap<String, HashMap<String, VgpuTypesInfo>>();
@@ -322,8 +334,13 @@ public class HostVO implements Host {
         return hostTags;
     }
 
-    public void setHostTags(List<String> hostTags) {
+    public void setHostTags(List<String> hostTags, Boolean isTagARule) {
         this.hostTags = hostTags;
+        this.isTagARule = isTagARule;
+    }
+
+    public Boolean getIsTagARule() {
+        return isTagARule;
     }
 
     public  HashMap<String, HashMap<String, VgpuTypesInfo>> getGpuGroupDetails() {
@@ -748,6 +765,11 @@ public class HostVO implements Host {
         if (serviceOffering == null) {
             return false;
         }
+
+        if (BooleanUtils.isTrue(this.getIsTagARule())) {
+            return TagAsRuleHelper.interpretTagAsRule(this.getHostTags().get(0), serviceOffering.getHostTag(), HostTagsDao.hostTagRuleExecutionTimeout.value());
+        }
+
         if (StringUtils.isEmpty(serviceOffering.getHostTag())) {
             return true;
         }
diff --git a/engine/schema/src/main/java/com/cloud/host/dao/HostDao.java b/engine/schema/src/main/java/com/cloud/host/dao/HostDao.java
index ca51ad3f428..fe30722feb1 100644
--- a/engine/schema/src/main/java/com/cloud/host/dao/HostDao.java
+++ b/engine/schema/src/main/java/com/cloud/host/dao/HostDao.java
@@ -108,6 +108,8 @@ public interface HostDao extends GenericDao<HostVO, Long>, StateDao<Status, Stat
 
     List<HostVO> listAllHostsByZoneAndHypervisorType(long zoneId, HypervisorType hypervisorType);
 
+    List<HostVO> listAllHostsThatHaveNoRuleTag(Host.Type type, Long clusterId, Long podId, Long dcId);
+
     List<HostVO> listAllHostsByType(Host.Type type);
 
     HostVO findByPublicIp(String publicIp);
@@ -161,4 +163,8 @@ public interface HostDao extends GenericDao<HostVO, Long>, StateDao<Status, Stat
      * @return ordered list of hypervisor versions
      */
     List<String> listOrderedHostsHypervisorVersionsInDatacenter(long datacenterId, HypervisorType hypervisorType);
+
+    List<HostVO> findHostsWithTagRuleThatMatchComputeOferringTags(String computeOfferingTags);
+
+    List<Long> findClustersThatMatchHostTagRule(String computeOfferingTags);
 }
diff --git a/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java b/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java
index 392b623299f..136351527f6 100644
--- a/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java
+++ b/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java
@@ -22,9 +22,11 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.TimeZone;
 import java.util.stream.Collectors;
 
@@ -32,6 +34,8 @@ import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import javax.persistence.TableGenerator;
 
+import org.apache.cloudstack.utils.jsinterpreter.TagAsRuleHelper;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.log4j.Logger;
 
 import com.cloud.agent.api.VgpuTypesInfo;
@@ -80,8 +84,8 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
     private static final Logger state_logger = Logger.getLogger(ResourceState.class);
 
     private static final String LIST_HOST_IDS_BY_COMPUTETAGS = "SELECT filtered.host_id, COUNT(filtered.tag) AS tag_count "
-                                                             + "FROM (SELECT host_id, tag FROM host_tags GROUP BY host_id,tag) AS filtered "
-                                                             + "WHERE tag IN(%s) "
+                                                             + "FROM (SELECT host_id, tag, is_tag_a_rule FROM host_tags GROUP BY host_id,tag) AS filtered "
+                                                             + "WHERE tag IN(%s) AND is_tag_a_rule = 0 "
                                                              + "GROUP BY host_id "
                                                              + "HAVING tag_count = %s ";
     private static final String SEPARATOR = ",";
@@ -148,6 +152,10 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
     protected GenericSearchBuilder<ClusterVO, Long> AllClustersSearch;
     protected SearchBuilder<HostVO> HostsInClusterSearch;
 
+    protected SearchBuilder<HostVO> searchBuilderFindByIdTypeClusterIdPodIdDcIdAndWithoutRuleTag;
+
+    protected SearchBuilder<HostTagVO> searchBuilderFindByRuleTag;
+
     protected Attribute _statusAttr;
     protected Attribute _resourceStateAttr;
     protected Attribute _msIdAttr;
@@ -455,6 +463,22 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
         HostIdSearch.and("dataCenterId", HostIdSearch.entity().getDataCenterId(), Op.EQ);
         HostIdSearch.done();
 
+        searchBuilderFindByRuleTag = _hostTagsDao.createSearchBuilder();
+        searchBuilderFindByRuleTag.and("is_tag_a_rule", searchBuilderFindByRuleTag.entity().getIsTagARule(), Op.EQ);
+        searchBuilderFindByRuleTag.or("tagDoesNotExist", searchBuilderFindByRuleTag.entity().getIsTagARule(), Op.NULL);
+
+        searchBuilderFindByIdTypeClusterIdPodIdDcIdAndWithoutRuleTag = createSearchBuilder();
+        searchBuilderFindByIdTypeClusterIdPodIdDcIdAndWithoutRuleTag.and("id", searchBuilderFindByIdTypeClusterIdPodIdDcIdAndWithoutRuleTag.entity().getId(), Op.EQ);
+        searchBuilderFindByIdTypeClusterIdPodIdDcIdAndWithoutRuleTag.and("type", searchBuilderFindByIdTypeClusterIdPodIdDcIdAndWithoutRuleTag.entity().getType(), Op.EQ);
+        searchBuilderFindByIdTypeClusterIdPodIdDcIdAndWithoutRuleTag.and("cluster_id", searchBuilderFindByIdTypeClusterIdPodIdDcIdAndWithoutRuleTag.entity().getClusterId(), Op.EQ);
+        searchBuilderFindByIdTypeClusterIdPodIdDcIdAndWithoutRuleTag.and("pod_id", searchBuilderFindByIdTypeClusterIdPodIdDcIdAndWithoutRuleTag.entity().getPodId(), Op.EQ);
+        searchBuilderFindByIdTypeClusterIdPodIdDcIdAndWithoutRuleTag.and("data_center_id", searchBuilderFindByIdTypeClusterIdPodIdDcIdAndWithoutRuleTag.entity().getDataCenterId(), Op.EQ);
+        searchBuilderFindByIdTypeClusterIdPodIdDcIdAndWithoutRuleTag.join("id", searchBuilderFindByRuleTag, searchBuilderFindByRuleTag.entity().getHostId(),
+                searchBuilderFindByIdTypeClusterIdPodIdDcIdAndWithoutRuleTag.entity().getId(), JoinType.LEFTOUTER);
+
+        searchBuilderFindByRuleTag.done();
+        searchBuilderFindByIdTypeClusterIdPodIdDcIdAndWithoutRuleTag.done();
+
         _statusAttr = _allAttributes.get("status");
         _msIdAttr = _allAttributes.get("managementServerId");
         _pingTimeAttr = _allAttributes.get("lastPinged");
@@ -792,9 +816,12 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
 
     @Override
     public List<HostVO> listAllUpAndEnabledNonHAHosts(Type type, Long clusterId, Long podId, long dcId, String haTag) {
-        SearchBuilder<HostTagVO> hostTagSearch = null;
+        SearchBuilder<HostTagVO> hostTagSearch = _hostTagsDao.createSearchBuilder();
+        hostTagSearch.and();
+        hostTagSearch.op("isTagARule", hostTagSearch.entity().getIsTagARule(), Op.EQ);
+        hostTagSearch.or("tagDoesNotExist", hostTagSearch.entity().getIsTagARule(), Op.NULL);
+        hostTagSearch.cp();
         if (haTag != null && !haTag.isEmpty()) {
-            hostTagSearch = _hostTagsDao.createSearchBuilder();
             hostTagSearch.and().op("tag", hostTagSearch.entity().getTag(), SearchCriteria.Op.NEQ);
             hostTagSearch.or("tagNull", hostTagSearch.entity().getTag(), SearchCriteria.Op.NULL);
             hostTagSearch.cp();
@@ -809,12 +836,14 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
         hostSearch.and("status", hostSearch.entity().getStatus(), SearchCriteria.Op.EQ);
         hostSearch.and("resourceState", hostSearch.entity().getResourceState(), SearchCriteria.Op.EQ);
 
-        if (haTag != null && !haTag.isEmpty()) {
-            hostSearch.join("hostTagSearch", hostTagSearch, hostSearch.entity().getId(), hostTagSearch.entity().getHostId(), JoinBuilder.JoinType.LEFTOUTER);
-        }
+
+        hostSearch.join("hostTagSearch", hostTagSearch, hostSearch.entity().getId(), hostTagSearch.entity().getHostId(), JoinBuilder.JoinType.LEFTOUTER);
+
 
         SearchCriteria<HostVO> sc = hostSearch.create();
 
+        sc.setJoinParameters("hostTagSearch", "isTagARule", false);
+
         if (haTag != null && !haTag.isEmpty()) {
             sc.setJoinParameters("hostTagSearch", "tag", haTag);
         }
@@ -846,8 +875,13 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
 
     @Override
     public void loadHostTags(HostVO host) {
-        List<String> hostTags = _hostTagsDao.getHostTags(host.getId());
-        host.setHostTags(hostTags);
+        List<HostTagVO> hostTagVOList = _hostTagsDao.getHostTags(host.getId());
+        if (CollectionUtils.isNotEmpty(hostTagVOList)) {
+            List<String> hostTagList = hostTagVOList.parallelStream().map(HostTagVO::getTag).collect(Collectors.toList());
+            host.setHostTags(hostTagList, hostTagVOList.get(0).getIsTagARule());
+        } else {
+            host.setHostTags(null, null);
+        }
     }
 
     @DB
@@ -881,10 +915,10 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
 
     protected void saveHostTags(HostVO host) {
         List<String> hostTags = host.getHostTags();
-        if (hostTags == null || (hostTags != null && hostTags.isEmpty())) {
+        if (CollectionUtils.isEmpty(hostTags)) {
             return;
         }
-        _hostTagsDao.persist(host.getId(), hostTags);
+        _hostTagsDao.persist(host.getId(), hostTags, host.getIsTagARule());
     }
 
     protected void saveGpuRecords(HostVO host) {
@@ -1244,6 +1278,26 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
         return listBy(sc);
     }
 
+    @Override
+    public List<HostVO> listAllHostsThatHaveNoRuleTag(Type type, Long clusterId, Long podId, Long dcId) {
+        SearchCriteria<HostVO> sc = searchBuilderFindByIdTypeClusterIdPodIdDcIdAndWithoutRuleTag.create();
+        if (type != null) {
+            sc.setParameters("type", type);
+        }
+        if (clusterId != null) {
+            sc.setParameters("cluster_id", clusterId);
+        }
+        if (podId != null) {
+            sc.setParameters("pod_id", podId);
+        }
+        if (dcId != null) {
+            sc.setParameters("data_center_id", dcId);
+        }
+        sc.setJoinParameters("id", "is_tag_a_rule", false);
+
+        return search(sc, null);
+    }
+
     @Override
     public List<Long> listClustersByHostTag(String computeOfferingTags) {
         TransactionLegacy txn = TransactionLegacy.currentTxn();
@@ -1266,9 +1320,6 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
                 result.add(rs.getLong(1));
             }
             pstmt.close();
-            if(result.isEmpty()){
-                throw new CloudRuntimeException("No suitable host found for follow compute offering tags: " + computeOfferingTags);
-            }
             return result;
         } catch (SQLException e) {
             throw new CloudRuntimeException("DB Exception on: " + sql, e);
@@ -1293,15 +1344,33 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
                 result.add(rs.getLong(1));
             }
             pstmt.close();
-            if(result.isEmpty()){
-                throw new CloudRuntimeException("No suitable host found for follow compute offering tags: " + computeOfferingTags);
-            }
             return result;
         } catch (SQLException e) {
             throw new CloudRuntimeException("DB Exception on: " + select, e);
         }
     }
 
+    public List<HostVO> findHostsWithTagRuleThatMatchComputeOferringTags(String computeOfferingTags) {
+        List<HostTagVO> hostTagVOList = _hostTagsDao.findHostRuleTags();
+        List<HostVO> result = new ArrayList<>();
+        for (HostTagVO rule: hostTagVOList) {
+            if (TagAsRuleHelper.interpretTagAsRule(rule.getTag(), computeOfferingTags, HostTagsDao.hostTagRuleExecutionTimeout.value())) {
+                result.add(findById(rule.getHostId()));
+            }
+        }
+
+        return result;
+    }
+
+    public List<Long> findClustersThatMatchHostTagRule(String computeOfferingTags) {
+        Set<Long> result = new HashSet<>();
+        List<HostVO> hosts = findHostsWithTagRuleThatMatchComputeOferringTags(computeOfferingTags);
+        for (HostVO host: hosts) {
+            result.add(host.getClusterId());
+        }
+        return new ArrayList<>(result);
+    }
+
     private String getHostIdsByComputeTags(List<String> offeringTags){
         List<String> questionMarks = new ArrayList();
         offeringTags.forEach((tag) -> { questionMarks.add("?"); });
diff --git a/engine/schema/src/main/java/com/cloud/host/dao/HostTagsDao.java b/engine/schema/src/main/java/com/cloud/host/dao/HostTagsDao.java
index 0fb5370d81a..d134db33403 100644
--- a/engine/schema/src/main/java/com/cloud/host/dao/HostTagsDao.java
+++ b/engine/schema/src/main/java/com/cloud/host/dao/HostTagsDao.java
@@ -20,15 +20,21 @@ import java.util.List;
 
 import com.cloud.host.HostTagVO;
 import com.cloud.utils.db.GenericDao;
+import org.apache.cloudstack.framework.config.ConfigKey;
 
 public interface HostTagsDao extends GenericDao<HostTagVO, Long> {
 
-    void persist(long hostId, List<String> hostTags);
+    ConfigKey<Long> hostTagRuleExecutionTimeout = new ConfigKey<>("Advanced", Long.class, "host.tag.rule.execution.timeout", "2000", "The maximum runtime, in milliseconds, " +
+        "to execute a host tag rule; if it is reached, a timeout will happen.", true);
 
-    List<String> getHostTags(long hostId);
+    void persist(long hostId, List<String> hostTags, Boolean isTagARule);
+
+    List<HostTagVO> getHostTags(long hostId);
 
     List<String> getDistinctImplicitHostTags(List<Long> hostIds, String[] implicitHostTags);
 
     void deleteTags(long hostId);
 
+    List<HostTagVO> findHostRuleTags();
+
 }
diff --git a/engine/schema/src/main/java/com/cloud/host/dao/HostTagsDaoImpl.java b/engine/schema/src/main/java/com/cloud/host/dao/HostTagsDaoImpl.java
index a73899b7b33..65deb1d1c9b 100644
--- a/engine/schema/src/main/java/com/cloud/host/dao/HostTagsDaoImpl.java
+++ b/engine/schema/src/main/java/com/cloud/host/dao/HostTagsDaoImpl.java
@@ -16,10 +16,10 @@
 // under the License.
 package com.cloud.host.dao;
 
-import java.util.ArrayList;
 import java.util.List;
 
-
+import org.apache.cloudstack.framework.config.ConfigKey;
+import org.apache.cloudstack.framework.config.Configurable;
 import org.springframework.stereotype.Component;
 
 import com.cloud.host.HostTagVO;
@@ -31,13 +31,14 @@ import com.cloud.utils.db.TransactionLegacy;
 import com.cloud.utils.db.SearchCriteria.Func;
 
 @Component
-public class HostTagsDaoImpl extends GenericDaoBase<HostTagVO, Long> implements HostTagsDao {
+public class HostTagsDaoImpl extends GenericDaoBase<HostTagVO, Long> implements HostTagsDao, Configurable {
     protected final SearchBuilder<HostTagVO> HostSearch;
     protected final GenericSearchBuilder<HostTagVO, String> DistinctImplictTagsSearch;
 
     public HostTagsDaoImpl() {
         HostSearch = createSearchBuilder();
         HostSearch.and("hostId", HostSearch.entity().getHostId(), SearchCriteria.Op.EQ);
+        HostSearch.and("isTagARule", HostSearch.entity().getIsTagARule(), SearchCriteria.Op.EQ);
         HostSearch.done();
 
         DistinctImplictTagsSearch = createSearchBuilder(String.class);
@@ -48,17 +49,11 @@ public class HostTagsDaoImpl extends GenericDaoBase<HostTagVO, Long> implements
     }
 
     @Override
-    public List<String> getHostTags(long hostId) {
+    public List<HostTagVO> getHostTags(long hostId) {
         SearchCriteria<HostTagVO> sc = HostSearch.create();
         sc.setParameters("hostId", hostId);
 
-        List<HostTagVO> results = search(sc, null);
-        List<String> hostTags = new ArrayList<String>(results.size());
-        for (HostTagVO result : results) {
-            hostTags.add(result.getTag());
-        }
-
-        return hostTags;
+        return search(sc, null);
     }
 
     @Override
@@ -80,7 +75,15 @@ public class HostTagsDaoImpl extends GenericDaoBase<HostTagVO, Long> implements
     }
 
     @Override
-    public void persist(long hostId, List<String> hostTags) {
+    public List<HostTagVO> findHostRuleTags() {
+        SearchCriteria<HostTagVO> sc = HostSearch.create();
+        sc.setParameters("isTagARule", true);
+
+        return search(sc, null);
+    }
+
+    @Override
+    public void persist(long hostId, List<String> hostTags, Boolean isTagARule) {
         TransactionLegacy txn = TransactionLegacy.currentTxn();
 
         txn.start();
@@ -91,10 +94,20 @@ public class HostTagsDaoImpl extends GenericDaoBase<HostTagVO, Long> implements
         for (String tag : hostTags) {
             tag = tag.trim();
             if (tag.length() > 0) {
-                HostTagVO vo = new HostTagVO(hostId, tag);
+                HostTagVO vo = new HostTagVO(hostId, tag, isTagARule);
                 persist(vo);
             }
         }
         txn.commit();
     }
+
+    @Override
+    public ConfigKey<?>[] getConfigKeys() {
+        return new ConfigKey<?>[] {hostTagRuleExecutionTimeout};
+    }
+
+    @Override
+    public String getConfigComponentName() {
+        return HostTagsDaoImpl.class.getSimpleName();
+    }
 }
diff --git a/engine/schema/src/main/java/com/cloud/storage/StoragePoolTagVO.java b/engine/schema/src/main/java/com/cloud/storage/StoragePoolTagVO.java
index 18c0dc3c326..2675c36f27f 100755
--- a/engine/schema/src/main/java/com/cloud/storage/StoragePoolTagVO.java
+++ b/engine/schema/src/main/java/com/cloud/storage/StoragePoolTagVO.java
@@ -23,7 +23,9 @@ import javax.persistence.GenerationType;
 import javax.persistence.Id;
 import javax.persistence.Table;
 
+import com.cloud.utils.NumbersUtil;
 import org.apache.cloudstack.api.InternalIdentity;
+import org.apache.commons.lang3.BooleanUtils;
 
 @Entity
 @Table(name = "storage_pool_tags")
@@ -43,9 +45,19 @@ public class StoragePoolTagVO implements InternalIdentity {
     @Column(name = "tag")
     private String tag;
 
+    @Column(name = "is_tag_a_rule")
+    private boolean isTagARule;
+
     public StoragePoolTagVO(long poolId, String tag) {
         this.poolId = poolId;
         this.tag = tag;
+        this.isTagARule = false;
+    }
+
+    public StoragePoolTagVO(long poolId, String tag, Boolean isTagARule) {
+        this.poolId = poolId;
+        this.tag = tag;
+        this.isTagARule = BooleanUtils.toBooleanDefaultIfNull(isTagARule, false);
     }
 
     @Override
@@ -61,4 +73,20 @@ public class StoragePoolTagVO implements InternalIdentity {
         return tag;
     }
 
+    public boolean isTagARule() {
+        return this.isTagARule;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof StoragePoolTagVO) {
+            return this.poolId == ((StoragePoolTagVO)obj).getPoolId();
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return NumbersUtil.hash(id);
+    }
 }
diff --git a/engine/schema/src/main/java/com/cloud/storage/dao/StoragePoolTagsDao.java b/engine/schema/src/main/java/com/cloud/storage/dao/StoragePoolTagsDao.java
index 946b46b5cfe..9352ee21858 100755
--- a/engine/schema/src/main/java/com/cloud/storage/dao/StoragePoolTagsDao.java
+++ b/engine/schema/src/main/java/com/cloud/storage/dao/StoragePoolTagsDao.java
@@ -25,10 +25,14 @@ import com.cloud.utils.db.GenericDao;
 
 public interface StoragePoolTagsDao extends GenericDao<StoragePoolTagVO, Long> {
 
-    void persist(long poolId, List<String> storagePoolTags);
+    void persist(long poolId, List<String> storagePoolTags, Boolean isTagARule);
+
+    void persist(List<StoragePoolTagVO> storagePoolTags);
     List<String> getStoragePoolTags(long poolId);
     void deleteTags(long poolId);
     List<StoragePoolTagVO> searchByIds(Long... stIds);
     StorageTagResponse newStorageTagResponse(StoragePoolTagVO tag);
 
+    List<StoragePoolTagVO> findStoragePoolTags(long poolId);
+
 }
diff --git a/engine/schema/src/main/java/com/cloud/storage/dao/StoragePoolTagsDaoImpl.java b/engine/schema/src/main/java/com/cloud/storage/dao/StoragePoolTagsDaoImpl.java
index f20e0c411de..c01c66763af 100755
--- a/engine/schema/src/main/java/com/cloud/storage/dao/StoragePoolTagsDaoImpl.java
+++ b/engine/schema/src/main/java/com/cloud/storage/dao/StoragePoolTagsDaoImpl.java
@@ -21,6 +21,9 @@ import java.util.List;
 
 import javax.inject.Inject;
 
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionCallbackNoReturn;
+import com.cloud.utils.db.TransactionStatus;
 import org.apache.cloudstack.api.response.StorageTagResponse;
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
 
@@ -50,7 +53,7 @@ public class StoragePoolTagsDaoImpl extends GenericDaoBase<StoragePoolTagVO, Lon
     }
 
     @Override
-    public void persist(long poolId, List<String> storagePoolTags) {
+    public void persist(long poolId, List<String> storagePoolTags, Boolean isTagARule) {
         TransactionLegacy txn = TransactionLegacy.currentTxn();
 
         txn.start();
@@ -61,13 +64,23 @@ public class StoragePoolTagsDaoImpl extends GenericDaoBase<StoragePoolTagVO, Lon
         for (String tag : storagePoolTags) {
             tag = tag.trim();
             if (tag.length() > 0) {
-                StoragePoolTagVO vo = new StoragePoolTagVO(poolId, tag);
+                StoragePoolTagVO vo = new StoragePoolTagVO(poolId, tag, isTagARule);
                 persist(vo);
             }
         }
         txn.commit();
     }
 
+    public void persist(List<StoragePoolTagVO> storagePoolTags) {
+        Transaction.execute(TransactionLegacy.CLOUD_DB, new TransactionCallbackNoReturn() {
+            @Override public void doInTransactionWithoutResult(TransactionStatus status) {
+                for (StoragePoolTagVO storagePoolTagVO : storagePoolTags) {
+                    persist(storagePoolTagVO);
+                }
+            }
+        });
+    }
+
     @Override
     public List<String> getStoragePoolTags(long poolId) {
         SearchCriteria<StoragePoolTagVO> sc = StoragePoolSearch.create();
@@ -157,4 +170,12 @@ public class StoragePoolTagsDaoImpl extends GenericDaoBase<StoragePoolTagVO, Lon
         return tagResponse;
     }
 
+    @Override
+    public List<StoragePoolTagVO> findStoragePoolTags(long poolId) {
+        SearchCriteria<StoragePoolTagVO> sc = StoragePoolSearch.create();
+        sc.setParameters("poolId", poolId);
+
+        return search(sc, null);
+    }
+
 }
diff --git a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/PrimaryDataStoreDao.java b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/PrimaryDataStoreDao.java
index 80fddc9bd94..8f77b4ba63e 100644
--- a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/PrimaryDataStoreDao.java
+++ b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/PrimaryDataStoreDao.java
@@ -53,7 +53,7 @@ public interface PrimaryDataStoreDao extends GenericDao<StoragePoolVO, Long> {
      */
     void updateCapacityIops(long id, long capacityIops);
 
-    StoragePoolVO persist(StoragePoolVO pool, Map<String, String> details, List<String> tags);
+    StoragePoolVO persist(StoragePoolVO pool, Map<String, String> details, List<String> tags, Boolean isTagARule);
 
     /**
      * Find pool by name.
@@ -77,7 +77,7 @@ public interface PrimaryDataStoreDao extends GenericDao<StoragePoolVO, Long> {
      */
     List<StoragePoolVO> findPoolsByDetails(long dcId, long podId, Long clusterId, Map<String, String> details, ScopeType scope);
 
-    List<StoragePoolVO> findPoolsByTags(long dcId, long podId, Long clusterId, String[] tags);
+    List<StoragePoolVO> findPoolsByTags(long dcId, long podId, Long clusterId, String[] tags, boolean validateTagRule, long ruleExecuteTimeout);
 
     List<StoragePoolVO> findDisabledPoolsByScope(long dcId, Long podId, Long clusterId, ScopeType scope);
 
@@ -112,9 +112,9 @@ public interface PrimaryDataStoreDao extends GenericDao<StoragePoolVO, Long> {
 
     List<StoragePoolVO> listPoolsByCluster(long clusterId);
 
-    List<StoragePoolVO> findLocalStoragePoolsByTags(long dcId, long podId, Long clusterId, String[] tags);
+    List<StoragePoolVO> findLocalStoragePoolsByTags(long dcId, long podId, Long clusterId, String[] tags, boolean validateTagRule);
 
-    List<StoragePoolVO> findZoneWideStoragePoolsByTags(long dcId, String[] tags);
+    List<StoragePoolVO> findZoneWideStoragePoolsByTags(long dcId, String[] tags, boolean validateTagRule);
 
     List<StoragePoolVO> findZoneWideStoragePoolsByHypervisor(long dataCenterId, HypervisorType hypervisorType);
 
diff --git a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/PrimaryDataStoreDaoImpl.java b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/PrimaryDataStoreDaoImpl.java
index 2fc52b3fb28..af7dbdc0225 100644
--- a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/PrimaryDataStoreDaoImpl.java
+++ b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/PrimaryDataStoreDaoImpl.java
@@ -67,11 +67,11 @@ public class PrimaryDataStoreDaoImpl extends GenericDaoBase<StoragePoolVO, Long>
 
     protected final String DetailsSqlPrefix = "SELECT storage_pool.* from storage_pool LEFT JOIN storage_pool_details ON storage_pool.id = storage_pool_details.pool_id WHERE storage_pool.removed is null and storage_pool.status = 'Up' and storage_pool.data_center_id = ? and (storage_pool.pod_id = ? or storage_pool.pod_id is null) and storage_pool.scope = ? and (";
     protected final String DetailsSqlSuffix = ") GROUP BY storage_pool_details.pool_id HAVING COUNT(storage_pool_details.name) >= ?";
-    private final String ZoneWideTagsSqlPrefix = "SELECT storage_pool.* from storage_pool LEFT JOIN storage_pool_tags ON storage_pool.id = storage_pool_tags.pool_id WHERE storage_pool.removed is null and storage_pool.status = 'Up' and storage_pool.data_center_id = ? and storage_pool.scope = ? and (";
+    private final String ZoneWideTagsSqlPrefix = "SELECT storage_pool.* from storage_pool LEFT JOIN storage_pool_tags ON storage_pool.id = storage_pool_tags.pool_id WHERE storage_pool.removed is null and storage_pool.status = 'Up' AND storage_pool_tags.is_tag_a_rule = 0 and storage_pool.data_center_id = ? and storage_pool.scope = ? and (";
     private final String ZoneWideTagsSqlSuffix = ") GROUP BY storage_pool_tags.pool_id HAVING COUNT(storage_pool_tags.tag) >= ?";
 
     // Storage tags are now separate from storage_pool_details, leaving only details on that table
-    protected final String TagsSqlPrefix = "SELECT storage_pool.* from storage_pool LEFT JOIN storage_pool_tags ON storage_pool.id = storage_pool_tags.pool_id WHERE storage_pool.removed is null and storage_pool.status = 'Up' and storage_pool.data_center_id = ? and (storage_pool.pod_id = ? or storage_pool.pod_id is null) and storage_pool.scope = ? and (";
+    protected final String TagsSqlPrefix = "SELECT storage_pool.* from storage_pool LEFT JOIN storage_pool_tags ON storage_pool.id = storage_pool_tags.pool_id WHERE storage_pool.removed is null and storage_pool.status = 'Up' AND storage_pool_tags.is_tag_a_rule = 0 and storage_pool.data_center_id = ? and (storage_pool.pod_id = ? or storage_pool.pod_id is null) and storage_pool.scope = ? and (";
     protected final String TagsSqlSuffix = ") GROUP BY storage_pool_tags.pool_id HAVING COUNT(storage_pool_tags.tag) >= ?";
 
     private static final String GET_STORAGE_POOLS_OF_VOLUMES_WITHOUT_OR_NOT_HAVING_TAGS = "SELECT s.* " +
@@ -278,7 +278,7 @@ public class PrimaryDataStoreDaoImpl extends GenericDaoBase<StoragePoolVO, Long>
 
     @Override
     @DB
-    public StoragePoolVO persist(StoragePoolVO pool, Map<String, String> details, List<String> tags) {
+    public StoragePoolVO persist(StoragePoolVO pool, Map<String, String> details, List<String> tags, Boolean isTagARule) {
         TransactionLegacy txn = TransactionLegacy.currentTxn();
         txn.start();
         pool = super.persist(pool);
@@ -289,7 +289,7 @@ public class PrimaryDataStoreDaoImpl extends GenericDaoBase<StoragePoolVO, Long>
             }
         }
         if (CollectionUtils.isNotEmpty(tags)) {
-            _tagsDao.persist(pool.getId(), tags);
+            _tagsDao.persist(pool.getId(), tags, isTagARule);
         }
         txn.commit();
         return pool;
@@ -404,10 +404,15 @@ public class PrimaryDataStoreDaoImpl extends GenericDaoBase<StoragePoolVO, Long>
     }
 
     @Override
-    public List<StoragePoolVO> findPoolsByTags(long dcId, long podId, Long clusterId, String[] tags) {
+    public List<StoragePoolVO> findPoolsByTags(long dcId, long podId, Long clusterId, String[] tags, boolean validateTagRule, long ruleExecuteTimeout) {
         List<StoragePoolVO> storagePools = null;
         if (tags == null || tags.length == 0) {
             storagePools = listBy(dcId, podId, clusterId, ScopeType.CLUSTER);
+
+            if (validateTagRule) {
+                storagePools = getPoolsWithoutTagRule(storagePools);
+            }
+
         } else {
             String sqlValues = getSqlValuesFromStorageTags(tags);
             storagePools = findPoolsByDetailsOrTagsInternal(dcId, podId, clusterId, ScopeType.CLUSTER, sqlValues, ValueType.TAGS, tags.length);
@@ -437,10 +442,14 @@ public class PrimaryDataStoreDaoImpl extends GenericDaoBase<StoragePoolVO, Long>
     }
 
     @Override
-    public List<StoragePoolVO> findLocalStoragePoolsByTags(long dcId, long podId, Long clusterId, String[] tags) {
+    public List<StoragePoolVO> findLocalStoragePoolsByTags(long dcId, long podId, Long clusterId, String[] tags, boolean validateTagRule) {
         List<StoragePoolVO> storagePools = null;
         if (tags == null || tags.length == 0) {
             storagePools = listBy(dcId, podId, clusterId, ScopeType.HOST);
+
+            if (validateTagRule) {
+                storagePools = getPoolsWithoutTagRule(storagePools);
+            }
         } else {
             String sqlValues = getSqlValuesFromStorageTags(tags);
             storagePools = findPoolsByDetailsOrTagsInternal(dcId, podId, clusterId, ScopeType.HOST, sqlValues, ValueType.TAGS, tags.length);
@@ -483,13 +492,20 @@ public class PrimaryDataStoreDaoImpl extends GenericDaoBase<StoragePoolVO, Long>
     }
 
     @Override
-    public List<StoragePoolVO> findZoneWideStoragePoolsByTags(long dcId, String[] tags) {
+    public List<StoragePoolVO> findZoneWideStoragePoolsByTags(long dcId, String[] tags, boolean validateTagRule) {
         if (tags == null || tags.length == 0) {
             QueryBuilder<StoragePoolVO> sc = QueryBuilder.create(StoragePoolVO.class);
             sc.and(sc.entity().getDataCenterId(), Op.EQ, dcId);
             sc.and(sc.entity().getStatus(), Op.EQ, Status.Up);
             sc.and(sc.entity().getScope(), Op.EQ, ScopeType.ZONE);
-            return sc.list();
+
+            List<StoragePoolVO> storagePools = sc.list();
+
+            if (validateTagRule) {
+                storagePools = getPoolsWithoutTagRule(storagePools);
+            }
+
+            return storagePools;
         } else {
             String sqlValues = getSqlValuesFromStorageTags(tags);
             String sql = getSqlPreparedStatement(ZoneWideTagsSqlPrefix, ZoneWideTagsSqlSuffix, sqlValues, null);
@@ -497,6 +513,20 @@ public class PrimaryDataStoreDaoImpl extends GenericDaoBase<StoragePoolVO, Long>
         }
     }
 
+    protected List<StoragePoolVO> getPoolsWithoutTagRule(List<StoragePoolVO> storagePools) {
+        List<StoragePoolVO> storagePoolsToReturn = new ArrayList<>();
+        for (StoragePoolVO storagePool : storagePools) {
+
+            List<StoragePoolTagVO> poolTags = _tagsDao.findStoragePoolTags(storagePool.getId());
+
+            if (CollectionUtils.isEmpty(poolTags) || !poolTags.get(0).isTagARule()) {
+                storagePoolsToReturn.add(storagePool);
+            }
+        }
+
+        return storagePoolsToReturn;
+    }
+
     @Override
     public List<String> searchForStoragePoolTags(long poolId) {
         return _tagsDao.getStoragePoolTags(poolId);
diff --git a/engine/schema/src/main/resources/META-INF/db/schema-41810to41900.sql b/engine/schema/src/main/resources/META-INF/db/schema-41810to41900.sql
index 61f03be1713..ec6972ccc59 100644
--- a/engine/schema/src/main/resources/META-INF/db/schema-41810to41900.sql
+++ b/engine/schema/src/main/resources/META-INF/db/schema-41810to41900.sql
@@ -224,3 +224,12 @@ CREATE TABLE `cloud`.`oauth_provider` (
   `removed` datetime COMMENT 'date removed if not null',
   PRIMARY KEY (`id`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+-- Flexible tags
+ALTER TABLE `cloud`.`storage_pool_tags` ADD COLUMN is_tag_a_rule int(1) UNSIGNED not null DEFAULT 0;
+
+ALTER TABLE `cloud`.`storage_pool_tags` MODIFY tag text NOT NULL;
+
+ALTER TABLE `cloud`.`host_tags` ADD COLUMN is_tag_a_rule int(1) UNSIGNED not null DEFAULT 0;
+
+ALTER TABLE `cloud`.`host_tags` MODIFY tag text NOT NULL;
diff --git a/engine/schema/src/main/resources/META-INF/db/views/cloud.host_view.sql b/engine/schema/src/main/resources/META-INF/db/views/cloud.host_view.sql
new file mode 100644
index 00000000000..5c6d4fd772b
--- /dev/null
+++ b/engine/schema/src/main/resources/META-INF/db/views/cloud.host_view.sql
@@ -0,0 +1,111 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- VIEW `cloud`.`host_view`;
+
+DROP VIEW IF EXISTS `cloud`.`host_view`;
+
+CREATE VIEW `cloud`.`host_view` AS
+SELECT
+    host.id,
+    host.uuid,
+    host.name,
+    host.status,
+    host.disconnected,
+    host.type,
+    host.private_ip_address,
+    host.version,
+    host.hypervisor_type,
+    host.hypervisor_version,
+    host.capabilities,
+    host.last_ping,
+    host.created,
+    host.removed,
+    host.resource_state,
+    host.mgmt_server_id,
+    host.cpu_sockets,
+    host.cpus,
+    host.speed,
+    host.ram,
+    cluster.id cluster_id,
+    cluster.uuid cluster_uuid,
+    cluster.name cluster_name,
+    cluster.cluster_type,
+    data_center.id data_center_id,
+    data_center.uuid data_center_uuid,
+    data_center.name data_center_name,
+    data_center.networktype data_center_type,
+    host_pod_ref.id pod_id,
+    host_pod_ref.uuid pod_uuid,
+    host_pod_ref.name pod_name,
+    GROUP_CONCAT(DISTINCT(host_tags.tag)) AS tag,
+    `host_tags`.`is_tag_a_rule` AS `is_tag_a_rule`,
+    guest_os_category.id guest_os_category_id,
+    guest_os_category.uuid guest_os_category_uuid,
+    guest_os_category.name guest_os_category_name,
+    mem_caps.used_capacity memory_used_capacity,
+    mem_caps.reserved_capacity memory_reserved_capacity,
+    cpu_caps.used_capacity cpu_used_capacity,
+    cpu_caps.reserved_capacity cpu_reserved_capacity,
+    async_job.id job_id,
+    async_job.uuid job_uuid,
+    async_job.job_status job_status,
+    async_job.account_id job_account_id,
+    oobm.enabled AS `oobm_enabled`,
+    oobm.power_state AS `oobm_power_state`,
+    ha_config.enabled AS `ha_enabled`,
+    ha_config.ha_state AS `ha_state`,
+    ha_config.provider AS `ha_provider`,
+    `last_annotation_view`.`annotation` AS `annotation`,
+    `last_annotation_view`.`created` AS `last_annotated`,
+    `user`.`username` AS `username`
+FROM
+    `cloud`.`host`
+        LEFT JOIN
+    `cloud`.`cluster` ON host.cluster_id = cluster.id
+        LEFT JOIN
+    `cloud`.`data_center` ON host.data_center_id = data_center.id
+        LEFT JOIN
+    `cloud`.`host_pod_ref` ON host.pod_id = host_pod_ref.id
+        LEFT JOIN
+    `cloud`.`host_details` ON host.id = host_details.host_id
+        AND host_details.name = 'guest.os.category.id'
+        LEFT JOIN
+    `cloud`.`guest_os_category` ON guest_os_category.id = CONVERT ( host_details.value, UNSIGNED )
+        LEFT JOIN
+    `cloud`.`host_tags` ON host_tags.host_id = host.id
+        LEFT JOIN
+    `cloud`.`op_host_capacity` mem_caps ON host.id = mem_caps.host_id
+        AND mem_caps.capacity_type = 0
+        LEFT JOIN
+    `cloud`.`op_host_capacity` cpu_caps ON host.id = cpu_caps.host_id
+        AND cpu_caps.capacity_type = 1
+        LEFT JOIN
+    `cloud`.`async_job` ON async_job.instance_id = host.id
+        AND async_job.instance_type = 'Host'
+        AND async_job.job_status = 0
+        LEFT JOIN
+    `cloud`.`oobm` ON oobm.host_id = host.id
+        left join
+    `cloud`.`ha_config` ON ha_config.resource_id=host.id
+        and ha_config.resource_type='Host'
+        LEFT JOIN
+    `cloud`.`last_annotation_view` ON `last_annotation_view`.`entity_uuid` = `host`.`uuid`
+        LEFT JOIN
+    `cloud`.`user` ON `user`.`uuid` = `last_annotation_view`.`user_uuid`
+GROUP BY
+    `host`.`id`;
diff --git a/engine/schema/src/main/resources/META-INF/db/views/cloud.storage_pool_view.sql b/engine/schema/src/main/resources/META-INF/db/views/cloud.storage_pool_view.sql
new file mode 100644
index 00000000000..e6cc9458208
--- /dev/null
+++ b/engine/schema/src/main/resources/META-INF/db/views/cloud.storage_pool_view.sql
@@ -0,0 +1,68 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- VIEW `cloud`.`storage_pool_view`;
+
+DROP VIEW IF EXISTS `cloud`.`storage_pool_view`;
+
+CREATE VIEW `cloud`.`storage_pool_view` AS
+SELECT
+    `storage_pool`.`id` AS `id`,
+    `storage_pool`.`uuid` AS `uuid`,
+    `storage_pool`.`name` AS `name`,
+    `storage_pool`.`status` AS `status`,
+    `storage_pool`.`path` AS `path`,
+    `storage_pool`.`pool_type` AS `pool_type`,
+    `storage_pool`.`host_address` AS `host_address`,
+    `storage_pool`.`created` AS `created`,
+    `storage_pool`.`removed` AS `removed`,
+    `storage_pool`.`capacity_bytes` AS `capacity_bytes`,
+    `storage_pool`.`capacity_iops` AS `capacity_iops`,
+    `storage_pool`.`scope` AS `scope`,
+    `storage_pool`.`hypervisor` AS `hypervisor`,
+    `storage_pool`.`storage_provider_name` AS `storage_provider_name`,
+    `storage_pool`.`parent` AS `parent`,
+    `cluster`.`id` AS `cluster_id`,
+    `cluster`.`uuid` AS `cluster_uuid`,
+    `cluster`.`name` AS `cluster_name`,
+    `cluster`.`cluster_type` AS `cluster_type`,
+    `data_center`.`id` AS `data_center_id`,
+    `data_center`.`uuid` AS `data_center_uuid`,
+    `data_center`.`name` AS `data_center_name`,
+    `data_center`.`networktype` AS `data_center_type`,
+    `host_pod_ref`.`id` AS `pod_id`,
+    `host_pod_ref`.`uuid` AS `pod_uuid`,
+    `host_pod_ref`.`name` AS `pod_name`,
+    `storage_pool_tags`.`tag` AS `tag`,
+    `storage_pool_tags`.`is_tag_a_rule` AS `is_tag_a_rule`,
+    `op_host_capacity`.`used_capacity` AS `disk_used_capacity`,
+    `op_host_capacity`.`reserved_capacity` AS `disk_reserved_capacity`,
+    `async_job`.`id` AS `job_id`,
+    `async_job`.`uuid` AS `job_uuid`,
+    `async_job`.`job_status` AS `job_status`,
+    `async_job`.`account_id` AS `job_account_id`
+FROM
+    ((((((`cloud`.`storage_pool`
+        LEFT JOIN `cloud`.`cluster` ON ((`storage_pool`.`cluster_id` = `cluster`.`id`)))
+        LEFT JOIN `cloud`.`data_center` ON ((`storage_pool`.`data_center_id` = `data_center`.`id`)))
+        LEFT JOIN `cloud`.`host_pod_ref` ON ((`storage_pool`.`pod_id` = `host_pod_ref`.`id`)))
+        LEFT JOIN `cloud`.`storage_pool_tags` ON (((`storage_pool_tags`.`pool_id` = `storage_pool`.`id`))))
+        LEFT JOIN `cloud`.`op_host_capacity` ON (((`storage_pool`.`id` = `op_host_capacity`.`host_id`)
+        AND (`op_host_capacity`.`capacity_type` IN (3 , 9)))))
+        LEFT JOIN `cloud`.`async_job` ON (((`async_job`.`instance_id` = `storage_pool`.`id`)
+        AND (`async_job`.`instance_type` = 'StoragePool')
+        AND (`async_job`.`job_status` = 0))));
diff --git a/engine/schema/src/test/java/com/cloud/host/HostVOTest.java b/engine/schema/src/test/java/com/cloud/host/HostVOTest.java
index 9ab010dd6a7..76bc5270b4f 100755
--- a/engine/schema/src/test/java/com/cloud/host/HostVOTest.java
+++ b/engine/schema/src/test/java/com/cloud/host/HostVOTest.java
@@ -1,61 +1,84 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.host;
-
-import com.cloud.service.ServiceOfferingVO;
-import com.cloud.vm.VirtualMachine;
-import java.util.Arrays;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import org.junit.Test;
-import org.junit.Before;
-
-public class HostVOTest {
-    HostVO host;
-    ServiceOfferingVO offering;
-
-    @Before
-    public void setUp() throws Exception {
-        host = new HostVO();
-        offering = new ServiceOfferingVO("TestSO", 0, 0, 0, 0, 0,
-                false, "TestSO", false,VirtualMachine.Type.User,false);
-    }
-
-    @Test
-    public void testNoSO() {
-        assertFalse(host.checkHostServiceOfferingTags(null));
-    }
-
-    @Test
-    public void testNoTag() {
-        assertTrue(host.checkHostServiceOfferingTags(offering));
-    }
-
-    @Test
-    public void testRightTag() {
-        host.setHostTags(Arrays.asList("tag1","tag2"));
-        offering.setHostTag("tag2,tag1");
-        assertTrue(host.checkHostServiceOfferingTags(offering));
-    }
-
-    @Test
-    public void testWrongTag() {
-        host.setHostTags(Arrays.asList("tag1","tag2"));
-        offering.setHostTag("tag2,tag4");
-        assertFalse(host.checkHostServiceOfferingTags(offering));
-    }
-}
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.host;
+
+import com.cloud.service.ServiceOfferingVO;
+import com.cloud.vm.VirtualMachine;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+import org.junit.Before;
+
+public class HostVOTest {
+    HostVO host;
+    ServiceOfferingVO offering;
+
+    @Before
+    public void setUp() throws Exception {
+        host = new HostVO();
+        offering = new ServiceOfferingVO("TestSO", 0, 0, 0, 0, 0,
+                false, "TestSO", false,VirtualMachine.Type.User,false);
+    }
+
+    @Test
+    public void testNoSO() {
+        assertFalse(host.checkHostServiceOfferingTags(null));
+    }
+
+    @Test
+    public void testNoTag() {
+        assertTrue(host.checkHostServiceOfferingTags(offering));
+    }
+
+    @Test
+    public void testRightTag() {
+        host.setHostTags(Arrays.asList("tag1","tag2"), false);
+        offering.setHostTag("tag2,tag1");
+        assertTrue(host.checkHostServiceOfferingTags(offering));
+    }
+
+    @Test
+    public void testWrongTag() {
+        host.setHostTags(Arrays.asList("tag1","tag2"), false);
+        offering.setHostTag("tag2,tag4");
+        assertFalse(host.checkHostServiceOfferingTags(offering));
+    }
+
+    @Test
+    public void checkHostServiceOfferingTagsTestRuleTagWithServiceTagThatMatches() {
+        host.setHostTags(List.of("tags[0] == 'A'"), true);
+        offering.setHostTag("A");
+        assertTrue(host.checkHostServiceOfferingTags(offering));
+    }
+
+    @Test
+    public void checkHostServiceOfferingTagsTestRuleTagWithServiceTagThatDoesNotMatch() {
+        host.setHostTags(List.of("tags[0] == 'A'"), true);
+        offering.setHostTag("B");
+        assertFalse(host.checkHostServiceOfferingTags(offering));
+    }
+
+    @Test
+    public void checkHostServiceOfferingTagsTestRuleTagWithNullServiceTag() {
+        host.setHostTags(List.of("tags[0] == 'A'"), true);
+        offering.setHostTag(null);
+        assertFalse(host.checkHostServiceOfferingTags(offering));
+    }
+}
diff --git a/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/AbstractStoragePoolAllocator.java b/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/AbstractStoragePoolAllocator.java
index f2b0f17232b..89a7b577ae7 100644
--- a/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/AbstractStoragePoolAllocator.java
+++ b/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/AbstractStoragePoolAllocator.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import javax.inject.Inject;
 import javax.naming.ConfigurationException;
 
+import com.cloud.api.query.dao.StoragePoolJoinDao;
 import com.cloud.exception.StorageUnavailableException;
 import com.cloud.storage.ScopeType;
 import com.cloud.storage.StoragePoolStatus;
@@ -85,6 +86,9 @@ public abstract class AbstractStoragePoolAllocator extends AdapterBase implement
      */
     private SecureRandom secureRandom = new SecureRandom();
 
+    @Inject
+    protected StoragePoolJoinDao storagePoolJoinDao;
+
     @Override
     public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
         super.configure(name, params);
diff --git a/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/ClusterScopeStoragePoolAllocator.java b/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/ClusterScopeStoragePoolAllocator.java
index fe49504fda1..9c0f84ab14a 100644
--- a/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/ClusterScopeStoragePoolAllocator.java
+++ b/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/ClusterScopeStoragePoolAllocator.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import javax.inject.Inject;
 import javax.naming.ConfigurationException;
 
+import com.cloud.storage.VolumeApiServiceImpl;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 import org.apache.log4j.Logger;
 import org.springframework.stereotype.Component;
@@ -78,11 +79,12 @@ public class ClusterScopeStoragePoolAllocator extends AbstractStoragePoolAllocat
             logDisabledStoragePools(dcId, podId, clusterId, ScopeType.CLUSTER);
         }
 
-        List<StoragePoolVO> pools = storagePoolDao.findPoolsByTags(dcId, podId, clusterId, dskCh.getTags());
+        List<StoragePoolVO> pools = storagePoolDao.findPoolsByTags(dcId, podId, clusterId, dskCh.getTags(), true, VolumeApiServiceImpl.storageTagRuleExecutionTimeout.value());
+        pools.addAll(storagePoolJoinDao.findStoragePoolByScopeAndRuleTags(dcId, podId, clusterId, ScopeType.CLUSTER, List.of(dskCh.getTags())));
         s_logger.debug(String.format("Found pools [%s] that match with tags [%s].", pools, Arrays.toString(dskCh.getTags())));
 
         // add remaining pools in cluster, that did not match tags, to avoid set
-        List<StoragePoolVO> allPools = storagePoolDao.findPoolsByTags(dcId, podId, clusterId, null);
+        List<StoragePoolVO> allPools = storagePoolDao.findPoolsByTags(dcId, podId, clusterId, null, false, 0);
         allPools.removeAll(pools);
         for (StoragePoolVO pool : allPools) {
             s_logger.trace(String.format("Adding pool [%s] to the 'avoid' set since it did not match any tags.", pool));
diff --git a/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/LocalStoragePoolAllocator.java b/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/LocalStoragePoolAllocator.java
index 774c2229a09..4ec15b9e43f 100644
--- a/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/LocalStoragePoolAllocator.java
+++ b/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/LocalStoragePoolAllocator.java
@@ -101,7 +101,8 @@ public class LocalStoragePoolAllocator extends AbstractStoragePoolAllocator {
                 return null;
             }
             List<StoragePoolVO> availablePools =
-                storagePoolDao.findLocalStoragePoolsByTags(plan.getDataCenterId(), plan.getPodId(), plan.getClusterId(), dskCh.getTags());
+                storagePoolDao.findLocalStoragePoolsByTags(plan.getDataCenterId(), plan.getPodId(), plan.getClusterId(), dskCh.getTags(), true);
+            availablePools.addAll(storagePoolJoinDao.findStoragePoolByScopeAndRuleTags(plan.getDataCenterId(), plan.getPodId(), plan.getClusterId(), ScopeType.HOST, List.of(dskCh.getTags())));
             for (StoragePoolVO pool : availablePools) {
                 if (suitablePools.size() == returnUpTo) {
                     break;
@@ -117,7 +118,7 @@ public class LocalStoragePoolAllocator extends AbstractStoragePoolAllocator {
             }
 
             // add remaining pools in cluster to the 'avoid' set which did not match tags
-            List<StoragePoolVO> allPools = storagePoolDao.findLocalStoragePoolsByTags(plan.getDataCenterId(), plan.getPodId(), plan.getClusterId(), null);
+            List<StoragePoolVO> allPools = storagePoolDao.findLocalStoragePoolsByTags(plan.getDataCenterId(), plan.getPodId(), plan.getClusterId(), null, false);
             allPools.removeAll(availablePools);
             for (StoragePoolVO pool : allPools) {
                 avoid.addPool(pool.getId());
diff --git a/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/ZoneWideStoragePoolAllocator.java b/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/ZoneWideStoragePoolAllocator.java
index 1b3835560df..ba130b4e2e5 100644
--- a/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/ZoneWideStoragePoolAllocator.java
+++ b/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/ZoneWideStoragePoolAllocator.java
@@ -63,9 +63,9 @@ public class ZoneWideStoragePoolAllocator extends AbstractStoragePoolAllocator {
         }
 
         List<StoragePool> suitablePools = new ArrayList<>();
-
-        List<StoragePoolVO> storagePools = storagePoolDao.findZoneWideStoragePoolsByTags(plan.getDataCenterId(), dskCh.getTags());
-        if (storagePools == null) {
+        List<StoragePoolVO> storagePools = storagePoolDao.findZoneWideStoragePoolsByTags(plan.getDataCenterId(), dskCh.getTags(), true);
+        storagePools.addAll(storagePoolJoinDao.findStoragePoolByScopeAndRuleTags(plan.getDataCenterId(), null, null, ScopeType.ZONE, List.of(dskCh.getTags())));
+        if (storagePools.isEmpty()) {
             LOGGER.debug(String.format("Could not find any zone wide storage pool that matched with any of the following tags [%s].", Arrays.toString(dskCh.getTags())));
             storagePools = new ArrayList<>();
         }
@@ -82,7 +82,7 @@ public class ZoneWideStoragePoolAllocator extends AbstractStoragePoolAllocator {
         storagePools.addAll(anyHypervisorStoragePools);
 
         // add remaining pools in zone, that did not match tags, to avoid set
-        List<StoragePoolVO> allPools = storagePoolDao.findZoneWideStoragePoolsByTags(plan.getDataCenterId(), null);
+        List<StoragePoolVO> allPools = storagePoolDao.findZoneWideStoragePoolsByTags(plan.getDataCenterId(), null, false);
         allPools.removeAll(storagePools);
         for (StoragePoolVO pool : allPools) {
             avoid.addPool(pool.getId());
diff --git a/engine/storage/src/main/java/org/apache/cloudstack/storage/volume/datastore/PrimaryDataStoreHelper.java b/engine/storage/src/main/java/org/apache/cloudstack/storage/volume/datastore/PrimaryDataStoreHelper.java
index c3379ad316b..fbb4a6e1618 100644
--- a/engine/storage/src/main/java/org/apache/cloudstack/storage/volume/datastore/PrimaryDataStoreHelper.java
+++ b/engine/storage/src/main/java/org/apache/cloudstack/storage/volume/datastore/PrimaryDataStoreHelper.java
@@ -136,7 +136,7 @@ public class PrimaryDataStoreHelper {
                 storageTags.add(tag);
             }
         }
-        dataStoreVO = dataStoreDao.persist(dataStoreVO, details, storageTags);
+        dataStoreVO = dataStoreDao.persist(dataStoreVO, details, storageTags, params.isTagARule());
         return dataStoreMgr.getDataStore(dataStoreVO.getId(), DataStoreRole.Primary);
     }
 
diff --git a/framework/quota/src/main/java/org/apache/cloudstack/quota/activationrule/presetvariables/Host.java b/framework/quota/src/main/java/org/apache/cloudstack/quota/activationrule/presetvariables/Host.java
index 337131038e8..fef3e4376dc 100644
--- a/framework/quota/src/main/java/org/apache/cloudstack/quota/activationrule/presetvariables/Host.java
+++ b/framework/quota/src/main/java/org/apache/cloudstack/quota/activationrule/presetvariables/Host.java
@@ -22,6 +22,8 @@ import java.util.List;
 public class Host extends GenericPresetVariable {
     private List<String> tags;
 
+    private Boolean isTagARule;
+
     public List<String> getTags() {
         return tags;
     }
@@ -31,4 +33,12 @@ public class Host extends GenericPresetVariable {
         fieldNamesToIncludeInToString.add("tags");
     }
 
+    public Boolean getIsTagARule() {
+        return isTagARule;
+    }
+
+    public void setIsTagARule(Boolean isTagARule) {
+        this.isTagARule = isTagARule;
+        fieldNamesToIncludeInToString.add("isTagARule");
+    }
 }
diff --git a/framework/quota/src/main/java/org/apache/cloudstack/quota/activationrule/presetvariables/PresetVariableHelper.java b/framework/quota/src/main/java/org/apache/cloudstack/quota/activationrule/presetvariables/PresetVariableHelper.java
index 6ce8bd889a6..9723d3e5899 100644
--- a/framework/quota/src/main/java/org/apache/cloudstack/quota/activationrule/presetvariables/PresetVariableHelper.java
+++ b/framework/quota/src/main/java/org/apache/cloudstack/quota/activationrule/presetvariables/PresetVariableHelper.java
@@ -25,9 +25,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import com.cloud.host.HostTagVO;
 import javax.inject.Inject;
 
 import com.cloud.hypervisor.Hypervisor;
+import com.cloud.storage.StoragePoolTagVO;
 import org.apache.cloudstack.acl.RoleVO;
 import org.apache.cloudstack.acl.dao.RoleDao;
 import org.apache.cloudstack.backup.BackupOfferingVO;
@@ -351,7 +353,17 @@ public class PresetVariableHelper {
         Host host = new Host();
         host.setId(hostVo.getUuid());
         host.setName(hostVo.getName());
-        host.setTags(hostTagsDao.getHostTags(hostId));
+        List<HostTagVO> hostTagVOList = hostTagsDao.getHostTags(hostId);
+        List<String> hostTags = new ArrayList<>();
+        boolean isTagARule = false;
+        if (CollectionUtils.isNotEmpty(hostTagVOList)) {
+            isTagARule = hostTagVOList.get(0).getIsTagARule();
+            if (!isTagARule) {
+                hostTags = hostTagVOList.parallelStream().map(HostTagVO::getTag).collect(Collectors.toList());
+            }
+        }
+        host.setTags(hostTags);
+        host.setIsTagARule(isTagARule);
 
         return host;
     }
@@ -508,7 +520,17 @@ public class PresetVariableHelper {
         storage.setId(storagePoolVo.getUuid());
         storage.setName(storagePoolVo.getName());
         storage.setScope(storagePoolVo.getScope());
-        storage.setTags(storagePoolTagsDao.getStoragePoolTags(storageId));
+        List<StoragePoolTagVO> storagePoolTagVOList = storagePoolTagsDao.findStoragePoolTags(storageId);
+        List<String> storageTags = new ArrayList<>();
+        boolean isTagARule = false;
+        if (CollectionUtils.isNotEmpty(storagePoolTagVOList)) {
+            isTagARule = storagePoolTagVOList.get(0).isTagARule();
+            if (!isTagARule) {
+                storageTags = storagePoolTagVOList.parallelStream().map(StoragePoolTagVO::getTag).collect(Collectors.toList());
+            }
+        }
+        storage.setTags(storageTags);
+        storage.setIsTagARule(isTagARule);
 
         return storage;
     }
diff --git a/framework/quota/src/main/java/org/apache/cloudstack/quota/activationrule/presetvariables/Storage.java b/framework/quota/src/main/java/org/apache/cloudstack/quota/activationrule/presetvariables/Storage.java
index 3533c5d45c1..6be1dfb025a 100644
--- a/framework/quota/src/main/java/org/apache/cloudstack/quota/activationrule/presetvariables/Storage.java
+++ b/framework/quota/src/main/java/org/apache/cloudstack/quota/activationrule/presetvariables/Storage.java
@@ -23,6 +23,8 @@ import com.cloud.storage.ScopeType;
 
 public class Storage extends GenericPresetVariable {
     private List<String> tags;
+
+    private Boolean isTagARule;
     private ScopeType scope;
 
     public List<String> getTags() {
@@ -34,6 +36,15 @@ public class Storage extends GenericPresetVariable {
         fieldNamesToIncludeInToString.add("tags");
     }
 
+    public Boolean getIsTagARule() {
+        return isTagARule;
+    }
+
+    public void setIsTagARule(Boolean isTagARule) {
+        this.isTagARule = isTagARule;
+        fieldNamesToIncludeInToString.add("isTagARule");
+    }
+
     public ScopeType getScope() {
         return scope;
     }
diff --git a/framework/quota/src/test/java/org/apache/cloudstack/quota/activationrule/presetvariables/PresetVariableHelperTest.java b/framework/quota/src/test/java/org/apache/cloudstack/quota/activationrule/presetvariables/PresetVariableHelperTest.java
index ae15e573fa8..b973d1145c3 100644
--- a/framework/quota/src/test/java/org/apache/cloudstack/quota/activationrule/presetvariables/PresetVariableHelperTest.java
+++ b/framework/quota/src/test/java/org/apache/cloudstack/quota/activationrule/presetvariables/PresetVariableHelperTest.java
@@ -27,7 +27,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.cloud.host.HostTagVO;
 import com.cloud.hypervisor.Hypervisor;
+import com.cloud.storage.StoragePoolTagVO;
 import org.apache.cloudstack.acl.RoleType;
 import org.apache.cloudstack.acl.RoleVO;
 import org.apache.cloudstack.acl.dao.RoleDao;
@@ -241,6 +243,14 @@ public class PresetVariableHelperTest {
         return host;
     }
 
+    private List<HostTagVO> getHostTagsForTests() {
+        return Arrays.asList(new HostTagVO(1, "tag1", false), new HostTagVO(1, "tag2", false));
+    }
+
+    private List<HostTagVO> getHostRuleTagsForTests() {
+        return List.of(new HostTagVO(1, "tagrule", true));
+    }
+
     private Storage getStorageForTests() {
         Storage storage = new Storage();
         storage.setId("storage_id");
@@ -250,6 +260,14 @@ public class PresetVariableHelperTest {
         return storage;
     }
 
+    private List<StoragePoolTagVO> getStorageTagsForTests() {
+        return Arrays.asList(new StoragePoolTagVO(1, "tag1", false), new StoragePoolTagVO(1, "tag2", false));
+    }
+
+    private List<StoragePoolTagVO> getStorageRuleTagsForTests() {
+        return List.of(new StoragePoolTagVO(1, "tagrule", true));
+    }
+
     private Set<Map.Entry<Integer, QuotaTypes>> getQuotaTypesForTests(Integer... typesToRemove) {
         Map<Integer, QuotaTypes> quotaTypesMap = new LinkedHashMap<>(QuotaTypes.listQuotaTypes());
 
@@ -539,15 +557,16 @@ public class PresetVariableHelperTest {
     @Test
     public void setPresetVariableHostInValueIfUsageTypeIsRunningVmTestQuotaTypeIsRunningVmSetHost() {
         Value result = new Value();
-        Host expected = getHostForTests();
+        Host expectedHost = getHostForTests();
+        List<HostTagVO> expectedHostTags = getHostTagsForTests();
 
-        Mockito.doReturn(expected).when(presetVariableHelperSpy).getPresetVariableValueHost(Mockito.anyLong());
+        Mockito.doReturn(expectedHost).when(presetVariableHelperSpy).getPresetVariableValueHost(Mockito.anyLong());
         presetVariableHelperSpy.setPresetVariableHostInValueIfUsageTypeIsRunningVm(result, UsageTypes.RUNNING_VM, vmInstanceVoMock);
 
         Assert.assertNotNull(result.getHost());
 
-        assertPresetVariableIdAndName(expected, result.getHost());
-        Assert.assertEquals(expected.getTags(), result.getHost().getTags());
+        assertPresetVariableIdAndName(expectedHost, result.getHost());
+        Assert.assertEquals(expectedHost.getTags(), result.getHost().getTags());
         validateFieldNamesToIncludeInToString(Arrays.asList("host"), result);
     }
 
@@ -555,18 +574,39 @@ public class PresetVariableHelperTest {
     public void getPresetVariableValueHostTestSetFieldsAndReturnObject() {
         Host expected = getHostForTests();
         HostVO hostVoMock = Mockito.mock(HostVO.class);
+        List<HostTagVO> hostTagVOListMock = getHostTagsForTests();
 
         Mockito.doReturn(hostVoMock).when(hostDaoMock).findByIdIncludingRemoved(Mockito.anyLong());
         mockMethodValidateIfObjectIsNull();
         Mockito.doReturn(expected.getId()).when(hostVoMock).getUuid();
         Mockito.doReturn(expected.getName()).when(hostVoMock).getName();
-        Mockito.doReturn(expected.getTags()).when(hostTagsDaoMock).getHostTags(Mockito.anyLong());
+        Mockito.doReturn(hostTagVOListMock).when(hostTagsDaoMock).getHostTags(Mockito.anyLong());
 
         Host result = presetVariableHelperSpy.getPresetVariableValueHost(1l);
 
         assertPresetVariableIdAndName(expected, result);
         Assert.assertEquals(expected.getTags(), result.getTags());
-        validateFieldNamesToIncludeInToString(Arrays.asList("id", "name", "tags"), result);
+        validateFieldNamesToIncludeInToString(Arrays.asList("id", "isTagARule", "name", "tags"), result);
+    }
+
+    @Test
+    public void getPresetVariableValueHostTestSetFieldsWithRuleTagAndReturnObject() {
+        Host expected = getHostForTests();
+        HostVO hostVoMock = Mockito.mock(HostVO.class);
+        List<HostTagVO> hostTagVOListMock = getHostRuleTagsForTests();
+
+        Mockito.doReturn(hostVoMock).when(hostDaoMock).findByIdIncludingRemoved(Mockito.anyLong());
+        mockMethodValidateIfObjectIsNull();
+        Mockito.doReturn(expected.getId()).when(hostVoMock).getUuid();
+        Mockito.doReturn(expected.getName()).when(hostVoMock).getName();
+        Mockito.doReturn(hostTagVOListMock).when(hostTagsDaoMock).getHostTags(Mockito.anyLong());
+
+        Host result = presetVariableHelperSpy.getPresetVariableValueHost(1l);
+
+        assertPresetVariableIdAndName(expected, result);
+        Assert.assertEquals(new ArrayList<>(), result.getTags());
+        Assert.assertTrue(result.getIsTagARule());
+        validateFieldNamesToIncludeInToString(Arrays.asList("id", "isTagARule", "name", "tags"), result);
     }
 
     @Test
@@ -755,13 +795,15 @@ public class PresetVariableHelperTest {
         Storage expected = getStorageForTests();
         Mockito.doReturn(null).when(presetVariableHelperSpy).getSecondaryStorageForSnapshot(Mockito.anyLong(), Mockito.anyInt());
 
+        List<StoragePoolTagVO> storageTagVOListMock = getStorageTagsForTests();
+
         StoragePoolVO storagePoolVoMock = Mockito.mock(StoragePoolVO.class);
         Mockito.doReturn(storagePoolVoMock).when(primaryStorageDaoMock).findByIdIncludingRemoved(Mockito.anyLong());
 
         Mockito.doReturn(expected.getId()).when(storagePoolVoMock).getUuid();
         Mockito.doReturn(expected.getName()).when(storagePoolVoMock).getName();
         Mockito.doReturn(expected.getScope()).when(storagePoolVoMock).getScope();
-        Mockito.doReturn(expected.getTags()).when(storagePoolTagsDaoMock).getStoragePoolTags(Mockito.anyLong());
+        Mockito.doReturn(storageTagVOListMock).when(storagePoolTagsDaoMock).findStoragePoolTags(Mockito.anyLong());
 
         Storage result = presetVariableHelperSpy.getPresetVariableValueStorage(1l, 2);
 
@@ -769,7 +811,32 @@ public class PresetVariableHelperTest {
         Assert.assertEquals(expected.getScope(), result.getScope());
         Assert.assertEquals(expected.getTags(), result.getTags());
 
-        validateFieldNamesToIncludeInToString(Arrays.asList("id", "name", "scope", "tags"), result);
+        validateFieldNamesToIncludeInToString(Arrays.asList("id", "isTagARule",  "name", "scope", "tags"), result);
+    }
+
+    @Test
+    public void getPresetVariableValueStorageTestGetSecondaryStorageForSnapshotReturnsNullWithRuleTag() {
+        Storage expected = getStorageForTests();
+        Mockito.doReturn(null).when(presetVariableHelperSpy).getSecondaryStorageForSnapshot(Mockito.anyLong(), Mockito.anyInt());
+
+        List<StoragePoolTagVO> storageTagVOListMock = getStorageRuleTagsForTests();
+
+        StoragePoolVO storagePoolVoMock = Mockito.mock(StoragePoolVO.class);
+        Mockito.doReturn(storagePoolVoMock).when(primaryStorageDaoMock).findByIdIncludingRemoved(Mockito.anyLong());
+
+        Mockito.doReturn(expected.getId()).when(storagePoolVoMock).getUuid();
+        Mockito.doReturn(expected.getName()).when(storagePoolVoMock).getName();
+        Mockito.doReturn(expected.getScope()).when(storagePoolVoMock).getScope();
+        Mockito.doReturn(storageTagVOListMock).when(storagePoolTagsDaoMock).findStoragePoolTags(Mockito.anyLong());
+
+        Storage result = presetVariableHelperSpy.getPresetVariableValueStorage(1l, 2);
+
+        assertPresetVariableIdAndName(expected, result);
+        Assert.assertEquals(expected.getScope(), result.getScope());
+        Assert.assertEquals(new ArrayList<>(), result.getTags());
+        Assert.assertTrue(result.getIsTagARule());
+
+        validateFieldNamesToIncludeInToString(Arrays.asList("id", "isTagARule",  "name", "scope", "tags"), result);
     }
 
     @Test
diff --git a/plugins/host-allocators/random/src/main/java/com/cloud/agent/manager/allocator/impl/RandomAllocator.java b/plugins/host-allocators/random/src/main/java/com/cloud/agent/manager/allocator/impl/RandomAllocator.java
index 8a46d10a7b5..70920df5eb5 100644
--- a/plugins/host-allocators/random/src/main/java/com/cloud/agent/manager/allocator/impl/RandomAllocator.java
+++ b/plugins/host-allocators/random/src/main/java/com/cloud/agent/manager/allocator/impl/RandomAllocator.java
@@ -22,7 +22,9 @@ import java.util.List;
 
 import javax.inject.Inject;
 
+import com.cloud.utils.exception.CloudRuntimeException;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.ListUtils;
 import org.apache.log4j.Logger;
 import org.springframework.stereotype.Component;
 
@@ -72,7 +74,7 @@ public class RandomAllocator extends AdapterBase implements HostAllocator {
         }
         String hostTag = offering.getHostTag();
         if (hostTag != null) {
-            s_logger.debug("Looking for hosts in dc: " + dcId + "  pod:" + podId + "  cluster:" + clusterId + " having host tag:" + hostTag);
+            s_logger.debug(String.format("Looking for hosts in dc [%s], pod [%s], cluster [%s] and complying with host tag [%s].", dcId, podId, clusterId, hostTag));
         } else {
             s_logger.debug("Looking for hosts in dc: " + dcId + "  pod:" + podId + "  cluster:" + clusterId);
         }
@@ -82,7 +84,7 @@ public class RandomAllocator extends AdapterBase implements HostAllocator {
             if (hostTag != null) {
                 hostsCopy.retainAll(_hostDao.listByHostTag(type, clusterId, podId, dcId, hostTag));
             } else {
-                hostsCopy.retainAll(_resourceMgr.listAllUpAndEnabledHosts(type, clusterId, podId, dcId));
+                hostsCopy.retainAll(_hostDao.listAllHostsThatHaveNoRuleTag(type, clusterId, podId, dcId));
             }
         } else {
             // list all computing hosts, regardless of whether they support routing...it's random after all
@@ -90,9 +92,16 @@ public class RandomAllocator extends AdapterBase implements HostAllocator {
             if (hostTag != null) {
                 hostsCopy = _hostDao.listByHostTag(type, clusterId, podId, dcId, hostTag);
             } else {
-                hostsCopy = _resourceMgr.listAllUpAndEnabledHosts(type, clusterId, podId, dcId);
+                hostsCopy = _hostDao.listAllHostsThatHaveNoRuleTag(type, clusterId, podId, dcId);
             }
         }
+        hostsCopy = ListUtils.union(hostsCopy, _hostDao.findHostsWithTagRuleThatMatchComputeOferringTags(hostTag));
+
+        if (hostsCopy.isEmpty()) {
+            s_logger.error(String.format("No suitable host found for vm [%s] with tags [%s].", vmProfile, hostTag));
+            throw new CloudRuntimeException(String.format("No suitable host found for vm [%s].", vmProfile));
+        }
+
         s_logger.debug("Random Allocator found " + hostsCopy.size() + "  hosts");
         if (hostsCopy.size() == 0) {
             return suitableHosts;
diff --git a/plugins/integrations/prometheus/src/main/java/org/apache/cloudstack/metrics/PrometheusExporterImpl.java b/plugins/integrations/prometheus/src/main/java/org/apache/cloudstack/metrics/PrometheusExporterImpl.java
index ee48252534f..3b111da5961 100644
--- a/plugins/integrations/prometheus/src/main/java/org/apache/cloudstack/metrics/PrometheusExporterImpl.java
+++ b/plugins/integrations/prometheus/src/main/java/org/apache/cloudstack/metrics/PrometheusExporterImpl.java
@@ -29,6 +29,7 @@ import com.cloud.configuration.dao.ResourceCountDao;
 import com.cloud.dc.DedicatedResourceVO;
 import com.cloud.dc.dao.DedicatedResourceDao;
 import com.cloud.host.HostStats;
+import com.cloud.host.HostTagVO;
 import com.cloud.user.Account;
 import com.cloud.user.dao.AccountDao;
 import org.apache.cloudstack.engine.subsystem.api.storage.ZoneScope;
@@ -234,7 +235,9 @@ public class PrometheusExporterImpl extends ManagerBase implements PrometheusExp
     }
 
     private String markTagMaps(HostVO host, Map<String, Integer> totalHosts, Map<String, Integer> upHosts, Map<String, Integer> downHosts) {
-        List<String> hostTags = _hostTagsDao.getHostTags(host.getId());
+        List<HostTagVO> hostTagVOS = _hostTagsDao.getHostTags(host.getId());
+        List<String> hostTags = new ArrayList<>();
+        hostTagVOS.forEach(hostTagVO -> hostTags.add(hostTagVO.getTag()));
         markTags(hostTags,totalHosts);
         if (host.getStatus() == Status.Up && !host.isInMaintenanceStates()) {
             markTags(hostTags, upHosts);
@@ -277,10 +280,12 @@ public class PrometheusExporterImpl extends ManagerBase implements PrometheusExp
                     metricsList.add(new ItemHostMemory(zoneName, zoneUuid, null, null, null, null, ALLOCATED, allocatedCapacityByTag.third(), 0, tag));
                 });
 
-        List<String> allHostTags = hostDao.listAll().stream()
+        List<HostTagVO> allHostTagVOS = hostDao.listAll().stream()
                 .flatMap( h -> _hostTagsDao.getHostTags(h.getId()).stream())
                 .distinct()
                 .collect(Collectors.toList());
+        List<String> allHostTags = new ArrayList<>();
+        allHostTagVOS.forEach(hostTagVO -> allHostTags.add(hostTagVO.getTag()));
 
         for (final State state : State.values()) {
             for (final String hostTag : allHostTags) {
diff --git a/plugins/storage/volume/cloudbyte/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ElastistorPrimaryDataStoreLifeCycle.java b/plugins/storage/volume/cloudbyte/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ElastistorPrimaryDataStoreLifeCycle.java
index d6a67b447c5..0798f9f2cd2 100644
--- a/plugins/storage/volume/cloudbyte/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ElastistorPrimaryDataStoreLifeCycle.java
+++ b/plugins/storage/volume/cloudbyte/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ElastistorPrimaryDataStoreLifeCycle.java
@@ -107,6 +107,7 @@ public class ElastistorPrimaryDataStoreLifeCycle implements PrimaryDataStoreLife
         Long capacityBytes = (Long) dsInfos.get("capacityBytes");
         Long capacityIops = (Long) dsInfos.get("capacityIops");
         String tags = (String) dsInfos.get("tags");
+        Boolean isTagARule = (Boolean) dsInfos.get("isTagARule");
         boolean managed = (Boolean) dsInfos.get("managed");
         Map<String, String> details = (Map<String, String>) dsInfos.get("details");
         String domainName = details.get("domainname");
@@ -196,6 +197,7 @@ public class ElastistorPrimaryDataStoreLifeCycle implements PrimaryDataStoreLife
         parameters.setCapacityIops(capacityIops);
         parameters.setHypervisorType(HypervisorType.Any);
         parameters.setTags(tags);
+        parameters.setIsTagARule(isTagARule);
         parameters.setDetails(details);
         parameters.setClusterId(clusterId);
 
diff --git a/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/DateraPrimaryDataStoreLifeCycle.java b/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/DateraPrimaryDataStoreLifeCycle.java
index 0906e645b66..6fd42009125 100644
--- a/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/DateraPrimaryDataStoreLifeCycle.java
+++ b/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/DateraPrimaryDataStoreLifeCycle.java
@@ -100,6 +100,7 @@ public class DateraPrimaryDataStoreLifeCycle implements PrimaryDataStoreLifeCycl
         Long capacityBytes = (Long) dsInfos.get("capacityBytes");
         Long capacityIops = (Long) dsInfos.get("capacityIops");
         String tags = (String) dsInfos.get("tags");
+        boolean isTagARule = (Boolean)dsInfos.get("isTagARule");
         @SuppressWarnings("unchecked")
         Map<String, String> details = (Map<String, String>) dsInfos.get("details");
         String domainName = details.get("domainname");
@@ -181,6 +182,7 @@ public class DateraPrimaryDataStoreLifeCycle implements PrimaryDataStoreLifeCycl
         parameters.setCapacityIops(capacityIops);
         parameters.setHypervisorType(HypervisorType.Any);
         parameters.setTags(tags);
+        parameters.setIsTagARule(isTagARule);
         parameters.setDetails(details);
 
         String managementVip = DateraUtil.getManagementVip(url);
diff --git a/plugins/storage/volume/default/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/CloudStackPrimaryDataStoreLifeCycleImpl.java b/plugins/storage/volume/default/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/CloudStackPrimaryDataStoreLifeCycleImpl.java
index 6e178c119e3..685565d73b0 100644
--- a/plugins/storage/volume/default/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/CloudStackPrimaryDataStoreLifeCycleImpl.java
+++ b/plugins/storage/volume/default/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/CloudStackPrimaryDataStoreLifeCycleImpl.java
@@ -138,6 +138,7 @@ public class CloudStackPrimaryDataStoreLifeCycleImpl implements PrimaryDataStore
         Map<String, String> details = (Map<String, String>)dsInfos.get("details");
 
         parameters.setTags(tags);
+        parameters.setIsTagARule((Boolean)dsInfos.get("isTagARule"));
         parameters.setDetails(details);
 
         String scheme = dsInfos.get("scheme").toString();
diff --git a/plugins/storage/volume/linstor/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/LinstorPrimaryDataStoreLifeCycleImpl.java b/plugins/storage/volume/linstor/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/LinstorPrimaryDataStoreLifeCycleImpl.java
index a7d0c7fc0a3..efc69438e75 100644
--- a/plugins/storage/volume/linstor/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/LinstorPrimaryDataStoreLifeCycleImpl.java
+++ b/plugins/storage/volume/linstor/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/LinstorPrimaryDataStoreLifeCycleImpl.java
@@ -91,6 +91,7 @@ public class LinstorPrimaryDataStoreLifeCycleImpl implements PrimaryDataStoreLif
         String providerName = (String) dsInfos.get("providerName");
         Long capacityIops = (Long) dsInfos.get("capacityIops");
         String tags = (String) dsInfos.get("tags");
+        Boolean isTagARule = (Boolean) dsInfos.get("isTagARule");
         @SuppressWarnings("unchecked")
         Map<String, String> details = (Map<String, String>) dsInfos.get("details");
 
@@ -168,6 +169,7 @@ public class LinstorPrimaryDataStoreLifeCycleImpl implements PrimaryDataStoreLif
         parameters.setCapacityIops(capacityIops);
         parameters.setHypervisorType(HypervisorType.KVM);
         parameters.setTags(tags);
+        parameters.setIsTagARule(isTagARule);
         parameters.setDetails(details);
         parameters.setUserInfo(resourceGroup);
 
diff --git a/plugins/storage/volume/nexenta/src/main/java/org/apache/cloudstack/storage/datastore/lifecylce/NexentaPrimaryDataStoreLifeCycle.java b/plugins/storage/volume/nexenta/src/main/java/org/apache/cloudstack/storage/datastore/lifecylce/NexentaPrimaryDataStoreLifeCycle.java
index 32735664a25..507189edc14 100644
--- a/plugins/storage/volume/nexenta/src/main/java/org/apache/cloudstack/storage/datastore/lifecylce/NexentaPrimaryDataStoreLifeCycle.java
+++ b/plugins/storage/volume/nexenta/src/main/java/org/apache/cloudstack/storage/datastore/lifecylce/NexentaPrimaryDataStoreLifeCycle.java
@@ -69,6 +69,7 @@ public class NexentaPrimaryDataStoreLifeCycle
         Long capacityBytes = (Long)dsInfos.get("capacityBytes");
         Long capacityIops = (Long)dsInfos.get("capacityIops");
         String tags = (String)dsInfos.get("tags");
+        Boolean isTagARule = (Boolean) dsInfos.get("isTagARule");
         Map<String, String> details = (Map<String, String>) dsInfos.get("details");
         NexentaUtil.NexentaPluginParameters params = NexentaUtil.parseNexentaPluginUrl(url);
         DataCenterVO zone = zoneDao.findById(zoneId);
@@ -98,6 +99,7 @@ public class NexentaPrimaryDataStoreLifeCycle
         parameters.setCapacityIops(capacityIops);
         parameters.setHypervisorType(Hypervisor.HypervisorType.Any);
         parameters.setTags(tags);
+        parameters.setIsTagARule(isTagARule);
 
         details.put(NexentaUtil.NMS_URL, params.getNmsUrl().toString());
 
diff --git a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycle.java b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycle.java
index 65831e4ec3d..17150699923 100644
--- a/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycle.java
+++ b/plugins/storage/volume/scaleio/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/ScaleIOPrimaryDataStoreLifeCycle.java
@@ -139,6 +139,7 @@ public class ScaleIOPrimaryDataStoreLifeCycle implements PrimaryDataStoreLifeCyc
         Long capacityBytes = (Long)dsInfos.get("capacityBytes");
         Long capacityIops = (Long)dsInfos.get("capacityIops");
         String tags = (String)dsInfos.get("tags");
+        Boolean isTagARule = (Boolean) dsInfos.get("isTagARule");
         Map<String, String> details = (Map<String, String>) dsInfos.get("details");
 
         if (zoneId == null) {
@@ -224,6 +225,7 @@ public class ScaleIOPrimaryDataStoreLifeCycle implements PrimaryDataStoreLifeCyc
         parameters.setHypervisorType(Hypervisor.HypervisorType.KVM);
         parameters.setUuid(UUID.randomUUID().toString());
         parameters.setTags(tags);
+        parameters.setIsTagARule(isTagARule);
 
         StoragePoolStatistics poolStatistics = scaleIOPool.getStatistics();
         if (poolStatistics != null) {
diff --git a/plugins/storage/volume/solidfire/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SolidFirePrimaryDataStoreLifeCycle.java b/plugins/storage/volume/solidfire/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SolidFirePrimaryDataStoreLifeCycle.java
index 38c8c240094..7a2767c32e6 100644
--- a/plugins/storage/volume/solidfire/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SolidFirePrimaryDataStoreLifeCycle.java
+++ b/plugins/storage/volume/solidfire/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SolidFirePrimaryDataStoreLifeCycle.java
@@ -91,6 +91,7 @@ public class SolidFirePrimaryDataStoreLifeCycle implements PrimaryDataStoreLifeC
         Long capacityBytes = (Long)dsInfos.get("capacityBytes");
         Long capacityIops = (Long)dsInfos.get("capacityIops");
         String tags = (String)dsInfos.get("tags");
+        Boolean isTagARule = (Boolean) dsInfos.get("isTagARule");
         @SuppressWarnings("unchecked")
         Map<String, String> details = (Map<String, String>)dsInfos.get("details");
 
@@ -142,6 +143,7 @@ public class SolidFirePrimaryDataStoreLifeCycle implements PrimaryDataStoreLifeC
         }
 
         parameters.setTags(tags);
+        parameters.setIsTagARule(isTagARule);
         parameters.setDetails(details);
 
         String managementVip = SolidFireUtil.getManagementVip(url);
diff --git a/plugins/storage/volume/solidfire/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SolidFireSharedPrimaryDataStoreLifeCycle.java b/plugins/storage/volume/solidfire/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SolidFireSharedPrimaryDataStoreLifeCycle.java
index 9cc746d4ee8..557cc3f60f6 100644
--- a/plugins/storage/volume/solidfire/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SolidFireSharedPrimaryDataStoreLifeCycle.java
+++ b/plugins/storage/volume/solidfire/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/SolidFireSharedPrimaryDataStoreLifeCycle.java
@@ -104,6 +104,7 @@ public class SolidFireSharedPrimaryDataStoreLifeCycle implements PrimaryDataStor
         Long capacityBytes = (Long)dsInfos.get("capacityBytes");
         Long capacityIops = (Long)dsInfos.get(CAPACITY_IOPS);
         String tags = (String)dsInfos.get("tags");
+        Boolean isTagARule = (Boolean) dsInfos.get("isTagARule");
         @SuppressWarnings("unchecked")
         Map<String, String> details = (Map<String, String>)dsInfos.get("details");
 
@@ -152,6 +153,7 @@ public class SolidFireSharedPrimaryDataStoreLifeCycle implements PrimaryDataStor
         parameters.setCapacityIops(capacityIops);
         parameters.setHypervisorType(hypervisorType);
         parameters.setTags(tags);
+        parameters.setIsTagARule(isTagARule);
         parameters.setDetails(details);
 
         String managementVip = SolidFireUtil.getManagementVip(url);
diff --git a/server/src/main/java/com/cloud/agent/manager/allocator/impl/FirstFitAllocator.java b/server/src/main/java/com/cloud/agent/manager/allocator/impl/FirstFitAllocator.java
index df6ea74881e..862e8acc9e0 100644
--- a/server/src/main/java/com/cloud/agent/manager/allocator/impl/FirstFitAllocator.java
+++ b/server/src/main/java/com/cloud/agent/manager/allocator/impl/FirstFitAllocator.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import javax.inject.Inject;
 import javax.naming.ConfigurationException;
 
+import com.cloud.utils.exception.CloudRuntimeException;
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
 import org.apache.log4j.Logger;
 import org.springframework.stereotype.Component;
@@ -182,7 +183,6 @@ public class FirstFitAllocator extends AdapterBase implements HostAllocator {
 
                 if (hasSvcOfferingTag && hasTemplateTag) {
                     hostsMatchingOfferingTag.retainAll(hostsMatchingTemplateTag);
-                    clusterHosts = _hostDao.listByHostTag(type, clusterId, podId, dcId, hostTagOnTemplate);
                     if (s_logger.isDebugEnabled()) {
                         s_logger.debug("Found " + hostsMatchingOfferingTag.size() + " Hosts satisfying both tags, host ids are:" + hostsMatchingOfferingTag);
                     }
@@ -202,6 +202,13 @@ public class FirstFitAllocator extends AdapterBase implements HostAllocator {
             clusterHosts.retainAll(hostsMatchingUefiTag);
         }
 
+        clusterHosts.addAll(_hostDao.findHostsWithTagRuleThatMatchComputeOferringTags(hostTagOnOffering));
+
+
+        if (clusterHosts.isEmpty()) {
+            s_logger.error(String.format("No suitable host found for vm [%s] with tags [%s].", vmProfile, hostTagOnOffering));
+            throw new CloudRuntimeException(String.format("No suitable host found for vm [%s].", vmProfile));
+        }
         // add all hosts that we are not considering to the avoid list
         List<HostVO> allhostsInCluster = _hostDao.listAllUpAndEnabledNonHAHosts(type, clusterId, podId, dcId, null);
         allhostsInCluster.removeAll(clusterHosts);
@@ -267,6 +274,8 @@ public class FirstFitAllocator extends AdapterBase implements HostAllocator {
             }
         }
 
+        hostsCopy.addAll(_hostDao.findHostsWithTagRuleThatMatchComputeOferringTags(hostTagOnOffering));
+
         if (!hostsCopy.isEmpty()) {
             suitableHosts = allocateTo(plan, offering, template, avoid, hostsCopy, returnUpTo, considerReservedCapacity, account);
         }
diff --git a/server/src/main/java/com/cloud/api/query/dao/HostJoinDaoImpl.java b/server/src/main/java/com/cloud/api/query/dao/HostJoinDaoImpl.java
index bf3ded1fef0..da81f42b41d 100644
--- a/server/src/main/java/com/cloud/api/query/dao/HostJoinDaoImpl.java
+++ b/server/src/main/java/com/cloud/api/query/dao/HostJoinDaoImpl.java
@@ -205,6 +205,7 @@ public class HostJoinDaoImpl extends GenericDaoBase<HostJoinVO, Long> implements
 
                 String hostTags = host.getTag();
                 hostResponse.setHostTags(hostTags);
+                hostResponse.setIsTagARule(host.getIsTagARule());
                 hostResponse.setHaHost(containsHostHATag(hostTags));
 
                 hostResponse.setHypervisorVersion(host.getHypervisorVersion());
diff --git a/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDao.java b/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDao.java
index 87659210ad7..9028f3418b3 100644
--- a/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDao.java
+++ b/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDao.java
@@ -27,6 +27,7 @@ import org.apache.cloudstack.api.response.StoragePoolResponse;
 import com.cloud.api.query.vo.StoragePoolJoinVO;
 import com.cloud.storage.StoragePool;
 import com.cloud.utils.db.GenericDao;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 
 public interface StoragePoolJoinDao extends GenericDao<StoragePoolJoinVO, Long> {
 
@@ -44,4 +45,6 @@ public interface StoragePoolJoinDao extends GenericDao<StoragePoolJoinVO, Long>
 
     Pair<List<StoragePoolJoinVO>, Integer> searchAndCount(Long storagePoolId, String storagePoolName, Long zoneId, String path, Long podId, Long clusterId, String address, ScopeType scopeType, StoragePoolStatus status, String keyword, Filter searchFilter);
 
+    List<StoragePoolVO> findStoragePoolByScopeAndRuleTags(Long datacenterId, Long podId, Long clusterId, ScopeType scopeType, List<String> tags);
+
 }
diff --git a/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDaoImpl.java b/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDaoImpl.java
index de469d21a11..e75e86108c7 100644
--- a/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDaoImpl.java
+++ b/server/src/main/java/com/cloud/api/query/dao/StoragePoolJoinDaoImpl.java
@@ -25,6 +25,7 @@ import com.cloud.storage.Storage;
 import com.cloud.storage.StoragePool;
 import com.cloud.storage.StoragePoolStatus;
 import com.cloud.storage.StorageStats;
+import com.cloud.storage.VolumeApiServiceImpl;
 import com.cloud.user.AccountManager;
 import com.cloud.utils.Pair;
 import com.cloud.utils.StringUtils;
@@ -44,6 +45,7 @@ import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailVO;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailsDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+import org.apache.cloudstack.utils.jsinterpreter.TagAsRuleHelper;
 import org.apache.log4j.Logger;
 import org.springframework.stereotype.Component;
 
@@ -71,10 +73,13 @@ public class StoragePoolJoinDaoImpl extends GenericDaoBase<StoragePoolJoinVO, Lo
     @Inject
     private StoragePoolDetailsDao storagePoolDetailsDao;
 
+
     private final SearchBuilder<StoragePoolJoinVO> spSearch;
 
     private final SearchBuilder<StoragePoolJoinVO> spIdSearch;
 
+    private final SearchBuilder<StoragePoolJoinVO> findByDatacenterAndScopeSb;
+
     protected StoragePoolJoinDaoImpl() {
 
         spSearch = createSearchBuilder();
@@ -85,6 +90,15 @@ public class StoragePoolJoinDaoImpl extends GenericDaoBase<StoragePoolJoinVO, Lo
         spIdSearch.and("id", spIdSearch.entity().getId(), SearchCriteria.Op.EQ);
         spIdSearch.done();
 
+        findByDatacenterAndScopeSb = createSearchBuilder();
+        findByDatacenterAndScopeSb.and("zoneId", findByDatacenterAndScopeSb.entity().getZoneId(), SearchCriteria.Op.EQ);
+        findByDatacenterAndScopeSb.and("clusterId", findByDatacenterAndScopeSb.entity().getClusterId(), SearchCriteria.Op.EQ);
+        findByDatacenterAndScopeSb.and("podId", findByDatacenterAndScopeSb.entity().getPodId(), SearchCriteria.Op.EQ);
+        findByDatacenterAndScopeSb.and("scope", findByDatacenterAndScopeSb.entity().getScope(), SearchCriteria.Op.EQ);
+        findByDatacenterAndScopeSb.and("status", findByDatacenterAndScopeSb.entity().getStatus(), SearchCriteria.Op.EQ);
+        findByDatacenterAndScopeSb.and("is_tag_a_rule", findByDatacenterAndScopeSb.entity().getIsTagARule(), SearchCriteria.Op.EQ);
+        findByDatacenterAndScopeSb.done();
+
         _count = "select count(distinct id) from storage_pool_view WHERE ";
     }
 
@@ -149,6 +163,7 @@ public class StoragePoolJoinDaoImpl extends GenericDaoBase<StoragePoolJoinVO, Lo
         poolResponse.setClusterName(pool.getClusterName());
         poolResponse.setProvider(pool.getStorageProviderName());
         poolResponse.setTags(pool.getTag());
+        poolResponse.setIsTagARule(pool.getIsTagARule());
         poolResponse.setOverProvisionFactor(Double.toString(CapacityManager.StorageOverprovisioningFactor.valueIn(pool.getId())));
 
         // set async job
@@ -221,6 +236,7 @@ public class StoragePoolJoinDaoImpl extends GenericDaoBase<StoragePoolJoinVO, Lo
         poolResponse.setClusterName(pool.getClusterName());
         poolResponse.setProvider(pool.getStorageProviderName());
         poolResponse.setTags(pool.getTag());
+        poolResponse.setIsTagARule(pool.getIsTagARule());
 
         // set async job
         poolResponse.setJobId(pool.getJobUuid());
@@ -366,4 +382,46 @@ public class StoragePoolJoinDaoImpl extends GenericDaoBase<StoragePoolJoinVO, Lo
         sc.setParameters("parent", 0);
         return sc;
     }
+    @Override
+    public List<StoragePoolVO> findStoragePoolByScopeAndRuleTags(Long datacenterId, Long podId, Long clusterId, ScopeType scopeType, List<String> tags) {
+        SearchCriteria<StoragePoolJoinVO> sc =  findByDatacenterAndScopeSb.create();
+        if (datacenterId != null) {
+            sc.setParameters("zoneId", datacenterId);
+        }
+        if (clusterId != null) {
+            sc.setParameters("clusterId", clusterId);
+        }
+        if (podId != null) {
+            sc.setParameters("podId", podId);
+        }
+
+        sc.setParameters("scope", scopeType);
+        sc.setParameters("status", "Up");
+        sc.setParameters("is_tag_a_rule", true);
+        List<StoragePoolJoinVO> storagePools = search(sc, null, false, false);
+
+        List<StoragePoolVO> filteredPools = new ArrayList<>();
+
+        StringBuilder injectableTagsBuilder = new StringBuilder();
+        for (String tag : tags) {
+            injectableTagsBuilder.append(tag).append(",");
+        }
+        if (!tags.isEmpty()) {
+            injectableTagsBuilder.deleteCharAt(injectableTagsBuilder.length() - 1);
+        }
+        String injectableTag = injectableTagsBuilder.toString();
+
+        for (StoragePoolJoinVO storagePoolJoinVO : storagePools) {
+            if (TagAsRuleHelper.interpretTagAsRule(storagePoolJoinVO.getTag(), injectableTag, VolumeApiServiceImpl.storageTagRuleExecutionTimeout.value())) {
+                StoragePoolVO storagePoolVO = storagePoolDao.findById(storagePoolJoinVO.getId());
+                if (storagePoolVO != null) {
+                    filteredPools.add(storagePoolVO);
+                } else {
+                    s_logger.warn(String.format("Unable to find Storage Pool [%s] in the DB.", storagePoolJoinVO.getUuid()));
+                }
+            }
+        }
+        return filteredPools;
+    }
+
 }
diff --git a/server/src/main/java/com/cloud/api/query/vo/HostJoinVO.java b/server/src/main/java/com/cloud/api/query/vo/HostJoinVO.java
index bb3b6935389..78a45429463 100644
--- a/server/src/main/java/com/cloud/api/query/vo/HostJoinVO.java
+++ b/server/src/main/java/com/cloud/api/query/vo/HostJoinVO.java
@@ -172,6 +172,9 @@ public class HostJoinVO extends BaseViewVO implements InternalIdentity, Identity
     @Column(name = "tag")
     private String tag;
 
+    @Column(name = "is_tag_a_rule")
+    private Boolean isTagARule;
+
     @Column(name = "memory_used_capacity")
     private long memUsedCapacity;
 
@@ -388,6 +391,10 @@ public class HostJoinVO extends BaseViewVO implements InternalIdentity, Identity
         return tag;
     }
 
+    public Boolean getIsTagARule() {
+        return isTagARule;
+    }
+
     public String getAnnotation() {
         return annotation;
     }
diff --git a/server/src/main/java/com/cloud/api/query/vo/StoragePoolJoinVO.java b/server/src/main/java/com/cloud/api/query/vo/StoragePoolJoinVO.java
index 1831aaafac9..5eb04d2e00d 100644
--- a/server/src/main/java/com/cloud/api/query/vo/StoragePoolJoinVO.java
+++ b/server/src/main/java/com/cloud/api/query/vo/StoragePoolJoinVO.java
@@ -110,6 +110,9 @@ public class StoragePoolJoinVO extends BaseViewVO implements InternalIdentity, I
     @Column(name = "tag")
     private String tag;
 
+    @Column(name = "is_tag_a_rule")
+    private boolean isTagARule;
+
     @Column(name = "disk_used_capacity")
     private long usedCapacity;
 
@@ -243,6 +246,10 @@ public class StoragePoolJoinVO extends BaseViewVO implements InternalIdentity, I
         return tag;
     }
 
+    public boolean getIsTagARule() {
+        return isTagARule;
+    }
+
     public long getUsedCapacity() {
         return usedCapacity;
     }
diff --git a/server/src/main/java/com/cloud/configuration/ConfigurationManagerImpl.java b/server/src/main/java/com/cloud/configuration/ConfigurationManagerImpl.java
index acf57a788a0..502fccfde9e 100644
--- a/server/src/main/java/com/cloud/configuration/ConfigurationManagerImpl.java
+++ b/server/src/main/java/com/cloud/configuration/ConfigurationManagerImpl.java
@@ -48,6 +48,10 @@ import javax.naming.ConfigurationException;
 
 import com.cloud.hypervisor.HypervisorGuru;
 import com.cloud.utils.crypt.DBEncryptionUtil;
+import com.cloud.host.HostTagVO;
+import com.cloud.storage.StoragePoolTagVO;
+import com.cloud.storage.VolumeApiServiceImpl;
+import com.googlecode.ipv6.IPv6Address;
 import org.apache.cloudstack.acl.SecurityChecker;
 import org.apache.cloudstack.affinity.AffinityGroup;
 import org.apache.cloudstack.affinity.AffinityGroupService;
@@ -128,6 +132,7 @@ import org.apache.cloudstack.storage.datastore.db.ImageStoreVO;
 import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailsDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+import org.apache.cloudstack.utils.jsinterpreter.TagAsRuleHelper;
 import org.apache.cloudstack.utils.reflectiontostringbuilderutils.ReflectionToStringBuilderUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
@@ -293,7 +298,6 @@ import com.google.common.base.Enums;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
-import com.googlecode.ipv6.IPv6Address;
 import com.googlecode.ipv6.IPv6Network;
 
 public class ConfigurationManagerImpl extends ManagerBase implements ConfigurationManager, ConfigurationService, Configurable {
@@ -4080,17 +4084,23 @@ public class ConfigurationManagerImpl extends ManagerBase implements Configurati
             if (CollectionUtils.isNotEmpty(pools)) {
                 List<String> listOfTags = Arrays.asList(tags.split(","));
                 for (StoragePoolVO storagePoolVO : pools) {
-                    List<String> tagsOnPool = storagePoolTagDao.getStoragePoolTags(storagePoolVO.getId());
-                    if (CollectionUtils.isEmpty(tagsOnPool) || !tagsOnPool.containsAll(listOfTags)) {
-                        DiskOfferingVO offeringToRetrieveInfo = _diskOfferingDao.findById(diskOffering.getId());
-                        List<VolumeVO> volumes = _volumeDao.findByDiskOfferingId(diskOffering.getId());
-                        String listOfVolumesNamesAndUuid = ReflectionToStringBuilderUtils.reflectOnlySelectedFields(volumes, "name", "uuid");
-                        String diskOfferingInfo = ReflectionToStringBuilderUtils.reflectOnlySelectedFields(offeringToRetrieveInfo, "name", "uuid");
-                        String poolInfo = ReflectionToStringBuilderUtils.reflectOnlySelectedFields(storagePoolVO, "name", "uuid");
-                        throw new InvalidParameterValueException(String.format("There are active volumes using the disk offering %s, and the pool %s doesn't have the new tags. " +
-                                "The following volumes are using the mentioned disk offering %s. Please first add the new tags to the mentioned storage pools before adding them" +
-                                " to the disk offering.", diskOfferingInfo, poolInfo, listOfVolumesNamesAndUuid));
+                    List<StoragePoolTagVO> tagsOnPool = storagePoolTagDao.findStoragePoolTags(storagePoolVO.getId());
+                    List<String> tagsAsString = tagsOnPool.stream().map(StoragePoolTagVO::getTag).collect(Collectors.toList());
+
+                    if ((CollectionUtils.isNotEmpty(tagsAsString) && tagsAsString.containsAll(listOfTags)) ||
+                        (tagsOnPool.size() == 1 && tagsOnPool.get(0).isTagARule() &&
+                        TagAsRuleHelper.interpretTagAsRule(tagsOnPool.get(0).getTag(), tags, VolumeApiServiceImpl.storageTagRuleExecutionTimeout.value()))) {
+                        continue;
                     }
+
+                    DiskOfferingVO offeringToRetrieveInfo = _diskOfferingDao.findById(diskOffering.getId());
+                    List<VolumeVO> volumes = _volumeDao.findByDiskOfferingId(diskOffering.getId());
+                    String listOfVolumesNamesAndUuid = ReflectionToStringBuilderUtils.reflectOnlySelectedFields(volumes, "name", "uuid");
+                    String diskOfferingInfo = ReflectionToStringBuilderUtils.reflectOnlySelectedFields(offeringToRetrieveInfo, "name", "uuid");
+                    String poolInfo = ReflectionToStringBuilderUtils.reflectOnlySelectedFields(storagePoolVO, "name", "uuid");
+                    throw new InvalidParameterValueException(String.format("There are active volumes using the disk offering %s, and the pool %s doesn't have the new tags. " +
+                            "The following volumes are using the mentioned disk offering %s. Please first add the new tags to the mentioned storage pools before adding them" +
+                            " to the disk offering.", diskOfferingInfo, poolInfo, listOfVolumesNamesAndUuid));
                 }
             }
             diskOffering.setTags(tags);
@@ -4117,10 +4127,17 @@ public class ConfigurationManagerImpl extends ManagerBase implements Configurati
             if (CollectionUtils.isNotEmpty(hosts)) {
                 List<String> listOfHostTags = Arrays.asList(hostTags.split(","));
                 for (HostVO host : hosts) {
-                    List<String> tagsOnHost = hostTagDao.getHostTags(host.getId());
-                    if (CollectionUtils.isEmpty(tagsOnHost) || !tagsOnHost.containsAll(listOfHostTags)) {
-                        throw new InvalidParameterValueException(String.format("There are active VMs using offering [%s], and the hosts [%s] don't have the new tags", offering.getId(), hosts));
+                    List<HostTagVO> tagsOnHost = hostTagDao.getHostTags(host.getId());
+                    List<String> tagsAsString = tagsOnHost.stream().map(HostTagVO::getTag).collect(Collectors.toList());
+
+                    if ((CollectionUtils.isNotEmpty(tagsAsString) && tagsAsString.containsAll(listOfHostTags)) ||
+                        (tagsOnHost.size() == 1 && tagsOnHost.get(0).getIsTagARule() &&
+                        TagAsRuleHelper.interpretTagAsRule(tagsOnHost.get(0).getTag(), hostTags, HostTagsDao.hostTagRuleExecutionTimeout.value()))) {
+                        continue;
                     }
+
+                    throw new InvalidParameterValueException(String.format("There are active VMs using offering [%s], and the hosts [%s] don't have the new tags",
+                        offering.getId(), hosts));
                 }
             }
             offering.setHostTag(hostTags);
diff --git a/server/src/main/java/com/cloud/deploy/DeploymentPlanningManagerImpl.java b/server/src/main/java/com/cloud/deploy/DeploymentPlanningManagerImpl.java
index 9cb7f4e8aaf..cb22e81f366 100644
--- a/server/src/main/java/com/cloud/deploy/DeploymentPlanningManagerImpl.java
+++ b/server/src/main/java/com/cloud/deploy/DeploymentPlanningManagerImpl.java
@@ -1361,7 +1361,7 @@ StateListener<State, VirtualMachine.Event, VirtualMachine>, Configurable {
 
             if (vmRequiresSharedStorage) {
                 // check shared pools
-                List<StoragePoolVO> allPoolsInCluster = _storagePoolDao.findPoolsByTags(clusterVO.getDataCenterId(), clusterVO.getPodId(), clusterVO.getId(), null);
+                List<StoragePoolVO> allPoolsInCluster = _storagePoolDao.findPoolsByTags(clusterVO.getDataCenterId(), clusterVO.getPodId(), clusterVO.getId(), null, false, 0);
                 for (StoragePoolVO pool : allPoolsInCluster) {
                     if (!allocatorAvoidOutput.shouldAvoid(pool)) {
                         // there's some pool in the cluster that is not yet in avoid set
@@ -1374,7 +1374,7 @@ StateListener<State, VirtualMachine.Event, VirtualMachine>, Configurable {
             if (vmRequiresLocalStorege) {
                 // check local pools
                 List<StoragePoolVO> allLocalPoolsInCluster =
-                        _storagePoolDao.findLocalStoragePoolsByTags(clusterVO.getDataCenterId(), clusterVO.getPodId(), clusterVO.getId(), null);
+                        _storagePoolDao.findLocalStoragePoolsByTags(clusterVO.getDataCenterId(), clusterVO.getPodId(), clusterVO.getId(), null, false);
                 for (StoragePoolVO pool : allLocalPoolsInCluster) {
                     if (!allocatorAvoidOutput.shouldAvoid(pool)) {
                         // there's some pool in the cluster that is not yet
diff --git a/server/src/main/java/com/cloud/deploy/FirstFitPlanner.java b/server/src/main/java/com/cloud/deploy/FirstFitPlanner.java
index 4e838510010..c2969ecce50 100644
--- a/server/src/main/java/com/cloud/deploy/FirstFitPlanner.java
+++ b/server/src/main/java/com/cloud/deploy/FirstFitPlanner.java
@@ -27,6 +27,7 @@ import javax.inject.Inject;
 import javax.naming.ConfigurationException;
 
 import com.cloud.capacity.CapacityVO;
+import com.cloud.utils.exception.CloudRuntimeException;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
 import org.apache.cloudstack.framework.config.ConfigKey;
 import org.apache.cloudstack.framework.config.Configurable;
@@ -521,6 +522,12 @@ public class FirstFitPlanner extends AdapterBase implements DeploymentClusterPla
     private void removeClustersWithoutMatchingTag(List<Long> clusterListForVmAllocation, String hostTagOnOffering) {
 
         List<Long> matchingClusters = hostDao.listClustersByHostTag(hostTagOnOffering);
+        matchingClusters.addAll(hostDao.findClustersThatMatchHostTagRule(hostTagOnOffering));
+
+        if (matchingClusters.isEmpty()) {
+            s_logger.error(String.format("No suitable host found for the following compute offering tags [%s].", hostTagOnOffering));
+            throw new CloudRuntimeException("No suitable host found.");
+        }
 
         clusterListForVmAllocation.retainAll(matchingClusters);
 
diff --git a/server/src/main/java/com/cloud/resource/ResourceManagerImpl.java b/server/src/main/java/com/cloud/resource/ResourceManagerImpl.java
index d1697646c39..922df25a726 100755
--- a/server/src/main/java/com/cloud/resource/ResourceManagerImpl.java
+++ b/server/src/main/java/com/cloud/resource/ResourceManagerImpl.java
@@ -32,11 +32,13 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Random;
+import java.util.stream.Collectors;
 
 import javax.inject.Inject;
 import javax.naming.ConfigurationException;
 
 import com.cloud.alert.AlertManager;
+import com.cloud.host.HostTagVO;
 import com.cloud.exception.StorageConflictException;
 import com.cloud.exception.StorageUnavailableException;
 import com.cloud.storage.Volume;
@@ -856,7 +858,7 @@ public class ResourceManagerImpl extends ManagerBase implements ResourceManager,
                                     if (s_logger.isTraceEnabled()) {
                                         s_logger.trace("Adding Host Tags for KVM host, tags:  :" + hostTags);
                                     }
-                                    _hostTagsDao.persist(host.getId(), hostTags);
+                                    _hostTagsDao.persist(host.getId(), hostTags, false);
                                 }
                                 hosts.add(host);
 
@@ -1909,7 +1911,7 @@ public class ResourceManagerImpl extends ManagerBase implements ResourceManager,
         }
     }
 
-    private void updateHostTags(HostVO host, Long hostId, List<String> hostTags) {
+    private void updateHostTags(HostVO host, Long hostId, List<String> hostTags, Boolean isTagARule) {
         List<VMInstanceVO> activeVMs =  _vmDao.listByHostId(hostId);
         s_logger.warn(String.format("The following active VMs [%s] are using the host [%s]. " +
                 "Updating the host tags will not affect them.", activeVMs, host));
@@ -1917,17 +1919,17 @@ public class ResourceManagerImpl extends ManagerBase implements ResourceManager,
         if (s_logger.isDebugEnabled()) {
             s_logger.debug("Updating Host Tags to :" + hostTags);
         }
-        _hostTagsDao.persist(hostId, new ArrayList<>(new HashSet<>(hostTags)));
+        _hostTagsDao.persist(hostId, new ArrayList<>(new HashSet<>(hostTags)), isTagARule);
     }
 
     @Override
     public Host updateHost(final UpdateHostCmd cmd) throws NoTransitionException {
         return updateHost(cmd.getId(), cmd.getName(), cmd.getOsCategoryId(),
-                cmd.getAllocationState(), cmd.getUrl(), cmd.getHostTags(), cmd.getAnnotation(), false);
+                cmd.getAllocationState(), cmd.getUrl(), cmd.getHostTags(), cmd.getIsTagARule(), cmd.getAnnotation(), false);
     }
 
     private Host updateHost(Long hostId, String name, Long guestOSCategoryId, String allocationState,
-                            String url, List<String> hostTags, String annotation, boolean isUpdateFromHostHealthCheck) throws NoTransitionException {
+                            String url, List<String> hostTags, Boolean isTagARule, String annotation, boolean isUpdateFromHostHealthCheck) throws NoTransitionException {
         // Verify that the host exists
         final HostVO host = _hostDao.findById(hostId);
         if (host == null) {
@@ -1948,7 +1950,7 @@ public class ResourceManagerImpl extends ManagerBase implements ResourceManager,
         }
 
         if (hostTags != null) {
-            updateHostTags(host, hostId, hostTags);
+            updateHostTags(host, hostId, hostTags, isTagARule);
         }
 
         if (url != null) {
@@ -2007,7 +2009,7 @@ public class ResourceManagerImpl extends ManagerBase implements ResourceManager,
 
     @Override
     public Host autoUpdateHostAllocationState(Long hostId, ResourceState.Event resourceEvent) throws NoTransitionException {
-        return updateHost(hostId, null, null, resourceEvent.toString(), null, null, null, true);
+        return updateHost(hostId, null, null, resourceEvent.toString(), null, null, null, null, true);
     }
 
     @Override
@@ -2339,7 +2341,7 @@ public class ResourceManagerImpl extends ManagerBase implements ResourceManager,
             final List<String> implicitHostTags = ssCmd.getHostTags();
             if (!implicitHostTags.isEmpty()) {
                 if (hostTags == null) {
-                    hostTags = _hostTagsDao.getHostTags(host.getId());
+                    hostTags = _hostTagsDao.getHostTags(host.getId()).parallelStream().map(HostTagVO::getTag).collect(Collectors.toList());
                 }
                 if (hostTags != null) {
                     implicitHostTags.removeAll(hostTags);
@@ -2367,7 +2369,7 @@ public class ResourceManagerImpl extends ManagerBase implements ResourceManager,
         host.setManagementServerId(_nodeId);
         host.setStorageUrl(startup.getIqn());
         host.setLastPinged(System.currentTimeMillis() >> 10);
-        host.setHostTags(hostTags);
+        host.setHostTags(hostTags, false);
         host.setDetails(details);
         if (startup.getStorageIpAddressDeux() != null) {
             host.setStorageIpAddressDeux(startup.getStorageIpAddressDeux());
@@ -3351,7 +3353,7 @@ public class ResourceManagerImpl extends ManagerBase implements ResourceManager,
 
     @Override
     public String getHostTags(final long hostId) {
-        final List<String> hostTags = _hostTagsDao.getHostTags(hostId);
+        final List<String> hostTags = _hostTagsDao.getHostTags(hostId).parallelStream().map(HostTagVO::getTag).collect(Collectors.toList());
         if (hostTags == null) {
             return null;
         } else {
diff --git a/server/src/main/java/com/cloud/resource/RollingMaintenanceManagerImpl.java b/server/src/main/java/com/cloud/resource/RollingMaintenanceManagerImpl.java
index d881dee137a..25b2ad53bf2 100644
--- a/server/src/main/java/com/cloud/resource/RollingMaintenanceManagerImpl.java
+++ b/server/src/main/java/com/cloud/resource/RollingMaintenanceManagerImpl.java
@@ -54,6 +54,7 @@ import com.cloud.exception.AgentUnavailableException;
 import com.cloud.exception.InvalidParameterValueException;
 import com.cloud.exception.OperationTimedoutException;
 import com.cloud.host.Host;
+import com.cloud.host.HostTagVO;
 import com.cloud.host.HostVO;
 import com.cloud.host.Status;
 import com.cloud.host.dao.HostDao;
@@ -615,7 +616,7 @@ public class RollingMaintenanceManagerImpl extends ManagerBase implements Rollin
         if (CollectionUtils.isEmpty(vmsRunning)) {
             return new Pair<>(true, "OK");
         }
-        List<String> hostTags = hostTagsDao.getHostTags(host.getId());
+        List<HostTagVO> hostTags = hostTagsDao.getHostTags(host.getId());
 
         int successfullyCheckedVmMigrations = 0;
         for (VMInstanceVO runningVM : vmsRunning) {
@@ -668,14 +669,14 @@ public class RollingMaintenanceManagerImpl extends ManagerBase implements Rollin
     /**
      * Check hosts tags
      */
-    private boolean checkHostTags(List<String> hostTags, List<String> hostInClusterTags, String offeringTag) {
+    private boolean checkHostTags(List<HostTagVO> hostTags, List<HostTagVO> hostInClusterTags, String offeringTag) {
         if (CollectionUtils.isEmpty(hostTags) && CollectionUtils.isEmpty(hostInClusterTags)) {
             return true;
         } else if ((CollectionUtils.isNotEmpty(hostTags) && CollectionUtils.isEmpty(hostInClusterTags)) ||
                 (CollectionUtils.isEmpty(hostTags) && CollectionUtils.isNotEmpty(hostInClusterTags))) {
             return false;
         } else {
-            return hostInClusterTags.contains(offeringTag);
+            return hostInClusterTags.parallelStream().anyMatch(hostTagVO -> offeringTag.equals(hostTagVO.getTag()));
         }
     }
 
diff --git a/server/src/main/java/com/cloud/storage/StorageManagerImpl.java b/server/src/main/java/com/cloud/storage/StorageManagerImpl.java
index 494855b253d..66063bee0f7 100644
--- a/server/src/main/java/com/cloud/storage/StorageManagerImpl.java
+++ b/server/src/main/java/com/cloud/storage/StorageManagerImpl.java
@@ -854,6 +854,7 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
         params.put("hypervisorType", hypervisorType);
         params.put("url", cmd.getUrl());
         params.put("tags", cmd.getTags());
+        params.put("isTagARule", cmd.isTagARule());
         params.put("name", cmd.getStoragePoolName());
         params.put("details", details);
         params.put("providerName", storeProvider.getName());
@@ -1017,10 +1018,10 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
             if (pool.getPoolType() == StoragePoolType.DatastoreCluster) {
                 List<StoragePoolVO> childStoragePools = _storagePoolDao.listChildStoragePoolsInDatastoreCluster(pool.getId());
                 for (StoragePoolVO childPool : childStoragePools) {
-                    _storagePoolTagsDao.persist(childPool.getId(), storagePoolTags);
+                    _storagePoolTagsDao.persist(childPool.getId(), storagePoolTags, cmd.isTagARule());
                 }
             }
-            _storagePoolTagsDao.persist(pool.getId(), storagePoolTags);
+            _storagePoolTagsDao.persist(pool.getId(), storagePoolTags, cmd.isTagARule());
         }
 
         Long updatedCapacityBytes = null;
@@ -1991,7 +1992,7 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
 
     public void syncDatastoreClusterStoragePool(long datastoreClusterPoolId, List<ModifyStoragePoolAnswer> childDatastoreAnswerList, long hostId) {
         StoragePoolVO datastoreClusterPool = _storagePoolDao.findById(datastoreClusterPoolId);
-        List<String> storageTags = _storagePoolTagsDao.getStoragePoolTags(datastoreClusterPoolId);
+        List<StoragePoolTagVO> storageTags = _storagePoolTagsDao.findStoragePoolTags(datastoreClusterPoolId);
         List<StoragePoolVO> childDatastores = _storagePoolDao.listChildStoragePoolsInDatastoreCluster(datastoreClusterPoolId);
         Set<String> childDatastoreUUIDs = new HashSet<>();
         for (StoragePoolVO childDatastore : childDatastores) {
@@ -2019,18 +2020,18 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
                     dataStoreVO.setParent(datastoreClusterPoolId);
                     _storagePoolDao.update(dataStoreVO.getId(), dataStoreVO);
                     if (CollectionUtils.isNotEmpty(storageTags)) {
-                        storageTags.addAll(_storagePoolTagsDao.getStoragePoolTags(dataStoreVO.getId()));
+                        storageTags.addAll(_storagePoolTagsDao.findStoragePoolTags(dataStoreVO.getId()));
                     } else {
-                        storageTags = _storagePoolTagsDao.getStoragePoolTags(dataStoreVO.getId());
+                        storageTags = _storagePoolTagsDao.findStoragePoolTags(dataStoreVO.getId());
                     }
                     if (CollectionUtils.isNotEmpty(storageTags)) {
-                        Set<String> set = new LinkedHashSet<>(storageTags);
+                        Set<StoragePoolTagVO> set = new LinkedHashSet<>(storageTags);
                         storageTags.clear();
                         storageTags.addAll(set);
                         if (s_logger.isDebugEnabled()) {
                             s_logger.debug("Updating Storage Pool Tags to :" + storageTags);
                         }
-                        _storagePoolTagsDao.persist(dataStoreVO.getId(), storageTags);
+                        _storagePoolTagsDao.persist(storageTags);
                     }
                 } else {
                     // This is to find datastores which are removed from datastore cluster.
@@ -2038,7 +2039,7 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
                     childDatastoreUUIDs.remove(dataStoreVO.getUuid());
                 }
             } else {
-                dataStoreVO = createChildDatastoreVO(datastoreClusterPool, childDataStoreAnswer);
+                dataStoreVO = createChildDatastoreVO(datastoreClusterPool, childDataStoreAnswer, storageTags);
             }
             updateStoragePoolHostVOAndBytes(dataStoreVO, hostId, childDataStoreAnswer);
         }
@@ -2079,9 +2080,8 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
         }
     }
 
-    private StoragePoolVO createChildDatastoreVO(StoragePoolVO datastoreClusterPool, ModifyStoragePoolAnswer childDataStoreAnswer) {
+    private StoragePoolVO createChildDatastoreVO(StoragePoolVO datastoreClusterPool, ModifyStoragePoolAnswer childDataStoreAnswer, List<StoragePoolTagVO> storagePoolTagVOList) {
         StoragePoolInfo childStoragePoolInfo = childDataStoreAnswer.getPoolInfo();
-        List<String> storageTags = _storagePoolTagsDao.getStoragePoolTags(datastoreClusterPool.getId());
 
         StoragePoolVO dataStoreVO = new StoragePoolVO();
         dataStoreVO.setStorageProviderName(datastoreClusterPool.getStorageProviderName());
@@ -2108,7 +2108,15 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
         if(StringUtils.isNotEmpty(childDataStoreAnswer.getPoolType())) {
             details.put("pool_type", childDataStoreAnswer.getPoolType());
         }
-        _storagePoolDao.persist(dataStoreVO, details, storageTags);
+
+        List<String> storagePoolTags = new ArrayList<>();
+        boolean isTagARule = false;
+        if (CollectionUtils.isNotEmpty(storagePoolTagVOList)) {
+            storagePoolTags = storagePoolTagVOList.parallelStream().map(StoragePoolTagVO::getTag).collect(Collectors.toList());
+            isTagARule = storagePoolTagVOList.get(0).isTagARule();
+        }
+
+        _storagePoolDao.persist(dataStoreVO, details, storagePoolTags, isTagARule);
         return dataStoreVO;
     }
 
diff --git a/server/src/main/java/com/cloud/storage/VolumeApiServiceImpl.java b/server/src/main/java/com/cloud/storage/VolumeApiServiceImpl.java
index 7bfe8a0818b..b4faf2ee479 100644
--- a/server/src/main/java/com/cloud/storage/VolumeApiServiceImpl.java
+++ b/server/src/main/java/com/cloud/storage/VolumeApiServiceImpl.java
@@ -106,6 +106,7 @@ import org.apache.cloudstack.storage.image.datastore.ImageStoreEntity;
 import org.apache.cloudstack.utils.bytescale.ByteScaleUtils;
 import org.apache.cloudstack.utils.identity.ManagementServerNode;
 import org.apache.cloudstack.utils.imagestore.ImageStoreUtil;
+import org.apache.cloudstack.utils.jsinterpreter.TagAsRuleHelper;
 import org.apache.cloudstack.utils.reflectiontostringbuilderutils.ReflectionToStringBuilderUtils;
 import org.apache.cloudstack.utils.volume.VirtualMachineDiskInfo;
 import org.apache.commons.collections.CollectionUtils;
@@ -346,6 +347,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
     @Inject
     protected StoragePoolDetailsDao storagePoolDetailsDao;
 
+
     protected Gson _gson;
 
     private static final List<HypervisorType> SupportedHypervisorsForVolResize = Arrays.asList(HypervisorType.KVM, HypervisorType.XenServer,
@@ -375,6 +377,9 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
             "Time (in milliseconds) to wait before assuming the VM was unable to detach a volume after the hypervisor sends the detach command.",
             true);
 
+    public static ConfigKey<Long> storageTagRuleExecutionTimeout = new ConfigKey<>("Advanced", Long.class, "storage.tag.rule.execution.timeout", "2000", "The maximum runtime,"
+            + " in milliseconds, to execute a storage tag rule; if it is reached, a timeout will happen.", true);
+
     private final StateMachine2<Volume.State, Volume.Event, Volume> _volStateMachine;
 
     private static final Set<Volume.State> STATES_VOLUME_CANNOT_BE_DESTROYED = new HashSet<>(Arrays.asList(Volume.State.Destroy, Volume.State.Expunging, Volume.State.Expunged, Volume.State.Allocated));
@@ -3266,7 +3271,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
         }
         if (!doesTargetStorageSupportDiskOffering(destPool, newDiskOffering)) {
             throw new InvalidParameterValueException(String.format("Migration failed: target pool [%s, tags:%s] has no matching tags for volume [%s, uuid:%s, tags:%s]", destPool.getName(),
-                    getStoragePoolTags(destPool), volume.getName(), volume.getUuid(), newDiskOffering.getTags()));
+                    storagePoolTagsDao.getStoragePoolTags(destPool.getId()), volume.getName(), volume.getUuid(), newDiskOffering.getTags()));
         }
         if (volume.getVolumeType().equals(Volume.Type.ROOT)) {
             VMInstanceVO vm = null;
@@ -3329,17 +3334,31 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
 
     @Override
     public boolean doesTargetStorageSupportDiskOffering(StoragePool destPool, String diskOfferingTags) {
-        if (StringUtils.isBlank(diskOfferingTags)) {
+        Pair<List<String>, Boolean> storagePoolTags = getStoragePoolTags(destPool);
+        if ((storagePoolTags == null || !storagePoolTags.second()) && org.apache.commons.lang.StringUtils.isBlank(diskOfferingTags)) {
+            if (storagePoolTags == null) {
+                s_logger.debug(String.format("Destination storage pool [%s] does not have any tags, and so does the disk offering. Therefore, they are compatible", destPool.getUuid()));
+            } else {
+                s_logger.debug("Destination storage pool has tags [%s], and the disk offering has no tags. Therefore, they are compatible.");
+            }
             return true;
         }
-        String storagePoolTags = getStoragePoolTags(destPool);
-        if (StringUtils.isBlank(storagePoolTags)) {
+        if (storagePoolTags == null || CollectionUtils.isEmpty(storagePoolTags.first())) {
+            s_logger.debug(String.format("Destination storage pool [%s] has no tags, while disk offering has tags [%s]. Therefore, they are not compatible", destPool.getUuid(),
+                    diskOfferingTags));
             return false;
         }
-        String[] storageTagsAsStringArray = StringUtils.split(storagePoolTags, ",");
-        String[] newDiskOfferingTagsAsStringArray = StringUtils.split(diskOfferingTags, ",");
+        List<String> storageTagsList = storagePoolTags.first();
+        String[] newDiskOfferingTagsAsStringArray = org.apache.commons.lang.StringUtils.split(diskOfferingTags, ",");
 
-        return CollectionUtils.isSubCollection(Arrays.asList(newDiskOfferingTagsAsStringArray), Arrays.asList(storageTagsAsStringArray));
+        boolean result;
+        if (storagePoolTags.second()) {
+            result =  TagAsRuleHelper.interpretTagAsRule(storageTagsList.get(0), diskOfferingTags, storageTagRuleExecutionTimeout.value());
+        } else {
+            result = CollectionUtils.isSubCollection(Arrays.asList(newDiskOfferingTagsAsStringArray), storageTagsList);
+        }
+        s_logger.debug(String.format("Destination storage pool [%s] accepts tags [%s]? %s", destPool.getUuid(), diskOfferingTags, result));
+        return result;
     }
 
     public static boolean doesNewDiskOfferingHasTagsAsOldDiskOffering(DiskOfferingVO oldDO, DiskOfferingVO newDO) {
@@ -3355,14 +3374,17 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
     }
 
     /**
-     *  Retrieves the storage pool tags as a {@link String}. If the storage pool does not have tags we return a null value.
+     *  Returns a {@link Pair}, where the first value is the list of the StoragePool tags, and the second value is whether the returned tags are to be interpreted as a rule,
+     *  or a normal list of tags.
+     *  <br><br>
+     *  If the storage pool does not have tags we return a null value.
      */
-    protected String getStoragePoolTags(StoragePool destPool) {
-        List<String> destPoolTags = storagePoolTagsDao.getStoragePoolTags(destPool.getId());
+    protected Pair<List<String>, Boolean> getStoragePoolTags(StoragePool destPool) {
+        List<StoragePoolTagVO> destPoolTags = storagePoolTagsDao.findStoragePoolTags(destPool.getId());
         if (CollectionUtils.isEmpty(destPoolTags)) {
             return null;
         }
-        return StringUtils.join(destPoolTags, ",");
+        return new Pair<>(destPoolTags.parallelStream().map(StoragePoolTagVO::getTag).collect(Collectors.toList()), destPoolTags.get(0).isTagARule());
     }
 
     private Volume orchestrateMigrateVolume(VolumeVO volume, StoragePool destPool, boolean liveMigrateVolume, DiskOfferingVO newDiskOffering) {
diff --git a/server/src/main/java/com/cloud/storage/listener/StoragePoolMonitor.java b/server/src/main/java/com/cloud/storage/listener/StoragePoolMonitor.java
index 63ae60411ab..d6101046383 100644
--- a/server/src/main/java/com/cloud/storage/listener/StoragePoolMonitor.java
+++ b/server/src/main/java/com/cloud/storage/listener/StoragePoolMonitor.java
@@ -101,7 +101,7 @@ public class StoragePoolMonitor implements Listener {
                 scCmd.getHypervisorType() == HypervisorType.Ovm || scCmd.getHypervisorType() == HypervisorType.Hyperv ||
                 scCmd.getHypervisorType() == HypervisorType.LXC || scCmd.getHypervisorType() == HypervisorType.Ovm3) {
                 List<StoragePoolVO> pools = _poolDao.listBy(host.getDataCenterId(), host.getPodId(), host.getClusterId(), ScopeType.CLUSTER);
-                List<StoragePoolVO> zoneStoragePoolsByTags = _poolDao.findZoneWideStoragePoolsByTags(host.getDataCenterId(), null);
+                List<StoragePoolVO> zoneStoragePoolsByTags = _poolDao.findZoneWideStoragePoolsByTags(host.getDataCenterId(), null, false);
                 List<StoragePoolVO> zoneStoragePoolsByHypervisor = _poolDao.findZoneWideStoragePoolsByHypervisor(host.getDataCenterId(), scCmd.getHypervisorType());
                 zoneStoragePoolsByTags.retainAll(zoneStoragePoolsByHypervisor);
                 pools.addAll(zoneStoragePoolsByTags);
diff --git a/server/src/test/java/com/cloud/configuration/ConfigurationManagerTest.java b/server/src/test/java/com/cloud/configuration/ConfigurationManagerTest.java
index 8493535cf85..c2d748ee587 100644
--- a/server/src/test/java/com/cloud/configuration/ConfigurationManagerTest.java
+++ b/server/src/test/java/com/cloud/configuration/ConfigurationManagerTest.java
@@ -58,6 +58,7 @@ import com.cloud.network.dao.PhysicalNetworkDao;
 import com.cloud.network.dao.PhysicalNetworkVO;
 import com.cloud.projects.ProjectManager;
 import com.cloud.storage.DiskOfferingVO;
+import com.cloud.storage.StoragePoolTagVO;
 import com.cloud.storage.VolumeVO;
 import com.cloud.storage.dao.DiskOfferingDao;
 import com.cloud.storage.dao.StoragePoolTagsDao;
@@ -1163,12 +1164,17 @@ public class ConfigurationManagerTest {
     @Test
     public void updateDiskOfferingTagsWithPrimaryStorageWithCorrectTagsTestSuccess(){
         String tags = "tag1,tag2";
-        List<String> storageTagsWithCorrectTags = new ArrayList<>(Arrays.asList("tag1","tag2"));
         List<StoragePoolVO> pools = new ArrayList<>(Arrays.asList(storagePoolVO));
         List<VolumeVO> volumes = new ArrayList<>(Arrays.asList(volumeVO));
 
+        StoragePoolTagVO poolTagMock1 = Mockito.mock(StoragePoolTagVO.class);
+        StoragePoolTagVO poolTagMock2 = Mockito.mock(StoragePoolTagVO.class);
+        List<StoragePoolTagVO> poolTags = List.of(poolTagMock1, poolTagMock2);
+        Mockito.doReturn("tag1").when(poolTagMock1).getTag();
+        Mockito.doReturn("tag2").when(poolTagMock2).getTag();
+
         Mockito.when(primaryDataStoreDao.listStoragePoolsWithActiveVolumesByOfferingId(anyLong())).thenReturn(pools);
-        Mockito.when(storagePoolTagsDao.getStoragePoolTags(anyLong())).thenReturn(storageTagsWithCorrectTags);
+        Mockito.when(storagePoolTagsDao.findStoragePoolTags(anyLong())).thenReturn(poolTags);
         Mockito.when(diskOfferingDao.findById(anyLong())).thenReturn(diskOfferingVOMock);
         Mockito.when(_volumeDao.findByDiskOfferingId(anyLong())).thenReturn(volumes);
 
diff --git a/server/src/test/java/com/cloud/storage/VolumeApiServiceImplTest.java b/server/src/test/java/com/cloud/storage/VolumeApiServiceImplTest.java
index 8ea14846bef..d56a223082a 100644
--- a/server/src/test/java/com/cloud/storage/VolumeApiServiceImplTest.java
+++ b/server/src/test/java/com/cloud/storage/VolumeApiServiceImplTest.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.when;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -117,6 +118,7 @@ import com.cloud.user.ResourceLimitService;
 import com.cloud.user.User;
 import com.cloud.user.UserVO;
 import com.cloud.user.dao.AccountDao;
+import com.cloud.utils.Pair;
 import com.cloud.utils.db.TransactionLegacy;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.utils.fsm.NoTransitionException;
@@ -650,29 +652,35 @@ public class VolumeApiServiceImplTest {
 
     @Test
     public void getStoragePoolTagsTestStorageWithoutTags() {
-        Mockito.when(storagePoolTagsDao.getStoragePoolTags(storagePoolMockId)).thenReturn(new ArrayList<>());
-
-        String returnedStoragePoolTags = volumeApiServiceImpl.getStoragePoolTags(storagePoolMock);
+        Pair<List<String>, Boolean> returnedStoragePoolTags = volumeApiServiceImpl.getStoragePoolTags(storagePoolMock);
 
         Assert.assertNull(returnedStoragePoolTags);
     }
 
     @Test
     public void getStoragePoolTagsTestStorageWithTags() {
-        ArrayList<String> tags = new ArrayList<>();
-        String tag1 = "tag1";
-        String tag2 = "tag2";
-        String tag3 = "tag3";
+        StoragePoolTagVO tag1 = new StoragePoolTagVO(1,"tag1", false);
+        StoragePoolTagVO tag2 = new StoragePoolTagVO(1,"tag2", false);
+        StoragePoolTagVO tag3 = new StoragePoolTagVO(1,"tag3", false);
+        List<StoragePoolTagVO> tags = Arrays.asList(tag1, tag2, tag3);
+
+        Mockito.when(storagePoolTagsDao.findStoragePoolTags(storagePoolMockId)).thenReturn(tags);
+
+        Pair<List<String>, Boolean> returnedStoragePoolTags = volumeApiServiceImpl.getStoragePoolTags(storagePoolMock);
+
+        Assert.assertEquals(new Pair<>(Arrays.asList("tag1","tag2","tag3"), false), returnedStoragePoolTags);
+    }
 
-        tags.add(tag1);
-        tags.add(tag2);
-        tags.add(tag3);
+    @Test
+    public void getStoragePoolTagsTestStorageWithRuleTag() {
+        StoragePoolTagVO tag1 = new StoragePoolTagVO(1,"tag1", true);
+        List<StoragePoolTagVO> tags = List.of(tag1);
 
-        Mockito.when(storagePoolTagsDao.getStoragePoolTags(storagePoolMockId)).thenReturn(tags);
+        Mockito.when(storagePoolTagsDao.findStoragePoolTags(storagePoolMockId)).thenReturn(tags);
 
-        String returnedStoragePoolTags = volumeApiServiceImpl.getStoragePoolTags(storagePoolMock);
+        Pair<List<String>, Boolean> returnedStoragePoolTags = volumeApiServiceImpl.getStoragePoolTags(storagePoolMock);
 
-        Assert.assertEquals("tag1,tag2,tag3", returnedStoragePoolTags);
+        Assert.assertEquals(new Pair<>(List.of("tag1"), true), returnedStoragePoolTags);
     }
 
     @Test
@@ -757,7 +765,7 @@ public class VolumeApiServiceImplTest {
 
         Mockito.when(newDiskOfferingMock.getTags()).thenReturn("tag1");
 
-        Mockito.doReturn("tag1").when(volumeApiServiceImpl).getStoragePoolTags(storagePoolMock);
+        Mockito.doReturn(new Pair<>(List.of("tag1"), false)).when(volumeApiServiceImpl).getStoragePoolTags(storagePoolMock);
 
         volumeApiServiceImpl.validateConditionsToReplaceDiskOfferingOfVolume(volumeVoMock, newDiskOfferingMock, storagePoolMock);
 
@@ -1138,7 +1146,7 @@ public class VolumeApiServiceImplTest {
         Mockito.doReturn("A,B,C").when(diskOfferingVoMock).getTags();
 
         StoragePool storagePoolMock = Mockito.mock(StoragePool.class);
-        Mockito.doReturn("A").when(volumeApiServiceImpl).getStoragePoolTags(storagePoolMock);
+        Mockito.doReturn(new Pair<>(List.of("A"), false)).when(volumeApiServiceImpl).getStoragePoolTags(storagePoolMock);
 
         boolean result = volumeApiServiceImpl.doesTargetStorageSupportDiskOffering(storagePoolMock, diskOfferingVoMock);
 
@@ -1151,7 +1159,7 @@ public class VolumeApiServiceImplTest {
         Mockito.doReturn("A,B,C").when(diskOfferingVoMock).getTags();
 
         StoragePool storagePoolMock = Mockito.mock(StoragePool.class);
-        Mockito.doReturn("A,B,C,D,X,Y").when(volumeApiServiceImpl).getStoragePoolTags(storagePoolMock);
+        Mockito.doReturn(new Pair<>(List.of("A","B","C","D","X","Y"), false)).when(volumeApiServiceImpl).getStoragePoolTags(storagePoolMock);
 
         boolean result = volumeApiServiceImpl.doesTargetStorageSupportDiskOffering(storagePoolMock, diskOfferingVoMock);
 
@@ -1164,7 +1172,7 @@ public class VolumeApiServiceImplTest {
         Mockito.doReturn("").when(diskOfferingVoMock).getTags();
 
         StoragePool storagePoolMock = Mockito.mock(StoragePool.class);
-        Mockito.lenient().doReturn("A,B,C,D,X,Y").when(volumeApiServiceImpl).getStoragePoolTags(storagePoolMock);
+        Mockito.lenient().doReturn(new Pair<>(List.of("A,B,C,D,X,Y"), false)).when(volumeApiServiceImpl).getStoragePoolTags(storagePoolMock);
 
         boolean result = volumeApiServiceImpl.doesTargetStorageSupportDiskOffering(storagePoolMock, diskOfferingVoMock);
 
@@ -1177,7 +1185,7 @@ public class VolumeApiServiceImplTest {
         Mockito.doReturn("A").when(diskOfferingVoMock).getTags();
 
         StoragePool storagePoolMock = Mockito.mock(StoragePool.class);
-        Mockito.doReturn("").when(volumeApiServiceImpl).getStoragePoolTags(storagePoolMock);
+        Mockito.doReturn(new Pair<>(List.of(""), false)).when(volumeApiServiceImpl).getStoragePoolTags(storagePoolMock);
 
         boolean result = volumeApiServiceImpl.doesTargetStorageSupportDiskOffering(storagePoolMock, diskOfferingVoMock);
 
@@ -1190,7 +1198,7 @@ public class VolumeApiServiceImplTest {
         Mockito.doReturn("").when(diskOfferingVoMock).getTags();
 
         StoragePool storagePoolMock = Mockito.mock(StoragePool.class);
-        Mockito.lenient().doReturn("").when(volumeApiServiceImpl).getStoragePoolTags(storagePoolMock);
+        Mockito.lenient().doReturn(new Pair<>(List.of(""), false)).when(volumeApiServiceImpl).getStoragePoolTags(storagePoolMock);
 
         boolean result = volumeApiServiceImpl.doesTargetStorageSupportDiskOffering(storagePoolMock, diskOfferingVoMock);
 
@@ -1203,7 +1211,7 @@ public class VolumeApiServiceImplTest {
         Mockito.doReturn("A,B").when(diskOfferingVoMock).getTags();
 
         StoragePool storagePoolMock = Mockito.mock(StoragePool.class);
-        Mockito.doReturn("C,D").when(volumeApiServiceImpl).getStoragePoolTags(storagePoolMock);
+        Mockito.doReturn(new Pair<>(List.of("C,D"), false)).when(volumeApiServiceImpl).getStoragePoolTags(storagePoolMock);
 
         boolean result = volumeApiServiceImpl.doesTargetStorageSupportDiskOffering(storagePoolMock, diskOfferingVoMock);
 
@@ -1216,13 +1224,52 @@ public class VolumeApiServiceImplTest {
         Mockito.doReturn("A").when(diskOfferingVoMock).getTags();
 
         StoragePool storagePoolMock = Mockito.mock(StoragePool.class);
-        Mockito.doReturn("A").when(volumeApiServiceImpl).getStoragePoolTags(storagePoolMock);
+        Mockito.doReturn(new Pair<>(List.of("A"), false)).when(volumeApiServiceImpl).getStoragePoolTags(storagePoolMock);
 
         boolean result = volumeApiServiceImpl.doesTargetStorageSupportDiskOffering(storagePoolMock, diskOfferingVoMock);
 
         Assert.assertTrue(result);
     }
 
+    @Test
+    public void doesTargetStorageSupportDiskOfferingTestStorageRuleTagWithDiskOfferingTagThatMatches() {
+        DiskOfferingVO diskOfferingVoMock = Mockito.mock(DiskOfferingVO.class);
+        Mockito.doReturn("A").when(diskOfferingVoMock).getTags();
+
+        StoragePool storagePoolMock = Mockito.mock(StoragePool.class);
+        Mockito.doReturn(new Pair<>(List.of("tags[0] == 'A'"), true)).when(volumeApiServiceImpl).getStoragePoolTags(storagePoolMock);
+
+        boolean result = volumeApiServiceImpl.doesTargetStorageSupportDiskOffering(storagePoolMock, diskOfferingVoMock);
+
+        Assert.assertTrue(result);
+    }
+
+    @Test
+    public void doesTargetStorageSupportDiskOfferingTestStorageRuleTagWithDiskOfferingTagThatDoesNotMatch() {
+        DiskOfferingVO diskOfferingVoMock = Mockito.mock(DiskOfferingVO.class);
+        Mockito.doReturn("'").when(diskOfferingVoMock).getTags();
+
+        StoragePool storagePoolMock = Mockito.mock(StoragePool.class);
+        Mockito.doReturn(new Pair<>(List.of("tags[0] == 'A'"), true)).when(volumeApiServiceImpl).getStoragePoolTags(storagePoolMock);
+
+        boolean result = volumeApiServiceImpl.doesTargetStorageSupportDiskOffering(storagePoolMock, diskOfferingVoMock);
+
+        Assert.assertFalse(result);
+    }
+
+    @Test
+    public void doesTargetStorageSupportDiskOfferingTestStorageRuleTagWithNullDiskOfferingTag() {
+        DiskOfferingVO diskOfferingVoMock = Mockito.mock(DiskOfferingVO.class);
+        Mockito.doReturn(null).when(diskOfferingVoMock).getTags();
+
+        StoragePool storagePoolMock = Mockito.mock(StoragePool.class);
+        Mockito.doReturn(new Pair<>(List.of("tags[0] == 'A'"), true)).when(volumeApiServiceImpl).getStoragePoolTags(storagePoolMock);
+
+        boolean result = volumeApiServiceImpl.doesTargetStorageSupportDiskOffering(storagePoolMock, diskOfferingVoMock);
+
+        Assert.assertFalse(result);
+    }
+
     @Test
     public void validateIfVmHaveBackupsTestExceptionWhenTryToDetachVolumeFromVMWhichBackupOffering() {
         try {
diff --git a/server/src/test/java/com/cloud/storage/listener/StoragePoolMonitorTest.java b/server/src/test/java/com/cloud/storage/listener/StoragePoolMonitorTest.java
index b892b80c9d4..fa6b71d0cb2 100644
--- a/server/src/test/java/com/cloud/storage/listener/StoragePoolMonitorTest.java
+++ b/server/src/test/java/com/cloud/storage/listener/StoragePoolMonitorTest.java
@@ -60,7 +60,7 @@ public class StoragePoolMonitorTest {
     @Test
     public void testProcessConnectStoragePoolNormal() throws Exception {
         Mockito.when(poolDao.listBy(nullable(Long.class), nullable(Long.class), nullable(Long.class), Mockito.any(ScopeType.class))).thenReturn(Collections.singletonList(pool));
-        Mockito.when(poolDao.findZoneWideStoragePoolsByTags(Mockito.anyLong(), Mockito.any(String[].class))).thenReturn(Collections.<StoragePoolVO>emptyList());
+        Mockito.when(poolDao.findZoneWideStoragePoolsByTags(Mockito.anyLong(), Mockito.any(String[].class), Mockito.anyBoolean())).thenReturn(Collections.<StoragePoolVO>emptyList());
         Mockito.when(poolDao.findZoneWideStoragePoolsByHypervisor(Mockito.anyLong(), Mockito.any(Hypervisor.HypervisorType.class))).thenReturn(Collections.<StoragePoolVO>emptyList());
         Mockito.doReturn(true).when(storageManager).connectHostToSharedPool(host.getId(), pool.getId());
 
@@ -73,7 +73,7 @@ public class StoragePoolMonitorTest {
     @Test
     public void testProcessConnectStoragePoolFailureOnHost() throws Exception {
         Mockito.when(poolDao.listBy(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(ScopeType.class))).thenReturn(Collections.singletonList(pool));
-        Mockito.when(poolDao.findZoneWideStoragePoolsByTags(Mockito.anyLong(), Mockito.any(String[].class))).thenReturn(Collections.<StoragePoolVO>emptyList());
+        Mockito.when(poolDao.findZoneWideStoragePoolsByTags(Mockito.anyLong(), Mockito.any(String[].class), Mockito.anyBoolean())).thenReturn(Collections.<StoragePoolVO>emptyList());
         Mockito.when(poolDao.findZoneWideStoragePoolsByHypervisor(Mockito.anyLong(), Mockito.any(Hypervisor.HypervisorType.class))).thenReturn(Collections.<StoragePoolVO>emptyList());
         Mockito.doThrow(new StorageUnavailableException("unable to mount storage", 123L)).when(storageManager).connectHostToSharedPool(Mockito.anyLong(), Mockito.anyLong());
 
diff --git a/ui/public/locales/en.json b/ui/public/locales/en.json
index 199437c8033..17da726ae45 100644
--- a/ui/public/locales/en.json
+++ b/ui/public/locales/en.json
@@ -1081,6 +1081,7 @@
 "label.isdedicated": "Dedicated",
 "label.isdefault": "Is default",
 "label.isdynamicallyscalable": "Dynamically scalable",
+"label.istagarule": "Tag as JS rule",
 "label.isextractable": "Extractable",
 "label.isfeatured": "Featured",
 "label.isforced": "Force delete",
diff --git a/ui/public/locales/pt_BR.json b/ui/public/locales/pt_BR.json
index 5b275a12602..c36a5c762a7 100644
--- a/ui/public/locales/pt_BR.json
+++ b/ui/public/locales/pt_BR.json
@@ -844,6 +844,7 @@
 "label.isdedicated": "Dedicado",
 "label.isdefault": "\u00c9\u0089 padr\u00e3o",
 "label.isdynamicallyscalable": "Dinamicamente escal\u00e1vel",
+"label.istagarule": "Tag como regra JS",
 "label.isextractable": "Extra\u00edvel",
 "label.isfeatured": "Em destaque",
 "label.isforced": "For\u00e7ar exclus\u00e3o",
diff --git a/ui/src/config/section/infra/hosts.js b/ui/src/config/section/infra/hosts.js
index af3ad1dbcb5..9a6fc021152 100644
--- a/ui/src/config/section/infra/hosts.js
+++ b/ui/src/config/section/infra/hosts.js
@@ -68,7 +68,7 @@ export default {
       icon: 'edit-outlined',
       label: 'label.edit',
       dataView: true,
-      args: ['name', 'hosttags', 'oscategoryid'],
+      args: ['name', 'hosttags', 'istagarule', 'oscategoryid'],
       mapping: {
         oscategoryid: {
           api: 'listOsCategories'
diff --git a/ui/src/config/section/infra/primaryStorages.js b/ui/src/config/section/infra/primaryStorages.js
index 6f51c7bd0a3..f222edeaf70 100644
--- a/ui/src/config/section/infra/primaryStorages.js
+++ b/ui/src/config/section/infra/primaryStorages.js
@@ -89,7 +89,7 @@ export default {
       icon: 'edit-outlined',
       label: 'label.edit',
       dataView: true,
-      args: ['name', 'tags', 'capacitybytes', 'capacityiops']
+      args: ['name', 'tags', 'istagarule', 'capacitybytes', 'capacityiops']
     },
     {
       api: 'updateStoragePool',
diff --git a/utils/src/main/java/org/apache/cloudstack/utils/jsinterpreter/TagAsRuleHelper.java b/utils/src/main/java/org/apache/cloudstack/utils/jsinterpreter/TagAsRuleHelper.java
new file mode 100644
index 00000000000..114818afc93
--- /dev/null
+++ b/utils/src/main/java/org/apache/cloudstack/utils/jsinterpreter/TagAsRuleHelper.java
@@ -0,0 +1,51 @@
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+package org.apache.cloudstack.utils.jsinterpreter;
+
+import com.cloud.utils.exception.CloudRuntimeException;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+public class TagAsRuleHelper {
+
+    private static final Logger LOGGER = Logger.getLogger(TagAsRuleHelper.class);
+
+    private static final String PARSE_TAGS = "tags = tags ? tags.split(',') : [];";
+
+
+    public static boolean interpretTagAsRule(String rule, String tags, long timeout) {
+        String script = PARSE_TAGS + rule;
+        tags = String.format("'%s'", StringEscapeUtils.escapeEcmaScript(tags));
+        try (JsInterpreter jsInterpreter = new JsInterpreter(timeout)) {
+            jsInterpreter.injectVariable("tags", tags);
+            Object scriptReturn = jsInterpreter.executeScript(script);
+            if (scriptReturn instanceof Boolean) {
+                return (Boolean)scriptReturn;
+            }
+        } catch (IOException ex) {
+            String message = String.format("Error while executing script [%s].", script);
+            LOGGER.error(message, ex);
+            throw new CloudRuntimeException(message, ex);
+        }
+
+        LOGGER.debug(String.format("Result of tag rule [%s] was not a boolean, returning false.", script));
+        return false;
+    }
+
+}