You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2020/12/10 18:52:48 UTC

[atlas] 01/02: ATLAS-4068: Export/Import: Conditionally Support Simultaneous Operations.

This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 9cdc31d5851db61296c7ff07ecb9bc85c26939fa
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Wed Dec 9 14:43:54 2020 -0800

    ATLAS-4068: Export/Import: Conditionally Support Simultaneous Operations.
    
    Signed-off-by: Ashutosh Mestry <am...@cloudera.com>
---
 .../apache/atlas/web/resources/AdminResource.java  | 79 ++++++++++++++--------
 1 file changed, 52 insertions(+), 27 deletions(-)

diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index f8c953b..b20b404 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -22,7 +22,6 @@ import com.sun.jersey.multipart.FormDataParam;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.RequestContext;
 import org.apache.atlas.authorize.AtlasAdminAccessRequest;
 import org.apache.atlas.authorize.AtlasAuthorizationUtils;
 import org.apache.atlas.authorize.AtlasEntityAccessRequest;
@@ -102,7 +101,6 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -392,7 +390,12 @@ public class AdminResource {
 
         AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_EXPORT), "export");
 
-        acquireExportImportLock("export");
+        boolean preventMultipleRequests = request != null
+                && !(request.getOptions().containsKey(AtlasExportRequest.OPTION_SKIP_LINEAGE)
+                     || request.getOptions().containsKey(AtlasExportRequest.OPTION_KEY_REPLICATED_TO));
+        if (preventMultipleRequests) {
+            acquireExportImportLock("export");
+        }
 
         ZipSink exportSink = null;
         boolean isSuccessful = false;
@@ -419,22 +422,15 @@ public class AdminResource {
 
             throw new AtlasBaseException(excp);
         } finally {
-            releaseExportImportLock();
+            if (preventMultipleRequests) {
+                releaseExportImportLock();
+            }
 
             if (exportSink != null) {
                 exportSink.close();
             }
 
-            if (isSuccessful && CollectionUtils.isNotEmpty(result.getRequest().getItemsToExport())) {
-
-                Map<String, Object> optionMap = result.getRequest().getOptions();
-                optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
-                String params = AtlasJson.toJson(optionMap);
-
-                List<AtlasObjectId> objectIds = result.getRequest().getItemsToExport();
-
-                auditImportExportOperations(objectIds, AuditOperation.EXPORT, params);
-            }
+            addToExportOperationAudits(isSuccessful, result);
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("<== AdminResource.export()");
@@ -454,11 +450,15 @@ public class AdminResource {
 
         AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "importData");
 
-        acquireExportImportLock("import");
         AtlasImportResult result = null;
+        boolean preventMultipleRequests = true;
 
         try {
             AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
+            preventMultipleRequests = request != null && !request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
+            if (preventMultipleRequests) {
+                acquireExportImportLock("import");
+            }
 
             result = importService.run(inputStream, request, Servlets.getUserName(httpServletRequest),
                     Servlets.getHostName(httpServletRequest),
@@ -477,20 +477,16 @@ public class AdminResource {
 
             throw new AtlasBaseException(excp);
         } finally {
-            releaseExportImportLock();
+            if (preventMultipleRequests) {
+                releaseExportImportLock();
+            }
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("<== AdminResource.importData(binary)");
             }
         }
 
-        List<AtlasObjectId> objectIds = result.getExportResult().getRequest().getItemsToExport();
-
-        Map<String, Object> optionMap = new HashMap<>();
-        optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
-        String params = AtlasJson.toJson(optionMap);
-
-        auditImportExportOperations(objectIds, AuditOperation.IMPORT, params);
+        addToImportOperationAudits(result);
 
         return result;
     }
@@ -536,13 +532,17 @@ public class AdminResource {
         }
 
         AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "importFile");
-
-        acquireExportImportLock("importFile");
-
+        boolean preventMultipleRequests = true;
         AtlasImportResult result;
 
         try {
             AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
+            preventMultipleRequests = request != null && request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
+
+            if (preventMultipleRequests) {
+                acquireExportImportLock("importFile");
+            }
+
             result = importService.run(request, AtlasAuthorizationUtils.getCurrentUserName(),
                                        Servlets.getHostName(httpServletRequest),
                                        AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
@@ -559,7 +559,9 @@ public class AdminResource {
 
             throw new AtlasBaseException(excp);
         } finally {
-            releaseExportImportLock();
+            if (preventMultipleRequests) {
+                releaseExportImportLock();
+            }
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("<== AdminResource.importFile()");
@@ -770,6 +772,29 @@ public class AdminResource {
         importExportOperationLock.lock();
     }
 
+    private void addToImportOperationAudits(AtlasImportResult result) throws AtlasBaseException {
+        List<AtlasObjectId> objectIds = result.getExportResult().getRequest().getItemsToExport();
+
+        Map<String, Object> optionMap = new HashMap<>();
+        optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
+        String params = AtlasJson.toJson(optionMap);
+
+        auditImportExportOperations(objectIds, AuditOperation.IMPORT, params);
+    }
+
+    private void addToExportOperationAudits(boolean isSuccessful, AtlasExportResult result) throws AtlasBaseException {
+        if (isSuccessful && CollectionUtils.isNotEmpty(result.getRequest().getItemsToExport())) {
+
+            Map<String, Object> optionMap = result.getRequest().getOptions();
+            optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
+            String params = AtlasJson.toJson(optionMap);
+
+            List<AtlasObjectId> objectIds = result.getRequest().getItemsToExport();
+
+            auditImportExportOperations(objectIds, AuditOperation.EXPORT, params);
+        }
+    }
+
     private void auditImportExportOperations(List<AtlasObjectId> objectIds, AuditOperation auditOperation, String params) throws AtlasBaseException {
 
         Map<String, Long> entityCountByType = objectIds.stream().collect(Collectors.groupingBy(AtlasObjectId::getTypeName, Collectors.counting()));