You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ab...@apache.org on 2018/02/28 21:00:07 UTC
ranger git commit: RANGER-1997: Update tagsync to handle Atlas
notifications of type V1 and V2
Repository: ranger
Updated Branches:
refs/heads/master 49b8a7a26 -> 9ed210470
RANGER-1997: Update tagsync to handle Atlas notifications of type V1 and V2
Project: http://git-wip-us.apache.org/repos/asf/ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/9ed21047
Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/9ed21047
Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/9ed21047
Branch: refs/heads/master
Commit: 9ed210470bbc447d1fbacd9b98421309394061b0
Parents: 49b8a7a
Author: Abhay Kulkarni <ak...@hortonworks.com>
Authored: Wed Feb 28 12:59:58 2018 -0800
Committer: Abhay Kulkarni <ak...@hortonworks.com>
Committed: Wed Feb 28 12:59:58 2018 -0800
----------------------------------------------------------------------
.../source/atlas/AtlasNotificationMapper.java | 30 +--
.../tagsync/source/atlas/AtlasTagSource.java | 44 ++--
.../source/atlas/EntityNotificationWrapper.java | 242 +++++++++++++++++++
.../atlasrest/RangerAtlasEntityWithTags.java | 31 +--
4 files changed, 283 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ranger/blob/9ed21047/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index 91cf606..916aad3 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -19,9 +19,6 @@
package org.apache.ranger.tagsync.source.atlas;
-import org.apache.atlas.v1.model.notification.EntityNotificationV1;
-import org.apache.atlas.v1.model.instance.Id;
-import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
@@ -46,13 +43,12 @@ public class AtlasNotificationMapper {
private static Map<String, Long> unhandledEventTypes = new HashMap<String, Long>();
- private static void logUnhandledEntityNotification(EntityNotificationV1 entityNotification) {
+ private static void logUnhandledEntityNotification(EntityNotificationWrapper entityNotification) {
final int REPORTING_INTERVAL_FOR_UNHANDLED_ENTITYTYPE_IN_MILLIS = 5 * 60 * 1000; // 5 minutes
boolean loggingNeeded = false;
- String entityTypeName = entityNotification != null && entityNotification.getEntity() != null ?
- entityNotification.getEntity().getTypeName() : null;
+ String entityTypeName = entityNotification.getEntityTypeName();
if (entityTypeName != null) {
Long timeInMillis = unhandledEventTypes.get(entityTypeName);
@@ -76,7 +72,7 @@ public class AtlasNotificationMapper {
}
@SuppressWarnings("unchecked")
- public static ServiceTags processEntityNotification(EntityNotificationV1 entityNotification) {
+ public static ServiceTags processEntityNotification(EntityNotificationWrapper entityNotification) {
ServiceTags ret = null;
@@ -84,7 +80,7 @@ public class AtlasNotificationMapper {
try {
RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(entityNotification);
- if (entityNotification.getOperationType() == EntityNotificationV1.OperationType.ENTITY_DELETE) {
+ if (entityNotification.getIsEntityDeleteOp()) {
ret = buildServiceTagsForEntityDeleteNotification(entityWithTags);
} else {
ret = buildServiceTags(entityWithTags, null);
@@ -111,21 +107,21 @@ public class AtlasNotificationMapper {
return ret;
}
- static private boolean isNotificationHandled(EntityNotificationV1 entityNotification) {
+ static private boolean isNotificationHandled(EntityNotificationWrapper entityNotification) {
boolean ret = false;
- EntityNotificationV1.OperationType opType = entityNotification.getOperationType();
+ EntityNotificationWrapper.NotificationType opType = entityNotification.getEntityNotificationType();
if (opType != null) {
switch (opType) {
case ENTITY_CREATE:
- ret = CollectionUtils.isNotEmpty(entityNotification.getAllTraits());
+ ret = ! entityNotification.getIsEmptyClassifications();
break;
case ENTITY_UPDATE:
case ENTITY_DELETE:
- case TRAIT_ADD:
- case TRAIT_UPDATE:
- case TRAIT_DELETE: {
+ case CLASSIFICATION_ADD:
+ case CLASSIFICATION_UPDATE:
+ case CLASSIFICATION_DELETE: {
ret = true;
break;
}
@@ -134,11 +130,7 @@ public class AtlasNotificationMapper {
break;
}
if (ret) {
- final Referenceable entity = entityNotification.getEntity();
-
- ret = entity != null
- && entity.getId().getState() == Id.EntityState.ACTIVE
- && AtlasResourceMapperUtil.isEntityTypeHandled(entity.getTypeName());
+ ret = entityNotification.getIsEntityTypeHandled();
}
}
http://git-wip-us.apache.org/repos/asf/ranger/blob/9ed21047/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index 8c15ee5..a13a789 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -21,9 +21,9 @@ package org.apache.ranger.tagsync.source.atlas;
import org.apache.atlas.kafka.NotificationProvider;
+import org.apache.atlas.model.notification.EntityNotification;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.v1.model.notification.EntityNotificationV1;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -101,7 +101,7 @@ public class AtlasTagSource extends AbstractTagSource {
if (ret) {
NotificationInterface notification = NotificationProvider.get();
- List<NotificationConsumer<EntityNotificationV1>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
+ List<NotificationConsumer<EntityNotification>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
consumerTask = new ConsumerRunnable(iterators.get(0));
@@ -138,10 +138,10 @@ public class AtlasTagSource extends AbstractTagSource {
}
}
- private static String getPrintableEntityNotification(EntityNotificationV1 notification) {
+ private static String getPrintableEntityNotification(EntityNotificationWrapper notification) {
StringBuilder sb = new StringBuilder();
- sb.append("{ Notification-Type: ").append(notification.getOperationType()).append(", ");
+ sb.append("{ Notification-Type: ").append(notification.getEntityNotificationType()).append(", ");
RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(notification);
sb.append(entityWithTags.toString());
@@ -151,9 +151,9 @@ public class AtlasTagSource extends AbstractTagSource {
private class ConsumerRunnable implements Runnable {
- private final NotificationConsumer<EntityNotificationV1> consumer;
+ private final NotificationConsumer<EntityNotification> consumer;
- private ConsumerRunnable(NotificationConsumer<EntityNotificationV1> consumer) {
+ private ConsumerRunnable(NotificationConsumer<EntityNotification> consumer) {
this.consumer = consumer;
}
@@ -165,23 +165,31 @@ public class AtlasTagSource extends AbstractTagSource {
}
while (true) {
try {
- List<AtlasKafkaMessage<EntityNotificationV1>> messages = consumer.receive(1000L);
+ List<AtlasKafkaMessage<EntityNotification>> messages = consumer.receive(1000L);
- for (AtlasKafkaMessage<EntityNotificationV1> message : messages) {
- EntityNotificationV1 notification = message != null ? message.getMessage() : null;
+ for (AtlasKafkaMessage<EntityNotification> message : messages) {
+ EntityNotification notification = message != null ? message.getMessage() : null;
if (notification != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Notification=" + getPrintableEntityNotification(notification));
+ EntityNotificationWrapper notificationWrapper = null;
+ try {
+ notificationWrapper = new EntityNotificationWrapper(notification);
+ } catch (Throwable e) {
+ LOG.error("notification:[" + notification +"] has some issues..perhaps null entity??", e);
}
-
- ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification);
- if (serviceTags != null) {
- updateSink(serviceTags);
+ if (notificationWrapper != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Notification=" + getPrintableEntityNotification(notificationWrapper));
+ }
+
+ ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notificationWrapper);
+ if (serviceTags != null) {
+ updateSink(serviceTags);
+ }
+
+ TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+ consumer.commit(partition, message.getOffset());
}
-
- TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
- consumer.commit(partition, message.getOffset());
} else {
LOG.error("Null entityNotification received from Kafka!! Ignoring..");
}
http://git-wip-us.apache.org/repos/asf/ranger/blob/9ed21047/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java
new file mode 100644
index 0000000..e680b14
--- /dev/null
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.notification.EntityNotification;
+import org.apache.atlas.v1.model.instance.Id;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.instance.Struct;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1;
+import org.apache.atlas.v1.model.notification.EntityNotificationV2;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
+
+import javax.annotation.Nonnull;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Version-neutral view over an Atlas {@link EntityNotification}.
+ *
+ * The constructor inspects the notification's type (V1 or V2) once and caches the
+ * pieces tagsync needs: the entity as a {@link RangerAtlasEntity}, its type name,
+ * whether the entity is active and of a handled type, whether the operation is an
+ * entity delete, and whether the notification carries no classifications.
+ * For an unrecognized notification type the entity and type name are {@code null}.
+ */
+public class EntityNotificationWrapper {
+    private static final Log LOG = LogFactory.getLog(EntityNotificationWrapper.class);
+
+    /** Operation kinds reported to callers; V1 TRAIT_* operations are reported as CLASSIFICATION_*. */
+    public enum NotificationType { UNKNOWN, ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, CLASSIFICATION_ADD, CLASSIFICATION_UPDATE, CLASSIFICATION_DELETE}
+
+    private final EntityNotification                        notification;
+    private final EntityNotification.EntityNotificationType notificationType;
+    private final RangerAtlasEntity                         rangerAtlasEntity;
+    private final String                                    entityTypeName;
+    private final boolean                                   isEntityTypeHandled;
+    private final boolean                                   isEntityDeleteOp;
+    private final boolean                                   isEmptyClassifications;
+
+    /**
+     * Wraps the given notification and eagerly extracts the cached values.
+     *
+     * No null checks are made on the notification's type or entity; dereferencing a
+     * missing one raises a runtime exception out of this constructor.
+     *
+     * @param notification the Atlas notification to wrap; must not be null
+     */
+    EntityNotificationWrapper(@Nonnull EntityNotification notification) {
+        this.notification = notification;
+        notificationType  = this.notification.getType();
+
+        switch (notificationType) {
+            case ENTITY_NOTIFICATION_V2: {
+                EntityNotificationV2 v2Notification = (EntityNotificationV2) notification;
+                AtlasEntity          atlasEntity    = v2Notification.getEntity();
+                String               guid           = atlasEntity.getGuid();
+                String               typeName       = atlasEntity.getTypeName();
+
+                rangerAtlasEntity      = new RangerAtlasEntity(typeName, guid, atlasEntity.getAttributes());
+                entityTypeName         = typeName;
+                isEntityTypeHandled    = atlasEntity.getStatus() == AtlasEntity.Status.ACTIVE
+                                            && AtlasResourceMapperUtil.isEntityTypeHandled(entityTypeName);
+                isEntityDeleteOp       = EntityNotificationV2.OperationType.ENTITY_DELETE == v2Notification.getOperationType();
+                // "empty" must be true when there are NO classifications (was inverted: isNotEmpty)
+                isEmptyClassifications = CollectionUtils.isEmpty(v2Notification.getClassifications());
+            }
+            break;
+            case ENTITY_NOTIFICATION_V1: {
+                EntityNotificationV1 v1Notification = (EntityNotificationV1) notification;
+                Referenceable        atlasEntity    = v1Notification.getEntity();
+                String               guid           = atlasEntity.getId()._getId();
+                String               typeName       = atlasEntity.getTypeName();
+
+                rangerAtlasEntity      = new RangerAtlasEntity(typeName, guid, atlasEntity.getValues());
+                entityTypeName         = typeName;
+                isEntityTypeHandled    = atlasEntity.getId().getState() == Id.EntityState.ACTIVE
+                                            && AtlasResourceMapperUtil.isEntityTypeHandled(entityTypeName);
+                isEntityDeleteOp       = EntityNotificationV1.OperationType.ENTITY_DELETE == v1Notification.getOperationType();
+                // "empty" must be true when there are NO traits (was inverted: isNotEmpty)
+                isEmptyClassifications = CollectionUtils.isEmpty(v1Notification.getAllTraits());
+            }
+            break;
+            default: {
+                LOG.error("Unknown notification type - [" + notificationType + "]");
+
+                rangerAtlasEntity      = null;
+                entityTypeName         = null;
+                isEntityTypeHandled    = false;
+                isEntityDeleteOp       = false;
+                isEmptyClassifications = true;
+            }
+            break;
+        }
+    }
+
+    /** @return the entity extracted from the notification, or {@code null} for an unknown notification type */
+    public RangerAtlasEntity getRangerAtlasEntity() {
+        return rangerAtlasEntity;
+    }
+
+    /** @return the entity's type name, or {@code null} for an unknown notification type */
+    public String getEntityTypeName() {
+        return entityTypeName;
+    }
+
+    /** @return {@code true} when the entity is in ACTIVE state and {@code AtlasResourceMapperUtil} reports its type as handled */
+    public boolean getIsEntityTypeHandled() {
+        return isEntityTypeHandled;
+    }
+
+    /** @return {@code true} when the notification's operation type is ENTITY_DELETE */
+    public boolean getIsEntityDeleteOp() {
+        return isEntityDeleteOp;
+    }
+
+    /** @return {@code true} when the notification carries no classifications (V2) or traits (V1) */
+    public boolean getIsEmptyClassifications() {
+        return isEmptyClassifications;
+    }
+
+    /**
+     * Translates the version-specific operation type into a {@link NotificationType}.
+     *
+     * @return the mapped type, or {@link NotificationType#UNKNOWN} when either the
+     *         notification type or its operation type is not one of the mapped values
+     */
+    public NotificationType getEntityNotificationType() {
+        NotificationType ret = NotificationType.UNKNOWN;
+
+        switch (notificationType) {
+            case ENTITY_NOTIFICATION_V2: {
+                EntityNotificationV2.OperationType opType = ((EntityNotificationV2) notification).getOperationType();
+                switch (opType) {
+                    case ENTITY_CREATE:
+                        ret = NotificationType.ENTITY_CREATE;
+                        break;
+                    case ENTITY_UPDATE:
+                        ret = NotificationType.ENTITY_UPDATE;
+                        break;
+                    case ENTITY_DELETE:
+                        ret = NotificationType.ENTITY_DELETE;
+                        break;
+                    case CLASSIFICATION_ADD:
+                        ret = NotificationType.CLASSIFICATION_ADD;
+                        break;
+                    case CLASSIFICATION_UPDATE:
+                        ret = NotificationType.CLASSIFICATION_UPDATE;
+                        break;
+                    case CLASSIFICATION_DELETE:
+                        ret = NotificationType.CLASSIFICATION_DELETE;
+                        break;
+                    default:
+                        LOG.error("Received OperationType [" + opType + "], converting to UNKNOWN");
+                        break;
+                }
+                break;
+            }
+            case ENTITY_NOTIFICATION_V1: {
+                EntityNotificationV1.OperationType opType = ((EntityNotificationV1) notification).getOperationType();
+                switch (opType) {
+                    case ENTITY_CREATE:
+                        ret = NotificationType.ENTITY_CREATE;
+                        break;
+                    case ENTITY_UPDATE:
+                        ret = NotificationType.ENTITY_UPDATE;
+                        break;
+                    case ENTITY_DELETE:
+                        ret = NotificationType.ENTITY_DELETE;
+                        break;
+                    // V1 "traits" are surfaced under the V2 "classification" names
+                    case TRAIT_ADD:
+                        ret = NotificationType.CLASSIFICATION_ADD;
+                        break;
+                    case TRAIT_UPDATE:
+                        ret = NotificationType.CLASSIFICATION_UPDATE;
+                        break;
+                    case TRAIT_DELETE:
+                        ret = NotificationType.CLASSIFICATION_DELETE;
+                        break;
+                    default:
+                        LOG.error("Received OperationType [" + opType + "], converting to UNKNOWN");
+                        break;
+                }
+                break;
+            }
+            default: {
+                LOG.error("Unknown notification type - [" + notificationType + "]");
+            }
+            break;
+        }
+
+        return ret;
+    }
+
+    /**
+     * Collects the notification's classifications (V2) or traits (V1).
+     *
+     * @return a new map keyed by classification/trait type name; each value maps
+     *         attribute name to the attribute's {@code toString()} form, omitting
+     *         attributes whose value is null. Empty (never null) when there are no
+     *         classifications or the notification type is unknown.
+     */
+    public Map<String, Map<String, String>> getAllClassifications() {
+        Map<String, Map<String, String>> ret = new HashMap<>();
+
+        switch (notificationType) {
+            case ENTITY_NOTIFICATION_V2: {
+                List<AtlasClassification> allClassifications = ((EntityNotificationV2) notification).getClassifications();
+                if (CollectionUtils.isNotEmpty(allClassifications)) {
+                    for (AtlasClassification classification : allClassifications) {
+                        ret.put(classification.getTypeName(), toStringAttributes(classification.getAttributes()));
+                    }
+                }
+            }
+            break;
+            case ENTITY_NOTIFICATION_V1: {
+                List<Struct> allTraits = ((EntityNotificationV1) notification).getAllTraits();
+                if (CollectionUtils.isNotEmpty(allTraits)) {
+                    for (Struct trait : allTraits) {
+                        ret.put(trait.getTypeName(), toStringAttributes(trait.getValuesMap()));
+                    }
+                }
+            }
+            break;
+            default: {
+                LOG.error("Unknown notification type - [" + notificationType + "]");
+            }
+            break;
+        }
+
+        return ret;
+    }
+
+    /** Copies a raw attribute map into a String-valued map, skipping null values; a null input yields an empty map. */
+    private static Map<String, String> toStringAttributes(Map<String, Object> valuesMap) {
+        Map<String, String> attributes = new HashMap<>();
+
+        if (valuesMap != null) {
+            for (Map.Entry<String, Object> value : valuesMap.entrySet()) {
+                if (value.getValue() != null) {
+                    attributes.put(value.getKey(), value.getValue().toString());
+                }
+            }
+        }
+
+        return attributes;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/ranger/blob/9ed21047/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/RangerAtlasEntityWithTags.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/RangerAtlasEntityWithTags.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/RangerAtlasEntityWithTags.java
index b25a241..ecbc502 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/RangerAtlasEntityWithTags.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/RangerAtlasEntityWithTags.java
@@ -21,13 +21,9 @@ package org.apache.ranger.tagsync.source.atlasrest;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.v1.model.notification.EntityNotificationV1;
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.instance.Struct;
import org.apache.commons.lang.StringUtils;
+import org.apache.ranger.tagsync.source.atlas.EntityNotificationWrapper;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
public class RangerAtlasEntityWithTags {
@@ -35,31 +31,12 @@ public class RangerAtlasEntityWithTags {
private final Map<String, Map<String, String>> tags;
private final AtlasTypeRegistry typeRegistry;
- public RangerAtlasEntityWithTags(EntityNotificationV1 notification ) {
- Referenceable atlasEntity = notification.getEntity();
+ public RangerAtlasEntityWithTags(EntityNotificationWrapper notification ) {
- String guid = atlasEntity.getId()._getId();
- String typeName = atlasEntity.getTypeName();
+ this.entity = notification.getRangerAtlasEntity();
- this.entity = new RangerAtlasEntity(typeName, guid, atlasEntity.getValues());
+ this.tags = notification.getAllClassifications();
- this.tags = new HashMap<>();
-
- List<Struct> allTraits = notification.getAllTraits();
- for (Struct trait : allTraits) {
- String traitName = trait.getTypeName();
-
- Map<String, Object> valuesMap = trait.getValuesMap();
- Map<String, String> attributes = new HashMap<>();
- if (valuesMap != null) {
- for (Map.Entry<String, Object> value : valuesMap.entrySet()) {
- if (value.getValue() != null) {
- attributes.put(value.getKey(), value.getValue().toString());
- }
- }
- }
- this.tags.put(traitName, attributes);
- }
this.typeRegistry = null;
}