Posted to commits@atlas.apache.org by ma...@apache.org on 2018/03/19 21:08:53 UTC

[6/6] atlas git commit: ATLAS-2491: Hive hook should use v2 notifications

ATLAS-2491: Hive hook should use v2 notifications


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/6e02ec5b
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/6e02ec5b
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/6e02ec5b

Branch: refs/heads/master
Commit: 6e02ec5b3a97ee2bfaf16ef5e875c14e383d5823
Parents: dee8a2d
Author: rmani <rm...@hortonworks.com>
Authored: Tue Mar 6 08:50:18 2018 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Mon Mar 19 12:20:10 2018 -0700

----------------------------------------------------------------------
 .../atlas/falcon/bridge/FalconBridge.java       |    3 +-
 addons/hive-bridge/pom.xml                      |    6 +
 .../atlas/hive/bridge/ColumnLineageUtils.java   |  158 --
 .../atlas/hive/bridge/HiveMetaStoreBridge.java  | 1016 +++++++------
 .../atlas/hive/hook/AtlasHiveHookContext.java   |   91 ++
 .../org/apache/atlas/hive/hook/HiveHook.java    | 1201 ++-------------
 .../atlas/hive/hook/events/AlterDatabase.java   |   41 +
 .../atlas/hive/hook/events/AlterTable.java      |   42 +
 .../hive/hook/events/AlterTableRename.java      |  173 +++
 .../hive/hook/events/AlterTableRenameCol.java   |   99 ++
 .../atlas/hive/hook/events/BaseHiveEvent.java   |  831 ++++++++++
 .../atlas/hive/hook/events/CreateDatabase.java  |   75 +
 .../hive/hook/events/CreateHiveProcess.java     |  195 +++
 .../atlas/hive/hook/events/CreateTable.java     |   93 ++
 .../atlas/hive/hook/events/DropDatabase.java    |   68 +
 .../atlas/hive/hook/events/DropTable.java       |   61 +
 .../apache/atlas/hive/rewrite/ASTRewriter.java  |   26 -
 .../atlas/hive/rewrite/HiveASTRewriter.java     |   95 --
 .../atlas/hive/rewrite/LiteralRewriter.java     |   76 -
 .../atlas/hive/rewrite/RewriteContext.java      |   48 -
 .../atlas/hive/rewrite/RewriteException.java    |   24 -
 .../java/org/apache/atlas/hive/HiveITBase.java  |  541 ++++++-
 .../atlas/hive/bridge/ColumnLineageUtils.java   |  161 ++
 .../hive/bridge/HiveLiteralRewriterTest.java    |   68 -
 .../hive/bridge/HiveMetaStoreBridgeTest.java    |  184 ++-
 .../hive/bridge/HiveMetastoreBridgeIT.java      |   53 +-
 .../org/apache/atlas/hive/hook/HiveHookIT.java  | 1427 ++++++++++--------
 .../org/apache/atlas/sqoop/hook/SqoopHook.java  |    3 +-
 .../apache/atlas/storm/hook/StormAtlasHook.java |    3 +-
 29 files changed, 4076 insertions(+), 2786 deletions(-)
----------------------------------------------------------------------
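
For context on the change below: before this commit the Hive hook and bridge built v1 Referenceable instances (populated via set()) and sent v1 notifications; this commit moves them to v2 AtlasEntity instances (populated via setAttribute()) carried in v2 notifications. A minimal sketch of the v2 style, assuming the HookNotification.EntityCreateRequestV2 wrapper that the reworked hook events emit; the type and attribute names follow the diff, while the attribute values are made-up examples:

    import org.apache.atlas.model.instance.AtlasEntity;
    import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
    import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;

    public class V2NotificationSketch {
        public static EntityCreateRequestV2 sampleDbCreate(String user) {
            // v2 style: typed AtlasEntity populated via setAttribute(), replacing
            // the v1 Referenceable.set() calls removed in this commit
            AtlasEntity db = new AtlasEntity("hive_db");

            db.setAttribute("qualifiedName", "sales@primary"); // made-up example value
            db.setAttribute("name", "sales");
            db.setAttribute("clusterName", "primary");

            AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo();
            entities.addEntity(db);

            // v2 notification carrying AtlasEntity payloads instead of Referenceables
            return new EntityCreateRequestV2(user, entities);
        }
    }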


http://git-wip-us.apache.org/repos/asf/atlas/blob/6e02ec5b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
index 11d13b3..cbf002f 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java
@@ -68,6 +68,7 @@ public class FalconBridge {
     public static final String RUNSON = "runs-on";
     public static final String STOREDIN = "stored-in";
     public static final String FREQUENCY = "frequency";
+    public static final String ATTRIBUTE_DB = "db";
 
     /**
      * Creates cluster entity
@@ -357,7 +358,7 @@ public class FalconBridge {
         tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
                 HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
         tableRef.set(AtlasClient.NAME, tableName.toLowerCase());
-        tableRef.set(HiveMetaStoreBridge.DB, dbRef);
+        tableRef.set(ATTRIBUTE_DB, dbRef);
         entities.add(tableRef);
 
         return entities;

http://git-wip-us.apache.org/repos/asf/atlas/blob/6e02ec5b/addons/hive-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index 14e4295..b3ebe8e 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -106,6 +106,12 @@
 
         <dependency>
             <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-client-v2</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
             <artifactId>atlas-notification</artifactId>
         </dependency>
 

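The atlas-client-v2 dependency added above supplies AtlasClientV2, which replaces the v1 AtlasClient for the bridge's REST calls (see HiveMetaStoreBridge below). A minimal usage sketch, assuming a local Atlas server with basic authentication; the endpoint, credentials, and attribute values are placeholders:

    import org.apache.atlas.AtlasClientV2;
    import org.apache.atlas.model.instance.AtlasEntity;
    import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
    import org.apache.atlas.model.instance.EntityMutationResponse;
    import org.apache.atlas.model.instance.EntityMutations;

    public class AtlasClientV2Sketch {
        public static void main(String[] args) throws Exception {
            // placeholder endpoint and basic-auth credentials
            String[] endpoint  = new String[] { "http://localhost:21000" };
            String[] basicAuth = new String[] { "admin", "admin" };

            AtlasClientV2 client = new AtlasClientV2(endpoint, basicAuth);

            AtlasEntity db = new AtlasEntity("hive_db");

            db.setAttribute("qualifiedName", "sales@primary"); // made-up example value
            db.setAttribute("name", "sales");

            // createEntity() returns an EntityMutationResponse, which is how
            // registerInstance()/registerInstances() in the diff below obtain
            // the headers of the created entities
            EntityMutationResponse response = client.createEntity(new AtlasEntityWithExtInfo(db));

            System.out.println(response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE));
        }
    }
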
http://git-wip-us.apache.org/repos/asf/atlas/blob/6e02ec5b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java
deleted file mode 100644
index ba10008..0000000
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.hive.bridge;
-
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.hive.model.HiveDataTypes;
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.hadoop.hive.ql.hooks.LineageInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ColumnLineageUtils {
-    public static final Logger LOG = LoggerFactory.getLogger(ColumnLineageUtils.class);
-    public static class HiveColumnLineageInfo {
-        public final String depenendencyType;
-        public final String expr;
-        public final String inputColumn;
-
-        HiveColumnLineageInfo(LineageInfo.Dependency d, String inputCol) {
-            depenendencyType = d.getType().name();
-            expr = d.getExpr();
-            inputColumn = inputCol;
-        }
-
-        @Override
-        public String toString(){
-            return inputColumn;
-        }
-    }
-
-    public static String getQualifiedName(LineageInfo.DependencyKey key){
-        String db = key.getDataContainer().getTable().getDbName();
-        String table = key.getDataContainer().getTable().getTableName();
-        String col = key.getFieldSchema().getName();
-        return db + "." + table + "." + col;
-    }
-
-    public static Map<String, List<HiveColumnLineageInfo>> buildLineageMap(LineageInfo lInfo) {
-        Map<String, List<HiveColumnLineageInfo>> m = new HashMap<>();
-
-        for (Map.Entry<LineageInfo.DependencyKey, LineageInfo.Dependency> e : lInfo.entrySet()) {
-            List<HiveColumnLineageInfo> l = new ArrayList<>();
-            String k = getQualifiedName(e.getKey());
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("buildLineageMap(): key={}; value={}", e.getKey(), e.getValue());
-            }
-
-            Collection<LineageInfo.BaseColumnInfo> baseCols = getBaseCols(e.getValue());
-
-            if (baseCols != null) {
-                for (LineageInfo.BaseColumnInfo iCol : baseCols) {
-                    String db = iCol.getTabAlias().getTable().getDbName();
-                    String table = iCol.getTabAlias().getTable().getTableName();
-                    String colQualifiedName = iCol.getColumn() == null ? db + "." + table : db + "." + table + "." + iCol.getColumn().getName();
-                    l.add(new HiveColumnLineageInfo(e.getValue(), colQualifiedName));
-                }
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Setting lineage --> Input: {} ==> Output : {}", l, k);
-                }
-                m.put(k, l);
-            }
-        }
-        return m;
-    }
-
-    static Collection<LineageInfo.BaseColumnInfo> getBaseCols(LineageInfo.Dependency lInfoDep) {
-        Collection<LineageInfo.BaseColumnInfo> ret = null;
-
-        if (lInfoDep != null) {
-            try {
-                Method getBaseColsMethod = lInfoDep.getClass().getMethod("getBaseCols");
-
-                Object retGetBaseCols = getBaseColsMethod.invoke(lInfoDep);
-
-                if (retGetBaseCols != null) {
-                    if (retGetBaseCols instanceof Collection) {
-                        ret = (Collection) retGetBaseCols;
-                    } else {
-                        LOG.warn("{}: unexpected return type from LineageInfo.Dependency.getBaseCols(), expected type {}",
-                                retGetBaseCols.getClass().getName(), "Collection");
-                    }
-                }
-            } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
-                LOG.warn("getBaseCols()", ex);
-            }
-        }
-
-        return ret;
-    }
-
-    static String[] extractComponents(String qualifiedName) {
-        String[] comps = qualifiedName.split("\\.");
-        int lastIdx = comps.length - 1;
-        int atLoc = comps[lastIdx].indexOf('@');
-        if (atLoc > 0) {
-            comps[lastIdx] = comps[lastIdx].substring(0, atLoc);
-        }
-        return comps;
-    }
-
-    static void populateColumnReferenceableMap(Map<String, Referenceable> m,
-                                               Referenceable r) {
-        if (r.getTypeName().equals(HiveDataTypes.HIVE_TABLE.getName())) {
-            String qName = (String) r.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
-            String[] qNameComps = extractComponents(qName);
-            for (Referenceable col : (List<Referenceable>) r.get(HiveMetaStoreBridge.COLUMNS)) {
-                String cName = (String) col.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
-                String[] colQNameComps = extractComponents(cName);
-                String colQName = colQNameComps[0] + "." + colQNameComps[1] + "." + colQNameComps[2];
-                m.put(colQName, col);
-            }
-            String tableQName = qNameComps[0] + "." + qNameComps[1];
-            m.put(tableQName, r);
-        }
-    }
-
-
-    public static Map<String, Referenceable> buildColumnReferenceableMap(List<Referenceable> inputs,
-                                                                         List<Referenceable> outputs) {
-        Map<String, Referenceable> m = new HashMap<>();
-
-        for (Referenceable r : inputs) {
-            populateColumnReferenceableMap(m, r);
-        }
-
-        for (Referenceable r : outputs) {
-            populateColumnReferenceableMap(m, r);
-        }
-
-        return m;
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/6e02ec5b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 4047c16..51df8d2 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -21,22 +21,28 @@ package org.apache.atlas.hive.bridge;
 import com.google.common.annotations.VisibleForTesting;
 import com.sun.jersey.api.client.ClientResponse;
 import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasConstants;
+import org.apache.atlas.AtlasClientV2;
 import org.apache.atlas.AtlasServiceException;
-import org.apache.atlas.hive.hook.HiveHook;
+import org.apache.atlas.hive.hook.events.BaseHiveEvent;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.hook.AtlasHookException;
-import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.instance.EntityMutations;
 import org.apache.atlas.utils.AuthenticationUtil;
 import org.apache.atlas.utils.HdfsNameServiceResolver;
-import org.apache.atlas.v1.model.instance.Id;
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.instance.Struct;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasStruct;
+import org.apache.commons.collections.CollectionUtils;
+
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang.StringUtils;
@@ -57,119 +63,221 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Date;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
-import static org.apache.atlas.hive.hook.HiveHook.CONF_PREFIX;
+import static org.apache.atlas.hive.hook.events.BaseHiveEvent.*;
 
 /**
  * A Bridge Utility that imports metadata from the Hive Meta Store
 * and registers it in Atlas.
  */
-public class HiveMetaStoreBridge {
-    private static final String DEFAULT_DGI_URL = "http://localhost:21000/";
-    public static final String HIVE_CLUSTER_NAME = "atlas.cluster.name";
-    public static final String DEFAULT_CLUSTER_NAME = "primary";
-    public static final String DESCRIPTION_ATTR = "description";
-
-    public static final String TEMP_TABLE_PREFIX = "_temp-";
-
-    private final String clusterName;
-    public static final long MILLIS_CONVERT_FACTOR = 1000;
-
-    public static final String ATLAS_ENDPOINT = "atlas.rest.address";
-
-    public static final String COMMENT = "comment";
-    public static final String PARAMETERS = "parameters";
-    public static final String COLUMNS = "columns";
-    public static final String POSITION = "position";
-    public static final String PART_COLS = "partitionKeys";
-    public static final String TABLE_ALIAS_LIST = "aliases";
-    public static final String STORAGE_NUM_BUCKETS = "numBuckets";
-    public static final String STORAGE_IS_STORED_AS_SUB_DIRS = "storedAsSubDirectories";
-    public static final String TABLE = "table";
-    public static final String DB = "db";
-    public static final String STORAGE_DESC = "sd";
-    public static final String STORAGE_DESC_INPUT_FMT = "inputFormat";
-    public static final String STORAGE_DESC_OUTPUT_FMT = "outputFormat";
-    public static final String LOCATION = "location";
-    public static final String TABLE_TYPE_ATTR = "tableType";
-    public static final String CREATE_TIME = "createTime";
-    public static final String LAST_ACCESS_TIME = "lastAccessTime";
-    public static final String HDFS_PATH = "hdfs_path";
-    public static final String SEP = ":".intern();
 
+public class HiveMetaStoreBridge {
     private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreBridge.class);
 
-    public  final Hive        hiveClient;
-    private final AtlasClient atlasClient;
-    private final boolean     convertHdfsPathToLowerCase;
+    public static final String CONF_PREFIX                     = "atlas.hook.hive.";
+    public static final String HIVE_CLUSTER_NAME               = "atlas.cluster.name";
+    public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
+    public static final String DEFAULT_CLUSTER_NAME            = "primary";
+    public static final String TEMP_TABLE_PREFIX               = "_temp-";
+    public static final String ATLAS_ENDPOINT                  = "atlas.rest.address";
+    public static final String SEP                             = ":".intern();
+    public static final String HDFS_PATH                       = "hdfs_path";
+
+    private static final String DEFAULT_ATLAS_URL            = "http://localhost:21000/";
 
     private final HdfsNameServiceResolver hdfsNameServiceResolver = HdfsNameServiceResolver.getInstance();
+    private final String                  clusterName;
+    private final Hive                    hiveClient;
+    private final AtlasClientV2           atlasClientV2;
+    private final boolean                 convertHdfsPathToLowerCase;
 
-    HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient) {
-        this(clusterName, hiveClient, atlasClient, true);
-    }
 
-    HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient, boolean convertHdfsPathToLowerCase) {
-        this.clusterName                = clusterName;
-        this.hiveClient                 = hiveClient;
-        this.atlasClient                = atlasClient;
-        this.convertHdfsPathToLowerCase = convertHdfsPathToLowerCase;
-    }
+    public static void main(String[] args) throws AtlasHookException {
+        try {
+            Configuration atlasConf     = ApplicationProperties.get();
+            String[]      atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT);
 
-    public String getClusterName() {
-        return clusterName;
+            if (atlasEndpoint == null || atlasEndpoint.length == 0){
+                atlasEndpoint = new String[] { DEFAULT_ATLAS_URL };
+            }
+
+            AtlasClientV2 atlasClientV2;
+
+            if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
+                String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
+
+                atlasClientV2 = new AtlasClientV2(atlasEndpoint, basicAuthUsernamePassword);
+            } else {
+                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+                atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), atlasEndpoint);
+            }
+
+            Options           options     = new Options();
+            CommandLineParser parser      = new BasicParser();
+            CommandLine       cmd         = parser.parse(options, args);
+            boolean           failOnError = cmd.hasOption("failOnError");
+
+            HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2);
+
+            hiveMetaStoreBridge.importHiveMetadata(failOnError);
+        } catch(Exception e) {
+            throw new AtlasHookException("HiveMetaStoreBridge.main() failed.", e);
+        }
     }
 
     /**
      * Construct a HiveMetaStoreBridge.
      * @param hiveConf {@link HiveConf} for Hive component in the cluster
      */
-    public HiveMetaStoreBridge(Configuration atlasProperties, HiveConf hiveConf) throws Exception {
-        this(atlasProperties, hiveConf, null);
+    public HiveMetaStoreBridge(Configuration atlasProperties, HiveConf hiveConf, AtlasClientV2 atlasClientV2) throws Exception {
+        this(atlasProperties.getString(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME), Hive.get(hiveConf), atlasClientV2, atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, true));
     }
 
     /**
      * Construct a HiveMetaStoreBridge.
      * @param hiveConf {@link HiveConf} for Hive component in the cluster
      */
-    public HiveMetaStoreBridge(Configuration atlasProperties, HiveConf hiveConf, AtlasClient atlasClient) throws Exception {
-        this(atlasProperties.getString(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME), Hive.get(hiveConf), atlasClient, atlasProperties.getBoolean(CONF_PREFIX + "hdfs_path.convert_to_lowercase", true));
+    public HiveMetaStoreBridge(Configuration atlasProperties, HiveConf hiveConf) throws Exception {
+        this(atlasProperties, hiveConf, null);
+    }
+
+    HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClientV2 atlasClientV2) {
+        this(clusterName, hiveClient, atlasClientV2, true);
     }
 
-    AtlasClient getAtlasClient() {
-        return atlasClient;
+    HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClientV2 atlasClientV2, boolean convertHdfsPathToLowerCase) {
+        this.clusterName                = clusterName;
+        this.hiveClient                 = hiveClient;
+        this.atlasClientV2              = atlasClientV2;
+        this.convertHdfsPathToLowerCase = convertHdfsPathToLowerCase;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public Hive getHiveClient() {
+        return hiveClient;
+    }
+
+    public AtlasClientV2 getAtlasClient() {
+        return atlasClientV2;
     }
 
     public boolean isConvertHdfsPathToLowerCase() {
         return convertHdfsPathToLowerCase;
     }
 
-    void importHiveMetadata(boolean failOnError) throws Exception {
-        LOG.info("Importing hive metadata");
+
+    @VisibleForTesting
+    public void importHiveMetadata(boolean failOnError) throws Exception {
+        LOG.info("Importing Hive metadata");
+
         importDatabases(failOnError);
     }
 
     private void importDatabases(boolean failOnError) throws Exception {
         List<String> databases = hiveClient.getAllDatabases();
+
+        LOG.info("Found {} databases", databases.size());
+
         for (String databaseName : databases) {
-            Referenceable dbReference = registerDatabase(databaseName);
+            AtlasEntityWithExtInfo dbEntity = registerDatabase(databaseName);
 
-            if (dbReference != null) {
-                importTables(dbReference, databaseName, failOnError);
+            if (dbEntity != null) {
+                importTables(dbEntity.getEntity(), databaseName, failOnError);
             }
         }
     }
 
     /**
-     * Create a Hive Database entity
-     * @param hiveDB The Hive {@link Database} object from which to map properties
-     * @return new Hive Database entity
-     * @throws HiveException
+     * Imports all tables for the given db
+     * @param dbEntity
+     * @param databaseName
+     * @param failOnError
+     * @throws Exception
      */
-    public Referenceable createDBInstance(Database hiveDB) throws HiveException {
-        return createOrUpdateDBInstance(hiveDB, null);
+    private int importTables(AtlasEntity dbEntity, String databaseName, final boolean failOnError) throws Exception {
+        List<String> hiveTables = hiveClient.getAllTables(databaseName);
+
+        LOG.info("Found {} tables in database {}", hiveTables.size(), databaseName);
+
+        int tablesImported = 0;
+
+        try {
+            for (String tableName : hiveTables) {
+                int imported = importTable(dbEntity, databaseName, tableName, failOnError);
+
+                tablesImported += imported;
+            }
+        } finally {
+            if (tablesImported == hiveTables.size()) {
+                LOG.info("Successfully imported all {} tables from database {}", tablesImported, databaseName);
+            } else {
+                LOG.error("Imported {} of {} tables from database {}. Please check logs for errors during import", tablesImported, hiveTables.size(), databaseName);
+            }
+        }
+
+        return tablesImported;
+    }
+
+    @VisibleForTesting
+    public int importTable(AtlasEntity dbEntity, String databaseName, String tableName, final boolean failOnError) throws Exception {
+        try {
+            Table                  table       = hiveClient.getTable(databaseName, tableName);
+            AtlasEntityWithExtInfo tableEntity = registerTable(dbEntity, table);
+
+            if (table.getTableType() == TableType.EXTERNAL_TABLE) {
+                String                 processQualifiedName = getTableProcessQualifiedName(clusterName, table);
+                AtlasEntityWithExtInfo processEntity        = findProcessEntity(processQualifiedName);
+
+                if (processEntity == null) {
+                    String      tableLocation = isConvertHdfsPathToLowerCase() ? lower(table.getDataLocation().toString()) : table.getDataLocation().toString();
+                    String      query         = getCreateTableString(table, tableLocation);
+                    AtlasEntity pathInst      = toHdfsPathEntity(tableLocation);
+                    AtlasEntity tableInst     = tableEntity.getEntity();
+                    AtlasEntity processInst   = new AtlasEntity(HiveDataTypes.HIVE_PROCESS.getName());
+                    long        now           = System.currentTimeMillis();
+
+                    processInst.setAttribute(ATTRIBUTE_QUALIFIED_NAME, processQualifiedName);
+                    processInst.setAttribute(ATTRIBUTE_NAME, query);
+                    processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME, clusterName);
+                    processInst.setAttribute(ATTRIBUTE_INPUTS, Collections.singletonList(BaseHiveEvent.getObjectId(pathInst)));
+                    processInst.setAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(BaseHiveEvent.getObjectId(tableInst)));
+                    processInst.setAttribute(ATTRIBUTE_USER_NAME, table.getOwner());
+                    processInst.setAttribute(ATTRIBUTE_START_TIME, now);
+                    processInst.setAttribute(ATTRIBUTE_END_TIME, now);
+                    processInst.setAttribute(ATTRIBUTE_OPERATION_TYPE, "CREATETABLE");
+                    processInst.setAttribute(ATTRIBUTE_QUERY_TEXT, query);
+                    processInst.setAttribute(ATTRIBUTE_QUERY_ID, query);
+                    processInst.setAttribute(ATTRIBUTE_QUERY_PLAN, "{}");
+                    processInst.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(query));
+
+                    AtlasEntitiesWithExtInfo createTableProcess = new AtlasEntitiesWithExtInfo();
+
+                    createTableProcess.addEntity(processInst);
+                    createTableProcess.addEntity(pathInst);
+
+                    registerInstances(createTableProcess);
+                } else {
+                    LOG.info("Process {} is already registered", processQualifiedName);
+                }
+            }
+
+            return 1;
+        } catch (Exception e) {
+            LOG.error("Import failed for hive_table {}", tableName, e);
+
+            if (failOnError) {
+                throw e;
+            }
+
+            return 0;
+        }
     }
 
     /**
@@ -178,518 +286,506 @@ public class HiveMetaStoreBridge {
      * @return
      * @throws Exception
      */
-    private Referenceable registerDatabase(String databaseName) throws Exception {
-        Referenceable dbRef = getDatabaseReference(clusterName, databaseName);
-        Database db = hiveClient.getDatabase(databaseName);
+    private AtlasEntityWithExtInfo registerDatabase(String databaseName) throws Exception {
+        AtlasEntityWithExtInfo ret = null;
+        Database               db  = hiveClient.getDatabase(databaseName);
 
         if (db != null) {
-            if (dbRef == null) {
-                dbRef = createDBInstance(db);
-                dbRef = registerInstance(dbRef);
+            ret = findDatabase(clusterName, databaseName);
+
+            if (ret == null) {
+                ret = registerInstance(new AtlasEntityWithExtInfo(toDbEntity(db)));
             } else {
-                LOG.info("Database {} is already registered with id {}. Updating it.", databaseName, dbRef.getId().getId());
-                dbRef = createOrUpdateDBInstance(db, dbRef);
-                updateInstance(dbRef);
+                LOG.info("Database {} is already registered - id={}. Updating it.", databaseName, ret.getEntity().getGuid());
+
+                ret.setEntity(toDbEntity(db, ret.getEntity()));
+
+                updateInstance(ret);
             }
         }
-        return dbRef;
+
+        return ret;
     }
 
-    private Referenceable createOrUpdateDBInstance(Database hiveDB, Referenceable dbRef) {
-        LOG.info("Importing objects from databaseName : {}", hiveDB.getName());
+    private AtlasEntityWithExtInfo registerTable(AtlasEntity dbEntity, Table table) throws AtlasHookException {
+        try {
+            AtlasEntityWithExtInfo ret;
+            AtlasEntityWithExtInfo tableEntity = findTableEntity(table);
 
-        if (dbRef == null) {
-            dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
-        }
-        String dbName = hiveDB.getName().toLowerCase();
-        dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getDBQualifiedName(clusterName, dbName));
-        dbRef.set(AtlasClient.NAME, dbName);
-        dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
-        dbRef.set(DESCRIPTION_ATTR, hiveDB.getDescription());
-
-        dbRef.set(LOCATION, hdfsNameServiceResolver.getPathWithNameServiceID(hiveDB.getLocationUri()));
-        dbRef.set(PARAMETERS, hiveDB.getParameters());
-        dbRef.set(AtlasClient.OWNER, hiveDB.getOwnerName());
-        if (hiveDB.getOwnerType() != null) {
-            dbRef.set("ownerType", hiveDB.getOwnerType().getValue());
+            if (tableEntity == null) {
+                tableEntity = toTableEntity(dbEntity, table);
+
+                ret = registerInstance(tableEntity);
+            } else {
+                LOG.info("Table {}.{} is already registered with id {}. Updating entity.", table.getDbName(), table.getTableName(), tableEntity.getEntity().getGuid());
+
+                ret = toTableEntity(dbEntity, table, tableEntity);
+
+                updateInstance(ret);
+            }
+
+            return ret;
+        } catch (Exception e) {
+            throw new AtlasHookException("HiveMetaStoreBridge.registerTable() failed.", e);
         }
-        return dbRef;
     }
 
     /**
      * Registers an entity in atlas
-     * @param referenceable
+     * @param entity
      * @return
      * @throws Exception
      */
-    private Referenceable registerInstance(Referenceable referenceable) throws Exception {
-        String typeName = referenceable.getTypeName();
-        LOG.debug("creating instance of type {}", typeName);
+    private AtlasEntityWithExtInfo registerInstance(AtlasEntityWithExtInfo entity) throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("creating {} entity: {}", entity.getEntity().getTypeName(), entity);
+        }
 
-        String entityJSON = AtlasType.toV1Json(referenceable);
-        LOG.debug("Submitting new entity {} = {}", referenceable.getTypeName(), entityJSON);
-        List<String> guids = getAtlasClient().createEntity(entityJSON);
-        LOG.debug("created instance for type {}, guid: {}", typeName, guids);
+        AtlasEntityWithExtInfo  ret             = null;
+        EntityMutationResponse  response        = atlasClientV2.createEntity(entity);
+        List<AtlasEntityHeader> createdEntities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE);
 
-        return new Referenceable(guids.get(guids.size() - 1), referenceable.getTypeName(), null);
-    }
+        if (CollectionUtils.isNotEmpty(createdEntities)) {
+            for (AtlasEntityHeader createdEntity : createdEntities) {
+                if (ret == null) {
+                    ret = atlasClientV2.getEntityByGuid(createdEntity.getGuid());
 
-    /**
-     * Gets reference to the atlas entity for the database
-     * @param databaseName  database Name
-     * @param clusterName    cluster name
-     * @return Reference for database if exists, else null
-     * @throws Exception
-     */
-    private Referenceable getDatabaseReference(String clusterName, String databaseName) throws Exception {
-        LOG.debug("Getting reference for database {}", databaseName);
-        String typeName = HiveDataTypes.HIVE_DB.getName();
+                    LOG.info("Created {} entity: name={}, guid={}", ret.getEntity().getTypeName(), ret.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), ret.getEntity().getGuid());
+                } else if (ret.getEntity(createdEntity.getGuid()) == null) {
+                    AtlasEntityWithExtInfo newEntity = atlasClientV2.getEntityByGuid(createdEntity.getGuid());
 
-        return getEntityReference(typeName, getDBQualifiedName(clusterName, databaseName));
-    }
+                    ret.addReferredEntity(newEntity.getEntity());
 
-    /**
-     * Construct the qualified name used to uniquely identify a Database instance in Atlas.
-     * @param clusterName Name of the cluster to which the Hive component belongs
-     * @param dbName Name of the Hive database
-     * @return Unique qualified name to identify the Database instance in Atlas.
-     */
-    public static String getDBQualifiedName(String clusterName, String dbName) {
-        return String.format("%s@%s", dbName.toLowerCase(), clusterName);
-    }
+                    if (MapUtils.isNotEmpty(newEntity.getReferredEntities())) {
+                        for (Map.Entry<String, AtlasEntity> entry : newEntity.getReferredEntities().entrySet()) {
+                            ret.addReferredEntity(entry.getKey(), entry.getValue());
+                        }
+                    }
 
-    private String getCreateTableString(Table table, String location){
-        String colString = "";
-        List<FieldSchema> colList = table.getAllCols();
-        if ( colList != null) {
-            for (FieldSchema col : colList) {
-                colString += col.getName() + " " + col.getType() + ",";
-            }
-            if (colList.size() > 0) {
-                colString = colString.substring(0, colString.length() - 1);
-                colString = "(" + colString + ")";
+                    LOG.info("Created {} entity: name={}, guid={}", newEntity.getEntity().getTypeName(), newEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), newEntity.getEntity().getGuid());
+                }
             }
         }
-        String query = "create external table " + table.getTableName() +  colString +
-                " location '" + location + "'";
-        return query;
+
+        return ret;
     }
 
     /**
-     * Imports all tables for the given db
-     * @param databaseReferenceable
-     * @param databaseName
-     * @param failOnError
+     * Registers entities in Atlas
+     * @param entities
+     * @return
      * @throws Exception
      */
-    private int importTables(Referenceable databaseReferenceable, String databaseName, final boolean failOnError) throws Exception {
-        int tablesImported = 0;
-        List<String> hiveTables = hiveClient.getAllTables(databaseName);
-        LOG.info("Importing tables {} for db {}", hiveTables.toString(), databaseName);
-        for (String tableName : hiveTables) {
-            int imported = importTable(databaseReferenceable, databaseName, tableName, failOnError);
-            tablesImported += imported;
+    private AtlasEntitiesWithExtInfo registerInstances(AtlasEntitiesWithExtInfo entities) throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("creating {} entities: {}", entities.getEntities().size(), entities);
         }
 
-        if (tablesImported == hiveTables.size()) {
-            LOG.info("Successfully imported all {} tables from {} ", tablesImported, databaseName);
-        } else {
-            LOG.error("Able to import {} tables out of {} tables from {}. Please check logs for import errors", tablesImported, hiveTables.size(), databaseName);
-        }
+        AtlasEntitiesWithExtInfo ret = null;
+        EntityMutationResponse   response        = atlasClientV2.createEntities(entities);
+        List<AtlasEntityHeader>  createdEntities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE);
 
-        return tablesImported;
-    }
+        if (CollectionUtils.isNotEmpty(createdEntities)) {
+            ret = new AtlasEntitiesWithExtInfo();
 
-    @VisibleForTesting
-    public int importTable(Referenceable databaseReferenceable, String databaseName, String tableName, final boolean failOnError) throws Exception {
-        try {
-            Table table = hiveClient.getTable(databaseName, tableName);
-            Referenceable tableReferenceable = registerTable(databaseReferenceable, table);
-            if (table.getTableType() == TableType.EXTERNAL_TABLE) {
-                String tableQualifiedName = getTableProcessQualifiedName(clusterName, table);
-                Referenceable process = getProcessReference(tableQualifiedName);
-                if (process == null) {
-                    LOG.info("Attempting to register create table process for {}", tableQualifiedName);
-                    Referenceable lineageProcess = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
-                    ArrayList<Referenceable> sourceList = new ArrayList<>();
-                    ArrayList<Referenceable> targetList = new ArrayList<>();
-                    String tableLocation = isConvertHdfsPathToLowerCase() ? HiveHook.lower(table.getDataLocation().toString()) : table.getDataLocation().toString();
-                    Referenceable path = fillHDFSDataSet(tableLocation);
-                    String query = getCreateTableString(table, tableLocation);
-                    sourceList.add(path);
-                    targetList.add(tableReferenceable);
-                    lineageProcess.set("inputs", sourceList);
-                    lineageProcess.set("outputs", targetList);
-                    lineageProcess.set("userName", table.getOwner());
-                    lineageProcess.set("startTime", new Date(System.currentTimeMillis()));
-                    lineageProcess.set("endTime", new Date(System.currentTimeMillis()));
-                    lineageProcess.set("operationType", "CREATETABLE");
-                    lineageProcess.set("queryText", query);
-                    lineageProcess.set("queryId", query);
-                    lineageProcess.set("queryPlan", "{}");
-                    lineageProcess.set("clusterName", clusterName);
-                    List<String> recentQueries = new ArrayList<>(1);
-                    recentQueries.add(query);
-                    lineageProcess.set("recentQueries", recentQueries);
-                    String processQualifiedName = getTableProcessQualifiedName(clusterName, table);
-                    lineageProcess.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQualifiedName);
-                    lineageProcess.set(AtlasClient.NAME, query);
-                    registerInstance(lineageProcess);
-                } else {
-                    LOG.info("Process {} is already registered", process.toString());
+            for (AtlasEntityHeader createdEntity : createdEntities) {
+                AtlasEntityWithExtInfo entity = atlasClientV2.getEntityByGuid(createdEntity.getGuid());
+
+                ret.addEntity(entity.getEntity());
+
+                if (MapUtils.isNotEmpty(entity.getReferredEntities())) {
+                    for (Map.Entry<String, AtlasEntity> entry : entity.getReferredEntities().entrySet()) {
+                        ret.addReferredEntity(entry.getKey(), entry.getValue());
+                    }
                 }
+
+                LOG.info("Created {} entity: name={}, guid={}", entity.getEntity().getTypeName(), entity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), entity.getEntity().getGuid());
             }
-            return 1;
-        } catch (Exception e) {
-            LOG.error("Import failed for hive_table {} ", tableName, e);
-            if (failOnError) {
-                throw e;
-            }
-            return 0;
         }
-    }
 
-    /**
-     * Gets reference for the table
-     *
-     * @param hiveTable
-     * @return table reference if exists, else null
-     * @throws Exception
-     */
-    private Referenceable getTableReference(Table hiveTable)  throws Exception {
-        LOG.debug("Getting reference for table {}.{}", hiveTable.getDbName(), hiveTable.getTableName());
-
-        String typeName = HiveDataTypes.HIVE_TABLE.getName();
-        String tblQualifiedName = getTableQualifiedName(getClusterName(), hiveTable.getDbName(), hiveTable.getTableName());
-        return getEntityReference(typeName, tblQualifiedName);
+        return ret;
     }
 
-    private Referenceable getEntityReference(final String typeName, final String tblQualifiedName) throws AtlasServiceException {
-        AtlasClient dgiClient = getAtlasClient();
-        try {
-            return dgiClient.getEntity(typeName, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tblQualifiedName);
-        } catch (AtlasServiceException e) {
-            if(e.getStatus() == ClientResponse.Status.NOT_FOUND) {
-                return null;
-            }
-            throw e;
+    private void updateInstance(AtlasEntityWithExtInfo entity) throws AtlasServiceException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("updating {} entity: {}", entity.getEntity().getTypeName(), entity);
         }
-    }
 
-    private Referenceable getProcessReference(String qualifiedName) throws Exception{
-        LOG.debug("Getting reference for process {}", qualifiedName);
-        String typeName = HiveDataTypes.HIVE_PROCESS.getName();
-        return getEntityReference(typeName, qualifiedName);
+        atlasClientV2.updateEntity(entity);
+
+        LOG.info("Updated {} entity: name={}, guid={}", entity.getEntity().getTypeName(), entity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), entity.getEntity().getGuid());
     }
 
     /**
-     * Construct the qualified name used to uniquely identify a Table instance in Atlas.
-     * @param clusterName Name of the cluster to which the Hive component belongs
-     * @param dbName Name of the Hive database to which the Table belongs
-     * @param tableName Name of the Hive table
-     * @return Unique qualified name to identify the Table instance in Atlas.
+     * Create a Hive Database entity
+     * @param hiveDB The Hive {@link Database} object from which to map properties
+     * @return new Hive Database AtlasEntity
+     * @throws HiveException
      */
-    public static String getTableQualifiedName(String clusterName, String dbName, String tableName, boolean isTemporaryTable) {
-        String tableTempName = tableName;
-        if (isTemporaryTable) {
-            if (SessionState.get() != null && SessionState.get().getSessionId() != null) {
-                tableTempName = tableName + TEMP_TABLE_PREFIX + SessionState.get().getSessionId();
-            } else {
-                tableTempName = tableName + TEMP_TABLE_PREFIX + RandomStringUtils.random(10);
-            }
-        }
-        return String.format("%s.%s@%s", dbName.toLowerCase(), tableTempName.toLowerCase(), clusterName);
+    private AtlasEntity toDbEntity(Database hiveDB) throws HiveException {
+        return toDbEntity(hiveDB, null);
     }
 
+    private AtlasEntity toDbEntity(Database hiveDB, AtlasEntity dbEntity) {
+        if (dbEntity == null) {
+            dbEntity = new AtlasEntity(HiveDataTypes.HIVE_DB.getName());
+        }
 
+        String dbName = hiveDB.getName().toLowerCase();
 
-    /**
-     * Construct the qualified name used to uniquely identify a Table instance in Atlas.
-     * @param clusterName Name of the cluster to which the Hive component belongs
-     * @param table hive table for which the qualified name is needed
-     * @return Unique qualified name to identify the Table instance in Atlas.
-     */
-    public static String getTableQualifiedName(String clusterName, Table table) {
-        return getTableQualifiedName(clusterName, table.getDbName(), table.getTableName(), table.isTemporary());
-    }
+        dbEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getDBQualifiedName(clusterName, dbName));
+        dbEntity.setAttribute(ATTRIBUTE_NAME, dbName);
+        dbEntity.setAttribute(ATTRIBUTE_DESCRIPTION, hiveDB.getDescription());
+        dbEntity.setAttribute(ATTRIBUTE_OWNER, hiveDB.getOwnerName());
 
-    public static String getTableProcessQualifiedName(String clusterName, Table table) {
-        String tableQualifiedName = getTableQualifiedName(clusterName, table);
-        Date createdTime = getTableCreatedTime(table);
-        return tableQualifiedName + SEP + createdTime.getTime();
-    }
+        dbEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, clusterName);
+        dbEntity.setAttribute(ATTRIBUTE_LOCATION, hdfsNameServiceResolver.getPathWithNameServiceID(hiveDB.getLocationUri()));
+        dbEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveDB.getParameters());
 
-    /**
-     * Construct the qualified name used to uniquely identify a Table instance in Atlas.
-     * @param clusterName Name of the cluster to which the Hive component belongs
-     * @param dbName Name of the Hive database to which the Table belongs
-     * @param tableName Name of the Hive table
-     * @return Unique qualified name to identify the Table instance in Atlas.
-     */
-    public static String getTableQualifiedName(String clusterName, String dbName, String tableName) {
-         return getTableQualifiedName(clusterName, dbName, tableName, false);
-    }
+        if (hiveDB.getOwnerType() != null) {
+            dbEntity.setAttribute(ATTRIBUTE_OWNER_TYPE, OWNER_TYPE_TO_ENUM_VALUE.get(hiveDB.getOwnerType().getValue()));
+        }
 
+        return dbEntity;
+    }
     /**
      * Create a new table instance in Atlas
-     * @param dbReference reference to a created Hive database {@link Referenceable} to which this table belongs
+     * @param database the Hive database {@link AtlasEntity} to which this table belongs
      * @param hiveTable reference to the Hive {@link Table} from which to map properties
-     * @return Newly created Hive reference
+     * @return Newly created Hive AtlasEntity
      * @throws Exception
      */
-    public Referenceable createTableInstance(Referenceable dbReference, Table hiveTable)
-            throws AtlasHookException {
-        return createOrUpdateTableInstance(dbReference, null, hiveTable);
-    }
-
-    public static Date getTableCreatedTime(Table table) {
-        return new Date(table.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR);
+    private AtlasEntityWithExtInfo toTableEntity(AtlasEntity database, Table hiveTable) throws AtlasHookException {
+        return toTableEntity(database, hiveTable, null);
     }
 
-    private Referenceable createOrUpdateTableInstance(Referenceable dbReference, Referenceable tableReference,
-                                                      final Table hiveTable) throws AtlasHookException {
-        LOG.info("Importing objects from {}.{}", hiveTable.getDbName(), hiveTable.getTableName());
-
-        if (tableReference == null) {
-            tableReference = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
-        }
-
-        String tableQualifiedName = getTableQualifiedName(clusterName, hiveTable);
-        tableReference.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
-        tableReference.set(AtlasClient.NAME, hiveTable.getTableName().toLowerCase());
-        tableReference.set(AtlasClient.OWNER, hiveTable.getOwner());
-
-        Date createDate = new Date();
-        if (hiveTable.getTTable() != null){
-            try {
-                createDate = getTableCreatedTime(hiveTable);
-                LOG.debug("Setting create time to {} ", createDate);
-                tableReference.set(CREATE_TIME, createDate);
-            } catch(Exception ne) {
-                LOG.error("Error while setting createTime for the table {} ", hiveTable.getCompleteName(), ne);
-            }
+    private AtlasEntityWithExtInfo toTableEntity(AtlasEntity database, final Table hiveTable, AtlasEntityWithExtInfo table) throws AtlasHookException {
+        if (table == null) {
+            table = new AtlasEntityWithExtInfo(new AtlasEntity(HiveDataTypes.HIVE_TABLE.getName()));
         }
 
-        Date lastAccessTime = createDate;
-        if ( hiveTable.getLastAccessTime() > 0) {
-            lastAccessTime = new Date(hiveTable.getLastAccessTime() * MILLIS_CONVERT_FACTOR);
-        }
-        tableReference.set(LAST_ACCESS_TIME, lastAccessTime);
-        tableReference.set("retention", hiveTable.getRetention());
+        AtlasEntity tableEntity        = table.getEntity();
+        String      tableQualifiedName = getTableQualifiedName(clusterName, hiveTable);
+        long        createTime         = BaseHiveEvent.getTableCreateTime(hiveTable);
+        long        lastAccessTime     = hiveTable.getLastAccessTime() > 0 ? hiveTable.getLastAccessTime() : createTime;
 
-        tableReference.set(COMMENT, hiveTable.getParameters().get(COMMENT));
+        tableEntity.setAttribute(ATTRIBUTE_DB, BaseHiveEvent.getObjectId(database));
+        tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tableQualifiedName);
+        tableEntity.setAttribute(ATTRIBUTE_NAME, hiveTable.getTableName().toLowerCase());
+        tableEntity.setAttribute(ATTRIBUTE_OWNER, hiveTable.getOwner());
 
-        // add reference to the database
-        tableReference.set(DB, dbReference);
-
-        // add reference to the StorageDescriptor
-        Referenceable sdReferenceable = fillStorageDesc(hiveTable.getSd(), tableQualifiedName, getStorageDescQFName(tableQualifiedName), tableReference.getId());
-        tableReference.set(STORAGE_DESC, sdReferenceable);
-
-        tableReference.set(PARAMETERS, hiveTable.getParameters());
+        tableEntity.setAttribute(ATTRIBUTE_CREATE_TIME, createTime);
+        tableEntity.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime);
+        tableEntity.setAttribute(ATTRIBUTE_RETENTION, hiveTable.getRetention());
+        tableEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveTable.getParameters());
+        tableEntity.setAttribute(ATTRIBUTE_COMMENT, hiveTable.getParameters().get(ATTRIBUTE_COMMENT));
+        tableEntity.setAttribute(ATTRIBUTE_TABLE_TYPE, hiveTable.getTableType().name());
+        tableEntity.setAttribute(ATTRIBUTE_TEMPORARY, hiveTable.isTemporary());
 
         if (hiveTable.getViewOriginalText() != null) {
-            tableReference.set("viewOriginalText", hiveTable.getViewOriginalText());
+            tableEntity.setAttribute(ATTRIBUTE_VIEW_ORIGINAL_TEXT, hiveTable.getViewOriginalText());
         }
 
         if (hiveTable.getViewExpandedText() != null) {
-            tableReference.set("viewExpandedText", hiveTable.getViewExpandedText());
+            tableEntity.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, hiveTable.getViewExpandedText());
         }
 
-        tableReference.set(TABLE_TYPE_ATTR, hiveTable.getTableType().name());
-        tableReference.set("temporary", hiveTable.isTemporary());
+        AtlasEntity       sdEntity = toStorageDescEntity(hiveTable.getSd(), tableQualifiedName, getStorageDescQFName(tableQualifiedName), BaseHiveEvent.getObjectId(tableEntity));
+        List<AtlasEntity> partKeys = toColumns(hiveTable.getPartitionKeys(), tableEntity);
+        List<AtlasEntity> columns  = toColumns(hiveTable.getCols(), tableEntity);
 
-        // add reference to the Partition Keys
-        List<Referenceable> partKeys = getColumns(hiveTable.getPartitionKeys(), tableReference);
-        tableReference.set("partitionKeys", partKeys);
+        tableEntity.setAttribute(ATTRIBUTE_STORAGEDESC, BaseHiveEvent.getObjectId(sdEntity));
+        tableEntity.setAttribute(ATTRIBUTE_PARTITION_KEYS, BaseHiveEvent.getObjectIds(partKeys));
+        tableEntity.setAttribute(ATTRIBUTE_COLUMNS, BaseHiveEvent.getObjectIds(columns));
 
-        tableReference.set(COLUMNS, getColumns(hiveTable.getCols(), tableReference));
+        if (MapUtils.isNotEmpty(table.getReferredEntities())) {
+            table.getReferredEntities().clear();
+        }
 
-        return tableReference;
-    }
+        table.addReferredEntity(database);
+        table.addReferredEntity(sdEntity);
 
-    public static String getStorageDescQFName(String entityQualifiedName) {
-        return entityQualifiedName + "_storage";
-    }
+        if (partKeys != null) {
+            for (AtlasEntity partKey : partKeys) {
+                table.addReferredEntity(partKey);
+            }
+        }
 
-    private Referenceable registerTable(Referenceable dbReference, Table table) throws AtlasHookException {
-        try {
-            String dbName = table.getDbName();
-            String tableName = table.getTableName();
-            LOG.info("Attempting to register table [{}]", tableName);
-            Referenceable tableReference = getTableReference(table);
-            LOG.info("Found result {}", tableReference);
-            if (tableReference == null) {
-                tableReference = createTableInstance(dbReference, table);
-                tableReference = registerInstance(tableReference);
-            } else {
-                LOG.info("Table {}.{} is already registered with id {}. Updating entity.", dbName, tableName,
-                        tableReference.getId().getId());
-                tableReference = createOrUpdateTableInstance(dbReference, tableReference, table);
-                updateInstance(tableReference);
+        if (columns != null) {
+            for (AtlasEntity column : columns) {
+                table.addReferredEntity(column);
             }
-            return tableReference;
-        } catch (Exception e) {
-            throw new AtlasHookException("HiveMetaStoreBridge.getStorageDescQFName() failed.", e);
         }
+
+        return table;
     }
 
-    private void updateInstance(Referenceable referenceable) throws AtlasServiceException {
-        String typeName = referenceable.getTypeName();
-        LOG.debug("updating instance of type {}", typeName);
+    private AtlasEntity toStorageDescEntity(StorageDescriptor storageDesc, String tableQualifiedName, String sdQualifiedName, AtlasObjectId tableId) throws AtlasHookException {
+        AtlasEntity ret = new AtlasEntity(HiveDataTypes.HIVE_STORAGEDESC.getName());
 
-        String entityJSON = AtlasType.toV1Json(referenceable);
-        LOG.debug("Updating entity {} = {}", referenceable.getTypeName(), entityJSON);
+        ret.setAttribute(ATTRIBUTE_TABLE, tableId);
+        ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName);
+        ret.setAttribute(ATTRIBUTE_PARAMETERS, storageDesc.getParameters());
+        ret.setAttribute(ATTRIBUTE_LOCATION, hdfsNameServiceResolver.getPathWithNameServiceID(storageDesc.getLocation()));
+        ret.setAttribute(ATTRIBUTE_INPUT_FORMAT, storageDesc.getInputFormat());
+        ret.setAttribute(ATTRIBUTE_OUTPUT_FORMAT, storageDesc.getOutputFormat());
+        ret.setAttribute(ATTRIBUTE_COMPRESSED, storageDesc.isCompressed());
+        ret.setAttribute(ATTRIBUTE_NUM_BUCKETS, storageDesc.getNumBuckets());
+        ret.setAttribute(ATTRIBUTE_STORED_AS_SUB_DIRECTORIES, storageDesc.isStoredAsSubDirectories());
 
-        atlasClient.updateEntity(referenceable.getId().getId(), referenceable);
-    }
+        if (storageDesc.getBucketCols().size() > 0) {
+            ret.setAttribute(ATTRIBUTE_BUCKET_COLS, storageDesc.getBucketCols());
+        }
 
-    public Referenceable fillStorageDesc(StorageDescriptor storageDesc, String tableQualifiedName,
-        String sdQualifiedName, Id tableId) throws AtlasHookException {
-        LOG.debug("Filling storage descriptor information for {}", storageDesc);
+        if (storageDesc.getSerdeInfo() != null) {
+            SerDeInfo serdeInfo = storageDesc.getSerdeInfo();
 
-        Referenceable sdReferenceable = new Referenceable(HiveDataTypes.HIVE_STORAGEDESC.getName());
-        sdReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sdQualifiedName);
+            LOG.debug("serdeInfo = {}", serdeInfo);
+            // SkewedInfo skewedInfo = storageDesc.getSkewedInfo();
 
-        SerDeInfo serdeInfo = storageDesc.getSerdeInfo();
-        LOG.debug("serdeInfo = {}", serdeInfo);
-        // SkewedInfo skewedInfo = storageDesc.getSkewedInfo();
+            AtlasStruct serdeInfoStruct = new AtlasStruct(HiveDataTypes.HIVE_SERDE.getName());
 
-        String serdeInfoName = HiveDataTypes.HIVE_SERDE.getName();
-        Struct serdeInfoStruct = new Struct(serdeInfoName);
+            serdeInfoStruct.setAttribute(ATTRIBUTE_NAME, serdeInfo.getName());
+            serdeInfoStruct.setAttribute(ATTRIBUTE_SERIALIZATION_LIB, serdeInfo.getSerializationLib());
+            serdeInfoStruct.setAttribute(ATTRIBUTE_PARAMETERS, serdeInfo.getParameters());
 
-        serdeInfoStruct.set(AtlasClient.NAME, serdeInfo.getName());
-        serdeInfoStruct.set("serializationLib", serdeInfo.getSerializationLib());
-        serdeInfoStruct.set(PARAMETERS, serdeInfo.getParameters());
+            ret.setAttribute(ATTRIBUTE_SERDE_INFO, serdeInfoStruct);
+        }
 
-        sdReferenceable.set("serdeInfo", serdeInfoStruct);
-        sdReferenceable.set(STORAGE_NUM_BUCKETS, storageDesc.getNumBuckets());
-        sdReferenceable
-                .set(STORAGE_IS_STORED_AS_SUB_DIRS, storageDesc.isStoredAsSubDirectories());
+        if (CollectionUtils.isNotEmpty(storageDesc.getSortCols())) {
+            List<AtlasStruct> sortColsStruct = new ArrayList<>();
 
-        List<Struct> sortColsStruct = new ArrayList<>();
-        for (Order sortcol : storageDesc.getSortCols()) {
-            String hiveOrderName = HiveDataTypes.HIVE_ORDER.getName();
-            Struct colStruct = new Struct(hiveOrderName);
-            colStruct.set("col", sortcol.getCol());
-            colStruct.set("order", sortcol.getOrder());
+            for (Order sortcol : storageDesc.getSortCols()) {
+                String hiveOrderName = HiveDataTypes.HIVE_ORDER.getName();
+                AtlasStruct colStruct = new AtlasStruct(hiveOrderName);
+                colStruct.setAttribute("col", sortcol.getCol());
+                colStruct.setAttribute("order", sortcol.getOrder());
 
-            sortColsStruct.add(colStruct);
-        }
-        if (sortColsStruct.size() > 0) {
-            sdReferenceable.set("sortCols", sortColsStruct);
+                sortColsStruct.add(colStruct);
+            }
+
+            ret.setAttribute(ATTRIBUTE_SORT_COLS, sortColsStruct);
         }
 
-        sdReferenceable.set(LOCATION, hdfsNameServiceResolver.getPathWithNameServiceID(storageDesc.getLocation()));
-        sdReferenceable.set("inputFormat", storageDesc.getInputFormat());
-        sdReferenceable.set("outputFormat", storageDesc.getOutputFormat());
-        sdReferenceable.set("compressed", storageDesc.isCompressed());
+        return ret;
+    }
 
-        if (storageDesc.getBucketCols().size() > 0) {
-            sdReferenceable.set("bucketCols", storageDesc.getBucketCols());
-        }
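+    // Example (illustrative names): for a table whose qualifiedName is "sales.orders@primary", a
+    // field "Amount" of type "double" becomes a hive_column entity with qualifiedName
+    // "sales.orders.amount@primary"; positions are assigned in schema order, starting at 0.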
+    private List<AtlasEntity> toColumns(List<FieldSchema> schemaList, AtlasEntity table) throws AtlasHookException {
+        List<AtlasEntity> ret = new ArrayList<>();
 
-        sdReferenceable.set(PARAMETERS, storageDesc.getParameters());
-        sdReferenceable.set("storedAsSubDirectories", storageDesc.isStoredAsSubDirectories());
-        sdReferenceable.set(TABLE, tableId);
+        int columnPosition = 0;
+        for (FieldSchema fs : schemaList) {
+            LOG.debug("Processing field {}", fs);
 
-        return sdReferenceable;
-    }
+            AtlasEntity column = new AtlasEntity(HiveDataTypes.HIVE_COLUMN.getName());
+
+            column.setAttribute(ATTRIBUTE_TABLE, BaseHiveEvent.getObjectId(table));
+            column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getColumnQualifiedName((String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), fs.getName()));
+            column.setAttribute(ATTRIBUTE_NAME, fs.getName());
+            column.setAttribute(ATTRIBUTE_OWNER, table.getAttribute(ATTRIBUTE_OWNER));
+            column.setAttribute(ATTRIBUTE_COL_TYPE, fs.getType());
+            column.setAttribute(ATTRIBUTE_COL_POSITION, columnPosition++);
+            column.setAttribute(ATTRIBUTE_COMMENT, fs.getComment());
 
-    public Referenceable fillHDFSDataSet(String pathUri) {
-        Referenceable ref = new Referenceable(HDFS_PATH);
+            ret.add(column);
+        }
+        return ret;
+    }
 
-        // Get the name service ID for the given HDFS path
-        String nameServiceID = hdfsNameServiceResolver.getNameServiceIDForPath(pathUri);
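+    // Example (illustrative values): for pathUri "hdfs://nn1.example.com:8020/apps/warehouse" that
+    // the resolver maps to name service "mycluster", the entity gets path
+    // "hdfs://mycluster/apps/warehouse" and qualifiedName "hdfs://mycluster/apps/warehouse@<clusterName>".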
+    private AtlasEntity toHdfsPathEntity(String pathUri) {
+        AtlasEntity ret           = new AtlasEntity(HDFS_PATH);
+        String      nameServiceID = hdfsNameServiceResolver.getNameServiceIDForPath(pathUri);
+        Path        path          = new Path(pathUri);
 
-        Path path = new Path(pathUri);
-        ref.set(AtlasClient.NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
+        ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
+        ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, clusterName);
 
         if (StringUtils.isNotEmpty(nameServiceID)) {
-            // Name service resolution is successful, now get updated HDFS path where the host port info is replaced by
-            // resolved name service
+            // Name service resolution is successful, now get updated HDFS path where the host port info is replaced by resolved name service
             String updatedHdfsPath = hdfsNameServiceResolver.getPathWithNameServiceID(pathUri);
-            ref.set("path", updatedHdfsPath);
-            ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, updatedHdfsPath));
-            // Only set name service if it was resolved
-            ref.set("nameServiceId", nameServiceID);
+
+            ret.setAttribute(ATTRIBUTE_PATH, updatedHdfsPath);
+            ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHdfsPathQualifiedName(updatedHdfsPath));
+            ret.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceID);
         } else {
-            ref.set("path", pathUri);
+            ret.setAttribute(ATTRIBUTE_PATH, pathUri);
+
             // Only append clusterName for the HDFS path
             if (pathUri.startsWith(HdfsNameServiceResolver.HDFS_SCHEME)) {
-                ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, pathUri));
+                ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHdfsPathQualifiedName(pathUri));
             } else {
-                ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, pathUri);
+                ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathUri);
             }
         }
-        ref.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
-        return ref;
+
+        return ret;
     }
 
-    public static String getColumnQualifiedName(final String tableQualifiedName, final String colName) {
-        final String[] parts = tableQualifiedName.split("@");
-        final String tableName = parts[0];
-        final String clusterName = parts[1];
-        return String.format("%s.%s@%s", tableName, colName.toLowerCase(), clusterName);
+    /**
+     * Gets the Atlas entity for the database
+     * @param clusterName  cluster name
+     * @param databaseName database name
+     * @return AtlasEntity for the database if it exists, else null
+     * @throws Exception
+     */
+    private AtlasEntityWithExtInfo findDatabase(String clusterName, String databaseName) throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Searching Atlas for database {}", databaseName);
+        }
+
+        String typeName = HiveDataTypes.HIVE_DB.getName();
+
+        return findEntity(typeName, getDBQualifiedName(clusterName, databaseName));
     }
 
-    public List<Referenceable> getColumns(List<FieldSchema> schemaList, Referenceable tableReference) throws AtlasHookException {
-        List<Referenceable> colList = new ArrayList<>();
-        int columnPosition = 0;
-        for (FieldSchema fs : schemaList) {
-            LOG.debug("Processing field {}", fs);
-            Referenceable colReferenceable = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName());
-            colReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                    getColumnQualifiedName((String) tableReference.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), fs.getName()));
-            colReferenceable.set(AtlasClient.NAME, fs.getName());
-            colReferenceable.set(AtlasClient.OWNER, tableReference.get(AtlasClient.OWNER));
-            colReferenceable.set("type", fs.getType());
-            colReferenceable.set(POSITION, columnPosition++);
-            colReferenceable.set(COMMENT, fs.getComment());
-            colReferenceable.set(TABLE, tableReference.getId());
-
-
-            colList.add(colReferenceable);
+    /**
+     * Gets the Atlas entity for the table
+     *
+     * @param hiveTable hive table for which the Atlas entity is needed
+     * @return table entity from Atlas if it exists, else null
+     * @throws Exception
+     */
+    private AtlasEntityWithExtInfo findTableEntity(Table hiveTable) throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Searching Atlas for table {}.{}", hiveTable.getDbName(), hiveTable.getTableName());
         }
-        return colList;
+
+        String typeName         = HiveDataTypes.HIVE_TABLE.getName();
+        String tblQualifiedName = getTableQualifiedName(getClusterName(), hiveTable.getDbName(), hiveTable.getTableName());
+
+        return findEntity(typeName, tblQualifiedName);
     }
 
-    public static String getHdfsPathQualifiedName(String clusterName, String hdfsPath) {
-        return String.format("%s@%s", hdfsPath, clusterName);
+    private AtlasEntityWithExtInfo findProcessEntity(String qualifiedName) throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Searching Atlas for process {}", qualifiedName);
+        }
+
+        String typeName = HiveDataTypes.HIVE_PROCESS.getName();
+
+        return findEntity(typeName, qualifiedName);
     }
 
+    private AtlasEntityWithExtInfo findEntity(final String typeName, final String qualifiedName) throws AtlasServiceException {
+        AtlasClientV2 atlasClientV2 = getAtlasClient();
 
-    public static void main(String[] args) throws AtlasHookException {
         try {
-        Configuration atlasConf = ApplicationProperties.get();
-        String[] atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT);
-        if (atlasEndpoint == null || atlasEndpoint.length == 0){
-            atlasEndpoint = new String[] { DEFAULT_DGI_URL };
-        }
-        AtlasClient atlasClient;
+            return atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
+        } catch (AtlasServiceException e) {
+            if (e.getStatus() == ClientResponse.Status.NOT_FOUND) {
+                return null;
+            }
 
-        if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
-            String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
-            atlasClient = new AtlasClient(atlasEndpoint, basicAuthUsernamePassword);
-        } else {
-            UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-            atlasClient = new AtlasClient(ugi, ugi.getShortUserName(), atlasEndpoint);
+            throw e;
         }
+    }
 
-        Options options = new Options();
-        CommandLineParser parser = new BasicParser();
-        CommandLine cmd = parser.parse( options, args);
+    private String getCreateTableString(Table table, String location) {
+        String            colString = "";
+        List<FieldSchema> colList   = table.getAllCols();
 
-        boolean failOnError = false;
-        if (cmd.hasOption("failOnError")) {
-            failOnError = true;
+        if (colList != null) {
+            for (FieldSchema col : colList) {
+                colString += col.getName() + " " + col.getType() + ",";
+            }
+
+            if (colList.size() > 0) {
+                colString = colString.substring(0, colString.length() - 1);
+                colString = "(" + colString + ")";
+            }
         }
 
-        HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClient);
-        hiveMetaStoreBridge.importHiveMetadata(failOnError);
+        String query = "create external table " + table.getTableName() + colString + " location '" + location + "'";
+
+        return query;
+    }
+
+    private String lower(String str) {
+        if (StringUtils.isEmpty(str)) {
+            return "";
         }
-        catch(Exception e) {
-            throw new AtlasHookException("HiveMetaStoreBridge.main() failed.", e);
+
+        return str.toLowerCase().trim();
+    }
+
+    /**
+     * Construct the qualified name used to uniquely identify a Table instance in Atlas.
+     * @param clusterName Name of the cluster to which the Hive component belongs
+     * @param table hive table for which the qualified name is needed
+     * @return Unique qualified name to identify the Table instance in Atlas.
+     */
+    private static String getTableQualifiedName(String clusterName, Table table) {
+        return getTableQualifiedName(clusterName, table.getDbName(), table.getTableName(), table.isTemporary());
+    }
+
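+    // Example (illustrative values): with clusterName "primary",
+    // getHdfsPathQualifiedName("hdfs://mycluster/apps/warehouse") returns
+    // "hdfs://mycluster/apps/warehouse@primary".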
+    private String getHdfsPathQualifiedName(String hdfsPath) {
+        return String.format("%s@%s", hdfsPath, clusterName);
+    }
+
+    /**
+     * Construct the qualified name used to uniquely identify a Database instance in Atlas.
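+     * For example (illustrative names): getDBQualifiedName("primary", "Sales") returns "sales@primary".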
+     * @param clusterName Name of the cluster to which the Hive component belongs
+     * @param dbName Name of the Hive database
+     * @return Unique qualified name to identify the Database instance in Atlas.
+     */
+    public static String getDBQualifiedName(String clusterName, String dbName) {
+        return String.format("%s@%s", dbName.toLowerCase(), clusterName);
+    }
+
+    /**
+     * Construct the qualified name used to uniquely identify a Table instance in Atlas.
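+     * For example (illustrative names): getTableQualifiedName("primary", "Sales", "Orders", false)
+     * returns "sales.orders@primary"; for a temporary table, TEMP_TABLE_PREFIX and the Hive
+     * session id (or a random suffix) are appended to the table name first.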
+     * @param clusterName Name of the cluster to which the Hive component belongs
+     * @param dbName Name of the Hive database to which the Table belongs
+     * @param tableName Name of the Hive table
+     * @param isTemporaryTable is this a temporary table
+     * @return Unique qualified name to identify the Table instance in Atlas.
+     */
+    public static String getTableQualifiedName(String clusterName, String dbName, String tableName, boolean isTemporaryTable) {
+        String tableTempName = tableName;
+
+        if (isTemporaryTable) {
+            if (SessionState.get() != null && SessionState.get().getSessionId() != null) {
+                tableTempName = tableName + TEMP_TABLE_PREFIX + SessionState.get().getSessionId();
+            } else {
+                tableTempName = tableName + TEMP_TABLE_PREFIX + RandomStringUtils.random(10);
+            }
         }
+
+        return String.format("%s.%s@%s", dbName.toLowerCase(), tableTempName.toLowerCase(), clusterName);
+    }
+
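+    // Forms the process qualified name by appending the table's creation time (in millis) to the
+    // table qualified name, separated by SEP (defined elsewhere in this class); e.g., with
+    // illustrative values: "sales.orders@primary" + SEP + "1521540000000".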
+    public static String getTableProcessQualifiedName(String clusterName, Table table) {
+        String tableQualifiedName = getTableQualifiedName(clusterName, table);
+        long   createdTime        = getTableCreatedTime(table);
+
+        return tableQualifiedName + SEP + createdTime;
+    }
+
+    /**
+     * Construct the qualified name used to uniquely identify a Table instance in Atlas.
+     * @param clusterName Name of the cluster to which the Hive component belongs
+     * @param dbName Name of the Hive database to which the Table belongs
+     * @param tableName Name of the Hive table
+     * @return Unique qualified name to identify the Table instance in Atlas.
+     */
+    public static String getTableQualifiedName(String clusterName, String dbName, String tableName) {
+        return getTableQualifiedName(clusterName, dbName, tableName, false);
+    }
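+
+    // Example (illustrative): getStorageDescQFName("sales.orders@primary") returns
+    // "sales.orders@primary_storage".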
+    public static String getStorageDescQFName(String tableQualifiedName) {
+        return tableQualifiedName + "_storage";
+    }
+
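+    // Example (illustrative): getColumnQualifiedName("sales.orders@primary", "Amount") returns
+    // "sales.orders.amount@primary".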
+    public static String getColumnQualifiedName(final String tableQualifiedName, final String colName) {
+        final String[] parts       = tableQualifiedName.split("@");
+        final String   tableName   = parts[0];
+        final String   clusterName = parts[1];
+
+        return String.format("%s.%s@%s", tableName, colName.toLowerCase(), clusterName);
+    }
+
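+    // Note: Hive stores the table create time in seconds; MILLIS_CONVERT_FACTOR (defined
+    // elsewhere in this class, presumably 1000) converts it to milliseconds.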
+    public static long getTableCreatedTime(Table table) {
+        return table.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR;
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/6e02ec5b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
new file mode 100644
index 0000000..9105ebe
--- /dev/null
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.hive.hook;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class AtlasHiveHookContext {
+    private final HiveHook                 hook;
+    private final HiveOperation            hiveOperation;
+    private final HookContext              hiveContext;
+    private final Hive                     hive;
+    private final Map<String, AtlasEntity> qNameEntityMap = new HashMap<>();
+
+    public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HookContext hiveContext) throws Exception {
+        this.hook          = hook;
+        this.hiveOperation = hiveOperation;
+        this.hiveContext   = hiveContext;
+        this.hive          = Hive.get(hiveContext.getConf());
+    }
+
+    public HookContext getHiveContext() {
+        return hiveContext;
+    }
+
+    public Hive getHive() {
+        return hive;
+    }
+
+    public HiveOperation getHiveOperation() {
+        return hiveOperation;
+    }
+
+    public void putEntity(String qualifiedName, AtlasEntity entity) {
+        qNameEntityMap.put(qualifiedName, entity);
+    }
+
+    public AtlasEntity getEntity(String qualifiedName) {
+        return qNameEntityMap.get(qualifiedName);
+    }
+
+    public Collection<AtlasEntity> getEntities() { return qNameEntityMap.values(); }
+
+
+    public String getClusterName() {
+        return hook.getClusterName();
+    }
+
+    public boolean isKnownDatabase(String dbQualifiedName) {
+        return hook.isKnownDatabase(dbQualifiedName);
+    }
+
+    public boolean isKnownTable(String tblQualifiedName) {
+        return hook.isKnownTable(tblQualifiedName);
+    }
+
+    public void addToKnownEntities(Collection<AtlasEntity> entities) {
+        hook.addToKnownEntities(entities);
+    }
+
+    public void removeFromKnownDatabase(String dbQualifiedName) {
+        hook.removeFromKnownDatabase(dbQualifiedName);
+    }
+
+    public void removeFromKnownTable(String tblQualifiedName) {
+        hook.removeFromKnownTable(tblQualifiedName);
+    }
+}