You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ab...@apache.org on 2021/06/09 00:51:14 UTC

[ranger] branch ranger-2.2 updated: RANGER-3309: Support batch upload of tags to Ranger

This is an automated email from the ASF dual-hosted git repository.

abhay pushed a commit to branch ranger-2.2
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/ranger-2.2 by this push:
     new 4a87e32  RANGER-3309: Support batch upload of tags to Ranger
4a87e32 is described below

commit 4a87e3203ad18155e82b172e9754aa0fdc3554e7
Author: Abhay Kulkarni <ab...@apache.org>
AuthorDate: Tue Jun 8 16:59:28 2021 -0700

    RANGER-3309: Support batch upload of tags to Ranger
---
 .../apache/ranger/rest/ServiceTagsProcessor.java   | 226 ++++++++++++++-------
 .../apache/ranger/service/RangerTagService.java    |   3 +-
 .../ranger/tagsync/model/AbstractTagSource.java    |   3 +-
 .../ranger/tagsync/process/TagSyncConfig.java      |  20 ++
 .../source/atlas/AtlasNotificationMapper.java      |  47 ++---
 .../tagsync/source/atlas/AtlasTagSource.java       | 187 +++++++++++------
 .../source/atlasrest/AtlasRESTTagSource.java       |  39 ++--
 .../ranger/tagsync/source/file/FileTagSource.java  |   6 +-
 .../src/main/resources/ranger-tagsync-default.xml  |   4 +
 9 files changed, 352 insertions(+), 183 deletions(-)

diff --git a/security-admin/src/main/java/org/apache/ranger/rest/ServiceTagsProcessor.java b/security-admin/src/main/java/org/apache/ranger/rest/ServiceTagsProcessor.java
index 67ae779..9551fbb 100644
--- a/security-admin/src/main/java/org/apache/ranger/rest/ServiceTagsProcessor.java
+++ b/security-admin/src/main/java/org/apache/ranger/rest/ServiceTagsProcessor.java
@@ -30,6 +30,7 @@ import org.apache.ranger.plugin.model.RangerTagDef;
 import org.apache.ranger.plugin.model.RangerTagResourceMap;
 import org.apache.ranger.plugin.store.RangerServiceResourceSignature;
 import org.apache.ranger.plugin.store.TagStore;
+import org.apache.ranger.plugin.util.RangerPerfTracer;
 import org.apache.ranger.plugin.util.ServiceTags;
 
 import java.util.ArrayList;
