You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ni...@apache.org on 2019/09/20 14:12:36 UTC

[atlas] branch master updated: ATLAS-3416 Import API: delete non-exported hive_table for table level replication

This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c781de  ATLAS-3416 Import API: delete non-exported hive_table for table level replication
1c781de is described below

commit 1c781deb40c23c79c2cf201b70b79a34b0b2acbe
Author: nikhilbonte <ni...@freestoneinfotech.com>
AuthorDate: Thu Aug 29 12:47:20 2019 +0530

    ATLAS-3416 Import API: delete non-exported hive_table for table level replication
    
    Signed-off-by: nixonrodrigues <ni...@apache.org>
---
 .../atlas/model/impexp/ExportImportAuditEntry.java |   1 +
 .../atlas/repository/impexp/AuditsWriter.java      |  22 +++
 .../atlas/repository/impexp/ImportService.java     |  40 ++++-
 .../impexp/TableReplicationRequestProcessor.java   | 181 +++++++++++++++++++++
 .../atlas/repository/impexp/ImportServiceTest.java |  83 +++++++++-
 .../TableReplicationRequestProcessorTest.java      | 146 +++++++++++++++++
 6 files changed, 461 insertions(+), 12 deletions(-)

diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java b/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java
index a199c6e..90f4296 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java
@@ -35,6 +35,7 @@ public class ExportImportAuditEntry extends AtlasBaseModelObject implements Seri
     private static final long serialVersionUID = 1L;
     public static final String OPERATION_EXPORT = "EXPORT";
     public static final String OPERATION_IMPORT = "IMPORT";
