You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2020/12/10 18:52:49 UTC

[atlas] 02/02: ATLAS-4024: Export Service: Export of terms.

This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 31125c58c8a6d2edf0bd7de4e31c44e18adc9a7f
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Wed Dec 9 21:52:59 2020 -0800

    ATLAS-4024: Export Service: Export of terms.
    
    Signed-off-by: Ashutosh Mestry <am...@cloudera.com>
---
 .../atlas/repository/impexp/ExportService.java     | 43 ++++++++++++++++++++--
 .../repository/impexp/ExportTypeProcessor.java     | 40 +++++++++++++++++++-
 2 files changed, 79 insertions(+), 4 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 8af2057..65d7a18 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.impexp;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.glossary.GlossaryService;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasExportResult;
 import org.apache.atlas.model.instance.AtlasEntity;
@@ -38,6 +39,7 @@ import org.apache.atlas.repository.util.UniqueList;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.util.AtlasGremlinQueryProvider;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -65,14 +67,17 @@ public class ExportService {
     private final EntityGraphRetriever      entityGraphRetriever;
     private       ExportTypeProcessor       exportTypeProcessor;
     private final HdfsPathEntityCreator     hdfsPathEntityCreator;
+    private final GlossaryService           glossaryService;
 
     @Inject
     public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph graph,
-                         AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator) {
+                         AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator,
+                         GlossaryService glossaryService) {
         this.typeRegistry         = typeRegistry;
         this.entityGraphRetriever = new EntityGraphRetriever(graph, this.typeRegistry);
         this.auditsWriter         = auditsWriter;
         this.hdfsPathEntityCreator = hdfsPathEntityCreator;
+        this.glossaryService = glossaryService;
         this.startEntityFetchByExportRequest = new StartEntityFetchByExportRequest(graph, typeRegistry, AtlasGremlinQueryProvider.INSTANCE);
         this.entitiesExtractor = new EntitiesExtractor(graph, typeRegistry);
     }
@@ -84,7 +89,7 @@ public class ExportService {
                 hostName, startTime, getCurrentChangeMarker());
 
         ExportContext context = new ExportContext(result, exportSink);
-        exportTypeProcessor = new ExportTypeProcessor(typeRegistry);
+        exportTypeProcessor = new ExportTypeProcessor(typeRegistry, glossaryService);
 
         try {
             LOG.info("==> export(user={}, from={})", userName, requestingIP);
@@ -264,8 +269,12 @@ public class ExportService {
     }
 
     public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
-        addEntity(entityWithExtInfo, context);
         exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context);
+        if (MapUtils.isNotEmpty(context.termsGlossary)) {
+            addGlossaryEntities(context);
+        }
+
+        addEntity(entityWithExtInfo, context);
 
         context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid());
         entitiesExtractor.get(entityWithExtInfo.getEntity(), context);
@@ -280,6 +289,28 @@ public class ExportService {
         }
     }
 
