You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ab...@apache.org on 2018/02/28 21:00:07 UTC

ranger git commit: RANGER-1997: Update tagsync to handle Atlas notifications of type V1 and V2

Repository: ranger
Updated Branches:
  refs/heads/master 49b8a7a26 -> 9ed210470


RANGER-1997: Update tagsync to handle Atlas notifications of type V1 and V2


Project: http://git-wip-us.apache.org/repos/asf/ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/9ed21047
Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/9ed21047
Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/9ed21047

Branch: refs/heads/master
Commit: 9ed210470bbc447d1fbacd9b98421309394061b0
Parents: 49b8a7a
Author: Abhay Kulkarni <ak...@hortonworks.com>
Authored: Wed Feb 28 12:59:58 2018 -0800
Committer: Abhay Kulkarni <ak...@hortonworks.com>
Committed: Wed Feb 28 12:59:58 2018 -0800

----------------------------------------------------------------------
 .../source/atlas/AtlasNotificationMapper.java   |  30 +--
 .../tagsync/source/atlas/AtlasTagSource.java    |  44 ++--
 .../source/atlas/EntityNotificationWrapper.java | 242 +++++++++++++++++++
 .../atlasrest/RangerAtlasEntityWithTags.java    |  31 +--
 4 files changed, 283 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ranger/blob/9ed21047/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index 91cf606..916aad3 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -19,9 +19,6 @@
 
 package org.apache.ranger.tagsync.source.atlas;
 
