You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2020/12/10 19:07:11 UTC

[atlas] branch branch-2.0 updated (d9f8c88 -> 5d3649c)

This is an automated email from the ASF dual-hosted git repository.

amestry pushed a change to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git.


    from d9f8c88  Atlas-4062: UI:- Classification Daterange Picker Datetime format fix.
     new 97220fd  ATLAS-4068: Export/Import: Conditionally Support Simultaneous Operations.
     new 5d3649c  ATLAS-4024: Export Service: Export of terms.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../atlas/repository/impexp/ExportService.java     | 43 +++++++++++-
 .../repository/impexp/ExportTypeProcessor.java     | 40 ++++++++++-
 .../apache/atlas/web/resources/AdminResource.java  | 79 ++++++++++++++--------
 3 files changed, 131 insertions(+), 31 deletions(-)


[atlas] 02/02: ATLAS-4024: Export Service: Export of terms.

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 5d3649c1b11e495547db22c192c163ff2a0f7044
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Wed Dec 9 21:52:59 2020 -0800

    ATLAS-4024: Export Service: Export of terms.
    
    Signed-off-by: Ashutosh Mestry <am...@cloudera.com>
---
 .../atlas/repository/impexp/ExportService.java     | 43 ++++++++++++++++++++--
 .../repository/impexp/ExportTypeProcessor.java     | 40 +++++++++++++++++++-
 2 files changed, 79 insertions(+), 4 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 8af2057..65d7a18 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.impexp;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.glossary.GlossaryService;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasExportResult;
 import org.apache.atlas.model.instance.AtlasEntity;
@@ -38,6 +39,7 @@ import org.apache.atlas.repository.util.UniqueList;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.util.AtlasGremlinQueryProvider;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -65,14 +67,17 @@ public class ExportService {
     private final EntityGraphRetriever      entityGraphRetriever;
     private       ExportTypeProcessor       exportTypeProcessor;
     private final HdfsPathEntityCreator     hdfsPathEntityCreator;
+    private final GlossaryService           glossaryService;
 
     @Inject
     public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph graph,
-                         AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator) {
+                         AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator,
+                         GlossaryService glossaryService) {
         this.typeRegistry         = typeRegistry;
         this.entityGraphRetriever = new EntityGraphRetriever(graph, this.typeRegistry);
         this.auditsWriter         = auditsWriter;
         this.hdfsPathEntityCreator = hdfsPathEntityCreator;
+        this.glossaryService = glossaryService;
         this.startEntityFetchByExportRequest = new StartEntityFetchByExportRequest(graph, typeRegistry, AtlasGremlinQueryProvider.INSTANCE);
         this.entitiesExtractor = new EntitiesExtractor(graph, typeRegistry);
     }
@@ -84,7 +89,7 @@ public class ExportService {
                 hostName, startTime, getCurrentChangeMarker());
 
         ExportContext context = new ExportContext(result, exportSink);
-        exportTypeProcessor = new ExportTypeProcessor(typeRegistry);
+        exportTypeProcessor = new ExportTypeProcessor(typeRegistry, glossaryService);
 
         try {
             LOG.info("==> export(user={}, from={})", userName, requestingIP);
@@ -264,8 +269,12 @@ public class ExportService {
     }
 
     public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
-        addEntity(entityWithExtInfo, context);
         exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context);
+        if (MapUtils.isNotEmpty(context.termsGlossary)) {
+            addGlossaryEntities(context);
+        }
+
+        addEntity(entityWithExtInfo, context);
 
         context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid());
         entitiesExtractor.get(entityWithExtInfo.getEntity(), context);
@@ -280,6 +289,28 @@ public class ExportService {
         }
     }
 
