You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ni...@apache.org on 2019/09/05 06:25:28 UTC

[atlas] branch branch-0.8 updated: ATLAS-3371 Export API - Backport / refactorings for table level replication.

This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-0.8 by this push:
     new 72f8659  ATLAS-3371 Export API - Backport / refactorings for table level replication.
72f8659 is described below

commit 72f86594425fe5d6dd788ae7b457a14c56b84027
Author: nikhilbonte <ni...@freestoneinfotech.com>
AuthorDate: Wed Aug 14 14:12:46 2019 +0530

    ATLAS-3371 Export API - Backport / refactorings for table level replication.
    
    Signed-off-by: nixonrodrigues <ni...@apache.org>
---
 .../graph/GraphBackedMetadataRepository.java       |   8 +-
 .../atlas/repository/impexp/AuditsWriter.java      |  63 +++-
 .../atlas/repository/impexp/EntitiesExtractor.java |  81 +++++
 .../atlas/repository/impexp/ExportService.java     | 387 ++++-----------------
 .../atlas/repository/impexp/ExtractStrategy.java   |  28 ++
 .../impexp/IncrementalExportEntityProvider.java    |  39 ++-
 .../impexp/StartEntityFetchByExportRequest.java    | 208 +++++++++++
 .../atlas/repository/impexp/VertexExtractor.java   | 183 ++++++++++
 .../repository/impexp/ExportIncrementalTest.java   | 101 +++++-
 .../IncrementalExportEntityProviderTest.java       |   3 +-
 .../impexp/ReplicationEntityAttributeTest.java     |   9 +-
 .../StartEntityFetchByExportRequestTest.java       | 112 ++++++
 12 files changed, 880 insertions(+), 342 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