+    // Passes each term in context.termsGlossary (term guid -> glossary guid) and its glossary to addEntity, unless the sink already has them.
+    private void addGlossaryEntities(ExportContext context) {
+        try {
+            for (String termGuid : context.termsGlossary.keySet()) {
+                try {
+                    String glossaryGuid = context.termsGlossary.get(termGuid);
+                    if (!context.sink.hasEntity(glossaryGuid)) { // glossary goes in ahead of its term
+                        AtlasEntity glossary = entityGraphRetriever.toAtlasEntity(glossaryGuid);
+                        addEntity(new AtlasEntityWithExtInfo(glossary), context);
+                    }
+                    if (!context.sink.hasEntity(termGuid)) {
+                        AtlasEntity term = entityGraphRetriever.toAtlasEntity(termGuid);
+                        addEntity(new AtlasEntityWithExtInfo(term), context);
+                    }
+                } catch (AtlasBaseException exception) {
+                    LOG.error("Error fetching Glossary for term: {}", termGuid, exception); // best effort: keep the cause, move on to the next term
+                }
+            }
+        } finally {
+            context.clearTerms(); // always emptied, even on failure, so the next processEntity call starts with no queued terms
+        }
+    }
 
     private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException {
         if(context.sink.hasEntity(entityWithExtInfo.getEntity().getGuid())) {
@@ -355,6 +386,8 @@ public class ExportService {
         final Set<String>                     enumTypes           = new HashSet<>();
         final Set<String>                     relationshipTypes   = new HashSet<>();
         final Set<String>                     businessMetadataTypes = new HashSet<>();
+        final Map<String, String>             termsGlossary      = new HashMap<>();
+
         final AtlasExportResult               result;
         private final ZipSink                 sink;
 
@@ -471,5 +504,9 @@ public class ExportService {
         public void addToEntityCreationOrder(String guid) {
             entityCreationOrder.add(guid);
         }
+
+        public void clearTerms() {
+            termsGlossary.clear(); // drops every term guid -> glossary guid entry collected so far
+        }
     }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java
index a85db5c..186b4b0 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportTypeProcessor.java
@@ -19,9 +19,13 @@
 package org.apache.atlas.repository.impexp;
 
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.glossary.GlossaryService;
 import org.apache.atlas.model.TypeCategory;
+import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.model.typedef.AtlasStructDef;
 import org.apache.atlas.type.AtlasArrayType;
 import org.apache.atlas.type.AtlasBusinessMetadataType;
@@ -37,15 +41,21 @@ import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 class ExportTypeProcessor {
     private static final Logger LOG = LoggerFactory.getLogger(ExportTypeProcessor.class);
+    private static final String RELATIONSHIP_ATTR_MEANINGS = "meanings";
 
     private AtlasTypeRegistry typeRegistry;
+    private GlossaryService glossaryService;
 
-    ExportTypeProcessor(AtlasTypeRegistry typeRegistry) {
+    ExportTypeProcessor(AtlasTypeRegistry typeRegistry, GlossaryService glossaryService) {
         this.typeRegistry = typeRegistry;
+        this.glossaryService = glossaryService;
     }
 
     public void addTypes(AtlasEntity entity, ExportService.ExportContext context) {
@@ -56,6 +66,34 @@ class ExportTypeProcessor {
                 addClassificationType(c.getTypeName(), context);
             }
         }
+
+        addTerms(entity, context);
+    }
+
+    // For each AtlasRelatedObjectId in the entity's "meanings" relationship attribute, records term guid -> glossary guid in context.termsGlossary.
+    private void addTerms(AtlasEntity entity, ExportService.ExportContext context) {
+        Object relAttrMeanings = entity.getRelationshipAttribute(RELATIONSHIP_ATTR_MEANINGS);
+        if (!(relAttrMeanings instanceof List)) { // instanceof is false for null, so this also covers a missing attribute
+            return;
+        }
+
+        for (Object objectId : (List<?>) relAttrMeanings) {
+            if (!(objectId instanceof AtlasRelatedObjectId)) {
+                continue;
+            }
+
+            AtlasRelatedObjectId termObjectId = (AtlasRelatedObjectId) objectId;
+            try {
+                AtlasGlossaryTerm term = glossaryService.getTerm(termObjectId.getGuid());
+                if (term == null || term.getAnchor() == null) { // defensive: skip rather than fail the export with an NPE
+                    LOG.warn("Term has no glossary anchor, skipping: {}", termObjectId);
+                    continue;
+                }
+                context.termsGlossary.put(termObjectId.getGuid(), term.getAnchor().getGlossaryGuid());
+            } catch (AtlasBaseException e) {
+                LOG.warn("Error fetching term details: {}", termObjectId, e); // best effort: keep the cause, continue with remaining terms
+            }
+        }
+    }
 
     private void addType(String typeName, ExportService.ExportContext context) {