Posted to commits@atlas.apache.org by ni...@apache.org on 2019/06/20 10:30:10 UTC

[atlas] branch master updated (1c399cc -> 8cd7496)

This is an automated email from the ASF dual-hosted git repository.

nixon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git.


    from 1c399cc  ATLAS-3290: Impala Hook should get database name and table name from vertex metadata
     new 08b7639  ATLAS-3256 Modify export API to process with relationshipAttributes
     new 8cd7496  ATLAS-3283 Export-import UTs are getting skipped

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../atlas/repository/impexp/EntitiesExtractor.java |  81 +++++
 .../atlas/repository/impexp/ExportService.java     | 223 ++-----------
 .../ExtractStrategy.java}                          |  17 +-
 .../impexp/IncrementalExportEntityProvider.java    |  32 +-
 .../impexp/RelationshipAttributesExtractor.java    | 115 +++++++
 .../atlas/repository/impexp/VertexExtractor.java   | 183 +++++++++++
 .../repository/impexp/ExportIncrementalTest.java   |  14 +-
 .../repository/impexp/ExportSkipLineageTest.java   |   3 +-
 .../atlas/repository/impexp/ImportServiceTest.java |   4 -
 .../IncrementalExportEntityProviderTest.java       |   4 +-
 .../RelationshipAttributesExtractorTest.java       | 354 +++++++++++++++++++++
 .../impexp/ReplicationEntityAttributeTest.java     |   4 +-
 .../impexp/ZipFileResourceTestUtils.java           |   6 +-
 13 files changed, 817 insertions(+), 223 deletions(-)
 create mode 100644 repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
 copy repository/src/main/java/org/apache/atlas/repository/{store/graph/v2/EntityStream.java => impexp/ExtractStrategy.java} (72%)
 create mode 100644 repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java
 create mode 100644 repository/src/main/java/org/apache/atlas/repository/impexp/VertexExtractor.java
 create mode 100644 repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java


[atlas] 01/02: ATLAS-3256 Modify export API to process with relationshipAttributes

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 08b76391cfb8fd231218e5788261e5f2310703cc
Author: nikhilbonte <ni...@freestoneinfotech.com>
AuthorDate: Fri Jun 14 15:21:06 2019 +0530

    ATLAS-3256 Modify export API to process with relationshipAttributes
    
    Signed-off-by: nixonrodrigues <ni...@apache.org>
---
 .../atlas/repository/impexp/EntitiesExtractor.java |  81 +++++
 .../atlas/repository/impexp/ExportService.java     | 223 ++-----------
 .../atlas/repository/impexp/ExtractStrategy.java   |  28 ++
 .../impexp/IncrementalExportEntityProvider.java    |  32 +-
 .../impexp/RelationshipAttributesExtractor.java    | 115 +++++++
 .../atlas/repository/impexp/VertexExtractor.java   | 183 +++++++++++
 .../IncrementalExportEntityProviderTest.java       |   2 +-
 .../RelationshipAttributesExtractorTest.java       | 354 +++++++++++++++++++++
 8 files changed, 821 insertions(+), 197 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
new file mode 100644
index 0000000..15cb111
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasTypeRegistry;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class EntitiesExtractor {
+    static final String PROPERTY_GUID = "__guid";
+    private static final String VERTEX_BASED_EXTRACT = "default";
+    private static final String INCREMENTAL_EXTRACT = "incremental";
+    private static final String RELATION_BASED_EXTRACT = "relationship";
+
+    private Map<String, ExtractStrategy> extractors = new HashMap<>();
+    private ExtractStrategy extractor;
+
+    public EntitiesExtractor(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry) {
+        extractors.put(VERTEX_BASED_EXTRACT, new VertexExtractor(atlasGraph, typeRegistry));
+        extractors.put(INCREMENTAL_EXTRACT, new IncrementalExportEntityProvider(atlasGraph));
+        extractors.put(RELATION_BASED_EXTRACT, new RelationshipAttributesExtractor(typeRegistry));
+    }
+
+    public void get(AtlasEntity entity, ExportService.ExportContext context) {
+        if(extractor == null) {
+            extractor = extractors.get(VERTEX_BASED_EXTRACT);
+        }
+
+        switch (context.fetchType) {
+            case CONNECTED:
+                extractor.connectedFetch(entity, context);
+                break;
+
+            case INCREMENTAL:
+                if (context.isHiveDBIncrementalSkipLineage()) {
+                    extractors.get(INCREMENTAL_EXTRACT).fullFetch(entity, context);
+                    break;
+                }
+
+            case FULL:
+            default:
+                extractor.fullFetch(entity, context);
+        }
+    }
+
+    public void setExtractor(AtlasEntityDef atlasEntityDef) {
+        extractor = extractUsing(atlasEntityDef);
+    }
+
+    public void close() {
+        for (ExtractStrategy es : extractors.values()) {
+            es.close();
+        }
+    }
+
+    private ExtractStrategy extractUsing(AtlasEntityDef atlasEntityDef) {
+        return (atlasEntityDef == null || atlasEntityDef.getRelationshipAttributeDefs().size() == 0)
+                ? extractors.get(VERTEX_BASED_EXTRACT)
+                : extractors.get(RELATION_BASED_EXTRACT);
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 11289ea..5055607 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -25,7 +25,6 @@ import org.apache.atlas.model.impexp.AtlasExportResult;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
 import org.apache.atlas.model.typedef.AtlasClassificationDef;
 import org.apache.atlas.model.typedef.AtlasEntityDef;
 import org.apache.atlas.model.typedef.AtlasEnumDef;
@@ -35,19 +34,13 @@ import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
 import org.apache.atlas.repository.util.UniqueList;
-import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.atlas.util.AtlasGremlinQueryProvider;
-import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery;
-import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
-import javax.script.ScriptEngine;
-import javax.script.ScriptException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -63,30 +56,23 @@ import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREM
 public class ExportService {
     private static final Logger LOG = LoggerFactory.getLogger(ExportService.class);
 
-    public static final String PROPERTY_GUID = "__guid";
-    private static final String PROPERTY_IS_PROCESS = "isProcess";
-
     private final AtlasTypeRegistry         typeRegistry;
-    private final String QUERY_BINDING_START_GUID = "startGuid";
     private final StartEntityFetchByExportRequest startEntityFetchByExportRequest;
+    private final EntitiesExtractor         entitiesExtractor;
     private       AuditsWriter              auditsWriter;
-    private final AtlasGraph                atlasGraph;
     private final EntityGraphRetriever      entityGraphRetriever;
-    private final AtlasGremlinQueryProvider gremlinQueryProvider;
     private       ExportTypeProcessor       exportTypeProcessor;
     private final HdfsPathEntityCreator     hdfsPathEntityCreator;
-    private       IncrementalExportEntityProvider incrementalExportEntityProvider;
 
     @Inject
     public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph,
                          AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator) {
         this.typeRegistry         = typeRegistry;
         this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry);
-        this.atlasGraph           = atlasGraph;
-        this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
         this.auditsWriter         = auditsWriter;
         this.hdfsPathEntityCreator = hdfsPathEntityCreator;
         this.startEntityFetchByExportRequest = new StartEntityFetchByExportRequest(atlasGraph, typeRegistry, AtlasGremlinQueryProvider.INSTANCE);
+        this.entitiesExtractor = new EntitiesExtractor(atlasGraph, typeRegistry);
     }
 
     public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName,
@@ -95,7 +81,7 @@ public class ExportService {
         AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP,
                 hostName, startTime, getCurrentChangeMarker());
 
-        ExportContext context = new ExportContext(atlasGraph, result, exportSink);
+        ExportContext context = new ExportContext(result, exportSink);
         exportTypeProcessor = new ExportTypeProcessor(typeRegistry);
 
         try {
@@ -109,12 +95,12 @@ public class ExportService {
         } catch(Exception ex) {
             LOG.error("Operation failed: ", ex);
         } finally {
-            atlasGraph.releaseGremlinScriptEngine(context.scriptEngine);
+            entitiesExtractor.close();
+
             LOG.info("<== export(user={}, from={}): status {}: changeMarker: {}",
                     userName, requestingIP, context.result.getOperationStatus(), context.result.getChangeMarker());
             context.clear();
             result.clear();
-            incrementalExportEntityProvider = null;
         }
 
         return context.result;
@@ -203,7 +189,9 @@ public class ExportService {
     }
 
     private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context) {
-        debugLog("==> processObjectId({})", item);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> processObjectId({})", item);
+        }
 
         try {
             List<String> entityGuids = getStartingEntity(item, context);
@@ -211,9 +199,10 @@ public class ExportService {
                 return AtlasExportResult.OperationStatus.FAIL;
             }
 
+            entitiesExtractor.setExtractor(typeRegistry.getEntityDefByName(item.getTypeName()));
+
             for (String guid : entityGuids) {
                 processEntityGuid(guid, context);
-                populateEntitesForIncremental(guid, context);
             }
 
             while (!context.guidsToProcess.isEmpty()) {
@@ -227,13 +216,16 @@ public class ExportService {
                     context.lineageProcessed.addAll(context.lineageToProcess.getList());
                     context.lineageToProcess.clear();
                 }
+                context.isSkipConnectedFetch = false;
             }
         } catch (AtlasBaseException excp) {
             LOG.error("Fetching entity failed for: {}", item, excp);
             return AtlasExportResult.OperationStatus.FAIL;
         }
 
-        debugLog("<== processObjectId({})", item);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== processObjectId({})", item);
+        }
         return AtlasExportResult.OperationStatus.SUCCESS;
     }
 
@@ -245,181 +237,41 @@ public class ExportService {
         return startEntityFetchByExportRequest.get(context.result.getRequest(), item);
     }
 
-    private void debugLog(String s, Object... params) {
-        if (!LOG.isDebugEnabled()) {
-            return;
-        }
-
-        LOG.debug(s, params);
-    }
-
     private void processEntityGuid(String guid, ExportContext context) throws AtlasBaseException {
-        debugLog("==> processEntityGuid({})", guid);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> processEntityGuid({})", guid);
+        }
 
         if (context.guidsProcessed.contains(guid)) {
             return;
         }
 
-        TraversalDirection direction = context.guidDirection.get(guid);
         AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
 
-        processEntity(entityWithExtInfo, context, direction);
-
-        debugLog("<== processEntityGuid({})", guid);
+        processEntity(entityWithExtInfo, context);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== processEntityGuid({})", guid);
+        }
     }
 
-    public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo,
-                              ExportContext context,
-                              TraversalDirection direction) throws AtlasBaseException {
-
+    public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
         addEntity(entityWithExtInfo, context);
         exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context);
 
         context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid());
-        getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction);
+        entitiesExtractor.get(entityWithExtInfo.getEntity(), context);
 
         if (entityWithExtInfo.getReferredEntities() != null) {
             for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
                 exportTypeProcessor.addTypes(e, context);
-                getConntedEntitiesBasedOnOption(e, context, direction);
+                entitiesExtractor.get(e, context);
             }
 
             context.guidsProcessed.addAll(entityWithExtInfo.getReferredEntities().keySet());
         }
     }
 
-    private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
-        switch (context.fetchType) {
-            case CONNECTED:
-                getEntityGuidsForConnectedFetch(entity, context, direction);
-                break;
-
-            case INCREMENTAL:
-                if(context.isHiveDBIncrementalSkipLineage()) {
-                    break;
-                }
-
-            case FULL:
-            default:
-                getEntityGuidsForFullFetch(entity, context);
-        }
-    }
-
-    private void populateEntitesForIncremental(String topLevelEntityGuid, ExportContext context) {
-        if (context.isHiveDBIncrementalSkipLineage() == false || incrementalExportEntityProvider != null) {
-            return;
-        }
-
-        incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, context.scriptEngine);
-        incrementalExportEntityProvider.populate(topLevelEntityGuid, context.changeMarker, context.guidsToProcess);
-    }
-
-    private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
-        if (direction == null || direction == TraversalDirection.UNKNOWN) {
-            getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD);
-        } else {
-            if (isProcessEntity(entity)) {
-                direction = TraversalDirection.OUTWARD;
-            }
-
-            getConnectedEntityGuids(entity, context, direction);
-        }
-    }
-
-    private boolean isProcessEntity(AtlasEntity entity) {
-        String          typeName   = entity.getTypeName();
-        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
-
-        return entityType.isSubTypeOf(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);
-    }
-
-    private void getConnectedEntityGuids(AtlasEntity entity, ExportContext context, TraversalDirection... directions) {
-        if(directions == null) {
-            return;
-        }
-
-        for (TraversalDirection direction : directions) {
-            String query = getQueryForTraversalDirection(direction);
-
-            if(LOG.isDebugEnabled()) {
-                debugLog("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query);
-            }
-
-            context.bindings.clear();
-            context.bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
-
-            List<Map<String, Object>> result = executeGremlinQuery(query, context);
-
-            if (CollectionUtils.isEmpty(result)) {
-                continue;
-            }
-
-            for (Map<String, Object> hashMap : result) {
-                String             guid             = (String) hashMap.get(PROPERTY_GUID);
-                TraversalDirection currentDirection = context.guidDirection.get(guid);
-                boolean            isLineage        = (boolean) hashMap.get(PROPERTY_IS_PROCESS);
-
-                if(context.skipLineage && isLineage) continue;
-
-                if (currentDirection == null) {
-                    context.addToBeProcessed(isLineage, guid, direction);
-
-                } else if (currentDirection == TraversalDirection.OUTWARD && direction == TraversalDirection.INWARD) {
-                    // the entity should be reprocessed to get inward entities
-                    context.guidsProcessed.remove(guid);
-                    context.addToBeProcessed(isLineage, guid, direction);
-                }
-            }
-
-            if(LOG.isDebugEnabled()) {
-                debugLog("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size());
-            }
-        }
-    }
-
-    private String getQueryForTraversalDirection(TraversalDirection direction) {
-        switch (direction) {
-            case INWARD:
-                return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_IN_EDGE);
-
-            default:
-            case OUTWARD:
-                return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_OUT_EDGE);
-        }
-    }
-
-    private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) {
-        if(LOG.isDebugEnabled()) {
-            debugLog("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
-        }
-
-        String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
-
-        context.bindings.clear();
-        context.bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
-
-        List<Map<String, Object>> result = executeGremlinQuery(query, context);
-
-        if (CollectionUtils.isEmpty(result)) {
-            return;
-        }
-
-        for (Map<String, Object> hashMap : result) {
-            String  guid      = (String) hashMap.get(PROPERTY_GUID);
-            boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS);
-
-            if(context.getSkipLineage() && isLineage) continue;
-
-            if (!context.guidsProcessed.contains(guid)) {
-                context.addToBeProcessed(isLineage, guid, TraversalDirection.BOTH);
-            }
-        }
-
-        if(LOG.isDebugEnabled()) {
-            debugLog("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}",
-                                            entity.getGuid(), result.size(), context.guidsToProcess.size());
-        }
-    }
 
     private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
         if(context.sink.hasEntity(entityWithExtInfo.getEntity().getGuid())) {
@@ -448,15 +300,6 @@ public class ExportService {
         context.reportProgress();
     }
 
-    private List<Map<String, Object>> executeGremlinQuery(String query, ExportContext context) {
-        try {
-            return (List<Map<String, Object>>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false);
-        } catch (ScriptException e) {
-            LOG.error("Script execution failed for query: ", query, e);
-            return null;
-        }
-    }
-
     public enum TraversalDirection {
         UNKNOWN,
         INWARD,
@@ -493,7 +336,7 @@ public class ExportService {
 
         final UniqueList<String>              entityCreationOrder = new UniqueList<>();
         final Set<String>                     guidsProcessed = new HashSet<>();
-        final private UniqueList<String>      guidsToProcess = new UniqueList<>();
+        final UniqueList<String>              guidsToProcess = new UniqueList<>();
         final UniqueList<String>              lineageToProcess = new UniqueList<>();
         final Set<String>                     lineageProcessed = new HashSet<>();
         final Map<String, TraversalDirection> guidDirection  = new HashMap<>();
@@ -505,25 +348,23 @@ public class ExportService {
         final AtlasExportResult               result;
         private final ZipSink                 sink;
 
-        private final ScriptEngine        scriptEngine;
-        private final Map<String, Object> bindings;
-        private final ExportFetchType     fetchType;
-        private final boolean             skipLineage;
-        private final long                changeMarker;
+        final ExportFetchType             fetchType;
+        final boolean                     skipLineage;
+        final long                        changeMarker;
+        boolean isSkipConnectedFetch;
         private final boolean isHiveDBIncremental;
 
         private       int                 progressReportCount = 0;
 
-        ExportContext(AtlasGraph atlasGraph, AtlasExportResult result, ZipSink sink) throws AtlasBaseException {
+        ExportContext(AtlasExportResult result, ZipSink sink) {
             this.result = result;
             this.sink   = sink;
 
-            scriptEngine = atlasGraph.getGremlinScriptEngine();
-            bindings     = new HashMap<>();
             fetchType    = ExportFetchType.from(result.getRequest().getFetchTypeOptionValue());
             skipLineage  = result.getRequest().getSkipLineageOptionValue();
             this.changeMarker = result.getRequest().getChangeTokenFromOptions();
             this.isHiveDBIncremental = checkHiveDBIncrementalSkipLineage(result.getRequest());
+            this.isSkipConnectedFetch = false;
         }
 
         private boolean checkHiveDBIncrementalSkipLineage(AtlasExportRequest request) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExtractStrategy.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExtractStrategy.java
new file mode 100644
index 0000000..6475016
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExtractStrategy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+
+public interface ExtractStrategy {
+
+    void connectedFetch(AtlasEntity entity, ExportService.ExportContext context);
+    void fullFetch(AtlasEntity entity, ExportService.ExportContext context);
+    void close();
+}
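
For orientation: the ExtractStrategy interface above is the seam that EntitiesExtractor dispatches on. The following is a minimal sketch of an implementation, not part of this commit; the class name and the logging are illustrative only, and the commit's real implementations are VertexExtractor, RelationshipAttributesExtractor and IncrementalExportEntityProvider.

    package org.apache.atlas.repository.impexp;

    import org.apache.atlas.model.instance.AtlasEntity;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical strategy, shown only to illustrate the ExtractStrategy contract.
    public class NoOpExtractor implements ExtractStrategy {
        private static final Logger LOG = LoggerFactory.getLogger(NoOpExtractor.class);

        @Override
        public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) {
            // A real strategy queues related GUIDs via context.addToBeProcessed(...).
            LOG.debug("connectedFetch({})", entity.getGuid());
        }

        @Override
        public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) {
            LOG.debug("fullFetch({})", entity.getGuid());
        }

        @Override
        public void close() {
            // Release any graph or script-engine resources held by the strategy.
        }
    }
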
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
index 3a2a917..256d9de 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
@@ -18,6 +18,8 @@
 
 package org.apache.atlas.repository.impexp;
 
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.util.UniqueList;
 import org.slf4j.Logger;
@@ -28,11 +30,10 @@ import javax.script.ScriptEngine;
 import javax.script.ScriptException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-public class IncrementalExportEntityProvider {
+public class IncrementalExportEntityProvider implements ExtractStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(IncrementalExportEntityProvider.class);
 
     private static final String QUERY_PARAMETER_START_GUID = "startGuid";
@@ -50,9 +51,23 @@ public class IncrementalExportEntityProvider {
     private ScriptEngine scriptEngine;
 
     @Inject
-    public IncrementalExportEntityProvider(AtlasGraph atlasGraph, ScriptEngine scriptEngine) {
+    public IncrementalExportEntityProvider(AtlasGraph atlasGraph) {
         this.atlasGraph = atlasGraph;
-        this.scriptEngine = scriptEngine;
+        try {
+            this.scriptEngine = atlasGraph.getGremlinScriptEngine();
+        } catch (AtlasBaseException e) {
+            LOG.error("Error instantiating script engine.", e);
+        }
+    }
+
+    @Override
+    public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        populate(entity.getGuid(), context.changeMarker, context.guidsToProcess);
+    }
+
+    @Override
+    public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) {
+
     }
 
     public void populate(String dbEntityGuid, long timeStamp, UniqueList<String> guidsToProcess) {
@@ -63,6 +78,13 @@ public class IncrementalExportEntityProvider {
         }
     }
 
+    @Override
+    public void close() {
+        if (scriptEngine != null) {
+            atlasGraph.releaseGremlinScriptEngine(scriptEngine);
+        }
+    }
+
     private void partial(String dbEntityGuid, long timeStamp, UniqueList<String> guidsToProcess) {
         guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_TABLE, timeStamp));
         guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_SD, timeStamp));
@@ -98,7 +120,7 @@ public class IncrementalExportEntityProvider {
             }
 
             for (Map<String, Object> item : result) {
-                guids.add((String) item.get(ExportService.PROPERTY_GUID));
+                guids.add((String) item.get(EntitiesExtractor.PROPERTY_GUID));
             }
 
             return guids;
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java
new file mode 100644
index 0000000..d609071
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class RelationshipAttributesExtractor implements ExtractStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RelationshipAttributesExtractor.class);
+
+    private final AtlasTypeRegistry typeRegistry;
+
+    public RelationshipAttributesExtractor(AtlasTypeRegistry typeRegistry) {
+        this.typeRegistry = typeRegistry;
+    }
+
+    @Override
+    public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> fullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
+        }
+
+        List<AtlasRelatedObjectId> atlasRelatedObjectIdList = getRelatedObjectIds(entity);
+
+        for (AtlasRelatedObjectId ar : atlasRelatedObjectIdList) {
+            boolean isLineage = isLineageType(ar.getTypeName());
+
+            if (context.skipLineage && isLineage) {
+                continue;
+            }
+            context.addToBeProcessed(isLineage, ar.getGuid(), ExportService.TraversalDirection.BOTH);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== fullFetch({}): guidsToProcess {}", entity.getGuid(), context.guidsToProcess.size());
+        }
+    }
+
+    @Override
+    public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> connectedFetch({}): guidsToProcess {} isSkipConnectedFetch :{}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), context.isSkipConnectedFetch);
+        }
+
+        List<AtlasRelatedObjectId> atlasRelatedObjectIdList = getRelatedObjectIds(entity);
+        for (AtlasRelatedObjectId ar : atlasRelatedObjectIdList) {
+            boolean isLineage = isLineageType(ar.getTypeName());
+
+            if (context.skipLineage && isLineage) {
+                continue;
+            }
+            if (!context.isSkipConnectedFetch || isLineage) {
+                context.addToBeProcessed(isLineage, ar.getGuid(), ExportService.TraversalDirection.BOTH);
+            }
+        }
+
+        if (isLineageType(entity.getTypeName())) {
+            context.isSkipConnectedFetch = false;
+        } else {
+            context.isSkipConnectedFetch = true;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> connectedFetch({}): guidsToProcess {}, isSkipConnectedFetch :{}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), context.isSkipConnectedFetch);
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+
+    private boolean isLineageType(String typeName) {
+        AtlasEntityDef entityDef = typeRegistry.getEntityDefByName(typeName);
+        return entityDef.getSuperTypes().contains("Process");
+    }
+
+    private List<AtlasRelatedObjectId> getRelatedObjectIds(AtlasEntity entity) {
+        List<AtlasRelatedObjectId> relatedObjectIds = new ArrayList<>();
+
+        for (Object o : entity.getRelationshipAttributes().values()) {
+            if (o instanceof AtlasRelatedObjectId) {
+                relatedObjectIds.add((AtlasRelatedObjectId) o);
+            } else if (o instanceof Collection) {
+                relatedObjectIds.addAll((List) o);
+            }
+        }
+
+        return relatedObjectIds;
+    }
+}
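
To make the flattening in getRelatedObjectIds() concrete, the sketch below builds the kind of relationship-attribute map the extractor walks. The attribute names ("db", "outputFromProcesses") and the use of AtlasEntity's relationshipAttributes bean setter are assumptions for illustration; the GUIDs are reused from the test constants later in this patch.

    import org.apache.atlas.model.instance.AtlasEntity;
    import org.apache.atlas.model.instance.AtlasRelatedObjectId;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class RelationshipAttributesSketch {
        public static AtlasEntity sampleTable() {
            // Single-valued reference: hive_table -> hive_db
            AtlasRelatedObjectId db = new AtlasRelatedObjectId();
            db.setTypeName("hive_db");
            db.setGuid("f0b72ab4-7452-4e42-ac74-2aee7728cce4");

            // Multi-valued reference: hive_table -> hive_process (lineage)
            AtlasRelatedObjectId process = new AtlasRelatedObjectId();
            process.setTypeName("hive_process");
            process.setGuid("bd3138b2-f29e-4226-b859-de25eaa1c18b");

            Map<String, Object> relationshipAttributes = new HashMap<>();
            relationshipAttributes.put("db", db);
            relationshipAttributes.put("outputFromProcesses", Arrays.asList(process));

            // getRelatedObjectIds() flattens both shapes into one list; fullFetch()
            // and connectedFetch() then queue each GUID, skipping Process subtypes
            // when skipLineage is set on the export context.
            AtlasEntity table = new AtlasEntity("hive_table");
            table.setRelationshipAttributes(relationshipAttributes);
            return table;
        }
    }
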
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/VertexExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/VertexExtractor.java
new file mode 100644
index 0000000..a5b11be
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/VertexExtractor.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.util.AtlasGremlinQueryProvider;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.atlas.repository.impexp.EntitiesExtractor.PROPERTY_GUID;
+
+public class VertexExtractor implements ExtractStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(VertexExtractor.class);
+
+    private static final String PROPERTY_IS_PROCESS = "isProcess";
+    private static final String QUERY_BINDING_START_GUID = "startGuid";
+
+    private final AtlasGremlinQueryProvider gremlinQueryProvider;
+
+    private final Map<String, Object> bindings;
+    private AtlasGraph atlasGraph;
+    private AtlasTypeRegistry typeRegistry;
+    private ScriptEngine scriptEngine;
+
+    public VertexExtractor(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry) {
+        this.atlasGraph = atlasGraph;
+        this.typeRegistry = typeRegistry;
+        try {
+            this.scriptEngine = atlasGraph.getGremlinScriptEngine();
+        } catch (AtlasBaseException e) {
+            LOG.error("Script Engine: Instantiation failed!");
+        }
+        this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
+        this.bindings = new HashMap<>();
+    }
+
+    @Override
+    public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        if (LOG.isDebugEnabled()){
+            LOG.debug("==> fullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
+        }
+
+        String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
+
+        bindings.clear();
+        bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
+
+        List<Map<String, Object>> result = executeGremlinQuery(query, context);
+
+        if (CollectionUtils.isEmpty(result)) {
+            return;
+        }
+
+        for (Map<String, Object> hashMap : result) {
+            String guid = (String) hashMap.get(PROPERTY_GUID);
+            boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS);
+
+            if (context.getSkipLineage() && isLineage) continue;
+
+            if (!context.guidsProcessed.contains(guid)) {
+                context.addToBeProcessed(isLineage, guid, ExportService.TraversalDirection.BOTH);
+            }
+        }
+    }
+
+    @Override
+    public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        if (LOG.isDebugEnabled()){
+            LOG.debug("==> connectedFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
+        }
+
+        ExportService.TraversalDirection direction = context.guidDirection.get(entity.getGuid());
+
+        if (direction == null || direction == ExportService.TraversalDirection.UNKNOWN) {
+            getConnectedEntityGuids(entity, context, ExportService.TraversalDirection.OUTWARD, ExportService.TraversalDirection.INWARD);
+        } else {
+            if (isProcessEntity(entity)) {
+                direction = ExportService.TraversalDirection.OUTWARD;
+            }
+
+            getConnectedEntityGuids(entity, context, direction);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (scriptEngine != null) {
+            atlasGraph.releaseGremlinScriptEngine(scriptEngine);
+        }
+    }
+
+    private void getConnectedEntityGuids(AtlasEntity entity, ExportService.ExportContext context, ExportService.TraversalDirection... directions) {
+        if (directions == null) {
+            return;
+        }
+
+        for (ExportService.TraversalDirection direction : directions) {
+            String query = getQueryForTraversalDirection(direction);
+
+            bindings.clear();
+            bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
+
+            List<Map<String, Object>> result = executeGremlinQuery(query, context);
+
+            if (CollectionUtils.isEmpty(result)) {
+                continue;
+            }
+
+            for (Map<String, Object> hashMap : result) {
+                String guid = (String) hashMap.get(PROPERTY_GUID);
+                ExportService.TraversalDirection currentDirection = context.guidDirection.get(guid);
+                boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS);
+
+                if (context.skipLineage && isLineage) continue;
+
+                if (currentDirection == null) {
+                    context.addToBeProcessed(isLineage, guid, direction);
+
+                } else if (currentDirection == ExportService.TraversalDirection.OUTWARD && direction == ExportService.TraversalDirection.INWARD) {
+                    // the entity should be reprocessed to get inward entities
+                    context.guidsProcessed.remove(guid);
+                    context.addToBeProcessed(isLineage, guid, direction);
+                }
+            }
+        }
+    }
+
+    private boolean isProcessEntity(AtlasEntity entity) {
+        String typeName = entity.getTypeName();
+        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+
+        return entityType.isSubTypeOf(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);
+    }
+
+    private String getQueryForTraversalDirection(ExportService.TraversalDirection direction) {
+        switch (direction) {
+            case INWARD:
+                return this.gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_IN_EDGE);
+
+            default:
+            case OUTWARD:
+                return this.gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_OUT_EDGE);
+        }
+    }
+
+    private List<Map<String, Object>> executeGremlinQuery(String query, ExportService.ExportContext context) {
+        try {
+            return (List<Map<String, Object>>) atlasGraph.executeGremlinScript(scriptEngine, bindings, query, false);
+        } catch (ScriptException e) {
+            LOG.error("Script execution failed for query: ", query, e);
+            return null;
+        }
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
index 85ed5f9..10a0838 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
@@ -63,7 +63,7 @@ public class IncrementalExportEntityProviderTest extends ExportImportTestBase {
         verifyCreatedEntities(entityStore, entityGuids, 2);
 
         gremlinScriptEngine = atlasGraph.getGremlinScriptEngine();
-        incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, gremlinScriptEngine);
+        incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph);
     }
 
     @AfterClass
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java
new file mode 100644
index 0000000..03d50f1
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.TestUtilsV2;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.runner.LocalSolrRunner;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.testng.ITestContext;
+import org.testng.annotations.Test;
+import org.testng.annotations.Guice;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.DataProvider;
+
+import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.*;
+import static org.testng.Assert.*;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class RelationshipAttributesExtractorTest {
+
+    private static final String EXPORT_FULL = "full";
+    private static final String EXPORT_CONNECTED = "connected";
+    private static final String QUALIFIED_NAME_DB = "db_test_1@02052019";
+    private static final String QUALIFIED_NAME_TABLE_LINEAGE = "db_test_1.test_tbl_ctas_2@02052019";
+    private static final String QUALIFIED_NAME_TABLE_NON_LINEAGE = "db_test_1.test_tbl_1@02052019";
+
+    private static final String GUID_DB = "f0b72ab4-7452-4e42-ac74-2aee7728cce4";
+    private static final String GUID_TABLE_1 = "4d5adf00-2c9b-4877-ad23-c41fd7319150";
+    private static final String GUID_TABLE_2 = "8d0b834c-61ce-42d8-8f66-6fa51c36bccb";
+    private static final String GUID_TABLE_CTAS_2 = "eaec545b-3ac7-4e1b-a497-bd4a2b6434a2";
+    private static final String GUID_HIVE_PROCESS = "bd3138b2-f29e-4226-b859-de25eaa1c18b";
+
+    @Inject
+    private ImportService importService;
+
+    @Inject
+    AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    @Inject
+    private ExportService exportService;
+
+    @BeforeClass
+    public void setup() throws IOException, AtlasBaseException {
+        loadBaseModel();
+        loadHiveModel();
+    }
+
+    @BeforeTest
+    public void setupTest() {
+        RequestContext.clear();
+        RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
+    }
+
+    @AfterClass
+    public void clear() throws Exception {
+        AtlasGraphProvider.cleanup();
+
+        if (useLocalSolr()) {
+            LocalSolrRunner.stop();
+        }
+    }
+
+    @DataProvider(name = "hiveDb")
+    public static Object[][] getData(ITestContext context) throws IOException, AtlasBaseException {
+        return getZipSource("hive_db_lineage.zip");
+    }
+
+    @Test(dataProvider = "hiveDb")
+    public void importHiveDb(ZipSource zipSource) throws AtlasBaseException, IOException {
+        runImportWithNoParameters(importService, zipSource);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportDBFull() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_FULL, false));
+        verifyDBFull(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportDBFullSkipLineageFull() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_FULL, true));
+        verifyDBFullSkipLineageFull(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithLineageFull() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_FULL, false));
+        verifyTableWithLineageFull(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithLineageSkipLineageFull() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_FULL, true));
+        verifyTableWithLineageSkipLineageFull(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithoutLineageFull() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_FULL, false));
+        verifyTableWithoutLineageFull(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithoutLineageSkipLineageFull() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_FULL, true));
+        verifyTableWithoutLineageSkipLineageFull(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportDBConn() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_CONNECTED, false));
+        verifyDBConn(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportDBSkipLineageConn() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_CONNECTED, true));
+        verifyDBSkipLineageConn(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithLineageConn() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_CONNECTED, false));
+        verifyTableWithLineageConn(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithLineageSkipLineageConn() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_CONNECTED, true));
+        verifyTableWithLineageSkipLineageConn(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithoutLineageConn() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_CONNECTED, false));
+        verifyTableWithoutLineageConn(source);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithoutLineageSkipLineageConn() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_CONNECTED, true));
+        verifyTableWithoutLineageSkipLineageConn(source);
+    }
+
+    private void loadHiveModel() throws IOException, AtlasBaseException {
+        loadModelFromJson("1000-Hadoop/1030-hive_model.json", typeDefStore, typeRegistry);
+    }
+
+    private void loadBaseModel() throws IOException, AtlasBaseException {
+        loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
+    }
+
+    private AtlasExportRequest getExportRequestForHiveDb(String hiveDbName, String fetchType, boolean skipLineage) {
+        AtlasExportRequest request = new AtlasExportRequest();
+
+        List<AtlasObjectId> itemsToExport = new ArrayList<>();
+        itemsToExport.add(new AtlasObjectId("hive_db", "qualifiedName", hiveDbName));
+        request.setItemsToExport(itemsToExport);
+        request.setOptions(getOptionsMap(fetchType, skipLineage));
+
+        return request;
+    }
+
+    private AtlasExportRequest getExportRequestForHiveTable(String hiveTableName, String fetchType, boolean skipLineage) {
+        AtlasExportRequest request = new AtlasExportRequest();
+
+        List<AtlasObjectId> itemsToExport = new ArrayList<>();
+        itemsToExport.add(new AtlasObjectId("hive_table", "qualifiedName", hiveTableName));
+        request.setItemsToExport(itemsToExport);
+        request.setOptions(getOptionsMap(fetchType, skipLineage));
+
+        return request;
+    }
+
+    private Map<String, Object> getOptionsMap(String fetchType, boolean skipLineage){
+        Map<String, Object> optionsMap = new HashMap<>();
+        optionsMap.put("fetchType", fetchType.isEmpty() ? "full" : fetchType );
+        optionsMap.put("skipLineage", skipLineage);
+
+        return optionsMap;
+    }
+
+    private ZipSource runExport(AtlasExportRequest request) throws AtlasBaseException, IOException {
+        final String requestingIP = "1.0.0.0";
+        final String hostName = "localhost";
+        final String userName = "admin";
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ZipSink zipSink = new ZipSink(baos);
+        AtlasExportResult result = exportService.run(zipSink, request, userName, hostName, requestingIP);
+
+        zipSink.close();
+
+        ByteArrayInputStream bis = new ByteArrayInputStream(baos.toByteArray());
+        ZipSource zipSource = new ZipSource(bis);
+        return zipSource;
+    }
+
+    private void verifyDBFull(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 5);
+
+        assertTrue(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
+    }
+
+    private void verifyDBFullSkipLineageFull(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 4);
+
+        assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
+    }
+
+    private void verifyTableWithLineageFull(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 5);
+
+        assertTrue(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
+    }
+
+    private void verifyTableWithLineageSkipLineageFull(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 4);
+
+        assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
+    }
+
+    private void verifyTableWithoutLineageFull(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 5);
+
+        assertTrue(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2,GUID_HIVE_PROCESS);
+    }
+
+    private void verifyTableWithoutLineageSkipLineageFull(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 4);
+
+        assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
+    }
+
+
+    private void verifyDBConn(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 5);
+
+        assertTrue(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
+    }
+
+    private void verifyDBSkipLineageConn(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 4);
+
+        assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
+    }
+
+    private void verifyTableWithLineageConn(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 4);
+
+        assertTrue(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
+    }
+
+    private void verifyTableWithLineageSkipLineageConn(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 2);
+
+        assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_CTAS_2);
+    }
+
+    private void verifyTableWithoutLineageConn(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 2);
+
+        assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1);
+    }
+
+    private void verifyTableWithoutLineageSkipLineageConn(ZipSource zipSource) {
+        assertNotNull(zipSource.getCreationOrder());
+        assertEquals(zipSource.getCreationOrder().size(), 2);
+
+        assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+        verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1);
+    }
+
+    private void verifyExpectedEntities(List<String> fileNames, String... guids){
+        assertEquals(fileNames.size(), guids.length);
+        for (String guid : guids) {
+            assertTrue(fileNames.contains(guid.toLowerCase()));
+        }
+    }
+
+    private List<String> getFileNames(ZipSource zipSource){
+        List<String> ret = new ArrayList<>();
+        assertTrue(zipSource.hasNext());
+
+        while (zipSource.hasNext()){
+            AtlasEntity atlasEntity = zipSource.next();
+            assertNotNull(atlasEntity);
+            ret.add(atlasEntity.getGuid());
+        }
+        return ret;
+    }
+}


[atlas] 02/02: ATLAS-3283 Export-import UTs are getting skipped

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 8cd7496566ddbeb843018a039e46e4de38402f7d
Author: nikhilbonte <ni...@freestoneinfotech.com>
AuthorDate: Mon Jun 17 15:41:51 2019 +0530

    ATLAS-3283 Export-import UTs are getting skipped
    
    Signed-off-by: nixonrodrigues <ni...@apache.org>
---
 .../atlas/repository/impexp/ExportIncrementalTest.java     | 14 ++++++++------
 .../atlas/repository/impexp/ExportSkipLineageTest.java     |  3 ++-
 .../apache/atlas/repository/impexp/ImportServiceTest.java  |  4 ----
 .../impexp/IncrementalExportEntityProviderTest.java        |  2 ++
 .../repository/impexp/ReplicationEntityAttributeTest.java  |  4 ++--
 .../atlas/repository/impexp/ZipFileResourceTestUtils.java  |  6 +++---
 6 files changed, 17 insertions(+), 16 deletions(-)
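
The common thread in the setup() changes below is the call RequestContext.get().setImportInProgress(true), added before the fixture entities are created so that, per the commit title, the UTs run instead of being skipped during setup. A minimal sketch of that pattern follows; the class name, the test method and the placeholder for fixture creation are illustrative assumptions, and only the RequestContext call is taken from this commit.

    import org.apache.atlas.RequestContext;
    import org.testng.annotations.BeforeClass;
    import org.testng.annotations.Test;

    public class SampleExportImportTest {   // illustrative name, not an Atlas class
        @BeforeClass
        public void setup() {
            // Flag the current request as an in-progress import before creating
            // fixture entities, mirroring the setup() additions in the hunks below.
            RequestContext.get().setImportInProgress(true);

            // ... create the db / table-columns fixtures via the existing test helpers ...
        }

        @Test
        public void exportRunsInsteadOfBeingSkipped() {
            // exercise export/import against the fixtures created in setup()
        }
    }

In the hunks that follow, this call precedes createEntities(...) in ExportIncrementalTest, ExportSkipLineageTest and IncrementalExportEntityProviderTest.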

diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
index 1384029..1d44081 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
@@ -66,11 +66,15 @@ public class ExportIncrementalTest extends ExportImportTestBase {
 
     private final String EXPORT_REQUEST_INCREMENTAL = "export-incremental";
     private final String EXPORT_REQUEST_CONNECTED = "export-connected";
+    private AtlasClassificationType classificationTypeT1;
     private long nextTimestamp;
 
     @BeforeClass
     public void setup() throws IOException, AtlasBaseException {
         basicSetup(typeDefStore, typeRegistry);
+        RequestContext.get().setImportInProgress(true);
+        classificationTypeT1 = createNewClassification();
+
         createEntities(entityStore, ENTITIES_SUB_DIR, new String[] { "db", "table-columns"});
         final String[] entityGuids = {DB_GUID, TABLE_GUID};
         verifyCreatedEntities(entityStore, entityGuids, 2);
@@ -108,8 +112,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     public void atT1_NewClassificationAttachedToTable_ReturnsChangedTable() throws AtlasBaseException {
         final int expectedEntityCount = 1;
 
-        AtlasClassificationType ct = createNewClassification();
-        entityStore.addClassifications(TABLE_GUID, ImmutableList.of(ct.createDefaultValue()));
+        entityStore.addClassifications(TABLE_GUID, ImmutableList.of(classificationTypeT1.createDefaultValue()));
 
         AtlasExportRequest request = getIncrementalRequest(nextTimestamp);
         ZipSource source = runExportWithParameters(exportService, request);
@@ -127,7 +130,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     }
 
     private AtlasClassificationType createNewClassification() {
-        createTypes(typeDefStore, ENTITIES_SUB_DIR,"typesDef-new-classification");
+        createTypes(typeDefStore, ENTITIES_SUB_DIR,"typesdef-new-classification");
         return typeRegistry.getClassificationTypeByName("T1");
     }
 
@@ -151,7 +154,6 @@ public class ExportIncrementalTest extends ExportImportTestBase {
 
         long postUpdateTableEntityTimestamp = tableEntity.getEntity().getUpdateTime().getTime();
         assertEquals(preExportTableEntityTimestamp, postUpdateTableEntityTimestamp);
-        nextTimestamp = updateTimesampForNextIncrementalExport(source);
     }
 
     @Test(dependsOnMethods = "atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn")
@@ -179,7 +181,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
 
             return request;
         } catch (IOException e) {
-            throw new SkipException(String.format("getIncrementalRequest: '%s' could not be laoded.", EXPORT_REQUEST_INCREMENTAL));
+            throw new SkipException(String.format("getIncrementalRequest: '%s' could not be loaded.", EXPORT_REQUEST_INCREMENTAL));
         }
     }
 
@@ -187,7 +189,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
         try {
             return TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_CONNECTED, AtlasExportRequest.class);
         } catch (IOException e) {
-            throw new SkipException(String.format("getIncrementalRequest: '%s' could not be laoded.", EXPORT_REQUEST_CONNECTED));
+            throw new SkipException(String.format("getIncrementalRequest: '%s' could not be loaded.", EXPORT_REQUEST_CONNECTED));
         }
     }
 
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java
index 28773d5..18b4a30 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java
@@ -72,6 +72,7 @@ public class ExportSkipLineageTest extends ExportImportTestBase {
     public void setup() throws IOException, AtlasBaseException {
         loadBaseModel(typeDefStore, typeRegistry);
         loadHiveModel(typeDefStore, typeRegistry);
+        RequestContext.get().setImportInProgress(true);
 
         entityStore = new AtlasEntityStoreV2(deleteDelegate, typeRegistry, mockChangeNotifier, graphMapper);
         createEntities(entityStore, ENTITIES_SUB_DIR, new String[]{"db", "table-columns", "table-view", "table-table-lineage"});
@@ -112,7 +113,7 @@ public class ExportSkipLineageTest extends ExportImportTestBase {
 
             return request;
         } catch (IOException e) {
-            throw new SkipException(String.format("getRequest: '%s' could not be laoded.", filename));
+            throw new SkipException(String.format("getRequest: '%s' could not be loaded.", filename));
         }
     }
 }
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index 0224c85..7044243 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -458,10 +458,6 @@ public class ImportServiceTest extends ExportImportTestBase {
         assertEquals(importTransforms.getTransforms().get("hive_table").get("qualifiedName").size(), 2);
     }
 
-    @Test(dataProvider = "empty-zip", expectedExceptions = AtlasBaseException.class)
-    public void importEmptyZip(ZipSource zipSource) {
-
-    }
 
     @Test(expectedExceptions = AtlasBaseException.class)
     public void importEmptyZip() throws IOException, AtlasBaseException {
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
index 10a0838..42b6353 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.atlas.repository.impexp;
 
+import org.apache.atlas.RequestContext;
 import org.apache.atlas.TestModules;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
@@ -58,6 +59,7 @@ public class IncrementalExportEntityProviderTest extends ExportImportTestBase {
     @BeforeClass
     public void setup() throws IOException, AtlasBaseException {
         basicSetup(typeDefStore, typeRegistry);
+        RequestContext.get().setImportInProgress(true);
         createEntities(entityStore, ENTITIES_SUB_DIR, new String[] { "db", "table-columns"});
         final String[] entityGuids = {DB_GUID, TABLE_GUID};
         verifyCreatedEntities(entityStore, entityGuids, 2);
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
index 1eccdbf..829390b 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
@@ -193,7 +193,7 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
             REPLICATED_TO_CLUSTER_NAME = (String) request.getOptions().get(OPTION_KEY_REPLICATED_TO);
             return request;
         } catch (IOException e) {
-            throw new SkipException(String.format("getExportRequestWithReplicationOption: '%s' could not be laoded.", EXPORT_REQUEST_FILE));
+            throw new SkipException(String.format("getExportRequestWithReplicationOption: '%s' could not be loaded.", EXPORT_REQUEST_FILE));
         }
     }
 
@@ -203,7 +203,7 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
             REPLICATED_FROM_CLUSTER_NAME = request.getOptions().get(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
             return request;
         } catch (IOException e) {
-            throw new SkipException(String.format("getExportRequestWithReplicationOption: '%s' could not be laoded.", IMPORT_REQUEST_FILE));
+            throw new SkipException(String.format("getExportRequestWithReplicationOption: '%s' could not be loaded.", IMPORT_REQUEST_FILE));
         }
     }
 }
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
index 5e287d8..76b423e 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
@@ -192,7 +192,7 @@ public class ZipFileResourceTestUtils {
         try {
             return TestResourceFileUtils.readObjectFromJson(entitiesSubDir, fileName, AtlasTypesDef.class);
         } catch (IOException e) {
-            throw new SkipException(String.format("createTypes: '%s' could not be laoded.", fileName));
+            throw new SkipException(String.format("createTypes: '%s' could not be loaded.", fileName));
         }
     }
 
@@ -201,7 +201,7 @@ public class ZipFileResourceTestUtils {
         try {
             return TestResourceFileUtils.readObjectFromJson(entitiesSubDir, fileName, AtlasEntity.AtlasEntityWithExtInfo.class);
         } catch (IOException e) {
-            throw new SkipException(String.format("createTypes: '%s' could not be laoded.", fileName));
+            throw new SkipException(String.format("createTypes: '%s' could not be loaded.", fileName));
         }
     }
 
@@ -220,7 +220,7 @@ public class ZipFileResourceTestUtils {
             assertTrue((response.getCreatedEntities() != null && response.getCreatedEntities().size() > 0) ||
                     (response.getMutatedEntities() != null && response.getMutatedEntities().size() > 0));
         } catch (AtlasBaseException e) {
-            throw new SkipException(String.format("createAtlasEntity: could not load '%s'.", atlasEntity.getEntity().getTypeName()));
+            throw new SkipException(String.format("createAtlasEntity: could not loaded '%s'.", atlasEntity.getEntity().getTypeName()));
         }
     }