index 61e1623..0159010 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
@@ -138,7 +138,9 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
     public CreateUpdateEntitiesResult createEntities(ITypedReferenceableInstance... entities) throws RepositoryException,
                                                                                                      EntityExistsException {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("adding entities={}", entities);
+            for(ITypedReferenceableInstance instance : entities){
+                LOG.debug("adding entity {}", instance);
+            }
         }
 
         try {
@@ -412,7 +414,9 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
     @GraphTransaction
     public CreateUpdateEntitiesResult updateEntities(ITypedReferenceableInstance... entitiesUpdated) throws RepositoryException {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("updating entity {}", entitiesUpdated);
+            for(ITypedReferenceableInstance instance : entitiesUpdated){
+                LOG.debug("updating entity {}", instance);
+            }
         }
 
         try {
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
index 1281fd2..612b403 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.atlas.repository.impexp;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.AtlasException;
@@ -28,8 +29,12 @@ import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.impexp.AtlasServer;
 import org.apache.atlas.model.impexp.ExportImportAuditEntry;
+import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +42,7 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 
 import javax.inject.Inject;
+import java.util.Collections;
 import java.util.List;
 
 @Component
@@ -45,6 +51,8 @@ public class AuditsWriter {
     private static final String CLUSTER_NAME_DEFAULT = "default";
     private static final String DC_SERVER_NAME_SEPARATOR = "$";
 
+    private AtlasTypeRegistry typeRegistry;
+    private AtlasEntityStore entityStore;
     private AtlasServerService atlasServerService;
     private ExportImportAuditService auditService;
 
@@ -52,7 +60,10 @@ public class AuditsWriter {
     private ImportAudits auditForImport = new ImportAudits();
 
     @Inject
-    public AuditsWriter(AtlasServerService atlasServerService, ExportImportAuditService auditService) {
+    public AuditsWriter(AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore,
+                        AtlasServerService atlasServerService, ExportImportAuditService auditService) {
+        this.typeRegistry = typeRegistry;
+        this.entityStore = entityStore;
         this.atlasServerService = atlasServerService;
         this.auditService = auditService;
     }
@@ -78,7 +89,10 @@ public class AuditsWriter {
             return;
         }
 
-        AtlasServer server = saveServer(serverName, serverFullName, exportedGuids.get(0), lastModifiedTimestamp);
+        String candidateGuid = exportedGuids.get(0);
+        String replGuidKey = ReplKeyGuidFinder.get(typeRegistry, entityStore, candidateGuid);
+        AtlasServer server = saveServer(serverName, serverFullName, replGuidKey, lastModifiedTimestamp);
+
         atlasServerService.updateEntitiesWithServer(server, exportedGuids, attrNameReplicated);
     }
 
@@ -124,6 +138,51 @@ public class AuditsWriter {
         atlasServerService.getCreateAtlasServer(getCurrentClusterName(), getCurrentClusterName());
     }
 
+    static class ReplKeyGuidFinder {
+        private static final String ENTITY_TYPE_HIVE_DB = "hive_db";
+        private static final String ENTITY_TYPE_HIVE_TABLE = "hive_table";
+        private static final String ENTITY_TYPE_HIVE_COLUMN = "hive_column";
+        private static final String QUALIFIED_NAME = "qualifiedName";
+        /** Returns the hive_db guid resolved for a hive_table/hive_column candidate; otherwise, or if the lookup fails, candidateGuid. */
+        public static String get(AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, String candidateGuid) {
+            String guid = null;
+            try {
+                guid = getParentEntityGuid(typeRegistry, entityStore, candidateGuid);
+            } catch (AtlasBaseException e) {
+                LOG.error("Error fetching parent guid for child entity: {}", candidateGuid, e);
+            }
+
+            if (StringUtils.isEmpty(guid)) {
+                guid = candidateGuid;
+            }
+
+            return guid;
+        }
+        /** Returns null when the entity cannot be loaded or is neither hive_table nor hive_column; else the guid found for the derived hive_db qualifiedName. */
+        private static String getParentEntityGuid(AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, String defaultGuid) throws AtlasBaseException {
+            AtlasEntity.AtlasEntityWithExtInfo extInfo = entityStore.getById(defaultGuid);
+            if (extInfo == null || extInfo.getEntity() == null) {
+                return null;
+            }
+
+            String typeName = extInfo.getEntity().getTypeName();
+            if (!ENTITY_TYPE_HIVE_TABLE.equals(typeName) && !ENTITY_TYPE_HIVE_COLUMN.equals(typeName)) {
+                return null;
+            }
+
+            Object hiveDBQualifiedName = extractHiveDBQualifiedName((String) extInfo.getEntity().getAttribute(QUALIFIED_NAME));
+            AtlasEntityType entityType = typeRegistry.getEntityTypeByName(ENTITY_TYPE_HIVE_DB);
+            return entityStore.getGuidByUniqueAttributes(entityType, Collections.singletonMap(QUALIFIED_NAME, hiveDBQualifiedName));
+        }
+        /** Joins the text before the first '.' and the text after the first '@' of qualifiedName as "before@after" (e.g. "db.tbl@cl" gives "db@cl"). */
+        @VisibleForTesting
+        static String extractHiveDBQualifiedName(String qualifiedName) {
+            return String.format("%s@%s",
+                    StringUtils.substringBefore(qualifiedName, "."),
+                    StringUtils.substringAfter(qualifiedName, "@"));
+        }
+    }
+
     private class ExportAudits {
         private AtlasExportRequest request;
         private String targetServerName;
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
new file mode 100644
index 0000000..1a4deeb
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasTypeRegistry;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class EntitiesExtractor {
+    static final String PROPERTY_GUID = "__guid";
+    private static final String VERTEX_BASED_EXTRACT = "default";
+    private static final String INCREMENTAL_EXTRACT = "incremental";
+    private static final String RELATION_BASED_EXTRACT = "relationship";
+
+    private final Map<String, ExtractStrategy> extractors = new HashMap<>();
+    private ExtractStrategy extractor;
+    // Only the vertex-based and incremental strategies are registered; nothing is registered under RELATION_BASED_EXTRACT.
+    public EntitiesExtractor(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry) {
+        extractors.put(VERTEX_BASED_EXTRACT, new VertexExtractor(atlasGraph, typeRegistry));
+        extractors.put(INCREMENTAL_EXTRACT, new IncrementalExportEntityProvider(atlasGraph));
+    }
+    /** Invokes connectedFetch or fullFetch for entity on the strategy chosen from context.fetchType and the incremental skip-lineage flags. */
+    public void get(AtlasEntity entity, ExportService.ExportContext context) {
+        if(extractor == null) {
+            extractor = extractors.get(VERTEX_BASED_EXTRACT);
+        }
+
+        switch (context.fetchType) {
+            case CONNECTED:
+                extractor.connectedFetch(entity, context);
+                break;
+
+            case INCREMENTAL:
+                if (context.isHiveDBIncrementalSkipLineage()) {
+                    extractors.get(INCREMENTAL_EXTRACT).fullFetch(entity, context);
+                    break;
+                } else if (context.isHiveTableIncrementalSkipLineage()) {
+                    extractors.get(INCREMENTAL_EXTRACT).connectedFetch(entity, context);
+                    break;
+                }
+                // no break: INCREMENTAL with neither skip-lineage condition set shares the FULL handling below
+            case FULL:
+            default:
+                extractor.fullFetch(entity, context);
+        }
+    }
+
+    public void setExtractor(AtlasEntityDef atlasEntityDef) {
+        extractor = extractUsing(atlasEntityDef);
+    }
+
+    public void close() {
+        for (ExtractStrategy es : extractors.values()) {
+            es.close();
+        }
+    }
+    /** Picks the relationship-based strategy when one is registered, otherwise the vertex-based one, so the result is never null. */
+    private ExtractStrategy extractUsing(AtlasEntityDef atlasEntityDef) {
+        return extractors.containsKey(RELATION_BASED_EXTRACT) ? extractors.get(RELATION_BASED_EXTRACT) : extractors.get(VERTEX_BASED_EXTRACT);
+    }
+}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 7382d32..1e97443 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -18,7 +18,6 @@
 package org.apache.atlas.repository.impexp;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.exception.AtlasBaseException;
@@ -27,65 +26,54 @@ import org.apache.atlas.model.impexp.AtlasExportResult;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
 import org.apache.atlas.model.typedef.AtlasClassificationDef;
 import org.apache.atlas.model.typedef.AtlasEntityDef;
 import org.apache.atlas.model.typedef.AtlasEnumDef;
 import org.apache.atlas.model.typedef.AtlasStructDef;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
-import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
 import org.apache.atlas.repository.util.UniqueList;
-import org.apache.atlas.type.AtlasEntityType;
-import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
-import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.atlas.util.AtlasGremlinQueryProvider;
-import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
-import javax.script.ScriptEngine;
-import javax.script.ScriptException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
-import static org.apache.atlas.model.impexp.AtlasExportRequest.*;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_CONNECTED;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL;
 
 @Component
 public class ExportService {
     private static final Logger LOG = LoggerFactory.getLogger(ExportService.class);
 
-    public static final String PROPERTY_GUID = "__guid";
-    private static final String PROPERTY_IS_PROCESS = "isProcess";
-
-
     private final AtlasTypeRegistry         typeRegistry;
-    private final String QUERY_BINDING_START_GUID = "startGuid";
+    private final StartEntityFetchByExportRequest startEntityFetchByExportRequest;
+    private final EntitiesExtractor         entitiesExtractor;
     private       AuditsWriter              auditsWriter;
-    private final AtlasGraph                atlasGraph;
     private final EntityGraphRetriever      entityGraphRetriever;
-    private final AtlasGremlinQueryProvider gremlinQueryProvider;
     private       ExportTypeProcessor       exportTypeProcessor;
     private final HdfsPathEntityCreator     hdfsPathEntityCreator;
-    private       IncrementalExportEntityProvider incrementalExportEntityProvider;
 
     @Inject
     public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph,
                          AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator) {
         this.typeRegistry         = typeRegistry;
         this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry);
-        this.atlasGraph           = atlasGraph;
-        this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
         this.auditsWriter         = auditsWriter;
         this.hdfsPathEntityCreator = hdfsPathEntityCreator;
+        this.startEntityFetchByExportRequest = new StartEntityFetchByExportRequest(atlasGraph, typeRegistry, AtlasGremlinQueryProvider.INSTANCE);
+        this.entitiesExtractor      = new EntitiesExtractor(atlasGraph, typeRegistry);
     }
 
     public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName,
@@ -94,7 +82,7 @@ public class ExportService {
         AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP,
                 hostName, startTime, getCurrentChangeMarker());
 
-        ExportContext context = new ExportContext(atlasGraph, result, exportSink);
+        ExportContext context = new ExportContext(result, exportSink);
         exportTypeProcessor = new ExportTypeProcessor(typeRegistry, context);
 
         try {
@@ -108,19 +96,18 @@ public class ExportService {
         } catch(Exception ex) {
             LOG.error("Operation failed: ", ex);
         } finally {
-            atlasGraph.releaseGremlinScriptEngine(context.scriptEngine);
+            entitiesExtractor.close();
             LOG.info("<== export(user={}, from={}): status {}: changeMarker: {}",
                     userName, requestingIP, context.result.getOperationStatus(), context.result.getChangeMarker());
             context.clear();
             result.clear();
-            incrementalExportEntityProvider = null;
         }
 
         return context.result;
     }
 
     private long getCurrentChangeMarker() {
-        return Math.min(RequestContextV1.earliestActiveRequestTime(), RequestContext.earliestActiveRequestTime());
+        return RequestContextV1.earliestActiveRequestTime();
     }
 
     private void updateSinkWithOperationMetrics(String userName, ExportContext context,
@@ -128,7 +115,6 @@ public class ExportService {
                                                 long startTime, long endTime) throws AtlasBaseException {
         int duration = getOperationDuration(startTime, endTime);
         context.result.setSourceClusterName(AuditsWriter.getCurrentClusterName());
-        context.addToEntityCreationOrder(context.lineageProcessed);
 
         context.sink.setExportOrder(context.entityCreationOrder.getList());
         context.sink.setTypesDef(context.result.getData().getTypesDef());
@@ -197,7 +183,9 @@ public class ExportService {
     }
 
     private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context) {
-        debugLog("==> processObjectId({})", item);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> processObjectId({})", item);
+        }
 
         try {
             List<String> entityGuids = getStartingEntity(item, context);
@@ -205,9 +193,10 @@ public class ExportService {
                 return AtlasExportResult.OperationStatus.FAIL;
             }
 
+            entitiesExtractor.setExtractor(typeRegistry.getEntityDefByName(item.getTypeName()));
+
             for (String guid : entityGuids) {
                 processEntityGuid(guid, context);
-                populateEntitesForIncremental(guid, context);
             }
 
             while (!context.guidsToProcess.isEmpty()) {
@@ -221,303 +210,63 @@ public class ExportService {
                     context.lineageProcessed.addAll(context.lineageToProcess.getList());
                     context.lineageToProcess.clear();
                 }
+                context.isSkipConnectedFetch = false;
             }
         } catch (AtlasBaseException excp) {
             LOG.error("Fetching entity failed for: {}", item, excp);
             return AtlasExportResult.OperationStatus.FAIL;
         }
 
-        debugLog("<== processObjectId({})", item);
-        return AtlasExportResult.OperationStatus.SUCCESS;
-    }
-
-    private void debugLog(String s, Object... params) {
-        if (!LOG.isDebugEnabled()) {
-            return;
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== processObjectId({})", item);
         }
-
-        LOG.debug(s, params);
+        return AtlasExportResult.OperationStatus.SUCCESS;
     }
 
     private List<String> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException {
-        List<String> ret = null;
-
         if(item.getTypeName().equalsIgnoreCase(HdfsPathEntityCreator.HDFS_PATH_TYPE)) {
             hdfsPathEntityCreator.getCreateEntity(item);
         }
 
-        if (StringUtils.isNotEmpty(item.getGuid())) {
-            ret = Collections.singletonList(item.getGuid());
-        } else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_FOR_TYPE) && StringUtils.isNotEmpty(item.getTypeName())) {
-            ret = getStartingEntityForMatchTypeForType(item, context);
-        } else if (StringUtils.isNotEmpty(item.getTypeName()) && MapUtils.isNotEmpty(item.getUniqueAttributes())) {
-            ret = getStartingEntityUsingQueryTemplate(item, context, ret);
-        }
-
-        if (ret == null) {
-            ret = Collections.emptyList();
-        }
-
-        logInfoStartingEntitiesFound(item, context, ret);
-        return ret;
-    }
-
-    private List<String> getStartingEntityUsingQueryTemplate(AtlasObjectId item, ExportContext context, List<String> ret) throws AtlasBaseException {
-        final String          queryTemplate = getQueryTemplateForMatchType(context);
-        final String          typeName      = item.getTypeName();
-        final AtlasEntityType entityType    = typeRegistry.getEntityTypeByName(typeName);
-
-        if (entityType == null) {
-            throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, typeName);
-        }
-
-        for (Map.Entry<String, Object> e : item.getUniqueAttributes().entrySet()) {
-            String attrName  = e.getKey();
-            Object attrValue = e.getValue();
-
-            AtlasAttribute attribute = entityType.getAttribute(attrName);
-            if (attribute == null || attrValue == null) {
-                continue;
-            }
-
-            setupBindingsForTypeNameAttrNameAttrValue(context, typeName, attrValue, attribute);
-
-            List<String> guids = executeGremlinQueryForGuids(queryTemplate, context);
-
-            if (CollectionUtils.isNotEmpty(guids)) {
-                if (ret == null) {
-                    ret = new ArrayList<>();
-                }
-
-                for (String guid : guids) {
-                    if (!ret.contains(guid)) {
-                        ret.add(guid);
-                    }
-                }
-            }
-        }
-        return ret;
-    }
-
-    private List<String> getStartingEntityForMatchTypeForType(AtlasObjectId item, ExportContext context) {
-        setupBindingsForTypeName(context, item.getTypeName());
-        return executeGremlinQueryForGuids(getQueryTemplateForMatchType(context), context);
-    }
-
-    private void logInfoStartingEntitiesFound(AtlasObjectId item, ExportContext context, List<String> ret) {
-        LOG.info("export(item={}; matchType={}, fetchType={}): found {} entities: options: {}", item,
-                context.matchType, context.fetchType, ret.size(), AtlasType.toJson(context.result.getRequest()));
-    }
-
-    private void setupBindingsForTypeName(ExportContext context, String typeName) {
-        context.bindings.clear();
-        context.bindings.put("typeName", new HashSet<String>(Arrays.asList(StringUtils.split(typeName,","))));
-    }
-
-    private void setupBindingsForTypeNameAttrNameAttrValue(ExportContext context,
-                                                           String typeName, Object attrValue, AtlasAttribute attribute) {
-        context.bindings.clear();
-        context.bindings.put("typeName", typeName);
-        context.bindings.put("attrName", attribute.getQualifiedName());
-        context.bindings.put("attrValue", attrValue);
-    }
-
-    private String getQueryTemplateForMatchType(ExportContext context) {
-        if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_STARTS_WITH)) {
-            return gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_STARTS_WITH);
-        }
-
-        if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_ENDS_WITH)) {
-            return gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_ENDS_WITH);
-        }
-
-        if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_CONTAINS)) {
-            return gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_CONTAINS);
-        }
-
-        if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_MATCHES)) {
-            return gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_MATCHES);
-        }
-
-        if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_FOR_TYPE)) {
-            return gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_ALL_FOR_TYPE);
-        }
-
-        return gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_DEFAULT);
+        return startEntityFetchByExportRequest.get(context.result.getRequest(), item);
     }
 
     private void processEntityGuid(String guid, ExportContext context) throws AtlasBaseException {
-        debugLog("==> processEntityGuid({})", guid);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> processEntityGuid({})", guid);
+        }
 
         if (context.guidsProcessed.contains(guid)) {
             return;
         }
 
