You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ni...@apache.org on 2019/06/20 10:30:11 UTC

[atlas] 01/02: ATLAS-3256 Modify export API to process with relationshipAttributes

This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 08b76391cfb8fd231218e5788261e5f2310703cc
Author: nikhilbonte <ni...@freestoneinfotech.com>
AuthorDate: Fri Jun 14 15:21:06 2019 +0530

    ATLAS-3256 Modify export API to process with relationshipAttributes
    
    Signed-off-by: nixonrodrigues <ni...@apache.org>
---
 .../atlas/repository/impexp/EntitiesExtractor.java |  81 +++++
 .../atlas/repository/impexp/ExportService.java     | 223 ++-----------
 .../atlas/repository/impexp/ExtractStrategy.java   |  28 ++
 .../impexp/IncrementalExportEntityProvider.java    |  32 +-
 .../impexp/RelationshipAttributesExtractor.java    | 115 +++++++
 .../atlas/repository/impexp/VertexExtractor.java   | 183 +++++++++++
 .../IncrementalExportEntityProviderTest.java       |   2 +-
 .../RelationshipAttributesExtractorTest.java       | 354 +++++++++++++++++++++
 8 files changed, 821 insertions(+), 197 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
new file mode 100644
index 0000000..15cb111
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasTypeRegistry;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class EntitiesExtractor {
+    static final String PROPERTY_GUID = "__guid";
+    private static final String VERTEX_BASED_EXTRACT = "default";
+    private static final String INCREMENTAL_EXTRACT = "incremental";
+    private static final String RELATION_BASED_EXTRACT = "relationship";
+
+    private Map<String, ExtractStrategy> extractors = new HashMap<>();
+    private ExtractStrategy extractor;
+
+    public EntitiesExtractor(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry) {
+        extractors.put(VERTEX_BASED_EXTRACT, new VertexExtractor(atlasGraph, typeRegistry));
+        extractors.put(INCREMENTAL_EXTRACT, new IncrementalExportEntityProvider(atlasGraph));
+        extractors.put(RELATION_BASED_EXTRACT, new RelationshipAttributesExtractor(typeRegistry));
+    }
+
+    public void get(AtlasEntity entity, ExportService.ExportContext context) {
+        if(extractor == null) {
+            extractor = extractors.get(VERTEX_BASED_EXTRACT);
+        }
+
+        switch (context.fetchType) {
+            case CONNECTED:
+                extractor.connectedFetch(entity, context);
+                break;
+
+            case INCREMENTAL:
+                if (context.isHiveDBIncrementalSkipLineage()) {
+                    extractors.get(INCREMENTAL_EXTRACT).fullFetch(entity, context);
+                    break;
+                }
+
+            case FULL:
+            default:
+                extractor.fullFetch(entity, context);
+        }
+    }
+
+    public void setExtractor(AtlasEntityDef atlasEntityDef) {
+        extractor = extractUsing(atlasEntityDef);
+    }
+
+    public void close() {
+        for (ExtractStrategy es : extractors.values()) {
+            es.close();
+        }
+    }
+
+    private ExtractStrategy extractUsing(AtlasEntityDef atlasEntityDef) {
+        return (atlasEntityDef == null || atlasEntityDef.getRelationshipAttributeDefs().size() == 0)
+                ? extractors.get(VERTEX_BASED_EXTRACT)
+                : extractors.get(RELATION_BASED_EXTRACT);
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 11289ea..5055607 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -25,7 +25,6 @@ import org.apache.atlas.model.impexp.AtlasExportResult;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
 import org.apache.atlas.model.typedef.AtlasClassificationDef;
 import org.apache.atlas.model.typedef.AtlasEntityDef;
 import org.apache.atlas.model.typedef.AtlasEnumDef;
@@ -35,19 +34,13 @@ import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
 import org.apache.atlas.repository.util.UniqueList;
-import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.atlas.util.AtlasGremlinQueryProvider;
-import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery;
-import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
-import javax.script.ScriptEngine;
-import javax.script.ScriptException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -63,30 +56,23 @@ import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREM
 public class ExportService {
     private static final Logger LOG = LoggerFactory.getLogger(ExportService.class);
 
-    public static final String PROPERTY_GUID = "__guid";
-    private static final String PROPERTY_IS_PROCESS = "isProcess";
-
     private final AtlasTypeRegistry         typeRegistry;
-    private final String QUERY_BINDING_START_GUID = "startGuid";
     private final StartEntityFetchByExportRequest startEntityFetchByExportRequest;
+    private final EntitiesExtractor         entitiesExtractor;
     private       AuditsWriter              auditsWriter;
-    private final AtlasGraph                atlasGraph;
     private final EntityGraphRetriever      entityGraphRetriever;
-    private final AtlasGremlinQueryProvider gremlinQueryProvider;
     private       ExportTypeProcessor       exportTypeProcessor;
     private final HdfsPathEntityCreator     hdfsPathEntityCreator;
-    private       IncrementalExportEntityProvider incrementalExportEntityProvider;
 
     @Inject
     public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph,
                          AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator) {
         this.typeRegistry         = typeRegistry;
         this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry);
-        this.atlasGraph           = atlasGraph;
-        this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
         this.auditsWriter         = auditsWriter;
         this.hdfsPathEntityCreator = hdfsPathEntityCreator;
         this.startEntityFetchByExportRequest = new StartEntityFetchByExportRequest(atlasGraph, typeRegistry, AtlasGremlinQueryProvider.INSTANCE);
+        this.entitiesExtractor = new EntitiesExtractor(atlasGraph, typeRegistry);
     }
 
     public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName,
@@ -95,7 +81,7 @@ public class ExportService {
         AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP,
                 hostName, startTime, getCurrentChangeMarker());
 
-        ExportContext context = new ExportContext(atlasGraph, result, exportSink);
+        ExportContext context = new ExportContext(result, exportSink);
         exportTypeProcessor = new ExportTypeProcessor(typeRegistry);
 
         try {
@@ -109,12 +95,12 @@ public class ExportService {
         } catch(Exception ex) {
             LOG.error("Operation failed: ", ex);
         } finally {
-            atlasGraph.releaseGremlinScriptEngine(context.scriptEngine);
+            entitiesExtractor.close();
+
             LOG.info("<== export(user={}, from={}): status {}: changeMarker: {}",
                     userName, requestingIP, context.result.getOperationStatus(), context.result.getChangeMarker());
             context.clear();
             result.clear();
-            incrementalExportEntityProvider = null;
         }
 
         return context.result;
@@ -203,7 +189,9 @@ public class ExportService {
     }
 
     private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context) {
-        debugLog("==> processObjectId({})", item);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> processObjectId({})", item);
+        }
 
         try {
             List<String> entityGuids = getStartingEntity(item, context);
@@ -211,9 +199,10 @@ public class ExportService {
                 return AtlasExportResult.OperationStatus.FAIL;
             }
 
+            entitiesExtractor.setExtractor(typeRegistry.getEntityDefByName(item.getTypeName()));
+
             for (String guid : entityGuids) {
                 processEntityGuid(guid, context);
-                populateEntitesForIncremental(guid, context);
             }
 
             while (!context.guidsToProcess.isEmpty()) {
@@ -227,13 +216,16 @@ public class ExportService {
                     context.lineageProcessed.addAll(context.lineageToProcess.getList());
                     context.lineageToProcess.clear();
                 }
+                context.isSkipConnectedFetch = false;
             }
         } catch (AtlasBaseException excp) {
             LOG.error("Fetching entity failed for: {}", item, excp);
             return AtlasExportResult.OperationStatus.FAIL;
         }
 
-        debugLog("<== processObjectId({})", item);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== processObjectId({})", item);
+        }
         return AtlasExportResult.OperationStatus.SUCCESS;
     }
 
