You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ni...@apache.org on 2019/09/05 06:25:28 UTC
[atlas] branch branch-0.8 updated: ATLAS-3371 Export API - Backport / refactorings for table level replication.
This is an automated email from the ASF dual-hosted git repository.
nixon pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-0.8 by this push:
new 72f8659 ATLAS-3371 Export API - Backport / refactorings for table level replication.
72f8659 is described below
commit 72f86594425fe5d6dd788ae7b457a14c56b84027
Author: nikhilbonte <ni...@freestoneinfotech.com>
AuthorDate: Wed Aug 14 14:12:46 2019 +0530
ATLAS-3371 Export API - Backport / refactorings for table level replication.
Signed-off-by: nixonrodrigues <ni...@apache.org>
---
.../graph/GraphBackedMetadataRepository.java | 8 +-
.../atlas/repository/impexp/AuditsWriter.java | 63 +++-
.../atlas/repository/impexp/EntitiesExtractor.java | 81 +++++
.../atlas/repository/impexp/ExportService.java | 387 ++++-----------------
.../atlas/repository/impexp/ExtractStrategy.java | 28 ++
.../impexp/IncrementalExportEntityProvider.java | 39 ++-
.../impexp/StartEntityFetchByExportRequest.java | 208 +++++++++++
.../atlas/repository/impexp/VertexExtractor.java | 183 ++++++++++
.../repository/impexp/ExportIncrementalTest.java | 101 +++++-
.../IncrementalExportEntityProviderTest.java | 3 +-
.../impexp/ReplicationEntityAttributeTest.java | 9 +-
.../StartEntityFetchByExportRequestTest.java | 112 ++++++
12 files changed, 880 insertions(+), 342 deletions(-)
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
index 61e1623..0159010 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
@@ -138,7 +138,9 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
public CreateUpdateEntitiesResult createEntities(ITypedReferenceableInstance... entities) throws RepositoryException,
EntityExistsException {
if (LOG.isDebugEnabled()) {
- LOG.debug("adding entities={}", entities);
+ for(ITypedReferenceableInstance instance : entities){
+ LOG.debug("adding entity {}", instance);
+ }
}
try {
@@ -412,7 +414,9 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@GraphTransaction
public CreateUpdateEntitiesResult updateEntities(ITypedReferenceableInstance... entitiesUpdated) throws RepositoryException {
if (LOG.isDebugEnabled()) {
- LOG.debug("updating entity {}", entitiesUpdated);
+ for(ITypedReferenceableInstance instance : entitiesUpdated){
+ LOG.debug("updating entity {}", instance);
+ }
}
try {
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
index 1281fd2..612b403 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
@@ -18,6 +18,7 @@
package org.apache.atlas.repository.impexp;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasException;
@@ -28,8 +29,12 @@ import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.impexp.AtlasServer;
import org.apache.atlas.model.impexp.ExportImportAuditEntry;
+import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +42,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.inject.Inject;
+import java.util.Collections;
import java.util.List;
@Component
@@ -45,6 +51,8 @@ public class AuditsWriter {
private static final String CLUSTER_NAME_DEFAULT = "default";
private static final String DC_SERVER_NAME_SEPARATOR = "$";
+ private AtlasTypeRegistry typeRegistry;
+ private AtlasEntityStore entityStore;
private AtlasServerService atlasServerService;
private ExportImportAuditService auditService;
@@ -52,7 +60,10 @@ public class AuditsWriter {
private ImportAudits auditForImport = new ImportAudits();
@Inject
- public AuditsWriter(AtlasServerService atlasServerService, ExportImportAuditService auditService) {
+ public AuditsWriter(AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore,
+ AtlasServerService atlasServerService, ExportImportAuditService auditService) {
+ this.typeRegistry = typeRegistry;
+ this.entityStore = entityStore;
this.atlasServerService = atlasServerService;
this.auditService = auditService;
}
@@ -78,7 +89,10 @@ public class AuditsWriter {
return;
}
- AtlasServer server = saveServer(serverName, serverFullName, exportedGuids.get(0), lastModifiedTimestamp);
+ String candidateGuid = exportedGuids.get(0);
+ String replGuidKey = ReplKeyGuidFinder.get(typeRegistry, entityStore, candidateGuid);
+ AtlasServer server = saveServer(serverName, serverFullName, replGuidKey, lastModifiedTimestamp);
+
atlasServerService.updateEntitiesWithServer(server, exportedGuids, attrNameReplicated);
}
@@ -124,6 +138,51 @@ public class AuditsWriter {
atlasServerService.getCreateAtlasServer(getCurrentClusterName(), getCurrentClusterName());
}
+    /**
+     * Resolves the GUID recorded as the replication key when an export is audited against a server.
+     *
+     * For hive_table and hive_column entities the key is the GUID of the owning hive_db, looked up by
+     * the database qualifiedName derived from the child's qualifiedName. For any other entity, or when
+     * the lookup cannot be completed, the candidate GUID itself is returned.
+     */
+    static class ReplKeyGuidFinder {
+        private static final String ENTITY_TYPE_HIVE_DB     = "hive_db";
+        private static final String ENTITY_TYPE_HIVE_TABLE  = "hive_table";
+        private static final String ENTITY_TYPE_HIVE_COLUMN = "hive_column";
+        private static final String QUALIFIED_NAME          = "qualifiedName";
+
+        /**
+         * @param typeRegistry  registry used to look up the hive_db entity type
+         * @param entityStore   store used to load the candidate entity and resolve the database GUID
+         * @param candidateGuid GUID of the first exported entity
+         * @return the owning hive_db GUID for hive_table/hive_column entities, otherwise candidateGuid
+         */
+        public static String get(AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, String candidateGuid) {
+            String guid = null;
+
+            try {
+                guid = getParentEntityGuid(typeRegistry, entityStore, candidateGuid);
+            } catch (AtlasBaseException e) {
+                // best effort: fall back to the candidate GUID, but keep the cause for diagnosis
+                LOG.error("Error fetching parent guid for child entity: {}", candidateGuid, e);
+            }
+
+            return StringUtils.isEmpty(guid) ? candidateGuid : guid;
+        }
+
+        /**
+         * Returns the hive_db GUID owning the given table/column entity, or null when the entity is of
+         * another type or lacks the data needed for the lookup.
+         */
+        private static String getParentEntityGuid(AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, String entityGuid) throws AtlasBaseException {
+            AtlasEntity.AtlasEntityWithExtInfo extInfo = entityStore.getById(entityGuid);
+            if (extInfo == null || extInfo.getEntity() == null) {
+                return null;
+            }
+
+            String typeName = extInfo.getEntity().getTypeName();
+            // constant-first comparison: a missing type name simply means "no parent"
+            if (!ENTITY_TYPE_HIVE_TABLE.equals(typeName) && !ENTITY_TYPE_HIVE_COLUMN.equals(typeName)) {
+                return null;
+            }
+
+            // guard instead of casting blindly: a ClassCastException would escape the caller's catch
+            Object qualifiedNameAttr = extInfo.getEntity().getAttribute(QUALIFIED_NAME);
+            if (!(qualifiedNameAttr instanceof String) || StringUtils.isEmpty((String) qualifiedNameAttr)) {
+                return null;
+            }
+
+            AtlasEntityType entityType = typeRegistry.getEntityTypeByName(ENTITY_TYPE_HIVE_DB);
+            if (entityType == null) {
+                return null;
+            }
+
+            // typed as Object so the singleton map below is a Map<String, Object>
+            Object hiveDBQualifiedName = extractHiveDBQualifiedName((String) qualifiedNameAttr);
+            return entityStore.getGuidByUniqueAttributes(entityType, Collections.singletonMap(QUALIFIED_NAME, hiveDBQualifiedName));
+        }
+
+        /**
+         * Derives a database qualifiedName by joining the text before the first '.' with the text after
+         * the first '@'. Assumes child qualifiedNames look like "db.table[.column]@cluster" -- confirm
+         * against the Hive hook's naming if that changes.
+         */
+        @VisibleForTesting
+        static String extractHiveDBQualifiedName(String qualifiedName) {
+            return String.format("%s@%s",
+                    StringUtils.substringBefore(qualifiedName, "."),
+                    StringUtils.substringAfter(qualifiedName, "@"));
+        }
+    }
+
private class ExportAudits {
private AtlasExportRequest request;
private String targetServerName;
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
new file mode 100644
index 0000000..1a4deeb
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasTypeRegistry;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Chooses and drives the strategy that discovers entities connected to an exported entity.
+ *
+ * Regular exports use the vertex-based extractor; incremental skip-lineage exports of a hive_db or
+ * hive_table use the incremental extractor.
+ */
+public class EntitiesExtractor {
+    static final String PROPERTY_GUID = "__guid";
+    private static final String VERTEX_BASED_EXTRACT = "default";
+    private static final String INCREMENTAL_EXTRACT = "incremental";
+    private static final String RELATION_BASED_EXTRACT = "relationship";
+
+    private final Map<String, ExtractStrategy> extractors = new HashMap<>();
+    private ExtractStrategy extractor;
+
+    public EntitiesExtractor(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry) {
+        extractors.put(VERTEX_BASED_EXTRACT, new VertexExtractor(atlasGraph, typeRegistry));
+        extractors.put(INCREMENTAL_EXTRACT, new IncrementalExportEntityProvider(atlasGraph));
+    }
+
+    /**
+     * Queues the entities reachable from {@code entity} according to the context's fetch type.
+     *
+     * @param entity  entity whose connections are to be discovered
+     * @param context export context supplying the fetch type and options
+     */
+    public void get(AtlasEntity entity, ExportService.ExportContext context) {
+        if (extractor == null) {
+            // setExtractor() was never called: use the default strategy
+            extractor = extractors.get(VERTEX_BASED_EXTRACT);
+        }
+
+        switch (context.fetchType) {
+            case CONNECTED:
+                extractor.connectedFetch(entity, context);
+                break;
+
+            case INCREMENTAL:
+                if (context.isHiveDBIncrementalSkipLineage()) {
+                    extractors.get(INCREMENTAL_EXTRACT).fullFetch(entity, context);
+                    break;
+                }
+
+                if (context.isHiveTableIncrementalSkipLineage()) {
+                    extractors.get(INCREMENTAL_EXTRACT).connectedFetch(entity, context);
+                    break;
+                }
+                // intentional fall-through: other incremental exports use a full fetch
+
+            case FULL:
+            default:
+                extractor.fullFetch(entity, context);
+                break;
+        }
+    }
+
+    /**
+     * Selects the strategy used for subsequent {@link #get} calls.
+     *
+     * @param atlasEntityDef definition of the export item's type (currently not consulted)
+     */
+    public void setExtractor(AtlasEntityDef atlasEntityDef) {
+        extractor = extractUsing(atlasEntityDef);
+    }
+
+    /** Closes every registered strategy. */
+    public void close() {
+        for (ExtractStrategy es : extractors.values()) {
+            es.close();
+        }
+    }
+
+    /**
+     * Returns the relationship-based strategy when one is registered, otherwise the vertex-based one.
+     * No relationship-based strategy is registered by the constructor, so this currently always yields
+     * the vertex-based extractor instead of leaving {@code extractor} null.
+     */
+    private ExtractStrategy extractUsing(AtlasEntityDef atlasEntityDef) {
+        ExtractStrategy relationBased = extractors.get(RELATION_BASED_EXTRACT);
+
+        return relationBased != null ? relationBased : extractors.get(VERTEX_BASED_EXTRACT);
+    }
+}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 7382d32..1e97443 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -18,7 +18,6 @@
package org.apache.atlas.repository.impexp;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.exception.AtlasBaseException;
@@ -27,65 +26,54 @@ import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
-import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
import org.apache.atlas.repository.util.UniqueList;
-import org.apache.atlas.type.AtlasEntityType;
-import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
-import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.util.AtlasGremlinQueryProvider;
-import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
-import javax.script.ScriptEngine;
-import javax.script.ScriptException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
-import static org.apache.atlas.model.impexp.AtlasExportRequest.*;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_CONNECTED;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL;
@Component
public class ExportService {
private static final Logger LOG = LoggerFactory.getLogger(ExportService.class);
- public static final String PROPERTY_GUID = "__guid";
- private static final String PROPERTY_IS_PROCESS = "isProcess";
-
-
private final AtlasTypeRegistry typeRegistry;
- private final String QUERY_BINDING_START_GUID = "startGuid";
+ private final StartEntityFetchByExportRequest startEntityFetchByExportRequest;
+ private final EntitiesExtractor entitiesExtractor;
private AuditsWriter auditsWriter;
- private final AtlasGraph atlasGraph;
private final EntityGraphRetriever entityGraphRetriever;
- private final AtlasGremlinQueryProvider gremlinQueryProvider;
private ExportTypeProcessor exportTypeProcessor;
private final HdfsPathEntityCreator hdfsPathEntityCreator;
- private IncrementalExportEntityProvider incrementalExportEntityProvider;
@Inject
public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph,
AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator) {
this.typeRegistry = typeRegistry;
this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry);
- this.atlasGraph = atlasGraph;
- this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
this.auditsWriter = auditsWriter;
this.hdfsPathEntityCreator = hdfsPathEntityCreator;
+ this.startEntityFetchByExportRequest = new StartEntityFetchByExportRequest(atlasGraph, typeRegistry, AtlasGremlinQueryProvider.INSTANCE);
+ this.entitiesExtractor = new EntitiesExtractor(atlasGraph, typeRegistry);
}
public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName,
@@ -94,7 +82,7 @@ public class ExportService {
AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP,
hostName, startTime, getCurrentChangeMarker());
- ExportContext context = new ExportContext(atlasGraph, result, exportSink);
+ ExportContext context = new ExportContext(result, exportSink);
exportTypeProcessor = new ExportTypeProcessor(typeRegistry, context);
try {
@@ -108,19 +96,18 @@ public class ExportService {
} catch(Exception ex) {
LOG.error("Operation failed: ", ex);
} finally {
- atlasGraph.releaseGremlinScriptEngine(context.scriptEngine);
+ entitiesExtractor.close();
LOG.info("<== export(user={}, from={}): status {}: changeMarker: {}",
userName, requestingIP, context.result.getOperationStatus(), context.result.getChangeMarker());
context.clear();
result.clear();
- incrementalExportEntityProvider = null;
}
return context.result;
}
private long getCurrentChangeMarker() {
- return Math.min(RequestContextV1.earliestActiveRequestTime(), RequestContext.earliestActiveRequestTime());
+ return RequestContextV1.earliestActiveRequestTime();
}
private void updateSinkWithOperationMetrics(String userName, ExportContext context,
@@ -128,7 +115,6 @@ public class ExportService {
long startTime, long endTime) throws AtlasBaseException {
int duration = getOperationDuration(startTime, endTime);
context.result.setSourceClusterName(AuditsWriter.getCurrentClusterName());
- context.addToEntityCreationOrder(context.lineageProcessed);
context.sink.setExportOrder(context.entityCreationOrder.getList());
context.sink.setTypesDef(context.result.getData().getTypesDef());
@@ -197,7 +183,9 @@ public class ExportService {
}
private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context) {
- debugLog("==> processObjectId({})", item);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> processObjectId({})", item);
+ }
try {
List<String> entityGuids = getStartingEntity(item, context);
@@ -205,9 +193,10 @@ public class ExportService {
return AtlasExportResult.OperationStatus.FAIL;
}
+ entitiesExtractor.setExtractor(typeRegistry.getEntityDefByName(item.getTypeName()));
+
for (String guid : entityGuids) {
processEntityGuid(guid, context);
- populateEntitesForIncremental(guid, context);
}
while (!context.guidsToProcess.isEmpty()) {
@@ -221,303 +210,63 @@ public class ExportService {
context.lineageProcessed.addAll(context.lineageToProcess.getList());
context.lineageToProcess.clear();
}
+ context.isSkipConnectedFetch = false;
}
} catch (AtlasBaseException excp) {
LOG.error("Fetching entity failed for: {}", item, excp);
return AtlasExportResult.OperationStatus.FAIL;
}
- debugLog("<== processObjectId({})", item);
- return AtlasExportResult.OperationStatus.SUCCESS;
- }
-
- private void debugLog(String s, Object... params) {
- if (!LOG.isDebugEnabled()) {
- return;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== processObjectId({})", item);
}
-
- LOG.debug(s, params);
+ return AtlasExportResult.OperationStatus.SUCCESS;
}
private List<String> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException {
- List<String> ret = null;
-
if(item.getTypeName().equalsIgnoreCase(HdfsPathEntityCreator.HDFS_PATH_TYPE)) {
hdfsPathEntityCreator.getCreateEntity(item);
}
- if (StringUtils.isNotEmpty(item.getGuid())) {
- ret = Collections.singletonList(item.getGuid());
- } else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_FOR_TYPE) && StringUtils.isNotEmpty(item.getTypeName())) {
- ret = getStartingEntityForMatchTypeForType(item, context);
- } else if (StringUtils.isNotEmpty(item.getTypeName()) && MapUtils.isNotEmpty(item.getUniqueAttributes())) {
- ret = getStartingEntityUsingQueryTemplate(item, context, ret);
- }
-
- if (ret == null) {
- ret = Collections.emptyList();
- }
-
- logInfoStartingEntitiesFound(item, context, ret);
- return ret;
- }
-
- private List<String> getStartingEntityUsingQueryTemplate(AtlasObjectId item, ExportContext context, List<String> ret) throws AtlasBaseException {
- final String queryTemplate = getQueryTemplateForMatchType(context);
- final String typeName = item.getTypeName();
- final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
-
- if (entityType == null) {
- throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, typeName);
- }
-
- for (Map.Entry<String, Object> e : item.getUniqueAttributes().entrySet()) {
- String attrName = e.getKey();
- Object attrValue = e.getValue();
-
- AtlasAttribute attribute = entityType.getAttribute(attrName);
- if (attribute == null || attrValue == null) {
- continue;
- }
-
- setupBindingsForTypeNameAttrNameAttrValue(context, typeName, attrValue, attribute);
-
- List<String> guids = executeGremlinQueryForGuids(queryTemplate, context);
-
- if (CollectionUtils.isNotEmpty(guids)) {
- if (ret == null) {
- ret = new ArrayList<>();
- }
-
- for (String guid : guids) {
- if (!ret.contains(guid)) {
- ret.add(guid);
- }
- }
- }
- }
- return ret;
- }
-
- private List<String> getStartingEntityForMatchTypeForType(AtlasObjectId item, ExportContext context) {
- setupBindingsForTypeName(context, item.getTypeName());
- return executeGremlinQueryForGuids(getQueryTemplateForMatchType(context), context);
- }
-
- private void logInfoStartingEntitiesFound(AtlasObjectId item, ExportContext context, List<String> ret) {
- LOG.info("export(item={}; matchType={}, fetchType={}): found {} entities: options: {}", item,
- context.matchType, context.fetchType, ret.size(), AtlasType.toJson(context.result.getRequest()));
- }
-
- private void setupBindingsForTypeName(ExportContext context, String typeName) {
- context.bindings.clear();
- context.bindings.put("typeName", new HashSet<String>(Arrays.asList(StringUtils.split(typeName,","))));
- }
-
- private void setupBindingsForTypeNameAttrNameAttrValue(ExportContext context,
- String typeName, Object attrValue, AtlasAttribute attribute) {
- context.bindings.clear();
- context.bindings.put("typeName", typeName);
- context.bindings.put("attrName", attribute.getQualifiedName());
- context.bindings.put("attrValue", attrValue);
- }
-
- private String getQueryTemplateForMatchType(ExportContext context) {
- if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_STARTS_WITH)) {
- return gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_STARTS_WITH);
- }
-
- if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_ENDS_WITH)) {
- return gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_ENDS_WITH);
- }
-
- if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_CONTAINS)) {
- return gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_CONTAINS);
- }
-
- if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_MATCHES)) {
- return gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_MATCHES);
- }
-
- if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_FOR_TYPE)) {
- return gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_ALL_FOR_TYPE);
- }
-
- return gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_DEFAULT);
+ return startEntityFetchByExportRequest.get(context.result.getRequest(), item);
}
private void processEntityGuid(String guid, ExportContext context) throws AtlasBaseException {
- debugLog("==> processEntityGuid({})", guid);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> processEntityGuid({})", guid);
+ }
if (context.guidsProcessed.contains(guid)) {
return;
}
- TraversalDirection direction = context.guidDirection.get(guid);
AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
- processEntity(guid, entityWithExtInfo, context, direction);
+ processEntity( entityWithExtInfo, context);
- debugLog("<== processEntityGuid({})", guid);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== processEntityGuid({})", guid);
+ }
}
- public void processEntity(String guid, AtlasEntityWithExtInfo entityWithExtInfo,
- ExportContext context,
- TraversalDirection direction) throws AtlasBaseException {
-
- if (!context.lineageProcessed.contains(guid) && context.doesTimestampQualify(entityWithExtInfo.getEntity())) {
- context.addToEntityCreationOrder(entityWithExtInfo.getEntity().getGuid());
- }
+ public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
addEntity(entityWithExtInfo, context);
exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context);
context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid());
- getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction);
+ entitiesExtractor.get(entityWithExtInfo.getEntity(), context);
if (entityWithExtInfo.getReferredEntities() != null) {
for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
exportTypeProcessor.addTypes(e, context);
- getConntedEntitiesBasedOnOption(e, context, direction);
+ entitiesExtractor.get(e, context);
}
context.guidsProcessed.addAll(entityWithExtInfo.getReferredEntities().keySet());
}
}
- private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
- switch (context.fetchType) {
- case CONNECTED:
- getEntityGuidsForConnectedFetch(entity, context, direction);
- break;
-
- case INCREMENTAL:
- if(context.isHiveDBIncrementalSkipLineage()) {
- break;
- }
-
- case FULL:
- default:
- getEntityGuidsForFullFetch(entity, context);
- }
- }
-
- private void populateEntitesForIncremental(String topLevelEntityGuid, ExportContext context) throws AtlasBaseException {
- if (context.isHiveDBIncrementalSkipLineage() == false || incrementalExportEntityProvider != null) {
- return;
- }
-
- incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, context.scriptEngine);
- incrementalExportEntityProvider.populate(topLevelEntityGuid, context.changeMarker, context.guidsToProcess);
- }
-
- private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
- if (direction == null || direction == TraversalDirection.UNKNOWN) {
- getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD);
- } else {
- if (isProcessEntity(entity)) {
- direction = TraversalDirection.OUTWARD;
- }
-
- getConnectedEntityGuids(entity, context, direction);
- }
- }
-
- private boolean isProcessEntity(AtlasEntity entity) {
- String typeName = entity.getTypeName();
- AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
-
- return entityType.isSubTypeOf(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);
- }
-
- private void getConnectedEntityGuids(AtlasEntity entity, ExportContext context, TraversalDirection... directions) {
- if(directions == null) {
- return;
- }
-
- for (TraversalDirection direction : directions) {
- String query = getQueryForTraversalDirection(direction);
-
- if(LOG.isDebugEnabled()) {
- debugLog("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query);
- }
-
- context.bindings.clear();
- context.bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
-
- List<HashMap<String, Object>> result = executeGremlinQuery(query, context);
-
- if (CollectionUtils.isEmpty(result)) {
- continue;
- }
-
- for (HashMap<String, Object> hashMap : result) {
- String guid = (String) hashMap.get(PROPERTY_GUID);
- TraversalDirection currentDirection = context.guidDirection.get(guid);
- boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS);
-
- if(context.skipLineage && isLineage) continue;
-
- if (currentDirection == null) {
- context.addToBeProcessed(isLineage, guid, direction);
-
- } else if (currentDirection == TraversalDirection.OUTWARD && direction == TraversalDirection.INWARD) {
- // the entity should be reprocessed to get inward entities
- context.guidsProcessed.remove(guid);
- context.addToBeProcessed(isLineage, guid, direction);
- }
- }
-
- if(LOG.isDebugEnabled()) {
- debugLog("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size());
- }
- }
- }
-
- private String getQueryForTraversalDirection(TraversalDirection direction) {
- switch (direction) {
- case INWARD:
- return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_IN_EDGE);
-
- default:
- case OUTWARD:
- return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_OUT_EDGE);
- }
- }
-
- private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) {
- if(LOG.isDebugEnabled()) {
- debugLog("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
- }
-
- String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
-
- context.bindings.clear();
- context.bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
-
- List<HashMap<String, Object>> result = executeGremlinQuery(query, context);
-
- if (CollectionUtils.isEmpty(result)) {
- return;
- }
-
- for (HashMap<String, Object> hashMap : result) {
- String guid = (String) hashMap.get(PROPERTY_GUID);
- boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS);
-
- if(context.getSkipLineage() && isLineage) continue;
-
- if (!context.guidsProcessed.contains(guid)) {
- context.addToBeProcessed(isLineage, guid, TraversalDirection.BOTH);
- }
- }
-
- if(LOG.isDebugEnabled()) {
- debugLog("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}",
- entity.getGuid(), result.size(), context.guidsToProcess.size());
- }
- }
-
private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
if(context.sink.hasEntity(entityWithExtInfo.getEntity().getGuid())) {
return;
@@ -537,7 +286,6 @@ public class ExportService {
} else {
List<AtlasEntity> entities = context.getEntitiesWithModifiedTimestamp(entityWithExtInfo);
for (AtlasEntity e : entities) {
- context.addToEntityCreationOrder(e.getGuid());
context.addToSink(new AtlasEntityWithExtInfo(e));
context.result.incrementMeticsCounter(String.format("entity:%s", e.getTypeName()));
}
@@ -546,24 +294,6 @@ public class ExportService {
context.reportProgress();
}
- private List<HashMap<String, Object>> executeGremlinQuery(String query, ExportContext context) {
- try {
- return (List<HashMap<String, Object>>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false);
- } catch (ScriptException e) {
- LOG.error("Script execution failed for query: ", query, e);
- return null;
- }
- }
-
- private List<String> executeGremlinQueryForGuids(String query, ExportContext context) {
- try {
- return (List<String>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false);
- } catch (ScriptException e) {
- LOG.error("Script execution failed for query: ", query, e);
- return null;
- }
- }
-
public enum TraversalDirection {
UNKNOWN,
INWARD,
@@ -596,11 +326,11 @@ public class ExportService {
static class ExportContext {
private static final int REPORTING_THREASHOLD = 1000;
private static final String ATLAS_TYPE_HIVE_DB = "hive_db";
-
+ private static final String ATLAS_TYPE_HIVE_TABLE = "hive_table";
final UniqueList<String> entityCreationOrder = new UniqueList<>();
final Set<String> guidsProcessed = new HashSet<>();
- final private UniqueList<String> guidsToProcess = new UniqueList<>();
+ final UniqueList<String> guidsToProcess = new UniqueList<>();
final UniqueList<String> lineageToProcess = new UniqueList<>();
final Set<String> lineageProcessed = new HashSet<>();
final Map<String, TraversalDirection> guidDirection = new HashMap<>();
@@ -611,31 +341,29 @@ public class ExportService {
final AtlasExportResult result;
private final ZipSink sink;
- private final ScriptEngine scriptEngine;
- private final Map<String, Object> bindings;
- private final ExportFetchType fetchType;
- private final String matchType;
- private final boolean skipLineage;
- private final long changeMarker;
- private final boolean isHiveDBIncremental;
+ final ExportFetchType fetchType;
+ final boolean skipLineage;
+ final long changeMarker;
+ boolean isSkipConnectedFetch;
+ private final boolean isHiveDBIncremental;
+ private final boolean isHiveTableIncremental;
private int progressReportCount = 0;
- ExportContext(AtlasGraph atlasGraph, AtlasExportResult result, ZipSink sink) throws AtlasBaseException {
+ ExportContext(AtlasExportResult result, ZipSink sink) throws AtlasBaseException {
this.result = result;
this.sink = sink;
- scriptEngine = atlasGraph.getGremlinScriptEngine();
- bindings = new HashMap<>();
fetchType = ExportFetchType.from(result.getRequest().getFetchTypeOptionValue());
- matchType = result.getRequest().getMatchTypeOptionValue();
skipLineage = result.getRequest().getSkipLineageOptionValue();
this.changeMarker = result.getRequest().getChangeTokenFromOptions();
this.isHiveDBIncremental = checkHiveDBIncrementalSkipLineage(result.getRequest());
+ this.isHiveTableIncremental = checkHiveTableIncrementalSkipLineage(result.getRequest());
+ this.isSkipConnectedFetch = false;
}
private boolean checkHiveDBIncrementalSkipLineage(AtlasExportRequest request) {
- if(request.getItemsToExport().size() == 0) {
+ if(CollectionUtils.isEmpty(request.getItemsToExport())) {
return false;
}
@@ -644,6 +372,16 @@ public class ExportService {
request.getSkipLineageOptionValue();
}
+        /**
+         * Tells whether the request is an incremental, skip-lineage export whose first item is a {@code hive_table}.
+         *
+         * @param request export request to inspect
+         * @return true only when the first item's type is hive_table (case-insensitive), the fetch type is
+         *         incremental (case-insensitive) and lineage is skipped; false when there are no items
+         */
+        private boolean checkHiveTableIncrementalSkipLineage(AtlasExportRequest request) {
+            if (CollectionUtils.isEmpty(request.getItemsToExport())) {
+                return false;
+            }
+
+            // constant-first comparisons: a missing type name or fetch type yields false instead of an NPE
+            return ATLAS_TYPE_HIVE_TABLE.equalsIgnoreCase(request.getItemsToExport().get(0).getTypeName()) &&
+                    AtlasExportRequest.FETCH_TYPE_INCREMENTAL.equalsIgnoreCase(request.getFetchTypeOptionValue()) &&
+                    request.getSkipLineageOptionValue();
+        }
+
public List<AtlasEntity> getEntitiesWithModifiedTimestamp(AtlasEntityWithExtInfo entityWithExtInfo) {
if(fetchType != ExportFetchType.INCREMENTAL) {
return new ArrayList<>();
@@ -701,6 +439,7 @@ public class ExportService {
}
public void addToSink(AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
+ addToEntityCreationOrder(entityWithExtInfo.getEntity().getGuid());
sink.add(entityWithExtInfo);
}
@@ -708,12 +447,12 @@ public class ExportService {
return isHiveDBIncremental;
}
- public void addToEntityCreationOrder(String guid) {
- entityCreationOrder.add(guid);
+ public boolean isHiveTableIncrementalSkipLineage() {
+ return isHiveTableIncremental;
}
- public void addToEntityCreationOrder(Collection<String> guids) {
- entityCreationOrder.addAll(guids);
+ public void addToEntityCreationOrder(String guid) {
+ entityCreationOrder.add(guid);
}
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExtractStrategy.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExtractStrategy.java
new file mode 100644
index 0000000..91dd6f7
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExtractStrategy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+
+/**
+ * Strategy used by the export to discover which entities to include, starting from a given entity.
+ * Implementations record the entities they discover in the supplied {@link ExportService.ExportContext}.
+ */
+public interface ExtractStrategy {
+
+    /**
+     * Performs a full fetch starting at {@code entity}, recording discovered entities in {@code context}.
+     *
+     * @param entity  the starting entity
+     * @param context export context that receives the discovered entities
+     */
+    void fullFetch(AtlasEntity entity, ExportService.ExportContext context);
+
+    /**
+     * Fetches entities connected to {@code entity}, recording them in {@code context}.
+     *
+     * @param entity  the starting entity
+     * @param context export context that receives the discovered entities
+     */
+    void connectedFetch(AtlasEntity entity, ExportService.ExportContext context);
+
+    /**
+     * Releases resources held by this strategy (the visible implementations release their Gremlin script engine).
+     */
+    void close();
+}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
index 4a0d1b2..f9d2e57 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
@@ -18,6 +18,8 @@
package org.apache.atlas.repository.impexp;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.util.UniqueList;
import org.slf4j.Logger;
@@ -32,7 +34,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
-public class IncrementalExportEntityProvider {
+public class IncrementalExportEntityProvider implements ExtractStrategy {
private static final Logger LOG = LoggerFactory.getLogger(IncrementalExportEntityProvider.class);
private static final String QUERY_PARAMETER_START_GUID = "startGuid";
@@ -45,15 +47,35 @@ public class IncrementalExportEntityProvider {
private static final String QUERY_SD = QUERY_TABLE + ".out('__hive_table.sd')";
private static final String QUERY_COLUMN = QUERY_TABLE + ".out('__hive_table.columns')";
private static final String TRANSFORM_CLAUSE = ".transform{[__guid:it.__guid]}.toList()";
-
private static final String TIMESTAMP_CLAUSE = ".has('__modificationTimestamp', T.gt, modificationTimestamp)";
+ private static final String QUERY_TABLE_DB = QUERY_DB + ".out('__hive_table.db')";
+ private static final String QUERY_TABLE_SD = QUERY_DB + ".out('__hive_table.sd')";
+ private static final String QUERY_TABLE_COLUMNS = QUERY_DB + ".out('__hive_table.columns')";
+
private ScriptEngine scriptEngine;
@Inject
- public IncrementalExportEntityProvider(AtlasGraph atlasGraph, ScriptEngine scriptEngine) {
+ public IncrementalExportEntityProvider(AtlasGraph atlasGraph) {
this.atlasGraph = atlasGraph;
- this.scriptEngine = scriptEngine;
+ try {
+ this.scriptEngine = atlasGraph.getGremlinScriptEngine();
+ } catch (AtlasBaseException e) {
+ LOG.error("Error instantiating script engine.", e);
+ }
+ }
+
+    /**
+     * Full fetch: delegates to {@code populate}, passing the starting entity's guid as the database guid together
+     * with the context's change marker, so discovered guids land in {@code context.guidsToProcess}.
+     */
+    @Override
+    public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        final String startGuid = entity.getGuid();
+        final UniqueList<String> target = context.guidsToProcess;
+
+        populate(startGuid, context.changeMarker, target);
+    }
+
+    /**
+     * Connected fetch for a starting {@code hive_table}: queues the guids returned by the table-to-db,
+     * table-to-sd and table-to-columns queries into {@code context.guidsToProcess}. The context's change marker is
+     * handed to {@code fetchGuids} (presumably used as the modification-timestamp filter — confirm).
+     * A query that yields no result (e.g. {@code null} after a script failure) is skipped instead of causing an NPE.
+     */
+    @Override
+    public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        // starting entity is hive_table
+        for (String queryTemplate : new String[] { QUERY_TABLE_DB, QUERY_TABLE_SD, QUERY_TABLE_COLUMNS }) {
+            List<String> guids = fetchGuids(entity.getGuid(), queryTemplate, context.changeMarker);
+
+            if (guids != null) {
+                context.guidsToProcess.addAll(guids);
+            }
+        }
 }
public void populate(String dbEntityGuid, long timeStamp, UniqueList<String> guidsToProcess) {
@@ -99,7 +121,7 @@ public class IncrementalExportEntityProvider {
}
for (HashMap<String, Object> item : result) {
- guids.add((String) item.get(ExportService.PROPERTY_GUID));
+ guids.add((String) item.get(EntitiesExtractor.PROPERTY_GUID));
}
return guids;
@@ -109,4 +131,11 @@ public class IncrementalExportEntityProvider {
return null;
}
}
+
+    /**
+     * Returns the Gremlin script engine obtained in the constructor to the graph; does nothing if none was obtained.
+     */
+    @Override
+    public void close() {
+        if (scriptEngine == null) {
+            return;
+        }
+
+        atlasGraph.releaseGremlinScriptEngine(scriptEngine);
+    }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/StartEntityFetchByExportRequest.java b/repository/src/main/java/org/apache/atlas/repository/impexp/StartEntityFetchByExportRequest.java
new file mode 100644
index 0000000..fd67c16
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/StartEntityFetchByExportRequest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.util.AtlasGremlinQueryProvider;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_CONTAINS;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_ENDS_WITH;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_FOR_TYPE;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_MATCHES;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_STARTS_WITH;
+
+/**
+ * Resolves the starting entities of an export request to entity guids.
+ * <p>
+ * Each item to export is resolved from its explicit guid, or from its type name when the match type is
+ * {@code MATCH_TYPE_FOR_TYPE}, or from its type name plus unique attributes. Lookups run the Gremlin query template
+ * registered for the request's match type, falling back to the {@link #DEFAULT_MATCH} template.
+ */
+public class StartEntityFetchByExportRequest {
+    private static final Logger LOG = LoggerFactory.getLogger(StartEntityFetchByExportRequest.class);
+
+    static final String DEFAULT_MATCH = "*";
+    static final String BINDING_PARAMETER_TYPENAME = "typeName";
+    static final String BINDING_PARAMETER_ATTR_NAME = "attrName";
+    static final String BINDING_PARAMTER_ATTR_VALUE = "attrValue";
+
+    private final AtlasGraph atlasGraph;
+    private final AtlasTypeRegistry typeRegistry;
+
+    // match-type option value -> Gremlin query template
+    private final Map<String, String> matchTypeQuery;
+
+    public StartEntityFetchByExportRequest(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry, AtlasGremlinQueryProvider gremlinQueryProvider) {
+        this.typeRegistry = typeRegistry;
+        this.atlasGraph = atlasGraph;
+        this.matchTypeQuery = initMatchTypeQueryMap(gremlinQueryProvider);
+    }
+
+    private static Map<String, String> initMatchTypeQueryMap(AtlasGremlinQueryProvider gremlinQueryProvider) {
+        Map<String, String> ret = new HashMap<>();
+        ret.put(MATCH_TYPE_STARTS_WITH, gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_TYPE_STARTS_WITH));
+        ret.put(MATCH_TYPE_ENDS_WITH, gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_TYPE_ENDS_WITH));
+        ret.put(MATCH_TYPE_CONTAINS, gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_TYPE_CONTAINS));
+        ret.put(MATCH_TYPE_MATCHES, gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_TYPE_MATCHES));
+        ret.put(MATCH_TYPE_FOR_TYPE, gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_TYPE_ALL_FOR_TYPE));
+        ret.put(DEFAULT_MATCH, gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_TYPE_DEFAULT));
+        return ret;
+    }
+
+    /**
+     * Resolves every item of the request to the first guid found for it.
+     * Items that resolve to nothing are omitted. Each returned {@link AtlasObjectId} is the request's own item,
+     * updated in place with the guid found.
+     *
+     * @param exportRequest export request whose items are resolved
+     * @return resolved items; empty when the request has no items
+     */
+    public List<AtlasObjectId> get(AtlasExportRequest exportRequest) {
+        List<AtlasObjectId> list = new ArrayList<>();
+
+        if (CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
+            return list;
+        }
+
+        for (AtlasObjectId objectId : exportRequest.getItemsToExport()) {
+            List<String> guids = get(exportRequest, objectId);
+            if (CollectionUtils.isEmpty(guids)) {
+                continue;
+            }
+
+            objectId.setGuid(guids.get(0));
+            list.add(objectId);
+        }
+
+        return list;
+    }
+
+    /**
+     * Resolves a single export item to the guids of the matching entities.
+     *
+     * @param exportRequest request supplying the match type and logging context
+     * @param item          item to resolve
+     * @return matching guids; never null (empty when nothing matched or the lookup failed)
+     */
+    public List<String> get(AtlasExportRequest exportRequest, AtlasObjectId item) {
+        List<String> ret = new ArrayList<>();
+        String matchType = exportRequest.getMatchTypeOptionValue();
+
+        try {
+            if (StringUtils.isNotEmpty(item.getGuid())) {
+                ret.add(item.getGuid());
+                return ret;
+            }
+
+            if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_FOR_TYPE) && StringUtils.isNotEmpty(item.getTypeName())) {
+                ret = getEntitiesForMatchTypeType(item, matchType);
+                return ret;
+            }
+
+            if (StringUtils.isNotEmpty(item.getTypeName()) && MapUtils.isNotEmpty(item.getUniqueAttributes())) {
+                ret = getEntitiesForMatchTypeUsingUniqueAttributes(item, matchType);
+                return ret;
+            }
+        } catch (AtlasBaseException ex) {
+            LOG.error("Error fetching starting entity for: {}", item, ex);
+        } finally {
+            // ret is never null here: every assignment above yields a non-null list
+            LOG.info("export(item={}; matchType={}, fetchType={}): found {} entities: options: {}", item,
+                    exportRequest.getMatchTypeOptionValue(), exportRequest.getFetchTypeOptionValue(), ret.size(), AtlasType.toJson(exportRequest));
+        }
+
+        return ret;
+    }
+
+    /**
+     * Looks up entities of the item's type by each of its unique attributes; results are de-duplicated.
+     *
+     * @throws AtlasBaseException UNKNOWN_TYPENAME when the item's type is not an entity type in the registry
+     */
+    private List<String> getEntitiesForMatchTypeUsingUniqueAttributes(AtlasObjectId item, String matchType) throws AtlasBaseException {
+        final String queryTemplate = getQueryTemplateForMatchType(matchType);
+        final String typeName = item.getTypeName();
+        final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+
+        Set<String> ret = new HashSet<>();
+        if (entityType == null) {
+            throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, typeName);
+        }
+
+        for (Map.Entry<String, Object> e : item.getUniqueAttributes().entrySet()) {
+            AtlasStructType.AtlasAttribute attribute = entityType.getAttribute(e.getKey());
+            Object attrValue = e.getValue();
+
+            if (attribute == null || attrValue == null) {
+                continue;
+            }
+
+            List<String> guids = executeGremlinQuery(queryTemplate,
+                    getBindingsForObjectId(typeName, attribute.getQualifiedName(), attrValue));
+
+            if (CollectionUtils.isNotEmpty(guids)) {
+                ret.addAll(guids);
+            }
+        }
+
+        return new ArrayList<>(ret);
+    }
+
+    /**
+     * Looks up all entities of the item's type name(s); never returns null.
+     */
+    private List<String> getEntitiesForMatchTypeType(AtlasObjectId item, String matchType) {
+        List<String> guids = executeGremlinQuery(getQueryTemplateForMatchType(matchType), getBindingsForTypeName(item.getTypeName()));
+
+        // executeGremlinQuery may be overridden (e.g. in tests) and return null; normalize to empty
+        return guids != null ? guids : new ArrayList<String>();
+    }
+
+    @VisibleForTesting
+    String getQueryTemplateForMatchType(String matchType) {
+        return matchTypeQuery.containsKey(matchType)
+                ? matchTypeQuery.get(matchType)
+                : matchTypeQuery.get(DEFAULT_MATCH);
+    }
+
+    // typeName may be a comma-separated list; it is bound as a set of type names
+    private HashMap<String, Object> getBindingsForTypeName(String typeName) {
+        HashMap<String, Object> ret = new HashMap<>();
+        ret.put(BINDING_PARAMETER_TYPENAME, new HashSet<>(Arrays.asList(StringUtils.split(typeName, ","))));
+        return ret;
+    }
+
+    private HashMap<String, Object> getBindingsForObjectId(String typeName, String attrName, Object attrValue) {
+        HashMap<String, Object> ret = new HashMap<>();
+        ret.put(BINDING_PARAMETER_TYPENAME, typeName);
+        ret.put(BINDING_PARAMETER_ATTR_NAME, attrName);
+        ret.put(BINDING_PARAMTER_ATTR_VALUE, attrValue);
+
+        return ret;
+    }
+
+    /**
+     * Runs {@code query} with {@code bindings} on a freshly acquired script engine, which is always released
+     * afterwards.
+     *
+     * @return the guids returned by the query; empty (never null) when no engine is available or the script fails
+     */
+    @VisibleForTesting
+    @SuppressWarnings("unchecked")
+    List<String> executeGremlinQuery(String query, Map<String, Object> bindings) {
+        ScriptEngine scriptEngine = getScriptEngine();
+        if (scriptEngine == null) {
+            return new ArrayList<>();
+        }
+
+        try {
+            List<String> ret = (List<String>) atlasGraph.executeGremlinScript(scriptEngine, bindings, query, false);
+            return ret != null ? ret : new ArrayList<String>();
+        } catch (ScriptException e) {
+            LOG.error("Script execution failed for query: {}", query, e);
+            return new ArrayList<>();
+        } finally {
+            // each call acquires a new engine; release it so engines are not leaked
+            atlasGraph.releaseGremlinScriptEngine(scriptEngine);
+        }
+    }
+
+    /**
+     * Acquires a new Gremlin script engine from the graph. The caller is responsible for releasing it via
+     * {@code atlasGraph.releaseGremlinScriptEngine}.
+     *
+     * @return the engine, or null if it could not be created (the failure is logged)
+     */
+    public ScriptEngine getScriptEngine() {
+        try {
+            return atlasGraph.getGremlinScriptEngine();
+        } catch (AtlasBaseException e) {
+            LOG.error("Error initializing script engine.", e);
+        }
+
+        return null;
+    }
+}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/VertexExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/VertexExtractor.java
new file mode 100644
index 0000000..433f731
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/VertexExtractor.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.util.AtlasGremlinQueryProvider;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.atlas.repository.impexp.EntitiesExtractor.PROPERTY_GUID;
+
+/**
+ * {@link ExtractStrategy} that discovers entities to export by running Gremlin traversals from the starting vertex.
+ * <p>
+ * A single script engine is acquired in the constructor and released by {@link #close()}. The query bindings map is
+ * a shared field that is cleared and refilled for each query, so an instance is not safe for concurrent use.
+ */
+public class VertexExtractor implements ExtractStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(VertexExtractor.class);
+
+    private static final String PROPERTY_IS_PROCESS = "isProcess";
+    private static final String QUERY_BINDING_START_GUID = "startGuid";
+
+    private final AtlasGremlinQueryProvider gremlinQueryProvider;
+
+    private final Map<String, Object> bindings;
+    private final AtlasGraph atlasGraph;
+    private final AtlasTypeRegistry typeRegistry;
+    private final ScriptEngine scriptEngine;
+
+    public VertexExtractor(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry) {
+        this.atlasGraph = atlasGraph;
+        this.typeRegistry = typeRegistry;
+        this.scriptEngine = createScriptEngine(atlasGraph);
+        this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
+        this.bindings = new HashMap<>();
+    }
+
+    // acquires the script engine; returns null (and logs the cause) if it cannot be created
+    private static ScriptEngine createScriptEngine(AtlasGraph atlasGraph) {
+        try {
+            return atlasGraph.getGremlinScriptEngine();
+        } catch (AtlasBaseException e) {
+            LOG.error("Script Engine: Instantiation failed!", e);
+            return null;
+        }
+    }
+
+    /**
+     * Runs the EXPORT_BY_GUID_FULL query from {@code entity} and queues every result not yet processed with
+     * direction BOTH. Process (lineage) entities are skipped when the context skips lineage.
+     */
+    @Override
+    public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> fullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
+        }
+
+        String query = gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
+        List<Map<String, Object>> result = executeGremlinQuery(query, entity.getGuid());
+
+        if (CollectionUtils.isEmpty(result)) {
+            return;
+        }
+
+        for (Map<String, Object> hashMap : result) {
+            String guid = (String) hashMap.get(PROPERTY_GUID);
+            boolean isLineage = isProcess(hashMap);
+
+            if (context.getSkipLineage() && isLineage) {
+                continue;
+            }
+
+            if (!context.guidsProcessed.contains(guid)) {
+                context.addToBeProcessed(isLineage, guid, ExportService.TraversalDirection.BOTH);
+            }
+        }
+    }
+
+    /**
+     * Queues entities connected to {@code entity}. With no known direction (or UNKNOWN) both outward and inward
+     * edges are followed; otherwise the recorded direction is used, forced to OUTWARD for process entities.
+     */
+    @Override
+    public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> connectedFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
+        }
+
+        ExportService.TraversalDirection direction = context.guidDirection.get(entity.getGuid());
+
+        if (direction == null || direction == ExportService.TraversalDirection.UNKNOWN) {
+            getConnectedEntityGuids(entity, context, ExportService.TraversalDirection.OUTWARD, ExportService.TraversalDirection.INWARD);
+        } else {
+            if (isProcessEntity(entity)) {
+                direction = ExportService.TraversalDirection.OUTWARD;
+            }
+
+            getConnectedEntityGuids(entity, context, direction);
+        }
+    }
+
+    /** Releases the script engine acquired in the constructor, if any. */
+    @Override
+    public void close() {
+        if (scriptEngine != null) {
+            atlasGraph.releaseGremlinScriptEngine(scriptEngine);
+        }
+    }
+
+    private void getConnectedEntityGuids(AtlasEntity entity, ExportService.ExportContext context, ExportService.TraversalDirection... directions) {
+        if (directions == null) {
+            return;
+        }
+
+        for (ExportService.TraversalDirection direction : directions) {
+            String query = getQueryForTraversalDirection(direction);
+            List<Map<String, Object>> result = executeGremlinQuery(query, entity.getGuid());
+
+            if (CollectionUtils.isEmpty(result)) {
+                continue;
+            }
+
+            for (Map<String, Object> hashMap : result) {
+                String guid = (String) hashMap.get(PROPERTY_GUID);
+                ExportService.TraversalDirection currentDirection = context.guidDirection.get(guid);
+                boolean isLineage = isProcess(hashMap);
+
+                if (context.skipLineage && isLineage) {
+                    continue;
+                }
+
+                if (currentDirection == null) {
+                    context.addToBeProcessed(isLineage, guid, direction);
+                } else if (currentDirection == ExportService.TraversalDirection.OUTWARD && direction == ExportService.TraversalDirection.INWARD) {
+                    // the entity should be reprocessed to get inward entities
+                    context.guidsProcessed.remove(guid);
+                    context.addToBeProcessed(isLineage, guid, direction);
+                }
+            }
+        }
+    }
+
+    // a missing or non-true isProcess value is treated as "not a process" instead of failing on unboxing
+    private static boolean isProcess(Map<String, Object> row) {
+        return Boolean.TRUE.equals(row.get(PROPERTY_IS_PROCESS));
+    }
+
+    // false for types unknown to the registry instead of an NPE
+    private boolean isProcessEntity(AtlasEntity entity) {
+        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+        return entityType != null && entityType.isSubTypeOf(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);
+    }
+
+    private String getQueryForTraversalDirection(ExportService.TraversalDirection direction) {
+        switch (direction) {
+            case INWARD:
+                return gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_IN_EDGE);
+
+            case OUTWARD:
+            default:
+                return gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_OUT_EDGE);
+        }
+    }
+
+    /**
+     * Runs {@code query} with {@code startGuid} bound as the start vertex guid.
+     *
+     * @return query rows, or null when no script engine is available or the script fails (both are logged)
+     */
+    @SuppressWarnings("unchecked")
+    private List<Map<String, Object>> executeGremlinQuery(String query, String startGuid) {
+        if (scriptEngine == null) {
+            LOG.error("Script engine not available; skipping query: {}", query);
+            return null;
+        }
+
+        bindings.clear();
+        bindings.put(QUERY_BINDING_START_GUID, startGuid);
+
+        try {
+            return (List<Map<String, Object>>) atlasGraph.executeGremlinScript(scriptEngine, bindings, query, false);
+        } catch (ScriptException e) {
+            LOG.error("Script execution failed for query: {}", query, e);
+            return null;
+        }
+    }
+}
\ No newline at end of file
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
index a355297..5ec62e5 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
@@ -26,29 +26,37 @@ import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1;
import org.apache.atlas.repository.util.UniqueList;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.TestResourceFileUtils;
+import org.testng.ITestContext;
import org.testng.SkipException;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
+import org.testng.annotations.DataProvider;
import javax.inject.Inject;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createTypes;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getEntities;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
@Guice(modules = TestModules.TestOnlyModule.class)
public class ExportIncrementalTest extends ExportImportTestBase {
@@ -62,15 +70,30 @@ public class ExportIncrementalTest extends ExportImportTestBase {
ExportService exportService;
@Inject
+ private ImportService importService;
+
+ @Inject
private AtlasEntityStoreV1 entityStore;
private final String EXPORT_REQUEST_INCREMENTAL = "export-incremental";
private final String EXPORT_REQUEST_CONNECTED = "export-connected";
+ private AtlasClassificationType classificationTypeT1;
private long nextTimestamp;
+ private static final String EXPORT_INCREMENTAL = "incremental";
+ private static final String QUALIFIED_NAME_DB = "db_test_1@02052019";
+ private static final String QUALIFIED_NAME_TABLE_LINEAGE = "db_test_1.test_tbl_ctas_2@02052019";
+
+
+ private static final String GUID_DB = "f0b72ab4-7452-4e42-ac74-2aee7728cce4";
+ private static final String GUID_TABLE_2 = "8d0b834c-61ce-42d8-8f66-6fa51c36bccb";
+ private static final String GUID_TABLE_CTAS_2 = "eaec545b-3ac7-4e1b-a497-bd4a2b6434a2";
+
@BeforeClass
public void setup() throws IOException, AtlasBaseException {
basicSetup(typeDefStore, typeRegistry);
+ classificationTypeT1 = createNewClassification();
+
createEntities(entityStore, ENTITIES_SUB_DIR, new String[] { "db", "table-columns"});
final Object[] entityGuids = new Object[]{DB_GUID, TABLE_GUID};
verifyCreatedEntities(entityStore, entityGuids, 2);
@@ -108,8 +131,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
public void atT1_NewClassificationAttachedToTable_ReturnsChangedTable() throws AtlasBaseException {
final int expectedEntityCount = 1;
- AtlasClassificationType ct = createNewClassification();
- entityStore.addClassifications(TABLE_GUID, ImmutableList.of(ct.createDefaultValue()));
+ entityStore.addClassifications(TABLE_GUID, ImmutableList.of(classificationTypeT1.createDefaultValue()));
AtlasExportRequest request = getIncrementalRequest(nextTimestamp);
ZipSource source = runExportWithParameters(exportService, request);
@@ -127,7 +149,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
}
private AtlasClassificationType createNewClassification() {
- createTypes(typeDefStore, ENTITIES_SUB_DIR,"typesDef-new-classification");
+ createTypes(typeDefStore, ENTITIES_SUB_DIR,"typesdef-new-classification");
return typeRegistry.getClassificationTypeByName("T1");
}
@@ -151,7 +173,6 @@ public class ExportIncrementalTest extends ExportImportTestBase {
long postUpdateTableEntityTimestamp = tableEntity.getEntity().getUpdateTime().getTime();
assertEquals(preExportTableEntityTimestamp, postUpdateTableEntityTimestamp);
- nextTimestamp = updateTimesampForNextIncrementalExport(source);
}
@Test(dependsOnMethods = "atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn")
@@ -172,6 +193,36 @@ public class ExportIncrementalTest extends ExportImportTestBase {
assertEquals(creationOrder.size(), zipCreationOrder.size());
}
+    /**
+     * Supplies the zip export of a hive database with lineage, used as import input by {@link #importHiveDb}.
+     */
+    @DataProvider(name = "hiveDb")
+    public static Object[][] getData(ITestContext context) throws IOException, AtlasBaseException {
+        final String zipFileName = "hive_db_lineage.zip";
+        return getZipSource(zipFileName);
+    }
+
+    /**
+     * Imports the hive database zip (with no import options) so later tests can export from it.
+     */
+    @Test(dataProvider = "hiveDb")
+    public void importHiveDb(final ZipSource zipSource) throws AtlasBaseException, IOException {
+        final ImportService service = importService;
+        runImportWithNoParameters(service, zipSource);
+    }
+
+    /**
+     * Incremental, skip-lineage export of the CTAS table.
+     * The export from change marker 0 must contain the table and its database.
+     * After a classification is added to the table, an export from the recorded change marker must contain
+     * only that table.
+     */
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableInrementalConnected() throws AtlasBaseException {
+        ZipSource source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true));
+        verifyExpectedEntities(getFileNames(source), GUID_DB, GUID_TABLE_CTAS_2);
+
+        nextTimestamp = updateTimesampForNextIncrementalExport(source);
+
+        // Nothing has changed since nextTimestamp, so this export may legitimately produce no entities.
+        // NOTE(review): assumes runExportWithParameters signals that case with SkipException — confirm.
+        try {
+            runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true));
+        } catch (SkipException ignored) {
+            // tolerated: an empty incremental export is acceptable at this point
+        }
+
+        entityStore.addClassifications(GUID_TABLE_CTAS_2, ImmutableList.of(classificationTypeT1.createDefaultValue()));
+
+        source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true));
+        verifyExpectedEntities(getFileNames(source), GUID_TABLE_CTAS_2);
+    }
+
+
private AtlasExportRequest getIncrementalRequest(long timestamp) {
try {
AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_INCREMENTAL, AtlasExportRequest.class);
@@ -179,7 +230,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
return request;
} catch (IOException e) {
- throw new SkipException(String.format("getIncrementalRequest: '%s' could not be laoded.", EXPORT_REQUEST_INCREMENTAL));
+ throw new SkipException(String.format("getIncrementalRequest: '%s' could not be loaded.", EXPORT_REQUEST_INCREMENTAL));
}
}
@@ -187,8 +238,46 @@ public class ExportIncrementalTest extends ExportImportTestBase {
try {
return TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_CONNECTED, AtlasExportRequest.class);
} catch (IOException e) {
- throw new SkipException(String.format("getIncrementalRequest: '%s' could not be laoded.", EXPORT_REQUEST_CONNECTED));
+ throw new SkipException(String.format("getIncrementalRequest: '%s' could not be loaded.", EXPORT_REQUEST_CONNECTED));
}
}
+    /**
+     * Builds an export request for a single hive_table identified by qualified name, with the given options.
+     */
+    private AtlasExportRequest getExportRequestForHiveTable(String name, String fetchType, long changeMarker, boolean skipLineage) {
+        AtlasObjectId tableId = new AtlasObjectId("hive_table", "qualifiedName", name);
+
+        List<AtlasObjectId> items = new ArrayList<>();
+        items.add(tableId);
+
+        AtlasExportRequest ret = new AtlasExportRequest();
+        ret.setItemsToExport(items);
+        ret.setOptions(getOptionsMap(fetchType, changeMarker, skipLineage));
+
+        return ret;
+    }
+
+    /**
+     * Builds export options: fetchType (defaults to "full" when null or empty), changeMarker and skipLineage.
+     */
+    private Map<String, Object> getOptionsMap(String fetchType, long changeMarker, boolean skipLineage) {
+        Map<String, Object> optionsMap = new HashMap<>();
+        optionsMap.put("fetchType", (fetchType == null || fetchType.isEmpty()) ? "full" : fetchType);
+        optionsMap.put("changeMarker", changeMarker);
+        optionsMap.put("skipLineage", skipLineage);
+
+        return optionsMap;
+    }
+
+    /**
+     * Asserts that exactly the given guids were exported (compared against the lower-cased expected guids).
+     * Failure messages name the missing guid and list what was actually exported.
+     */
+    private void verifyExpectedEntities(List<String> fileNames, String... guids) {
+        assertEquals(fileNames.size(), guids.length, "unexpected number of exported entities: " + fileNames);
+
+        for (String guid : guids) {
+            assertTrue(fileNames.contains(guid.toLowerCase()), "expected entity not exported: " + guid + "; exported: " + fileNames);
+        }
+    }
+
+    /**
+     * Drains {@code zipSource} and returns the guids of its entities in iteration order.
+     * Asserts that the source is non-empty and yields no null entity.
+     */
+    private List<String> getFileNames(ZipSource zipSource) {
+        List<String> guids = new ArrayList<>();
+        assertTrue(zipSource.hasNext());
+
+        while (zipSource.hasNext()) {
+            AtlasEntity next = zipSource.next();
+            assertNotNull(next);
+
+            final String guid = next.getGuid();
+            guids.add(guid);
+        }
+
+        return guids;
+    }
}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
index 99d88eb..4615266 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
@@ -23,7 +23,6 @@ import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1;
-import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
import org.apache.atlas.repository.util.UniqueList;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
@@ -64,7 +63,7 @@ public class IncrementalExportEntityProviderTest extends ExportImportTestBase {
verifyCreatedEntities(entityStore, entityGuids, 2);
gremlinScriptEngine = atlasGraph.getGremlinScriptEngine();
- incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, gremlinScriptEngine);
+ incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph);
}
@AfterClass
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
index 868b732..5e909e1 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
@@ -32,7 +32,6 @@ import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1;
-import org.apache.atlas.repository.store.graph.v1.EntityGraphMapper;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasType;
@@ -144,6 +143,14 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
assertReplicationAttribute(Constants.ATTR_NAME_REPLICATED_FROM);
}
+ /**
+ * extractHiveDBQualifiedName should reduce both a three-part and a two-part
+ * dotted qualifiedName to the leading "db@cluster" qualified name.
+ */
+ @Test
+ public void replKeyGuidFinder() {
+ final String expectedDBQualifiedName = "stocks_base@cl1";
+ final String threePartName = "stocks_base.stocks_daily.volume@cl1";
+ final String twoPartName = "stocks_base.stocks_daily@cl1";
+
+ assertEquals(AuditsWriter.ReplKeyGuidFinder.extractHiveDBQualifiedName(threePartName), expectedDBQualifiedName);
+ assertEquals(AuditsWriter.ReplKeyGuidFinder.extractHiveDBQualifiedName(twoPartName), expectedDBQualifiedName);
+ }
+
private void assertReplicationAttribute(String attrNameReplication) throws AtlasBaseException {
pauseForIndexCreation();
AtlasEntity.AtlasEntitiesWithExtInfo entities = entityStore.getByIds(ImmutableList.of(DB_GUID, TABLE_GUID));
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/StartEntityFetchByExportRequestTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/StartEntityFetchByExportRequestTest.java
new file mode 100644
index 0000000..84407ad
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/StartEntityFetchByExportRequestTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.util.AtlasGremlin3QueryProvider;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.atlas.repository.impexp.StartEntityFetchByExportRequest.BINDING_PARAMETER_ATTR_NAME;
+import static org.apache.atlas.repository.impexp.StartEntityFetchByExportRequest.BINDING_PARAMETER_TYPENAME;
+import static org.apache.atlas.repository.impexp.StartEntityFetchByExportRequest.BINDING_PARAMTER_ATTR_VALUE;
+import static org.testng.Assert.assertEquals;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class StartEntityFetchByExportRequestTest extends ExportImportTestBase {
+
+ @Inject
+ private AtlasGraph atlasGraph;
+
+ @Inject
+ private AtlasTypeRegistry typeRegistry;
+
+ @Inject
+ private AtlasTypeDefStore typeDefStore;
+
+ private AtlasGremlin3QueryProvider atlasGremlin3QueryProvider;
+ private StartEntityFetchByExportRequestSpy startEntityFetchByExportRequestSpy;
+
+ /**
+ * Test double that records the Gremlin query and bindings passed to
+ * {@code executeGremlinQuery} instead of executing them, and returns an empty result.
+ * Declared static so it does not hold a hidden reference to the enclosing test instance;
+ * the query provider is passed in explicitly.
+ */
+ private static class StartEntityFetchByExportRequestSpy extends StartEntityFetchByExportRequest {
+ private String generatedQuery;
+ private Map<String, Object> suppliedBindingsMap;
+
+ StartEntityFetchByExportRequestSpy(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry,
+ AtlasGremlin3QueryProvider gremlinQueryProvider) {
+ super(atlasGraph, typeRegistry, gremlinQueryProvider);
+ }
+
+ @Override
+ List<String> executeGremlinQuery(String query, Map<String, Object> bindings) {
+ this.generatedQuery = query;
+ this.suppliedBindingsMap = bindings;
+
+ // Type-safe empty list; avoids the raw-type/unchecked conversion of Collections.EMPTY_LIST.
+ return Collections.emptyList();
+ }
+
+ public String getGeneratedQuery() {
+ return generatedQuery;
+ }
+
+ public Map<String, Object> getSuppliedBindingsMap() {
+ return suppliedBindingsMap;
+ }
+ }
+
+ @BeforeClass void setup() throws IOException, AtlasBaseException {
+ super.basicSetup(typeDefStore, typeRegistry);
+ atlasGremlin3QueryProvider = new AtlasGremlin3QueryProvider();
+ startEntityFetchByExportRequestSpy = new StartEntityFetchByExportRequestSpy(atlasGraph, typeRegistry, atlasGremlin3QueryProvider);
+ }
+
+ /** An item identified by guid is returned with that guid. */
+ @Test
+ public void fetchTypeGuid() {
+ String exportRequestJson = "{ \"itemsToExport\": [ { \"typeName\": \"hive_db\", \"guid\": \"111-222-333\" } ]}";
+ AtlasExportRequest exportRequest = AtlasType.fromJson(exportRequestJson, AtlasExportRequest.class);
+
+ List<AtlasObjectId> startEntityIds = startEntityFetchByExportRequestSpy.get(exportRequest);
+
+ assertEquals(startEntityIds.get(0).getGuid(), "111-222-333");
+ }
+
+ /**
+ * An item identified by unique attributes produces the match-type query template and
+ * binds the type name, the qualified attribute name and the attribute value.
+ */
+ @Test
+ public void fetchTypeUniqueAttributes() {
+ String exportRequestJson = "{ \"itemsToExport\": [ { \"typeName\": \"hive_db\", \"uniqueAttributes\": {\"qualifiedName\": \"stocks@cl1\"} } ]}";
+ AtlasExportRequest exportRequest = AtlasType.fromJson(exportRequestJson, AtlasExportRequest.class);
+
+ startEntityFetchByExportRequestSpy.get(exportRequest);
+ assertEquals(startEntityFetchByExportRequestSpy.getGeneratedQuery(), startEntityFetchByExportRequestSpy.getQueryTemplateForMatchType(exportRequest.getMatchTypeOptionValue()));
+ assertEquals(startEntityFetchByExportRequestSpy.getSuppliedBindingsMap().get(BINDING_PARAMETER_TYPENAME), "hive_db");
+ assertEquals(startEntityFetchByExportRequestSpy.getSuppliedBindingsMap().get(BINDING_PARAMETER_ATTR_NAME), "Referenceable.qualifiedName");
+ assertEquals(startEntityFetchByExportRequestSpy.getSuppliedBindingsMap().get(BINDING_PARAMTER_ATTR_VALUE), "stocks@cl1");
+ }
+}
\ No newline at end of file