You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2020/12/10 18:52:47 UTC

[atlas] branch master updated (ff55b0a -> 31125c5)

This is an automated email from the ASF dual-hosted git repository.

amestry pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git.


    from ff55b0a  Atlas-4062: UI:- Classification Daterange Picker Datetime format fix.
     new 9cdc31d  ATLAS-4068: Export/Import: Conditionally Support Simultaneous Operations.
     new 31125c5  ATLAS-4024: Export Service: Export of terms.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../atlas/repository/impexp/ExportService.java     | 43 +++++++++++-
 .../repository/impexp/ExportTypeProcessor.java     | 40 ++++++++++-
 .../apache/atlas/web/resources/AdminResource.java  | 79 ++++++++++++++--------
 3 files changed, 131 insertions(+), 31 deletions(-)


[atlas] 01/02: ATLAS-4068: Export/Import: Conditionally Support Simultaneous Operations.

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 9cdc31d5851db61296c7ff07ecb9bc85c26939fa
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Wed Dec 9 14:43:54 2020 -0800

    ATLAS-4068: Export/Import: Conditionally Support Simultaneous Operations.
    
    Signed-off-by: Ashutosh Mestry <am...@cloudera.com>
---
 .../apache/atlas/web/resources/AdminResource.java  | 79 ++++++++++++++--------
 1 file changed, 52 insertions(+), 27 deletions(-)

diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index f8c953b..b20b404 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -22,7 +22,6 @@ import com.sun.jersey.multipart.FormDataParam;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.RequestContext;
 import org.apache.atlas.authorize.AtlasAdminAccessRequest;
 import org.apache.atlas.authorize.AtlasAuthorizationUtils;
 import org.apache.atlas.authorize.AtlasEntityAccessRequest;
@@ -102,7 +101,6 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -392,7 +390,12 @@ public class AdminResource {
 
         AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_EXPORT), "export");
 
-        acquireExportImportLock("export");
+        boolean preventMultipleRequests = request != null
+                && !(request.getOptions().containsKey(AtlasExportRequest.OPTION_SKIP_LINEAGE)
+                     || request.getOptions().containsKey(AtlasExportRequest.OPTION_KEY_REPLICATED_TO));
+        if (preventMultipleRequests) {
+            acquireExportImportLock("export");
+        }
 
         ZipSink exportSink = null;
         boolean isSuccessful = false;
@@ -419,22 +422,15 @@ public class AdminResource {
 
             throw new AtlasBaseException(excp);
         } finally {
-            releaseExportImportLock();
+            if (preventMultipleRequests) {
+                releaseExportImportLock();
+            }
 
             if (exportSink != null) {
                 exportSink.close();
             }
 
-            if (isSuccessful && CollectionUtils.isNotEmpty(result.getRequest().getItemsToExport())) {
-
-                Map<String, Object> optionMap = result.getRequest().getOptions();
-                optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
-                String params = AtlasJson.toJson(optionMap);
-
-                List<AtlasObjectId> objectIds = result.getRequest().getItemsToExport();
-
-                auditImportExportOperations(objectIds, AuditOperation.EXPORT, params);
-            }
+            addToExportOperationAudits(isSuccessful, result);
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("<== AdminResource.export()");
@@ -454,11 +450,15 @@ public class AdminResource {
 
         AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "importData");
 
-        acquireExportImportLock("import");
         AtlasImportResult result = null;
+        boolean preventMultipleRequests = true;
 
         try {
             AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
+            preventMultipleRequests = request != null && !request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
+            if (preventMultipleRequests) {
+                acquireExportImportLock("import");
+            }
 
             result = importService.run(inputStream, request, Servlets.getUserName(httpServletRequest),
                     Servlets.getHostName(httpServletRequest),
@@ -477,20 +477,16 @@ public class AdminResource {
 
             throw new AtlasBaseException(excp);
         } finally {
-            releaseExportImportLock();
+            if (preventMultipleRequests) {
+                releaseExportImportLock();
+            }
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("<== AdminResource.importData(binary)");
             }
         }
 
-        List<AtlasObjectId> objectIds = result.getExportResult().getRequest().getItemsToExport();
-
-        Map<String, Object> optionMap = new HashMap<>();
-        optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
-        String params = AtlasJson.toJson(optionMap);
-
-        auditImportExportOperations(objectIds, AuditOperation.IMPORT, params);
+        addToImportOperationAudits(result);
 
         return result;
     }
@@ -536,13 +532,17 @@ public class AdminResource {
         }
 
         AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "importFile");