@@ -245,181 +237,41 @@ public class ExportService {
         return startEntityFetchByExportRequest.get(context.result.getRequest(), item);
     }
 
-    private void debugLog(String s, Object... params) {
-        if (!LOG.isDebugEnabled()) {
-            return;
-        }
-
-        LOG.debug(s, params);
-    }
-
     private void processEntityGuid(String guid, ExportContext context) throws AtlasBaseException {
-        debugLog("==> processEntityGuid({})", guid);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> processEntityGuid({})", guid);
+        }
 
         if (context.guidsProcessed.contains(guid)) {
             return;
         }
 
-        TraversalDirection direction = context.guidDirection.get(guid);
         AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
 
-        processEntity(entityWithExtInfo, context, direction);
-
-        debugLog("<== processEntityGuid({})", guid);
+        processEntity(entityWithExtInfo, context);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== processEntityGuid({})", guid);
+        }
     }
 
-    public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo,
-                              ExportContext context,
-                              TraversalDirection direction) throws AtlasBaseException {
-
+    public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
         addEntity(entityWithExtInfo, context);
         exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context);
 
         context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid());
-        getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction);
+        entitiesExtractor.get(entityWithExtInfo.getEntity(), context);
 
         if (entityWithExtInfo.getReferredEntities() != null) {
             for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
                 exportTypeProcessor.addTypes(e, context);
-                getConntedEntitiesBasedOnOption(e, context, direction);
+                entitiesExtractor.get(e, context);
             }
 
             context.guidsProcessed.addAll(entityWithExtInfo.getReferredEntities().keySet());
         }
     }
 
