You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2020/12/10 18:52:48 UTC
[atlas] 01/02: ATLAS-4068: Export/Import: Conditionally Support
Simpultaneous Operations.
This is an automated email from the ASF dual-hosted git repository.
amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 9cdc31d5851db61296c7ff07ecb9bc85c26939fa
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Wed Dec 9 14:43:54 2020 -0800
ATLAS-4068: Export/Import: Conditionally Support Simpultaneous Operations.
Signed-off-by: Ashutosh Mestry <am...@cloudera.com>
---
.../apache/atlas/web/resources/AdminResource.java | 79 ++++++++++++++--------
1 file changed, 52 insertions(+), 27 deletions(-)
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index f8c953b..b20b404 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -22,7 +22,6 @@ import com.sun.jersey.multipart.FormDataParam;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.RequestContext;
import org.apache.atlas.authorize.AtlasAdminAccessRequest;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasEntityAccessRequest;
@@ -102,7 +101,6 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -392,7 +390,12 @@ public class AdminResource {
AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_EXPORT), "export");
- acquireExportImportLock("export");
+ boolean preventMultipleRequests = request != null
+ && !(request.getOptions().containsKey(AtlasExportRequest.OPTION_SKIP_LINEAGE)
+ || request.getOptions().containsKey(AtlasExportRequest.OPTION_KEY_REPLICATED_TO));
+ if (preventMultipleRequests) {
+ acquireExportImportLock("export");
+ }
ZipSink exportSink = null;
boolean isSuccessful = false;
@@ -419,22 +422,15 @@ public class AdminResource {
throw new AtlasBaseException(excp);
} finally {
- releaseExportImportLock();
+ if (preventMultipleRequests) {
+ releaseExportImportLock();
+ }
if (exportSink != null) {
exportSink.close();
}
- if (isSuccessful && CollectionUtils.isNotEmpty(result.getRequest().getItemsToExport())) {
-
- Map<String, Object> optionMap = result.getRequest().getOptions();
- optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
- String params = AtlasJson.toJson(optionMap);
-
- List<AtlasObjectId> objectIds = result.getRequest().getItemsToExport();
-
- auditImportExportOperations(objectIds, AuditOperation.EXPORT, params);
- }
+ addToExportOperationAudits(isSuccessful, result);
if (LOG.isDebugEnabled()) {
LOG.debug("<== AdminResource.export()");
@@ -454,11 +450,15 @@ public class AdminResource {
AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "importData");
- acquireExportImportLock("import");
AtlasImportResult result = null;
+ boolean preventMultipleRequests = true;
try {
AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
+ preventMultipleRequests = request != null && !request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
+ if (preventMultipleRequests) {
+ acquireExportImportLock("import");
+ }
result = importService.run(inputStream, request, Servlets.getUserName(httpServletRequest),
Servlets.getHostName(httpServletRequest),
@@ -477,20 +477,16 @@ public class AdminResource {
throw new AtlasBaseException(excp);
} finally {
- releaseExportImportLock();
+ if (preventMultipleRequests) {
+ releaseExportImportLock();
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("<== AdminResource.importData(binary)");
}
}
- List<AtlasObjectId> objectIds = result.getExportResult().getRequest().getItemsToExport();
-
- Map<String, Object> optionMap = new HashMap<>();
- optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
- String params = AtlasJson.toJson(optionMap);
-
- auditImportExportOperations(objectIds, AuditOperation.IMPORT, params);
+ addToImportOperationAudits(result);
return result;
}
@@ -536,13 +532,17 @@ public class AdminResource {
}
AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "importFile");
-
- acquireExportImportLock("importFile");
-
+ boolean preventMultipleRequests = true;
AtlasImportResult result;
try {
AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class);
+ preventMultipleRequests = request != null && request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
+
+ if (preventMultipleRequests) {
+ acquireExportImportLock("importFile");
+ }
+
result = importService.run(request, AtlasAuthorizationUtils.getCurrentUserName(),
Servlets.getHostName(httpServletRequest),
AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
@@ -559,7 +559,9 @@ public class AdminResource {
throw new AtlasBaseException(excp);
} finally {
- releaseExportImportLock();
+ if (preventMultipleRequests) {
+ releaseExportImportLock();
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("<== AdminResource.importFile()");
@@ -770,6 +772,29 @@ public class AdminResource {
importExportOperationLock.lock();
}
+ private void addToImportOperationAudits(AtlasImportResult result) throws AtlasBaseException {
+ List<AtlasObjectId> objectIds = result.getExportResult().getRequest().getItemsToExport();
+
+ Map<String, Object> optionMap = new HashMap<>();
+ optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
+ String params = AtlasJson.toJson(optionMap);
+
+ auditImportExportOperations(objectIds, AuditOperation.IMPORT, params);
+ }
+
+ private void addToExportOperationAudits(boolean isSuccessful, AtlasExportResult result) throws AtlasBaseException {
+ if (isSuccessful && CollectionUtils.isNotEmpty(result.getRequest().getItemsToExport())) {
+
+ Map<String, Object> optionMap = result.getRequest().getOptions();
+ optionMap.put(OPERATION_STATUS, result.getOperationStatus().name());
+ String params = AtlasJson.toJson(optionMap);
+
+ List<AtlasObjectId> objectIds = result.getRequest().getItemsToExport();
+
+ auditImportExportOperations(objectIds, AuditOperation.EXPORT, params);
+ }
+ }
+
private void auditImportExportOperations(List<AtlasObjectId> objectIds, AuditOperation auditOperation, String params) throws AtlasBaseException {
Map<String, Long> entityCountByType = objectIds.stream().collect(Collectors.groupingBy(AtlasObjectId::getTypeName, Collectors.counting()));