You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ma...@apache.org on 2016/11/26 10:50:35 UTC

incubator-ranger git commit: RANGER-1233: TagSync updated to support Atlas notifications for Kafka and HBase entities

Repository: incubator-ranger
Updated Branches:
  refs/heads/master 35ff7ee79 -> de8f444d2


RANGER-1233: TagSync updated to support Atlas notifications for Kafka and HBase entities

Signed-off-by: Madhan Neethiraj <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/de8f444d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/de8f444d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/de8f444d

Branch: refs/heads/master
Commit: de8f444d27eebfa633ad88e14811bfc1e5d39bd7
Parents: 35ff7ee
Author: Abhay Kulkarni <ak...@hortonworks.com>
Authored: Tue Nov 22 17:01:07 2016 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Sat Nov 26 02:49:30 2016 -0800

----------------------------------------------------------------------
 .../source/atlas/AtlasHbaseResourceMapper.java  | 139 ++++++++++
 .../source/atlas/AtlasKafkaResourceMapper.java  |  77 ++++++
 .../source/atlas/AtlasResourceMapperUtil.java   |   2 +
 .../process/TestHbaseResourceMapper.java        | 251 +++++++++++++++++++
 .../process/TestKafkaResourceMapper.java        | 123 +++++++++
 5 files changed, 592 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/de8f444d/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
