You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ma...@apache.org on 2016/11/26 10:50:35 UTC
incubator-ranger git commit: RANGER-1233: TagSync updated to support
Atlas notifications for Kafka and HBase entities
Repository: incubator-ranger
Updated Branches:
refs/heads/master 35ff7ee79 -> de8f444d2
RANGER-1233: TagSync updated to support Atlas notifications for Kafka and HBase entities
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/de8f444d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/de8f444d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/de8f444d
Branch: refs/heads/master
Commit: de8f444d27eebfa633ad88e14811bfc1e5d39bd7
Parents: 35ff7ee
Author: Abhay Kulkarni <ak...@hortonworks.com>
Authored: Tue Nov 22 17:01:07 2016 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Sat Nov 26 02:49:30 2016 -0800
----------------------------------------------------------------------
.../source/atlas/AtlasHbaseResourceMapper.java | 139 ++++++++++
.../source/atlas/AtlasKafkaResourceMapper.java | 77 ++++++
.../source/atlas/AtlasResourceMapperUtil.java | 2 +
.../process/TestHbaseResourceMapper.java | 251 +++++++++++++++++++
.../process/TestKafkaResourceMapper.java | 123 +++++++++
5 files changed, 592 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/de8f444d/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
new file mode 100644
index 0000000..8b36a31
--- /dev/null
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.commons.lang.StringUtils;
+import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
+import org.apache.ranger.plugin.model.RangerServiceResource;
+
+public class AtlasHbaseResourceMapper extends AtlasResourceMapper {
+ public static final String ENTITY_TYPE_HBASE_TABLE = "hbase_table";
+ public static final String ENTITY_TYPE_HBASE_COLUMN_FAMILY = "hbase_column_family";
+ public static final String ENTITY_TYPE_HBASE_COLUMN = "hbase_column";
+
+ public static final String RANGER_TYPE_HBASE_TABLE = "table";
+ public static final String RANGER_TYPE_HBASE_COLUMN_FAMILY = "column-family";
+ public static final String RANGER_TYPE_HBASE_COLUMN = "column";
+
+ public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+ public static final String QUALIFIED_NAME_DELIMITER = "\\.";
+ public static final Character QUALIFIED_NAME_DELIMITER_CHAR = '.';
+
+ public static final String[] SUPPORTED_ENTITY_TYPES = { ENTITY_TYPE_HBASE_TABLE, ENTITY_TYPE_HBASE_COLUMN_FAMILY, ENTITY_TYPE_HBASE_COLUMN };
+
+ public AtlasHbaseResourceMapper() {
+ super("hbase", SUPPORTED_ENTITY_TYPES);
+ }
+
+ @Override
+ public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception {
+ String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+ if (StringUtils.isEmpty(qualifiedName)) {
+ throw new Exception("attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "' not found in entity");
+ }
+
+ String resourceStr = getResourceNameFromQualifiedName(qualifiedName);
+ if (StringUtils.isEmpty(resourceStr)) {
+ throwExceptionWithMessage("resource not found in attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "': " + qualifiedName);
+ }
+
+ String clusterName = getClusterNameFromQualifiedName(qualifiedName);
+ if (StringUtils.isEmpty(clusterName)) {
+ throwExceptionWithMessage("cluster-name not found in attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "': " + qualifiedName);
+ }
+
+ String entityType = entity.getTypeName();
+ String entityGuid = entity.getId() != null ? entity.getId()._getId() : null;
+ String serviceName = getRangerServiceName(clusterName);
+
+ Map<String, RangerPolicyResource> elements = new HashMap<String, RangerPolicyResource>();
+
+ if (StringUtils.equals(entityType, ENTITY_TYPE_HBASE_TABLE)) {
+ String tblName = resourceStr;
+ if (StringUtils.isNotEmpty(tblName)) {
+ elements.put(RANGER_TYPE_HBASE_TABLE, new RangerPolicyResource(tblName));
+ }
+ } else if (StringUtils.equals(entityType, ENTITY_TYPE_HBASE_COLUMN_FAMILY)) {
+ String[] resources = resourceStr.split(QUALIFIED_NAME_DELIMITER);
+ String tblName = null;
+ String familyName = null;
+
+ if (resources.length == 2) {
+ tblName = resources[0];
+ familyName = resources[1];
+ } else if (resources.length > 2) {
+ StringBuffer tblNameBuf = new StringBuffer(resources[0]);
+
+ for (int i = 1; i < resources.length - 1; i++) {
+ tblNameBuf.append(QUALIFIED_NAME_DELIMITER_CHAR).append(resources[i]);
+ }
+
+ tblName = tblNameBuf.toString();
+ familyName = resources[resources.length - 1];
+ }
+
+ if (StringUtils.isNotEmpty(tblName) && StringUtils.isNotEmpty(familyName)) {
+ elements.put(RANGER_TYPE_HBASE_TABLE, new RangerPolicyResource(tblName));
+ elements.put(RANGER_TYPE_HBASE_COLUMN_FAMILY, new RangerPolicyResource(familyName));
+ }
+ } else if (StringUtils.equals(entityType, ENTITY_TYPE_HBASE_COLUMN)) {
+ String[] resources = resourceStr.split(QUALIFIED_NAME_DELIMITER);
+ String tblName = null;
+ String familyName = null;
+ String colName = null;
+
+ if (resources.length == 3) {
+ tblName = resources[0];
+ familyName = resources[1];
+ colName = resources[2];
+ } else if (resources.length > 3) {
+ StringBuffer tblNameBuf = new StringBuffer(resources[0]);
+
+ for (int i = 1; i < resources.length - 2; i++) {
+ tblNameBuf.append(QUALIFIED_NAME_DELIMITER_CHAR).append(resources[i]);
+ }
+
+ tblName = tblNameBuf.toString();
+ familyName = resources[resources.length - 2];
+ colName = resources[resources.length - 1];
+ }
+
+ if (StringUtils.isNotEmpty(tblName) && StringUtils.isNotEmpty(familyName) && StringUtils.isNotEmpty(colName)) {
+ elements.put(RANGER_TYPE_HBASE_TABLE, new RangerPolicyResource(tblName));
+ elements.put(RANGER_TYPE_HBASE_COLUMN_FAMILY, new RangerPolicyResource(familyName));
+ elements.put(RANGER_TYPE_HBASE_COLUMN, new RangerPolicyResource(colName));
+ }
+ } else {
+ throwExceptionWithMessage("unrecognized entity-type: " + entityType);
+ }
+
+ if(elements.isEmpty()) {
+ throwExceptionWithMessage("invalid qualifiedName for entity-type '" + entityType + "': " + qualifiedName);
+ }
+
+ RangerServiceResource ret = new RangerServiceResource(entityGuid, serviceName, elements);
+
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/de8f444d/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
new file mode 100644
index 0000000..272328e
--- /dev/null
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.commons.lang.StringUtils;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
+import org.apache.ranger.plugin.model.RangerServiceResource;
+
+/**
+ * Maps Atlas 'kafka_topic' entities to Ranger service resources holding a
+ * single 'topic' element.
+ */
+public class AtlasKafkaResourceMapper extends AtlasResourceMapper {
+    public static final String ENTITY_TYPE_KAFKA_TOPIC = "kafka_topic";
+    public static final String RANGER_TYPE_KAFKA_TOPIC = "topic";
+
+    public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+
+    public static final String[] SUPPORTED_ENTITY_TYPES = { ENTITY_TYPE_KAFKA_TOPIC };
+
+    public AtlasKafkaResourceMapper() {
+        super("kafka", SUPPORTED_ENTITY_TYPES);
+    }
+
+    /**
+     * Builds the Ranger service resource for an Atlas Kafka topic entity.
+     *
+     * The cluster name is taken from the 'qualifiedName' attribute and falls
+     * back to the inherited {@code defaultClusterName} when the attribute does
+     * not carry one.
+     *
+     * @param entity Atlas entity describing a Kafka topic
+     * @return resource holding the entity GUID (null when the entity has no id),
+     *         the Ranger service name derived from the cluster name, and a
+     *         non-excluding, recursive 'topic' element
+     * @throws Exception if the 'qualifiedName' attribute is missing, or if no
+     *         topic or cluster name can be determined from it
+     */
+    @Override
+    public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception {
+        String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+
+        // Report a missing attribute as such, rather than as a missing topic.
+        // NOTE(review): relies on the inherited throwExceptionWithMessage()
+        // always throwing, as the rest of this method already does - confirm.
+        if (StringUtils.isEmpty(qualifiedName)) {
+            throwExceptionWithMessage("attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "' not found in entity");
+        }
+
+        String topic = getResourceNameFromQualifiedName(qualifiedName);
+
+        if (StringUtils.isEmpty(topic)) {
+            throwExceptionWithMessage("topic not found in attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "': " + qualifiedName);
+        }
+
+        String clusterName = getClusterNameFromQualifiedName(qualifiedName);
+
+        if (StringUtils.isEmpty(clusterName)) {
+            clusterName = defaultClusterName;
+        }
+
+        // Neither the attribute nor the configured default supplied a cluster name.
+        if (StringUtils.isEmpty(clusterName)) {
+            throwExceptionWithMessage("cluster-name not found in attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "': " + qualifiedName);
+        }
+
+        Map<String, RangerPolicyResource> elements = new HashMap<String, RangerPolicy.RangerPolicyResource>();
+        Boolean isExcludes = Boolean.FALSE;
+        Boolean isRecursive = Boolean.TRUE;
+
+        elements.put(RANGER_TYPE_KAFKA_TOPIC, new RangerPolicyResource(topic, isExcludes, isRecursive));
+
+        String entityGuid = entity.getId() != null ? entity.getId()._getId() : null;
+        String serviceName = getRangerServiceName(clusterName);
+
+        return new RangerServiceResource(entityGuid, serviceName, elements);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/de8f444d/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
index 14b2001..b8f5c46 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
@@ -87,6 +87,8 @@ public class AtlasResourceMapperUtil {
List<String> mapperNames = new ArrayList<String>();
mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasHiveResourceMapper");
mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasHdfsResourceMapper");
+ mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasHbaseResourceMapper");
+ mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasKafkaResourceMapper");
if (StringUtils.isNotBlank(customMapperNames)) {
for(String customMapperName : customMapperNames.split(MAPPER_NAME_DELIMIER)) {
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/de8f444d/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestHbaseResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestHbaseResourceMapper.java b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestHbaseResourceMapper.java
new file mode 100644
index 0000000..e990c28
--- /dev/null
+++ b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestHbaseResourceMapper.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.process;
+
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.source.atlas.AtlasHbaseResourceMapper;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Unit tests for AtlasHbaseResourceMapper: plain and dotted table names for
+ * table, column-family and column entities, plus qualified names that lack
+ * the components required by the entity type.
+ */
+public class TestHbaseResourceMapper {
+    private static final String TABLE_QUALIFIED_NAME = "table@cl1";
+    private static final String COLUMN_FAMILY_QUALIFIED_NAME = "table.family@cl1";
+    private static final String COLUMN_QUALIFIED_NAME = "table.family.column@cl1";
+
+    private static final String DOTTED_TABLE_QUALIFIED_NAME = "table.prefix.1@cl1";
+    private static final String DOTTED_COLUMN_FAMILY_QUALIFIED_NAME = "table.prefix.1.family@cl1";
+    private static final String DOTTED_COLUMN_QUALIFIED_NAME = "table.prefix.1.family.column@cl1";
+
+    private static final String SERVICE_NAME = "cl1_hbase";
+    private static final String RANGER_TABLE = "table";
+    private static final String RANGER_COLUMN_FAMILY = "family";
+    private static final String RANGER_COLUMN = "column";
+
+    private static final String DOTTED_RANGER_TABLE = "table.prefix.1";
+
+    AtlasHbaseResourceMapper resourceMapper = new AtlasHbaseResourceMapper();
+
+    @Test
+    public void testHbaseTable() throws Exception {
+        IReferenceableInstance entity = mockEntity(AtlasHbaseResourceMapper.ENTITY_TYPE_HBASE_TABLE, TABLE_QUALIFIED_NAME);
+
+        assertTableResource(resourceMapper.buildResource(entity), false);
+    }
+
+    @Test
+    public void testHbaseColumnFamily() throws Exception {
+        IReferenceableInstance entity = mockEntity(AtlasHbaseResourceMapper.ENTITY_TYPE_HBASE_COLUMN_FAMILY, COLUMN_FAMILY_QUALIFIED_NAME);
+
+        assertColumnFamilyResource(resourceMapper.buildResource(entity), false);
+    }
+
+    @Test
+    public void testHbaseColumn() throws Exception {
+        IReferenceableInstance entity = mockEntity(AtlasHbaseResourceMapper.ENTITY_TYPE_HBASE_COLUMN, COLUMN_QUALIFIED_NAME);
+
+        assertColumnResource(resourceMapper.buildResource(entity), false);
+    }
+
+    @Test
+    public void testHbaseResourceFromMissingAttribs() throws Exception {
+        // No qualifiedName attribute at all.
+        assertBuildResourceFails(mockEntity(AtlasHbaseResourceMapper.ENTITY_TYPE_HBASE_TABLE, null));
+    }
+
+    @Test
+    public void testHbaseResourceFromMissingColumnFamilyName() throws Exception {
+        // A table-only name handed to a column-family entity.
+        assertBuildResourceFails(mockEntity(AtlasHbaseResourceMapper.ENTITY_TYPE_HBASE_COLUMN_FAMILY, TABLE_QUALIFIED_NAME));
+    }
+
+    @Test
+    public void testHbaseResourceFromMissingColumnName() throws Exception {
+        // A table.family name handed to a column entity.
+        assertBuildResourceFails(mockEntity(AtlasHbaseResourceMapper.ENTITY_TYPE_HBASE_COLUMN, COLUMN_FAMILY_QUALIFIED_NAME));
+    }
+
+    @Test
+    public void testHbaseDottedTable() throws Exception {
+        IReferenceableInstance entity = mockEntity(AtlasHbaseResourceMapper.ENTITY_TYPE_HBASE_TABLE, DOTTED_TABLE_QUALIFIED_NAME);
+
+        assertTableResource(resourceMapper.buildResource(entity), true);
+    }
+
+    @Test
+    public void testHbaseDottedColumnFamily() throws Exception {
+        IReferenceableInstance entity = mockEntity(AtlasHbaseResourceMapper.ENTITY_TYPE_HBASE_COLUMN_FAMILY, DOTTED_COLUMN_FAMILY_QUALIFIED_NAME);
+
+        assertColumnFamilyResource(resourceMapper.buildResource(entity), true);
+    }
+
+    @Test
+    public void testHbaseDottedColumn() throws Exception {
+        IReferenceableInstance entity = mockEntity(AtlasHbaseResourceMapper.ENTITY_TYPE_HBASE_COLUMN, DOTTED_COLUMN_QUALIFIED_NAME);
+
+        assertColumnResource(resourceMapper.buildResource(entity), true);
+    }
+
+    /**
+     * Creates a mocked Atlas entity of the given type. When qualifiedName is
+     * null the entity's attribute map is left empty.
+     */
+    private IReferenceableInstance mockEntity(String typeName, String qualifiedName) throws Exception {
+        Map<String, Object> attributes = new HashMap<String, Object>();
+
+        if (qualifiedName != null) {
+            attributes.put(AtlasHbaseResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
+        }
+
+        IReferenceableInstance mocked = Mockito.mock(IReferenceableInstance.class);
+
+        Mockito.when(mocked.getTypeName()).thenReturn(typeName);
+        Mockito.when(mocked.getValuesMap()).thenReturn(attributes);
+
+        return mocked;
+    }
+
+    /** Fails the test unless buildResource() throws an Exception for the entity. */
+    private void assertBuildResourceFails(IReferenceableInstance entity) {
+        try {
+            RangerServiceResource resource = resourceMapper.buildResource(entity);
+
+            Assert.fail("expected exception. Found " + resource);
+        } catch(Exception excp) {
+            // ignore
+        }
+    }
+
+    private void assertServiceResource(RangerServiceResource resource) {
+        Assert.assertNotNull(resource);
+        Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
+        Assert.assertNotNull(resource.getResourceElements());
+    }
+
+    /** Asserts that the resource has exactly one value, the expected one, for the element type. */
+    private void assertSingleValue(RangerServiceResource resource, String elementType, String expected) {
+        Assert.assertTrue(resource.getResourceElements().containsKey(elementType));
+        Assert.assertNotNull(resource.getResourceElements().get(elementType).getValues());
+        Assert.assertEquals(1, resource.getResourceElements().get(elementType).getValues().size());
+        Assert.assertEquals(expected, resource.getResourceElements().get(elementType).getValues().get(0));
+    }
+
+    private String expectedTable(boolean isDottedTable) {
+        return isDottedTable ? DOTTED_RANGER_TABLE : RANGER_TABLE;
+    }
+
+    private void assertTableResource(RangerServiceResource resource, boolean isDottedTable) {
+        assertServiceResource(resource);
+
+        Assert.assertEquals(1, resource.getResourceElements().size());
+
+        assertSingleValue(resource, AtlasHbaseResourceMapper.RANGER_TYPE_HBASE_TABLE, expectedTable(isDottedTable));
+    }
+
+    private void assertColumnFamilyResource(RangerServiceResource resource, boolean isDottedTable) {
+        assertServiceResource(resource);
+
+        Assert.assertEquals(2, resource.getResourceElements().size());
+
+        assertSingleValue(resource, AtlasHbaseResourceMapper.RANGER_TYPE_HBASE_TABLE, expectedTable(isDottedTable));
+        assertSingleValue(resource, AtlasHbaseResourceMapper.RANGER_TYPE_HBASE_COLUMN_FAMILY, RANGER_COLUMN_FAMILY);
+    }
+
+    private void assertColumnResource(RangerServiceResource resource, boolean isDottedTable) {
+        assertServiceResource(resource);
+
+        Assert.assertEquals(3, resource.getResourceElements().size());
+
+        assertSingleValue(resource, AtlasHbaseResourceMapper.RANGER_TYPE_HBASE_TABLE, expectedTable(isDottedTable));
+        assertSingleValue(resource, AtlasHbaseResourceMapper.RANGER_TYPE_HBASE_COLUMN_FAMILY, RANGER_COLUMN_FAMILY);
+        assertSingleValue(resource, AtlasHbaseResourceMapper.RANGER_TYPE_HBASE_COLUMN, RANGER_COLUMN);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/de8f444d/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestKafkaResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestKafkaResourceMapper.java b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestKafkaResourceMapper.java
new file mode 100644
index 0000000..3beb82f
--- /dev/null
+++ b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestKafkaResourceMapper.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.process;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.source.atlas.AtlasKafkaResourceMapper;
+import org.junit.Test;
+
+import org.mockito.Mockito;
+import org.junit.Assert;
+
+/**
+ * Unit tests for AtlasKafkaResourceMapper: a fully qualified topic name maps
+ * to a single 'topic' element, while names lacking a topic or cluster part,
+ * or a missing attribute, are rejected.
+ */
+public class TestKafkaResourceMapper {
+    private static final String CLUSTER_NAME = "cl1";
+    private static final String TOPIC = "kafka-topic";
+    private static final String QUALIFIED_NAME = "kafka-topic@cl1";
+
+    private static final String SERVICE_NAME = "cl1_kafka";
+    private static final String RANGER_TOPIC = "kafka-topic";
+
+    AtlasKafkaResourceMapper resourceMapper = new AtlasKafkaResourceMapper();
+
+    @Test
+    public void testKafkaResourceFromQualifiedName() throws Exception {
+        IReferenceableInstance entity = mockTopicEntity(QUALIFIED_NAME);
+
+        assertServiceResource(resourceMapper.buildResource(entity));
+    }
+
+    @Test
+    public void testKafkaResourceFromOnlyTopic() throws Exception {
+        assertBuildResourceFails(mockTopicEntity(TOPIC));
+    }
+
+    @Test
+    public void testKafkaResourceFromOnlyClusterName() throws Exception {
+        assertBuildResourceFails(mockTopicEntity(CLUSTER_NAME));
+    }
+
+    @Test
+    public void testKafkaResourceFromMissingAttribs() throws Exception {
+        // No qualifiedName attribute at all.
+        assertBuildResourceFails(mockTopicEntity(null));
+    }
+
+    /**
+     * Creates a mocked kafka_topic entity. When qualifiedName is null the
+     * entity's attribute map is left empty.
+     */
+    private IReferenceableInstance mockTopicEntity(String qualifiedName) throws Exception {
+        Map<String, Object> attributes = new HashMap<String, Object>();
+
+        if (qualifiedName != null) {
+            attributes.put(AtlasKafkaResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
+        }
+
+        IReferenceableInstance mocked = Mockito.mock(IReferenceableInstance.class);
+
+        Mockito.when(mocked.getTypeName()).thenReturn(AtlasKafkaResourceMapper.ENTITY_TYPE_KAFKA_TOPIC);
+        Mockito.when(mocked.getValuesMap()).thenReturn(attributes);
+
+        return mocked;
+    }
+
+    /** Fails the test unless buildResource() throws an Exception for the entity. */
+    private void assertBuildResourceFails(IReferenceableInstance entity) {
+        try {
+            RangerServiceResource resource = resourceMapper.buildResource(entity);
+
+            Assert.fail("expected exception. Found " + resource);
+        } catch(Exception excp) {
+            // ignore
+        }
+    }
+
+    private void assertServiceResource(RangerServiceResource resource) {
+        final String topicType = AtlasKafkaResourceMapper.RANGER_TYPE_KAFKA_TOPIC;
+
+        Assert.assertNotNull(resource);
+        Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
+        Assert.assertNotNull(resource.getResourceElements());
+        Assert.assertEquals(1, resource.getResourceElements().size());
+        Assert.assertTrue(resource.getResourceElements().containsKey(topicType));
+        Assert.assertNotNull(resource.getResourceElements().get(topicType).getValues());
+        Assert.assertEquals(1, resource.getResourceElements().get(topicType).getValues().size());
+        Assert.assertEquals(RANGER_TOPIC, resource.getResourceElements().get(topicType).getValues().get(0));
+    }
+}