You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2021/05/13 22:23:09 UTC

[atlas] branch master updated: ATLAS-4274: Non matching relation are created via bulk import

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 821b4c4  ATLAS-4274: Non matching relation are created via bulk import
821b4c4 is described below

commit 821b4c441153505db7f0ba9c5670623c527af989
Author: sidmishra <si...@cloudera.com>
AuthorDate: Sat May 8 09:52:23 2021 -0700

    ATLAS-4274: Non matching relation are created via bulk import
    
    Signed-off-by: Sarath Subramanian <sa...@apache.org>
---
 .../org/apache/atlas/glossary/GlossaryService.java |   3 +
 .../apache/atlas/glossary/GlossaryTermUtils.java   | 205 ++++++++++++++++++---
 2 files changed, 185 insertions(+), 23 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java b/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java
index 73217de..9c84598 100644
--- a/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java
+++ b/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java
@@ -1118,6 +1118,7 @@ public class GlossaryService {
         }
     }
 
+    @GraphTransaction
     public BulkImportResponse importGlossaryData(InputStream inputStream, String fileName) throws AtlasBaseException {
         BulkImportResponse ret = new BulkImportResponse();
 
@@ -1161,6 +1162,8 @@ public class GlossaryService {
 
     private void updateGlossaryTermsRelation(List<AtlasGlossaryTerm> glossaryTerms, BulkImportResponse bulkImportResponse) {
         for (AtlasGlossaryTerm glossaryTerm : glossaryTerms) {
+            glossaryTermUtils.updateGlossaryTermRelations(glossaryTerm);
+
             if (glossaryTerm.hasTerms()) {
                 String glossaryTermName = glossaryTerm.getName();
                 String glossaryName     = getGlossaryName(glossaryTerm);
diff --git a/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java b/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java
index 553d3d0..fa02c8e 100644
--- a/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java
+++ b/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java
@@ -66,7 +66,9 @@ public class GlossaryTermUtils extends GlossaryUtils {
     private static final int INDEX_FOR_GLOSSARY_AT_RECORD = 0;
     private static final int INDEX_FOR_TERM_AT_RECORD     = 1;
 
-    private static final ThreadLocal<Map<String, String>> glossaryNameGuidCacheForImport = ThreadLocal.withInitial(() -> new LinkedHashMap<>());
+    private static final ThreadLocal<Map<String, String>>  glossaryNameGuidCache      = ThreadLocal.withInitial(() -> new LinkedHashMap<>());
+    private static final ThreadLocal<Map<String, Integer>> glossaryTermOrderCache     = ThreadLocal.withInitial(() -> new HashMap<>());
+    private static final ThreadLocal<Map<String, String>>  glossaryTermQNameGuidCache = ThreadLocal.withInitial(() -> new HashMap<>());
 
     protected GlossaryTermUtils(AtlasRelationshipStore relationshipStore, AtlasTypeRegistry typeRegistry, DataAccess dataAccess) {
         super(relationshipStore, typeRegistry, dataAccess);
@@ -150,7 +152,9 @@ public class GlossaryTermUtils extends GlossaryUtils {
     }
 
     public void clearImportCache() {
-        glossaryNameGuidCacheForImport.get().clear();
+        glossaryNameGuidCache.get().clear();
+        glossaryTermOrderCache.get().clear();
+        glossaryTermQNameGuidCache.get().clear();
     }
 
     private boolean isRelationshipGuidSame(AtlasRelatedObjectId storeObject, AtlasRelatedObjectId relatedObjectId) {
@@ -560,7 +564,7 @@ public class GlossaryTermUtils extends GlossaryUtils {
             } else {
                 glossaryName = record[INDEX_FOR_GLOSSARY_AT_RECORD];
 
-                String glossaryGuid = glossaryNameGuidCacheForImport.get().get(glossaryName);
+                String glossaryGuid = glossaryNameGuidCache.get().get(glossaryName);
 
                 if (StringUtils.isEmpty(glossaryGuid)) {
                     glossaryGuid = getGlossaryGUIDFromGraphDB(glossaryName);
@@ -569,13 +573,15 @@ public class GlossaryTermUtils extends GlossaryUtils {
                         glossaryGuid = createGlossary(glossaryName, failedTermMsgs);
                     }
 
-                    glossaryNameGuidCacheForImport.get().put(glossaryName, glossaryGuid);
+                    glossaryNameGuidCache.get().put(glossaryName, glossaryGuid);
                 }
 
                 if (StringUtils.isNotEmpty(glossaryGuid)) {
                     glossaryTerm = populateGlossaryTermObject(failedTermMsgs, record, glossaryGuid, false);
 
-                    glossaryTerm.setQualifiedName(getGlossaryTermQualifiedName(glossaryTerm.getName(), glossaryGuid));
+                    glossaryTerm.setQualifiedName(getGlossaryTermQualifiedName(glossaryTerm.getName(), glossaryName));
+
+                    glossaryTermOrderCache.get().put(glossaryTerm.getQualifiedName(), rowCount);
 
                     glossaryTerms.add(glossaryTerm);
                 }
@@ -604,12 +610,12 @@ public class GlossaryTermUtils extends GlossaryUtils {
             if (ArrayUtils.isNotEmpty(record) && StringUtils.isNotBlank(record[INDEX_FOR_GLOSSARY_AT_RECORD])) {
                 AtlasGlossaryTerm glossaryTerm = new AtlasGlossaryTerm();
                 String            glossaryName = record[INDEX_FOR_GLOSSARY_AT_RECORD];
-                String            glossaryGuid = glossaryNameGuidCacheForImport.get().get(glossaryName);
+                String            glossaryGuid = glossaryNameGuidCache.get().get(glossaryName);
 
                 if (StringUtils.isNotEmpty(glossaryGuid)) {
                     glossaryTerm = populateGlossaryTermObject(failedTermMsgs, record, glossaryGuid, true);
 
-                    glossaryTerm.setQualifiedName(getGlossaryTermQualifiedName(glossaryTerm.getName(), glossaryGuid));
+                    glossaryTerm.setQualifiedName(getGlossaryTermQualifiedName(glossaryTerm.getName(), glossaryName));
 
                     glossaryTerms.add(glossaryTerm);
                 }
@@ -656,6 +662,19 @@ public class GlossaryTermUtils extends GlossaryUtils {
         return String.join(", ", ret);
     }
 
+    public void updateGlossaryTermRelations(AtlasGlossaryTerm updatedGlossaryTerm) {
+        if (glossaryTermQNameGuidCache.get().containsKey(updatedGlossaryTerm.getQualifiedName())) {
+            try {
+                AtlasGlossaryTerm glossaryTermFromDB = getGlossaryTem(glossaryTermQNameGuidCache.get().get(updatedGlossaryTerm.getQualifiedName()));
+                copyRelations(updatedGlossaryTerm, glossaryTermFromDB);
+            } catch (AtlasBaseException e) {
+                if (DEBUG_ENABLED) {
+                    LOG.debug("Error occurred while loading glossary Term", e);
+                }
+            }
+        }
+    }
+
     protected Map getMapValue(String csvRecord, List<String> failedTermMsgs, boolean populateRelations) {
         Map ret = null;
 
@@ -702,21 +721,28 @@ public class GlossaryTermUtils extends GlossaryUtils {
                 AtlasVertex vertex      = null;
                 String      dataArray[] = data.split(FileUtils.ESCAPE_CHARACTER + FileUtils.COLON_CHARACTER);
 
-                if ((dataArray.length % 2) == 0) {
+                if (dataArray.length == 2) {
+                    String relatedTermQualifiedName = dataArray[1] + invalidNameChars[0] + dataArray[0];
+                    String currTermQualifiedName    = termName + invalidNameChars[0] + glossaryName;
+
                     vertex = AtlasGraphUtilsV2.findByTypeAndUniquePropertyName(GlossaryUtils.ATLAS_GLOSSARY_TERM_TYPENAME,
-                            GlossaryUtils.ATLAS_GLOSSARY_TERM_TYPENAME + invalidNameChars[1] + QUALIFIED_NAME_ATTR, dataArray[1] + invalidNameChars[0] + dataArray[0]);
-                } else {
-                    failedTermMsgs.add("Either incorrect data specified for Term or Term does not exist : " +termName);
-                }
+                            GlossaryUtils.ATLAS_GLOSSARY_TERM_TYPENAME + invalidNameChars[1] + QUALIFIED_NAME_ATTR, relatedTermQualifiedName);
+
+                    if (vertex != null) {
+                        String glossaryTermGuid = AtlasGraphUtilsV2.getIdFromVertex(vertex);
 
-                if (vertex != null) {
-                    String glossaryTermGuid = AtlasGraphUtilsV2.getIdFromVertex(vertex);
-                    relatedTermHeader       = new AtlasRelatedTermHeader();
-                    relatedTermHeader.setTermGuid(glossaryTermGuid);
-                    ret.add(relatedTermHeader);
+                        relatedTermHeader       = new AtlasRelatedTermHeader();
+                        relatedTermHeader.setTermGuid(glossaryTermGuid);
+
+                        cacheRelatedTermQNameGuid(currTermQualifiedName, relatedTermQualifiedName, glossaryTermGuid);
+
+                        ret.add(relatedTermHeader);
+                    } else {
+                        failedTermMsgs.add("The provided Reference " + dataArray[1] + "@" + dataArray[0] +
+                                " does not exist at Atlas referred at record with TermName  : " + termName + " and GlossaryName : " + glossaryName);
+                    }
                 } else {
-                    failedTermMsgs.add("The provided Reference " + dataArray[1] + "@" + dataArray[0] +
-                                    " does not exist at Atlas referred at record with TermName  : " + termName + " and GlossaryName : " + glossaryName);
+                    failedTermMsgs.add("Incorrect relation data specified for the term : " + termName + "@" + glossaryName);
                 }
             }
         }
@@ -759,6 +785,8 @@ public class GlossaryTermUtils extends GlossaryUtils {
 
                 ret.setSynonyms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[INDEX_FOR_GLOSSARY_AT_RECORD], failedTermMsgList) : null);
 
+                ret.setReplacedBy((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[INDEX_FOR_GLOSSARY_AT_RECORD], failedTermMsgList) : null);
+
                 ret.setValidValues((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[INDEX_FOR_GLOSSARY_AT_RECORD], failedTermMsgList) : null);
 
                 ret.setReplacementTerms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[INDEX_FOR_GLOSSARY_AT_RECORD], failedTermMsgList) : null);
@@ -782,10 +810,8 @@ public class GlossaryTermUtils extends GlossaryUtils {
         return ret;
     }
 
-    private String getGlossaryTermQualifiedName(String glossaryTermName, String glossaryGuid) throws AtlasBaseException {
-        AtlasGlossary glossary = dataAccess.load(getGlossarySkeleton(glossaryGuid));
-
-        return glossaryTermName + "@" + glossary.getQualifiedName();
+    private String getGlossaryTermQualifiedName(String glossaryTermName, String glossaryName) throws AtlasBaseException {
+        return glossaryTermName + "@" + glossaryName;
     }
 
     private String getGlossaryGUIDFromGraphDB(String glossaryName) {
@@ -812,4 +838,137 @@ public class GlossaryTermUtils extends GlossaryUtils {
 
         return ret;
     }
+
+    private void cacheRelatedTermQNameGuid(String currTermQualifiedName, String relatedTermQualifiedName, String termGuid) {
+        if (!glossaryTermQNameGuidCache.get().containsKey(relatedTermQualifiedName) &&
+                glossaryTermOrderCache.get().containsKey(currTermQualifiedName) &&
+                glossaryTermOrderCache.get().containsKey(relatedTermQualifiedName) &&
+                glossaryTermOrderCache.get().get(currTermQualifiedName) < glossaryTermOrderCache.get().get(relatedTermQualifiedName)) {
+            glossaryTermQNameGuidCache.get().put(relatedTermQualifiedName, termGuid);
+        }
+    }
+
+    private AtlasGlossaryTerm getGlossaryTem(String termGuid) throws AtlasBaseException {
+        AtlasGlossaryTerm ret = null;
+
+        if (DEBUG_ENABLED) {
+            LOG.debug("==> GlossaryTemUtils.getGlossaryTem({})", termGuid);
+        }
+
+        if (!Objects.isNull(termGuid)) {
+            AtlasGlossaryTerm atlasGlossary = getAtlasGlossaryTermSkeleton(termGuid);
+            ret = dataAccess.load(atlasGlossary);
+
+            if (DEBUG_ENABLED) {
+                LOG.debug("<== GlossaryTemUtils.getGlossaryTem() : {}", ret);
+            }
+        }
+        return ret;
+    }
+
+    private void copyRelations(AtlasGlossaryTerm toGlossaryTerm, AtlasGlossaryTerm fromGlossaryTerm) {
+        if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getSeeAlso())) {
+            if (CollectionUtils.isNotEmpty(toGlossaryTerm.getSeeAlso())) {
+                toGlossaryTerm.getSeeAlso().addAll(fromGlossaryTerm.getSeeAlso());
+            } else {
+                toGlossaryTerm.setSeeAlso(fromGlossaryTerm.getSeeAlso());
+            }
+        }
+
+        if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getSynonyms())) {
+            if (CollectionUtils.isNotEmpty(toGlossaryTerm.getSynonyms())) {
+                toGlossaryTerm.getSynonyms().addAll(fromGlossaryTerm.getSynonyms());
+            } else {
+                toGlossaryTerm.setSynonyms(fromGlossaryTerm.getSynonyms());
+            }
+        }
+
+        if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getAntonyms())) {
+            if (CollectionUtils.isNotEmpty(toGlossaryTerm.getAntonyms())) {
+                toGlossaryTerm.getAntonyms().addAll(fromGlossaryTerm.getAntonyms());
+            } else {
+                toGlossaryTerm.setAntonyms(fromGlossaryTerm.getAntonyms());
+            }
+        }
+
+        if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getPreferredTerms())) {
+            if (CollectionUtils.isNotEmpty(toGlossaryTerm.getPreferredTerms())) {
+                toGlossaryTerm.getPreferredTerms().addAll(fromGlossaryTerm.getPreferredTerms());
+            } else {
+                toGlossaryTerm.setPreferredTerms(fromGlossaryTerm.getPreferredTerms());
+            }
+        }
+
+        if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getPreferredToTerms())) {
+            if (CollectionUtils.isNotEmpty(toGlossaryTerm.getPreferredToTerms())) {
+                toGlossaryTerm.getPreferredToTerms().addAll(fromGlossaryTerm.getPreferredToTerms());
+            } else {
+                toGlossaryTerm.setPreferredToTerms(fromGlossaryTerm.getPreferredToTerms());
+            }
+        }
+
+        if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getReplacementTerms())) {
+            if (CollectionUtils.isNotEmpty(toGlossaryTerm.getReplacementTerms())) {
+                toGlossaryTerm.getReplacementTerms().addAll(fromGlossaryTerm.getReplacementTerms());
+            } else {
+                toGlossaryTerm.setReplacementTerms(fromGlossaryTerm.getReplacementTerms());
+            }
+        }
+
+        if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getReplacedBy())) {
+            if (CollectionUtils.isNotEmpty(toGlossaryTerm.getReplacedBy())) {
+                toGlossaryTerm.getReplacedBy().addAll(fromGlossaryTerm.getReplacedBy());
+            } else {
+                toGlossaryTerm.setReplacedBy(fromGlossaryTerm.getReplacedBy());
+            }
+        }
+
+        if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getTranslationTerms())) {
+            if (CollectionUtils.isNotEmpty(toGlossaryTerm.getTranslationTerms())) {
+                toGlossaryTerm.getTranslationTerms().addAll(fromGlossaryTerm.getTranslationTerms());
+            } else {
+                toGlossaryTerm.setTranslationTerms(fromGlossaryTerm.getTranslationTerms());
+            }
+        }
+
+        if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getTranslatedTerms())) {
+            if (CollectionUtils.isNotEmpty(toGlossaryTerm.getTranslatedTerms())) {
+                toGlossaryTerm.getTranslatedTerms().addAll(fromGlossaryTerm.getTranslatedTerms());
+            } else {
+                toGlossaryTerm.setTranslatedTerms(fromGlossaryTerm.getTranslatedTerms());
+            }
+        }
+
+        if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getIsA())) {
+            if (CollectionUtils.isNotEmpty(toGlossaryTerm.getIsA())) {
+                toGlossaryTerm.getIsA().addAll(fromGlossaryTerm.getIsA());
+            } else {
+                toGlossaryTerm.setIsA(fromGlossaryTerm.getIsA());
+            }
+        }
+
+        if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getClassifies())) {
+            if (CollectionUtils.isNotEmpty(toGlossaryTerm.getClassifies())) {
+                toGlossaryTerm.getClassifies().addAll(fromGlossaryTerm.getClassifies());
+            } else {
+                toGlossaryTerm.setClassifies(fromGlossaryTerm.getClassifies());
+            }
+        }
+
+        if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getValidValues())) {
+            if (CollectionUtils.isNotEmpty(toGlossaryTerm.getValidValues())) {
+                toGlossaryTerm.getValidValues().addAll(fromGlossaryTerm.getValidValues());
+            } else {
+                toGlossaryTerm.setValidValues(fromGlossaryTerm.getValidValues());
+            }
+        }
+
+        if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getValidValuesFor())) {
+            if (CollectionUtils.isNotEmpty(toGlossaryTerm.getValidValuesFor())) {
+                toGlossaryTerm.getValidValuesFor().addAll(fromGlossaryTerm.getValidValuesFor());
+            } else {
+                toGlossaryTerm.setValidValuesFor(fromGlossaryTerm.getValidValuesFor());
+            }
+        }
+    }
 }