You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ma...@apache.org on 2015/11/11 21:58:51 UTC

[07/10] incubator-ranger git commit: RANGER-726: Updated tagsync for recent changes in Atlas API

RANGER-726: Updated tagsync for recent changes in Atlas API

Signed-off-by: Madhan Neethiraj <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/49e890e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/49e890e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/49e890e2

Branch: refs/heads/tag-policy
Commit: 49e890e26360c742ccbf80d2741df7bec48c6319
Parents: 5b86864
Author: Abhay Kulkarni <ak...@hortonworks.com>
Authored: Mon Nov 9 18:50:52 2015 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Wed Nov 11 12:29:47 2015 -0800

----------------------------------------------------------------------
 .../atlas-client-0.5.0.2.3.1.1-19.jar           | Bin 34558 -> 0 bytes
 .../atlas-notification-0.5.0.2.3.1.1-19.jar     | Bin 34734 -> 0 bytes
 .../atlas-typesystem-0.5.0.2.3.1.1-19.jar       | Bin 355350 -> 0 bytes
 pom.xml                                         |   4 +-
 src/main/assembly/tagsync.xml                   |   3 +
 .../conf/templates/installprop2xml.properties   |   6 +-
 tagsync/pom.xml                                 |  18 +
 tagsync/scripts/install.properties              |   2 +-
 tagsync/scripts/setup.py                        |   5 +-
 .../source/atlas/AtlasNotificationMapper.java   |  97 +++--
 .../tagsync/source/atlas/AtlasUtility.java      | 404 -------------------
 .../tagsync/source/atlas/TagAtlasSource.java    |  78 ++--
 12 files changed, 132 insertions(+), 485 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/local-repo/org/apache/atlas/atlas-client/0.5.0.2.3.1.1-19/atlas-client-0.5.0.2.3.1.1-19.jar
----------------------------------------------------------------------
diff --git a/local-repo/org/apache/atlas/atlas-client/0.5.0.2.3.1.1-19/atlas-client-0.5.0.2.3.1.1-19.jar b/local-repo/org/apache/atlas/atlas-client/0.5.0.2.3.1.1-19/atlas-client-0.5.0.2.3.1.1-19.jar
deleted file mode 100644
index 1fb2ef7..0000000
Binary files a/local-repo/org/apache/atlas/atlas-client/0.5.0.2.3.1.1-19/atlas-client-0.5.0.2.3.1.1-19.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/local-repo/org/apache/atlas/atlas-notification/0.5.0.2.3.1.1-19/atlas-notification-0.5.0.2.3.1.1-19.jar
----------------------------------------------------------------------
diff --git a/local-repo/org/apache/atlas/atlas-notification/0.5.0.2.3.1.1-19/atlas-notification-0.5.0.2.3.1.1-19.jar b/local-repo/org/apache/atlas/atlas-notification/0.5.0.2.3.1.1-19/atlas-notification-0.5.0.2.3.1.1-19.jar
deleted file mode 100644
index 848eeb3..0000000
Binary files a/local-repo/org/apache/atlas/atlas-notification/0.5.0.2.3.1.1-19/atlas-notification-0.5.0.2.3.1.1-19.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/local-repo/org/apache/atlas/atlas-typesystem/0.5.0.2.3.1.1-19/atlas-typesystem-0.5.0.2.3.1.1-19.jar
----------------------------------------------------------------------
diff --git a/local-repo/org/apache/atlas/atlas-typesystem/0.5.0.2.3.1.1-19/atlas-typesystem-0.5.0.2.3.1.1-19.jar b/local-repo/org/apache/atlas/atlas-typesystem/0.5.0.2.3.1.1-19/atlas-typesystem-0.5.0.2.3.1.1-19.jar
deleted file mode 100644
index f619b6e..0000000
Binary files a/local-repo/org/apache/atlas/atlas-typesystem/0.5.0.2.3.1.1-19/atlas-typesystem-0.5.0.2.3.1.1-19.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0648d67..d60fca4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -213,7 +213,7 @@
 		<jaxb-api.version>2.2.2</jaxb-api.version>
 		<jackson.version>1.9.13</jackson.version>
 		<sun-jersey-bundle.version>1.19</sun-jersey-bundle.version>
-		<atlas.version>0.5.0.2.3.1.1-19</atlas.version>
+		<atlas.version>0.6-incubating-SNAPSHOT</atlas.version>
 		<distMgmtStagingId>apache.staging.https</distMgmtStagingId>
     	<distMgmtStagingName>Apache Release Distribution Repository</distMgmtStagingName>
     	<distMgmtStagingUrl>https://repository.apache.org/service/local/staging/deploy/maven2</distMgmtStagingUrl>
