You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ab...@apache.org on 2018/05/17 16:06:08 UTC

ranger git commit: RANGER-2104: Ranger tagsync should ignore ENTITY_UPDATE events if the updated entity does not have associated traits

Repository: ranger
Updated Branches:
  refs/heads/ranger-0.7 7e3963bc7 -> 126ff6ee0


RANGER-2104: Ranger tagsync should ignore ENTITY_UPDATE events if the updated entity does not have associated traits


Project: http://git-wip-us.apache.org/repos/asf/ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/126ff6ee
Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/126ff6ee
Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/126ff6ee

Branch: refs/heads/ranger-0.7
Commit: 126ff6ee04e580dcf8b924f76df0e3917221106e
Parents: 7e3963b
Author: Abhay Kulkarni <ak...@hortonworks.com>
Authored: Thu May 17 08:51:40 2018 -0700
Committer: Abhay Kulkarni <ak...@hortonworks.com>
Committed: Thu May 17 08:51:40 2018 -0700

----------------------------------------------------------------------
 pom.xml                                         |  1 +
 src/main/assembly/tagsync.xml                   |  1 +
 .../source/atlas/AtlasNotificationMapper.java   | 53 +++++++++++++------
 .../tagsync/source/atlas/AtlasTagSource.java    | 54 +++++++++++++++++---
 4 files changed, 85 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ranger/blob/126ff6ee/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b76e4e3..2ec6768 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,6 +129,7 @@
         <atlas.guava.version>14.0</atlas.guava.version>
         <atlas.gson.version>2.5</atlas.gson.version>
         <atlas.jettison.version>1.3.7</atlas.jettison.version>
+        <atlas.commons.compress.version>1.4.1</atlas.commons.compress.version>
         <atlas.commons.logging.version>1.1.3</atlas.commons.logging.version>
         <avatica.version>1.7.1</avatica.version>
         <bouncycastle.version>1.55</bouncycastle.version>

http://git-wip-us.apache.org/repos/asf/ranger/blob/126ff6ee/src/main/assembly/tagsync.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/tagsync.xml b/src/main/assembly/tagsync.xml
index 26b42ca..5139937 100644
--- a/src/main/assembly/tagsync.xml
+++ b/src/main/assembly/tagsync.xml
@@ -50,6 +50,7 @@
 					<include>org.apache.atlas:atlas-client-v2:jar:${atlas.version}</include>
 					<include>org.apache.atlas:atlas-common:jar:${atlas.version}</include>
 					<include>org.apache.atlas:atlas-intg:jar:${atlas.version}</include>
+					<include>org.apache.commons:commons-compress:jar:${atlas.commons.compress.version}</include>
 					<include>org.apache.hadoop:hadoop-auth</include>
 					<include>org.apache.hadoop:hadoop-common</include>
 					<include>org.apache.kafka:kafka_${scala.binary.version}:jar:${kafka.version}</include>