+    private void addGlossaryEntities(ExportContext context) {
+        try {
+            for (String termGuid : context.termsGlossary.keySet()) {
+                try {
+                    String glossaryGuid = context.termsGlossary.get(termGuid);
+                    if (!context.sink.hasEntity(glossaryGuid)) {
+                        AtlasEntity glossary = entityGraphRetriever.toAtlasEntity(glossaryGuid);
+                        addEntity(new AtlasEntityWithExtInfo(glossary), context);
+                    }
+
+                    if (!context.sink.hasEntity(termGuid)) {
+                        AtlasEntity term = entityGraphRetriever.toAtlasEntity(termGuid);
+                        addEntity(new AtlasEntityWithExtInfo(term), context);
+                    }
+                } catch (AtlasBaseException exception) {
+                    LOG.error("Error fetching Glossary for term: {}", termGuid);
+                }
+            }
+        } finally {
+            context.clearTerms();
+        }
+    }
 
     private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
         if(context.sink.hasEntity(entityWithExtInfo.getEntity().getGuid())) {
@@ -355,6 +386,8 @@ public class ExportService {
         final Set<String>                     enumTypes           = new HashSet<>();
         final Set<String>                     relationshipTypes   = new HashSet<>();
         final Set<String>                     businessMetadataTypes = new HashSet<>();
+        final Map<String, String>             termsGlossary      = new HashMap<>();
+
         final AtlasExportResult               result;
         private final ZipSink                 sink;
 
@@ -471,5 +504,9 @@ public class ExportService {
         public void addToEntityCreationOrder(String guid) {
             entityCreationOrder.add(guid);
         }
+
+        public void clearTerms() {
+            termsGlossary.clear();
+        }
     }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java
index a85db5c..186b4b0 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java
@@ -19,9 +19,13 @@
 package org.apache.atlas.repository.impexp;
 
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.glossary.GlossaryService;
 import org.apache.atlas.model.TypeCategory;
+import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.model.typedef.AtlasStructDef;
 import org.apache.atlas.type.AtlasArrayType;
 import org.apache.atlas.type.AtlasBusinessMetadataType;
@@ -37,15 +41,21 @@ import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 class ExportTypeProcessor {
     private static final Logger LOG = LoggerFactory.getLogger(ExportTypeProcessor.class);
+    private static final String RELATIONSHIP_ATTR_MEANINGS = "meanings";
 
     private AtlasTypeRegistry typeRegistry;
+    private GlossaryService glossaryService;
 
-    ExportTypeProcessor(AtlasTypeRegistry typeRegistry) {
+    ExportTypeProcessor(AtlasTypeRegistry typeRegistry, GlossaryService glossaryService) {
         this.typeRegistry = typeRegistry;
+        this.glossaryService = glossaryService;
     }
 
     public void addTypes(AtlasEntity entity, ExportService.ExportContext context) {
@@ -56,6 +66,34 @@ class ExportTypeProcessor {
                 addClassificationType(c.getTypeName(), context);
             }
         }
+
+        addTerms(entity, context);
+    }
+
+    private void addTerms(AtlasEntity entity, ExportService.ExportContext context) {
+        Object relAttrMeanings = entity.getRelationshipAttribute(RELATIONSHIP_ATTR_MEANINGS);
+        if (relAttrMeanings == null || !(relAttrMeanings instanceof List)) {
+            return;
+        }
+
+        List list = (List) relAttrMeanings;
+        if (CollectionUtils.isEmpty(list)) {
+            return;
+        }
+
+        for (Object objectId : list) {
+            if (objectId instanceof AtlasRelatedObjectId) {
+                AtlasRelatedObjectId termObjectId = (AtlasRelatedObjectId) objectId;
+
+                try {
+                    AtlasGlossaryTerm term = glossaryService.getTerm(termObjectId.getGuid());
+                    context.termsGlossary.put(termObjectId.getGuid(), term.getAnchor().getGlossaryGuid());
+                }
+                catch (AtlasBaseException e) {
+                    LOG.warn("Error fetching term details: {}", termObjectId);
+                }
+            }
+        }
     }
 
     private void addType(String typeName, ExportService.ExportContext context) {


[atlas] 01/02: ATLAS-4068: Export/Import: Conditionally Support Simultaneous Operations.

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 97220fdb7dfc98971f87a81d577cbb66d00e04f7
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Wed Dec 9 14:43:54 2020 -0800

    ATLAS-4068: Export/Import: Conditionally Support Simultaneous Operations.
    
    Signed-off-by: Ashutosh Mestry <am...@cloudera.com>
---
 .../apache/atlas/web/resources/AdminResource.java  | 79 ++++++++++++++--------
 1 file changed, 52 insertions(+), 27 deletions(-)

diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index f8c953b..b20b404 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -22,7 +22,6 @@ import com.sun.jersey.multipart.FormDataParam;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.RequestContext;
 import org.apache.atlas.authorize.AtlasAdminAccessRequest;
 import org.apache.atlas.authorize.AtlasAuthorizationUtils;
 import org.apache.atlas.authorize.AtlasEntityAccessRequest;
@@ -102,7 +101,6 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -392,7 +390,12 @@ public class AdminResource {
 
         AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_EXPORT), "export");
 
-        acquireExportImportLock("export");
+        boolean preventMultipleRequests = request != null
+                && !(request.getOptions().containsKey(AtlasExportRequest.OPTION_SKIP_LINEAGE)
+                     || request.getOptions().containsKey(AtlasExportRequest.OPTION_KEY_REPLICATED_TO));
+        if (preventMultipleRequests) {
+            acquireExportImportLock("export");
+        }
 
         ZipSink exportSink = null;
         boolean isSuccessful = false;
@@ -419,22 +422,15 @@ public class AdminResource {
 
             throw new AtlasBaseException(excp);
         } finally {
-            releaseExportImportLock();
+            if (preventMultipleRequests) {
+                releaseExportImportLock();
+            }
 
             if (exportSink != null) {
                 exportSink.close();
             }
 
-            if (isSuccessful && CollectionUtils.isNotEmpty(result.getRequest().getItemsToExport())) {
-
-                Map<String, Object> optionMap = result.getRequest().getOptions();
-                optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
-                String params = AtlasJson.toJson(optionMap);
-
-                List<AtlasObjectId> objectIds = result.getRequest().getItemsToExport();
-
-                auditImportExportOperations(objectIds, AuditOperation.EXPORT, params);
-            }
+            addToExportOperationAudits(isSuccessful, result);
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("<== AdminResource.export()");
@@ -454,11 +450,15 @@ public class AdminResource {
 
         AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "importData");
 
-        acquireExportImportLock("import");
         AtlasImportResult result = null;
+        boolean preventMultipleRequests = true;
 
         try {
             AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
+            preventMultipleRequests = request != null && !request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
+            if (preventMultipleRequests) {
+                acquireExportImportLock("import");
+            }
 
             result = importService.run(inputStream, request, Servlets.getUserName(httpServletRequest),
                     Servlets.getHostName(httpServletRequest),
@@ -477,20 +477,16 @@ public class AdminResource {
 
             throw new AtlasBaseException(excp);
         } finally {
-            releaseExportImportLock();
+            if (preventMultipleRequests) {
+                releaseExportImportLock();
+            }
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("<== AdminResource.importData(binary)");
             }
         }
 
-        List<AtlasObjectId> objectIds = result.getExportResult().getRequest().getItemsToExport();
-
-        Map<String, Object> optionMap = new HashMap<>();
-        optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
-        String params = AtlasJson.toJson(optionMap);
-
-        auditImportExportOperations(objectIds, AuditOperation.IMPORT, params);
+        addToImportOperationAudits(result);
 
         return result;
     }
@@ -536,13 +532,17 @@ public class AdminResource {
         }
 
         AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "importFile");
-
-        acquireExportImportLock("importFile");
-
+        boolean preventMultipleRequests = true;
         AtlasImportResult result;
 
         try {
             AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
+            preventMultipleRequests = request != null && request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
+
+            if (preventMultipleRequests) {
+                acquireExportImportLock("importFile");
+            }
+
             result = importService.run(request, AtlasAuthorizationUtils.getCurrentUserName(),
                                        Servlets.getHostName(httpServletRequest),
                                        AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
@@ -559,7 +559,9 @@ public class AdminResource {
 
             throw new AtlasBaseException(excp);
         } finally {
-            releaseExportImportLock();
+            if (preventMultipleRequests) {
+                releaseExportImportLock();
+            }
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("<== AdminResource.importFile()");
@@ -770,6 +772,29 @@ public class AdminResource {
         importExportOperationLock.lock();
     }
 
+    private void addToImportOperationAudits(AtlasImportResult result) throws AtlasBaseException {
+        List<AtlasObjectId> objectIds = result.getExportResult().getRequest().getItemsToExport();
+
+        Map<String, Object> optionMap = new HashMap<>();
+        optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
+        String params = AtlasJson.toJson(optionMap);
+
+        auditImportExportOperations(objectIds, AuditOperation.IMPORT, params);
+    }
+
+    private void addToExportOperationAudits(boolean isSuccessful, AtlasExportResult result) throws AtlasBaseException {
+        if (isSuccessful && CollectionUtils.isNotEmpty(result.getRequest().getItemsToExport())) {
+
+            Map<String, Object> optionMap = result.getRequest().getOptions();
+            optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
+            String params = AtlasJson.toJson(optionMap);
+
+            List<AtlasObjectId> objectIds = result.getRequest().getItemsToExport();
+
+            auditImportExportOperations(objectIds, AuditOperation.EXPORT, params);
+        }
+    }
+
     private void auditImportExportOperations(List<AtlasObjectId> objectIds, AuditOperation auditOperation, String params) throws AtlasBaseException {
 
         Map<String, Long> entityCountByType = objectIds.stream().collect(Collectors.groupingBy(AtlasObjectId::getTypeName, Collectors.counting()));