@@ -39,6 +40,7 @@ import java.util.Map;
 
 public class ServiceTagsProcessor {
 	private static final Log LOG = LogFactory.getLog(ServiceTagsProcessor.class);
+	private static final Log PERF_LOG_ADD_OR_UPDATE = RangerPerfTracer.getPerfLogger("tags.addOrUpdate");
 
 	private final TagStore tagStore;
 
@@ -87,9 +89,16 @@ public class ServiceTagsProcessor {
 			LOG.debug("==> ServiceTagsProcessor.createOrUpdate()");
 		}
 
+		RangerPerfTracer perfTotal = null;
+		RangerPerfTracer perf      = null;
+
 		Map<Long, RangerTagDef>          tagDefsInStore   = new HashMap<Long, RangerTagDef>();
 		Map<Long, RangerServiceResource> resourcesInStore = new HashMap<Long, RangerServiceResource>();
 
+		if(RangerPerfTracer.isPerfTraceEnabled(PERF_LOG_ADD_OR_UPDATE)) {
+			perfTotal = RangerPerfTracer.getPerfTracer(PERF_LOG_ADD_OR_UPDATE, "tags.addOrUpdate()");
+		}
+
 		if (MapUtils.isNotEmpty(serviceTags.getTagDefinitions())) {
 			RangerTagDef tagDef = null;
 
@@ -139,22 +148,32 @@ public class ServiceTagsProcessor {
 					Long                  resourceId        = resource.getId();
 
 					if(StringUtils.isNotEmpty(resource.getGuid())) {
+						if(RangerPerfTracer.isPerfTraceEnabled(PERF_LOG_ADD_OR_UPDATE)) {
+							perf = RangerPerfTracer.getPerfTracer(PERF_LOG_ADD_OR_UPDATE, "tags.search_service_resource_by_guid(" + resourceId + ")");
+						}
 						existing = tagStore.getServiceResourceByGuid(resource.getGuid());
-					}
-
-					if(existing == null) {
+						RangerPerfTracer.logAlways(perf);
+					} else {
 						if(MapUtils.isNotEmpty(resource.getResourceElements())) {
+							if(RangerPerfTracer.isPerfTraceEnabled(PERF_LOG_ADD_OR_UPDATE)) {
+								perf = RangerPerfTracer.getPerfTracer(PERF_LOG_ADD_OR_UPDATE, "tags.search_service_resource_by_signature(" + resourceId + ")");
+							}
 							RangerServiceResourceSignature serializer = new RangerServiceResourceSignature(resource);
 
 							resourceSignature = serializer.getSignature();
 							resource.setResourceSignature(resourceSignature);
 
 							existing = tagStore.getServiceResourceByServiceAndResourceSignature(resource.getServiceName(), resourceSignature);
+
+							RangerPerfTracer.logAlways(perf);
 						}
 					}
 
 					RangerServiceResource resourceInStore = null;
 
+					if(RangerPerfTracer.isPerfTraceEnabled(PERF_LOG_ADD_OR_UPDATE)) {
+						perf = RangerPerfTracer.getPerfTracer(PERF_LOG_ADD_OR_UPDATE, "tags.createOrUpdate_service_resource(" + resourceId + ")");
+					}
 					if (existing == null) {
 
 						resourceInStore = tagStore.createServiceResource(resource);
@@ -169,6 +188,7 @@ public class ServiceTagsProcessor {
 					}
 
 					resourcesInStore.put(resourceId, resourceInStore);
+					RangerPerfTracer.logAlways(perf);
 				}
 			} catch (Exception exception) {
 				LOG.error("createServiceResource failed, resource=" + resource, exception);
@@ -187,6 +207,10 @@ public class ServiceTagsProcessor {
 					continue;
 				}
 
+				if(RangerPerfTracer.isPerfTraceEnabled(PERF_LOG_ADD_OR_UPDATE)) {
+					perf = RangerPerfTracer.getPerfTracer(PERF_LOG_ADD_OR_UPDATE, "tags.get_tags_for_service_resource(" + resourceInStore.getId() + ")");
+				}
+
 				// Get all tags associated with this resourceId
 				List<RangerTag> associatedTags = null;
 
@@ -195,6 +219,8 @@ public class ServiceTagsProcessor {
 				} catch (Exception exception) {
 					LOG.error("RangerTags cannot be retrieved for resource with guid=" + resourceInStore.getGuid());
 					throw exception;
+				} finally {
+					RangerPerfTracer.logAlways(perf);
 				}
 
 				List<RangerTag> tagsToRetain = new ArrayList<RangerTag>();
@@ -216,91 +242,122 @@ public class ServiceTagsProcessor {
 								LOG.debug("Did not find matching tag for tagId=" + tagId);
 							}
 							// create new tag from incoming tag and associate it with service-resource
+							if (RangerPerfTracer.isPerfTraceEnabled(PERF_LOG_ADD_OR_UPDATE)) {
+								perf = RangerPerfTracer.getPerfTracer(PERF_LOG_ADD_OR_UPDATE, "tags.create_tag(" + tagId + ")");
+							}
 							RangerTag newTag = tagStore.createTag(incomingTag);
+							RangerPerfTracer.logAlways(perf);
 
 							RangerTagResourceMap tagResourceMap = new RangerTagResourceMap();
 
 							tagResourceMap.setTagId(newTag.getId());
 							tagResourceMap.setResourceId(resourceInStore.getId());
-
+							if (RangerPerfTracer.isPerfTraceEnabled(PERF_LOG_ADD_OR_UPDATE)) {
+								perf = RangerPerfTracer.getPerfTracer(PERF_LOG_ADD_OR_UPDATE, "tags.create_tagResourceMap(" + tagId + ")");
+							}
 							tagResourceMap = tagStore.createTagResourceMap(tagResourceMap);
+							RangerPerfTracer.logAlways(perf);
 
 							associatedTags.add(newTag);
 							tagsToRetain.add(newTag);
 
-							continue;
-
-						}
-
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("Found matching tag for tagId=" + tagId + ", matchingTag=" + matchingTag);
-						}
+						} else {
 
-						if (isResourcePrivateTag(incomingTag)) {
-							if (!isResourcePrivateTag(matchingTag)) {
-								// create new tag from incoming tag and associate it with service-resource
-								RangerTag newTag = tagStore.createTag(incomingTag);
+							if (LOG.isDebugEnabled()) {
+								LOG.debug("Found matching tag for tagId=" + tagId + ", matchingTag=" + matchingTag);
+							}
 
-								RangerTagResourceMap tagResourceMap = new RangerTagResourceMap();
+							if (isResourcePrivateTag(incomingTag)) {
+								if (!isResourcePrivateTag(matchingTag)) {
+									// create new tag from incoming tag and associate it with service-resource
+									RangerTag newTag = tagStore.createTag(incomingTag);
 
-								tagResourceMap.setTagId(newTag.getId());
-								tagResourceMap.setResourceId(resourceInStore.getId());
+									RangerTagResourceMap tagResourceMap = new RangerTagResourceMap();
 
-								tagResourceMap = tagStore.createTagResourceMap(tagResourceMap);
+									tagResourceMap.setTagId(newTag.getId());
+									tagResourceMap.setResourceId(resourceInStore.getId());
 
-								associatedTags.add(newTag);
-								tagsToRetain.add(newTag);
+									tagResourceMap = tagStore.createTagResourceMap(tagResourceMap);
 
-							} else {
-								// Keep this tag, but update it with attribute-values and validity schedules from incoming tag
-								tagsToRetain.add(matchingTag);
+									associatedTags.add(newTag);
+									tagsToRetain.add(newTag);
 
-								if (LOG.isDebugEnabled()) {
-									LOG.debug("Updating existing private tag with id=" + matchingTag.getId());
+								} else {
+									tagsToRetain.add(matchingTag);
+
+									boolean isTagUpdateNeeded = false;
+
+									// Note that as there is no easy way to check validityPeriods for equality, an easy way to rule out the possibility of validityPeriods
+									// not matching is to check if both old and new tags have empty validityPeriods
+									if (matchingTag.getGuid() != null && matchingTag.getGuid().equals(incomingTag.getGuid())) {
+										if (isMatch(incomingTag, matchingTag) && CollectionUtils.isEmpty(incomingTag.getValidityPeriods()) && CollectionUtils.isEmpty(matchingTag.getValidityPeriods())) {
+											if (LOG.isDebugEnabled()) {
+												LOG.debug("No need to update existing-tag:[" + matchingTag + "] with incoming-tag:[" + incomingTag + "]");
+											}
+										} else {
+											isTagUpdateNeeded = true;
+										}
+									} else {
+										if (CollectionUtils.isEmpty(incomingTag.getValidityPeriods()) && CollectionUtils.isEmpty(matchingTag.getValidityPeriods())) {
+											// Completely matched tags. No need to update
+											if (LOG.isDebugEnabled()) {
+												LOG.debug("No need to update existing-tag:[" + matchingTag + "] with incoming-tag:[" + incomingTag + "]");
+											}
+										} else {
+											isTagUpdateNeeded = true;
+										}
+									}
+									if (isTagUpdateNeeded) {
+											// Keep this tag, and update it with attribute-values and validity schedules from incoming tag
+											if (LOG.isDebugEnabled()) {
+												LOG.debug("Updating existing private tag with id=" + matchingTag.getId());
+											}
+											incomingTag.setId(matchingTag.getId());
+											tagStore.updateTag(incomingTag);
+											isAnyTagUpdated = true;
+									}
 								}
-								incomingTag.setId(matchingTag.getId());
-								tagStore.updateTag(incomingTag);
-								isAnyTagUpdated = true;
-							}
-						} else { // shared model
-							if (isResourcePrivateTag(matchingTag)) {
-								// create new tag from incoming tag and associate it with service-resource
-								RangerTag newTag = tagStore.createTag(incomingTag);
+							} else { // shared model
+								if (isResourcePrivateTag(matchingTag)) {
+									// create new tag from incoming tag and associate it with service-resource
+									RangerTag newTag = tagStore.createTag(incomingTag);
 
-								RangerTagResourceMap tagResourceMap = new RangerTagResourceMap();
+									RangerTagResourceMap tagResourceMap = new RangerTagResourceMap();
 
-								tagResourceMap.setTagId(newTag.getId());
-								tagResourceMap.setResourceId(resourceInStore.getId());
+									tagResourceMap.setTagId(newTag.getId());
+									tagResourceMap.setResourceId(resourceInStore.getId());
 
-								tagResourceMap = tagStore.createTagResourceMap(tagResourceMap);
+									tagResourceMap = tagStore.createTagResourceMap(tagResourceMap);
 
-								associatedTags.add(newTag);
-								tagsToRetain.add(newTag);
+									associatedTags.add(newTag);
+									tagsToRetain.add(newTag);
 
-							} else {
-								// Keep this tag, but update it with attribute-values from incoming tag
-								tagsToRetain.add(matchingTag);
+								} else {
+									// Keep this tag, but update it with attribute-values from incoming tag
+									tagsToRetain.add(matchingTag);
 
-								// Update shared tag with new values
-								incomingTag.setId(matchingTag.getId());
-								tagStore.updateTag(incomingTag);
+									// Update shared tag with new values
+									incomingTag.setId(matchingTag.getId());
+									tagStore.updateTag(incomingTag);
 
-								// associate with service-resource if not already associated
-								if (findTagInList(matchingTag, associatedTags) == null) {
-									RangerTagResourceMap tagResourceMap = new RangerTagResourceMap();
+									// associate with service-resource if not already associated
+									if (findTagInList(matchingTag, associatedTags) == null) {
+										RangerTagResourceMap tagResourceMap = new RangerTagResourceMap();
 
-									tagResourceMap.setTagId(matchingTag.getId());
-									tagResourceMap.setResourceId(resourceInStore.getId());
+										tagResourceMap.setTagId(matchingTag.getId());
+										tagResourceMap.setResourceId(resourceInStore.getId());
 
-									tagResourceMap = tagStore.createTagResourceMap(tagResourceMap);
-								} else {
-									isAnyTagUpdated = true;
-								}
+										tagResourceMap = tagStore.createTagResourceMap(tagResourceMap);
+									} else {
+										isAnyTagUpdated = true;
+									}
 
+								}
 							}
-						}
 
+						}
 					}
+
 				} catch (Exception exception) {
 					LOG.error("createRangerTagResourceMap failed", exception);
 					throw exception;
@@ -332,11 +389,17 @@ public class ServiceTagsProcessor {
 					}
 				}
 				if (isAnyTagUpdated) {
+					if(RangerPerfTracer.isPerfTraceEnabled(PERF_LOG_ADD_OR_UPDATE)) {
+						perf = RangerPerfTracer.getPerfTracer(PERF_LOG_ADD_OR_UPDATE, "tags.refreshServiceResource(" + resourceInStore.getId() + ")");
+					}
 					tagStore.refreshServiceResource(resourceInStore.getId());
+					RangerPerfTracer.logAlways(perf);
 				}
 			}
 		}
 