-import org.apache.atlas.v1.model.notification.EntityNotificationV1;
-import org.apache.atlas.v1.model.instance.Id;
-import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
@@ -46,13 +43,12 @@ public class AtlasNotificationMapper {
 
 	private static Map<String, Long> unhandledEventTypes = new HashMap<String, Long>();
 
-	private static void logUnhandledEntityNotification(EntityNotificationV1 entityNotification) {
+	private static void logUnhandledEntityNotification(EntityNotificationWrapper entityNotification) {
 
 		final int REPORTING_INTERVAL_FOR_UNHANDLED_ENTITYTYPE_IN_MILLIS = 5 * 60 * 1000; // 5 minutes
 
 		boolean loggingNeeded = false;
-		String entityTypeName = entityNotification != null && entityNotification.getEntity() != null ?
-				entityNotification.getEntity().getTypeName() : null;
+		String entityTypeName = entityNotification.getEntityTypeName();
 
 		if (entityTypeName != null) {
 			Long timeInMillis = unhandledEventTypes.get(entityTypeName);
@@ -76,7 +72,7 @@ public class AtlasNotificationMapper {
 	}
 
 	@SuppressWarnings("unchecked")
-	public static ServiceTags processEntityNotification(EntityNotificationV1 entityNotification) {
+	public static ServiceTags processEntityNotification(EntityNotificationWrapper entityNotification) {
 
 		ServiceTags ret = null;
 
@@ -84,7 +80,7 @@ public class AtlasNotificationMapper {
 			try {
 				RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(entityNotification);
 
-				if (entityNotification.getOperationType() == EntityNotificationV1.OperationType.ENTITY_DELETE) {
+				if (entityNotification.getIsEntityDeleteOp()) {
 					ret = buildServiceTagsForEntityDeleteNotification(entityWithTags);
 				} else {
 					ret = buildServiceTags(entityWithTags, null);
@@ -111,21 +107,21 @@ public class AtlasNotificationMapper {
 		return ret;
 	}
 
-	static private boolean isNotificationHandled(EntityNotificationV1 entityNotification) {
+	static private boolean isNotificationHandled(EntityNotificationWrapper entityNotification) {
 		boolean ret = false;
 
-		EntityNotificationV1.OperationType opType = entityNotification.getOperationType();
+		EntityNotificationWrapper.NotificationType opType = entityNotification.getEntityNotificationType();
 
 		if (opType != null) {
 			switch (opType) {
 				case ENTITY_CREATE:
-					ret = CollectionUtils.isNotEmpty(entityNotification.getAllTraits());
+					ret = ! entityNotification.getIsEmptyClassifications();
 					break;
 				case ENTITY_UPDATE:
 				case ENTITY_DELETE:
-				case TRAIT_ADD:
-				case TRAIT_UPDATE:
-				case TRAIT_DELETE: {
+				case CLASSIFICATION_ADD:
+				case CLASSIFICATION_UPDATE:
+				case CLASSIFICATION_DELETE: {
 					ret = true;
 					break;
 				}
@@ -134,11 +130,7 @@ public class AtlasNotificationMapper {
 					break;
 			}
 			if (ret) {
-				final Referenceable entity = entityNotification.getEntity();
-
-				ret = entity != null
-						&& entity.getId().getState() == Id.EntityState.ACTIVE
-						&& AtlasResourceMapperUtil.isEntityTypeHandled(entity.getTypeName());
+				ret = entityNotification.getIsEntityTypeHandled();
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/ranger/blob/9ed21047/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index 8c15ee5..a13a789 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -21,9 +21,9 @@ package org.apache.ranger.tagsync.source.atlas;
 
 
 import org.apache.atlas.kafka.NotificationProvider;
+import org.apache.atlas.model.notification.EntityNotification;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.v1.model.notification.EntityNotificationV1;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -101,7 +101,7 @@ public class AtlasTagSource extends AbstractTagSource {
 
 		if (ret) {
 			NotificationInterface notification = NotificationProvider.get();
-			List<NotificationConsumer<EntityNotificationV1>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
+			List<NotificationConsumer<EntityNotification>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
 
 			consumerTask = new ConsumerRunnable(iterators.get(0));
 
@@ -138,10 +138,10 @@ public class AtlasTagSource extends AbstractTagSource {
 		}
 	}
 
-	private static String getPrintableEntityNotification(EntityNotificationV1 notification) {
+	private static String getPrintableEntityNotification(EntityNotificationWrapper notification) {
 		StringBuilder sb = new StringBuilder();
 
-		sb.append("{ Notification-Type: ").append(notification.getOperationType()).append(", ");
+		sb.append("{ Notification-Type: ").append(notification.getEntityNotificationType()).append(", ");
         RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(notification);
         sb.append(entityWithTags.toString());
 
@@ -151,9 +151,9 @@ public class AtlasTagSource extends AbstractTagSource {
 
 	private class ConsumerRunnable implements Runnable {
 
-		private final NotificationConsumer<EntityNotificationV1> consumer;
+		private final NotificationConsumer<EntityNotification> consumer;
 
-		private ConsumerRunnable(NotificationConsumer<EntityNotificationV1> consumer) {
+		private ConsumerRunnable(NotificationConsumer<EntityNotification> consumer) {
 			this.consumer = consumer;
 		}
 
@@ -165,23 +165,31 @@ public class AtlasTagSource extends AbstractTagSource {
 			}
 			while (true) {
 				try {
-					List<AtlasKafkaMessage<EntityNotificationV1>> messages = consumer.receive(1000L);
+					List<AtlasKafkaMessage<EntityNotification>> messages = consumer.receive(1000L);
 
-					for (AtlasKafkaMessage<EntityNotificationV1> message :  messages) {
-						EntityNotificationV1 notification = message != null ? message.getMessage() : null;
+					for (AtlasKafkaMessage<EntityNotification> message :  messages) {
+						EntityNotification notification = message != null ? message.getMessage() : null;
 
 						if (notification != null) {
-							if (LOG.isDebugEnabled()) {
-								LOG.debug("Notification=" + getPrintableEntityNotification(notification));
+							EntityNotificationWrapper notificationWrapper = null;
+							try {
+								notificationWrapper = new EntityNotificationWrapper(notification);
+							} catch (Throwable e) {
+								LOG.error("notification:[" + notification +"] has some issues..perhaps null entity??", e);
 							}
-
-							ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification);
-							if (serviceTags != null) {
-								updateSink(serviceTags);
+							if (notificationWrapper != null) {
+								if (LOG.isDebugEnabled()) {
+									LOG.debug("Notification=" + getPrintableEntityNotification(notificationWrapper));
+								}
+
+								ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notificationWrapper);
+								if (serviceTags != null) {
+									updateSink(serviceTags);
+								}
+
+								TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+								consumer.commit(partition, message.getOffset());
 							}
-
-							TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
-							consumer.commit(partition, message.getOffset());
 						} else {
 							LOG.error("Null entityNotification received from Kafka!! Ignoring..");
 						}

http://git-wip-us.apache.org/repos/asf/ranger/blob/9ed21047/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java
new file mode 100644
index 0000000..e680b14
--- /dev/null
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.notification.EntityNotification;
+import org.apache.atlas.v1.model.instance.Id;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.instance.Struct;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1;
+import org.apache.atlas.v1.model.notification.EntityNotificationV2;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
+
+import javax.annotation.Nonnull;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Version-neutral, read-only view over an Atlas V1 or V2 {@link EntityNotification};
+ * derived fields are computed once in the constructor.
+ */
+public class EntityNotificationWrapper {
+	private static final Log LOG = LogFactory.getLog(EntityNotificationWrapper.class);
+
+	/** Operation carried by a notification; V1 TRAIT_* operations are reported as CLASSIFICATION_*. */
+	public enum NotificationType { UNKNOWN, ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, CLASSIFICATION_ADD, CLASSIFICATION_UPDATE, CLASSIFICATION_DELETE}
+
+	private final EntityNotification notification;
+	private final EntityNotification.EntityNotificationType notificationType;
+	private final RangerAtlasEntity rangerAtlasEntity;
+	private final String entityTypeName;
+	private final boolean isEntityTypeHandled;
+	private final boolean isEntityDeleteOp;
+	private final boolean isEmptyClassifications;
+
+	/**
+	 * Wraps the given notification and eagerly computes all derived fields.
+	 * @param notification the V1 or V2 notification to wrap; must not be null
+	 * @throws NullPointerException if the notification, its type or its entity
+	 *         (for V1, also the entity id) is null; nothing is caught here
+	 */
+	EntityNotificationWrapper(@Nonnull EntityNotification notification) {
+		this.notification = notification;
+		notificationType = this.notification.getType();
+
+		switch (notificationType) {
+			case ENTITY_NOTIFICATION_V2: {
+				EntityNotificationV2 v2Notification = (EntityNotificationV2) notification;
+				AtlasEntity atlasEntity = v2Notification.getEntity();
+
+				entityTypeName = atlasEntity.getTypeName();
+				rangerAtlasEntity = new RangerAtlasEntity(entityTypeName, atlasEntity.getGuid(), atlasEntity.getAttributes());
+				isEntityTypeHandled = atlasEntity.getStatus() == AtlasEntity.Status.ACTIVE
+						&& AtlasResourceMapperUtil.isEntityTypeHandled(entityTypeName);
+				isEntityDeleteOp = EntityNotificationV2.OperationType.ENTITY_DELETE == v2Notification.getOperationType();
+				// "Empty" must be true when there are NO classifications: isEmpty(), not isNotEmpty().
+				isEmptyClassifications = CollectionUtils.isEmpty(v2Notification.getClassifications());
+			}
+			break;
+			case ENTITY_NOTIFICATION_V1: {
+				EntityNotificationV1 v1Notification = (EntityNotificationV1) notification;
+				Referenceable atlasEntity = v1Notification.getEntity();
+
+				entityTypeName = atlasEntity.getTypeName();
+				rangerAtlasEntity = new RangerAtlasEntity(entityTypeName, atlasEntity.getId()._getId(), atlasEntity.getValues());
+				isEntityTypeHandled = atlasEntity.getId().getState() == Id.EntityState.ACTIVE
+						&& AtlasResourceMapperUtil.isEntityTypeHandled(entityTypeName);
+				isEntityDeleteOp = EntityNotificationV1.OperationType.ENTITY_DELETE == v1Notification.getOperationType();
+				// V1 calls classifications "traits"; the flag has the same meaning as for V2.
+				isEmptyClassifications = CollectionUtils.isEmpty(v1Notification.getAllTraits());
+			}
+			break;
+			default: {
+				LOG.error("Unknown notification type - [" + notificationType + "]");
+
+				rangerAtlasEntity = null;
+				entityTypeName = null;
+				isEntityTypeHandled = false;
+				isEntityDeleteOp = false;
+				isEmptyClassifications = true;
+			}
+			break;
+		}
+	}
+
+	/** @return the entity carried by the notification, or null for an unknown notification type */
+	public RangerAtlasEntity getRangerAtlasEntity() {
+		return rangerAtlasEntity;
+	}
+
+	/** @return the entity's type name, or null for an unknown notification type */
+	public String getEntityTypeName() {
+		return entityTypeName;
+	}
+
+	/** @return true if the entity is ACTIVE and AtlasResourceMapperUtil reports its type as handled */
+	public boolean getIsEntityTypeHandled() {
+		return isEntityTypeHandled;
+	}
+
+	/** @return true if the notification's operation type is ENTITY_DELETE */
+	public boolean getIsEntityDeleteOp() {
+		return isEntityDeleteOp;
+	}
+
+	/** @return true if the notification carries no classifications (V2) or traits (V1) */
+	public boolean getIsEmptyClassifications() {
+		return isEmptyClassifications;
+	}
+
+	/**
+	 * Maps the version-specific operation type onto {@link NotificationType}.
+	 * @return the mapped operation, or UNKNOWN when the notification version or
+	 *         its operation type is not recognized
+	 */
+	public NotificationType getEntityNotificationType() {
+		switch (notificationType) {
+			case ENTITY_NOTIFICATION_V2:
+				return toNotificationType(((EntityNotificationV2) notification).getOperationType());
+			case ENTITY_NOTIFICATION_V1:
+				return toNotificationType(((EntityNotificationV1) notification).getOperationType());
+			default:
+				LOG.error("Unknown notification type - [" + notificationType + "]");
+				return NotificationType.UNKNOWN;
+		}
+	}
+
+	/** Translates a V2 operation type; anything unrecognized is logged and becomes UNKNOWN. */
+	private static NotificationType toNotificationType(EntityNotificationV2.OperationType opType) {
+		switch (opType) {
+			case ENTITY_CREATE:
+				return NotificationType.ENTITY_CREATE;
+			case ENTITY_UPDATE:
+				return NotificationType.ENTITY_UPDATE;
+			case ENTITY_DELETE:
+				return NotificationType.ENTITY_DELETE;
+			case CLASSIFICATION_ADD:
+				return NotificationType.CLASSIFICATION_ADD;
+			case CLASSIFICATION_UPDATE:
+				return NotificationType.CLASSIFICATION_UPDATE;
+			case CLASSIFICATION_DELETE:
+				return NotificationType.CLASSIFICATION_DELETE;
+			default:
+				LOG.error("Received OperationType [" + opType + "], converting to UNKNOWN");
+				return NotificationType.UNKNOWN;
+		}
+	}
+
+	/** Translates a V1 operation type, reporting TRAIT_* operations as CLASSIFICATION_*. */
+	private static NotificationType toNotificationType(EntityNotificationV1.OperationType opType) {
+		switch (opType) {
+			case ENTITY_CREATE:
+				return NotificationType.ENTITY_CREATE;
+			case ENTITY_UPDATE:
+				return NotificationType.ENTITY_UPDATE;
+			case ENTITY_DELETE:
+				return NotificationType.ENTITY_DELETE;
+			case TRAIT_ADD:
+				return NotificationType.CLASSIFICATION_ADD;
+			case TRAIT_UPDATE:
+				return NotificationType.CLASSIFICATION_UPDATE;
+			case TRAIT_DELETE:
+				return NotificationType.CLASSIFICATION_DELETE;
+			default:
+				LOG.error("Received OperationType [" + opType + "], converting to UNKNOWN");
+				return NotificationType.UNKNOWN;
+		}
+	}
+
+	/**
+	 * Collects the classifications (V2) or traits (V1) attached to the entity.
+	 * @return a new map from classification name to its non-null attribute values
+	 *         rendered with toString(); empty when there are none or the
+	 *         notification type is unknown
+	 */
+	public Map<String, Map<String, String>> getAllClassifications() {
+		Map<String, Map<String, String>> ret = new HashMap<>();
+
+		switch (notificationType) {
+			case ENTITY_NOTIFICATION_V2: {
+				List<AtlasClassification> allClassifications = ((EntityNotificationV2) notification).getClassifications();
+				if (CollectionUtils.isNotEmpty(allClassifications)) {
+					for (AtlasClassification classification : allClassifications) {
+						ret.put(classification.getTypeName(), toStringValues(classification.getAttributes()));
+					}
+				}
+			}
+			break;
+			case ENTITY_NOTIFICATION_V1: {
+				List<Struct> allTraits = ((EntityNotificationV1) notification).getAllTraits();
+				if (CollectionUtils.isNotEmpty(allTraits)) {
+					for (Struct trait : allTraits) {
+						ret.put(trait.getTypeName(), toStringValues(trait.getValuesMap()));
+					}
+				}
+			}
+			break;
+			default: {
+				LOG.error("Unknown notification type - [" + notificationType + "]");
+			}
+			break;
+		}
+
+		return ret;
+	}
+
+	/** Copies the non-null values of the given map as strings; a null map yields an empty map. */
+	private static Map<String, String> toStringValues(Map<String, Object> values) {
+		Map<String, String> ret = new HashMap<>();
+		if (values != null) {
+			for (Map.Entry<String, Object> value : values.entrySet()) {
+				if (value.getValue() != null) {
+					ret.put(value.getKey(), value.getValue().toString());
+				}
+			}
+		}
+		return ret;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/ranger/blob/9ed21047/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/RangerAtlasEntityWithTags.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/RangerAtlasEntityWithTags.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/RangerAtlasEntityWithTags.java
index b25a241..ecbc502 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/RangerAtlasEntityWithTags.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/RangerAtlasEntityWithTags.java
@@ -21,13 +21,9 @@ package org.apache.ranger.tagsync.source.atlasrest;
 import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasStructType;
 import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.v1.model.notification.EntityNotificationV1;
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.instance.Struct;
 import org.apache.commons.lang.StringUtils;
+import org.apache.ranger.tagsync.source.atlas.EntityNotificationWrapper;
 
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 public class RangerAtlasEntityWithTags {
@@ -35,31 +31,12 @@ public class RangerAtlasEntityWithTags {
     private final Map<String, Map<String, String>> tags;
     private final AtlasTypeRegistry typeRegistry;
 
-    public RangerAtlasEntityWithTags(EntityNotificationV1 notification ) {
-        Referenceable atlasEntity = notification.getEntity();
+    public RangerAtlasEntityWithTags(EntityNotificationWrapper notification ) {
 
-        String guid = atlasEntity.getId()._getId();
-        String typeName = atlasEntity.getTypeName();
+        this.entity = notification.getRangerAtlasEntity();
 
-        this.entity = new RangerAtlasEntity(typeName, guid, atlasEntity.getValues());
+        this.tags = notification.getAllClassifications();
 
-        this.tags = new HashMap<>();
-
-        List<Struct> allTraits = notification.getAllTraits();
-        for (Struct trait : allTraits) {
-            String traitName = trait.getTypeName();
-
-            Map<String, Object> valuesMap = trait.getValuesMap();
-            Map<String, String> attributes = new HashMap<>();
-            if (valuesMap != null) {
-                for (Map.Entry<String, Object> value : valuesMap.entrySet()) {
-                    if (value.getValue() != null) {
-                        attributes.put(value.getKey(), value.getValue().toString());
-                    }
-                }
-            }
-            this.tags.put(traitName, attributes);
-        }
         this.typeRegistry = null;
     }