-        TraversalDirection direction = context.guidDirection.get(guid);
         AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
 
-        processEntity(guid, entityWithExtInfo, context, direction);
+        processEntity( entityWithExtInfo, context);
 
-        debugLog("<== processEntityGuid({})", guid);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== processEntityGuid({})", guid);
+        }
     }
 
-    public void processEntity(String guid, AtlasEntityWithExtInfo entityWithExtInfo,
-                               ExportContext context,
-                               TraversalDirection direction) throws AtlasBaseException {
-
-        if (!context.lineageProcessed.contains(guid) && context.doesTimestampQualify(entityWithExtInfo.getEntity())) {
-            context.addToEntityCreationOrder(entityWithExtInfo.getEntity().getGuid());
-        }
+    public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
 
         addEntity(entityWithExtInfo, context);
         exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context);
 
         context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid());
-        getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction);
+        entitiesExtractor.get(entityWithExtInfo.getEntity(), context);
 
         if (entityWithExtInfo.getReferredEntities() != null) {
             for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
                 exportTypeProcessor.addTypes(e, context);
-                getConntedEntitiesBasedOnOption(e, context, direction);
+                entitiesExtractor.get(e, context);
             }
 
             context.guidsProcessed.addAll(entityWithExtInfo.getReferredEntities().keySet());
         }
     }
 