+    public static final String OPERATION_IMPORT_DELETE_REPL = "IMPORT_DELETE_REPL";
 
     private String userName;
     private String operation;
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
index f2d36ed..55990f7 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
@@ -45,6 +45,7 @@ import org.springframework.util.CollectionUtils;
 import javax.inject.Inject;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 @Component
 public class AuditsWriter {
@@ -68,6 +69,10 @@ public class AuditsWriter {
         this.auditService = auditService;
     }
 
+    public AtlasServerService getAtlasServerService() {
+        return atlasServerService;
+    }
+
     public void write(String userName, AtlasExportResult result,
                       long startTime, long endTime,
                       List<String> entityCreationOrder) throws AtlasBaseException {
@@ -80,6 +85,12 @@ public class AuditsWriter {
         auditForImport.add(userName, result, startTime, endTime, entityCreationOrder);
     }
 
+    public void write(String userName, String sourceCluster,
+                      long startTime, long endTime,
+                      Set<String> entityCreationOrder) throws AtlasBaseException {
+        auditForImport.add(userName, sourceCluster, startTime, endTime, entityCreationOrder);
+    }
+
     private void updateReplicationAttribute(boolean isReplicationSet,
                                             String serverName, String serverFullName,
                                             List<String> exportedGuids,
@@ -238,5 +249,16 @@ public class AuditsWriter {
             updateReplicationAttribute(replicationOptionState, sourceServerName, sourceServerFullName, entityGuids,
                     Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker());
         }
+
+        public void add(String userName, String sourceCluster, long startTime,
+                        long endTime, Set<String> entityGuids) throws AtlasBaseException {
+
+            sourceServerName = getServerNameFromFullName(sourceCluster);
+            auditService.add(userName,
+                    sourceServerName, getCurrentClusterName(),
+                    ExportImportAuditEntry.OPERATION_IMPORT_DELETE_REPL,
+                    AtlasType.toJson(entityGuids), startTime, endTime, !entityGuids.isEmpty());
+
+        }
     }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index df49ae1..27001e3 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -24,8 +24,10 @@ import org.apache.atlas.RequestContext;
 import org.apache.atlas.entitytransform.BaseEntityHandler;
 import org.apache.atlas.entitytransform.TransformerContext;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.store.graph.BulkImporter;
 import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
@@ -54,24 +56,28 @@ import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMS_KEY;
 public class ImportService {
     private static final Logger LOG = LoggerFactory.getLogger(ImportService.class);
 
+    private static final String ATLAS_TYPE_HIVE_TABLE = "hive_table";
     private final AtlasTypeDefStore typeDefStore;
     private final AtlasTypeRegistry typeRegistry;
     private final BulkImporter bulkImporter;
     private final AuditsWriter auditsWriter;
     private final ImportTransformsShaper importTransformsShaper;
 
+    private TableReplicationRequestProcessor tableReplicationRequestProcessor;
+
     private long startTimestamp;
     private long endTimestamp;
 
     @Inject
     public ImportService(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, BulkImporter bulkImporter,
-                         AuditsWriter auditsWriter,
-                         ImportTransformsShaper importTransformsShaper) {
+                         AuditsWriter auditsWriter, ImportTransformsShaper importTransformsShaper,
+                         TableReplicationRequestProcessor tableReplicationRequestProcessor) {
         this.typeDefStore = typeDefStore;
         this.typeRegistry = typeRegistry;
         this.bulkImporter = bulkImporter;
         this.auditsWriter = auditsWriter;
         this.importTransformsShaper = importTransformsShaper;
+        this.tableReplicationRequestProcessor = tableReplicationRequestProcessor;
     }
 
     public AtlasImportResult run(InputStream inputStream, String userName,
@@ -109,7 +115,11 @@ public class ImportService {
             startTimestamp = System.currentTimeMillis();
             processTypes(source.getTypesDef(), result);
             setStartPosition(request, source);
+
             processEntities(userName, source, result);
+
+            processReplicationDeletion(source.getExportResult().getRequest(), request);
+
         } catch (AtlasBaseException excp) {
             LOG.error("import(user={}, from={}): failed", userName, requestingIP, excp);
 
@@ -228,6 +238,12 @@ public class ImportService {
         auditsWriter.write(userName, result, startTimestamp, endTimestamp, importSource.getCreationOrder());
     }
 
+    private void processReplicationDeletion(AtlasExportRequest exportRequest, AtlasImportRequest importRequest) throws AtlasBaseException {
+        if (checkHiveTableIncrementalSkipLineage(importRequest, exportRequest)) {
+            tableReplicationRequestProcessor.process(exportRequest, importRequest);
+        }
+    }
+
     private int getDuration(long endTime, long startTime) {
         return (int) (endTime - startTime);
     }
@@ -239,9 +255,25 @@ public class ImportService {
             }
 
             return new ZipSourceWithBackingDirectory(inputStream, configuredTemporaryDirectory);
-        }
-        catch (IOException ex) {
+        } catch (IOException ex) {
             throw new AtlasBaseException(ex);
         }
     }
+    /** Returns true only for a replication-enabled, incremental, skip-lineage export whose items are all of type hive_table. */
+    @VisibleForTesting
+    boolean checkHiveTableIncrementalSkipLineage(AtlasImportRequest importRequest, AtlasExportRequest exportRequest) {
+        if (CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
+            return false;
+        }
+        // Constant-first comparison: a null type name yields false rather than a NullPointerException.
+        for (AtlasObjectId itemToExport : exportRequest.getItemsToExport()) {
+            if (!ATLAS_TYPE_HIVE_TABLE.equalsIgnoreCase(itemToExport.getTypeName())) {
+                return false;
+            }
+        }
+        // Same null-safe ordering for the fetch type reported by the export request.
+        return importRequest.isReplicationOptionSet() && exportRequest.isReplicationOptionSet() &&
+                AtlasExportRequest.FETCH_TYPE_INCREMENTAL.equalsIgnoreCase(exportRequest.getFetchTypeOptionValue()) &&
+                exportRequest.getSkipLineageOptionValue();
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessor.java
new file mode 100644
index 0000000..d5807a5
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessor.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.authorize.AtlasAuthorizationUtils;
+import org.apache.atlas.discovery.AtlasDiscoveryService;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.discovery.SearchParameters;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.HashSet;
+
+@Component
+public class TableReplicationRequestProcessor {
+    private static final Logger LOG = LoggerFactory.getLogger(TableReplicationRequestProcessor.class);
+
+    private static final String QUERY_DB_NAME_EQUALS= "where db.name='%s'";
+    private static final String ATTR_NAME_KEY = "name";
+    private static final String TYPE_HIVE_TABLE = "hive_table";
+    private static final String ATTR_QUALIFIED_NAME_KEY = "qualifiedName";
+    private static final String REPLICATED_TAG_NAME = "%s_replicated";
+
+    private long startTstamp;
+    private long endTstamp;
+    private AuditsWriter auditsWriter;
+    private AtlasEntityStore entityStore;
+    private AtlasTypeRegistry typeRegistry;
+    private AtlasDiscoveryService discoveryService;
+
+    @Inject
+    public TableReplicationRequestProcessor(AuditsWriter auditsWriter, AtlasEntityStore entityStore,
+                                            AtlasDiscoveryService atlasDiscoveryService, AtlasTypeRegistry typeRegistry) {
+        this.auditsWriter = auditsWriter;
+        this.entityStore = entityStore;
+        this.typeRegistry = typeRegistry;
+        this.discoveryService = atlasDiscoveryService;
+    }
+    /** Deletes hive_table entities returned by the replication search that are not among the export request's items, then audits the deletion. */
+    public void process(AtlasExportRequest exportRequest, AtlasImportRequest importRequest) throws AtlasBaseException {
+        startTstamp = System.currentTimeMillis();
+        LOG.info("process: deleting entities with type hive_table which are not imported.");
+        String sourceCluster = importRequest.getOptionKeyReplicatedFrom();
+        // Qualified names of the items listed in the export request.
+        List<String> qualifiedNames = getQualifiedNamesFromRequest(exportRequest);
+        // GUIDs resolved for those names; passed below as the exclusion list.
+        List<String> safeGUIDs = getEntitiesFromQualifiedNames(qualifiedNames);
+        // NOTE(review): get(0) assumes a non-empty list, and only the first table's database is used - confirm all items share one database.
+        String dbName = getDbName(safeGUIDs.get(0));
+        // Search hits for that database/source cluster minus the exclusion list.
+        Set<String> guidsToDelete = getGuidsToDelete(dbName, safeGUIDs, sourceCluster);
+
+        deleteTables(sourceCluster, guidsToDelete);
+    }
+
+    private List<String> getQualifiedNamesFromRequest(AtlasExportRequest exportRequest){
+        List<String> qualifiedNames = new ArrayList<>();
+
+        for (AtlasObjectId objectId : exportRequest.getItemsToExport()) {
+            qualifiedNames.add(objectId.getUniqueAttributes().get(ATTR_QUALIFIED_NAME_KEY).toString());
+        }
+        return qualifiedNames;
+    }
+
+    private List<String> getEntitiesFromQualifiedNames(List<String> qualifiedNames) throws AtlasBaseException {
+
+        List<String> safeGUIDs = new ArrayList<>();
+        for(String qualifiedName : qualifiedNames) {
+            String guid = getGuidByUniqueAttributes(Collections.singletonMap(ATTR_QUALIFIED_NAME_KEY, qualifiedName));
+            safeGUIDs.add(guid);
+        }
+        return safeGUIDs;
+    }
+
+    private String getGuidByUniqueAttributes(Map<String, Object> uniqueAttributes) throws AtlasBaseException {
+        return entityStore.getGuidByUniqueAttributes(typeRegistry.getEntityTypeByName(TYPE_HIVE_TABLE), uniqueAttributes);
+    }
+
+    private String getDbName(String tableGuid) throws AtlasBaseException {
+        String dbGuid = AuditsWriter.ReplKeyGuidFinder.get(typeRegistry, entityStore, tableGuid);
+        return (String) entityStore.getById(dbGuid).getEntity().getAttribute(ATTR_NAME_KEY);
+    }
+    /** Pages through the hive_table search for dbName/sourceCluster and returns every GUID that is not listed in excludeGUIDs. */
+    private Set<String> getGuidsToDelete(String dbName, List<String> excludeGUIDs, String sourceCluster) throws AtlasBaseException {
+        // Hash lookups instead of a List.contains() scan for every search hit.
+        Set<String> excluded = new HashSet<>(excludeGUIDs);
+        SearchParameters parameters = getSearchParameters(dbName, sourceCluster);
+        Set<String> unsafeGUIDs = new HashSet<>();
+        final int max = 10000;
+        int fetchedSize = 0;
+        int i = 0;
+        parameters.setLimit(max);
+        // fetchedSize is cumulative: keep paging only while every page so far came back full.
+        while (fetchedSize == (max * i)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("i={}, fetchedSize={}, unsafeGUIDs.size()={}", i, fetchedSize, unsafeGUIDs.size());
+            }
+
+            int offset = max * i;
+            parameters.setOffset(offset);
+
+            AtlasSearchResult searchResult = discoveryService.searchWithParameters(parameters);
+
+            if (CollectionUtils.isEmpty(searchResult.getEntities())) {
+                break;
+            }
+
+            for (AtlasEntityHeader entityHeader : searchResult.getEntities()) {
+                String guid = entityHeader.getGuid();
+                if (!excluded.contains(guid)) {
+                    unsafeGUIDs.add(guid);
+                }
+            }
+            fetchedSize += searchResult.getEntities().size();
+            i++;
+        }
+        return unsafeGUIDs;
+    }
+    /** Builds the search for hive_table entities of the given database that carry the "<sourceCluster>_replicated" classification. */
+    private SearchParameters getSearchParameters(String dbName, String sourceCluster) {
+        String query = String.format(QUERY_DB_NAME_EQUALS, dbName);
+
+        SearchParameters parameters = new SearchParameters();
+        parameters.setTypeName(TYPE_HIVE_TABLE);
+        // Set once: entities already marked deleted are left out of the search.
+        parameters.setExcludeDeletedEntities(true);
+
+        parameters.setClassification(String.format(REPLICATED_TAG_NAME, sourceCluster));
+        parameters.setAttributes(new HashSet<>(Collections.singleton(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM)));
+        parameters.setQuery(query);
+
+        return parameters;
+    }
+
+    private void deleteTables(String sourceCluster, Set<String> guidsToDelete) throws AtlasBaseException {
+        if (!CollectionUtils.isEmpty(guidsToDelete)) {
+            entityStore.deleteByIds(new ArrayList<>(guidsToDelete));
+
+            endTstamp = System.currentTimeMillis();
+            createAuditEntry(sourceCluster, guidsToDelete);
+        }
+    }
+
+    private void createAuditEntry(String sourceCluster, Set<String> guidsToDelete) throws AtlasBaseException {
+        auditsWriter.write(AtlasAuthorizationUtils.getCurrentUserName(), sourceCluster, startTstamp, endTstamp, guidsToDelete);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Deleted entities => {}", guidsToDelete);
+        }
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index 1bfe62b..95f6ec3 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -24,9 +24,11 @@ import org.apache.atlas.TestModules;
 import org.apache.atlas.TestUtilsV2;
 import org.apache.atlas.discovery.EntityDiscoveryService;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.repository.Constants;
@@ -40,10 +42,7 @@ import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.lang.StringUtils;
-import org.apache.tinkerpop.shaded.kryo.io.Input;
 import org.mockito.stubbing.Answer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testng.ITestContext;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterTest;
@@ -54,6 +53,7 @@ import org.testng.annotations.Test;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -61,11 +61,17 @@ import java.util.Map;
 import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getDefaultImportRequest;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getInputStreamFrom;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromResourcesJson;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runAndVerifyQuickStart_v1_Import;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_FETCH_TYPE;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_KEY_REPLICATED_TO;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_SKIP_LINEAGE;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -392,7 +398,7 @@ public class ImportServiceTest extends ExportImportTestBase {
 
     @Test
     public void importServiceProcessesIOException() {
-        ImportService importService = new ImportService(typeDefStore, typeRegistry, null, null,null);
+        ImportService importService = new ImportService(typeDefStore, typeRegistry, null,null, null,null);
         AtlasImportRequest req = mock(AtlasImportRequest.class);
 
         Answer<Map> answer = invocationOnMock -> {
@@ -447,8 +453,8 @@ public class ImportServiceTest extends ExportImportTestBase {
 
     @Test(dataProvider = "salesNewTypeAttrs-next")
     public void transformUpdatesForSubTypesAddsToExistingTransforms(InputStream inputStream) throws IOException, AtlasBaseException {
-            loadBaseModel();
-            loadHiveModel();
+        loadBaseModel();
+        loadHiveModel();
 
         String transformJSON = "{ \"Asset\": { \"qualifiedName\":[ \"replace:@cl1:@cl2\" ] }, \"hive_table\": { \"qualifiedName\":[ \"lowercase\" ] } }";
         ZipSource zipSource = new ZipSource(inputStream);
@@ -461,9 +467,70 @@ public class ImportServiceTest extends ExportImportTestBase {
         assertEquals(importTransforms.getTransforms().get("hive_table").get("qualifiedName").size(), 2);
     }
 
-
     @Test(expectedExceptions = AtlasBaseException.class)
     public void importEmptyZip() throws IOException, AtlasBaseException {
-        new ZipSource((InputStream) getZipSource("empty.zip")[0][0]);
+        new ZipSource(getInputStreamFrom("empty.zip"));
+    }
+
+    @Test
+    public void testCheckHiveTableIncrementalSkipLineage() {
+        AtlasImportRequest importRequest;
+        AtlasExportRequest exportRequest;
+
+        importRequest = getImportRequest("cl1");
+        exportRequest = getExportRequest(FETCH_TYPE_INCREMENTAL, "cl2", true, getItemsToExport("hive_table", "hive_table"));
+        assertTrue(importService.checkHiveTableIncrementalSkipLineage(importRequest, exportRequest));
+
+        exportRequest = getExportRequest(FETCH_TYPE_INCREMENTAL, "cl2", true, getItemsToExport("hive_table", "hive_db", "hive_table"));
+        assertFalse(importService.checkHiveTableIncrementalSkipLineage(importRequest, exportRequest));
+
+        exportRequest = getExportRequest(FETCH_TYPE_FULL, "cl2", true, getItemsToExport("hive_table", "hive_table"));
+        assertFalse(importService.checkHiveTableIncrementalSkipLineage(importRequest, exportRequest));
+
+        exportRequest = getExportRequest(FETCH_TYPE_FULL, "", true, getItemsToExport("hive_table", "hive_table"));
+        assertFalse(importService.checkHiveTableIncrementalSkipLineage(importRequest, exportRequest));
+
+        importRequest = getImportRequest("");
+        exportRequest = getExportRequest(FETCH_TYPE_INCREMENTAL, "cl2", true, getItemsToExport("hive_table", "hive_table"));
+        assertFalse(importService.checkHiveTableIncrementalSkipLineage(importRequest, exportRequest));
+    }
+
+    private AtlasImportRequest getImportRequest(String replicatedFrom){
+        AtlasImportRequest importRequest = getDefaultImportRequest();
+
+        if (!StringUtils.isEmpty(replicatedFrom)) {
+            importRequest.setOption(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM, replicatedFrom);
+        }
+        return importRequest;
+    }
+
+    private AtlasExportRequest getExportRequest(String fetchType, String replicatedTo, boolean skipLineage, List<AtlasObjectId> itemsToExport){
+        AtlasExportRequest request = new AtlasExportRequest();
+
+        request.setOptions(getOptionsMap(fetchType, replicatedTo, skipLineage));
+        request.setItemsToExport(itemsToExport);
+        return request;
+    }
+
+    private List<AtlasObjectId> getItemsToExport(String... typeNames){
+        List<AtlasObjectId> itemsToExport = new ArrayList<>();
+        for (String typeName : typeNames) {
+            itemsToExport.add(new AtlasObjectId(typeName, "qualifiedName", "db.table@cluster"));
+        }
+        return itemsToExport;
+    }
+
+    private Map<String, Object> getOptionsMap(String fetchType, String replicatedTo, boolean skipLineage){
+        Map<String, Object> options = new HashMap<>();
+
+        if (!StringUtils.isEmpty(fetchType)) {
+            options.put(OPTION_FETCH_TYPE, fetchType);
+        }
+        if (!StringUtils.isEmpty(replicatedTo)) {
+            options.put(OPTION_KEY_REPLICATED_TO, replicatedTo);
+        }
+        options.put(OPTION_SKIP_LINEAGE, skipLineage);
+
+        return options;
     }
 }
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessorTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessorTest.java
new file mode 100644
index 0000000..c9bb11c
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessorTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import com.google.inject.Inject;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.TestUtilsV2;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.ExportImportAuditEntry;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.runner.LocalSolrRunner;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.ITestContext;
+import org.testng.SkipException;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getDefaultImportRequest;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getInputStreamFrom;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class TableReplicationRequestProcessorTest extends ExportImportTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(TableReplicationRequestProcessorTest.class);
+
+    private static final String ENTITY_GUID_REPLICATED = "718a6d12-35a8-4731-aff8-3a64637a43a3";
+    private static final String ENTITY_GUID_NOT_REPLICATED_1 = "e19e5683-d9ae-436a-af1e-0873582d0f1e";
+    private static final String ENTITY_GUID_NOT_REPLICATED_2 = "2e28ae34-576e-4a8b-be48-cf5f925d7b15";
+    private static final String REPL_FROM = "cl1";
+    private static final String REPL_TRANSFORMER = "[{\"conditions\":{\"__entity\":\"topLevel: \"}," +
+            "\"action\":{\"__entity\":\"ADD_CLASSIFICATION: cl1_replicated\"}}," +
+            "{\"action\":{\"__entity.replicatedTo\":\"CLEAR:\",\"__entity.replicatedFrom\":\"CLEAR:\"}}," +
+            "{\"conditions\":{\"hive_db.clusterName\":\"EQUALS: cl1\"},\"action\":{\"hive_db.clusterName\":\"SET: cl2\"}}," +
+            "{\"conditions\":{\"hive_db.location\":\"STARTS_WITH_IGNORE_CASE: file:///\"}," +
+            "\"action\":{\"hive_db.location\":\"REPLACE_PREFIX: = :file:///=file:///\"}}," +
+            "{\"conditions\":{\"hive_storagedesc.location\":\"STARTS_WITH_IGNORE_CASE: file:///\"}," +
+            "\"action\":{\"hive_storagedesc.location\":\"REPLACE_PREFIX: = :file:///=file:///\"}}]";
+
+    @Inject
+    private ImportService importService;
+
+    @Inject
+    private AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasEntityStore entityStore;
+
+    @Inject
+    private ExportImportAuditService auditService;
+
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    @BeforeTest
+    public void setupTest() throws IOException, AtlasBaseException {
+        RequestContext.clear();
+        RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
+        basicSetup(typeDefStore, typeRegistry);
+    }
+
+    @AfterClass
+    public void clear() throws Exception {
+        AtlasGraphProvider.cleanup();
+
+        if (useLocalSolr()) {
+            LocalSolrRunner.stop();
+        }
+    }
+
+    @DataProvider(name = "source1")
+    public static Object[][] getData1(ITestContext context) throws IOException, AtlasBaseException {
+        return getZipSource("repl_exp_1.zip");
+    }
+
+    public static InputStream getData2() {
+        return getInputStreamFrom("repl_exp_2.zip");
+    }
+
+    @Test(dataProvider = "source1")
+    public void importWithIsReplTrue(InputStream zipSource) throws AtlasBaseException, IOException {
+        AtlasImportRequest atlasImportRequest = getDefaultImportRequest();
+
+        atlasImportRequest.setOption("replicatedFrom", REPL_FROM);
+        atlasImportRequest.setOption("transformers", REPL_TRANSFORMER);
+
+        runImportWithParameters(importService, atlasImportRequest, zipSource);
+
+        runImportWithParameters(importService, atlasImportRequest, getData2());
+
+        assertAuditEntry();
+    }
+    /** Checks that an IMPORT_DELETE_REPL audit entry exists and lists the two non-replicated table GUIDs but not the replicated one. */
+    private void assertAuditEntry() {
+        pauseForIndexCreation();
+        List<ExportImportAuditEntry> result;
+        try {
+            result = auditService.get("", ExportImportAuditEntry.OPERATION_IMPORT_DELETE_REPL, "", "",  "", 10, 0);
+        } catch (Exception e) {
+            throw new SkipException("audit entries not retrieved.", e);
+        }
+        // The cause is attached above so a skipped run still shows why the audit lookup failed.
+        assertNotNull(result);
+        assertTrue(result.size() > 0);
+
+        List<String> deletedGuids = AtlasType.fromJson(result.get(0).getResultSummary(), List.class);
+        assertNotNull(deletedGuids);
+        assertFalse(deletedGuids.contains(ENTITY_GUID_REPLICATED));
+        assertTrue(deletedGuids.contains(ENTITY_GUID_NOT_REPLICATED_1));
+        assertTrue(deletedGuids.contains(ENTITY_GUID_NOT_REPLICATED_2));
+    }
+}