You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2020/12/10 18:52:49 UTC
[atlas] 02/02: ATLAS-4024: Export Service: Export of terms.
This is an automated email from the ASF dual-hosted git repository.
amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 31125c58c8a6d2edf0bd7de4e31c44e18adc9a7f
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Wed Dec 9 21:52:59 2020 -0800
ATLAS-4024: Export Service: Export of terms.
Signed-off-by: Ashutosh Mestry <am...@cloudera.com>
---
.../atlas/repository/impexp/ExportService.java | 43 ++++++++++++++++++++--
.../repository/impexp/ExportTypeProcessor.java | 40 +++++++++++++++++++-
2 files changed, 79 insertions(+), 4 deletions(-)
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 8af2057..65d7a18 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.impexp;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.glossary.GlossaryService;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
@@ -38,6 +39,7 @@ import org.apache.atlas.repository.util.UniqueList;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.AtlasGremlinQueryProvider;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -65,14 +67,17 @@ public class ExportService {
private final EntityGraphRetriever entityGraphRetriever;
private ExportTypeProcessor exportTypeProcessor;
private final HdfsPathEntityCreator hdfsPathEntityCreator;
+ private final GlossaryService glossaryService;
@Inject
public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph graph,
- AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator) {
+ AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator,
+ GlossaryService glossaryService) {
this.typeRegistry = typeRegistry;
this.entityGraphRetriever = new EntityGraphRetriever(graph, this.typeRegistry);
this.auditsWriter = auditsWriter;
this.hdfsPathEntityCreator = hdfsPathEntityCreator;
+ this.glossaryService = glossaryService;
this.startEntityFetchByExportRequest = new StartEntityFetchByExportRequest(graph, typeRegistry, AtlasGremlinQueryProvider.INSTANCE);
this.entitiesExtractor = new EntitiesExtractor(graph, typeRegistry);
}
@@ -84,7 +89,7 @@ public class ExportService {
hostName, startTime, getCurrentChangeMarker());
ExportContext context = new ExportContext(result, exportSink);
- exportTypeProcessor = new ExportTypeProcessor(typeRegistry);
+ exportTypeProcessor = new ExportTypeProcessor(typeRegistry, glossaryService);
try {
LOG.info("==> export(user={}, from={})", userName, requestingIP);
@@ -264,8 +269,12 @@ public class ExportService {
}
public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
- addEntity(entityWithExtInfo, context);
exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context);
+ if (MapUtils.isNotEmpty(context.termsGlossary)) {
+ addGlossaryEntities(context);
+ }
+
+ addEntity(entityWithExtInfo, context);
context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid());
entitiesExtractor.get(entityWithExtInfo.getEntity(), context);
@@ -280,6 +289,28 @@ public class ExportService {
}
}
+ private void addGlossaryEntities(ExportContext context) {
+ try {
+ for (String termGuid : context.termsGlossary.keySet()) {
+ try {
+ String glossaryGuid = context.termsGlossary.get(termGuid);
+ if (!context.sink.hasEntity(glossaryGuid)) {
+ AtlasEntity glossary = entityGraphRetriever.toAtlasEntity(glossaryGuid);
+ addEntity(new AtlasEntityWithExtInfo(glossary), context);
+ }
+
+ if (!context.sink.hasEntity(termGuid)) {
+ AtlasEntity term = entityGraphRetriever.toAtlasEntity(termGuid);
+ addEntity(new AtlasEntityWithExtInfo(term), context);
+ }
+ } catch (AtlasBaseException exception) {
+ LOG.error("Error fetching Glossary for term: {}", termGuid);
+ }
+ }
+ } finally {
+ context.clearTerms();
+ }
+ }
private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
if(context.sink.hasEntity(entityWithExtInfo.getEntity().getGuid())) {
@@ -355,6 +386,8 @@ public class ExportService {
final Set<String> enumTypes = new HashSet<>();
final Set<String> relationshipTypes = new HashSet<>();
final Set<String> businessMetadataTypes = new HashSet<>();
+ final Map<String, String> termsGlossary = new HashMap<>();
+
final AtlasExportResult result;
private final ZipSink sink;
@@ -471,5 +504,9 @@ public class ExportService {
public void addToEntityCreationOrder(String guid) {
entityCreationOrder.add(guid);
}
+
+ public void clearTerms() {
+ termsGlossary.clear();
+ }
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java
index a85db5c..186b4b0 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java
@@ -19,9 +19,13 @@
package org.apache.atlas.repository.impexp;
import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.glossary.GlossaryService;
import org.apache.atlas.model.TypeCategory;
+import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBusinessMetadataType;
@@ -37,15 +41,21 @@ import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
class ExportTypeProcessor {
private static final Logger LOG = LoggerFactory.getLogger(ExportTypeProcessor.class);
+ private static final String RELATIONSHIP_ATTR_MEANINGS = "meanings";
private AtlasTypeRegistry typeRegistry;
+ private GlossaryService glossaryService;
- ExportTypeProcessor(AtlasTypeRegistry typeRegistry) {
+ ExportTypeProcessor(AtlasTypeRegistry typeRegistry, GlossaryService glossaryService) {
this.typeRegistry = typeRegistry;
+ this.glossaryService = glossaryService;
}
public void addTypes(AtlasEntity entity, ExportService.ExportContext context) {
@@ -56,6 +66,34 @@ class ExportTypeProcessor {
addClassificationType(c.getTypeName(), context);
}
}
+
+ addTerms(entity, context);
+ }
+
+ private void addTerms(AtlasEntity entity, ExportService.ExportContext context) {
+ Object relAttrMeanings = entity.getRelationshipAttribute(RELATIONSHIP_ATTR_MEANINGS);
+ if (relAttrMeanings == null || !(relAttrMeanings instanceof List)) {
+ return;
+ }
+
+ List list = (List) relAttrMeanings;
+ if (CollectionUtils.isEmpty(list)) {
+ return;
+ }
+
+ for (Object objectId : list) {
+ if (objectId instanceof AtlasRelatedObjectId) {
+ AtlasRelatedObjectId termObjectId = (AtlasRelatedObjectId) objectId;
+
+ try {
+ AtlasGlossaryTerm term = glossaryService.getTerm(termObjectId.getGuid());
+ context.termsGlossary.put(termObjectId.getGuid(), term.getAnchor().getGlossaryGuid());
+ }
+ catch (AtlasBaseException e) {
+ LOG.warn("Error fetching term details: {}", termObjectId);
+ }
+ }
+ }
}
private void addType(String typeName, ExportService.ExportContext context) {