-    private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
-        switch (context.fetchType) {
-            case CONNECTED:
-                getEntityGuidsForConnectedFetch(entity, context, direction);
-                break;
-
-            case INCREMENTAL:
-                if(context.isHiveDBIncrementalSkipLineage()) {
-                    break;
-                }
-
-            case FULL:
-            default:
-                getEntityGuidsForFullFetch(entity, context);
-        }
-    }
-
-    private void populateEntitesForIncremental(String topLevelEntityGuid, ExportContext context) throws AtlasBaseException {
-        if (context.isHiveDBIncrementalSkipLineage() == false || incrementalExportEntityProvider != null) {
-            return;
-        }
-
-        incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, context.scriptEngine);
-        incrementalExportEntityProvider.populate(topLevelEntityGuid, context.changeMarker, context.guidsToProcess);
-    }
-
-    private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
-        if (direction == null || direction == TraversalDirection.UNKNOWN) {
-            getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD);
-        } else {
-            if (isProcessEntity(entity)) {
-                direction = TraversalDirection.OUTWARD;
-            }
-
-            getConnectedEntityGuids(entity, context, direction);
-        }
-    }
-
-    private boolean isProcessEntity(AtlasEntity entity) {
-        String          typeName   = entity.getTypeName();
-        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
-
-        return entityType.isSubTypeOf(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);
-    }
-
-    private void getConnectedEntityGuids(AtlasEntity entity, ExportContext context, TraversalDirection... directions) {
-        if(directions == null) {
-            return;
-        }
-
-        for (TraversalDirection direction : directions) {
-            String query = getQueryForTraversalDirection(direction);
-
-            if(LOG.isDebugEnabled()) {
-                debugLog("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query);
-            }
-
-            context.bindings.clear();
-            context.bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
-
-            List<HashMap<String, Object>> result = executeGremlinQuery(query, context);
-
-            if (CollectionUtils.isEmpty(result)) {
-                continue;
-            }
-
-            for (HashMap<String, Object> hashMap : result) {
-                String             guid             = (String) hashMap.get(PROPERTY_GUID);
-                TraversalDirection currentDirection = context.guidDirection.get(guid);
-                boolean            isLineage        = (boolean) hashMap.get(PROPERTY_IS_PROCESS);
-
-                if(context.skipLineage && isLineage) continue;
-
-                if (currentDirection == null) {
-                    context.addToBeProcessed(isLineage, guid, direction);
-
-                } else if (currentDirection == TraversalDirection.OUTWARD && direction == TraversalDirection.INWARD) {
-                    // the entity should be reprocessed to get inward entities
-                    context.guidsProcessed.remove(guid);
-                    context.addToBeProcessed(isLineage, guid, direction);
-                }
-            }
-
-            if(LOG.isDebugEnabled()) {
-                debugLog("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size());
-            }
-        }
-    }
-
-    private String getQueryForTraversalDirection(TraversalDirection direction) {
-        switch (direction) {
-            case INWARD:
-                return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_IN_EDGE);
-
-            default:
-            case OUTWARD:
-                return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_OUT_EDGE);
-        }
-    }
-
-    private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) {
-        if(LOG.isDebugEnabled()) {
-            debugLog("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
-        }
-
-        String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
-
-        context.bindings.clear();
-        context.bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
-
-        List<HashMap<String, Object>> result = executeGremlinQuery(query, context);
-
-        if (CollectionUtils.isEmpty(result)) {
-            return;
-        }
-
-        for (HashMap<String, Object> hashMap : result) {
-            String  guid      = (String) hashMap.get(PROPERTY_GUID);
-            boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS);
-
-            if(context.getSkipLineage() && isLineage) continue;
-
-            if (!context.guidsProcessed.contains(guid)) {
-                context.addToBeProcessed(isLineage, guid, TraversalDirection.BOTH);
-            }
-        }
-
-        if(LOG.isDebugEnabled()) {
-            debugLog("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}",
-                                            entity.getGuid(), result.size(), context.guidsToProcess.size());
-        }
-    }
-
     private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
         if(context.sink.hasEntity(entityWithExtInfo.getEntity().getGuid())) {
             return;
@@ -537,7 +286,6 @@ public class ExportService {
         } else {
             List<AtlasEntity> entities = context.getEntitiesWithModifiedTimestamp(entityWithExtInfo);
             for (AtlasEntity e : entities) {
-                context.addToEntityCreationOrder(e.getGuid());
                 context.addToSink(new AtlasEntityWithExtInfo(e));
                 context.result.incrementMeticsCounter(String.format("entity:%s", e.getTypeName()));
             }
@@ -546,24 +294,6 @@ public class ExportService {
         context.reportProgress();
     }
 
-    private List<HashMap<String, Object>> executeGremlinQuery(String query, ExportContext context) {
-        try {
-            return (List<HashMap<String, Object>>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false);
-        } catch (ScriptException e) {
-            LOG.error("Script execution failed for query: ", query, e);
-            return null;
-        }
-    }
-
-    private List<String> executeGremlinQueryForGuids(String query, ExportContext context) {
-        try {
-            return (List<String>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false);
-        } catch (ScriptException e) {
-            LOG.error("Script execution failed for query: ", query, e);
-            return null;
-        }
-    }
-
     public enum TraversalDirection {
         UNKNOWN,
         INWARD,
@@ -596,11 +326,11 @@ public class ExportService {
     static class ExportContext {
         private static final int REPORTING_THREASHOLD = 1000;
         private static final String ATLAS_TYPE_HIVE_DB = "hive_db";
-
+        private static final String ATLAS_TYPE_HIVE_TABLE = "hive_table";
 
         final UniqueList<String>              entityCreationOrder = new UniqueList<>();
         final Set<String>                     guidsProcessed = new HashSet<>();
-        final private UniqueList<String>      guidsToProcess = new UniqueList<>();
+        final UniqueList<String>              guidsToProcess = new UniqueList<>();
         final UniqueList<String>              lineageToProcess = new UniqueList<>();
         final Set<String>                     lineageProcessed = new HashSet<>();
         final Map<String, TraversalDirection> guidDirection  = new HashMap<>();
@@ -611,31 +341,29 @@ public class ExportService {
         final AtlasExportResult               result;
         private final ZipSink                 sink;
 
-        private final ScriptEngine        scriptEngine;
-        private final Map<String, Object> bindings;
-        private final ExportFetchType     fetchType;
-        private final String              matchType;
-        private final boolean             skipLineage;
-        private final long                changeMarker;
-        private final boolean isHiveDBIncremental;
+        final ExportFetchType     fetchType;
+        final boolean             skipLineage;
+        final long                changeMarker;
+        boolean                   isSkipConnectedFetch;
 
+        private final boolean isHiveDBIncremental;
+        private final boolean isHiveTableIncremental;
         private       int                 progressReportCount = 0;
 
-        ExportContext(AtlasGraph atlasGraph, AtlasExportResult result, ZipSink sink) throws AtlasBaseException {
+        ExportContext(AtlasExportResult result, ZipSink sink) throws AtlasBaseException {
             this.result = result;
             this.sink   = sink;
 
-            scriptEngine = atlasGraph.getGremlinScriptEngine();
-            bindings     = new HashMap<>();
             fetchType    = ExportFetchType.from(result.getRequest().getFetchTypeOptionValue());
-            matchType    = result.getRequest().getMatchTypeOptionValue();
             skipLineage  = result.getRequest().getSkipLineageOptionValue();
             this.changeMarker = result.getRequest().getChangeTokenFromOptions();
             this.isHiveDBIncremental = checkHiveDBIncrementalSkipLineage(result.getRequest());
+            this.isHiveTableIncremental = checkHiveTableIncrementalSkipLineage(result.getRequest());
+            this.isSkipConnectedFetch = false;
         }
 
         private boolean checkHiveDBIncrementalSkipLineage(AtlasExportRequest request) {
-            if(request.getItemsToExport().size() == 0) {
+            if(CollectionUtils.isEmpty(request.getItemsToExport())) {
                 return false;
             }
 
@@ -644,6 +372,16 @@ public class ExportService {
                     request.getSkipLineageOptionValue();
         }
 
+        // True for an incremental, skip-lineage export whose first item is a hive_table; null-safe for items without a type name.
+        private boolean checkHiveTableIncrementalSkipLineage(AtlasExportRequest request) {
+            if(CollectionUtils.isEmpty(request.getItemsToExport())) {
+                return false;
+            }
+            return ATLAS_TYPE_HIVE_TABLE.equalsIgnoreCase(request.getItemsToExport().get(0).getTypeName()) &&
+                    AtlasExportRequest.FETCH_TYPE_INCREMENTAL.equalsIgnoreCase(request.getFetchTypeOptionValue()) &&
+                    request.getSkipLineageOptionValue();
+        }
+
         public List<AtlasEntity> getEntitiesWithModifiedTimestamp(AtlasEntityWithExtInfo entityWithExtInfo) {
             if(fetchType != ExportFetchType.INCREMENTAL) {
                 return new ArrayList<>();
@@ -701,6 +439,7 @@ public class ExportService {
         }
 
         public void addToSink(AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
+            addToEntityCreationOrder(entityWithExtInfo.getEntity().getGuid());
             sink.add(entityWithExtInfo);
         }
 
@@ -708,12 +447,12 @@ public class ExportService {
             return isHiveDBIncremental;
         }
 
-        public void addToEntityCreationOrder(String guid) {
-            entityCreationOrder.add(guid);
+        public boolean isHiveTableIncrementalSkipLineage() {
+            return isHiveTableIncremental;
         }
 
-        public void addToEntityCreationOrder(Collection<String> guids) {
-            entityCreationOrder.addAll(guids);
+        public void addToEntityCreationOrder(String guid) {
+            entityCreationOrder.add(guid);
         }
     }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExtractStrategy.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExtractStrategy.java
new file mode 100644
index 0000000..91dd6f7
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExtractStrategy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+
+public interface ExtractStrategy { // pluggable way of finding the entities an export should process
+    // Both fetch operations start from the given entity and record what they find in the export context.
+    void connectedFetch(AtlasEntity entity, ExportService.ExportContext context);
+    void fullFetch(AtlasEntity entity, ExportService.ExportContext context);
+    void close(); // releases whatever the strategy holds; the implementations in this package release their script engine
+}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
index 4a0d1b2..f9d2e57 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
@@ -18,6 +18,8 @@
 
 package org.apache.atlas.repository.impexp;
 
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.util.UniqueList;
 import org.slf4j.Logger;
@@ -32,7 +34,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-public class IncrementalExportEntityProvider {
+public class IncrementalExportEntityProvider implements ExtractStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(IncrementalExportEntityProvider.class);
 
     private static final String QUERY_PARAMETER_START_GUID = "startGuid";
@@ -45,15 +47,35 @@ public class IncrementalExportEntityProvider {
     private static final String QUERY_SD = QUERY_TABLE + ".out('__hive_table.sd')";
     private static final String QUERY_COLUMN = QUERY_TABLE + ".out('__hive_table.columns')";
     private static final String TRANSFORM_CLAUSE = ".transform{[__guid:it.__guid]}.toList()";
-
     private static final String TIMESTAMP_CLAUSE = ".has('__modificationTimestamp', T.gt, modificationTimestamp)";
 
+    private static final String QUERY_TABLE_DB = QUERY_DB + ".out('__hive_table.db')";
+    private static final String QUERY_TABLE_SD = QUERY_DB + ".out('__hive_table.sd')";
+    private static final String QUERY_TABLE_COLUMNS = QUERY_DB + ".out('__hive_table.columns')";
+
     private ScriptEngine scriptEngine;
 
     @Inject
-    public IncrementalExportEntityProvider(AtlasGraph atlasGraph, ScriptEngine scriptEngine) {
+    public IncrementalExportEntityProvider(AtlasGraph atlasGraph) {
         this.atlasGraph = atlasGraph;
-        this.scriptEngine = scriptEngine;
+        try {
+            this.scriptEngine = atlasGraph.getGremlinScriptEngine();
+        } catch (AtlasBaseException e) {
+            LOG.error("Error instantiating script engine.", e);
+        }
+    }
+
+    @Override
+    public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        populate(entity.getGuid(), context.changeMarker, context.guidsToProcess);
+    }
+
+    @Override
+    public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        for (String query : new String[] { QUERY_TABLE_DB, QUERY_TABLE_SD, QUERY_TABLE_COLUMNS }) { // starting entity is hive_table
+            List<String> guids = fetchGuids(entity.getGuid(), query, context.changeMarker);
+            if (guids != null) { context.guidsToProcess.addAll(guids); } // NOTE(review): the script helper in this class returns null on failure - confirm fetchGuids passes that through
+        }
     }
 
     public void populate(String dbEntityGuid, long timeStamp, UniqueList<String> guidsToProcess) {
@@ -99,7 +121,7 @@ public class IncrementalExportEntityProvider {
             }
 
             for (HashMap<String, Object> item : result) {
-                guids.add((String) item.get(ExportService.PROPERTY_GUID));
+                guids.add((String) item.get(EntitiesExtractor.PROPERTY_GUID));
             }
 
             return guids;
@@ -109,4 +131,11 @@ public class IncrementalExportEntityProvider {
             return null;
         }
     }
+
+    @Override
+    public void close() {
+        if (scriptEngine != null) {
+            atlasGraph.releaseGremlinScriptEngine(scriptEngine);
+        }
+    }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/StartEntityFetchByExportRequest.java b/repository/src/main/java/org/apache/atlas/repository/impexp/StartEntityFetchByExportRequest.java
new file mode 100644
index 0000000..fd67c16
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/StartEntityFetchByExportRequest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.util.AtlasGremlinQueryProvider;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_CONTAINS;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_ENDS_WITH;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_FOR_TYPE;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_MATCHES;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_STARTS_WITH;
+
+public class StartEntityFetchByExportRequest {
+    private static final Logger LOG = LoggerFactory.getLogger(StartEntityFetchByExportRequest.class);
+
+    static final String DEFAULT_MATCH = "*";
+    static final String BINDING_PARAMETER_TYPENAME = "typeName";
+    static final String BINDING_PARAMETER_ATTR_NAME = "attrName";
+    static final String BINDING_PARAMTER_ATTR_VALUE = "attrValue";
+
+    private AtlasGraph           atlasGraph;
+    private AtlasTypeRegistry    typeRegistry;
+
+
+    private Map<String, String>  matchTypeQuery;
+
+    public StartEntityFetchByExportRequest(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry, AtlasGremlinQueryProvider gremlinQueryProvider) {
+        this.typeRegistry = typeRegistry;
+        this.atlasGraph = atlasGraph;
+        initMatchTypeQueryMap(gremlinQueryProvider);
+    }
+
+    private void initMatchTypeQueryMap(AtlasGremlinQueryProvider gremlinQueryProvider) {
+        matchTypeQuery = new HashMap<>();
+        matchTypeQuery.put(MATCH_TYPE_STARTS_WITH, gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_TYPE_STARTS_WITH));
+        matchTypeQuery.put(MATCH_TYPE_ENDS_WITH, gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_TYPE_ENDS_WITH));
+        matchTypeQuery.put(MATCH_TYPE_CONTAINS, gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_TYPE_CONTAINS));
+        matchTypeQuery.put(MATCH_TYPE_MATCHES, gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_TYPE_MATCHES));
+        matchTypeQuery.put(MATCH_TYPE_FOR_TYPE, gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_TYPE_ALL_FOR_TYPE));
+        matchTypeQuery.put(DEFAULT_MATCH, gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_TYPE_DEFAULT));
+
+    }
+
+    // Resolves every requested item to its starting entity; items that resolve to no GUID are dropped.
+    public List<AtlasObjectId> get(AtlasExportRequest exportRequest) {
+        List<AtlasObjectId> startEntities = new ArrayList<>();
+
+        for (AtlasObjectId requested : exportRequest.getItemsToExport()) {
+            List<String> matches = get(exportRequest, requested);
+
+            if (!matches.isEmpty()) {
+                requested.setGuid(matches.get(0)); // only the first match is kept; the request's own object is updated
+                startEntities.add(requested);
+            }
+        }
+        return startEntities;
+    }
+
+    /**
+     * Resolves one export item to the GUIDs of its starting entities: an explicit GUID wins,
+     * otherwise the lookup is by type (MATCH_TYPE_FOR_TYPE) or by the item's unique attributes.
+     *
+     * @return matching GUIDs; empty (never null) when nothing matches or the lookup fails
+     */
+    public List<String> get(AtlasExportRequest exportRequest, AtlasObjectId item) {
+        List<String> ret = new ArrayList<>();
+        String matchType = exportRequest.getMatchTypeOptionValue();
+
+        try {
+            if (StringUtils.isNotEmpty(item.getGuid())) {
+                ret.add(item.getGuid());
+            } else if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_FOR_TYPE) && StringUtils.isNotEmpty(item.getTypeName())) {
+                ret = getEntitiesForMatchTypeType(item, matchType);
+            } else if (StringUtils.isNotEmpty(item.getTypeName()) && MapUtils.isNotEmpty(item.getUniqueAttributes())) {
+                ret = getEntitiesForMatchTypeUsingUniqueAttributes(item, matchType);
+            }
+        } catch (AtlasBaseException ex) {
+            LOG.error("Error fetching starting entity for: {}", item, ex);
+        } finally {
+            if (ret == null) { // a failed Gremlin query comes back as null; ret.size() below would throw
+                ret = new ArrayList<>();
+            }
+            LOG.info("export(item={}; matchType={}, fetchType={}): found {} entities: options: {}", item,
+                    exportRequest.getMatchTypeOptionValue(), exportRequest.getFetchTypeOptionValue(), ret.size(), AtlasType.toJson(exportRequest));
+        }
+        return ret;
+    }
+
+    /**
+     * Queries entities of the item's type by each of its unique attributes and returns the
+     * de-duplicated GUIDs. Attributes unknown to the type, or with a null value, are skipped.
+     *
+     * @throws AtlasBaseException UNKNOWN_TYPENAME when the item's type is not in the type registry
+     */
+    private List<String> getEntitiesForMatchTypeUsingUniqueAttributes(AtlasObjectId item, String matchType) throws AtlasBaseException {
+        final String          query    = getQueryTemplateForMatchType(matchType);
+        final String          typeName = item.getTypeName();
+        final AtlasEntityType type     = typeRegistry.getEntityTypeByName(typeName);
+
+        if (type == null) {
+            throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, typeName);
+        }
+
+        Set<String> uniqueGuids = new HashSet<>();
+        for (Map.Entry<String, Object> entry : item.getUniqueAttributes().entrySet()) {
+            Object                         value     = entry.getValue();
+            AtlasStructType.AtlasAttribute attribute = type.getAttribute(entry.getKey());
+
+            if (attribute != null && value != null) {
+                List<String> found = executeGremlinQuery(query, getBindingsForObjectId(typeName, attribute.getQualifiedName(), value));
+
+                if (CollectionUtils.isNotEmpty(found)) {
+                    uniqueGuids.addAll(found);
+                }
+            }
+        }
+
+        return new ArrayList<>(uniqueGuids);
+    }
+
+    private List<String> getEntitiesForMatchTypeType(AtlasObjectId item, String matchType) {
+        return executeGremlinQuery(getQueryTemplateForMatchType(matchType), getBindingsForTypeName(item.getTypeName()));
+    }
+
+    @VisibleForTesting
+    String getQueryTemplateForMatchType(String matchType) {
+        return matchTypeQuery.containsKey(matchType)
+                ? matchTypeQuery.get(matchType)
+                : matchTypeQuery.get(DEFAULT_MATCH);
+    }
+
+    private HashMap<String, Object> getBindingsForTypeName(String typeName) {
+        HashMap<String, Object> ret = new HashMap<>();
+        ret.put(BINDING_PARAMETER_TYPENAME, new HashSet<>(Arrays.asList(StringUtils.split(typeName, ","))));
+        return ret;
+    }
+
+    private HashMap<String, Object> getBindingsForObjectId(String typeName, String attrName, Object attrValue) {
+        HashMap<String, Object> ret = new HashMap<>();
+        ret.put(BINDING_PARAMETER_TYPENAME, typeName);
+        ret.put(BINDING_PARAMETER_ATTR_NAME, attrName);
+        ret.put(BINDING_PARAMTER_ATTR_VALUE, attrValue);
+
+        return ret;
+    }
+
+    @VisibleForTesting
+    List<String> executeGremlinQuery(String query, Map<String, Object> bindings) { // runs the query with the given bindings
+        try {
+            return (List<String>) atlasGraph.executeGremlinScript(getScriptEngine(), bindings, query, false);
+        } catch (ScriptException e) {
+            LOG.error("Script execution failed for query: {}", query, e); // the "{}" is needed for the query text to be logged
+            return new ArrayList<>(); // not null: get() calls size() on this result in its finally block
+        }
+    }
+
+    public ScriptEngine getScriptEngine() { // NOTE(review): nothing in this class calls releaseGremlinScriptEngine for the engine obtained here - confirm whether it should
+        try {
+            return atlasGraph.getGremlinScriptEngine();
+        }
+        catch (AtlasBaseException e) {
+            LOG.error("Error initializing script engine.", e);
+        }
+
+        return null; // the engine could not be obtained; the failure is logged above
+    }
+}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/VertexExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/VertexExtractor.java
new file mode 100644
index 0000000..433f731
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/VertexExtractor.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.util.AtlasGremlinQueryProvider;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.atlas.repository.impexp.EntitiesExtractor.PROPERTY_GUID;
+
+public class VertexExtractor implements ExtractStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(VertexExtractor.class);
+
+    private static final String PROPERTY_IS_PROCESS = "isProcess";
+    private static final String QUERY_BINDING_START_GUID = "startGuid";
+
+    private final AtlasGremlinQueryProvider gremlinQueryProvider;
+
+    private final Map<String, Object> bindings;
+    private AtlasGraph atlasGraph;
+    private AtlasTypeRegistry typeRegistry;
+    private ScriptEngine scriptEngine;
+
+    public VertexExtractor(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry) {
+        this.atlasGraph = atlasGraph;
+        this.typeRegistry = typeRegistry;
+        try {
+            this.scriptEngine = atlasGraph.getGremlinScriptEngine();
+        } catch (AtlasBaseException e) {
+            LOG.error("Script Engine: Instantiation failed!", e); // keep the cause in the log; scriptEngine stays null and close() tolerates that
+        }
+        this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
+        this.bindings = new HashMap<>(); // reused by every query this extractor runs
+    }
+
+    @Override
+    public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        if (LOG.isDebugEnabled()){
+            LOG.debug("==> fullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
+        }
+
+        String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
+
+        bindings.clear(); // the bindings map is shared by all queries of this extractor, so reset it first
+        bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
+
+        List<Map<String, Object>> result = executeGremlinQuery(query, context);
+
+        if (CollectionUtils.isEmpty(result)) { // also covers the null returned when the script fails
+            return;
+        }
+
+        for (Map<String, Object> hashMap : result) {
+            String guid = (String) hashMap.get(PROPERTY_GUID);
+            boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS); // assumes every row carries this flag; a missing value would fail on unboxing
+
+            if (context.getSkipLineage() && isLineage) continue;
+
+            if (!context.guidsProcessed.contains(guid)) { // queue only entities that were not handled already
+                context.addToBeProcessed(isLineage, guid, ExportService.TraversalDirection.BOTH);
+            }
+        }
+    }
+
+    @Override
+    public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        if (LOG.isDebugEnabled()){
+            LOG.debug("==> connectedFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
+        }
+
+        ExportService.TraversalDirection direction = context.guidDirection.get(entity.getGuid()); // direction recorded for this entity, if any
+
+        if (direction == null || direction == ExportService.TraversalDirection.UNKNOWN) { // nothing recorded: look both ways
+            getConnectedEntityGuids(entity, context, ExportService.TraversalDirection.OUTWARD, ExportService.TraversalDirection.INWARD);
+        } else {
+            if (isProcessEntity(entity)) { // process entities are always followed outward, whatever was recorded
+                direction = ExportService.TraversalDirection.OUTWARD;
+            }
+
+            getConnectedEntityGuids(entity, context, direction);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (scriptEngine != null) {
+            atlasGraph.releaseGremlinScriptEngine(scriptEngine);
+        }
+    }
+
+    private void getConnectedEntityGuids(AtlasEntity entity, ExportService.ExportContext context, ExportService.TraversalDirection... directions) {
+        if (directions == null) {
+            return;
+        }
+        // one query per requested direction, each starting from the given entity
+        for (ExportService.TraversalDirection direction : directions) {
+            String query = getQueryForTraversalDirection(direction);
+
+            bindings.clear(); // the bindings map is shared by all queries of this extractor, so reset it first
+            bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
+
+            List<Map<String, Object>> result = executeGremlinQuery(query, context);
+
+            if (CollectionUtils.isEmpty(result)) { // also covers the null returned when the script fails
+                continue;
+            }
+
+            for (Map<String, Object> hashMap : result) {
+                String guid = (String) hashMap.get(PROPERTY_GUID);
+                ExportService.TraversalDirection currentDirection = context.guidDirection.get(guid);
+                boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS); // assumes every row carries this flag; a missing value would fail on unboxing
+
+                if (context.skipLineage && isLineage) continue;
+
+                if (currentDirection == null) { // first time this guid is seen: queue it with the current direction
+                    context.addToBeProcessed(isLineage, guid, direction);
+
+                } else if (currentDirection == ExportService.TraversalDirection.OUTWARD && direction == ExportService.TraversalDirection.INWARD) {
+                    // the entity should be reprocessed to get inward entities
+                    context.guidsProcessed.remove(guid);
+                    context.addToBeProcessed(isLineage, guid, direction);
+                }
+            }
+        }
+    }
+
+    // True when the entity's registered type is a sub-type of AtlasBaseTypeDef.ATLAS_TYPE_PROCESS.
+    private boolean isProcessEntity(AtlasEntity entity) {
+        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+        return entityType != null && entityType.isSubTypeOf(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS); // a type missing from the registry is treated as non-process instead of throwing
+    }
+
+    private String getQueryForTraversalDirection(ExportService.TraversalDirection direction) {
+        switch (direction) {
+            case INWARD:
+                return this.gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_IN_EDGE);
+
+            default:
+            case OUTWARD:
+                return this.gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_OUT_EDGE);
+        }
+    }
+
+    private List<Map<String, Object>> executeGremlinQuery(String query, ExportService.ExportContext context) { // uses this extractor's bindings; context is not read here
+        try {
+            return (List<Map<String, Object>>) atlasGraph.executeGremlinScript(scriptEngine, bindings, query, false);
+        } catch (ScriptException e) {
+            LOG.error("Script execution failed for query: {}", query, e); // the "{}" is needed for the query text to be logged
+            return null; // callers check the result with CollectionUtils.isEmpty, which accepts null
+        }
+    }
+}
\ No newline at end of file
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
index a355297..5ec62e5 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
@@ -26,29 +26,37 @@ import org.apache.atlas.TestUtilsV2;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1;
 import org.apache.atlas.repository.util.UniqueList;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.utils.TestResourceFileUtils;
