You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ab...@apache.org on 2019/09/25 20:44:18 UTC

[ranger] branch master updated: RANGER-2510: Support for Incremental tag updates to improve performance

This is an automated email from the ASF dual-hosted git repository.

abhay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/master by this push:
     new c31d4e4  RANGER-2510: Support for Incremental tag updates to improve performance
c31d4e4 is described below

commit c31d4e45281d18b2f18be315d58dc21ea02c7c47
Author: Abhay Kulkarni <ak...@cloudera.com>
AuthorDate: Wed Sep 25 13:44:05 2019 -0700

    RANGER-2510: Support for Incremental tag updates to improve performance
---
 .../ranger/admin/client/RangerAdminRESTClient.java |  19 +-
 .../plugin/contextenricher/RangerTagEnricher.java  | 427 +++++++++++++++++----
 .../ranger/plugin/model/RangerPolicyDelta.java     |   3 +-
 .../policyengine/RangerPolicyEngineImpl.java       | 330 ++++++++++------
 .../policyengine/RangerPolicyRepository.java       |  86 ++++-
 .../RangerDefaultPolicyResourceMatcher.java        |   4 +-
 .../ranger/plugin/service/RangerBasePlugin.java    |   2 +-
 .../org/apache/ranger/plugin/store/TagStore.java   |   7 +-
 .../apache/ranger/plugin/store/TagValidator.java   |  23 +-
 .../apache/ranger/plugin/util/RangerRESTUtils.java |   1 +
 .../ranger/plugin/util/RangerResourceTrie.java     |  76 ++++
 .../plugin/util/RangerServiceTagsDeltaUtil.java    | 177 +++++++++
 .../org/apache/ranger/plugin/util/ServiceTags.java |  28 ++
 .../plugin/policyengine/TestPolicyEngine.java      |  15 +-
 .../policyengine/TestPolicyEngineComparison.java   | 191 +++++++++
 .../comparison/fail/myServicePolicies.json         | 177 +++++++++
 .../comparison/fail/myServiceTags.json             |  55 +++
 .../comparison/fail/otherServicePolicies.json      | 225 +++++++++++
 .../comparison/fail/otherServiceTags.json          |  55 +++
 .../comparison/success/myServicePolicies.json      |   2 +
 .../comparison/success/myServiceTags.json          |   1 +
 .../comparison/success/otherServicePolicies.json   |   2 +
 .../comparison/success/otherServiceTags.json       |   1 +
 .../policyengine/test_compare_policyengines.json   |  30 ++
 .../policyengine/test_policyengine_hive.json       |  33 +-
 .../test_policyengine_hive_incremental_add.json    |  62 ++-
 .../test_policyengine_hive_incremental_delete.json | 146 ++++++-
 .../test_policyengine_hive_incremental_update.json |  60 ++-
 .../admin/client/RangerAdminJersey2RESTClient.java |   9 +-
 .../optimized/current/ranger_core_db_mysql.sql     |  15 +
 .../mysql/patches/043-add-tag-change-log-table.sql |  31 ++
 .../optimized/current/ranger_core_db_oracle.sql    |  18 +
 .../patches/043-add-tag-change-log-table.sql       |  60 +++
 .../optimized/current/ranger_core_db_postgres.sql  |  21 +
 .../patches/043-add-tag-change-log-table.sql       |  39 ++
 .../current/ranger_core_db_sqlanywhere.sql         |  20 +
 .../patches/043-add-tag-change-log-table.sql       |  62 +++
 .../optimized/current/ranger_core_db_sqlserver.sql |  32 ++
 .../patches/043-add-tag-change-log-table.sql       |  53 +++
 .../apache/ranger/biz/RangerTagDBRetriever.java    |   7 +-
 .../java/org/apache/ranger/biz/ServiceDBStore.java | 205 +++++++---
 .../java/org/apache/ranger/biz/TagDBStore.java     | 238 ++++++++++--
 .../ranger/common/RangerAdminTagEnricher.java      |  63 ++-
 .../ranger/common/RangerServiceTagsCache.java      | 199 ++++++----
 .../org/apache/ranger/db/RangerDaoManagerBase.java |   5 +-
 .../org/apache/ranger/db/XXPolicyChangeLogDao.java |  29 +-
 .../apache/ranger/db/XXServiceVersionInfoDao.java  |  72 ++--
 .../org/apache/ranger/db/XXTagChangeLogDao.java    | 122 ++++++
 .../org/apache/ranger/entity/XXTagChangeLog.java   | 219 +++++++++++
 .../java/org/apache/ranger/rest/PublicAPIsv2.java  |   2 +-
 .../java/org/apache/ranger/rest/ServiceREST.java   |   9 +-
 .../main/java/org/apache/ranger/rest/TagREST.java  |  34 +-
 .../service/RangerServiceResourceService.java      |   2 +-
 .../apache/ranger/service/RangerTagDefService.java |   2 +-
 .../service/RangerTagResourceMapService.java       |  13 +-
 .../apache/ranger/service/RangerTagService.java    |  20 +-
 .../main/resources/META-INF/jpa_named_queries.xml  |  20 +
 .../java/org/apache/ranger/rest/TestTagREST.java   |  44 +--
 .../ranger/service/TestRangerTagDefService.java    |   4 +-
 .../service/TestRangerTagResourceMapService.java   |   8 -
 60 files changed, 3382 insertions(+), 533 deletions(-)

