You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ni...@apache.org on 2019/06/20 10:30:11 UTC
[atlas] 01/02: ATLAS-3256 Modify export API to process with
relationshipAttributes
This is an automated email from the ASF dual-hosted git repository.
nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 08b76391cfb8fd231218e5788261e5f2310703cc
Author: nikhilbonte <ni...@freestoneinfotech.com>
AuthorDate: Fri Jun 14 15:21:06 2019 +0530
ATLAS-3256 Modify export API to process with relationshipAttributes
Signed-off-by: nixonrodrigues <ni...@apache.org>
---
.../atlas/repository/impexp/EntitiesExtractor.java | 81 +++++
.../atlas/repository/impexp/ExportService.java | 223 ++-----------
.../atlas/repository/impexp/ExtractStrategy.java | 28 ++
.../impexp/IncrementalExportEntityProvider.java | 32 +-
.../impexp/RelationshipAttributesExtractor.java | 115 +++++++
.../atlas/repository/impexp/VertexExtractor.java | 183 +++++++++++
.../IncrementalExportEntityProviderTest.java | 2 +-
.../RelationshipAttributesExtractorTest.java | 354 +++++++++++++++++++++
8 files changed, 821 insertions(+), 197 deletions(-)
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
new file mode 100644
index 0000000..15cb111
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasTypeRegistry;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class EntitiesExtractor {
+ static final String PROPERTY_GUID = "__guid";
+ private static final String VERTEX_BASED_EXTRACT = "default";
+ private static final String INCREMENTAL_EXTRACT = "incremental";
+ private static final String RELATION_BASED_EXTRACT = "relationship";
+
+ private Map<String, ExtractStrategy> extractors = new HashMap<>();
+ private ExtractStrategy extractor;
+
+ public EntitiesExtractor(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry) {
+ extractors.put(VERTEX_BASED_EXTRACT, new VertexExtractor(atlasGraph, typeRegistry));
+ extractors.put(INCREMENTAL_EXTRACT, new IncrementalExportEntityProvider(atlasGraph));
+ extractors.put(RELATION_BASED_EXTRACT, new RelationshipAttributesExtractor(typeRegistry));
+ }
+
+ public void get(AtlasEntity entity, ExportService.ExportContext context) {
+ if(extractor == null) {
+ extractor = extractors.get(VERTEX_BASED_EXTRACT);
+ }
+
+ switch (context.fetchType) {
+ case CONNECTED:
+ extractor.connectedFetch(entity, context);
+ break;
+
+ case INCREMENTAL:
+ if (context.isHiveDBIncrementalSkipLineage()) {
+ extractors.get(INCREMENTAL_EXTRACT).fullFetch(entity, context);
+ break;
+ }
+
+ case FULL:
+ default:
+ extractor.fullFetch(entity, context);
+ }
+ }
+
+ public void setExtractor(AtlasEntityDef atlasEntityDef) {
+ extractor = extractUsing(atlasEntityDef);
+ }
+
+ public void close() {
+ for (ExtractStrategy es : extractors.values()) {
+ es.close();
+ }
+ }
+
+ private ExtractStrategy extractUsing(AtlasEntityDef atlasEntityDef) {
+ return (atlasEntityDef == null || atlasEntityDef.getRelationshipAttributeDefs().size() == 0)
+ ? extractors.get(VERTEX_BASED_EXTRACT)
+ : extractors.get(RELATION_BASED_EXTRACT);
+ }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 11289ea..5055607 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -25,7 +25,6 @@ import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEnumDef;
@@ -35,19 +34,13 @@ import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.util.UniqueList;
-import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.util.AtlasGremlinQueryProvider;
-import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery;
-import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
-import javax.script.ScriptEngine;
-import javax.script.ScriptException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -63,30 +56,23 @@ import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREM
public class ExportService {
private static final Logger LOG = LoggerFactory.getLogger(ExportService.class);
- public static final String PROPERTY_GUID = "__guid";
- private static final String PROPERTY_IS_PROCESS = "isProcess";
-
private final AtlasTypeRegistry typeRegistry;
- private final String QUERY_BINDING_START_GUID = "startGuid";
private final StartEntityFetchByExportRequest startEntityFetchByExportRequest;
+ private final EntitiesExtractor entitiesExtractor;
private AuditsWriter auditsWriter;
- private final AtlasGraph atlasGraph;
private final EntityGraphRetriever entityGraphRetriever;
- private final AtlasGremlinQueryProvider gremlinQueryProvider;
private ExportTypeProcessor exportTypeProcessor;
private final HdfsPathEntityCreator hdfsPathEntityCreator;
- private IncrementalExportEntityProvider incrementalExportEntityProvider;
@Inject
public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph,
AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator) {
this.typeRegistry = typeRegistry;
this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry);
- this.atlasGraph = atlasGraph;
- this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
this.auditsWriter = auditsWriter;
this.hdfsPathEntityCreator = hdfsPathEntityCreator;
this.startEntityFetchByExportRequest = new StartEntityFetchByExportRequest(atlasGraph, typeRegistry, AtlasGremlinQueryProvider.INSTANCE);
+ this.entitiesExtractor = new EntitiesExtractor(atlasGraph, typeRegistry);
}
public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName,
@@ -95,7 +81,7 @@ public class ExportService {
AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP,
hostName, startTime, getCurrentChangeMarker());
- ExportContext context = new ExportContext(atlasGraph, result, exportSink);
+ ExportContext context = new ExportContext(result, exportSink);
exportTypeProcessor = new ExportTypeProcessor(typeRegistry);
try {
@@ -109,12 +95,12 @@ public class ExportService {
} catch(Exception ex) {
LOG.error("Operation failed: ", ex);
} finally {
- atlasGraph.releaseGremlinScriptEngine(context.scriptEngine);
+ entitiesExtractor.close();
+
LOG.info("<== export(user={}, from={}): status {}: changeMarker: {}",
userName, requestingIP, context.result.getOperationStatus(), context.result.getChangeMarker());
context.clear();
result.clear();
- incrementalExportEntityProvider = null;
}
return context.result;
@@ -203,7 +189,9 @@ public class ExportService {
}
private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context) {
- debugLog("==> processObjectId({})", item);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> processObjectId({})", item);
+ }
try {
List<String> entityGuids = getStartingEntity(item, context);
@@ -211,9 +199,10 @@ public class ExportService {
return AtlasExportResult.OperationStatus.FAIL;
}
+ entitiesExtractor.setExtractor(typeRegistry.getEntityDefByName(item.getTypeName()));
+
for (String guid : entityGuids) {
processEntityGuid(guid, context);
- populateEntitesForIncremental(guid, context);
}
while (!context.guidsToProcess.isEmpty()) {
@@ -227,13 +216,16 @@ public class ExportService {
context.lineageProcessed.addAll(context.lineageToProcess.getList());
context.lineageToProcess.clear();
}
+ context.isSkipConnectedFetch = false;
}
} catch (AtlasBaseException excp) {
LOG.error("Fetching entity failed for: {}", item, excp);
return AtlasExportResult.OperationStatus.FAIL;
}
- debugLog("<== processObjectId({})", item);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== processObjectId({})", item);
+ }
return AtlasExportResult.OperationStatus.SUCCESS;
}
@@ -245,181 +237,41 @@ public class ExportService {
return startEntityFetchByExportRequest.get(context.result.getRequest(), item);
}
- private void debugLog(String s, Object... params) {
- if (!LOG.isDebugEnabled()) {
- return;
- }
-
- LOG.debug(s, params);
- }
-
private void processEntityGuid(String guid, ExportContext context) throws AtlasBaseException {
- debugLog("==> processEntityGuid({})", guid);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> processEntityGuid({})", guid);
+ }
if (context.guidsProcessed.contains(guid)) {
return;
}
- TraversalDirection direction = context.guidDirection.get(guid);
AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
- processEntity(entityWithExtInfo, context, direction);
-
- debugLog("<== processEntityGuid({})", guid);
+ processEntity(entityWithExtInfo, context);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== processEntityGuid({})", guid);
+ }
}
- public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo,
- ExportContext context,
- TraversalDirection direction) throws AtlasBaseException {
-
+ public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
addEntity(entityWithExtInfo, context);
exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context);
context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid());
- getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction);
+ entitiesExtractor.get(entityWithExtInfo.getEntity(), context);
if (entityWithExtInfo.getReferredEntities() != null) {
for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
exportTypeProcessor.addTypes(e, context);
- getConntedEntitiesBasedOnOption(e, context, direction);
+ entitiesExtractor.get(e, context);
}
context.guidsProcessed.addAll(entityWithExtInfo.getReferredEntities().keySet());
}
}
- private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
- switch (context.fetchType) {
- case CONNECTED:
- getEntityGuidsForConnectedFetch(entity, context, direction);
- break;
-
- case INCREMENTAL:
- if(context.isHiveDBIncrementalSkipLineage()) {
- break;
- }
-
- case FULL:
- default:
- getEntityGuidsForFullFetch(entity, context);
- }
- }
-
- private void populateEntitesForIncremental(String topLevelEntityGuid, ExportContext context) {
- if (context.isHiveDBIncrementalSkipLineage() == false || incrementalExportEntityProvider != null) {
- return;
- }
-
- incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, context.scriptEngine);
- incrementalExportEntityProvider.populate(topLevelEntityGuid, context.changeMarker, context.guidsToProcess);
- }
-
- private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
- if (direction == null || direction == TraversalDirection.UNKNOWN) {
- getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD);
- } else {
- if (isProcessEntity(entity)) {
- direction = TraversalDirection.OUTWARD;
- }
-
- getConnectedEntityGuids(entity, context, direction);
- }
- }
-
- private boolean isProcessEntity(AtlasEntity entity) {
- String typeName = entity.getTypeName();
- AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
-
- return entityType.isSubTypeOf(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);
- }
-
- private void getConnectedEntityGuids(AtlasEntity entity, ExportContext context, TraversalDirection... directions) {
- if(directions == null) {
- return;
- }
-
- for (TraversalDirection direction : directions) {
- String query = getQueryForTraversalDirection(direction);
-
- if(LOG.isDebugEnabled()) {
- debugLog("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query);
- }
-
- context.bindings.clear();
- context.bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
-
- List<Map<String, Object>> result = executeGremlinQuery(query, context);
-
- if (CollectionUtils.isEmpty(result)) {
- continue;
- }
-
- for (Map<String, Object> hashMap : result) {
- String guid = (String) hashMap.get(PROPERTY_GUID);
- TraversalDirection currentDirection = context.guidDirection.get(guid);
- boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS);
-
- if(context.skipLineage && isLineage) continue;
-
- if (currentDirection == null) {
- context.addToBeProcessed(isLineage, guid, direction);
-
- } else if (currentDirection == TraversalDirection.OUTWARD && direction == TraversalDirection.INWARD) {
- // the entity should be reprocessed to get inward entities
- context.guidsProcessed.remove(guid);
- context.addToBeProcessed(isLineage, guid, direction);
- }
- }
-
- if(LOG.isDebugEnabled()) {
- debugLog("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size());
- }
- }
- }
-
- private String getQueryForTraversalDirection(TraversalDirection direction) {
- switch (direction) {
- case INWARD:
- return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_IN_EDGE);
-
- default:
- case OUTWARD:
- return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_OUT_EDGE);
- }
- }
-
- private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) {
- if(LOG.isDebugEnabled()) {
- debugLog("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
- }
-
- String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
-
- context.bindings.clear();
- context.bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
-
- List<Map<String, Object>> result = executeGremlinQuery(query, context);
-
- if (CollectionUtils.isEmpty(result)) {
- return;
- }
-
- for (Map<String, Object> hashMap : result) {
- String guid = (String) hashMap.get(PROPERTY_GUID);
- boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS);
-
- if(context.getSkipLineage() && isLineage) continue;
-
- if (!context.guidsProcessed.contains(guid)) {
- context.addToBeProcessed(isLineage, guid, TraversalDirection.BOTH);
- }
- }
-
- if(LOG.isDebugEnabled()) {
- debugLog("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}",
- entity.getGuid(), result.size(), context.guidsToProcess.size());
- }
- }
private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
if(context.sink.hasEntity(entityWithExtInfo.getEntity().getGuid())) {
@@ -448,15 +300,6 @@ public class ExportService {
context.reportProgress();
}
- private List<Map<String, Object>> executeGremlinQuery(String query, ExportContext context) {
- try {
- return (List<Map<String, Object>>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false);
- } catch (ScriptException e) {
- LOG.error("Script execution failed for query: ", query, e);
- return null;
- }
- }
-
public enum TraversalDirection {
UNKNOWN,
INWARD,
@@ -493,7 +336,7 @@ public class ExportService {
final UniqueList<String> entityCreationOrder = new UniqueList<>();
final Set<String> guidsProcessed = new HashSet<>();
- final private UniqueList<String> guidsToProcess = new UniqueList<>();
+ final UniqueList<String> guidsToProcess = new UniqueList<>();
final UniqueList<String> lineageToProcess = new UniqueList<>();
final Set<String> lineageProcessed = new HashSet<>();
final Map<String, TraversalDirection> guidDirection = new HashMap<>();
@@ -505,25 +348,23 @@ public class ExportService {
final AtlasExportResult result;
private final ZipSink sink;
- private final ScriptEngine scriptEngine;
- private final Map<String, Object> bindings;
- private final ExportFetchType fetchType;
- private final boolean skipLineage;
- private final long changeMarker;
+ final ExportFetchType fetchType;
+ final boolean skipLineage;
+ final long changeMarker;
+ boolean isSkipConnectedFetch;
private final boolean isHiveDBIncremental;
private int progressReportCount = 0;
- ExportContext(AtlasGraph atlasGraph, AtlasExportResult result, ZipSink sink) throws AtlasBaseException {
+ ExportContext(AtlasExportResult result, ZipSink sink) {
this.result = result;
this.sink = sink;
- scriptEngine = atlasGraph.getGremlinScriptEngine();
- bindings = new HashMap<>();
fetchType = ExportFetchType.from(result.getRequest().getFetchTypeOptionValue());
skipLineage = result.getRequest().getSkipLineageOptionValue();
this.changeMarker = result.getRequest().getChangeTokenFromOptions();
this.isHiveDBIncremental = checkHiveDBIncrementalSkipLineage(result.getRequest());
+ this.isSkipConnectedFetch = false;
}
private boolean checkHiveDBIncrementalSkipLineage(AtlasExportRequest request) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExtractStrategy.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExtractStrategy.java
new file mode 100644
index 0000000..6475016
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExtractStrategy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+
+public interface ExtractStrategy {
+
+ void connectedFetch(AtlasEntity entity, ExportService.ExportContext context);
+ void fullFetch(AtlasEntity entity, ExportService.ExportContext context);
+ void close();
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
index 3a2a917..256d9de 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
@@ -18,6 +18,8 @@
package org.apache.atlas.repository.impexp;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.util.UniqueList;
import org.slf4j.Logger;
@@ -28,11 +30,10 @@ import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-public class IncrementalExportEntityProvider {
+public class IncrementalExportEntityProvider implements ExtractStrategy {
private static final Logger LOG = LoggerFactory.getLogger(IncrementalExportEntityProvider.class);
private static final String QUERY_PARAMETER_START_GUID = "startGuid";
@@ -50,9 +51,23 @@ public class IncrementalExportEntityProvider {
private ScriptEngine scriptEngine;
@Inject
- public IncrementalExportEntityProvider(AtlasGraph atlasGraph, ScriptEngine scriptEngine) {
+ public IncrementalExportEntityProvider(AtlasGraph atlasGraph) {
this.atlasGraph = atlasGraph;
- this.scriptEngine = scriptEngine;
+ try {
+ this.scriptEngine = atlasGraph.getGremlinScriptEngine();
+ } catch (AtlasBaseException e) {
+ LOG.error("Error instantiating script engine.", e);
+ }
+ }
+
+ @Override
+ public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) {
+ populate(entity.getGuid(), context.changeMarker, context.guidsToProcess);
+ }
+
+ @Override
+ public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) {
+
}
public void populate(String dbEntityGuid, long timeStamp, UniqueList<String> guidsToProcess) {
@@ -63,6 +78,13 @@ public class IncrementalExportEntityProvider {
}
}
+ @Override
+ public void close() {
+ if (scriptEngine != null) {
+ atlasGraph.releaseGremlinScriptEngine(scriptEngine);
+ }
+ }
+
private void partial(String dbEntityGuid, long timeStamp, UniqueList<String> guidsToProcess) {
guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_TABLE, timeStamp));
guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_SD, timeStamp));
@@ -98,7 +120,7 @@ public class IncrementalExportEntityProvider {
}
for (Map<String, Object> item : result) {
- guids.add((String) item.get(ExportService.PROPERTY_GUID));
+ guids.add((String) item.get(EntitiesExtractor.PROPERTY_GUID));
}
return guids;
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java
new file mode 100644
index 0000000..d609071
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class RelationshipAttributesExtractor implements ExtractStrategy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RelationshipAttributesExtractor.class);
+
+ private final AtlasTypeRegistry typeRegistry;
+
+ public RelationshipAttributesExtractor(AtlasTypeRegistry typeRegistry) {
+ this.typeRegistry = typeRegistry;
+ }
+
+ @Override
+ public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> fullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
+ }
+
+ List<AtlasRelatedObjectId> atlasRelatedObjectIdList = getRelatedObjectIds(entity);
+
+ for (AtlasRelatedObjectId ar : atlasRelatedObjectIdList) {
+ boolean isLineage = isLineageType(ar.getTypeName());
+
+ if (context.skipLineage && isLineage) {
+ continue;
+ }
+ context.addToBeProcessed(isLineage, ar.getGuid(), ExportService.TraversalDirection.BOTH);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== fullFetch({}): guidsToProcess {}", entity.getGuid(), context.guidsToProcess.size());
+ }
+ }
+
+ @Override
+ public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> connectedFetch({}): guidsToProcess {} isSkipConnectedFetch :{}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), context.isSkipConnectedFetch);
+ }
+
+ List<AtlasRelatedObjectId> atlasRelatedObjectIdList = getRelatedObjectIds(entity);
+ for (AtlasRelatedObjectId ar : atlasRelatedObjectIdList) {
+ boolean isLineage = isLineageType(ar.getTypeName());
+
+ if (context.skipLineage && isLineage) {
+ continue;
+ }
+ if (!context.isSkipConnectedFetch || isLineage) {
+ context.addToBeProcessed(isLineage, ar.getGuid(), ExportService.TraversalDirection.BOTH);
+ }
+ }
+
+ if(isLineageType(entity.getTypeName())){
+ context.isSkipConnectedFetch = false;
+ }else{
+ context.isSkipConnectedFetch = true;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> connectedFetch({}): guidsToProcess {}, isSkipConnectedFetch :{}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), context.isSkipConnectedFetch);
+ }
+ }
+
+ @Override
+ public void close() {
+ }
+
+ private boolean isLineageType(String typeName) {
+ AtlasEntityDef entityDef = typeRegistry.getEntityDefByName(typeName);
+ return entityDef.getSuperTypes().contains("Process");
+ }
+
+ private List<AtlasRelatedObjectId> getRelatedObjectIds(AtlasEntity entity) {
+ List<AtlasRelatedObjectId> relatedObjectIds = new ArrayList<>();
+
+ for (Object o : entity.getRelationshipAttributes().values()) {
+ if (o instanceof AtlasRelatedObjectId) {
+ relatedObjectIds.add((AtlasRelatedObjectId) o);
+ } else if (o instanceof Collection) {
+ relatedObjectIds.addAll((List) o);
+ }
+ }
+
+ return relatedObjectIds;
+ }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/VertexExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/VertexExtractor.java
new file mode 100644
index 0000000..a5b11be
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/VertexExtractor.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.util.AtlasGremlinQueryProvider;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.atlas.repository.impexp.EntitiesExtractor.PROPERTY_GUID;
+
+public class VertexExtractor implements ExtractStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(VertexExtractor.class);
+
+ private static final String PROPERTY_IS_PROCESS = "isProcess";
+ private static final String QUERY_BINDING_START_GUID = "startGuid";
+
+ private final AtlasGremlinQueryProvider gremlinQueryProvider;
+
+ private final Map<String, Object> bindings;
+ private AtlasGraph atlasGraph;
+ private AtlasTypeRegistry typeRegistry;
+ private ScriptEngine scriptEngine;
+
+ public VertexExtractor(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry) {
+ this.atlasGraph = atlasGraph;
+ this.typeRegistry = typeRegistry;
+ try {
+ this.scriptEngine = atlasGraph.getGremlinScriptEngine();
+ } catch (AtlasBaseException e) {
+ LOG.error("Script Engine: Instantiation failed!");
+ }
+ this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
+ this.bindings = new HashMap<>();
+ }
+
+ @Override
+ public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) {
+ if (LOG.isDebugEnabled()){
+ LOG.debug("==> fullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
+ }
+
+ String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
+
+ bindings.clear();
+ bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
+
+ List<Map<String, Object>> result = executeGremlinQuery(query, context);
+
+ if (CollectionUtils.isEmpty(result)) {
+ return;
+ }
+
+ for (Map<String, Object> hashMap : result) {
+ String guid = (String) hashMap.get(PROPERTY_GUID);
+ boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS);
+
+ if (context.getSkipLineage() && isLineage) continue;
+
+ if (!context.guidsProcessed.contains(guid)) {
+ context.addToBeProcessed(isLineage, guid, ExportService.TraversalDirection.BOTH);
+ }
+ }
+ }
+
+ @Override
+ public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) {
+ if (LOG.isDebugEnabled()){
+ LOG.debug("==> connectedFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
+ }
+
+ ExportService.TraversalDirection direction = context.guidDirection.get(entity.getGuid());
+
+ if (direction == null || direction == ExportService.TraversalDirection.UNKNOWN) {
+ getConnectedEntityGuids(entity, context, ExportService.TraversalDirection.OUTWARD, ExportService.TraversalDirection.INWARD);
+ } else {
+ if (isProcessEntity(entity)) {
+ direction = ExportService.TraversalDirection.OUTWARD;
+ }
+
+ getConnectedEntityGuids(entity, context, direction);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (scriptEngine != null) {
+ atlasGraph.releaseGremlinScriptEngine(scriptEngine);
+ }
+ }
+
+ private void getConnectedEntityGuids(AtlasEntity entity, ExportService.ExportContext context, ExportService.TraversalDirection... directions) {
+ if (directions == null) {
+ return;
+ }
+
+ for (ExportService.TraversalDirection direction : directions) {
+ String query = getQueryForTraversalDirection(direction);
+
+ bindings.clear();
+ bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
+
+ List<Map<String, Object>> result = executeGremlinQuery(query, context);
+
+ if (CollectionUtils.isEmpty(result)) {
+ continue;
+ }
+
+ for (Map<String, Object> hashMap : result) {
+ String guid = (String) hashMap.get(PROPERTY_GUID);
+ ExportService.TraversalDirection currentDirection = context.guidDirection.get(guid);
+ boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS);
+
+ if (context.skipLineage && isLineage) continue;
+
+ if (currentDirection == null) {
+ context.addToBeProcessed(isLineage, guid, direction);
+
+ } else if (currentDirection == ExportService.TraversalDirection.OUTWARD && direction == ExportService.TraversalDirection.INWARD) {
+ // the entity should be reprocessed to get inward entities
+ context.guidsProcessed.remove(guid);
+ context.addToBeProcessed(isLineage, guid, direction);
+ }
+ }
+ }
+ }
+
+ private boolean isProcessEntity(AtlasEntity entity) {
+ String typeName = entity.getTypeName();
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+
+ return entityType.isSubTypeOf(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);
+ }
+
+ private String getQueryForTraversalDirection(ExportService.TraversalDirection direction) {
+ switch (direction) {
+ case INWARD:
+ return this.gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_IN_EDGE);
+
+ default:
+ case OUTWARD:
+ return this.gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_OUT_EDGE);
+ }
+ }
+
+ private List<Map<String, Object>> executeGremlinQuery(String query, ExportService.ExportContext context) {
+ try {
+ return (List<Map<String, Object>>) atlasGraph.executeGremlinScript(scriptEngine, bindings, query, false);
+ } catch (ScriptException e) {
+ LOG.error("Script execution failed for query: ", query, e);
+ return null;
+ }
+ }
+}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
index 85ed5f9..10a0838 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
@@ -63,7 +63,7 @@ public class IncrementalExportEntityProviderTest extends ExportImportTestBase {
verifyCreatedEntities(entityStore, entityGuids, 2);
gremlinScriptEngine = atlasGraph.getGremlinScriptEngine();
- incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, gremlinScriptEngine);
+ incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph);
}
@AfterClass
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java
new file mode 100644
index 0000000..03d50f1
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.TestUtilsV2;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.runner.LocalSolrRunner;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.testng.ITestContext;
+import org.testng.annotations.Test;
+import org.testng.annotations.Guice;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.DataProvider;
+
+import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.*;
+import static org.testng.Assert.*;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class RelationshipAttributesExtractorTest {
+
+ private static final String EXPORT_FULL = "full";
+ private static final String EXPORT_CONNECTED = "connected";
+ private static final String QUALIFIED_NAME_DB = "db_test_1@02052019";
+ private static final String QUALIFIED_NAME_TABLE_LINEAGE = "db_test_1.test_tbl_ctas_2@02052019";
+ private static final String QUALIFIED_NAME_TABLE_NON_LINEAGE = "db_test_1.test_tbl_1@02052019";
+
+ private static final String GUID_DB = "f0b72ab4-7452-4e42-ac74-2aee7728cce4";
+ private static final String GUID_TABLE_1 = "4d5adf00-2c9b-4877-ad23-c41fd7319150";
+ private static final String GUID_TABLE_2 = "8d0b834c-61ce-42d8-8f66-6fa51c36bccb";
+ private static final String GUID_TABLE_CTAS_2 = "eaec545b-3ac7-4e1b-a497-bd4a2b6434a2";
+ private static final String GUID_HIVE_PROCESS = "bd3138b2-f29e-4226-b859-de25eaa1c18b";
+
+ @Inject
+ private ImportService importService;
+
+ @Inject
+ AtlasTypeRegistry typeRegistry;
+
+ @Inject
+ private AtlasTypeDefStore typeDefStore;
+
+ @Inject
+ private ExportService exportService;
+
+ @BeforeClass
+ public void setup() throws IOException, AtlasBaseException {
+ loadBaseModel();
+ loadHiveModel();
+ }
+
+ @BeforeTest
+ public void setupTest() {
+ RequestContext.clear();
+ RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
+ }
+
+ @AfterClass
+ public void clear() throws Exception {
+ AtlasGraphProvider.cleanup();
+
+ if (useLocalSolr()) {
+ LocalSolrRunner.stop();
+ }
+ }
+
+ @DataProvider(name = "hiveDb")
+ public static Object[][] getData(ITestContext context) throws IOException, AtlasBaseException {
+ return getZipSource("hive_db_lineage.zip");
+ }
+
+ @Test(dataProvider = "hiveDb")
+ public void importHiveDb(ZipSource zipSource) throws AtlasBaseException, IOException {
+ runImportWithNoParameters(importService, zipSource);
+ }
+
+ @Test(dependsOnMethods = "importHiveDb")
+ public void exportDBFull() throws Exception {
+ ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_FULL, false));
+ verifyDBFull(source);
+ }
+
+ @Test(dependsOnMethods = "importHiveDb")
+ public void exportDBFullSkipLineageFull() throws Exception {
+ ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_FULL, true));
+ verifyDBFullSkipLineageFull(source);
+ }
+
+ @Test(dependsOnMethods = "importHiveDb")
+ public void exportTableWithLineageFull() throws Exception {
+ ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_FULL, false));
+ verifyTableWithLineageFull(source);
+ }
+
+ @Test(dependsOnMethods = "importHiveDb")
+ public void exportTableWithLineageSkipLineageFull() throws Exception {
+ ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_FULL, true));
+ verifyTableWithLineageSkipLineageFull(source);
+ }
+
+ @Test(dependsOnMethods = "importHiveDb")
+ public void exportTableWithoutLineageFull() throws Exception {
+ ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_FULL, false));
+ verifyTableWithoutLineageFull(source);
+ }
+
+ @Test(dependsOnMethods = "importHiveDb")
+ public void exportTableWithoutLineageSkipLineageFull() throws Exception {
+ ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_FULL, true));
+ verifyTableWithoutLineageSkipLineageFull(source);
+ }
+
+ @Test(dependsOnMethods = "importHiveDb")
+ public void exportDBConn() throws Exception {
+ ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_CONNECTED, false));
+ verifyDBConn(source);
+ }
+
+ @Test(dependsOnMethods = "importHiveDb")
+ public void exportDBSkipLineageConn() throws Exception {
+ ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_CONNECTED, true));
+ verifyDBSkipLineageConn(source);
+ }
+
+ @Test(dependsOnMethods = "importHiveDb")
+ public void exportTableWithLineageConn() throws Exception {
+ ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_CONNECTED, false));
+ verifyTableWithLineageConn(source);
+ }
+
+ @Test(dependsOnMethods = "importHiveDb")
+ public void exportTableWithLineageSkipLineageConn() throws Exception {
+ ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_CONNECTED, true));
+ verifyTableWithLineageSkipLineageConn(source);
+ }
+
+ @Test(dependsOnMethods = "importHiveDb")
+ public void exportTableWithoutLineageConn() throws Exception {
+ ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_CONNECTED, false));
+ verifyTableWithoutLineageConn(source);
+ }
+
+ @Test(dependsOnMethods = "importHiveDb")
+ public void exportTableWithoutLineageSkipLineageConn() throws Exception {
+ ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_CONNECTED, true));
+ verifyTableWithoutLineageSkipLineageConn(source);
+ }
+
+ private void loadHiveModel() throws IOException, AtlasBaseException {
+ loadModelFromJson("1000-Hadoop/1030-hive_model.json", typeDefStore, typeRegistry);
+ }
+
+ private void loadBaseModel() throws IOException, AtlasBaseException {
+ loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
+ }
+
+ private AtlasExportRequest getExportRequestForHiveDb(String hiveDbName, String fetchType, boolean skipLineage) {
+ AtlasExportRequest request = new AtlasExportRequest();
+
+ List<AtlasObjectId> itemsToExport = new ArrayList<>();
+ itemsToExport.add(new AtlasObjectId("hive_db", "qualifiedName", hiveDbName));
+ request.setItemsToExport(itemsToExport);
+ request.setOptions(getOptionsMap(fetchType, skipLineage));
+
+ return request;
+ }
+
+ private AtlasExportRequest getExportRequestForHiveTable(String hiveTableName, String fetchType, boolean skipLineage) {
+ AtlasExportRequest request = new AtlasExportRequest();
+
+ List<AtlasObjectId> itemsToExport = new ArrayList<>();
+ itemsToExport.add(new AtlasObjectId("hive_table", "qualifiedName", hiveTableName));
+ request.setItemsToExport(itemsToExport);
+ request.setOptions(getOptionsMap(fetchType, skipLineage));
+
+ return request;
+ }
+
+ private Map<String, Object> getOptionsMap(String fetchType, boolean skipLineage){
+ Map<String, Object> optionsMap = new HashMap<>();
+ optionsMap.put("fetchType", fetchType.isEmpty() ? "full" : fetchType );
+ optionsMap.put("skipLineage", skipLineage);
+
+ return optionsMap;
+ }
+
+ private ZipSource runExport(AtlasExportRequest request) throws AtlasBaseException, IOException {
+ final String requestingIP = "1.0.0.0";
+ final String hostName = "localhost";
+ final String userName = "admin";
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ZipSink zipSink = new ZipSink(baos);
+ AtlasExportResult result = exportService.run(zipSink, request, userName, hostName, requestingIP);
+
+ zipSink.close();
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(baos.toByteArray());
+ ZipSource zipSource = new ZipSource(bis);
+ return zipSource;
+ }
+
+ private void verifyDBFull(ZipSource zipSource) {
+ assertNotNull(zipSource.getCreationOrder());
+ assertEquals(zipSource.getCreationOrder().size(), 5);
+
+ assertTrue(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+ verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
+ }
+
+ private void verifyDBFullSkipLineageFull(ZipSource zipSource) {
+ assertNotNull(zipSource.getCreationOrder());
+ assertEquals(zipSource.getCreationOrder().size(), 4);
+
+ assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+ verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
+ }
+
+ private void verifyTableWithLineageFull(ZipSource zipSource) {
+ assertNotNull(zipSource.getCreationOrder());
+ assertEquals(zipSource.getCreationOrder().size(), 5);
+
+ assertTrue(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+ verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
+ }
+
+ private void verifyTableWithLineageSkipLineageFull(ZipSource zipSource) {
+ assertNotNull(zipSource.getCreationOrder());
+ assertEquals(zipSource.getCreationOrder().size(), 4);
+
+ assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+ verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
+ }
+
+ private void verifyTableWithoutLineageFull(ZipSource zipSource) {
+ assertNotNull(zipSource.getCreationOrder());
+ assertEquals(zipSource.getCreationOrder().size(), 5);
+
+ assertTrue(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+ verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2,GUID_HIVE_PROCESS);
+ }
+
+ private void verifyTableWithoutLineageSkipLineageFull(ZipSource zipSource) {
+ assertNotNull(zipSource.getCreationOrder());
+ assertEquals(zipSource.getCreationOrder().size(), 4);
+
+ assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+ verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
+ }
+
+
+ private void verifyDBConn(ZipSource zipSource) {
+ assertNotNull(zipSource.getCreationOrder());
+ assertEquals(zipSource.getCreationOrder().size(), 5);
+
+ assertTrue(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+ verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
+ }
+
+ private void verifyDBSkipLineageConn(ZipSource zipSource) {
+ assertNotNull(zipSource.getCreationOrder());
+ assertEquals(zipSource.getCreationOrder().size(), 4);
+
+ assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+ verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
+ }
+
+ private void verifyTableWithLineageConn(ZipSource zipSource) {
+ assertNotNull(zipSource.getCreationOrder());
+ assertEquals(zipSource.getCreationOrder().size(), 4);
+
+ assertTrue(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+ verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
+ }
+
+ private void verifyTableWithLineageSkipLineageConn(ZipSource zipSource) {
+ assertNotNull(zipSource.getCreationOrder());
+ assertEquals(zipSource.getCreationOrder().size(),2);
+
+ assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+ verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_CTAS_2);;
+ }
+
+ private void verifyTableWithoutLineageConn(ZipSource zipSource) {
+ assertNotNull(zipSource.getCreationOrder());
+ assertEquals(zipSource.getCreationOrder().size(), 2);
+
+ assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+ verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1);
+ }
+
+ private void verifyTableWithoutLineageSkipLineageConn(ZipSource zipSource) {
+ assertNotNull(zipSource.getCreationOrder());
+ assertEquals(zipSource.getCreationOrder().size(), 2);;
+
+ assertFalse(zipSource.getCreationOrder().contains(GUID_HIVE_PROCESS));
+ verifyExpectedEntities(getFileNames(zipSource), GUID_DB, GUID_TABLE_1);
+ }
+
+ private void verifyExpectedEntities(List<String> fileNames, String... guids){
+ assertEquals(fileNames.size(), guids.length);
+ for (String guid : guids) {
+ assertTrue(fileNames.contains(guid.toLowerCase()));
+ }
+ }
+
+ private List<String> getFileNames(ZipSource zipSource){
+ List<String> ret = new ArrayList<>();
+ assertTrue(zipSource.hasNext());
+
+ while (zipSource.hasNext()){
+ AtlasEntity atlasEntity = zipSource.next();
+ assertNotNull(atlasEntity);
+ ret.add(atlasEntity.getGuid());
+ }
+ return ret;
+ }
+}