+		RangerPerfTracer.logAlways(perfTotal);
+
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("<== ServiceTagsProcessor.createOrUpdate()");
 		}
@@ -383,40 +446,51 @@ public class ServiceTagsProcessor {
 			if (isResourcePrivateTag(incomingTag)) {
 
 				for (RangerTag existingTag : existingTags) {
+					if (isMatch(incomingTag, existingTag)) {
+						ret = existingTag;
+						break;
+					}
+				}
+			}
 
-					if (StringUtils.equals(incomingTag.getType(), existingTag.getType())) {
+		}
 
-						// Check attribute values
-						Map<String, String> incomingTagAttributes = incomingTag.getAttributes();
-						Map<String, String> existingTagAttributes = existingTag.getAttributes();
+		return ret;
+	}
 
-						if (CollectionUtils.isEqualCollection(incomingTagAttributes.keySet(), existingTagAttributes.keySet())) {
+	private boolean isMatch(final RangerTag incomingTag, final RangerTag existingTag) {
+		boolean ret = false;
 
-							boolean matched = true;
+		if (incomingTag != null && existingTag != null) {
 
-							for (Map.Entry<String, String> entry : incomingTagAttributes.entrySet()) {
+			if (StringUtils.equals(incomingTag.getType(), existingTag.getType())) {
 
-								String key = entry.getKey();
-								String value = entry.getValue();
+				// Check attribute values
+				Map<String, String> incomingTagAttributes = incomingTag.getAttributes();
+				Map<String, String> existingTagAttributes = existingTag.getAttributes();
 
-								if (!StringUtils.equals(value, existingTagAttributes.get(key))) {
-									matched = false;
-									break;
-								}
+				if (CollectionUtils.isEqualCollection(incomingTagAttributes.keySet(), existingTagAttributes.keySet())) {
 
-							}
-							if (matched) {
-								ret = existingTag;
-								break;
-							}
+					boolean matched = true;
+
+					for (Map.Entry<String, String> entry : incomingTagAttributes.entrySet()) {
+
+						String key = entry.getKey();
+						String value = entry.getValue();
+
+						if (!StringUtils.equals(value, existingTagAttributes.get(key))) {
+							matched = false;
+							break;
 						}
 
 					}
+					if (matched) {
+						ret = true;
+					}
 				}
-			}
 
+			}
 		}
-
 		return ret;
 	}
 