http://git-wip-us.apache.org/repos/asf/ranger/blob/126ff6ee/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index 8641d60..1c7f063 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -51,7 +51,6 @@ import java.util.*;
 public class AtlasNotificationMapper {
 	private static final Log LOG = LogFactory.getLog(AtlasNotificationMapper.class);
 
-
 	private static Map<String, Long> unhandledEventTypes = new HashMap<String, Long>();
 
 	private static final ThreadLocal<DateFormat> DATE_FORMATTER = new ThreadLocal<DateFormat>() {
@@ -140,8 +139,20 @@ public class AtlasNotificationMapper {
 			switch (opType) {
 				case ENTITY_CREATE:
 					ret = CollectionUtils.isNotEmpty(entityNotification.getAllTraits());
+					if (!ret) {
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("ENTITY_CREATE notification is ignored, as there are no traits associated with the entity. Ranger will get necessary information from any subsequent TRAIT_ADDED notification");
+						}
+					}
 					break;
 				case ENTITY_UPDATE:
+					ret = CollectionUtils.isNotEmpty(entityNotification.getAllTraits());
+					if (!ret) {
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("ENTITY_UPDATE notification is ignored, as there are no traits associated with the entity.");
+						}
+					}
+					break;
 				case ENTITY_DELETE:
 				case TRAIT_ADD:
 				case TRAIT_UPDATE:
@@ -278,9 +289,8 @@ public class AtlasNotificationMapper {
 		List<RangerTag>        ret    = new ArrayList<RangerTag>();
 		IReferenceableInstance entity = entityWithTraits != null ? entityWithTraits.getEntity() : null;
 
-		if(entity != null && CollectionUtils.isNotEmpty(entity.getTraits())) {
-			for (String traitName : entity.getTraits()) {
-				IStruct             trait    = entity.getTrait(traitName);
+		if(entity != null && CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) {
+			for (IStruct trait : entityWithTraits.getAllTraits()) {
 				Map<String, String> tagAttrs = new HashMap<String, String>();
 
 				try {
@@ -310,9 +320,8 @@ public class AtlasNotificationMapper {
 		List<RangerTagDef>     ret    = new ArrayList<RangerTagDef>();
 		IReferenceableInstance entity = entityWithTraits != null ? entityWithTraits.getEntity() : null;
 
-		if(entity != null && CollectionUtils.isNotEmpty(entity.getTraits())) {
-			for (String traitName : entity.getTraits()) {
-				IStruct       trait = entity.getTrait(traitName);
+		if(entity != null && CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) {
+			for (IStruct trait : entityWithTraits.getAllTraits()) {
 				RangerTagDef tagDef = new RangerTagDef(trait.getTypeName(), "Atlas");
 
 				try {
@@ -415,7 +424,7 @@ public class AtlasNotificationMapper {
 
 		if (serviceResource != null) {
 			List<RangerTag>     tags        = getTags(entity, typeRegistry);
-			List<RangerTagDef>  tagDefs     = getTagDefs(entity);
+			List<RangerTagDef>  tagDefs     = getTagDefs(entity, typeRegistry);
 			String              serviceName = serviceResource.getServiceName();
 
 			ret = createOrGetServiceTags(serviceTagsMap, serviceName);
@@ -477,28 +486,38 @@ public class AtlasNotificationMapper {
 		return ret;
 	}
 
-	static private List<RangerTagDef> getTagDefs(AtlasEntityHeader entity) {
+	static private List<RangerTagDef> getTagDefs(AtlasEntityHeader entity, AtlasTypeRegistry typeRegistry) {
 		List<RangerTagDef> ret = new ArrayList<>();
 
 		if(entity != null && CollectionUtils.isNotEmpty(entity.getClassificationNames())) {
-			List<AtlasClassification> traits = entity.getClassifications();
+			List<AtlasClassification> classifications = entity.getClassifications();
 
-			for (AtlasClassification trait : traits) {
-				RangerTagDef tagDef = new RangerTagDef(trait.getTypeName(), "Atlas");
+			for (AtlasClassification classification : classifications) {
+				ret.add(getTagDef(classification));
 
-				if(MapUtils.isNotEmpty(trait.getAttributes())) {
-					for (String attrName : trait.getAttributes().keySet()) {
-						tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string"));
+				List<AtlasClassification> superClassifications = getSuperClassifications(classification, typeRegistry);
+
+				if (CollectionUtils.isNotEmpty(superClassifications)) {
+					for (AtlasClassification superClassification : superClassifications) {
+						ret.add(getTagDef(superClassification));
 					}
 				}
-
-				ret.add(tagDef);
 			}
 		}
 
 		return ret;
 	}
 
+	static private RangerTagDef getTagDef(AtlasClassification classification) {
+		RangerTagDef tagDef = new RangerTagDef(classification.getTypeName(), "Atlas");
+		if(MapUtils.isNotEmpty(classification.getAttributes())) {
+			for (String attrName : classification.getAttributes().keySet()) {
+				tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string"));
+			}
+		}
+		return tagDef;
+	}
+
 	static private List<AtlasClassification> getSuperClassifications(AtlasClassification classification, AtlasTypeRegistry typeRegistry) {
 		List<AtlasClassification> ret                = null;
 		AtlasClassificationType   classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName());

http://git-wip-us.apache.org/repos/asf/ranger/blob/126ff6ee/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index 95ff8ec..3810442 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -160,32 +160,72 @@ public class AtlasTagSource extends AbstractTagSource {
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("==> ConsumerRunnable.run()");
 			}
+			boolean seenCommitException = false;
+			long offsetOfLastMessageDeliveredToRanger = -1L;
+
 			while (true) {
 				try {
 					List<AtlasKafkaMessage<EntityNotification>> messages = consumer.receive(1000L);
+					int index = 0;
+
+					if (seenCommitException) {
+						for (; index < messages.size(); index++) {
+							AtlasKafkaMessage<EntityNotification> message = messages.get(index);
+							if (message.getOffset() <= offsetOfLastMessageDeliveredToRanger) {
+								// Already delivered to Ranger
+								TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+								try {
+									consumer.commit(partition, message.getOffset());
+								} catch (Exception commitException) {
+									LOG.warn("Ranger tagsync already processed message at offset " + message.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", commitException);
+									LOG.warn("This will cause Kafka to deliver this message:[" + message.getOffset() + "] repeatedly!! This may be unrecoverable error!!");
+								}
+							} else {
+								seenCommitException = false;
+								offsetOfLastMessageDeliveredToRanger = -1L;
+								break;
+							}
+						}
+					}
 
-					for (AtlasKafkaMessage<EntityNotification> message :  messages) {
+					for (; index < messages.size(); index++) {
+						AtlasKafkaMessage<EntityNotification> message = messages.get(index);
 						EntityNotification notification = message != null ? message.getMessage() : null;
 
 						if (notification != null) {
 							if (LOG.isDebugEnabled()) {
-								LOG.debug("Notification=" + getPrintableEntityNotification(notification));
+								LOG.debug("Message-offset=" + message.getOffset() + ", Notification=" + getPrintableEntityNotification(notification));
 							}
 
 							ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification);
 							if (serviceTags != null) {
 								updateSink(serviceTags);
 							}
-
-							TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
-							consumer.commit(partition, message.getOffset());
+							offsetOfLastMessageDeliveredToRanger = message.getOffset();
+
+							if (!seenCommitException) {
+								TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+								try {
+									consumer.commit(partition, message.getOffset());
+								} catch (Exception commitException) {
+									seenCommitException = true;
+									LOG.warn("Ranger tagsync processed message at offset " + message.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", commitException);
+								}
+							}
 						} else {
 							LOG.error("Null entityNotification received from Kafka!! Ignoring..");
 						}
 					}
 				} catch (Exception exception) {
-					LOG.error("Caught exception..: ", exception);
-					return;
+					LOG.error("Caught exception : ", exception);
+					// If transient error, retry after short interval
+					try {
+						Thread.sleep(100);
+					} catch (InterruptedException interrupted) {
+						LOG.error("Interrupted: ", interrupted);
+						LOG.error("Returning from thread. May cause process to be up but not processing events!!");
+						return;
+					}
 				}
 			}
 		}