You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2019/07/12 23:27:38 UTC

[atlas] branch branch-2.0 updated (0851a38 -> b587dee)

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a change to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git.


    from 0851a38  ATLAS-3332: In relationship tab empty valued attributes are shown even though Show empty values is unchecked
     new 47c9e42  ATLAS-3211 :- Update Hive hook with Relationship Attributes.
     new 35142e5  ATLAS-3315: Database initialization exception handling.
     new b587dee  ATLAS-3321: Introduce atlas metadata namespace

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 addons/hbase-bridge/src/bin/import-hbase.sh        |   2 +-
 .../apache/atlas/hbase/bridge/HBaseAtlasHook.java  |  75 +++++------
 .../org/apache/atlas/hbase/bridge/HBaseBridge.java |  85 ++++++------
 addons/hive-bridge/src/bin/import-hive.sh          |   2 +-
 .../atlas/hive/bridge/HiveMetaStoreBridge.java     | 100 ++++++++------
 .../atlas/hive/hook/AtlasHiveHookContext.java      |  20 +--
 .../java/org/apache/atlas/hive/hook/HiveHook.java  |   9 --
 .../atlas/hive/hook/events/AlterTableRename.java   |  32 +----
 .../atlas/hive/hook/events/BaseHiveEvent.java      | 148 ++++++++++++++-------
 .../atlas/hive/hook/events/CreateHiveProcess.java  |   6 +-
 .../java/org/apache/atlas/hive/HiveITBase.java     |   2 +-
 .../atlas/hive/bridge/HiveMetaStoreBridgeTest.java |  47 ++++---
 .../atlas/impala/hook/AtlasImpalaHookContext.java  |  17 ++-
 .../atlas/impala/hook/ImpalaLineageHook.java       |  16 +--
 .../atlas/impala/hook/events/BaseImpalaEvent.java  |   2 +-
 .../apache/atlas/impala/ImpalaLineageITBase.java   |   4 +-
 .../apache/atlas/impala/ImpalaLineageToolIT.java   |  18 +--
 .../atlas/impala/hook/ImpalaLineageHookIT.java     |   2 +-
 addons/kafka-bridge/src/bin/import-kafka.sh        |   2 +-
 .../org/apache/atlas/kafka/bridge/KafkaBridge.java |  55 ++++----
 .../org/apache/atlas/sqoop/hook/SqoopHook.java     |  40 +++---
 .../apache/atlas/storm/hook/StormAtlasHook.java    |  57 ++++----
 .../main/java/org/apache/atlas/AtlasConstants.java |  20 +--
 .../graphdb/janus/AtlasJanusGraphDatabase.java     |  32 +++--
 .../graphdb/janus/AtlasJanusDatabaseTest.java      |  36 ++++-
 .../atlas/model/instance/AtlasRelatedObjectId.java |   6 +
 .../java/org/apache/atlas/type/AtlasTypeUtil.java  |   4 +
 .../main/java/org/apache/atlas/hook/AtlasHook.java |  18 ++-
 28 files changed, 478 insertions(+), 379 deletions(-)


[atlas] 02/03: ATLAS-3315: Database initialization exception handling.

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 35142e55ac52fdd46a86f770d71328c72031be55
Author: Ashutosh Mestry <am...@hortonworks.com>
AuthorDate: Mon Jul 8 11:14:31 2019 -0700

    ATLAS-3315: Database initialization exception handling.
    
    (cherry picked from commit 8188a96e2aa3114502098eced91edfe3b1fb3674)
---
 .../graphdb/janus/AtlasJanusGraphDatabase.java     | 32 +++++++++++--------
 .../graphdb/janus/AtlasJanusDatabaseTest.java      | 36 +++++++++++++++++++++-
 2 files changed, 54 insertions(+), 14 deletions(-)

diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
index c8c6a52..e9b4b09 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
@@ -18,6 +18,7 @@
 
 package org.apache.atlas.repository.graphdb.janus;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
@@ -165,27 +166,32 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
                         throw new RuntimeException(e);
                     }
 
-                    try {
-                        graphInstance = JanusGraphFactory.open(config);
-                    } catch (JanusGraphException e) {
-                        LOG.warn("JanusGraphException: {}", e.getMessage());
-                        if (e.getMessage().startsWith(OLDER_STORAGE_EXCEPTION)) {
-                            LOG.info("Newer client is being used with older janus storage version. Setting allow-upgrade=true and reattempting connection");
-                            config.addProperty("graph.allow-upgrade", true);
-                            graphInstance = JanusGraphFactory.open(config);
-                        }
-                        else {
-                            throw new RuntimeException(e);
-                        }
-                    }
+                    graphInstance = initJanusGraph(config);
                     atlasGraphInstance = new AtlasJanusGraph();
                     validateIndexBackend(config);
+
                 }
             }
         }
         return graphInstance;
     }
 