diff --git a/agents-common/src/main/java/org/apache/ranger/admin/client/RangerAdminRESTClient.java b/agents-common/src/main/java/org/apache/ranger/admin/client/RangerAdminRESTClient.java
index 01ee945..5939f38 100644
--- a/agents-common/src/main/java/org/apache/ranger/admin/client/RangerAdminRESTClient.java
+++ b/agents-common/src/main/java/org/apache/ranger/admin/client/RangerAdminRESTClient.java
@@ -49,10 +49,11 @@ public class RangerAdminRESTClient extends AbstractRangerAdminClient {
 	private String           serviceName;
     private String           serviceNameUrlParam;
 	private String           pluginId;
-	private String clusterName;
+	private String           clusterName;
 	private RangerRESTClient restClient;
-	private RangerRESTUtils restUtils   = new RangerRESTUtils();
-	private String 		 supportsPolicyDeltas = "true";
+	private RangerRESTUtils  restUtils   = new RangerRESTUtils();
+	private String 		     supportsPolicyDeltas;
+	private String 		     supportsTagDeltas;
 
 	public static <T> GenericType<List<T>> getGenericType(final T clazz) {
 
@@ -83,11 +84,13 @@ public class RangerAdminRESTClient extends AbstractRangerAdminClient {
 		String sslConfigFileName 		= RangerConfiguration.getInstance().get(propertyPrefix + ".policy.rest.ssl.config.file");
 		clusterName       				= RangerConfiguration.getInstance().get(propertyPrefix + ".access.cluster.name", "");
 		if(StringUtil.isEmpty(clusterName)){
-			clusterName       				= RangerConfiguration.getInstance().get(propertyPrefix + ".ambari.cluster.name", "");
+			clusterName = RangerConfiguration.getInstance().get(propertyPrefix + ".ambari.cluster.name", "");
 		}
 		int	 restClientConnTimeOutMs	= RangerConfiguration.getInstance().getInt(propertyPrefix + ".policy.rest.client.connection.timeoutMs", 120 * 1000);
 		int	 restClientReadTimeOutMs	= RangerConfiguration.getInstance().getInt(propertyPrefix + ".policy.rest.client.read.timeoutMs", 30 * 1000);
-		supportsPolicyDeltas                    = RangerConfiguration.getInstance().get(propertyPrefix + ".policy.rest.supports.policy.deltas", "false");
+		supportsPolicyDeltas            = RangerConfiguration.getInstance().get(propertyPrefix + ".policy.rest.supports.policy.deltas", "false");
+		supportsTagDeltas               = RangerConfiguration.getInstance().get(propertyPrefix + ".tag.rest.supports.tag.deltas", "false");
+
         if (!StringUtil.isEmpty(tmpUrl)) {
             url = tmpUrl.trim();
         }
@@ -97,6 +100,9 @@ public class RangerAdminRESTClient extends AbstractRangerAdminClient {
 		if (!"true".equalsIgnoreCase(supportsPolicyDeltas)) {
 			supportsPolicyDeltas = "false";
 		}
+		if (!"true".equalsIgnoreCase(supportsTagDeltas)) {
+			supportsTagDeltas = "false";
+		}
 
 		init(url, sslConfigFileName, restClientConnTimeOutMs , restClientReadTimeOutMs);
 
@@ -701,6 +707,7 @@ public class RangerAdminRESTClient extends AbstractRangerAdminClient {
 		queryParams.put(RangerRESTUtils.LAST_KNOWN_TAG_VERSION_PARAM, Long.toString(lastKnownVersion));
 		queryParams.put(RangerRESTUtils.REST_PARAM_LAST_ACTIVATION_TIME, Long.toString(lastActivationTimeInMillis));
 		queryParams.put(RangerRESTUtils.REST_PARAM_PLUGIN_ID, pluginId);
+		queryParams.put(RangerRESTUtils.REST_PARAM_SUPPORTS_TAG_DELTAS, supportsTagDeltas);
 
 		if (isSecureMode) {
 			PrivilegedAction<ClientResponse> action = new PrivilegedAction<ClientResponse>() {
@@ -712,7 +719,7 @@ public class RangerAdminRESTClient extends AbstractRangerAdminClient {
 					} catch (Exception e) {
 						LOG.error("Failed to get response, Error is : "+e.getMessage());
 					}
-				return clientResp;
+					return clientResp;
 				}
 			};
 			if (LOG.isDebugEnabled()) {
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java b/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java
index b596992..3063885 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/contextenricher/RangerTagEnricher.java
@@ -34,6 +34,7 @@ import org.apache.ranger.plugin.model.RangerTag;
 import org.apache.ranger.plugin.model.validation.RangerServiceDefHelper;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
 import org.apache.ranger.plugin.policyengine.RangerAccessResource;
+import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
 import org.apache.ranger.plugin.policyresourcematcher.RangerDefaultPolicyResourceMatcher;
 import org.apache.ranger.plugin.policyresourcematcher.RangerPolicyResourceMatcher;
 import org.apache.ranger.plugin.util.DownloadTrigger;
@@ -44,6 +45,7 @@ import org.apache.ranger.plugin.util.RangerAccessRequestUtil;
 import org.apache.ranger.plugin.util.RangerPerfTracer;
 import org.apache.ranger.plugin.util.RangerResourceTrie;
 import org.apache.ranger.plugin.util.RangerServiceNotFoundException;
+import org.apache.ranger.plugin.util.RangerServiceTagsDeltaUtil;
 import org.apache.ranger.plugin.util.ServiceTags;
 
 import java.io.File;
@@ -67,12 +69,13 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 	private static final Log LOG = LogFactory.getLog(RangerTagEnricher.class);
 
 	private static final Log PERF_CONTEXTENRICHER_INIT_LOG = RangerPerfTracer.getPerfLogger("contextenricher.init");
-	private static final Log PERF_TRIE_OP_LOG = RangerPerfTracer.getPerfLogger("resourcetrie.retrieval");
+	private static final Log PERF_TRIE_OP_LOG              = RangerPerfTracer.getPerfLogger("resourcetrie.retrieval");
+	private static final Log PERF_SET_SERVICETAGS_LOG      = RangerPerfTracer.getPerfLogger("tagenricher.setservicetags");
 
 
-	public static final String TAG_REFRESHER_POLLINGINTERVAL_OPTION = "tagRefresherPollingInterval";
-	public static final String TAG_RETRIEVER_CLASSNAME_OPTION       = "tagRetrieverClassName";
-	public static final String TAG_DISABLE_TRIE_PREFILTER_OPTION    = "disableTrieLookupPrefilter";
+	private static final String TAG_REFRESHER_POLLINGINTERVAL_OPTION = "tagRefresherPollingInterval";
+	private static final String TAG_RETRIEVER_CLASSNAME_OPTION       = "tagRetrieverClassName";
+	private static final String TAG_DISABLE_TRIE_PREFILTER_OPTION    = "disableTrieLookupPrefilter";
 
 	private RangerTagRefresher                 tagRefresher;
 	private RangerTagRetriever                 tagRetriever;
@@ -214,7 +217,7 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 		private final Map<Collection<String>, Boolean> dataMaskHierarchies  = new HashMap<>();
 		private final Map<Collection<String>, Boolean> rowFilterHierarchies = new HashMap<>();
 
-		public Boolean isValidHierarchy(int policyType, Collection<String> resourceKeys) {
+		Boolean isValidHierarchy(int policyType, Collection<String> resourceKeys) {
 			switch (policyType) {
 				case RangerPolicy.POLICY_TYPE_ACCESS:
 					return accessHierarchies.get(resourceKeys);
@@ -227,7 +230,7 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 			}
 		}
 
-		public void addHierarchy(int policyType, Collection<String> resourceKeys, Boolean isValid) {
+		void addHierarchy(int policyType, Collection<String> resourceKeys, Boolean isValid) {
 			switch (policyType) {
 				case RangerPolicy.POLICY_TYPE_ACCESS:
 					accessHierarchies.put(resourceKeys, isValid);
@@ -246,97 +249,226 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 	}
 
 	public void setServiceTags(final ServiceTags serviceTags) {
-		if (serviceTags == null || CollectionUtils.isEmpty(serviceTags.getServiceResources())) {
-			LOG.info("ServiceTags is null or there are no tagged resources for service " + serviceName);
-			enrichedServiceTags = null;
-		} else {
-			List<RangerServiceResourceMatcher> resourceMatchers = new ArrayList<>();
+		boolean rebuildOnlyIndex = false;
+		setServiceTags(serviceTags, rebuildOnlyIndex);
+	}
 
-			RangerServiceDefHelper serviceDefHelper = new RangerServiceDefHelper(serviceDef, false);
+	protected void setServiceTags(final ServiceTags serviceTags, final boolean rebuildOnlyIndex) {
 
-			List<RangerServiceResource> serviceResources = serviceTags.getServiceResources();
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> RangerTagEnricher.setServiceTags(serviceTags=" + serviceTags + ", rebuildOnlyIndex=" + rebuildOnlyIndex + ")");
+		}
 
-			ResourceHierarchies hierarchies = new ResourceHierarchies();
+		if (serviceTags == null) {
+			LOG.info("ServiceTags is null for service " + serviceName);
+			enrichedServiceTags = null;
+		} else  {
+			RangerServiceDefHelper serviceDefHelper = new RangerServiceDefHelper(serviceDef, false);
+			ResourceHierarchies    hierarchies      = new ResourceHierarchies();
 
-			for (RangerServiceResource serviceResource : serviceResources) {
-				final Collection<String> resourceKeys = serviceResource.getResourceElements().keySet();
+			RangerPerfTracer perf = null;
 
-				for (int policyType : RangerPolicy.POLICY_TYPES) {
-					Boolean isValidHierarchy = hierarchies.isValidHierarchy(policyType, resourceKeys);
-					if (isValidHierarchy == null) { // hierarchy not yet validated
-						isValidHierarchy = Boolean.FALSE;
+			if(RangerPerfTracer.isPerfTraceEnabled(PERF_SET_SERVICETAGS_LOG)) {
+				perf = RangerPerfTracer.getPerfTracer(PERF_SET_SERVICETAGS_LOG, "RangerTagEnricher.setServiceTags(newTagVersion=" + serviceTags.getTagVersion() + ",isDelta=" + serviceTags.getIsDelta() + ")");
+			}
 
-						for (List<RangerServiceDef.RangerResourceDef> hierarchy : serviceDefHelper.getResourceHierarchies(policyType)) {
-							if (serviceDefHelper.hierarchyHasAllResources(hierarchy, resourceKeys)) {
-								isValidHierarchy = Boolean.TRUE;
+			if (!serviceTags.getIsDelta()) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Received all service-tags");
+				}
+				if (CollectionUtils.isEmpty(serviceTags.getServiceResources())) {
+					LOG.info("There are no tagged resources for service " + serviceName);
+					enrichedServiceTags = null;
+				} else {
 
-								break;
-							}
+					List<RangerServiceResourceMatcher> resourceMatchers = new ArrayList<>();
+					List<RangerServiceResource> serviceResources = serviceTags.getServiceResources();
+
+					for (RangerServiceResource serviceResource : serviceResources) {
+						RangerServiceResourceMatcher serviceResourceMatcher = createRangerServiceResourceMatcher(serviceResource, serviceDefHelper, hierarchies);
+						if (serviceResourceMatcher != null) {
+							resourceMatchers.add(serviceResourceMatcher);
+						} else {
+							LOG.error("Could not create service-resource-matcher for service-resource:[" + serviceResource + "]");
 						}
+					}
+
+					Map<String, RangerResourceTrie<RangerServiceResourceMatcher>> serviceResourceTrie = null;
+
+					if (!disableTrieLookupPrefilter) {
+						serviceResourceTrie = new HashMap<>();
 
-						hierarchies.addHierarchy(policyType, resourceKeys, isValidHierarchy);
+						for (RangerServiceDef.RangerResourceDef resourceDef : serviceDef.getResources()) {
+							serviceResourceTrie.put(resourceDef.getName(), new RangerResourceTrie<>(resourceDef, resourceMatchers));
+						}
 					}
 
-					if (isValidHierarchy) {
-						RangerDefaultPolicyResourceMatcher matcher = new RangerDefaultPolicyResourceMatcher();
+					enrichedServiceTags = new EnrichedServiceTags(serviceTags, resourceMatchers, serviceResourceTrie);
+				}
+
+			} else {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Received service-tag deltas:" + serviceTags);
+				}
+
+				ServiceTags delta          = serviceTags;
+				ServiceTags oldServiceTags = enrichedServiceTags != null ? enrichedServiceTags.getServiceTags() : new ServiceTags();
 
-						matcher.setServiceDef(this.serviceDef);
-						matcher.setPolicyResources(serviceResource.getResourceElements(), policyType);
+				ServiceTags                                                   newServiceTags         = rebuildOnlyIndex ? oldServiceTags : RangerServiceTagsDeltaUtil.applyDelta(oldServiceTags, delta);
+				List<RangerServiceResourceMatcher>                            newResourceMatchers    = enrichedServiceTags != null ? enrichedServiceTags.getServiceResourceMatchers() : new ArrayList<>();
+				Map<String, RangerResourceTrie<RangerServiceResourceMatcher>> newServiceResourceTrie = enrichedServiceTags != null ? enrichedServiceTags.getServiceResourceTrie() : new HashMap<>();
 
+				if (serviceTags.getTagsChangeExtent() == ServiceTags.TagsChangeExtent.NONE) {
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("No change to service-tags other than version change");
+					}
+				} else {
+					if (serviceTags.getTagsChangeExtent() != ServiceTags.TagsChangeExtent.TAGS) {
 						if (LOG.isDebugEnabled()) {
-							LOG.debug("RangerTagEnricher.setServiceTags() - Initializing matcher with (resource=" + serviceResource
-									+ ", serviceDef=" + this.serviceDef.getName() + ")");
+							LOG.debug("Delta contains changes other than tag attribute changes, [" + serviceTags.getTagsChangeExtent() + "]");
+						}
+
+						newServiceResourceTrie = new HashMap<>();
+						newResourceMatchers    = new ArrayList<>();
 
+						if (enrichedServiceTags != null) {
+							newResourceMatchers.addAll(enrichedServiceTags.getServiceResourceMatchers());
 						}
-						matcher.setServiceDefHelper(serviceDefHelper);
-						matcher.init();
 
-						RangerServiceResourceMatcher serviceResourceMatcher = new RangerServiceResourceMatcher(serviceResource, matcher);
-						resourceMatchers.add(serviceResourceMatcher);
-					}
-				}
-			}
+						Map<String, RangerResourceTrie<RangerServiceResourceMatcher>> serviceResourceTrie = enrichedServiceTags != null ? enrichedServiceTags.getServiceResourceTrie() : new HashMap<>();
 
+						for (Map.Entry<String, RangerResourceTrie<RangerServiceResourceMatcher>> entry : serviceResourceTrie.entrySet()) {
+							RangerResourceTrie<RangerServiceResourceMatcher> resourceTrie = new RangerResourceTrie<>(entry.getValue());
+							newServiceResourceTrie.put(entry.getKey(), resourceTrie);
+						}
 
-			Map<String, RangerResourceTrie<RangerServiceResourceMatcher>> serviceResourceTrie = null;
+						List<RangerServiceResource> changedServiceResources = delta.getServiceResources();
 
-			if (!disableTrieLookupPrefilter) {
-				serviceResourceTrie = new HashMap<>();
+						for (RangerServiceResource serviceResource : changedServiceResources) {
 
-				for (RangerServiceDef.RangerResourceDef resourceDef : serviceDef.getResources()) {
-					serviceResourceTrie.put(resourceDef.getName(), new RangerResourceTrie<RangerServiceResourceMatcher>(resourceDef, resourceMatchers));
-				}
-			}
+							if (enrichedServiceTags != null) {
 
-			Set<RangerTagForEval> tagsForEmptyResourceAndAnyAccess = new HashSet<>();
-			for (Map.Entry<Long, RangerTag> entry : serviceTags.getTags().entrySet()) {
-				tagsForEmptyResourceAndAnyAccess.add(new RangerTagForEval(entry.getValue(), RangerPolicyResourceMatcher.MatchType.DESCENDANT));
-			}
+								if (LOG.isDebugEnabled()) {
+									LOG.debug("Removing service-resource:[" + serviceResource + "] from trie-map");
+								}
 
-			enrichedServiceTags = new EnrichedServiceTags(serviceTags, resourceMatchers, serviceResourceTrie, tagsForEmptyResourceAndAnyAccess);
-		}
+								// Remove existing serviceResource from the copy
 
-		RangerAuthContext authContext = getAuthContext();
-		if (authContext != null) {
-			authContext.addOrReplaceRequestContextEnricher(this, enrichedServiceTags);
+								RangerAccessResourceImpl accessResource = new RangerAccessResourceImpl();
 
-			Map<String, RangerBasePlugin> servicePluginMap = RangerBasePlugin.getServicePluginMap();
-			RangerBasePlugin plugin = servicePluginMap != null ? servicePluginMap.get(getServiceName()) : null;
-			if (plugin != null) {
-				plugin.contextChanged();
+								for (Map.Entry<String, RangerPolicy.RangerPolicyResource> entry : serviceResource.getResourceElements().entrySet()) {
+									accessResource.setValue(entry.getKey(), entry.getValue());
+								}
+								if (LOG.isDebugEnabled()) {
+									LOG.debug("RangerAccessResource:[" + accessResource + "] created to represent service-resource[" + serviceResource + "] to find evaluators from trie-map");
+								}
+
+								List<RangerServiceResourceMatcher> oldMatchers = getEvaluators(accessResource, enrichedServiceTags);
+
+								if (LOG.isDebugEnabled()) {
+									LOG.debug("Found [" + oldMatchers.size() + "] matchers for service-resource[" + serviceResource + "]");
+								}
+
+								for (RangerServiceResourceMatcher matcher : oldMatchers) {
+
+									for (String resourceDefName : serviceResource.getResourceElements().keySet()) {
+										RangerResourceTrie<RangerServiceResourceMatcher> trie = newServiceResourceTrie.get(resourceDefName);
+										if (trie != null) {
+											trie.delete(serviceResource.getResourceElements().get(resourceDefName), matcher);
+										} else {
+											LOG.error("Cannot find resourceDef with name:[" + resourceDefName + "]. Should NOT happen!!");
+											LOG.error("Setting tagVersion to -1 to ensure that in the next download all tags are downloaded");
+											newServiceTags.setTagVersion(-1L);
+										}
+									}
+								}
+
+								// Remove old resource matchers
+								newResourceMatchers.removeAll(oldMatchers);
+
+								if (LOG.isDebugEnabled()) {
+									LOG.debug("Found and removed [" + oldMatchers.size() + "] matchers for service-resource[" + serviceResource + "] from trie-map");
+								}
+							}
+
+							if (!StringUtils.isEmpty(serviceResource.getResourceSignature())) {
+
+								RangerServiceResourceMatcher resourceMatcher = createRangerServiceResourceMatcher(serviceResource, serviceDefHelper, hierarchies);
+
+								if (resourceMatcher != null) {
+									for (String resourceDefName : serviceResource.getResourceElements().keySet()) {
+
+										RangerResourceTrie<RangerServiceResourceMatcher> trie = newServiceResourceTrie.get(resourceDefName);
+
+										if (trie == null) {
+											List<RangerServiceDef.RangerResourceDef> resourceDefs = serviceDef.getResources();
+											RangerServiceDef.RangerResourceDef found = null;
+											for (RangerServiceDef.RangerResourceDef resourceDef : resourceDefs) {
+												if (StringUtils.equals(resourceDef.getName(), resourceDefName)) {
+													found = resourceDef;
+													break;
+												}
+											}
+											if (found != null) {
+												List<RangerServiceResourceMatcher> resourceMatchers = new ArrayList<>();
+												trie = new RangerResourceTrie<>(found, resourceMatchers);
+												newServiceResourceTrie.put(resourceDefName, trie);
+											}
+										}
+
+										if (trie != null) {
+											trie.add(serviceResource.getResourceElements().get(resourceDefName), resourceMatcher);
+											if (LOG.isDebugEnabled()) {
+												LOG.debug("Added resource-matcher for service-resource:[" + serviceResource + "]");
+											}
+										} else {
+											LOG.error("Could not create resource-matcher for resource: [" + serviceResource + "]. Should NOT happen!!");
+											LOG.error("Setting tagVersion to -1 to ensure that in the next download all tags are downloaded");
+											newServiceTags.setTagVersion(-1L);
+										}
+									}
+									newResourceMatchers.add(resourceMatcher);
+								} else {
+									LOG.error("Could not create resource-matcher for resource: [" + serviceResource + "]. Should NOT happen!!");
+									LOG.error("Setting tagVersion to -1 to ensure that in the next download all tags are downloaded");
+									newServiceTags.setTagVersion(-1L);
+								}
+							} else {
+								if (LOG.isDebugEnabled()) {
+									LOG.debug("Service-resource:[id=" + serviceResource.getId() + "] is deleted as its resource-signature is empty. No need to create it!");
+								}
+							}
+						}
+					} else {
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("Delta contains only tag attribute changes");
+						}
+					}
+
+					enrichedServiceTags = new EnrichedServiceTags(newServiceTags, newResourceMatchers, newServiceResourceTrie);
+				}
 			}
+			RangerPerfTracer.logAlways(perf);
+		}
+
+		setEnrichedServiceTagsInPlugin();
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== RangerTagEnricher.setServiceTags(serviceTags=" + serviceTags + ", rebuildOnlyIndex=" + rebuildOnlyIndex + ")");
 		}
 
 	}
 
 	protected Long getServiceTagsVersion() {
-		return enrichedServiceTags != null ? enrichedServiceTags.getServiceTags().getTagVersion() : null;
+		return enrichedServiceTags != null ? enrichedServiceTags.getServiceTags().getTagVersion() : -1L;
+	}
+
+	protected Long getResourceTrieVersion() {
+		return enrichedServiceTags != null ? enrichedServiceTags.getResourceTrieVersion() : -1L;
 	}
 
 	@Override
 	public boolean preCleanup() {
-		boolean ret = true;
-
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("==> RangerTagEnricher.preCleanup()");
 		}
@@ -354,9 +486,9 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 		}
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerTagEnricher.preCleanup() : result=" + ret);
+			LOG.debug("<== RangerTagEnricher.preCleanup() : result=" + true);
 		}
-		return ret;
+		return true;
 	}
 
 	public void syncTagsWithAdmin(final DownloadTrigger token) throws InterruptedException {
@@ -364,6 +496,52 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 		token.waitForCompletion();
 	}
 
+	public boolean compare(RangerTagEnricher other) {
+		boolean ret;
+
+		if (enrichedServiceTags.getServiceResourceTrie() != null && other.enrichedServiceTags.getServiceResourceTrie() != null) {
+			ret = enrichedServiceTags.getServiceResourceTrie().size() == other.enrichedServiceTags.getServiceResourceTrie().size();
+
+			if (ret && enrichedServiceTags.getServiceResourceTrie().size() > 0) {
+				for (Map.Entry<String, RangerResourceTrie<RangerServiceResourceMatcher>> entry : enrichedServiceTags.getServiceResourceTrie().entrySet()) {
+					ret = entry.getValue().compareSubtree(other.enrichedServiceTags.getServiceResourceTrie().get(entry.getKey()));
+					if (!ret) {
+						break;
+					}
+				}
+			}
+		} else {
+			ret = enrichedServiceTags.getServiceResourceTrie() == other.enrichedServiceTags.getServiceResourceTrie();
+		}
+
+		if (ret) {
+			// Compare mappings
+			ServiceTags myServiceTags = enrichedServiceTags.getServiceTags();
+			ServiceTags otherServiceTags = other.enrichedServiceTags.getServiceTags();
+
+			ret = StringUtils.equals(myServiceTags.getServiceName(), otherServiceTags.getServiceName()) &&
+					//myServiceTags.getTagVersion().equals(otherServiceTags.getTagVersion()) &&
+					myServiceTags.getTags().size() == otherServiceTags.getTags().size() &&
+					myServiceTags.getServiceResources().size() == otherServiceTags.getServiceResources().size() &&
+					myServiceTags.getResourceToTagIds().size() == otherServiceTags.getResourceToTagIds().size();
+			if (ret) {
+				for (RangerServiceResource serviceResource : myServiceTags.getServiceResources()) {
+					Long serviceResourceId = serviceResource.getId();
+
+					List<Long> myTagsForResource = myServiceTags.getResourceToTagIds().get(serviceResourceId);
+					List<Long> otherTagsForResource = otherServiceTags.getResourceToTagIds().get(serviceResourceId);
+
+					ret = CollectionUtils.size(myTagsForResource) == CollectionUtils.size(otherTagsForResource);
+
+					if (ret && CollectionUtils.size(myTagsForResource) > 0) {
+						ret = myTagsForResource.size() == CollectionUtils.intersection(myTagsForResource, otherTagsForResource).size();
+					}
+				}
+			}
+		}
+
+		return ret;
+	}
 
 	private Set<RangerTagForEval> findMatchingTags(final RangerAccessRequest request, EnrichedServiceTags dataStore) {
 		if (LOG.isDebugEnabled()) {
@@ -427,6 +605,78 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 		return ret;
 	}
 
+	private RangerServiceResourceMatcher createRangerServiceResourceMatcher(RangerServiceResource serviceResource, RangerServiceDefHelper serviceDefHelper, ResourceHierarchies hierarchies) {
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> createRangerServiceResourceMatcher(serviceResource=" + serviceResource + ")");
+		}
+
+		RangerServiceResourceMatcher ret = null;
+
+		final Collection<String> resourceKeys = serviceResource.getResourceElements().keySet();
+
+		for (int policyType : RangerPolicy.POLICY_TYPES) {
+			Boolean isValidHierarchy = hierarchies.isValidHierarchy(policyType, resourceKeys);
+			if (isValidHierarchy == null) { // hierarchy not yet validated
+				isValidHierarchy = Boolean.FALSE;
+
+				for (List<RangerServiceDef.RangerResourceDef> hierarchy : serviceDefHelper.getResourceHierarchies(policyType)) {
+					if (serviceDefHelper.hierarchyHasAllResources(hierarchy, resourceKeys)) {
+						isValidHierarchy = Boolean.TRUE;
+
+						break;
+					}
+				}
+
+				hierarchies.addHierarchy(policyType, resourceKeys, isValidHierarchy);
+			}
+
+			if (isValidHierarchy) {
+				RangerDefaultPolicyResourceMatcher matcher = new RangerDefaultPolicyResourceMatcher();
+
+				matcher.setServiceDef(this.serviceDef);
+				matcher.setPolicyResources(serviceResource.getResourceElements(), policyType);
+
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("RangerTagEnricher.setServiceTags() - Initializing matcher with (resource=" + serviceResource
+							+ ", serviceDef=" + this.serviceDef.getName() + ")");
+
+				}
+				matcher.setServiceDefHelper(serviceDefHelper);
+				matcher.init();
+
+				ret = new RangerServiceResourceMatcher(serviceResource, matcher);
+				break;
+			}
+		}
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== createRangerServiceResourceMatcher(serviceResource=" + serviceResource + ") : [" + ret + "]");
+		}
+		return ret;
+
+	}
+
+	private void setEnrichedServiceTagsInPlugin() {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> setEnrichedServiceTagsInPlugin()");
+		}
+
+		RangerAuthContext authContext = getAuthContext();
+		if (authContext != null) {
+			authContext.addOrReplaceRequestContextEnricher(this, enrichedServiceTags);
+
+			Map<String, RangerBasePlugin> servicePluginMap = RangerBasePlugin.getServicePluginMap();
+			RangerBasePlugin plugin = servicePluginMap != null ? servicePluginMap.get(getServiceName()) : null;
+			if (plugin != null) {
+				plugin.contextChanged();
+			}
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== setEnrichedServiceTagsInPlugin()");
+		}
+	}
+
 	private List<RangerServiceResourceMatcher> getEvaluators(RangerAccessResource resource, EnrichedServiceTags enrichedServiceTags) {
 		if(LOG.isDebugEnabled()) {
 			LOG.debug("==> RangerTagEnricher.getEvaluators(" + (resource != null ? resource.getAsString() : null) + ")");
@@ -460,7 +710,7 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 
 					List<RangerServiceResourceMatcher> serviceResourceMatchers = trie.getEvaluatorsForResource(resource.getValue(resourceName));
 
-					if (CollectionUtils.isEmpty(serviceResourceMatchers)) { // no policies for this resource, bail out
+					if (CollectionUtils.isEmpty(serviceResourceMatchers)) { // no tags for this resource, bail out
 						serviceResourceMatchersList = null;
 						smallestList = null;
 						break;
@@ -484,7 +734,7 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 					ret = new ArrayList<>(smallestList);
 					for (List<RangerServiceResourceMatcher> serviceResourceMatchers : serviceResourceMatchersList) {
 						if (serviceResourceMatchers != smallestList) {
-							// remove policies from ret that are not in serviceResourceMatchers
+							// keep in ret only the matchers that are also present in serviceResourceMatchers
 							ret.retainAll(serviceResourceMatchers);
 							if (CollectionUtils.isEmpty(ret)) { // if no policy exists, bail out and return empty list
 								ret = null;
@@ -539,22 +789,32 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 	}
 
 	static private final class EnrichedServiceTags {
-		final private ServiceTags                        serviceTags;
-		final private List<RangerServiceResourceMatcher> serviceResourceMatchers;
+		final private ServiceTags                                                      serviceTags;
+		final private List<RangerServiceResourceMatcher>                               serviceResourceMatchers;
 		final private Map<String, RangerResourceTrie<RangerServiceResourceMatcher>>    serviceResourceTrie;
-		final private Set<RangerTagForEval>              tagsForEmptyResourceAndAnyAccess; // Used only when accessed resource is empty and access type is 'any'
-
-		EnrichedServiceTags(ServiceTags serviceTags, List<RangerServiceResourceMatcher> serviceResourceMatchers,
-							Map<String, RangerResourceTrie<RangerServiceResourceMatcher>> serviceResourceTrie, Set<RangerTagForEval> tagsForEmptyResourceAndAnyAccess) {
-			this.serviceTags             = serviceTags;
-			this.serviceResourceMatchers = serviceResourceMatchers;
-			this.serviceResourceTrie     = serviceResourceTrie;
-			this.tagsForEmptyResourceAndAnyAccess          = tagsForEmptyResourceAndAnyAccess;
+		final private Set<RangerTagForEval>                                            tagsForEmptyResourceAndAnyAccess; // Used only when accessed resource is empty and access type is 'any'
+		final private Long                                                             resourceTrieVersion;
+
+		EnrichedServiceTags(ServiceTags serviceTags, List<RangerServiceResourceMatcher> serviceResourceMatchers, Map<String, RangerResourceTrie<RangerServiceResourceMatcher>> serviceResourceTrie) {
+			this.serviceTags                      = serviceTags;
+			this.serviceResourceMatchers          = serviceResourceMatchers;
+			this.serviceResourceTrie              = serviceResourceTrie;
+			this.tagsForEmptyResourceAndAnyAccess = createTagsForEmptyResourceAndAnyAccess();
+			this.resourceTrieVersion              = serviceTags.getTagVersion();
 		}
-		ServiceTags getServiceTags() {return serviceTags;}
-		List<RangerServiceResourceMatcher> getServiceResourceMatchers() { return serviceResourceMatchers;}
+		ServiceTags                                                   getServiceTags() {return serviceTags;}
+		List<RangerServiceResourceMatcher>                            getServiceResourceMatchers() { return serviceResourceMatchers;}
 		Map<String, RangerResourceTrie<RangerServiceResourceMatcher>> getServiceResourceTrie() { return serviceResourceTrie;}
-		Set<RangerTagForEval> getTagsForEmptyResourceAndAnyAccess() { return tagsForEmptyResourceAndAnyAccess;}
+		Long                                                          getResourceTrieVersion() { return resourceTrieVersion;}
+		Set<RangerTagForEval>                                         getTagsForEmptyResourceAndAnyAccess() { return tagsForEmptyResourceAndAnyAccess;}
+
+		private Set<RangerTagForEval> createTagsForEmptyResourceAndAnyAccess() {
+			Set<RangerTagForEval> tagsForEmptyResourceAndAnyAccess = new HashSet<>();
+			for (Map.Entry<Long, RangerTag> entry : serviceTags.getTags().entrySet()) {
+				tagsForEmptyResourceAndAnyAccess.add(new RangerTagForEval(entry.getValue(), RangerPolicyResourceMatcher.MatchType.DESCENDANT));
+			}
+			return tagsForEmptyResourceAndAnyAccess;
+		}
 	}
 
 	static class RangerTagRefresher extends Thread {
@@ -562,7 +822,7 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 
 		private final RangerTagRetriever tagRetriever;
 		private final RangerTagEnricher tagEnricher;
-		private long lastKnownVersion = -1L;
+		private long lastKnownVersion;
 		private final BlockingQueue<DownloadTrigger> tagDownloadQueue;
 		private long lastActivationTimeInMillis;
 
@@ -626,7 +886,7 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 		private void populateTags() throws InterruptedException {
 
 			if (tagEnricher != null) {
-				ServiceTags serviceTags = null;
+				ServiceTags serviceTags;
 
 				try {
 					serviceTags = tagRetriever.retrieveTags(lastKnownVersion, lastActivationTimeInMillis);
@@ -635,12 +895,15 @@ public class RangerTagEnricher extends RangerAbstractContextEnricher {
 						if (!hasProvidedTagsToReceiver) {
 							serviceTags = loadFromCache();
 						}
-					} else {
+					} else if (!serviceTags.getIsDelta()) {
 						saveToCache(serviceTags);
 					}
 
 					if (serviceTags != null) {
 						tagEnricher.setServiceTags(serviceTags);
+						if (serviceTags.getIsDelta()) {
+							saveToCache(tagEnricher.enrichedServiceTags.serviceTags);
+						}
 						LOG.info("RangerTagRefresher.populateTags() - Updated tags-cache to new version of tags, lastKnownVersion=" + lastKnownVersion + "; newVersion="
 								+ (serviceTags.getTagVersion() == null ? -1L : serviceTags.getTagVersion()));
 						hasProvidedTagsToReceiver = true;
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/model/RangerPolicyDelta.java b/agents-common/src/main/java/org/apache/ranger/plugin/model/RangerPolicyDelta.java
index 74e7add..1b69d8d 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/model/RangerPolicyDelta.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/model/RangerPolicyDelta.java
@@ -43,8 +43,9 @@ public class RangerPolicyDelta implements java.io.Serializable {
     public static final int CHANGE_TYPE_SERVICE_DEF_CHANGE  = 4;
     public static final int CHANGE_TYPE_RANGER_ADMIN_START  = 5;
     public static final int CHANGE_TYPE_LOG_ERROR           = 6;
+    public static final int CHANGE_TYPE_INVALIDATE_POLICY_DELTAS = 7;
 
-    private static String[] changeTypeNames = { "POLICY_CREATE", "POLICY_UPDATE", "POLICY_DELETE", "SERVICE_CHANGE", "SERVICE_DEF_CHANGE", "RANGER_ADMIN_START", "LOG_ERROR" };
+    private static String[] changeTypeNames = { "POLICY_CREATE", "POLICY_UPDATE", "POLICY_DELETE", "SERVICE_CHANGE", "SERVICE_DEF_CHANGE", "RANGER_ADMIN_START", "LOG_ERROR", "INVALIDATE_POLICY_DELTAS" };
 
     private Long                id;
     private Integer             changeType;
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineImpl.java b/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineImpl.java
index d33f5d3..c23a2d4 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineImpl.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyEngineImpl.java
@@ -73,9 +73,6 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 	private final RangerPolicyRepository policyRepository;
 	private final RangerPolicyRepository tagPolicyRepository;
 
-	private boolean isPolicyRepositoryShared = false;
-	private boolean isTagPolicyRepositoryShared = false;
-
 	private List<RangerContextEnricher> allContextEnrichers;
 
 	private boolean  useForwardedIPAddress;
@@ -91,7 +88,6 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 
 	public RangerPolicyEngineImpl(final RangerPolicyEngineImpl other, ServicePolicies servicePolicies) {
 
-		List<RangerPolicyDelta> deltas        = servicePolicies.getPolicyDeltas();
 		long                    policyVersion = servicePolicies.getPolicyVersion();
 
 		this.useForwardedIPAddress = other.useForwardedIPAddress;
@@ -103,32 +99,34 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 		List<RangerPolicyDelta> defaultZoneDeltasForTagPolicies = new ArrayList<>();
 
 		if (MapUtils.isNotEmpty(servicePolicies.getSecurityZones())) {
-			Map<String, List<RangerPolicyDelta>> zoneDeltasMap = new HashMap<>();
 
 			buildZoneTrie(servicePolicies);
 
+			Map<String, List<RangerPolicyDelta>> zoneDeltasMap = new HashMap<>();
+
 			for (Map.Entry<String, ServicePolicies.SecurityZoneInfo> zone : servicePolicies.getSecurityZones().entrySet()) {
-				zoneDeltasMap.put(zone.getKey(), new ArrayList<>());
-			}
-			for (RangerPolicyDelta delta : deltas) {
-				String zoneName = delta.getZoneName();
 
-				if (StringUtils.isNotEmpty(zoneName)) {
-					List<RangerPolicyDelta> zoneDeltas = zoneDeltasMap.get(zoneName);
-					if (zoneDeltas != null) {
+				List<RangerPolicyDelta> deltas = zone.getValue().getPolicyDeltas();
+
+				for (RangerPolicyDelta delta : deltas) {
+					String zoneName = delta.getZoneName();
+
+					if (StringUtils.isNotEmpty(zoneName)) {
+						List<RangerPolicyDelta> zoneDeltas = zoneDeltasMap.get(zoneName);
+						if (zoneDeltas == null) {
+							zoneDeltas = new ArrayList<>();
+							zoneDeltasMap.put(zoneName, zoneDeltas);
+						}
 						zoneDeltas.add(delta);
-					}
-				} else {
-					if (servicePolicies.getServiceDef().getName().equals(delta.getServiceType())) {
-						defaultZoneDeltas.add(delta);
 					} else {
-						defaultZoneDeltasForTagPolicies.add(delta);
+						LOG.warn("policyDelta : [" + delta + "] does not belong to any zone. Should not have come here.");
 					}
 				}
 			}
-			for (Map.Entry<String, ServicePolicies.SecurityZoneInfo> zone : servicePolicies.getSecurityZones().entrySet()) {
-				final String                 zoneName        = zone.getKey();
-				List<RangerPolicyDelta>      zoneDeltas      = zoneDeltasMap.get(zoneName);
+
+			for (Map.Entry<String, List<RangerPolicyDelta>> entry : zoneDeltasMap.entrySet()) {
+				final String             zoneName   = entry.getKey();
+				List<RangerPolicyDelta>  zoneDeltas = entry.getValue();
 
 				RangerPolicyRepository       otherRepository = other.policyRepositories.get(zoneName);
 				final RangerPolicyRepository policyRepository;
@@ -150,56 +148,51 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 						policyRepository = new RangerPolicyRepository(otherRepository, zoneDeltas, policyVersion);
 					}
 				} else {
-					policyRepository = otherRepository;
+					policyRepository = shareWith(otherRepository);
 				}
 
 				policyRepositories.put(zoneName, policyRepository);
 			}
-		} else {
-			for (RangerPolicyDelta delta : deltas) {
-				if (servicePolicies.getServiceDef().getName().equals(delta.getServiceType())) {
-					defaultZoneDeltas.add(delta);
-				} else {
-					defaultZoneDeltasForTagPolicies.add(delta);
-				}
+		}
+
+		List<RangerPolicyDelta> unzonedDeltas = servicePolicies.getPolicyDeltas();
+
+		for (RangerPolicyDelta delta : unzonedDeltas) {
+			if (servicePolicies.getServiceDef().getName().equals(delta.getServiceType())) {
+				defaultZoneDeltas.add(delta);
+			} else {
+				defaultZoneDeltasForTagPolicies.add(delta);
 			}
 		}
 
 		if (other.policyRepository != null && CollectionUtils.isNotEmpty(defaultZoneDeltas)) {
-			this.policyRepository      = new RangerPolicyRepository(other.policyRepository, defaultZoneDeltas, policyVersion);
+			this.policyRepository = new RangerPolicyRepository(other.policyRepository, defaultZoneDeltas, policyVersion);
 		} else {
-			this.policyRepository = other.policyRepository;
-			other.isPolicyRepositoryShared = true;
+			this.policyRepository = shareWith(other.policyRepository);
 		}
-		if (servicePolicies.getTagPolicies() == null) {
-			this.tagPolicyRepository = null;
-			if (other.tagPolicyRepository != null) {
-				other.isTagPolicyRepositoryShared = false;
-			}
-		} else {
-			if (CollectionUtils.isNotEmpty(defaultZoneDeltasForTagPolicies)) {
-				if (other.tagPolicyRepository != null) {
-					this.tagPolicyRepository = new RangerPolicyRepository(other.tagPolicyRepository, defaultZoneDeltasForTagPolicies, policyVersion);
-				} else {
-					// Only creates are expected
-					List<RangerPolicy> tagPolicies = new ArrayList<>();
-					for (RangerPolicyDelta delta : defaultZoneDeltasForTagPolicies) {
-						if (delta.getChangeType() == RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE) {
-							tagPolicies.add(delta.getPolicy());
-						} else {
-							LOG.warn("Expected changeType:[" + RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE + "], found policy-change-delta:[" + delta + "]");
-						}
+
+		if (servicePolicies.getTagPolicies() != null && CollectionUtils.isNotEmpty(defaultZoneDeltasForTagPolicies)) {
+			if (other.tagPolicyRepository == null) {
+				// Only creates are expected
+				List<RangerPolicy> tagPolicies = new ArrayList<>();
+				for (RangerPolicyDelta delta : defaultZoneDeltasForTagPolicies) {
+					if (delta.getChangeType() == RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE) {
+						tagPolicies.add(delta.getPolicy());
+					} else {
+						LOG.warn("Expected changeType:[" + RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE + "], found policy-change-delta:[" + delta + "]");
 					}
-					servicePolicies.getTagPolicies().setPolicies(tagPolicies);
-					this.tagPolicyRepository = new RangerPolicyRepository(other.policyRepository.getAppId(), servicePolicies.getTagPolicies(), other.policyRepository.getOptions(), this.pluginContext, servicePolicies.getServiceDef(), servicePolicies.getServiceName());
 				}
+				servicePolicies.getTagPolicies().setPolicies(tagPolicies);
+				this.tagPolicyRepository = new RangerPolicyRepository(other.policyRepository.getAppId(), servicePolicies.getTagPolicies(), other.policyRepository.getOptions(), this.pluginContext, servicePolicies.getServiceDef(), servicePolicies.getServiceName());
 			} else {
-				this.tagPolicyRepository = other.tagPolicyRepository;
-				other.isTagPolicyRepositoryShared = true;
+				this.tagPolicyRepository = new RangerPolicyRepository(other.tagPolicyRepository, defaultZoneDeltasForTagPolicies, policyVersion);
 			}
+		} else {
+			this.tagPolicyRepository = shareWith(other.tagPolicyRepository);
 		}
 
 		List<RangerContextEnricher> tmpList;
+
 		List<RangerContextEnricher> tagContextEnrichers = tagPolicyRepository == null ? null :tagPolicyRepository.getContextEnrichers();
 		List<RangerContextEnricher> resourceContextEnrichers = policyRepository.getContextEnrichers();
 
@@ -213,15 +206,16 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 		}
 		this.allContextEnrichers = tmpList;
 
-		// Initialize role-related information
-		userRoleMapping = MapUtils.isNotEmpty(servicePolicies.getUserRoles()) ? servicePolicies.getUserRoles() : null;
-		groupRoleMapping = MapUtils.isNotEmpty(servicePolicies.getGroupRoles()) ? servicePolicies.getGroupRoles() : null;
+                // Initialize role-related information
+                userRoleMapping = MapUtils.isNotEmpty(servicePolicies.getUserRoles()) ? servicePolicies.getUserRoles() : null;
+                groupRoleMapping = MapUtils.isNotEmpty(servicePolicies.getGroupRoles()) ? servicePolicies.getGroupRoles() : null;
 
 		reorderPolicyEvaluators();
 
 	}
 
 	public RangerPolicyEngineImpl(String appId, ServicePolicies servicePolicies, RangerPolicyEngineOptions options, RangerPluginContext rangerPluginContext) {
+
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("==> RangerPolicyEngineImpl(" + appId + ", " + servicePolicies + ", " + options + ", " + rangerPluginContext + ")");
 		}
@@ -280,7 +274,6 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 				LOG.debug("RangerPolicyEngineImpl : Building tag-policy-repository for tag-service " + tagPolicies.getServiceName());
 			}
 			tagPolicyRepository = new RangerPolicyRepository(appId, tagPolicies, options, this.pluginContext, servicePolicies.getServiceDef(), servicePolicies.getServiceName());
-
 		} else {
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("RangerPolicyEngineImpl : No tag-policy-repository for service " + servicePolicies.getServiceName());
@@ -312,9 +305,9 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 			}
 		}
 
-		// Initialize role-related information
-		userRoleMapping = MapUtils.isNotEmpty(servicePolicies.getUserRoles()) ? servicePolicies.getUserRoles() : null;
-		groupRoleMapping = MapUtils.isNotEmpty(servicePolicies.getGroupRoles()) ? servicePolicies.getGroupRoles() : null;
+                // Initialize role-related information
+                userRoleMapping = MapUtils.isNotEmpty(servicePolicies.getUserRoles()) ? servicePolicies.getUserRoles() : null;
+                groupRoleMapping = MapUtils.isNotEmpty(servicePolicies.getGroupRoles()) ? servicePolicies.getGroupRoles() : null;
 
 		RangerPerfTracer.log(perf);
 
@@ -341,9 +334,31 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 		if(RangerPerfTracer.isPerfTraceEnabled(PERF_POLICYENGINE_INIT_LOG)) {
 			perf = RangerPerfTracer.getPerfTracer(PERF_POLICYENGINE_INIT_LOG, "RangerPolicyEngine.cloneWithDelta()");
 		}
+
 		RangerServiceDef serviceDef = this.getServiceDef();
 		String serviceType = (serviceDef != null) ? serviceDef.getName() : "";
-		if (CollectionUtils.isNotEmpty(servicePolicies.getPolicyDeltas()) && RangerPolicyDeltaUtil.isValidDeltas(servicePolicies.getPolicyDeltas(), serviceType)) {
+
+		boolean isValidDeltas = false;
+
+		if (CollectionUtils.isNotEmpty(servicePolicies.getPolicyDeltas()) || MapUtils.isNotEmpty(servicePolicies.getSecurityZones())) {
+			isValidDeltas = CollectionUtils.isEmpty(servicePolicies.getPolicyDeltas()) || RangerPolicyDeltaUtil.isValidDeltas(servicePolicies.getPolicyDeltas(), serviceType);
+
+			if (isValidDeltas) {
+				if (MapUtils.isNotEmpty(servicePolicies.getSecurityZones())) {
+					for (Map.Entry<String, ServicePolicies.SecurityZoneInfo> entry : servicePolicies.getSecurityZones().entrySet()) {
+						if (!RangerPolicyDeltaUtil.isValidDeltas(entry.getValue().getPolicyDeltas(), serviceType)) {
+							if (LOG.isDebugEnabled()) {
+								LOG.debug("Invalid policy-deltas for security zone:[" + entry.getKey() + "]");
+							}
+							isValidDeltas = false;
+							break;
+						}
+					}
+				}
+			}
+		}
+
+		if (isValidDeltas) {
 			ret = new RangerPolicyEngineImpl(this, servicePolicies);
 		} else {
 			ret = null;
@@ -430,11 +445,11 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 
 		RangerAccessRequestUtil.setCurrentUserInContext(request.getContext(), request.getUser());
 
-		Set<String> roles = getRolesFromUserAndGroups(request.getUser(), request.getUserGroups());
+                Set<String> roles = getRolesFromUserAndGroups(request.getUser(), request.getUserGroups());
 
-		if (CollectionUtils.isNotEmpty(roles)) {
-			RangerAccessRequestUtil.setCurrentUserRolesInContext(request.getContext(), roles);
-		}
+                if (CollectionUtils.isNotEmpty(roles)) {
+                        RangerAccessRequestUtil.setCurrentUserRolesInContext(request.getContext(), roles);
+                }
 
 		String owner = request.getResource() != null ? request.getResource().getOwnerUser() : null;
 
@@ -461,6 +476,7 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 
 		}
 
+
 		if(LOG.isDebugEnabled()) {
 			LOG.debug("<== RangerPolicyEngineImpl.preProcess(" + request + ")");
 		}
@@ -693,22 +709,22 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 						}
 					}
 
-					for (Map.Entry<String, Map<String, PolicyACLSummary.AccessResult>> roleAccessInfo : aclSummary.getRolesAccessInfo().entrySet()) {
-						final String roleName = roleAccessInfo.getKey();
-
-						for (Map.Entry<String, PolicyACLSummary.AccessResult> accessInfo : roleAccessInfo.getValue().entrySet()) {
-							if (isConditional) {
-								accessResult = ACCESS_CONDITIONAL;
-							} else {
-								accessResult = accessInfo.getValue().getResult();
-								if (accessResult.equals(RangerPolicyEvaluator.ACCESS_UNDETERMINED)) {
-									accessResult = RangerPolicyEvaluator.ACCESS_DENIED;
-								}
-							}
-							RangerPolicy policy = evaluator.getPolicy();
-							ret.setRoleAccessInfo(roleName, accessInfo.getKey(), accessResult, policy);
-						}
-					}
+                                        for (Map.Entry<String, Map<String, PolicyACLSummary.AccessResult>> roleAccessInfo : aclSummary.getRolesAccessInfo().entrySet()) {
+                                                final String roleName = roleAccessInfo.getKey();
+
+                                                for (Map.Entry<String, PolicyACLSummary.AccessResult> accessInfo : roleAccessInfo.getValue().entrySet()) {
+                                                        if (isConditional) {
+                                                                accessResult = ACCESS_CONDITIONAL;
+                                                        } else {
+                                                                accessResult = accessInfo.getValue().getResult();
+                                                                if (accessResult.equals(RangerPolicyEvaluator.ACCESS_UNDETERMINED)) {
+                                                                        accessResult = RangerPolicyEvaluator.ACCESS_DENIED;
+                                                                }
+                                                        }
+                                                        RangerPolicy policy = evaluator.getPolicy();
+                                                        ret.setRoleAccessInfo(roleName, accessInfo.getKey(), accessResult, policy);
+                                                }
+                                        }
 				}
 			}
 			ret.finalizeAcls();
@@ -731,13 +747,20 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 			LOG.debug("==> RangerPolicyEngineImpl.preCleanup()");
 		}
 
-		if (policyRepository != null && !isPolicyRepositoryShared) {
+		if (policyRepository != null) {
 			policyRepository.preCleanup();
 		}
-		if (tagPolicyRepository != null && !isTagPolicyRepositoryShared) {
+
+		if (tagPolicyRepository != null) {
 			tagPolicyRepository.preCleanup();
 		}
 
+		if (MapUtils.isNotEmpty(this.policyRepositories)) {
+			for (Map.Entry<String, RangerPolicyRepository> entry : this.policyRepositories.entrySet()) {
+				entry.getValue().preCleanup();
+			}
+		}
+
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("<== RangerPolicyEngineImpl.preCleanup() : result=" + ret);
 		}
@@ -759,13 +782,20 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 		}
 		preCleanup();
 
-		if (policyRepository != null && !isPolicyRepositoryShared) {
+		if (policyRepository != null) {
 			policyRepository.cleanup();
 		}
-		if (tagPolicyRepository != null && !isTagPolicyRepositoryShared) {
+
+		if (tagPolicyRepository != null) {
 			tagPolicyRepository.cleanup();
 		}
 
+		if (MapUtils.isNotEmpty(this.policyRepositories)) {
+			for (Map.Entry<String, RangerPolicyRepository> entry : this.policyRepositories.entrySet()) {
+				entry.getValue().cleanup();
+			}
+		}
+
 		RangerPerfTracer.log(perf);
 
 		if (LOG.isDebugEnabled()) {
@@ -857,11 +887,11 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 			matchedRepositories.add(this.policyRepository);
 		}
 
-		Set<String> roles = getRolesFromUserAndGroups(user, userGroups);
+                Set<String> roles = getRolesFromUserAndGroups(user, userGroups);
 
 		for (RangerPolicyRepository policyRepository : matchedRepositories) {
 			for (RangerPolicyEvaluator evaluator : policyRepository.getLikelyMatchPolicyEvaluators(resource, RangerPolicy.POLICY_TYPE_ACCESS)) {
-				ret = evaluator.isAccessAllowed(resource, user, userGroups, roles, accessType);
+                                ret = evaluator.isAccessAllowed(resource, user, userGroups, roles, accessType);
 
 				if (ret) {
 					break;
@@ -891,14 +921,14 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 			LOG.debug("==> RangerPolicyEngineImpl.isAccessAllowed(" + policy.getId() + ", " + user + ", " + userGroups + ", " + accessType + ")");
 		}
 
-		boolean ret = isAccessAllowed(policy, user, userGroups, null, accessType);
+                boolean ret = isAccessAllowed(policy, user, userGroups, null, accessType);
 
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerPolicyEngineImpl.isAccessAllowed(" + policy.getId() + ", " + user + ", " + userGroups + ", " + accessType + ") : " + ret);
-		}
+                if (LOG.isDebugEnabled()) {
+                        LOG.debug("<== RangerPolicyEngineImpl.isAccessAllowed(" + policy.getId() + ", " + user + ", " + userGroups + ", " + accessType + ") : " + ret);
+                }
 
-		return ret;
-	}
+                return ret;
+        }
 
 	/*
 	 * This API is used by ranger-admin
@@ -915,7 +945,7 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 		RangerPerfTracer perf = null;
 
 		if(RangerPerfTracer.isPerfTraceEnabled(PERF_POLICYENGINE_REQUEST_LOG)) {
-			perf = RangerPerfTracer.getPerfTracer(PERF_POLICYENGINE_REQUEST_LOG, "RangerPolicyEngine.isAccessAllowed(user=" + user + "," + userGroups + ", roles=" + roles + ",accessType=" + accessType + ")");
+                        perf = RangerPerfTracer.getPerfTracer(PERF_POLICYENGINE_REQUEST_LOG, "RangerPolicyEngine.isAccessAllowed(user=" + user + "," + userGroups + ", roles=" + roles + ",accessType=" + accessType + ")");
 		}
 
 		String zoneName = trieMap == null ? null : policy.getZoneName();
@@ -941,7 +971,7 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 
 		for (RangerPolicyRepository policyRepository : matchedRepositories) {
 			for (RangerPolicyEvaluator evaluator : policyRepository.getPolicyEvaluators()) {
-				ret = evaluator.isAccessAllowed(policy, user, userGroups, roles, accessType);
+                                ret = evaluator.isAccessAllowed(policy, user, userGroups, roles, accessType);
 
 				if (ret) {
 					break;
@@ -955,7 +985,7 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 		RangerPerfTracer.log(perf);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerPolicyEngineImpl.isAccessAllowed(" + policy.getId() + ", " + user + ", " + userGroups + ", " + roles + ", " + accessType + "): " + ret);
+                        LOG.debug("<== RangerPolicyEngineImpl.isAccessAllowed(" + policy.getId() + ", " + user + ", " + userGroups + ", " + roles + ", " + accessType + "): " + ret);
 		}
 
 		return ret;
@@ -1295,33 +1325,33 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 		return ret;
 	}
 
-	@Override
-	public Set<String> getRolesFromUserAndGroups(String user, Set<String> groups) {
-		Set<String> allRoles = new HashSet<>();
-		if (MapUtils.isNotEmpty(userRoleMapping) && StringUtils.isNotEmpty(user)) {
-			Set<String> userRoles = userRoleMapping.get(user);
-			if (CollectionUtils.isNotEmpty(userRoles)) {
-				allRoles.addAll(userRoles);
-			}
-		}
+        @Override
+        public Set<String> getRolesFromUserAndGroups(String user, Set<String> groups) {
+                Set<String> allRoles = new HashSet<>();
+                if (MapUtils.isNotEmpty(userRoleMapping) && StringUtils.isNotEmpty(user)) {
+                        Set<String> userRoles = userRoleMapping.get(user);
+                        if (CollectionUtils.isNotEmpty(userRoles)) {
+                                allRoles.addAll(userRoles);
+                        }
+                }
 
-		if (MapUtils.isNotEmpty(groupRoleMapping)) {
-			if (CollectionUtils.isNotEmpty(groups)) {
-				for (String group : groups) {
-					Set<String> groupRoles = groupRoleMapping.get(group);
-					if (CollectionUtils.isNotEmpty(groupRoles)) {
-						allRoles.addAll(groupRoles);
-					}
-				}
-			}
-			Set<String> publicGroupRoles = groupRoleMapping.get(RangerPolicyEngine.GROUP_PUBLIC);
-			if (CollectionUtils.isNotEmpty(publicGroupRoles)) {
-				allRoles.addAll(publicGroupRoles);
-			}
-		}
+                if (MapUtils.isNotEmpty(groupRoleMapping)) {
+                        if (CollectionUtils.isNotEmpty(groups)) {
+                                for (String group : groups) {
+                                        Set<String> groupRoles = groupRoleMapping.get(group);
+                                        if (CollectionUtils.isNotEmpty(groupRoles)) {
+                                                allRoles.addAll(groupRoles);
+                                        }
+                                }
+                        }
+                        Set<String> publicGroupRoles = groupRoleMapping.get(RangerPolicyEngine.GROUP_PUBLIC);
+                        if (CollectionUtils.isNotEmpty(publicGroupRoles)) {
+                                allRoles.addAll(publicGroupRoles);
+                        }
+                }
 
-		return allRoles;
-	}
+                return allRoles;
+        }
 
 	public List<RangerPolicy> getResourcePolicies(String zoneName) {
 		RangerPolicyRepository zoneResourceRepository = policyRepositories.get(zoneName);
@@ -1730,6 +1760,57 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
         return ret;
     }
 
+    /**
+     * Compares the default, tag and per-zone repositories and the zone tries with those of {@code other}.
+     * @return true only if all of them match; a trie mismatch or a zone missing in {@code other} yields false
+     */
+    public boolean compare(RangerPolicyEngineImpl other) {
+		boolean ret;
+
+		if (policyRepository != null && other.policyRepository != null) {
+			ret = policyRepository.compare(other.policyRepository);
+		} else {
+			ret = policyRepository == other.policyRepository;
+		}
+		if (ret) {
+			if (tagPolicyRepository != null && other.tagPolicyRepository != null) {
+				ret = tagPolicyRepository.compare(other.tagPolicyRepository);
+			} else {
+				ret = tagPolicyRepository == other.tagPolicyRepository;
+			}
+		}
+		if (ret) {
+			if (trieMap != null && other.trieMap != null) {
+				ret = trieMap.size() == other.trieMap.size();
+				if (ret) {
+					for (Map.Entry<String, RangerResourceTrie> entry : trieMap.entrySet()) {
+						RangerResourceTrie otherTrie = other.trieMap.get(entry.getKey());
+						ret = otherTrie != null && entry.getValue().compareSubtree(otherTrie);
+						if (!ret) {
+							break;
+						}
+					}
+				}
+			} else {
+				ret = trieMap == other.trieMap;
+			}
+		}
+		if (ret) {
+			ret = policyRepositories.size() == other.policyRepositories.size();
+			if (ret) {
+				for (Map.Entry<String, RangerPolicyRepository> entry : policyRepositories.entrySet()) {
+					RangerPolicyRepository otherRepository = other.policyRepositories.get(entry.getKey());
+					ret = otherRepository != null && entry.getValue().compare(otherRepository);
+					if (!ret) {
+						break;
+					}
+				}
+			}
+		}
+
+		return ret;
+	}
+
     private String getMatchedZoneName(RangerAccessResource accessResource) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> RangerPolicyEngineImpl.getMatchedZoneName(" + accessResource + ")");
@@ -1897,4 +1978,11 @@ public class RangerPolicyEngineImpl implements RangerPolicyEngine {
 
         return ret;
     }
+
+	private static RangerPolicyRepository shareWith(RangerPolicyRepository other) {
+		if (other != null) {
+			other.setIsShared(true);
+		}
+		return other;
+	}
 }
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyRepository.java b/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyRepository.java
index aec325c..ae88c73 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyRepository.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/policyengine/RangerPolicyRepository.java
@@ -101,6 +101,7 @@ class RangerPolicyRepository {
     private final Map<String, RangerResourceTrie> rowFilterResourceTrie;
 
     private boolean                           isContextEnrichersShared = false;
+    private boolean                           isShared = false;
 
     RangerPolicyRepository(final RangerPolicyRepository other, final List<RangerPolicyDelta> deltas, long policyVersion) {
 
@@ -253,6 +254,7 @@ class RangerPolicyRepository {
 
         this.policyVersion = policyVersion;
 
+        other.isShared      = false;
     }
 
     RangerPolicyRepository(String appId, ServicePolicies servicePolicies, RangerPolicyEngineOptions options, RangerPluginContext pluginContext) {
@@ -303,7 +305,7 @@ class RangerPolicyRepository {
             this.accessAuditCache = null;
         }
 
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
             LOG.debug("RangerPolicyRepository : building policy-repository for service[" + serviceName + "], and zone:[" + zoneName + "] with auditMode[" + auditModeEnum + "]");
         }
 
@@ -315,7 +317,7 @@ class RangerPolicyRepository {
             this.contextEnrichers = null;
         }
 
-        if(options.disableTrieLookupPrefilter) {
+        if (options.disableTrieLookupPrefilter) {
             policyResourceTrie    = null;
             dataMaskResourceTrie  = null;
             rowFilterResourceTrie = null;
@@ -357,7 +359,7 @@ class RangerPolicyRepository {
 
         this.accessAuditCache = null;
 
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
             LOG.debug("RangerPolicyRepository : building tag-policy-repository for tag service:[" + serviceName +"], with auditMode[" + auditModeEnum +"]");
         }
 
@@ -369,7 +371,7 @@ class RangerPolicyRepository {
             this.contextEnrichers = null;
         }
 
-        if(options.disableTrieLookupPrefilter) {
+        if (options.disableTrieLookupPrefilter) {
             policyResourceTrie    = null;
             dataMaskResourceTrie  = null;
             rowFilterResourceTrie = null;
@@ -390,21 +392,25 @@ class RangerPolicyRepository {
     }
 
     boolean preCleanup() {
-        if (CollectionUtils.isNotEmpty(this.contextEnrichers) && !isContextEnrichersShared) {
-            for (RangerContextEnricher enricher : this.contextEnrichers) {
-                enricher.preCleanup();
+        if (!isShared) { // repositories flagged through setIsShared(true) are left untouched
+            if (CollectionUtils.isNotEmpty(this.contextEnrichers) && !isContextEnrichersShared) {
+                for (RangerContextEnricher enricher : this.contextEnrichers) {
+                    enricher.preCleanup();
+                }
+                return true; // true only when preCleanup() was invoked on this repository's enrichers
             }
-            return true;
         }
         return false;
     }
 
     void cleanup() {
-        preCleanup();
+        if (!isShared) { // skip all cleanup while the repository is flagged as shared
+            preCleanup(); // preCleanup() re-checks isShared itself; its return value is ignored here
 
-        if (CollectionUtils.isNotEmpty(this.contextEnrichers) && !isContextEnrichersShared) {
-            for (RangerContextEnricher enricher : this.contextEnrichers) {
-                enricher.cleanup();
+            if (CollectionUtils.isNotEmpty(this.contextEnrichers) && !isContextEnrichersShared) {
+                for (RangerContextEnricher enricher : this.contextEnrichers) {
+                    enricher.cleanup();
+                }
             }
         }
     }
@@ -413,8 +419,7 @@ class RangerPolicyRepository {
     protected void finalize() throws Throwable {
         try {
             cleanup();
-        }
-        finally {
+        } finally { // always chain to super.finalize(), even when cleanup() throws
             super.finalize();
         }
     }
@@ -424,15 +429,15 @@ class RangerPolicyRepository {
             LOG.debug("==> reorderEvaluators()");
         }
 
-        if(policyResourceTrie == null) {
+        if (policyResourceTrie == null) {
             policyEvaluators = getReorderedPolicyEvaluators(policyEvaluators);
         }
 
-        if(dataMaskResourceTrie == null) {
+        if (dataMaskResourceTrie == null) {
             dataMaskPolicyEvaluators = getReorderedPolicyEvaluators(dataMaskPolicyEvaluators);
         }
 
-        if(rowFilterResourceTrie == null) {
+        if (rowFilterResourceTrie == null) {
             rowFilterPolicyEvaluators = getReorderedPolicyEvaluators(rowFilterPolicyEvaluators);
         }
 
@@ -495,7 +500,7 @@ class RangerPolicyRepository {
     List<RangerContextEnricher> getContextEnrichers() { return contextEnrichers; }
 
     List<RangerPolicyEvaluator> getPolicyEvaluators(int policyType) {
-        switch(policyType) {
+        switch (policyType) {
             case RangerPolicy.POLICY_TYPE_ACCESS:
                 return getPolicyEvaluators();
             case RangerPolicy.POLICY_TYPE_DATAMASK:
@@ -550,7 +555,7 @@ class RangerPolicyRepository {
             }
 
             if (CollectionUtils.isNotEmpty(ret)) {
-                switch(policyType) {
+                switch (policyType) {
                     case RangerPolicy.POLICY_TYPE_ACCESS:
                         Collections.sort(ret, PolicyEvaluatorForTag.EVAL_ORDER_COMPARATOR);
                         break;
@@ -582,7 +587,7 @@ class RangerPolicyRepository {
     }
 
     List<RangerPolicyEvaluator> getLikelyMatchPolicyEvaluators(RangerAccessResource resource, int policyType) {
-        switch(policyType) {
+        switch (policyType) {
             case RangerPolicy.POLICY_TYPE_ACCESS:
                 return getLikelyMatchAccessPolicyEvaluators(resource);
             case RangerPolicy.POLICY_TYPE_DATAMASK:
@@ -597,7 +602,16 @@ class RangerPolicyRepository {
 
     Map<Long, RangerPolicyEvaluator> getPolicyEvaluatorsMap() { return policyEvaluatorsMap; }
 
-    RangerPolicyEvaluator getPolicyEvaluator(Long id) { return policyEvaluatorsMap.get(id); }
+    RangerPolicyEvaluator getPolicyEvaluator(Long id) {
+        return policyEvaluatorsMap.get(id); // plain map lookup keyed by id; no fallback is attempted
+    }
+
+    void setIsShared(boolean isShared) { // while the flag is set, preCleanup() and cleanup() skip this repository
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Setting shared flag from " + this.isShared + " to " + isShared);
+        }
+        this.isShared = isShared;
+    }
 
     private List<RangerPolicyEvaluator> getLikelyMatchAccessPolicyEvaluators(RangerAccessResource resource) {
        String resourceStr = resource == null ? null : resource.getAsString();
@@ -1370,6 +1384,36 @@ class RangerPolicyRepository {
         return ret;
     }
 
+    /** Returns true when other is non-null and its access, data-mask and row-filter tries all match this repository's. */
+    public boolean compare(RangerPolicyRepository other) {
+        return other != null && compareTrie(RangerPolicy.POLICY_TYPE_ACCESS, other) &&
+                compareTrie(RangerPolicy.POLICY_TYPE_DATAMASK, other) && compareTrie(RangerPolicy.POLICY_TYPE_ROWFILTER, other);
+    }
+
+    // Compares the resource tries of one policy type; two absent (null) trie maps are treated as equal.
+    private boolean compareTrie(final int policyType, RangerPolicyRepository other) {
+        Map<String, RangerResourceTrie> myTrie    = getTrie(policyType);
+        Map<String, RangerResourceTrie> otherTrie = other.getTrie(policyType);
+
+        // getTrie() can presumably return null (the trie fields are nulled when options.disableTrieLookupPrefilter is set)
+        if (myTrie == null || otherTrie == null) {
+            return myTrie == otherTrie;
+        }
+
+        boolean ret = myTrie.size() == otherTrie.size();
+
+        if (ret) {
+            for (Map.Entry<String, RangerResourceTrie> entry : myTrie.entrySet()) {
+                RangerResourceTrie otherResourceTrie = otherTrie.get(entry.getKey());
+                ret = otherResourceTrie != null && entry.getValue().compareSubtree(otherResourceTrie);
+                if (!ret) {
+                    break;
+                }
+            }
+        }
+        return ret;
+    }
+
     private StringBuilder toString(StringBuilder sb) {
 
         sb.append("RangerPolicyRepository={");
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/policyresourcematcher/RangerDefaultPolicyResourceMatcher.java b/agents-common/src/main/java/org/apache/ranger/plugin/policyresourcematcher/RangerDefaultPolicyResourceMatcher.java
index 633ec96..d2fc5d3 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/policyresourcematcher/RangerDefaultPolicyResourceMatcher.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/policyresourcematcher/RangerDefaultPolicyResourceMatcher.java
@@ -735,7 +735,7 @@ public class RangerDefaultPolicyResourceMatcher implements RangerPolicyResourceM
                 break;
             }
             case SELF_OR_ANCESTOR: {
-                ret = matchType == MatchType.SELF || matchType == MatchType.ANCESTOR;
+                ret = matchType == MatchType.SELF || matchType == MatchType.ANCESTOR || matchType == MatchType.ANCESTOR_WITH_WILDCARDS;
                 break;
             }
             case DESCENDANT: {
@@ -743,7 +743,7 @@ public class RangerDefaultPolicyResourceMatcher implements RangerPolicyResourceM
                 break;
             }
             case ANCESTOR: {
-                ret = matchType == MatchType.ANCESTOR;
+                ret = matchType == MatchType.ANCESTOR || matchType == MatchType.ANCESTOR_WITH_WILDCARDS;
                 break;
             }
             default: {
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java b/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
index 8de0329..cf833b7 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/service/RangerBasePlugin.java
@@ -320,7 +320,7 @@ public class RangerBasePlugin {
 					if (LOG.isDebugEnabled()) {
 						LOG.debug("policy-deltas are not null");
 					}
-					if (CollectionUtils.isNotEmpty(policies.getPolicyDeltas())) {
+					if (CollectionUtils.isNotEmpty(policies.getPolicyDeltas()) || MapUtils.isNotEmpty(policies.getSecurityZones())) {
 						if (LOG.isDebugEnabled()) {
 							LOG.debug("Non empty policy-deltas found. Cloning engine using policy-deltas");
 						}
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/store/TagStore.java b/agents-common/src/main/java/org/apache/ranger/plugin/store/TagStore.java
index fe4b278..795289b 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/store/TagStore.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/store/TagStore.java
@@ -130,10 +130,13 @@ public interface TagStore {
     PList<RangerTagResourceMap> getPaginatedTagResourceMaps(SearchFilter filter) throws Exception;
 
 
-    ServiceTags getServiceTagsIfUpdated(String serviceName, Long lastKnownVersion) throws Exception;
-    ServiceTags getServiceTags(String serviceName) throws Exception;
+    ServiceTags getServiceTagsIfUpdated(String serviceName, Long lastKnownVersion, boolean needsBackwardCompatibility) throws Exception;
+    ServiceTags getServiceTags(String serviceName, Long lastKnownVersion) throws Exception;
+    ServiceTags getServiceTagsDelta(String serviceName, Long lastKnownVersion) throws Exception;
+
 
     Long getTagVersion(String serviceName);
 
     void deleteAllTagObjectsForService(String serviceName) throws Exception;
+
 }
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/store/TagValidator.java b/agents-common/src/main/java/org/apache/ranger/plugin/store/TagValidator.java
index c334e39..08b1e45 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/store/TagValidator.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/store/TagValidator.java
@@ -77,6 +77,10 @@ public class TagValidator {
 			throw new Exception("Attempt to update nonexistant tag, id=" + id);
 		}
 
+		if (!StringUtils.equals(tag.getType(), existing.getType())) {
+			throw new Exception("Attempt to change the tag-type");
+		}
+
 		tag.setId(existing.getId());
 		tag.setGuid(existing.getGuid());
 	}
@@ -91,6 +95,10 @@ public class TagValidator {
 			throw new Exception("Attempt to update nonexistent tag, guid=" + guid);
 		}
 
+		if (!StringUtils.equals(tag.getType(), existing.getType())) {
+			throw new Exception("Attempt to change the tag-type");
+		}
+
 		tag.setId(existing.getId());
 		tag.setGuid(existing.getGuid());
 	}
@@ -139,9 +147,10 @@ public class TagValidator {
 			ret = tagStore.getServiceResourceByGuid(guid);
 		}
 
-		RangerServiceResourceSignature serializer = new RangerServiceResourceSignature(resource);
-
-		resource.setResourceSignature(serializer.getSignature());
+		if (ret == null) {
+			RangerServiceResourceSignature serializer = new RangerServiceResourceSignature(resource);
+			resource.setResourceSignature(serializer.getSignature());
+		}
 
 		return ret;
 	}
@@ -160,6 +169,10 @@ public class TagValidator {
 			throw new Exception("Attempt to update nonexistent resource, id=" + id);
 		}
 
+		if (!StringUtils.equals(existing.getServiceName(), resource.getServiceName())) {
+			throw new Exception("Attempt to change service-name for existing service-resource");
+		}
+
 		RangerServiceResourceSignature serializer = new RangerServiceResourceSignature(resource);
 
 		resource.setId(existing.getId());
@@ -177,6 +190,10 @@ public class TagValidator {
 			throw new Exception("Attempt to update nonexistent resource, guid=" + guid);
 		}
 
+		if (!StringUtils.equals(existing.getServiceName(), resource.getServiceName())) {
+			throw new Exception("Attempt to change service-name for existing service-resource");
+		}
+
 		RangerServiceResourceSignature serializer = new RangerServiceResourceSignature(resource);
 
 		resource.setId(existing.getId());
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerRESTUtils.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerRESTUtils.java
index 310f69d..bdb77e7 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerRESTUtils.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerRESTUtils.java
@@ -72,6 +72,7 @@ public class RangerRESTUtils {
 	
 	public static final String REST_PARAM_CLUSTER_NAME   = "clusterName";
 	public static final String REST_PARAM_SUPPORTS_POLICY_DELTAS   = "supportsPolicyDeltas";
+	public static final String REST_PARAM_SUPPORTS_TAG_DELTAS      = "supportsTagDeltas";
 
 	public static final String REST_PARAM_ZONE_NAME		 = "zoneName";
 	public static final String REST_PARAM_EXEC_USER      = "execUser";
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerResourceTrie.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerResourceTrie.java
index 605a74a..0cba882 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerResourceTrie.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerResourceTrie.java
@@ -220,6 +220,82 @@ public class RangerResourceTrie<T extends RangerPolicyResourceEvaluator> {
         }
     }
 
+    /** Returns true only when other is non-null and both tries have matching node structure and evaluator ids. */
+    public boolean compareSubtree(RangerResourceTrie<T> other) {
+        final boolean ret;
+
+        if (other == null) {
+            // callers pass the result of a map lookup, which is null when the key is absent on the other side
+            ret = false;
+        } else if (this.root == null || other.root == null) {
+            ret = this.root == other.root;
+        } else {
+            // the list only collects mismatching nodes for the recursive helper; it is not reported further
+            ret = compareSubtree(this.root, other.root, new ArrayList<TrieNode<T>>());
+        }
+        return ret;
+    }
+
+    // Recursive node comparison: same string, same evaluator ids and identically keyed, matching children.
+    // Every node along a mismatching path is appended to misMatched.
+    private boolean compareSubtree(TrieNode<T> me, TrieNode<T> other, List<TrieNode<T>> misMatched) {
+        boolean isSame = false;
+        if (StringUtils.equals(me.getStr(), other.getStr())) {
+            final Map<Character, TrieNode<T>> mine   = me.getChildren();
+            final Map<Character, TrieNode<T>> theirs = other.getChildren();
+
+            isSame = mine.size() == theirs.size()
+                    && compareLists(me.getEvaluators(), other.getEvaluators())
+                    && compareLists(me.getWildcardEvaluators(), other.getWildcardEvaluators())
+                    && mine.keySet().size() == theirs.keySet().size();
+
+            if (isSame) {
+                // child counts are equal, so finding a matching counterpart for each of my children suffices
+                for (Map.Entry<Character, TrieNode<T>> child : mine.entrySet()) {
+                    final TrieNode<T> counterpart = theirs.get(child.getKey());
+                    isSame = counterpart != null && compareSubtree(child.getValue(), counterpart, misMatched);
+                    if (!isSame) {
+                        break;
+                    }
+                }
+            }
+        }
+
+        if (!isSame) {
+            misMatched.add(me);
+        }
+
+        return isSame;
+    }
+
+    // Two evaluator lists match when both are null, or both have the same size and the same ids.
+    private boolean compareLists(List<? extends RangerPolicyResourceEvaluator> me, List<? extends RangerPolicyResourceEvaluator> other) {
+        if (me == null || other == null) {
+            return me == other;
+        }
+
+        if (me.size() != other.size()) {
+            return false;
+        }
+
+        final List<Long> mine   = new ArrayList<>();
+        final List<Long> theirs = new ArrayList<>();
+
+        // only the values returned by getId() are compared; their position in the lists does not matter
+        for (RangerPolicyResourceEvaluator evaluator : me) {
+            mine.add(evaluator.getId());
+        }
+        for (RangerPolicyResourceEvaluator evaluator : other) {
+            theirs.add(evaluator.getId());
+        }
+
+        return compareLongLists(mine, theirs);
+    }
+
+    private boolean compareLongLists(List<Long> me, List<Long> other) { // compareLists() has already verified that both sizes are equal
+        return me.size() == CollectionUtils.intersection(me, other).size(); // i.e. every id in me must also be present in other
+    }
+
     private TrieNode<T> copyTrieSubtree(TrieNode<T> source, List<T> parentWildcardEvaluators) {
         if (TRACE_LOG.isTraceEnabled()) {
             StringBuilder sb = new StringBuilder();
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerServiceTagsDeltaUtil.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerServiceTagsDeltaUtil.java
new file mode 100644
index 0000000..6b70b22
--- /dev/null
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/RangerServiceTagsDeltaUtil.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.plugin.util;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.plugin.model.RangerTag;
+import org.apache.ranger.plugin.model.RangerTagDef;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class RangerServiceTagsDeltaUtil {
+
+    private static final Log LOG = LogFactory.getLog(RangerServiceTagsDeltaUtil.class);
+
+    private static final Log PERF_TAGS_DELTA_LOG = RangerPerfTracer.getPerfLogger("tags.delta");
+
+    /*
+     * Merges delta into serviceTags in place (only if serviceTags is a full set and delta is a delta) and returns serviceTags.
+     * It should be possible to call applyDelta() repeatedly with the serviceTags and delta resulting from a previous call; the result should be the same.
+     */
+    static public ServiceTags applyDelta(ServiceTags serviceTags, ServiceTags delta) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> RangerServiceTagsDeltaUtil.applyDelta()");
+        }
+        RangerPerfTracer perf = null;
+
+        if(RangerPerfTracer.isPerfTraceEnabled(PERF_TAGS_DELTA_LOG)) {
+            perf = RangerPerfTracer.getPerfTracer(PERF_TAGS_DELTA_LOG, "RangerServiceTagsDeltaUtil.applyDelta()");
+        }
+        if (serviceTags != null && !serviceTags.getIsDelta() && delta != null && delta.getIsDelta()) { // preconditions: full set + delta
+            serviceTags.setServiceName(delta.getServiceName());
+            serviceTags.setTagVersion(delta.getTagVersion());
+
+            // merge - the collections fetched here are updated directly and never set back, so they are presumably serviceTags' own instances
+            Map<Long, RangerTag>        tags             = serviceTags.getTags();
+            List<RangerServiceResource> serviceResources = serviceTags.getServiceResources();
+            Map<Long, List<Long>>       resourceToTagIds = serviceTags.getResourceToTagIds();
+            boolean                     isAnyMaterialChange = false;
+
+            for (Map.Entry<Long, RangerTag> tagEntry : delta.getTags().entrySet()) {
+                if (StringUtils.isEmpty(tagEntry.getValue().getType())) { // a tag without a type in the delta marks a deletion
+                    if (null != tags.remove(tagEntry.getKey())) {
+                        isAnyMaterialChange = true;
+                    }
+                } else {
+                    tags.put(tagEntry.getKey(), tagEntry.getValue()); // add or overwrite; does not touch isAnyMaterialChange
+                }
+            }
+
+            // This could be expensive - compute time is M x N (M - number of old tagged service resources, N - number of changed service resources)
+
+            Map<Long, Long>             deletedServiceResources  = new HashMap<>(); // used as a set of ids: key and value are the same id
+            List<RangerServiceResource> addedServiceResources    = new ArrayList<>();
+
+            for (RangerServiceResource serviceResource : delta.getServiceResources()) {
+
+                boolean                         found = false;
+                Iterator<RangerServiceResource> iter  = serviceResources.iterator();
+
+                while (iter.hasNext()) {
+                    RangerServiceResource existingResource = iter.next();
+
+                    if (serviceResource.getId().equals(existingResource.getId())) {
+                        if (!StringUtils.isEmpty(serviceResource.getResourceSignature())) { // non-empty signature: add/update; empty: deletion
+                            if (!serviceResource.getResourceSignature().equals(existingResource.getResourceSignature())) { // ServiceResource changed
+                                iter.remove();
+                                existingResource.setResourceSignature(null); // the old version is queued in the delta without a signature
+                                addedServiceResources.add(existingResource);
+                                break; // NOTE(review): found stays false, so the new version is appended below - presumably intended
+                            }
+                        } else {
+                            iter.remove();
+                            deletedServiceResources.put(serviceResource.getId(), serviceResource.getId());
+                        }
+                        found = true;
+                        break;
+                    }
+                }
+                if (!found) {
+                    if (StringUtils.isNotEmpty(serviceResource.getResourceSignature())) {
+                        serviceResources.add(serviceResource);
+                        isAnyMaterialChange = true;
+                    }
+                }
+            }
+
+            for (Long deletedServiceResourceId : deletedServiceResources.keySet()) {
+                resourceToTagIds.remove(deletedServiceResourceId);
+            }
+
+            resourceToTagIds.putAll(delta.getResourceToTagIds()); // mappings from the delta overwrite existing ones for the same resource id
+
+            // Ensure that any modified service-resources are at the head of the list of service-resources in delta,
+            // so that in setServiceTags(), they get cleaned out first, and the service-resource with the new spec gets added
+
+            addedServiceResources.addAll(delta.getServiceResources());
+            delta.setServiceResources(addedServiceResources); // NOTE: this changes the caller's delta object as well
+
+            if (!isAnyMaterialChange) { // NOTE(review): the flag is only raised for removed tags and newly appended service-resources
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("No material change may have occurred because of applying this delta");
+                }
+            }
+
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Cannot apply deltas to service-tags as one of preconditions is violated. Returning received serviceTags without applying delta!!");
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== RangerServiceTagsDeltaUtil.applyDelta()");
+        }
+
+        RangerPerfTracer.log(perf);
+
+        return serviceTags; // the same instance that was passed in (may be null)
+    }
+
+    // Nulls the created/updated-by, timestamp and guid attributes of all tag-defs, tags and service-resources; null input is ignored.
+    public static void pruneUnusedAttributes(ServiceTags serviceTags) {
+        if (serviceTags == null) {
+            return;
+        }
+
+        serviceTags.setTagUpdateTime(null);
+
+        for (RangerTagDef tagDef : serviceTags.getTagDefinitions().values()) {
+            tagDef.setCreatedBy(null);
+            tagDef.setCreateTime(null);
+            tagDef.setUpdatedBy(null);
+            tagDef.setUpdateTime(null);
+            tagDef.setGuid(null);
+        }
+
+        for (RangerTag tag : serviceTags.getTags().values()) {
+            tag.setCreatedBy(null);
+            tag.setCreateTime(null);
+            tag.setUpdatedBy(null);
+            tag.setUpdateTime(null);
+            tag.setGuid(null);
+        }
+
+        for (RangerServiceResource resource : serviceTags.getServiceResources()) {
+            resource.setCreatedBy(null);
+            resource.setCreateTime(null);
+            resource.setUpdatedBy(null);
+            resource.setUpdateTime(null);
+            resource.setGuid(null);
+            resource.setServiceName(null); // service-resources additionally lose their service name
+        }
+    }
+}
diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/ServiceTags.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/ServiceTags.java
index fed3f12..9e8e0cf 100644
--- a/agents-common/src/main/java/org/apache/ranger/plugin/util/ServiceTags.java
+++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/ServiceTags.java
@@ -50,6 +50,9 @@ public class ServiceTags implements java.io.Serializable {
 	public static final String OP_DELETE        = "delete";
 	public static final String OP_REPLACE       = "replace";
 
+	public enum TagsChangeExtent { NONE, TAGS, SERVICE_RESOURCE, ALL }
+	public enum TagsChangeType { NONE, SERVICE_RESOURCE_UPDATE, TAG_UPDATE, TAG_RESOURCE_MAP_UPDATE, RANGER_ADMIN_START, INVALIDATE_TAG_DELTAS, ALL }
+
 	private String                      op = OP_ADD_OR_UPDATE;
 	private String                      serviceName;
 	private Long                        tagVersion;
@@ -58,6 +61,8 @@ public class ServiceTags implements java.io.Serializable {
 	private Map<Long, RangerTag>        tags;
 	private List<RangerServiceResource> serviceResources;
 	private Map<Long, List<Long>>       resourceToTagIds;
+	private Boolean					 	isDelta;
+	private TagsChangeExtent			tagsChangeExtent;
 
 	public ServiceTags() {
 		this(OP_ADD_OR_UPDATE, null, 0L, null, null, null, null, null);
@@ -65,6 +70,10 @@ public class ServiceTags implements java.io.Serializable {
 
 	public ServiceTags(String op, String serviceName, Long tagVersion, Date tagUpdateTime, Map<Long, RangerTagDef> tagDefinitions,
 					   Map<Long, RangerTag> tags, List<RangerServiceResource> serviceResources, Map<Long, List<Long>> resourceToTagIds) {
+		this(op, serviceName, tagVersion, tagUpdateTime, tagDefinitions, tags, serviceResources, resourceToTagIds, false, TagsChangeExtent.ALL);
+	}
+	public ServiceTags(String op, String serviceName, Long tagVersion, Date tagUpdateTime, Map<Long, RangerTagDef> tagDefinitions,
+					   Map<Long, RangerTag> tags, List<RangerServiceResource> serviceResources, Map<Long, List<Long>> resourceToTagIds, Boolean isDelta, TagsChangeExtent tagsChangeExtent) {
 		setOp(op);
 		setServiceName(serviceName);
 		setTagVersion(tagVersion);
@@ -73,6 +82,8 @@ public class ServiceTags implements java.io.Serializable {
 		setTags(tags);
 		setServiceResources(serviceResources);
 		setResourceToTagIds(resourceToTagIds);
+		setIsDelta(isDelta);
+		setTagsChangeExtent(tagsChangeExtent);
 	}
 	/**
 	 * @return the op
@@ -162,6 +173,21 @@ public class ServiceTags implements java.io.Serializable {
 		this.resourceToTagIds = resourceToTagIds == null ? new HashMap<Long, List<Long>>() : resourceToTagIds;
 	}
 
+	public Boolean getIsDelta() {
+		return isDelta == null ? Boolean.FALSE : isDelta; // an unset (null) flag is reported as FALSE, never as null
+	}
+
+	public void setIsDelta(Boolean isDelta) {
+		this.isDelta = isDelta; // stored as given, null included
+	}
+
+	public TagsChangeExtent getTagsChangeExtent() {
+		return tagsChangeExtent; // returned as stored; no default is substituted when it is null
+	}
+
+	public void setTagsChangeExtent(TagsChangeExtent tagsChangeExtent) {
+		this.tagsChangeExtent = tagsChangeExtent; // no validation or null-defaulting is applied
+	}
 	@Override
 	public String toString( ) {
 		StringBuilder sb = new StringBuilder();
@@ -177,6 +203,8 @@ public class ServiceTags implements java.io.Serializable {
 				.append("serviceName=").append(serviceName).append(", ")
 				.append("tagVersion=").append(tagVersion).append(", ")
 				.append("tagUpdateTime={").append(tagUpdateTime).append("}")
+				.append("isDelta={").append(isDelta).append("}")
+				.append("tagsChangeExtent={").append(tagsChangeExtent).append("}")
 				.append("}");
 
 		return sb;
diff --git a/agents-common/src/test/java/org/apache/ranger/plugin/policyengine/TestPolicyEngine.java b/agents-common/src/test/java/org/apache/ranger/plugin/policyengine/TestPolicyEngine.java
index 7180675..d0e0cfc 100644
--- a/agents-common/src/test/java/org/apache/ranger/plugin/policyengine/TestPolicyEngine.java
+++ b/agents-common/src/test/java/org/apache/ranger/plugin/policyengine/TestPolicyEngine.java
@@ -405,6 +405,7 @@ public class TestPolicyEngine {
 		assertTrue("invalid input: " + testName, testCase != null && testCase.serviceDef != null && testCase.policies != null && testCase.tests != null);
 
 		ServicePolicies servicePolicies = new ServicePolicies();
+		servicePolicies.setPolicyVersion(100L);
 		servicePolicies.setServiceName(testCase.serviceName);
 		servicePolicies.setServiceDef(testCase.serviceDef);
 		servicePolicies.setPolicies(testCase.policies);
@@ -460,13 +461,12 @@ public class TestPolicyEngine {
 
 		runTestCaseTests(policyEngine, policyEngineForResourceAccessInfo, testCase.serviceDef, testName, testCase.tests);
 
-		if (CollectionUtils.isNotEmpty(testCase.updatedPolicies)) {
-			servicePolicies.setPolicyDeltas(testCase.updatedPolicies);
-			servicePolicies.setPolicyVersion(-1L);
+		if (testCase.updatedPolicies != null) {
+			servicePolicies.setPolicyDeltas(testCase.updatedPolicies.policyDeltas);
+			servicePolicies.setSecurityZones(testCase.updatedPolicies.securityZones);
 			RangerPolicyEngine updatedPolicyEngine = policyEngine.cloneWithDelta(servicePolicies);
 			RangerPolicyEngine updatedPolicyEngineForResourceAccessInfo = policyEngineForResourceAccessInfo.cloneWithDelta(servicePolicies);
 			runTestCaseTests(updatedPolicyEngine, updatedPolicyEngineForResourceAccessInfo, testCase.serviceDef, testName, testCase.updatedTests);
-
 		}
 	}
 
@@ -605,7 +605,7 @@ public class TestPolicyEngine {
 		public String             auditMode;
 		public List<TestData>     tests;
 
-		public List<RangerPolicyDelta> updatedPolicies;
+		public UpdatedPolicies    updatedPolicies;
 		public List<TestData>     updatedTests;
 		
 		class TestData {
@@ -624,6 +624,11 @@ public class TestPolicyEngine {
 		}
 	}
 
+	static class UpdatedPolicies { // type of PolicyEngineTestCase-level field updatedPolicies; feeds cloneWithDelta()
+		public Map<String, ServicePolicies.SecurityZoneInfo> securityZones; // handed to ServicePolicies.setSecurityZones()
+		public List<RangerPolicyDelta>                       policyDeltas;  // handed to ServicePolicies.setPolicyDeltas()
+	}
+
     static class ValiditySchedulerTestResult {
         boolean isValid;
         int validationFailureCount;
diff --git a/agents-common/src/test/java/org/apache/ranger/plugin/policyengine/TestPolicyEngineComparison.java b/agents-common/src/test/java/org/apache/ranger/plugin/policyengine/TestPolicyEngineComparison.java
new file mode 100644
index 0000000..b0a12c3
--- /dev/null
+++ b/agents-common/src/test/java/org/apache/ranger/plugin/policyengine/TestPolicyEngineComparison.java
@@ -0,0 +1,191 @@
+package org.apache.ranger.plugin.policyengine;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParseException;
+import org.apache.ranger.plugin.contextenricher.RangerTagEnricher;
+import org.apache.ranger.plugin.util.ServicePolicies;
+import org.apache.ranger.plugin.util.ServiceTags;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Checks that policy engines and tag enrichers built from JSON fixtures compare as each test case expects. */
+public class TestPolicyEngineComparison {
+    private static Gson gsonBuilder;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        gsonBuilder = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z")
+                .setPrettyPrinting()
+                .registerTypeAdapter(RangerAccessResource.class, new RangerResourceDeserializer())
+                .create();
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    @Test
+    public void testComparePolicyEngines() throws Exception {
+        runTestsFromResourceFiles(new String[] {"/policyengine/test_compare_policyengines.json"});
+    }
+
+    /** Runs every test-case file named in resourceNames, closing each reader once its cases have run. */
+    private void runTestsFromResourceFiles(String[] resourceNames) throws Exception {
+        for (String resourceName : resourceNames) {
+            try (InputStreamReader reader = openResource(resourceName)) {
+                runTests(reader, resourceName);
+            }
+        }
+    }
+
+    /** Runs each test case read from reader and asserts the two-way compare() results against its expected flags. */
+    private void runTests(InputStreamReader reader, String testName) throws Exception {
+        ComparisonTests testCases = gsonBuilder.fromJson(reader, ComparisonTests.class);
+
+        assertTrue("invalid input: " + testName, testCases != null && testCases.testCases != null);
+
+        RangerPolicyEngineOptions options = new RangerPolicyEngineOptions();
+        options.optimizeTrieForRetrieval = true;
+
+        for (ComparisonTests.TestCase testCase : testCases.testCases) {
+            assertTrue("invalid input: " + testCase.name, testCase.me != null && testCase.other != null);
+
+            ComparisonTests.TestCase.PolicyEngineData myData = testCase.me;
+            ComparisonTests.TestCase.PolicyEngineData otherData = testCase.other;
+
+            assertFalse("invalid input: " + testCase.name, myData.servicePoliciesFile == null || otherData.servicePoliciesFile == null);
+            assertTrue("invalid input: " + testCase.name, myData.serviceTagsFile == null || otherData.serviceTagsFile != null);
+
+            ServicePolicies myServicePolicies = readServicePolicies(myData.servicePoliciesFile);
+            ServicePolicies otherServicePolicies = readServicePolicies(otherData.servicePoliciesFile);
+
+            assertFalse("invalid input: " + testCase.name, myServicePolicies == null || otherServicePolicies == null);
+
+            ServiceTags myServiceTags = null;
+            ServiceTags otherServiceTags = null;
+
+            if (myData.serviceTagsFile != null) {
+                myServiceTags = readServiceTags(myData.serviceTagsFile);
+                otherServiceTags = readServiceTags(otherData.serviceTagsFile);
+                assertFalse("invalid input: " + testCase.name, myServiceTags == null || otherServiceTags == null);
+            }
+
+            RangerPolicyEngineImpl myPolicyEngine = new RangerPolicyEngineImpl("test-compare-my-engine", myServicePolicies, options, null);
+            RangerPolicyEngineImpl otherPolicyEngine = new RangerPolicyEngineImpl("test-compare-other-engine", otherServicePolicies, options, null);
+
+            boolean isPolicyEnginesEqual = myPolicyEngine.compare(otherPolicyEngine) && otherPolicyEngine.compare(myPolicyEngine);
+            boolean isTagsEqual = true; // remains true when the test case supplies no tag files
+
+            if (myServiceTags != null) {
+                RangerTagEnricher myTagEnricher = new RangerTagEnricher();
+                RangerTagEnricher otherTagEnricher = new RangerTagEnricher();
+
+                myTagEnricher.setAppId("test-compare-my-tags");
+                myTagEnricher.setServiceDef(myServicePolicies.getServiceDef());
+                myTagEnricher.setServiceName(myServiceTags.getServiceName());
+                myTagEnricher.setServiceTags(myServiceTags);
+
+                otherTagEnricher.setAppId("test-compare-other-tags");
+                otherTagEnricher.setServiceDef(myServicePolicies.getServiceDef()); // NOTE(review): "my" service-def is used for the other side too; confirm this is intended
+                otherTagEnricher.setServiceName(otherServiceTags.getServiceName());
+                otherTagEnricher.setServiceTags(otherServiceTags);
+                isTagsEqual = myTagEnricher.compare(otherTagEnricher) && otherTagEnricher.compare(myTagEnricher);
+            }
+
+            // assertEquals takes (message, expected, actual): the flag from the test-case file is the expected value
+            assertEquals("Unexpected policy-engine comparison result for test case " + testCase.name, testCase.isPolicyEnginesEqual, isPolicyEnginesEqual);
+            assertEquals("Unexpected tag comparison result for test case " + testCase.name, testCase.isTagsEqual, isTagsEqual);
+        }
+    }
+
+    /** Opens a classpath resource as UTF-8 text, failing the test if the resource does not exist. */
+    private InputStreamReader openResource(String resourceName) {
+        InputStream inStream = this.getClass().getResourceAsStream(resourceName);
+        assertTrue("resource not found: " + resourceName, inStream != null);
+        return new InputStreamReader(inStream, StandardCharsets.UTF_8);
+    }
+
+    /** Deserializes the ServicePolicies fixture stored in the given classpath resource. */
+    private ServicePolicies readServicePolicies(String fileName) throws Exception {
+        try (InputStreamReader reader = openResource(fileName)) {
+            return gsonBuilder.fromJson(reader, ServicePolicies.class);
+        }
+    }
+
+    /** Deserializes the ServiceTags fixture stored in the given classpath resource. */
+    private ServiceTags readServiceTags(String fileName) throws Exception {
+        try (InputStreamReader reader = openResource(fileName)) {
+            return gsonBuilder.fromJson(reader, ServiceTags.class);
+        }
+    }
+
+    /** Gson binding for the test-case file; field names mirror its JSON keys. */
+    static class ComparisonTests {
+        List<TestCase> testCases;
+
+        static class TestCase {
+            String           name;
+            PolicyEngineData me;
+            PolicyEngineData other;
+            boolean          isPolicyEnginesEqual;
+            boolean          isTagsEqual;
+
+            static class PolicyEngineData {
+                String servicePoliciesFile;
+                String serviceTagsFile;
+            }
+        }
+    }
+
+    /** Gson adapter that deserializes RangerAccessResource values as RangerAccessResourceImpl. */
+    static class RangerResourceDeserializer implements JsonDeserializer<RangerAccessResource> {
+        @Override
+        public RangerAccessResource deserialize(JsonElement jsonObj, Type type,
+                                                JsonDeserializationContext context) throws JsonParseException {
+            return gsonBuilder.fromJson(jsonObj, RangerAccessResourceImpl.class);
+        }
+    }
+}
\ No newline at end of file
diff --git a/agents-common/src/test/resources/policyengine/comparison/fail/myServicePolicies.json b/agents-common/src/test/resources/policyengine/comparison/fail/myServicePolicies.json
new file mode 100644
index 0000000..5b79968
--- /dev/null
+++ b/agents-common/src/test/resources/policyengine/comparison/fail/myServicePolicies.json
@@ -0,0 +1,177 @@
+{
+  "serviceName": "hivedev",
+  "serviceDef": {
+    "name": "hive",
+    "id": 3,
+    "resources": [
+      {
+        "name": "database",
+        "level": 1,
+        "mandatory": true,
+        "lookupSupported": true,
+        "matcher": "org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+        "matcherOptions": {
+          "wildCard": true,
+          "ignoreCase": true
+        },
+        "isValidLeaf": true,
+        "label": "Hive Database",
+        "description": "Hive Database"
+      },
+      {
+        "name": "table",
+        "level": 2,
+        "parent": "database",
+        "mandatory": true,
+        "lookupSupported": true,
+        "matcher": "org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+        "matcherOptions": {
+          "wildCard": true,
+          "ignoreCase": true
+        },
+        "isValidLeaf": true,
+        "label": "Hive Table",
+        "description": "Hive Table"
+      },
+      {
+        "name": "column",
+        "level": 3,
+        "parent": "table",
+        "mandatory": true,
+        "lookupSupported": true,
+        "matcher": "org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+        "matcherOptions": {
+          "wildCard": true,
+          "ignoreCase": true
+        },
+        "label": "Hive Column",
+        "description": "Hive Column"
+      }
+    ],
+    "accessTypes": [
+      {
+        "name": "select",
+        "label": "Select"
+      },
+      {
+        "name": "update",
+        "label": "Update"
+      },
+      {
+        "name": "create",
+        "label": "Create"
+      },
+      {
+        "name": "drop",
+        "label": "Drop"
+      },
+      {
+        "name": "alter",
+        "label": "Alter"
+      },
+      {
+        "name": "index",
+        "label": "Index"
+      },
+      {
+        "name": "lock",
+        "label": "Lock"
+      },
+      {
+        "name": "all",
+        "label": "All",
+        "impliedGrants": [
+          "select",
+          "update",
+          "create",
+          "drop",
+          "alter",
+          "index",
+          "lock"
+        ]
+      }
+    ]
+  },
+  "policies": [
+    {
+      "id": 1,
+      "name": "db=default: audit-all-access",
+      "isEnabled": true,
+      "isAuditEnabled": true,
+      "resources": {
+        "database": {
+          "values": [
+            "default"
+          ]
+        },
+        "table": {
+          "values": [
+            "*"
+          ]
+        },
+        "column": {
+          "values": [
+            "*"
+          ]
+        }
+      },
+      "policyItems": [
+        {
+          "accesses": [],
+          "users": [],
+          "groups": [
+            "public"
+          ],
+          "delegateAdmin": false
+        }
+      ]
+    },
+    {
+      "id": 2,
+      "name": "db=default",
+      "isEnabled": true,
+      "isAuditEnabled": true,
+      "resources": {
+        "database": {
+          "values": [
+            "default"
+          ]
+        },
+        "table": {
+          "values": [
+            "my-table"
+          ]
+        }
+      },
+      "policyItems": [
+        {
+          "accesses": [
+            {
+              "type": "create",
+              "isAllowed": true
+            },
+            {
+              "type": "drop",
+              "isAllowed": true
+            },
+            {
+              "type": "select",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "user1"
+          ],
+          "groups": [
+            "role-group1"
+          ],
+          "roles": [
+            "fin-admin",
+            "fin-group"
+          ],
+          "delegateAdmin": false
+        }
+      ]
+    }
+  ]
+}
diff --git a/agents-common/src/test/resources/policyengine/comparison/fail/myServiceTags.json b/agents-common/src/test/resources/policyengine/comparison/fail/myServiceTags.json
new file mode 100644
index 0000000..fe57135
--- /dev/null
+++ b/agents-common/src/test/resources/policyengine/comparison/fail/myServiceTags.json
@@ -0,0 +1,55 @@
+{
+  "op": "add_or_update",
+  "serviceName": "cl1_hive",
+  "tagVersion": 1,
+  "tagUpdateTime": "20150924-22:26:33.000-+0000",
+  "tagDefinitions": {
+     "1":{"id":1, "guid":"tagdef-1", "name":"PII",        "attributeDefs":[], "owner":0},
+     "2":{"id":2, "guid":"tagdef-2", "name":"EXPIRES_ON", "attributeDefs":[{"name":"expiry_date", "type":"date"}], "owner":0},
+     "3":{"id":3, "guid":"tagdef-3", "name":"FINANCE",    "attributeDefs":[], "owner":0},
+     "4":{"id":4, "guid":"tagdef-4", "name":"AUDIT",      "attributeDefs":[], "owner":0}
+   },
+  "tags": {
+     "1":{"id":1, "guid":"tag-1", "type":"PII",        "attributes":{}, "owner":0},
+     "2":{"id":2, "guid":"tag-2", "type":"EXPIRES_ON", "attributes":{"expiry_date":"2015/08/31"}, "owner":0},
+     "3":{"id":3, "guid":"tag-3", "type":"FINANCE",    "attributes":{}, "owner":0},
+     "4":{"id":4, "guid":"tag-4", "type":"AUDIT",      "attributes":{}, "owner":0}
+   },
+   "serviceResources": [
+     {
+       "id":1,
+       "guid":"resource-1",
+       "serviceName":"cl1_hive",
+       "resourceElements":{
+         "database":{"values":["hr"]},
+         "table":{"values":["employee"]},
+         "column":{"values":["ssn"]}
+       }
+     },
+     {
+       "id":2,
+       "guid":"resource-2",
+       "serviceName":"cl1_hive",
+       "resourceElements":{
+         "database":{"values":["finance"]},
+         "table":{"values":["tax_2010"]}
+       }
+     },
+     {
+       "id":3,
+       "guid":"resource-3",
+       "serviceName":"cl1_hive",
+       "resourceElements":{
+         "database":{"values":["finance"]},
+         "table":{"values":["tax_2010"]},
+         "column":{"values":["ssn"]}
+       }
+     }
+   ],
+
+   "resourceToTagIds": {
+     "1":[1],
+     "2":[2],
+     "3":[3]
+   }
+}
diff --git a/agents-common/src/test/resources/policyengine/comparison/fail/otherServicePolicies.json b/agents-common/src/test/resources/policyengine/comparison/fail/otherServicePolicies.json
new file mode 100644
index 0000000..f70737d
--- /dev/null
+++ b/agents-common/src/test/resources/policyengine/comparison/fail/otherServicePolicies.json
@@ -0,0 +1,225 @@
+{
+  "serviceName": "hivedev",
+  "serviceDef": {
+    "name": "hive",
+    "id": 3,
+    "resources": [
+      {
+        "name": "database",
+        "level": 1,
+        "mandatory": true,
+        "lookupSupported": true,
+        "matcher": "org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+        "matcherOptions": {
+          "wildCard": true,
+          "ignoreCase": true
+        },
+        "isValidLeaf": true,
+        "label": "Hive Database",
+        "description": "Hive Database"
+      },
+      {
+        "name": "table",
+        "level": 2,
+        "parent": "database",
+        "mandatory": true,
+        "lookupSupported": true,
+        "matcher": "org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+        "matcherOptions": {
+          "wildCard": true,
+          "ignoreCase": true
+        },
+        "isValidLeaf": true,
+        "label": "Hive Table",
+        "description": "Hive Table"
+      },
+      {
+        "name": "column",
+        "level": 3,
+        "parent": "table",
+        "mandatory": true,
+        "lookupSupported": true,
+        "matcher": "org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+        "matcherOptions": {
+          "wildCard": true,
+          "ignoreCase": true
+        },
+        "label": "Hive Column",
+        "description": "Hive Column"
+      }
+    ],
+    "accessTypes": [
+      {
+        "name": "select",
+        "label": "Select"
+      },
+      {
+        "name": "update",
+        "label": "Update"
+      },
+      {
+        "name": "create",
+        "label": "Create"
+      },
+      {
+        "name": "drop",
+        "label": "Drop"
+      },
+      {
+        "name": "alter",
+        "label": "Alter"
+      },
+      {
+        "name": "index",
+        "label": "Index"
+      },
+      {
+        "name": "lock",
+        "label": "Lock"
+      },
+      {
+        "name": "all",
+        "label": "All",
+        "impliedGrants": [
+          "select",
+          "update",
+          "create",
+          "drop",
+          "alter",
+          "index",
+          "lock"
+        ]
+      }
+    ]
+  },
+  "policies": [
+    {
+      "id": 1,
+      "name": "db=default: audit-all-access",
+      "isEnabled": true,
+      "isAuditEnabled": true,
+      "resources": {
+        "database": {
+          "values": [
+            "default"
+          ]
+        },
+        "table": {
+          "values": [
+            "*"
+          ]
+        },
+        "column": {
+          "values": [
+            "*"
+          ]
+        }
+      },
+      "policyItems": [
+        {
+          "accesses": [],
+          "users": [],
+          "groups": [
+            "public"
+          ],
+          "delegateAdmin": false
+        }
+      ]
+    },
+    {
+      "id": 2,
+      "name": "db=default",
+      "isEnabled": true,
+      "isAuditEnabled": true,
+      "resources": {
+        "database": {
+          "values": [
+            "default"
+          ]
+        },
+        "table": {
+          "values": [
+            "my-table"
+          ]
+        }
+      },
+      "policyItems": [
+        {
+          "accesses": [
+            {
+              "type": "create",
+              "isAllowed": true
+            },
+            {
+              "type": "drop",
+              "isAllowed": true
+            },
+            {
+              "type": "select",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "user1"
+          ],
+          "groups": [
+            "role-group1"
+          ],
+          "roles": [
+            "fin-admin",
+            "fin-group"
+          ],
+          "delegateAdmin": false
+        }
+      ]
+    },
+    {
+      "id": 3,
+      "name": "db=finance",
+      "isEnabled": true,
+      "isAuditEnabled": true,
+      "resources": {
+        "database": {
+          "values": [
+            "default"
+          ]
+        },
+        "table": {
+          "values": [
+            "my-table"
+          ]
+        }
+      },
+      "policyItems": [
+        {
+          "accesses": [
+            {
+              "type": "create",
+              "isAllowed": true
+            },
+            {
+              "type": "drop",
+              "isAllowed": true
+            },
+            {
+              "type": "select",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "user1"
+          ],
+          "groups": [
+            "role-group1"
+          ],
+          "roles": [
+            "fin-admin",
+            "fin-group"
+          ],
+          "delegateAdmin": false
+        }
+      ]
+    }
+
+  ]
+}
diff --git a/agents-common/src/test/resources/policyengine/comparison/fail/otherServiceTags.json b/agents-common/src/test/resources/policyengine/comparison/fail/otherServiceTags.json
new file mode 100644
index 0000000..66e63d1
--- /dev/null
+++ b/agents-common/src/test/resources/policyengine/comparison/fail/otherServiceTags.json
@@ -0,0 +1,55 @@
+{
+  "op": "add_or_update",
+  "serviceName": "cl1_hive",
+  "tagVersion": 1,
+  "tagUpdateTime": "20150924-22:26:33.000-+0000",
+  "tagDefinitions": {
+     "1":{"id":1, "guid":"tagdef-1", "name":"PII",        "attributeDefs":[], "owner":0},
+     "2":{"id":2, "guid":"tagdef-2", "name":"EXPIRES_ON", "attributeDefs":[{"name":"expiry_date", "type":"date"}], "owner":0},
+     "3":{"id":3, "guid":"tagdef-3", "name":"FINANCE",    "attributeDefs":[], "owner":0},
+     "4":{"id":4, "guid":"tagdef-4", "name":"AUDIT",      "attributeDefs":[], "owner":0}
+   },
+  "tags": {
+     "1":{"id":1, "guid":"tag-1", "type":"PII",        "attributes":{}, "owner":0},
+     "2":{"id":2, "guid":"tag-2", "type":"EXPIRES_ON", "attributes":{"expiry_date":"2015/08/31"}, "owner":0},
+     "3":{"id":3, "guid":"tag-3", "type":"FINANCE",    "attributes":{}, "owner":0},
+     "4":{"id":4, "guid":"tag-4", "type":"AUDIT",      "attributes":{}, "owner":0}
+   },
+   "serviceResources": [
+     {
+       "id":1,
+       "guid":"resource-1",
+       "serviceName":"cl1_hive",
+       "resourceElements":{
+         "database":{"values":["hr"]},
+         "table":{"values":["employee"]},
+         "column":{"values":["ssn"]}
+       }
+     },
+     {
+       "id":2,
+       "guid":"resource-2",
+       "serviceName":"cl1_hive",
+       "resourceElements":{
+         "database":{"values":["finance"]},
+         "table":{"values":["tax_2010"]}
+       }
+     },
+     {
+       "id":3,
+       "guid":"resource-3",
+       "serviceName":"cl1_hive",
+       "resourceElements":{
+         "database":{"values":["finance"]},
+         "table":{"values":["tax_2010"]},
+         "column":{"values":["ssn"]}
+       }
+     }
+   ],
+
+   "resourceToTagIds": {
+     "1":[1],
+     "2":[2],
+     "3":[3, 4]
+   }
+}
diff --git a/agents-common/src/test/resources/policyengine/comparison/success/myServicePolicies.json b/agents-common/src/test/resources/policyengine/comparison/success/myServicePolicies.json
new file mode 100644
index 0000000..2cfd8ec
--- /dev/null
+++ b/agents-common/src/test/resources/policyengine/comparison/success/myServicePolicies.json
@@ -0,0 +1,2 @@
+{"serviceName":"cm_hive","serviceId":6,"policyVersion":7,"policyUpdateTime":"20190730-17:47:46.221--0700","policies":[{"service":"cm_hive","name":"all - hiveservice","policyType":0,"policyPriority":0,"description":"Policy for all - hiveservice","isAuditEnabled":true,"resources":{"hiveservice":{"values":["*"],"isExcludes":false,"isRecursive":false}},"policyItems":[{"accesses":[{"type":"select","isAllowed":true},{"type":"update","isAllowed":true},{"type":"create","isAllowed":true},{"type": [...]
+
diff --git a/agents-common/src/test/resources/policyengine/comparison/success/myServiceTags.json b/agents-common/src/test/resources/policyengine/comparison/success/myServiceTags.json
new file mode 100644
index 0000000..626c023
--- /dev/null
+++ b/agents-common/src/test/resources/policyengine/comparison/success/myServiceTags.json
@@ -0,0 +1 @@
+{"op":"add_or_update","serviceName":"cm_hive","tagVersion":1007,"tagDefinitions":{},"tags":{"3":{"type":"PII","owner":0,"attributes":{"level":"Important"},"options":{},"validityPeriods":[],"id":3,"isEnabled":true},"10":{"type":"PII","owner":0,"attributes":{"level":"tag-1-level"},"options":{},"validityPeriods":[],"id":10,"isEnabled":true}},"serviceResources":[{"resourceElements":{"database":{"values":["default"],"isExcludes":false,"isRecursive":false},"column":{"values":["salary"],"isExcl [...]
\ No newline at end of file
diff --git a/agents-common/src/test/resources/policyengine/comparison/success/otherServicePolicies.json b/agents-common/src/test/resources/policyengine/comparison/success/otherServicePolicies.json
new file mode 100644
index 0000000..2cfd8ec
--- /dev/null
+++ b/agents-common/src/test/resources/policyengine/comparison/success/otherServicePolicies.json
@@ -0,0 +1,2 @@
+{"serviceName":"cm_hive","serviceId":6,"policyVersion":7,"policyUpdateTime":"20190730-17:47:46.221--0700","policies":[{"service":"cm_hive","name":"all - hiveservice","policyType":0,"policyPriority":0,"description":"Policy for all - hiveservice","isAuditEnabled":true,"resources":{"hiveservice":{"values":["*"],"isExcludes":false,"isRecursive":false}},"policyItems":[{"accesses":[{"type":"select","isAllowed":true},{"type":"update","isAllowed":true},{"type":"create","isAllowed":true},{"type": [...]
+
diff --git a/agents-common/src/test/resources/policyengine/comparison/success/otherServiceTags.json b/agents-common/src/test/resources/policyengine/comparison/success/otherServiceTags.json
new file mode 100644
index 0000000..1d8043d
--- /dev/null
+++ b/agents-common/src/test/resources/policyengine/comparison/success/otherServiceTags.json
@@ -0,0 +1 @@
+{"serviceName":"cm_hive","tagVersion":1007,"tagDefinitions":{},"tags":{"3":{"id":3,"isEnabled":true,"type":"PII","owner":0,"attributes":{"level":"Important"},"options":{},"validityPeriods":[]},"10":{"id":10,"isEnabled":true,"type":"PII","owner":0,"attributes":{"level":"tag-1-level"},"options":{},"validityPeriods":[]}},"serviceResources":[{"id":2,"isEnabled":true,"version":2,"resourceElements":{"database":{"values":["default"],"isExcludes":false,"isRecursive":false},"column":{"values":["s [...]
\ No newline at end of file
diff --git a/agents-common/src/test/resources/policyengine/test_compare_policyengines.json b/agents-common/src/test/resources/policyengine/test_compare_policyengines.json
new file mode 100644
index 0000000..9a0d846
--- /dev/null
+++ b/agents-common/src/test/resources/policyengine/test_compare_policyengines.json
@@ -0,0 +1,30 @@
+{
+  "testCases": [
+    {
+      "name": "success",
+      "me": {
+        "servicePoliciesFile": "/policyengine/comparison/success/myServicePolicies.json",
+        "serviceTagsFile": "/policyengine/comparison/success/myServiceTags.json"
+      },
+      "other": {
+        "servicePoliciesFile": "/policyengine/comparison/success/otherServicePolicies.json",
+        "serviceTagsFile": "/policyengine/comparison/success/otherServiceTags.json"
+      },
+      "isPolicyEnginesEqual": true,
+      "isTagsEqual": true
+    },
+    {
+      "name": "fail",
+      "me": {
+        "servicePoliciesFile": "/policyengine/comparison/fail/myServicePolicies.json",
+        "serviceTagsFile": "/policyengine/comparison/fail/myServiceTags.json"
+      },
+      "other": {
+        "servicePoliciesFile": "/policyengine/comparison/fail/otherServicePolicies.json",
+        "serviceTagsFile": "/policyengine/comparison/fail/otherServiceTags.json"
+      },
+      "isPolicyEnginesEqual": false,
+      "isTagsEqual": false
+    }
+  ]
+}
\ No newline at end of file
diff --git a/agents-common/src/test/resources/policyengine/test_policyengine_hive.json b/agents-common/src/test/resources/policyengine/test_policyengine_hive.json
index 66b3644..efc1dcc 100644
--- a/agents-common/src/test/resources/policyengine/test_policyengine_hive.json
+++ b/agents-common/src/test/resources/policyengine/test_policyengine_hive.json
@@ -6,7 +6,7 @@
     "id":3,
     "resources":[
       {"name":"database","level":1,"mandatory":true,"lookupSupported":true,"matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher","matcherOptions":{"wildCard":true, "ignoreCase":true},"label":"Hive Database","description":"Hive Database"},
-      {"name":"url","level":1,"mandatory":true,"lookupSupported":false,"matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher","matcherOptions":{"wildCard":true, "ignoreCase":true},"label":"URL","description":"URL"},
+      {"name":"url","level":1,"mandatory":true,"lookupSupported":false,"recursiveSupported":true,"matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher","matcherOptions":{"wildCard":true, "ignoreCase":true},"label":"URL","description":"URL"},
       {"name":"hiveservice","level":1,"mandatory":true,"lookupSupported":false,"matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher","matcherOptions":{"wildCard":true, "ignoreCase":true},"label":"HiveService","description":"HiveService"},
       {"name":"table","level":2,"parent":"database","mandatory":true,"lookupSupported":true,"matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher","matcherOptions":{"wildCard":true, "ignoreCase":true},"label":"Hive Table","description":"Hive Table"},
       {"name":"udf","level":2,"parent":"database","mandatory":true,"lookupSupported":true,"matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher","matcherOptions":{"wildCard":true, "ignoreCase":true},"label":"Hive UDF","description":"Hive UDF"},
@@ -43,6 +43,13 @@
   },
 
   "policies":[
+    {"id":200,"name":"url=s3a://qe-s3-bucket-mst/test_abcd/abcd; s3a://qe-s3-bucket-mst/demo/*: URL-access-policy","isEnabled":true,"isAuditEnabled":true,
+      "resources":{"url":{"values": ["s3a://qe-s3-bucket-mst/test_abcd/abcd", "s3a://qe-s3-bucket-mst/demo"], "isRecursive":true}},
+      "policyItems":[
+        {"accesses":[{"type":"read","isAllowed":true},{"type":"write","isAllowed":true}],"users":[],"groups":["public"],"delegateAdmin":false}
+      ]
+    }
+  ,
     {"id":1,"name":"db=default: audit-all-access","isEnabled":true,"isAuditEnabled":true,
      "resources":{"database":{"values":["default"]},"table":{"values":["*"]},"column":{"values":["*"]}},
      "policyItems":[
@@ -82,6 +89,30 @@
   ],
 
   "tests":[
+    {"name":"ALLOW 'read s3a://qe-s3-bucket-mst/demo;' for user1",
+      "request":{
+        "resource":{"elements":{"url":"s3a://qe-s3-bucket-mst/demo"}},
+        "accessType":"read","user":"user1","userGroups":["users"],"requestData":"read s3a://qe-s3-bucket-mst/demo for user1"
+      },
+      "result":{"isAudited":true,"isAllowed":true,"policyId":200}
+    }
+  ,
+    {"name":"ALLOW 'read s3a://qe-s3-bucket-mst/test_abcd/abcd;' for user1",
+      "request":{
+        "resource":{"elements":{"url":"s3a://qe-s3-bucket-mst/test_abcd/abcd"}},
+        "accessType":"read","user":"user1","userGroups":["users"],"requestData":"read s3a://qe-s3-bucket-mst/test_abcd/abcd for user1"
+      },
+      "result":{"isAudited":true,"isAllowed":true,"policyId":200}
+    }
+  ,
+    {"name":"DENY 'read s3a://qe-s3-bucket-mst/test_abcd/abcd/non-existent;' for user1",
+      "request":{
+        "resource":{"elements":{"url":"s3a://qe-s3-bucket-mst/test_abcd/abcd/non-existent"}},
+        "accessType":"read","user":"user1","userGroups":["users"],"requestData":"read s3a://qe-s3-bucket-mst/test_abcd/abcd/non-existent for user1"
+      },
+      "result":{"isAudited":false,"isAllowed":false,"policyId":-1}
+    }
+  ,
     {"name":"DENY 'select tmp_1 from db1.tmp ;' for user1",
       "request":{
         "resource":{"elements":{"database":"db1", "table":"tmp", "column":"tmp_1"}},
diff --git a/agents-common/src/test/resources/policyengine/test_policyengine_hive_incremental_add.json b/agents-common/src/test/resources/policyengine/test_policyengine_hive_incremental_add.json
index fce1572..234673e 100644
--- a/agents-common/src/test/resources/policyengine/test_policyengine_hive_incremental_add.json
+++ b/agents-common/src/test/resources/policyengine/test_policyengine_hive_incremental_add.json
@@ -321,17 +321,59 @@
       "result":{"isAudited":true,"isAllowed":true,"policyId":3}
     }
   ],
-  "updatedPolicies": [
-    {"changeType": 0,
-      "policy": {
-        "id":4,"version":1,"name":"db=db1; table=tmp; column=tmp*","isEnabled":true,"isAuditEnabled":true, "policyType":0,"serviceType":"hive",
-        "resources":{"database":{"values":["db1"]},"table":{"values":["tmp"]},"column":{"values":["tmp*"], "isExcludes":true}},
-        "policyItems":[
-          {"accesses":[{"type":"select","isAllowed":true}],"users":["user1","user2"],"groups":["group1","group2"],"delegateAdmin":false}
-        ]
+  "updatedPolicies": {
+    "policyDeltas": [
+      {
+        "changeType": 0,
+        "policy": {
+          "id": 4,
+          "version": 1,
+          "name": "db=db1; table=tmp; column=tmp*",
+          "isEnabled": true,
+          "isAuditEnabled": true,
+          "policyType": 0,
+          "serviceType": "hive",
+          "resources": {
+            "database": {
+              "values": [
+                "db1"
+              ]
+            },
+            "table": {
+              "values": [
+                "tmp"
+              ]
+            },
+            "column": {
+              "values": [
+                "tmp*"
+              ],
+              "isExcludes": true
+            }
+          },
+          "policyItems": [
+            {
+              "accesses": [
+                {
+                  "type": "select",
+                  "isAllowed": true
+                }
+              ],
+              "users": [
+                "user1",
+                "user2"
+              ],
+              "groups": [
+                "group1",
+                "group2"
+              ],
+              "delegateAdmin": false
+            }
+          ]
+        }
       }
-    }
-  ],
+    ]
+  },
   "updatedTests": [
     {"name":"ALLOW 'select abc_1 from db1.tmp ;' for user1",
       "request":{
diff --git a/agents-common/src/test/resources/policyengine/test_policyengine_hive_incremental_delete.json b/agents-common/src/test/resources/policyengine/test_policyengine_hive_incremental_delete.json
index e60bfe9..1ba9fba 100644
--- a/agents-common/src/test/resources/policyengine/test_policyengine_hive_incremental_delete.json
+++ b/agents-common/src/test/resources/policyengine/test_policyengine_hive_incremental_delete.json
@@ -53,7 +53,99 @@
       ],
       "policyType":0
     }
-  ],
+    ],
+    "securityZones": {
+      "zonehive": {
+        "containsAssociatedTagService": false,
+        "policies": [
+          {
+            "description": "First",
+            "id": 38,
+            "isAuditEnabled": true,
+            "isDenyAllElse": false,
+            "isEnabled": true,
+            "name": "First",
+            "options": {},
+            "policyItems": [
+              {
+                "accesses": [
+                  {
+                    "isAllowed": true,
+                    "type": "select"
+                  }
+                ],
+                "conditions": [],
+                "delegateAdmin": true,
+                "groups": [],
+                "roles": [],
+                "users": [
+                  "hive",
+                  "rangerlookup"
+                ]
+              }
+            ],
+            "policyLabels": [],
+            "policyPriority": 0,
+            "policyType": 0,
+            "resources":{"database":{"values":["db-zone"]},"table":{"values":["tbl1"]},"column":{"values":["*"]}},
+            "service": "hivedev",
+            "serviceType": "hive",
+            "validitySchedules": [],
+            "version": 1,
+            "zoneName": "zonehive"
+          },
+          {
+            "description": "Second",
+            "id": 39,
+            "isAuditEnabled": true,
+            "isDenyAllElse": false,
+            "isEnabled": true,
+            "name": "Second",
+            "options": {},
+            "policyItems": [
+              {
+                "accesses": [
+                  {
+                    "isAllowed": true,
+                    "type": "select"
+                  }
+                ],
+                "conditions": [],
+                "delegateAdmin": false,
+                "groups": [],
+                "roles": [],
+                "users": [
+                  "keyadmin"
+                ]
+              }
+            ],
+            "policyLabels": [],
+            "policyPriority": 0,
+            "policyType": 0,
+            "resources":{"database":{"values":["db-zone"]},"table":{"values":["tbl2"]},"column":{"values":["*"]}},
+            "rowFilterPolicyItems": [],
+            "service": "hivedev",
+            "serviceType": "hive",
+            "validitySchedules": [],
+            "version": 1,
+            "zoneName": "zonehive"
+          }
+        ],
+        "zoneName": "zonehive",
+        "resources": [
+          {
+            "database": [
+              "db-zone"
+            ],
+            "table": [
+              "*"
+            ],
+            "column": [
+              "*"
+            ]
+          }]
+      }
+    },
 
   "tests":[
     {"name":"DENY 'select tmp_1 from db1.tmp ;' for user1",
@@ -329,16 +421,48 @@
       "result":{"isAudited":true,"isAllowed":true,"policyId":3}
     }
   ],
-  "updatedPolicies": [
-    {"changeType": 2,
-      "policy": {
-        "id": 4,
-        "version": 1,
-        "policyType": 0,
-        "serviceType": "hive"
-      }
-    }
-  ],
+  "updatedPolicies": {
+      "securityZones": {
+        "zonehive": {
+          "containsAssociatedTagService": false,
+          "policyDeltas": [
+            {
+              "changeType": 2,
+              "policy": {
+                "id": 39,
+                "policyType": 0,
+                "serviceType": "hive",
+                "zoneName": "zonehive"
+              }
+            }
+          ],
+
+          "resources": [
+            {
+              "database": [
+                "db-zone"
+              ],
+              "table": [
+                "tbl2"
+              ],
+              "column": [
+                "*"
+              ]
+            }],
+            "zoneName": "zonehive"
+        }
+      },
+      "policyDeltas": [
+        {"changeType": 2,
+          "policy": {
+            "id": 4,
+            "version": 1,
+            "policyType": 0,
+            "serviceType": "hive"
+          }
+        }
+    ]
+  },
   "updatedTests": [
     {"name":"DENY 'select abc_1 from db1.tmp ;' for user1",
       "request":{
diff --git a/agents-common/src/test/resources/policyengine/test_policyengine_hive_incremental_update.json b/agents-common/src/test/resources/policyengine/test_policyengine_hive_incremental_update.json
index dea1c2b..2959225 100644
--- a/agents-common/src/test/resources/policyengine/test_policyengine_hive_incremental_update.json
+++ b/agents-common/src/test/resources/policyengine/test_policyengine_hive_incremental_update.json
@@ -329,16 +329,58 @@
       "result":{"isAudited":true,"isAllowed":true,"policyId":3}
     }
   ],
-  "updatedPolicies": [
-    {"changeType": 1,
-      "policy": {"id":4,"version":1,"name":"db=db1; table=tmp; column=tmp*","isEnabled":true,"isAuditEnabled":true,"serviceType":"hive","policyType":0,
-      "resources":{"database":{"values":["db1"]},"table":{"values":["tmp"]},"column":{"values":["tmp*"]}},
-      "policyItems":[
-        {"accesses":[{"type":"select","isAllowed":true}],"users":["user1","user2"],"groups":["group1","group2"],"delegateAdmin":false}
-      ]
+  "updatedPolicies": {
+    "policyDeltas": [
+      {
+        "changeType": 1,
+        "policy": {
+          "id": 4,
+          "version": 1,
+          "name": "db=db1; table=tmp; column=tmp*",
+          "isEnabled": true,
+          "isAuditEnabled": true,
+          "serviceType": "hive",
+          "policyType": 0,
+          "resources": {
+            "database": {
+              "values": [
+                "db1"
+              ]
+            },
+            "table": {
+              "values": [
+                "tmp"
+              ]
+            },
+            "column": {
+              "values": [
+                "tmp*"
+              ]
+            }
+          },
+          "policyItems": [
+            {
+              "accesses": [
+                {
+                  "type": "select",
+                  "isAllowed": true
+                }
+              ],
+              "users": [
+                "user1",
+                "user2"
+              ],
+              "groups": [
+                "group1",
+                "group2"
+              ],
+              "delegateAdmin": false
+            }
+          ]
+        }
       }
-    }
-  ],
+    ]
+  },
   "updatedTests": [
     {"name":"DENY 'select abc_1 from db1.tmp ;' for user1",
       "request":{
diff --git a/knox-agent/src/main/java/org/apache/ranger/admin/client/RangerAdminJersey2RESTClient.java b/knox-agent/src/main/java/org/apache/ranger/admin/client/RangerAdminJersey2RESTClient.java
index db16d73..5dcce11 100644
--- a/knox-agent/src/main/java/org/apache/ranger/admin/client/RangerAdminJersey2RESTClient.java
+++ b/knox-agent/src/main/java/org/apache/ranger/admin/client/RangerAdminJersey2RESTClient.java
@@ -72,6 +72,7 @@ public class RangerAdminJersey2RESTClient extends AbstractRangerAdminClient {
 	String _serviceName = null;
 	String _clusterName = null;
 	String _supportsPolicyDeltas = null;
+	String _supportsTagDeltas = null;
 	String _pluginId = null;
 	int	   _restClientConnTimeOutMs;
 	int	   _restClientReadTimeOutMs;
@@ -98,12 +99,16 @@ public class RangerAdminJersey2RESTClient extends AbstractRangerAdminClient {
 		if (!"true".equalsIgnoreCase(_supportsPolicyDeltas)) {
 			_supportsPolicyDeltas = "false";
 		}
+		_supportsTagDeltas = RangerConfiguration.getInstance().get(configPropertyPrefix + ".tag.rest.supports.tag.deltas", "false");
+		if (!"true".equalsIgnoreCase(_supportsTagDeltas)) {
+			_supportsTagDeltas = "false";
+		}
 
 		configURLs = RangerRESTClient.getURLs(tmpUrl);
 		this.lastKnownActiveUrlIndex = new Random().nextInt(configURLs.size());
 		_baseUrl = configURLs.get(this.lastKnownActiveUrlIndex);
 		_isSSL = _utils.isSsl(_baseUrl);
-		LOG.info("Init params: " + String.format("Base URL[%s], SSL Config filename[%s], ServiceName=[%s], SupportsPolicyDeltas=[%s], ConfigURLs=[%s]", _baseUrl, _sslConfigFileName, _serviceName, _supportsPolicyDeltas, configURLs));
+		LOG.info("Init params: " + String.format("Base URL[%s], SSL Config filename[%s], ServiceName=[%s], SupportsPolicyDeltas=[%s], SupportsTagDeltas=[%s], ConfigURLs=[%s]", _baseUrl, _sslConfigFileName, _serviceName, _supportsPolicyDeltas, _supportsTagDeltas, configURLs));
 		
 		_client = getClient();
 		_client.property(ClientProperties.CONNECT_TIMEOUT, _restClientConnTimeOutMs);
@@ -285,7 +290,7 @@ public class RangerAdminJersey2RESTClient extends AbstractRangerAdminClient {
 		queryParams.put(RangerRESTUtils.REST_PARAM_LAST_KNOWN_POLICY_VERSION, Long.toString(lastKnownVersion));
 		queryParams.put(RangerRESTUtils.REST_PARAM_LAST_ACTIVATION_TIME, Long.toString(lastActivationTimeInMillis));
 		queryParams.put(RangerRESTUtils.REST_PARAM_PLUGIN_ID, _pluginId);
-		queryParams.put(RangerRESTUtils.REST_PARAM_SUPPORTS_POLICY_DELTAS, _supportsPolicyDeltas);
+		queryParams.put(RangerRESTUtils.REST_PARAM_SUPPORTS_TAG_DELTAS, _supportsTagDeltas);
 
 		String relativeURL = null;
 		ServiceTags serviceTags = null;
diff --git a/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql b/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql
index 399fd8a..8e42bd9 100644
--- a/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql
+++ b/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql
@@ -16,6 +16,7 @@
 DROP VIEW IF EXISTS `vx_trx_log`;
 DROP TABLE IF EXISTS `x_security_zone_ref_resource`;
 DROP TABLE IF EXISTS `x_policy_change_log`;
+DROP TABLE IF EXISTS `x_tag_change_log`;
 DROP TABLE IF EXISTS `x_policy_ref_group`;
 DROP TABLE IF EXISTS `x_policy_ref_user`;
 DROP TABLE IF EXISTS `x_policy_ref_datamask_type`;
@@ -1540,6 +1541,19 @@ CREATE TABLE IF NOT EXISTS `x_role_ref_role`(
  CONSTRAINT `x_role_ref_role_FK_role_ref_id` FOREIGN KEY (`role_ref_id`) REFERENCES `x_role` (`id`)
 )ROW_FORMAT=DYNAMIC;
 
+CREATE TABLE IF NOT EXISTS `x_tag_change_log` (
+`id` bigint(20) NOT NULL AUTO_INCREMENT,
+`create_time` datetime NULL DEFAULT NULL,
+`service_id` bigint(20) NOT NULL,
+`change_type` int(11) NOT NULL,
+`service_tags_version` bigint(20) NOT NULL DEFAULT '0',
+`service_resource_id` bigint(20) NULL DEFAULT NULL,
+`tag_id` bigint(20) NULL DEFAULT NULL,
+primary key (`id`)
+) ROW_FORMAT=DYNAMIC;
+
+CREATE INDEX x_tag_change_log_IDX_service_id ON x_tag_change_log(service_id);
+CREATE INDEX x_tag_change_log_IDX_tag_version ON x_tag_change_log(service_tags_version);
 CREATE INDEX x_policy_change_log_IDX_service_id ON x_policy_change_log(service_id);
 CREATE INDEX x_policy_change_log_IDX_policy_version ON x_policy_change_log(policy_version);
 CREATE INDEX x_service_config_def_IDX_def_id ON x_service_config_def(def_id);
@@ -1650,6 +1664,7 @@ INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('040',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('041',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('042',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
+INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('043',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('DB_PATCHES',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
 
 INSERT INTO x_user_module_perm (user_id,module_id,create_time,update_time,added_by_id,upd_by_id,is_allowed)
diff --git a/security-admin/db/mysql/patches/043-add-tag-change-log-table.sql b/security-admin/db/mysql/patches/043-add-tag-change-log-table.sql
new file mode 100644
index 0000000..bf81a02
--- /dev/null
+++ b/security-admin/db/mysql/patches/043-add-tag-change-log-table.sql
@@ -0,0 +1,31 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+DROP TABLE IF EXISTS `x_tag_change_log`;
+
+CREATE TABLE IF NOT EXISTS `x_tag_change_log` (
+`id` bigint(20) NOT NULL AUTO_INCREMENT,
+`create_time` datetime NULL DEFAULT NULL,
+`service_id` bigint(20) NOT NULL,
+`change_type` int(11) NOT NULL,
+`service_tags_version` bigint(20) NOT NULL DEFAULT '0',
+`service_resource_id` bigint(20) NULL DEFAULT NULL,
+`tag_id` bigint(20) NULL DEFAULT NULL,
+primary key (`id`)
+) ROW_FORMAT=DYNAMIC;
+
+CREATE INDEX x_tag_change_log_IDX_service_id ON x_tag_change_log(service_id);
+CREATE INDEX x_tag_change_log_IDX_tag_version ON x_tag_change_log(service_tags_version);
+
diff --git a/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql b/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql
index 73b9173..1b158c9 100644
--- a/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql
+++ b/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql
@@ -99,6 +99,7 @@ call spdropsequence('X_SEC_ZONE_REF_TAG_SRVC_SEQ');
 call spdropsequence('X_RANGER_GLOBAL_STATE_SEQ');
 call spdropsequence('X_SECURITY_ZONE_SEQ');
 call spdropsequence('X_POLICY_CHANGE_LOG_SEQ');
+call spdropsequence('X_TAG_CHANGE_LOG_SEQ');
 
 CREATE SEQUENCE SEQ_GEN_IDENTITY START WITH 1 INCREMENT BY 1 NOCACHE NOCYCLE;
 CREATE SEQUENCE X_ACCESS_AUDIT_SEQ START WITH 1 INCREMENT BY 1 NOCACHE NOCYCLE;
@@ -168,6 +169,7 @@ CREATE SEQUENCE X_SEC_ZONE_REF_RESOURCE_SEQ START WITH 1 INCREMENT BY 1 NOCACHE
 CREATE SEQUENCE X_SEC_ZONE_REF_USER_SEQ START WITH 1 INCREMENT BY 1 NOCACHE NOCYCLE;
 CREATE SEQUENCE X_SEC_ZONE_REF_GROUP_SEQ START WITH 1 INCREMENT BY 1 NOCACHE NOCYCLE;
 CREATE SEQUENCE X_POLICY_CHANGE_LOG_SEQ START WITH 1 INCREMENT BY 1 NOCACHE NOCYCLE;
+CREATE SEQUENCE X_TAG_CHANGE_LOG_SEQ START WITH 1 INCREMENT BY 1 NOCACHE NOCYCLE;
 call spdropsequence('X_DB_VERSION_H_SEQ');
 CREATE SEQUENCE X_DB_VERSION_H_SEQ START WITH 1 INCREMENT BY 1 NOCACHE NOCYCLE;
 
@@ -204,6 +206,7 @@ END;/
 call spdropview('vx_trx_log');
 call spdroptable('x_security_zone_ref_resource');
 call spdroptable('x_policy_change_log');
+call spdroptable('x_tag_change_log');
 call spdroptable('x_policy_ref_group');
 call spdroptable('x_policy_ref_user');
 call spdroptable('x_policy_ref_datamask_type');
@@ -1499,6 +1502,20 @@ CREATE INDEX x_plcy_chng_log_IDX_service_id ON x_policy_change_log(service_id);
 CREATE INDEX x_plcy_chng_log_IDX_policy_ver ON x_policy_change_log(policy_version);
 COMMIT;
 
+CREATE TABLE x_tag_change_log (
+id NUMBER(20) NOT NULL,
+create_time DATE DEFAULT NULL NULL,
+service_id NUMBER(20) NOT NULL,
+change_type NUMBER(11) NOT NULL,
+service_tags_version NUMBER(20) DEFAULT '0' NOT NULL,
+service_resource_id NUMBER(20) DEFAULT NULL NULL,
+tag_id NUMBER(20) DEFAULT NULL NULL,
+primary key (id)
+);
+CREATE INDEX x_tag_chng_log_IDX_service_id ON x_tag_change_log(service_id);
+CREATE INDEX x_tag_chng_log_IDX_tag_ver ON x_tag_change_log(service_tags_version);
+COMMIT;
+
 CREATE TABLE x_role(
 id NUMBER(20) NOT NULL,
 create_time DATE DEFAULT NULL NULL,
@@ -1843,6 +1860,7 @@ INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,act
 INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval, '040',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
 INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval, '041',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
 INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval, '042',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
+INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval, '043',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
 INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval, 'DB_PATCHES',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
 INSERT INTO x_user_module_perm (id,user_id,module_id,create_time,update_time,added_by_id,upd_by_id,is_allowed) VALUES (X_USER_MODULE_PERM_SEQ.nextval,getXportalUIdByLoginId('admin'),getModulesIdByName('Reports'),sys_extract_utc(systimestamp),sys_extract_utc(systimestamp),getXportalUIdByLoginId('admin'),getXportalUIdByLoginId('admin'),1);
 INSERT INTO x_user_module_perm (id,user_id,module_id,create_time,update_time,added_by_id,upd_by_id,is_allowed) VALUES (X_USER_MODULE_PERM_SEQ.nextval,getXportalUIdByLoginId('admin'),getModulesIdByName('Resource Based Policies'),sys_extract_utc(systimestamp),sys_extract_utc(systimestamp),getXportalUIdByLoginId('admin'),getXportalUIdByLoginId('admin'),1);
diff --git a/security-admin/db/oracle/patches/043-add-tag-change-log-table.sql b/security-admin/db/oracle/patches/043-add-tag-change-log-table.sql
new file mode 100644
index 0000000..691cbdb
--- /dev/null
+++ b/security-admin/db/oracle/patches/043-add-tag-change-log-table.sql
@@ -0,0 +1,60 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE OR REPLACE PROCEDURE spdropsequence(ObjName IN varchar2)
+IS
+v_counter integer;
+BEGIN
+    select count(*) into v_counter from user_sequences where sequence_name = upper(ObjName);
+      if (v_counter > 0) then
+        execute immediate 'DROP SEQUENCE ' || ObjName;
+      end if;
+END;/
+/
+
+call spdropsequence('X_TAG_CHANGE_LOG_SEQ');
+
+CREATE OR REPLACE PROCEDURE spdroptable(ObjName IN varchar2)
+IS
+v_counter integer;
+BEGIN
+    select count(*) into v_counter from user_tables where table_name = upper(ObjName);
+     if (v_counter > 0) then
+     execute immediate 'drop table ' || ObjName || ' cascade constraints';
+     end if;
+END;/
+/
+
+call spdroptable('X_TAG_CHANGE_LOG');
+
+CREATE SEQUENCE X_TAG_CHANGE_LOG_SEQ START WITH 1 INCREMENT BY 1 NOCACHE NOCYCLE;
+CREATE TABLE x_tag_change_log (
+id NUMBER(20) NOT NULL,
+create_time DATE DEFAULT NULL NULL,
+service_id NUMBER(20) NOT NULL,
+change_type NUMBER(11) NOT NULL,
+service_tags_version NUMBER(20) DEFAULT '0' NOT NULL,
+service_resource_id NUMBER(20) DEFAULT NULL NULL,
+tag_id NUMBER(20) DEFAULT NULL NULL,
+primary key (id)
+);
+CREATE INDEX x_tag_chng_log_IDX_service_id ON x_tag_change_log(service_id);
+CREATE INDEX x_tag_chng_log_IDX_tag_ver ON x_tag_change_log(service_tags_version);
+COMMIT;
+
+GRANT ALL ON x_tag_change_log TO rangeradmin;
+GRANT ALL ON X_TAG_CHANGE_LOG_SEQ TO rangeradmin;
+COMMIT;
+
diff --git a/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql b/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql
index 0e8fd9c..0034759 100644
--- a/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql
+++ b/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql
@@ -15,6 +15,7 @@
 
 DROP TABLE IF EXISTS x_security_zone_ref_resource CASCADE;
 DROP TABLE IF EXISTS x_policy_change_log;
+DROP TABLE IF EXISTS x_tag_change_log;
 DROP TABLE IF EXISTS x_policy_ref_group CASCADE;
 DROP TABLE IF EXISTS x_policy_ref_user CASCADE;
 DROP TABLE IF EXISTS x_policy_ref_datamask_type CASCADE;
@@ -99,6 +100,7 @@ DROP SEQUENCE IF EXISTS x_sec_zone_ref_tag_srvc_SEQ;
 DROP SEQUENCE IF EXISTS x_ranger_global_state_seq;
 DROP SEQUENCE IF EXISTS x_security_zone_seq;
 DROP SEQUENCE IF EXISTS x_policy_change_log_seq;
+DROP SEQUENCE IF EXISTS x_tag_change_log_seq;
 DROP SEQUENCE IF EXISTS x_policy_ref_group_seq;
 DROP SEQUENCE IF EXISTS x_policy_ref_user_seq;
 DROP SEQUENCE IF EXISTS x_policy_ref_datamask_type_seq;
@@ -1553,6 +1555,24 @@ priv_type INT DEFAULT NULL NULL,
 );
 commit;
 
+CREATE SEQUENCE x_tag_change_log_seq;
+
+CREATE TABLE x_tag_change_log (
+id BIGINT DEFAULT nextval('x_tag_change_log_seq'::regclass),
+create_time TIMESTAMP DEFAULT NULL NULL,
+service_id bigint NOT NULL,
+change_type int NOT NULL,
+service_tags_version bigint DEFAULT '0' NOT NULL,
+service_resource_id bigint DEFAULT NULL NULL,
+tag_id bigint DEFAULT NULL NULL,
+primary key (id)
+);
+commit;
+
+CREATE INDEX x_tag_change_log_IDX_service_id ON x_tag_change_log(service_id);
+CREATE INDEX x_tag_change_log_IDX_tag_version ON x_tag_change_log(service_tags_version);
+commit;
+
 CREATE INDEX x_policy_change_log_IDX_service_id ON x_policy_change_log(service_id);
 CREATE INDEX x_policy_change_log_IDX_policy_version ON x_policy_change_log(policy_version);
 commit;
@@ -1767,6 +1787,7 @@ INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('040',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y');
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('041',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y');
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('042',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y');
+INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('043',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y');
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('DB_PATCHES',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y');
 
 INSERT INTO x_user_module_perm (user_id,module_id,create_time,update_time,added_by_id,upd_by_id,is_allowed) VALUES
diff --git a/security-admin/db/postgres/patches/043-add-tag-change-log-table.sql b/security-admin/db/postgres/patches/043-add-tag-change-log-table.sql
new file mode 100644
index 0000000..3e060d7
--- /dev/null
+++ b/security-admin/db/postgres/patches/043-add-tag-change-log-table.sql
@@ -0,0 +1,39 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+DROP TABLE IF EXISTS x_tag_change_log;
+DROP SEQUENCE IF EXISTS x_tag_change_log_seq;
+
+CREATE SEQUENCE x_tag_change_log_seq;
+
+CREATE TABLE x_tag_change_log (
+id BIGINT DEFAULT nextval('x_tag_change_log_seq'::regclass),
+create_time TIMESTAMP DEFAULT NULL NULL,
+service_id bigint NOT NULL,
+change_type int NOT NULL,
+service_tags_version bigint DEFAULT '0' NOT NULL,
+service_resource_id bigint DEFAULT NULL NULL,
+tag_id bigint DEFAULT NULL NULL,
+primary key (id)
+);
+commit;
+CREATE INDEX x_tag_change_log_IDX_service_id ON x_tag_change_log(service_id);
+CREATE INDEX x_tag_change_log_IDX_tag_version ON x_tag_change_log(service_tags_version);
+commit;
+
+-- grant all privileges on x_tag_change_log to rangeradmin;
+-- grant all privileges on x_tag_change_log_seq to rangeradmin;
+
+-- commit;
diff --git a/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql b/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql
index 5d37f08..9dc7656 100644
--- a/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql
+++ b/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql
@@ -39,6 +39,8 @@ call dbo.removeForeignKeysAndTable('x_security_zone_ref_resource')
 GO
 call dbo.removeForeignKeysAndTable('x_policy_change_log')
 GO
+call dbo.removeForeignKeysAndTable('x_tag_change_log')
+GO
 call dbo.removeForeignKeysAndTable('x_policy_ref_group')
 GO
 call dbo.removeForeignKeysAndTable('x_policy_ref_user')
@@ -1221,6 +1223,18 @@ CREATE TABLE dbo.x_policy_change_log(
 )
 GO
 
+CREATE TABLE IF NOT EXISTS dbo.x_tag_change_log (
+id bigint IDENTITY NOT NULL,
+create_time datetime DEFAULT NULL NULL,
+service_id bigint NOT NULL,
+change_type int NOT NULL,
+service_tags_version  bigint DEFAULT 0 NOT NULL,
+service_resource_id bigint DEFAULT NULL NULL,
+tag_id bigint DEFAULT NULL NULL,
+CONSTRAINT x_tag_change_log_PK_id PRIMARY KEY CLUSTERED(id)
+)
+GO
+
 CREATE TABLE dbo.x_role(
 id bigint IDENTITY NOT NULL,
 create_time datetime DEFAULT NULL NULL,
@@ -2032,6 +2046,10 @@ CREATE NONCLUSTERED INDEX x_policy_change_log_IDX_service_id ON dbo.x_policy_cha
 GO
 CREATE NONCLUSTERED INDEX x_policy_change_log_IDX_policy_version ON dbo.x_policy_change_log(policy_version ASC)
 GO
+CREATE NONCLUSTERED INDEX x_tag_change_log_IDX_service_id ON dbo.x_tag_change_log(service_id ASC);
+GO
+CREATE NONCLUSTERED INDEX x_tag_change_log_IDX_tag_version ON dbo.x_tag_change_log(service_tags_version ASC);
+GO
 insert into x_portal_user (create_time,update_time,first_name,last_name,pub_scr_name,login_id,password,email,status) values (GETDATE(),GETDATE(),'Admin','','Admin','admin','ceb4f32325eda6142bd65215f4c0f371','',1)
 GO
 insert into x_portal_user_role (create_time,update_time,user_id,user_role,status) values (GETDATE(),GETDATE(),dbo.getXportalUIdByLoginId('admin'),'ROLE_SYS_ADMIN',1)
@@ -2128,6 +2146,8 @@ INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active
 GO
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('042',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
 GO
+INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('043',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
+GO
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('DB_PATCHES',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
 GO
 INSERT INTO x_user_module_perm (user_id,module_id,create_time,update_time,added_by_id,upd_by_id,is_allowed) VALUES (dbo.getXportalUIdByLoginId('admin'),dbo.getModulesIdByName('Reports'),CURRENT_TIMESTAMP,CURRENT_TIMESTAMP,dbo.getXportalUIdByLoginId('admin'),dbo.getXportalUIdByLoginId('admin'),1);
diff --git a/security-admin/db/sqlanywhere/patches/043-add-tag-change-log-table.sql b/security-admin/db/sqlanywhere/patches/043-add-tag-change-log-table.sql
new file mode 100644
index 0000000..74ae777
--- /dev/null
+++ b/security-admin/db/sqlanywhere/patches/043-add-tag-change-log-table.sql
@@ -0,0 +1,62 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE OR REPLACE PROCEDURE dbo.removeForeignKeysAndTable (IN table_name varchar(100)) -- drops the foreign keys declared on dbo.<table_name>, then drops the table itself
+AS
+BEGIN
+	DECLARE @stmt VARCHAR(300) -- holds one generated ALTER TABLE ... DROP CONSTRAINT statement
+	DECLARE @tblname VARCHAR(300) -- 'dbo.'-qualified table name
+	DECLARE @drpstmt VARCHAR(1000) -- generated DROP TABLE statement
+	DECLARE cur CURSOR FOR select 'alter table dbo.' + table_name + ' drop constraint ' + role from SYS.SYSFOREIGNKEYS where foreign_creator ='dbo' and foreign_tname = table_name
+	OPEN cur WITH HOLD
+		fetch cur into @stmt
+		WHILE (@@sqlstatus = 0) -- presumably 0 means the preceding fetch returned a row; confirm against the @@sqlstatus documentation
+		BEGIN
+			execute(@stmt) -- run the generated DROP CONSTRAINT statement
+			fetch cur into @stmt
+		END
+	close cur
+	DEALLOCATE CURSOR cur
+	SET @tblname ='dbo.' + table_name;
+	SET @drpstmt = 'DROP TABLE IF EXISTS ' + @tblname; -- IF EXISTS keeps the call harmless when the table is absent
+	execute(@drpstmt)
+END
+GO
+call dbo.removeForeignKeysAndTable('x_tag_change_log')
+GO
+
+CREATE TABLE IF NOT EXISTS dbo.x_tag_change_log ( -- one row per tag change-log entry written by ServiceDBStore.persistChangeLog
+id bigint IDENTITY NOT NULL,
+create_time datetime DEFAULT NULL NULL, -- time the log row was written
+service_id bigint NOT NULL, -- id of the service the entry belongs to
+change_type int NOT NULL, -- stored as ServiceTags.TagsChangeType.ordinal()
+service_tags_version  bigint DEFAULT 0 NOT NULL, -- tag version of the service associated with this entry
+service_resource_id bigint DEFAULT NULL NULL, -- affected service resource, when the writer supplies one
+tag_id bigint DEFAULT NULL NULL, -- affected tag, when the writer supplies one
+CONSTRAINT x_tag_change_log_PK_id PRIMARY KEY CLUSTERED(id)
+)
+GO
+CREATE NONCLUSTERED INDEX x_tag_change_log_IDX_service_id ON dbo.x_tag_change_log(service_id ASC);
+GO
+CREATE NONCLUSTERED INDEX x_tag_change_log_IDX_tag_version ON dbo.x_tag_change_log(service_tags_version);
+GO
+exit
+
+
+
+
+
+
+
diff --git a/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql b/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql
index 16600e0..9383d1a 100644
--- a/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql
+++ b/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql
@@ -625,6 +625,10 @@ IF (OBJECT_ID('x_policy_change_log') IS NOT NULL)
 BEGIN
     DROP TABLE [dbo].[x_policy_change_log]
 END
+IF (OBJECT_ID('x_tag_change_log') IS NOT NULL)
+BEGIN
+    DROP TABLE [dbo].[x_tag_change_log]
+END
 IF (OBJECT_ID('x_policy_ref_group') IS NOT NULL)
 BEGIN
     DROP TABLE [dbo].[x_policy_ref_group]
@@ -2574,6 +2578,33 @@ SET ANSI_NULLS ON
 SET QUOTED_IDENTIFIER ON
 SET ANSI_PADDING ON
 
+CREATE TABLE [dbo].[x_tag_change_log] (
+[id] [bigint] IDENTITY(1,1) NOT NULL,
+[create_time] [datetime2] DEFAULT NULL NULL,
+[service_id] [bigint] NOT NULL,
+[change_type] [int] NOT NULL,
+[service_tags_version] [bigint] DEFAULT 0 NOT NULL,
+[service_resource_id] [bigint]  DEFAULT NULL NULL,
+[tag_id] [bigint]  DEFAULT NULL NULL,
+PRIMARY KEY CLUSTERED
+(
+        [id] ASC
+)WITH (PAD_INDEX = OFF,STATISTICS_NORECOMPUTE = OFF,IGNORE_DUP_KEY = OFF,ALLOW_ROW_LOCKS = ON,ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
+) ON [PRIMARY]
+CREATE NONCLUSTERED INDEX [x_tag_change_log_IDX_service_id] ON [x_tag_change_log]
+(
+   [service_id] ASC
+)
+WITH (SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, IGNORE_DUP_KEY = OFF, ONLINE = OFF) ON [PRIMARY]
+CREATE NONCLUSTERED INDEX [x_tag_change_log_IDX_tag_version] ON [x_tag_change_log]
+(
+   [service_tags_version] ASC
+)
+
+WITH (SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, IGNORE_DUP_KEY = OFF, ONLINE = OFF) ON [PRIMARY]
+SET ANSI_NULLS ON
+SET QUOTED_IDENTIFIER ON
+SET ANSI_PADDING ON
 
 ALTER TABLE [dbo].[x_asset]  WITH CHECK ADD  CONSTRAINT [x_asset_FK_added_by_id] FOREIGN KEY([added_by_id])
 REFERENCES [dbo].[x_portal_user] ([id])
@@ -3861,6 +3892,7 @@ INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('040',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('041',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('042',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
+INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('043',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('DB_PATCHES',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
 INSERT INTO x_user_module_perm (user_id,module_id,create_time,update_time,added_by_id,upd_by_id,is_allowed) VALUES (dbo.getXportalUIdByLoginId('admin'),dbo.getModulesIdByName('Reports'),CURRENT_TIMESTAMP,CURRENT_TIMESTAMP,dbo.getXportalUIdByLoginId('admin'),dbo.getXportalUIdByLoginId('admin'),1);
 INSERT INTO x_user_module_perm (user_id,module_id,create_time,update_time,added_by_id,upd_by_id,is_allowed) VALUES (dbo.getXportalUIdByLoginId('admin'),dbo.getModulesIdByName('Resource Based Policies'),CURRENT_TIMESTAMP,CURRENT_TIMESTAMP,dbo.getXportalUIdByLoginId('admin'),dbo.getXportalUIdByLoginId('admin'),1);
diff --git a/security-admin/db/sqlserver/patches/043-add-tag-change-log-table.sql b/security-admin/db/sqlserver/patches/043-add-tag-change-log-table.sql
new file mode 100644
index 0000000..79c012d
--- /dev/null
+++ b/security-admin/db/sqlserver/patches/043-add-tag-change-log-table.sql
@@ -0,0 +1,53 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+GO
+IF (OBJECT_ID('x_tag_change_log') IS NOT NULL) -- drop any existing x_tag_change_log so the CREATE TABLE below starts clean
+BEGIN
+    DROP TABLE [dbo].[x_tag_change_log]
+END
+GO
+SET ANSI_NULLS ON
+GO
+SET QUOTED_IDENTIFIER ON
+GO
+SET ANSI_PADDING ON
+GO
+CREATE TABLE [dbo].[x_tag_change_log] ( -- one row per tag change-log entry written by ServiceDBStore.persistChangeLog
+[id] [bigint] IDENTITY(1,1) NOT NULL,
+[create_time] [datetime2] DEFAULT NULL NULL, -- time the log row was written
+[service_id] [bigint] NOT NULL, -- id of the service the entry belongs to
+[change_type] [int] NOT NULL, -- stored as ServiceTags.TagsChangeType.ordinal()
+[service_tags_version] [bigint] DEFAULT 0 NOT NULL, -- tag version of the service associated with this entry
+[service_resource_id] [bigint]  DEFAULT NULL NULL, -- affected service resource, when the writer supplies one
+[tag_id] [bigint]  DEFAULT NULL NULL, -- affected tag, when the writer supplies one
+PRIMARY KEY CLUSTERED
+(
+        [id] ASC
+)WITH (PAD_INDEX = OFF,STATISTICS_NORECOMPUTE = OFF,IGNORE_DUP_KEY = OFF,ALLOW_ROW_LOCKS = ON,ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
+) ON [PRIMARY]
+CREATE NONCLUSTERED INDEX [x_tag_change_log_IDX_service_id] ON [x_tag_change_log] -- index on the service id column
+(
+   [service_id] ASC
+)
+WITH (SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, IGNORE_DUP_KEY = OFF, ONLINE = OFF) ON [PRIMARY]
+CREATE NONCLUSTERED INDEX [x_tag_change_log_IDX_tag_version] ON [x_tag_change_log] -- index on the tag version column
+(
+   [service_tags_version] ASC
+)
+
+WITH (SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, IGNORE_DUP_KEY = OFF, ONLINE = OFF) ON [PRIMARY]
+GO
+exit
\ No newline at end of file
diff --git a/security-admin/src/main/java/org/apache/ranger/biz/RangerTagDBRetriever.java b/security-admin/src/main/java/org/apache/ranger/biz/RangerTagDBRetriever.java
index 394c05c..22bea8b 100644
--- a/security-admin/src/main/java/org/apache/ranger/biz/RangerTagDBRetriever.java
+++ b/security-admin/src/main/java/org/apache/ranger/biz/RangerTagDBRetriever.java
@@ -45,9 +45,8 @@ import org.springframework.transaction.support.TransactionCallback;
 import org.springframework.transaction.support.TransactionTemplate;
 
 public class RangerTagDBRetriever {
-	static final Log LOG = LogFactory.getLog(RangerTagDBRetriever.class);
-	static final Log PERF_LOG = RangerPerfTracer.getPerfLogger("db.RangerTagDBRetriever");
-	public static final String OPTION_RANGER_FILTER_TAGS_FOR_SERVICE_PLUGIN = "ranger.filter.tags.for.service.plugin";
+	private static final Log LOG = LogFactory.getLog(RangerTagDBRetriever.class);
+	private static final Log PERF_LOG = RangerPerfTracer.getPerfLogger("db.RangerTagDBRetriever");
 
 	public static final Type subsumedDataType = new TypeToken<List<RangerTagDef.RangerTagAttributeDef>>() {}.getType();
 
@@ -80,7 +79,7 @@ public class RangerTagDBRetriever {
 			RangerPerfTracer perf = null;
 
 			if (RangerPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
-				perf = RangerPerfTracer.getPerfTracer(PERF_LOG, "RangerTagDBReceiver.getTags(serviceName=" + xService.getName());
+				perf = RangerPerfTracer.getPerfTracer(PERF_LOG, "RangerTagDBRetriever.RangerTagDBRetriever(serviceName=" + xService.getName() + ")");
 			}
 
 			if (txTemplate == null) {
diff --git a/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java b/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java
index e1c4578..92436ac 100644
--- a/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java
+++ b/security-admin/src/main/java/org/apache/ranger/biz/ServiceDBStore.java
@@ -63,12 +63,28 @@ import org.apache.ranger.audit.provider.MiscUtil;
 import org.apache.ranger.authorization.hadoop.config.RangerConfiguration;
 import org.apache.ranger.common.AppConstants;
 import org.apache.ranger.common.ContextUtil;
+import org.apache.ranger.common.MessageEnums;
+import org.apache.ranger.common.RangerCommonEnums;
+import org.apache.ranger.common.db.RangerTransactionSynchronizationAdapter;
+import org.apache.ranger.db.XXPolicyDao;
+import org.apache.ranger.entity.XXTagChangeLog;
+import org.apache.ranger.plugin.model.RangerRole;
+import org.apache.ranger.plugin.model.RangerSecurityZone;
+import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.plugin.model.validation.RangerServiceDefValidator;
+import org.apache.ranger.plugin.model.validation.RangerValidator;
+import org.apache.ranger.plugin.model.validation.ValidationFailureDetails;
+import org.apache.ranger.plugin.model.RangerPolicyDelta;
+import org.apache.ranger.plugin.policyengine.RangerPolicyEngine;
+import org.apache.ranger.plugin.policyresourcematcher.RangerDefaultPolicyResourceMatcher;
+import org.apache.ranger.plugin.policyresourcematcher.RangerPolicyResourceMatcher;
+import org.apache.ranger.plugin.service.RangerBaseService;
+import org.apache.ranger.plugin.store.ServiceStore;
+import org.apache.ranger.plugin.util.PasswordUtils;
 import org.apache.ranger.common.DateUtil;
 import org.apache.ranger.common.JSONUtil;
-import org.apache.ranger.common.MessageEnums;
 import org.apache.ranger.common.PropertiesUtil;
 import org.apache.ranger.common.RESTErrorUtil;
-import org.apache.ranger.common.RangerCommonEnums;
 import org.apache.ranger.common.RangerConstants;
 import org.apache.ranger.common.RangerFactory;
 import org.apache.ranger.common.RangerServicePoliciesCache;
@@ -76,7 +92,6 @@ import org.apache.ranger.common.RangerVersionInfo;
 import org.apache.ranger.common.SearchCriteria;
 import org.apache.ranger.common.StringUtil;
 import org.apache.ranger.common.UserSessionBase;
-import org.apache.ranger.common.db.RangerTransactionSynchronizationAdapter;
 import org.apache.ranger.db.RangerDaoManager;
 import org.apache.ranger.db.XXAccessTypeDefDao;
 import org.apache.ranger.db.XXAccessTypeDefGrantsDao;
@@ -85,7 +100,6 @@ import org.apache.ranger.db.XXDataMaskTypeDefDao;
 import org.apache.ranger.db.XXEnumDefDao;
 import org.apache.ranger.db.XXEnumElementDefDao;
 import org.apache.ranger.db.XXPolicyConditionDefDao;
-import org.apache.ranger.db.XXPolicyDao;
 import org.apache.ranger.db.XXPolicyLabelMapDao;
 import org.apache.ranger.db.XXResourceDefDao;
 import org.apache.ranger.db.XXServiceConfigDefDao;
@@ -125,10 +139,7 @@ import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyItemAccess;
 import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyItemCondition;
 import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
 import org.apache.ranger.plugin.model.RangerPolicy.RangerRowFilterPolicyItem;
-import org.apache.ranger.plugin.model.RangerPolicyDelta;
 import org.apache.ranger.plugin.model.RangerPolicyResourceSignature;
-import org.apache.ranger.plugin.model.RangerRole;
-import org.apache.ranger.plugin.model.RangerSecurityZone;
 import org.apache.ranger.plugin.model.RangerService;
 import org.apache.ranger.plugin.model.RangerServiceDef;
 import org.apache.ranger.plugin.model.RangerServiceDef.RangerAccessTypeDef;
@@ -142,19 +153,10 @@ import org.apache.ranger.plugin.model.RangerServiceDef.RangerResourceDef;
 import org.apache.ranger.plugin.model.RangerServiceDef.RangerRowFilterDef;
 import org.apache.ranger.plugin.model.RangerServiceDef.RangerServiceConfigDef;
 import org.apache.ranger.plugin.model.validation.RangerServiceDefHelper;
-import org.apache.ranger.plugin.model.validation.RangerServiceDefValidator;
-import org.apache.ranger.plugin.model.validation.RangerValidator;
-import org.apache.ranger.plugin.model.validation.ValidationFailureDetails;
-import org.apache.ranger.plugin.policyengine.RangerPolicyEngine;
-import org.apache.ranger.plugin.policyresourcematcher.RangerDefaultPolicyResourceMatcher;
-import org.apache.ranger.plugin.policyresourcematcher.RangerPolicyResourceMatcher;
-import org.apache.ranger.plugin.service.RangerBaseService;
 import org.apache.ranger.plugin.store.AbstractServiceStore;
 import org.apache.ranger.plugin.store.EmbeddedServiceDefsUtil;
 import org.apache.ranger.plugin.store.PList;
 import org.apache.ranger.plugin.store.ServicePredicateUtil;
-import org.apache.ranger.plugin.store.ServiceStore;
-import org.apache.ranger.plugin.util.PasswordUtils;
 import org.apache.ranger.plugin.util.RangerPolicyDeltaUtil;
 import org.apache.ranger.plugin.util.SearchFilter;
 import org.apache.ranger.plugin.util.ServicePolicies;
@@ -220,7 +222,7 @@ public class ServiceDBStore extends AbstractServiceStore {
     private static final String POLICY_TYPE_DATAMASK  = "Masking";
     private static final String POLICY_TYPE_ROWFILTER = "Row Level Filter";
 
-	private static       String LOCAL_HOSTNAME = "unknown";
+	private static       String LOCAL_HOSTNAME;
 	private static final String HOSTNAME       = "Host name";
 	private static final String USER_NAME      = "Exported by";
 	private static final String RANGER_VERSION = "Ranger apache version";
@@ -234,8 +236,9 @@ public class ServiceDBStore extends AbstractServiceStore {
 	public static final String  ENCRYPT_KEY     = PropertiesUtil.getProperty("ranger.password.encryption.key", PasswordUtils.DEFAULT_ENCRYPT_KEY);
 	public static final String  SALT            = PropertiesUtil.getProperty("ranger.password.salt", PasswordUtils.DEFAULT_SALT);
 	public static final Integer ITERATION_COUNT = PropertiesUtil.getIntProperty("ranger.password.iteration.count", PasswordUtils.DEFAULT_ITERATION_COUNT);
-	public static final boolean SUPPORTS_POLICY_DELTAS = RangerConfiguration.getInstance().getBoolean("ranger.admin.supports.policy.deltas", false);
-	public static final Integer RETENTION_PERIOD_IN_DAYS = RangerConfiguration.getInstance().getInt("ranger.admin.delta.retention.time.in.days", 7);
+	public static boolean SUPPORTS_POLICY_DELTAS = false;
+	public static Integer RETENTION_PERIOD_IN_DAYS = 7;
+	public static Integer TAG_RETENTION_PERIOD_IN_DAYS = 3;
 
 	static {
 		try {
@@ -364,6 +367,11 @@ public class ServiceDBStore extends AbstractServiceStore {
 						LOG.error("Could not add ranger-admin resources to RangerConfiguration.");
 					}
 
+					SUPPORTS_POLICY_DELTAS       = RangerConfiguration.getInstance().getBoolean("ranger.admin.supports.policy.deltas", false);
+					RETENTION_PERIOD_IN_DAYS     = RangerConfiguration.getInstance().getInt("ranger.admin.delta.retention.time.in.days", 7);
+					TAG_RETENTION_PERIOD_IN_DAYS = 	RangerConfiguration.getInstance().getInt("ranger.admin.tag.delta.retention.time.in.days", 3);
+
+
 					TransactionTemplate txTemplate = new TransactionTemplate(txManager);
 
 					final ServiceDBStore dbStore = this;
@@ -376,7 +384,8 @@ public class ServiceDBStore extends AbstractServiceStore {
 								EmbeddedServiceDefsUtil.instance().init(dbStore);
 								getServiceUpgraded();
 								createGenericUsers();
-								resetPolicyUpdateLog(RETENTION_PERIOD_IN_DAYS, false);
+								resetPolicyUpdateLog(RETENTION_PERIOD_IN_DAYS, RangerPolicyDelta.CHANGE_TYPE_RANGER_ADMIN_START);
+								resetTagUpdateLog(TAG_RETENTION_PERIOD_IN_DAYS, ServiceTags.TagsChangeType.RANGER_ADMIN_START);
 								//createUnzonedSecurityZone();
 								return null;
 							}
@@ -3385,20 +3394,24 @@ public class ServiceDBStore extends AbstractServiceStore {
 		transactionSynchronizationAdapter.executeOnTransactionCommit(serviceVersionUpdater);
 	}
 
-	public static void persistVersionChange(RangerDaoManager daoMgr, Long id, VERSION_TYPE versionType, String zoneName, Integer policyDeltaType, RangerPolicy policy) {
+	public static void persistVersionChange(ServiceVersionUpdater serviceVersionUpdater) {
+		RangerDaoManager daoMgr = serviceVersionUpdater.daoManager;
+		Long id = serviceVersionUpdater.serviceId;
+		VERSION_TYPE versionType = serviceVersionUpdater.versionType;
+
 		XXServiceVersionInfoDao serviceVersionInfoDao = daoMgr.getXXServiceVersionInfo();
 
 		XXServiceVersionInfo serviceVersionInfoDbObj = serviceVersionInfoDao.findByServiceId(id);
-        	XXService service = daoMgr.getXXService().getById(id);
+		XXService service = daoMgr.getXXService().getById(id);
 
 		Long nextPolicyVersion = 1L;
 		Date now = new Date();
 
-		if(serviceVersionInfoDbObj != null) {
+		if (serviceVersionInfoDbObj != null) {
 			if (versionType == VERSION_TYPE.POLICY_VERSION || versionType == VERSION_TYPE.POLICY_AND_TAG_VERSION) {
-                		nextPolicyVersion = getNextVersion(serviceVersionInfoDbObj.getPolicyVersion());
+				nextPolicyVersion = getNextVersion(serviceVersionInfoDbObj.getPolicyVersion());
 
-                		serviceVersionInfoDbObj.setPolicyVersion(nextPolicyVersion);
+				serviceVersionInfoDbObj.setPolicyVersion(nextPolicyVersion);
 				serviceVersionInfoDbObj.setPolicyUpdateTime(now);
 			}
 			if (versionType == VERSION_TYPE.TAG_VERSION || versionType == VERSION_TYPE.POLICY_AND_TAG_VERSION) {
@@ -3421,24 +3434,69 @@ public class ServiceDBStore extends AbstractServiceStore {
 			}
 		}
 
-		if (service != null && versionType != VERSION_TYPE.TAG_VERSION) {
-			// Build and save PolicyChangeLog
-			XXPolicyChangeLog policyChangeLog = new XXPolicyChangeLog();
+		if (service != null) {
+			persistChangeLog(service, versionType, versionType == VERSION_TYPE.TAG_VERSION ? serviceVersionInfoDbObj.getTagVersion() : serviceVersionInfoDbObj.getPolicyVersion(), serviceVersionUpdater);
+		}
+	}
+
+	// Logs a change for the service at its currently stored tag/policy version; the version itself is not advanced here.
+	private static void persistChangeLog(ServiceVersionUpdater serviceVersionUpdater) {
+		XXServiceVersionInfoDao serviceVersionInfoDao = serviceVersionUpdater.daoManager.getXXServiceVersionInfo();
+		XXServiceVersionInfo serviceVersionInfoDbObj = serviceVersionInfoDao.findByServiceId(serviceVersionUpdater.serviceId);
+		XXService service = serviceVersionUpdater.daoManager.getXXService().getById(serviceVersionUpdater.serviceId);
+		// Guard on the looked-up row, not the DAO: the row's getters are dereferenced below and the lookup may find nothing.
+		if (service != null && serviceVersionInfoDbObj != null) {
+			Long version = serviceVersionUpdater.versionType == VERSION_TYPE.TAG_VERSION ? serviceVersionInfoDbObj.getTagVersion() : serviceVersionInfoDbObj.getPolicyVersion();
+			persistChangeLog(service, serviceVersionUpdater.versionType, version, serviceVersionUpdater);
+		}
+	}
+
+	// Writes one change-log row for the service: a tag row for TAG_VERSION, otherwise a policy row; skipped unless admin-start or deltas are enabled.
+	private static void persistChangeLog(XXService service, VERSION_TYPE versionType, Long version, ServiceVersionUpdater serviceVersionUpdater) {
+		Date now = new Date();
+		if (versionType == VERSION_TYPE.TAG_VERSION) {
+			ServiceTags.TagsChangeType tagChangeType = serviceVersionUpdater.tagChangeType;
+			if (tagChangeType == ServiceTags.TagsChangeType.RANGER_ADMIN_START || TagDBStore.isSupportsTagDeltas()) {
+				// Build and save TagChangeLog
+				XXTagChangeLog tagChangeLog = new XXTagChangeLog();
 
-			policyChangeLog.setCreateTime(now);
-			policyChangeLog.setServiceId(service.getId());
-			policyChangeLog.setChangeType(policyDeltaType);
-			policyChangeLog.setPolicyVersion(nextPolicyVersion);
-			policyChangeLog.setZoneName(zoneName);
+				Long serviceResourceId = serviceVersionUpdater.resourceId;
+				Long tagId = serviceVersionUpdater.tagId;
 
-			if (policy != null) {
-				policyChangeLog.setServiceType(policy.getServiceType());
-				policyChangeLog.setPolicyType(policy.getPolicyType());
-				policyChangeLog.setPolicyId(policy.getId());
+				tagChangeLog.setCreateTime(now);
+				tagChangeLog.setServiceId(service.getId());
+				tagChangeLog.setChangeType(tagChangeType.ordinal()); // NOTE(review): persists the enum ordinal, so reordering TagsChangeType constants changes what stored rows mean
+				tagChangeLog.setServiceTagsVersion(version);
+				tagChangeLog.setServiceResourceId(serviceResourceId);
+				tagChangeLog.setTagId(tagId);
+
+				serviceVersionUpdater.daoManager.getXXTagChangeLog().create(tagChangeLog);
 			}
 
-			daoMgr.getXXPolicyChangeLog().create(policyChangeLog);
+		} else {
+			Integer policyDeltaChange = serviceVersionUpdater.policyDeltaChange;
+			// Compare via equals(): policyDeltaChange may be null, and '==' against a primitive constant would unbox it and throw.
+			if (Integer.valueOf(RangerPolicyDelta.CHANGE_TYPE_RANGER_ADMIN_START).equals(policyDeltaChange) || isSupportsPolicyDeltas()) {
+				// Build and save PolicyChangeLog
+				XXPolicyChangeLog policyChangeLog = new XXPolicyChangeLog();
+
+				policyChangeLog.setCreateTime(now);
+				policyChangeLog.setServiceId(service.getId());
+				policyChangeLog.setChangeType(serviceVersionUpdater.policyDeltaChange);
+				policyChangeLog.setPolicyVersion(version);
+				policyChangeLog.setZoneName(serviceVersionUpdater.zoneName);
+
+				RangerPolicy policy = serviceVersionUpdater.policy;
+				if (policy != null) {
+					policyChangeLog.setServiceType(policy.getServiceType());
+					policyChangeLog.setPolicyType(policy.getPolicyType());
+					policyChangeLog.setPolicyId(policy.getId());
+				}
+
+				serviceVersionUpdater.daoManager.getXXPolicyChangeLog().create(policyChangeLog);
+			}
 		}
+
 	}
 
 
@@ -4592,24 +4650,51 @@ public class ServiceDBStore extends AbstractServiceStore {
 		xUserService.createXUserWithOutLogin(genericUser);
 	}
 
-	public void resetPolicyUpdateLog(int retentionInDays, boolean reloadServicePoliciesCache) {
+	public void resetPolicyUpdateLog(int retentionInDays, Integer policyChangeType) { // trims the policy change log, then records policyChangeType for every service
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> resetPolicyUpdateLog(" + retentionInDays + ", " + reloadServicePoliciesCache + ")");
+			LOG.debug("==> resetPolicyUpdateLog(" + retentionInDays + ", " + policyChangeType + ")");
 		}
 
 		daoMgr.getXXPolicyChangeLog().deleteOlderThan(retentionInDays);
 
-		if (reloadServicePoliciesCache) {
-			List<Long> allServiceIds = daoMgr.getXXService().getAllServiceIds();
-			if (CollectionUtils.isNotEmpty(allServiceIds)) {
-				for (Long serviceId : allServiceIds) {
-					persistVersionChange(daoMgr, serviceId, VERSION_TYPE.POLICY_VERSION, null, RangerPolicyDelta.CHANGE_TYPE_RANGER_ADMIN_START, null);
+		List<Long> allServiceIds = daoMgr.getXXService().getAllServiceIds();
+		if (CollectionUtils.isNotEmpty(allServiceIds)) {
+			for (Long serviceId : allServiceIds) {
+				ServiceVersionUpdater updater = new ServiceVersionUpdater(daoMgr, serviceId, VERSION_TYPE.POLICY_VERSION, null, policyChangeType, null);
+				if (policyChangeType == RangerPolicyDelta.CHANGE_TYPE_RANGER_ADMIN_START) { // NOTE(review): policyChangeType is a nullable Integer; if the constant is a primitive this throws on null - confirm callers never pass null
+					persistChangeLog(updater); // admin start: log only, the stored policy version is left as is
+				} else {
+					persistVersionChange(updater); // any other change type: advance the stored policy version, then log
 				}
 			}
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== resetPolicyUpdateLog(" + retentionInDays + ", " + policyChangeType + ")");
 
 		}
+	}
+	public void resetTagUpdateLog(int retentionInDays, ServiceTags.TagsChangeType tagChangeType) { // tag counterpart of resetPolicyUpdateLog
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== resetPolicyUpdateLog(" + retentionInDays + ", " + reloadServicePoliciesCache + ")");
+			LOG.debug("==> resetTagUpdateLog(" + retentionInDays + ", " + tagChangeType + ")");
+		}
+		// Trim the tag change log first, using the caller-supplied retention in days.
+		daoMgr.getXXTagChangeLog().deleteOlderThan(retentionInDays);
+		// Then record tagChangeType once for every known service.
+		List<Long> allServiceIds = daoMgr.getXXService().getAllServiceIds();
+		if (CollectionUtils.isNotEmpty(allServiceIds)) {
+			for (Long serviceId : allServiceIds) {
+				ServiceVersionUpdater updater = new ServiceVersionUpdater(daoMgr, serviceId, VERSION_TYPE.TAG_VERSION, tagChangeType, null, null);
+				if (tagChangeType == ServiceTags.TagsChangeType.RANGER_ADMIN_START) {
+					persistChangeLog(updater); // admin start: log only, the stored tag version is left as is
+				} else {
+					persistVersionChange(updater); // any other change type: go through the version-change path, which also logs
+				}
+			}
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== resetTagUpdateLog(" + retentionInDays + ", " + tagChangeType + ")");
 
 		}
 	}
@@ -5045,6 +5130,10 @@ public class ServiceDBStore extends AbstractServiceStore {
 		return ret;
 	}
 
+	public static boolean isSupportsPolicyDeltas() { // reports the "ranger.admin.supports.policy.deltas" flag; it stays false until initialization loads it from configuration
+		return SUPPORTS_POLICY_DELTAS;
+	}
+
 	public static class ServiceVersionUpdater implements Runnable {
 		final Long 			   serviceId;
 		final RangerDaoManager daoManager;
@@ -5053,9 +5142,9 @@ public class ServiceDBStore extends AbstractServiceStore {
 		final Integer          policyDeltaChange;
 		final RangerPolicy     policy;
 
-		public ServiceVersionUpdater(RangerDaoManager daoManager, Long serviceId, VERSION_TYPE versionType) {
-			this(daoManager, serviceId, versionType, null, null, null);
-		}
+		final ServiceTags.TagsChangeType tagChangeType;
+		final Long             resourceId;
+		final Long             tagId;
 
 		public ServiceVersionUpdater(RangerDaoManager daoManager, Long serviceId, VERSION_TYPE versionType, Integer policyDeltaType) {
 			this(daoManager, serviceId, versionType, null, policyDeltaType, null);
@@ -5068,10 +5157,26 @@ public class ServiceDBStore extends AbstractServiceStore {
 			this.policyDeltaChange = policyDeltaType;
 			this.zoneName    = zoneName;
 			this.policy      = policy;
+			this.tagChangeType = ServiceTags.TagsChangeType.NONE;
+			this.resourceId    = null;
+			this.tagId         = null;
 		}
+
+		public ServiceVersionUpdater(RangerDaoManager daoManager, Long serviceId, VERSION_TYPE versionType, ServiceTags.TagsChangeType tagChangeType, Long resourceId, Long tagId) { // tag-change variant
+			this.daoManager        = daoManager;
+			this.serviceId         = serviceId;
+			this.versionType       = versionType;
+			this.tagChangeType     = tagChangeType; // tag-change details come from the caller
+			this.resourceId        = resourceId;
+			this.tagId             = tagId;
+			this.policy            = null; // policy-related fields are not set by this constructor
+			this.policyDeltaChange = null;
+			this.zoneName          = null;
+		}
+
 		@Override
 		public void run() {
-			ServiceDBStore.persistVersionChange(this.daoManager, this.serviceId, this.versionType, this.zoneName, policyDeltaChange, policy);
+			ServiceDBStore.persistVersionChange(this);
 		}
 	}
 }
diff --git a/security-admin/src/main/java/org/apache/ranger/biz/TagDBStore.java b/security-admin/src/main/java/org/apache/ranger/biz/TagDBStore.java
index 3cc4765..0e33298 100644
--- a/security-admin/src/main/java/org/apache/ranger/biz/TagDBStore.java
+++ b/security-admin/src/main/java/org/apache/ranger/biz/TagDBStore.java
@@ -21,15 +21,17 @@ package org.apache.ranger.biz;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.authorization.hadoop.config.RangerConfiguration;
 import org.apache.ranger.authorization.utils.JsonUtils;
-import org.apache.ranger.common.GUIDUtil;
 import org.apache.ranger.common.MessageEnums;
 import org.apache.ranger.common.RESTErrorUtil;
 import org.apache.ranger.common.RangerAdminTagEnricher;
@@ -39,6 +41,7 @@ import org.apache.ranger.entity.XXService;
 import org.apache.ranger.entity.XXServiceResource;
 import org.apache.ranger.entity.XXServiceVersionInfo;
 import org.apache.ranger.entity.XXTag;
+import org.apache.ranger.entity.XXTagChangeLog;
 import org.apache.ranger.entity.XXTagResourceMap;
 import org.apache.ranger.plugin.model.*;
 import org.apache.ranger.plugin.model.validation.RangerValidityScheduleValidator;
@@ -46,10 +49,11 @@ import org.apache.ranger.plugin.model.validation.ValidationFailureDetails;
 import org.apache.ranger.plugin.store.AbstractTagStore;
 import org.apache.ranger.plugin.store.PList;
 import org.apache.ranger.plugin.store.RangerServiceResourceSignature;
+import org.apache.ranger.plugin.util.RangerPerfTracer;
 import org.apache.ranger.plugin.util.RangerServiceNotFoundException;
+import org.apache.ranger.plugin.util.RangerServiceTagsDeltaUtil;
 import org.apache.ranger.plugin.util.SearchFilter;
 import org.apache.ranger.plugin.util.ServiceTags;
-import org.apache.ranger.service.RangerAuditFields;
 import org.apache.ranger.service.RangerTagDefService;
 import org.apache.ranger.service.RangerTagResourceMapService;
 import org.apache.ranger.service.RangerTagService;
@@ -64,7 +68,12 @@ import javax.servlet.http.HttpServletResponse;
 
 @Component
 public class TagDBStore extends AbstractTagStore {
-	private static final Log LOG = LogFactory.getLog(TagDBStore.class);
+	private static final Log LOG      = LogFactory.getLog(TagDBStore.class);
+	private static final Log PERF_LOG = RangerPerfTracer.getPerfLogger("db.TagDBStore");
+
+
+	private static boolean SUPPORTS_TAG_DELTAS = false;
+	private static boolean IS_SUPPORTS_TAG_DELTAS_INITIALIZED = false;
 
 	@Autowired
 	RangerTagDefService rangerTagDefService;
@@ -89,17 +98,12 @@ public class TagDBStore extends AbstractTagStore {
 	RESTErrorUtil errorUtil;
 
 	@Autowired
-	RangerAuditFields rangerAuditFields;
-
-	@Autowired
-	GUIDUtil guidUtil;
-
-	@Autowired
 	RESTErrorUtil restErrorUtil;
 
 	@PostConstruct
 	public void initStore() {
 		RangerAdminTagEnricher.setTagStore(this);
+		RangerAdminTagEnricher.setDaoManager(daoManager);
 	}
 
 	@Override
@@ -910,9 +914,9 @@ public class TagDBStore extends AbstractTagStore {
 
 
 	@Override
-	public ServiceTags getServiceTagsIfUpdated(String serviceName, Long lastKnownVersion) throws Exception {
+	public ServiceTags getServiceTagsIfUpdated(String serviceName, Long lastKnownVersion, boolean needsBackwardCompatibility) throws Exception {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> TagDBStore.getServiceTagsIfUpdated(" + serviceName + ", " + lastKnownVersion + ")");
+			LOG.debug("==> TagDBStore.getServiceTagsIfUpdated(" + serviceName + ", " + lastKnownVersion + ", " + needsBackwardCompatibility + ")");
 		}
 
 		ServiceTags ret = null;
@@ -932,7 +936,7 @@ public class TagDBStore extends AbstractTagStore {
 		}
 
 		if (lastKnownVersion == null || serviceVersionInfoDbObj == null || serviceVersionInfoDbObj.getTagVersion() == null || !lastKnownVersion.equals(serviceVersionInfoDbObj.getTagVersion())) {
-			ret = RangerServiceTagsCache.getInstance().getServiceTags(serviceName, xxService.getId(), this);
+			ret = RangerServiceTagsCache.getInstance().getServiceTags(serviceName, xxService.getId(), lastKnownVersion, needsBackwardCompatibility, this);
 		}
 
 		if (ret != null && lastKnownVersion != null && lastKnownVersion.equals(ret.getTagVersion())) {
@@ -945,7 +949,7 @@ public class TagDBStore extends AbstractTagStore {
 		}
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== TagDBStore.getServiceTagsIfUpdated(" + serviceName + ", " + lastKnownVersion + "): count=" + ((ret == null || ret.getTags() == null) ? 0 : ret.getTags().size()));
+			LOG.debug("<== TagDBStore.getServiceTagsIfUpdated(" + serviceName + ", " + lastKnownVersion + ", " + needsBackwardCompatibility + "): count=" + ((ret == null || ret.getTags() == null) ? 0 : ret.getTags().size()));
 		}
 
 		return ret;
@@ -960,13 +964,12 @@ public class TagDBStore extends AbstractTagStore {
 	}
 
 	@Override
-	public ServiceTags getServiceTags(String serviceName) throws Exception {
-
+	public ServiceTags getServiceTags(String serviceName, Long lastKnownVersion) throws Exception {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> TagDBStore.getServiceTags(" + serviceName + ")");
+			LOG.debug("==> TagDBStore.getServiceTags(" + serviceName + ", " + lastKnownVersion + ")");
 		}
 
-		ServiceTags ret = null;
+		final ServiceTags ret;
 
 		XXService xxService = daoManager.getXXService().findByName(serviceName);
 
@@ -986,28 +989,63 @@ public class TagDBStore extends AbstractTagStore {
 			throw new Exception("service-def does not exist. id=" + xxService.getType());
 		}
 
-		RangerTagDBRetriever tagDBRetriever = new RangerTagDBRetriever(daoManager, txManager, xxService);
+		ServiceTags delta = getServiceTagsDelta(xxService, lastKnownVersion);
 
-		Map<Long, RangerTagDef> tagDefMap = tagDBRetriever.getTagDefs();
-		Map<Long, RangerTag> tagMap = tagDBRetriever.getTags();
-		List<RangerServiceResource> resources = tagDBRetriever.getServiceResources();
-		Map<Long, List<Long>> resourceToTagIds = tagDBRetriever.getResourceToTagIds();
+		if (delta != null) {
+			ret = delta;
+		} else {
+			RangerTagDBRetriever tagDBRetriever = new RangerTagDBRetriever(daoManager, txManager, xxService);
 
-		ret = new ServiceTags();
+			Map<Long, RangerTagDef> tagDefMap = tagDBRetriever.getTagDefs();
+			Map<Long, RangerTag> tagMap = tagDBRetriever.getTags();
+			List<RangerServiceResource> resources = tagDBRetriever.getServiceResources();
+			Map<Long, List<Long>> resourceToTagIds = tagDBRetriever.getResourceToTagIds();
 
-		ret.setServiceName(xxService.getName());
-		ret.setTagVersion(serviceVersionInfoDbObj == null ? null : serviceVersionInfoDbObj.getTagVersion());
-		ret.setTagUpdateTime(serviceVersionInfoDbObj == null ? null : serviceVersionInfoDbObj.getTagUpdateTime());
-		ret.setTagDefinitions(tagDefMap);
-		ret.setTags(tagMap);
-		ret.setServiceResources(resources);
-		ret.setResourceToTagIds(resourceToTagIds);
+			ret = new ServiceTags();
 
-		if(LOG.isDebugEnabled()) {
-			LOG.debug("<== TagDBStore.getServiceTags(" + serviceName + ")");
+			ret.setServiceName(xxService.getName());
+			ret.setTagVersion(serviceVersionInfoDbObj == null ? null : serviceVersionInfoDbObj.getTagVersion());
+			ret.setTagUpdateTime(serviceVersionInfoDbObj == null ? null : serviceVersionInfoDbObj.getTagUpdateTime());
+			ret.setTagDefinitions(tagDefMap);
+			ret.setTags(tagMap);
+			ret.setServiceResources(resources);
+			ret.setResourceToTagIds(resourceToTagIds);
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== TagDBStore.getServiceTags(" + serviceName + ", " + lastKnownVersion + ")");
 		}
+
 		return ret;
+	}
+
+	/**
+	 * Looks up the service by name and returns its tag delta since {@code lastKnownVersion}. Returns null without a lookup when that version is null or -1, or when tag deltas are disabled; null is tested first because comparing a null Long with -1L unboxes it and throws NullPointerException.
+	 */
+	@Override
+	public ServiceTags getServiceTagsDelta(String serviceName, Long lastKnownVersion) throws Exception {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> TagDBStore.getServiceTagsDelta(" + serviceName + ", " + lastKnownVersion + ")");
+		}
 
+		final ServiceTags ret;
+		if (lastKnownVersion == null || lastKnownVersion == -1L || !isSupportsTagDeltas()) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Returning without computing tags-deltas.., SUPPORTS_TAG_DELTAS:[" + SUPPORTS_TAG_DELTAS + "], lastKnownVersion:[" + lastKnownVersion + "]");
+			}
+			ret = null;
+		} else {
+			XXService xxService = daoManager.getXXService().findByName(serviceName);
+			if (xxService == null) {
+				throw new Exception("service does not exist. name=" + serviceName);
+			}
+			ret = getServiceTagsDelta(xxService, lastKnownVersion);
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== TagDBStore.getServiceTagsDelta(" + serviceName + ", " + lastKnownVersion + ")");
+		}
+		return ret;
 	}
 
 	@Override
@@ -1099,4 +1137,140 @@ public class TagDBStore extends AbstractTagStore {
 
 		return tag;
 	}
+
+	/**
+	 * Builds the tag delta for a service from the tag-change-log rows recorded after {@code lastKnownVersion}.
+	 * Returns null when no delta is attempted (version is null or -1, or tag deltas are disabled), when no
+	 * delta could be created from the rows, or when creating it throws; the exception is logged, not rethrown.
+	 */
+	private ServiceTags getServiceTagsDelta(XXService xxService, Long lastKnownVersion) {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> TagDBStore.getServiceTagsDelta(lastKnownVersion=" + lastKnownVersion + ")");
+		}
+		ServiceTags ret = null;
+
+		// Test for null before the numeric comparison: unboxing a null Long for "== -1L" throws NullPointerException
+		if (lastKnownVersion == null || lastKnownVersion == -1L || !isSupportsTagDeltas()) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Returning without computing tags-deltas.., SUPPORTS_TAG_DELTAS:[" + SUPPORTS_TAG_DELTAS + "], lastKnownVersion:[" + lastKnownVersion + "]");
+			}
+		} else {
+			RangerPerfTracer perf = null;
+			if (RangerPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+				perf = RangerPerfTracer.getPerfTracer(PERF_LOG, "TagDBStore.getServiceTagsDelta(serviceName=" + xxService.getName() + ", lastKnownVersion=" + lastKnownVersion + ")");
+			}
+			List<XXTagChangeLog> changeLogRecords = daoManager.getXXTagChangeLog().findLaterThan(lastKnownVersion, xxService.getId());
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Number of tag-change-log records found since " + lastKnownVersion + " :[" + (changeLogRecords == null ? 0 : changeLogRecords.size()) + "] for serviceId:[" + xxService.getId() + "]");
+			}
+			try {
+				ret = createServiceTagsDelta(changeLogRecords);
+				if (ret != null) {
+					ret.setServiceName(xxService.getName());
+				}
+			} catch (Exception e) {
+				LOG.error("Perhaps some tag or service-resource could not be found", e);
+			}
+			RangerPerfTracer.logAlways(perf);
+		}
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== TagDBStore.getServiceTagsDelta(lastKnownVersion=" + lastKnownVersion + ")");
+		}
+
+		return ret;
+	}
+
+	/**
+	 * Converts tag-change-log records into a delta {@link ServiceTags} (isDelta=true), versioned by the last record.
+	 * TAG_UPDATE contributes a tag id, SERVICE_RESOURCE_UPDATE a resource id, TAG_RESOURCE_MAP_UPDATE both. A tag
+	 * or resource that has no tag-resource mapping is emitted as an id-only placeholder object.
+	 * Returns null when there are no records or when a record has an unrecognized change type.
+	 */
+	private ServiceTags createServiceTagsDelta(List<XXTagChangeLog> changeLogs) throws Exception {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("==> TagDBStore.createServiceTagsDelta()");
+		}
+		ServiceTags ret = null;
+
+		if (CollectionUtils.isNotEmpty(changeLogs)) {
+			Map<Long, Long> tagIds = new HashMap<>();
+			Map<Long, Long> serviceResourceIds = new HashMap<>();
+
+			for (XXTagChangeLog record : changeLogs) {
+				if (record.getChangeType().equals(ServiceTags.TagsChangeType.TAG_UPDATE.ordinal())) {
+					tagIds.put(record.getTagId(), record.getTagId());
+				} else if (record.getChangeType().equals(ServiceTags.TagsChangeType.SERVICE_RESOURCE_UPDATE.ordinal())) {
+					serviceResourceIds.put(record.getServiceResourceId(), record.getServiceResourceId());
+				} else if (record.getChangeType().equals(ServiceTags.TagsChangeType.TAG_RESOURCE_MAP_UPDATE.ordinal())) {
+					tagIds.put(record.getTagId(), record.getTagId());
+					serviceResourceIds.put(record.getServiceResourceId(), record.getServiceResourceId());
+				} else {
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Unknown changeType in tag-change-log record: [" + record + "]");
+						LOG.debug("Returning without further processing");
+					}
+					// Abandon the delta whatever the log level: a delta that skipped an unrecognized change would be incomplete
+					tagIds.clear();
+					serviceResourceIds.clear();
+					break;
+				}
+			}
+
+			if (MapUtils.isNotEmpty(serviceResourceIds) || MapUtils.isNotEmpty(tagIds)) {
+				ret = new ServiceTags();
+				ret.setIsDelta(true);
+				ServiceTags.TagsChangeExtent tagsChangeExtent = ServiceTags.TagsChangeExtent.TAGS;
+				ret.setTagVersion(changeLogs.get(changeLogs.size() - 1).getServiceTagsVersion());
+
+				for (Long tagId : tagIds.keySet()) {
+					// Check if tagId is part of any resource->id mapping
+					final RangerTag tag;
+					if (CollectionUtils.isNotEmpty(rangerTagResourceMapService.getByTagId(tagId))) {
+						tag = rangerTagService.read(tagId); // This should not throw exception
+					} else {
+						tag = new RangerTag();
+						tag.setId(tagId);
+					}
+					ret.getTags().put(tag.getId(), tag);
+				}
+
+				if (MapUtils.isNotEmpty(serviceResourceIds)) {
+					for (Long serviceResourceId : serviceResourceIds.keySet()) {
+						// Check if serviceResourceId is part of any resource->id mapping
+						List<Long> mappings = rangerTagResourceMapService.getTagIdsForResourceId(serviceResourceId);
+						final RangerServiceResource serviceResource;
+						if (CollectionUtils.isNotEmpty(mappings)) {
+							serviceResource = rangerServiceResourceService.read(serviceResourceId); // This should not throw exception
+							ret.getResourceToTagIds().put(serviceResourceId, mappings);
+						} else {
+							serviceResource = new RangerServiceResource();
+							serviceResource.setId(serviceResourceId);
+						}
+						ret.getServiceResources().add(serviceResource);
+						tagsChangeExtent = ServiceTags.TagsChangeExtent.SERVICE_RESOURCE;
+					}
+				}
+				ret.setTagsChangeExtent(tagsChangeExtent);
+
+				RangerServiceTagsDeltaUtil.pruneUnusedAttributes(ret);
+			}
+		} else {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("No tag-change-log records provided to createServiceTagsDelta()");
+			}
+		}
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("<== TagDBStore.createServiceTagsDelta() : serviceTagsDelta={" + ret + "}");
+		}
+
+		return ret;
+	}
+
+	public static boolean isSupportsTagDeltas() { // reports the cached value of the "ranger.admin.supports.tag.deltas" setting
+        if (!IS_SUPPORTS_TAG_DELTAS_INITIALIZED) { // the configuration is read only while this flag is still false
+            SUPPORTS_TAG_DELTAS = RangerConfiguration.getInstance().getBoolean("ranger.admin.supports.tag.deltas", false); // false is the default passed to getBoolean
+            IS_SUPPORTS_TAG_DELTAS_INITIALIZED = true; // NOTE(review): both statics are plain fields with no synchronization in this method - confirm that is acceptable
+        }
+        return SUPPORTS_TAG_DELTAS;
+    }
 }
diff --git a/security-admin/src/main/java/org/apache/ranger/common/RangerAdminTagEnricher.java b/security-admin/src/main/java/org/apache/ranger/common/RangerAdminTagEnricher.java
index f81184d..ad8d43b 100644
--- a/security-admin/src/main/java/org/apache/ranger/common/RangerAdminTagEnricher.java
+++ b/security-admin/src/main/java/org/apache/ranger/common/RangerAdminTagEnricher.java
@@ -19,6 +19,9 @@
 
 package org.apache.ranger.common;
 
+import org.apache.ranger.authorization.hadoop.config.RangerConfiguration;
+import org.apache.ranger.db.RangerDaoManager;
+import org.apache.ranger.entity.XXServiceVersionInfo;
 import org.apache.ranger.plugin.contextenricher.RangerTagEnricher;
 
 import org.apache.commons.logging.Log;
@@ -32,12 +35,19 @@ import org.apache.ranger.plugin.util.ServiceTags;
 public class RangerAdminTagEnricher extends RangerTagEnricher {
     private static final Log LOG = LogFactory.getLog(RangerAdminTagEnricher.class);
 
-    private static TagStore tagStore = null;
+    private static TagStore         tagStore   = null;
+    private static RangerDaoManager daoManager = null;
+
+    private static boolean ADMIN_TAG_ENRICHER_SUPPORTS_TAG_DELTAS;
 
     private Long serviceId;
 
     public static void setTagStore(TagStore tagStore) {
-        RangerAdminTagEnricher.tagStore = tagStore;
+        RangerAdminTagEnricher.tagStore   = tagStore;
+    }
+
+    public static void setDaoManager(RangerDaoManager daoManager) {
+        RangerAdminTagEnricher.daoManager = daoManager;
     }
 
     @Override
@@ -47,6 +57,8 @@ public class RangerAdminTagEnricher extends RangerTagEnricher {
         }
         super.init();
 
+        ADMIN_TAG_ENRICHER_SUPPORTS_TAG_DELTAS = RangerConfiguration.getInstance().getBoolean("ranger.admin.tag.enricher.supports.tag.deltas", true);
+
         ServiceStore svcStore = tagStore != null ? tagStore.getServiceStore() : null;
 
         if (tagStore == null || svcStore == null) {
@@ -80,21 +92,56 @@ public class RangerAdminTagEnricher extends RangerTagEnricher {
     }
 
     private void refreshTagsIfNeeded() {
-        ServiceTags serviceTags = null;
+
+        final Long        enrichedServiceTagsVersion = getServiceTagsVersion();
+        final Long        resourceTrieVersion        = getResourceTrieVersion();
+        ServiceTags       serviceTags                = null;
+
         try {
-            serviceTags = RangerServiceTagsCache.getInstance().getServiceTags(serviceName, serviceId, tagStore);
+
+            boolean needsBackwardCompatibility = !ADMIN_TAG_ENRICHER_SUPPORTS_TAG_DELTAS || enrichedServiceTagsVersion == -1L;
+
+            XXServiceVersionInfo serviceVersionInfoDbObj = daoManager.getXXServiceVersionInfo().findByServiceName(serviceName);
+
+            if (serviceVersionInfoDbObj == null) {
+                LOG.warn("serviceVersionInfo does not exist. name=" + serviceName);
+            }
+
+            if (serviceVersionInfoDbObj == null || serviceVersionInfoDbObj.getTagVersion() == null || !enrichedServiceTagsVersion.equals(serviceVersionInfoDbObj.getTagVersion())) {
+                serviceTags = RangerServiceTagsCache.getInstance().getServiceTags(serviceName, serviceId, enrichedServiceTagsVersion, needsBackwardCompatibility, tagStore);
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Have the latest tag version already. Only need to check if it needs to be rebuilt");
+                }
+                if (!enrichedServiceTagsVersion.equals(resourceTrieVersion)) {
+                    serviceTags = RangerServiceTagsCache.getInstance().getServiceTags(serviceName, serviceId, resourceTrieVersion, needsBackwardCompatibility, tagStore);
+                }
+            }
+
         } catch (Exception e) {
             LOG.error("Could not get cached service-tags, continue to use old ones..", e);
+            serviceTags = null;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Received serviceTags:[" + serviceTags + "]");
         }
 
         if (serviceTags != null) {
-            Long enrichedServiceTagsVersion = getServiceTagsVersion();
 
-            if (enrichedServiceTagsVersion == null || !enrichedServiceTagsVersion.equals(serviceTags.getTagVersion())) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("enrichedServiceTagsVersion=" + enrichedServiceTagsVersion + ", serviceTags-version=" + serviceTags.getTagVersion());
+            }
+
+            if (!enrichedServiceTagsVersion.equals(serviceTags.getTagVersion()) || !resourceTrieVersion.equals(serviceTags.getTagVersion())) {
+
                 synchronized(this) {
-                    enrichedServiceTagsVersion = getServiceTagsVersion();
 
-                    if (enrichedServiceTagsVersion == null || !enrichedServiceTagsVersion.equals(serviceTags.getTagVersion())) {
+                    if (serviceTags.getIsDelta()) {
+                        // Avoid rebuilding service-tags - applyDelta may not work correctly if called twice
+                        boolean rebuildOnlyIndex = true;
+                        setServiceTags(serviceTags, rebuildOnlyIndex);
+                    } else {
                         setServiceTags(serviceTags);
                     }
                 }
diff --git a/security-admin/src/main/java/org/apache/ranger/common/RangerServiceTagsCache.java b/security-admin/src/main/java/org/apache/ranger/common/RangerServiceTagsCache.java
index 802f42a..22a92b2 100644
--- a/security-admin/src/main/java/org/apache/ranger/common/RangerServiceTagsCache.java
+++ b/security-admin/src/main/java/org/apache/ranger/common/RangerServiceTagsCache.java
@@ -22,12 +22,11 @@ package org.apache.ranger.common;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.ranger.authorization.hadoop.config.RangerConfiguration;
-import org.apache.ranger.plugin.model.RangerServiceResource;
-import org.apache.ranger.plugin.model.RangerTag;
 import org.apache.ranger.plugin.store.TagStore;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.plugin.util.RangerServiceTagsDeltaUtil;
 import org.apache.ranger.plugin.util.ServiceTags;
 
 import java.util.Date;
@@ -46,7 +45,7 @@ public class RangerServiceTagsCache {
 	private final boolean useServiceTagsCache;
 	private final int waitTimeInSeconds;
 
-	private final Map<String, ServiceTagsWrapper> serviceTagsMap = new HashMap<String, ServiceTagsWrapper>();
+	private final Map<String, ServiceTagsWrapper> serviceTagsMap = new HashMap<>();
 
 	public static RangerServiceTagsCache getInstance() {
 		if (sInstance == null) {
@@ -67,31 +66,29 @@ public class RangerServiceTagsCache {
 	public void dump() {
 
 		if (useServiceTagsCache) {
-			Set<String> serviceNames = null;
+			final Set<String> serviceNames; // assigned exactly once, inside the synchronized block below
 
 			synchronized (this) {
 				serviceNames = serviceTagsMap.keySet();
 			}
 
 			if (CollectionUtils.isNotEmpty(serviceNames)) {
-				ServiceTagsWrapper cachedServiceTagsWrapper = null;
+				ServiceTagsWrapper cachedServiceTagsWrapper; // assigned on every iteration before it is logged
 
 				for (String serviceName : serviceNames) {
 					synchronized (this) {
 						cachedServiceTagsWrapper = serviceTagsMap.get(serviceName);
 					}
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("serviceName:" + serviceName + ", Cached-MetaData:" + cachedServiceTagsWrapper);
-					}
+					LOG.debug("serviceName:" + serviceName + ", Cached-MetaData:" + cachedServiceTagsWrapper); // NOTE(review): the message is concatenated even when debug logging is off
 				}
 			}
 		}
 	}
 
-	public ServiceTags getServiceTags(String serviceName, Long serviceId, TagStore tagStore) throws Exception {
+	public ServiceTags getServiceTags(String serviceName, Long serviceId, Long lastKnownVersion, boolean needsBackwardCompatibility, TagStore tagStore) throws Exception {
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> RangerServiceTagsCache.getServiceTags(" + serviceName + ", " + serviceId + ")");
+			LOG.debug("==> RangerServiceTagsCache.getServiceTags(" + serviceName + ", " + serviceId + ", " + lastKnownVersion + ", " + needsBackwardCompatibility + ")");
 		}
 
 		ServiceTags ret = null;
@@ -102,12 +99,10 @@ public class RangerServiceTagsCache {
 				LOG.debug("useServiceTagsCache=" + useServiceTagsCache);
 			}
 
-			ServiceTags serviceTags = null;
-
 			if (!useServiceTagsCache) {
 				if (tagStore != null) {
 					try {
-						serviceTags = tagStore.getServiceTags(serviceName);
+						ret = tagStore.getServiceTags(serviceName, -1L);
 					} catch (Exception exception) {
 						LOG.error("getServiceTags(" + serviceName + "): failed to get latest tags from tag-store", exception);
 					}
@@ -115,7 +110,7 @@ public class RangerServiceTagsCache {
 					LOG.error("getServiceTags(" + serviceName + "): failed to get latest tags as tag-store is null!");
 				}
 			} else {
-				ServiceTagsWrapper serviceTagsWrapper = null;
+				ServiceTagsWrapper serviceTagsWrapper;
 
 				synchronized (this) {
 					serviceTagsWrapper = serviceTagsMap.get(serviceName);
@@ -138,26 +133,19 @@ public class RangerServiceTagsCache {
 				}
 
 				if (tagStore != null) {
-					boolean refreshed = serviceTagsWrapper.getLatestOrCached(serviceName, tagStore);
-
-					if(LOG.isDebugEnabled()) {
-						LOG.debug("getLatestOrCached returned " + refreshed);
-					}
+					ret = serviceTagsWrapper.getLatestOrCached(serviceName, tagStore, lastKnownVersion, needsBackwardCompatibility);
 				} else {
 					LOG.error("getServiceTags(" + serviceName + "): failed to get latest tags as tag-store is null!");
+					ret = serviceTagsWrapper.getServiceTags();
 				}
 
-				serviceTags = serviceTagsWrapper.getServiceTags();
 			}
-
-			ret = serviceTags;
-
 		} else {
 			LOG.error("getServiceTags() failed to get tags as serviceName is null or blank and/or serviceId is null!");
 		}
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== RangerServiceTagsCache.getServiceTags(" + serviceName + ", " + serviceId + "): count=" + ((ret == null || ret.getTags() == null) ? 0 : ret.getTags().size()));
+			LOG.debug("<== RangerServiceTagsCache.getServiceTags(" + serviceName + ", " + serviceId + ", " + lastKnownVersion + ", " + needsBackwardCompatibility + "): count=" + ((ret == null || ret.getTags() == null) ? 0 : ret.getTags().size()));
 		}
 
 		return ret;
@@ -169,6 +157,20 @@ public class RangerServiceTagsCache {
 		Date updateTime = null;
 		long longestDbLoadTimeInMs = -1;
 
+		ServiceTagsDeltasCache deltaCache;
+
+		class ServiceTagsDeltasCache { // holds a single delta together with the version it was requested from
+			final long        		fromVersion; // version the cached delta starts from
+			final ServiceTags 		serviceTagsDelta; // the delta stored for fromVersion
+
+			ServiceTagsDeltasCache(final long fromVersion, ServiceTags serviceTagsDelta) {
+				this.fromVersion         = fromVersion;
+				this.serviceTagsDelta    = serviceTagsDelta;
+			}
+			ServiceTags getServiceTagsDeltaFromVersion(long fromVersion) { // returns the stored delta only on an exact version match, otherwise null
+				return this.fromVersion == fromVersion ? this.serviceTagsDelta : null;
+			}
+		}
 		ReentrantLock lock = new ReentrantLock();
 
 		ServiceTagsWrapper(Long serviceId) {
@@ -186,50 +188,99 @@ public class RangerServiceTagsCache {
 			return updateTime;
 		}
 
-		long getLongestDbLoadTimeInMs() {
-			return longestDbLoadTimeInMs;
-		}
-
-		boolean getLatestOrCached(String serviceName, TagStore tagStore) throws Exception {
-			boolean ret = false;
+		/**
+		 * Refreshes the cached ServiceTags under the wrapper lock and returns either the full cached tags or the delta
+		 * since lastKnownVersion; a null lastKnownVersion is treated like -1 (no version known, so full tags are returned).
+		 */
+		ServiceTags getLatestOrCached(String serviceName, TagStore tagStore, Long lastKnownVersion, boolean needsBackwardCompatibility) throws Exception {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("==> RangerServiceTagsCache.getLatestOrCached(lastKnownVersion=" + lastKnownVersion + ", " + needsBackwardCompatibility + ")");
+			}
+			ServiceTags	ret		   = null;
+			boolean		lockResult = false;
 
 			try {
-				ret = lock.tryLock(waitTimeInSeconds, TimeUnit.SECONDS);
-				if (ret) {
-					getLatest(serviceName, tagStore);
+				final boolean isCacheCompletelyLoaded;
+				lockResult = lock.tryLock(waitTimeInSeconds, TimeUnit.SECONDS);
+				if (lockResult) {
+					isCacheCompletelyLoaded = getLatest(serviceName, tagStore);
+					if (isCacheCompletelyLoaded) {
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("ServiceTags cache was completely loaded from database ");
+						}
+					}
+					if (needsBackwardCompatibility || isCacheCompletelyLoaded || serviceTags == null
+							|| lastKnownVersion == null || lastKnownVersion == -1L || lastKnownVersion.equals(serviceTags.getTagVersion())) {
+						// Full tags: deltas not usable by caller, cache rebuilt, nothing cached, or caller has no (null/-1) or the current version
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("Need to return all cached ServiceTags: [needsBackwardCompatibility:" + needsBackwardCompatibility + ", isCacheCompletelyLoaded:" + isCacheCompletelyLoaded + ", lastKnownVersion:" + lastKnownVersion + ", serviceTagsVersion:" + (serviceTags != null ? serviceTags.getTagVersion() : null) + "]");
+						}
+						ret = this.serviceTags;
+					} else {
+						boolean isDeltaCacheReinitialized = false;
+						ServiceTags serviceTagsDelta = this.deltaCache != null ? this.deltaCache.getServiceTagsDeltaFromVersion(lastKnownVersion) : null;
+						if (serviceTagsDelta == null) {
+							serviceTagsDelta = tagStore.getServiceTagsDelta(serviceName, lastKnownVersion);
+							isDeltaCacheReinitialized = true;
+						}
+						if (serviceTagsDelta != null) {
+							if (LOG.isDebugEnabled()) {
+								LOG.debug("Deltas were requested. Returning deltas from lastKnownVersion:[" + lastKnownVersion + "]");
+							}
+							if (isDeltaCacheReinitialized) {
+								this.deltaCache = new ServiceTagsDeltasCache(lastKnownVersion, serviceTagsDelta);
+							}
+							ret = serviceTagsDelta;
+						} else {
+							LOG.warn("Deltas were requested, but could not get them!! lastKnownVersion:[" + lastKnownVersion + "]; Returning cached ServiceTags:[" + (serviceTags != null ? serviceTags.getTagVersion() : Long.valueOf(-1L)) + "]");
+							this.deltaCache = null;
+							ret = this.serviceTags;
+						}
+					}
+				} else {
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Could not get lock in [" + waitTimeInSeconds + "] seconds, returning cached ServiceTags");
+					}
+					ret = this.serviceTags;
 				}
 			} catch (InterruptedException exception) {
 				LOG.error("getLatestOrCached:lock got interrupted..", exception);
+				Thread.currentThread().interrupt();
 			} finally {
-				if (ret) {
+				if (lockResult) {
 					lock.unlock();
 				}
 			}
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("<== RangerServiceTagsCache.getLatestOrCached(lastKnownVersion=" + lastKnownVersion + ", " + needsBackwardCompatibility + "): " + ret);
+			}
 
 			return ret;
 		}
 
-		void getLatest(String serviceName, TagStore tagStore) throws Exception {
-
+		boolean getLatest(String serviceName, TagStore tagStore) throws Exception {
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("==> ServiceTagsWrapper.getLatest(" + serviceName + ")");
 			}
 
+			boolean isCacheCompletelyLoaded = false;
+
+			final Long cachedServiceTagsVersion = serviceTags != null ? serviceTags.getTagVersion() : -1L;
+
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Found ServiceTags in-cache : " + (serviceTags != null));
 			}
 
 			Long tagVersionInDb = tagStore.getTagVersion(serviceName);
 
-
-			if (serviceTags == null || tagVersionInDb == null || !tagVersionInDb.equals(serviceTags.getTagVersion())) {
+			if (serviceTags == null || tagVersionInDb == null || !tagVersionInDb.equals(cachedServiceTagsVersion)) {
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("loading serviceTags from db ... cachedServiceTagsVersion=" + (serviceTags != null ? serviceTags.getTagVersion() : null) + ", tagVersionInDb=" + tagVersionInDb);
+					LOG.debug("loading serviceTags from db ... cachedServiceTagsVersion=" + cachedServiceTagsVersion + ", tagVersionInDb=" + tagVersionInDb);
 				}
 
 				long startTimeMs = System.currentTimeMillis();
 
-				ServiceTags serviceTagsFromDb = tagStore.getServiceTags(serviceName);
+				ServiceTags serviceTagsFromDb = tagStore.getServiceTags(serviceName, cachedServiceTagsVersion);
 
 				long dbLoadTime = System.currentTimeMillis() - startTimeMs;
 
@@ -239,46 +290,52 @@ public class RangerServiceTagsCache {
 				updateTime = new Date();
 
 				if (serviceTagsFromDb != null) {
-					if (serviceTagsFromDb.getTagVersion() == null) {
-						serviceTagsFromDb.setTagVersion(0L);
+					if (serviceTags == null) {
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("Initializing ServiceTags cache for the first time");
+						}
+						serviceTags = serviceTagsFromDb;
+						this.deltaCache = null;
+						pruneUnusedAttributes();
+						isCacheCompletelyLoaded = true;
+					} else if (!serviceTagsFromDb.getIsDelta()) {
+						// service-tags are loaded because of some disqualifying event
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("Complete set of tag are loaded from database, because of some disqualifying event or because tag-delta is not supported");
+						}
+						serviceTags = serviceTagsFromDb;
+						this.deltaCache = null;
+						pruneUnusedAttributes();
+						isCacheCompletelyLoaded = true;
+					} else { // Previously cached service tags are still valid - no disqualifying change
+						// Rebuild tags cache from original tags and deltas
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("Retrieved tag-deltas from database. These will be applied on top of ServiceTags version:[" + cachedServiceTagsVersion + "], tag-deltas:[" + serviceTagsFromDb.getTagVersion() + "]");
+						}
+						RangerServiceTagsDeltaUtil.applyDelta(serviceTags, serviceTagsFromDb);
+						this.deltaCache = new ServiceTagsDeltasCache(cachedServiceTagsVersion, serviceTagsFromDb);
 					}
-					serviceTags = serviceTagsFromDb;
-					pruneUnusedAttributes();
+				} else {
+					LOG.error("Could not get tags from database, from-version:[" + cachedServiceTagsVersion + ")");
+				}
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("ServiceTags old-version:[" + cachedServiceTagsVersion + "], new-version:[" + serviceTags.getTagVersion() + "]");
+				}
+			} else {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("ServiceTags Cache already has the latest version, version:[" + cachedServiceTagsVersion + "]");
 				}
 			}
 
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("<== ServiceTagsWrapper.getLatest(" + serviceName + ")");
+				LOG.debug("<== ServiceTagsWrapper.getLatest(" + serviceName + "): " + isCacheCompletelyLoaded);
 			}
+
+			return isCacheCompletelyLoaded;
 		}
 
 		private void pruneUnusedAttributes() {
-			if (serviceTags != null) {
-				serviceTags.setOp(null);
-				serviceTags.setTagUpdateTime(null);
-
-				serviceTags.setTagDefinitions(null);
-
-				for (Map.Entry<Long, RangerTag> entry : serviceTags.getTags().entrySet()) {
-					RangerTag tag = entry.getValue();
-					tag.setCreatedBy(null);
-					tag.setCreateTime(null);
-					tag.setUpdatedBy(null);
-					tag.setUpdateTime(null);
-					tag.setGuid(null);
-				}
-
-				for (RangerServiceResource serviceResource : serviceTags.getServiceResources()) {
-					serviceResource.setCreatedBy(null);
-					serviceResource.setCreateTime(null);
-					serviceResource.setUpdatedBy(null);
-					serviceResource.setUpdateTime(null);
-					serviceResource.setGuid(null);
-
-					serviceResource.setServiceName(null);
-					serviceResource.setResourceSignature(null);
-				}
-			}
+			RangerServiceTagsDeltaUtil.pruneUnusedAttributes(this.serviceTags);
 		}
 
 		StringBuilder toString(StringBuilder sb) {
diff --git a/security-admin/src/main/java/org/apache/ranger/db/RangerDaoManagerBase.java b/security-admin/src/main/java/org/apache/ranger/db/RangerDaoManagerBase.java
index e4e335f..74c3dd0 100644
--- a/security-admin/src/main/java/org/apache/ranger/db/RangerDaoManagerBase.java
+++ b/security-admin/src/main/java/org/apache/ranger/db/RangerDaoManagerBase.java
@@ -293,7 +293,7 @@ public abstract class RangerDaoManagerBase {
 
 	public XXSecurityZoneRefServiceDao getXXSecurityZoneRefService() { return new XXSecurityZoneRefServiceDao(this); }
 
-        public XXSecurityZoneRefTagServiceDao getXXSecurityZoneRefTagService() { return new XXSecurityZoneRefTagServiceDao(this); }
+	public XXSecurityZoneRefTagServiceDao getXXSecurityZoneRefTagService() { return new XXSecurityZoneRefTagServiceDao(this); }
 
 	public XXSecurityZoneRefResourceDao getXXSecurityZoneRefResource() { return new XXSecurityZoneRefResourceDao(this); }
 
@@ -315,5 +315,8 @@ public abstract class RangerDaoManagerBase {
 
 	public XXRoleRefRoleDao getXXRoleRefRole() { return new XXRoleRefRoleDao(this); }
 
+	public XXTagChangeLogDao getXXTagChangeLog() { return new XXTagChangeLogDao(this); }
+
+
 }
 
diff --git a/security-admin/src/main/java/org/apache/ranger/db/XXPolicyChangeLogDao.java b/security-admin/src/main/java/org/apache/ranger/db/XXPolicyChangeLogDao.java
index ef436a0..7055391 100644
--- a/security-admin/src/main/java/org/apache/ranger/db/XXPolicyChangeLogDao.java
+++ b/security-admin/src/main/java/org/apache/ranger/db/XXPolicyChangeLogDao.java
@@ -19,6 +19,7 @@ package org.apache.ranger.db;
 
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -55,12 +56,23 @@ public class XXPolicyChangeLogDao extends BaseDao<XXPolicyChangeLog> {
                     .setParameter("version", version)
                     .setParameter("serviceId", serviceId)
                     .getResultList();
-            // Ensure that the first record has the same version as the base-version from where the records are fetched
+
+            // Ensure that some record has the same version as the base-version from where the records are fetched
             if (CollectionUtils.isNotEmpty(logs)) {
-                Object[] firstRecord = logs.get(0);
-                Long versionOfFirstRecord = (Long) firstRecord[2];
-                if (version.equals(versionOfFirstRecord)) {
-                    logs.remove(0);
+                Iterator<Object[]> iter = logs.iterator();
+                boolean foundAndRemoved = false;
+
+                while (iter.hasNext()) {
+                    Object[] record = iter.next();
+                    Long recordVersion = (Long) record[2];
+                    if (version.equals(recordVersion)) {
+                        iter.remove();
+                        foundAndRemoved = true;
+                    } else {
+                        break;
+                    }
+                }
+                if (foundAndRemoved) {
                     ret = convert(policyService, logs);
                 } else {
                     ret = null;
@@ -111,10 +123,11 @@ public class XXPolicyChangeLogDao extends BaseDao<XXPolicyChangeLog> {
             for (Object[] log : queryResult) {
 
                 RangerPolicy policy;
-                Long logRecordId = (Long) log[0];
+
+                Long    logRecordId      = (Long) log[0];
                 Integer policyChangeType = (Integer) log[1];
-                String serviceType = (String) log[3];
-                Long policyId = (Long) log[5];
+                String  serviceType      = (String) log[3];
+                Long    policyId         = (Long) log[5];
 
                 if (policyId != null) {
                     XXPolicy xxPolicy = daoManager.getXXPolicy().getById(policyId);
diff --git a/security-admin/src/main/java/org/apache/ranger/db/XXServiceVersionInfoDao.java b/security-admin/src/main/java/org/apache/ranger/db/XXServiceVersionInfoDao.java
index 8081703..9f6f024 100644
--- a/security-admin/src/main/java/org/apache/ranger/db/XXServiceVersionInfoDao.java
+++ b/security-admin/src/main/java/org/apache/ranger/db/XXServiceVersionInfoDao.java
@@ -17,7 +17,6 @@
 
 package org.apache.ranger.db;
 
-import java.util.Date;
 import java.util.List;
 
 import javax.persistence.NoResultException;
@@ -26,6 +25,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.ranger.biz.ServiceDBStore;
 import org.apache.ranger.common.db.BaseDao;
 import org.apache.ranger.entity.XXServiceVersionInfo;
+import org.apache.ranger.plugin.util.ServiceTags;
 import org.springframework.stereotype.Service;
 
 /**
@@ -72,65 +72,91 @@ public class XXServiceVersionInfoDao extends BaseDao<XXServiceVersionInfo> {
 				.getResultList();
 	}
 
-	public void updateServiceVersionInfoForServiceResourceUpdate(Long resourceId, Date updateTime) {
-		if (resourceId == null) {
+	public void updateServiceVersionInfoForTagResourceMapCreate(Long resourceId, Long tagId) {
+		if (resourceId == null || tagId == null) {
 			return;
 		}
 
 		try {
 			List<XXServiceVersionInfo> serviceVersionInfos = getEntityManager().createNamedQuery("XXServiceVersionInfo.findByServiceResourceId", tClass).setParameter("resourceId", resourceId).getResultList();
 
-			updateTagVersionAndTagUpdateTime(serviceVersionInfos, updateTime);
+			updateTagVersionAndTagUpdateTime(serviceVersionInfos, resourceId, tagId);
 		} catch (NoResultException e) {
 			return;
 		}
 	}
 
-	public void updateServiceVersionInfoForTagUpdate(Long tagId, Date updateTime) {
-		if (tagId == null) {
+	public void updateServiceVersionInfoForTagResourceMapDelete(Long resourceId, Long tagId) {
+		if (resourceId == null || tagId == null) {
 			return;
 		}
 
 		try {
-			List<XXServiceVersionInfo> serviceVersionInfos = getEntityManager().createNamedQuery("XXServiceVersionInfo.findByTagId", tClass).setParameter("tagId", tagId).getResultList();
+			List<XXServiceVersionInfo> serviceVersionInfos = getEntityManager().createNamedQuery("XXServiceVersionInfo.findByServiceResourceId", tClass).setParameter("resourceId", resourceId).getResultList();
 
-			updateTagVersionAndTagUpdateTime(serviceVersionInfos, updateTime);
+			updateTagVersionAndTagUpdateTime(serviceVersionInfos, resourceId, tagId);
 		} catch (NoResultException e) {
 			return;
 		}
 	}
-
-	public void updateServiceVersionInfoForTagDefUpdate(Long tagDefId, Date updateTime) {
-		if (tagDefId == null) {
+	public void updateServiceVersionInfoForServiceResourceUpdate(Long resourceId) {
+		if (resourceId == null) {
 			return;
 		}
 
+		Long tagId = null;
+
 		try {
-			List<XXServiceVersionInfo> serviceVersionInfos = getEntityManager().createNamedQuery("XXServiceVersionInfo.findByTagDefId", tClass).setParameter("tagDefId", tagDefId).getResultList();
+			List<XXServiceVersionInfo> serviceVersionInfos = getEntityManager().createNamedQuery("XXServiceVersionInfo.findByServiceResourceId", tClass).setParameter("resourceId", resourceId).getResultList();
 
-			updateTagVersionAndTagUpdateTime(serviceVersionInfos, updateTime);
+			updateTagVersionAndTagUpdateTime(serviceVersionInfos, resourceId, tagId);
 		} catch (NoResultException e) {
 			return;
 		}
 	}
 
-	private void updateTagVersionAndTagUpdateTime(List<XXServiceVersionInfo> serviceVersionInfos, Date updateTime) {
-		if(CollectionUtils.isEmpty(serviceVersionInfos)) {
+	public void updateServiceVersionInfoForTagUpdate(Long tagId) {
+		if (tagId == null) {
+			return;
+		}
+
+		Long resourceId = null;
+		try {
+			List<XXServiceVersionInfo> serviceVersionInfos = getEntityManager().createNamedQuery("XXServiceVersionInfo.findByTagId", tClass).setParameter("tagId", tagId).getResultList();
+
+			updateTagVersionAndTagUpdateTime(serviceVersionInfos, resourceId, tagId);
+		} catch (NoResultException e) {
 			return;
 		}
+	}
 
-		if(updateTime == null) {
-			updateTime = new Date();
+	public void updateServiceVersionInfoForTagDefUpdate(Long tagDefId) { // NOTE(review): a no-op as written - both paths return without touching any version info; confirm that tag-def updates are meant to leave the tag version unchanged
+		if (tagDefId != null) {
+			return;
+		}
+	}
+
+	/** Registers, per affected service, a TAG_VERSION ServiceVersionUpdater (carrying the change type and ids) to execute on transaction commit. */
+	private void updateTagVersionAndTagUpdateTime(List<XXServiceVersionInfo> serviceVersionInfos, Long resourceId, Long tagId) {
+		// Skip when no service is affected or when neither id identifies the change; '&&' also keeps a null list out of the loop.
+		if (CollectionUtils.isNotEmpty(serviceVersionInfos) && (resourceId != null || tagId != null)) {
+			for (XXServiceVersionInfo serviceVersionInfo : serviceVersionInfos) {
 
-		for(XXServiceVersionInfo serviceVersionInfo : serviceVersionInfos) {
-			final RangerDaoManager finaldaoManager 		  = daoManager;
-			final Long 		       finalServiceId  		  = serviceVersionInfo.getServiceId();
-			final ServiceDBStore.VERSION_TYPE versionType = ServiceDBStore.VERSION_TYPE.TAG_VERSION;
+				final Long                        serviceId   = serviceVersionInfo.getServiceId();
+				final ServiceDBStore.VERSION_TYPE versionType = ServiceDBStore.VERSION_TYPE.TAG_VERSION;
+				final ServiceTags.TagsChangeType  tagChangeType;
 
-			Runnable serviceVersionUpdater = new ServiceDBStore.ServiceVersionUpdater(finaldaoManager, finalServiceId, versionType);
+				if (tagId == null) { // only a service-resource id was supplied
+					tagChangeType = ServiceTags.TagsChangeType.SERVICE_RESOURCE_UPDATE;
+				} else if (resourceId == null) { // only a tag id was supplied
+					tagChangeType = ServiceTags.TagsChangeType.TAG_UPDATE;
+				} else { // both ids supplied
+					tagChangeType = ServiceTags.TagsChangeType.TAG_RESOURCE_MAP_UPDATE;
+				}
 
-			daoManager.getRangerTransactionSynchronizationAdapter().executeOnTransactionCommit(serviceVersionUpdater);
+				final Runnable serviceVersionUpdater = new ServiceDBStore.ServiceVersionUpdater(daoManager, serviceId, versionType, tagChangeType, resourceId, tagId);
+				daoManager.getRangerTransactionSynchronizationAdapter().executeOnTransactionCommit(serviceVersionUpdater);
+			}
 		}
 
 	}
diff --git a/security-admin/src/main/java/org/apache/ranger/db/XXTagChangeLogDao.java b/security-admin/src/main/java/org/apache/ranger/db/XXTagChangeLogDao.java
new file mode 100644
index 0000000..ef65e9a
--- /dev/null
+++ b/security-admin/src/main/java/org/apache/ranger/db/XXTagChangeLogDao.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.db;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.common.db.BaseDao;
+import org.apache.ranger.entity.XXTagChangeLog;
+import org.springframework.stereotype.Service;
+
+/**
+ * DAO for {@link XXTagChangeLog} entities: reads change-log rows newer than a tags-version and deletes old rows.
+ */
+@Service
+public class XXTagChangeLogDao extends BaseDao<XXTagChangeLog> {
+
+    private static final Log LOG = LogFactory.getLog(XXTagChangeLogDao.class);
+
+    /**
+     * Creates the DAO on top of the given DAO manager.
+     */
+    public XXTagChangeLogDao(RangerDaoManagerBase daoManager) {
+        super(daoManager);
+    }
+
+    /**
+     * Runs the {@code XXTagChangeLog.findSinceVersion} named query for a service and returns the
+     * rows that follow the leading rows recorded at exactly {@code version}.
+     *
+     * @param version   base tags-version; leading rows carrying exactly this version are dropped
+     * @param serviceId id of the service, passed to the query as {@code serviceId}
+     * @return the converted remaining rows, or {@code null} when {@code version} is {@code null},
+     *         the query returns nothing, the first row is not at the base version, or no row
+     *         remains once the base-version rows are dropped
+     */
+    public List<XXTagChangeLog> findLaterThan(Long version, Long serviceId) {
+        if (version == null) {
+            return null;
+        }
+
+        final List<Object[]> rows = getEntityManager()
+                .createNamedQuery("XXTagChangeLog.findSinceVersion", Object[].class)
+                .setParameter("version", version)
+                .setParameter("serviceId", serviceId)
+                .getResultList();
+
+        if (CollectionUtils.isEmpty(rows)) {
+            return null;
+        }
+
+        // Drop the leading rows that sit exactly at the base version (index 2 holds the version).
+        boolean droppedBaseRows = false;
+        for (Iterator<Object[]> it = rows.iterator(); it.hasNext(); ) {
+            if (!version.equals((Long) it.next()[2])) {
+                break;
+            }
+            it.remove();
+            droppedBaseRows = true;
+        }
+
+        // Rows are returned only if at least one row carried the base version itself.
+        return droppedBaseRows ? convert(rows) : null;
+    }
+
+    /**
+     * Executes the {@code XXTagChangeLog.deleteOlderThan} named query with a cut-off of the
+     * current time minus the given number of days.
+     * @param olderThanInDays age in days used to compute the cut-off date
+     */
+    public void deleteOlderThan(int olderThanInDays) {
+        final Date since = new Date(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(olderThanInDays));
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Deleting records from x_tag_change_log that are older than " + olderThanInDays + " days, that is,  older than " + since);
+        }
+
+        getEntityManager()
+                .createNamedQuery("XXTagChangeLog.deleteOlderThan")
+                .setParameter("olderThan", since)
+                .executeUpdate();
+    }
+
+    /**
+     * Maps raw query rows (id, change type, tags-version, service-resource id, tag id) to entities.
+     * @param queryResult rows as returned by the named query; may be {@code null} or empty
+     * @return one entity per row, or {@code null} for a {@code null} or empty input
+     */
+    private List<XXTagChangeLog> convert(List<Object[]> queryResult) {
+        if (CollectionUtils.isEmpty(queryResult)) {
+            return null;
+        }
+        final List<XXTagChangeLog> converted = new ArrayList<>(queryResult.size());
+        for (Object[] row : queryResult) {
+            converted.add(new XXTagChangeLog((Long) row[0], (Integer) row[1], (Long) row[2], (Long) row[3], (Long) row[4]));
+        }
+        return converted;
+    }
+
+}
+
diff --git a/security-admin/src/main/java/org/apache/ranger/entity/XXTagChangeLog.java b/security-admin/src/main/java/org/apache/ranger/entity/XXTagChangeLog.java
new file mode 100644
index 0000000..c3eb144
--- /dev/null
+++ b/security-admin/src/main/java/org/apache/ranger/entity/XXTagChangeLog.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.entity;
+
+import java.util.Date;
+
+import javax.persistence.Cacheable;
+import javax.persistence.Entity;
+import javax.persistence.Column;
+import javax.persistence.EntityListeners;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.SequenceGenerator;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.ranger.common.AppConstants;
+import org.apache.ranger.common.DateUtil;
+
+/**
+ * JPA entity mapped to the {@code x_tag_change_log} table. Each instance holds a change type, a service
+ * tags-version, a create time and the ids of the service, service-resource and tag it refers to.
+ */
+@EntityListeners( org.apache.ranger.common.db.JPABeanCallbacks.class)
+@Entity
+@Cacheable
+@XmlRootElement
+@Table(name = "x_tag_change_log")
+public class XXTagChangeLog implements java.io.Serializable {
+    private static final long serialVersionUID = 1L;
+
+    @Id
+    @SequenceGenerator(name = "X_TAG_CHANGE_LOG_SEQ", sequenceName = "X_TAG_CHANGE_LOG_SEQ", allocationSize = 1)
+    @GeneratedValue(strategy = GenerationType.AUTO, generator = "X_TAG_CHANGE_LOG_SEQ")
+    @Column(name = "id")
+    protected Long id;
+
+    @Temporal(TemporalType.TIMESTAMP)
+    @Column(name="create_time"   )
+    protected Date createTime = DateUtil.getUTCDate();
+
+    @Column(name = "service_id")
+    protected Long serviceId;
+
+    @Column(name = "change_type")
+    protected Integer changeType;
+
+    @Column(name = "service_tags_version")
+    protected Long serviceTagsVersion;
+
+    @Column(name = "service_resource_id")
+    protected Long serviceResourceId;
+
+    @Column(name = "tag_id")
+    protected Long tagId;
+
+    /**
+     * Default constructor. Sets every attribute, including {@code createTime}, to {@code null}.
+     */
+    public XXTagChangeLog( ) {
+        this(null, null, null, null, null, null, null);
+    }
+
+    /** Creates a record with the given values; {@code createTime} and {@code serviceId} are set to {@code null}. */
+    public XXTagChangeLog(Long id, Integer changeType, Long serviceTagsVersion, Long serviceResourceId,  Long tagId) {
+        this(id, null, null, changeType, serviceTagsVersion, serviceResourceId, tagId);
+    }
+
+    /** Creates a fully specified record; the {@code createTime} argument replaces the field initializer even when {@code null}. */
+    public XXTagChangeLog(Long id, Date createTime, Long serviceId, Integer changeType, Long serviceTagsVersion, Long serviceResourceId, Long tagId) {
+        setId(id);
+        setCreateTime(createTime);
+        setServiceId(serviceId);
+        setChangeType(changeType);
+        setServiceTagsVersion(serviceTagsVersion);
+        setServiceResourceId(serviceResourceId);
+        setTagId(tagId);
+    }
+
+    public int getMyClassType( ) {
+        return AppConstants.CLASS_TYPE_NONE;
+    }
+
+    public String getMyDisplayValue() {
+        return null;
+    }
+
+    public void setId(Long id) {
+        this.id = id;
+    }
+
+    public Long getId() {
+        return this.id;
+    }
+
+    public void setCreateTime( Date createTime ) {
+        this.createTime = createTime;
+    }
+
+    public Date getCreateTime( ) {
+        return this.createTime;
+    }
+
+    public void setServiceId(Long serviceId) {
+        this.serviceId = serviceId;
+    }
+
+    public Long getServiceId() {
+        return this.serviceId;
+    }
+
+    public void setChangeType(Integer changeType) { this.changeType = changeType; }
+
+    public Integer getChangeType() { return this.changeType; }
+
+    public void setServiceTagsVersion(Long serviceTagsVersion) {
+        this.serviceTagsVersion = serviceTagsVersion;
+    }
+
+    public Long getServiceTagsVersion() {
+        return this.serviceTagsVersion;
+    }
+
+    public Long getServiceResourceId() { return this.serviceResourceId; }
+
+    public void setServiceResourceId(Long serviceResourceId) {
+        this.serviceResourceId = serviceResourceId;
+    }
+
+    public Long getTagId() { return this.tagId; }
+
+    public void setTagId(Long tagId) {
+        this.tagId = tagId;
+    }
+
+    /**
+     * Renders all attributes in the form {@code XXTagChangeLog={name={value} ... }}.
+     * @return the formatted string
+     */
+    @Override
+    public String toString( ) {
+        final StringBuilder sb = new StringBuilder("XXTagChangeLog={");
+        sb.append("id={").append(id).append("} ");
+        sb.append("createTime={").append(createTime).append("} ");
+        sb.append("serviceId={").append(serviceId).append("} ");
+        sb.append("changeType={").append(changeType).append("} ");
+        sb.append("serviceTagsVersion={").append(serviceTagsVersion).append("} ");
+        sb.append("serviceResourceId={").append(serviceResourceId).append("} ");
+        sb.append("tagId={").append(tagId).append("} ");
+        sb.append("}");
+        return sb.toString();
+    }
+
+    /**
+     * Compares all attributes of two change-log records.
+     * @param obj the object to compare with
+     * @return true if {@code obj} is an {@code XXTagChangeLog} and all attributes match
+     */
+    @Override
+    public boolean equals( Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        // The superclass is Object, so super.equals() must not be consulted: Object.equals() is
+        // identity and would reject every distinct instance before any attribute is compared.
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final XXTagChangeLog other = (XXTagChangeLog) obj;
+        return equals(this.id, other.id)
+                && equals(this.createTime, other.createTime)
+                && equals(this.serviceId, other.serviceId)
+                && equals(this.changeType, other.changeType)
+                && equals(this.serviceTagsVersion, other.serviceTagsVersion)
+                && equals(this.serviceResourceId, other.serviceResourceId)
+                && equals(this.tagId, other.tagId);
+    }
+
+    /**
+     * Hash code over the same attributes that {@link #equals(Object)} compares.
+     */
+    @Override
+    public int hashCode() {
+        return java.util.Objects.hash(id, createTime, serviceId, changeType, serviceTagsVersion, serviceResourceId, tagId);
+    }
+
+    /**
+     * Null-safe equality: two nulls are equal, a single null is not equal to anything else.
+     */
+    public static boolean equals(Object object1, Object object2) {
+        if (object1 == null || object2 == null) {
+            return object1 == object2;
+        }
+        return object1 == object2 || object1.equals(object2);
+    }
+
+}
+
+
diff --git a/security-admin/src/main/java/org/apache/ranger/rest/PublicAPIsv2.java b/security-admin/src/main/java/org/apache/ranger/rest/PublicAPIsv2.java
index 2a4c53b..c33841d 100644
--- a/security-admin/src/main/java/org/apache/ranger/rest/PublicAPIsv2.java
+++ b/security-admin/src/main/java/org/apache/ranger/rest/PublicAPIsv2.java
@@ -532,7 +532,7 @@ public class PublicAPIsv2 {
 			logger.debug("==> PublicAPIsv2.deletePolicyDeltas(" + olderThan + ", " + reloadServicePoliciesCache + ")");
 		}
 
-		serviceREST.deletePolicyDeltas(olderThan, reloadServicePoliciesCache, request);
+		serviceREST.deletePolicyDeltas(olderThan, request);
 
 		if (logger.isDebugEnabled()) {
 			logger.debug("<== PublicAPIsv2.deletePolicyDeltas(" + olderThan + ", " + reloadServicePoliciesCache + ")");
diff --git a/security-admin/src/main/java/org/apache/ranger/rest/ServiceREST.java b/security-admin/src/main/java/org/apache/ranger/rest/ServiceREST.java
index 8ee181a..cfeaadd 100644
--- a/security-admin/src/main/java/org/apache/ranger/rest/ServiceREST.java
+++ b/security-admin/src/main/java/org/apache/ranger/rest/ServiceREST.java
@@ -93,6 +93,7 @@ import org.apache.ranger.plugin.model.RangerPolicy;
 import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyItem;
 import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyItemAccess;
 import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
+import org.apache.ranger.plugin.model.RangerPolicyDelta;
 import org.apache.ranger.plugin.model.RangerSecurityZone;
 import org.apache.ranger.plugin.model.RangerService;
 import org.apache.ranger.plugin.model.RangerServiceDef;
@@ -3180,15 +3181,15 @@ public class ServiceREST {
 	@DELETE
 	@Path("/server/policydeltas")
 	@PreAuthorize("hasRole('ROLE_SYS_ADMIN')")
-	public void deletePolicyDeltas(@DefaultValue("7") @QueryParam("days") Integer olderThan, @DefaultValue("false") @QueryParam("reloadServicePoliciesCache") Boolean reloadServicePoliciesCache, @Context HttpServletRequest request) {
+	public void deletePolicyDeltas(@DefaultValue("7") @QueryParam("days") Integer olderThan, @Context HttpServletRequest request) {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> ServiceREST.deletePolicyDeltas(" + olderThan + ", " + reloadServicePoliciesCache + ")");
+			LOG.debug("==> ServiceREST.deletePolicyDeltas(" + olderThan + ")");
 		}
 
-		svcStore.resetPolicyUpdateLog(olderThan, reloadServicePoliciesCache);
+		svcStore.resetPolicyUpdateLog(olderThan, RangerPolicyDelta.CHANGE_TYPE_INVALIDATE_POLICY_DELTAS);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== ServiceREST.deletePolicyDeltas(" + olderThan + ", " + reloadServicePoliciesCache + ")");
+			LOG.debug("<== ServiceREST.deletePolicyDeltas(" + olderThan + ")");
 		}
 	}
 
diff --git a/security-admin/src/main/java/org/apache/ranger/rest/TagREST.java b/security-admin/src/main/java/org/apache/ranger/rest/TagREST.java
index 03d06e2..51f33db 100644
--- a/security-admin/src/main/java/org/apache/ranger/rest/TagREST.java
+++ b/security-admin/src/main/java/org/apache/ranger/rest/TagREST.java
@@ -40,6 +40,7 @@ import org.apache.ranger.plugin.model.RangerTagDef;
 import org.apache.ranger.plugin.store.EmbeddedServiceDefsUtil;
 import org.apache.ranger.plugin.store.TagStore;
 import org.apache.ranger.plugin.store.TagValidator;
+import org.apache.ranger.plugin.util.RangerRESTUtils;
 import org.apache.ranger.plugin.util.SearchFilter;
 import org.apache.ranger.plugin.util.ServiceTags;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -384,7 +385,7 @@ public class TagREST {
             if (exist == null) {
                 ret = tagStore.createTag(tag);
             } else if (updateIfExists) {
-                ret = updateTag(exist.getId(), exist);
+                ret = updateTag(exist.getId(), tag);
             } else {
                 throw new Exception("tag with Id " + exist.getId() + " already exists");
             }
@@ -616,7 +617,7 @@ public class TagREST {
             if (exist == null) {
                 ret = tagStore.createServiceResource(resource);
             } else if (updateIfExists) {
-                ret = updateServiceResource(exist.getId(), exist);
+                ret = updateServiceResource(exist.getId(), resource);
             } else {
                 throw new Exception("resource with Id " + exist.getId() + " already exists");
             }
@@ -1106,9 +1107,10 @@ public class TagREST {
     public ServiceTags getServiceTagsIfUpdated(@PathParam("serviceName") String serviceName,
                                                    @QueryParam(TagRESTConstants.LAST_KNOWN_TAG_VERSION_PARAM) Long lastKnownVersion,
                                                @DefaultValue("0") @QueryParam(TagRESTConstants.LAST_ACTIVATION_TIME) Long lastActivationTime, @QueryParam("pluginId") String pluginId,
+                                               @DefaultValue("false") @QueryParam(RangerRESTUtils.REST_PARAM_SUPPORTS_TAG_DELTAS) Boolean supportsTagDeltas,
                                                @Context HttpServletRequest request) {
         if(LOG.isDebugEnabled()) {
-            LOG.debug("==> TagREST.getServiceTagsIfUpdated(" + serviceName + ", " + lastKnownVersion + ", " + lastActivationTime + ", " + pluginId + ")");
+            LOG.debug("==> TagREST.getServiceTagsIfUpdated(" + serviceName + ", " + lastKnownVersion + ", " + lastActivationTime + ", " + pluginId + ", " + supportsTagDeltas + ")");
         }
 
 		ServiceTags ret      = null;
@@ -1121,7 +1123,7 @@ public class TagREST {
 		}
 
         try {
-            ret = tagStore.getServiceTagsIfUpdated(serviceName, lastKnownVersion);
+            ret = tagStore.getServiceTagsIfUpdated(serviceName, lastKnownVersion, !supportsTagDeltas);
 
             if (ret == null) {
                 downloadedVersion = lastKnownVersion;
@@ -1148,7 +1150,7 @@ public class TagREST {
         }
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("<== TagREST.getServiceTagsIfUpdated(" + serviceName + ", " + lastKnownVersion + ", " + lastActivationTime + ", " + pluginId + ")");
+            LOG.debug("<== TagREST.getServiceTagsIfUpdated(" + serviceName + ", " + lastKnownVersion + ", " + lastActivationTime + ", " + pluginId + ", " + supportsTagDeltas + ")");
         }
 
         return ret;
@@ -1160,10 +1162,11 @@ public class TagREST {
     public ServiceTags getSecureServiceTagsIfUpdated(@PathParam("serviceName") String serviceName,
                                                    @QueryParam(TagRESTConstants.LAST_KNOWN_TAG_VERSION_PARAM) Long lastKnownVersion,
                                                      @DefaultValue("0") @QueryParam(TagRESTConstants.LAST_ACTIVATION_TIME) Long lastActivationTime, @QueryParam("pluginId") String pluginId,
+                                                     @DefaultValue("false") @QueryParam(RangerRESTUtils.REST_PARAM_SUPPORTS_TAG_DELTAS) Boolean supportsTagDeltas,
                                                      @Context HttpServletRequest request) {
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("==> TagREST.getSecureServiceTagsIfUpdated(" + serviceName + ", " + lastKnownVersion + ", " + lastActivationTime + ", " + pluginId + ")");
+            LOG.debug("==> TagREST.getSecureServiceTagsIfUpdated(" + serviceName + ", " + lastKnownVersion + ", " + lastActivationTime + ", " + pluginId + ", " + supportsTagDeltas + ")");
         }
 
 		ServiceTags ret      = null;
@@ -1202,7 +1205,7 @@ public class TagREST {
         		}
         	}
         	if (isAllowed) {
-	            ret = tagStore.getServiceTagsIfUpdated(serviceName, lastKnownVersion);
+	            ret = tagStore.getServiceTagsIfUpdated(serviceName, lastKnownVersion, !supportsTagDeltas);
 
 				if(ret == null) {
                     downloadedVersion = lastKnownVersion;
@@ -1234,10 +1237,25 @@ public class TagREST {
         }
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("<== TagREST.getSecureServiceTagsIfUpdated(" + serviceName + ", " + lastKnownVersion + ", " + lastActivationTime + ", " + pluginId + ")");
+            LOG.debug("<== TagREST.getSecureServiceTagsIfUpdated(" + serviceName + ", " + lastKnownVersion + ", " + lastActivationTime + ", " + pluginId + ", " + supportsTagDeltas + ")");
         }
 
         return ret;
     }
 
+    /** Asks the service store to reset the tag update log, passing the {@code days} query parameter (default 3); requires ROLE_SYS_ADMIN. */
+    @DELETE
+    @Path("/server/tagdeltas")
+    @PreAuthorize("hasRole('ROLE_SYS_ADMIN')")
+    public void deleteTagDeltas(@DefaultValue("3") @QueryParam("days") Integer olderThan, @Context HttpServletRequest request) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> TagREST.deleteTagDeltas(" + olderThan + ")");
+        }
+        // Delegates to the service store, marking the reset with the INVALIDATE_TAG_DELTAS change type.
+        svcStore.resetTagUpdateLog(olderThan, ServiceTags.TagsChangeType.INVALIDATE_TAG_DELTAS);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== TagREST.deleteTagDeltas(" + olderThan + ")");
+        }
+    }
+
 }
diff --git a/security-admin/src/main/java/org/apache/ranger/service/RangerServiceResourceService.java b/security-admin/src/main/java/org/apache/ranger/service/RangerServiceResourceService.java
index f0cb8f4..6bbf98d 100644
--- a/security-admin/src/main/java/org/apache/ranger/service/RangerServiceResourceService.java
+++ b/security-admin/src/main/java/org/apache/ranger/service/RangerServiceResourceService.java
@@ -79,7 +79,7 @@ public class RangerServiceResourceService extends RangerServiceResourceServiceBa
         RangerServiceResource ret = super.postUpdate(resource);
 
         if (serviceUpdateNeeded) {
-            daoMgr.getXXServiceVersionInfo().updateServiceVersionInfoForServiceResourceUpdate(resource.getId(), resource.getUpdateTime());
+            daoMgr.getXXServiceVersionInfo().updateServiceVersionInfoForServiceResourceUpdate(resource.getId());
         }
 
         return ret;
diff --git a/security-admin/src/main/java/org/apache/ranger/service/RangerTagDefService.java b/security-admin/src/main/java/org/apache/ranger/service/RangerTagDefService.java
index c04c335..d4350b6 100644
--- a/security-admin/src/main/java/org/apache/ranger/service/RangerTagDefService.java
+++ b/security-admin/src/main/java/org/apache/ranger/service/RangerTagDefService.java
@@ -56,7 +56,7 @@ public class RangerTagDefService extends RangerTagDefServiceBase<XXTagDef, Range
 	public RangerTagDef postUpdate(XXTagDef tagDef) {
 		RangerTagDef ret = super.postUpdate(tagDef);
 
-		daoMgr.getXXServiceVersionInfo().updateServiceVersionInfoForTagDefUpdate(tagDef.getId(), tagDef.getUpdateTime());
+		daoMgr.getXXServiceVersionInfo().updateServiceVersionInfoForTagDefUpdate(tagDef.getId());
 
 		return ret;
 	}
diff --git a/security-admin/src/main/java/org/apache/ranger/service/RangerTagResourceMapService.java b/security-admin/src/main/java/org/apache/ranger/service/RangerTagResourceMapService.java
index 89c451e..76c02ac 100644
--- a/security-admin/src/main/java/org/apache/ranger/service/RangerTagResourceMapService.java
+++ b/security-admin/src/main/java/org/apache/ranger/service/RangerTagResourceMapService.java
@@ -54,16 +54,7 @@ public class RangerTagResourceMapService extends RangerTagResourceMapServiceBase
 	public RangerTagResourceMap postCreate(XXTagResourceMap tagResMap) {
 		RangerTagResourceMap ret = super.postCreate(tagResMap);
 
-		daoMgr.getXXServiceVersionInfo().updateServiceVersionInfoForServiceResourceUpdate(tagResMap.getResourceId(), tagResMap.getUpdateTime());
-
-		return ret;
-	}
-
-	@Override
-	public RangerTagResourceMap postUpdate(XXTagResourceMap tagResMap) {
-		RangerTagResourceMap ret = super.postUpdate(tagResMap);
-
-		daoMgr.getXXServiceVersionInfo().updateServiceVersionInfoForServiceResourceUpdate(tagResMap.getResourceId(), tagResMap.getUpdateTime());
+		daoMgr.getXXServiceVersionInfo().updateServiceVersionInfoForTagResourceMapCreate(tagResMap.getResourceId(), tagResMap.getTagId());
 
 		return ret;
 	}
@@ -73,7 +64,7 @@ public class RangerTagResourceMapService extends RangerTagResourceMapServiceBase
 		XXTagResourceMap tagResMap = super.preDelete(id);
 
 		if (tagResMap != null) {
-			daoMgr.getXXServiceVersionInfo().updateServiceVersionInfoForServiceResourceUpdate(tagResMap.getResourceId(), null);
+			daoMgr.getXXServiceVersionInfo().updateServiceVersionInfoForTagResourceMapDelete(tagResMap.getResourceId(), tagResMap.getTagId());
 		}
 
 		return tagResMap;
diff --git a/security-admin/src/main/java/org/apache/ranger/service/RangerTagService.java b/security-admin/src/main/java/org/apache/ranger/service/RangerTagService.java
index f5060e5..5fdefe5 100644
--- a/security-admin/src/main/java/org/apache/ranger/service/RangerTagService.java
+++ b/security-admin/src/main/java/org/apache/ranger/service/RangerTagService.java
@@ -62,10 +62,28 @@ public class RangerTagService extends RangerTagServiceBase<XXTag, RangerTag> {
 	}
 
 	@Override
+	public RangerTag postCreate(XXTag tag) {
+		RangerTag ret = super.postCreate(tag);
+
+		daoMgr.getXXServiceVersionInfo().updateServiceVersionInfoForTagUpdate(tag.getId());
+
+		return ret;
+	}
+
+	@Override
 	public RangerTag postUpdate(XXTag tag) {
 		RangerTag ret = super.postUpdate(tag);
 
-		daoMgr.getXXServiceVersionInfo().updateServiceVersionInfoForTagUpdate(tag.getId(), tag.getUpdateTime());
+		daoMgr.getXXServiceVersionInfo().updateServiceVersionInfoForTagUpdate(tag.getId());
+
+		return ret;
+	}
+
+	@Override
+	protected XXTag preDelete(Long id) {
+		XXTag ret = super.preDelete(id);
+
+		daoMgr.getXXServiceVersionInfo().updateServiceVersionInfoForTagUpdate(id);
 
 		return ret;
 	}
diff --git a/security-admin/src/main/resources/META-INF/jpa_named_queries.xml b/security-admin/src/main/resources/META-INF/jpa_named_queries.xml
index 078588a..9714fa9 100755
--- a/security-admin/src/main/resources/META-INF/jpa_named_queries.xml
+++ b/security-admin/src/main/resources/META-INF/jpa_named_queries.xml
@@ -1560,4 +1560,24 @@
 			and roleRef.roleName    != role.name
 		</query>
 	</named-query>
+
+
+	<!-- XXTagChangeLog -->
+	<named-query name="XXTagChangeLog.findByServiceId">
+		<query>
+			select obj from XXTagChangeLog obj where obj.serviceId = :serviceId
+		</query>
+	</named-query>
+	<named-query name="XXTagChangeLog.findSinceVersion">
+		<query>
+			select obj.id, obj.changeType, obj.serviceTagsVersion, obj.serviceResourceId, obj.tagId from
+			XXTagChangeLog obj where obj.serviceId = :serviceId and obj.serviceTagsVersion >= :version order by
+			obj.serviceTagsVersion
+		</query>
+	</named-query>
+
+	<named-query name="XXTagChangeLog.deleteOlderThan">
+		<query>delete from XXTagChangeLog obj where obj.createTime &lt; :olderThan</query>
+	</named-query>
+
 </entity-mappings>
diff --git a/security-admin/src/test/java/org/apache/ranger/rest/TestTagREST.java b/security-admin/src/test/java/org/apache/ranger/rest/TestTagREST.java
index 90060b2..402f1ce 100644
--- a/security-admin/src/test/java/org/apache/ranger/rest/TestTagREST.java
+++ b/security-admin/src/test/java/org/apache/ranger/rest/TestTagREST.java
@@ -1406,16 +1406,16 @@ public class TestTagREST {
 		ServiceTags oldServiceTag = null;
 		
 		try {
-			Mockito.when(tagStore.getServiceTagsIfUpdated(serviceName, lastKnownVersion)).thenReturn(oldServiceTag);
+			Mockito.when(tagStore.getServiceTagsIfUpdated(serviceName, lastKnownVersion, true)).thenReturn(oldServiceTag);
 		} catch (Exception e) {
 		}
 		Mockito.when(restErrorUtil.createRESTException(Mockito.anyInt(),Mockito.anyString(), Mockito.anyBoolean())).thenThrow(new WebApplicationException());
 		thrown.expect(WebApplicationException.class);
 		
-		tagREST.getServiceTagsIfUpdated(serviceName, lastKnownVersion, 0L, pluginId, null);
+		tagREST.getServiceTagsIfUpdated(serviceName, lastKnownVersion, 0L, pluginId, false,null);
 		
 		try {
-			Mockito.verify(tagStore).getServiceTagsIfUpdated(serviceName, lastKnownVersion);
+			Mockito.verify(tagStore).getServiceTagsIfUpdated(serviceName, lastKnownVersion, true);
 		} catch (Exception e) {
 		}
 		Mockito.verify(restErrorUtil).createRESTException(Mockito.anyInt(),Mockito.anyString(), Mockito.anyBoolean());
@@ -1428,15 +1428,15 @@ public class TestTagREST {
 		oldServiceTag.setTagVersion(5L);
 		
 		try {
-			Mockito.when(tagStore.getServiceTagsIfUpdated(serviceName, lastKnownVersion)).thenReturn(oldServiceTag);
+			Mockito.when(tagStore.getServiceTagsIfUpdated(serviceName, lastKnownVersion, true)).thenReturn(oldServiceTag);
 		} catch (Exception e) {
 		}
-		ServiceTags serviceTags = tagREST.getServiceTagsIfUpdated(serviceName, lastKnownVersion, 0L, pluginId, null);
+		ServiceTags serviceTags = tagREST.getServiceTagsIfUpdated(serviceName, lastKnownVersion, 0L, pluginId, false,null);
 		Assert.assertEquals(serviceTags.getServiceName(), oldServiceTag.getServiceName());
 		Assert.assertEquals(serviceTags.getTagVersion(), oldServiceTag.getTagVersion());
 		
 		try {
-			Mockito.verify(tagStore).getServiceTagsIfUpdated(serviceName, lastKnownVersion);
+			Mockito.verify(tagStore).getServiceTagsIfUpdated(serviceName, lastKnownVersion, true);
 		} catch (Exception e) {
 		}
 	}
@@ -1479,11 +1479,11 @@ public class TestTagREST {
 		}
 		
 		try {
-			Mockito.when(tagStore.getServiceTagsIfUpdated(serviceName, lastKnownVersion)).thenReturn(oldServiceTag);
+			Mockito.when(tagStore.getServiceTagsIfUpdated(serviceName, lastKnownVersion, true)).thenReturn(oldServiceTag);
 		} catch (Exception e) {
 		}
 		
-		ServiceTags result = tagREST.getSecureServiceTagsIfUpdated(serviceName, lastKnownVersion, 0L, pluginId, null);
+		ServiceTags result = tagREST.getSecureServiceTagsIfUpdated(serviceName, lastKnownVersion, 0L, pluginId, false, null);
 		Assert.assertNotNull(result.getServiceName());
 		Assert.assertEquals(result.getServiceName(), oldServiceTag.getServiceName());
 		Assert.assertEquals(result.getTagVersion(), oldServiceTag.getTagVersion());
@@ -1499,7 +1499,7 @@ public class TestTagREST {
 		} catch (Exception e) {
 		}
 		try {
-			Mockito.verify(tagStore).getServiceTagsIfUpdated(serviceName, lastKnownVersion);
+			Mockito.verify(tagStore).getServiceTagsIfUpdated(serviceName, lastKnownVersion, true);
 		} catch (Exception e) {
 		}
 	}
@@ -1541,11 +1541,11 @@ public class TestTagREST {
 		}
 		
 		try {
-			Mockito.when(tagStore.getServiceTagsIfUpdated(serviceName, lastKnownVersion)).thenReturn(oldServiceTag);
+			Mockito.when(tagStore.getServiceTagsIfUpdated(serviceName, lastKnownVersion, true)).thenReturn(oldServiceTag);
 		} catch (Exception e) {
 		}
 		
-		ServiceTags result = tagREST.getSecureServiceTagsIfUpdated(serviceName, lastKnownVersion, 0L, pluginId, null);
+		ServiceTags result = tagREST.getSecureServiceTagsIfUpdated(serviceName, lastKnownVersion, 0L, pluginId, false, null);
 		Assert.assertNotNull(result.getServiceName());
 		Assert.assertEquals(result.getServiceName(), oldServiceTag.getServiceName());
 		Assert.assertEquals(result.getTagVersion(), oldServiceTag.getTagVersion());
@@ -1561,7 +1561,7 @@ public class TestTagREST {
 		} catch (Exception e) {
 		}
 		try {
-			Mockito.verify(tagStore).getServiceTagsIfUpdated(serviceName, lastKnownVersion);
+			Mockito.verify(tagStore).getServiceTagsIfUpdated(serviceName, lastKnownVersion, true);
 		} catch (Exception e) {
 		}
 	}
@@ -1606,11 +1606,11 @@ public class TestTagREST {
 		
 		Mockito.when(bizUtil.isUserAllowed(rangerService, Allowed_User_List_For_Tag_Download)).thenReturn(isAllowed);
 		try {
-			Mockito.when(tagStore.getServiceTagsIfUpdated(serviceName, lastKnownVersion)).thenReturn(oldServiceTag);
+			Mockito.when(tagStore.getServiceTagsIfUpdated(serviceName, lastKnownVersion, true)).thenReturn(oldServiceTag);
 		} catch (Exception e) {
 		}
 		
-		ServiceTags result = tagREST.getSecureServiceTagsIfUpdated(serviceName, lastKnownVersion, 0L, pluginId, null);
+		ServiceTags result = tagREST.getSecureServiceTagsIfUpdated(serviceName, lastKnownVersion, 0L, pluginId, false, null);
 		Assert.assertNotNull(result.getServiceName());
 		Assert.assertEquals(result.getServiceName(), oldServiceTag.getServiceName());
 		Assert.assertEquals(result.getTagVersion(), oldServiceTag.getTagVersion());
@@ -1627,7 +1627,7 @@ public class TestTagREST {
 		}
 		Mockito.verify(bizUtil).isUserAllowed(rangerService, Allowed_User_List_For_Tag_Download);
 		try {
-			Mockito.verify(tagStore).getServiceTagsIfUpdated(serviceName, lastKnownVersion);
+			Mockito.verify(tagStore).getServiceTagsIfUpdated(serviceName, lastKnownVersion, true);
 		} catch (Exception e) {
 		}
 	}
@@ -1671,11 +1671,11 @@ public class TestTagREST {
 		
 		Mockito.when(bizUtil.isUserAllowed(rangerService, Allowed_User_List_For_Tag_Download)).thenReturn(isAllowed);
 		try {
-			Mockito.when(tagStore.getServiceTagsIfUpdated(serviceName, lastKnownVersion)).thenReturn(oldServiceTag);
+			Mockito.when(tagStore.getServiceTagsIfUpdated(serviceName, lastKnownVersion, true)).thenReturn(oldServiceTag);
 		} catch (Exception e) {
 		}
 		
-		ServiceTags result = tagREST.getSecureServiceTagsIfUpdated(serviceName, lastKnownVersion, 0L, pluginId, null);
+		ServiceTags result = tagREST.getSecureServiceTagsIfUpdated(serviceName, lastKnownVersion, 0L, pluginId, false, null);
 		Assert.assertNotNull(result.getServiceName());
 		Assert.assertEquals(result.getServiceName(), oldServiceTag.getServiceName());
 		Assert.assertEquals(result.getTagVersion(), oldServiceTag.getTagVersion());
@@ -1692,7 +1692,7 @@ public class TestTagREST {
 		}
 		Mockito.verify(bizUtil).isUserAllowed(rangerService, Allowed_User_List_For_Tag_Download);
 		try {
-			Mockito.verify(tagStore).getServiceTagsIfUpdated(serviceName, lastKnownVersion);
+			Mockito.verify(tagStore).getServiceTagsIfUpdated(serviceName, lastKnownVersion, true);
 		} catch (Exception e) {
 		}
 	}
@@ -1738,7 +1738,7 @@ public class TestTagREST {
 		Mockito.when(restErrorUtil.createRESTException(Mockito.anyInt(), Mockito.anyString(), Mockito.anyBoolean())).thenThrow(new WebApplicationException());
 		thrown.expect(WebApplicationException.class);
 		
-		tagREST.getSecureServiceTagsIfUpdated(serviceName, lastKnownVersion, 0L, pluginId, null);
+		tagREST.getSecureServiceTagsIfUpdated(serviceName, lastKnownVersion, 0L, pluginId, false, null);
 		
 		Mockito.verify(bizUtil).isAdmin();
 		Mockito.verify(bizUtil).isKeyAdmin();
@@ -1791,13 +1791,13 @@ public class TestTagREST {
 		
 		Mockito.when(bizUtil.isUserAllowed(rangerService, Allowed_User_List_For_Tag_Download)).thenReturn(isAllowed);
 		try {
-			Mockito.when(tagStore.getServiceTagsIfUpdated(serviceName, lastKnownVersion)).thenReturn(oldServiceTag);
+			Mockito.when(tagStore.getServiceTagsIfUpdated(serviceName, lastKnownVersion, true)).thenReturn(oldServiceTag);
 		} catch (Exception e) {
 		}
 		Mockito.when(restErrorUtil.createRESTException(Mockito.anyInt(), Mockito.anyString(), Mockito.anyBoolean())).thenThrow(new WebApplicationException());
 		thrown.expect(WebApplicationException.class);
 		
-		tagREST.getSecureServiceTagsIfUpdated(serviceName, lastKnownVersion, 0L, pluginId, null);
+		tagREST.getSecureServiceTagsIfUpdated(serviceName, lastKnownVersion, 0L, pluginId, false, null);
 		
 		Mockito.verify(bizUtil).isAdmin();
 		Mockito.verify(bizUtil).isKeyAdmin();
@@ -1811,7 +1811,7 @@ public class TestTagREST {
 		}
 		Mockito.verify(bizUtil).isUserAllowed(rangerService, Allowed_User_List_For_Tag_Download);
 		try {
-			Mockito.verify(tagStore).getServiceTagsIfUpdated(serviceName, lastKnownVersion);
+			Mockito.verify(tagStore).getServiceTagsIfUpdated(serviceName, lastKnownVersion, false);
 		} catch (Exception e) {
 		}
 		Mockito.verify(restErrorUtil).createRESTException(Mockito.anyInt(), Mockito.anyString(), Mockito.anyBoolean());
diff --git a/security-admin/src/test/java/org/apache/ranger/service/TestRangerTagDefService.java b/security-admin/src/test/java/org/apache/ranger/service/TestRangerTagDefService.java
index 5032c12..160a53c 100644
--- a/security-admin/src/test/java/org/apache/ranger/service/TestRangerTagDefService.java
+++ b/security-admin/src/test/java/org/apache/ranger/service/TestRangerTagDefService.java
@@ -83,14 +83,14 @@ public class TestRangerTagDefService {
 		XXServiceVersionInfoDao xxServiceVersionInfoDao = Mockito.mock(XXServiceVersionInfoDao.class);
 
 		Mockito.when(daoMgr.getXXServiceVersionInfo()).thenReturn(xxServiceVersionInfoDao);
-		Mockito.doNothing().when(xxServiceVersionInfoDao).updateServiceVersionInfoForTagDefUpdate(tagDef.getId(), tagDef.getUpdateTime());
+		Mockito.doNothing().when(xxServiceVersionInfoDao).updateServiceVersionInfoForTagDefUpdate(tagDef.getId());
 		
 		RangerTagDef result = rangerTagDefService.postUpdate(tagDef);
 		Assert.assertEquals(result.getId(), tagAttrDefList.get(0).getId());
 		Assert.assertEquals(result.getName(), tagAttrDefList.get(0).getName());
 
 		Mockito.verify(daoMgr).getXXServiceVersionInfo();
-		Mockito.verify(xxServiceVersionInfoDao).updateServiceVersionInfoForTagDefUpdate(tagDef.getId(), tagDef.getUpdateTime());
+		Mockito.verify(xxServiceVersionInfoDao).updateServiceVersionInfoForTagDefUpdate(tagDef.getId());
 	}
 		
 	@Test
diff --git a/security-admin/src/test/java/org/apache/ranger/service/TestRangerTagResourceMapService.java b/security-admin/src/test/java/org/apache/ranger/service/TestRangerTagResourceMapService.java
index 27ec8e1..1e17af8 100644
--- a/security-admin/src/test/java/org/apache/ranger/service/TestRangerTagResourceMapService.java
+++ b/security-admin/src/test/java/org/apache/ranger/service/TestRangerTagResourceMapService.java
@@ -61,14 +61,6 @@ public class TestRangerTagResourceMapService {
 	}
 
 	@Test
-	public void test2PostUpdate() {
-		Mockito.when(daoMgr.getXXPortalUser()).thenReturn(XXPortalUserDao);
-		Mockito.when(daoMgr.getXXServiceVersionInfo()).thenReturn(xXServiceVersionInfoDao);
-		rangerTagResourceMapService.postUpdate(xXTagResourceMap);
-
-	}
-
-	@Test
 	public void test3GetPopulatedViewObject() {
 		Mockito.when(daoMgr.getXXPortalUser()).thenReturn(XXPortalUserDao);
 		rangerTagResourceMapService.getPopulatedViewObject(xXTagResourceMap);