-
-        acquireExportImportLock("importFile");
-
+        boolean preventMultipleRequests = true;
         AtlasImportResult result;
 
         try {
             AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
+            preventMultipleRequests = request != null && request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
+
+            if (preventMultipleRequests) {
+                acquireExportImportLock("importFile");
+            }
+
             result = importService.run(request, AtlasAuthorizationUtils.getCurrentUserName(),
                                        Servlets.getHostName(httpServletRequest),
                                        AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
@@ -559,7 +559,9 @@ public class AdminResource {
 
             throw new AtlasBaseException(excp);
         } finally {
-            releaseExportImportLock();
+            if (preventMultipleRequests) {
+                releaseExportImportLock();
+            }
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("<== AdminResource.importFile()");
@@ -770,6 +772,29 @@ public class AdminResource {
         importExportOperationLock.lock();
     }
 
+    private void addToImportOperationAudits(AtlasImportResult result) throws AtlasBaseException { // records an IMPORT audit entry for the given import result
+        List<AtlasObjectId> objectIds = result.getExportResult().getRequest().getItemsToExport(); // NOTE(review): no null checks on result/getExportResult()/getRequest() — confirm callers guarantee these
+
+        Map<String, Object> optionMap = new HashMap<>();
+        optionMap.put(OPERATION_STATUS, result.getOperationStatus().name()); // the operation status is the only audit parameter recorded for imports
+        String params = AtlasJson.toJson(optionMap);
+
+        auditImportExportOperations(objectIds, AuditOperation.IMPORT, params);
+    }
+
+    private void addToExportOperationAudits(boolean isSuccessful, AtlasExportResult result) throws AtlasBaseException { // records an EXPORT audit entry; does nothing unless the guard below passes
+        if (isSuccessful && CollectionUtils.isNotEmpty(result.getRequest().getItemsToExport())) { // result is only dereferenced when isSuccessful is true (short-circuit)
+
+            Map<String, Object> optionMap = result.getRequest().getOptions(); // this is the request's own map, not a copy
+            optionMap.put(OPERATION_STATUS, result.getOperationStatus().name()); // so the status entry is also written into the request options
+            String params = AtlasJson.toJson(optionMap);
+
+            List<AtlasObjectId> objectIds = result.getRequest().getItemsToExport();
+
+            auditImportExportOperations(objectIds, AuditOperation.EXPORT, params);
+        }
+    }
+
     private void auditImportExportOperations(List<AtlasObjectId> objectIds, AuditOperation auditOperation, String params) throws AtlasBaseException {
 
         Map<String, Long> entityCountByType = objectIds.stream().collect(Collectors.groupingBy(AtlasObjectId::getTypeName, Collectors.counting()));


[atlas] 02/02: ATLAS-4024: Export Service: Export of terms.

Posted by am...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 31125c58c8a6d2edf0bd7de4e31c44e18adc9a7f
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Wed Dec 9 21:52:59 2020 -0800

    ATLAS-4024: Export Service: Export of terms.
    
    Signed-off-by: Ashutosh Mestry <am...@cloudera.com>
---
 .../atlas/repository/impexp/ExportService.java     | 43 ++++++++++++++++++++--
 .../repository/impexp/ExportTypeProcessor.java     | 40 +++++++++++++++++++-
 2 files changed, 79 insertions(+), 4 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 8af2057..65d7a18 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.impexp;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.glossary.GlossaryService;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasExportResult;
 import org.apache.atlas.model.instance.AtlasEntity;
@@ -38,6 +39,7 @@ import org.apache.atlas.repository.util.UniqueList;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.util.AtlasGremlinQueryProvider;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -65,14 +67,17 @@ public class ExportService {
     private final EntityGraphRetriever      entityGraphRetriever;
     private       ExportTypeProcessor       exportTypeProcessor;
     private final HdfsPathEntityCreator     hdfsPathEntityCreator;
+    private final GlossaryService           glossaryService;
 
     @Inject
     public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph graph,
-                         AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator) {
+                         AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator,
+                         GlossaryService glossaryService) {
         this.typeRegistry         = typeRegistry;
         this.entityGraphRetriever = new EntityGraphRetriever(graph, this.typeRegistry);
         this.auditsWriter         = auditsWriter;
         this.hdfsPathEntityCreator = hdfsPathEntityCreator;
+        this.glossaryService = glossaryService; // kept so it can be passed to the ExportTypeProcessor constructor elsewhere in this class
         this.startEntityFetchByExportRequest = new StartEntityFetchByExportRequest(graph, typeRegistry, AtlasGremlinQueryProvider.INSTANCE);
         this.entitiesExtractor = new EntitiesExtractor(graph, typeRegistry);
     }
@@ -84,7 +89,7 @@ public class ExportService {
                 hostName, startTime, getCurrentChangeMarker());
 
         ExportContext context = new ExportContext(result, exportSink);
-        exportTypeProcessor = new ExportTypeProcessor(typeRegistry);
+        exportTypeProcessor = new ExportTypeProcessor(typeRegistry, glossaryService);
 
         try {
             LOG.info("==> export(user={}, from={})", userName, requestingIP);
@@ -264,8 +269,12 @@ public class ExportService {
     }
 
     public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
-        addEntity(entityWithExtInfo, context);
         exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context);
+        if (MapUtils.isNotEmpty(context.termsGlossary)) {
+            addGlossaryEntities(context);
+        }
+
+        addEntity(entityWithExtInfo, context);
 
         context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid());
         entitiesExtractor.get(entityWithExtInfo.getEntity(), context);
@@ -280,6 +289,28 @@ public class ExportService {
         }
     }
 
+    private void addGlossaryEntities(ExportContext context) { // adds the glossaries and terms queued in context.termsGlossary to the export sink
+        try {
+            for (String termGuid : context.termsGlossary.keySet()) { // keys are term GUIDs, values are the GUIDs of their glossaries
+                try {
+                    String glossaryGuid = context.termsGlossary.get(termGuid);
+                    if (!context.sink.hasEntity(glossaryGuid)) { // the glossary is written before its term, and only if not already in the sink
+                        AtlasEntity glossary = entityGraphRetriever.toAtlasEntity(glossaryGuid);
+                        addEntity(new AtlasEntityWithExtInfo(glossary), context);
+                    }
+
+                    if (!context.sink.hasEntity(termGuid)) {
+                        AtlasEntity term = entityGraphRetriever.toAtlasEntity(termGuid);
+                        addEntity(new AtlasEntityWithExtInfo(term), context);
+                    }
+                } catch (AtlasBaseException exception) { // best effort: a failing term is logged and the loop moves on to the next one
+                    LOG.error("Error fetching Glossary for term: {}", termGuid, exception); // trailing Throwable keeps the cause and stack trace in the log
+                }
+            }
+        } finally {
+            context.clearTerms(); // always empty the queue so queued terms are not carried over to the next call
+        }
+    }
 
     private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
         if(context.sink.hasEntity(entityWithExtInfo.getEntity().getGuid())) {
@@ -355,6 +386,8 @@ public class ExportService {
         final Set<String>                     enumTypes           = new HashSet<>();
         final Set<String>                     relationshipTypes   = new HashSet<>();
         final Set<String>                     businessMetadataTypes = new HashSet<>();
+        final Map<String, String>             termsGlossary      = new HashMap<>();
+
         final AtlasExportResult               result;
         private final ZipSink                 sink;
 
@@ -471,5 +504,9 @@ public class ExportService {
         public void addToEntityCreationOrder(String guid) {
             entityCreationOrder.add(guid);
         }
+
+        public void clearTerms() { // empties the term-GUID to glossary-GUID map held by this context
+            termsGlossary.clear();
+        }
     }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java
index a85db5c..186b4b0 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java
@@ -19,9 +19,13 @@
 package org.apache.atlas.repository.impexp;
 
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.glossary.GlossaryService;
 import org.apache.atlas.model.TypeCategory;
+import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.model.typedef.AtlasStructDef;
 import org.apache.atlas.type.AtlasArrayType;
 import org.apache.atlas.type.AtlasBusinessMetadataType;
@@ -37,15 +41,21 @@ import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 class ExportTypeProcessor {
     private static final Logger LOG = LoggerFactory.getLogger(ExportTypeProcessor.class);
+    private static final String RELATIONSHIP_ATTR_MEANINGS = "meanings";
 
     private AtlasTypeRegistry typeRegistry;
+    private GlossaryService glossaryService;
 
-    ExportTypeProcessor(AtlasTypeRegistry typeRegistry) {
+    ExportTypeProcessor(AtlasTypeRegistry typeRegistry, GlossaryService glossaryService) { // glossaryService is used by addTerms to look up terms by GUID
         this.typeRegistry = typeRegistry;
+        this.glossaryService = glossaryService;
     }
 
     public void addTypes(AtlasEntity entity, ExportService.ExportContext context) {
@@ -56,6 +66,34 @@ class ExportTypeProcessor {
                 addClassificationType(c.getTypeName(), context);
             }
         }
+
+        addTerms(entity, context);
+    }
+
+    private void addTerms(AtlasEntity entity, ExportService.ExportContext context) { // queues the glossary terms attached to entity into context.termsGlossary
+        Object relAttrMeanings = entity.getRelationshipAttribute(RELATIONSHIP_ATTR_MEANINGS);
+        if (!(relAttrMeanings instanceof List)) { // instanceof is false for null, so a missing attribute is covered too
+            return;
+        }
+
+        List<?> list = (List<?>) relAttrMeanings;
+        if (CollectionUtils.isEmpty(list)) {
+            return;
+        }
+
+        for (Object objectId : list) {
+            if (objectId instanceof AtlasRelatedObjectId) { // elements of any other type are ignored
+                AtlasRelatedObjectId termObjectId = (AtlasRelatedObjectId) objectId;
+
+                try {
+                    AtlasGlossaryTerm term = glossaryService.getTerm(termObjectId.getGuid());
+                    context.termsGlossary.put(termObjectId.getGuid(), term.getAnchor().getGlossaryGuid()); // NOTE(review): assumes term and its anchor are non-null — confirm
+                }
+                catch (AtlasBaseException e) {
+                    LOG.warn("Error fetching term details: {}", termObjectId, e); // best effort: log with the cause and continue with the next term
+                }
+            }
+        }
     }
 
     private void addType(String typeName, ExportService.ExportContext context) {