new file mode 100644
index 0000000..8b36a31
--- /dev/null
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.commons.lang.StringUtils;
+import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
+import org.apache.ranger.plugin.model.RangerServiceResource;
+
+/**
+ * Maps Atlas HBase entities (tables, column families and columns) to Ranger service-resources.
+ *
+ * The resource portion of the entity's {@code qualifiedName} attribute is split on '.'. For a
+ * column-family entity the last segment is the family; for a column entity the last two segments
+ * are the family and the column. All remaining leading segments are re-joined with '.' to form
+ * the table name, so table names that themselves contain dots are supported.
+ */
+public class AtlasHbaseResourceMapper extends AtlasResourceMapper {
+	public static final String ENTITY_TYPE_HBASE_TABLE          = "hbase_table";
+	public static final String ENTITY_TYPE_HBASE_COLUMN_FAMILY  = "hbase_column_family";
+	public static final String ENTITY_TYPE_HBASE_COLUMN         = "hbase_column";
+
+	public static final String RANGER_TYPE_HBASE_TABLE          = "table";
+	public static final String RANGER_TYPE_HBASE_COLUMN_FAMILY  = "column-family";
+	public static final String RANGER_TYPE_HBASE_COLUMN         = "column";
+
+	public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME  = "qualifiedName";
+	// regular expression handed to String.split(); the dot must be escaped
+	public static final String QUALIFIED_NAME_DELIMITER         = "\\.";
+	// literal delimiter used when re-assembling a dotted table name
+	public static final Character QUALIFIED_NAME_DELIMITER_CHAR    = '.';
+
+	public static final String[] SUPPORTED_ENTITY_TYPES = { ENTITY_TYPE_HBASE_TABLE, ENTITY_TYPE_HBASE_COLUMN_FAMILY, ENTITY_TYPE_HBASE_COLUMN };
+
+	public AtlasHbaseResourceMapper() {
+		super("hbase", SUPPORTED_ENTITY_TYPES);
+	}
+
+	/**
+	 * Builds the Ranger service-resource that corresponds to the given Atlas HBase entity.
+	 *
+	 * @param entity Atlas entity of one of the {@link #SUPPORTED_ENTITY_TYPES}
+	 * @return resource holding the table, and where applicable the column-family and column, elements
+	 * @throws Exception if the qualifiedName attribute is missing, if no resource or cluster name
+	 *                   can be extracted from it, if the entity type is not recognized, or if the
+	 *                   qualified name does not have enough segments for the entity type
+	 */
+	@Override
+	public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception {
+		String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+		if (StringUtils.isEmpty(qualifiedName)) {
+			throw new Exception("attribute '" +  ENTITY_ATTRIBUTE_QUALIFIED_NAME + "' not found in entity");
+		}
+
+		String resourceStr = getResourceNameFromQualifiedName(qualifiedName);
+		if (StringUtils.isEmpty(resourceStr)) {
+			throwExceptionWithMessage("resource not found in attribute '" +  ENTITY_ATTRIBUTE_QUALIFIED_NAME + "': " + qualifiedName);
+		}
+
+		String clusterName = getClusterNameFromQualifiedName(qualifiedName);
+		if (StringUtils.isEmpty(clusterName)) {
+			throwExceptionWithMessage("cluster-name not found in attribute '" +  ENTITY_ATTRIBUTE_QUALIFIED_NAME + "': " + qualifiedName);
+		}
+
+		String entityType  = entity.getTypeName();
+		String entityGuid  = entity.getId() != null ? entity.getId()._getId() : null;
+		String serviceName = getRangerServiceName(clusterName);
+
+		Map<String, RangerPolicyResource> elements = new HashMap<String, RangerPolicyResource>();
+
+		if (StringUtils.equals(entityType, ENTITY_TYPE_HBASE_TABLE)) {
+			// the whole resource string is the table name, dots included
+			if (StringUtils.isNotEmpty(resourceStr)) {
+				elements.put(RANGER_TYPE_HBASE_TABLE, new RangerPolicyResource(resourceStr));
+			}
+		} else if (StringUtils.equals(entityType, ENTITY_TYPE_HBASE_COLUMN_FAMILY)) {
+			String[] resources = resourceStr.split(QUALIFIED_NAME_DELIMITER);
+
+			// need at least <table>.<family>
+			if (resources.length >= 2) {
+				String tblName    = joinTableName(resources, resources.length - 1);
+				String familyName = resources[resources.length - 1];
+
+				if (StringUtils.isNotEmpty(tblName) && StringUtils.isNotEmpty(familyName)) {
+					elements.put(RANGER_TYPE_HBASE_TABLE, new RangerPolicyResource(tblName));
+					elements.put(RANGER_TYPE_HBASE_COLUMN_FAMILY, new RangerPolicyResource(familyName));
+				}
+			}
+		} else if (StringUtils.equals(entityType, ENTITY_TYPE_HBASE_COLUMN)) {
+			String[] resources = resourceStr.split(QUALIFIED_NAME_DELIMITER);
+
+			// need at least <table>.<family>.<column>
+			if (resources.length >= 3) {
+				String tblName    = joinTableName(resources, resources.length - 2);
+				String familyName = resources[resources.length - 2];
+				String colName    = resources[resources.length - 1];
+
+				if (StringUtils.isNotEmpty(tblName) && StringUtils.isNotEmpty(familyName) && StringUtils.isNotEmpty(colName)) {
+					elements.put(RANGER_TYPE_HBASE_TABLE, new RangerPolicyResource(tblName));
+					elements.put(RANGER_TYPE_HBASE_COLUMN_FAMILY, new RangerPolicyResource(familyName));
+					elements.put(RANGER_TYPE_HBASE_COLUMN, new RangerPolicyResource(colName));
+				}
+			}
+		} else {
+			throwExceptionWithMessage("unrecognized entity-type: " + entityType);
+		}
+
+		// nothing was added: the qualified name had too few (or empty) segments for this entity type
+		if(elements.isEmpty()) {
+			throwExceptionWithMessage("invalid qualifiedName for entity-type '" + entityType + "': " + qualifiedName);
+		}
+
+		RangerServiceResource ret = new RangerServiceResource(entityGuid, serviceName, elements);
+
+		return ret;
+	}
+
+	/**
+	 * Re-joins the first {@code count} segments with '.' to rebuild a table name that may itself contain dots.
+	 *
+	 * @param segments qualified-name segments; must contain at least one element
+	 * @param count    number of leading segments that belong to the table name (at least 1)
+	 * @return the table name
+	 */
+	private static String joinTableName(String[] segments, int count) {
+		StringBuilder tblName = new StringBuilder(segments[0]);
+
+		for (int i = 1; i < count; i++) {
+			tblName.append(QUALIFIED_NAME_DELIMITER_CHAR).append(segments[i]);
+		}
+
+		return tblName.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/de8f444d/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
new file mode 100644
index 0000000..272328e
--- /dev/null
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.commons.lang.StringUtils;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
+import org.apache.ranger.plugin.model.RangerServiceResource;
+
+/**
+ * Maps Atlas {@code kafka_topic} entities to Ranger service-resources with a single
+ * {@code topic} element, derived from the entity's {@code qualifiedName} attribute.
+ */
+public class AtlasKafkaResourceMapper extends AtlasResourceMapper {
+	public static final String ENTITY_TYPE_KAFKA_TOPIC = "kafka_topic";
+	public static final String RANGER_TYPE_KAFKA_TOPIC = "topic";
+
+	public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+
+	public static final String[] SUPPORTED_ENTITY_TYPES = { ENTITY_TYPE_KAFKA_TOPIC };
+
+	public AtlasKafkaResourceMapper() {
+		super("kafka", SUPPORTED_ENTITY_TYPES);
+	}
+
+	/**
+	 * Builds the Ranger service-resource that corresponds to the given Atlas Kafka topic entity.
+	 *
+	 * @param entity Atlas entity of type {@link #ENTITY_TYPE_KAFKA_TOPIC}
+	 * @return resource holding one {@code topic} element, created as non-excluding and recursive
+	 * @throws Exception if the qualifiedName attribute is missing, if no topic can be extracted
+	 *                   from it, or if no cluster name is available from either the qualified
+	 *                   name or {@code defaultClusterName}
+	 */
+	@Override
+	public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception {
+		String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+
+		// fail early with an accurate message instead of handing null/empty to the name parsers
+		if(StringUtils.isEmpty(qualifiedName)) {
+			throw new Exception("attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME +  "' not found in entity");
+		}
+
+		String topic = getResourceNameFromQualifiedName(qualifiedName);
+
+		if(StringUtils.isEmpty(topic)) {
+			throwExceptionWithMessage("topic not found in attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME +  "': " + qualifiedName);
+		}
+
+		String clusterName = getClusterNameFromQualifiedName(qualifiedName);
+
+		// qualified name carries no cluster: fall back to the configured default, if any
+		if(StringUtils.isEmpty(clusterName)) {
+			clusterName = defaultClusterName;
+		}
+
+		if(StringUtils.isEmpty(clusterName)) {
+			throwExceptionWithMessage("cluster-name not found in attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME +  "': " + qualifiedName);
+		}
+
+		Map<String, RangerPolicyResource> elements = new HashMap<String, RangerPolicy.RangerPolicyResource>();
+		Boolean isExcludes  = Boolean.FALSE;
+		Boolean isRecursive = Boolean.TRUE;
+
+		elements.put(RANGER_TYPE_KAFKA_TOPIC, new RangerPolicyResource(topic, isExcludes, isRecursive));
+
+		String  entityGuid  = entity.getId() != null ? entity.getId()._getId() : null;
+		String  serviceName = getRangerServiceName(clusterName);
+
+		RangerServiceResource ret = new RangerServiceResource(entityGuid, serviceName, elements);
+
+		return ret;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/de8f444d/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
index 14b2001..b8f5c46 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
@@ -87,6 +87,8 @@ public class AtlasResourceMapperUtil {
 		List<String> mapperNames = new ArrayList<String>();
 		mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasHiveResourceMapper");
 		mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasHdfsResourceMapper");
+		mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasHbaseResourceMapper");
+		mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasKafkaResourceMapper");
 
 		if (StringUtils.isNotBlank(customMapperNames)) {
 			for(String customMapperName : customMapperNames.split(MAPPER_NAME_DELIMIER)) {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/de8f444d/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestHbaseResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestHbaseResourceMapper.java b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestHbaseResourceMapper.java
new file mode 100644
index 0000000..e990c28
--- /dev/null
+++ b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestHbaseResourceMapper.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.process;
+
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.source.atlas.AtlasHbaseResourceMapper;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Unit tests for {@link AtlasHbaseResourceMapper}: plain and dotted table names for each
+ * supported entity type, plus qualified names that lack the segments the entity type needs.
+ */
+public class TestHbaseResourceMapper {
+    private static final String TABLE_QUALIFIED_NAME          = "table@cl1";
+    private static final String COLUMN_FAMILY_QUALIFIED_NAME  = "table.family@cl1";
+    private static final String COLUMN_QUALIFIED_NAME         = "table.family.column@cl1";
+
+    private static final String DOTTED_TABLE_QUALIFIED_NAME   = "table.prefix.1@cl1";
+    private static final String DOTTED_COLUMN_FAMILY_QUALIFIED_NAME  = "table.prefix.1.family@cl1";
+    private static final String DOTTED_COLUMN_QUALIFIED_NAME         = "table.prefix.1.family.column@cl1";
+
+    private static final String SERVICE_NAME            = "cl1_hbase";
+    private static final String RANGER_TABLE            = "table";
+    private static final String RANGER_COLUMN_FAMILY    = "family";
+    private static final String RANGER_COLUMN           = "column";
+
+    private static final String DOTTED_RANGER_TABLE     = "table.prefix.1";
+
+    AtlasHbaseResourceMapper resourceMapper = new AtlasHbaseResourceMapper();
+
+    @Test
+    public void testHbaseTable() throws Exception {
+        IReferenceableInstance tableEntity = newEntity(AtlasHbaseResourceMapper.ENTITY_TYPE_HBASE_TABLE, attribsFor(TABLE_QUALIFIED_NAME));
+
+        assertTableResource(resourceMapper.buildResource(tableEntity), false);
+    }
+
+    @Test
+    public void testHbaseColumnFamily() throws Exception {
+        IReferenceableInstance familyEntity = newEntity(AtlasHbaseResourceMapper.ENTITY_TYPE_HBASE_COLUMN_FAMILY, attribsFor(COLUMN_FAMILY_QUALIFIED_NAME));
+
+        assertColumnFamilyResource(resourceMapper.buildResource(familyEntity), false);
+    }
+
+    @Test
+    public void testHbaseColumn() throws Exception {
+        IReferenceableInstance columnEntity = newEntity(AtlasHbaseResourceMapper.ENTITY_TYPE_HBASE_COLUMN, attribsFor(COLUMN_QUALIFIED_NAME));
+
+        assertColumnResource(resourceMapper.buildResource(columnEntity), false);
+    }
+
+    @Test
+    public void testHbaseResourceFromMissingAttribs() throws Exception {
+        // no qualifiedName attribute at all
+        assertBuildResourceFails(newEntity(AtlasHbaseResourceMapper.ENTITY_TYPE_HBASE_TABLE, new HashMap<String, Object>()));
+    }
+
+    @Test
+    public void testHbaseResourceFromMissingColumnFamilyName() throws Exception {
+        // column-family entity whose qualifiedName only names a table
+        assertBuildResourceFails(newEntity(AtlasHbaseResourceMapper.ENTITY_TYPE_HBASE_COLUMN_FAMILY, attribsFor(TABLE_QUALIFIED_NAME)));
+    }
+
+    @Test
+    public void testHbaseResourceFromMissingColumnName() throws Exception {
+        // column entity whose qualifiedName stops at the column family
+        assertBuildResourceFails(newEntity(AtlasHbaseResourceMapper.ENTITY_TYPE_HBASE_COLUMN, attribsFor(COLUMN_FAMILY_QUALIFIED_NAME)));
+    }
+
+    @Test
+    public void testHbaseDottedTable() throws Exception {
+        IReferenceableInstance tableEntity = newEntity(AtlasHbaseResourceMapper.ENTITY_TYPE_HBASE_TABLE, attribsFor(DOTTED_TABLE_QUALIFIED_NAME));
+
+        assertTableResource(resourceMapper.buildResource(tableEntity), true);
+    }
+
+    @Test
+    public void testHbaseDottedColumnFamily() throws Exception {
+        IReferenceableInstance familyEntity = newEntity(AtlasHbaseResourceMapper.ENTITY_TYPE_HBASE_COLUMN_FAMILY, attribsFor(DOTTED_COLUMN_FAMILY_QUALIFIED_NAME));
+
+        assertColumnFamilyResource(resourceMapper.buildResource(familyEntity), true);
+    }
+
+    @Test
+    public void testHbaseDottedColumn() throws Exception {
+        IReferenceableInstance columnEntity = newEntity(AtlasHbaseResourceMapper.ENTITY_TYPE_HBASE_COLUMN, attribsFor(DOTTED_COLUMN_QUALIFIED_NAME));
+
+        assertColumnResource(resourceMapper.buildResource(columnEntity), true);
+    }
+
+    /** Creates an attribute map that holds only the given qualifiedName. */
+    private static Map<String, Object> attribsFor(String qualifiedName) {
+        Map<String, Object> attribs = new HashMap<String, Object>();
+
+        attribs.put(AtlasHbaseResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
+
+        return attribs;
+    }
+
+    /** Creates a mock Atlas entity that reports the given type name and attribute values. */
+    private IReferenceableInstance newEntity(String typeName, Map<String, Object> attribs) throws Exception {
+        IReferenceableInstance mocked = Mockito.mock(IReferenceableInstance.class);
+
+        Mockito.when(mocked.getTypeName()).thenReturn(typeName);
+        Mockito.when(mocked.getValuesMap()).thenReturn(attribs);
+
+        return mocked;
+    }
+
+    /** Asserts that the mapper rejects the entity by throwing an exception. */
+    private void assertBuildResourceFails(IReferenceableInstance entity) {
+        try {
+            RangerServiceResource unexpected = resourceMapper.buildResource(entity);
+
+            Assert.fail("expected exception. Found " + unexpected);
+        } catch(Exception expected) {
+            // the mapper is expected to reject this entity
+        }
+    }
+
+    private void assertServiceResource(RangerServiceResource resource) {
+        Assert.assertNotNull(resource);
+        Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
+        Assert.assertNotNull(resource.getResourceElements());
+    }
+
+    /** Asserts that the named element is present and carries exactly the one expected value. */
+    private void assertSingleValue(RangerServiceResource resource, String elementName, String expectedValue) {
+        Assert.assertTrue(resource.getResourceElements().containsKey(elementName));
+        Assert.assertNotNull(resource.getResourceElements().get(elementName).getValues());
+        Assert.assertEquals(1, resource.getResourceElements().get(elementName).getValues().size());
+        Assert.assertEquals(expectedValue, resource.getResourceElements().get(elementName).getValues().get(0));
+    }
+
+    private void assertTableResource(RangerServiceResource resource, boolean isDottedTable) {
+        assertServiceResource(resource);
+
+        Assert.assertEquals(1, resource.getResourceElements().size());
+
+        assertSingleValue(resource, AtlasHbaseResourceMapper.RANGER_TYPE_HBASE_TABLE, isDottedTable ? DOTTED_RANGER_TABLE : RANGER_TABLE);
+    }
+
+    private void assertColumnFamilyResource(RangerServiceResource resource, boolean isDottedTable) {
+        assertServiceResource(resource);
+
+        Assert.assertEquals(2, resource.getResourceElements().size());
+
+        assertSingleValue(resource, AtlasHbaseResourceMapper.RANGER_TYPE_HBASE_TABLE, isDottedTable ? DOTTED_RANGER_TABLE : RANGER_TABLE);
+        assertSingleValue(resource, AtlasHbaseResourceMapper.RANGER_TYPE_HBASE_COLUMN_FAMILY, RANGER_COLUMN_FAMILY);
+    }
+
+    private void assertColumnResource(RangerServiceResource resource, boolean isDottedTable) {
+        assertServiceResource(resource);
+
+        Assert.assertEquals(3, resource.getResourceElements().size());
+
+        assertSingleValue(resource, AtlasHbaseResourceMapper.RANGER_TYPE_HBASE_TABLE, isDottedTable ? DOTTED_RANGER_TABLE : RANGER_TABLE);
+        assertSingleValue(resource, AtlasHbaseResourceMapper.RANGER_TYPE_HBASE_COLUMN_FAMILY, RANGER_COLUMN_FAMILY);
+        assertSingleValue(resource, AtlasHbaseResourceMapper.RANGER_TYPE_HBASE_COLUMN, RANGER_COLUMN);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/de8f444d/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestKafkaResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestKafkaResourceMapper.java b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestKafkaResourceMapper.java
new file mode 100644
index 0000000..3beb82f
--- /dev/null
+++ b/tagsync/src/test/java/org/apache/ranger/tagsync/process/TestKafkaResourceMapper.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.process;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.atlas.typesystem.IReferenceableInstance;
+import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.source.atlas.AtlasKafkaResourceMapper;
+import org.junit.Test;
+
+import org.mockito.Mockito;
+import org.junit.Assert;
+
+/**
+ * Unit tests for {@link AtlasKafkaResourceMapper}: a fully qualified topic name maps to a
+ * Ranger resource, while incomplete or missing qualified names are rejected.
+ */
+public class TestKafkaResourceMapper {
+    private static final String CLUSTER_NAME    = "cl1";
+    private static final String TOPIC           = "kafka-topic";
+    private static final String QUALIFIED_NAME  = "kafka-topic@cl1";
+
+    private static final String SERVICE_NAME    = "cl1_kafka";
+    private static final String RANGER_TOPIC    = "kafka-topic";
+
+    AtlasKafkaResourceMapper resourceMapper = new AtlasKafkaResourceMapper();
+
+    @Test
+    public void testKafkaResourceFromQualifiedName() throws Exception {
+        IReferenceableInstance topicEntity = getKafkaTopicEntity(attribsFor(QUALIFIED_NAME));
+
+        assertServiceResource(resourceMapper.buildResource(topicEntity));
+    }
+
+    @Test
+    public void testKafkaResourceFromOnlyTopic() throws Exception {
+        // qualifiedName holds just the topic
+        assertBuildResourceFails(getKafkaTopicEntity(attribsFor(TOPIC)));
+    }
+
+    @Test
+    public void testKafkaResourceFromOnlyClusterName() throws Exception {
+        // qualifiedName holds just the cluster name
+        assertBuildResourceFails(getKafkaTopicEntity(attribsFor(CLUSTER_NAME)));
+    }
+
+    @Test
+    public void testKafkaResourceFromMissingAttribs() throws Exception {
+        // no qualifiedName attribute at all
+        assertBuildResourceFails(getKafkaTopicEntity(new HashMap<String, Object>()));
+    }
+
+    /** Creates an attribute map that holds only the given qualifiedName. */
+    private static Map<String, Object> attribsFor(String qualifiedName) {
+        Map<String, Object> attribs = new HashMap<String, Object>();
+
+        attribs.put(AtlasKafkaResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
+
+        return attribs;
+    }
+
+    /** Creates a mock kafka_topic entity that reports the given attribute values. */
+    private IReferenceableInstance getKafkaTopicEntity(Map<String, Object> entAttribs) throws Exception {
+        IReferenceableInstance mocked = Mockito.mock(IReferenceableInstance.class);
+
+        Mockito.when(mocked.getTypeName()).thenReturn(AtlasKafkaResourceMapper.ENTITY_TYPE_KAFKA_TOPIC);
+        Mockito.when(mocked.getValuesMap()).thenReturn(entAttribs);
+
+        return mocked;
+    }
+
+    /** Asserts that the mapper rejects the entity by throwing an exception. */
+    private void assertBuildResourceFails(IReferenceableInstance entity) {
+        try {
+            RangerServiceResource unexpected = resourceMapper.buildResource(entity);
+
+            Assert.fail("expected exception. Found " + unexpected);
+        } catch(Exception expected) {
+            // the mapper is expected to reject this entity
+        }
+    }
+
+    /** Asserts that the resource belongs to the expected service and carries exactly the one expected topic. */
+    private void assertServiceResource(RangerServiceResource resource) {
+        final String topicKey = AtlasKafkaResourceMapper.RANGER_TYPE_KAFKA_TOPIC;
+
+        Assert.assertNotNull(resource);
+        Assert.assertEquals(SERVICE_NAME, resource.getServiceName());
+        Assert.assertNotNull(resource.getResourceElements());
+        Assert.assertEquals(1, resource.getResourceElements().size());
+        Assert.assertTrue(resource.getResourceElements().containsKey(topicKey));
+        Assert.assertNotNull(resource.getResourceElements().get(topicKey).getValues());
+        Assert.assertEquals(1, resource.getResourceElements().get(topicKey).getValues().size());
+        Assert.assertEquals(RANGER_TOPIC, resource.getResourceElements().get(topicKey).getValues().get(0));
+    }
+}