You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ab...@apache.org on 2018/02/28 00:59:45 UTC
[2/2] ranger git commit: RANGER-1996: Change Atlas version from 0.8.2 to 1.0.0-SNAPSHOT
RANGER-1996: Change Atlas version from 0.8.2 to 1.0.0-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/2626e5cc
Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/2626e5cc
Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/2626e5cc
Branch: refs/heads/master
Commit: 2626e5ccf03860de96b25b428eb4e64a4447cb8c
Parents: 7954cbb
Author: Abhay Kulkarni <ak...@hortonworks.com>
Authored: Thu Dec 7 17:29:41 2017 -0800
Committer: Abhay Kulkarni <ak...@hortonworks.com>
Committed: Tue Feb 27 16:40:13 2018 -0800
----------------------------------------------------------------------
.../atlas/authorizer/RangerAtlasAuthorizer.java | 120 +++----
.../atlas/authorizer/RangerAtlasResource.java | 60 ----
pom.xml | 4 +-
.../atlas/authorizer/RangerAtlasAuthorizer.java | 94 ++++--
src/main/assembly/tagsync.xml | 30 +-
tagsync/pom.xml | 75 ++---
.../ranger/tagsync/process/TagSynchronizer.java | 2 +-
.../tagsync/sink/tagadmin/TagAdminRESTSink.java | 11 +-
.../source/atlas/AtlasEntityWithTraits.java | 98 ------
.../source/atlas/AtlasHbaseResourceMapper.java | 12 +-
.../source/atlas/AtlasHdfsResourceMapper.java | 13 +-
.../source/atlas/AtlasHiveResourceMapper.java | 11 +-
.../source/atlas/AtlasKafkaResourceMapper.java | 17 +-
.../source/atlas/AtlasNotificationMapper.java | 142 ++++----
.../source/atlas/AtlasResourceMapper.java | 26 +-
.../source/atlas/AtlasResourceMapperUtil.java | 66 ++--
.../source/atlas/AtlasStormResourceMapper.java | 10 +-
.../tagsync/source/atlas/AtlasTagSource.java | 97 +++---
.../source/atlasrest/AtlasRESTTagSource.java | 238 ++++++++++++--
.../tagsync/source/atlasrest/AtlasRESTUtil.java | 325 -------------------
.../source/atlasrest/RangerAtlasEntity.java | 60 ++++
.../atlasrest/RangerAtlasEntityWithTags.java | 118 +++++++
.../process/TestHbaseResourceMapper.java | 56 ++--
.../tagsync/process/TestHdfsResourceMapper.java | 24 +-
.../tagsync/process/TestHiveResourceMapper.java | 28 +-
.../process/TestKafkaResourceMapper.java | 16 +-
26 files changed, 793 insertions(+), 960 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ranger/blob/2626e5cc/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasAuthorizer.java
----------------------------------------------------------------------
diff --git a/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasAuthorizer.java b/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasAuthorizer.java
index 542acf9..90e75a1 100644
--- a/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasAuthorizer.java
+++ b/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasAuthorizer.java
@@ -19,26 +19,21 @@
package org.apache.ranger.authorization.atlas.authorizer;
-import java.util.Date;
-import java.util.Set;
-import org.apache.atlas.authorize.AtlasAccessRequest;
+import org.apache.atlas.authorize.AtlasAdminAccessRequest;
import org.apache.atlas.authorize.AtlasAuthorizationException;
+import org.apache.atlas.authorize.AtlasEntityAccessRequest;
+import org.apache.atlas.authorize.AtlasTypeAccessRequest;
import org.apache.atlas.authorize.AtlasAuthorizer;
-import org.apache.atlas.authorize.AtlasResourceTypes;
-import org.apache.commons.logging.Log;
import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
-import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
-import org.apache.ranger.plugin.policyengine.RangerAccessResult;
import org.apache.ranger.plugin.service.RangerBasePlugin;
-import org.apache.ranger.plugin.util.RangerPerfTracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class RangerAtlasAuthorizer implements AtlasAuthorizer {
private static final Logger LOG = LoggerFactory.getLogger(RangerAtlasAuthorizer.class);
- private static final Log PERF_ATLASAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("atlasauth.request");
- private static boolean isDebugEnabled = LOG.isDebugEnabled();
+
private static volatile RangerBasePlugin atlasPlugin = null;
@Override
@@ -55,10 +50,11 @@ public class RangerAtlasAuthorizer implements AtlasAuthorizer {
if (plugin == null) {
plugin = new RangerAtlasPlugin();
+
plugin.init();
plugin.setResultProcessor(new RangerDefaultAuditHandler());
- atlasPlugin = plugin;
+ atlasPlugin = plugin;
}
}
}
@@ -68,66 +64,61 @@ public class RangerAtlasAuthorizer implements AtlasAuthorizer {
}
@Override
- public boolean isAccessAllowed(AtlasAccessRequest request) throws AtlasAuthorizationException {
- boolean isAccessAllowed = true;
- if (isDebugEnabled) {
- LOG.debug("==> isAccessAllowed( " + request + " )");
+ public void cleanUp() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> cleanUp ");
}
- RangerPerfTracer perf = null;
+ }
- if(RangerPerfTracer.isPerfTraceEnabled(PERF_ATLASAUTH_REQUEST_LOG)) {
- perf = RangerPerfTracer.getPerfTracer(PERF_ATLASAUTH_REQUEST_LOG, "RangerAtlasAuthorizer.isAccessAllowed(request=" + request + ")");
+ @Override
+ public boolean isAccessAllowed(AtlasAdminAccessRequest request) throws AtlasAuthorizationException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> isAccessAllowed(AtlasAdminAccessRequest)");
}
- String resource = request.getResource();
- String user = request.getUser();
- Set<String> userGroups = request.getUserGroups();
- String action = request.getAction().name();
- Set<AtlasResourceTypes> resourceTypes = request.getResourceTypes();
- String clientIPAddress = request.getClientIPAddress();
- String clusterName = atlasPlugin.getClusterName();
-
- for (AtlasResourceTypes resourceType : resourceTypes) {
- RangerAtlasAccessRequest rangerRequest =
- new RangerAtlasAccessRequest(resourceType, resource, action, user, userGroups, clientIPAddress, clusterName);
- if (isDebugEnabled) {
- LOG.debug("Creating RangerAtlasAccessRequest with values [resource : " + resource + ", user : " + user
- + ", Groups : " + userGroups + ", action : " + action + ", resourceType : " + resourceType
- + ", clientIP : " + clientIPAddress + ", clusterName : " + clusterName + "]");
- }
- isAccessAllowed = checkAccess(rangerRequest);
- if (!isAccessAllowed) {
- break;
- }
- }
+ final boolean ret;
- RangerPerfTracer.log(perf);
+ ret = true; // TODO: evaluate Ranger policies
- if (isDebugEnabled) {
- LOG.debug("<== isAccessAllowed Returning value :: " + isAccessAllowed);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== isAccessAllowed(AtlasAdminAccessRequest)");
}
- return isAccessAllowed;
+
+ return ret;
}
- private boolean checkAccess(RangerAtlasAccessRequest request) {
- boolean isAccessAllowed = false;
- RangerBasePlugin plugin = atlasPlugin;
+ @Override
+ public boolean isAccessAllowed(AtlasEntityAccessRequest request) throws AtlasAuthorizationException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> isAccessAllowed(AtlasEntityAccessRequest)");
+ }
+
+ final boolean ret;
+
+ ret = true; // TODO: evaluate Ranger policies
- if (plugin != null) {
- RangerAccessResult rangerResult = plugin.isAccessAllowed(request);
- isAccessAllowed = rangerResult != null && rangerResult.getIsAllowed();
- } else {
- isAccessAllowed = false;
- LOG.warn("AtlasPlugin not initialized properly : " + plugin+"... Access blocked!!!");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== isAccessAllowed(AtlasEntityAccessRequest)");
}
- return isAccessAllowed;
+
+ return ret;
}
@Override
- public void cleanUp() {
- if (isDebugEnabled) {
- LOG.debug("==> cleanUp ");
+ public boolean isAccessAllowed(AtlasTypeAccessRequest request) throws AtlasAuthorizationException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> isAccessAllowed(AtlasTypeAccessRequest)");
}
+
+ final boolean ret;
+
+ ret = true; // TODO: evaluate Ranger policies
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== isAccessAllowed(AtlasTypeAccessRequest)");
+ }
+
+ return ret;
}
class RangerAtlasPlugin extends RangerBasePlugin {
@@ -135,21 +126,4 @@ public class RangerAtlasAuthorizer implements AtlasAuthorizer {
super("atlas", "atlas");
}
}
-
-}
-
-class RangerAtlasAccessRequest extends RangerAccessRequestImpl {
-
- public RangerAtlasAccessRequest(AtlasResourceTypes resType, String resource, String action, String user,
- Set<String> userGroups, String clientIp, String clusterName) {
- super.setResource(new RangerAtlasResource(resType, resource));
- super.setAccessType(action);
- super.setUser(user);
- super.setUserGroups(userGroups);
- super.setAccessTime(new Date(System.currentTimeMillis()));
- super.setClientIPAddress(clientIp);
- super.setAction(action);
- super.setClusterName(clusterName);
- }
-
}
http://git-wip-us.apache.org/repos/asf/ranger/blob/2626e5cc/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasResource.java
----------------------------------------------------------------------
diff --git a/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasResource.java b/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasResource.java
deleted file mode 100644
index f056f3e..0000000
--- a/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasResource.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.authorization.atlas.authorizer;
-
-import org.apache.atlas.authorize.AtlasResourceTypes;
-import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RangerAtlasResource extends RangerAccessResourceImpl {
- public static final String KEY_TYPE = "type";
- public static final String KEY_ENTITY = "entity";
- public static final String KEY_OPERATION = "operation";
- public static final String KEY_TAXONOMY = "taxonomy";
- public static final String KEY_TERM = "term";
-
- private static final Logger LOG = LoggerFactory.getLogger(RangerAtlasResource.class);
-
- public RangerAtlasResource(AtlasResourceTypes resourceType, String atlasResource) {
- switch (resourceType) {
- case TYPE:
- setValue(KEY_TYPE, atlasResource);
- break;
- case ENTITY:
- setValue(KEY_ENTITY, atlasResource);
- break;
- case OPERATION:
- setValue(KEY_OPERATION, atlasResource);
- break;
- case TAXONOMY:
- setValue(KEY_TAXONOMY, atlasResource);
- break;
- case TERM:
- setValue(KEY_TERM, atlasResource);
- break;
- default:
- LOG.warn("Invalid Resource : " + atlasResource);
- break;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/ranger/blob/2626e5cc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 294c422..2f1af2d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -125,9 +125,10 @@
<apacheds.version>2.0.0-M22</apacheds.version>
<asm.all.version>3.2</asm.all.version>
<aspectj.version>1.8.2</aspectj.version>
- <atlas.version>0.8.2</atlas.version>
+ <atlas.version>1.0.0-SNAPSHOT</atlas.version>
<atlas.guava.version>14.0</atlas.guava.version>
<atlas.gson.version>2.5</atlas.gson.version>
+ <atlas.jackson.version>2.9.2</atlas.jackson.version>
<atlas.jettison.version>1.3.7</atlas.jettison.version>
<atlas.commons.logging.version>1.1.3</atlas.commons.logging.version>
<bouncycastle.version>1.55</bouncycastle.version>
@@ -208,6 +209,7 @@
<springframework.version>3.2.10.RELEASE</springframework.version>
<sqoop.version>1.99.7</sqoop.version>
<storm.version>1.2.0</storm.version>
+ <sun-jersey-bundle.version>1.19</sun-jersey-bundle.version>
<tomcat.embed.version>7.0.82</tomcat.embed.version>
<velocity.version>1.7</velocity.version>
<zookeeper.version>3.4.6</zookeeper.version>
http://git-wip-us.apache.org/repos/asf/ranger/blob/2626e5cc/ranger-atlas-plugin-shim/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasAuthorizer.java
----------------------------------------------------------------------
diff --git a/ranger-atlas-plugin-shim/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasAuthorizer.java b/ranger-atlas-plugin-shim/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasAuthorizer.java
index 1f82fcc..9302bdd 100644
--- a/ranger-atlas-plugin-shim/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasAuthorizer.java
+++ b/ranger-atlas-plugin-shim/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasAuthorizer.java
@@ -19,7 +19,9 @@
package org.apache.ranger.authorization.atlas.authorizer;
-import org.apache.atlas.authorize.AtlasAccessRequest;
+import org.apache.atlas.authorize.AtlasAdminAccessRequest;
+import org.apache.atlas.authorize.AtlasEntityAccessRequest;
+import org.apache.atlas.authorize.AtlasTypeAccessRequest;
import org.apache.atlas.authorize.AtlasAuthorizationException;
import org.apache.atlas.authorize.AtlasAuthorizer;
import org.apache.ranger.plugin.classloader.RangerPluginClassLoader;
@@ -90,40 +92,88 @@ public class RangerAtlasAuthorizer implements AtlasAuthorizer {
}
- @Override
- public boolean isAccessAllowed(AtlasAccessRequest request) throws AtlasAuthorizationException {
- boolean isAccessAllowed = false;
- if (isDebugEnabled) {
- LOG.debug("isAccessAllowed <===");
- }
+ @Override
+ public void cleanUp() {
+ if (isDebugEnabled) {
+ LOG.debug("cleanUp <===");
+ }
+ try {
+ activatePluginClassLoader();
+ rangerAtlasAuthorizerImpl.cleanUp();
+ } finally {
+ deactivatePluginClassLoader();
+ }
- try {
+ }
+
+ @Override
+ public boolean isAccessAllowed(AtlasAdminAccessRequest request) throws AtlasAuthorizationException {
+ if (isDebugEnabled) {
+ LOG.debug("==> isAccessAllowed(AtlasAdminAccessRequest)");
+ }
+
+ final boolean ret;
+
+ try {
activatePluginClassLoader();
- isAccessAllowed = rangerAtlasAuthorizerImpl.isAccessAllowed(request);
+ ret = rangerAtlasAuthorizerImpl.isAccessAllowed(request);
} finally {
deactivatePluginClassLoader();
}
- if (isDebugEnabled) {
- LOG.debug("isAccessAllowed ===> Returning value :: " + isAccessAllowed);
- }
- return isAccessAllowed;
- }
+ if (isDebugEnabled) {
+ LOG.debug("<== isAccessAllowed(AtlasAdminAccessRequest): " + ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public boolean isAccessAllowed(AtlasEntityAccessRequest request) throws AtlasAuthorizationException {
+ if (isDebugEnabled) {
+ LOG.debug("==> isAccessAllowed(AtlasEntityAccessRequest)");
+ }
+
+ final boolean ret;
- @Override
- public void cleanUp() {
- if (isDebugEnabled) {
- LOG.debug("cleanUp <===");
- }
- try {
+ try {
activatePluginClassLoader();
- rangerAtlasAuthorizerImpl.cleanUp();
+
+ ret = rangerAtlasAuthorizerImpl.isAccessAllowed(request);
} finally {
deactivatePluginClassLoader();
}
- }
+ if (isDebugEnabled) {
+ LOG.debug("<== isAccessAllowed(AtlasEntityAccessRequest): " + ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public boolean isAccessAllowed(AtlasTypeAccessRequest request) throws AtlasAuthorizationException {
+ if (isDebugEnabled) {
+ LOG.debug("==> isAccessAllowed(AtlasTypeAccessRequest)");
+ }
+
+ final boolean ret;
+
+ try {
+ activatePluginClassLoader();
+
+ ret = rangerAtlasAuthorizerImpl.isAccessAllowed(request);
+ } finally {
+ deactivatePluginClassLoader();
+ }
+
+ if (isDebugEnabled) {
+ LOG.debug("<== isAccessAllowed(AtlasTypeAccessRequest): " + ret);
+ }
+
+ return ret;
+ }
private void activatePluginClassLoader() {
if(rangerPluginClassLoader != null) {
http://git-wip-us.apache.org/repos/asf/ranger/blob/2626e5cc/src/main/assembly/tagsync.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/tagsync.xml b/src/main/assembly/tagsync.xml
index 0b17151..c929395 100644
--- a/src/main/assembly/tagsync.xml
+++ b/src/main/assembly/tagsync.xml
@@ -37,14 +37,13 @@
<include>com.101tec:zkclient</include>
<include>com.google.code.gson:gson:jar:${gson.version}</include>
<include>com.google.guava:guava:jar:${google.guava.version}</include>
- <include>com.google.inject:guice:jar:${guice.version}</include>
- <include>com.google.inject.extensions:guice-multibindings:jar:${guice.version}</include>
<include>com.sun.jersey:jersey-bundle:jar:${jersey-bundle.version}</include>
- <include>com.thoughtworks.paranamer:paranamer:jar:${paranamer.version}</include>
- <include>com.yammer.metrics:metrics-core</include>
+ <include>com.sun.jersey.contribs:jersey-multipart:jar:${sun-jersey-bundle.version}</include>
<include>org.apache.atlas:atlas-notification:jar:${atlas.version}</include>
- <include>org.apache.atlas:atlas-typesystem:jar:${atlas.version}</include>
- <include>org.apache.atlas:atlas-client:jar:${atlas.version}</include>
+ <include>org.apache.atlas:atlas-intg:jar:${atlas.version}</include>
+ <include>org.apache.atlas:atlas-client-v1:jar:${atlas.version}</include>
+ <include>org.apache.atlas:atlas-client-v2:jar:${atlas.version}</include>
+ <include>org.apache.atlas:atlas-client-common:jar:${atlas.version}</include>
<include>org.apache.atlas:atlas-common:jar:${atlas.version}</include>
<include>org.apache.hadoop:hadoop-auth</include>
<include>org.apache.hadoop:hadoop-common</include>
@@ -55,20 +54,15 @@
<include>org.apache.ranger:ranger-plugins-common</include>
<include>org.apache.ranger:ranger-util</include>
<include>org.apache.zookeeper:zookeeper:jar:${zookeeper.version}</include>
- <include>org.codehaus.jackson:jackson-core-asl</include>
- <include>org.codehaus.jackson:jackson-jaxrs</include>
- <include>org.codehaus.jackson:jackson-mapper-asl</include>
- <include>org.codehaus.jackson:jackson-xc</include>
+ <include>com.fasterxml.jackson.core:jackson-annotations:jar:${atlas.jackson.version}</include>
+ <include>com.fasterxml.jackson.core:jackson-core:jar:${atlas.jackson.version}</include>
+ <include>com.fasterxml.jackson.core:jackson-databind:jar:${atlas.jackson.version}</include>
+ <include>com.fasterxml.jackson.jaxrs:jackson-jaxrs-base:jar:${atlas.jackson.version}</include>
+ <include>com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:jar:${atlas.jackson.version}</include>
+ <include>org.codehaus.jackson:jackson-core-asl:jar:${codehaus.jackson.version}</include>
+ <include>org.codehaus.jackson:jackson-jaxrs:jar:${codehaus.jackson.version}</include>
<include>org.codehaus.jettison:jettison:jar:${jettison.version}</include>
- <include>org.json4s:json4s-native_${scala.binary.version}:jar:${json4s.version}</include>
- <include>org.json4s:json4s-core_${scala.binary.version}:jar:${json4s.version}</include>
- <include>org.json4s:json4s-ast_${scala.binary.version}:jar:${json4s.version}</include>
<include>org.scala-lang:scala-library:jar:${scala.version}</include>
- <include>org.scala-lang:scalap:jar:${scala.version}</include>
- <include>org.scala-lang:scala-compiler:jar:${scala.version}</include>
- <include>org.scala-lang:scala-reflect:jar:${scala.version}</include>
- <include>org.scala-lang.modules:scala-xml_${scala.binary.version}:jar:${scala.xml.version}</include>
- <include>org.scala-lang.modules:scala-parser-combinators_${scala.binary.version}:jar:${scala.xml.version}</include>
<include>org.slf4j:slf4j-api</include>
<include>aopalliance:aopalliance:jar:${aopalliance.version}</include>
<include>commons-cli:commons-cli:jar:${commons.cli.version}</include>
http://git-wip-us.apache.org/repos/asf/ranger/blob/2626e5cc/tagsync/pom.xml
----------------------------------------------------------------------
diff --git a/tagsync/pom.xml b/tagsync/pom.xml
index 8757f8d..7e53641 100644
--- a/tagsync/pom.xml
+++ b/tagsync/pom.xml
@@ -55,6 +55,11 @@
<version>${jersey-bundle.version}</version>
</dependency>
<dependency>
+ <groupId>com.sun.jersey.contribs</groupId>
+ <artifactId>jersey-multipart</artifactId>
+ <version>${sun-jersey-bundle.version}</version>
+ </dependency>
+ <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>${commons.cli.version}</version>
@@ -95,14 +100,14 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>com.google.inject</groupId>
- <artifactId>guice</artifactId>
- <version>${guice.version}</version>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ <version>${codehaus.jackson.version}</version>
</dependency>
<dependency>
- <groupId>com.google.inject.extensions</groupId>
- <artifactId>guice-multibindings</artifactId>
- <version>${guice.version}</version>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-jaxrs</artifactId>
+ <version>${codehaus.jackson.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.jettison</groupId>
@@ -116,7 +121,7 @@
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
- <artifactId>atlas-typesystem</artifactId>
+ <artifactId>atlas-intg</artifactId>
<version>${atlas.version}</version>
</dependency>
<dependency>
@@ -126,6 +131,11 @@
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-client-v2</artifactId>
+ <version>${atlas.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
<artifactId>atlas-common</artifactId>
<version>${atlas.version}</version>
<exclusions>
@@ -160,53 +170,34 @@
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
- <artifactId>scala-compiler</artifactId>
- <version>${scala.version}</version>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scalap</artifactId>
- <version>${scala.version}</version>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-reflect</artifactId>
- <version>${scala.version}</version>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${atlas.jackson.version}</version>
</dependency>
<dependency>
- <groupId>org.scala-lang.modules</groupId>
- <artifactId>scala-xml_${scala.binary.version}</artifactId>
- <version>${scala.xml.version}</version>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${atlas.jackson.version}</version>
</dependency>
<dependency>
- <groupId>org.scala-lang.modules</groupId>
- <artifactId>scala-parser-combinators_${scala.binary.version}</artifactId>
- <version>${scala.xml.version}</version>
+ <groupId>com.fasterxml.jackson.jaxrs</groupId>
+ <artifactId>jackson-jaxrs-base</artifactId>
+ <version>${atlas.jackson.version}</version>
</dependency>
<dependency>
- <groupId>org.json4s</groupId>
- <artifactId>json4s-core_${scala.binary.version}</artifactId>
- <version>${json4s.version}</version>
+ <groupId>com.fasterxml.jackson.jaxrs</groupId>
+ <artifactId>jackson-jaxrs-json-provider</artifactId>
+ <version>${atlas.jackson.version}</version>
</dependency>
<dependency>
- <groupId>org.json4s</groupId>
- <artifactId>json4s-native_${scala.binary.version}</artifactId>
- <version>${json4s.version}</version>
- </dependency>
- <dependency>
- <groupId>org.json4s</groupId>
- <artifactId>json4s-ast_${scala.binary.version}</artifactId>
- <version>${json4s.version}</version>
- </dependency>
- <dependency>
- <groupId>com.thoughtworks.paranamer</groupId>
- <artifactId>paranamer</artifactId>
- <version>${paranamer.version}</version>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>${atlas.jackson.version}</version>
</dependency>
+
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/ranger/blob/2626e5cc/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
index b07cd34..45997e4 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
@@ -107,7 +107,7 @@ public class TagSynchronizer {
if (ret) {
LOG.info("Initializing TAG source and sink");
-
+ ret = false;
tagSink = initializeTagSink(properties);
if (tagSink != null) {
http://git-wip-us.apache.org/repos/asf/ranger/blob/2626e5cc/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
index c34b6ea..a1dc8f5 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
@@ -65,13 +65,13 @@ public class TagAdminRESTSink implements TagSink, Runnable {
@Override
public boolean initialize(Properties properties) {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("==> TagAdminRESTSink.initialize()");
}
boolean ret = false;
- String restUrl = TagSyncConfig.getTagAdminRESTUrl(properties);
+ String restUrl = TagSyncConfig.getTagAdminRESTUrl(properties);
String sslConfigFile = TagSyncConfig.getTagAdminRESTSslConfigFile(properties);
String userName = TagSyncConfig.getTagAdminUserName(properties);
String password = TagSyncConfig.getTagAdminPassword(properties);
@@ -89,16 +89,19 @@ public class TagAdminRESTSink implements TagSink, Runnable {
if (StringUtils.isNotBlank(restUrl)) {
tagRESTClient = new RangerRESTClient(restUrl, sslConfigFile);
- if(!isKerberized) {
+ if (!isKerberized) {
tagRESTClient.setBasicAuthInfo(userName, password);
}
+ // Build and cache REST client. This will catch any errors in building REST client up-front
+ tagRESTClient.getClient();
+
uploadWorkItems = new LinkedBlockingQueue<UploadWorkItem>();
ret = true;
} else {
LOG.error("No value specified for property 'ranger.tagsync.tagadmin.rest.url'!");
}
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("<== TagAdminRESTSink.initialize(), result=" + ret);
}
http://git-wip-us.apache.org/repos/asf/ranger/blob/2626e5cc/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java
deleted file mode 100644
index 77dee01..0000000
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.tagsync.source.atlas;
-
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.typesystem.IReferenceableInstance;
-import org.apache.atlas.typesystem.IStruct;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-public class AtlasEntityWithTraits {
-
- private final IReferenceableInstance entity;
- private final List<IStruct> traits;
-
- public AtlasEntityWithTraits(IReferenceableInstance entity, List<IStruct> traits) {
- this.entity = entity;
- this.traits = traits;
- }
-
- public IReferenceableInstance getEntity() {
- return entity;
- }
-
- public List<IStruct> getAllTraits() {
- return traits == null ? new LinkedList<IStruct>() : traits;
- }
-
- @Override
- public String toString( ) {
- StringBuilder sb = new StringBuilder();
-
- toString(sb);
-
- return sb.toString();
- }
-
- public void toString(StringBuilder sb) {
-
- sb.append("AtlasEntityWithTraits={ ");
-
- sb.append("Entity-Id: " + entity.getId()._getId()).append(", ")
- .append("Entity-Type: " + entity.getTypeName()).append(", ")
- .append("Entity-Version: " + entity.getId().getVersion()).append(", ")
- .append("Entity-State: " + entity.getId().getStateAsString()).append(", ");
-
- sb.append("Entity-Values={ ");
- try {
- for (Map.Entry<String, Object> entry : entity.getValuesMap().entrySet()) {
- sb.append("{").append(entry.getKey()).append(", ").append(entry.getValue()).append("}, ");
- }
- } catch (AtlasException exception) {
- // Ignore
- }
- sb.append(" }");
-
- sb.append(", Entity-Traits={ ");
- for (IStruct trait : traits) {
- try {
- sb.append("{traitType=").append(trait.getTypeName()).append(", ");
- Map<String, Object> traitValues = trait.getValuesMap();
- sb.append("{");
- for (Map.Entry<String, Object> valueEntry : traitValues.entrySet()) {
- sb.append("{").append(valueEntry.getKey()).append(", ").append(valueEntry.getValue()).append("}");
- }
- sb.append("}");
-
- sb.append(" }");
- } catch (AtlasException exception) {
- // Ignore
- }
- }
- sb.append(" }");
-
- sb.append(" }");
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/ranger/blob/2626e5cc/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
index 8b36a31..33e804a 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
@@ -22,10 +22,10 @@ package org.apache.ranger.tagsync.source.atlas;
import java.util.Map;
import java.util.HashMap;
-import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.commons.lang.StringUtils;
import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
public class AtlasHbaseResourceMapper extends AtlasResourceMapper {
public static final String ENTITY_TYPE_HBASE_TABLE = "hbase_table";
@@ -36,10 +36,6 @@ public class AtlasHbaseResourceMapper extends AtlasResourceMapper {
public static final String RANGER_TYPE_HBASE_COLUMN_FAMILY = "column-family";
public static final String RANGER_TYPE_HBASE_COLUMN = "column";
- public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
- public static final String QUALIFIED_NAME_DELIMITER = "\\.";
- public static final Character QUALIFIED_NAME_DELIMITER_CHAR = '.';
-
public static final String[] SUPPORTED_ENTITY_TYPES = { ENTITY_TYPE_HBASE_TABLE, ENTITY_TYPE_HBASE_COLUMN_FAMILY, ENTITY_TYPE_HBASE_COLUMN };
public AtlasHbaseResourceMapper() {
@@ -47,8 +43,8 @@ public class AtlasHbaseResourceMapper extends AtlasResourceMapper {
}
@Override
- public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception {
- String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+ public RangerServiceResource buildResource(final RangerAtlasEntity entity) throws Exception {
+ String qualifiedName = (String)entity.getAttributes().get(AtlasResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME);
if (StringUtils.isEmpty(qualifiedName)) {
throw new Exception("attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "' not found in entity");
}
@@ -64,7 +60,7 @@ public class AtlasHbaseResourceMapper extends AtlasResourceMapper {
}
String entityType = entity.getTypeName();
- String entityGuid = entity.getId() != null ? entity.getId()._getId() : null;
+ String entityGuid = entity.getGuid();
String serviceName = getRangerServiceName(clusterName);
Map<String, RangerPolicyResource> elements = new HashMap<String, RangerPolicyResource>();
http://git-wip-us.apache.org/repos/asf/ranger/blob/2626e5cc/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java
index 06bff90..378542c 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java
@@ -22,12 +22,12 @@ package org.apache.ranger.tagsync.source.atlas;
import java.util.HashMap;
import java.util.Map;
-import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.ranger.plugin.model.RangerPolicy;
import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
public class AtlasHdfsResourceMapper extends AtlasResourceMapper {
public static final String ENTITY_TYPE_HDFS_PATH = "hdfs_path";
@@ -35,7 +35,6 @@ public class AtlasHdfsResourceMapper extends AtlasResourceMapper {
public static final String ENTITY_ATTRIBUTE_PATH = "path";
public static final String ENTITY_ATTRIBUTE_CLUSTER_NAME = "clusterName";
- public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
public static final String[] SUPPORTED_ENTITY_TYPES = { ENTITY_TYPE_HDFS_PATH };
@@ -56,10 +55,10 @@ public class AtlasHdfsResourceMapper extends AtlasResourceMapper {
}
@Override
- public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception {
- String path = getEntityAttribute(entity, ENTITY_ATTRIBUTE_PATH, String.class);
- String clusterName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_CLUSTER_NAME, String.class);
- String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+ public RangerServiceResource buildResource(final RangerAtlasEntity entity) throws Exception {
+ String path = (String)entity.getAttributes().get(ENTITY_ATTRIBUTE_PATH);
+ String clusterName = (String)entity.getAttributes().get(ENTITY_ATTRIBUTE_CLUSTER_NAME);
+ String qualifiedName = (String)entity.getAttributes().get(AtlasResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME);
if(StringUtils.isEmpty(path)) {
path = getResourceNameFromQualifiedName(qualifiedName);
@@ -81,7 +80,7 @@ public class AtlasHdfsResourceMapper extends AtlasResourceMapper {
}
}
- String entityGuid = entity.getId() != null ? entity.getId()._getId() : null;
+ String entityGuid = entity.getGuid();
String serviceName = getRangerServiceName(clusterName);
Boolean isExcludes = Boolean.FALSE;
Boolean isRecursive = Boolean.TRUE;
http://git-wip-us.apache.org/repos/asf/ranger/blob/2626e5cc/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java
index a359622..3e0a97f 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java
@@ -22,10 +22,10 @@ package org.apache.ranger.tagsync.source.atlas;
import java.util.Map;
import java.util.HashMap;
-import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.commons.lang.StringUtils;
import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
public class AtlasHiveResourceMapper extends AtlasResourceMapper {
public static final String ENTITY_TYPE_HIVE_DB = "hive_db";
@@ -36,9 +36,6 @@ public class AtlasHiveResourceMapper extends AtlasResourceMapper {
public static final String RANGER_TYPE_HIVE_TABLE = "table";
public static final String RANGER_TYPE_HIVE_COLUMN = "column";
- public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
- public static final String QUALIFIED_NAME_DELIMITER = "\\.";
-
public static final String[] SUPPORTED_ENTITY_TYPES = { ENTITY_TYPE_HIVE_DB, ENTITY_TYPE_HIVE_TABLE, ENTITY_TYPE_HIVE_COLUMN };
public AtlasHiveResourceMapper() {
@@ -46,8 +43,8 @@ public class AtlasHiveResourceMapper extends AtlasResourceMapper {
}
@Override
- public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception {
- String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+ public RangerServiceResource buildResource(final RangerAtlasEntity entity) throws Exception {
+ String qualifiedName = (String)entity.getAttributes().get(AtlasResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME);
if (StringUtils.isEmpty(qualifiedName)) {
throw new Exception("attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "' not found in entity");
}
@@ -63,7 +60,7 @@ public class AtlasHiveResourceMapper extends AtlasResourceMapper {
}
String entityType = entity.getTypeName();
- String entityGuid = entity.getId() != null ? entity.getId()._getId() : null;
+ String entityGuid = entity.getGuid();
String serviceName = getRangerServiceName(clusterName);
String[] resources = resourceStr.split(QUALIFIED_NAME_DELIMITER);
String dbName = resources.length > 0 ? resources[0] : null;
http://git-wip-us.apache.org/repos/asf/ranger/blob/2626e5cc/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
index 09ae5d1..86e37c3 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
@@ -22,18 +22,16 @@ package org.apache.ranger.tagsync.source.atlas;
import java.util.HashMap;
import java.util.Map;
-import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.commons.lang.StringUtils;
import org.apache.ranger.plugin.model.RangerPolicy;
import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
public class AtlasKafkaResourceMapper extends AtlasResourceMapper {
public static final String ENTITY_TYPE_KAFKA_TOPIC = "kafka_topic";
public static final String RANGER_TYPE_KAFKA_TOPIC = "topic";
- public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
-
public static final String[] SUPPORTED_ENTITY_TYPES = { ENTITY_TYPE_KAFKA_TOPIC };
public AtlasKafkaResourceMapper() {
@@ -41,12 +39,8 @@ public class AtlasKafkaResourceMapper extends AtlasResourceMapper {
}
@Override
- public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception {
- String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
-
- if(StringUtils.isEmpty(qualifiedName)) {
- throwExceptionWithMessage("attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "' not found in entity");
- }
+ public RangerServiceResource buildResource(final RangerAtlasEntity entity) throws Exception {
+ String qualifiedName = (String)entity.getAttributes().get(AtlasResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME);
String topic = getResourceNameFromQualifiedName(qualifiedName);
@@ -61,16 +55,17 @@ public class AtlasKafkaResourceMapper extends AtlasResourceMapper {
}
if(StringUtils.isEmpty(clusterName)) {
- throwExceptionWithMessage("Cluster name not found in attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "'");
+ throwExceptionWithMessage("attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "' not found in entity");
}
+
Map<String, RangerPolicyResource> elements = new HashMap<String, RangerPolicy.RangerPolicyResource>();
Boolean isExcludes = Boolean.FALSE;
Boolean isRecursive = Boolean.TRUE;
elements.put(RANGER_TYPE_KAFKA_TOPIC, new RangerPolicyResource(topic, isExcludes, isRecursive));
- String entityGuid = entity.getId() != null ? entity.getId()._getId() : null;
+ String entityGuid = entity.getGuid();
String serviceName = getRangerServiceName(clusterName);
return new RangerServiceResource(entityGuid, serviceName, elements);
http://git-wip-us.apache.org/repos/asf/ranger/blob/2626e5cc/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index f007ae5..91cf606 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -19,11 +19,9 @@
package org.apache.ranger.tagsync.source.atlas;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.notification.entity.EntityNotification;
-import org.apache.atlas.typesystem.IReferenceableInstance;
-import org.apache.atlas.typesystem.IStruct;
-import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1;
+import org.apache.atlas.v1.model.instance.Id;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
@@ -34,9 +32,10 @@ import org.apache.ranger.plugin.model.RangerTag;
import org.apache.ranger.plugin.model.RangerTagDef;
import org.apache.ranger.plugin.model.RangerTagDef.RangerTagAttributeDef;
import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntityWithTags;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -47,7 +46,7 @@ public class AtlasNotificationMapper {
private static Map<String, Long> unhandledEventTypes = new HashMap<String, Long>();
- private static void logUnhandledEntityNotification(EntityNotification entityNotification) {
+ private static void logUnhandledEntityNotification(EntityNotificationV1 entityNotification) {
final int REPORTING_INTERVAL_FOR_UNHANDLED_ENTITYTYPE_IN_MILLIS = 5 * 60 * 1000; // 5 minutes
@@ -77,39 +76,30 @@ public class AtlasNotificationMapper {
}
@SuppressWarnings("unchecked")
- public static ServiceTags processEntityNotification(EntityNotification entityNotification) {
+ public static ServiceTags processEntityNotification(EntityNotificationV1 entityNotification) {
ServiceTags ret = null;
if (isNotificationHandled(entityNotification)) {
try {
- IReferenceableInstance entity = entityNotification.getEntity();
-
- if (entity != null && AtlasResourceMapperUtil.isEntityTypeHandled(entity.getTypeName())) {
- AtlasEntityWithTraits entityWithTraits = new AtlasEntityWithTraits(entity, entityNotification.getAllTraits());
- if (entityNotification.getOperationType() == EntityNotification.OperationType.ENTITY_DELETE) {
- ret = buildServiceTagsForEntityDeleteNotification(entityWithTraits);
- } else {
- if (entity.getId().getState() == Id.EntityState.ACTIVE) {
- ret = buildServiceTags(entityWithTraits, null);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring entityNotification for entity that is not ACTIVE: " + entityWithTraits);
- }
- }
- }
+ RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(entityNotification);
+
+ if (entityNotification.getOperationType() == EntityNotificationV1.OperationType.ENTITY_DELETE) {
+ ret = buildServiceTagsForEntityDeleteNotification(entityWithTags);
} else {
- logUnhandledEntityNotification(entityNotification);
+ ret = buildServiceTags(entityWithTags, null);
}
} catch (Exception exception) {
LOG.error("createServiceTags() failed!! ", exception);
}
+ } else {
+ logUnhandledEntityNotification(entityNotification);
}
return ret;
}
- public static Map<String, ServiceTags> processAtlasEntities(List<AtlasEntityWithTraits> atlasEntities) {
+ public static Map<String, ServiceTags> processAtlasEntities(List<RangerAtlasEntityWithTags> atlasEntities) {
Map<String, ServiceTags> ret = null;
try {
@@ -121,17 +111,16 @@ public class AtlasNotificationMapper {
return ret;
}
- static private boolean isNotificationHandled(EntityNotification entityNotification) {
+ static private boolean isNotificationHandled(EntityNotificationV1 entityNotification) {
boolean ret = false;
- EntityNotification.OperationType opType = entityNotification.getOperationType();
+ EntityNotificationV1.OperationType opType = entityNotification.getOperationType();
- if(opType != null) {
+ if (opType != null) {
switch (opType) {
- case ENTITY_CREATE: {
- LOG.debug("ENTITY_CREATE notification is not handled, as Ranger will get necessary information from any subsequent TRAIT_ADDED notification");
+ case ENTITY_CREATE:
+ ret = CollectionUtils.isNotEmpty(entityNotification.getAllTraits());
break;
- }
case ENTITY_UPDATE:
case ENTITY_DELETE:
case TRAIT_ADD:
@@ -142,30 +131,38 @@ public class AtlasNotificationMapper {
}
default:
LOG.error(opType + ": unknown notification received - not handled");
+ break;
+ }
+ if (ret) {
+ final Referenceable entity = entityNotification.getEntity();
+
+ ret = entity != null
+ && entity.getId().getState() == Id.EntityState.ACTIVE
+ && AtlasResourceMapperUtil.isEntityTypeHandled(entity.getTypeName());
}
}
return ret;
}
- static private ServiceTags buildServiceTagsForEntityDeleteNotification(AtlasEntityWithTraits entityWithTraits) throws Exception {
+ static private ServiceTags buildServiceTagsForEntityDeleteNotification(RangerAtlasEntityWithTags entityWithTags) throws Exception {
final ServiceTags ret;
- IReferenceableInstance entity = entityWithTraits.getEntity();
+ RangerAtlasEntity entity = entityWithTags.getEntity();
- String guid = entity.getId()._getId();
+ String guid = entity.getGuid();
if (StringUtils.isNotBlank(guid)) {
ret = new ServiceTags();
RangerServiceResource serviceResource = new RangerServiceResource();
serviceResource.setGuid(guid);
ret.getServiceResources().add(serviceResource);
} else {
- ret = buildServiceTags(entityWithTraits, null);
+ ret = buildServiceTags(entityWithTags, null);
if (ret != null) {
// tag-definitions should NOT be deleted as part of service-resource delete
- ret.setTagDefinitions(Collections.<Long, RangerTagDef>emptyMap());
+ ret.setTagDefinitions(MapUtils.EMPTY_MAP);
// Ranger deletes tags associated with deleted service-resource
- ret.setTags(Collections.<Long, RangerTag>emptyMap());
+ ret.setTags(MapUtils.EMPTY_MAP);
}
}
@@ -176,13 +173,13 @@ public class AtlasNotificationMapper {
return ret;
}
- static private Map<String, ServiceTags> buildServiceTags(List<AtlasEntityWithTraits> entitiesWithTraits) throws Exception {
+ static private Map<String, ServiceTags> buildServiceTags(List<RangerAtlasEntityWithTags> entitiesWithTags) throws Exception {
Map<String, ServiceTags> ret = new HashMap<String, ServiceTags>();
- for (AtlasEntityWithTraits element : entitiesWithTraits) {
- IReferenceableInstance entity = element.getEntity();
- if (entity != null && entity.getId().getState() == Id.EntityState.ACTIVE) {
+ for (RangerAtlasEntityWithTags element : entitiesWithTags) {
+ RangerAtlasEntity entity = element.getEntity();
+ if (entity != null) {
buildServiceTags(element, ret);
} else {
if (LOG.isDebugEnabled()) {
@@ -241,15 +238,15 @@ public class AtlasNotificationMapper {
return ret;
}
- static private ServiceTags buildServiceTags(AtlasEntityWithTraits entityWithTraits, Map<String, ServiceTags> serviceTagsMap) throws Exception {
+ static private ServiceTags buildServiceTags(RangerAtlasEntityWithTags entityWithTags, Map<String, ServiceTags> serviceTagsMap) throws Exception {
ServiceTags ret = null;
- IReferenceableInstance entity = entityWithTraits.getEntity();
+ RangerAtlasEntity entity = entityWithTags.getEntity();
RangerServiceResource serviceResource = AtlasResourceMapperUtil.getRangerServiceResource(entity);
if (serviceResource != null) {
- List<RangerTag> tags = getTags(entityWithTraits);
- List<RangerTagDef> tagDefs = getTagDefs(entityWithTraits);
+ List<RangerTag> tags = getTags(entityWithTags);
+ List<RangerTagDef> tagDefs = getTagDefs(entityWithTags);
String serviceName = serviceResource.getServiceName();
ret = createOrGetServiceTags(serviceTagsMap, serviceName);
@@ -279,12 +276,12 @@ public class AtlasNotificationMapper {
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Entity " + entityWithTraits + " does not have any tags associated with it when full-sync is being done.");
+ LOG.debug("Entity " + entityWithTags + " does not have any tags associated with it when full-sync is being done.");
LOG.debug("Will not add this entity to serviceTags, so that this entity, if exists, will be removed from ranger");
}
}
} else {
- LOG.error("Failed to build serviceResource for entity:" + entity.getId()._getId());
+ LOG.error("Failed to build serviceResource for entity:" + entity.getGuid());
}
return ret;
@@ -307,58 +304,33 @@ public class AtlasNotificationMapper {
return ret;
}
- static private List<RangerTag> getTags(AtlasEntityWithTraits entityWithTraits) {
+ static private List<RangerTag> getTags(RangerAtlasEntityWithTags entityWithTags) {
List<RangerTag> ret = new ArrayList<RangerTag>();
- if(entityWithTraits != null && CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) {
- List<IStruct> traits = entityWithTraits.getAllTraits();
-
- for (IStruct trait : traits) {
- Map<String, String> tagAttrs = new HashMap<String, String>();
+ if (entityWithTags != null && MapUtils.isNotEmpty(entityWithTags.getTags())) {
+ Map<String, Map<String, String>> tags = entityWithTags.getTags();
- try {
- Map<String, Object> attrs = trait.getValuesMap();
-
- if(MapUtils.isNotEmpty(attrs)) {
- for (Map.Entry<String, Object> attrEntry : attrs.entrySet()) {
- String attrName = attrEntry.getKey();
- Object attrValue = attrEntry.getValue();
-
- tagAttrs.put(attrName, attrValue != null ? attrValue.toString() : null);
- }
- }
- } catch (AtlasException exception) {
- LOG.error("Could not get values for trait:" + trait.getTypeName(), exception);
- }
-
- ret.add(new RangerTag(null, trait.getTypeName(), tagAttrs, RangerTag.OWNER_SERVICERESOURCE));
+ for (Map.Entry<String, Map<String, String>> tag : tags.entrySet()) {
+ ret.add(new RangerTag(null, tag.getKey(), tag.getValue(), RangerTag.OWNER_SERVICERESOURCE));
}
}
return ret;
}
- static private List<RangerTagDef> getTagDefs(AtlasEntityWithTraits entityWithTraits) {
+ static private List<RangerTagDef> getTagDefs(RangerAtlasEntityWithTags entityWithTags) {
List<RangerTagDef> ret = new ArrayList<RangerTagDef>();
- if(entityWithTraits != null && CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) {
- List<IStruct> traits = entityWithTraits.getAllTraits();
-
- for (IStruct trait : traits) {
- RangerTagDef tagDef = new RangerTagDef(trait.getTypeName(), "Atlas");
-
- try {
- Map<String, Object> attrs = trait.getValuesMap();
+ if (entityWithTags != null && MapUtils.isNotEmpty(entityWithTags.getTags())) {
+ Map<String, Map<String, String>> tags = entityWithTags.getTags();
- if(MapUtils.isNotEmpty(attrs)) {
- for (String attrName : attrs.keySet()) {
- tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string"));
- }
+ for (Map.Entry<String, Map<String, String>> tag : tags.entrySet()) {
+ RangerTagDef tagDef = new RangerTagDef(tag.getKey(), "Atlas");
+ if (MapUtils.isNotEmpty(tag.getValue())) {
+ for (String attributeName : tag.getValue().keySet()) {
+ tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attributeName, entityWithTags.getTagAttributeType(tag.getKey(), attributeName)));
}
- } catch (AtlasException exception) {
- LOG.error("Could not get values for trait:" + trait.getTypeName(), exception);
}
-
ret.add(tagDef);
}
}
http://git-wip-us.apache.org/repos/asf/ranger/blob/2626e5cc/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
index 8ececdf..5d067a5 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
@@ -20,19 +20,20 @@
package org.apache.ranger.tagsync.source.atlas;
import java.util.Properties;
-import java.util.Map;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
public abstract class AtlasResourceMapper {
private static final Log LOG = LogFactory.getLog(AtlasResourceMapper.class);
public static final String TAGSYNC_DEFAULT_CLUSTER_NAME = "ranger.tagsync.atlas.default.cluster.name";
+ public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+ public static final String QUALIFIED_NAME_DELIMITER = "\\.";
+ public static final Character QUALIFIED_NAME_DELIMITER_CHAR = '.';
protected static final String TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX = "ranger.tagsync.atlas.";
protected static final String TAGSYNC_SERVICENAME_MAPPER_PROP_SUFFIX = ".ranger.service";
@@ -73,7 +74,7 @@ public abstract class AtlasResourceMapper {
this.defaultClusterName = properties != null ? properties.getProperty(TAGSYNC_DEFAULT_CLUSTER_NAME) : null;
}
- abstract public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception;
+ abstract public RangerServiceResource buildResource(final RangerAtlasEntity entity) throws Exception;
protected String getCustomRangerServiceName(String atlasInstanceName) {
if(properties != null) {
@@ -118,21 +119,4 @@ public abstract class AtlasResourceMapper {
throw new Exception(msg);
}
-
- static protected <T> T getEntityAttribute(IReferenceableInstance entity, String name, Class<T> type) {
- T ret = null;
-
- try {
- Map<String, Object> valueMap = entity.getValuesMap();
- ret = getAttribute(valueMap, name, type);
- } catch (AtlasException exception) {
- LOG.error("Cannot get map of values for entity: " + entity.getId()._getId(), exception);
- }
-
- return ret;
- }
-
- static protected <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) {
- return type.cast(map.get(name));
- }
}
http://git-wip-us.apache.org/repos/asf/ranger/blob/2626e5cc/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
index 40a639b..cd2cb63 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
@@ -19,7 +19,6 @@
package org.apache.ranger.tagsync.source.atlas;
-import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.commons.lang.StringUtils;
import org.apache.ranger.plugin.model.RangerServiceResource;
@@ -28,14 +27,13 @@ import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.tagsync.process.TagSyncConfig;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
public class AtlasResourceMapperUtil {
private static final Log LOG = LogFactory.getLog(AtlasResourceMapperUtil.class);
private static Map<String, AtlasResourceMapper> atlasResourceMappers = new HashMap<String, AtlasResourceMapper>();
- private static final String MAPPER_NAME_DELIMITER = ",";
-
public static boolean isEntityTypeHandled(String entityTypeName) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> isEntityTypeHandled(entityTypeName=" + entityTypeName + ")");
@@ -52,9 +50,9 @@ public class AtlasResourceMapperUtil {
return ret;
}
- public static RangerServiceResource getRangerServiceResource(IReferenceableInstance atlasEntity) {
+ public static RangerServiceResource getRangerServiceResource(RangerAtlasEntity atlasEntity) {
if (LOG.isDebugEnabled()) {
- LOG.debug("==> getRangerServiceResource(" + atlasEntity.getId()._getId() +")");
+ LOG.debug("==> getRangerServiceResource(" + atlasEntity.getGuid() +")");
}
RangerServiceResource resource = null;
@@ -65,59 +63,63 @@ public class AtlasResourceMapperUtil {
try {
resource = mapper.buildResource(atlasEntity);
} catch (Exception exception) {
- LOG.error("Could not get serviceResource for atlas entity:" + atlasEntity.getId()._getId() + ": ", exception);
+ LOG.error("Could not get serviceResource for atlas entity:" + atlasEntity.getGuid() + ": ", exception);
}
}
if (LOG.isDebugEnabled()) {
- LOG.debug("<== getRangerServiceResource(" + atlasEntity.getId()._getId() +"): resource=" + resource);
+ LOG.debug("<== getRangerServiceResource(" + atlasEntity.getGuid() +"): resource=" + resource);
}
return resource;
}
static public boolean initializeAtlasResourceMappers(Properties properties) {
+ final String MAPPER_NAME_DELIMITER = ",";
+
String customMapperNames = TagSyncConfig.getCustomAtlasResourceMappers(properties);
if (LOG.isDebugEnabled()) {
LOG.debug("==> initializeAtlasResourceMappers.initializeAtlasResourceMappers(" + customMapperNames + ")");
}
+ boolean ret = true;
- // Initialize the default mappers
- initializeAtlasResourceMapper(new AtlasHiveResourceMapper(), properties);
- initializeAtlasResourceMapper(new AtlasHdfsResourceMapper(), properties);
- initializeAtlasResourceMapper(new AtlasHbaseResourceMapper(), properties);
- initializeAtlasResourceMapper(new AtlasKafkaResourceMapper(), properties);
- initializeAtlasResourceMapper(new AtlasStormResourceMapper(), properties);
+ List<String> mapperNames = new ArrayList<String>();
+ mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasHiveResourceMapper");
+ mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasHdfsResourceMapper");
+ mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasHbaseResourceMapper");
+ mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasKafkaResourceMapper");
- // Initialize the custom mappers
- boolean ret = true;
if (StringUtils.isNotBlank(customMapperNames)) {
for (String customMapperName : customMapperNames.split(MAPPER_NAME_DELIMITER)) {
- try {
- Class<?> clazz = Class.forName(customMapperName);
- AtlasResourceMapper resourceMapper = (AtlasResourceMapper) clazz.newInstance();
-
- initializeAtlasResourceMapper(resourceMapper, properties);
- } catch (Exception exception) {
- LOG.error("Failed to create AtlasResourceMapper:" + customMapperName + ": ", exception);
- ret = false;
- }
+ mapperNames.add(customMapperName.trim());
+ }
+ }
+
+ for (String mapperName : mapperNames) {
+ try {
+ Class<?> clazz = Class.forName(mapperName);
+ AtlasResourceMapper resourceMapper = (AtlasResourceMapper) clazz.newInstance();
+
+ resourceMapper.initialize(properties);
+
+ for (String entityTypeName : resourceMapper.getSupportedEntityTypes()) {
+ add(entityTypeName, resourceMapper);
+ }
+
+ } catch (Exception exception) {
+ LOG.error("Failed to create AtlasResourceMapper:" + mapperName + ": ", exception);
+ ret = false;
}
}
if (LOG.isDebugEnabled()) {
- LOG.debug("<== initializeAtlasResourceMappers.initializeAtlasResourceMappers(" + customMapperNames + "): " + ret);
+ LOG.debug("<== initializeAtlasResourceMappers.initializeAtlasResourceMappers(" + mapperNames + "): " + ret);
}
return ret;
}
- private static void initializeAtlasResourceMapper(AtlasResourceMapper resourceMapper, Properties properties) {
- resourceMapper.initialize(properties);
-
- for (String entityTypeName : resourceMapper.getSupportedEntityTypes()) {
- atlasResourceMappers.put(entityTypeName, resourceMapper);
- }
+ private static void add(String entityType, AtlasResourceMapper mapper) {
+ atlasResourceMappers.put(entityType, mapper);
}
-
}
http://git-wip-us.apache.org/repos/asf/ranger/blob/2626e5cc/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasStormResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasStormResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasStormResourceMapper.java
index 4ed01ca..650968d 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasStormResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasStormResourceMapper.java
@@ -22,17 +22,15 @@ package org.apache.ranger.tagsync.source.atlas;
import java.util.HashMap;
import java.util.Map;
-import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.commons.lang.StringUtils;
import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
public class AtlasStormResourceMapper extends AtlasResourceMapper {
public static final String ENTITY_TYPE_STORM_TOPOLOGY = "storm_topology";
public static final String RANGER_TYPE_STORM_TOPOLOGY = "topology";
- public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
-
public static final String[] SUPPORTED_ENTITY_TYPES = { ENTITY_TYPE_STORM_TOPOLOGY };
public AtlasStormResourceMapper() {
@@ -40,8 +38,8 @@ public class AtlasStormResourceMapper extends AtlasResourceMapper {
}
@Override
- public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception {
- String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+ public RangerServiceResource buildResource(final RangerAtlasEntity entity) throws Exception {
+ String qualifiedName = (String)entity.getAttributes().get(AtlasResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME);
String topology = getResourceNameFromQualifiedName(qualifiedName);
@@ -65,7 +63,7 @@ public class AtlasStormResourceMapper extends AtlasResourceMapper {
elements.put(RANGER_TYPE_STORM_TOPOLOGY, new RangerPolicyResource(topology, isExcludes, isRecursive));
- String entityGuid = entity.getId() != null ? entity.getId()._getId() : null;
+ String entityGuid = entity.getGuid();
String serviceName = getRangerServiceName(clusterName);
return new RangerServiceResource(entityGuid, serviceName, elements);
http://git-wip-us.apache.org/repos/asf/ranger/blob/2626e5cc/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index c382db0..8c15ee5 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -20,24 +20,24 @@
package org.apache.ranger.tagsync.source.atlas;
-import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.kafka.NotificationProvider;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-import org.apache.atlas.notification.NotificationConsumer;
-import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.entity.EntityNotification;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.ranger.tagsync.model.AbstractTagSource;
import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.model.AbstractTagSource;
+import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntityWithTags;
+
import java.io.IOException;
import java.io.InputStream;
-import java.util.Properties;
import java.util.List;
+import java.util.Properties;
public class AtlasTagSource extends AbstractTagSource {
private static final Log LOG = LogFactory.getLog(AtlasTagSource.class);
@@ -100,10 +100,11 @@ public class AtlasTagSource extends AbstractTagSource {
}
if (ret) {
- NotificationInterface notification = NotificationProvider.get();
- List<NotificationConsumer<Object>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
+ NotificationInterface notification = NotificationProvider.get();
+ List<NotificationConsumer<EntityNotificationV1>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
+
+ consumerTask = new ConsumerRunnable(iterators.get(0));
- consumerTask = new ConsumerRunnable(iterators.get(0));
}
if (LOG.isDebugEnabled()) {
@@ -137,63 +138,59 @@ public class AtlasTagSource extends AbstractTagSource {
}
}
- private static String getPrintableEntityNotification(EntityNotification notification) {
+ private static String getPrintableEntityNotification(EntityNotificationV1 notification) { // builds the "{ Notification-Type: <operation>, <entity with tags>}" text used in debug logging
StringBuilder sb = new StringBuilder();
sb.append("{ Notification-Type: ").append(notification.getOperationType()).append(", ");
- AtlasEntityWithTraits entityWithTraits = new AtlasEntityWithTraits(notification.getEntity(), notification.getAllTraits());
- sb.append(entityWithTraits.toString());
+ RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(notification); // wrapped only so that its toString() supplies the entity/tag part of the text
+ sb.append(entityWithTags.toString());
+
sb.append("}");
return sb.toString();
}
private class ConsumerRunnable implements Runnable {
- private final NotificationConsumer<Object> consumer;
+ private final NotificationConsumer<EntityNotificationV1> consumer; // notification source polled by run()
- private ConsumerRunnable(NotificationConsumer<Object> consumer) {
+ private ConsumerRunnable(NotificationConsumer<EntityNotificationV1> consumer) {
this.consumer = consumer;
}
+
@Override
public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("==> ConsumerRunnable.run()");
}
while (true) {
- try {
- List<AtlasKafkaMessage<Object>> messages = consumer.receive(1000L);
- for (AtlasKafkaMessage<Object> message : messages) {
- Object kafkaMessage = message != null ? message.getMessage() : null;
-
- if (kafkaMessage != null) {
- EntityNotification notification = null;
- if (kafkaMessage instanceof EntityNotification) {
- notification = (EntityNotification) kafkaMessage;
- } else {
- LOG.warn("Received Kafka notification of unexpected type:[" + kafkaMessage.getClass().toString() + "], Ignoring...");
- }
- if (notification != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Notification=" + getPrintableEntityNotification(notification));
- }
-
- ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification);
- if (serviceTags != null) {
- updateSink(serviceTags);
- }
- }
- TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
- consumer.commit(partition, message.getOffset());
- } else {
- LOG.error("Null message received from Kafka!! Ignoring..");
- }
- }
- } catch (Exception exception) {
- LOG.error("Caught exception..: ", exception);
- return;
- }
- }
+ try {
+ List<AtlasKafkaMessage<EntityNotificationV1>> messages = consumer.receive(1000L); // NOTE(review): 1000L is presumably a poll timeout in milliseconds - confirm against NotificationConsumer.receive
+
+ for (AtlasKafkaMessage<EntityNotificationV1> message : messages) {
+ EntityNotificationV1 notification = message != null ? message.getMessage() : null; // a null list element is handled like a message without payload
+
+ if (notification != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Notification=" + getPrintableEntityNotification(notification));
+ }
+
+ ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification);
+ if (serviceTags != null) { // the mapper may produce nothing for a notification; the sink is then left untouched
+ updateSink(serviceTags);
+ }
+
+ TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+ consumer.commit(partition, message.getOffset()); // reached only when mapping and updateSink() did not throw
+ } else {
+ LOG.error("Null entityNotification received from Kafka!! Ignoring.."); // no commit is issued on this path
+ }
+ }
+ } catch (Exception exception) {
+ LOG.error("Caught exception..: ", exception);
+ return; // any exception ends this run() invocation, so polling stops here
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ranger/blob/2626e5cc/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
index 4e0ae90..b715869 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
@@ -20,35 +20,67 @@
package org.apache.ranger.tagsync.source.atlasrest;
import com.google.gson.Gson;
-
import com.google.gson.GsonBuilder;
-
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.SearchFilter;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.discovery.SearchParameters;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.type.AtlasBuiltInTypes;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasStructType;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.plugin.util.RangerRESTClient;
-import org.apache.ranger.tagsync.model.AbstractTagSource;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.model.AbstractTagSource;
import org.apache.ranger.tagsync.model.TagSink;
import org.apache.ranger.tagsync.process.TagSyncConfig;
import org.apache.ranger.tagsync.process.TagSynchronizer;
-import org.apache.ranger.tagsync.source.atlas.AtlasEntityWithTraits;
import org.apache.ranger.tagsync.source.atlas.AtlasNotificationMapper;
import org.apache.ranger.tagsync.source.atlas.AtlasResourceMapperUtil;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
+import java.util.TimeZone;
public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
private static final Log LOG = LogFactory.getLog(AtlasRESTTagSource.class);
- private long sleepTimeBetweenCycleInMillis;
+ private static final ThreadLocal<DateFormat> DATE_FORMATTER = new ThreadLocal<DateFormat>() { // one formatter per thread: SimpleDateFormat instances must not be shared between threads
+ @Override
+ protected DateFormat initialValue() {
+ SimpleDateFormat dateFormat = new SimpleDateFormat(AtlasBaseTypeDef.SERIALIZED_DATE_FORMAT_STR); // pattern comes from the Atlas type definitions
+
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // output is always rendered in UTC, independent of the JVM default time zone
- private AtlasRESTUtil atlasRESTUtil = null;
+ return dateFormat;
+ }
+ };
+
+ private long sleepTimeBetweenCycleInMillis;
+ private String[] restUrls = null;
+ private boolean isKerberized = false;
+ private String[] userNamePassword = null;
private Thread myThread = null;
@@ -95,30 +127,26 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
boolean ret = AtlasResourceMapperUtil.initializeAtlasResourceMappers(properties);
sleepTimeBetweenCycleInMillis = TagSyncConfig.getTagSourceAtlasDownloadIntervalInMillis(properties);
- final boolean isKerberized = TagSyncConfig.getTagsyncKerberosIdentity(properties) != null;
+ isKerberized = TagSyncConfig.getTagsyncKerberosIdentity(properties) != null;
- String restUrl = TagSyncConfig.getAtlasRESTEndpoint(properties);
+ String restEndpoint = TagSyncConfig.getAtlasRESTEndpoint(properties);
String sslConfigFile = TagSyncConfig.getAtlasRESTSslConfigFile(properties);
- String userName = TagSyncConfig.getAtlasRESTUserName(properties);
- String password = TagSyncConfig.getAtlasRESTPassword(properties);
+ this.userNamePassword = new String[] { TagSyncConfig.getAtlasRESTUserName(properties), TagSyncConfig.getAtlasRESTPassword(properties) };
if (LOG.isDebugEnabled()) {
- LOG.debug("restUrl=" + restUrl);
+ LOG.debug("restUrl=" + restEndpoint);
LOG.debug("sslConfigFile=" + sslConfigFile);
- LOG.debug("userName=" + userName);
+ LOG.debug("userName=" + userNamePassword[0]);
LOG.debug("kerberized=" + isKerberized);
}
-
- if (StringUtils.isNotEmpty(restUrl)) {
- if (!restUrl.endsWith("/")) {
- restUrl += "/";
- }
- RangerRESTClient atlasRESTClient = new RangerRESTClient(restUrl, sslConfigFile);
-
- if (!isKerberized) {
- atlasRESTClient.setBasicAuthInfo(userName, password);
- }
- atlasRESTUtil = new AtlasRESTUtil(atlasRESTClient, isKerberized);
+ if (StringUtils.isNotEmpty(restEndpoint)) {
+ this.restUrls = restEndpoint.split(",");
+
+ for (int i = 0; i < restUrls.length; i++) {
+ if (!restUrls[i].endsWith("/")) {
+ restUrls[i] += "/";
+ }
+ }
} else {
LOG.info("AtlasEndpoint not specified, Initial download of Atlas-entities cannot be done.");
ret = false;
@@ -174,16 +202,15 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
public void synchUp() {
- List<AtlasEntityWithTraits> atlasEntities = atlasRESTUtil.getAtlasEntities();
+ List<RangerAtlasEntityWithTags> rangerAtlasEntities = getAtlasActiveEntities();
- if (CollectionUtils.isNotEmpty(atlasEntities)) {
+ if (CollectionUtils.isNotEmpty(rangerAtlasEntities)) {
if (LOG.isDebugEnabled()) {
- for (AtlasEntityWithTraits element : atlasEntities) {
+ for (RangerAtlasEntityWithTags element : rangerAtlasEntities) {
LOG.debug(element);
}
}
-
- Map<String, ServiceTags> serviceTagsMap = AtlasNotificationMapper.processAtlasEntities(atlasEntities);
+ Map<String, ServiceTags> serviceTagsMap = AtlasNotificationMapper.processAtlasEntities(rangerAtlasEntities);
if (MapUtils.isNotEmpty(serviceTagsMap)) {
for (Map.Entry<String, ServiceTags> entry : serviceTagsMap.entrySet()) {
@@ -202,5 +229,158 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
}
+ /**
+  * Runs a faceted search for classification "*" against Atlas and pairs each ACTIVE entity of a
+  * handled type with the tags resolved from its classifications; entities without tags are left out.
+  *
+  * @return the entities with their tags, or {@code null} when the download or type loading failed
+  */
+ private List<RangerAtlasEntityWithTags> getAtlasActiveEntities() {
+     if (LOG.isDebugEnabled()) {
+         LOG.debug("==> getAtlasActiveEntities()");
+     }
+     List<RangerAtlasEntityWithTags> ret = null;
+     SearchParameters searchParams = new SearchParameters();
+     AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
+     AtlasTypeRegistry.AtlasTransientTypeRegistry tty = null;
+     AtlasSearchResult searchResult = null;
+     searchParams.setClassification("*");
+     searchParams.setIncludeClassificationAttributes(true);
+     searchParams.setOffset(0);
+     searchParams.setLimit(Integer.MAX_VALUE);
+     boolean commitUpdates = false;
+     try {
+         AtlasClientV2 atlasClient = getAtlasClient();
+         searchResult = atlasClient.facetedSearch(searchParams);
+         AtlasTypesDef typesDef = atlasClient.getAllTypeDefs(new SearchFilter());
+         tty = typeRegistry.lockTypeRegistryForUpdate();
+         tty.addTypes(typesDef);
+         commitUpdates = true;
+     } catch (AtlasServiceException | AtlasBaseException | IOException excp) {
+         LOG.error("failed to download tags from Atlas", excp);
+     } catch (Exception unexpectedException) {
+         LOG.error("Failed to download tags from Atlas due to unexpected exception", unexpectedException);
+     } finally {
+         if (tty != null) {
+             typeRegistry.releaseTypeRegistryForUpdate(tty, commitUpdates);
+         }
+     }
+
+     if (commitUpdates && searchResult != null) {
+         if (LOG.isDebugEnabled()) {
+             LOG.debug(AtlasType.toJson(searchResult));
+         }
+         ret = new ArrayList<>();
+         List<AtlasEntityHeader> entityHeaders = searchResult.getEntities();
+         if (CollectionUtils.isNotEmpty(entityHeaders)) {
+             for (AtlasEntityHeader header : entityHeaders) {
+                 // Enum identity comparison: a header without a status is skipped instead of raising an NPE.
+                 if (header.getStatus() != AtlasEntity.Status.ACTIVE) {
+                     if (LOG.isDebugEnabled()) {
+                         LOG.debug("Skipping entity because it is not ACTIVE, header:[" + header + "]");
+                     }
+                     continue;
+                 }
+                 String typeName = header.getTypeName();
+                 if (!AtlasResourceMapperUtil.isEntityTypeHandled(typeName)) {
+                     if (LOG.isDebugEnabled()) {
+                         LOG.debug("Not fetching Atlas entities of type:[" + typeName + "]");
+                     }
+                     continue;
+                 }
+                 Map<String, Map<String, String>> allTagsForEntity = new HashMap<>();
+                 // A header may carry no classification list at all; that simply means "no tags".
+                 if (CollectionUtils.isNotEmpty(header.getClassifications())) {
+                     for (AtlasClassification classification : header.getClassifications()) {
+                         Map<String, Map<String, String>> tags = resolveTag(typeRegistry, classification.getTypeName(), classification.getAttributes());
+                         if (tags != null) {
+                             allTagsForEntity.putAll(tags);
+                         }
+                     }
+                 }
+                 if (MapUtils.isNotEmpty(allTagsForEntity)) {
+                     RangerAtlasEntity entity = new RangerAtlasEntity(typeName, header.getGuid(), header.getAttributes());
+                     RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(entity, allTagsForEntity, typeRegistry);
+                     ret.add(entityWithTags);
+                 }
+             }
+         }
+     }
+     if (LOG.isDebugEnabled()) {
+         LOG.debug("<== getAtlasActiveEntities()");
+     }
+     return ret;
+ }
+
+ /**
+  * Expands a classification into tags: tag name -> (attribute name -> attribute value as string),
+  * with one entry for the classification and one per super-type found in the registry. Failures
+  * are logged, and whatever was resolved before the failure (possibly nothing) is returned.
+  */
+ private Map<String, Map<String, String>> resolveTag(AtlasTypeRegistry typeRegistry, String typeName, Map<String, Object> attributes) {
+     Map<String, Map<String, String>> tags = new HashMap<>();
+     try {
+         AtlasClassificationType tagType = typeRegistry.getClassificationTypeByName(typeName);
+         if (tagType == null) {
+             return tags;
+         }
+         Map<String, String> resolved = new HashMap<>();
+         if (MapUtils.isNotEmpty(attributes) && MapUtils.isNotEmpty(tagType.getAllAttributes())) {
+             for (Map.Entry<String, Object> entry : attributes.entrySet()) {
+                 Object rawValue = entry.getValue();
+                 if (rawValue == null) {
+                     continue;
+                 }
+                 String text = rawValue.toString();
+                 AtlasStructType.AtlasAttribute declared = tagType.getAttribute(entry.getKey());
+                 if (declared == null) {
+                     continue; // value for an attribute the classification type does not define
+                 }
+                 // Numeric values of date-typed attributes are rendered through DATE_FORMATTER.
+                 if (rawValue instanceof Number && declared.getAttributeType() instanceof AtlasBuiltInTypes.AtlasDateType) {
+                     text = DATE_FORMATTER.get().format(rawValue);
+                 }
+                 resolved.put(entry.getKey(), text);
+             }
+         }
+         tags.put(typeName, resolved);
+         // Every super-type known to the registry gets the subset of attributes it exposes.
+         for (String superTypeName : tagType.getAllSuperTypes()) {
+             AtlasClassificationType superType = typeRegistry.getClassificationTypeByName(superTypeName);
+             if (superType == null) {
+                 continue;
+             }
+             Map<String, String> inherited = new HashMap<>();
+             if (MapUtils.isNotEmpty(attributes) && MapUtils.isNotEmpty(superType.getAllAttributes())) {
+                 for (String name : superType.getAllAttributes().keySet()) {
+                     String text = resolved.get(name);
+                     if (text != null) {
+                         inherited.put(name, text);
+                     }
+                 }
+             }
+             tags.put(superTypeName, inherited);
+         }
+     } catch (Exception exception) {
+         LOG.error("Error in resolving tags for type:[" + typeName + "]", exception);
+     }
+     return tags;
+ }
+
+ /**
+  * Creates the Atlas V2 client for the configured REST URLs, authenticating with the Hadoop
+  * login user when kerberized and with the configured user name/password pair otherwise.
+  */
+ private AtlasClientV2 getAtlasClient() throws IOException {
+     if (!isKerberized) {
+         return new AtlasClientV2(restUrls, userNamePassword);
+     }
+
+     // checkTGTAndReloginFromKeytab() is invoked on the login user before the client is built.
+     UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+     loginUser.checkTGTAndReloginFromKeytab();
+
+     return new AtlasClientV2(loginUser, loginUser.getShortUserName(), restUrls);
+ }
}