You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ni...@apache.org on 2019/09/20 14:12:36 UTC
[atlas] branch master updated: ATLAS-3416 Import API: delete non-exported hive_table for table level replication
This is an automated email from the ASF dual-hosted git repository.
nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 1c781de ATLAS-3416 Import API: delete non-exported hive_table for table level replication
1c781de is described below
commit 1c781deb40c23c79c2cf201b70b79a34b0b2acbe
Author: nikhilbonte <ni...@freestoneinfotech.com>
AuthorDate: Thu Aug 29 12:47:20 2019 +0530
ATLAS-3416 Import API: delete non-exported hive_table for table level replication
Signed-off-by: nixonrodrigues <ni...@apache.org>
---
.../atlas/model/impexp/ExportImportAuditEntry.java | 1 +
.../atlas/repository/impexp/AuditsWriter.java | 22 +++
.../atlas/repository/impexp/ImportService.java | 40 ++++-
.../impexp/TableReplicationRequestProcessor.java | 181 +++++++++++++++++++++
.../atlas/repository/impexp/ImportServiceTest.java | 83 +++++++++-
.../TableReplicationRequestProcessorTest.java | 146 +++++++++++++++++
6 files changed, 461 insertions(+), 12 deletions(-)
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java b/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java
index a199c6e..90f4296 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java
@@ -35,6 +35,7 @@ public class ExportImportAuditEntry extends AtlasBaseModelObject implements Seri
private static final long serialVersionUID = 1L;
public static final String OPERATION_EXPORT = "EXPORT";
public static final String OPERATION_IMPORT = "IMPORT";
+ public static final String OPERATION_IMPORT_DELETE_REPL = "IMPORT_DELETE_REPL";
private String userName;
private String operation;
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
index f2d36ed..55990f7 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
@@ -45,6 +45,7 @@ import org.springframework.util.CollectionUtils;
import javax.inject.Inject;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
@Component
public class AuditsWriter {
@@ -68,6 +69,10 @@ public class AuditsWriter {
this.auditService = auditService;
}
+ public AtlasServerService getAtlasServerService() { // read-only accessor for the atlasServerService field held by this writer
+ return atlasServerService;
+ }
+
public void write(String userName, AtlasExportResult result,
long startTime, long endTime,
List<String> entityCreationOrder) throws AtlasBaseException {
@@ -80,6 +85,12 @@ public class AuditsWriter {
auditForImport.add(userName, result, startTime, endTime, entityCreationOrder);
}
+ public void write(String userName, String sourceCluster, // audit overload that takes a source cluster name rather than an export/import result object
+ long startTime, long endTime,
+ Set<String> entityCreationOrder) throws AtlasBaseException { // NOTE(review): despite its name this set is passed on as the entityGuids argument of auditForImport.add(...)
+ auditForImport.add(userName, sourceCluster, startTime, endTime, entityCreationOrder); // pure delegation; no work is done here
+ }
+
private void updateReplicationAttribute(boolean isReplicationSet,
String serverName, String serverFullName,
List<String> exportedGuids,
@@ -238,5 +249,16 @@ public class AuditsWriter {
updateReplicationAttribute(replicationOptionState, sourceServerName, sourceServerFullName, entityGuids,
Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker());
}
+
+ public void add(String userName, String sourceCluster, long startTime, // records one audit entry with operation OPERATION_IMPORT_DELETE_REPL
+ long endTime, Set<String> entityGuids) throws AtlasBaseException {
+
+ sourceServerName = getServerNameFromFullName(sourceCluster); // writes a variable declared outside this method (declaration not visible in this hunk)
+ auditService.add(userName,
+ sourceServerName, getCurrentClusterName(),
+ ExportImportAuditEntry.OPERATION_IMPORT_DELETE_REPL,
+ AtlasType.toJson(entityGuids), startTime, endTime, !entityGuids.isEmpty()); // GUID set is serialised to JSON; the last argument is true only when the set is non-empty
+
+ }
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index df49ae1..27001e3 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -24,8 +24,10 @@ import org.apache.atlas.RequestContext;
import org.apache.atlas.entitytransform.BaseEntityHandler;
import org.apache.atlas.entitytransform.TransformerContext;
import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.BulkImporter;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
@@ -54,24 +56,28 @@ import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMS_KEY;
public class ImportService {
private static final Logger LOG = LoggerFactory.getLogger(ImportService.class);
+ private static final String ATLAS_TYPE_HIVE_TABLE = "hive_table";
private final AtlasTypeDefStore typeDefStore;
private final AtlasTypeRegistry typeRegistry;
private final BulkImporter bulkImporter;
private final AuditsWriter auditsWriter;
private final ImportTransformsShaper importTransformsShaper;
+ private TableReplicationRequestProcessor tableReplicationRequestProcessor;
+
private long startTimestamp;
private long endTimestamp;
@Inject
public ImportService(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, BulkImporter bulkImporter,
- AuditsWriter auditsWriter,
- ImportTransformsShaper importTransformsShaper) {
+ AuditsWriter auditsWriter, ImportTransformsShaper importTransformsShaper,
+ TableReplicationRequestProcessor tableReplicationRequestProcessor) {
this.typeDefStore = typeDefStore;
this.typeRegistry = typeRegistry;
this.bulkImporter = bulkImporter;
this.auditsWriter = auditsWriter;
this.importTransformsShaper = importTransformsShaper;
+ this.tableReplicationRequestProcessor = tableReplicationRequestProcessor;
}
public AtlasImportResult run(InputStream inputStream, String userName,
@@ -109,7 +115,11 @@ public class ImportService {
startTimestamp = System.currentTimeMillis();
processTypes(source.getTypesDef(), result);
setStartPosition(request, source);
+
processEntities(userName, source, result);
+
+ processReplicationDeletion(source.getExportResult().getRequest(), request);
+
} catch (AtlasBaseException excp) {
LOG.error("import(user={}, from={}): failed", userName, requestingIP, excp);
@@ -228,6 +238,12 @@ public class ImportService {
auditsWriter.write(userName, result, startTimestamp, endTimestamp, importSource.getCreationOrder());
}
+ private void processReplicationDeletion(AtlasExportRequest exportRequest, AtlasImportRequest importRequest) throws AtlasBaseException {
+ if (checkHiveTableIncrementalSkipLineage(importRequest, exportRequest)) {
+ tableReplicationRequestProcessor.process(exportRequest, importRequest);
+ }
+ }
+
private int getDuration(long endTime, long startTime) {
return (int) (endTime - startTime);
}
@@ -239,9 +255,25 @@ public class ImportService {
}
return new ZipSourceWithBackingDirectory(inputStream, configuredTemporaryDirectory);
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new AtlasBaseException(ex);
}
}
+
+    /**
+     * Tells whether an import qualifies for table-level replication clean-up.
+     *
+     * All of the following must hold:
+     * <ul>
+     *   <li>the export request lists at least one item and every item is a {@code hive_table};</li>
+     *   <li>the replication option is set on both the import and the export request;</li>
+     *   <li>the export fetch type is incremental;</li>
+     *   <li>the export was made with the skip-lineage option.</li>
+     * </ul>
+     *
+     * @param importRequest request driving the current import; {@code null} yields {@code false}
+     * @param exportRequest request stored with the imported archive; {@code null} yields {@code false}
+     * @return {@code true} only when every condition above is met
+     */
+    @VisibleForTesting
+    boolean checkHiveTableIncrementalSkipLineage(AtlasImportRequest importRequest, AtlasExportRequest exportRequest) {
+        // The export request is read from the archive, so treat a missing one as "not eligible" instead of failing with an NPE.
+        if (importRequest == null || exportRequest == null || CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
+            return false;
+        }
+
+        for (AtlasObjectId itemToExport : exportRequest.getItemsToExport()) {
+            // Constant-first comparison: an item without a type name simply does not match.
+            if (itemToExport == null || !ATLAS_TYPE_HIVE_TABLE.equalsIgnoreCase(itemToExport.getTypeName())) {
+                return false;
+            }
+        }
+
+        return importRequest.isReplicationOptionSet() && exportRequest.isReplicationOptionSet() &&
+               AtlasExportRequest.FETCH_TYPE_INCREMENTAL.equalsIgnoreCase(exportRequest.getFetchTypeOptionValue()) &&
+               exportRequest.getSkipLineageOptionValue();
+    }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessor.java
new file mode 100644
index 0000000..d5807a5
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessor.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.authorize.AtlasAuthorizationUtils;
+import org.apache.atlas.discovery.AtlasDiscoveryService;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.discovery.SearchParameters;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.HashSet;
+
+/**
+ * Post-import clean-up for table-level replication.
+ *
+ * After an import, looks up the {@code hive_table} entities that carry the
+ * {@code <sourceCluster>_replicated} classification and match the database name of the
+ * exported tables, and deletes those that were not listed in the export request.
+ * When something was deleted, one audit entry is written through {@link AuditsWriter}.
+ *
+ * The class keeps no per-call state, so a single shared instance can serve
+ * overlapping imports without their audit timestamps interfering.
+ */
+@Component
+public class TableReplicationRequestProcessor {
+    private static final Logger LOG = LoggerFactory.getLogger(TableReplicationRequestProcessor.class);
+
+    private static final String QUERY_DB_NAME_EQUALS    = "where db.name='%s'";
+    private static final String ATTR_NAME_KEY           = "name";
+    private static final String TYPE_HIVE_TABLE         = "hive_table";
+    private static final String ATTR_QUALIFIED_NAME_KEY = "qualifiedName";
+    private static final String REPLICATED_TAG_NAME     = "%s_replicated";
+
+    /** Number of search results requested per page while collecting deletion candidates. */
+    private static final int SEARCH_PAGE_SIZE = 10000;
+
+    private final AuditsWriter          auditsWriter;
+    private final AtlasEntityStore      entityStore;
+    private final AtlasTypeRegistry     typeRegistry;
+    private final AtlasDiscoveryService discoveryService;
+
+    @Inject
+    public TableReplicationRequestProcessor(AuditsWriter auditsWriter, AtlasEntityStore entityStore,
+                                            AtlasDiscoveryService atlasDiscoveryService, AtlasTypeRegistry typeRegistry) {
+        this.auditsWriter     = auditsWriter;
+        this.entityStore      = entityStore;
+        this.typeRegistry     = typeRegistry;
+        this.discoveryService = atlasDiscoveryService;
+    }
+
+    /**
+     * Deletes replicated {@code hive_table} entities that are not listed in the export request.
+     *
+     * @param exportRequest request stored with the imported archive; its items name the tables to keep
+     * @param importRequest request driving the import; supplies the source cluster name
+     * @throws AtlasBaseException if a lookup, the search or the deletion fails
+     */
+    public void process(AtlasExportRequest exportRequest, AtlasImportRequest importRequest) throws AtlasBaseException {
+        final long startTime = System.currentTimeMillis();
+
+        LOG.info("process: deleting entities with type hive_table which are not imported.");
+
+        String       sourceCluster  = importRequest.getOptionKeyReplicatedFrom();
+        List<String> qualifiedNames = getQualifiedNamesFromRequest(exportRequest);
+        List<String> safeGUIDs      = getEntitiesFromQualifiedNames(qualifiedNames);
+
+        // Without at least one resolved table there is no database to scope the search to.
+        if (safeGUIDs.isEmpty()) {
+            LOG.warn("process: no exported hive_table found in the request; nothing to delete.");
+            return;
+        }
+
+        // The first table determines the database name used to scope the search.
+        String      dbName        = getDbName(safeGUIDs.get(0));
+        Set<String> guidsToDelete = getGuidsToDelete(dbName, safeGUIDs, sourceCluster);
+
+        deleteTables(sourceCluster, guidsToDelete, startTime);
+    }
+
+    /** Collects the {@code qualifiedName} unique attribute of every item in the export request. */
+    private List<String> getQualifiedNamesFromRequest(AtlasExportRequest exportRequest) {
+        List<String> qualifiedNames = new ArrayList<>();
+
+        if (CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
+            return qualifiedNames;
+        }
+
+        for (AtlasObjectId objectId : exportRequest.getItemsToExport()) {
+            // NOTE(review): assumes every item is identified by qualifiedName; an item given by GUID only would fail here.
+            qualifiedNames.add(objectId.getUniqueAttributes().get(ATTR_QUALIFIED_NAME_KEY).toString());
+        }
+
+        return qualifiedNames;
+    }
+
+    /** Resolves each qualified name to the GUID of the matching {@code hive_table} entity. */
+    private List<String> getEntitiesFromQualifiedNames(List<String> qualifiedNames) throws AtlasBaseException {
+        List<String> safeGUIDs = new ArrayList<>(qualifiedNames.size());
+
+        for (String qualifiedName : qualifiedNames) {
+            safeGUIDs.add(getGuidByUniqueAttributes(Collections.singletonMap(ATTR_QUALIFIED_NAME_KEY, qualifiedName)));
+        }
+
+        return safeGUIDs;
+    }
+
+    private String getGuidByUniqueAttributes(Map<String, Object> uniqueAttributes) throws AtlasBaseException {
+        return entityStore.getGuidByUniqueAttributes(typeRegistry.getEntityTypeByName(TYPE_HIVE_TABLE), uniqueAttributes);
+    }
+
+    /**
+     * Returns the {@code name} attribute of the entity that {@code ReplKeyGuidFinder} resolves
+     * for the given table (presumably the owning database - confirm against ReplKeyGuidFinder).
+     */
+    private String getDbName(String tableGuid) throws AtlasBaseException {
+        String dbGuid = AuditsWriter.ReplKeyGuidFinder.get(typeRegistry, entityStore, tableGuid);
+
+        return (String) entityStore.getById(dbGuid).getEntity().getAttribute(ATTR_NAME_KEY);
+    }
+
+    /**
+     * Pages through all search hits and returns the GUIDs that are not in {@code excludeGUIDs}.
+     *
+     * Paging continues until a page is empty or shorter than the page size. The earlier
+     * implementation compared the size of the last page with {@code max * i} and
+     * therefore stopped after the second full page.
+     */
+    private Set<String> getGuidsToDelete(String dbName, List<String> excludeGUIDs, String sourceCluster) throws AtlasBaseException {
+        SearchParameters parameters  = getSearchParameters(dbName, sourceCluster);
+        Set<String>      guidsToKeep = new HashSet<>(excludeGUIDs); // constant-time membership test inside the loop
+        Set<String>      unsafeGUIDs = new HashSet<>();
+        int              offset      = 0;
+
+        parameters.setLimit(SEARCH_PAGE_SIZE);
+
+        while (true) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("offset={}, unsafeGUIDs.size()={}", offset, unsafeGUIDs.size());
+            }
+
+            parameters.setOffset(offset);
+
+            AtlasSearchResult       searchResult = discoveryService.searchWithParameters(parameters);
+            List<AtlasEntityHeader> page         = (searchResult != null) ? searchResult.getEntities() : null;
+
+            if (CollectionUtils.isEmpty(page)) {
+                break;
+            }
+
+            for (AtlasEntityHeader entityHeader : page) {
+                String guid = entityHeader.getGuid();
+
+                if (!guidsToKeep.contains(guid)) {
+                    unsafeGUIDs.add(guid);
+                }
+            }
+
+            // A short page means the result set is exhausted.
+            if (page.size() < SEARCH_PAGE_SIZE) {
+                break;
+            }
+
+            offset += page.size();
+        }
+
+        return unsafeGUIDs;
+    }
+
+    /**
+     * Builds the search for non-deleted {@code hive_table} entities that carry the
+     * {@code <sourceCluster>_replicated} classification and match the database name.
+     */
+    private SearchParameters getSearchParameters(String dbName, String sourceCluster) {
+        // NOTE(review): dbName is interpolated into the query text as-is; a name containing a quote would alter the query.
+        String query = String.format(QUERY_DB_NAME_EQUALS, dbName);
+
+        Set<String> attributes = new HashSet<>();
+        attributes.add(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
+
+        SearchParameters parameters = new SearchParameters();
+        parameters.setTypeName(TYPE_HIVE_TABLE);
+        parameters.setExcludeDeletedEntities(true);
+        parameters.setClassification(String.format(REPLICATED_TAG_NAME, sourceCluster));
+        parameters.setAttributes(attributes);
+        parameters.setQuery(query);
+
+        return parameters;
+    }
+
+    /** Deletes the given entities and, if any were given, writes the audit entry. */
+    private void deleteTables(String sourceCluster, Set<String> guidsToDelete, long startTime) throws AtlasBaseException {
+        if (CollectionUtils.isEmpty(guidsToDelete)) {
+            return;
+        }
+
+        entityStore.deleteByIds(new ArrayList<>(guidsToDelete));
+
+        createAuditEntry(sourceCluster, guidsToDelete, startTime, System.currentTimeMillis());
+    }
+
+    private void createAuditEntry(String sourceCluster, Set<String> guidsToDelete, long startTime, long endTime) throws AtlasBaseException {
+        auditsWriter.write(AtlasAuthorizationUtils.getCurrentUserName(), sourceCluster, startTime, endTime, guidsToDelete);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Deleted entities => {}", guidsToDelete);
+        }
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index 1bfe62b..95f6ec3 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -24,9 +24,11 @@ import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.discovery.EntityDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.Constants;
@@ -40,10 +42,7 @@ import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.lang.StringUtils;
-import org.apache.tinkerpop.shaded.kryo.io.Input;
import org.mockito.stubbing.Answer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testng.ITestContext;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterTest;
@@ -54,6 +53,7 @@ import org.testng.annotations.Test;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -61,11 +61,17 @@ import java.util.Map;
import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getDefaultImportRequest;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getInputStreamFrom;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromResourcesJson;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runAndVerifyQuickStart_v1_Import;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_FETCH_TYPE;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_KEY_REPLICATED_TO;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_SKIP_LINEAGE;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
@@ -392,7 +398,7 @@ public class ImportServiceTest extends ExportImportTestBase {
@Test
public void importServiceProcessesIOException() {
- ImportService importService = new ImportService(typeDefStore, typeRegistry, null, null,null);
+ ImportService importService = new ImportService(typeDefStore, typeRegistry, null,null, null,null);
AtlasImportRequest req = mock(AtlasImportRequest.class);
Answer<Map> answer = invocationOnMock -> {
@@ -447,8 +453,8 @@ public class ImportServiceTest extends ExportImportTestBase {
@Test(dataProvider = "salesNewTypeAttrs-next")
public void transformUpdatesForSubTypesAddsToExistingTransforms(InputStream inputStream) throws IOException, AtlasBaseException {
- loadBaseModel();
- loadHiveModel();
+ loadBaseModel();
+ loadHiveModel();
String transformJSON = "{ \"Asset\": { \"qualifiedName\":[ \"replace:@cl1:@cl2\" ] }, \"hive_table\": { \"qualifiedName\":[ \"lowercase\" ] } }";
ZipSource zipSource = new ZipSource(inputStream);
@@ -461,9 +467,70 @@ public class ImportServiceTest extends ExportImportTestBase {
assertEquals(importTransforms.getTransforms().get("hive_table").get("qualifiedName").size(), 2);
}
-
@Test(expectedExceptions = AtlasBaseException.class)
public void importEmptyZip() throws IOException, AtlasBaseException {
- new ZipSource((InputStream) getZipSource("empty.zip")[0][0]);
+ new ZipSource(getInputStreamFrom("empty.zip"));
+ }
+
+    /**
+     * Exercises the eligibility check with one accepted combination and four rejected ones.
+     */
+    @Test
+    public void testCheckHiveTableIncrementalSkipLineage() {
+        final AtlasImportRequest replicatedImport = getImportRequest("cl1");
+
+        // only hive_table items, incremental, replicated, lineage skipped -> accepted
+        AtlasExportRequest tablesOnly = getExportRequest(FETCH_TYPE_INCREMENTAL, "cl2", true, getItemsToExport("hive_table", "hive_table"));
+        assertTrue(importService.checkHiveTableIncrementalSkipLineage(replicatedImport, tablesOnly));
+
+        // a hive_db among the items -> rejected
+        AtlasExportRequest withDatabase = getExportRequest(FETCH_TYPE_INCREMENTAL, "cl2", true, getItemsToExport("hive_table", "hive_db", "hive_table"));
+        assertFalse(importService.checkHiveTableIncrementalSkipLineage(replicatedImport, withDatabase));
+
+        // full fetch instead of incremental -> rejected
+        AtlasExportRequest fullFetch = getExportRequest(FETCH_TYPE_FULL, "cl2", true, getItemsToExport("hive_table", "hive_table"));
+        assertFalse(importService.checkHiveTableIncrementalSkipLineage(replicatedImport, fullFetch));
+
+        // full fetch and no replicatedTo on the export -> rejected
+        AtlasExportRequest fullFetchNoTarget = getExportRequest(FETCH_TYPE_FULL, "", true, getItemsToExport("hive_table", "hive_table"));
+        assertFalse(importService.checkHiveTableIncrementalSkipLineage(replicatedImport, fullFetchNoTarget));
+
+        // no replicatedFrom on the import -> rejected
+        final AtlasImportRequest plainImport = getImportRequest("");
+        AtlasExportRequest tablesOnlyAgain = getExportRequest(FETCH_TYPE_INCREMENTAL, "cl2", true, getItemsToExport("hive_table", "hive_table"));
+        assertFalse(importService.checkHiveTableIncrementalSkipLineage(plainImport, tablesOnlyAgain));
+    }
+
+ private AtlasImportRequest getImportRequest(String replicatedFrom){
+ AtlasImportRequest importRequest = getDefaultImportRequest();
+
+ if (!StringUtils.isEmpty(replicatedFrom)) {
+ importRequest.setOption(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM, replicatedFrom);
+ }
+ return importRequest;
+ }
+
+ private AtlasExportRequest getExportRequest(String fetchType, String replicatedTo, boolean skipLineage, List<AtlasObjectId> itemsToExport){
+ AtlasExportRequest request = new AtlasExportRequest();
+
+ request.setOptions(getOptionsMap(fetchType, replicatedTo, skipLineage));
+ request.setItemsToExport(itemsToExport);
+ return request;
+ }
+
+ private List<AtlasObjectId> getItemsToExport(String... typeNames){
+ List<AtlasObjectId> itemsToExport = new ArrayList<>();
+ for (String typeName : typeNames) {
+ itemsToExport.add(new AtlasObjectId(typeName, "qualifiedName", "db.table@cluster"));
+ }
+ return itemsToExport;
+ }
+
+ private Map<String, Object> getOptionsMap(String fetchType, String replicatedTo, boolean skipLineage){
+ Map<String, Object> options = new HashMap<>();
+
+ if (!StringUtils.isEmpty(fetchType)) {
+ options.put(OPTION_FETCH_TYPE, fetchType);
+ }
+ if (!StringUtils.isEmpty(replicatedTo)) {
+ options.put(OPTION_KEY_REPLICATED_TO, replicatedTo);
+ }
+ options.put(OPTION_SKIP_LINEAGE, skipLineage);
+
+ return options;
}
}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessorTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessorTest.java
new file mode 100644
index 0000000..c9bb11c
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessorTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import com.google.inject.Inject;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.TestUtilsV2;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.ExportImportAuditEntry;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.runner.LocalSolrRunner;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.ITestContext;
+import org.testng.SkipException;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getDefaultImportRequest;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getInputStreamFrom;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+
+
+/**
+ * Imports two replication archives in sequence and verifies, through the audit trail,
+ * that the second import deleted the tables it did not contain.
+ */
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class TableReplicationRequestProcessorTest extends ExportImportTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(TableReplicationRequestProcessorTest.class);
+
+    private static final String ENTITY_GUID_REPLICATED = "718a6d12-35a8-4731-aff8-3a64637a43a3";
+    private static final String ENTITY_GUID_NOT_REPLICATED_1 = "e19e5683-d9ae-436a-af1e-0873582d0f1e";
+    private static final String ENTITY_GUID_NOT_REPLICATED_2 = "2e28ae34-576e-4a8b-be48-cf5f925d7b15";
+    private static final String REPL_FROM = "cl1";
+    private static final String REPL_TRANSFORMER = "[{\"conditions\":{\"__entity\":\"topLevel: \"}," +
+            "\"action\":{\"__entity\":\"ADD_CLASSIFICATION: cl1_replicated\"}}," +
+            "{\"action\":{\"__entity.replicatedTo\":\"CLEAR:\",\"__entity.replicatedFrom\":\"CLEAR:\"}}," +
+            "{\"conditions\":{\"hive_db.clusterName\":\"EQUALS: cl1\"},\"action\":{\"hive_db.clusterName\":\"SET: cl2\"}}," +
+            "{\"conditions\":{\"hive_db.location\":\"STARTS_WITH_IGNORE_CASE: file:///\"}," +
+            "\"action\":{\"hive_db.location\":\"REPLACE_PREFIX: = :file:///=file:///\"}}," +
+            "{\"conditions\":{\"hive_storagedesc.location\":\"STARTS_WITH_IGNORE_CASE: file:///\"}," +
+            "\"action\":{\"hive_storagedesc.location\":\"REPLACE_PREFIX: = :file:///=file:///\"}}]";
+
+    @Inject
+    private ImportService importService;
+
+    @Inject
+    private AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasEntityStore entityStore;
+
+    @Inject
+    private ExportImportAuditService auditService;
+
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    @BeforeTest
+    public void setupTest() throws IOException, AtlasBaseException {
+        RequestContext.clear();
+        RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
+        basicSetup(typeDefStore, typeRegistry);
+    }
+
+    @AfterClass
+    public void clear() throws Exception {
+        AtlasGraphProvider.cleanup();
+
+        if (useLocalSolr()) {
+            LocalSolrRunner.stop();
+        }
+    }
+
+    @DataProvider(name = "source1")
+    public static Object[][] getData1(ITestContext context) throws IOException, AtlasBaseException {
+        return getZipSource("repl_exp_1.zip");
+    }
+
+    public static InputStream getData2() {
+        return getInputStreamFrom("repl_exp_2.zip");
+    }
+
+    /**
+     * Runs the first import from the data provider and the second from {@code repl_exp_2.zip},
+     * both with the same replication options, then checks the deletion audit entry.
+     */
+    @Test(dataProvider = "source1")
+    public void importWithIsReplTrue(InputStream zipSource) throws AtlasBaseException, IOException {
+        AtlasImportRequest atlasImportRequest = getDefaultImportRequest();
+
+        atlasImportRequest.setOption(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM, REPL_FROM);
+        atlasImportRequest.setOption("transformers", REPL_TRANSFORMER);
+
+        runImportWithParameters(importService, atlasImportRequest, zipSource);
+
+        runImportWithParameters(importService, atlasImportRequest, getData2());
+
+        assertAuditEntry();
+    }
+
+    /**
+     * Fetches the replication-delete audit entries and checks the GUID list of the first one.
+     * The test is skipped, not failed, when the audit lookup itself throws.
+     */
+    private void assertAuditEntry() {
+        pauseForIndexCreation();
+        List<ExportImportAuditEntry> result;
+        try {
+            result = auditService.get("", ExportImportAuditEntry.OPERATION_IMPORT_DELETE_REPL, "", "", "", 10, 0);
+        } catch (Exception e) {
+            // Keep the original failure as the cause so a skipped run can still be diagnosed.
+            throw new SkipException("audit entries not retrieved.", e);
+        }
+
+        assertNotNull(result);
+        assertTrue(result.size() > 0);
+
+        // The result summary is the JSON form of the deleted GUID set, hence a list of strings.
+        @SuppressWarnings("unchecked")
+        List<String> deletedGuids = AtlasType.fromJson(result.get(0).getResultSummary(), List.class);
+        assertNotNull(deletedGuids);
+        assertFalse(deletedGuids.contains(ENTITY_GUID_REPLICATED));
+        assertTrue(deletedGuids.contains(ENTITY_GUID_NOT_REPLICATED_1));
+        assertTrue(deletedGuids.contains(ENTITY_GUID_NOT_REPLICATED_2));
+    }
+}