Posted to commits@atlas.apache.org by ma...@apache.org on 2018/03/26 06:37:23 UTC

atlas git commit: ATLAS-2511: updated import-hive utility to add options to selectively import given database/tables

Repository: atlas
Updated Branches:
  refs/heads/master 31eb3664c -> bd39a509a


ATLAS-2511: updated import-hive utility to add options to selectively import given database/tables

Signed-off-by: Madhan Neethiraj <ma...@apache.org>
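
For context, here is how the updated utility is invoked with the new options
(a sketch based on the options added in this commit; the database and table
names are hypothetical, and since the values feed Hive's pattern lookups,
globs such as 'sales*' should also work -- see the HiveMetaStoreBridge
changes below):

    import-hive.sh                                   # imports all databases and tables
    import-hive.sh -d sales                          # imports database 'sales' and all of its tables
    import-hive.sh -d sales -t orders                # imports only table 'orders' from database 'sales'
    import-hive.sh --database sales --table orders   # long-option equivalent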


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/bd39a509
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/bd39a509
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/bd39a509

Branch: refs/heads/master
Commit: bd39a509a20ff7860fdfdcab67e46cc503038f1e
Parents: 31eb366
Author: rmani <rm...@hortonworks.com>
Authored: Thu Mar 22 12:33:47 2018 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Sun Mar 25 23:37:11 2018 -0700

----------------------------------------------------------------------
 addons/hive-bridge/src/bin/import-hive.sh       |   8 +-
 .../atlas/hive/bridge/HiveMetaStoreBridge.java  | 127 +++++++++++-----
 .../org/apache/atlas/hive/hook/HiveHook.java    | 144 ++++++++++---------
 .../hive/bridge/HiveMetaStoreBridgeTest.java    |  19 +--
 4 files changed, 179 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/bd39a509/addons/hive-bridge/src/bin/import-hive.sh
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/bin/import-hive.sh b/addons/hive-bridge/src/bin/import-hive.sh
index 47581ac..98f4c84 100755
--- a/addons/hive-bridge/src/bin/import-hive.sh
+++ b/addons/hive-bridge/src/bin/import-hive.sh
@@ -31,6 +31,8 @@ done
 BASEDIR=`dirname ${PRG}`
 BASEDIR=`cd ${BASEDIR}/..;pwd`
 
+allargs=$@
+
 if test -z "${JAVA_HOME}"
 then
     JAVA_BIN=`which java`
@@ -128,8 +130,8 @@ done
 
 echo "Log file for import is $LOGFILE"
 
-"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.hive.bridge.HiveMetaStoreBridge
+"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.hive.bridge.HiveMetaStoreBridge $allargs
 
 RETVAL=$?
-[ $RETVAL -eq 0 ] && echo Hive Data Model imported successfully!!!
-[ $RETVAL -ne 0 ] && echo Failed to import Hive Data Model!!!
+[ $RETVAL -eq 0 ] && echo Hive metadata imported successfully!!!
+[ $RETVAL -ne 0 ] && echo Failed to import Hive metadata!!!
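
Note (editorial observation, not part of the commit): the unquoted
allargs=$@ assignment and the later unquoted $allargs expansion word-split
any argument that contains spaces. Assuming bash, a more robust sketch would
save and forward the argument array verbatim:

    allargs=("$@")     # each argument is preserved as a single word

    "${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.hive.bridge.HiveMetaStoreBridge "${allargs[@]}"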

http://git-wip-us.apache.org/repos/asf/atlas/blob/bd39a509/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 51df8d2..09c17a9 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -36,11 +36,11 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasStruct;
+import org.apache.commons.cli.ParseException;
 import org.apache.commons.collections.CollectionUtils;
 
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.configuration.Configuration;
@@ -86,7 +86,9 @@ public class HiveMetaStoreBridge {
     public static final String SEP                             = ":".intern();
     public static final String HDFS_PATH                       = "hdfs_path";
 
-    private static final String DEFAULT_ATLAS_URL            = "http://localhost:21000/";
+    private static final int    EXIT_CODE_SUCCESS = 0;
+    private static final int    EXIT_CODE_FAILED  = 1;
+    private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
 
     private final HdfsNameServiceResolver hdfsNameServiceResolver = HdfsNameServiceResolver.getInstance();
     private final String                  clusterName;
@@ -95,16 +97,27 @@ public class HiveMetaStoreBridge {
     private final boolean                 convertHdfsPathToLowerCase;
 
 
-    public static void main(String[] args) throws AtlasHookException {
-        try {
-            Configuration atlasConf     = ApplicationProperties.get();
-            String[]      atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT);
+    public static void main(String[] args) {
+        int exitCode = EXIT_CODE_FAILED;
 
-            if (atlasEndpoint == null || atlasEndpoint.length == 0){
+        try {
+            Options options = new Options();
+            options.addOption("d", "database", true, "Database name");
+            options.addOption("t", "table", true, "Table name");
+            options.addOption("failOnError", false, "failOnError");
+
+            CommandLine   cmd              = new BasicParser().parse(options, args);
+            boolean       failOnError      = cmd.hasOption("failOnError");
+            String        databaseToImport = cmd.getOptionValue("d");
+            String        tableToImport    = cmd.getOptionValue("t");
+            Configuration atlasConf        = ApplicationProperties.get();
+            String[]      atlasEndpoint    = atlasConf.getStringArray(ATLAS_ENDPOINT);
+
+            if (atlasEndpoint == null || atlasEndpoint.length == 0) {
                 atlasEndpoint = new String[] { DEFAULT_ATLAS_URL };
             }
 
-            AtlasClientV2 atlasClientV2;
+            final AtlasClientV2 atlasClientV2;
 
             if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
                 String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
@@ -116,17 +129,35 @@ public class HiveMetaStoreBridge {
                 atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), atlasEndpoint);
             }
 
-            Options           options     = new Options();
-            CommandLineParser parser      = new BasicParser();
-            CommandLine       cmd         = parser.parse(options, args);
-            boolean           failOnError = cmd.hasOption("failOnError");
-
             HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2);
 
-            hiveMetaStoreBridge.importHiveMetadata(failOnError);
+            hiveMetaStoreBridge.importHiveMetadata(databaseToImport, tableToImport, failOnError);
+
+            exitCode = EXIT_CODE_SUCCESS;
+        } catch(ParseException e) {
+            LOG.error("Failed to parse arguments. Error: {}", e.getMessage());
+
+            printUsage();
         } catch(Exception e) {
-            throw new AtlasHookException("HiveMetaStoreBridge.main() failed.", e);
+            LOG.error("Import failed", e);
         }
+
+        System.exit(exitCode);
+    }
+
+    private static void printUsage() {
+        System.out.println();
+        System.out.println();
+        System.out.println("Usage 1: import-hive.sh [-d <database> OR --database <database>]");
+        System.out.println("    Imports the specified database and all of its tables ...");
+        System.out.println();
+        System.out.println("Usage 2: import-hive.sh [-d <database> OR --database <database>] [-t <table> OR --table <table>]");
+        System.out.println("    Imports the specified table from the specified database ...");
+        System.out.println();
+        System.out.println("Usage 3: import-hive.sh");
+        System.out.println("    Imports all databases and tables...");
+        System.out.println();
+        System.out.println();
     }
 
     /**
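
For reference, a minimal standalone sketch of the commons-cli parsing used in
main() above (the class name and sample arguments are hypothetical; BasicParser
is what this commit uses, though later commons-cli releases deprecate it in
favor of DefaultParser):

    import org.apache.commons.cli.BasicParser;
    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.Options;
    import org.apache.commons.cli.ParseException;

    public class ImportArgsDemo {
        public static void main(String[] args) throws ParseException {
            Options options = new Options();

            options.addOption("d", "database", true,  "Database name"); // -d/--database takes a value
            options.addOption("t", "table",    true,  "Table name");    // -t/--table takes a value
            options.addOption("failOnError", false, "failOnError");     // boolean flag, no value

            // e.g. args = { "-d", "sales", "-t", "orders", "-failOnError" }
            CommandLine cmd = new BasicParser().parse(options, args);

            System.out.println("database    = " + cmd.getOptionValue("d")); // null when absent
            System.out.println("table       = " + cmd.getOptionValue("t"));
            System.out.println("failOnError = " + cmd.hasOption("failOnError"));
        }
    }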
@@ -174,23 +205,33 @@ public class HiveMetaStoreBridge {
 
 
     @VisibleForTesting
-    public void importHiveMetadata(boolean failOnError) throws Exception {
+    public void importHiveMetadata(String databaseToImport, String tableToImport, boolean failOnError) throws Exception {
         LOG.info("Importing Hive metadata");
 
-        importDatabases(failOnError);
+        importDatabases(failOnError, databaseToImport, tableToImport);
     }
 
-    private void importDatabases(boolean failOnError) throws Exception {
-        List<String> databases = hiveClient.getAllDatabases();
+    private void importDatabases(boolean failOnError, String databaseToImport, String tableToImport) throws Exception {
+        final List<String> databaseNames;
+
+        if (StringUtils.isEmpty(databaseToImport)) {
+            databaseNames = hiveClient.getAllDatabases();
+        } else {
+            databaseNames = hiveClient.getDatabasesByPattern(databaseToImport);
+        }
 
-        LOG.info("Found {} databases", databases.size());
+        if(!CollectionUtils.isEmpty(databaseNames)) {
+            LOG.info("Found {} databases", databaseNames.size());
 
-        for (String databaseName : databases) {
-            AtlasEntityWithExtInfo dbEntity = registerDatabase(databaseName);
+            for (String databaseName : databaseNames) {
+                AtlasEntityWithExtInfo dbEntity = registerDatabase(databaseName);
 
-            if (dbEntity != null) {
-                importTables(dbEntity.getEntity(), databaseName, failOnError);
+                if (dbEntity != null) {
+                    importTables(dbEntity.getEntity(), databaseName, tableToImport, failOnError);
+                }
             }
+        } else {
+            LOG.info("No databases found");
         }
     }
 
@@ -201,25 +242,35 @@ public class HiveMetaStoreBridge {
      * @param failOnError
      * @throws Exception
      */
-    private int importTables(AtlasEntity dbEntity, String databaseName, final boolean failOnError) throws Exception {
-        List<String> hiveTables = hiveClient.getAllTables(databaseName);
+    private int importTables(AtlasEntity dbEntity, String databaseName, String tblName, final boolean failOnError) throws Exception {
+        int tablesImported = 0;
 
-        LOG.info("Found {} tables in database {}", hiveTables.size(), databaseName);
+        final List<String> tableNames;
 
-        int tablesImported = 0;
+        if (StringUtils.isEmpty(tblName)) {
+            tableNames = hiveClient.getAllTables(databaseName);
+        } else {
+            tableNames = hiveClient.getTablesByPattern(databaseName, tblName);
+        }
 
-        try {
-            for (String tableName : hiveTables) {
-                int imported = importTable(dbEntity, databaseName, tableName, failOnError);
+        if(!CollectionUtils.isEmpty(tableNames)) {
+            LOG.info("Found {} tables to import in database {}", tableNames.size(), databaseName);
 
-                tablesImported += imported;
-            }
-        } finally {
-            if (tablesImported == hiveTables.size()) {
-                LOG.info("Successfully imported all {} tables from database {}", tablesImported, databaseName);
-            } else {
-                LOG.error("Imported {} of {} tables from database {}. Please check logs for errors during import", tablesImported, hiveTables.size(), databaseName);
+            try {
+                for (String tableName : tableNames) {
+                    int imported = importTable(dbEntity, databaseName, tableName, failOnError);
+
+                    tablesImported += imported;
+                }
+            } finally {
+                if (tablesImported == tableNames.size()) {
+                    LOG.info("Successfully imported {} tables from database {}", tablesImported, databaseName);
+                } else {
+                    LOG.error("Imported {} of {} tables from database {}. Please check logs for errors during import", tablesImported, tableNames.size(), databaseName);
+                }
             }
+        } else {
+            LOG.info("No tables to import in database {}", databaseName);
         }
 
         return tablesImported;
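
The selective import above hinges on the Hive client's pattern lookups
(getDatabasesByPattern/getTablesByPattern, as called on hiveClient in this
diff). A minimal sketch of those calls in isolation, assuming a hive-site.xml
on the classpath (the pattern values are hypothetical; Hive name patterns use
globs such as '*'):

    import java.util.List;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.metadata.Hive;

    public class PatternLookupDemo {
        public static void main(String[] args) throws Exception {
            Hive hiveClient = Hive.get(new HiveConf()); // connects using hive-site.xml settings

            // all databases whose names match the pattern
            List<String> databases = hiveClient.getDatabasesByPattern("sales*");

            for (String db : databases) {
                // all tables in 'db' whose names match the pattern
                List<String> tables = hiveClient.getTablesByPattern(db, "orders*");

                System.out.println(db + " -> " + tables);
            }
        }
    }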

http://git-wip-us.apache.org/repos/asf/atlas/blob/bd39a509/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 3bf0aab..78f2e83 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -82,77 +82,81 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             LOG.debug("==> HiveHook.run({})", hookContext.getOperationName());
         }
 
-        HiveOperation        oper    = OPERATION_MAP.get(hookContext.getOperationName());
-        AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext);
-
-        BaseHiveEvent event = null;
-
-        switch (oper) {
-            case CREATEDATABASE:
-                event = new CreateDatabase(context);
-            break;
-
-            case DROPDATABASE:
-                event = new DropDatabase(context);
-            break;
-
-            case ALTERDATABASE:
-            case ALTERDATABASE_OWNER:
-                event = new AlterDatabase(context);
-            break;
-
-            case CREATETABLE:
-                event = new CreateTable(context, true);
-            break;
-
-            case DROPTABLE:
-            case DROPVIEW:
-                event = new DropTable(context);
-            break;
-
-            case CREATETABLE_AS_SELECT:
-            case CREATEVIEW:
-            case ALTERVIEW_AS:
-            case LOAD:
-            case EXPORT:
-            case IMPORT:
-            case QUERY:
-            case TRUNCATETABLE:
-                event = new CreateHiveProcess(context);
-            break;
-
-            case ALTERTABLE_FILEFORMAT:
-            case ALTERTABLE_CLUSTER_SORT:
-            case ALTERTABLE_BUCKETNUM:
-            case ALTERTABLE_PROPERTIES:
-            case ALTERVIEW_PROPERTIES:
-            case ALTERTABLE_SERDEPROPERTIES:
-            case ALTERTABLE_SERIALIZER:
-            case ALTERTABLE_ADDCOLS:
-            case ALTERTABLE_REPLACECOLS:
-            case ALTERTABLE_PARTCOLTYPE:
-            case ALTERTABLE_LOCATION:
-                event = new AlterTable(context);
-            break;
-
-            case ALTERTABLE_RENAME:
-            case ALTERVIEW_RENAME:
-                event = new AlterTableRename(context);
-            break;
-
-            case ALTERTABLE_RENAMECOL:
-                event = new AlterTableRenameCol(context);
-            break;
-
-            default:
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("HiveHook.run({}): operation ignored", hookContext.getOperationName());
-                }
-            break;
-        }
+        try {
+            HiveOperation        oper    = OPERATION_MAP.get(hookContext.getOperationName());
+            AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext);
+
+            BaseHiveEvent event = null;
+
+            switch (oper) {
+                case CREATEDATABASE:
+                    event = new CreateDatabase(context);
+                break;
+
+                case DROPDATABASE:
+                    event = new DropDatabase(context);
+                break;
+
+                case ALTERDATABASE:
+                case ALTERDATABASE_OWNER:
+                    event = new AlterDatabase(context);
+                break;
+
+                case CREATETABLE:
+                    event = new CreateTable(context, true);
+                break;
+
+                case DROPTABLE:
+                case DROPVIEW:
+                    event = new DropTable(context);
+                break;
+
+                case CREATETABLE_AS_SELECT:
+                case CREATEVIEW:
+                case ALTERVIEW_AS:
+                case LOAD:
+                case EXPORT:
+                case IMPORT:
+                case QUERY:
+                case TRUNCATETABLE:
+                    event = new CreateHiveProcess(context);
+                break;
+
+                case ALTERTABLE_FILEFORMAT:
+                case ALTERTABLE_CLUSTER_SORT:
+                case ALTERTABLE_BUCKETNUM:
+                case ALTERTABLE_PROPERTIES:
+                case ALTERVIEW_PROPERTIES:
+                case ALTERTABLE_SERDEPROPERTIES:
+                case ALTERTABLE_SERIALIZER:
+                case ALTERTABLE_ADDCOLS:
+                case ALTERTABLE_REPLACECOLS:
+                case ALTERTABLE_PARTCOLTYPE:
+                case ALTERTABLE_LOCATION:
+                    event = new AlterTable(context);
+                break;
+
+                case ALTERTABLE_RENAME:
+                case ALTERVIEW_RENAME:
+                    event = new AlterTableRename(context);
+                break;
+
+                case ALTERTABLE_RENAMECOL:
+                    event = new AlterTableRenameCol(context);
+                break;
+
+                default:
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("HiveHook.run({}): operation ignored", hookContext.getOperationName());
+                    }
+                break;
+            }
 
-        if (event != null) {
-            super.notifyEntities(event.getNotificationMessages());
+            if (event != null) {
+                super.notifyEntities(event.getNotificationMessages());
+            }
+        } catch (Throwable t) {
+            LOG.error("HiveHook.run(): failed to process operation {}", hookContext.getOperationName(), t);
         }
 
         if (LOG.isDebugEnabled()) {
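
The net effect of this HiveHook change is that the whole operation dispatch is
now wrapped in a catch-all, so a failure inside the hook can no longer
propagate into (and fail) the Hive query that triggered it. A generic,
self-contained illustration of that fail-safe pattern (the names are
hypothetical, not the Atlas API):

    public class FailSafeHookDemo {
        interface Hook { void run(String operationName); }

        // Wraps a hook so that nothing it throws reaches the caller.
        static Hook failSafe(Hook delegate) {
            return operationName -> {
                try {
                    delegate.run(operationName);
                } catch (Throwable t) { // Throwable: even Errors must not escape
                    System.err.println("hook failed for " + operationName + ": " + t);
                }
            };
        }

        public static void main(String[] args) {
            Hook flaky = op -> { throw new RuntimeException("boom"); };

            failSafe(flaky).run("CREATETABLE"); // logs the failure; does not throw
        }
    }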

http://git-wip-us.apache.org/repos/asf/atlas/blob/bd39a509/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
index 271511e..d55aa53 100644
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
@@ -97,7 +97,7 @@ public class HiveMetaStoreBridgeTest {
                         getEntity(HiveDataTypes.HIVE_DB.getName(), AtlasClient.GUID, "72e06b34-9151-4023-aa9d-b82103a50e76"))).getEntity());
 
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
-        bridge.importHiveMetadata(true);
+        bridge.importHiveMetadata(null, null, true);
 
         // verify update is called
         verify(atlasClientV2).updateEntity(anyObject());
@@ -126,7 +126,8 @@ public class HiveMetaStoreBridgeTest {
         when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77"))
                 .thenReturn(createTableReference());
 
-        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTables.get(0));
+        Table testTable =  hiveTables.get(0);
+        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable);
 
         when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
             Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
@@ -136,7 +137,7 @@ public class HiveMetaStoreBridgeTest {
 
 
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
-        bridge.importHiveMetadata(true);
+        bridge.importHiveMetadata(null, null, true);
 
         // verify update is called on table
         verify(atlasClientV2, times(2)).updateEntity(anyObject());
@@ -207,7 +208,7 @@ public class HiveMetaStoreBridgeTest {
 
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
         try {
-            bridge.importHiveMetadata(true);
+            bridge.importHiveMetadata(null, null, true);
         } catch (Exception e) {
             Assert.fail("Partition with null key caused import to fail with exception ", e);
         }
@@ -231,7 +232,8 @@ public class HiveMetaStoreBridgeTest {
         when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77"))
                 .thenReturn(createTableReference());
 
-        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTables.get(1));
+        Table testTable =  hiveTables.get(1);
+        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable);
 
         when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
                 Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
@@ -241,7 +243,7 @@ public class HiveMetaStoreBridgeTest {
 
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
         try {
-            bridge.importHiveMetadata(false);
+            bridge.importHiveMetadata(null, null, false);
         } catch (Exception e) {
             Assert.fail("Table registration failed with exception", e);
         }
@@ -267,7 +269,8 @@ public class HiveMetaStoreBridgeTest {
         when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77"))
                 .thenReturn(createTableReference());
 
-        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTables.get(1));
+        Table testTable  = hiveTables.get(1);
+        String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable);
 
         when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
                 Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
@@ -277,7 +280,7 @@ public class HiveMetaStoreBridgeTest {
 
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
         try {
-            bridge.importHiveMetadata(true);
+            bridge.importHiveMetadata(null, null, true);
             Assert.fail("Table registration is supposed to fail");
         } catch (Exception e) {
             //Expected