-    private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
-        switch (context.fetchType) {
-            case CONNECTED:
-                getEntityGuidsForConnectedFetch(entity, context, direction);
-                break;
-
-            case INCREMENTAL:
-                if(context.isHiveDBIncrementalSkipLineage()) {
-                    break;
-                }
-
-            case FULL:
-            default:
-                getEntityGuidsForFullFetch(entity, context);
-        }
-    }
-
-    private void populateEntitesForIncremental(String topLevelEntityGuid, ExportContext context) {
-        if (context.isHiveDBIncrementalSkipLineage() == false || incrementalExportEntityProvider != null) {
-            return;
-        }
-
-        incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, context.scriptEngine);
-        incrementalExportEntityProvider.populate(topLevelEntityGuid, context.changeMarker, context.guidsToProcess);
-    }
-
-    private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
-        if (direction == null || direction == TraversalDirection.UNKNOWN) {
-            getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD);
-        } else {
-            if (isProcessEntity(entity)) {
-                direction = TraversalDirection.OUTWARD;
-            }
-
-            getConnectedEntityGuids(entity, context, direction);
-        }
-    }
-
-    private boolean isProcessEntity(AtlasEntity entity) {
-        String          typeName   = entity.getTypeName();
-        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
-
-        return entityType.isSubTypeOf(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);
-    }
-
-    private void getConnectedEntityGuids(AtlasEntity entity, ExportContext context, TraversalDirection... directions) {
-        if(directions == null) {
-            return;
-        }
-
-        for (TraversalDirection direction : directions) {
-            String query = getQueryForTraversalDirection(direction);
-
-            if(LOG.isDebugEnabled()) {
-                debugLog("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query);
-            }
-
-            context.bindings.clear();
-            context.bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
-
-            List<Map<String, Object>> result = executeGremlinQuery(query, context);
-
-            if (CollectionUtils.isEmpty(result)) {
-                continue;
-            }
-
-            for (Map<String, Object> hashMap : result) {
-                String             guid             = (String) hashMap.get(PROPERTY_GUID);
-                TraversalDirection currentDirection = context.guidDirection.get(guid);
-                boolean            isLineage        = (boolean) hashMap.get(PROPERTY_IS_PROCESS);
-
-                if(context.skipLineage && isLineage) continue;
-
-                if (currentDirection == null) {
-                    context.addToBeProcessed(isLineage, guid, direction);
-
-                } else if (currentDirection == TraversalDirection.OUTWARD && direction == TraversalDirection.INWARD) {
-                    // the entity should be reprocessed to get inward entities
-                    context.guidsProcessed.remove(guid);
-                    context.addToBeProcessed(isLineage, guid, direction);
-                }
-            }
-
-            if(LOG.isDebugEnabled()) {
-                debugLog("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size());
-            }
-        }
-    }
-
-    private String getQueryForTraversalDirection(TraversalDirection direction) {
-        switch (direction) {
-            case INWARD:
-                return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_IN_EDGE);
-
-            default:
-            case OUTWARD:
-                return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_OUT_EDGE);
-        }
-    }
-
-    private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) {
-        if(LOG.isDebugEnabled()) {
-            debugLog("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
-        }
-
-        String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
-
-        context.bindings.clear();
-        context.bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
-
-        List<Map<String, Object>> result = executeGremlinQuery(query, context);
-
-        if (CollectionUtils.isEmpty(result)) {
-            return;
-        }
-
-        for (Map<String, Object> hashMap : result) {
-            String  guid      = (String) hashMap.get(PROPERTY_GUID);
-            boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS);
-
-            if(context.getSkipLineage() && isLineage) continue;
-
-            if (!context.guidsProcessed.contains(guid)) {
-                context.addToBeProcessed(isLineage, guid, TraversalDirection.BOTH);
-            }
-        }
-
-        if(LOG.isDebugEnabled()) {
-            debugLog("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}",
-                                            entity.getGuid(), result.size(), context.guidsToProcess.size());
-        }
-    }
 
     private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
         if(context.sink.hasEntity(entityWithExtInfo.getEntity().getGuid())) {
@@ -448,15 +300,6 @@ public class ExportService {
         context.reportProgress();
     }
 
-    private List<Map<String, Object>> executeGremlinQuery(String query, ExportContext context) {
-        try {
-            return (List<Map<String, Object>>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false);
-        } catch (ScriptException e) {
-            LOG.error("Script execution failed for query: ", query, e);
-            return null;
-        }
-    }
-
     public enum TraversalDirection {
         UNKNOWN,
         INWARD,
@@ -493,7 +336,7 @@ public class ExportService {
 
         final UniqueList<String>              entityCreationOrder = new UniqueList<>();
         final Set<String>                     guidsProcessed = new HashSet<>();
-        final private UniqueList<String>      guidsToProcess = new UniqueList<>();
+        final UniqueList<String>              guidsToProcess = new UniqueList<>();
         final UniqueList<String>              lineageToProcess = new UniqueList<>();
         final Set<String>                     lineageProcessed = new HashSet<>();
         final Map<String, TraversalDirection> guidDirection  = new HashMap<>();
@@ -505,25 +348,23 @@ public class ExportService {
         final AtlasExportResult               result;
         private final ZipSink                 sink;
 
-        private final ScriptEngine        scriptEngine;
-        private final Map<String, Object> bindings;
-        private final ExportFetchType     fetchType;
-        private final boolean             skipLineage;
-        private final long                changeMarker;
+        final ExportFetchType             fetchType;
+        final boolean                     skipLineage;
+        final long                        changeMarker;
+        boolean isSkipConnectedFetch;
         private final boolean isHiveDBIncremental;
 
         private       int                 progressReportCount = 0;
 
-        ExportContext(AtlasGraph atlasGraph, AtlasExportResult result, ZipSink sink) throws AtlasBaseException {
+        ExportContext(AtlasExportResult result, ZipSink sink) {
             this.result = result;
             this.sink   = sink;
 
-            scriptEngine = atlasGraph.getGremlinScriptEngine();
-            bindings     = new HashMap<>();
             fetchType    = ExportFetchType.from(result.getRequest().getFetchTypeOptionValue());
             skipLineage  = result.getRequest().getSkipLineageOptionValue();
             this.changeMarker = result.getRequest().getChangeTokenFromOptions();
             this.isHiveDBIncremental = checkHiveDBIncrementalSkipLineage(result.getRequest());
+            this.isSkipConnectedFetch = false;
         }
 
         private boolean checkHiveDBIncrementalSkipLineage(AtlasExportRequest request) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExtractStrategy.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExtractStrategy.java
new file mode 100644
index 0000000..6475016
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExtractStrategy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+
+public interface ExtractStrategy {
+
+    void connectedFetch(AtlasEntity entity, ExportService.ExportContext context);
+    void fullFetch(AtlasEntity entity, ExportService.ExportContext context);
+    void close();
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
index 3a2a917..256d9de 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
@@ -18,6 +18,8 @@
 
 package org.apache.atlas.repository.impexp;
 
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.util.UniqueList;
 import org.slf4j.Logger;
@@ -28,11 +30,10 @@ import javax.script.ScriptEngine;
 import javax.script.ScriptException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-public class IncrementalExportEntityProvider {
+public class IncrementalExportEntityProvider implements ExtractStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(IncrementalExportEntityProvider.class);
 
     private static final String QUERY_PARAMETER_START_GUID = "startGuid";
@@ -50,9 +51,23 @@ public class IncrementalExportEntityProvider {
     private ScriptEngine scriptEngine;
 
     @Inject
-    public IncrementalExportEntityProvider(AtlasGraph atlasGraph, ScriptEngine scriptEngine) {
+    public IncrementalExportEntityProvider(AtlasGraph atlasGraph) {
         this.atlasGraph = atlasGraph;
-        this.scriptEngine = scriptEngine;
+        try {
+            this.scriptEngine = atlasGraph.getGremlinScriptEngine();
+        } catch (AtlasBaseException e) {
+            LOG.error("Error instantiating script engine.", e);
+        }
+    }
+
+    @Override
+    public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        populate(entity.getGuid(), context.changeMarker, context.guidsToProcess);
+    }
+
+    @Override
+    public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) {
+
     }
 
     public void populate(String dbEntityGuid, long timeStamp, UniqueList<String> guidsToProcess) {
@@ -63,6 +78,13 @@ public class IncrementalExportEntityProvider {
         }
     }
 
+    @Override
+    public void close() {
+        if (scriptEngine != null) {
+            atlasGraph.releaseGremlinScriptEngine(scriptEngine);
+        }
+    }
+
     private void partial(String dbEntityGuid, long timeStamp, UniqueList<String> guidsToProcess) {
         guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_TABLE, timeStamp));
         guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_SD, timeStamp));
@@ -98,7 +120,7 @@ public class IncrementalExportEntityProvider {
             }
 
             for (Map<String, Object> item : result) {
-                guids.add((String) item.get(ExportService.PROPERTY_GUID));
+                guids.add((String) item.get(EntitiesExtractor.PROPERTY_GUID));
             }
 
             return guids;
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java
new file mode 100644
index 0000000..d609071
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class RelationshipAttributesExtractor implements ExtractStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RelationshipAttributesExtractor.class);
+
+    private final AtlasTypeRegistry typeRegistry;
+
+    public RelationshipAttributesExtractor(AtlasTypeRegistry typeRegistry) {
+        this.typeRegistry = typeRegistry;
+    }
+
+    @Override
+    public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> fullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
+        }
+
+        List<AtlasRelatedObjectId> atlasRelatedObjectIdList = getRelatedObjectIds(entity);
+
+        for (AtlasRelatedObjectId ar : atlasRelatedObjectIdList) {
+            boolean isLineage = isLineageType(ar.getTypeName());
+
+            if (context.skipLineage && isLineage) {
+                continue;
+            }
+            context.addToBeProcessed(isLineage, ar.getGuid(), ExportService.TraversalDirection.BOTH);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== fullFetch({}): guidsToProcess {}", entity.getGuid(), context.guidsToProcess.size());
+        }
+    }
+
+    @Override
+    public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> connectedFetch({}): guidsToProcess {} isSkipConnectedFetch :{}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), context.isSkipConnectedFetch);
+        }
+
+        List<AtlasRelatedObjectId> atlasRelatedObjectIdList = getRelatedObjectIds(entity);
+        for (AtlasRelatedObjectId ar : atlasRelatedObjectIdList) {
+            boolean isLineage = isLineageType(ar.getTypeName());
+
+            if (context.skipLineage && isLineage) {
+                continue;
+            }
+            if (!context.isSkipConnectedFetch || isLineage) {
+                context.addToBeProcessed(isLineage, ar.getGuid(), ExportService.TraversalDirection.BOTH);
+            }
+        }
+
+        if(isLineageType(entity.getTypeName())){
+            context.isSkipConnectedFetch = false;
+        }else{
+            context.isSkipConnectedFetch = true;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> connectedFetch({}): guidsToProcess {}, isSkipConnectedFetch :{}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), context.isSkipConnectedFetch);
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+
+    private boolean isLineageType(String typeName) {
+        AtlasEntityDef entityDef = typeRegistry.getEntityDefByName(typeName);
+        return entityDef.getSuperTypes().contains("Process");
+    }
+
+    private List<AtlasRelatedObjectId> getRelatedObjectIds(AtlasEntity entity) {
+        List<AtlasRelatedObjectId> relatedObjectIds = new ArrayList<>();
+
+        for (Object o : entity.getRelationshipAttributes().values()) {
+            if (o instanceof AtlasRelatedObjectId) {
+                relatedObjectIds.add((AtlasRelatedObjectId) o);
+            } else if (o instanceof Collection) {
+                relatedObjectIds.addAll((List) o);
+            }
+        }
+
+        return relatedObjectIds;
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/VertexExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/VertexExtractor.java
new file mode 100644
index 0000000..a5b11be
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/VertexExtractor.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.util.AtlasGremlinQueryProvider;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.atlas.repository.impexp.EntitiesExtractor.PROPERTY_GUID;
+
+public class VertexExtractor implements ExtractStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(VertexExtractor.class);
+
+    private static final String PROPERTY_IS_PROCESS = "isProcess";
+    private static final String QUERY_BINDING_START_GUID = "startGuid";
+
+    private final AtlasGremlinQueryProvider gremlinQueryProvider;
+
+    private final Map<String, Object> bindings;
+    private AtlasGraph atlasGraph;
+    private AtlasTypeRegistry typeRegistry;
+    private ScriptEngine scriptEngine;
+
+    public VertexExtractor(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry) {
+        this.atlasGraph = atlasGraph;
+        this.typeRegistry = typeRegistry;
+        try {
+            this.scriptEngine = atlasGraph.getGremlinScriptEngine();
+        } catch (AtlasBaseException e) {
+            LOG.error("Script Engine: Instantiation failed!");
+        }
+        this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
+        this.bindings = new HashMap<>();
+    }
+
+    @Override
+    public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        if (LOG.isDebugEnabled()){
+            LOG.debug("==> fullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
+        }
+
+        String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
+
+        bindings.clear();
+        bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
+
+        List<Map<String, Object>> result = executeGremlinQuery(query, context);
+
+        if (CollectionUtils.isEmpty(result)) {
+            return;
+        }
+
+        for (Map<String, Object> hashMap : result) {
+            String guid = (String) hashMap.get(PROPERTY_GUID);
+            boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS);
+
+            if (context.getSkipLineage() && isLineage) continue;
+
+            if (!context.guidsProcessed.contains(guid)) {
+                context.addToBeProcessed(isLineage, guid, ExportService.TraversalDirection.BOTH);
+            }
+        }
+    }
+
+    @Override
+    public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        if (LOG.isDebugEnabled()){
+            LOG.debug("==> connectedFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
+        }
+
+        ExportService.TraversalDirection direction = context.guidDirection.get(entity.getGuid());
+
+        if (direction == null || direction == ExportService.TraversalDirection.UNKNOWN) {
+            getConnectedEntityGuids(entity, context, ExportService.TraversalDirection.OUTWARD, ExportService.TraversalDirection.INWARD);
+        } else {
+            if (isProcessEntity(entity)) {
+                direction = ExportService.TraversalDirection.OUTWARD;
+            }
+
+            getConnectedEntityGuids(entity, context, direction);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (scriptEngine != null) {
+            atlasGraph.releaseGremlinScriptEngine(scriptEngine);
+        }
+    }
+
+    private void getConnectedEntityGuids(AtlasEntity entity, ExportService.ExportContext context, ExportService.TraversalDirection... directions) {
+        if (directions == null) {
+            return;
+        }
+
+        for (ExportService.TraversalDirection direction : directions) {
+            String query = getQueryForTraversalDirection(direction);
+
+            bindings.clear();
+            bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
+
+            List<Map<String, Object>> result = executeGremlinQuery(query, context);
+
+            if (CollectionUtils.isEmpty(result)) {
+                continue;
+            }
+
+            for (Map<String, Object> hashMap : result) {
+                String guid = (String) hashMap.get(PROPERTY_GUID);
+                ExportService.TraversalDirection currentDirection = context.guidDirection.get(guid);
+                boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS);
+
+                if (context.skipLineage && isLineage) continue;
+
+                if (currentDirection == null) {
+                    context.addToBeProcessed(isLineage, guid, direction);
+
+                } else if (currentDirection == ExportService.TraversalDirection.OUTWARD && direction == ExportService.TraversalDirection.INWARD) {
+                    // the entity should be reprocessed to get inward entities
+                    context.guidsProcessed.remove(guid);
+                    context.addToBeProcessed(isLineage, guid, direction);
+                }
+            }
+        }
+    }
+
+    private boolean isProcessEntity(AtlasEntity entity) {
+        String typeName = entity.getTypeName();
+        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+
+        return entityType.isSubTypeOf(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);
+    }
+
+    private String getQueryForTraversalDirection(ExportService.TraversalDirection direction) {
+        switch (direction) {
+            case INWARD:
+                return this.gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_IN_EDGE);
+
+            default:
+            case OUTWARD:
+                return this.gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_OUT_EDGE);
+        }
+    }
+
+    private List<Map<String, Object>> executeGremlinQuery(String query, ExportService.ExportContext context) {
+        try {
+            return (List<Map<String, Object>>) atlasGraph.executeGremlinScript(scriptEngine, bindings, query, false);
+        } catch (ScriptException e) {
+            LOG.error("Script execution failed for query: ", query, e);
+            return null;
+        }
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
index 85ed5f9..10a0838 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
@@ -63,7 +63,7 @@ public class IncrementalExportEntityProviderTest extends ExportImportTestBase {
         verifyCreatedEntities(entityStore, entityGuids, 2);
 
         gremlinScriptEngine = atlasGraph.getGremlinScriptEngine();
-        incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, gremlinScriptEngine);
+        incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph);
     }
 
     @AfterClass
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java
new file mode 100644
index 0000000..03d50f1
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.TestUtilsV2;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.runner.LocalSolrRunner;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.testng.ITestContext;
+import org.testng.annotations.Test;
+import org.testng.annotations.Guice;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.DataProvider;
+
+import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.*;
+import static org.testng.Assert.*;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class RelationshipAttributesExtractorTest {
+
+    private static final String EXPORT_FULL = "full";
+    private static final String EXPORT_CONNECTED = "connected";
+    private static final String QUALIFIED_NAME_DB = "db_test_1@02052019";
+    private static final String QUALIFIED_NAME_TABLE_LINEAGE = "db_test_1.test_tbl_ctas_2@02052019";
+    private static final String QUALIFIED_NAME_TABLE_NON_LINEAGE = "db_test_1.test_tbl_1@02052019";
+
+    private static final String GUID_DB = "f0b72ab4-7452-4e42-ac74-2aee7728cce4";
+    private static final String GUID_TABLE_1 = "4d5adf00-2c9b-4877-ad23-c41fd7319150";
+    private static final String GUID_TABLE_2 = "8d0b834c-61ce-42d8-8f66-6fa51c36bccb";
+    private static final String GUID_TABLE_CTAS_2 = "eaec545b-3ac7-4e1b-a497-bd4a2b6434a2";
+    private static final String GUID_HIVE_PROCESS = "bd3138b2-f29e-4226-b859-de25eaa1c18b";
+
+    @Inject
+    private ImportService importService;
+
+    @Inject
+    AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    @Inject
+    private ExportService exportService;
+
+    @BeforeClass
+    public void setup() throws IOException, AtlasBaseException {
+        loadBaseModel();
+        loadHiveModel();
+    }
+
+    @BeforeTest
+    public void setupTest() {
+        RequestContext.clear();
+        RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
+    }
+
+    @AfterClass
+    public void clear() throws Exception {
+        AtlasGraphProvider.cleanup();
+
+        if (useLocalSolr()) {
+            LocalSolrRunner.stop();
+        }
+    }
+
+    @DataProvider(name = "hiveDb")
+    public static Object[][] getData(ITestContext context) throws IOException, AtlasBaseException {
+        return getZipSource("hive_db_lineage.zip");
+    }
+
+    @Test(dataProvider = "hiveDb")
+    public void importHiveDb(ZipSource zipSource) throws AtlasBaseException, IOException {
+        runImportWithNoParameters(importService, zipSource);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportDBFull() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_FULL, false));
+        verifyDBFull(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportDBFullSkipLineageFull() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_FULL, true));
+        verifyDBFullSkipLineageFull(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithLineageFull() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_FULL, false));
+        verifyTableWithLineageFull(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithLineageSkipLineageFull() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_FULL, true));
+        verifyTableWithLineageSkipLineageFull(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithoutLineageFull() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_FULL, false));
+        verifyTableWithoutLineageFull(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithoutLineageSkipLineageFull() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_FULL, true));
+        verifyTableWithoutLineageSkipLineageFull(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportDBConn() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_CONNECTED, false));
+        verifyDBConn(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportDBSkipLineageConn() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_CONNECTED, true));
+        verifyDBSkipLineageConn(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithLineageConn() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_CONNECTED, false));
+        verifyTableWithLineageConn(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithLineageSkipLineageConn() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_CONNECTED, true));
+        verifyTableWithLineageSkipLineageConn(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithoutLineageConn() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_CONNECTED, false));
+        verifyTableWithoutLineageConn(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithoutLineageSkipLineageConn() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_CONNECTED, true));
+        verifyTableWithoutLineageSkipLineageConn(source);
+    }
+
+    private void loadHiveModel() throws IOException, AtlasBaseException {
+        loadModelFromJson("1000-Hadoop/1030-hive_model.json", typeDefStore, typeRegistry);
+    }
+
+    private void loadBaseModel() throws IOException, AtlasBaseException {
+        loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
+    }
+
+    private AtlasExportRequest getExportRequestForHiveDb(String hiveDbName, String fetchType, boolean skipLineage) {
+        AtlasExportRequest request = new AtlasExportRequest();
+
+        List<AtlasObjectId> itemsToExport = new ArrayList<>();
+        itemsToExport.add(new AtlasObjectId("hive_db", "qualifiedName", hiveDbName));
+        request.setItemsToExport(itemsToExport);
+        request.setOptions(getOptionsMap(fetchType, skipLineage));
+
+        return request;
+    }
+
+    private AtlasExportRequest getExportRequestForHiveTable(String hiveTableName, String fetchType, boolean skipLineage) {
+        AtlasExportRequest request = new AtlasExportRequest();
+
+        List<AtlasObjectId> itemsToExport = new ArrayList<>();
+        itemsToExport.add(new AtlasObjectId("hive_table", "qualifiedName", hiveTableName));
+        request.setItemsToExport(itemsToExport);
+        request.setOptions(getOptionsMap(fetchType, skipLineage));
+
+        return request;
+    }
+
+    private Map<String, Object> getOptionsMap(String fetchType, boolean skipLineage){
+        Map<String, Object> optionsMap = new HashMap<>();
+        optionsMap.put("fetchType", fetchType.isEmpty() ? "full" : fetchType );
+        optionsMap.put("skipLineage", skipLineage);
+
+        return optionsMap;
+    }
+
+    private ZipSource runExport(AtlasExportRequest request) throws AtlasBaseException, IOException {
+        final String requestingIP = "1.0.0.0";
+        final String hostName = "localhost";
+        final String userName = "admin";
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ZipSink zipSink = new ZipSink(baos);
+        AtlasExportResult result = exportService.run(zipSink, request, userName, hostName, requestingIP);
+
+        zipSink.close();
+
+        ByteArrayInputStream bis = new ByteArrayInputStream(baos.toByteArray());
+        ZipSource zipSource = new ZipSource(bis);
+        return zipSource;
+    }
+
+    private void verifyDBFull(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 5);
+
+        assertTrue(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
+    }
+
+    private void verifyDBFullSkipLineageFull(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 4);
+
+        assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
+    }
+
+    private void verifyTableWithLineageFull(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 5);
+
+        assertTrue(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
+    }
+
+    private void verifyTableWithLineageSkipLineageFull(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 4);
+
+        assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
+    }
+
+    private void verifyTableWithoutLineageFull(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 5);
+
+        assertTrue(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2,GUID_HIVE_PROCESS);
+    }
+
+    private void verifyTableWithoutLineageSkipLineageFull(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 4);
+
+        assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
+    }
+
+
+    private void verifyDBConn(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 5);
+
+        assertTrue(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
+    }
+
+    private void verifyDBSkipLineageConn(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 4);
+
+        assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
+    }
+
+    private void verifyTableWithLineageConn(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 4);
+
+        assertTrue(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
+    }
+
+    private void verifyTableWithLineageSkipLineageConn(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(),2);
+
+        assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_CTAS_2);;
+    }
+
+    private void verifyTableWithoutLineageConn(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 2);
+
+        assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1);
+    }
+
+    private void verifyTableWithoutLineageSkipLineageConn(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 2);;
+
+        assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1);
+    }
+
+    private void verifyExpectedEntities(List<String> fileNames, String... guids){
+        assertEquals(fileNames.size(), guids.length);
+        for (String guid : guids) {
+            assertTrue(fileNames.contains(guid.toLowerCase()));
+        }
+    }
+
+    private List<String> getFileNames(ZipSource zipSource){
+        List<String> ret = new ArrayList<>();
+        assertTrue(zipSource.hasNext());
+
+        while (zipSource.hasNext()){
+            AtlasEntity atlasEntity = zipSource.next();
+            assertNotNull(atlasEntity);
+            ret.add(atlasEntity.getGuid());
+        }
+        return ret;
+    }
+}