Posted to commits@atlas.apache.org by si...@apache.org on 2021/11/03 23:49:22 UTC

[atlas] branch branch-2.0 updated: ATLAS-4424: Enhanced the Import hive utility to create export zip files and run bulk import

This is an automated email from the ASF dual-hosted git repository.

sidmishra pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new a444e3a  ATLAS-4424: Enhanced the Import hive utility to create export zip files and run bulk import
a444e3a is described below

commit a444e3aa7b607e64e8b928b32b89eb14f439c94c
Author: Sidharth Mishra <si...@gmail.com>
AuthorDate: Wed Nov 3 16:43:35 2021 -0700

    ATLAS-4424: Enhanced the Import hive utility to create export zip files and run bulk import
    
    Signed-off-by: Sidharth Mishra <si...@apache.org>
    (cherry picked from commit 041e7d737efbe243ebeb525ae4af1941b1008ee2)
---
 addons/hive-bridge/src/bin/import-hive.sh          |   18 +-
 .../atlas/hive/bridge/HiveMetaStoreBridge.java     |  227 +++--
 .../atlas/hive/bridge/HiveMetaStoreBridgeV2.java   | 1036 ++++++++++++++++++++
 .../atlas/model/impexp/AtlasImportRequest.java     |    5 +-
 .../model/migration/MigrationImportStatus.java     |   29 +-
 .../migration/DataMigrationStatusService.java      |   33 +-
 .../migration/ZipFileMigrationImporter.java        |   10 +-
 .../impexp/DataMigrationStatusServiceTest.java     |   10 +-
 .../apache/atlas/web/resources/AdminResource.java  |   10 +-
 9 files changed, 1265 insertions(+), 113 deletions(-)
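
The new command-line options added by this patch ("-o"/"--output" to write an export zip, and "-i"/"--ignoreBulkImport" to skip the bulk import into Atlas) are documented in the usage text further down in this diff. As a minimal usage sketch of the new modes, assuming hypothetical database/table names and output paths:

    # Export the 'sales' database to a zip and bulk-import it into Atlas;
    # when -o points to a directory, the default file import-hive-output.zip is created there
    ./import-hive.sh -d sales -o /tmp/hive-export/

    # Export a single table to a named zip without importing it,
    # so the zip can be imported into Atlas later
    ./import-hive.sh -i -d sales -t customers -o /tmp/sales-customers.zip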

diff --git a/addons/hive-bridge/src/bin/import-hive.sh b/addons/hive-bridge/src/bin/import-hive.sh
index c353937..ebe6976 100755
--- a/addons/hive-bridge/src/bin/import-hive.sh
+++ b/addons/hive-bridge/src/bin/import-hive.sh
@@ -140,24 +140,34 @@ do
     -d) IMPORT_ARGS="$IMPORT_ARGS -d $1"; shift;;
     -t) IMPORT_ARGS="$IMPORT_ARGS -t $1"; shift;;
     -f) IMPORT_ARGS="$IMPORT_ARGS -f $1"; shift;;
+    -o) IMPORT_ARGS="$IMPORT_ARGS -o $1"; shift;;
+    -i) IMPORT_ARGS="$IMPORT_ARGS -i";;
+    -h) export HELP_OPTION="true"; IMPORT_ARGS="$IMPORT_ARGS -h";;
     --database) IMPORT_ARGS="$IMPORT_ARGS --database $1"; shift;;
     --table) IMPORT_ARGS="$IMPORT_ARGS --table $1"; shift;;
     --filename) IMPORT_ARGS="$IMPORT_ARGS --filename $1"; shift;;
+    --output) IMPORT_ARGS="$IMPORT_ARGS --output $1"; shift;;
+    --ignoreBulkImport) IMPORT_ARGS="$IMPORT_ARGS --ignoreBulkImport";;
+    --help) export HELP_OPTION="true"; IMPORT_ARGS="$IMPORT_ARGS --help";;
     -deleteNonExisting) IMPORT_ARGS="$IMPORT_ARGS -deleteNonExisting";;
     "") break;;
-    *) JVM_ARGS="$JVM_ARGS $option"
+    *) IMPORT_ARGS="$IMPORT_ARGS $option"
   esac
 done
 
 JAVA_PROPERTIES="${JAVA_PROPERTIES} ${JVM_ARGS}"
 
-echo "Log file for import is $LOGFILE"
+if [ -z ${HELP_OPTION} ]; then
+  echo "Log file for import is $LOGFILE"
+fi
 
 "${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.hive.bridge.HiveMetaStoreBridge $IMPORT_ARGS
 
 RETVAL=$?
-[ $RETVAL -eq 0 ] && echo Hive metadata imported successfully!
-[ $RETVAL -ne 0 ] && echo Failed to import Hive metadata! Check logs at: $LOGFILE for details.
+if [ -z ${HELP_OPTION} ]; then
+  [ $RETVAL -eq 0 ] && echo Hive Meta Data imported successfully!
+  [ $RETVAL -eq 1 ] && echo Failed to import Hive Meta Data! Check logs at: $LOGFILE for details.
+fi
 
 exit $RETVAL
 
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index c361ac6..28365bc 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -20,7 +20,6 @@ package org.apache.atlas.hive.bridge;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.sun.jersey.api.client.ClientResponse;
-import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClientV2;
@@ -43,12 +42,14 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasStruct;
 import org.apache.atlas.utils.PathExtractorContext;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.collections.CollectionUtils;
 
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.MissingArgumentException;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.ArrayUtils;
@@ -105,10 +106,27 @@ public class HiveMetaStoreBridge {
     public static final String HIVE_TABLE_DB_EDGE_LABEL        = "__hive_table.db";
     public static final String HOOK_HIVE_PAGE_LIMIT            = CONF_PREFIX + "page.limit";
 
+    static final String OPTION_OUTPUT_FILEPATH_SHORT     = "o";
+    static final String OPTION_OUTPUT_FILEPATH_LONG      = "output";
+    static final String OPTION_IGNORE_BULK_IMPORT_SHORT  = "i";
+    static final String OPTION_IGNORE_BULK_IMPORT_LONG   = "ignoreBulkImport";
+    static final String OPTION_DATABASE_SHORT            = "d";
+    static final String OPTION_DATABASE_LONG             = "database";
+    static final String OPTION_TABLE_SHORT               = "t";
+    static final String OPTION_TABLE_LONG                = "table";
+    static final String OPTION_IMPORT_DATA_FILE_SHORT    = "f";
+    static final String OPTION_IMPORT_DATA_FILE_LONG     = "filename";
+    static final String OPTION_FAIL_ON_ERROR             = "failOnError";
+    static final String OPTION_DELETE_NON_EXISTING       = "deleteNonExisting";
+    static final String OPTION_HELP_SHORT                = "h";
+    static final String OPTION_HELP_LONG                 = "help";
+
     public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2  = "v2";
 
-    private static final int    EXIT_CODE_SUCCESS = 0;
-    private static final int    EXIT_CODE_FAILED  = 1;
+    private static final int    EXIT_CODE_SUCCESS      = 0;
+    private static final int    EXIT_CODE_FAILED       = 1;
+    private static final int    EXIT_CODE_INVALID_ARG  = 2;
+
     private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
     private static       int    pageLimit         = 10000;
 
@@ -122,84 +140,63 @@ public class HiveMetaStoreBridge {
     public static void main(String[] args) {
         int exitCode = EXIT_CODE_FAILED;
         AtlasClientV2 atlasClientV2 = null;
+        Options acceptedCliOptions = prepareCommandLineOptions();
 
         try {
-            Options options = new Options();
-            options.addOption("d", "database", true, "Database name");
-            options.addOption("t", "table", true, "Table name");
-            options.addOption("f", "filename", true, "Filename");
-            options.addOption("failOnError", false, "failOnError");
-            options.addOption("deleteNonExisting", false, "Delete database and table entities in Atlas if not present in Hive");
-
-            CommandLine   cmd               = new BasicParser().parse(options, args);
-            boolean       failOnError       = cmd.hasOption("failOnError");
-            boolean       deleteNonExisting = cmd.hasOption("deleteNonExisting");
-            LOG.info("delete non existing flag : {} ", deleteNonExisting);
-
-            String        databaseToImport = cmd.getOptionValue("d");
-            String        tableToImport    = cmd.getOptionValue("t");
-            String        fileToImport     = cmd.getOptionValue("f");
-            Configuration atlasConf        = ApplicationProperties.get();
-            String[]      atlasEndpoint    = atlasConf.getStringArray(ATLAS_ENDPOINT);
-
-            if (atlasEndpoint == null || atlasEndpoint.length == 0) {
-                atlasEndpoint = new String[] { DEFAULT_ATLAS_URL };
-            }
+            CommandLine  cmd              = new BasicParser().parse(acceptedCliOptions, args);
+            List<String> argsNotProcessed = cmd.getArgList();
 
+            if (argsNotProcessed != null && argsNotProcessed.size() > 0) {
+                throw new ParseException("Unrecognized arguments.");
+            }
 
-            if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
-                String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
-
-                atlasClientV2 = new AtlasClientV2(atlasEndpoint, basicAuthUsernamePassword);
+            if (cmd.hasOption(OPTION_HELP_SHORT)) {
+                printUsage(acceptedCliOptions);
+                exitCode = EXIT_CODE_SUCCESS;
             } else {
-                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+                Configuration atlasConf        = ApplicationProperties.get();
+                String[]      atlasEndpoint    = atlasConf.getStringArray(ATLAS_ENDPOINT);
 
-                atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), atlasEndpoint);
-            }
-
-            HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2);
+                if (atlasEndpoint == null || atlasEndpoint.length == 0) {
+                    atlasEndpoint = new String[] { DEFAULT_ATLAS_URL };
+                }
 
-            if (deleteNonExisting) {
-                hiveMetaStoreBridge.deleteEntitiesForNonExistingHiveMetadata(failOnError);
-                exitCode = EXIT_CODE_SUCCESS;
-            } else if (StringUtils.isNotEmpty(fileToImport)) {
-                File f = new File(fileToImport);
+                if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
+                    String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
 
-                if (f.exists() && f.canRead()) {
-                    BufferedReader br   = new BufferedReader(new FileReader(f));
-                    String         line = null;
+                    atlasClientV2 = new AtlasClientV2(atlasEndpoint, basicAuthUsernamePassword);
+                } else {
+                    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
 
-                    while((line = br.readLine()) != null) {
-                        String val[] = line.split(":");
+                    atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), atlasEndpoint);
+                }
 
-                        if (ArrayUtils.isNotEmpty(val)) {
-                            databaseToImport = val[0];
+                boolean createZip = cmd.hasOption(OPTION_OUTPUT_FILEPATH_LONG);
 
-                            if (val.length > 1) {
-                                tableToImport = val[1];
-                            } else {
-                                tableToImport = "";
-                            }
+                if (createZip) {
+                    HiveMetaStoreBridgeV2 hiveMetaStoreBridgeV2 = new HiveMetaStoreBridgeV2(atlasConf, new HiveConf(), atlasClientV2);
 
-                            hiveMetaStoreBridge.importHiveMetadata(databaseToImport, tableToImport, failOnError);
-                        }
+                    if (hiveMetaStoreBridgeV2.exportDataToZipAndRunAtlasImport(cmd)) {
+                        exitCode = EXIT_CODE_SUCCESS;
                     }
-
-                    exitCode = EXIT_CODE_SUCCESS;
                 } else {
-                    LOG.error("Failed to read the input file: " + fileToImport);
-                    exitCode = EXIT_CODE_FAILED;
+                    HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2);
+
+                    if (hiveMetaStoreBridge.importDataDirectlyToAtlas(cmd)) {
+                        exitCode = EXIT_CODE_SUCCESS;
+                    }
                 }
-            } else {
-                hiveMetaStoreBridge.importHiveMetadata(databaseToImport, tableToImport, failOnError);
-                exitCode = EXIT_CODE_SUCCESS;
             }
-
         } catch(ParseException e) {
-            LOG.error("Failed to parse arguments. Error: ", e.getMessage());
-            printUsage();
+            LOG.error("Invalid argument. Error: {}", e.getMessage());
+            System.out.println("Invalid argument. Error: " + e.getMessage());
+            exitCode = EXIT_CODE_INVALID_ARG;
+
+            if (!(e instanceof MissingArgumentException)) {
+                printUsage(acceptedCliOptions);
+            }
         } catch(Exception e) {
-            LOG.error("Import failed", e);
+            LOG.error("Import Failed", e);
         } finally {
             if( atlasClientV2 !=null) {
                 atlasClientV2.close();
@@ -209,26 +206,48 @@ public class HiveMetaStoreBridge {
         System.exit(exitCode);
     }
 
-    private static void printUsage() {
+    private static Options prepareCommandLineOptions() {
+        Options acceptedCliOptions = new Options();
+
+        return acceptedCliOptions.addOption(OPTION_OUTPUT_FILEPATH_SHORT, OPTION_OUTPUT_FILEPATH_LONG, true, "Output path or file for Zip import")
+                .addOption(OPTION_IGNORE_BULK_IMPORT_SHORT, OPTION_IGNORE_BULK_IMPORT_LONG, false, "Ignore bulk Import for Zip import")
+                .addOption(OPTION_DATABASE_SHORT, OPTION_DATABASE_LONG, true, "Database name")
+                .addOption(OPTION_TABLE_SHORT, OPTION_TABLE_LONG, true, "Table name")
+                .addOption(OPTION_IMPORT_DATA_FILE_SHORT, OPTION_IMPORT_DATA_FILE_LONG, true, "Filename")
+                .addOption(OPTION_FAIL_ON_ERROR, false, "failOnError")
+                .addOption(OPTION_DELETE_NON_EXISTING, false, "Delete database and table entities in Atlas if not present in Hive")
+                .addOption(OPTION_HELP_SHORT, OPTION_HELP_LONG, false, "Print this help message");
+    }
+
+    private static void printUsage(Options options) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("import-hive.sh", options);
+        System.out.println();
+        System.out.println("Usage options:");
+        System.out.println("    Usage 1: import-hive.sh [-d <database> OR --database <database>] "  );
+        System.out.println("        Imports specified database and its tables ...");
+        System.out.println();
+        System.out.println("    Usage 2: import-hive.sh [-d <database> OR --database <database>] [-t <table> OR --table <table>]");
+        System.out.println("        Imports specified table within that database ...");
         System.out.println();
+        System.out.println("    Usage 3: import-hive.sh");
+        System.out.println("        Imports all databases and tables...");
         System.out.println();
-        System.out.println("Usage 1: import-hive.sh [-d <database> OR --database <database>] "  );
-        System.out.println("    Imports specified database and its tables ...");
+        System.out.println("    Usage 4: import-hive.sh -f <filename>");
+        System.out.println("        Imports all databases and tables in the file...");
+        System.out.println("        Format:");
+        System.out.println("            database1:tbl1");
+        System.out.println("            database1:tbl2");
+        System.out.println("            database2:tbl2");
         System.out.println();
-        System.out.println("Usage 2: import-hive.sh [-d <database> OR --database <database>] [-t <table> OR --table <table>]");
-        System.out.println("    Imports specified table within that database ...");
+        System.out.println("    Usage 5: import-hive.sh [-deleteNonExisting] "  );
+        System.out.println("        Deletes databases and tables which are not in Hive ...");
         System.out.println();
-        System.out.println("Usage 3: import-hive.sh");
-        System.out.println("    Imports all databases and tables...");
+        System.out.println("    Usage 6: import-hive.sh -o <output Path or file> [-f <filename>] [-d <database> OR --database <database>] [-t <table> OR --table <table>]");
+        System.out.println("        To create zip file with exported data and import the zip file at Atlas ...");
         System.out.println();
-        System.out.println("Usage 4: import-hive.sh -f <filename>");
-        System.out.println("  Imports all databases and tables in the file...");
-        System.out.println("    Format:");
-        System.out.println("    database1:tbl1");
-        System.out.println("    database1:tbl2");
-        System.out.println("    database2:tbl2");
-        System.out.println("Usage 5: import-hive.sh [-deleteNonExisting] "  );
-        System.out.println("    Deletes databases and tables which are not in Hive ...");
+        System.out.println("    Usage 7: import-hive.sh -i -o <output Path or file> [-f <filename>] [-d <database> OR --database <database>] [-t <table> OR --table <table>]");
+        System.out.println("        To create zip file with exported data without importing to Atlas which can be imported later ...");
         System.out.println();
     }
 
@@ -286,6 +305,54 @@ public class HiveMetaStoreBridge {
         return convertHdfsPathToLowerCase;
     }
 
+    public boolean importDataDirectlyToAtlas(CommandLine cmd) throws Exception {
+        LOG.info("Importing Hive metadata");
+        boolean ret = false;
+
+        String        databaseToImport = cmd.getOptionValue(OPTION_DATABASE_SHORT);
+        String        tableToImport    = cmd.getOptionValue(OPTION_TABLE_SHORT);
+        String        fileToImport     = cmd.getOptionValue(OPTION_IMPORT_DATA_FILE_SHORT);
+
+        boolean       failOnError       = cmd.hasOption(OPTION_FAIL_ON_ERROR);
+        boolean       deleteNonExisting = cmd.hasOption(OPTION_DELETE_NON_EXISTING);
+
+        LOG.info("delete non existing flag : {} ", deleteNonExisting);
+
+        if (deleteNonExisting) {
+            deleteEntitiesForNonExistingHiveMetadata(failOnError);
+            ret = true;
+        } else if (StringUtils.isNotEmpty(fileToImport)) {
+            File f = new File(fileToImport);
+
+            if (f.exists() && f.canRead()) {
+                BufferedReader br   = new BufferedReader(new FileReader(f));
+                String         line = null;
+
+                while((line = br.readLine()) != null) {
+                    String val[] = line.split(":");
+
+                    if (ArrayUtils.isNotEmpty(val)) {
+                        databaseToImport = val[0];
+
+                        if (val.length > 1) {
+                            tableToImport = val[1];
+                        } else {
+                            tableToImport = "";
+                        }
+
+                        importDatabases(failOnError, databaseToImport, tableToImport);
+                    }
+                }
+                ret = true;
+            } else {
+                LOG.error("Failed to read the input file: " + fileToImport);
+            }
+        } else {
+            importDatabases(failOnError, databaseToImport, tableToImport);
+            ret = true;
+        }
+        return ret;
+    }
 
     @VisibleForTesting
     public void importHiveMetadata(String databaseToImport, String tableToImport, boolean failOnError) throws Exception {
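
For reference, the "-f" option handled by importDataDirectlyToAtlas() above (and by HiveMetaStoreBridgeV2 when "-o" is also supplied) expects a plain-text file with one database:table entry per line; a line carrying only a database name imports all tables of that database, per the split(":") handling shown above. A minimal sketch, with hypothetical file path and names:

    # Hypothetical list of databases/tables to import
    cat > /tmp/hive-import-list.txt <<'EOF'
    database1:tbl1
    database1:tbl2
    database2:tbl2
    EOF

    # Import only the listed entries directly into Atlas
    ./import-hive.sh -f /tmp/hive-import-list.txt

    # Or export the same selection to a zip first
    ./import-hive.sh -f /tmp/hive-import-list.txt -o /tmp/hive-export.zip
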
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeV2.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeV2.java
new file mode 100644
index 0000000..0627c0e
--- /dev/null
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeV2.java
@@ -0,0 +1,1036 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.hive.bridge;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.hive.hook.events.BaseHiveEvent;
+import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.hook.AtlasHookException;
+import org.apache.atlas.utils.AtlasPathExtractorUtil;
+import org.apache.atlas.utils.HdfsNameServiceResolver;
+import org.apache.atlas.utils.AtlasConfigurationUtil;
+import org.apache.atlas.utils.PathExtractorContext;
+import org.apache.atlas.utils.LruCache;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasStruct;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.commons.cli.MissingArgumentException;
+import org.apache.commons.collections.CollectionUtils;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.OutputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import static org.apache.atlas.hive.hook.events.BaseHiveEvent.*;
+
+/**
+ * A bridge utility that exports metadata from the Hive Meta Store into a zip file,
+ * which can then be imported into Atlas
+ */
+public class HiveMetaStoreBridgeV2 {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreBridgeV2.class);
+
+    private static final String OPTION_DATABASE_SHORT            = "d";
+    private static final String OPTION_TABLE_SHORT               = "t";
+    private static final String OPTION_IMPORT_DATA_FILE_SHORT    = "f";
+    private static final String OPTION_OUTPUT_FILEPATH_SHORT     = "o";
+    private static final String OPTION_IGNORE_BULK_IMPORT_SHORT  = "i";
+
+    public static final String CONF_PREFIX                     = "atlas.hook.hive.";
+    public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
+    public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION = CONF_PREFIX + "aws_s3.atlas.model.version";
+
+    public static final String CLUSTER_NAME_KEY                = "atlas.cluster.name";
+    public static final String HIVE_USERNAME                   = "atlas.hook.hive.default.username";
+    public static final String HIVE_METADATA_NAMESPACE         = "atlas.metadata.namespace";
+    public static final String DEFAULT_CLUSTER_NAME            = "primary";
+    public static final String TEMP_TABLE_PREFIX               = "_temp-";
+    public static final String SEP                             = ":".intern();
+    public static final String DEFAULT_METASTORE_CATALOG       = "hive";
+    public static final String HOOK_HIVE_PAGE_LIMIT            = CONF_PREFIX + "page.limit";
+
+    private static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2  = "v2";
+    private static final String ZIP_FILE_COMMENT_FORMAT             = "{\"entitiesCount\":%d, \"total\":%d}";
+    private static final int    DEFAULT_PAGE_LIMIT                  = 10000;
+    private static final String DEFAULT_ZIP_FILE_NAME               = "import-hive-output.zip";
+    private static final String ZIP_ENTRY_ENTITIES                  = "entities.json";
+    private static final String TYPES_DEF_JSON                      = "atlas-typesdef.json";
+
+    private static final String JSON_ARRAY_START    = "[";
+    private static final String JSON_COMMA          = ",";
+    private static final String JSON_EMPTY_OBJECT   = "{}";
+    private static final String JSON_ARRAY_END      = "]";
+
+    private static       int    pageLimit = DEFAULT_PAGE_LIMIT;
+    private String awsS3AtlasModelVersion = null;
+
+    private final String        metadataNamespace;
+    private final Hive          hiveClient;
+    private final AtlasClientV2 atlasClientV2;
+    private final boolean       convertHdfsPathToLowerCase;
+
+    private ZipOutputStream zipOutputStream;
+    private String          outZipFileName;
+    private int             totalProcessedEntities = 0;
+
+    private final Map<String, AtlasEntityWithExtInfo> entityLRUCache               = new LruCache<>(10000, 0);
+    private final Map<Table, AtlasEntity>             hiveTablesAndAtlasEntity     = new HashMap<>();
+    private final Map<String, AtlasEntity>            dbEntities                   = new HashMap<>();
+    private final List<Map<String, String>>           databaseAndTableListToImport = new ArrayList<>();
+    private final Map<String, String>                 qualifiedNameGuidMap         = new HashMap<>();
+
+    /**
+     * Construct a HiveMetaStoreBridgeV2.
+     * @param hiveConf {@link HiveConf} for Hive component in the cluster
+     */
+    public HiveMetaStoreBridgeV2(Configuration atlasProperties, HiveConf hiveConf, AtlasClientV2 atlasClientV2) throws Exception {
+        this.metadataNamespace          = getMetadataNamespace(atlasProperties);
+        this.hiveClient                 = Hive.get(hiveConf);
+        this.atlasClientV2              = atlasClientV2;
+        this.convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
+        this.awsS3AtlasModelVersion     = atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION, HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2);
+
+        if (atlasProperties != null) {
+            pageLimit = atlasProperties.getInteger(HOOK_HIVE_PAGE_LIMIT, DEFAULT_PAGE_LIMIT);
+        }
+    }
+
+    public boolean exportDataToZipAndRunAtlasImport(CommandLine cmd) throws MissingArgumentException, IOException, HiveException, AtlasBaseException {
+        boolean       ret               = true;
+        boolean       failOnError       = cmd.hasOption("failOnError");
+
+        String        databaseToImport = cmd.getOptionValue(OPTION_DATABASE_SHORT);
+        String        tableToImport    = cmd.getOptionValue(OPTION_TABLE_SHORT);
+        String        importDataFile   = cmd.getOptionValue(OPTION_IMPORT_DATA_FILE_SHORT);
+        String        outputFileOrPath = cmd.getOptionValue(OPTION_OUTPUT_FILEPATH_SHORT);
+
+        boolean       ignoreBulkImport = cmd.hasOption(OPTION_IGNORE_BULK_IMPORT_SHORT);
+
+        validateOutputFileOrPath(outputFileOrPath);
+
+        try {
+            initializeZipStream();
+
+            if (isValidImportDataFile(importDataFile)) {
+                File f = new File(importDataFile);
+
+                BufferedReader br = new BufferedReader(new FileReader(f));
+                String line = null;
+
+                while ((line = br.readLine()) != null) {
+                    String val[] = line.split(":");
+
+                    if (ArrayUtils.isNotEmpty(val)) {
+                        databaseToImport = val[0];
+
+                        if (val.length > 1) {
+                            tableToImport = val[1];
+                        } else {
+                            tableToImport = "";
+                        }
+
+                        importHiveDatabases(databaseToImport, tableToImport, failOnError);
+                    }
+                }
+            } else {
+                importHiveDatabases(databaseToImport, tableToImport, failOnError);
+            }
+
+            importHiveTables(failOnError);
+            importHiveColumns(failOnError);
+        } finally {
+            endWritingAndZipStream();
+        }
+
+        if (!ignoreBulkImport) {
+            runAtlasImport();
+        }
+
+        return ret;
+    }
+
+    private void validateOutputFileOrPath(String outputFileOrPath) throws MissingArgumentException {
+        if (StringUtils.isBlank(outputFileOrPath)) {
+            throw new MissingArgumentException("Output Path/File can't be empty");
+        }
+
+        File fileOrDirToImport = new File(outputFileOrPath);
+        if (fileOrDirToImport.exists()) {
+            if (fileOrDirToImport.isDirectory()) {
+                this.outZipFileName = outputFileOrPath + File.separator + DEFAULT_ZIP_FILE_NAME;
+                LOG.info("The default output zip file {} will be created at {}", DEFAULT_ZIP_FILE_NAME, outputFileOrPath);
+            } else {
+                throw new MissingArgumentException("output file: " + outputFileOrPath + " already present");
+            }
+        } else if (fileOrDirToImport.getParentFile().isDirectory() && outputFileOrPath.endsWith(".zip")) {
+            LOG.info("The mentioned output zip file {} will be created", outputFileOrPath);
+            this.outZipFileName = outputFileOrPath;
+        } else {
+            throw new MissingArgumentException("Invalid File/Path");
+        }
+    }
+
+    private boolean isValidImportDataFile(String importDataFile) throws MissingArgumentException {
+        boolean ret = false;
+        if (StringUtils.isNotBlank(importDataFile)) {
+            File dataFile = new File(importDataFile);
+
+            if (!dataFile.exists() || !dataFile.canRead()) {
+                throw new MissingArgumentException("Invalid import data file");
+            }
+            ret = true;
+        }
+
+        return ret;
+    }
+
+    private void initializeZipStream() throws IOException, AtlasBaseException {
+        this.zipOutputStream            = new ZipOutputStream(getOutputStream(this.outZipFileName));
+
+        storeTypesDefToZip(new AtlasTypesDef());
+
+        startWritingEntitiesToZip();
+    }
+
+    private void storeTypesDefToZip(AtlasTypesDef typesDef) throws AtlasBaseException {
+        String jsonData = AtlasType.toJson(typesDef);
+        saveToZip(TYPES_DEF_JSON, jsonData);
+    }
+
+    private void saveToZip(String fileName, String jsonData) throws AtlasBaseException {
+        try {
+            ZipEntry e = new ZipEntry(fileName);
+            zipOutputStream.putNextEntry(e);
+            writeBytes(jsonData);
+            zipOutputStream.closeEntry();
+        } catch (IOException e) {
+            throw new AtlasBaseException(String.format("Error writing file %s.", fileName), e);
+        }
+    }
+
+    private void startWritingEntitiesToZip() throws IOException {
+        zipOutputStream.putNextEntry(new ZipEntry(ZIP_ENTRY_ENTITIES));
+        writeBytes(JSON_ARRAY_START);
+    }
+
+    private String getDatabaseToImport(String TableWithDatabase) {
+        String ret = null;
+        String val[] = TableWithDatabase.split("\\.");
+        if (val.length > 1) {
+            ret = val[0];
+        }
+        return ret;
+    }
+
+    private String getTableToImport(String TableWithDatabase) {
+        String ret = null;
+        String val[] = TableWithDatabase.split("\\.");
+        if (val.length > 1) {
+            ret = val[1];
+        }
+        return ret;
+    }
+
+    private void importHiveDatabases(String databaseToImport, String tableWithDatabaseToImport, boolean failOnError) throws HiveException, AtlasBaseException {
+        LOG.info("Importing Hive Databases");
+
+        List<String> databaseNames = null;
+
+        if (StringUtils.isEmpty(databaseToImport) && StringUtils.isNotEmpty(tableWithDatabaseToImport)) {
+            if (isTableWithDatabaseName(tableWithDatabaseToImport)) {
+                databaseToImport = getDatabaseToImport(tableWithDatabaseToImport);
+                tableWithDatabaseToImport = getTableToImport(tableWithDatabaseToImport);
+            }
+        }
+
+        if (StringUtils.isEmpty(databaseToImport)) {
+            //when database to import is empty, import all
+            databaseNames = hiveClient.getAllDatabases();
+        } else {
+            //when database to import has some value then, import that db and all table under it.
+            databaseNames = hiveClient.getDatabasesByPattern(databaseToImport);
+        }
+
+        if (!CollectionUtils.isEmpty(databaseNames)) {
+            LOG.info("Found {} databases", databaseNames.size());
+            for (String databaseName : databaseNames) {
+                try {
+                    if (!dbEntities.containsKey(databaseName)) {
+                        LOG.info("Importing Hive Database {}", databaseName);
+                        AtlasEntityWithExtInfo dbEntity = writeDatabase(databaseName);
+                        if (dbEntity != null) {
+                            dbEntities.put(databaseName, dbEntity.getEntity());
+                        }
+                    }
+                    databaseAndTableListToImport.add(Collections.singletonMap(databaseName, tableWithDatabaseToImport));
+                } catch (IOException e) {
+                    LOG.error("Import failed for hive database {}", databaseName, e);
+
+                    if (failOnError) {
+                        throw new AtlasBaseException(e.getMessage(), e);
+                    }
+                }
+            }
+        } else {
+            LOG.error("No database found");
+            if (failOnError) {
+                throw new AtlasBaseException("No database found");
+            }
+        }
+    }
+
+    private void writeEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws IOException {
+        if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
+            Iterator<Map.Entry<String, AtlasEntity>> itr = entityWithExtInfo.getReferredEntities().entrySet().iterator();
+            while (itr.hasNext()) {
+                Map.Entry<String, AtlasEntity> eachEntity = itr.next();
+                if (eachEntity.getValue().getTypeName().equalsIgnoreCase(HiveDataTypes.HIVE_DB.getName())) {
+                    itr.remove();
+                }
+            }
+        }
+
+        if (!entityLRUCache.containsKey(entityWithExtInfo.getEntity().getGuid())) {
+            entityLRUCache.put(entityWithExtInfo.getEntity().getGuid(), entityWithExtInfo);
+            writeBytes(AtlasType.toJson(entityWithExtInfo) + JSON_COMMA);
+        }
+        totalProcessedEntities++;
+    }
+
+    private void endWritingAndZipStream() throws IOException {
+        writeBytes(JSON_EMPTY_OBJECT);
+        writeBytes(JSON_ARRAY_END);
+        setStreamSize(totalProcessedEntities);
+        close();
+    }
+
+    private void flush() {
+        try {
+            zipOutputStream.flush();
+        } catch (IOException e) {
+            LOG.error("Error: Flush: ", e);
+        }
+    }
+
+    private void close() throws IOException {
+        zipOutputStream.flush();
+        zipOutputStream.closeEntry();
+        zipOutputStream.close();
+    }
+
+    private void writeBytes(String payload) throws IOException {
+        zipOutputStream.write(payload.getBytes());
+    }
+
+    private OutputStream getOutputStream(String fileToWrite) throws IOException {
+        return FileUtils.openOutputStream(new File(fileToWrite));
+    }
+
+    public String getMetadataNamespace(Configuration config) {
+        return AtlasConfigurationUtil.getRecentString(config, HIVE_METADATA_NAMESPACE, getClusterName(config));
+    }
+
+    private String getClusterName(Configuration config) {
+        return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
+    }
+
+    public String getMetadataNamespace() {
+        return metadataNamespace;
+    }
+
+    public boolean isConvertHdfsPathToLowerCase() {
+        return convertHdfsPathToLowerCase;
+    }
+
+    /**
+     * Imports Hive tables if databaseAndTableListToImport is populated
+     * @param failOnError
+     * @throws Exception
+     */
+    public void importHiveTables(boolean failOnError) throws HiveException, AtlasBaseException {
+        LOG.info("Importing Hive Tables");
+
+        int tablesImported = 0;
+
+        if (CollectionUtils.isNotEmpty(databaseAndTableListToImport) && MapUtils.isNotEmpty(dbEntities)) {
+            for (Map<String, String> eachEntry : databaseAndTableListToImport) {
+                final List<Table> tableObjects;
+
+                String databaseName = eachEntry.keySet().iterator().next();
+
+                if (StringUtils.isEmpty(eachEntry.values().iterator().next())) {
+                    tableObjects = hiveClient.getAllTableObjects(databaseName);
+
+                    populateQualifiedNameGuidMap(HiveDataTypes.HIVE_DB.getName(), (String) dbEntities.get(databaseName).getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                } else {
+                    List<String> tableNames = hiveClient.getTablesByPattern(databaseName, eachEntry.values().iterator().next());
+                    tableObjects = new ArrayList<>();
+
+                    for (String tableName : tableNames) {
+                        Table table = hiveClient.getTable(databaseName, tableName);
+                        tableObjects.add(table);
+                        populateQualifiedNameGuidMap(HiveDataTypes.HIVE_TABLE.getName(), getTableQualifiedName(metadataNamespace, table));
+                    }
+                }
+
+                if (!CollectionUtils.isEmpty(tableObjects)) {
+                    LOG.info("Found {} tables to import in database {}", tableObjects.size(), databaseName);
+
+                    try {
+                        for (Table table : tableObjects) {
+                            int imported = importTable(dbEntities.get(databaseName), table, failOnError);
+
+                            tablesImported += imported;
+                        }
+                    } finally {
+                        if (tablesImported == tableObjects.size()) {
+                            LOG.info("Successfully imported {} tables from database {}", tablesImported, databaseName);
+                        } else {
+                            LOG.error("Imported {} of {} tables from database {}. Please check logs for errors during import",
+                                    tablesImported, tableObjects.size(), databaseName);
+                        }
+                    }
+                } else {
+                    LOG.error("No tables to import in database {}", databaseName);
+                    if (failOnError) {
+                        throw new AtlasBaseException("No tables to import in database - " + databaseName);
+                    }
+                }
+            }
+        }
+
+        dbEntities.clear();
+    }
+
+    private void populateQualifiedNameGuidMap(String typeName, String qualifiedName) {
+        try {
+            AtlasEntitiesWithExtInfo entitiesWithExtInfo = atlasClientV2.getEntitiesByAttribute(typeName, Collections.singletonList(Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName)), true, false);
+
+            if (entitiesWithExtInfo != null && entitiesWithExtInfo.getEntities() != null) {
+                for (AtlasEntity entity : entitiesWithExtInfo.getEntities()) {
+                    qualifiedNameGuidMap.put((String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), entity.getGuid());
+
+                    for(Map.Entry<String, AtlasEntity> eachEntry : entitiesWithExtInfo.getReferredEntities().entrySet()) {
+                        qualifiedNameGuidMap.put((String) eachEntry.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME), eachEntry.getKey());
+                    }
+
+                    if (typeName.equals(HiveDataTypes.HIVE_DB.getName())) {
+                        for (String eachRelatedGuid : getAllRelatedGuids(entity)) {
+                            AtlasEntityWithExtInfo relatedEntity = atlasClientV2.getEntityByGuid(eachRelatedGuid, true, false);
+
+                            qualifiedNameGuidMap.put((String) relatedEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), relatedEntity.getEntity().getGuid());
+                            for (Map.Entry<String, AtlasEntity> eachEntry : relatedEntity.getReferredEntities().entrySet()) {
+                                qualifiedNameGuidMap.put((String) eachEntry.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME), eachEntry.getKey());
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (AtlasServiceException e) {
+            LOG.info("Unable to load the related entities for type {} and qualified name {} from Atlas", typeName, qualifiedName, e);
+        }
+    }
+
+    private Set<String> getAllRelatedGuids(AtlasEntity entity) {
+        Set<String> relGuidsSet = new HashSet<>();
+
+        for (Object o : entity.getRelationshipAttributes().values()) {
+            if (o instanceof AtlasObjectId) {
+                relGuidsSet.add(((AtlasObjectId) o).getGuid());
+            } else if (o instanceof List) {
+                for (Object id : (List) o) {
+                    if (id instanceof AtlasObjectId) {
+                        relGuidsSet.add(((AtlasObjectId) id).getGuid());
+                    }
+                    if (id instanceof Map) {
+                        relGuidsSet.add((String) ((Map) id).get("guid"));
+                    }
+                }
+            }
+        }
+
+        return relGuidsSet;
+    }
+
+    public void importHiveColumns(boolean failOnError) throws AtlasBaseException {
+        LOG.info("Importing Hive Columns");
+
+        if (MapUtils.isEmpty(hiveTablesAndAtlasEntity)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("No hive table present to import columns");
+            }
+
+            return;
+        }
+
+        for (Map.Entry<Table, AtlasEntity> eachTable : hiveTablesAndAtlasEntity.entrySet()) {
+            int               columnsImported = 0;
+            List<AtlasEntity> columnEntities  = new ArrayList<>();
+
+            try {
+                List<AtlasEntity> partKeys = toColumns(eachTable.getKey().getPartitionKeys(), eachTable.getValue(), RELATIONSHIP_HIVE_TABLE_PART_KEYS);
+                List<AtlasEntity> columns  = toColumns(eachTable.getKey().getCols(), eachTable.getValue(), RELATIONSHIP_HIVE_TABLE_COLUMNS);
+
+                partKeys.stream().collect(Collectors.toCollection(() -> columnEntities));
+                columns.stream().collect(Collectors.toCollection(() -> columnEntities));
+
+                for (AtlasEntity eachColumnEntity : columnEntities) {
+                    writeEntityToZip(new AtlasEntityWithExtInfo(eachColumnEntity));
+                    columnsImported++;
+                }
+            } catch (IOException e) {
+                LOG.error("Column Import failed for hive table {}", eachTable.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME), e);
+
+                if (failOnError) {
+                    throw new AtlasBaseException(e.getMessage(), e);
+                }
+            } finally {
+                if (columnsImported == columnEntities.size()) {
+                    LOG.info("Successfully imported {} columns for table {}", columnsImported, eachTable.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                } else {
+                    LOG.error("Imported {} of {} columns for table {}. Please check logs for errors during import", columnsImported, columnEntities.size(), eachTable.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                }
+            }
+        }
+
+    }
+
+    private void runAtlasImport() {
+        AtlasImportRequest request = new AtlasImportRequest();
+        request.setOption(AtlasImportRequest.UPDATE_TYPE_DEFINITION_KEY, "false");
+        request.setOption(AtlasImportRequest.OPTION_KEY_FORMAT, AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT);
+
+        try {
+            AtlasImportResult importResult = atlasClientV2.importData(request, this.outZipFileName);
+
+            if (importResult.getOperationStatus() == AtlasImportResult.OperationStatus.SUCCESS) {
+                LOG.info("Successfully imported the zip file {} at Atlas and imported {} entities. Number of entities to be imported {}.", this.outZipFileName, importResult.getProcessedEntities().size(), totalProcessedEntities);
+            } else {
+                LOG.error("Failed to import or get the status of import for the zip file {} at Atlas. Number of entities to be imported {}.", this.outZipFileName, totalProcessedEntities);
+            }
+        } catch (AtlasServiceException e) {
+            LOG.error("Failed to import or get the status of import for the zip file {} at Atlas. Number of entities to be imported {}.", this.outZipFileName, totalProcessedEntities, e);
+        }
+    }
+
+    public int importTable(AtlasEntity dbEntity, Table table, final boolean failOnError) throws AtlasBaseException {
+        try {
+            AtlasEntityWithExtInfo tableEntity = writeTable(dbEntity, table);
+
+            hiveTablesAndAtlasEntity.put(table, tableEntity.getEntity());
+
+            if (table.getTableType() == TableType.EXTERNAL_TABLE) {
+                String processQualifiedName = getTableProcessQualifiedName(metadataNamespace, table);
+                String tableLocationString  = isConvertHdfsPathToLowerCase() ? lower(table.getDataLocation().toString()) : table.getDataLocation().toString();
+                Path   location             = table.getDataLocation();
+                String query                = getCreateTableString(table, tableLocationString);
+
+                PathExtractorContext   pathExtractorCtx  = new PathExtractorContext(getMetadataNamespace(), isConvertHdfsPathToLowerCase(), awsS3AtlasModelVersion);
+                AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(location, pathExtractorCtx);
+                AtlasEntity            pathInst          = entityWithExtInfo.getEntity();
+                AtlasEntity            tableInst         = tableEntity.getEntity();
+                AtlasEntity            processInst       = new AtlasEntity(HiveDataTypes.HIVE_PROCESS.getName());
+
+                long now = System.currentTimeMillis();
+
+                processInst.setGuid(getGuid(processQualifiedName));
+                processInst.setAttribute(ATTRIBUTE_QUALIFIED_NAME, processQualifiedName);
+                processInst.setAttribute(ATTRIBUTE_NAME, query);
+                processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
+                processInst.setRelationshipAttribute(ATTRIBUTE_INPUTS, Collections.singletonList(AtlasTypeUtil.getAtlasRelatedObjectId(pathInst, RELATIONSHIP_DATASET_PROCESS_INPUTS)));
+                processInst.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(AtlasTypeUtil.getAtlasRelatedObjectId(tableInst, RELATIONSHIP_PROCESS_DATASET_OUTPUTS)));
+                String userName = table.getOwner();
+                if (StringUtils.isEmpty(userName)) {
+                    userName = ApplicationProperties.get().getString(HIVE_USERNAME, "hive");
+                }
+                processInst.setAttribute(ATTRIBUTE_USER_NAME, userName);
+                processInst.setAttribute(ATTRIBUTE_START_TIME, now);
+                processInst.setAttribute(ATTRIBUTE_END_TIME, now);
+                processInst.setAttribute(ATTRIBUTE_OPERATION_TYPE, "CREATETABLE");
+                processInst.setAttribute(ATTRIBUTE_QUERY_TEXT, query);
+                processInst.setAttribute(ATTRIBUTE_QUERY_ID, query);
+                processInst.setAttribute(ATTRIBUTE_QUERY_PLAN, "{}");
+                processInst.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(query));
+
+                AtlasEntitiesWithExtInfo createTableProcess = new AtlasEntitiesWithExtInfo();
+
+                createTableProcess.addEntity(processInst);
+
+                if (pathExtractorCtx.getKnownEntities() != null) {
+                    pathExtractorCtx.getKnownEntities().values().forEach(entity -> createTableProcess.addEntity(entity));
+                } else {
+                    createTableProcess.addEntity(pathInst);
+                }
+
+                writeEntitiesToZip(createTableProcess);
+            }
+
+            return 1;
+        } catch (Exception e) {
+            LOG.error("Import failed for hive_table {}", table.getTableName(), e);
+
+            if (failOnError) {
+                throw new AtlasBaseException(e.getMessage(), e);
+            }
+
+            return 0;
+        }
+    }
+
+    /**
+     * Write db entity
+     * @param databaseName
+     * @return
+     * @throws Exception
+     */
+    private AtlasEntityWithExtInfo writeDatabase(String databaseName) throws HiveException, IOException {
+        AtlasEntityWithExtInfo ret = null;
+        Database               db  = hiveClient.getDatabase(databaseName);
+
+        if (db != null) {
+            ret = new AtlasEntityWithExtInfo(toDbEntity(db));
+            writeEntityToZip(ret);
+        }
+
+        return ret;
+    }
+
+    private AtlasEntityWithExtInfo writeTable(AtlasEntity dbEntity, Table table) throws AtlasHookException {
+        try {
+            AtlasEntityWithExtInfo tableEntity = toTableEntity(dbEntity, table);
+            writeEntityToZip(tableEntity);
+
+            return tableEntity;
+        } catch (Exception e) {
+            throw new AtlasHookException("HiveMetaStoreBridgeV2.registerTable() failed.", e);
+        }
+    }
+
+    /**
+     * Write an entity to Zip file
+     * @param entity
+     * @return
+     * @throws Exception
+     */
+    private void writeEntityToZip(AtlasEntityWithExtInfo entity) throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Writing {} entity: {}", entity.getEntity().getTypeName(), entity);
+        }
+
+        writeEntity(entity);
+        clearRelationshipAttributes(entity.getEntity());
+        flush();
+    }
+
+    /**
+     * Registers an entity in atlas
+     * @param entities
+     * @return
+     * @throws Exception
+     */
+    private void writeEntitiesToZip(AtlasEntitiesWithExtInfo entities) throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Writing {} entities: {}", entities.getEntities().size(), entities);
+        }
+
+        for (AtlasEntity entity : entities.getEntities()) {
+            writeEntity(new AtlasEntityWithExtInfo(entity));
+        }
+
+        flush();
+        clearRelationshipAttributes(entities);
+    }
+
+    /**
+     * Create a Hive Database entity
+     * @param hiveDB The Hive {@link Database} object from which to map properties
+     * @return new Hive Database AtlasEntity
+     * @throws HiveException
+     */
+    private AtlasEntity toDbEntity(Database hiveDB) {
+        return toDbEntity(hiveDB, null);
+    }
+
+    private AtlasEntity toDbEntity(Database hiveDB, AtlasEntity dbEntity) {
+        if (dbEntity == null) {
+            dbEntity = new AtlasEntity(HiveDataTypes.HIVE_DB.getName());
+        }
+
+        String dbName = getDatabaseName(hiveDB);
+
+        String qualifiedName = getDBQualifiedName(metadataNamespace, dbName);
+        dbEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
+
+        dbEntity.setGuid(getGuid(true, qualifiedName));
+
+        dbEntity.setAttribute(ATTRIBUTE_NAME, dbName);
+        dbEntity.setAttribute(ATTRIBUTE_DESCRIPTION, hiveDB.getDescription());
+        dbEntity.setAttribute(ATTRIBUTE_OWNER, hiveDB.getOwnerName());
+
+        dbEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
+        dbEntity.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(hiveDB.getLocationUri()));
+        dbEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveDB.getParameters());
+
+        if (hiveDB.getOwnerType() != null) {
+            dbEntity.setAttribute(ATTRIBUTE_OWNER_TYPE, OWNER_TYPE_TO_ENUM_VALUE.get(hiveDB.getOwnerType().getValue()));
+        }
+
+        return dbEntity;
+    }
+
+    private String getDBGuidFromAtlas(String dBQualifiedName) {
+        String guid = null;
+        try {
+            guid = atlasClientV2.getEntityHeaderByAttribute(HiveDataTypes.HIVE_DB.getName(), Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, dBQualifiedName)).getGuid();
+        } catch (AtlasServiceException e) {
+            LOG.warn("Failed to get DB guid from Atlas with qualified name {}", dBQualifiedName, e);
+        }
+        return guid;
+    }
+
+    public static String getDatabaseName(Database hiveDB) {
+        String dbName      = hiveDB.getName().toLowerCase();
+        String catalogName = hiveDB.getCatalogName() != null ? hiveDB.getCatalogName().toLowerCase() : null;
+
+        if (StringUtils.isNotEmpty(catalogName) && !StringUtils.equals(catalogName, DEFAULT_METASTORE_CATALOG)) {
+            dbName = catalogName + SEP + dbName;
+        }
+
+        return dbName;
+    }
+
+    /**
+     * Create a new table instance in Atlas
+     * @param  database AtlasEntity for Hive  {@link AtlasEntity} to which this table belongs
+     * @param hiveTable reference to the Hive {@link Table} from which to map properties
+     * @return Newly created Hive AtlasEntity
+     * @throws Exception
+     */
+    private AtlasEntityWithExtInfo toTableEntity(AtlasEntity database, final Table hiveTable) throws AtlasHookException {
+        AtlasEntityWithExtInfo table = new AtlasEntityWithExtInfo(new AtlasEntity(HiveDataTypes.HIVE_TABLE.getName()));
+
+        AtlasEntity tableEntity        = table.getEntity();
+        String      tableQualifiedName = getTableQualifiedName(metadataNamespace, hiveTable);
+        long        createTime         = BaseHiveEvent.getTableCreateTime(hiveTable);
+        long        lastAccessTime     = hiveTable.getLastAccessTime() > 0 ? hiveTable.getLastAccessTime() : createTime;
+
+        tableEntity.setGuid(getGuid(tableQualifiedName));
+        tableEntity.setRelationshipAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasRelatedObjectId(database, RELATIONSHIP_HIVE_TABLE_DB));
+        tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tableQualifiedName);
+        tableEntity.setAttribute(ATTRIBUTE_NAME, hiveTable.getTableName().toLowerCase());
+        tableEntity.setAttribute(ATTRIBUTE_OWNER, hiveTable.getOwner());
+
+        tableEntity.setAttribute(ATTRIBUTE_CREATE_TIME, createTime);
+        tableEntity.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime);
+        tableEntity.setAttribute(ATTRIBUTE_RETENTION, hiveTable.getRetention());
+        tableEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveTable.getParameters());
+        tableEntity.setAttribute(ATTRIBUTE_COMMENT, hiveTable.getParameters().get(ATTRIBUTE_COMMENT));
+        tableEntity.setAttribute(ATTRIBUTE_TABLE_TYPE, hiveTable.getTableType().name());
+        tableEntity.setAttribute(ATTRIBUTE_TEMPORARY, hiveTable.isTemporary());
+
+        if (hiveTable.getViewOriginalText() != null) {
+            tableEntity.setAttribute(ATTRIBUTE_VIEW_ORIGINAL_TEXT, hiveTable.getViewOriginalText());
+        }
+
+        if (hiveTable.getViewExpandedText() != null) {
+            tableEntity.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, hiveTable.getViewExpandedText());
+        }
+
+        AtlasEntity sdEntity = toStorageDescEntity(hiveTable.getSd(), getStorageDescQFName(tableQualifiedName), AtlasTypeUtil.getObjectId(tableEntity));
+
+        tableEntity.setRelationshipAttribute(ATTRIBUTE_STORAGEDESC, AtlasTypeUtil.getAtlasRelatedObjectId(sdEntity, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
+
+        table.addReferredEntity(database);
+        table.addReferredEntity(sdEntity);
+        table.setEntity(tableEntity);
+
+        return table;
+    }
+
+    private AtlasEntity toStorageDescEntity(StorageDescriptor storageDesc, String sdQualifiedName, AtlasObjectId tableId) {
+        AtlasEntity ret = new AtlasEntity(HiveDataTypes.HIVE_STORAGEDESC.getName());
+
+        ret.setGuid(getGuid(sdQualifiedName));
+        ret.setRelationshipAttribute(ATTRIBUTE_TABLE, AtlasTypeUtil.getAtlasRelatedObjectId(tableId, RELATIONSHIP_HIVE_TABLE_STORAGE_DESC));
+        ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName);
+        ret.setAttribute(ATTRIBUTE_PARAMETERS, storageDesc.getParameters());
+        ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(storageDesc.getLocation()));
+        ret.setAttribute(ATTRIBUTE_INPUT_FORMAT, storageDesc.getInputFormat());
+        ret.setAttribute(ATTRIBUTE_OUTPUT_FORMAT, storageDesc.getOutputFormat());
+        ret.setAttribute(ATTRIBUTE_COMPRESSED, storageDesc.isCompressed());
+        ret.setAttribute(ATTRIBUTE_NUM_BUCKETS, storageDesc.getNumBuckets());
+        ret.setAttribute(ATTRIBUTE_STORED_AS_SUB_DIRECTORIES, storageDesc.isStoredAsSubDirectories());
+
+        if (storageDesc.getBucketCols().size() > 0) {
+            ret.setAttribute(ATTRIBUTE_BUCKET_COLS, storageDesc.getBucketCols());
+        }
+
+        if (storageDesc.getSerdeInfo() != null) {
+            SerDeInfo serdeInfo = storageDesc.getSerdeInfo();
+
+            LOG.info("serdeInfo = {}", serdeInfo);
+            AtlasStruct serdeInfoStruct = new AtlasStruct(HiveDataTypes.HIVE_SERDE.getName());
+
+            serdeInfoStruct.setAttribute(ATTRIBUTE_NAME, serdeInfo.getName());
+            serdeInfoStruct.setAttribute(ATTRIBUTE_SERIALIZATION_LIB, serdeInfo.getSerializationLib());
+            serdeInfoStruct.setAttribute(ATTRIBUTE_PARAMETERS, serdeInfo.getParameters());
+
+            ret.setAttribute(ATTRIBUTE_SERDE_INFO, serdeInfoStruct);
+        }
+
+        if (CollectionUtils.isNotEmpty(storageDesc.getSortCols())) {
+            List<AtlasStruct> sortColsStruct = new ArrayList<>();
+
+            for (Order sortcol : storageDesc.getSortCols()) {
+                String hiveOrderName = HiveDataTypes.HIVE_ORDER.getName();
+                AtlasStruct colStruct = new AtlasStruct(hiveOrderName);
+                colStruct.setAttribute("col", sortcol.getCol());
+                colStruct.setAttribute("order", sortcol.getOrder());
+
+                sortColsStruct.add(colStruct);
+            }
+
+            ret.setAttribute(ATTRIBUTE_SORT_COLS, sortColsStruct);
+        }
+
+        return ret;
+    }
+
+    private List<AtlasEntity> toColumns(List<FieldSchema> schemaList, AtlasEntity table, String relationshipType) {
+        List<AtlasEntity> ret = new ArrayList<>();
+
+        int columnPosition = 0;
+        for (FieldSchema fs : schemaList) {
+            LOG.debug("Processing field {}", fs);
+
+            AtlasEntity column = new AtlasEntity(HiveDataTypes.HIVE_COLUMN.getName());
+
+            String columnQualifiedName = getColumnQualifiedName((String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), fs.getName());
+
+            column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, columnQualifiedName);
+            column.setGuid(getGuid(columnQualifiedName));
+
+            column.setRelationshipAttribute(ATTRIBUTE_TABLE, AtlasTypeUtil.getAtlasRelatedObjectId(table, relationshipType));
+
+            column.setAttribute(ATTRIBUTE_NAME, fs.getName());
+            column.setAttribute(ATTRIBUTE_OWNER, table.getAttribute(ATTRIBUTE_OWNER));
+            column.setAttribute(ATTRIBUTE_COL_TYPE, fs.getType());
+            column.setAttribute(ATTRIBUTE_COL_POSITION, columnPosition++);
+            column.setAttribute(ATTRIBUTE_COMMENT, fs.getComment());
+
+            ret.add(column);
+        }
+        return ret;
+    }
+
+    private String getCreateTableString(Table table, String location) {
+        String            colString = "";
+        List<FieldSchema> colList   = table.getAllCols();
+
+        if (colList != null) {
+            for (FieldSchema col : colList) {
+                colString += col.getName() + " " + col.getType() + ",";
+            }
+
+            if (colList.size() > 0) {
+                colString = colString.substring(0, colString.length() - 1);
+                colString = "(" + colString + ")";
+            }
+        }
+
+        String query = "create external table " + table.getTableName() + colString + " location '" + location + "'";
+
+        return query;
+    }
+
+    private String lower(String str) {
+        if (StringUtils.isEmpty(str)) {
+            return "";
+        }
+
+        return str.toLowerCase().trim();
+    }
+
+    /**
+     * Construct the qualified name used to uniquely identify a Table instance in Atlas.
+     * @param metadataNamespace Metadata namespace of the cluster to which the Hive component belongs
+     * @param table hive table for which the qualified name is needed
+     * @return Unique qualified name to identify the Table instance in Atlas.
+     */
+    private static String getTableQualifiedName(String metadataNamespace, Table table) {
+        return getTableQualifiedName(metadataNamespace, table.getDbName(), table.getTableName(), table.isTemporary());
+    }
+
+    /**
+     * Construct the qualified name used to uniquely identify a Database instance in Atlas.
+     * @param metadataNamespace Metadata namespace of the cluster to which the Hive component belongs
+     * @param dbName Name of the Hive database
+     * @return Unique qualified name to identify the Database instance in Atlas.
+     */
+    public static String getDBQualifiedName(String metadataNamespace, String dbName) {
+        return String.format("%s@%s", dbName.toLowerCase(), metadataNamespace);
+    }
+
+    /**
+     * Construct the qualified name used to uniquely identify a Table instance in Atlas.
+     * @param metadataNamespace Metadata namespace of the cluster to which the Hive component belongs
+     * @param dbName Name of the Hive database to which the Table belongs
+     * @param tableName Name of the Hive table
+     * @param isTemporaryTable is this a temporary table
+     * @return Unique qualified name to identify the Table instance in Atlas.
+     */
+    public static String getTableQualifiedName(String metadataNamespace, String dbName, String tableName, boolean isTemporaryTable) {
+        String tableTempName = tableName;
+
+        if (isTemporaryTable) {
+            if (SessionState.get() != null && SessionState.get().getSessionId() != null) {
+                tableTempName = tableName + TEMP_TABLE_PREFIX + SessionState.get().getSessionId();
+            } else {
+                tableTempName = tableName + TEMP_TABLE_PREFIX + RandomStringUtils.random(10);
+            }
+        }
+
+        return String.format("%s.%s@%s", dbName.toLowerCase(), tableTempName.toLowerCase(), metadataNamespace);
+    }
+
+    public static String getTableProcessQualifiedName(String metadataNamespace, Table table) {
+        String tableQualifiedName = getTableQualifiedName(metadataNamespace, table);
+        long   createdTime        = getTableCreatedTime(table);
+
+        return tableQualifiedName + SEP + createdTime;
+    }
+
+    public static String getStorageDescQFName(String tableQualifiedName) {
+        return tableQualifiedName + "_storage";
+    }
+
+    public static String getColumnQualifiedName(final String tableQualifiedName, final String colName) {
+        final String[] parts             = tableQualifiedName.split("@");
+        final String   tableName         = parts[0];
+        final String   metadataNamespace = parts[1];
+
+        return String.format("%s.%s@%s", tableName, colName.toLowerCase(), metadataNamespace);
+    }
+
+    public static long getTableCreatedTime(Table table) {
+        return table.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR;
+    }
+
+    private void clearRelationshipAttributes(AtlasEntitiesWithExtInfo entities) {
+        if (entities != null) {
+            if (entities.getEntities() != null) {
+                for (AtlasEntity entity : entities.getEntities()) {
+                    clearRelationshipAttributes(entity);
+                }
+            }
+
+            if (entities.getReferredEntities() != null) {
+                clearRelationshipAttributes(entities.getReferredEntities().values());
+            }
+        }
+    }
+
+    private void clearRelationshipAttributes(Collection<AtlasEntity> entities) {
+        if (entities != null) {
+            for (AtlasEntity entity : entities) {
+                clearRelationshipAttributes(entity);
+            }
+        }
+    }
+
+    private void clearRelationshipAttributes(AtlasEntity entity) {
+        if (entity != null && entity.getRelationshipAttributes() != null) {
+            entity.getRelationshipAttributes().clear();
+        }
+    }
+
+    private boolean isTableWithDatabaseName(String tableName) {
+        return tableName.contains(".");
+    }
+
+    private String getGuid(String qualifiedName) {
+        return getGuid(false, qualifiedName);
+    }
+
+    private String getGuid(boolean isDBType, String qualifiedName) {
+        String guid = null;
+
+        if (qualifiedNameGuidMap.containsKey(qualifiedName)) {
+            guid = qualifiedNameGuidMap.get(qualifiedName);
+        } else if (isDBType) {
+            guid = getDBGuidFromAtlas(qualifiedName);
+        }
+
+        if (StringUtils.isBlank(guid)) {
+            guid = generateGuid();
+        }
+
+        return guid;
+    }
+
+    private String generateGuid() {
+        return UUID.randomUUID().toString();
+    }
+
+    public void setStreamSize(long size) {
+        zipOutputStream.setComment(String.format(ZIP_FILE_COMMENT_FORMAT, size, -1));
+    }
+}
\ No newline at end of file
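
A quick illustration of the qualified-name formats produced by the public helpers above (getDBQualifiedName, getTableQualifiedName, getColumnQualifiedName, getStorageDescQFName). The namespace, database, table and column names below are made up, and the snippet only mirrors the String.format patterns shown in the patch rather than calling the bridge itself; names are assumed to be already lower-cased, as the helpers do via toLowerCase().

    public class QualifiedNameExample {
        public static void main(String[] args) {
            String ns = "cm";   // hypothetical metadata namespace

            String dbQName  = String.format("%s@%s", "sales", ns);                       // sales@cm
            String tblQName = String.format("%s.%s@%s", "sales", "orders", ns);          // sales.orders@cm
            String sdQName  = tblQName + "_storage";                                     // sales.orders@cm_storage
            String colQName = String.format("%s.%s@%s", "sales.orders", "order_id", ns); // sales.orders.order_id@cm

            System.out.println(dbQName);
            System.out.println(tblQName);
            System.out.println(sdQName);
            System.out.println(colQName);
        }
    }
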
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
index 2c18704..cbc1aa9 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.atlas.model.impexp;
 
-
 import com.fasterxml.jackson.annotation.JsonAnySetter;
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -50,10 +49,10 @@ public class AtlasImportRequest implements Serializable {
     public  static final String OPTION_KEY_BATCH_SIZE      = "batchSize";
     public  static final String OPTION_KEY_FORMAT          = "format";
     public  static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect";
-    public static final String START_POSITION_KEY          = "startPosition";
+    public  static final String START_POSITION_KEY         = "startPosition";
+    public  static final String UPDATE_TYPE_DEFINITION_KEY = "updateTypeDefinition";
     private static final String START_GUID_KEY             = "startGuid";
     private static final String FILE_NAME_KEY              = "fileName";
-    private static final String UPDATE_TYPE_DEFINITION_KEY = "updateTypeDefinition";
     private static final String OPTION_KEY_STREAM_SIZE     = "size";
 
     private Map<String, String> options;
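
For context on the option keys touched in this hunk, a minimal sketch of the key/value shape an import's options map can take. The values are hypothetical, and the snippet builds a plain map rather than going through AtlasImportRequest itself.

    import java.util.HashMap;
    import java.util.Map;

    public class ImportOptionsExample {
        public static void main(String[] args) {
            Map<String, String> options = new HashMap<>();
            options.put("format", "zipDirect");           // OPTION_KEY_FORMAT / OPTION_KEY_FORMAT_ZIP_DIRECT
            options.put("batchSize", "500");              // OPTION_KEY_BATCH_SIZE, value is hypothetical
            options.put("startPosition", "0");            // START_POSITION_KEY
            options.put("updateTypeDefinition", "true");  // UPDATE_TYPE_DEFINITION_KEY, made public by this change
            System.out.println(options);
        }
    }
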
diff --git a/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java
index 3430fda..1db5f20 100644
--- a/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java
+++ b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java
@@ -21,12 +21,7 @@ package org.apache.atlas.model.migration;
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.atlas.model.AtlasBaseModelObject;
 import org.apache.atlas.model.impexp.MigrationStatus;
-import org.apache.commons.lang.StringUtils;
-
-import java.io.Serializable;
-import java.util.Date;
 
 import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
 import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
@@ -36,27 +31,43 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class MigrationImportStatus extends MigrationStatus {
     private String name;
+    private String fileHash;
 
     public MigrationImportStatus() {
     }
 
     public MigrationImportStatus(String name) {
-        this.name = name;
+        this.name     = name;
+        this.fileHash = name;
+    }
+
+    public MigrationImportStatus(String name, String fileHash) {
+        this.name     = name;
+        this.fileHash = fileHash;
     }
 
     public String getName() {
         return name;
     }
 
+    public String getFileHash() {
+        return fileHash;
+    }
+
     public void setName(String name) {
         this.name = name;
     }
 
+    public void setFileHash(String fileHash) {
+        this.fileHash = fileHash;
+    }
+
     @Override
     public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append(", name=").append(name);
-        sb.append(super.toString());
+        final StringBuilder sb = new StringBuilder("MigrationImportStatus{");
+        sb.append("name='").append(name).append('\'');
+        sb.append(", fileHash='").append(fileHash).append('\'');
+        sb.append('}');
         return sb.toString();
     }
 }
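
A minimal sketch of the fileHash semantics introduced above: the one-argument constructor falls back to using the name as the hash, while the two-argument constructor stores them separately. The file path and hash value below are made up.

    import org.apache.atlas.model.migration.MigrationImportStatus;

    public class MigrationStatusExample {
        public static void main(String[] args) {
            // One-arg constructor: fileHash defaults to the name itself.
            MigrationImportStatus byName = new MigrationImportStatus("/tmp/atlas-migration-data.zip");
            System.out.println(byName.getFileHash());   // prints the file name

            // Two-arg constructor: name and content hash are kept separate.
            MigrationImportStatus byHash = new MigrationImportStatus("/tmp/atlas-migration-data.zip",
                    "9e107d9d372bb6826bd81d3542a419d6"); // hash value is made up
            System.out.println(byHash.getFileHash());   // prints the hash
        }
    }
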
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
index a22c687..5b22f9c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
@@ -24,9 +24,12 @@ import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.FileInputStream;
+import java.io.IOException;
 import java.util.Date;
 
 import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getEncodedProperty;
@@ -50,7 +53,12 @@ public class DataMigrationStatusService {
 
 
     public void init(String fileToImport) {
-        this.status = new MigrationImportStatus(fileToImport);
+        try {
+            this.status = new MigrationImportStatus(fileToImport, DigestUtils.md5Hex(new FileInputStream(fileToImport)));
+        } catch (IOException e) {
+            LOG.error("Not able to create Migration status", e);
+        }
+
         if (!this.migrationStatusVertexManagement.exists(fileToImport)) {
             return;
         }
@@ -59,21 +67,28 @@ public class DataMigrationStatusService {
     }
 
     public MigrationImportStatus getCreate(String fileName) {
-        return getCreate(new MigrationImportStatus(fileName));
+        MigrationImportStatus create = null;
+        try {
+            create = getCreate(new MigrationImportStatus(fileName, DigestUtils.md5Hex(new FileInputStream(fileName))));
+        } catch (IOException e) {
+            LOG.error("Exception occurred while creating migration import", e);
+        }
+
+        return create;
     }
 
     public MigrationImportStatus getCreate(MigrationImportStatus status) {
         try {
             this.status = this.migrationStatusVertexManagement.createOrUpdate(status);
         } catch (Exception ex) {
-            LOG.error("DataMigrationStatusService: Setting status: {}: Resulted in error!", status.getName(), ex);
+            LOG.error("DataMigrationStatusService: Setting status: {}: Resulted in error!", status.getFileHash(), ex);
         }
 
         return this.status;
     }
 
     public MigrationImportStatus getStatus() {
-        if (this.status != null && this.migrationStatusVertexManagement.exists(this.status.getName())) {
+        if (this.status != null && this.migrationStatusVertexManagement.exists(this.status.getFileHash())) {
             return getCreate(this.status);
         }
 
@@ -89,8 +104,8 @@ public class DataMigrationStatusService {
             return;
         }
 
-        MigrationImportStatus status = getByName(this.status.getName());
-        this.migrationStatusVertexManagement.delete(status.getName());
+        MigrationImportStatus status = getByName(this.status.getFileHash());
+        this.migrationStatusVertexManagement.delete(status.getFileHash());
         this.status = null;
     }
 
@@ -118,7 +133,7 @@ public class DataMigrationStatusService {
         }
 
         public MigrationImportStatus createOrUpdate(MigrationImportStatus status) {
-            this.vertex = findByNameInternal(status.getName());
+            this.vertex = findByNameInternal(status.getFileHash());
 
             if (this.vertex == null) {
                 this.vertex = graph.addVertex();
@@ -192,7 +207,7 @@ public class DataMigrationStatusService {
 
         private void updateVertex(AtlasVertex vertex, MigrationImportStatus status) {
             try {
-                setEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, status.getName());
+                setEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, status.getFileHash());
 
                 setEncodedProperty(vertex, PROPERTY_KEY_START_TIME,
                         (status.getStartTime() != null)
@@ -213,7 +228,7 @@ public class DataMigrationStatusService {
             MigrationImportStatus ret = new MigrationImportStatus();
 
             try {
-                ret.setName(getEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, String.class));
+                ret.setFileHash(getEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, String.class));
 
                 Long dateValue = getEncodedProperty(vertex, PROPERTY_KEY_START_TIME, Long.class);
                 if (dateValue != null) {
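
The migration status is now keyed on an MD5 of the import file, computed with commons-codec as in the hunks above. Below is a small sketch of the same computation, written with try-with-resources so the stream is closed explicitly (the in-line calls in the patch leave that to garbage collection); the file path is hypothetical.

    import org.apache.commons.codec.digest.DigestUtils;

    import java.io.FileInputStream;
    import java.io.IOException;

    public class FileHashExample {
        public static void main(String[] args) throws IOException {
            try (FileInputStream in = new FileInputStream("/tmp/atlas-migration-data.zip")) {
                // Same DigestUtils.md5Hex call the patch uses to derive the status key.
                String fileHash = DigestUtils.md5Hex(in);
                System.out.println(fileHash);
            }
        }
    }
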
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
index bfb1148..2b3f179 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
@@ -26,7 +26,9 @@ import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.migration.MigrationImportStatus;
 import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.impexp.ImportService;
+import org.apache.atlas.repository.impexp.ZipExportFileNames;
 import org.apache.atlas.type.AtlasType;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.filefilter.WildcardFileFilter;
 import org.apache.commons.lang.ArrayUtils;
@@ -196,7 +198,13 @@ public class ZipFileMigrationImporter implements Runnable {
     }
 
     private MigrationImportStatus getCreateMigrationStatus(String fileName, int streamSize) {
-        MigrationImportStatus status = new MigrationImportStatus(fileName);
+        MigrationImportStatus status = null;
+        try {
+            status = new MigrationImportStatus(fileName, DigestUtils.md5Hex(new FileInputStream(fileName)));
+        } catch (IOException e) {
+            LOG.error("Exception occurred while creating migration import", e);
+        }
+
         status.setTotalCount(streamSize);
 
         MigrationImportStatus statusRetrieved = dataMigrationStatusService.getCreate(status);
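
Note that the new-side code in this hunk leaves status null if the hash computation throws, so the following status.setTotalCount(streamSize) call would fail in that case. Purely as an illustration, and not what the commit does, a defensive variant could fall back to the one-argument constructor and close the stream; the sketch assumes the surrounding class fields (LOG, dataMigrationStatusService) and simplifies the method tail, which is not fully shown in the hunk.

    // Illustrative variant only; assumes LOG and dataMigrationStatusService from the enclosing class.
    private MigrationImportStatus getCreateMigrationStatus(String fileName, int streamSize) {
        MigrationImportStatus status;

        try (FileInputStream in = new FileInputStream(fileName)) {
            status = new MigrationImportStatus(fileName, DigestUtils.md5Hex(in));
        } catch (IOException e) {
            LOG.error("Exception occurred while creating migration import", e);
            status = new MigrationImportStatus(fileName);   // one-arg ctor falls back to the name as hash
        }

        status.setTotalCount(streamSize);

        return dataMigrationStatusService.getCreate(status);
    }
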
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java
index f1dc990..6d368f4 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java
@@ -19,13 +19,14 @@ package org.apache.atlas.repository.impexp;
 
 import com.google.inject.Inject;
 import org.apache.atlas.TestModules;
-import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.migration.MigrationImportStatus;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.migration.DataMigrationStatusService;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
+import java.io.FileInputStream;
+import java.io.IOException;
 import java.util.Date;
 
 import static org.testng.Assert.assertEquals;
@@ -38,12 +39,13 @@ public class DataMigrationStatusServiceTest {
     AtlasGraph atlasGraph;
 
     @Test
-    public void createUpdateDelete() {
+    public void createUpdateDelete() throws IOException {
         final String STATUS_DONE = "DONE";
 
         DataMigrationStatusService dataMigrationStatusService = new DataMigrationStatusService(atlasGraph);
 
-        MigrationImportStatus expected = new MigrationImportStatus("/tmp/defg.zip");
+        MigrationImportStatus expected = new MigrationImportStatus("DUMMY-HASH");
+
         expected.setTotalCount(3333);
         expected.setCurrentIndex(20);
         expected.setStartTime(new Date());
@@ -51,7 +53,7 @@ public class DataMigrationStatusServiceTest {
         MigrationImportStatus ret = dataMigrationStatusService.getCreate(expected);
 
         assertNotNull(ret);
-        assertEquals(ret.getName(), expected.getName());
+        assertEquals(ret.getFileHash(), expected.getFileHash());
         assertEquals(ret.getStartTime(), expected.getStartTime());
         assertEquals(ret.getTotalCount(), expected.getTotalCount());
         assertEquals(ret.getCurrentIndex(), expected.getCurrentIndex());
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 135b94b..0580f7f 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -831,13 +831,17 @@ public class AdminResource {
     }
 
     private void addToImportOperationAudits(AtlasImportResult result) throws AtlasBaseException {
-        List<AtlasObjectId> objectIds = result.getExportResult().getRequest().getItemsToExport();
-
         Map<String, Object> optionMap = new HashMap<>();
         optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
         String params = AtlasJson.toJson(optionMap);
 
-        auditImportExportOperations(objectIds, AuditOperation.IMPORT, params);
+        if (result.getExportResult().getRequest() == null) {
+            int resultCount = result.getProcessedEntities().size();
+            auditService.add(AuditOperation.IMPORT, params, AtlasJson.toJson(result.getMetrics()), resultCount);
+        } else {
+            List<AtlasObjectId> objectIds = result.getExportResult().getRequest().getItemsToExport();
+            auditImportExportOperations(objectIds, AuditOperation.IMPORT, params);
+        }
     }
 
     private void addToExportOperationAudits(boolean isSuccessful, AtlasExportResult result) throws AtlasBaseException {