You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ma...@apache.org on 2015/11/11 21:28:08 UTC
incubator-ranger git commit: RANGER-726: Updated tagsync for recent changes in Atlas API
Repository: incubator-ranger
Updated Branches:
refs/heads/tag-policy 74ec8c8fa -> 6259fb60e
RANGER-726: Updated tagsync for recent changes in Atlas API
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/6259fb60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/6259fb60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/6259fb60
Branch: refs/heads/tag-policy
Commit: 6259fb60e1f07d8a6455079ce49ef5445f946f35
Parents: 74ec8c8
Author: Abhay Kulkarni <ak...@hortonworks.com>
Authored: Mon Nov 9 18:50:52 2015 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Wed Nov 11 12:00:28 2015 -0800
----------------------------------------------------------------------
.../atlas-client-0.5.0.2.3.1.1-19.jar | Bin 34558 -> 0 bytes
.../atlas-notification-0.5.0.2.3.1.1-19.jar | Bin 34734 -> 0 bytes
.../atlas-typesystem-0.5.0.2.3.1.1-19.jar | Bin 355350 -> 0 bytes
pom.xml | 4 +-
src/main/assembly/tagsync.xml | 3 +
.../conf/templates/installprop2xml.properties | 6 +-
tagsync/pom.xml | 18 +
tagsync/scripts/install.properties | 2 +-
tagsync/scripts/setup.py | 5 +-
.../source/atlas/AtlasNotificationMapper.java | 97 +++--
.../tagsync/source/atlas/AtlasUtility.java | 404 -------------------
.../tagsync/source/atlas/TagAtlasSource.java | 78 ++--
12 files changed, 132 insertions(+), 485 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/6259fb60/local-repo/org/apache/atlas/atlas-client/0.5.0.2.3.1.1-19/atlas-client-0.5.0.2.3.1.1-19.jar
----------------------------------------------------------------------
diff --git a/local-repo/org/apache/atlas/atlas-client/0.5.0.2.3.1.1-19/atlas-client-0.5.0.2.3.1.1-19.jar b/local-repo/org/apache/atlas/atlas-client/0.5.0.2.3.1.1-19/atlas-client-0.5.0.2.3.1.1-19.jar
deleted file mode 100644
index 1fb2ef7..0000000
Binary files a/local-repo/org/apache/atlas/atlas-client/0.5.0.2.3.1.1-19/atlas-client-0.5.0.2.3.1.1-19.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/6259fb60/local-repo/org/apache/atlas/atlas-notification/0.5.0.2.3.1.1-19/atlas-notification-0.5.0.2.3.1.1-19.jar
----------------------------------------------------------------------
diff --git a/local-repo/org/apache/atlas/atlas-notification/0.5.0.2.3.1.1-19/atlas-notification-0.5.0.2.3.1.1-19.jar b/local-repo/org/apache/atlas/atlas-notification/0.5.0.2.3.1.1-19/atlas-notification-0.5.0.2.3.1.1-19.jar
deleted file mode 100644
index 848eeb3..0000000
Binary files a/local-repo/org/apache/atlas/atlas-notification/0.5.0.2.3.1.1-19/atlas-notification-0.5.0.2.3.1.1-19.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/6259fb60/local-repo/org/apache/atlas/atlas-typesystem/0.5.0.2.3.1.1-19/atlas-typesystem-0.5.0.2.3.1.1-19.jar
----------------------------------------------------------------------
diff --git a/local-repo/org/apache/atlas/atlas-typesystem/0.5.0.2.3.1.1-19/atlas-typesystem-0.5.0.2.3.1.1-19.jar b/local-repo/org/apache/atlas/atlas-typesystem/0.5.0.2.3.1.1-19/atlas-typesystem-0.5.0.2.3.1.1-19.jar
deleted file mode 100644
index f619b6e..0000000
Binary files a/local-repo/org/apache/atlas/atlas-typesystem/0.5.0.2.3.1.1-19/atlas-typesystem-0.5.0.2.3.1.1-19.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/6259fb60/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0648d67..d60fca4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -213,7 +213,7 @@
<jaxb-api.version>2.2.2</jaxb-api.version>
<jackson.version>1.9.13</jackson.version>
<sun-jersey-bundle.version>1.19</sun-jersey-bundle.version>
- <atlas.version>0.5.0.2.3.1.1-19</atlas.version>
+ <atlas.version>0.6-incubating-SNAPSHOT</atlas.version>
<distMgmtStagingId>apache.staging.https</distMgmtStagingId>
<distMgmtStagingName>Apache Release Distribution Repository</distMgmtStagingName>
<distMgmtStagingUrl>https://repository.apache.org/service/local/staging/deploy/maven2</distMgmtStagingUrl>
@@ -264,6 +264,7 @@
<enabled>false</enabled>
</snapshots>
</repository>
+ <!--
<repository>
<id>repo</id>
<url>file://${basedir}/local-repo</url>
@@ -271,6 +272,7 @@
<enabled>true</enabled>
</snapshots>
</repository>
+ -->
</repositories>
<dependencyManagement>
<dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/6259fb60/src/main/assembly/tagsync.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/tagsync.xml b/src/main/assembly/tagsync.xml
index 331dae0..8adc5cc 100644
--- a/src/main/assembly/tagsync.xml
+++ b/src/main/assembly/tagsync.xml
@@ -56,7 +56,10 @@
<include>org.apache.atlas:atlas-notification</include>
<include>org.apache.atlas:atlas-typesystem</include>
<include>org.apache.atlas:atlas-client</include>
+ <include>org.apache.atlas:atlas-common</include>
<include>com.google.inject:guice</include>
+ <include>com.google.inject.extensions:guice-multibindings</include>
+ <include>org.codehaus.jettison:jettison</include>
<include>aopalliance:aopalliance</include>
<include>javax.inject:javax.inject</include>
<include>org.apache.kafka:kafka_2.10</include>
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/6259fb60/tagsync/conf/templates/installprop2xml.properties
----------------------------------------------------------------------
diff --git a/tagsync/conf/templates/installprop2xml.properties b/tagsync/conf/templates/installprop2xml.properties
index 5b63835..101a1ba 100644
--- a/tagsync/conf/templates/installprop2xml.properties
+++ b/tagsync/conf/templates/installprop2xml.properties
@@ -34,8 +34,8 @@ TAG_SOURCE_FILE_MOD_TIME_CHECK_INTERVAL = ranger.tagsync.filesource.modtime.chec
TAGSYNC_FILESOURCE_FILENAME = ranger.tagsync.filesource.filename
-TAGSYNC_ATLAS_KAFKA_ENDPOINTS = atlas.notification.kafka.bootstrap.servers
-TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = atlas.notification.kafka.zookeeper.connect
-TAGSYNC_ATLAS_CONSUMER_GROUP = atlas.notification.kafka.group.id
+TAGSYNC_ATLAS_KAFKA_ENDPOINTS = atlas.kafka.bootstrap.servers
+TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = atlas.kafka.zookeeper.connect
+TAGSYNC_ATLAS_CONSUMER_GROUP = atlas.kafka.entities.group.id
TAGSYNC_ATLAS_TO_RANGER_SERVICE_MAPPING = ranger.tagsync.atlas.to.service.mapping
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/6259fb60/tagsync/pom.xml
----------------------------------------------------------------------
diff --git a/tagsync/pom.xml b/tagsync/pom.xml
index b800f61..c860c4a 100644
--- a/tagsync/pom.xml
+++ b/tagsync/pom.xml
@@ -30,8 +30,10 @@
<version>0.5.0</version>
</parent>
+ <!--
<repositories>
+
<repository>
<id>repo</id>
<url>file://${basedir}/../local-repo</url>
@@ -40,6 +42,7 @@
</snapshots>
</repository>
</repositories>
+ -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -107,6 +110,16 @@
<version>4.0</version>
</dependency>
<dependency>
+ <groupId>com.google.inject.extensions</groupId>
+ <artifactId>guice-multibindings</artifactId>
+ <version>4.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jettison</groupId>
+ <artifactId>jettison</artifactId>
+ <version>1.1</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-notification</artifactId>
<version>${atlas.version}</version>
@@ -121,5 +134,10 @@
<artifactId>atlas-client</artifactId>
<version>${atlas.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-common</artifactId>
+ <version>${atlas.version}</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/6259fb60/tagsync/scripts/install.properties
----------------------------------------------------------------------
diff --git a/tagsync/scripts/install.properties b/tagsync/scripts/install.properties
index f7de6e3..b5ad580 100644
--- a/tagsync/scripts/install.properties
+++ b/tagsync/scripts/install.properties
@@ -53,7 +53,7 @@ TAG_SOURCE_FILE_MOD_TIME_CHECK_INTERVAL = 60000
TAGSYNC_ATLAS_KAFKA_ENDPOINTS = localhost:6667
TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = localhost:2181
-TAGSYNC_ATLAS_CONSUMER_GROUP = entityConsumer
+TAGSYNC_ATLAS_CONSUMER_GROUP = ranger_entities_consumer
# Mapping from Atlas hive instance-name to Ranger service-name
# this needs to be in format clusterName,componentType,serviceName;clusterName2,componentType2,serviceName2
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/6259fb60/tagsync/scripts/setup.py
----------------------------------------------------------------------
diff --git a/tagsync/scripts/setup.py b/tagsync/scripts/setup.py
index e4b2433..f7455b8 100755
--- a/tagsync/scripts/setup.py
+++ b/tagsync/scripts/setup.py
@@ -317,8 +317,9 @@ def main():
atlasOutFile = file(atlasOutFn, "a")
atlasOutFile.write("atlas.notification.embedded=false" + "\n")
- atlasOutFile.write("atlas.notification.kafka.acks=1" + "\n")
- atlasOutFile.write("atlas.notification.kafka.data=${sys:atlas.home}/data/kafka" + "\n")
+ atlasOutFile.write("atlas.kafka.acks=1" + "\n")
+ atlasOutFile.write("atlas.kafka.data=${sys:atlas.home}/data/kafka" + "\n")
+ atlasOutFile.write("atlas.kafka.hook.group.id=atlas" + "\n")
atlasOutFile.close()
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/6259fb60/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index 8046b68..7925b5c 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -19,9 +19,10 @@
package org.apache.ranger.tagsync.source.atlas;
+import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.entity.EntityNotification;
-import org.apache.atlas.typesystem.api.Entity;
-import org.apache.atlas.typesystem.api.Trait;
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.atlas.typesystem.IStruct;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -58,7 +59,9 @@ class AtlasNotificationMapper {
properties = props;
try {
- if (isEntityMappable(entityNotification.getEntity())) {
+ IReferenceableInstance entity = entityNotification.getEntity();
+
+ if (isEntityMappable(entity)) {
ret = createServiceTags(entityNotification);
} else {
if(LOG.isDebugEnabled()) {
@@ -71,7 +74,7 @@ class AtlasNotificationMapper {
return ret;
}
- static private boolean isEntityMappable(Entity entity) {
+ static private boolean isEntityMappable(IReferenceableInstance entity) {
boolean ret = false;
String entityTypeName = entity.getTypeName();
@@ -91,44 +94,43 @@ class AtlasNotificationMapper {
ServiceTags ret = null;
EntityNotification.OperationType opType = entityNotification.getOperationType();
- Entity entity = entityNotification.getEntity();
- String opName = entityNotification.getOperationType().name();
switch (opType) {
- case ENTITY_CREATED: {
- LOG.debug("ENTITY_CREATED notification is not handled, as Ranger will get necessary information from any subsequent TRAIT_ADDED notification");
+ case ENTITY_CREATE: {
+ LOG.debug("ENTITY_CREATE notification is not handled, as Ranger will get necessary information from any subsequent TRAIT_ADDED notification");
break;
}
- case ENTITY_UPDATED: {
- ret = getServiceTags(entity);
+ case ENTITY_UPDATE: {
+ ret = getServiceTags(entityNotification);
if (MapUtils.isEmpty(ret.getTags())) {
LOG.debug("No traits associated with this entity update notification. Ignoring it altogether");
ret = null;
}
break;
}
- case TRAIT_ADDED:
- case TRAIT_DELETED: {
- ret = getServiceTags(entity);
+ case TRAIT_ADD:
+ case TRAIT_DELETE: {
+ ret = getServiceTags(entityNotification);
break;
}
default:
- LOG.error(opName + ": unknown notification received - not handled");
+ LOG.error(opType + ": unknown notification received - not handled");
}
return ret;
}
- static private ServiceTags getServiceTags(Entity entity) throws Exception {
+ static private ServiceTags getServiceTags(EntityNotification entityNotification) throws Exception {
ServiceTags ret = null;
+ IReferenceableInstance entity = entityNotification.getEntity();
List<RangerServiceResource> serviceResources = new ArrayList<RangerServiceResource>();
RangerServiceResource serviceResource = getServiceResource(entity);
serviceResources.add(serviceResource);
- Map<Long, RangerTag> tags = getTags(entity);
+ Map<Long, RangerTag> tags = getTags(entityNotification);
Map<Long, RangerTagDef> tagDefs = getTagDefs(tags);
@@ -163,7 +165,7 @@ class AtlasNotificationMapper {
}
- static private RangerServiceResource getServiceResource(Entity entity) throws Exception {
+ static private RangerServiceResource getServiceResource(IReferenceableInstance entity) throws Exception {
RangerServiceResource ret = null;
@@ -224,7 +226,7 @@ class AtlasNotificationMapper {
ret = new RangerServiceResource();
- ret.setGuid(entity.getId().getGuid());
+ ret.setGuid(entity.getId()._getId());
ret.setId(1L);
ret.setServiceName(serviceName);
ret.setResourceElements(elements);
@@ -232,22 +234,24 @@ class AtlasNotificationMapper {
return ret;
}
- static private Map<Long, RangerTag> getTags(Entity entity) {
+ static private Map<Long, RangerTag> getTags(EntityNotification entityNotification) {
Map<Long, RangerTag> ret = null;
- Map<String, ? extends Trait> traits = entity.getTraits();
+ ret = new HashMap<Long, RangerTag>();
+
+ long index = 1;
+
+ List<IStruct> traits = entityNotification.getAllTraits();
+
+ for (IStruct trait : traits) {
- if (MapUtils.isNotEmpty(traits)) {
- ret = new HashMap<Long, RangerTag>();
- long index = 1;
+ String traitName = trait.getTypeName();
- for (Map.Entry<String, ? extends Trait> entry : traits.entrySet()) {
- String traitName = entry.getKey();
- Trait trait = entry.getValue();
+ Map<String, String> tagAttrValues = new HashMap<String, String>();
- Map<String, Object> attrValues = trait.getValues();
+ try {
- Map<String, String> tagAttrValues = new HashMap<String, String>();
+ Map<String, Object> attrValues = trait.getValuesMap();
for (Map.Entry<String, Object> attrValueEntry : attrValues.entrySet()) {
String attrName = attrValueEntry.getKey();
@@ -259,14 +263,17 @@ class AtlasNotificationMapper {
LOG.error("Cannot cast attribute-value to String, skipping... attrName=" + attrName);
}
}
+ } catch (AtlasException exception) {
+ LOG.error("Could not get values for trait:" + traitName, exception);
+ }
- RangerTag tag = new RangerTag();
+ RangerTag tag = new RangerTag();
- tag.setType(traitName);
- tag.setAttributes(tagAttrValues);
+ tag.setType(traitName);
+ tag.setAttributes(tagAttrValues);
+
+ ret.put(index++, tag);
- ret.put(index++, tag);
- }
}
return ret;
@@ -289,13 +296,13 @@ class AtlasNotificationMapper {
return ret;
}
- static private String[] getQualifiedNameComponents(Entity entity) {
+ static private String[] getQualifiedNameComponents(IReferenceableInstance entity) {
String ret[] = new String[5];
if (StringUtils.equals(entity.getTypeName(), ENTITY_TYPE_HIVE_DB)) {
- String clusterName = getAttribute(entity.getValues(), "clusterName", String.class);
- String name = getAttribute(entity.getValues(), "name", String.class);
+ String clusterName = getEntityAttribute(entity, "clusterName", String.class);
+ String name = getEntityAttribute(entity, "name", String.class);
ret[1] = clusterName;
ret[2] = name;
@@ -303,20 +310,20 @@ class AtlasNotificationMapper {
ret[0] = ret[1] + "." + ret[2];
if (LOG.isDebugEnabled()) {
- LOG.debug("----- Entity-Id:" + entity.getId().getGuid());
+ LOG.debug("----- Entity-Id:" + entity.getId()._getId());
LOG.debug("----- Entity-Type-Name:" + entity.getTypeName());
LOG.debug("----- Entity-Cluster-Name:" + clusterName);
LOG.debug("----- Entity-Name:" + name);
}
} else {
- String qualifiedName = getAttribute(entity.getValues(), ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+ String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
String nameHierarchy[] = qualifiedName.split(QUALIFIED_NAME_FORMAT_DELIMITER_STRING);
int hierarchyLevels = nameHierarchy.length;
if (LOG.isDebugEnabled()) {
- LOG.debug("----- Entity-Id:" + entity.getId().getGuid());
+ LOG.debug("----- Entity-Id:" + entity.getId()._getId());
LOG.debug("----- Entity-Type-Name:" + entity.getTypeName());
LOG.debug("----- Entity-Qualified-Name:" + qualifiedName);
LOG.debug("----- Entity-Qualified-Name-Components -----");
@@ -351,6 +358,18 @@ class AtlasNotificationMapper {
return TagSyncConfig.getServiceName(apacheComponent, instanceName, properties);
}
+ static private <T> T getEntityAttribute(IReferenceableInstance entity, String name, Class<T> type) {
+ T ret = null;
+
+ try {
+ Map<String, Object> valueMap = entity.getValuesMap();
+ ret = getAttribute(valueMap, name, type);
+ } catch (AtlasException exception) {
+ LOG.error("Cannot get map of values for entity: " + entity.getId()._getId(), exception);
+ }
+
+ return ret;
+ }
static private <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) {
return type.cast(map.get(name));
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/6259fb60/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java
deleted file mode 100644
index 2548c36..0000000
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.tagsync.source.atlas;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import org.apache.atlas.typesystem.EntityImpl;
-import org.apache.atlas.typesystem.IdImpl;
-import org.apache.atlas.typesystem.TraitImpl;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.atlas.typesystem.api.Entity;
-import org.apache.atlas.typesystem.api.Trait;
-import org.apache.ranger.admin.client.datatype.RESTResponse;
-import org.apache.ranger.plugin.util.RangerRESTClient;
-import org.apache.ranger.plugin.util.RangerRESTUtils;
-import org.apache.ranger.tagsync.process.TagSyncConfig;
-
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.util.*;
-
-
-// class AtlasUtil
-
-@SuppressWarnings("unchecked")
-public class AtlasUtility {
-
- private static final Log LOG = LogFactory.getLog(AtlasUtility.class);
-
- // Atlas APIs
-
- public static final String API_ATLAS_TYPES = "api/atlas/types";
- public static final String API_ATLAS_ENTITIES = "api/atlas/entities?type=";
- public static final String API_ATLAS_ENTITY = "api/atlas/entities/";
- public static final String API_ATLAS_TYPE = "api/atlas/types/";
-
- public static final String RESULTS_ATTRIBUTE = "results";
- public static final String DEFINITION_ATTRIBUTE = "definition";
- public static final String VALUES_ATTRIBUTE = "values";
- public static final String TRAITS_ATTRIBUTE = "traits";
- public static final String TYPE_NAME_ATTRIBUTE = "typeName";
- public static final String TRAIT_TYPES_ATTRIBUTE = "traitTypes";
- public static final String SUPER_TYPES_ATTRIBUTE = "superTypes";
- public static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = "attributeDefinitions";
- public static final String NAME_ATTRIBUTE = "name";
-
- private Type mapType = new TypeToken<Map<String, Object>>() {
- }.getType();
-
- private RangerRESTClient restClient;
- private Map<String, Entity> entities = new LinkedHashMap<>();
-
-
- // ----- Constructor ------------------------------------------------------
-
- public AtlasUtility(Properties properties) {
-
- String url = TagSyncConfig.getAtlasEndpoint(properties);
- String sslConfigFileName = TagSyncConfig.getAtlasSslConfigFileName(properties);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Initializing RangerRestClient with (url=" + url + ", sslConfigFileName" + sslConfigFileName + ")");
- }
-
- restClient = new RangerRESTClient(url, sslConfigFileName);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Initialized RangerRestClient with (url=" + url + ", sslConfigFileName=" + sslConfigFileName + ")");
- }
- }
-
- // update the set of entities with current from Atlas
- public void refreshAllEntities() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> TagAtlasSource.refreshAllEntities()");
- }
-
- try {
- entities.clear();
- entities.putAll(getAllEntities());
- } catch (IOException e) {
- LOG.error("getAllEntities() failed", e);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== TagAtlasSource.refreshAllEntities()");
- }
- }
-
- // ----- AtlasUtility ------------------------------------------------------
-
- public Map<String, Entity> getAllEntities() throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> TagAtlasSource.getAllEntities()");
- }
- Map<String, Entity> entities = new LinkedHashMap<>();
-
- Map<String, Object> typesResponse = atlasAPI(API_ATLAS_TYPES);
-
- List<String> types = getAttribute(typesResponse, RESULTS_ATTRIBUTE, List.class);
-
- for (String type : types) {
-
- Map<String, Object> entitiesResponse = atlasAPI(API_ATLAS_ENTITIES + type);
-
- List<String> guids = getAttribute(entitiesResponse, RESULTS_ATTRIBUTE, List.class);
-
- for (String guid : guids) {
-
- if (StringUtils.isNotBlank(guid)) {
-
- Map<Trait, Map<String, ? extends Trait>> traitSuperTypes = new HashMap<>();
-
- Map<String, Object> entityResponse = atlasAPI(API_ATLAS_ENTITY + guid);
-
- if (entityResponse.containsKey(DEFINITION_ATTRIBUTE)) {
- String definitionJSON = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, String.class);
-
- LOG.info("{");
- LOG.info(" \"entity-id\":" + guid + ",");
- LOG.info(" \"entity-definition\":" + definitionJSON);
- LOG.info("}");
-
- Map<String, Object> definition = new Gson().fromJson(definitionJSON, mapType);
-
- Map<String, Object> values = getAttribute(definition, VALUES_ATTRIBUTE, Map.class);
- Map<String, Object> traits = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
- String typeName = getAttribute(definition, TYPE_NAME_ATTRIBUTE, String.class);
-
- LOG.info("Received entity(typeName=" + typeName + ", id=" + guid + ")");
-
-
- Map<String, TraitImpl> traitMap = new HashMap<>();
-
- if (MapUtils.isNotEmpty(traits)) {
-
- LOG.info("Traits for entity(typeName=" + typeName + ", id=" + guid + ") ------ ");
-
- for (Map.Entry<String, Object> entry : traits.entrySet()) {
-
- Map<String, Object> trait = (Map<String, Object>) entry.getValue();
-
- Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class);
- String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
-
- Map<String, TraitImpl> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues);
-
- TraitImpl trait1 = new TraitImpl(traitTypeName, traitValues, superTypes);
-
- traitSuperTypes.put(trait1, superTypes);
-
- traitMap.put(entry.getKey(), trait1);
-
-
- LOG.info(" Trait(typeName=" + traitTypeName + ")");
-
- }
- } else {
- LOG.info("No traits for entity(typeName=" + typeName + ", id=" + guid + ")");
- }
- EntityImpl entity = new EntityImpl(new IdImpl(guid, 0), typeName, values, traitMap);
-
- showEntity(entity);
-
- entities.put(guid, entity);
-
- }
- }
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> TagAtlasSource.getAllEntities()");
- }
- return entities;
- }
-
-
- // ----- helper methods ----------------------------------------------------
-
- private Map<String, Object> getTraitType(String traitName)
- throws IOException {
-
- Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + traitName);
-
- if (typeResponse.containsKey(DEFINITION_ATTRIBUTE)) {
- String definitionJSON = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, String.class);
-
- Map<String, Object> definition = new Gson().fromJson(definitionJSON, mapType);
-
- List traitTypes = getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class);
-
- if (traitTypes.size() > 0) {
- return (Map<String, Object>) traitTypes.get(0);
- }
- }
- return null;
- }
-
- private Map<String, TraitImpl> getTraitSuperTypes(Map<String, Object> traitType, Map<String, Object> values)
- throws IOException {
-
- Map<String, TraitImpl> superTypes = new HashMap<>();
-
- if (traitType != null) {
-
- List<String> superTypeNames = getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class);
-
- for (String superTypeName : superTypeNames) {
-
- Map<String, Object> superTraitType = getTraitType(superTypeName);
-
- if (superTraitType != null) {
- List<Map<String, Object>> attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
-
- Map<String, Object> superTypeValues = new HashMap<>();
- for (Map<String, Object> attributeDefinition : attributeDefinitions) {
-
- String attributeName = attributeDefinition.get(NAME_ATTRIBUTE).toString();
- if (values.containsKey(attributeName)) {
- superTypeValues.put(attributeName, values.get(attributeName));
- }
- }
-
- superTypes.put(superTypeName,
- //new TraitImpl(getTraitSuperTypes(superTraitType, superTypeValues), superTypeValues, superTypeName));
- new TraitImpl(superTypeName, superTypeValues, getTraitSuperTypes(superTraitType, superTypeValues)));
- }
- }
- }
- return superTypes;
- }
-
-
- /*
- private Map<String, Object> atlasAPI(String endpoint) throws IOException {
- InputStream in = streamProvider.readFrom(atlasEndpoint + endpoint, "GET", (String) null, Collections.<String, String>emptyMap());
- return new Gson().fromJson(IOUtils.toString(in, "UTF-8"), mapType);
- }
- */
-
-
- private Map<String, Object> atlasAPI(String endpoint) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> TagAtlasSource.atlasAPI(" + endpoint + ")");
- }
- // Create a REST client and perform a get on it
- Map<String, Object> ret = new HashMap<String, Object>();
-
- WebResource webResource = restClient.getResource(endpoint);
-
- ClientResponse response = webResource.accept(RangerRESTUtils.REST_MIME_TYPE_JSON).get(ClientResponse.class);
-
- if (response != null && response.getStatus() == 200) {
- ret = response.getEntity(ret.getClass());
- } else {
- LOG.error("Atlas REST call returned with response={" + response + "}");
-
- RESTResponse resp = RESTResponse.fromClientResponse(response);
- LOG.error("Error getting Atlas Entity. request=" + webResource.toString()
- + ", response=" + resp.toString());
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== TagAtlasSource.atlasAPI(" + endpoint + ")");
- }
- return ret;
- }
-
- private <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) {
- return type.cast(map.get(name));
- }
-
- public void showEntity(Entity entity) {
-
- LOG.debug("Entity-id :" + entity.getId());
-
- LOG.debug("Type: " + entity.getTypeName());
-
- LOG.debug("----- Values -----");
-
- for (Map.Entry<String, Object> entry : entity.getValues().entrySet()) {
- LOG.debug(" Name: " + entry.getKey() + "");
- Object value = entry.getValue();
- LOG.debug(" Value: " + getValue(value, entities.keySet()));
- }
-
- LOG.debug("----- Traits -----");
-
- for (String traitName : entity.getTraits().keySet()) {
- LOG.debug(" Name:" + entity.getId() + ", trait=" + traitName + ">" + traitName);
- }
-
- }
-
- public void showTrait(Entity entity, String traitId) {
-
- String[] traitNames = traitId.split(",");
-
- Trait trait = entity.getTraits().get(traitNames[0]);
-
- for (int i = 1; i < traitNames.length; ++i) {
- trait = trait.getSuperTypes().get(traitNames[i]);
- }
-
- String typeName = trait.getTypeName();
-
- LOG.debug("Trait " + typeName + " for Entity id=" + entity.getId());
-
- LOG.debug("Type: " + typeName);
-
- LOG.debug("----- Values ------");
-
- for (Map.Entry<String, Object> entry : trait.getValues().entrySet()) {
- LOG.debug("Name:" + entry.getKey());
- Object value = entry.getValue();
- LOG.debug("Value:" + getValue(value, entities.keySet()));
- }
-
- LOG.debug("Super Traits");
-
-
- for (String traitName : trait.getSuperTypes().keySet()) {
- LOG.debug("Name=" + entity.getId() + "&trait=" + traitId + "," + traitName + ">" + traitName);
- }
- }
-
- // resolve the given value if necessary
- private String getValue(Object value, Set<String> ids) {
- if (value == null) {
- return "";
- }
- String idString = getIdValue(value, ids);
- if (idString != null) {
- return idString;
- }
-
- idString = getIdListValue(value, ids);
- if (idString != null) {
- return idString;
- }
-
- return value.toString();
- }
-
- // get an id from the given value; return null if the value is not an id type
- private String getIdValue(Object value, Set<String> ids) {
- if (value instanceof Map) {
- Map map = (Map) value;
- if (map.size() == 3 && map.containsKey("id")) {
- String id = map.get("id").toString();
- if (ids.contains(id)) {
- return id;
- }
- }
- }
- return null;
- }
-
- // get an id list from the given value; return null if the value is not an id list type
- private String getIdListValue(Object value, Set<String> ids) {
- if (value instanceof List) {
- List list = (List) value;
- if (list.size() > 0) {
- StringBuilder sb = new StringBuilder();
- for (Object o : list) {
- String idString = getIdValue(o, ids);
- if (idString == null) {
- return value.toString();
- }
- if (sb.length() > 0) {
- sb.append(", ");
- }
- sb.append(idString);
- }
- return sb.toString();
- }
- }
- return null;
- }
-}
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/6259fb60/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
index 2725b23..fd64d12 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java
@@ -23,17 +23,21 @@ import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Injector;
+import com.google.inject.Provider;
+import org.apache.atlas.AtlasException;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.notification.entity.EntityNotification;
-import org.apache.atlas.notification.entity.EntityNotificationConsumer;
-import org.apache.atlas.notification.entity.EntityNotificationConsumerProvider;
-import org.apache.atlas.typesystem.api.Entity;
-import org.apache.atlas.typesystem.api.Trait;
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.atlas.typesystem.IStruct;
+
import org.apache.ranger.tagsync.model.TagSink;
import org.apache.ranger.tagsync.model.TagSource;
import org.apache.ranger.plugin.util.ServiceTags;
@@ -47,9 +51,9 @@ public class TagAtlasSource implements TagSource {
public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = "application.properties";
- public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = "atlas.notification.kafka.bootstrap.servers";
- public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.notification.kafka.zookeeper.connect";
- public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.notification.kafka.group.id";
+ public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = "atlas.kafka.bootstrap.servers";
+ public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.kafka.zookeeper.connect";
+ public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.kafka.entities.group.id";
private TagSink tagSink;
private Properties properties;
@@ -112,9 +116,11 @@ public class TagAtlasSource implements TagSource {
Injector injector = Guice.createInjector(notificationModule);
- EntityNotificationConsumerProvider consumerProvider = injector.getInstance(EntityNotificationConsumerProvider.class);
+ Provider<NotificationInterface> consumerProvider = injector.getProvider(NotificationInterface.class);
+ NotificationInterface notification = consumerProvider.get();
+ List<NotificationConsumer<EntityNotification>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
- consumerTask = new ConsumerRunnable(consumerProvider.get());
+ consumerTask = new ConsumerRunnable(iterators.get(0));
}
if (LOG.isDebugEnabled()) {
@@ -158,26 +164,25 @@ public class TagAtlasSource implements TagSource {
private class ConsumerRunnable implements Runnable {
- private final EntityNotificationConsumer consumer;
+ private final Iterator<EntityNotification> consumerIterator;
- private ConsumerRunnable(EntityNotificationConsumer consumer) {
- this.consumer = consumer;
+ private ConsumerRunnable(Iterator<EntityNotification> consumerIterator) {
+ this.consumerIterator = consumerIterator;
}
-
// ----- Runnable --------------------------------------------------------
@Override
public void run() {
- while (consumer.hasNext()) {
+ while (consumerIterator.hasNext()) {
try {
- EntityNotification notification = consumer.next();
+ EntityNotification notification = consumerIterator.next();
if (notification != null) {
printNotification(notification);
ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification, properties);
if (serviceTags == null) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Did not create ServiceTags structure for notification type:" + notification.getOperationType().name());
+ LOG.debug("Did not create ServiceTags structure for notification type:" + notification.getOperationType());
}
} else {
if (LOG.isDebugEnabled()) {
@@ -199,33 +204,36 @@ public class TagAtlasSource implements TagSource {
}
public void printNotification(EntityNotification notification) {
- Entity entity = notification.getEntity();
+ IReferenceableInstance entity = notification.getEntity();
if (LOG.isDebugEnabled()) {
- LOG.debug("Notification-Type: " + notification.getOperationType().name());
- LOG.debug("Entity-Id: " + entity.getId().getGuid());
- LOG.debug("Entity-Type: " + entity.getTypeName());
+ try {
+ LOG.debug("Notification-Type: " + notification.getOperationType());
+ LOG.debug("Entity-Id: " + entity.getId()._getId());
+ LOG.debug("Entity-Type: " + entity.getTypeName());
- LOG.debug("----------- Entity Values ----------");
+ LOG.debug("----------- Entity Values ----------");
- for (Map.Entry<String, Object> entry : entity.getValues().entrySet()) {
- LOG.debug(" Name:" + entry.getKey());
- Object value = entry.getValue();
- LOG.debug(" Value:" + value);
- }
+ for (Map.Entry<String, Object> entry : entity.getValuesMap().entrySet()) {
+ LOG.debug(" Name:" + entry.getKey());
+ Object value = entry.getValue();
+ LOG.debug(" Value:" + value);
+ }
- LOG.debug("----------- Entity Traits ----------");
+ LOG.debug("----------- Entity Traits ----------");
+ List<IStruct> traits = notification.getAllTraits();
- for (Map.Entry<String, ? extends Trait> entry : entity.getTraits().entrySet()) {
- LOG.debug(" Trait-Name:" + entry.getKey());
- Trait trait = entry.getValue();
- LOG.debug(" Trait-Type:" + trait.getTypeName());
- Map<String, Object> traitValues = trait.getValues();
- for (Map.Entry<String, Object> valueEntry : traitValues.entrySet()) {
- LOG.debug(" Trait-Value-Name:" + valueEntry.getKey());
- LOG.debug(" Trait-Value:" + valueEntry.getValue());
+ for (IStruct trait : traits) {
+ LOG.debug(" Trait-Type-Name:" + trait.getTypeName());
+ Map<String, Object> traitValues = trait.getValuesMap();
+ for (Map.Entry<String, Object> valueEntry : traitValues.entrySet()) {
+ LOG.debug(" Trait-Value-Name:" + valueEntry.getKey());
+ LOG.debug(" Trait-Value:" + valueEntry.getValue());
+ }
}
+ } catch (AtlasException exception) {
+ LOG.error("Cannot print notification - ", exception);
}
}
}