You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2021/05/13 22:24:09 UTC
[atlas] branch branch-2.0 updated: ATLAS-4274: Non matching
relation are created via bulk import
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 1310598 ATLAS-4274: Non matching relation are created via bulk import
1310598 is described below
commit 1310598af79ea3a815972cde18d50be252337856
Author: sidmishra <si...@cloudera.com>
AuthorDate: Sat May 8 09:52:23 2021 -0700
ATLAS-4274: Non matching relation are created via bulk import
Signed-off-by: Sarath Subramanian <sa...@apache.org>
(cherry picked from commit 821b4c441153505db7f0ba9c5670623c527af989)
---
.../org/apache/atlas/glossary/GlossaryService.java | 3 +
.../apache/atlas/glossary/GlossaryTermUtils.java | 205 ++++++++++++++++++---
2 files changed, 185 insertions(+), 23 deletions(-)
diff --git a/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java b/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java
index 73217de..9c84598 100644
--- a/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java
+++ b/repository/src/main/java/org/apache/atlas/glossary/GlossaryService.java
@@ -1118,6 +1118,7 @@ public class GlossaryService {
}
}
+ @GraphTransaction
public BulkImportResponse importGlossaryData(InputStream inputStream, String fileName) throws AtlasBaseException {
BulkImportResponse ret = new BulkImportResponse();
@@ -1161,6 +1162,8 @@ public class GlossaryService {
private void updateGlossaryTermsRelation(List<AtlasGlossaryTerm> glossaryTerms, BulkImportResponse bulkImportResponse) {
for (AtlasGlossaryTerm glossaryTerm : glossaryTerms) {
+ glossaryTermUtils.updateGlossaryTermRelations(glossaryTerm);
+
if (glossaryTerm.hasTerms()) {
String glossaryTermName = glossaryTerm.getName();
String glossaryName = getGlossaryName(glossaryTerm);
diff --git a/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java b/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java
index 553d3d0..fa02c8e 100644
--- a/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java
+++ b/repository/src/main/java/org/apache/atlas/glossary/GlossaryTermUtils.java
@@ -66,7 +66,9 @@ public class GlossaryTermUtils extends GlossaryUtils {
private static final int INDEX_FOR_GLOSSARY_AT_RECORD = 0;
private static final int INDEX_FOR_TERM_AT_RECORD = 1;
- private static final ThreadLocal<Map<String, String>> glossaryNameGuidCacheForImport = ThreadLocal.withInitial(() -> new LinkedHashMap<>());
+ private static final ThreadLocal<Map<String, String>> glossaryNameGuidCache = ThreadLocal.withInitial(() -> new LinkedHashMap<>());
+ private static final ThreadLocal<Map<String, Integer>> glossaryTermOrderCache = ThreadLocal.withInitial(() -> new HashMap<>());
+ private static final ThreadLocal<Map<String, String>> glossaryTermQNameGuidCache = ThreadLocal.withInitial(() -> new HashMap<>());
protected GlossaryTermUtils(AtlasRelationshipStore relationshipStore, AtlasTypeRegistry typeRegistry, DataAccess dataAccess) {
super(relationshipStore, typeRegistry, dataAccess);
@@ -150,7 +152,9 @@ public class GlossaryTermUtils extends GlossaryUtils {
}
public void clearImportCache() {
- glossaryNameGuidCacheForImport.get().clear();
+ glossaryNameGuidCache.get().clear();
+ glossaryTermOrderCache.get().clear();
+ glossaryTermQNameGuidCache.get().clear();
}
private boolean isRelationshipGuidSame(AtlasRelatedObjectId storeObject, AtlasRelatedObjectId relatedObjectId) {
@@ -560,7 +564,7 @@ public class GlossaryTermUtils extends GlossaryUtils {
} else {
glossaryName = record[INDEX_FOR_GLOSSARY_AT_RECORD];
- String glossaryGuid = glossaryNameGuidCacheForImport.get().get(glossaryName);
+ String glossaryGuid = glossaryNameGuidCache.get().get(glossaryName);
if (StringUtils.isEmpty(glossaryGuid)) {
glossaryGuid = getGlossaryGUIDFromGraphDB(glossaryName);
@@ -569,13 +573,15 @@ public class GlossaryTermUtils extends GlossaryUtils {
glossaryGuid = createGlossary(glossaryName, failedTermMsgs);
}
- glossaryNameGuidCacheForImport.get().put(glossaryName, glossaryGuid);
+ glossaryNameGuidCache.get().put(glossaryName, glossaryGuid);
}
if (StringUtils.isNotEmpty(glossaryGuid)) {
glossaryTerm = populateGlossaryTermObject(failedTermMsgs, record, glossaryGuid, false);
- glossaryTerm.setQualifiedName(getGlossaryTermQualifiedName(glossaryTerm.getName(), glossaryGuid));
+ glossaryTerm.setQualifiedName(getGlossaryTermQualifiedName(glossaryTerm.getName(), glossaryName));
+
+ glossaryTermOrderCache.get().put(glossaryTerm.getQualifiedName(), rowCount);
glossaryTerms.add(glossaryTerm);
}
@@ -604,12 +610,12 @@ public class GlossaryTermUtils extends GlossaryUtils {
if (ArrayUtils.isNotEmpty(record) && StringUtils.isNotBlank(record[INDEX_FOR_GLOSSARY_AT_RECORD])) {
AtlasGlossaryTerm glossaryTerm = new AtlasGlossaryTerm();
String glossaryName = record[INDEX_FOR_GLOSSARY_AT_RECORD];
- String glossaryGuid = glossaryNameGuidCacheForImport.get().get(glossaryName);
+ String glossaryGuid = glossaryNameGuidCache.get().get(glossaryName);
if (StringUtils.isNotEmpty(glossaryGuid)) {
glossaryTerm = populateGlossaryTermObject(failedTermMsgs, record, glossaryGuid, true);
- glossaryTerm.setQualifiedName(getGlossaryTermQualifiedName(glossaryTerm.getName(), glossaryGuid));
+ glossaryTerm.setQualifiedName(getGlossaryTermQualifiedName(glossaryTerm.getName(), glossaryName));
glossaryTerms.add(glossaryTerm);
}
@@ -656,6 +662,19 @@ public class GlossaryTermUtils extends GlossaryUtils {
return String.join(", ", ret);
}
+ public void updateGlossaryTermRelations(AtlasGlossaryTerm updatedGlossaryTerm) {
+ if (glossaryTermQNameGuidCache.get().containsKey(updatedGlossaryTerm.getQualifiedName())) {
+ try {
+ AtlasGlossaryTerm glossaryTermFromDB = getGlossaryTem(glossaryTermQNameGuidCache.get().get(updatedGlossaryTerm.getQualifiedName()));
+ copyRelations(updatedGlossaryTerm, glossaryTermFromDB);
+ } catch (AtlasBaseException e) {
+ if (DEBUG_ENABLED) {
+ LOG.debug("Error occurred while loading glossary Term", e);
+ }
+ }
+ }
+ }
+
protected Map getMapValue(String csvRecord, List<String> failedTermMsgs, boolean populateRelations) {
Map ret = null;
@@ -702,21 +721,28 @@ public class GlossaryTermUtils extends GlossaryUtils {
AtlasVertex vertex = null;
String dataArray[] = data.split(FileUtils.ESCAPE_CHARACTER + FileUtils.COLON_CHARACTER);
- if ((dataArray.length % 2) == 0) {
+ if (dataArray.length == 2) {
+ String relatedTermQualifiedName = dataArray[1] + invalidNameChars[0] + dataArray[0];
+ String currTermQualifiedName = termName + invalidNameChars[0] + glossaryName;
+
vertex = AtlasGraphUtilsV2.findByTypeAndUniquePropertyName(GlossaryUtils.ATLAS_GLOSSARY_TERM_TYPENAME,
- GlossaryUtils.ATLAS_GLOSSARY_TERM_TYPENAME + invalidNameChars[1] + QUALIFIED_NAME_ATTR, dataArray[1] + invalidNameChars[0] + dataArray[0]);
- } else {
- failedTermMsgs.add("Either incorrect data specified for Term or Term does not exist : " +termName);
- }
+ GlossaryUtils.ATLAS_GLOSSARY_TERM_TYPENAME + invalidNameChars[1] + QUALIFIED_NAME_ATTR, relatedTermQualifiedName);
+
+ if (vertex != null) {
+ String glossaryTermGuid = AtlasGraphUtilsV2.getIdFromVertex(vertex);
- if (vertex != null) {
- String glossaryTermGuid = AtlasGraphUtilsV2.getIdFromVertex(vertex);
- relatedTermHeader = new AtlasRelatedTermHeader();
- relatedTermHeader.setTermGuid(glossaryTermGuid);
- ret.add(relatedTermHeader);
+ relatedTermHeader = new AtlasRelatedTermHeader();
+ relatedTermHeader.setTermGuid(glossaryTermGuid);
+
+ cacheRelatedTermQNameGuid(currTermQualifiedName, relatedTermQualifiedName, glossaryTermGuid);
+
+ ret.add(relatedTermHeader);
+ } else {
+ failedTermMsgs.add("The provided Reference " + dataArray[1] + "@" + dataArray[0] +
+ " does not exist at Atlas referred at record with TermName : " + termName + " and GlossaryName : " + glossaryName);
+ }
} else {
- failedTermMsgs.add("The provided Reference " + dataArray[1] + "@" + dataArray[0] +
- " does not exist at Atlas referred at record with TermName : " + termName + " and GlossaryName : " + glossaryName);
+ failedTermMsgs.add("Incorrect relation data specified for the term : " + termName + "@" + glossaryName);
}
}
}
@@ -759,6 +785,8 @@ public class GlossaryTermUtils extends GlossaryUtils {
ret.setSynonyms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[INDEX_FOR_GLOSSARY_AT_RECORD], failedTermMsgList) : null);
+ ret.setReplacedBy((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[INDEX_FOR_GLOSSARY_AT_RECORD], failedTermMsgList) : null);
+
ret.setValidValues((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[INDEX_FOR_GLOSSARY_AT_RECORD], failedTermMsgList) : null);
ret.setReplacementTerms((length > ++i) ? (Set<AtlasRelatedTermHeader>) getAtlasRelatedTermHeaderSet(record[i], ret.getName(), record[INDEX_FOR_GLOSSARY_AT_RECORD], failedTermMsgList) : null);
@@ -782,10 +810,8 @@ public class GlossaryTermUtils extends GlossaryUtils {
return ret;
}
- private String getGlossaryTermQualifiedName(String glossaryTermName, String glossaryGuid) throws AtlasBaseException {
- AtlasGlossary glossary = dataAccess.load(getGlossarySkeleton(glossaryGuid));
-
- return glossaryTermName + "@" + glossary.getQualifiedName();
+ private String getGlossaryTermQualifiedName(String glossaryTermName, String glossaryName) throws AtlasBaseException {
+ return glossaryTermName + "@" + glossaryName;
}
private String getGlossaryGUIDFromGraphDB(String glossaryName) {
@@ -812,4 +838,137 @@ public class GlossaryTermUtils extends GlossaryUtils {
return ret;
}
+
+ private void cacheRelatedTermQNameGuid(String currTermQualifiedName, String relatedTermQualifiedName, String termGuid) {
+ if (!glossaryTermQNameGuidCache.get().containsKey(relatedTermQualifiedName) &&
+ glossaryTermOrderCache.get().containsKey(currTermQualifiedName) &&
+ glossaryTermOrderCache.get().containsKey(relatedTermQualifiedName) &&
+ glossaryTermOrderCache.get().get(currTermQualifiedName) < glossaryTermOrderCache.get().get(relatedTermQualifiedName)) {
+ glossaryTermQNameGuidCache.get().put(relatedTermQualifiedName, termGuid);
+ }
+ }
+
+ private AtlasGlossaryTerm getGlossaryTem(String termGuid) throws AtlasBaseException {
+ AtlasGlossaryTerm ret = null;
+
+ if (DEBUG_ENABLED) {
+ LOG.debug("==> GlossaryTemUtils.getGlossaryTem({})", termGuid);
+ }
+
+ if (!Objects.isNull(termGuid)) {
+ AtlasGlossaryTerm atlasGlossary = getAtlasGlossaryTermSkeleton(termGuid);
+ ret = dataAccess.load(atlasGlossary);
+
+ if (DEBUG_ENABLED) {
+ LOG.debug("<== GlossaryTemUtils.getGlossaryTem() : {}", ret);
+ }
+ }
+ return ret;
+ }
+
+ private void copyRelations(AtlasGlossaryTerm toGlossaryTerm, AtlasGlossaryTerm fromGlossaryTerm) {
+ if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getSeeAlso())) {
+ if (CollectionUtils.isNotEmpty(toGlossaryTerm.getSeeAlso())) {
+ toGlossaryTerm.getSeeAlso().addAll(fromGlossaryTerm.getSeeAlso());
+ } else {
+ toGlossaryTerm.setSeeAlso(fromGlossaryTerm.getSeeAlso());
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getSynonyms())) {
+ if (CollectionUtils.isNotEmpty(toGlossaryTerm.getSynonyms())) {
+ toGlossaryTerm.getSynonyms().addAll(fromGlossaryTerm.getSynonyms());
+ } else {
+ toGlossaryTerm.setSynonyms(fromGlossaryTerm.getSynonyms());
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getAntonyms())) {
+ if (CollectionUtils.isNotEmpty(toGlossaryTerm.getAntonyms())) {
+ toGlossaryTerm.getAntonyms().addAll(fromGlossaryTerm.getAntonyms());
+ } else {
+ toGlossaryTerm.setAntonyms(fromGlossaryTerm.getAntonyms());
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getPreferredTerms())) {
+ if (CollectionUtils.isNotEmpty(toGlossaryTerm.getPreferredTerms())) {
+ toGlossaryTerm.getPreferredTerms().addAll(fromGlossaryTerm.getPreferredTerms());
+ } else {
+ toGlossaryTerm.setPreferredTerms(fromGlossaryTerm.getPreferredTerms());
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getPreferredToTerms())) {
+ if (CollectionUtils.isNotEmpty(toGlossaryTerm.getPreferredToTerms())) {
+ toGlossaryTerm.getPreferredToTerms().addAll(fromGlossaryTerm.getPreferredToTerms());
+ } else {
+ toGlossaryTerm.setPreferredToTerms(fromGlossaryTerm.getPreferredToTerms());
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getReplacementTerms())) {
+ if (CollectionUtils.isNotEmpty(toGlossaryTerm.getReplacementTerms())) {
+ toGlossaryTerm.getReplacementTerms().addAll(fromGlossaryTerm.getReplacementTerms());
+ } else {
+ toGlossaryTerm.setReplacementTerms(fromGlossaryTerm.getReplacementTerms());
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getReplacedBy())) {
+ if (CollectionUtils.isNotEmpty(toGlossaryTerm.getReplacedBy())) {
+ toGlossaryTerm.getReplacedBy().addAll(fromGlossaryTerm.getReplacedBy());
+ } else {
+ toGlossaryTerm.setReplacedBy(fromGlossaryTerm.getReplacedBy());
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getTranslationTerms())) {
+ if (CollectionUtils.isNotEmpty(toGlossaryTerm.getTranslationTerms())) {
+ toGlossaryTerm.getTranslationTerms().addAll(fromGlossaryTerm.getTranslationTerms());
+ } else {
+ toGlossaryTerm.setTranslationTerms(fromGlossaryTerm.getTranslationTerms());
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getTranslatedTerms())) {
+ if (CollectionUtils.isNotEmpty(toGlossaryTerm.getTranslatedTerms())) {
+ toGlossaryTerm.getTranslatedTerms().addAll(fromGlossaryTerm.getTranslatedTerms());
+ } else {
+ toGlossaryTerm.setTranslatedTerms(fromGlossaryTerm.getTranslatedTerms());
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getIsA())) {
+ if (CollectionUtils.isNotEmpty(toGlossaryTerm.getIsA())) {
+ toGlossaryTerm.getIsA().addAll(fromGlossaryTerm.getIsA());
+ } else {
+ toGlossaryTerm.setIsA(fromGlossaryTerm.getIsA());
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getClassifies())) {
+ if (CollectionUtils.isNotEmpty(toGlossaryTerm.getClassifies())) {
+ toGlossaryTerm.getClassifies().addAll(fromGlossaryTerm.getClassifies());
+ } else {
+ toGlossaryTerm.setClassifies(fromGlossaryTerm.getClassifies());
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getValidValues())) {
+ if (CollectionUtils.isNotEmpty(toGlossaryTerm.getValidValues())) {
+ toGlossaryTerm.getValidValues().addAll(fromGlossaryTerm.getValidValues());
+ } else {
+ toGlossaryTerm.setValidValues(fromGlossaryTerm.getValidValues());
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(fromGlossaryTerm.getValidValuesFor())) {
+ if (CollectionUtils.isNotEmpty(toGlossaryTerm.getValidValuesFor())) {
+ toGlossaryTerm.getValidValuesFor().addAll(fromGlossaryTerm.getValidValuesFor());
+ } else {
+ toGlossaryTerm.setValidValuesFor(fromGlossaryTerm.getValidValuesFor());
+ }
+ }
+ }
}