+import org.testng.ITestContext;
 import org.testng.SkipException;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
+import org.testng.annotations.DataProvider;
 
 import javax.inject.Inject;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createTypes;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getEntities;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 @Guice(modules = TestModules.TestOnlyModule.class)
 public class ExportIncrementalTest extends ExportImportTestBase {
@@ -62,15 +70,30 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     ExportService exportService;
 
     @Inject
+    private ImportService importService;
+
+    @Inject
     private AtlasEntityStoreV1 entityStore;
 
     private final String EXPORT_REQUEST_INCREMENTAL = "export-incremental";
     private final String EXPORT_REQUEST_CONNECTED = "export-connected";
+    private AtlasClassificationType classificationTypeT1;
     private long nextTimestamp;
 
+    private static final String EXPORT_INCREMENTAL = "incremental";
+    private static final String QUALIFIED_NAME_DB = "db_test_1@02052019";
+    private static final String QUALIFIED_NAME_TABLE_LINEAGE = "db_test_1.test_tbl_ctas_2@02052019";
+
+
+    private static final String GUID_DB = "f0b72ab4-7452-4e42-ac74-2aee7728cce4";
+    private static final String GUID_TABLE_2 = "8d0b834c-61ce-42d8-8f66-6fa51c36bccb";
+    private static final String GUID_TABLE_CTAS_2 = "eaec545b-3ac7-4e1b-a497-bd4a2b6434a2";
+
     @BeforeClass
     public void setup() throws IOException, AtlasBaseException {
         basicSetup(typeDefStore, typeRegistry);
+        classificationTypeT1 = createNewClassification();
+
         createEntities(entityStore, ENTITIES_SUB_DIR, new String[] { "db", "table-columns"});
         final Object[] entityGuids = new Object[]{DB_GUID, TABLE_GUID};
         verifyCreatedEntities(entityStore, entityGuids, 2);
@@ -108,8 +131,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     public void atT1_NewClassificationAttachedToTable_ReturnsChangedTable() throws AtlasBaseException {
         final int expectedEntityCount = 1;
 
-        AtlasClassificationType ct = createNewClassification();
-        entityStore.addClassifications(TABLE_GUID, ImmutableList.of(ct.createDefaultValue()));
+        entityStore.addClassifications(TABLE_GUID, ImmutableList.of(classificationTypeT1.createDefaultValue()));
 
         AtlasExportRequest request = getIncrementalRequest(nextTimestamp);
         ZipSource source = runExportWithParameters(exportService, request);
@@ -127,7 +149,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     }
 
     private AtlasClassificationType createNewClassification() {
-        createTypes(typeDefStore, ENTITIES_SUB_DIR,"typesDef-new-classification");
+        createTypes(typeDefStore, ENTITIES_SUB_DIR,"typesdef-new-classification");
         return typeRegistry.getClassificationTypeByName("T1");
     }
 
@@ -151,7 +173,6 @@ public class ExportIncrementalTest extends ExportImportTestBase {
 
         long postUpdateTableEntityTimestamp = tableEntity.getEntity().getUpdateTime().getTime();
         assertEquals(preExportTableEntityTimestamp, postUpdateTableEntityTimestamp);
-        nextTimestamp = updateTimesampForNextIncrementalExport(source);
     }
 
     @Test(dependsOnMethods = "atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn")
@@ -172,6 +193,36 @@ public class ExportIncrementalTest extends ExportImportTestBase {
         assertEquals(creationOrder.size(), zipCreationOrder.size());
     }
 
+    @DataProvider(name = "hiveDb")
+    public static Object[][] getData(ITestContext context) throws IOException, AtlasBaseException {
+        return getZipSource("hive_db_lineage.zip"); // 'context' is unused; the rows come straight from getZipSource
+    }
+
+    @Test(dataProvider = "hiveDb")
+    public void importHiveDb(ZipSource zipSource) throws AtlasBaseException, IOException {
+        runImportWithNoParameters(importService, zipSource); // no assertions here; exportTableInrementalConnected depends on this import
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableInrementalConnected() throws AtlasBaseException {
+        ZipSource source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true));
+        verifyExpectedEntities(getFileNames(source), GUID_DB, GUID_TABLE_CTAS_2);
+
+        nextTimestamp = updateTimesampForNextIncrementalExport(source);
+
+        try {
+            runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true));
+        } catch (SkipException ignored) {
+            // Deliberately ignored. NOTE(review): presumably thrown when nothing changed since nextTimestamp - confirm in runExportWithParameters.
+        }
+        // Change the table after the marker was taken so the next incremental export has something to report.
+        entityStore.addClassifications(GUID_TABLE_CTAS_2, ImmutableList.of(classificationTypeT1.createDefaultValue()));
+
+        source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true));
+        verifyExpectedEntities(getFileNames(source), GUID_TABLE_CTAS_2);
+    }
+
+
     private AtlasExportRequest getIncrementalRequest(long timestamp) {
         try {
             AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_INCREMENTAL, AtlasExportRequest.class);
@@ -179,7 +230,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
 
             return request;
         } catch (IOException e) {
-            throw new SkipException(String.format("getIncrementalRequest: '%s' could not be laoded.", EXPORT_REQUEST_INCREMENTAL));
+            throw new SkipException(String.format("getIncrementalRequest: '%s' could not be loaded.", EXPORT_REQUEST_INCREMENTAL));
         }
     }
 
