Posted to commits@atlas.apache.org by ni...@apache.org on 2020/10/19 08:26:19 UTC
[atlas] branch master updated: ATLAS-3938 : Import Hive Script: Support deletion of non existing database and table entities
This is an automated email from the ASF dual-hosted git repository.
nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new b4e4f60 ATLAS-3938 : Import Hive Script: Support deletion of non existing database and table entities
b4e4f60 is described below
commit b4e4f604b46805ade02413da6ef1a68b2f28da71
Author: Pinal <pinal-shah>
AuthorDate: Wed Sep 16 21:50:25 2020 +0530
ATLAS-3938 : Import Hive Script: Support deletion of non existing database and table entities
---
addons/hive-bridge/src/bin/import-hive.sh | 1 +
.../atlas/hive/bridge/HiveMetaStoreBridge.java | 233 ++++++++++++++++++++-
.../main/java/org/apache/atlas/AtlasClientV2.java | 28 ++-
3 files changed, 249 insertions(+), 13 deletions(-)
diff --git a/addons/hive-bridge/src/bin/import-hive.sh b/addons/hive-bridge/src/bin/import-hive.sh
index c1adbef..405497b 100755
--- a/addons/hive-bridge/src/bin/import-hive.sh
+++ b/addons/hive-bridge/src/bin/import-hive.sh
@@ -139,6 +139,7 @@ do
--database) IMPORT_ARGS="$IMPORT_ARGS --database $1"; shift;;
--table) IMPORT_ARGS="$IMPORT_ARGS --table $1"; shift;;
--filename) IMPORT_ARGS="$IMPORT_ARGS --filename $1"; shift;;
+ -deleteNonExisting) IMPORT_ARGS="$IMPORT_ARGS -deleteNonExisting";;
"") break;;
*) JVM_ARGS="$JVM_ARGS $option"
esac
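
For reference, a sketch of how the new option would be invoked (environment and
working directory are assumptions; unlike --database/--table/--filename, the
flag takes no argument and uses a single dash, as the usage text below also shows):

    # delete Atlas hive_db/hive_table entities whose Hive counterparts
    # no longer exist
    ./import-hive.sh -deleteNonExisting
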
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 24e06dc..c0e2c1f 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -28,6 +28,8 @@ import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.hook.events.BaseHiveEvent;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hook.AtlasHookException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations;
@@ -61,6 +63,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
@@ -97,12 +100,15 @@ public class HiveMetaStoreBridge {
public static final String SEP = ":".intern();
public static final String HDFS_PATH = "hdfs_path";
public static final String DEFAULT_METASTORE_CATALOG = "hive";
+ public static final String HIVE_TABLE_DB_EDGE_LABEL = "__hive_table.db";
+ public static final String HOOK_HIVE_PAGE_LIMIT = CONF_PREFIX + "page.limit";
public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2 = "v2";
private static final int EXIT_CODE_SUCCESS = 0;
private static final int EXIT_CODE_FAILED = 1;
private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
+ private static int pageLimit = 10000;
private final String metadataNamespace;
private final Hive hiveClient;
@@ -122,9 +128,13 @@ public class HiveMetaStoreBridge {
options.addOption("t", "table", true, "Table name");
options.addOption("f", "filename", true, "Filename");
options.addOption("failOnError", false, "failOnError");
+ options.addOption("deleteNonExisting", false, "Delete database and table entities in Atlas if not present in Hive");
+
+ CommandLine cmd = new BasicParser().parse(options, args);
+ boolean failOnError = cmd.hasOption("failOnError");
+ boolean deleteNonExisting = cmd.hasOption("deleteNonExisting");
+ LOG.info("delete non existing flag : {} ", deleteNonExisting);
- CommandLine cmd = new BasicParser().parse(options, args);
- boolean failOnError = cmd.hasOption("failOnError");
String databaseToImport = cmd.getOptionValue("d");
String tableToImport = cmd.getOptionValue("t");
String fileToImport = cmd.getOptionValue("f");
@@ -148,7 +158,9 @@ public class HiveMetaStoreBridge {
HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2);
- if (StringUtils.isNotEmpty(fileToImport)) {
+ if (deleteNonExisting) {
+ hiveMetaStoreBridge.deleteEntitiesForNonExistingHiveMetadata(failOnError);
+ } else if (StringUtils.isNotEmpty(fileToImport)) {
File f = new File(fileToImport);
if (f.exists() && f.canRead()) {
@@ -212,6 +224,8 @@ public class HiveMetaStoreBridge {
System.out.println(" database1:tbl1");
System.out.println(" database1:tbl2");
System.out.println(" database2:tbl2");
+ System.out.println("Usage 5: import-hive.sh [-deleteNonExisting] " );
+ System.out.println(" Deletes databases and tables which are not in Hive ...");
System.out.println();
}
@@ -225,6 +239,9 @@ public class HiveMetaStoreBridge {
this.atlasClientV2 = atlasClientV2;
this.convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
this.awsS3AtlasModelVersion = atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION, HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2);
+ if (atlasProperties != null) {
+ pageLimit = atlasProperties.getInteger(HOOK_HIVE_PAGE_LIMIT, 10000);
+ }
}
/**
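
The constructor change above makes the page size used for paged Atlas searches
configurable via atlas-application.properties. Assuming CONF_PREFIX resolves to
"atlas.hook.hive." as defined elsewhere in this class, an override would look
like the following (the value is illustrative; 10000 remains the default):

    # hypothetical override of the search page size used by import-hive.sh
    atlas.hook.hive.page.limit=5000
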
@@ -959,4 +976,214 @@ public class HiveMetaStoreBridge {
}
return ret;
}
+
+ private List<AtlasEntityHeader> getAllDatabaseInCluster() throws AtlasServiceException {
+
+ List<AtlasEntityHeader> entities = new ArrayList<>();
+ final int pageSize = pageLimit;
+
+ SearchParameters.FilterCriteria fc = new SearchParameters.FilterCriteria();
+ fc.setAttributeName(ATTRIBUTE_CLUSTER_NAME);
+ fc.setAttributeValue(metadataNamespace);
+ fc.setOperator(SearchParameters.Operator.EQ);
+
+ for (int i = 0; ; i++) {
+ int offset = pageSize * i;
+ LOG.info("Retrieving databases: offset={}, pageSize={}", offset, pageSize);
+
+ AtlasSearchResult searchResult = atlasClientV2.basicSearch(HIVE_TYPE_DB, fc, null, null, true, pageSize, offset);
+
+ List<AtlasEntityHeader> entityHeaders = searchResult == null ? null : searchResult.getEntities();
+ int dbCount = entityHeaders == null ? 0 : entityHeaders.size();
+
+ LOG.info("Retrieved {} databases of {} cluster", dbCount, metadataNamespace);
+
+ if (dbCount > 0) {
+ entities.addAll(entityHeaders);
+ }
+
+ if (dbCount < pageSize) { // last page
+ break;
+ }
+ }
+
+ return entities;
+ }
+
+ private List<AtlasEntityHeader> getAllTablesInDb(String databaseGuid) throws AtlasServiceException {
+
+ List<AtlasEntityHeader> entities = new ArrayList<>();
+ final int pageSize = pageLimit;
+
+ for (int i = 0; ; i++) {
+ int offset = pageSize * i;
+ LOG.info("Retrieving tables: offset={}, pageSize={}", offset, pageSize);
+
+ AtlasSearchResult searchResult = atlasClientV2.relationshipSearch(databaseGuid, HIVE_TABLE_DB_EDGE_LABEL, null, null, true, pageSize, offset);
+
+ List<AtlasEntityHeader> entityHeaders = searchResult == null ? null : searchResult.getEntities();
+ int tableCount = entityHeaders == null ? 0 : entityHeaders.size();
+
+ LOG.info("Retrieved {} tables of {} database", tableCount, databaseGuid);
+
+ if (tableCount > 0) {
+ entities.addAll(entityHeaders);
+ }
+
+ if (tableCount < pageSize) { // last page
+ break;
+ }
+ }
+
+ return entities;
+ }
+
+ public String getHiveDatabaseName(String qualifiedName) {
+
+ if (StringUtils.isNotEmpty(qualifiedName)) {
+ String[] split = qualifiedName.split("@");
+ if (split.length > 0) {
+ return split[0];
+ }
+ }
+ return null;
+ }
+
+
+ public String getHiveTableName(String qualifiedName, boolean isTemporary) {
+
+ if (StringUtils.isNotEmpty(qualifiedName)) {
+ String tableName = StringUtils.substringBetween(qualifiedName, ".", "@");
+ if (!isTemporary) {
+ return tableName;
+ } else {
+ if (StringUtils.isNotEmpty(tableName)) {
+ String[] splitTemp = tableName.split(TEMP_TABLE_PREFIX);
+ if (splitTemp.length > 0) {
+ return splitTemp[0];
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ private void deleteByGuid(List<String> guidTodelete) throws AtlasServiceException {
+
+ if (CollectionUtils.isNotEmpty(guidTodelete)) {
+
+ for (String guid : guidTodelete) {
+ EntityMutationResponse response = atlasClientV2.deleteEntityByGuid(guid);
+
+ if (response.getDeletedEntities().size() < 1) {
+ LOG.info("Entity with guid : {} is not deleted", guid);
+ } else {
+ LOG.info("Entity with guid : {} is deleted", guid);
+ }
+ }
+ } else {
+ LOG.info("No Entity to delete from Atlas");
+ }
+ }
+
+ public void deleteEntitiesForNonExistingHiveMetadata(boolean failOnError) throws Exception {
+
+ //fetch databases from Atlas
+ List<AtlasEntityHeader> dbs = null;
+ try {
+ dbs = getAllDatabaseInCluster();
+ LOG.info("Total Databases in cluster {} : {} ", metadataNamespace, dbs.size());
+ } catch (AtlasServiceException e) {
+ LOG.error("Failed to retrieve database entities for cluster {} from Atlas", metadataNamespace, e);
+ if (failOnError) {
+ throw e;
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(dbs)) {
+ //iterate all dbs to check if exists in hive
+ for (AtlasEntityHeader db : dbs) {
+
+ String dbGuid = db.getGuid();
+ String hiveDbName = getHiveDatabaseName((String) db.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+
+ if (StringUtils.isEmpty(hiveDbName)) {
+ LOG.error("Failed to get database from qualifiedName: {}, guid: {} ", db.getAttribute(ATTRIBUTE_QUALIFIED_NAME), dbGuid);
+ continue;
+ }
+
+ List<AtlasEntityHeader> tables;
+ try {
+ tables = getAllTablesInDb(dbGuid);
+ LOG.info("Total Tables in database {} : {} ", hiveDbName, tables.size());
+ } catch (AtlasServiceException e) {
+ LOG.error("Failed to retrieve table entities for database {} from Atlas", hiveDbName, e);
+ if (failOnError) {
+ throw e;
+ }
+ continue;
+ }
+
+ List<String> guidsToDelete = new ArrayList<>();
+ if (!hiveClient.databaseExists(hiveDbName)) {
+
+ //table guids
+ if (CollectionUtils.isNotEmpty(tables)) {
+ for (AtlasEntityHeader table : tables) {
+ guidsToDelete.add(table.getGuid());
+ }
+ }
+
+ //db guid
+ guidsToDelete.add(db.getGuid());
+ LOG.info("Added database {}.{} and its {} tables to delete", metadataNamespace, hiveDbName, tables.size());
+
+ } else {
+ //iterate all table of db to check if it exists
+ if (CollectionUtils.isNotEmpty(tables)) {
+ for (AtlasEntityHeader table : tables) {
+ String hiveTableName = getHiveTableName((String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), true);
+
+ if (StringUtils.isEmpty(hiveTableName)) {
+ LOG.error("Failed to get table from qualifiedName: {}, guid: {} ", table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), table.getGuid());
+ continue;
+ }
+
+ try {
+ hiveClient.getTable(hiveDbName, hiveTableName, true);
+ } catch (InvalidTableException e) { //table doesn't exists
+ LOG.info("Added table {}.{} to delete", hiveDbName, hiveTableName);
+
+ guidsToDelete.add(table.getGuid());
+ } catch (HiveException e) {
+ LOG.error("Failed to get table {}.{} from Hive", hiveDbName, hiveTableName, e);
+
+ if (failOnError) {
+ throw e;
+ }
+ }
+ }
+ }
+ }
+
+ //delete entities
+ if (CollectionUtils.isNotEmpty(guidsToDelete)) {
+ try {
+ deleteByGuid(guidsToDelete);
+ } catch (AtlasServiceException e) {
+ LOG.error("Failed to delete Atlas entities for database {}", hiveDbName, e);
+
+ if (failOnError) {
+ throw e;
+ }
+ }
+
+ }
+ }
+
+ } else {
+ LOG.info("No database found in service.");
+ }
+
+ }
}
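
The getHiveDatabaseName()/getHiveTableName() helpers added above depend on the
qualifiedName conventions used by the Hive hook: "<db>@<cluster>" for databases
and "<db>.<table>@<cluster>" for tables (temporary tables additionally carry a
TEMP_TABLE_PREFIX-marked name). A minimal, self-contained sketch of the expected
inputs and outputs; the database, table, and cluster names are hypothetical:

    import org.apache.commons.lang.StringUtils;

    public class QualifiedNameDemo {
        public static void main(String[] args) {
            String dbQualifiedName    = "sales@cm";        // hive_db qualifiedName
            String tableQualifiedName = "sales.orders@cm"; // hive_table qualifiedName

            // mirrors getHiveDatabaseName(): the text before '@'
            System.out.println(dbQualifiedName.split("@")[0]);                              // sales

            // mirrors getHiveTableName(qn, false): the text between '.' and '@'
            System.out.println(StringUtils.substringBetween(tableQualifiedName, ".", "@")); // orders
        }
    }
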
diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
index 18811f8..7bf5023 100644
--- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
+++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
@@ -123,6 +123,8 @@ public class AtlasClientV2 extends AtlasBaseClient {
private static final String RELATIONSHIPS_URI = BASE_URI + "v2/relationship/";
private static final String BULK_HEADERS = "bulk/headers";
private static final String BULK_SET_CLASSIFICATIONS = "bulk/setClassifications";
+ private static final String RELATIONSHIP_URI = DISCOVERY_URI + "/relationship";
+
//Glossary APIs
private static final String GLOSSARY_URI = BASE_URI + "v2/glossary";
@@ -664,16 +666,22 @@ public class AtlasClientV2 extends AtlasBaseClient {
}
public AtlasSearchResult basicSearch(String typeName, String classification, String query, boolean excludeDeletedEntities, int limit, int offset) throws AtlasServiceException {
- MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
-
- queryParams.add("typeName", typeName);
- queryParams.add("classification", classification);
- queryParams.add(QUERY, query);
- queryParams.add("excludeDeletedEntities", String.valueOf(excludeDeletedEntities));
- queryParams.add(LIMIT, String.valueOf(limit));
- queryParams.add(OFFSET, String.valueOf(offset));
+ return this.basicSearch(typeName, null, classification, query, excludeDeletedEntities, limit, offset);
+ }
+
+ public AtlasSearchResult basicSearch(String typeName, SearchParameters.FilterCriteria entityFilters, String classification, String query, boolean excludeDeletedEntities, int limit, int offset) throws AtlasServiceException {
+ SearchParameters parameters = new SearchParameters();
+ parameters.setTypeName(typeName);
+ parameters.setClassification(classification);
+ parameters.setQuery(query);
+ parameters.setExcludeDeletedEntities(excludeDeletedEntities);
+ parameters.setLimit(limit);
+ parameters.setOffset(offset);
+ if (entityFilters != null){
+ parameters.setEntityFilters(entityFilters);
+ }
- return callAPI(API_V2.BASIC_SEARCH, AtlasSearchResult.class, queryParams);
+ return callAPI(API_V2.BASIC_SEARCH, AtlasSearchResult.class, parameters);
}
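
Since BASIC_SEARCH is now a POST carrying a SearchParameters body (see the API
table below), the new overload can express attribute filters that the old
query-parameter form could not. An illustrative caller; the URL, credentials,
and filter values are assumptions:

    import org.apache.atlas.AtlasClientV2;
    import org.apache.atlas.model.discovery.AtlasSearchResult;
    import org.apache.atlas.model.discovery.SearchParameters;

    public class BasicSearchDemo {
        public static void main(String[] args) throws Exception {
            AtlasClientV2 client = new AtlasClientV2(new String[] { "http://localhost:21000" },
                                                     new String[] { "admin", "admin" });

            // match hive_db entities whose clusterName equals "cm"
            SearchParameters.FilterCriteria criteria = new SearchParameters.FilterCriteria();
            criteria.setAttributeName("clusterName");
            criteria.setAttributeValue("cm");
            criteria.setOperator(SearchParameters.Operator.EQ);

            AtlasSearchResult result = client.basicSearch("hive_db", criteria, null, null, true, 100, 0);
            System.out.println(result == null ? null : result.getEntities());
        }
    }
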
public AtlasSearchResult facetedSearch(SearchParameters searchParameters) throws AtlasServiceException {
@@ -1202,7 +1210,7 @@ public class AtlasClientV2 extends AtlasBaseClient {
// Discovery APIs
public static final API_V2 DSL_SEARCH = new API_V2(DSL_SEARCH_URI, HttpMethod.GET, Response.Status.OK);
public static final API_V2 FULL_TEXT_SEARCH = new API_V2(FULL_TEXT_SEARCH_URI, HttpMethod.GET, Response.Status.OK);
- public static final API_V2 BASIC_SEARCH = new API_V2(BASIC_SEARCH_URI, HttpMethod.GET, Response.Status.OK);
+ public static final API_V2 BASIC_SEARCH = new API_V2(BASIC_SEARCH_URI, HttpMethod.POST, Response.Status.OK);
public static final API_V2 FACETED_SEARCH = new API_V2(FACETED_SEARCH_URI, HttpMethod.POST, Response.Status.OK);
public static final API_V2 ATTRIBUTE_SEARCH = new API_V2(DISCOVERY_URI+ "/attribute", HttpMethod.GET, Response.Status.OK);
public static final API_V2 RELATIONSHIP_SEARCH = new API_V2(DISCOVERY_URI+ "/relationship", HttpMethod.GET, Response.Status.OK);
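
HiveMetaStoreBridge.getAllTablesInDb() pages through the relationship-search
endpoint registered above. A sketch of that call from the client side; the
signature is inferred from the bridge's usage, and the guid is hypothetical:

    import org.apache.atlas.AtlasClientV2;
    import org.apache.atlas.model.discovery.AtlasSearchResult;

    public class RelationshipSearchDemo {
        public static void main(String[] args) throws Exception {
            AtlasClientV2 client = new AtlasClientV2(new String[] { "http://localhost:21000" },
                                                     new String[] { "admin", "admin" });

            String dbGuid = "00000000-0000-0000-0000-000000000000"; // guid of a hive_db entity

            // walk the __hive_table.db edge to list the database's tables;
            // the nulls skip sorting, 'true' excludes soft-deleted entities
            AtlasSearchResult tables = client.relationshipSearch(dbGuid, "__hive_table.db",
                                                                 null, null, true, 100, 0);
            System.out.println(tables == null ? null : tables.getEntities());
        }
    }
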