diff --git a/security-admin/src/main/java/org/apache/ranger/service/RangerTagService.java b/security-admin/src/main/java/org/apache/ranger/service/RangerTagService.java
index 5fdefe5..3a90bd8 100644
--- a/security-admin/src/main/java/org/apache/ranger/service/RangerTagService.java
+++ b/security-admin/src/main/java/org/apache/ranger/service/RangerTagService.java
@@ -65,7 +65,8 @@ public class RangerTagService extends RangerTagServiceBase<XXTag, RangerTag> {
 	public RangerTag postCreate(XXTag tag) {
 		RangerTag ret = super.postCreate(tag);
 
-		daoMgr.getXXServiceVersionInfo().updateServiceVersionInfoForTagUpdate(tag.getId());
+		// This is not needed - on tag creation, service-version-info need not be updated.
+		//daoMgr.getXXServiceVersionInfo().updateServiceVersionInfoForTagUpdate(tag.getId());
 
 		return ret;
 	}
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
index da4c5cb..4278a23 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
@@ -53,7 +53,7 @@ public abstract  class AbstractTagSource implements TagSource {
 		return this.name;
 	}
 
-	protected void updateSink(final ServiceTags toUpload) {
+	protected void updateSink(final ServiceTags toUpload) throws Exception {
 		if (toUpload == null) {
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("No ServiceTags to upload");
@@ -75,6 +75,7 @@ public abstract  class AbstractTagSource implements TagSource {
 				String toUploadJSON = new Gson().toJson(toUpload);
 				LOG.error("Failed to upload serviceTags: " + toUploadJSON);
 				LOG.error("Exception : ", exception);
+				throw exception;
 			}
 		}
 	}
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
index 9245fdf..bfb7206 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
@@ -116,6 +116,12 @@ public class TagSyncConfig extends Configuration {
     private static final long    DEFAULT_TAGSYNC_METRICS_FREQUENCY__TIME_IN_MILLIS = 10000L;
     private static final String  TAGSYNC_METRICS_ENABLED_PROP = "ranger.tagsync.metrics.enabled";
 
+	private static final int     DEFAULT_TAGSYNC_SINK_MAX_BATCH_SIZE = 1;
+	private static final String  TAGSYNC_SINK_MAX_BATCH_SIZE_PROP    = "ranger.tagsync.dest.ranger.max.batch.size";
+
+
+
+
 	private Properties props;
 
 	static {
@@ -434,6 +440,20 @@ public class TagSyncConfig extends Configuration {
 		return prop.getProperty(TAGSYNC_KERBEROS_IDENTITY);
 	}
 
+	public static int getSinkMaxBatchSize(Properties prop) {
+		int ret = DEFAULT_TAGSYNC_SINK_MAX_BATCH_SIZE;
+
+		String maxBatchSizeStr = prop.getProperty(TAGSYNC_SINK_MAX_BATCH_SIZE_PROP);
+
+		if (StringUtils.isNotEmpty(maxBatchSizeStr)) {
+			try {
+				ret = Integer.valueOf(maxBatchSizeStr);
+			} catch (Exception e) {
+			}
+		}
+		return ret;
+	}
+
 	private TagSyncConfig() {
 		super(false);
 		init();
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index ed4ba17..868f0f8 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -160,7 +160,7 @@ public class AtlasNotificationMapper {
     }
 
     @SuppressWarnings("unchecked")
-    static private ServiceTags buildServiceTagsForEntityDeleteNotification(RangerAtlasEntityWithTags entityWithTags) {
+    static ServiceTags buildServiceTagsForEntityDeleteNotification(RangerAtlasEntityWithTags entityWithTags) {
         final ServiceTags ret;
 
         RangerAtlasEntity   entity = entityWithTags.getEntity();
@@ -260,39 +260,36 @@ public class AtlasNotificationMapper {
 
         if (serviceResource != null) {
 
-            List<RangerTag>     tags        = getTags(entityWithTags);
-            List<RangerTagDef>  tagDefs     = getTagDefs(entityWithTags);
-            String              serviceName = serviceResource.getServiceName();
+            List<RangerTag>    tags = getTags(entityWithTags);
+            List<RangerTagDef> tagDefs = getTagDefs(entityWithTags);
+            String             serviceName = serviceResource.getServiceName();
 
             ret = createOrGetServiceTags(serviceTagsMap, serviceName);
 
-            if (serviceTagsMap == null || CollectionUtils.isNotEmpty(tags)) {
-
-                serviceResource.setId((long) ret.getServiceResources().size());
-                ret.getServiceResources().add(serviceResource);
-
-                List<Long> tagIds = new ArrayList<>();
+            serviceResource.setId((long) ret.getServiceResources().size());
+            ret.getServiceResources().add(serviceResource);
 
-                if (CollectionUtils.isNotEmpty(tags)) {
-                    for (RangerTag tag : tags) {
-                        tag.setId((long) ret.getTags().size());
-                        ret.getTags().put(tag.getId(), tag);
+            List<Long> tagIds = new ArrayList<>();
 
-                        tagIds.add(tag.getId());
-                    }
-                }
-                ret.getResourceToTagIds().put(serviceResource.getId(), tagIds);
+            if (CollectionUtils.isNotEmpty(tags)) {
+                for (RangerTag tag : tags) {
+                    tag.setId((long) ret.getTags().size());
+                    ret.getTags().put(tag.getId(), tag);
 
-                if (CollectionUtils.isNotEmpty(tagDefs)) {
-                    for (RangerTagDef tagDef : tagDefs) {
-                        tagDef.setId((long) ret.getTagDefinitions().size());
-                        ret.getTagDefinitions().put(tagDef.getId(), tagDef);
-                    }
+                    tagIds.add(tag.getId());
                 }
             } else {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Entity " + entityWithTags + " does not have any tags associated with it when full-sync is being done.");
-                    LOG.debug("Will not add this entity to serviceTags, so that this entity, if exists,  will be removed from ranger");
+                    LOG.debug("Entity " + entityWithTags + " does not have any tags associated with it");
+                }
+            }
+
+            ret.getResourceToTagIds().put(serviceResource.getId(), tagIds);
+
+            if (CollectionUtils.isNotEmpty(tagDefs)) {
+                for (RangerTagDef tagDef : tagDefs) {
+                    tagDef.setId((long) ret.getTagDefinitions().size());
+                    ret.getTagDefinitions().put(tagDef.getId(), tagDef);
                 }
             }
         } else {
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index 21a22cd..e9fe02f 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -20,10 +20,14 @@
 package org.apache.ranger.tagsync.source.atlas;
 
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import org.apache.atlas.kafka.NotificationProvider;
 import org.apache.atlas.model.notification.EntityNotification;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationInterface;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,12 +35,16 @@ import org.apache.ranger.plugin.util.ServiceTags;
 import org.apache.ranger.tagsync.model.AbstractTagSource;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
 import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntityWithTags;
 
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 public class AtlasTagSource extends AbstractTagSource {
@@ -48,6 +56,10 @@ public class AtlasTagSource extends AbstractTagSource {
 	public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.kafka.zookeeper.connect";
 	public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.kafka.entities.group.id";
 
+	public static final int    MAX_WAIT_TIME_IN_MILLIS = 1000;
+
+	private             int    maxBatchSize;
+
 	private ConsumerRunnable consumerTask;
 	private Thread myThread = null;
 
@@ -106,6 +118,8 @@ public class AtlasTagSource extends AbstractTagSource {
 			consumerTask = new ConsumerRunnable(iterators.get(0));
 		}
 
+		maxBatchSize = TagSyncConfig.getSinkMaxBatchSize(properties);
+
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("<== AtlasTagSource.initialize(), result=" + ret);
 		}
@@ -152,89 +166,69 @@ public class AtlasTagSource extends AbstractTagSource {
 
 		private final NotificationConsumer<EntityNotification> consumer;
 
+		private final List<RangerAtlasEntityWithTags>             atlasEntitiesWithTags = new ArrayList<>();
+		private final List<AtlasKafkaMessage<EntityNotification>> messages              = new ArrayList<>();
+
+		private long    offsetOfLastMessageDeliveredToRanger = -1L;
+		private long    offsetOfLastMessageCommittedToKafka  = -1L;
+
+		private boolean isHandlingDeleteOps   = false;
+
 		private ConsumerRunnable(NotificationConsumer<EntityNotification> consumer) {
 			this.consumer = consumer;
 		}
 
-
 		@Override
 		public void run() {
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("==> ConsumerRunnable.run()");
 			}
 
-			boolean seenCommitException = false;
-			long offsetOfLastMessageDeliveredToRanger = -1L;
-
 			while (true) {
-				try {
-					List<AtlasKafkaMessage<EntityNotification>> messages = consumer.receive(1000L);
 
-					int index = 0;
+				try {
+					List<AtlasKafkaMessage<EntityNotification>> newMessages = consumer.receive(MAX_WAIT_TIME_IN_MILLIS);
 
-					if (messages.size() > 0 && seenCommitException) {
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("seenCommitException=[true], offsetOfLastMessageDeliveredToRanger=[" + offsetOfLastMessageDeliveredToRanger + "]");
+					if (newMessages.size() == 0) {
+						LOG.info("AtlasTagSource.ConsumerRunnable.run: no message from NotificationConsumer within " + MAX_WAIT_TIME_IN_MILLIS + " milliseconds");
+						if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags)) {
+							buildAndUploadServiceTags();
 						}
-						for (; index < messages.size(); index++) {
-							AtlasKafkaMessage<EntityNotification> message = messages.get(index);
-							if (message.getOffset() <= offsetOfLastMessageDeliveredToRanger) {
-								// Already delivered to Ranger
-								TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+					} else {
+						for (AtlasKafkaMessage<EntityNotification> message : newMessages) {
+							EntityNotification notification = message != null ? message.getMessage() : null;
+
+							if (notification != null) {
+								EntityNotificationWrapper notificationWrapper = null;
 								try {
+									notificationWrapper = new EntityNotificationWrapper(notification);
+								} catch (Throwable e) {
+									LOG.error("notification:[" + notification + "] has some issues..perhaps null entity??", e);
+								}
+								if (notificationWrapper != null) {
 									if (LOG.isDebugEnabled()) {
-										LOG.debug("Committing previously commit-failed message with offset:[" + message.getOffset() + "]");
+										LOG.debug("Message-offset=" + message.getOffset() + ", Notification=" + getPrintableEntityNotification(notificationWrapper));
 									}
-									consumer.commit(partition, message.getOffset());
-								} catch (Exception commitException) {
-									LOG.warn("Ranger tagsync already processed message at offset " + message.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", commitException);
-									LOG.warn("This will cause Kafka to deliver this message:[" + message.getOffset() + "] repeatedly!! This may be unrecoverable error!!");
-								}
-							} else {
-								break;
-							}
-						}
-					}
 
-					seenCommitException = false;
-					offsetOfLastMessageDeliveredToRanger = -1L;
+									RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(notificationWrapper);
 
-					for (; index < messages.size(); index++) {
-						AtlasKafkaMessage<EntityNotification> message = messages.get(index);
-						EntityNotification notification = message != null ? message.getMessage() : null;
-
-						if (notification != null) {
-							EntityNotificationWrapper notificationWrapper = null;
-							try {
-								notificationWrapper = new EntityNotificationWrapper(notification);
-							} catch (Throwable e) {
-								LOG.error("notification:[" + notification +"] has some issues..perhaps null entity??", e);
-							}
-							if (notificationWrapper != null) {
-								if (LOG.isDebugEnabled()) {
-									LOG.debug("Message-offset=" + message.getOffset() + ", Notification=" + getPrintableEntityNotification(notificationWrapper));
-								}
-
-								ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notificationWrapper);
-								if (serviceTags != null) {
-									updateSink(serviceTags);
-								}
-								offsetOfLastMessageDeliveredToRanger = message.getOffset();
-
-								if (!seenCommitException) {
-									TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
-									try {
-										consumer.commit(partition, message.getOffset());
-									} catch (Exception commitException) {
-										seenCommitException = true;
-										LOG.warn("Ranger tagsync processed message at offset " + message.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", commitException);
+									if ((notificationWrapper.getIsEntityDeleteOp() && !isHandlingDeleteOps) || (!notificationWrapper.getIsEntityDeleteOp() && isHandlingDeleteOps)) {
+										buildAndUploadServiceTags();
+										isHandlingDeleteOps = !isHandlingDeleteOps;
 									}
+
+									atlasEntitiesWithTags.add(entityWithTags);
+									messages.add(message);
 								}
+							} else {
+								LOG.error("Null entityNotification received from Kafka!! Ignoring..");
 							}
-						} else {
-							LOG.error("Null entityNotification received from Kafka!! Ignoring..");
+						}
+						if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags) && atlasEntitiesWithTags.size() >= maxBatchSize) {
+							buildAndUploadServiceTags();
 						}
 					}
+
 				} catch (Exception exception) {
 					LOG.error("Caught exception..: ", exception);
 					// If transient error, retry after short interval
@@ -248,6 +242,81 @@ public class AtlasTagSource extends AbstractTagSource {
 				}
 			}
 		}
+
+		private void buildAndUploadServiceTags() throws Exception {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("==> buildAndUploadServiceTags()");
+			}
+
+			commitToKafka();
+
+			Map<String, ServiceTags> serviceTagsMap = AtlasNotificationMapper.processAtlasEntities(atlasEntitiesWithTags);
+
+			if (MapUtils.isNotEmpty(serviceTagsMap)) {
+				if (serviceTagsMap.size() != 1) {
+					LOG.warn("Unexpected!! Notifications for more than one service received by AtlasTagSource.. Service-Names:[" + serviceTagsMap.keySet() + "]");
+				}
+				for (Map.Entry<String, ServiceTags> entry : serviceTagsMap.entrySet()) {
+					if (isHandlingDeleteOps) {
+						entry.getValue().setOp(ServiceTags.OP_DELETE);
+						entry.getValue().setTagDefinitions(Collections.EMPTY_MAP);
+						entry.getValue().setTags(Collections.EMPTY_MAP);
+					} else {
+						entry.getValue().setOp(ServiceTags.OP_ADD_OR_UPDATE);
+					}
+
+					if (LOG.isDebugEnabled()) {
+						Gson gsonBuilder = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create();
+						String serviceTagsString = gsonBuilder.toJson(entry.getValue());
+
+						LOG.debug("serviceTags=" + serviceTagsString);
+					}
+					updateSink(entry.getValue());
+				}
+				offsetOfLastMessageDeliveredToRanger = messages.get(messages.size()-1).getOffset();
+				LOG.info("Completed processing batch of messages of size:[" + messages.size() + "] received from NotificationConsumer");
+
+				commitToKafka();
+			}
+
+			atlasEntitiesWithTags.clear();
+			messages.clear();
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("<== buildAndUploadServiceTags()");
+			}
+		}
+
+		private void commitToKafka() {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("==> commitToKafka()");
+			}
+
+			for (AtlasKafkaMessage<EntityNotification> message : messages) {
+				if (message.getOffset() > offsetOfLastMessageCommittedToKafka) {
+					if (message.getOffset() <= offsetOfLastMessageDeliveredToRanger) {
+						// Already delivered to Ranger
+						TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+						try {
+							if (LOG.isDebugEnabled()) {
+								LOG.debug("Committing message with offset:[" + message.getOffset() + "] to Kafka");
+							}
+							consumer.commit(partition, message.getOffset());
+							offsetOfLastMessageCommittedToKafka = message.getOffset();
+						} catch (Exception commitException) {
+							LOG.warn("Ranger tagsync already processed message at offset " + message.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", commitException);
+							LOG.warn("This will cause Kafka to deliver this message:[" + message.getOffset() + "] repeatedly!! This may be unrecoverable error!!");
+						}
+					} else {
+						break;
+					}
+				}
+			}
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("<== commitToKafka()");
+			}
+		}
 	}
 }
 
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
index 8b12aff..f49a0a8 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
@@ -189,30 +189,29 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
 	}
 
 	@Override
-	public void run() {
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> AtlasRESTTagSource.run()");
-		}
-
-		while (true) {
-
-			synchUp();
-
-			LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + "] milliSeconds");
+    public void run() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRESTTagSource.run()");
+        }
+        while (true) {
+            try {
+                synchUp();
 
-			try {
+                LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + "] milliSeconds");
 
-				Thread.sleep(sleepTimeBetweenCycleInMillis);
+                Thread.sleep(sleepTimeBetweenCycleInMillis);
 
-			} catch (InterruptedException exception) {
-				LOG.error("Interrupted..: ", exception);
-				return;
-			}
-		}
-	}
+            } catch (InterruptedException exception) {
+                LOG.error("Interrupted..: ", exception);
+                return;
+            } catch (Exception e) {
+                LOG.error("Caught exception", e);
+                return;
+            }
+        }
+    }
 
-	public void synchUp() {
+	public void synchUp() throws Exception {
 
 		List<RangerAtlasEntityWithTags> rangerAtlasEntities = getAtlasActiveEntities();
 
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
index 62a5f73..ac611a1 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
@@ -230,6 +230,10 @@ public class FileTagSource extends AbstractTagSource implements Runnable {
 				LOG.error("Interrupted..: ", exception);
 				return;
 			}
+			catch (Exception e) {
+				LOG.error("Caught exception..", e);
+				return;
+			}
 		}
 	}
 
@@ -258,7 +262,7 @@ public class FileTagSource extends AbstractTagSource implements Runnable {
 		return ret;
 	}
 
-	public void synchUp() {
+	public void synchUp() throws Exception {
 		if (isChanged()) {
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Begin: update tags from source==>sink");
diff --git a/tagsync/src/main/resources/ranger-tagsync-default.xml b/tagsync/src/main/resources/ranger-tagsync-default.xml
index 1034bc6..5a16f25 100644
--- a/tagsync/src/main/resources/ranger-tagsync-default.xml
+++ b/tagsync/src/main/resources/ranger-tagsync-default.xml
@@ -41,4 +41,8 @@
 		<name>ranger.tagsync.dest.ranger.session.cookie.name</name>
 		<value>RANGERADMINSESSIONID</value>
 	</property>
+	<property>
+		<name>ranger.tagsync.dest.ranger.max.batch.size</name>
+		<value>1</value>
+	</property>
 </configuration>