@@ -187,8 +238,46 @@ public class ExportIncrementalTest extends ExportImportTestBase {
         try {
             return TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_CONNECTED, AtlasExportRequest.class);
         } catch (IOException e) {
-            throw new SkipException(String.format("getIncrementalRequest: '%s' could not be laoded.", EXPORT_REQUEST_CONNECTED));
+            throw new SkipException(String.format("getIncrementalRequest: '%s' could not be loaded.", EXPORT_REQUEST_CONNECTED));
         }
     }
 
+    private AtlasExportRequest getExportRequestForHiveTable(String name, String fetchType, long changeMarker, boolean skipLineage) {
+        // The request exports exactly one item: a "hive_table" object id built from ("qualifiedName", name).
+        final List<AtlasObjectId> tables = new ArrayList<>();
+        tables.add(new AtlasObjectId("hive_table", "qualifiedName", name));
+
+        final AtlasExportRequest exportRequest = new AtlasExportRequest();
+        exportRequest.setItemsToExport(tables);
+        exportRequest.setOptions(getOptionsMap(fetchType, changeMarker, skipLineage));
+        return exportRequest;
+    }
+
+    private Map<String, Object> getOptionsMap(String fetchType, long changeMarker, boolean skipLineage){
+        final String effectiveFetchType = fetchType.isEmpty() ? "full" : fetchType; // empty fetch type falls back to "full"
+        final Map<String, Object> options = new HashMap<>();
+        options.put("fetchType", effectiveFetchType);
+        options.put("changeMarker", changeMarker);
+        options.put("skipLineage", skipLineage);
+        return options;
+    }
+
+    private void verifyExpectedEntities(List<String> fileNames, String... guids){
+        assertEquals(fileNames.size(), guids.length, "unexpected number of exported entities: " + fileNames);
+        for (String guid : guids) { // each failure message names the GUID that was not found
+            assertTrue(fileNames.contains(guid.toLowerCase()), "expected GUID not exported: " + guid);
+        }
+    }
+
+    private List<String> getFileNames(ZipSource zipSource){
+        // Despite the method name, the values collected are entity GUIDs; a source with no entries fails the test.
+        assertTrue(zipSource.hasNext());
+        final List<String> guids = new ArrayList<>();
+        while (zipSource.hasNext()){
+            final AtlasEntity entity = zipSource.next();
+            assertNotNull(entity);
+            guids.add(entity.getGuid());
+        }
+        return guids;
+    }
 }
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
index 99d88eb..4615266 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
@@ -23,7 +23,6 @@ import org.apache.atlas.TestModules;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1;
-import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
 import org.apache.atlas.repository.util.UniqueList;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasTypeRegistry;