@@ -264,6 +264,7 @@
                 <enabled>false</enabled>
             </snapshots>
           </repository>
+    <!--
     <repository>
       <id>repo</id>
       <url>file://${basedir}/local-repo</url>
@@ -271,6 +272,7 @@
          <enabled>true</enabled>
       </snapshots>
   </repository>
+  -->
   </repositories>
 	<dependencyManagement>
 		<dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/src/main/assembly/tagsync.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/tagsync.xml b/src/main/assembly/tagsync.xml
index 331dae0..8adc5cc 100644
--- a/src/main/assembly/tagsync.xml
+++ b/src/main/assembly/tagsync.xml
@@ -56,7 +56,10 @@
 					<include>org.apache.atlas:atlas-notification</include>
 					<include>org.apache.atlas:atlas-typesystem</include>
 					<include>org.apache.atlas:atlas-client</include>
+					<include>org.apache.atlas:atlas-common</include>
 					<include>com.google.inject:guice</include>
+					<include>com.google.inject.extensions:guice-multibindings</include>
+					<include>org.codehaus.jettison:jettison</include>
 					<include>aopalliance:aopalliance</include>
 					<include>javax.inject:javax.inject</include>
 					<include>org.apache.kafka:kafka_2.10</include>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/conf/templates/installprop2xml.properties
----------------------------------------------------------------------
diff --git a/tagsync/conf/templates/installprop2xml.properties b/tagsync/conf/templates/installprop2xml.properties
index 5b63835..101a1ba 100644
--- a/tagsync/conf/templates/installprop2xml.properties
+++ b/tagsync/conf/templates/installprop2xml.properties
@@ -34,8 +34,8 @@ TAG_SOURCE_FILE_MOD_TIME_CHECK_INTERVAL = ranger.tagsync.filesource.modtime.chec
 
 TAGSYNC_FILESOURCE_FILENAME = ranger.tagsync.filesource.filename
 
-TAGSYNC_ATLAS_KAFKA_ENDPOINTS = atlas.notification.kafka.bootstrap.servers
-TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = atlas.notification.kafka.zookeeper.connect
-TAGSYNC_ATLAS_CONSUMER_GROUP = atlas.notification.kafka.group.id
+TAGSYNC_ATLAS_KAFKA_ENDPOINTS = atlas.kafka.bootstrap.servers
+TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = atlas.kafka.zookeeper.connect
+TAGSYNC_ATLAS_CONSUMER_GROUP = atlas.kafka.entities.group.id
 
 TAGSYNC_ATLAS_TO_RANGER_SERVICE_MAPPING = ranger.tagsync.atlas.to.service.mapping

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/pom.xml
----------------------------------------------------------------------
diff --git a/tagsync/pom.xml b/tagsync/pom.xml
index b800f61..c860c4a 100644
--- a/tagsync/pom.xml
+++ b/tagsync/pom.xml
@@ -30,8 +30,10 @@
         <version>0.5.0</version>
     </parent>
 
+    <!--
     <repositories>
 
+
         <repository>
             <id>repo</id>
             <url>file://${basedir}/../local-repo</url>
@@ -40,6 +42,7 @@
             </snapshots>
         </repository>
     </repositories>
+    -->
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -107,6 +110,16 @@
             <version>4.0</version>
         </dependency>
         <dependency>
+            <groupId>com.google.inject.extensions</groupId>
+            <artifactId>guice-multibindings</artifactId>
+            <version>4.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jettison</groupId>
+            <artifactId>jettison</artifactId>
+            <version>1.1</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.atlas</groupId>
             <artifactId>atlas-notification</artifactId>
             <version>${atlas.version}</version>
