Posted to commits@atlas.apache.org by ma...@apache.org on 2018/03/26 06:37:23 UTC
atlas git commit: ATLAS-2511: updated import-hive utility to add options to selectively import given database/tables
Repository: atlas
Updated Branches:
refs/heads/master 31eb3664c -> bd39a509a
ATLAS-2511: updated import-hive utility to add options to selectively import given database/tables
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/bd39a509
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/bd39a509
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/bd39a509
Branch: refs/heads/master
Commit: bd39a509a20ff7860fdfdcab67e46cc503038f1e
Parents: 31eb366
Author: rmani <rm...@hortonworks.com>
Authored: Thu Mar 22 12:33:47 2018 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Sun Mar 25 23:37:11 2018 -0700
----------------------------------------------------------------------
addons/hive-bridge/src/bin/import-hive.sh | 8 +-
.../atlas/hive/bridge/HiveMetaStoreBridge.java | 127 +++++++++++-----
.../org/apache/atlas/hive/hook/HiveHook.java | 144 ++++++++++---------
.../hive/bridge/HiveMetaStoreBridgeTest.java | 19 +--
4 files changed, 179 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
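With this change, import-hive.sh accepts -d/--database, -t/--table and -failOnError command-line options; when none are given, all databases and tables are imported as before. A hedged sketch of the resulting invocations (sales_db and orders are placeholder names, not part of this commit; the exact effect of -failOnError depends on importTable(), which is not shown in this diff):

    ./import-hive.sh                                      # import all databases and tables
    ./import-hive.sh -d sales_db                          # import only sales_db and its tables
    ./import-hive.sh --database sales_db --table orders   # import a single table
    ./import-hive.sh -d sales_db -failOnError             # presumably fail the run on the first table that cannot be imported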
http://git-wip-us.apache.org/repos/asf/atlas/blob/bd39a509/addons/hive-bridge/src/bin/import-hive.sh
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/bin/import-hive.sh b/addons/hive-bridge/src/bin/import-hive.sh
index 47581ac..98f4c84 100755
--- a/addons/hive-bridge/src/bin/import-hive.sh
+++ b/addons/hive-bridge/src/bin/import-hive.sh
@@ -31,6 +31,8 @@ done
BASEDIR=`dirname ${PRG}`
BASEDIR=`cd ${BASEDIR}/..;pwd`
+allargs=$@
+
if test -z "${JAVA_HOME}"
then
JAVA_BIN=`which java`
@@ -128,8 +130,8 @@ done
echo "Log file for import is $LOGFILE"
-"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.hive.bridge.HiveMetaStoreBridge
+"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.hive.bridge.HiveMetaStoreBridge $allargs
RETVAL=$?
-[ $RETVAL -eq 0 ] && echo Hive Data Model imported successfully!!!
-[ $RETVAL -ne 0 ] && echo Failed to import Hive Data Model!!!
+[ $RETVAL -eq 0 ] && echo Hive Meta Data imported successfully!!!
+[ $RETVAL -ne 0 ] && echo Failed to import Hive Meta Data!!!
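The new allargs variable simply forwards whatever arguments the wrapper script received on to the Java entry point. For example, a hypothetical invocation (sales_db and orders are placeholders) such as

    ./import-hive.sh -d sales_db -t orders

ends up running roughly

    "${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.hive.bridge.HiveMetaStoreBridge -d sales_db -t orders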
http://git-wip-us.apache.org/repos/asf/atlas/blob/bd39a509/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 51df8d2..09c17a9 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -36,11 +36,11 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
+import org.apache.commons.cli.ParseException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
@@ -86,7 +86,9 @@ public class HiveMetaStoreBridge {
public static final String SEP = ":".intern();
public static final String HDFS_PATH = "hdfs_path";
- private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
+ private static final int EXIT_CODE_SUCCESS = 0;
+ private static final int EXIT_CODE_FAILED = 1;
+ private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
private final HdfsNameServiceResolver hdfsNameServiceResolver = HdfsNameServiceResolver.getInstance();
private final String clusterName;
@@ -95,16 +97,27 @@ public class HiveMetaStoreBridge {
private final boolean convertHdfsPathToLowerCase;
- public static void main(String[] args) throws AtlasHookException {
- try {
- Configuration atlasConf = ApplicationProperties.get();
- String[] atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT);
+ public static void main(String[] args) {
+ int exitCode = EXIT_CODE_FAILED;
- if (atlasEndpoint == null || atlasEndpoint.length == 0){
+ try {
+ Options options = new Options();
+ options.addOption("d", "database", true, "Databbase name");
+ options.addOption("t", "table", true, "Table name");
+ options.addOption("failOnError", false, "failOnError");
+
+ CommandLine cmd = new BasicParser().parse(options, args);
+ boolean failOnError = cmd.hasOption("failOnError");
+ String databaseToImport = cmd.getOptionValue("d");
+ String tableToImport = cmd.getOptionValue("t");
+ Configuration atlasConf = ApplicationProperties.get();
+ String[] atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT);
+
+ if (atlasEndpoint == null || atlasEndpoint.length == 0) {
atlasEndpoint = new String[] { DEFAULT_ATLAS_URL };
}
- AtlasClientV2 atlasClientV2;
+ final AtlasClientV2 atlasClientV2;
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
@@ -116,17 +129,35 @@ public class HiveMetaStoreBridge {
atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), atlasEndpoint);
}
- Options options = new Options();
- CommandLineParser parser = new BasicParser();
- CommandLine cmd = parser.parse(options, args);
- boolean failOnError = cmd.hasOption("failOnError");
-
HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2);
- hiveMetaStoreBridge.importHiveMetadata(failOnError);
+ hiveMetaStoreBridge.importHiveMetadata(databaseToImport, tableToImport, failOnError);
+
+ exitCode = EXIT_CODE_SUCCESS;
+ } catch(ParseException e) {
+ LOG.error("Failed to parse arguments. Error: ", e.getMessage());
+
+ printUsage();
} catch(Exception e) {
- throw new AtlasHookException("HiveMetaStoreBridge.main() failed.", e);
+ LOG.error("Import failed", e);
}
+
+ System.exit(exitCode);
+ }
+
+ private static void printUsage() {
+ System.out.println();
+ System.out.println();
+ System.out.println("Usage 1: import-hive.sh [-d <database> OR --database <database>] " );
+ System.out.println(" Imports specified database and its tables ...");
+ System.out.println();
+ System.out.println("Usage 2: import-hive.sh [-d <database> OR --database <database>] [-t <table> OR --table <table>]");
+ System.out.println(" Imports specified table within that database ...");
+ System.out.println();
+ System.out.println("Usage 3: import-hive.sh");
+ System.out.println(" Imports all databases and tables...");
+ System.out.println();
+ System.out.println();
}
/**
@@ -174,23 +205,33 @@ public class HiveMetaStoreBridge {
@VisibleForTesting
- public void importHiveMetadata(boolean failOnError) throws Exception {
+ public void importHiveMetadata(String databaseToImport, String tableToImport, boolean failOnError) throws Exception {
LOG.info("Importing Hive metadata");
- importDatabases(failOnError);
+ importDatabases(failOnError, databaseToImport, tableToImport);
}
- private void importDatabases(boolean failOnError) throws Exception {
- List<String> databases = hiveClient.getAllDatabases();
+ private void importDatabases(boolean failOnError, String databaseToImport, String tableToImport) throws Exception {
+ final List<String> databaseNames;
+
+ if (StringUtils.isEmpty(databaseToImport)) {
+ databaseNames = hiveClient.getAllDatabases();
+ } else {
+ databaseNames = hiveClient.getDatabasesByPattern(databaseToImport);
+ }
- LOG.info("Found {} databases", databases.size());
+ if(!CollectionUtils.isEmpty(databaseNames)) {
+ LOG.info("Found {} databases", databaseNames.size());
- for (String databaseName : databases) {
- AtlasEntityWithExtInfo dbEntity = registerDatabase(databaseName);
+ for (String databaseName : databaseNames) {
+ AtlasEntityWithExtInfo dbEntity = registerDatabase(databaseName);
- if (dbEntity != null) {
- importTables(dbEntity.getEntity(), databaseName, failOnError);
+ if (dbEntity != null) {
+ importTables(dbEntity.getEntity(), databaseName, tableToImport, failOnError);
+ }
}
+ } else {
+ LOG.info("No database found");
}
}
@@ -201,25 +242,35 @@ public class HiveMetaStoreBridge {
* @param failOnError
* @throws Exception
*/
- private int importTables(AtlasEntity dbEntity, String databaseName, final boolean failOnError) throws Exception {
- List<String> hiveTables = hiveClient.getAllTables(databaseName);
+ private int importTables(AtlasEntity dbEntity, String databaseName, String tblName, final boolean failOnError) throws Exception {
+ int tablesImported = 0;
- LOG.info("Found {} tables in database {}", hiveTables.size(), databaseName);
+ final List<String> tableNames;
- int tablesImported = 0;
+ if (StringUtils.isEmpty(tblName)) {
+ tableNames = hiveClient.getAllTables(databaseName);
+ } else {
+ tableNames = hiveClient.getTablesByPattern(databaseName, tblName);
+ }
- try {
- for (String tableName : hiveTables) {
- int imported = importTable(dbEntity, databaseName, tableName, failOnError);
+ if(!CollectionUtils.isEmpty(tableNames)) {
+ LOG.info("Found {} tables to import in database {}", tableNames.size(), databaseName);
- tablesImported += imported;
- }
- } finally {
- if (tablesImported == hiveTables.size()) {
- LOG.info("Successfully imported all {} tables from database {}", tablesImported, databaseName);
- } else {
- LOG.error("Imported {} of {} tables from database {}. Please check logs for errors during import", tablesImported, hiveTables.size(), databaseName);
+ try {
+ for (String tableName : tableNames) {
+ int imported = importTable(dbEntity, databaseName, tableName, failOnError);
+
+ tablesImported += imported;
+ }
+ } finally {
+ if (tablesImported == tableNames.size()) {
+ LOG.info("Successfully imported {} tables from database {}", tablesImported, databaseName);
+ } else {
+ LOG.error("Imported {} of {} tables from database {}. Please check logs for errors during import", tablesImported, tableNames.size(), databaseName);
+ }
}
+ } else {
+ LOG.info("No tables to import in database {}", databaseName);
}
return tablesImported;
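When -d/-t values are supplied, the bridge resolves them through hiveClient.getDatabasesByPattern() and getTablesByPattern(), so Hive-style wildcard patterns should also be usable; a hedged sketch (pattern values are illustrative only):

    ./import-hive.sh -d 'sales_*'               # all databases whose names match the pattern
    ./import-hive.sh -d sales_db -t 'fact_*'    # matching tables within one database

If nothing matches, the new code logs "No database found" or "No tables to import in database {}" and imports nothing.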
http://git-wip-us.apache.org/repos/asf/atlas/blob/bd39a509/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 3bf0aab..78f2e83 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -82,77 +82,81 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
LOG.debug("==> HiveHook.run({})", hookContext.getOperationName());
}
- HiveOperation oper = OPERATION_MAP.get(hookContext.getOperationName());
- AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext);
-
- BaseHiveEvent event = null;
-
- switch (oper) {
- case CREATEDATABASE:
- event = new CreateDatabase(context);
- break;
-
- case DROPDATABASE:
- event = new DropDatabase(context);
- break;
-
- case ALTERDATABASE:
- case ALTERDATABASE_OWNER:
- event = new AlterDatabase(context);
- break;
-
- case CREATETABLE:
- event = new CreateTable(context, true);
- break;
-
- case DROPTABLE:
- case DROPVIEW:
- event = new DropTable(context);
- break;
-
- case CREATETABLE_AS_SELECT:
- case CREATEVIEW:
- case ALTERVIEW_AS:
- case LOAD:
- case EXPORT:
- case IMPORT:
- case QUERY:
- case TRUNCATETABLE:
- event = new CreateHiveProcess(context);
- break;
-
- case ALTERTABLE_FILEFORMAT:
- case ALTERTABLE_CLUSTER_SORT:
- case ALTERTABLE_BUCKETNUM:
- case ALTERTABLE_PROPERTIES:
- case ALTERVIEW_PROPERTIES:
- case ALTERTABLE_SERDEPROPERTIES:
- case ALTERTABLE_SERIALIZER:
- case ALTERTABLE_ADDCOLS:
- case ALTERTABLE_REPLACECOLS:
- case ALTERTABLE_PARTCOLTYPE:
- case ALTERTABLE_LOCATION:
- event = new AlterTable(context);
- break;
-
- case ALTERTABLE_RENAME:
- case ALTERVIEW_RENAME:
- event = new AlterTableRename(context);
- break;
-
- case ALTERTABLE_RENAMECOL:
- event = new AlterTableRenameCol(context);
- break;
-
- default:
- if (LOG.isDebugEnabled()) {
- LOG.debug("HiveHook.run({}): operation ignored", hookContext.getOperationName());
- }
- break;
- }
+ try {
+ HiveOperation oper = OPERATION_MAP.get(hookContext.getOperationName());
+ AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext);
+
+ BaseHiveEvent event = null;
+
+ switch (oper) {
+ case CREATEDATABASE:
+ event = new CreateDatabase(context);
+ break;
+
+ case DROPDATABASE:
+ event = new DropDatabase(context);
+ break;
+
+ case ALTERDATABASE:
+ case ALTERDATABASE_OWNER:
+ event = new AlterDatabase(context);
+ break;
+
+ case CREATETABLE:
+ event = new CreateTable(context, true);
+ break;
+
+ case DROPTABLE:
+ case DROPVIEW:
+ event = new DropTable(context);
+ break;
+
+ case CREATETABLE_AS_SELECT:
+ case CREATEVIEW:
+ case ALTERVIEW_AS:
+ case LOAD:
+ case EXPORT:
+ case IMPORT:
+ case QUERY:
+ case TRUNCATETABLE:
+ event = new CreateHiveProcess(context);
+ break;
+
+ case ALTERTABLE_FILEFORMAT:
+ case ALTERTABLE_CLUSTER_SORT:
+ case ALTERTABLE_BUCKETNUM:
+ case ALTERTABLE_PROPERTIES:
+ case ALTERVIEW_PROPERTIES:
+ case ALTERTABLE_SERDEPROPERTIES:
+ case ALTERTABLE_SERIALIZER:
+ case ALTERTABLE_ADDCOLS:
+ case ALTERTABLE_REPLACECOLS:
+ case ALTERTABLE_PARTCOLTYPE:
+ case ALTERTABLE_LOCATION:
+ event = new AlterTable(context);
+ break;
+
+ case ALTERTABLE_RENAME:
+ case ALTERVIEW_RENAME:
+ event = new AlterTableRename(context);
+ break;
+
+ case ALTERTABLE_RENAMECOL:
+ event = new AlterTableRenameCol(context);
+ break;
+
+ default:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("HiveHook.run({}): operation ignored", hookContext.getOperationName());
+ }
+ break;
+ }
- if (event != null) {
- super.notifyEntities(event.getNotificationMessages());
+ if (event != null) {
+ super.notifyEntities(event.getNotificationMessages());
+ }
+ } catch (Throwable t) {
+ LOG.error("HiveHook.run(): failed to process operation {}", hookContext.getOperationName(), t);
}
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/atlas/blob/bd39a509/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
index 271511e..d55aa53 100644
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
@@ -97,7 +97,7 @@ public class HiveMetaStoreBridgeTest {
getEntity(HiveDataTypes.HIVE_DB.getName(), AtlasClient.GUID, "72e06b34-9151-4023-aa9d-b82103a50e76"))).getEntity());
HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
- bridge.importHiveMetadata(true);
+ bridge.importHiveMetadata(null, null, true);
// verify update is called
verify(atlasClientV2).updateEntity(anyObject());
@@ -126,7 +126,8 @@ public class HiveMetaStoreBridgeTest {
when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77"))
.thenReturn(createTableReference());
- String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTables.get(0));
+ Table testTable = hiveTables.get(0);
+ String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable);
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
@@ -136,7 +137,7 @@ public class HiveMetaStoreBridgeTest {
HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
- bridge.importHiveMetadata(true);
+ bridge.importHiveMetadata(null, null, true);
// verify update is called on table
verify(atlasClientV2, times(2)).updateEntity(anyObject());
@@ -207,7 +208,7 @@ public class HiveMetaStoreBridgeTest {
HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
try {
- bridge.importHiveMetadata(true);
+ bridge.importHiveMetadata(null, null, true);
} catch (Exception e) {
Assert.fail("Partition with null key caused import to fail with exception ", e);
}
@@ -231,7 +232,8 @@ public class HiveMetaStoreBridgeTest {
when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77"))
.thenReturn(createTableReference());
- String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTables.get(1));
+ Table testTable = hiveTables.get(1);
+ String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable);
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
@@ -241,7 +243,7 @@ public class HiveMetaStoreBridgeTest {
HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
try {
- bridge.importHiveMetadata(false);
+ bridge.importHiveMetadata(null, null, false);
} catch (Exception e) {
Assert.fail("Table registration failed with exception", e);
}
@@ -267,7 +269,8 @@ public class HiveMetaStoreBridgeTest {
when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77"))
.thenReturn(createTableReference());
- String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTables.get(1));
+ Table testTable = hiveTables.get(1);
+ String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable);
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
@@ -277,7 +280,7 @@ public class HiveMetaStoreBridgeTest {
HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2);
try {
- bridge.importHiveMetadata(true);
+ bridge.importHiveMetadata(null, null, true);
Assert.fail("Table registration is supposed to fail");
} catch (Exception e) {
//Expected