@@ -64,7 +63,7 @@ public class IncrementalExportEntityProviderTest extends ExportImportTestBase {
         verifyCreatedEntities(entityStore, entityGuids, 2);
 
         gremlinScriptEngine = atlasGraph.getGremlinScriptEngine();
-        incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, gremlinScriptEngine);
+        incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph);
     }
 
     @AfterClass
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
index 868b732..5e909e1 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
@@ -32,7 +32,6 @@ import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.typedef.AtlasEntityDef;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1;
-import org.apache.atlas.repository.store.graph.v1.EntityGraphMapper;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasType;
@@ -144,6 +143,14 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
         assertReplicationAttribute(Constants.ATTR_NAME_REPLICATED_FROM);
     }
 
+    @Test
+    public void replKeyGuidFinder() {
+        String expectedDBQualifiedName = "stocks_base@cl1";
+        // A three-part and a two-part dotted name must both reduce to the same "<db>@<cluster>" value.
+        assertEquals(AuditsWriter.ReplKeyGuidFinder.extractHiveDBQualifiedName("stocks_base.stocks_daily.volume@cl1"), expectedDBQualifiedName);
+        assertEquals(AuditsWriter.ReplKeyGuidFinder.extractHiveDBQualifiedName("stocks_base.stocks_daily@cl1"), expectedDBQualifiedName);
+    }
+
     private void assertReplicationAttribute(String attrNameReplication) throws AtlasBaseException {
         pauseForIndexCreation();
         AtlasEntity.AtlasEntitiesWithExtInfo entities = entityStore.getByIds(ImmutableList.of(DB_GUID, TABLE_GUID));
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/StartEntityFetchByExportRequestTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/StartEntityFetchByExportRequestTest.java
new file mode 100644
index 0000000..84407ad
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/StartEntityFetchByExportRequestTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.util.AtlasGremlin3QueryProvider;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.atlas.repository.impexp.StartEntityFetchByExportRequest.BINDING_PARAMETER_ATTR_NAME;
+import static org.apache.atlas.repository.impexp.StartEntityFetchByExportRequest.BINDING_PARAMETER_TYPENAME;
+import static org.apache.atlas.repository.impexp.StartEntityFetchByExportRequest.BINDING_PARAMTER_ATTR_VALUE;
+import static org.testng.Assert.assertEquals;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class StartEntityFetchByExportRequestTest extends ExportImportTestBase {
+
+    @Inject
+    private AtlasGraph atlasGraph;
+
+    @Inject
+    private AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    private AtlasGremlin3QueryProvider atlasGremlin3QueryProvider;
+    private StartEntityFetchByExportRequestSpy startEntityFetchByExportRequestSpy;
+    // Test double: captures the query and bindings handed to executeGremlinQuery instead of running them.
+    private class StartEntityFetchByExportRequestSpy extends StartEntityFetchByExportRequest {
+        String generatedQuery;
+        Map<String, Object> suppliedBindingsMap;
+
+        public StartEntityFetchByExportRequestSpy(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry) {
+            super(atlasGraph, typeRegistry, atlasGremlin3QueryProvider);
+        }
+
+        @Override
+        List<String> executeGremlinQuery(String query, Map<String, Object> bindings) {
+            this.generatedQuery = query;
+            this.suppliedBindingsMap = bindings;
+            // Nothing is executed; emptyList() is the type-safe replacement for the raw EMPTY_LIST constant.
+            return Collections.emptyList();
+        }
+
+        public String getGeneratedQuery() {
+            return generatedQuery;
+        }
+
+        public Map<String, Object> getSuppliedBindingsMap() {
+            return suppliedBindingsMap;
+        }
+    }
+    // The query provider is created before the spy because the spy's constructor passes it to the superclass.
+    @BeforeClass void setup() throws IOException, AtlasBaseException {
+        super.basicSetup(typeDefStore, typeRegistry);
+        atlasGremlin3QueryProvider = new AtlasGremlin3QueryProvider();
+        startEntityFetchByExportRequestSpy = new StartEntityFetchByExportRequestSpy(atlasGraph, typeRegistry);
+    }
+    // An export item identified by guid is expected to come back carrying that same guid.
+    @Test
+    public void fetchTypeGuid() {
+        String exportRequestJson = "{ \"itemsToExport\": [ { \"typeName\": \"hive_db\", \"guid\": \"111-222-333\" } ]}";
+        AtlasExportRequest exportRequest = AtlasType.fromJson(exportRequestJson, AtlasExportRequest.class);
+
+        List<AtlasObjectId> objectIds = startEntityFetchByExportRequestSpy.get(exportRequest);
+
+        assertEquals(objectIds.get(0).getGuid(), "111-222-333");
+    }
+    // An export item identified by unique attributes is expected to yield the match-type query template and these bindings.
+    @Test
+    public void fetchTypeUniqueAttributes() {
+        String exportRequestJson = "{ \"itemsToExport\": [ { \"typeName\": \"hive_db\", \"uniqueAttributes\": {\"qualifiedName\": \"stocks@cl1\"} } ]}";
+        AtlasExportRequest exportRequest = AtlasType.fromJson(exportRequestJson, AtlasExportRequest.class);
+
+        startEntityFetchByExportRequestSpy.get(exportRequest);
+        assertEquals(startEntityFetchByExportRequestSpy.getGeneratedQuery(), startEntityFetchByExportRequestSpy.getQueryTemplateForMatchType(exportRequest.getMatchTypeOptionValue()));
+        assertEquals(startEntityFetchByExportRequestSpy.getSuppliedBindingsMap().get(BINDING_PARAMETER_TYPENAME), "hive_db");
+        assertEquals(startEntityFetchByExportRequestSpy.getSuppliedBindingsMap().get(BINDING_PARAMETER_ATTR_NAME), "Referenceable.qualifiedName");
+        assertEquals(startEntityFetchByExportRequestSpy.getSuppliedBindingsMap().get(BINDING_PARAMTER_ATTR_VALUE), "stocks@cl1");
+    }
+}
\ No newline at end of file