Posted to commits@atlas.apache.org by ni...@apache.org on 2020/10/19 08:30:37 UTC

[atlas] branch branch-2.0 updated: ATLAS-3938 : Import Hive Script: Support deletion of non existing database and table entities

This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new d5ea2b8  ATLAS-3938 : Import Hive Script: Support deletion of non existing database and table entities
d5ea2b8 is described below

commit d5ea2b838a142f17146bb0861eed9cd287179f79
Author: Pinal <pinal-shah>
AuthorDate: Wed Sep 16 21:50:25 2020 +0530

    ATLAS-3938 : Import Hive Script: Support deletion of non existing database and table entities
    
    (cherry picked from commit b4e4f604b46805ade02413da6ef1a68b2f28da71)
---
 addons/hive-bridge/src/bin/import-hive.sh          |   1 +
 .../atlas/hive/bridge/HiveMetaStoreBridge.java     | 233 ++++++++++++++++++++-
 .../main/java/org/apache/atlas/AtlasClientV2.java  |  28 ++-
 3 files changed, 249 insertions(+), 13 deletions(-)

diff --git a/addons/hive-bridge/src/bin/import-hive.sh b/addons/hive-bridge/src/bin/import-hive.sh
index c1adbef..405497b 100755
--- a/addons/hive-bridge/src/bin/import-hive.sh
+++ b/addons/hive-bridge/src/bin/import-hive.sh
@@ -139,6 +139,7 @@ do
     --database) IMPORT_ARGS="$IMPORT_ARGS --database $1"; shift;;
     --table) IMPORT_ARGS="$IMPORT_ARGS --table $1"; shift;;
     --filename) IMPORT_ARGS="$IMPORT_ARGS --filename $1"; shift;;
+    -deleteNonExisting) IMPORT_ARGS="$IMPORT_ARGS -deleteNonExisting";;
     "") break;;
     *) JVM_ARGS="$JVM_ARGS $option"
   esac
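
With this change, the new option can be passed straight through to the importer. Note the single leading dash in the case label above (-deleteNonExisting), unlike the double-dash form of the other options; on an installed cluster the script is typically invoked as "import-hive.sh -deleteNonExisting" (script location varies by packaging) to remove Atlas hive_db/hive_table entities whose counterparts no longer exist in Hive.
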
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 24e06dc..c0e2c1f 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -28,6 +28,8 @@ import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.hive.hook.events.BaseHiveEvent;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.hook.AtlasHookException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.discovery.SearchParameters;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.instance.EntityMutations;
@@ -61,6 +63,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -97,12 +100,15 @@ public class HiveMetaStoreBridge {
     public static final String SEP                             = ":".intern();
     public static final String HDFS_PATH                       = "hdfs_path";
     public static final String DEFAULT_METASTORE_CATALOG       = "hive";
+    public static final String HIVE_TABLE_DB_EDGE_LABEL        = "__hive_table.db";
+    public static final String HOOK_HIVE_PAGE_LIMIT            = CONF_PREFIX + "page.limit";
 
     public static final String HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2  = "v2";
 
     private static final int    EXIT_CODE_SUCCESS = 0;
     private static final int    EXIT_CODE_FAILED  = 1;
     private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
+    private static       int    pageLimit         = 10000;
 
     private final String        metadataNamespace;
     private final Hive          hiveClient;
@@ -122,9 +128,13 @@ public class HiveMetaStoreBridge {
             options.addOption("t", "table", true, "Table name");
             options.addOption("f", "filename", true, "Filename");
             options.addOption("failOnError", false, "failOnError");
+            options.addOption("deleteNonExisting", false, "Delete database and table entities in Atlas if not present in Hive");
+
+            CommandLine   cmd               = new BasicParser().parse(options, args);
+            boolean       failOnError       = cmd.hasOption("failOnError");
+            boolean       deleteNonExisting = cmd.hasOption("deleteNonExisting");
+            LOG.info("delete non existing flag : {} ", deleteNonExisting);
 
-            CommandLine   cmd              = new BasicParser().parse(options, args);
-            boolean       failOnError      = cmd.hasOption("failOnError");
             String        databaseToImport = cmd.getOptionValue("d");
             String        tableToImport    = cmd.getOptionValue("t");
             String        fileToImport     = cmd.getOptionValue("f");
@@ -148,7 +158,9 @@ public class HiveMetaStoreBridge {
 
             HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2);
 
-            if (StringUtils.isNotEmpty(fileToImport)) {
+            if (deleteNonExisting) {
+                hiveMetaStoreBridge.deleteEntitiesForNonExistingHiveMetadata(failOnError);
+            } else if (StringUtils.isNotEmpty(fileToImport)) {
                 File f = new File(fileToImport);
 
                 if (f.exists() && f.canRead()) {
@@ -212,6 +224,8 @@ public class HiveMetaStoreBridge {
         System.out.println("    database1:tbl1");
         System.out.println("    database1:tbl2");
         System.out.println("    database2:tbl2");
+        System.out.println("Usage 5: import-hive.sh [-deleteNonExisting] "  );
+        System.out.println("    Deletes databases and tables which are not in Hive ...");
         System.out.println();
     }
 
@@ -225,6 +239,9 @@ public class HiveMetaStoreBridge {
         this.atlasClientV2              = atlasClientV2;
         this.convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
         this.awsS3AtlasModelVersion     = atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION, HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2);
+        if (atlasProperties != null) {
+            pageLimit = atlasProperties.getInteger(HOOK_HIVE_PAGE_LIMIT, 10000);
+        }
     }
 
     /**
@@ -959,4 +976,214 @@ public class HiveMetaStoreBridge {
         }
         return ret;
     }
+
+    private List<AtlasEntityHeader> getAllDatabaseInCluster() throws AtlasServiceException {
+
+        List<AtlasEntityHeader> entities   = new ArrayList<>();
+        final int               pageSize   = pageLimit;
+
+        SearchParameters.FilterCriteria fc = new SearchParameters.FilterCriteria();
+        fc.setAttributeName(ATTRIBUTE_CLUSTER_NAME);
+        fc.setAttributeValue(metadataNamespace);
+        fc.setOperator(SearchParameters.Operator.EQ);
+
+        for (int i = 0; ; i++) {
+            int offset = pageSize * i;
+            LOG.info("Retrieving databases: offset={}, pageSize={}", offset, pageSize);
+
+            AtlasSearchResult searchResult = atlasClientV2.basicSearch(HIVE_TYPE_DB, fc,null, null, true, pageSize, offset);
+
+            List<AtlasEntityHeader> entityHeaders = searchResult == null ? null : searchResult.getEntities();
+            int                     dbCount       = entityHeaders == null ? 0 : entityHeaders.size();
+
+            LOG.info("Retrieved {} databases of {} cluster", dbCount, metadataNamespace);
+
+            if (dbCount > 0) {
+                entities.addAll(entityHeaders);
+            }
+
+            if (dbCount < pageSize) { // last page
+                break;
+            }
+        }
+
+        return entities;
+    }
+
+    private List<AtlasEntityHeader> getAllTablesInDb(String databaseGuid) throws AtlasServiceException {
+
+        List<AtlasEntityHeader> entities = new ArrayList<>();
+        final int               pageSize = pageLimit;
+
+        for (int i = 0; ; i++) {
+            int offset = pageSize * i;
+            LOG.info("Retrieving tables: offset={}, pageSize={}", offset, pageSize);
+
+            AtlasSearchResult searchResult = atlasClientV2.relationshipSearch(databaseGuid, HIVE_TABLE_DB_EDGE_LABEL, null, null, true, pageSize, offset);
+
+            List<AtlasEntityHeader> entityHeaders = searchResult == null ? null : searchResult.getEntities();
+            int                     tableCount    = entityHeaders == null ? 0 : entityHeaders.size();
+
+            LOG.info("Retrieved {} tables of {} database", tableCount, databaseGuid);
+
+            if (tableCount > 0) {
+                entities.addAll(entityHeaders);
+            }
+
+            if (tableCount < pageSize) { // last page
+                break;
+            }
+        }
+
+        return entities;
+    }
+
+    public String getHiveDatabaseName(String qualifiedName) {
+
+        if (StringUtils.isNotEmpty(qualifiedName)) {
+            String[] split = qualifiedName.split("@");
+            if (split.length > 0) {
+                return split[0];
+            }
+        }
+        return null;
+    }
+
+
+    public String getHiveTableName(String qualifiedName, boolean isTemporary) {
+
+        if (StringUtils.isNotEmpty(qualifiedName)) {
+            String tableName = StringUtils.substringBetween(qualifiedName, ".", "@");
+            if (!isTemporary) {
+                return tableName;
+            } else {
+                if (StringUtils.isNotEmpty(tableName)) {
+                    String[] splitTemp = tableName.split(TEMP_TABLE_PREFIX);
+                    if (splitTemp.length > 0) {
+                        return splitTemp[0];
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    private void deleteByGuid(List<String> guidTodelete) throws AtlasServiceException {
+
+        if (CollectionUtils.isNotEmpty(guidTodelete)) {
+
+            for (String guid : guidTodelete) {
+                EntityMutationResponse response = atlasClientV2.deleteEntityByGuid(guid);
+
+                if (response.getDeletedEntities().size() < 1) {
+                    LOG.info("Entity with guid : {} is not deleted", guid);
+                } else {
+                    LOG.info("Entity with guid : {} is deleted", guid);
+                }
+            }
+        } else {
+            LOG.info("No Entity to delete from Atlas");
+        }
+    }
+
+    public void deleteEntitiesForNonExistingHiveMetadata(boolean failOnError) throws Exception {
+
+        //fetch databases from Atlas
+        List<AtlasEntityHeader> dbs = null;
+        try {
+            dbs = getAllDatabaseInCluster();
+            LOG.info("Total Databases in cluster {} : {} ", metadataNamespace, dbs.size());
+        } catch (AtlasServiceException e) {
+            LOG.error("Failed to retrieve database entities for cluster {} from Atlas", metadataNamespace, e);
+            if (failOnError) {
+                throw e;
+            }
+        }
+
+        if (CollectionUtils.isNotEmpty(dbs)) {
+            //iterate over all dbs to check whether each exists in Hive
+            for (AtlasEntityHeader db : dbs) {
+
+                String dbGuid     = db.getGuid();
+                String hiveDbName = getHiveDatabaseName((String) db.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+
+                if (StringUtils.isEmpty(hiveDbName)) {
+                    LOG.error("Failed to get database from qualifiedName: {}, guid: {} ", db.getAttribute(ATTRIBUTE_QUALIFIED_NAME), dbGuid);
+                    continue;
+                }
+
+                List<AtlasEntityHeader> tables;
+                try {
+                    tables = getAllTablesInDb(dbGuid);
+                    LOG.info("Total Tables in database {} : {} ", hiveDbName, tables.size());
+                } catch (AtlasServiceException e) {
+                    LOG.error("Failed to retrieve table entities for database {} from Atlas", hiveDbName, e);
+                    if (failOnError) {
+                        throw e;
+                    }
+                    continue;
+                }
+
+                List<String> guidsToDelete = new ArrayList<>();
+                if (!hiveClient.databaseExists(hiveDbName)) {
+
+                    //table guids
+                    if (CollectionUtils.isNotEmpty(tables)) {
+                        for (AtlasEntityHeader table : tables) {
+                            guidsToDelete.add(table.getGuid());
+                        }
+                    }
+
+                    //db guid
+                    guidsToDelete.add(db.getGuid());
+                    LOG.info("Added database {}.{} and its {} tables to delete", metadataNamespace, hiveDbName, tables.size());
+
+                } else {
+                    //iterate over all tables of the db to check whether each exists in Hive
+                    if (CollectionUtils.isNotEmpty(tables)) {
+                        for (AtlasEntityHeader table : tables) {
+                            String hiveTableName = getHiveTableName((String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), true);
+
+                            if (StringUtils.isEmpty(hiveTableName)) {
+                                LOG.error("Failed to get table from qualifiedName: {}, guid: {} ", table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), table.getGuid());
+                                continue;
+                            }
+
+                            try {
+                                hiveClient.getTable(hiveDbName, hiveTableName, true);
+                            } catch (InvalidTableException e) { //table doesn't exist
+                                LOG.info("Added table {}.{} to delete", hiveDbName, hiveTableName);
+
+                                guidsToDelete.add(table.getGuid());
+                            } catch (HiveException e) {
+                                LOG.error("Failed to get table {}.{} from Hive", hiveDbName, hiveTableName, e);
+
+                                if (failOnError) {
+                                    throw e;
+                                }
+                            }
+                        }
+                    }
+                }
+
+                //delete entities
+                if (CollectionUtils.isNotEmpty(guidsToDelete)) {
+                    try {
+                        deleteByGuid(guidsToDelete);
+                    } catch (AtlasServiceException e) {
+                        LOG.error("Failed to delete Atlas entities for database {}", hiveDbName, e);
+
+                        if (failOnError) {
+                            throw e;
+                        }
+                    }
+
+                }
+            }
+
+        } else {
+            LOG.info("No database found in service.");
+        }
+
+    }
 }
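
The cleanup path added above can also be driven from code. The following is a minimal sketch, mirroring what main() does when -deleteNonExisting is passed, with an illustrative Atlas URL and basic-auth credentials (a secured cluster would authenticate differently):

    import org.apache.atlas.ApplicationProperties;
    import org.apache.atlas.AtlasClientV2;
    import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
    import org.apache.commons.configuration.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class DeleteNonExistingSketch {
        public static void main(String[] args) throws Exception {
            // illustrative endpoint and credentials
            Configuration atlasConf   = ApplicationProperties.get();
            AtlasClientV2 atlasClient = new AtlasClientV2(new String[]{"http://localhost:21000/"},
                                                          new String[]{"admin", "admin"});

            HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClient);

            // pages through hive_db entities for the configured metadata namespace, walks each
            // database's tables via the __hive_table.db relationship, and deletes entities that
            // no longer exist in Hive
            bridge.deleteEntitiesForNonExistingHiveMetadata(false /* failOnError */);
        }
    }

For reference, the qualifiedName helpers above split on '@' and '.': getHiveDatabaseName("default@cm") returns "default", and getHiveTableName("default.tbl1@cm", false) returns "tbl1".
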
diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
index 18811f8..7bf5023 100644
--- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
+++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
@@ -123,6 +123,8 @@ public class AtlasClientV2 extends AtlasBaseClient {
     private static final String RELATIONSHIPS_URI        = BASE_URI + "v2/relationship/";
     private static final String BULK_HEADERS             = "bulk/headers";
     private static final String BULK_SET_CLASSIFICATIONS = "bulk/setClassifications";
+    private static final String RELATIONSHIP_URI         = DISCOVERY_URI + "/relationship";
+
 
     //Glossary APIs
     private static final String GLOSSARY_URI         = BASE_URI + "v2/glossary";
@@ -664,16 +666,22 @@ public class AtlasClientV2 extends AtlasBaseClient {
     }
 
     public AtlasSearchResult basicSearch(String typeName, String classification, String query, boolean excludeDeletedEntities, int limit, int offset) throws AtlasServiceException {
-        MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
-
-        queryParams.add("typeName", typeName);
-        queryParams.add("classification", classification);
-        queryParams.add(QUERY, query);
-        queryParams.add("excludeDeletedEntities", String.valueOf(excludeDeletedEntities));
-        queryParams.add(LIMIT, String.valueOf(limit));
-        queryParams.add(OFFSET, String.valueOf(offset));
+        return this.basicSearch(typeName, null, classification, query, excludeDeletedEntities, limit, offset);
+    }
+
+    public AtlasSearchResult basicSearch(String typeName, SearchParameters.FilterCriteria entityFilters, String classification, String query, boolean excludeDeletedEntities, int limit, int offset) throws AtlasServiceException {
+        SearchParameters parameters = new SearchParameters();
+        parameters.setTypeName(typeName);
+        parameters.setClassification(classification);
+        parameters.setQuery(query);
+        parameters.setExcludeDeletedEntities(excludeDeletedEntities);
+        parameters.setLimit(limit);
+        parameters.setOffset(offset);
+        if (entityFilters != null){
+            parameters.setEntityFilters(entityFilters);
+        }
 
-        return callAPI(API_V2.BASIC_SEARCH, AtlasSearchResult.class, queryParams);
+        return callAPI(API_V2.BASIC_SEARCH, AtlasSearchResult.class, parameters);
     }
 
     public AtlasSearchResult facetedSearch(SearchParameters searchParameters) throws AtlasServiceException {
@@ -1202,7 +1210,7 @@ public class AtlasClientV2 extends AtlasBaseClient {
         // Discovery APIs
         public static final API_V2 DSL_SEARCH                  = new API_V2(DSL_SEARCH_URI, HttpMethod.GET, Response.Status.OK);
         public static final API_V2 FULL_TEXT_SEARCH            = new API_V2(FULL_TEXT_SEARCH_URI, HttpMethod.GET, Response.Status.OK);
-        public static final API_V2 BASIC_SEARCH                = new API_V2(BASIC_SEARCH_URI, HttpMethod.GET, Response.Status.OK);
+        public static final API_V2 BASIC_SEARCH                = new API_V2(BASIC_SEARCH_URI, HttpMethod.POST, Response.Status.OK);
         public static final API_V2 FACETED_SEARCH              = new API_V2(FACETED_SEARCH_URI, HttpMethod.POST, Response.Status.OK);
         public static final API_V2 ATTRIBUTE_SEARCH            = new API_V2(DISCOVERY_URI+ "/attribute", HttpMethod.GET, Response.Status.OK);
         public static final API_V2 RELATIONSHIP_SEARCH         = new API_V2(DISCOVERY_URI+ "/relationship", HttpMethod.GET, Response.Status.OK);
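
The POST-based basic search together with the new entityFilters parameter is what the bridge above uses for paging. A minimal sketch of calling the new overload directly (the attribute name and values are illustrative and mirror the bridge code):

    import org.apache.atlas.AtlasClientV2;
    import org.apache.atlas.model.discovery.AtlasSearchResult;
    import org.apache.atlas.model.discovery.SearchParameters;

    public class BasicSearchFilterSketch {
        public static void main(String[] args) throws Exception {
            // illustrative endpoint and credentials
            AtlasClientV2 client = new AtlasClientV2(new String[]{"http://localhost:21000/"},
                                                     new String[]{"admin", "admin"});

            // filter: clusterName == "cm" (ATTRIBUTE_CLUSTER_NAME in the bridge)
            SearchParameters.FilterCriteria fc = new SearchParameters.FilterCriteria();
            fc.setAttributeName("clusterName");
            fc.setAttributeValue("cm");
            fc.setOperator(SearchParameters.Operator.EQ);

            // first page of hive_db entities, excluding deleted ones; the bridge keeps
            // advancing the offset by its page size until a short page comes back
            AtlasSearchResult page = client.basicSearch("hive_db", fc, null, null, true, 10000, 0);

            int count = (page == null || page.getEntities() == null) ? 0 : page.getEntities().size();
            System.out.println("retrieved " + count + " hive_db entities");
        }
    }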