You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text rendering.
Posted to commits@ranger.apache.org by ab...@apache.org on 2018/05/17 16:06:08 UTC
ranger git commit: RANGER-2104: Ranger tagsync should ignore ENTITY_UPDATE events if the updated entity does not have associated traits
Repository: ranger
Updated Branches:
refs/heads/ranger-0.7 7e3963bc7 -> 126ff6ee0
RANGER-2104: Ranger tagsync should ignore ENTITY_UPDATE events if the updated entity does not have associated traits
Project: http://git-wip-us.apache.org/repos/asf/ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/126ff6ee
Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/126ff6ee
Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/126ff6ee
Branch: refs/heads/ranger-0.7
Commit: 126ff6ee04e580dcf8b924f76df0e3917221106e
Parents: 7e3963b
Author: Abhay Kulkarni <ak...@hortonworks.com>
Authored: Thu May 17 08:51:40 2018 -0700
Committer: Abhay Kulkarni <ak...@hortonworks.com>
Committed: Thu May 17 08:51:40 2018 -0700
----------------------------------------------------------------------
pom.xml | 1 +
src/main/assembly/tagsync.xml | 1 +
.../source/atlas/AtlasNotificationMapper.java | 53 +++++++++++++------
.../tagsync/source/atlas/AtlasTagSource.java | 54 +++++++++++++++++---
4 files changed, 85 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ranger/blob/126ff6ee/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b76e4e3..2ec6768 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,6 +129,7 @@
<atlas.guava.version>14.0</atlas.guava.version>
<atlas.gson.version>2.5</atlas.gson.version>
<atlas.jettison.version>1.3.7</atlas.jettison.version>
+ <atlas.commons.compress.version>1.4.1</atlas.commons.compress.version>
<atlas.commons.logging.version>1.1.3</atlas.commons.logging.version>
<avatica.version>1.7.1</avatica.version>
<bouncycastle.version>1.55</bouncycastle.version>
http://git-wip-us.apache.org/repos/asf/ranger/blob/126ff6ee/src/main/assembly/tagsync.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/tagsync.xml b/src/main/assembly/tagsync.xml
index 26b42ca..5139937 100644
--- a/src/main/assembly/tagsync.xml
+++ b/src/main/assembly/tagsync.xml
@@ -50,6 +50,7 @@
<include>org.apache.atlas:atlas-client-v2:jar:${atlas.version}</include>
<include>org.apache.atlas:atlas-common:jar:${atlas.version}</include>
<include>org.apache.atlas:atlas-intg:jar:${atlas.version}</include>
+ <include>org.apache.commons:commons-compress:jar:${atlas.commons.compress.version}</include>
<include>org.apache.hadoop:hadoop-auth</include>
<include>org.apache.hadoop:hadoop-common</include>
<include>org.apache.kafka:kafka_${scala.binary.version}:jar:${kafka.version}</include>
http://git-wip-us.apache.org/repos/asf/ranger/blob/126ff6ee/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index 8641d60..1c7f063 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -51,7 +51,6 @@ import java.util.*;
public class AtlasNotificationMapper {
private static final Log LOG = LogFactory.getLog(AtlasNotificationMapper.class);
-
private static Map<String, Long> unhandledEventTypes = new HashMap<String, Long>();
private static final ThreadLocal<DateFormat> DATE_FORMATTER = new ThreadLocal<DateFormat>() {
@@ -140,8 +139,20 @@ public class AtlasNotificationMapper {
switch (opType) {
case ENTITY_CREATE:
ret = CollectionUtils.isNotEmpty(entityNotification.getAllTraits());
+ if (!ret) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ENTITY_CREATE notification is ignored, as there are no traits associated with the entity. Ranger will get necessary information from any subsequent TRAIT_ADDED notification");
+ }
+ }
break;
case ENTITY_UPDATE:
+ ret = CollectionUtils.isNotEmpty(entityNotification.getAllTraits());
+ if (!ret) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ENTITY_UPDATE notification is ignored, as there are no traits associated with the entity.");
+ }
+ }
+ break;
case ENTITY_DELETE:
case TRAIT_ADD:
case TRAIT_UPDATE:
@@ -278,9 +289,8 @@ public class AtlasNotificationMapper {
List<RangerTag> ret = new ArrayList<RangerTag>();
IReferenceableInstance entity = entityWithTraits != null ? entityWithTraits.getEntity() : null;
- if(entity != null && CollectionUtils.isNotEmpty(entity.getTraits())) {
- for (String traitName : entity.getTraits()) {
- IStruct trait = entity.getTrait(traitName);
+ if(entity != null && CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) {
+ for (IStruct trait : entityWithTraits.getAllTraits()) {
Map<String, String> tagAttrs = new HashMap<String, String>();
try {
@@ -310,9 +320,8 @@ public class AtlasNotificationMapper {
List<RangerTagDef> ret = new ArrayList<RangerTagDef>();
IReferenceableInstance entity = entityWithTraits != null ? entityWithTraits.getEntity() : null;
- if(entity != null && CollectionUtils.isNotEmpty(entity.getTraits())) {
- for (String traitName : entity.getTraits()) {
- IStruct trait = entity.getTrait(traitName);
+ if(entity != null && CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) {
+ for (IStruct trait : entityWithTraits.getAllTraits()) {
RangerTagDef tagDef = new RangerTagDef(trait.getTypeName(), "Atlas");
try {
@@ -415,7 +424,7 @@ public class AtlasNotificationMapper {
if (serviceResource != null) {
List<RangerTag> tags = getTags(entity, typeRegistry);
- List<RangerTagDef> tagDefs = getTagDefs(entity);
+ List<RangerTagDef> tagDefs = getTagDefs(entity, typeRegistry);
String serviceName = serviceResource.getServiceName();
ret = createOrGetServiceTags(serviceTagsMap, serviceName);
@@ -477,28 +486,38 @@ public class AtlasNotificationMapper {
return ret;
}
- static private List<RangerTagDef> getTagDefs(AtlasEntityHeader entity) {
+ static private List<RangerTagDef> getTagDefs(AtlasEntityHeader entity, AtlasTypeRegistry typeRegistry) {
List<RangerTagDef> ret = new ArrayList<>();
if(entity != null && CollectionUtils.isNotEmpty(entity.getClassificationNames())) {
- List<AtlasClassification> traits = entity.getClassifications();
+ List<AtlasClassification> classifications = entity.getClassifications();
- for (AtlasClassification trait : traits) {
- RangerTagDef tagDef = new RangerTagDef(trait.getTypeName(), "Atlas");
+ for (AtlasClassification classification : classifications) {
+ ret.add(getTagDef(classification));
- if(MapUtils.isNotEmpty(trait.getAttributes())) {
- for (String attrName : trait.getAttributes().keySet()) {
- tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string"));
+ List<AtlasClassification> superClassifications = getSuperClassifications(classification, typeRegistry);
+
+ if (CollectionUtils.isNotEmpty(superClassifications)) {
+ for (AtlasClassification superClassification : superClassifications) {
+ ret.add(getTagDef(superClassification));
}
}
-
- ret.add(tagDef);
}
}
return ret;
}
+ static private RangerTagDef getTagDef(AtlasClassification classification) {
+ RangerTagDef tagDef = new RangerTagDef(classification.getTypeName(), "Atlas");
+ if(MapUtils.isNotEmpty(classification.getAttributes())) {
+ for (String attrName : classification.getAttributes().keySet()) {
+ tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string"));
+ }
+ }
+ return tagDef;
+ }
+
static private List<AtlasClassification> getSuperClassifications(AtlasClassification classification, AtlasTypeRegistry typeRegistry) {
List<AtlasClassification> ret = null;
AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(classification.getTypeName());
http://git-wip-us.apache.org/repos/asf/ranger/blob/126ff6ee/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index 95ff8ec..3810442 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -160,32 +160,72 @@ public class AtlasTagSource extends AbstractTagSource {
if (LOG.isDebugEnabled()) {
LOG.debug("==> ConsumerRunnable.run()");
}
+ boolean seenCommitException = false;
+ long offsetOfLastMessageDeliveredToRanger = -1L;
+
while (true) {
try {
List<AtlasKafkaMessage<EntityNotification>> messages = consumer.receive(1000L);
+ int index = 0;
+
+ if (seenCommitException) {
+ for (; index < messages.size(); index++) {
+ AtlasKafkaMessage<EntityNotification> message = messages.get(index);
+ if (message.getOffset() <= offsetOfLastMessageDeliveredToRanger) {
+ // Already delivered to Ranger
+ TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+ try {
+ consumer.commit(partition, message.getOffset());
+ } catch (Exception commitException) {
+ LOG.warn("Ranger tagsync already processed message at offset " + message.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", commitException);
+ LOG.warn("This will cause Kafka to deliver this message:[" + message.getOffset() + "] repeatedly!! This may be unrecoverable error!!");
+ }
+ } else {
+ seenCommitException = false;
+ offsetOfLastMessageDeliveredToRanger = -1L;
+ break;
+ }
+ }
+ }
- for (AtlasKafkaMessage<EntityNotification> message : messages) {
+ for (; index < messages.size(); index++) {
+ AtlasKafkaMessage<EntityNotification> message = messages.get(index);
EntityNotification notification = message != null ? message.getMessage() : null;
if (notification != null) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Notification=" + getPrintableEntityNotification(notification));
+ LOG.debug("Message-offset=" + message.getOffset() + ", Notification=" + getPrintableEntityNotification(notification));
}
ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification);
if (serviceTags != null) {
updateSink(serviceTags);
}
-
- TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
- consumer.commit(partition, message.getOffset());
+ offsetOfLastMessageDeliveredToRanger = message.getOffset();
+
+ if (!seenCommitException) {
+ TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+ try {
+ consumer.commit(partition, message.getOffset());
+ } catch (Exception commitException) {
+ seenCommitException = true;
+ LOG.warn("Ranger tagsync processed message at offset " + message.getOffset() + ". Ignoring failure in committing this message and continuing to process next message", commitException);
+ }
+ }
} else {
LOG.error("Null entityNotification received from Kafka!! Ignoring..");
}
}
} catch (Exception exception) {
- LOG.error("Caught exception..: ", exception);
- return;
+ LOG.error("Caught exception : ", exception);
+ // If transient error, retry after short interval
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException interrupted) {
+ LOG.error("Interrupted: ", interrupted);
+ LOG.error("Returning from thread. May cause process to be up but not processing events!!");
+ return;
+ }
}
}
}