@@ -121,5 +134,10 @@
             <artifactId>atlas-client</artifactId>
             <version>${atlas.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-common</artifactId>
+            <version>${atlas.version}</version>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/scripts/install.properties
----------------------------------------------------------------------
diff --git a/tagsync/scripts/install.properties b/tagsync/scripts/install.properties
index f7de6e3..b5ad580 100644
--- a/tagsync/scripts/install.properties
+++ b/tagsync/scripts/install.properties
@@ -53,7 +53,7 @@ TAG_SOURCE_FILE_MOD_TIME_CHECK_INTERVAL = 60000
 
 TAGSYNC_ATLAS_KAFKA_ENDPOINTS = localhost:6667
 TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = localhost:2181
-TAGSYNC_ATLAS_CONSUMER_GROUP = entityConsumer
+TAGSYNC_ATLAS_CONSUMER_GROUP = ranger_entities_consumer
 
 # Mapping from Atlas hive instance-name to Ranger service-name
 # this needs to be in format clusterName,componentType,serviceName;clusterName2,componentType2,serviceName2

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/scripts/setup.py
----------------------------------------------------------------------
diff --git a/tagsync/scripts/setup.py b/tagsync/scripts/setup.py
index e4b2433..f7455b8 100755
--- a/tagsync/scripts/setup.py
+++ b/tagsync/scripts/setup.py
@@ -317,8 +317,9 @@ def main():
 	atlasOutFile = file(atlasOutFn, "a")
 
 	atlasOutFile.write("atlas.notification.embedded=false" + "\n")
-	atlasOutFile.write("atlas.notification.kafka.acks=1" + "\n")
-	atlasOutFile.write("atlas.notification.kafka.data=${sys:atlas.home}/data/kafka" + "\n")
+	atlasOutFile.write("atlas.kafka.acks=1" + "\n")
+	atlasOutFile.write("atlas.kafka.data=${sys:atlas.home}/data/kafka" + "\n")
+	atlasOutFile.write("atlas.kafka.hook.group.id=atlas" + "\n")
 
 	atlasOutFile.close()
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index 8046b68..7925b5c 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -19,9 +19,10 @@
 
 package org.apache.ranger.tagsync.source.atlas;
 
+import org.apache.atlas.AtlasException;
 import org.apache.atlas.notification.entity.EntityNotification;
-import org.apache.atlas.typesystem.api.Entity;
-import org.apache.atlas.typesystem.api.Trait;
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.atlas.typesystem.IStruct;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -58,7 +59,9 @@ class AtlasNotificationMapper {
 		properties = props;
 
 		try {
-			if (isEntityMappable(entityNotification.getEntity())) {
+			IReferenceableInstance entity = entityNotification.getEntity();
+
+			if (isEntityMappable(entity)) {
 				ret = createServiceTags(entityNotification);
 			} else {
 				if(LOG.isDebugEnabled()) {
@@ -71,7 +74,7 @@ class AtlasNotificationMapper {
 		return ret;
 	}
 
-	static private boolean isEntityMappable(Entity entity) {
+	static private boolean isEntityMappable(IReferenceableInstance entity) {
 		boolean ret = false;
 
 		String entityTypeName = entity.getTypeName();
@@ -91,44 +94,43 @@ class AtlasNotificationMapper {
 		ServiceTags ret = null;
 
 		EntityNotification.OperationType opType = entityNotification.getOperationType();
-		Entity entity = entityNotification.getEntity();
 
-		String opName = entityNotification.getOperationType().name();
 		switch (opType) {
-			case ENTITY_CREATED: {
-				LOG.debug("ENTITY_CREATED notification is not handled, as Ranger will get necessary information from any subsequent TRAIT_ADDED notification");
+			case ENTITY_CREATE: {
+				LOG.debug("ENTITY_CREATE notification is not handled, as Ranger will get necessary information from any subsequent TRAIT_ADDED notification");
 				break;
 			}
-			case ENTITY_UPDATED: {
-				ret = getServiceTags(entity);
+			case ENTITY_UPDATE: {
+				ret = getServiceTags(entityNotification);
 				if (MapUtils.isEmpty(ret.getTags())) {
 					LOG.debug("No traits associated with this entity update notification. Ignoring it altogether");
 					ret = null;
 				}
 				break;
 			}
-			case TRAIT_ADDED:
-			case TRAIT_DELETED: {
-				ret = getServiceTags(entity);
+			case TRAIT_ADD:
+			case TRAIT_DELETE: {
+				ret = getServiceTags(entityNotification);
 				break;
 			}
 			default:
-				LOG.error(opName + ": unknown notification received - not handled");
+				LOG.error(opType + ": unknown notification received - not handled");
 		}
 
 		return ret;
 	}
 
-	static private ServiceTags getServiceTags(Entity entity) throws Exception {
+	static private ServiceTags getServiceTags(EntityNotification entityNotification) throws Exception {
 		ServiceTags ret = null;
 
+		IReferenceableInstance entity = entityNotification.getEntity();
 
 		List<RangerServiceResource> serviceResources = new ArrayList<RangerServiceResource>();
 
 		RangerServiceResource serviceResource = getServiceResource(entity);
 		serviceResources.add(serviceResource);
 
-		Map<Long, RangerTag> tags = getTags(entity);
+		Map<Long, RangerTag> tags = getTags(entityNotification);
 
 		Map<Long, RangerTagDef> tagDefs = getTagDefs(tags);
 
@@ -163,7 +165,7 @@ class AtlasNotificationMapper {
 	}
 
 
-	static private RangerServiceResource getServiceResource(Entity entity) throws Exception {
+	static private RangerServiceResource getServiceResource(IReferenceableInstance entity) throws Exception {
 
 		RangerServiceResource ret = null;
 
@@ -224,7 +226,7 @@ class AtlasNotificationMapper {
 
 
 		ret = new RangerServiceResource();
-		ret.setGuid(entity.getId().getGuid());
+		ret.setGuid(entity.getId()._getId());
 		ret.setId(1L);
 		ret.setServiceName(serviceName);
 		ret.setResourceElements(elements);
@@ -232,22 +234,24 @@ class AtlasNotificationMapper {
 		return ret;
 	}
 
-	static private Map<Long, RangerTag> getTags(Entity entity) {
+	static private Map<Long, RangerTag> getTags(EntityNotification entityNotification) {
 		Map<Long, RangerTag> ret = null;
 
-		Map<String, ? extends Trait> traits = entity.getTraits();
+		ret = new HashMap<Long, RangerTag>();
+
+		long index = 1;
+
+		List<IStruct> traits = entityNotification.getAllTraits();
+
+		for (IStruct trait : traits) {
 
-		if (MapUtils.isNotEmpty(traits)) {
-			ret = new HashMap<Long, RangerTag>();
-			long index = 1;
+			String traitName = trait.getTypeName();
 
-			for (Map.Entry<String, ? extends Trait> entry : traits.entrySet()) {
-				String traitName = entry.getKey();
-				Trait trait = entry.getValue();
+			Map<String, String> tagAttrValues = new HashMap<String, String>();
 
-				Map<String, Object> attrValues = trait.getValues();
+			try {
 
-				Map<String, String> tagAttrValues = new HashMap<String, String>();
+				Map<String, Object> attrValues = trait.getValuesMap();
 
 				for (Map.Entry<String, Object> attrValueEntry : attrValues.entrySet()) {
 					String attrName = attrValueEntry.getKey();
@@ -259,14 +263,17 @@ class AtlasNotificationMapper {
 						LOG.error("Cannot cast attribute-value to String, skipping... attrName=" + attrName);
 					}
 				}
+			} catch (AtlasException exception) {
+				LOG.error("Could not get values for trait:" + traitName, exception);
+			}
 
-				RangerTag tag = new RangerTag();
+			RangerTag tag = new RangerTag();
 
-				tag.setType(traitName);
-				tag.setAttributes(tagAttrValues);
+			tag.setType(traitName);
+			tag.setAttributes(tagAttrValues);
+
+			ret.put(index++, tag);
 
-				ret.put(index++, tag);
-			}
 		}
 
 		return ret;
@@ -289,13 +296,13 @@ class AtlasNotificationMapper {
 		return ret;
 	}
 
-	static private String[] getQualifiedNameComponents(Entity entity) {
+	static private String[] getQualifiedNameComponents(IReferenceableInstance entity) {
 		String ret[] = new String[5];
 
 		if (StringUtils.equals(entity.getTypeName(), ENTITY_TYPE_HIVE_DB)) {
 
-			String clusterName = getAttribute(entity.getValues(), "clusterName", String.class);
-			String name = getAttribute(entity.getValues(), "name", String.class);
+			String clusterName = getEntityAttribute(entity, "clusterName", String.class);
+			String name = getEntityAttribute(entity, "name", String.class);
 
 			ret[1] = clusterName;
 			ret[2] = name;
@@ -303,20 +310,20 @@ class AtlasNotificationMapper {
 			ret[0] = ret[1] + "." + ret[2];
 
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("----- Entity-Id:" + entity.getId().getGuid());
+				LOG.debug("----- Entity-Id:" + entity.getId()._getId());
 				LOG.debug("----- Entity-Type-Name:" + entity.getTypeName());
 				LOG.debug("----- Entity-Cluster-Name:" + clusterName);
 				LOG.debug("----- Entity-Name:" + name);
 			}
 		} else {
-			String qualifiedName = getAttribute(entity.getValues(), ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+			String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
 
 			String nameHierarchy[] = qualifiedName.split(QUALIFIED_NAME_FORMAT_DELIMITER_STRING);
 
 			int hierarchyLevels = nameHierarchy.length;
 
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("----- Entity-Id:" + entity.getId().getGuid());
+				LOG.debug("----- Entity-Id:" + entity.getId()._getId());
 				LOG.debug("----- Entity-Type-Name:" + entity.getTypeName());
 				LOG.debug("----- Entity-Qualified-Name:" + qualifiedName);
 				LOG.debug("-----	Entity-Qualified-Name-Components -----");
@@ -351,6 +358,18 @@ class AtlasNotificationMapper {
 		return TagSyncConfig.getServiceName(apacheComponent, instanceName, properties);
 	}
 
+	static private <T> T getEntityAttribute(IReferenceableInstance entity, String name, Class<T> type) {
+		T ret = null;
+
+		try {
+			Map<String, Object> valueMap = entity.getValuesMap();
+			ret = getAttribute(valueMap, name, type);
+		} catch (AtlasException exception) {
+			LOG.error("Cannot get map of values for entity: " + entity.getId()._getId(), exception);
+		}
+
+		return ret;
+	}
 	static private <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) {
 		return type.cast(map.get(name));
 	}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java
deleted file mode 100644
index 2548c36..0000000
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.tagsync.source.atlas;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import org.apache.atlas.typesystem.EntityImpl;
-import org.apache.atlas.typesystem.IdImpl;
-import org.apache.atlas.typesystem.TraitImpl;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.atlas.typesystem.api.Entity;
-import org.apache.atlas.typesystem.api.Trait;
-import org.apache.ranger.admin.client.datatype.RESTResponse;
-import org.apache.ranger.plugin.util.RangerRESTClient;
-import org.apache.ranger.plugin.util.RangerRESTUtils;
-import org.apache.ranger.tagsync.process.TagSyncConfig;
-
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.util.*;
-
-
-// class AtlasUtil
-
-@SuppressWarnings("unchecked")
-public class AtlasUtility {
-
-	private static final Log LOG = LogFactory.getLog(AtlasUtility.class);
-
-	// Atlas APIs
-
-	public static final String API_ATLAS_TYPES = "api/atlas/types";
-	public static final String API_ATLAS_ENTITIES = "api/atlas/entities?type=";
-	public static final String API_ATLAS_ENTITY = "api/atlas/entities/";
-	public static final String API_ATLAS_TYPE = "api/atlas/types/";
-
-	public static final String RESULTS_ATTRIBUTE = "results";
-	public static final String DEFINITION_ATTRIBUTE = "definition";
-	public static final String VALUES_ATTRIBUTE = "values";
-	public static final String TRAITS_ATTRIBUTE = "traits";
-	public static final String TYPE_NAME_ATTRIBUTE = "typeName";
-	public static final String TRAIT_TYPES_ATTRIBUTE = "traitTypes";
-	public static final String SUPER_TYPES_ATTRIBUTE = "superTypes";
-	public static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = "attributeDefinitions";
-	public static final String NAME_ATTRIBUTE = "name";
-
-	private Type mapType = new TypeToken<Map<String, Object>>() {
-	}.getType();
-
-	private RangerRESTClient restClient;
-	private Map<String, Entity> entities = new LinkedHashMap<>();
-
-
-	// ----- Constructor ------------------------------------------------------
-
-	public AtlasUtility(Properties properties) {
-
-		String url = TagSyncConfig.getAtlasEndpoint(properties);
-		String sslConfigFileName = TagSyncConfig.getAtlasSslConfigFileName(properties);
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Initializing RangerRestClient with (url=" + url + ", sslConfigFileName" + sslConfigFileName + ")");
-		}
-
-		restClient = new RangerRESTClient(url, sslConfigFileName);
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Initialized RangerRestClient with (url=" + url + ", sslConfigFileName=" + sslConfigFileName + ")");
-		}
-	}
-
-	// update the set of entities with current from Atlas
-	public void refreshAllEntities() {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> TagAtlasSource.refreshAllEntities()");
-		}
-
-		try {
-			entities.clear();
-			entities.putAll(getAllEntities());
-		} catch (IOException e) {
-			LOG.error("getAllEntities() failed", e);
-		}
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== TagAtlasSource.refreshAllEntities()");
-		}
-	}
-
-	// ----- AtlasUtility ------------------------------------------------------
-
-	public Map<String, Entity> getAllEntities() throws IOException {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> TagAtlasSource.getAllEntities()");
-		}
-		Map<String, Entity> entities = new LinkedHashMap<>();
-
-		Map<String, Object> typesResponse = atlasAPI(API_ATLAS_TYPES);
-
-		List<String> types = getAttribute(typesResponse, RESULTS_ATTRIBUTE, List.class);
-
-		for (String type : types) {
-
-			Map<String, Object> entitiesResponse = atlasAPI(API_ATLAS_ENTITIES + type);
-
-			List<String> guids = getAttribute(entitiesResponse, RESULTS_ATTRIBUTE, List.class);
-
-			for (String guid : guids) {
-
-				if (StringUtils.isNotBlank(guid)) {
-
-					Map<Trait, Map<String, ? extends Trait>> traitSuperTypes = new HashMap<>();
-
-					Map<String, Object> entityResponse = atlasAPI(API_ATLAS_ENTITY + guid);
-
-					if (entityResponse.containsKey(DEFINITION_ATTRIBUTE)) {
-						String definitionJSON = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, String.class);
-
-						LOG.info("{");
-						LOG.info("	\"entity-id\":" + guid + ",");
-						LOG.info("	\"entity-definition\":" + definitionJSON);
-						LOG.info("}");
-
-						Map<String, Object> definition = new Gson().fromJson(definitionJSON, mapType);
-
-						Map<String, Object> values = getAttribute(definition, VALUES_ATTRIBUTE, Map.class);
-						Map<String, Object> traits = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
-						String typeName = getAttribute(definition, TYPE_NAME_ATTRIBUTE, String.class);
-
-						LOG.info("Received entity(typeName=" + typeName + ", id=" + guid + ")");
-
-
-						Map<String, TraitImpl> traitMap = new HashMap<>();
-
-						if (MapUtils.isNotEmpty(traits)) {
-
-							LOG.info("Traits for entity(typeName=" + typeName + ", id=" + guid + ") ------ ");
-
-							for (Map.Entry<String, Object> entry : traits.entrySet()) {
-
-								Map<String, Object> trait = (Map<String, Object>) entry.getValue();
-
-								Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class);
-								String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
-
-								Map<String, TraitImpl> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues);
-
-								TraitImpl trait1 = new TraitImpl(traitTypeName, traitValues, superTypes);
-
-								traitSuperTypes.put(trait1, superTypes);
-
-								traitMap.put(entry.getKey(), trait1);
-
-
-								LOG.info("			Trait(typeName=" + traitTypeName + ")");
-
-							}
-						} else {
-							LOG.info("No traits for entity(typeName=" + typeName + ", id=" + guid + ")");
-						}
-						EntityImpl entity = new EntityImpl(new IdImpl(guid, 0), typeName, values, traitMap);
-
-						showEntity(entity);
-
-						entities.put(guid, entity);
-
-					}
-				}
-			}
-		}
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> TagAtlasSource.getAllEntities()");
-		}
-		return entities;
-	}
-
-
-	// ----- helper methods ----------------------------------------------------
-
-	private Map<String, Object> getTraitType(String traitName)
-			throws IOException {
-
-		Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + traitName);
-
-		if (typeResponse.containsKey(DEFINITION_ATTRIBUTE)) {
-			String definitionJSON = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, String.class);
-
-			Map<String, Object> definition = new Gson().fromJson(definitionJSON, mapType);
-
-			List traitTypes = getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class);
-
-			if (traitTypes.size() > 0) {
-				return (Map<String, Object>) traitTypes.get(0);
-			}
-		}
-		return null;
-	}
-
-	private Map<String, TraitImpl> getTraitSuperTypes(Map<String, Object> traitType, Map<String, Object> values)
-			throws IOException {
-
-		Map<String, TraitImpl> superTypes = new HashMap<>();
-
-		if (traitType != null) {
-
-			List<String> superTypeNames = getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class);
-
-			for (String superTypeName : superTypeNames) {
-
-				Map<String, Object> superTraitType = getTraitType(superTypeName);
-
-				if (superTraitType != null) {
-					List<Map<String, Object>> attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
-
-					Map<String, Object> superTypeValues = new HashMap<>();
-					for (Map<String, Object> attributeDefinition : attributeDefinitions) {
-
-						String attributeName = attributeDefinition.get(NAME_ATTRIBUTE).toString();
-						if (values.containsKey(attributeName)) {
-							superTypeValues.put(attributeName, values.get(attributeName));
-						}
-					}
-
-					superTypes.put(superTypeName,
-							//new TraitImpl(getTraitSuperTypes(superTraitType, superTypeValues), superTypeValues, superTypeName));
-							new TraitImpl(superTypeName, superTypeValues, getTraitSuperTypes(superTraitType, superTypeValues)));
-				}
-			}
-		}
-		return superTypes;
-	}
-
-
-	/*
-		private Map<String, Object> atlasAPI(String endpoint) throws IOException {
-			InputStream in = streamProvider.readFrom(atlasEndpoint + endpoint, "GET", (String) null, Collections.<String, String>emptyMap());
-			return new Gson().fromJson(IOUtils.toString(in, "UTF-8"), mapType);
-		}
-		*/
-
-
-	private Map<String, Object> atlasAPI(String endpoint) {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("==> TagAtlasSource.atlasAPI(" + endpoint + ")");
-		}
-		// Create a REST client and perform a get on it
-		Map<String, Object> ret = new HashMap<String, Object>();
-
-		WebResource webResource = restClient.getResource(endpoint);
-
-		ClientResponse response = webResource.accept(RangerRESTUtils.REST_MIME_TYPE_JSON).get(ClientResponse.class);
-
-		if (response != null && response.getStatus() == 200) {
-			ret = response.getEntity(ret.getClass());
-		} else {
-			LOG.error("Atlas REST call returned with response={" + response + "}");
-
-			RESTResponse resp = RESTResponse.fromClientResponse(response);
-			LOG.error("Error getting Atlas Entity. request=" + webResource.toString()
-					+ ", response=" + resp.toString());
-		}
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("<== TagAtlasSource.atlasAPI(" + endpoint + ")");
-		}
-		return ret;
-	}
-
-	private <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) {
-		return type.cast(map.get(name));
-	}
-
-	public void showEntity(Entity entity) {
-
-		LOG.debug("Entity-id	:" + entity.getId());
-
-		LOG.debug("Type:		" + entity.getTypeName());
-
-		LOG.debug("----- Values -----");
-
-		for (Map.Entry<String, Object> entry : entity.getValues().entrySet()) {
-			LOG.debug("		Name:	" + entry.getKey() + "");
-			Object value = entry.getValue();
-			LOG.debug("		Value:	" + getValue(value, entities.keySet()));
-		}
-
-		LOG.debug("----- Traits -----");
-
-		for (String traitName : entity.getTraits().keySet()) {
-			LOG.debug("		Name:" + entity.getId() + ", trait=" + traitName + ">" + traitName);
-		}
-
-	}
-
-	public void showTrait(Entity entity, String traitId) {
-
-		String[] traitNames = traitId.split(",");
-
-		Trait trait = entity.getTraits().get(traitNames[0]);
-
-		for (int i = 1; i < traitNames.length; ++i) {
-			trait = trait.getSuperTypes().get(traitNames[i]);
-		}
-
-		String typeName = trait.getTypeName();
-
-		LOG.debug("Trait " + typeName + " for Entity id=" + entity.getId());
-
-		LOG.debug("Type: " + typeName);
-
-		LOG.debug("----- Values ------");
-
-		for (Map.Entry<String, Object> entry : trait.getValues().entrySet()) {
-			LOG.debug("Name:" + entry.getKey());
-			Object value = entry.getValue();
-			LOG.debug("Value:" + getValue(value, entities.keySet()));
-		}
-
-		LOG.debug("Super Traits");
-
-
-		for (String traitName : trait.getSuperTypes().keySet()) {
-			LOG.debug("Name=" + entity.getId() + "&trait=" + traitId + "," + traitName + ">" + traitName);
-		}
-	}
-
-	// resolve the given value if necessary
-	private String getValue(Object value, Set<String> ids) {
-		if (value == null) {
-			return "";
-		}
-		String idString = getIdValue(value, ids);
-		if (idString != null) {
-			return idString;
-		}
-
-		idString = getIdListValue(value, ids);
-		if (idString != null) {
-			return idString;
-		}
-
-		return value.toString();
-	}
-
-	// get an id from the given value; return null if the value is not an id type
-	private String getIdValue(Object value, Set<String> ids) {
-		if (value instanceof Map) {
-			Map map = (Map) value;
-			if (map.size() == 3 && map.containsKey("id")) {
-				String id = map.get("id").toString();
-				if (ids.contains(id)) {
-					return id;
-				}
-			}
-		}
-		return null;
-	}
-
-	// get an id list from the given value; return null if the value is not an id list type
-	private String getIdListValue(Object value, Set<String> ids) {
-		if (value instanceof List) {
-			List list = (List) value;
-			if (list.size() > 0) {
-				StringBuilder sb = new StringBuilder();
-				for (Object o : list) {
-					String idString = getIdValue(o, ids);
-					if (idString == null) {
-						return value.toString();
-					}
-					if (sb.length() > 0) {
-						sb.append(", ");
-					}
-					sb.append(idString);
-				}
-				return sb.toString();
-			}
-		}
-		return null;
-	}
-}
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
index 2725b23..fd64d12 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
@@ -23,17 +23,21 @@ import com.google.gson.Gson;
 
 import com.google.inject.Guice;
 import com.google.inject.Injector;
+import com.google.inject.Provider;
 
+import org.apache.atlas.AtlasException;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.NotificationModule;
 import org.apache.atlas.notification.entity.EntityNotification;
-import org.apache.atlas.notification.entity.EntityNotificationConsumer;
-import org.apache.atlas.notification.entity.EntityNotificationConsumerProvider;
-import org.apache.atlas.typesystem.api.Entity;
-import org.apache.atlas.typesystem.api.Trait;
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.atlas.typesystem.IStruct;
+
 import org.apache.ranger.tagsync.model.TagSink;
 import org.apache.ranger.tagsync.model.TagSource;
 import org.apache.ranger.plugin.util.ServiceTags;
@@ -47,9 +51,9 @@ public class TagAtlasSource implements TagSource {
 
 	public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = "application.properties";
 
-	public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = "atlas.notification.kafka.bootstrap.servers";
-	public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.notification.kafka.zookeeper.connect";
-	public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.notification.kafka.group.id";
+	public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = "atlas.kafka.bootstrap.servers";
+	public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.kafka.zookeeper.connect";
+	public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.kafka.entities.group.id";
 
 	private TagSink tagSink;
 	private Properties properties;
@@ -112,9 +116,11 @@ public class TagAtlasSource implements TagSource {
 
 			Injector injector = Guice.createInjector(notificationModule);
 
-			EntityNotificationConsumerProvider consumerProvider = injector.getInstance(EntityNotificationConsumerProvider.class);
+			Provider<NotificationInterface> consumerProvider = injector.getProvider(NotificationInterface.class);
+			NotificationInterface notification = consumerProvider.get();
+			List<NotificationConsumer<EntityNotification>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
 
-			consumerTask = new ConsumerRunnable(consumerProvider.get());
+			consumerTask = new ConsumerRunnable(iterators.get(0));
 		}
 
 		if (LOG.isDebugEnabled()) {
@@ -158,26 +164,25 @@ public class TagAtlasSource implements TagSource {
 
 	private class ConsumerRunnable implements Runnable {
 
-		private final EntityNotificationConsumer consumer;
+		private final Iterator<EntityNotification> consumerIterator;
 
-		private ConsumerRunnable(EntityNotificationConsumer consumer) {
-			this.consumer = consumer;
+		private ConsumerRunnable(Iterator<EntityNotification> consumerIterator) {
+			this.consumerIterator = consumerIterator;
 		}
 
-
 		// ----- Runnable --------------------------------------------------------
 
 		@Override
 		public void run() {
-			while (consumer.hasNext()) {
+			while (consumerIterator.hasNext()) {
 				try {
-					EntityNotification notification = consumer.next();
+					EntityNotification notification = consumerIterator.next();
 					if (notification != null) {
 						printNotification(notification);
 						ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification, properties);
 						if (serviceTags == null) {
 							if(LOG.isDebugEnabled()) {
-								LOG.debug("Did not create ServiceTags structure for notification type:" + notification.getOperationType().name());
+								LOG.debug("Did not create ServiceTags structure for notification type:" + notification.getOperationType());
 							}
 						} else {
 							if (LOG.isDebugEnabled()) {
@@ -199,33 +204,36 @@ public class TagAtlasSource implements TagSource {
 		}
 
 		public void printNotification(EntityNotification notification) {
-			Entity entity = notification.getEntity();
+			IReferenceableInstance entity = notification.getEntity();
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Notification-Type: " + notification.getOperationType().name());
-				LOG.debug("Entity-Id: " + entity.getId().getGuid());
-				LOG.debug("Entity-Type: " + entity.getTypeName());
+				try {
+					LOG.debug("Notification-Type: " + notification.getOperationType());
+					LOG.debug("Entity-Id: " + entity.getId()._getId());
+					LOG.debug("Entity-Type: " + entity.getTypeName());
 
-				LOG.debug("----------- Entity Values ----------");
+					LOG.debug("----------- Entity Values ----------");
 
 
-				for (Map.Entry<String, Object> entry : entity.getValues().entrySet()) {
-					LOG.debug("		Name:" + entry.getKey());
-					Object value = entry.getValue();
-					LOG.debug("		Value:" + value);
-				}
+					for (Map.Entry<String, Object> entry : entity.getValuesMap().entrySet()) {
+						LOG.debug("		Name:" + entry.getKey());
+						Object value = entry.getValue();
+						LOG.debug("		Value:" + value);
+					}
 
-				LOG.debug("----------- Entity Traits ----------");
+					LOG.debug("----------- Entity Traits ----------");
 
+					List<IStruct> traits = notification.getAllTraits();
 
-				for (Map.Entry<String, ? extends Trait> entry : entity.getTraits().entrySet()) {
-					LOG.debug("			Trait-Name:" + entry.getKey());
-					Trait trait = entry.getValue();
-					LOG.debug("			Trait-Type:" + trait.getTypeName());
-					Map<String, Object> traitValues = trait.getValues();
-					for (Map.Entry<String, Object> valueEntry : traitValues.entrySet()) {
-						LOG.debug("				Trait-Value-Name:" + valueEntry.getKey());
-						LOG.debug("				Trait-Value:" + valueEntry.getValue());
+					for (IStruct trait : traits) {
+						LOG.debug("			Trait-Type-Name:" + trait.getTypeName());
+						Map<String, Object> traitValues = trait.getValuesMap();
+						for (Map.Entry<String, Object> valueEntry : traitValues.entrySet()) {
+							LOG.debug("				Trait-Value-Name:" + valueEntry.getKey());
+							LOG.debug("				Trait-Value:" + valueEntry.getValue());
+						}
 					}
+				} catch (AtlasException exception) {
+					LOG.error("Cannot print notification - ", exception);
 				}
 			}
 		}