You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ab...@apache.org on 2021/06/09 00:51:14 UTC
[ranger] branch ranger-2.2 updated: RANGER-3309: Support batch upload of tags to Ranger
This is an automated email from the ASF dual-hosted git repository.
abhay pushed a commit to branch ranger-2.2
in repository https://gitbox.apache.org/repos/asf/ranger.git
The following commit(s) were added to refs/heads/ranger-2.2 by this push:
new 4a87e32 RANGER-3309: Support batch upload of tags to Ranger
4a87e32 is described below
commit 4a87e3203ad18155e82b172e9754aa0fdc3554e7
Author: Abhay Kulkarni <ab...@apache.org>
AuthorDate: Tue Jun 8 16:59:28 2021 -0700
RANGER-3309: Support batch upload of tags to Ranger
---
.../apache/ranger/rest/ServiceTagsProcessor.java | 226 ++++++++++++++-------
.../apache/ranger/service/RangerTagService.java | 3 +-
.../ranger/tagsync/model/AbstractTagSource.java | 3 +-
.../ranger/tagsync/process/TagSyncConfig.java | 20 ++
.../source/atlas/AtlasNotificationMapper.java | 47 ++---
.../tagsync/source/atlas/AtlasTagSource.java | 187 +++++++++++------
.../source/atlasrest/AtlasRESTTagSource.java | 39 ++--
.../ranger/tagsync/source/file/FileTagSource.java | 6 +-
.../src/main/resources/ranger-tagsync-default.xml | 4 +
9 files changed, 352 insertions(+), 183 deletions(-)
diff --git a/security-admin/src/main/java/org/apache/ranger/rest/ServiceTagsProcessor.java b/security-admin/src/main/java/org/apache/ranger/rest/ServiceTagsProcessor.java
index 67ae779..9551fbb 100644
--- a/security-admin/src/main/java/org/apache/ranger/rest/ServiceTagsProcessor.java
+++ b/security-admin/src/main/java/org/apache/ranger/rest/ServiceTagsProcessor.java
@@ -30,6 +30,7 @@ import org.apache.ranger.plugin.model.RangerTagDef;
import org.apache.ranger.plugin.model.RangerTagResourceMap;
import org.apache.ranger.plugin.store.RangerServiceResourceSignature;
import org.apache.ranger.plugin.store.TagStore;
+import org.apache.ranger.plugin.util.RangerPerfTracer;
import org.apache.ranger.plugin.util.ServiceTags;
import java.util.ArrayList;
@@ -39,6 +40,7 @@ import java.util.Map;
public class ServiceTagsProcessor {
private static final Log LOG = LogFactory.getLog(ServiceTagsProcessor.class);
+ private static final Log PERF_LOG_ADD_OR_UPDATE = RangerPerfTracer.getPerfLogger("tags.addOrUpdate");
private final TagStore tagStore;
@@ -87,9 +89,16 @@ public class ServiceTagsProcessor {
LOG.debug("==> ServiceTagsProcessor.createOrUpdate()");
}
+ RangerPerfTracer perfTotal = null;
+ RangerPerfTracer perf = null;
+
Map<Long, RangerTagDef> tagDefsInStore = new HashMap<Long, RangerTagDef>();
Map<Long, RangerServiceResource> resourcesInStore = new HashMap<Long, RangerServiceResource>();
+ if(RangerPerfTracer.isPerfTraceEnabled(PERF_LOG_ADD_OR_UPDATE)) {
+ perfTotal = RangerPerfTracer.getPerfTracer(PERF_LOG_ADD_OR_UPDATE, "tags.addOrUpdate()");
+ }
+
if (MapUtils.isNotEmpty(serviceTags.getTagDefinitions())) {
RangerTagDef tagDef = null;
@@ -139,22 +148,32 @@ public class ServiceTagsProcessor {
Long resourceId = resource.getId();
if(StringUtils.isNotEmpty(resource.getGuid())) {
+ if(RangerPerfTracer.isPerfTraceEnabled(PERF_LOG_ADD_OR_UPDATE)) {
+ perf = RangerPerfTracer.getPerfTracer(PERF_LOG_ADD_OR_UPDATE, "tags.search_service_resource_by_guid(" + resourceId + ")");
+ }
existing = tagStore.getServiceResourceByGuid(resource.getGuid());
- }
-
- if(existing == null) {
+ RangerPerfTracer.logAlways(perf);
+ } else {
if(MapUtils.isNotEmpty(resource.getResourceElements())) {
+ if(RangerPerfTracer.isPerfTraceEnabled(PERF_LOG_ADD_OR_UPDATE)) {
+ perf = RangerPerfTracer.getPerfTracer(PERF_LOG_ADD_OR_UPDATE, "tags.search_service_resource_by_signature(" + resourceId + ")");
+ }
RangerServiceResourceSignature serializer = new RangerServiceResourceSignature(resource);
resourceSignature = serializer.getSignature();
resource.setResourceSignature(resourceSignature);
existing = tagStore.getServiceResourceByServiceAndResourceSignature(resource.getServiceName(), resourceSignature);
+
+ RangerPerfTracer.logAlways(perf);
}
}
RangerServiceResource resourceInStore = null;
+ if(RangerPerfTracer.isPerfTraceEnabled(PERF_LOG_ADD_OR_UPDATE)) {
+ perf = RangerPerfTracer.getPerfTracer(PERF_LOG_ADD_OR_UPDATE, "tags.createOrUpdate_service_resource(" + resourceId + ")");
+ }
if (existing == null) {
resourceInStore = tagStore.createServiceResource(resource);
@@ -169,6 +188,7 @@ public class ServiceTagsProcessor {
}
resourcesInStore.put(resourceId, resourceInStore);
+ RangerPerfTracer.logAlways(perf);
}
} catch (Exception exception) {
LOG.error("createServiceResource failed, resource=" + resource, exception);
@@ -187,6 +207,10 @@ public class ServiceTagsProcessor {
continue;
}
+ if(RangerPerfTracer.isPerfTraceEnabled(PERF_LOG_ADD_OR_UPDATE)) {
+ perf = RangerPerfTracer.getPerfTracer(PERF_LOG_ADD_OR_UPDATE, "tags.get_tags_for_service_resource(" + resourceInStore.getId() + ")");
+ }
+
// Get all tags associated with this resourceId
List<RangerTag> associatedTags = null;
@@ -195,6 +219,8 @@ public class ServiceTagsProcessor {
} catch (Exception exception) {
LOG.error("RangerTags cannot be retrieved for resource with guid=" + resourceInStore.getGuid());
throw exception;
+ } finally {
+ RangerPerfTracer.logAlways(perf);
}
List<RangerTag> tagsToRetain = new ArrayList<RangerTag>();
@@ -216,91 +242,122 @@ public class ServiceTagsProcessor {
LOG.debug("Did not find matching tag for tagId=" + tagId);
}
// create new tag from incoming tag and associate it with service-resource
+ if (RangerPerfTracer.isPerfTraceEnabled(PERF_LOG_ADD_OR_UPDATE)) {
+ perf = RangerPerfTracer.getPerfTracer(PERF_LOG_ADD_OR_UPDATE, "tags.create_tag(" + tagId + ")");
+ }
RangerTag newTag = tagStore.createTag(incomingTag);
+ RangerPerfTracer.logAlways(perf);
RangerTagResourceMap tagResourceMap = new RangerTagResourceMap();
tagResourceMap.setTagId(newTag.getId());
tagResourceMap.setResourceId(resourceInStore.getId());
-
+ if (RangerPerfTracer.isPerfTraceEnabled(PERF_LOG_ADD_OR_UPDATE)) {
+ perf = RangerPerfTracer.getPerfTracer(PERF_LOG_ADD_OR_UPDATE, "tags.create_tagResourceMap(" + tagId + ")");
+ }
tagResourceMap = tagStore.createTagResourceMap(tagResourceMap);
+ RangerPerfTracer.logAlways(perf);
associatedTags.add(newTag);
tagsToRetain.add(newTag);
- continue;
-
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found matching tag for tagId=" + tagId + ", matchingTag=" + matchingTag);
- }
+ } else {
- if (isResourcePrivateTag(incomingTag)) {
- if (!isResourcePrivateTag(matchingTag)) {
- // create new tag from incoming tag and associate it with service-resource
- RangerTag newTag = tagStore.createTag(incomingTag);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Found matching tag for tagId=" + tagId + ", matchingTag=" + matchingTag);
+ }
- RangerTagResourceMap tagResourceMap = new RangerTagResourceMap();
+ if (isResourcePrivateTag(incomingTag)) {
+ if (!isResourcePrivateTag(matchingTag)) {
+ // create new tag from incoming tag and associate it with service-resource
+ RangerTag newTag = tagStore.createTag(incomingTag);
- tagResourceMap.setTagId(newTag.getId());
- tagResourceMap.setResourceId(resourceInStore.getId());
+ RangerTagResourceMap tagResourceMap = new RangerTagResourceMap();
- tagResourceMap = tagStore.createTagResourceMap(tagResourceMap);
+ tagResourceMap.setTagId(newTag.getId());
+ tagResourceMap.setResourceId(resourceInStore.getId());
- associatedTags.add(newTag);
- tagsToRetain.add(newTag);
+ tagResourceMap = tagStore.createTagResourceMap(tagResourceMap);
- } else {
- // Keep this tag, but update it with attribute-values and validity schedules from incoming tag
- tagsToRetain.add(matchingTag);
+ associatedTags.add(newTag);
+ tagsToRetain.add(newTag);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Updating existing private tag with id=" + matchingTag.getId());
+ } else {
+ tagsToRetain.add(matchingTag);
+
+ boolean isTagUpdateNeeded = false;
+
+ // Note that as there is no easy way to check validityPeriods for equality, an easy way to rule out the possibility of validityPeriods
+ // not matching is to check if both old and new tags have empty validityPeriods
+ if (matchingTag.getGuid() != null && matchingTag.getGuid().equals(incomingTag.getGuid())) {
+ if (isMatch(incomingTag, matchingTag) && CollectionUtils.isEmpty(incomingTag.getValidityPeriods()) && CollectionUtils.isEmpty(matchingTag.getValidityPeriods())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No need to update existing-tag:[" + matchingTag + "] with incoming-tag:[" + incomingTag + "]");
+ }
+ } else {
+ isTagUpdateNeeded = true;
+ }
+ } else {
+ if (CollectionUtils.isEmpty(incomingTag.getValidityPeriods()) && CollectionUtils.isEmpty(matchingTag.getValidityPeriods())) {
+ // Completely matched tags. No need to update
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No need to update existing-tag:[" + matchingTag + "] with incoming-tag:[" + incomingTag + "]");
+ }
+ } else {
+ isTagUpdateNeeded = true;
+ }
+ }
+ if (isTagUpdateNeeded) {
+ // Keep this tag, and update it with attribute-values and validity schedules from incoming tag
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Updating existing private tag with id=" + matchingTag.getId());
+ }
+ incomingTag.setId(matchingTag.getId());
+ tagStore.updateTag(incomingTag);
+ isAnyTagUpdated = true;
+ }
}
- incomingTag.setId(matchingTag.getId());
- tagStore.updateTag(incomingTag);
- isAnyTagUpdated = true;
- }
- } else { // shared model
- if (isResourcePrivateTag(matchingTag)) {
- // create new tag from incoming tag and associate it with service-resource
- RangerTag newTag = tagStore.createTag(incomingTag);
+ } else { // shared model
+ if (isResourcePrivateTag(matchingTag)) {
+ // create new tag from incoming tag and associate it with service-resource
+ RangerTag newTag = tagStore.createTag(incomingTag);
- RangerTagResourceMap tagResourceMap = new RangerTagResourceMap();
+ RangerTagResourceMap tagResourceMap = new RangerTagResourceMap();
- tagResourceMap.setTagId(newTag.getId());
- tagResourceMap.setResourceId(resourceInStore.getId());
+ tagResourceMap.setTagId(newTag.getId());
+ tagResourceMap.setResourceId(resourceInStore.getId());
- tagResourceMap = tagStore.createTagResourceMap(tagResourceMap);
+ tagResourceMap = tagStore.createTagResourceMap(tagResourceMap);
- associatedTags.add(newTag);
- tagsToRetain.add(newTag);
+ associatedTags.add(newTag);
+ tagsToRetain.add(newTag);
- } else {
- // Keep this tag, but update it with attribute-values from incoming tag
- tagsToRetain.add(matchingTag);
+ } else {
+ // Keep this tag, but update it with attribute-values from incoming tag
+ tagsToRetain.add(matchingTag);
- // Update shared tag with new values
- incomingTag.setId(matchingTag.getId());
- tagStore.updateTag(incomingTag);
+ // Update shared tag with new values
+ incomingTag.setId(matchingTag.getId());
+ tagStore.updateTag(incomingTag);
- // associate with service-resource if not already associated
- if (findTagInList(matchingTag, associatedTags) == null) {
- RangerTagResourceMap tagResourceMap = new RangerTagResourceMap();
+ // associate with service-resource if not already associated
+ if (findTagInList(matchingTag, associatedTags) == null) {
+ RangerTagResourceMap tagResourceMap = new RangerTagResourceMap();
- tagResourceMap.setTagId(matchingTag.getId());
- tagResourceMap.setResourceId(resourceInStore.getId());
+ tagResourceMap.setTagId(matchingTag.getId());
+ tagResourceMap.setResourceId(resourceInStore.getId());
- tagResourceMap = tagStore.createTagResourceMap(tagResourceMap);
- } else {
- isAnyTagUpdated = true;
- }
+ tagResourceMap = tagStore.createTagResourceMap(tagResourceMap);
+ } else {
+ isAnyTagUpdated = true;
+ }
+ }
}
- }
+ }
}
+
} catch (Exception exception) {
LOG.error("createRangerTagResourceMap failed", exception);
throw exception;
@@ -332,11 +389,17 @@ public class ServiceTagsProcessor {
}
}
if (isAnyTagUpdated) {
+ if(RangerPerfTracer.isPerfTraceEnabled(PERF_LOG_ADD_OR_UPDATE)) {
+ perf = RangerPerfTracer.getPerfTracer(PERF_LOG_ADD_OR_UPDATE, "tags.refreshServiceResource(" + resourceInStore.getId() + ")");
+ }
tagStore.refreshServiceResource(resourceInStore.getId());
+ RangerPerfTracer.logAlways(perf);
}
}
}
+ RangerPerfTracer.logAlways(perfTotal);
+
if (LOG.isDebugEnabled()) {
LOG.debug("<== ServiceTagsProcessor.createOrUpdate()");
}
@@ -383,40 +446,51 @@ public class ServiceTagsProcessor {
if (isResourcePrivateTag(incomingTag)) {
for (RangerTag existingTag : existingTags) {
+ if (isMatch(incomingTag, existingTag)) {
+ ret = existingTag;
+ break;
+ }
+ }
+ }
- if (StringUtils.equals(incomingTag.getType(), existingTag.getType())) {
+ }
- // Check attribute values
- Map<String, String> incomingTagAttributes = incomingTag.getAttributes();
- Map<String, String> existingTagAttributes = existingTag.getAttributes();
+ return ret;
+ }
- if (CollectionUtils.isEqualCollection(incomingTagAttributes.keySet(), existingTagAttributes.keySet())) {
+ private boolean isMatch(final RangerTag incomingTag, final RangerTag existingTag) {
+ boolean ret = false;
- boolean matched = true;
+ if (incomingTag != null && existingTag != null) {
- for (Map.Entry<String, String> entry : incomingTagAttributes.entrySet()) {
+ if (StringUtils.equals(incomingTag.getType(), existingTag.getType())) {
- String key = entry.getKey();
- String value = entry.getValue();
+ // Check attribute values
+ Map<String, String> incomingTagAttributes = incomingTag.getAttributes();
+ Map<String, String> existingTagAttributes = existingTag.getAttributes();
- if (!StringUtils.equals(value, existingTagAttributes.get(key))) {
- matched = false;
- break;
- }
+ if (CollectionUtils.isEqualCollection(incomingTagAttributes.keySet(), existingTagAttributes.keySet())) {
- }
- if (matched) {
- ret = existingTag;
- break;
- }
+ boolean matched = true;
+
+ for (Map.Entry<String, String> entry : incomingTagAttributes.entrySet()) {
+
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ if (!StringUtils.equals(value, existingTagAttributes.get(key))) {
+ matched = false;
+ break;
}
}
+ if (matched) {
+ ret = true;
+ }
}
- }
+ }
}
-
return ret;
}
diff --git a/security-admin/src/main/java/org/apache/ranger/service/RangerTagService.java b/security-admin/src/main/java/org/apache/ranger/service/RangerTagService.java
index 5fdefe5..3a90bd8 100644
--- a/security-admin/src/main/java/org/apache/ranger/service/RangerTagService.java
+++ b/security-admin/src/main/java/org/apache/ranger/service/RangerTagService.java
@@ -65,7 +65,8 @@ public class RangerTagService extends RangerTagServiceBase<XXTag, RangerTag> {
public RangerTag postCreate(XXTag tag) {
RangerTag ret = super.postCreate(tag);
- daoMgr.getXXServiceVersionInfo().updateServiceVersionInfoForTagUpdate(tag.getId());
+ // This is not needed - on tag creation, service-version-info need not be updated.
+ //daoMgr.getXXServiceVersionInfo().updateServiceVersionInfoForTagUpdate(tag.getId());
return ret;
}
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
index da4c5cb..4278a23 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
@@ -53,7 +53,7 @@ public abstract class AbstractTagSource implements TagSource {
return this.name;
}
- protected void updateSink(final ServiceTags toUpload) {
+ protected void updateSink(final ServiceTags toUpload) throws Exception {
if (toUpload == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("No ServiceTags to upload");
@@ -75,6 +75,7 @@ public abstract class AbstractTagSource implements TagSource {
String toUploadJSON = new Gson().toJson(toUpload);
LOG.error("Failed to upload serviceTags: " + toUploadJSON);
LOG.error("Exception : ", exception);
+ throw exception;
}
}
}
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
index 9245fdf..bfb7206 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
@@ -116,6 +116,12 @@ public class TagSyncConfig extends Configuration {
private static final long DEFAULT_TAGSYNC_METRICS_FREQUENCY__TIME_IN_MILLIS = 10000L;
private static final String TAGSYNC_METRICS_ENABLED_PROP = "ranger.tagsync.metrics.enabled";
+ private static final int DEFAULT_TAGSYNC_SINK_MAX_BATCH_SIZE = 1;
+ private static final String TAGSYNC_SINK_MAX_BATCH_SIZE_PROP = "ranger.tagsync.dest.ranger.max.batch.size";
+
+
+
+
private Properties props;
static {
@@ -434,6 +440,20 @@ public class TagSyncConfig extends Configuration {
return prop.getProperty(TAGSYNC_KERBEROS_IDENTITY);
}
+ public static int getSinkMaxBatchSize(Properties prop) {
+ int ret = DEFAULT_TAGSYNC_SINK_MAX_BATCH_SIZE;
+
+ String maxBatchSizeStr = prop.getProperty(TAGSYNC_SINK_MAX_BATCH_SIZE_PROP);
+
+ if (StringUtils.isNotEmpty(maxBatchSizeStr)) {
+ try {
+ ret = Integer.valueOf(maxBatchSizeStr);
+ } catch (Exception e) {
+ }
+ }
+ return ret;
+ }
+
private TagSyncConfig() {
super(false);
init();
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index ed4ba17..868f0f8 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -160,7 +160,7 @@ public class AtlasNotificationMapper {
}
@SuppressWarnings("unchecked")
- static private ServiceTags buildServiceTagsForEntityDeleteNotification(RangerAtlasEntityWithTags entityWithTags) {
+ static ServiceTags buildServiceTagsForEntityDeleteNotification(RangerAtlasEntityWithTags entityWithTags) {
final ServiceTags ret;
RangerAtlasEntity entity = entityWithTags.getEntity();
@@ -260,39 +260,36 @@ public class AtlasNotificationMapper {
if (serviceResource != null) {
- List<RangerTag> tags = getTags(entityWithTags);
- List<RangerTagDef> tagDefs = getTagDefs(entityWithTags);
- String serviceName = serviceResource.getServiceName();
+ List<RangerTag> tags = getTags(entityWithTags);
+ List<RangerTagDef> tagDefs = getTagDefs(entityWithTags);
+ String serviceName = serviceResource.getServiceName();
ret = createOrGetServiceTags(serviceTagsMap, serviceName);
- if (serviceTagsMap == null || CollectionUtils.isNotEmpty(tags)) {
-
- serviceResource.setId((long) ret.getServiceResources().size());
- ret.getServiceResources().add(serviceResource);
-
- List<Long> tagIds = new ArrayList<>();
+ serviceResource.setId((long) ret.getServiceResources().size());
+ ret.getServiceResources().add(serviceResource);
- if (CollectionUtils.isNotEmpty(tags)) {
- for (RangerTag tag : tags) {
- tag.setId((long) ret.getTags().size());
- ret.getTags().put(tag.getId(), tag);
+ List<Long> tagIds = new ArrayList<>();
- tagIds.add(tag.getId());
- }
- }
- ret.getResourceToTagIds().put(serviceResource.getId(), tagIds);
+ if (CollectionUtils.isNotEmpty(tags)) {
+ for (RangerTag tag : tags) {
+ tag.setId((long) ret.getTags().size());
+ ret.getTags().put(tag.getId(), tag);
- if (CollectionUtils.isNotEmpty(tagDefs)) {
- for (RangerTagDef tagDef : tagDefs) {
- tagDef.setId((long) ret.getTagDefinitions().size());
- ret.getTagDefinitions().put(tagDef.getId(), tagDef);
- }
+ tagIds.add(tag.getId());
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Entity " + entityWithTags + " does not have any tags associated with it when full-sync is being done.");
- LOG.debug("Will not add this entity to serviceTags, so that this entity, if exists, will be removed from ranger");
+ LOG.debug("Entity " + entityWithTags + " does not have any tags associated with it");
+ }
+ }
+
+ ret.getResourceToTagIds().put(serviceResource.getId(), tagIds);
+
+ if (CollectionUtils.isNotEmpty(tagDefs)) {
+ for (RangerTagDef tagDef : tagDefs) {
+ tagDef.setId((long) ret.getTagDefinitions().size());
+ ret.getTagDefinitions().put(tagDef.getId(), tagDef);
}
}
} else {
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index 21a22cd..e9fe02f 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -20,10 +20,14 @@
package org.apache.ranger.tagsync.source.atlas;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import org.apache.atlas.kafka.NotificationProvider;
import org.apache.atlas.model.notification.EntityNotification;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationInterface;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,12 +35,16 @@ import org.apache.ranger.plugin.util.ServiceTags;
import org.apache.ranger.tagsync.model.AbstractTagSource;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.kafka.common.TopicPartition;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntityWithTags;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
public class AtlasTagSource extends AbstractTagSource {
@@ -48,6 +56,10 @@ public class AtlasTagSource extends AbstractTagSource {
public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.kafka.zookeeper.connect";
public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.kafka.entities.group.id";
+ public static final int MAX_WAIT_TIME_IN_MILLIS = 1000;
+
+ private int maxBatchSize;
+
private ConsumerRunnable consumerTask;
private Thread myThread = null;
@@ -106,6 +118,8 @@ public class AtlasTagSource extends AbstractTagSource {
consumerTask = new ConsumerRunnable(iterators.get(0));
}
+ maxBatchSize = TagSyncConfig.getSinkMaxBatchSize(properties);
+
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTagSource.initialize(), result=" + ret);
}
@@ -152,89 +166,69 @@ public class AtlasTagSource extends AbstractTagSource {
private final NotificationConsumer<EntityNotification> consumer;
+ private final List<RangerAtlasEntityWithTags> atlasEntitiesWithTags = new ArrayList<>();
+ private final List<AtlasKafkaMessage<EntityNotification>> messages = new ArrayList<>();
+
+ private long offsetOfLastMessageDeliveredToRanger = -1L;
+ private long offsetOfLastMessageCommittedToKafka = -1L;
+
+ private boolean isHandlingDeleteOps = false;
+
private ConsumerRunnable(NotificationConsumer<EntityNotification> consumer) {
this.consumer = consumer;
}
-
@Override
public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("==> ConsumerRunnable.run()");
}
- boolean seenCommitException = false;
- long offsetOfLastMessageDeliveredToRanger = -1L;
-
while (true) {
- try {
- List<AtlasKafkaMessage<EntityNotification>> messages = consumer.receive(1000L);
- int index = 0;
+ try {
+ List<AtlasKafkaMessage<EntityNotification>> newMessages = consumer.receive(MAX_WAIT_TIME_IN_MILLIS);
- if (messages.size() > 0 && seenCommitException) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("seenCommitException=[true], offsetOfLastMessageDeliveredToRanger=[" + offsetOfLastMessageDeliveredToRanger + "]");
+ if (newMessages.size() == 0) {
+ LOG.info("AtlasTagSource.ConsumerRunnable.run: no message from NotificationConsumer within " + MAX_WAIT_TIME_IN_MILLIS + " milliseconds");
+ if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags)) {
+ buildAndUploadServiceTags();
}
- for (; index < messages.size(); index++) {
- AtlasKafkaMessage<EntityNotification> message = messages.get(index);
- if (message.getOffset() <= offsetOfLastMessageDeliveredToRanger) {
- // Already delivered to Ranger
- TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+ } else {
+ for (AtlasKafkaMessage<EntityNotification> message : newMessages) {
+ EntityNotification notification = message != null ? message.getMessage() : null;
+
+ if (notification != null) {
+ EntityNotificationWrapper notificationWrapper = null;
try {
+ notificationWrapper = new EntityNotificationWrapper(notification);
+ } catch (Throwable e) {
+ LOG.error("notification:[" + notification + "] has some issues..perhaps null entity??", e);
+ }
+ if (notificationWrapper != null) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Committing previously commit-failed message with offset:[" + message.getOffset() + "]");
+ LOG.debug("Message-offset=" + message.getOffset() + ", Notification=" + getPrintableEntityNotification(notificationWrapper));
}
- consumer.commit(partition, message.getOffset());
- } catch (Exception commitException) {
- LOG.warn("Ranger tagsync already processed message at offset " + message.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", commitException);
- LOG.warn("This will cause Kafka to deliver this message:[" + message.getOffset() + "] repeatedly!! This may be unrecoverable error!!");
- }
- } else {
- break;
- }
- }
- }
- seenCommitException = false;
- offsetOfLastMessageDeliveredToRanger = -1L;
+ RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(notificationWrapper);
- for (; index < messages.size(); index++) {
- AtlasKafkaMessage<EntityNotification> message = messages.get(index);
- EntityNotification notification = message != null ? message.getMessage() : null;
-
- if (notification != null) {
- EntityNotificationWrapper notificationWrapper = null;
- try {
- notificationWrapper = new EntityNotificationWrapper(notification);
- } catch (Throwable e) {
- LOG.error("notification:[" + notification +"] has some issues..perhaps null entity??", e);
- }
- if (notificationWrapper != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Message-offset=" + message.getOffset() + ", Notification=" + getPrintableEntityNotification(notificationWrapper));
- }
-
- ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notificationWrapper);
- if (serviceTags != null) {
- updateSink(serviceTags);
- }
- offsetOfLastMessageDeliveredToRanger = message.getOffset();
-
- if (!seenCommitException) {
- TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
- try {
- consumer.commit(partition, message.getOffset());
- } catch (Exception commitException) {
- seenCommitException = true;
- LOG.warn("Ranger tagsync processed message at offset " + message.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", commitException);
+ if ((notificationWrapper.getIsEntityDeleteOp() && !isHandlingDeleteOps) || (!notificationWrapper.getIsEntityDeleteOp() && isHandlingDeleteOps)) {
+ buildAndUploadServiceTags();
+ isHandlingDeleteOps = !isHandlingDeleteOps;
}
+
+ atlasEntitiesWithTags.add(entityWithTags);
+ messages.add(message);
}
+ } else {
+ LOG.error("Null entityNotification received from Kafka!! Ignoring..");
}
- } else {
- LOG.error("Null entityNotification received from Kafka!! Ignoring..");
+ }
+ if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags) && atlasEntitiesWithTags.size() >= maxBatchSize) {
+ buildAndUploadServiceTags();
}
}
+
} catch (Exception exception) {
LOG.error("Caught exception..: ", exception);
// If transient error, retry after short interval
@@ -248,6 +242,81 @@ public class AtlasTagSource extends AbstractTagSource {
}
}
}
+
+ private void buildAndUploadServiceTags() throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> buildAndUploadServiceTags()");
+ }
+
+ commitToKafka();
+
+ Map<String, ServiceTags> serviceTagsMap = AtlasNotificationMapper.processAtlasEntities(atlasEntitiesWithTags);
+
+ if (MapUtils.isNotEmpty(serviceTagsMap)) {
+ if (serviceTagsMap.size() != 1) {
+ LOG.warn("Unexpected!! Notifications for more than one service received by AtlasTagSource.. Service-Names:[" + serviceTagsMap.keySet() + "]");
+ }
+ for (Map.Entry<String, ServiceTags> entry : serviceTagsMap.entrySet()) {
+ if (isHandlingDeleteOps) {
+ entry.getValue().setOp(ServiceTags.OP_DELETE);
+ entry.getValue().setTagDefinitions(Collections.EMPTY_MAP);
+ entry.getValue().setTags(Collections.EMPTY_MAP);
+ } else {
+ entry.getValue().setOp(ServiceTags.OP_ADD_OR_UPDATE);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ Gson gsonBuilder = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create();
+ String serviceTagsString = gsonBuilder.toJson(entry.getValue());
+
+ LOG.debug("serviceTags=" + serviceTagsString);
+ }
+ updateSink(entry.getValue());
+ }
+ offsetOfLastMessageDeliveredToRanger = messages.get(messages.size()-1).getOffset();
+ LOG.info("Completed processing batch of messages of size:[" + messages.size() + "] received from NotificationConsumer");
+
+ commitToKafka();
+ }
+
+ atlasEntitiesWithTags.clear();
+ messages.clear();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== buildAndUploadServiceTags()");
+ }
+ }
+
+ private void commitToKafka() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> commitToKafka()");
+ }
+
+ for (AtlasKafkaMessage<EntityNotification> message : messages) {
+ if (message.getOffset() > offsetOfLastMessageCommittedToKafka) {
+ if (message.getOffset() <= offsetOfLastMessageDeliveredToRanger) {
+ // Already delivered to Ranger
+ TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Committing message with offset:[" + message.getOffset() + "] to Kafka");
+ }
+ consumer.commit(partition, message.getOffset());
+ offsetOfLastMessageCommittedToKafka = message.getOffset();
+ } catch (Exception commitException) {
+ LOG.warn("Ranger tagsync already processed message at offset " + message.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", commitException);
+ LOG.warn("This will cause Kafka to deliver this message:[" + message.getOffset() + "] repeatedly!! This may be unrecoverable error!!");
+ }
+ } else {
+ break;
+ }
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== commitToKafka()");
+ }
+ }
}
}
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
index 8b12aff..f49a0a8 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
@@ -189,30 +189,29 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
}
@Override
- public void run() {
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasRESTTagSource.run()");
- }
-
- while (true) {
-
- synchUp();
-
- LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + "] milliSeconds");
+ public void run() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasRESTTagSource.run()");
+ }
+ while (true) {
+ try {
+ synchUp();
- try {
+ LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + "] milliSeconds");
- Thread.sleep(sleepTimeBetweenCycleInMillis);
+ Thread.sleep(sleepTimeBetweenCycleInMillis);
- } catch (InterruptedException exception) {
- LOG.error("Interrupted..: ", exception);
- return;
- }
- }
- }
+ } catch (InterruptedException exception) {
+ LOG.error("Interrupted..: ", exception);
+ return;
+ } catch (Exception e) {
+ LOG.error("Caught exception", e);
+ return;
+ }
+ }
+ }
- public void synchUp() {
+ public void synchUp() throws Exception {
List<RangerAtlasEntityWithTags> rangerAtlasEntities = getAtlasActiveEntities();
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
index 62a5f73..ac611a1 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
@@ -230,6 +230,10 @@ public class FileTagSource extends AbstractTagSource implements Runnable {
LOG.error("Interrupted..: ", exception);
return;
}
+ catch (Exception e) {
+ LOG.error("Caught exception..", e);
+ return;
+ }
}
}
@@ -258,7 +262,7 @@ public class FileTagSource extends AbstractTagSource implements Runnable {
return ret;
}
- public void synchUp() {
+ public void synchUp() throws Exception {
if (isChanged()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Begin: update tags from source==>sink");
diff --git a/tagsync/src/main/resources/ranger-tagsync-default.xml b/tagsync/src/main/resources/ranger-tagsync-default.xml
index 1034bc6..5a16f25 100644
--- a/tagsync/src/main/resources/ranger-tagsync-default.xml
+++ b/tagsync/src/main/resources/ranger-tagsync-default.xml
@@ -41,4 +41,8 @@
<name>ranger.tagsync.dest.ranger.session.cookie.name</name>
<value>RANGERADMINSESSIONID</value>
</property>
+ <property>
+ <name>ranger.tagsync.dest.ranger.max.batch.size</name>
+ <value>1</value>
+ </property>
</configuration>