+    @VisibleForTesting
+    static JanusGraph initJanusGraph(Configuration config) {
+        try {
+            return JanusGraphFactory.open(config);
+        } catch (JanusGraphException e) {
+            LOG.warn("JanusGraphException: {}", e.getMessage());
+            if (e.getMessage().startsWith(OLDER_STORAGE_EXCEPTION)) {
+                LOG.info("Newer client is being used with older janus storage version. Setting allow-upgrade=true and reattempting connection");
+                config.addProperty("graph.allow-upgrade", true);
+                return JanusGraphFactory.open(config);
+            } else {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
     public static JanusGraph getBulkLoadingGraphInstance() {
         try {
             Configuration cfg = getConfiguration();
diff --git a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusDatabaseTest.java b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusDatabaseTest.java
index 3ab3c1e..5cd5509 100644
--- a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusDatabaseTest.java
+++ b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusDatabaseTest.java
@@ -31,6 +31,7 @@ import org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator;
 import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
+import org.apache.commons.configuration.Configuration;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 
@@ -39,9 +40,18 @@ import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
-import static org.testng.Assert.*;
+import static org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase.initJanusGraph;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 /**
  * Sanity test of basic graph operations using the Janus graphdb
@@ -81,6 +91,30 @@ public class AtlasJanusDatabaseTest {
     }
 
     @Test
+    public void initializationFailureShouldThrowRuntimeException() throws AtlasException {
+        Configuration cfg = AtlasJanusGraphDatabase.getConfiguration();
+        Map<String, Object> cfgBackup = new HashMap<>();
+        Iterator<String> keys = cfg.getKeys();
+        while(keys.hasNext()) {
+            String key = keys.next();
+            cfgBackup.put(key, cfg.getProperty(key));
+        }
+
+        try {
+            cfg.clear();
+            initJanusGraph(cfg);
+
+            fail("Should have thrown an exception!");
+        }
+        catch (RuntimeException ex) {
+            assertTrue(true);
+            for (Map.Entry<String, Object> entry : cfgBackup.entrySet()) {
+                cfg.setProperty(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    @Test
     public <V, E> void testPropertyDataTypes() {
 
         // primitives


[atlas] 01/03: ATLAS-3211 :- Update Hive hook with Relationship Attributes.

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 47c9e42db1ca8785c764302e497c24cb8cac6cc7
Author: Mandar Ambawane <ma...@freestoneinfotech.com>
AuthorDate: Mon Jul 8 20:26:20 2019 +0530

    ATLAS-3211 :- Update Hive hook with Relationship Attributes.
    
    Signed-off-by: nixonrodrigues <ni...@apache.org>
    (cherry picked from commit 5c0025177e6520564921c46d9a442e7da6f162c5)
---
 .../atlas/hive/hook/events/AlterTableRename.java   | 32 ++------
 .../atlas/hive/hook/events/BaseHiveEvent.java      | 89 +++++++++++++++++-----
 .../atlas/hive/hook/events/CreateHiveProcess.java  |  6 +-
 .../atlas/model/instance/AtlasRelatedObjectId.java |  6 ++
 .../java/org/apache/atlas/type/AtlasTypeUtil.java  |  4 +
 5 files changed, 89 insertions(+), 48 deletions(-)

diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
index 927ddc0..c59ff7f 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java
@@ -129,17 +129,13 @@ public class AlterTableRename extends BaseHiveEvent {
         // update qualifiedName for all columns, partitionKeys, storageDesc
         String renamedTableQualifiedName = (String) renamedTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME);
 
-        renameColumns((List<AtlasObjectId>) oldTableEntity.getEntity().getAttribute(ATTRIBUTE_COLUMNS), oldTableEntity, renamedTableQualifiedName, ret);
-        renameColumns((List<AtlasObjectId>) oldTableEntity.getEntity().getAttribute(ATTRIBUTE_PARTITION_KEYS), oldTableEntity, renamedTableQualifiedName, ret);
+        renameColumns((List<AtlasObjectId>) oldTableEntity.getEntity().getRelationshipAttribute(ATTRIBUTE_COLUMNS), oldTableEntity, renamedTableQualifiedName, ret);
+        renameColumns((List<AtlasObjectId>) oldTableEntity.getEntity().getRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS), oldTableEntity, renamedTableQualifiedName, ret);
         renameStorageDesc(oldTableEntity, renamedTableEntity, ret);
 
-        // remove columns, partitionKeys and storageDesc - as they have already been updated above
-        removeAttribute(renamedTableEntity, ATTRIBUTE_COLUMNS);
-        removeAttribute(renamedTableEntity, ATTRIBUTE_PARTITION_KEYS);
-        removeAttribute(renamedTableEntity, ATTRIBUTE_STORAGEDESC);
-
         // set previous name as the alias
         renamedTableEntity.getEntity().setAttribute(ATTRIBUTE_ALIASES, Collections.singletonList(oldTable.getTableName()));
+        renamedTableEntity.getEntity().setRelationshipAttributes(null);
 
         String        oldTableQualifiedName = (String) oldTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME);
         AtlasObjectId oldTableId            = new AtlasObjectId(oldTableEntity.getEntity().getTypeName(), ATTRIBUTE_QUALIFIED_NAME, oldTableQualifiedName);
@@ -179,35 +175,17 @@ public class AlterTableRename extends BaseHiveEvent {
             AtlasObjectId oldSdId = new AtlasObjectId(oldSd.getTypeName(), ATTRIBUTE_QUALIFIED_NAME, oldSd.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
 
             newSd.removeAttribute(ATTRIBUTE_TABLE);
+            newSd.setRelationshipAttributes(null);
 
             notifications.add(new EntityPartialUpdateRequestV2(getUserName(), oldSdId, new AtlasEntityWithExtInfo(newSd)));
         }
     }
 
-    private void removeAttribute(AtlasEntityWithExtInfo entity, String attributeName) {
-        Object attributeValue = entity.getEntity().getAttribute(attributeName);
-
-        entity.getEntity().getAttributes().remove(attributeName);
-
-        if (attributeValue instanceof AtlasObjectId) {
-            AtlasObjectId objectId = (AtlasObjectId) attributeValue;
-
-            entity.removeReferredEntity(objectId.getGuid());
-        } else if (attributeValue instanceof Collection) {
-            for (Object item : (Collection) attributeValue)
-                if (item instanceof AtlasObjectId) {
-                    AtlasObjectId objectId = (AtlasObjectId) item;
-
-                    entity.removeReferredEntity(objectId.getGuid());
-                }
-        }
-    }
-
     private AtlasEntity getStorageDescEntity(AtlasEntityWithExtInfo tableEntity) {
         AtlasEntity ret = null;
 
         if (tableEntity != null && tableEntity.getEntity() != null) {
-            Object attrSdId = tableEntity.getEntity().getAttribute(ATTRIBUTE_STORAGEDESC);
+            Object attrSdId = tableEntity.getEntity().getRelationshipAttribute(ATTRIBUTE_STORAGEDESC);
 
             if (attrSdId instanceof AtlasObjectId) {
                 ret = tableEntity.getReferredEntity(((AtlasObjectId) attrSdId).getGuid());
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index c15bbfa..0bf3ce2 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -26,6 +26,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.model.instance.AtlasStruct;
 import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.repository.Constants;
@@ -161,6 +162,20 @@ public abstract class BaseHiveEvent {
     public static final String HDFS_PATH_PREFIX                    = "hdfs://";
     public static final String EMPTY_ATTRIBUTE_VALUE = "";
 
+    public static final String RELATIONSHIP_DATASET_PROCESS_INPUTS = "dataset_process_inputs";
+    public static final String RELATIONSHIP_PROCESS_DATASET_OUTPUTS = "process_dataset_outputs";
+    public static final String RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE = "hive_process_column_lineage";
+    public static final String RELATIONSHIP_HIVE_TABLE_DB = "hive_table_db";
+    public static final String RELATIONSHIP_HIVE_TABLE_PART_KEYS = "hive_table_partitionkeys";
+    public static final String RELATIONSHIP_HIVE_TABLE_COLUMNS = "hive_table_columns";
+    public static final String RELATIONSHIP_HIVE_TABLE_STORAGE_DESC = "hive_table_storagedesc";
+    public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS = "aws_s3_bucket_aws_s3_pseudo_dirs";
+    public static final String RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE = "hive_process_process_executions";
+    public static final String RELATIONSHIP_HIVE_DB_DDL_QUERIES = "hive_db_ddl_queries";
+    public static final String RELATIONSHIP_HIVE_TABLE_DDL_QUERIES = "hive_table_ddl_queries";
+    public static final String RELATIONSHIP_HBASE_TABLE_NAMESPACE = "hbase_table_namespace";
+
+
     public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new HashMap<>();
 
 
@@ -193,6 +208,28 @@ public abstract class BaseHiveEvent {
         return table.getTTable() != null ? (table.getOwner()): "";
     }
 
+
+    public static AtlasRelatedObjectId getObjectIdWithRelationshipType(AtlasEntity entity, String relationShipType) {
+        AtlasRelatedObjectId atlasRelatedObjectId = new AtlasRelatedObjectId(getObjectId(entity), relationShipType);
+        return atlasRelatedObjectId;
+    }
+
+
+
+    public static List<AtlasRelatedObjectId> getObjectIdsWithRelationshipType(List<AtlasEntity> entities,String relationshipType) {
+        final List<AtlasRelatedObjectId> ret;
+        if (CollectionUtils.isNotEmpty(entities)) {
+            ret = new ArrayList<>(entities.size());
+            for (AtlasEntity entity : entities) {
+                ret.add(getObjectIdWithRelationshipType(entity, relationshipType));
+            }
+        } else {
+            ret = Collections.emptyList();
+        }
+        return ret;
+    }
+
+
     public static AtlasObjectId getObjectId(AtlasEntity entity) {
         String        qualifiedName = (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
         AtlasObjectId ret           = new AtlasObjectId(entity.getGuid(), entity.getTypeName(), Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
@@ -393,7 +430,9 @@ public abstract class BaseHiveEvent {
                 long createTime     = getTableCreateTime(table);
                 long lastAccessTime = table.getLastAccessTime() > 0 ? (table.getLastAccessTime() * MILLIS_CONVERT_FACTOR) : createTime;
 
-                ret.setAttribute(ATTRIBUTE_DB, dbId);
+                AtlasRelatedObjectId dbRelatedObject =     new AtlasRelatedObjectId(dbId, RELATIONSHIP_HIVE_TABLE_DB);
+
+                ret.setRelationshipAttribute(ATTRIBUTE_DB, dbRelatedObject );
                 ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tblQualifiedName);
                 ret.setAttribute(ATTRIBUTE_NAME, table.getTableName().toLowerCase());
                 ret.setAttribute(ATTRIBUTE_OWNER, table.getOwner());
@@ -420,8 +459,10 @@ public abstract class BaseHiveEvent {
                 } else {
                     AtlasObjectId     tableId       = getObjectId(ret);
                     AtlasEntity       sd            = getStorageDescEntity(tableId, table);
-                    List<AtlasEntity> partitionKeys = getColumnEntities(tableId, table, table.getPartitionKeys());
-                    List<AtlasEntity> columns       = getColumnEntities(tableId, table, table.getCols());
+                    List<AtlasEntity> partitionKeys = getColumnEntities(tableId, table, table.getPartitionKeys(), RELATIONSHIP_HIVE_TABLE_PART_KEYS);
+                    List<AtlasEntity> columns       = getColumnEntities(tableId, table, table.getCols(), RELATIONSHIP_HIVE_TABLE_COLUMNS);
+
+
 
                     if (entityExtInfo != null) {
                         entityExtInfo.addReferredEntity(sd);
@@ -439,9 +480,10 @@ public abstract class BaseHiveEvent {
                         }
                     }
 
-                    ret.setAttribute(ATTRIBUTE_STORAGEDESC, getObjectId(sd));
-                    ret.setAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIds(partitionKeys));
-                    ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns));
+
+                    ret.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, getObjectIdWithRelationshipType(sd, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
+                    ret.setRelationshipAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIdsWithRelationshipType(partitionKeys, RELATIONSHIP_HIVE_TABLE_PART_KEYS));
+                    ret.setRelationshipAttribute(ATTRIBUTE_COLUMNS, getObjectIdsWithRelationshipType(columns, RELATIONSHIP_HIVE_TABLE_COLUMNS));
                 }
 
                 context.putEntity(tblQualifiedName, ret);
@@ -469,7 +511,9 @@ public abstract class BaseHiveEvent {
 
             StorageDescriptor sd = table.getSd();
 
-            ret.setAttribute(ATTRIBUTE_TABLE, tableId);
+            AtlasRelatedObjectId tableRelatedObject =     new AtlasRelatedObjectId(tableId, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC);
+
+            ret.setRelationshipAttribute(ATTRIBUTE_TABLE, tableRelatedObject);
             ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName);
             ret.setAttribute(ATTRIBUTE_PARAMETERS, sd.getParameters());
             ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(sd.getLocation()));
@@ -515,7 +559,7 @@ public abstract class BaseHiveEvent {
         return ret;
     }
 
-    protected List<AtlasEntity> getColumnEntities(AtlasObjectId tableId, Table table, List<FieldSchema> fieldSchemas) {
+    protected List<AtlasEntity> getColumnEntities(AtlasObjectId tableId, Table table, List<FieldSchema> fieldSchemas, String relationshipType) {
         List<AtlasEntity> ret            = new ArrayList<>();
         boolean           isKnownTable   = tableId.getGuid() == null;
         int               columnPosition = 0;
@@ -534,8 +578,8 @@ public abstract class BaseHiveEvent {
                     if (isKnownTable) {
                         column.setGuid(null);
                     }
-
-                    column.setAttribute(ATTRIBUTE_TABLE, tableId);
+                    AtlasRelatedObjectId relatedObjectId = new AtlasRelatedObjectId(tableId, relationshipType);
+                    column.setRelationshipAttribute(ATTRIBUTE_TABLE, (relatedObjectId));
                     column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, colQualifiedName);
                     column.setAttribute(ATTRIBUTE_NAME, fieldSchema.getName());
                     column.setAttribute(ATTRIBUTE_OWNER, table.getOwner());
@@ -583,7 +627,7 @@ public abstract class BaseHiveEvent {
 
                 ret = new AtlasEntity(AWS_S3_PSEUDO_DIR);
 
-                ret.setAttribute(ATTRIBUTE_BUCKET, getObjectId(bucketEntity));
+                ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, getObjectIdWithRelationshipType(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
                 ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
                 ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
                 ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
@@ -631,8 +675,8 @@ public abstract class BaseHiveEvent {
         }
 
         ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(inputs, outputs));
-        ret.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputs));
-        ret.setAttribute(ATTRIBUTE_OUTPUTS,  getObjectIds(outputs));
+        ret.setRelationshipAttribute(ATTRIBUTE_INPUTS, getObjectIdsWithRelationshipType(inputs, RELATIONSHIP_DATASET_PROCESS_INPUTS));
+        ret.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, getObjectIdsWithRelationshipType(outputs, RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
         ret.setAttribute(ATTRIBUTE_NAME, queryStr);
         ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName());
 
@@ -668,8 +712,9 @@ public abstract class BaseHiveEvent {
         ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr);
         ret.setAttribute(ATTRIBUTE_QUERY_ID, getQueryId());
         ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
-        ret.setAttribute(ATTRIBUTE_HOSTNAME, getContext().getHostName());
-        ret.setRelationshipAttribute(ATTRIBUTE_PROCESS, AtlasTypeUtil.toAtlasRelatedObjectId(hiveProcess));
+        ret.setAttribute(ATTRIBUTE_HOSTNAME, getContext().getHostName()); //
+        AtlasRelatedObjectId hiveProcessRelationObjectId = AtlasTypeUtil.toAtlasRelatedObjectId(hiveProcess, RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE);
+        ret.setRelationshipAttribute(ATTRIBUTE_PROCESS, hiveProcessRelationObjectId);
         return ret;
     }
 
@@ -684,11 +729,16 @@ public abstract class BaseHiveEvent {
         if (excludeEntityGuid) {
             objId.setGuid(null);
         }
+        AtlasRelatedObjectId objIdRelatedObject =     new AtlasRelatedObjectId(objId);
 
         if (StringUtils.equals(objId.getTypeName(), HIVE_TYPE_DB)) {
-            hiveDDL = new AtlasEntity(HIVE_DB_DDL, ATTRIBUTE_DB, objId);
+            hiveDDL = new AtlasEntity(HIVE_DB_DDL);
+            objIdRelatedObject.setRelationshipType(RELATIONSHIP_HIVE_DB_DDL_QUERIES);
+            hiveDDL.setRelationshipAttribute(ATTRIBUTE_DB, objIdRelatedObject);
         } else if (StringUtils.equals(objId.getTypeName(), HIVE_TYPE_TABLE)) {
-            hiveDDL = new AtlasEntity(HIVE_TABLE_DDL, ATTRIBUTE_TABLE, objId);
+            hiveDDL = new AtlasEntity(HIVE_TABLE_DDL);
+            objIdRelatedObject.setRelationshipType(RELATIONSHIP_HIVE_TABLE_DDL_QUERIES);
+            hiveDDL.setRelationshipAttribute( ATTRIBUTE_TABLE, objIdRelatedObject);
         }
 
         if (hiveDDL != null) {
@@ -948,7 +998,10 @@ public abstract class BaseHiveEvent {
 
             ret.setAttribute(ATTRIBUTE_NAME, hbaseTableName);
             ret.setAttribute(ATTRIBUTE_URI, hbaseTableName);
-            ret.setAttribute(ATTRIBUTE_NAMESPACE, getObjectId(nsEntity));
+
+            AtlasRelatedObjectId objIdRelatedObject =     new AtlasRelatedObjectId(getObjectId(nsEntity), RELATIONSHIP_HBASE_TABLE_NAMESPACE);
+
+            ret.setRelationshipAttribute(ATTRIBUTE_NAMESPACE, objIdRelatedObject);
             ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseTableQualifiedName(getClusterName(), hbaseNameSpace, hbaseTableName));
 
             entities.addReferredEntity(nsEntity);
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
index 7791fb4..91e063e 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
@@ -207,9 +207,9 @@ public class CreateHiveProcess extends BaseHiveEvent {
 
             columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
             columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME));
-            columnLineageProcess.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputColumns));
-            columnLineageProcess.setAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getObjectId(outputColumn)));
-            columnLineageProcess.setAttribute(ATTRIBUTE_QUERY, getObjectId(hiveProcess));
+            columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_INPUTS, getObjectIdsWithRelationshipType(inputColumns, BaseHiveEvent.RELATIONSHIP_DATASET_PROCESS_INPUTS));
+            columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getObjectIdWithRelationshipType(outputColumn, BaseHiveEvent.RELATIONSHIP_PROCESS_DATASET_OUTPUTS)));
+            columnLineageProcess.setRelationshipAttribute(ATTRIBUTE_QUERY, getObjectIdWithRelationshipType(hiveProcess, BaseHiveEvent.RELATIONSHIP_HIVE_PROCESS_COLUMN_LINEAGE));
             columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE, entry.getValue().getType());
             columnLineageProcess.setAttribute(ATTRIBUTE_EXPRESSION, entry.getValue().getExpr());
 
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelatedObjectId.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelatedObjectId.java
index 7c57ccf..ae6932d 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelatedObjectId.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelatedObjectId.java
@@ -83,6 +83,12 @@ public class AtlasRelatedObjectId extends AtlasObjectId implements Serializable
         super(other);
     }
 
+    public AtlasRelatedObjectId(AtlasObjectId objId, String relationshipType) {
+        this(objId);
+
+        setRelationshipType(relationshipType);
+    }
+
     public AtlasRelatedObjectId(Map objIdMap) {
         super(objIdMap);
 
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
index d74c7e3..6ac176d 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java
@@ -414,6 +414,10 @@ public class AtlasTypeUtil {
         return new AtlasRelatedObjectId(getAtlasObjectId(entity));
     }
 
+    public static AtlasRelatedObjectId toAtlasRelatedObjectId(AtlasEntity entity, String relationshipType){
+        return new AtlasRelatedObjectId(getAtlasObjectId(entity), relationshipType);
+    }
+
     public static AtlasRelatedObjectId toAtlasRelatedObjectId(AtlasEntity entity, AtlasTypeRegistry typeRegistry) {
         return new AtlasRelatedObjectId(getAtlasObjectId(entity, typeRegistry));
     }


[atlas] 03/03: ATLAS-3321: Introduce atlas metadata namespace

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit b587dee9d12118173a231fa04e76690d0694ecb3
Author: Sarath Subramanian <sa...@apache.org>
AuthorDate: Thu Jul 11 16:41:58 2019 -0700

    ATLAS-3321: Introduce atlas metadata namespace
    
    (cherry picked from commit ef4025160dd456f3ac67ef39551ba280925a6a94)
---
 addons/hbase-bridge/src/bin/import-hbase.sh        |   2 +-
 .../apache/atlas/hbase/bridge/HBaseAtlasHook.java  |  75 +++++++---------
 .../org/apache/atlas/hbase/bridge/HBaseBridge.java |  85 ++++++++++--------
 addons/hive-bridge/src/bin/import-hive.sh          |   2 +-
 .../atlas/hive/bridge/HiveMetaStoreBridge.java     | 100 ++++++++++++---------
 .../atlas/hive/hook/AtlasHiveHookContext.java      |  20 ++---
 .../java/org/apache/atlas/hive/hook/HiveHook.java  |   9 --
 .../atlas/hive/hook/events/BaseHiveEvent.java      |  61 +++++++------
 .../java/org/apache/atlas/hive/HiveITBase.java     |   2 +-
 .../atlas/hive/bridge/HiveMetaStoreBridgeTest.java |  47 +++++-----
 .../atlas/impala/hook/AtlasImpalaHookContext.java  |  17 ++--
 .../atlas/impala/hook/ImpalaLineageHook.java       |  16 +---
 .../atlas/impala/hook/events/BaseImpalaEvent.java  |   2 +-
 .../apache/atlas/impala/ImpalaLineageITBase.java   |   4 +-
 .../apache/atlas/impala/ImpalaLineageToolIT.java   |  18 ++--
 .../atlas/impala/hook/ImpalaLineageHookIT.java     |   2 +-
 addons/kafka-bridge/src/bin/import-kafka.sh        |   2 +-
 .../org/apache/atlas/kafka/bridge/KafkaBridge.java |  55 +++++++-----
 .../org/apache/atlas/sqoop/hook/SqoopHook.java     |  40 +++++----
 .../apache/atlas/storm/hook/StormAtlasHook.java    |  57 +++++-------
 .../main/java/org/apache/atlas/AtlasConstants.java |  20 ++---
 .../main/java/org/apache/atlas/hook/AtlasHook.java |  18 +++-
 22 files changed, 336 insertions(+), 318 deletions(-)

diff --git a/addons/hbase-bridge/src/bin/import-hbase.sh b/addons/hbase-bridge/src/bin/import-hbase.sh
index b9c1a4b..34c4180 100644
--- a/addons/hbase-bridge/src/bin/import-hbase.sh
+++ b/addons/hbase-bridge/src/bin/import-hbase.sh
@@ -114,7 +114,7 @@ else
     exit 1
 fi
 
-CP="${ATLASCPPATH}:${HBASE_CP}:${HADOOP_CP}"
+CP="${HBASE_CP}:${HADOOP_CP}:${ATLASCPPATH}"
 
 # If running in cygwin, convert pathnames and classpath to Windows format.
 if [ "${CYGWIN}" == "true" ]
diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
index 1825cd2..6d062e2 100644
--- a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
+++ b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
@@ -30,7 +30,6 @@ import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV
 import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
 import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.configuration.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -55,20 +54,17 @@ public class HBaseAtlasHook extends AtlasHook {
     private static final Logger LOG = LoggerFactory.getLogger(HBaseAtlasHook.class);
 
 
-    public static final String HBASE_CLUSTER_NAME   = "atlas.cluster.name";
-    public static final String DEFAULT_CLUSTER_NAME = "primary";
-    public static final String ATTR_DESCRIPTION     = "description";
-    public static final String ATTR_ATLAS_ENDPOINT  = "atlas.rest.address";
-    public static final String ATTR_COMMENT         = "comment";
-    public static final String ATTR_PARAMETERS      = "parameters";
-    public static final String ATTR_URI             = "uri";
-    public static final String ATTR_NAMESPACE       = "namespace";
-    public static final String ATTR_TABLE           = "table";
-    public static final String ATTR_COLUMNFAMILIES  = "column_families";
-    public static final String ATTR_CREATE_TIME     = "createTime";
-    public static final String ATTR_MODIFIED_TIME   = "modifiedTime";
-    public static final String ATTR_OWNER           = "owner";
-    public static final String ATTR_NAME            = "name";
+    public static final String ATTR_DESCRIPTION           = "description";
+    public static final String ATTR_ATLAS_ENDPOINT        = "atlas.rest.address";
+    public static final String ATTR_PARAMETERS            = "parameters";
+    public static final String ATTR_URI                   = "uri";
+    public static final String ATTR_NAMESPACE             = "namespace";
+    public static final String ATTR_TABLE                 = "table";
+    public static final String ATTR_COLUMNFAMILIES        = "column_families";
+    public static final String ATTR_CREATE_TIME           = "createTime";
+    public static final String ATTR_MODIFIED_TIME         = "modifiedTime";
+    public static final String ATTR_OWNER                 = "owner";
+    public static final String ATTR_NAME                  = "name";
 
     // column addition metadata
     public static final String ATTR_TABLE_MAX_FILESIZE              = "maxFileSize";
@@ -106,7 +102,6 @@ public class HBaseAtlasHook extends AtlasHook {
     public static final String HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT = "%s:%s.%s@%s";
 
     private static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName";
-    private              String clusterName                  = null;
 
     private static volatile HBaseAtlasHook me;
 
@@ -141,7 +136,7 @@ public class HBaseAtlasHook extends AtlasHook {
                     ret = me;
 
                     if (ret == null) {
-                        me = ret = new HBaseAtlasHook(atlasProperties);
+                        me = ret = new HBaseAtlasHook();
                     }
                 }
             } catch (Exception e) {
@@ -152,15 +147,9 @@ public class HBaseAtlasHook extends AtlasHook {
         return ret;
     }
 
-    public HBaseAtlasHook(Configuration atlasProperties) {
-        this(atlasProperties.getString(HBASE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+    public HBaseAtlasHook() {
     }
 
-    public HBaseAtlasHook(String clusterName) {
-        this.clusterName = clusterName;
-    }
-
-
     public void createAtlasInstances(HBaseOperationContext hbaseOperationContext) {
         OPERATION operation = hbaseOperationContext.getOperation();
 
@@ -210,7 +199,7 @@ public class HBaseAtlasHook extends AtlasHook {
     }
 
     private void deleteNameSpaceInstance(HBaseOperationContext hbaseOperationContext) {
-        String        nameSpaceQName = getNameSpaceQualifiedName(clusterName, hbaseOperationContext.getNameSpace());
+        String        nameSpaceQName = getNameSpaceQualifiedName(getMetadataNamespace(), hbaseOperationContext.getNameSpace());
         AtlasObjectId nameSpaceId    = new AtlasObjectId(HBaseDataTypes.HBASE_NAMESPACE.getName(), REFERENCEABLE_ATTRIBUTE_NAME, nameSpaceQName);
 
         LOG.info("Delete NameSpace {}", nameSpaceQName);
@@ -259,7 +248,7 @@ public class HBaseAtlasHook extends AtlasHook {
         }
 
         String        tableNameStr = tableName.getNameAsString();
-        String        tableQName   = getTableQualifiedName(clusterName, nameSpaceName, tableNameStr);
+        String        tableQName   = getTableQualifiedName(getMetadataNamespace(), nameSpaceName, tableNameStr);
         AtlasObjectId tableId      = new AtlasObjectId(HBaseDataTypes.HBASE_TABLE.getName(), REFERENCEABLE_ATTRIBUTE_NAME, tableQName);
 
         LOG.info("Delete Table {}", tableQName);
@@ -302,7 +291,7 @@ public class HBaseAtlasHook extends AtlasHook {
 
         String        tableNameStr      = tableName.getNameAsString();
         String        columnFamilyName  = hbaseOperationContext.getColummFamily();
-        String        columnFamilyQName = getColumnFamilyQualifiedName(clusterName, nameSpaceName, tableNameStr, columnFamilyName);
+        String        columnFamilyQName = getColumnFamilyQualifiedName(getMetadataNamespace(), nameSpaceName, tableNameStr, columnFamilyName);
         AtlasObjectId columnFamilyId    = new AtlasObjectId(HBaseDataTypes.HBASE_COLUMN_FAMILY.getName(), REFERENCEABLE_ATTRIBUTE_NAME, columnFamilyQName);
 
         LOG.info("Delete ColumnFamily {}", columnFamilyQName);
@@ -314,48 +303,48 @@ public class HBaseAtlasHook extends AtlasHook {
     /**
      * Construct the qualified name used to uniquely identify a ColumnFamily instance in Atlas.
      *
-     * @param clusterName  Name of the cluster to which the HBase component belongs
+     * @param metadataNamespace  Metadata namespace of the cluster to which the HBase component belongs
      * @param nameSpace    Name of the HBase database to which the Table belongs
      * @param tableName    Name of the HBase table
      * @param columnFamily Name of the ColumnFamily
      * @return Unique qualified name to identify the Table instance in Atlas.
      */
-    public static String getColumnFamilyQualifiedName(String clusterName, String nameSpace, String tableName, String columnFamily) {
-        if (clusterName == null || nameSpace == null || tableName == null || columnFamily == null) {
+    public static String getColumnFamilyQualifiedName(String metadataNamespace, String nameSpace, String tableName, String columnFamily) {
+        if (metadataNamespace == null || nameSpace == null || tableName == null || columnFamily == null) {
             return null;
         } else {
-            return String.format(HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), stripNameSpace(tableName.toLowerCase()), columnFamily.toLowerCase(), clusterName);
+            return String.format(HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), stripNameSpace(tableName.toLowerCase()), columnFamily.toLowerCase(), metadataNamespace);
         }
     }
 
     /**
      * Construct the qualified name used to uniquely identify a Table instance in Atlas.
      *
-     * @param clusterName Name of the cluster to which the HBase component belongs
+     * @param metadataNamespace  Metadata namespace of the cluster to which the HBase component belongs
      * @param nameSpace   Name of the HBase database to which the Table belongs
      * @param tableName   Name of the HBase table
      * @return Unique qualified name to identify the Table instance in Atlas.
      */
-    public static String getTableQualifiedName(String clusterName, String nameSpace, String tableName) {
-        if (clusterName == null || nameSpace == null || tableName == null) {
+    public static String getTableQualifiedName(String metadataNamespace, String nameSpace, String tableName) {
+        if (metadataNamespace == null || nameSpace == null || tableName == null) {
             return null;
         } else {
-            return String.format(HBASE_TABLE_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), stripNameSpace(tableName.toLowerCase()), clusterName);
+            return String.format(HBASE_TABLE_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), stripNameSpace(tableName.toLowerCase()), metadataNamespace);
         }
     }
 
     /**
      * Construct the qualified name used to uniquely identify a HBase NameSpace instance in Atlas.
      *
-     * @param clusterName Name of the cluster to which the HBase component belongs
+     * @param metadataNamespace  Metadata namespace of the cluster to which the HBase component belongs
      * @param nameSpace
      * @return Unique qualified name to identify the HBase NameSpace instance in Atlas.
      */
-    public static String getNameSpaceQualifiedName(String clusterName, String nameSpace) {
-        if (clusterName == null || nameSpace == null) {
+    public static String getNameSpaceQualifiedName(String metadataNamespace, String nameSpace) {
+        if (metadataNamespace == null || nameSpace == null) {
             return null;
         } else {
-            return String.format(HBASE_NAMESPACE_QUALIFIED_NAME, nameSpace.toLowerCase(), clusterName);
+            return String.format(HBASE_NAMESPACE_QUALIFIED_NAME, nameSpace.toLowerCase(), metadataNamespace);
         }
     }
 
@@ -375,8 +364,8 @@ public class HBaseAtlasHook extends AtlasHook {
         Date now = new Date(System.currentTimeMillis());
 
         nameSpace.setAttribute(ATTR_NAME, nameSpaceName);
-        nameSpace.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, getNameSpaceQualifiedName(clusterName, nameSpaceName));
-        nameSpace.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
+        nameSpace.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, getNameSpaceQualifiedName(getMetadataNamespace(), nameSpaceName));
+        nameSpace.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getMetadataNamespace());
         nameSpace.setAttribute(ATTR_DESCRIPTION, nameSpaceName);
         nameSpace.setAttribute(ATTR_PARAMETERS, hbaseOperationContext.getHbaseConf());
         nameSpace.setAttribute(ATTR_OWNER, hbaseOperationContext.getOwner());
@@ -393,7 +382,7 @@ public class HBaseAtlasHook extends AtlasHook {
         AtlasEntity table         = new AtlasEntity(HBaseDataTypes.HBASE_TABLE.getName());
         String      tableName     = getTableName(hbaseOperationContext);
         String      nameSpaceName = (String) nameSpace.getAttribute(ATTR_NAME);
-        String      tableQName    = getTableQualifiedName(clusterName, nameSpaceName, tableName);
+        String      tableQName    = getTableQualifiedName(getMetadataNamespace(), nameSpaceName, tableName);
         OPERATION   operation     = hbaseOperationContext.getOperation();
         Date        now           = new Date(System.currentTimeMillis());
 
@@ -455,7 +444,7 @@ public class HBaseAtlasHook extends AtlasHook {
         String      columnFamilyName  = columnFamilyDescriptor.getNameAsString();
         String      tableName         = (String) table.getAttribute(ATTR_NAME);
         String      nameSpaceName     = (String) nameSpace.getAttribute(ATTR_NAME);
-        String      columnFamilyQName = getColumnFamilyQualifiedName(clusterName, nameSpaceName, tableName, columnFamilyName);
+        String      columnFamilyQName = getColumnFamilyQualifiedName(getMetadataNamespace(), nameSpaceName, tableName, columnFamilyName);
         Date        now               = new Date(System.currentTimeMillis());
 
         columnFamily.setAttribute(ATTR_NAME, columnFamilyName);
diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java
index 1765c18..4a4b4d9 100644
--- a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java
+++ b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java
@@ -65,21 +65,22 @@ import java.util.regex.Pattern;
 public class HBaseBridge {
     private static final Logger LOG = LoggerFactory.getLogger(HBaseBridge.class);
 
-    private static final int     EXIT_CODE_SUCCESS     = 0;
-    private static final int     EXIT_CODE_FAILED      = 1;
-    private static final String  ATLAS_ENDPOINT        = "atlas.rest.address";
-    private static final String  DEFAULT_ATLAS_URL     = "http://localhost:21000/";
-    private static final String  HBASE_CLUSTER_NAME    = "atlas.cluster.name";
-    private static final String  DEFAULT_CLUSTER_NAME  = "primary";
-    private static final String  QUALIFIED_NAME        = "qualifiedName";
-    private static final String  NAME                  = "name";
-    private static final String  URI                   = "uri";
-    private static final String  OWNER                 = "owner";
-    private static final String  DESCRIPTION_ATTR      = "description";
-    private static final String  CLUSTERNAME           = "clusterName";
-    private static final String  NAMESPACE             = "namespace";
-    private static final String  TABLE                 = "table";
-    private static final String  COLUMN_FAMILIES       = "column_families";
+    private static final int     EXIT_CODE_SUCCESS          = 0;
+    private static final int     EXIT_CODE_FAILED           = 1;
+    private static final String  ATLAS_ENDPOINT             = "atlas.rest.address";
+    private static final String  DEFAULT_ATLAS_URL          = "http://localhost:21000/";
+    private static final String  CLUSTER_NAME_KEY           = "atlas.cluster.name";
+    private static final String  DEFAULT_CLUSTER_NAME       = "primary";
+    private static final String  HBASE_METADATA_NAMESPACE   = "atlas.metadata.namespace";
+    private static final String  QUALIFIED_NAME             = "qualifiedName";
+    private static final String  NAME                       = "name";
+    private static final String  URI                        = "uri";
+    private static final String  OWNER                      = "owner";
+    private static final String  DESCRIPTION_ATTR           = "description";
+    private static final String  CLUSTERNAME                = "clusterName";
+    private static final String  NAMESPACE                  = "namespace";
+    private static final String  TABLE                      = "table";
+    private static final String  COLUMN_FAMILIES            = "column_families";
 
     // table metadata
     private static final String ATTR_TABLE_MAX_FILESIZE              = "maxFileSize";
@@ -115,9 +116,9 @@ public class HBaseBridge {
     private static final String HBASE_TABLE_QUALIFIED_NAME_FORMAT         = "%s:%s@%s";
     private static final String HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT = "%s:%s.%s@%s";
 
-    private final String         clusterName;
-    private final AtlasClientV2  atlasClientV2;
-    private final Admin          hbaseAdmin;
+    private final String        metadataNamespace;
+    private final AtlasClientV2 atlasClientV2;
+    private final Admin         hbaseAdmin;
 
 
     public static void main(String[] args) {
@@ -204,8 +205,8 @@ public class HBaseBridge {
     }
 
     public HBaseBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2) throws Exception {
-        this.atlasClientV2 = atlasClientV2;
-        this.clusterName   = atlasConf.getString(HBASE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        this.atlasClientV2     = atlasClientV2;
+        this.metadataNamespace = getMetadataNamespace(atlasConf);
 
         org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
 
@@ -220,6 +221,14 @@ public class HBaseBridge {
         hbaseAdmin = conn.getAdmin();
     }
 
+    private String getMetadataNamespace(Configuration config) {
+        return config.getString(HBASE_METADATA_NAMESPACE, getClusterName(config));
+    }
+
+    private String getClusterName(Configuration config) {
+        return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
+    }
+
     private boolean importHBaseEntities(String namespaceToImport, String tableToImport) throws Exception {
         boolean ret = false;
 
@@ -367,7 +376,7 @@ public class HBaseBridge {
 
     protected AtlasEntityWithExtInfo createOrUpdateNameSpace(NamespaceDescriptor namespaceDescriptor) throws Exception {
         String                 nsName          = namespaceDescriptor.getName();
-        String                 nsQualifiedName = getNameSpaceQualifiedName(clusterName, nsName);
+        String                 nsQualifiedName = getNameSpaceQualifiedName(metadataNamespace, nsName);
         AtlasEntityWithExtInfo nsEntity        = findNameSpaceEntityInAtlas(nsQualifiedName);
 
         if (nsEntity == null) {
@@ -390,7 +399,7 @@ public class HBaseBridge {
 
     protected  AtlasEntityWithExtInfo  createOrUpdateTable(String nameSpace, String tableName, AtlasEntity nameSapceEntity, TableDescriptor htd, ColumnFamilyDescriptor[] hcdts) throws Exception {
         String                 owner            = htd.getOwnerString();
-        String                 tblQualifiedName = getTableQualifiedName(clusterName, nameSpace, tableName);
+        String                 tblQualifiedName = getTableQualifiedName(metadataNamespace, nameSpace, tableName);
         AtlasEntityWithExtInfo ret              = findTableEntityInAtlas(tblQualifiedName);
 
         if (ret == null) {
@@ -436,7 +445,7 @@ public class HBaseBridge {
 
             for (ColumnFamilyDescriptor columnFamilyDescriptor : hcdts) {
                 String                 cfName          = columnFamilyDescriptor.getNameAsString();
-                String                 cfQualifiedName = getColumnFamilyQualifiedName(clusterName, nameSpace, tableName, cfName);
+                String                 cfQualifiedName = getColumnFamilyQualifiedName(metadataNamespace, nameSpace, tableName, cfName);
                 AtlasEntityWithExtInfo cfEntity        = findColumnFamiltyEntityInAtlas(cfQualifiedName);
 
                 if (cfEntity == null) {
@@ -516,10 +525,10 @@ public class HBaseBridge {
             ret = nsEtity;
         }
 
-        String qualifiedName = getNameSpaceQualifiedName(clusterName, nameSpace);
+        String qualifiedName = getNameSpaceQualifiedName(metadataNamespace, nameSpace);
 
         ret.setAttribute(QUALIFIED_NAME, qualifiedName);
-        ret.setAttribute(CLUSTERNAME, clusterName);
+        ret.setAttribute(CLUSTERNAME, metadataNamespace);
         ret.setAttribute(NAME, nameSpace);
         ret.setAttribute(DESCRIPTION_ATTR, nameSpace);
 
@@ -535,10 +544,10 @@ public class HBaseBridge {
             ret = atlasEntity;
         }
 
-        String tableQualifiedName = getTableQualifiedName(clusterName, nameSpace, tableName);
+        String tableQualifiedName = getTableQualifiedName(metadataNamespace, nameSpace, tableName);
 
         ret.setAttribute(QUALIFIED_NAME, tableQualifiedName);
-        ret.setAttribute(CLUSTERNAME, clusterName);
+        ret.setAttribute(CLUSTERNAME, metadataNamespace);
         ret.setAttribute(NAMESPACE, AtlasTypeUtil.getAtlasObjectId(nameSpaceEntity));
         ret.setAttribute(NAME, tableName);
         ret.setAttribute(DESCRIPTION_ATTR, tableName);
@@ -564,10 +573,10 @@ public class HBaseBridge {
         }
 
         String cfName          = hcdt.getNameAsString();
-        String cfQualifiedName = getColumnFamilyQualifiedName(clusterName, nameSpace, tableName, cfName);
+        String cfQualifiedName = getColumnFamilyQualifiedName(metadataNamespace, nameSpace, tableName, cfName);
 
         ret.setAttribute(QUALIFIED_NAME, cfQualifiedName);
-        ret.setAttribute(CLUSTERNAME, clusterName);
+        ret.setAttribute(CLUSTERNAME, metadataNamespace);
         ret.setAttribute(TABLE, tableId);
         ret.setAttribute(NAME, cfName);
         ret.setAttribute(DESCRIPTION_ATTR, cfName);
@@ -637,37 +646,37 @@ public class HBaseBridge {
 
     /**
      * Construct the qualified name used to uniquely identify a ColumnFamily instance in Atlas.
-     * @param clusterName Name of the cluster to which the Hbase component belongs
+     * @param metadataNamespace Metadata namespace of the cluster to which the Hbase component belongs
      * @param nameSpace Name of the Hbase database to which the Table belongs
      * @param tableName Name of the Hbase table
      * @param columnFamily Name of the ColumnFamily
      * @return Unique qualified name to identify the Table instance in Atlas.
      */
-    private static String getColumnFamilyQualifiedName(String clusterName, String nameSpace, String tableName, String columnFamily) {
+    private static String getColumnFamilyQualifiedName(String metadataNamespace, String nameSpace, String tableName, String columnFamily) {
         tableName = stripNameSpace(tableName.toLowerCase());
-        return String.format(HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), tableName, columnFamily.toLowerCase(), clusterName);
+        return String.format(HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), tableName, columnFamily.toLowerCase(), metadataNamespace);
     }
 
     /**
      * Construct the qualified name used to uniquely identify a Table instance in Atlas.
-     * @param clusterName Name of the cluster to which the Hbase component belongs
+     * @param metadataNamespace Metadata namespace of the cluster to which the Hbase component belongs
      * @param nameSpace Name of the Hbase database to which the Table belongs
      * @param tableName Name of the Hbase table
      * @return Unique qualified name to identify the Table instance in Atlas.
      */
-    private static String getTableQualifiedName(String clusterName, String nameSpace, String tableName) {
+    private static String getTableQualifiedName(String metadataNamespace, String nameSpace, String tableName) {
         tableName = stripNameSpace(tableName.toLowerCase());
-        return String.format(HBASE_TABLE_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), tableName, clusterName);
+        return String.format(HBASE_TABLE_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), tableName, metadataNamespace);
     }
 
     /**
      * Construct the qualified name used to uniquely identify a Hbase NameSpace instance in Atlas.
-     * @param clusterName Name of the cluster to which the Hbase component belongs
+     * @param metadataNamespace Metadata namespace of the cluster to which the Hbase component belongs
      * @param nameSpace Name of the NameSpace
      * @return Unique qualified name to identify the HBase NameSpace instance in Atlas.
      */
-    private static String getNameSpaceQualifiedName(String clusterName, String nameSpace) {
-        return String.format(HBASE_NAMESPACE_QUALIFIED_NAME, nameSpace.toLowerCase(), clusterName);
+    private static String getNameSpaceQualifiedName(String metadataNamespace, String nameSpace) {
+        return String.format(HBASE_NAMESPACE_QUALIFIED_NAME, nameSpace.toLowerCase(), metadataNamespace);
     }
 
     private static String stripNameSpace(String tableName){
diff --git a/addons/hive-bridge/src/bin/import-hive.sh b/addons/hive-bridge/src/bin/import-hive.sh
index 38ec3be..b1660ea 100755
--- a/addons/hive-bridge/src/bin/import-hive.sh
+++ b/addons/hive-bridge/src/bin/import-hive.sh
@@ -109,7 +109,7 @@ else
     exit 1
 fi
 
-CP="${ATLASCPPATH}:${HIVE_CP}:${HADOOP_CP}"
+CP="${HIVE_CP}:${HADOOP_CP}:${ATLASCPPATH}"
 
 # If running in cygwin, convert pathnames and classpath to Windows format.
 if [ "${CYGWIN}" == "true" ]
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 5f8f846..049112b 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -83,7 +83,8 @@ public class HiveMetaStoreBridge {
     private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreBridge.class);
 
     public static final String CONF_PREFIX                     = "atlas.hook.hive.";
-    public static final String HIVE_CLUSTER_NAME               = "atlas.cluster.name";
+    public static final String CLUSTER_NAME_KEY                = "atlas.cluster.name";
+    public static final String HIVE_METADATA_NAMESPACE         = "atlas.metadata.namespace";
     public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
     public static final String DEFAULT_CLUSTER_NAME            = "primary";
     public static final String TEMP_TABLE_PREFIX               = "_temp-";
@@ -95,10 +96,10 @@ public class HiveMetaStoreBridge {
     private static final int    EXIT_CODE_FAILED  = 1;
     private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
 
-    private final String                  clusterName;
-    private final Hive                    hiveClient;
-    private final AtlasClientV2           atlasClientV2;
-    private final boolean                 convertHdfsPathToLowerCase;
+    private final String        metadataNamespace;
+    private final Hive          hiveClient;
+    private final AtlasClientV2 atlasClientV2;
+    private final boolean       convertHdfsPathToLowerCase;
 
 
     public static void main(String[] args) {
@@ -209,7 +210,10 @@ public class HiveMetaStoreBridge {
      * @param hiveConf {@link HiveConf} for Hive component in the cluster
      */
     public HiveMetaStoreBridge(Configuration atlasProperties, HiveConf hiveConf, AtlasClientV2 atlasClientV2) throws Exception {
-        this(atlasProperties.getString(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME), Hive.get(hiveConf), atlasClientV2, atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false));
+        this.metadataNamespace          = getMetadataNamespace(atlasProperties);
+        this.hiveClient                 = Hive.get(hiveConf);
+        this.atlasClientV2              = atlasClientV2;
+        this.convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
     }
 
     /**
@@ -220,19 +224,27 @@ public class HiveMetaStoreBridge {
         this(atlasProperties, hiveConf, null);
     }
 
-    HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClientV2 atlasClientV2) {
-        this(clusterName, hiveClient, atlasClientV2, true);
+    HiveMetaStoreBridge(String metadataNamespace, Hive hiveClient, AtlasClientV2 atlasClientV2) {
+        this(metadataNamespace, hiveClient, atlasClientV2, true);
     }
 
-    HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClientV2 atlasClientV2, boolean convertHdfsPathToLowerCase) {
-        this.clusterName                = clusterName;
+    HiveMetaStoreBridge(String metadataNamespace, Hive hiveClient, AtlasClientV2 atlasClientV2, boolean convertHdfsPathToLowerCase) {
+        this.metadataNamespace          = metadataNamespace;
         this.hiveClient                 = hiveClient;
         this.atlasClientV2              = atlasClientV2;
         this.convertHdfsPathToLowerCase = convertHdfsPathToLowerCase;
     }
 
-    public String getClusterName() {
-        return clusterName;
+    public String getMetadataNamespace(Configuration config) {
+        return config.getString(HIVE_METADATA_NAMESPACE, getClusterName(config));
+    }
+
+    private String getClusterName(Configuration config) {
+        return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
+    }
+
+    public String getMetadataNamespace() {
+        return metadataNamespace;
     }
 
     public Hive getHiveClient() {
@@ -337,7 +349,7 @@ public class HiveMetaStoreBridge {
             AtlasEntityWithExtInfo tableEntity = registerTable(dbEntity, table);
 
             if (table.getTableType() == TableType.EXTERNAL_TABLE) {
-                String                 processQualifiedName = getTableProcessQualifiedName(clusterName, table);
+                String                 processQualifiedName = getTableProcessQualifiedName(metadataNamespace, table);
                 AtlasEntityWithExtInfo processEntity        = findProcessEntity(processQualifiedName);
 
                 if (processEntity == null) {
@@ -350,7 +362,7 @@ public class HiveMetaStoreBridge {
 
                     processInst.setAttribute(ATTRIBUTE_QUALIFIED_NAME, processQualifiedName);
                     processInst.setAttribute(ATTRIBUTE_NAME, query);
-                    processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME, clusterName);
+                    processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
                     processInst.setAttribute(ATTRIBUTE_INPUTS, Collections.singletonList(BaseHiveEvent.getObjectId(pathInst)));
                     processInst.setAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(BaseHiveEvent.getObjectId(tableInst)));
                     processInst.setAttribute(ATTRIBUTE_USER_NAME, table.getOwner());
@@ -396,7 +408,7 @@ public class HiveMetaStoreBridge {
         Database               db  = hiveClient.getDatabase(databaseName);
 
         if (db != null) {
-            ret = findDatabase(clusterName, databaseName);
+            ret = findDatabase(metadataNamespace, databaseName);
 
             if (ret == null) {
                 ret = registerInstance(new AtlasEntityWithExtInfo(toDbEntity(db)));
@@ -542,12 +554,12 @@ public class HiveMetaStoreBridge {
 
         String dbName = hiveDB.getName().toLowerCase();
 
-        dbEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getDBQualifiedName(clusterName, dbName));
+        dbEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getDBQualifiedName(metadataNamespace, dbName));
         dbEntity.setAttribute(ATTRIBUTE_NAME, dbName);
         dbEntity.setAttribute(ATTRIBUTE_DESCRIPTION, hiveDB.getDescription());
         dbEntity.setAttribute(ATTRIBUTE_OWNER, hiveDB.getOwnerName());
 
-        dbEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, clusterName);
+        dbEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
         dbEntity.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(hiveDB.getLocationUri()));
         dbEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveDB.getParameters());
 
@@ -574,7 +586,7 @@ public class HiveMetaStoreBridge {
         }
 
         AtlasEntity tableEntity        = table.getEntity();
-        String      tableQualifiedName = getTableQualifiedName(clusterName, hiveTable);
+        String      tableQualifiedName = getTableQualifiedName(metadataNamespace, hiveTable);
         long        createTime         = BaseHiveEvent.getTableCreateTime(hiveTable);
         long        lastAccessTime     = hiveTable.getLastAccessTime() > 0 ? hiveTable.getLastAccessTime() : createTime;
 
@@ -705,7 +717,7 @@ public class HiveMetaStoreBridge {
         Path        path          = new Path(pathUri);
 
         ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString());
-        ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, clusterName);
+        ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
 
         if (StringUtils.isNotEmpty(nameServiceID)) {
             // Name service resolution is successful, now get updated HDFS path where the host port info is replaced by resolved name service
@@ -717,7 +729,7 @@ public class HiveMetaStoreBridge {
         } else {
             ret.setAttribute(ATTRIBUTE_PATH, pathUri);
 
-            // Only append clusterName for the HDFS path
+            // Only append metadataNamespace for the HDFS path
             if (pathUri.startsWith(HdfsNameServiceResolver.HDFS_SCHEME)) {
                 ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHdfsPathQualifiedName(pathUri));
             } else {
@@ -731,18 +743,18 @@ public class HiveMetaStoreBridge {
     /**
      * Gets the atlas entity for the database
      * @param databaseName  database Name
-     * @param clusterName    cluster name
+     * @param metadataNamespace    cluster name
      * @return AtlasEntity for database if exists, else null
      * @throws Exception
      */
-    private AtlasEntityWithExtInfo findDatabase(String clusterName, String databaseName) throws Exception {
+    private AtlasEntityWithExtInfo findDatabase(String metadataNamespace, String databaseName) throws Exception {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Searching Atlas for database {}", databaseName);
         }
 
         String typeName = HiveDataTypes.HIVE_DB.getName();
 
-        return findEntity(typeName, getDBQualifiedName(clusterName, databaseName));
+        return findEntity(typeName, getDBQualifiedName(metadataNamespace, databaseName));
     }
 
     /**
@@ -758,7 +770,7 @@ public class HiveMetaStoreBridge {
         }
 
         String typeName         = HiveDataTypes.HIVE_TABLE.getName();
-        String tblQualifiedName = getTableQualifiedName(getClusterName(), hiveTable.getDbName(), hiveTable.getTableName());
+        String tblQualifiedName = getTableQualifiedName(getMetadataNamespace(), hiveTable.getDbName(), hiveTable.getTableName());
 
         return findEntity(typeName, tblQualifiedName);
     }
@@ -822,37 +834,37 @@ public class HiveMetaStoreBridge {
 
     /**
      * Construct the qualified name used to uniquely identify a Table instance in Atlas.
-     * @param clusterName Name of the cluster to which the Hive component belongs
+     * @param metadataNamespace Metadata namespace of the cluster to which the Hive component belongs
      * @param table hive table for which the qualified name is needed
      * @return Unique qualified name to identify the Table instance in Atlas.
      */
-    private static String getTableQualifiedName(String clusterName, Table table) {
-        return getTableQualifiedName(clusterName, table.getDbName(), table.getTableName(), table.isTemporary());
+    private static String getTableQualifiedName(String metadataNamespace, Table table) {
+        return getTableQualifiedName(metadataNamespace, table.getDbName(), table.getTableName(), table.isTemporary());
     }
 
     private String getHdfsPathQualifiedName(String hdfsPath) {
-        return String.format("%s@%s", hdfsPath, clusterName);
+        return String.format("%s@%s", hdfsPath, metadataNamespace);
     }
 
     /**
      * Construct the qualified name used to uniquely identify a Database instance in Atlas.
-     * @param clusterName Name of the cluster to which the Hive component belongs
+     * @param metadataNamespace Name of the cluster to which the Hive component belongs
      * @param dbName Name of the Hive database
      * @return Unique qualified name to identify the Database instance in Atlas.
      */
-    public static String getDBQualifiedName(String clusterName, String dbName) {
-        return String.format("%s@%s", dbName.toLowerCase(), clusterName);
+    public static String getDBQualifiedName(String metadataNamespace, String dbName) {
+        return String.format("%s@%s", dbName.toLowerCase(), metadataNamespace);
     }
 
     /**
      * Construct the qualified name used to uniquely identify a Table instance in Atlas.
-     * @param clusterName Name of the cluster to which the Hive component belongs
+     * @param metadataNamespace Name of the cluster to which the Hive component belongs
      * @param dbName Name of the Hive database to which the Table belongs
      * @param tableName Name of the Hive table
      * @param isTemporaryTable is this a temporary table
      * @return Unique qualified name to identify the Table instance in Atlas.
      */
-    public static String getTableQualifiedName(String clusterName, String dbName, String tableName, boolean isTemporaryTable) {
+    public static String getTableQualifiedName(String metadataNamespace, String dbName, String tableName, boolean isTemporaryTable) {
         String tableTempName = tableName;
 
         if (isTemporaryTable) {
@@ -863,11 +875,11 @@ public class HiveMetaStoreBridge {
             }
         }
 
-        return String.format("%s.%s@%s", dbName.toLowerCase(), tableTempName.toLowerCase(), clusterName);
+        return String.format("%s.%s@%s", dbName.toLowerCase(), tableTempName.toLowerCase(), metadataNamespace);
     }
 
-    public static String getTableProcessQualifiedName(String clusterName, Table table) {
-        String tableQualifiedName = getTableQualifiedName(clusterName, table);
+    public static String getTableProcessQualifiedName(String metadataNamespace, Table table) {
+        String tableQualifiedName = getTableQualifiedName(metadataNamespace, table);
         long   createdTime        = getTableCreatedTime(table);
 
         return tableQualifiedName + SEP + createdTime;
@@ -876,24 +888,24 @@ public class HiveMetaStoreBridge {
 
     /**
      * Construct the qualified name used to uniquely identify a Table instance in Atlas.
-     * @param clusterName Name of the cluster to which the Hive component belongs
+     * @param metadataNamespace Metadata namespace of the cluster to which the Hive component belongs
      * @param dbName Name of the Hive database to which the Table belongs
      * @param tableName Name of the Hive table
      * @return Unique qualified name to identify the Table instance in Atlas.
      */
-    public static String getTableQualifiedName(String clusterName, String dbName, String tableName) {
-        return getTableQualifiedName(clusterName, dbName, tableName, false);
+    public static String getTableQualifiedName(String metadataNamespace, String dbName, String tableName) {
+        return getTableQualifiedName(metadataNamespace, dbName, tableName, false);
     }
     public static String getStorageDescQFName(String tableQualifiedName) {
         return tableQualifiedName + "_storage";
     }
 
     public static String getColumnQualifiedName(final String tableQualifiedName, final String colName) {
-        final String[] parts       = tableQualifiedName.split("@");
-        final String   tableName   = parts[0];
-        final String   clusterName = parts[1];
+        final String[] parts             = tableQualifiedName.split("@");
+        final String   tableName         = parts[0];
+        final String   metadataNamespace = parts[1];
 
-        return String.format("%s.%s@%s", tableName, colName.toLowerCase(), clusterName);
+        return String.format("%s.%s@%s", tableName, colName.toLowerCase(), metadataNamespace);
     }
 
     public static long getTableCreatedTime(Table table) {
@@ -945,4 +957,4 @@ public class HiveMetaStoreBridge {
         }
         return ret;
     }
-}
+}
\ No newline at end of file
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
index 76d6fe6..0eee7c1 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
@@ -38,12 +38,12 @@ import static org.apache.atlas.hive.hook.events.BaseHiveEvent.toTable;
 
 
 public class AtlasHiveHookContext {
-    public static final char   QNAME_SEP_CLUSTER_NAME = '@';
-    public static final char   QNAME_SEP_ENTITY_NAME  = '.';
-    public static final char   QNAME_SEP_PROCESS      = ':';
-    public static final String TEMP_TABLE_PREFIX      = "_temp-";
-    public static final String CREATE_OPERATION       = "CREATE";
-    public static final String ALTER_OPERATION        = "ALTER";
+    public static final char   QNAME_SEP_METADATA_NAMESPACE = '@';
+    public static final char   QNAME_SEP_ENTITY_NAME        = '.';
+    public static final char   QNAME_SEP_PROCESS            = ':';
+    public static final String TEMP_TABLE_PREFIX            = "_temp-";
+    public static final String CREATE_OPERATION             = "CREATE";
+    public static final String ALTER_OPERATION              = "ALTER";
 
     private final HiveHook                 hook;
     private final HiveOperation            hiveOperation;
@@ -157,8 +157,8 @@ public class AtlasHiveHookContext {
     public Collection<AtlasEntity> getEntities() { return qNameEntityMap.values(); }
 
 
-    public String getClusterName() {
-        return hook.getClusterName();
+    public String getMetadataNamespace() {
+        return hook.getMetadataNamespace();
     }
 
     public String getHostName() { return hook.getHostName(); }
@@ -192,7 +192,7 @@ public class AtlasHiveHookContext {
     }
 
     public String getQualifiedName(Database db) {
-        return (db.getName() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
+        return (db.getName() + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace();
     }
 
     public String getQualifiedName(Table table) {
@@ -206,7 +206,7 @@ public class AtlasHiveHookContext {
             }
         }
 
-        return (table.getDbName() + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
+        return (table.getDbName() + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace();
     }
 
     public boolean isKnownDatabase(String dbQualifiedName) {
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index ffa56ce..5b1f61b 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -55,7 +55,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     public enum PreprocessAction { NONE, IGNORE, PRUNE }
 
     public static final String CONF_PREFIX                         = "atlas.hook.hive.";
-    public static final String CONF_CLUSTER_NAME                   = "atlas.cluster.name";
     public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE     = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
     public static final String HOOK_NAME_CACHE_ENABLED             = CONF_PREFIX + "name.cache.enabled";
     public static final String HOOK_NAME_CACHE_DATABASE_COUNT      = CONF_PREFIX + "name.cache.database.count";
@@ -66,13 +65,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     public static final String HOOK_HIVE_TABLE_IGNORE_PATTERN                            = CONF_PREFIX + "hive_table.ignore.pattern";
     public static final String HOOK_HIVE_TABLE_PRUNE_PATTERN                             = CONF_PREFIX + "hive_table.prune.pattern";
     public static final String HOOK_HIVE_TABLE_CACHE_SIZE                                = CONF_PREFIX + "hive_table.cache.size";
-
-    public static final String DEFAULT_CLUSTER_NAME = "primary";
     public static final String DEFAULT_HOST_NAME = "localhost";
 
     private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>();
 
-    private static final String  clusterName;
     private static final boolean convertHdfsPathToLowerCase;
     private static final boolean nameCacheEnabled;
     private static final int     nameCacheDatabaseMaxCount;
@@ -96,7 +92,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             OPERATION_MAP.put(hiveOperation.getOperationName(), hiveOperation);
         }
 
-        clusterName                     = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
         convertHdfsPathToLowerCase      = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
         nameCacheEnabled                = atlasProperties.getBoolean(HOOK_NAME_CACHE_ENABLED, true);
         nameCacheDatabaseMaxCount       = atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000);
@@ -253,10 +248,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         }
     }
 
-    public String getClusterName() {
-        return clusterName;
-    }
-
     public boolean isConvertHdfsPathToLowerCase() {
         return convertHdfsPathToLowerCase;
     }
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index 0bf3ce2..5ce4c77 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -65,7 +65,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_CLUSTER_NAME;
+import static org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_METADATA_NAMESPACE;
 import static org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_ENTITY_NAME;
 import static org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_PROCESS;
 
@@ -353,7 +353,7 @@ public abstract class BaseHiveEvent {
             ret.setAttribute(ATTRIBUTE_DESCRIPTION, db.getDescription());
             ret.setAttribute(ATTRIBUTE_OWNER, db.getOwnerName());
 
-            ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName());
+            ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getMetadataNamespace());
             ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(db.getLocationUri()));
             ret.setAttribute(ATTRIBUTE_PARAMETERS, db.getParameters());
 
@@ -599,7 +599,8 @@ public abstract class BaseHiveEvent {
 
     protected AtlasEntity getPathEntity(Path path, AtlasEntityExtInfo extInfo) {
         AtlasEntity ret;
-        String strPath = path.toString();
+        String      strPath           = path.toString();
+        String      metadataNamespace = getMetadataNamespace();
 
         if (strPath.startsWith(HDFS_PATH_PREFIX) && context.isConvertHdfsPathToLowerCase()) {
             strPath = strPath.toLowerCase();
@@ -607,8 +608,8 @@ public abstract class BaseHiveEvent {
 
         if (isS3Path(strPath)) {
             String      bucketName          = path.toUri().getAuthority();
-            String      bucketQualifiedName = (path.toUri().getScheme() + SCHEME_SEPARATOR + path.toUri().getAuthority() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
-            String      pathQualifiedName   = (strPath + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
+            String      bucketQualifiedName = (path.toUri().getScheme() + SCHEME_SEPARATOR + path.toUri().getAuthority() + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
+            String      pathQualifiedName   = (strPath + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
             AtlasEntity bucketEntity        = context.getEntity(bucketQualifiedName);
 
             ret = context.getEntity(pathQualifiedName);
@@ -657,7 +658,7 @@ public abstract class BaseHiveEvent {
                 ret.setAttribute(ATTRIBUTE_PATH, attrPath);
                 ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
                 ret.setAttribute(ATTRIBUTE_NAME, name);
-                ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName());
+                ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
 
                 context.putEntity(pathQualifiedName, ret);
             }
@@ -754,8 +755,8 @@ public abstract class BaseHiveEvent {
         return hiveDDL;
     }
 
-    protected String getClusterName() {
-        return context.getClusterName();
+    protected String getMetadataNamespace() {
+        return context.getMetadataNamespace();
     }
 
     protected Database getDatabases(String dbName) throws Exception {
@@ -873,7 +874,7 @@ public abstract class BaseHiveEvent {
     protected String getQualifiedName(Table table, FieldSchema column) {
         String tblQualifiedName = getQualifiedName(table);
 
-        int sepPos = tblQualifiedName.lastIndexOf(QNAME_SEP_CLUSTER_NAME);
+        int sepPos = tblQualifiedName.lastIndexOf(QNAME_SEP_METADATA_NAMESPACE);
 
         if (sepPos == -1) {
             return tblQualifiedName + QNAME_SEP_ENTITY_NAME + column.getName().toLowerCase();
@@ -891,19 +892,20 @@ public abstract class BaseHiveEvent {
     }
 
     protected String getQualifiedName(BaseColumnInfo column) {
-        String dbName    = column.getTabAlias().getTable().getDbName();
-        String tableName = column.getTabAlias().getTable().getTableName();
-        String colName   = column.getColumn() != null ? column.getColumn().getName() : null;
+        String dbName            = column.getTabAlias().getTable().getDbName();
+        String tableName         = column.getTabAlias().getTable().getTableName();
+        String colName           = column.getColumn() != null ? column.getColumn().getName() : null;
+        String metadataNamespace = getMetadataNamespace();
 
         if (colName == null) {
-            return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
+            return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
         } else {
-            return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_ENTITY_NAME + colName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
+            return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_ENTITY_NAME + colName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
         }
     }
 
     protected String getQualifiedName(String dbName, String tableName, String colName) {
-        return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_ENTITY_NAME + colName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
+        return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_ENTITY_NAME + colName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace();
     }
 
     protected String getQualifiedName(URI location) {
@@ -921,14 +923,14 @@ public abstract class BaseHiveEvent {
 
     protected String getQualifiedName(String path) {
         if (path.startsWith(HdfsNameServiceResolver.HDFS_SCHEME)) {
-            return path + QNAME_SEP_CLUSTER_NAME + getClusterName();
+            return path + QNAME_SEP_METADATA_NAMESPACE + getMetadataNamespace();
         }
 
         return path.toLowerCase();
     }
 
     protected String getColumnQualifiedName(String tblQualifiedName, String columnName) {
-        int sepPos = tblQualifiedName.lastIndexOf(QNAME_SEP_CLUSTER_NAME);
+        int sepPos = tblQualifiedName.lastIndexOf(QNAME_SEP_METADATA_NAMESPACE);
 
         if (sepPos == -1) {
             return tblQualifiedName + QNAME_SEP_ENTITY_NAME + columnName.toLowerCase();
@@ -983,26 +985,27 @@ public abstract class BaseHiveEvent {
     }
 
     protected AtlasEntity toReferencedHBaseTable(Table table, AtlasEntitiesWithExtInfo entities) {
-        AtlasEntity    ret            = null;
-        HBaseTableInfo hBaseTableInfo = new HBaseTableInfo(table);
-        String         hbaseNameSpace = hBaseTableInfo.getHbaseNameSpace();
-        String         hbaseTableName = hBaseTableInfo.getHbaseTableName();
+        AtlasEntity    ret               = null;
+        HBaseTableInfo hBaseTableInfo    = new HBaseTableInfo(table);
+        String         hbaseNameSpace    = hBaseTableInfo.getHbaseNameSpace();
+        String         hbaseTableName    = hBaseTableInfo.getHbaseTableName();
+        String         metadataNamespace = getMetadataNamespace();
 
         if (hbaseTableName != null) {
             AtlasEntity nsEntity = new AtlasEntity(HBASE_TYPE_NAMESPACE);
             nsEntity.setAttribute(ATTRIBUTE_NAME, hbaseNameSpace);
-            nsEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName());
-            nsEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseNameSpaceQualifiedName(getClusterName(), hbaseNameSpace));
+            nsEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
+            nsEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseNameSpaceQualifiedName(metadataNamespace, hbaseNameSpace));
 
             ret = new AtlasEntity(HBASE_TYPE_TABLE);
 
             ret.setAttribute(ATTRIBUTE_NAME, hbaseTableName);
             ret.setAttribute(ATTRIBUTE_URI, hbaseTableName);
 
-            AtlasRelatedObjectId objIdRelatedObject =     new AtlasRelatedObjectId(getObjectId(nsEntity), RELATIONSHIP_HBASE_TABLE_NAMESPACE);
+            AtlasRelatedObjectId objIdRelatedObject = new AtlasRelatedObjectId(getObjectId(nsEntity), RELATIONSHIP_HBASE_TABLE_NAMESPACE);
 
             ret.setRelationshipAttribute(ATTRIBUTE_NAMESPACE, objIdRelatedObject);
-            ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseTableQualifiedName(getClusterName(), hbaseNameSpace, hbaseTableName));
+            ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseTableQualifiedName(metadataNamespace, hbaseNameSpace, hbaseTableName));
 
             entities.addReferredEntity(nsEntity);
             entities.addEntity(ret);
@@ -1024,12 +1027,12 @@ public abstract class BaseHiveEvent {
         return ret;
     }
 
-    private static String getHBaseTableQualifiedName(String clusterName, String nameSpace, String tableName) {
-        return String.format("%s:%s@%s", nameSpace.toLowerCase(), tableName.toLowerCase(), clusterName);
+    private static String getHBaseTableQualifiedName(String metadataNamespace, String nameSpace, String tableName) {
+        return String.format("%s:%s@%s", nameSpace.toLowerCase(), tableName.toLowerCase(), metadataNamespace);
     }
 
-    private static String getHBaseNameSpaceQualifiedName(String clusterName, String nameSpace) {
-        return String.format("%s@%s", nameSpace.toLowerCase(), clusterName);
+    private static String getHBaseNameSpaceQualifiedName(String metadataNamespace, String nameSpace) {
+        return String.format("%s@%s", nameSpace.toLowerCase(), metadataNamespace);
     }
 
     private boolean ignoreHDFSPathsinProcessQualifiedName() {
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java
index cbee7bf..0736153 100644
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java
@@ -519,7 +519,7 @@ public class HiveITBase {
                 Table outTable = entity.getTable();
                 //refresh table
                 outTable = dgiBridge.getHiveClient().getTable(outTable.getDbName(), outTable.getTableName());
-                return HiveMetaStoreBridge.getTableProcessQualifiedName(dgiBridge.getClusterName(), outTable);
+                return HiveMetaStoreBridge.getTableProcessQualifiedName(dgiBridge.getMetadataNamespace(), outTable);
             }
         }
 
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
index d55aa53..4403aaf 100644
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
@@ -53,10 +53,9 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class HiveMetaStoreBridgeTest {
-
-    private static final String TEST_DB_NAME = "default";
-    public static final String CLUSTER_NAME = "primary";
-    public static final String TEST_TABLE_NAME = "test_table";
+    private static final String TEST_DB_NAME       = "default";
+    public  static final String METADATA_NAMESPACE = "primary";
+    public  static final String TEST_TABLE_NAME    = "test_table";
 
     @Mock
     private Hive hiveClient;
@@ -90,13 +89,13 @@ public class HiveMetaStoreBridgeTest {
         when(hiveClient.getDatabase(TEST_DB_NAME)).thenReturn(db);
         when(hiveClient.getAllTables(TEST_DB_NAME)).thenReturn(Arrays.asList(new String[]{}));
 
-        returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME);
+        returnExistingDatabase(TEST_DB_NAME, atlasClientV2, METADATA_NAMESPACE);
 
         when(atlasEntityWithExtInfo.getEntity("72e06b34-9151-4023-aa9d-b82103a50e76"))
                 .thenReturn((new AtlasEntity.AtlasEntityWithExtInfo(
                         getEntity(HiveDataTypes.HIVE_DB.getName(), AtlasClient.GUID, "72e06b34-9151-4023-aa9d-b82103a50e76"))).getEntity());
 
-        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
+        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(METADATA_NAMESPACE, hiveClient, atlasClientV2);
         bridge.importHiveMetadata(null, null, true);
 
         // verify update is called
@@ -109,7 +108,7 @@ public class HiveMetaStoreBridgeTest {
 
         List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
 
-        returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME);
+        returnExistingDatabase(TEST_DB_NAME, atlasClientV2, METADATA_NAMESPACE);
 
         // return existing table
 
@@ -119,7 +118,7 @@ public class HiveMetaStoreBridgeTest {
 
         when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(),
                 Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME))))
+                HiveMetaStoreBridge.getTableQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME, TEST_TABLE_NAME))))
                 .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
                         getEntity(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
 
@@ -127,7 +126,7 @@ public class HiveMetaStoreBridgeTest {
                 .thenReturn(createTableReference());
 
         Table testTable =  hiveTables.get(0);
-        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable);
+        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(METADATA_NAMESPACE, testTable);
 
         when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
             Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
@@ -136,7 +135,7 @@ public class HiveMetaStoreBridgeTest {
                 getEntity(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
 
 
-        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
+        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(METADATA_NAMESPACE, hiveClient, atlasClientV2);
         bridge.importHiveMetadata(null, null, true);
 
         // verify update is called on table
@@ -144,13 +143,13 @@ public class HiveMetaStoreBridgeTest {
 
     }
 
-    private void returnExistingDatabase(String databaseName, AtlasClientV2 atlasClientV2, String clusterName)
+    private void returnExistingDatabase(String databaseName, AtlasClientV2 atlasClientV2, String metadataNamespace)
             throws AtlasServiceException {
             //getEntity(HiveDataTypes.HIVE_DB.getName(), AtlasClient.GUID, "72e06b34-9151-4023-aa9d-b82103a50e76");
 
         when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_DB.getName(),
                 Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                        HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, TEST_DB_NAME))))
+                        HiveMetaStoreBridge.getDBQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME))))
                 .thenReturn((new AtlasEntity.AtlasEntityWithExtInfo(
                         getEntity(HiveDataTypes.HIVE_DB.getName(), AtlasClient.GUID, "72e06b34-9151-4023-aa9d-b82103a50e76"))));
 
@@ -179,16 +178,16 @@ public class HiveMetaStoreBridgeTest {
         List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
         Table hiveTable = hiveTables.get(0);
 
-        returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME);
+        returnExistingDatabase(TEST_DB_NAME, atlasClientV2, METADATA_NAMESPACE);
 
 
         when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(),
                 Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                        HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME))))
+                        HiveMetaStoreBridge.getTableQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME, TEST_TABLE_NAME))))
         .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
                         getEntity(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
 
-        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTable);
+        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(METADATA_NAMESPACE, hiveTable);
 
         when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
                 Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
@@ -206,7 +205,7 @@ public class HiveMetaStoreBridgeTest {
 
         when(hiveClient.getPartitions(hiveTable)).thenReturn(Arrays.asList(new Partition[]{partition}));
 
-        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
+        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(METADATA_NAMESPACE, hiveClient, atlasClientV2);
         try {
             bridge.importHiveMetadata(null, null, true);
         } catch (Exception e) {
@@ -220,12 +219,12 @@ public class HiveMetaStoreBridgeTest {
         final String table2Name = TEST_TABLE_NAME + "_1";
         List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME, table2Name);
 
-        returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME);
+        returnExistingDatabase(TEST_DB_NAME, atlasClientV2, METADATA_NAMESPACE);
         when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new RuntimeException("Timeout while reading data from hive metastore"));
 
         when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(),
                 Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                        HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME))))
+                        HiveMetaStoreBridge.getTableQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME, TEST_TABLE_NAME))))
                 .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
                         getEntity(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
 
@@ -233,7 +232,7 @@ public class HiveMetaStoreBridgeTest {
                 .thenReturn(createTableReference());
 
         Table testTable =  hiveTables.get(1);
-        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable);
+        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(METADATA_NAMESPACE, testTable);
 
         when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
                 Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
@@ -241,7 +240,7 @@ public class HiveMetaStoreBridgeTest {
                 .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
                         getEntity(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
 
-        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
+        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(METADATA_NAMESPACE, hiveClient, atlasClientV2);
         try {
             bridge.importHiveMetadata(null, null, false);
         } catch (Exception e) {
@@ -255,13 +254,13 @@ public class HiveMetaStoreBridgeTest {
         final String table2Name = TEST_TABLE_NAME + "_1";
         List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME, table2Name);
 
-        returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME);
+        returnExistingDatabase(TEST_DB_NAME, atlasClientV2, METADATA_NAMESPACE);
         when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new RuntimeException("Timeout while reading data from hive metastore"));
 
 
         when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(),
                 Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME))))
+                HiveMetaStoreBridge.getTableQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME, TEST_TABLE_NAME))))
         .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
                 getEntity(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
 
@@ -270,7 +269,7 @@ public class HiveMetaStoreBridgeTest {
                 .thenReturn(createTableReference());
 
         Table testTable  = hiveTables.get(1);
-        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable);
+        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(METADATA_NAMESPACE, testTable);
 
         when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
                 Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
@@ -278,7 +277,7 @@ public class HiveMetaStoreBridgeTest {
                 .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
                         getEntity(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
 
-        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
+        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(METADATA_NAMESPACE, hiveClient, atlasClientV2);
         try {
             bridge.importHiveMetadata(null, null, true);
             Assert.fail("Table registration is supposed to fail");
diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
index 1305f65..51b2f83 100644
--- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
+++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
@@ -33,9 +33,9 @@ import org.apache.commons.lang.StringUtils;
  * Contain the info related to an linear record from Impala
  */
 public class AtlasImpalaHookContext {
-    public static final char   QNAME_SEP_CLUSTER_NAME = '@';
-    public static final char   QNAME_SEP_ENTITY_NAME  = '.';
-    public static final char   QNAME_SEP_PROCESS      = ':';
+    public static final char QNAME_SEP_METADATA_NAMESPACE = '@';
+    public static final char QNAME_SEP_ENTITY_NAME        = '.';
+    public static final char QNAME_SEP_PROCESS            = ':';
 
     private final ImpalaLineageHook        hook;
     private final ImpalaOperationType      impalaOperation;
@@ -69,8 +69,8 @@ public class AtlasImpalaHookContext {
 
     public Collection<AtlasEntity> getEntities() { return qNameEntityMap.values(); }
 
-    public String getClusterName() {
-        return hook.getClusterName();
+    public String getMetadataNamespace() {
+        return hook.getMetadataNamespace();
     }
 
     public String getHostName() {
@@ -82,7 +82,7 @@ public class AtlasImpalaHookContext {
     }
 
     public String getQualifiedNameForDb(String dbName) {
-        return (dbName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
+        return (dbName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace();
     }
 
     public String getQualifiedNameForTable(String fullTableName) throws IllegalArgumentException {
@@ -100,8 +100,7 @@ public class AtlasImpalaHookContext {
     }
 
     public String getQualifiedNameForTable(String dbName, String tableName) {
-        return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_CLUSTER_NAME).toLowerCase() +
-            getClusterName();
+        return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace();
     }
 
     public String getQualifiedNameForColumn(LineageVertex vertex) {
@@ -179,7 +178,7 @@ public class AtlasImpalaHookContext {
     public String getQualifiedNameForColumn(String dbName, String tableName, String columnName) {
         return
             (dbName + QNAME_SEP_ENTITY_NAME  + tableName + QNAME_SEP_ENTITY_NAME +
-             columnName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
+             columnName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace();
     }
 
     public String getUserName() { return lineageQuery.getUser(); }
diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
index b5fdb6d..10ae08f 100644
--- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
+++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
@@ -18,7 +18,6 @@
 
 package org.apache.atlas.impala.hook;
 
-import static org.apache.atlas.AtlasConstants.DEFAULT_CLUSTER_NAME;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import com.google.common.collect.Sets;
@@ -42,20 +41,17 @@ public class ImpalaLineageHook extends AtlasHook {
     public static final String ATLAS_ENDPOINT                      = "atlas.rest.address";
     public static final String REALM_SEPARATOR                     = "@";
     public static final String CONF_PREFIX                         = "atlas.hook.impala.";
-    public static final String CONF_CLUSTER_NAME                   = "atlas.cluster.name";
     public static final String CONF_REALM_NAME                     = "atlas.realm.name";
     public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE     = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
     public static final String DEFAULT_HOST_NAME                   = "localhost";
 
-    private static final String clusterName;
-    private static final String realm;
+    private static final String  realm;
     private static final boolean convertHdfsPathToLowerCase;
-    private static String hostName;
+    private static       String  hostName;
 
     static {
-        clusterName                     = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
-        realm                           = atlasProperties.getString(CONF_REALM_NAME, DEFAULT_CLUSTER_NAME);  // what should default be ??
-        convertHdfsPathToLowerCase      = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
+        realm                      = atlasProperties.getString(CONF_REALM_NAME, DEFAULT_CLUSTER_NAME);  // what should default be ??
+        convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
 
         try {
             hostName = InetAddress.getLocalHost().getHostName();
@@ -143,10 +139,6 @@ public class ImpalaLineageHook extends AtlasHook {
         return UserGroupInformation.getUGIFromSubject(userSubject);
     }
 
-    public String getClusterName() {
-        return clusterName;
-    }
-
     public String getRealm() {
         return realm;
     }
diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
index 4ea484f..5329e8b 100644
--- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
+++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
@@ -340,7 +340,7 @@ public abstract class BaseImpalaEvent {
 
             ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName);
             ret.setAttribute(ATTRIBUTE_NAME, dbName.toLowerCase());
-            ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, context.getClusterName());
+            ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, context.getMetadataNamespace());
 
             context.putEntity(dbQualifiedName, ret);
         }
diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java
index f1d0237..600ca5e 100644
--- a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java
+++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java
@@ -293,7 +293,7 @@ public class ImpalaLineageITBase {
     protected String assertDatabaseIsRegistered(String dbName, AssertPredicate assertPredicate) throws Exception {
         LOG.debug("Searching for database: {}", dbName);
 
-        String dbQualifiedName = dbName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+        String dbQualifiedName = dbName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
             CLUSTER_NAME;
 
         dbQualifiedName = dbQualifiedName.toLowerCase();
@@ -320,7 +320,7 @@ public class ImpalaLineageITBase {
     protected String assertTableIsRegistered(String fullTableName, AssertPredicate assertPredicate, boolean isTemporary) throws Exception {
         LOG.debug("Searching for table {}", fullTableName);
 
-        String tableQualifiedName = (fullTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME).toLowerCase() +
+        String tableQualifiedName = (fullTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE).toLowerCase() +
             CLUSTER_NAME;
 
         return assertEntityIsRegistered(HIVE_TYPE_TABLE, REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName,
diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
index 8ebb385..25e26e8 100644
--- a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
+++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
@@ -77,7 +77,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
             // the value is from info in IMPALA_3
             String createTime = new Long((long)(1554750072)*1000).toString();
             String processQFName =
-                "db_1.view_1" + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+                "db_1.view_1" + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
                     CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
 
             processQFName = processQFName.toLowerCase();
@@ -140,7 +140,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
             Long afterCreateTime = System.currentTimeMillis() / BaseImpalaEvent.MILLIS_CONVERT_FACTOR;
 
             String processQFNameWithoutTime =
-                dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+                dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
                     CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS;
             processQFNameWithoutTime = processQFNameWithoutTime.toLowerCase();
 
@@ -210,7 +210,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
         // the value is from info in IMPALA_4.
         String createTime = new Long(TABLE_CREATE_TIME*1000).toString();
         String processQFName =
-            dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+            dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
                 CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
 
         processQFName = processQFName.toLowerCase();
@@ -266,7 +266,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
         // the value is from info in IMPALA_4.
         String createTime = new Long(TABLE_CREATE_TIME*1000).toString();
         String processQFName =
-            dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+            dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
                 CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
 
         processQFName = processQFName.toLowerCase();
@@ -322,9 +322,9 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
         // the value is from info in IMPALA_4.
         String createTime1 = new Long(TABLE_CREATE_TIME_SOURCE*1000).toString();
         String createTime2 = new Long(TABLE_CREATE_TIME*1000).toString();
-        String sourceQFName = dbName + "." + sourceTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+        String sourceQFName = dbName + "." + sourceTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
             CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime1;
-        String targetQFName = dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+        String targetQFName = dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
             CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime2;
         String processQFName = "QUERY:" + sourceQFName.toLowerCase() + "->:INSERT:" + targetQFName.toLowerCase();
 
@@ -385,9 +385,9 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
         // the value is from info in IMPALA_4.
         String createTime1 = new Long(TABLE_CREATE_TIME_SOURCE*1000).toString();
         String createTime2 = new Long(TABLE_CREATE_TIME*1000).toString();
-        String sourceQFName = dbName + "." + sourceTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+        String sourceQFName = dbName + "." + sourceTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
             CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime1;
-        String targetQFName = dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+        String targetQFName = dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
             CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime2;
         String processQFName = "QUERY:" + sourceQFName.toLowerCase() + "->:INSERT:" + targetQFName.toLowerCase();
 
@@ -454,7 +454,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
         // the value is from info in IMPALA_4.
         String createTime = new Long((long)1560885039*1000).toString();
         String processQFName =
-            dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+            dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
                 CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
 
         processQFName = processQFName.toLowerCase();
diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java
index a7b9b0c..56d74fe 100644
--- a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java
+++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java
@@ -137,7 +137,7 @@ public class ImpalaLineageHookIT extends ImpalaLineageITBase {
             impalaHook.process(queryObj);
             String createTime = new Long(BaseImpalaEvent.getTableCreateTime(vertex5)).toString();
             String processQFName =
-                vertex5.getVertexId() + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+                vertex5.getVertexId() + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
                     CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
 
             processQFName = processQFName.toLowerCase();
diff --git a/addons/kafka-bridge/src/bin/import-kafka.sh b/addons/kafka-bridge/src/bin/import-kafka.sh
index 4bfb8b9..076eae8 100644
--- a/addons/kafka-bridge/src/bin/import-kafka.sh
+++ b/addons/kafka-bridge/src/bin/import-kafka.sh
@@ -117,7 +117,7 @@ else
     exit 1
 fi
 
-CP="${KAFKA_CP}:${ATLASCPPATH}:${HADOOP_CP}"
+CP="${ATLASCPPATH}:${HADOOP_CP}:${KAFKA_CP}"
 
 # If running in cygwin, convert pathnames and classpath to Windows format.
 if [ "${CYGWIN}" == "true" ]
diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
index 8755c9e..40b1fee 100644
--- a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
+++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
@@ -58,19 +58,20 @@ import java.util.regex.Pattern;
 public class KafkaBridge {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaBridge.class);
 
-    private static final int    EXIT_CODE_SUCCESS        = 0;
-    private static final int    EXIT_CODE_FAILED         = 1;
-    private static final String ATLAS_ENDPOINT           = "atlas.rest.address";
-    private static final String DEFAULT_ATLAS_URL        = "http://localhost:21000/";
-    private static final String KAFKA_CLUSTER_NAME       = "atlas.cluster.name";
-    private static final String DEFAULT_CLUSTER_NAME     = "primary";
-    private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
-    private static final String DESCRIPTION_ATTR         = "description";
-    private static final String PARTITION_COUNT          = "partitionCount";
-    private static final String NAME                     = "name";
-    private static final String URI                      = "uri";
-    private static final String CLUSTERNAME              = "clusterName";
-    private static final String TOPIC                    = "topic";
+    private static final int    EXIT_CODE_SUCCESS          = 0;
+    private static final int    EXIT_CODE_FAILED           = 1;
+    private static final String ATLAS_ENDPOINT             = "atlas.rest.address";
+    private static final String DEFAULT_ATLAS_URL          = "http://localhost:21000/";
+    private static final String CLUSTER_NAME_KEY           = "atlas.cluster.name";
+    private static final String KAFKA_METADATA_NAMESPACE   = "atlas.metadata.namespace";
+    private static final String DEFAULT_CLUSTER_NAME       = "primary";
+    private static final String ATTRIBUTE_QUALIFIED_NAME   = "qualifiedName";
+    private static final String DESCRIPTION_ATTR           = "description";
+    private static final String PARTITION_COUNT            = "partitionCount";
+    private static final String NAME                       = "name";
+    private static final String URI                        = "uri";
+    private static final String CLUSTERNAME                = "clusterName";
+    private static final String TOPIC                      = "topic";
 
     private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME       = "%s@%s";
     private static final String ZOOKEEPER_CONNECT                       = "atlas.kafka.zookeeper.connect";
@@ -81,7 +82,7 @@ public class KafkaBridge {
     private static final int    DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS = 10 * 1000;
 
     private final List<String>  availableTopics;
-    private final String        clusterName;
+    private final String        metadataNamespace;
     private final AtlasClientV2 atlasClientV2;
     private final ZkUtils       zkUtils;
 
@@ -163,10 +164,18 @@ public class KafkaBridge {
         int      connectionTimeOutMs = atlasConf.getInt(ZOOKEEPER_CONNECTION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS);
         ZkClient zkClient            = new ZkClient(zookeeperConnect, sessionTimeOutMs, connectionTimeOutMs, ZKStringSerializer$.MODULE$);
 
-        this.atlasClientV2   = atlasClientV2;
-        this.clusterName     = atlasConf.getString(KAFKA_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
-        this.zkUtils         = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), JaasUtils.isZkSecurityEnabled());
-        this.availableTopics = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
+        this.atlasClientV2     = atlasClientV2;
+        this.metadataNamespace = getMetadataNamespace(atlasConf);
+        this.zkUtils           = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), JaasUtils.isZkSecurityEnabled());
+        this.availableTopics   = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
+    }
+
+    private String getMetadataNamespace(Configuration config) {
+        return config.getString(KAFKA_METADATA_NAMESPACE, getClusterName(config));
+    }
+
+    private String getClusterName(Configuration config) {
+        return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
     }
 
     public void importTopic(String topicToImport) throws Exception {
@@ -191,7 +200,7 @@ public class KafkaBridge {
 
     @VisibleForTesting
     AtlasEntityWithExtInfo createOrUpdateTopic(String topic) throws Exception {
-        String                 topicQualifiedName = getTopicQualifiedName(clusterName, topic);
+        String                 topicQualifiedName = getTopicQualifiedName(metadataNamespace, topic);
         AtlasEntityWithExtInfo topicEntity        = findTopicEntityInAtlas(topicQualifiedName);
 
         if (topicEntity == null) {
@@ -225,10 +234,10 @@ public class KafkaBridge {
             ret = topicEntity;
         }
 
-        String qualifiedName = getTopicQualifiedName(clusterName, topic);
+        String qualifiedName = getTopicQualifiedName(metadataNamespace, topic);
 
         ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
-        ret.setAttribute(CLUSTERNAME, clusterName);
+        ret.setAttribute(CLUSTERNAME, metadataNamespace);
         ret.setAttribute(TOPIC, topic);
         ret.setAttribute(NAME,topic);
         ret.setAttribute(DESCRIPTION_ATTR, topic);
@@ -239,8 +248,8 @@ public class KafkaBridge {
     }
 
     @VisibleForTesting
-    static String getTopicQualifiedName(String clusterName, String topic) {
-        return String.format(FORMAT_KAKFA_TOPIC_QUALIFIED_NAME, topic.toLowerCase(), clusterName);
+    static String getTopicQualifiedName(String metadataNamespace, String topic) {
+        return String.format(FORMAT_KAKFA_TOPIC_QUALIFIED_NAME, topic.toLowerCase(), metadataNamespace);
     }
 
     private AtlasEntityWithExtInfo findTopicEntityInAtlas(String topicQualifiedName) {
diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
index 5397a4b..3ccd426 100644
--- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
+++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
@@ -52,8 +52,9 @@ import java.util.Date;
 public class SqoopHook extends SqoopJobDataPublisher {
     private static final Logger LOG = LoggerFactory.getLogger(SqoopHook.class);
 
-    public static final String ATLAS_CLUSTER_NAME   = "atlas.cluster.name";
-    public static final String DEFAULT_CLUSTER_NAME = "primary";
+    public static final String CLUSTER_NAME_KEY           = "atlas.cluster.name";
+    public static final String ATLAS_METADATA_NAMESPACE   = "atlas.metadata.namespace";
+    public static final String DEFAULT_CLUSTER_NAME       = "primary";
 
     public static final String USER           = "userName";
     public static final String DB_STORE_TYPE  = "dbStoreType";
@@ -80,12 +81,14 @@ public class SqoopHook extends SqoopJobDataPublisher {
     @Override
     public void publish(SqoopJobDataPublisher.Data data) throws AtlasHookException {
         try {
-            Configuration atlasProperties = ApplicationProperties.get();
-            String        clusterName     = atlasProperties.getString(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
-            AtlasEntity   entDbStore      = toSqoopDBStoreEntity(data);
-            AtlasEntity   entHiveDb       = toHiveDatabaseEntity(clusterName, data.getHiveDB());
-            AtlasEntity   entHiveTable    = data.getHiveTable() != null ? toHiveTableEntity(entHiveDb, data.getHiveTable()) : null;
-            AtlasEntity   entProcess      = toSqoopProcessEntity(entDbStore, entHiveDb, entHiveTable, data, clusterName);
+            Configuration atlasProperties   = ApplicationProperties.get();
+            String        metadataNamespace = atlasProperties.getString(ATLAS_METADATA_NAMESPACE, getClusterName(atlasProperties));
+
+            AtlasEntity entDbStore   = toSqoopDBStoreEntity(data);
+            AtlasEntity entHiveDb    = toHiveDatabaseEntity(metadataNamespace, data.getHiveDB());
+            AtlasEntity entHiveTable = data.getHiveTable() != null ? toHiveTableEntity(entHiveDb, data.getHiveTable()) : null;
+            AtlasEntity entProcess   = toSqoopProcessEntity(entDbStore, entHiveDb, entHiveTable, data, metadataNamespace);
+
 
             AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(entProcess);
 
@@ -105,11 +108,15 @@ public class SqoopHook extends SqoopJobDataPublisher {
         }
     }
 
-    private AtlasEntity toHiveDatabaseEntity(String clusterName, String dbName) {
+    private String getClusterName(Configuration config) {
+        return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
+    }
+
+    private AtlasEntity toHiveDatabaseEntity(String metadataNamespace, String dbName) {
         AtlasEntity entHiveDb     = new AtlasEntity(HiveDataTypes.HIVE_DB.getName());
-        String      qualifiedName = HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName);
+        String      qualifiedName = HiveMetaStoreBridge.getDBQualifiedName(metadataNamespace, dbName);
 
-        entHiveDb.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
+        entHiveDb.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, metadataNamespace);
         entHiveDb.setAttribute(AtlasClient.NAME, dbName);
         entHiveDb.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName);
 
@@ -153,9 +160,10 @@ public class SqoopHook extends SqoopJobDataPublisher {
         return entDbStore;
     }
 
-    private AtlasEntity toSqoopProcessEntity(AtlasEntity entDbStore, AtlasEntity entHiveDb, AtlasEntity entHiveTable, SqoopJobDataPublisher.Data data, String clusterName) {
+    private AtlasEntity toSqoopProcessEntity(AtlasEntity entDbStore, AtlasEntity entHiveDb, AtlasEntity entHiveTable,
+                                             SqoopJobDataPublisher.Data data, String metadataNamespace) {
         AtlasEntity         entProcess       = new AtlasEntity(SqoopDataTypes.SQOOP_PROCESS.getName());
-        String              sqoopProcessName = getSqoopProcessName(data, clusterName);
+        String              sqoopProcessName = getSqoopProcessName(data, metadataNamespace);
         Map<String, String> sqoopOptionsMap  = new HashMap<>();
         Properties          options          = data.getOptions();
 
@@ -190,7 +198,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
         return data.getOperation().toLowerCase().equals("import");
     }
 
-    static String getSqoopProcessName(Data data, String clusterName) {
+    static String getSqoopProcessName(Data data, String metadataNamespace) {
         StringBuilder name = new StringBuilder(String.format("sqoop %s --connect %s", data.getOperation(), data.getUrl()));
 
         if (StringUtils.isNotEmpty(data.getHiveTable())) {
@@ -204,9 +212,9 @@ public class SqoopHook extends SqoopJobDataPublisher {
         }
 
         if (data.getHiveTable() != null) {
-            name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s", data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName));
+            name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s", data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), metadataNamespace));
         } else {
-            name.append(String.format("--hive-%s --hive-database %s --hive-cluster %s", data.getOperation(), data.getHiveDB(), clusterName));
+            name.append(String.format("--hive-%s --hive-database %s --hive-cluster %s", data.getOperation(), data.getHiveDB(), metadataNamespace));
         }
 
         return name.toString();
diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index 97668a3..517a3c3 100644
--- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -118,7 +118,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
         topology.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, topologyInfo.get_name());
         topology.setAttribute(AtlasClient.OWNER, owner);
         topology.setAttribute("startTime", new Date(System.currentTimeMillis()));
-        topology.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
+        topology.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getMetadataNamespace());
 
         return topology;
     }
@@ -166,9 +166,9 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
     }
 
     private AtlasEntity addDataSet(String dataSetType, String topologyOwner, Serializable instance, Map stormConf, AtlasEntityExtInfo entityExtInfo) {
-        Map<String, String> config      = StormTopologyUtil.getFieldValues(instance, true, null);
-        String              clusterName = null;
-        AtlasEntity         ret         = null;
+        Map<String, String> config            = StormTopologyUtil.getFieldValues(instance, true, null);
+        AtlasEntity         ret               = null;
+        String              metadataNamespace = getMetadataNamespace();
 
         // todo: need to redo this with a config driven approach
         switch (dataSetType) {
@@ -188,8 +188,6 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
                     topologyOwner = ANONYMOUS_OWNER;
                 }
 
-                clusterName = getClusterName(stormConf);
-
                 if (topicName == null) {
                     LOG.error("Kafka topic name not found");
                 } else {
@@ -198,7 +196,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
                     ret.setAttribute("topic", topicName);
                     ret.setAttribute("uri", uri);
                     ret.setAttribute(AtlasClient.OWNER, topologyOwner);
-                    ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getKafkaTopicQualifiedName(clusterName, topicName));
+                    ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getKafkaTopicQualifiedName(metadataNamespace, topicName));
                     ret.setAttribute(AtlasClient.NAME, topicName);
                 }
             }
@@ -212,7 +210,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
                     uri = hbaseTableName;
                 }
 
-                clusterName = extractComponentClusterName(HBaseConfiguration.create(), stormConf);
+                metadataNamespace = extractComponentMetadataNamespace(HBaseConfiguration.create(), stormConf);
 
                 if (hbaseTableName == null) {
                     LOG.error("HBase table name not found");
@@ -223,7 +221,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
                     ret.setAttribute(AtlasClient.NAME, uri);
                     ret.setAttribute(AtlasClient.OWNER, stormConf.get("storm.kerberos.principal"));
                     //TODO - Hbase Namespace is hardcoded to 'default'. need to check how to get this or is it already part of tableName
-                    ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT, hbaseTableName));
+                    ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHbaseTableQualifiedName(metadataNamespace, HBASE_NAMESPACE_DEFAULT, hbaseTableName));
                 }
             }
             break;
@@ -234,11 +232,9 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
                 final Path   hdfsPath      = new Path(hdfsPathStr);
                 final String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(hdfsPathStr);
 
-                clusterName = getClusterName(stormConf);
-
                 ret = new AtlasEntity(HiveMetaStoreBridge.HDFS_PATH);
 
-                ret.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
+                ret.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, metadataNamespace);
                 ret.setAttribute(AtlasClient.OWNER, stormConf.get("hdfs.kerberos.principal"));
                 ret.setAttribute(AtlasClient.NAME, Path.getPathWithoutSchemeAndAuthority(hdfsPath).toString().toLowerCase());
 
@@ -247,16 +243,16 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
 
                     ret.setAttribute("path", updatedPath);
                     ret.setAttribute("nameServiceId", nameServiceID);
-                    ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, updatedPath));
+                    ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(metadataNamespace, updatedPath));
                 } else {
                     ret.setAttribute("path", hdfsPathStr);
-                    ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, hdfsPathStr));
+                    ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(metadataNamespace, hdfsPathStr));
                 }
             }
             break;
 
             case "HiveBolt": {
-                clusterName = extractComponentClusterName(new HiveConf(), stormConf);
+                metadataNamespace = extractComponentMetadataNamespace(new HiveConf(), stormConf);
 
                 final String dbName  = config.get("HiveBolt.options.databaseName");
                 final String tblName = config.get("HiveBolt.options.tableName");
@@ -267,8 +263,8 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
                     AtlasEntity dbEntity = new AtlasEntity("hive_db");
 
                     dbEntity.setAttribute(AtlasClient.NAME, dbName);
-                    dbEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), dbName));
-                    dbEntity.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
+                    dbEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getDBQualifiedName(metadataNamespace, dbName));
+                    dbEntity.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, metadataNamespace);
 
                     entityExtInfo.addReferredEntity(dbEntity);
 
@@ -277,7 +273,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
 
                     ret.setAttribute(AtlasClient.NAME, tblName);
                     ret.setAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasObjectId(dbEntity));
-                    ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tblName));
+                    ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(metadataNamespace, dbName, tblName));
                 }
             }
             break;
@@ -384,30 +380,25 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
         }
     }
 
-    public static String getKafkaTopicQualifiedName(String clusterName, String topicName) {
-        return String.format("%s@%s", topicName.toLowerCase(), clusterName);
+    public static String getKafkaTopicQualifiedName(String metadataNamespace, String topicName) {
+        return String.format("%s@%s", topicName.toLowerCase(), metadataNamespace);
     }
 
-    public static String getHbaseTableQualifiedName(String clusterName, String nameSpace, String tableName) {
-        return String.format("%s.%s@%s", nameSpace.toLowerCase(), tableName.toLowerCase(), clusterName);
+    public static String getHbaseTableQualifiedName(String metadataNamespace, String nameSpace, String tableName) {
+        return String.format("%s.%s@%s", nameSpace.toLowerCase(), tableName.toLowerCase(), metadataNamespace);
     }
 
-    public static String getHdfsPathQualifiedName(String clusterName, String hdfsPath) {
-        return String.format("%s@%s", hdfsPath.toLowerCase(), clusterName);
+    public static String getHdfsPathQualifiedName(String metadataNamespace, String hdfsPath) {
+        return String.format("%s@%s", hdfsPath.toLowerCase(), metadataNamespace);
     }
 
-    private String getClusterName(Map stormConf) {
-        return atlasProperties.getString(AtlasConstants.CLUSTER_NAME_KEY, AtlasConstants.DEFAULT_CLUSTER_NAME);
-    }
-
-    private String extractComponentClusterName(Configuration configuration, Map stormConf) {
-        String clusterName = configuration.get(AtlasConstants.CLUSTER_NAME_KEY, null);
+    private String extractComponentMetadataNamespace(Configuration configuration, Map stormConf) {
+        String clusterName = configuration.get(CLUSTER_NAME_KEY, null);
 
         if (clusterName == null) {
-            clusterName = getClusterName(stormConf);
+            clusterName = getMetadataNamespace();
         }
 
         return clusterName;
     }
-
-}
+}
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/atlas/AtlasConstants.java b/common/src/main/java/org/apache/atlas/AtlasConstants.java
index 2b9f411..2b7b4b3 100644
--- a/common/src/main/java/org/apache/atlas/AtlasConstants.java
+++ b/common/src/main/java/org/apache/atlas/AtlasConstants.java
@@ -25,16 +25,16 @@ public final class AtlasConstants {
     private AtlasConstants() {
     }
 
-    public static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
-    public static final String DEFAULT_CLUSTER_NAME = "primary";
-    public static final String CLUSTER_NAME_ATTRIBUTE = "clusterName";
-    public static final String SYSTEM_PROPERTY_APP_PORT = "atlas.app.port";
-    public static final String DEFAULT_APP_PORT_STR = "21000";
-    public static final String ATLAS_REST_ADDRESS_KEY = "atlas.rest.address";
-    public static final String DEFAULT_ATLAS_REST_ADDRESS = "http://localhost:21000";
-    public static final int ATLAS_SHUTDOWN_HOOK_PRIORITY = 30;
-    public static final String DEFAULT_TYPE_VERSION = "1.0";
-
+    public static final String CLUSTER_NAME_KEY              = "atlas.cluster.name";
+    public static final String SYSTEM_PROPERTY_APP_PORT      = "atlas.app.port";
+    public static final String ATLAS_REST_ADDRESS_KEY        = "atlas.rest.address";
     public static final String ATLAS_MIGRATION_MODE_FILENAME = "atlas.migration.data.filename";
     public static final String ATLAS_SERVICES_ENABLED        = "atlas.services.enabled";
+
+    public static final String CLUSTER_NAME_ATTRIBUTE        = "clusterName";
+    public static final String DEFAULT_APP_PORT_STR          = "21000";
+    public static final String DEFAULT_ATLAS_REST_ADDRESS    = "http://localhost:21000";
+    public static final String DEFAULT_TYPE_VERSION          = "1.0";
+    public static final int    ATLAS_SHUTDOWN_HOOK_PRIORITY  = 30;
+
 }
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 0030276..cc6546b 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -59,10 +59,14 @@ public abstract class AtlasHook {
     public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY    = "atlas.notification.failed.messages.filename";
     public static final String ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY = "atlas.notification.log.failed.messages";
     public static final String ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME        = "atlas_hook_failed_messages.log";
+    public static final String CONF_METADATA_NAMESPACE                            = "atlas.metadata.namespace";
+    public static final String CLUSTER_NAME_KEY                                   = "atlas.cluster.name";
+    public static final String DEFAULT_CLUSTER_NAME                               = "primary";
 
     protected static Configuration         atlasProperties;
     protected static NotificationInterface notificationInterface;
 
+    private static final String               metadataNamespace;
     private static final int                  SHUTDOWN_HOOK_WAIT_TIME_MS = 3000;
     private static final boolean              logFailedMessages;
     private static final FailedMessagesLogger failedMessagesLogger;
@@ -95,6 +99,7 @@ public abstract class AtlasHook {
             }
         }
 
+        metadataNamespace         = getMetadataNamespace(atlasProperties);
         notificationMaxRetries    = atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3);
         notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
         notificationInterface     = NotificationProvider.get();
@@ -306,4 +311,15 @@ public abstract class AtlasHook {
         return ret;
     }
 
-}
+    private static String getMetadataNamespace(Configuration config) {
+        return config.getString(CONF_METADATA_NAMESPACE, getClusterName(config));
+    }
+
+    private static String getClusterName(Configuration config) {
+        return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
+    }
+
+    public String getMetadataNamespace() {
+        return metadataNamespace;
+    }
+}
\ No newline at end of file