You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ni...@apache.org on 2019/03/05 14:36:30 UTC
[atlas] 05/07: ATLAS-3046: Classification Updater tool. Unique name used.
This is an automated email from the ASF dual-hosted git repository.
nixon pushed a commit to branch branch-1.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 3a8cda4d2da7b1bd08c8233cc9d7c5076314dfc9
Author: Ashutosh Mestry <am...@hortonworks.com>
AuthorDate: Thu Feb 7 22:17:18 2019 -0800
ATLAS-3046: Classification Updater tool. Unique name used.
---
.../org/apache/atlas/tools/BulkFetchAndUpdate.java | 99 ++++++++++++++++------
1 file changed, 71 insertions(+), 28 deletions(-)
diff --git a/tools/classification-updater/src/main/java/org/apache/atlas/tools/BulkFetchAndUpdate.java b/tools/classification-updater/src/main/java/org/apache/atlas/tools/BulkFetchAndUpdate.java
index 1e0b66d..19f8325 100644
--- a/tools/classification-updater/src/main/java/org/apache/atlas/tools/BulkFetchAndUpdate.java
+++ b/tools/classification-updater/src/main/java/org/apache/atlas/tools/BulkFetchAndUpdate.java
@@ -28,6 +28,8 @@ import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasEntityHeaders;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasJson;
@@ -300,6 +302,7 @@ public class BulkFetchAndUpdate {
private static final String ATTR_NAME_QUALIFIED_NAME = "qualifiedName";
private AtlasClientV2 atlasClientV2;
+ private Map<String, String> typeNameUniqueAttributeNameMap = new HashMap<>();
public Preparer(AtlasClientV2 atlasClientV2) {
this.atlasClientV2 = atlasClientV2;
@@ -322,7 +325,7 @@ public class BulkFetchAndUpdate {
try {
classificationDef.setGuid(null);
String json = AtlasType.toJson(classificationDef);
- fileWriter.write(json + "\n");
+ fileWriter.write(json);
} catch (Exception e) {
LOG.error("Error writing classifications: {}", e);
displayCrLf("Error writing classifications.");
@@ -348,11 +351,11 @@ public class BulkFetchAndUpdate {
}
try {
- AtlasEntityHeaders response = atlasClientV2.getEntityHeaders(fromTimestamp);
- int guidHeaderMapSize = response.getGuidHeaderMap().size();
+ AtlasEntityHeaders entityHeaders = atlasClientV2.getEntityHeaders(fromTimestamp);
+ int guidHeaderMapSize = entityHeaders.getGuidHeaderMap().size();
try {
displayCrLf("Read entities: " + guidHeaderMapSize);
- AtlasEntityHeaders updatedHeaders = removeEntityGuids(response);
+ AtlasEntityHeaders updatedHeaders = removeEntityGuids(entityHeaders);
fileWriter.write(AtlasType.toJson(updatedHeaders));
displayCrLf("Writing entities: " + updatedHeaders.getGuidHeaderMap().size());
@@ -368,12 +371,17 @@ public class BulkFetchAndUpdate {
}
private AtlasEntityHeaders removeEntityGuids(AtlasEntityHeaders headers) {
- Map<String, AtlasEntityHeader> qualifiedNameHeaderMap = new HashMap<>();
+ Map<String, AtlasEntityHeader> uniqueNameEntityHeaderMap = new HashMap<>();
for (AtlasEntityHeader header : headers.getGuidHeaderMap().values()) {
- String qualifiedName = getQualifiedName(header);
- displayCrLf("Processing: " + qualifiedName);
+ String uniqueName = getUniqueName(header);
+ if (StringUtils.isEmpty(uniqueName)) {
+ displayCrLf("UniqueName is empty. Ignoring: " + header.getGuid());
+ LOG.warn("UniqueName is empty. Ignoring: {}", AtlasJson.toJson(header));
+ continue;
+ }
+ displayCrLf("Processing: " + uniqueName);
if (header.getStatus() == DELETED) {
continue;
}
@@ -383,37 +391,72 @@ public class BulkFetchAndUpdate {
continue;
}
- boolean keyFound = qualifiedNameHeaderMap.containsKey(qualifiedName);
+ String key = String.format("%s:%s", header.getTypeName(), uniqueName);
+ boolean keyFound = uniqueNameEntityHeaderMap.containsKey(key);
if (!keyFound) {
- qualifiedNameHeaderMap.put(qualifiedName, header);
+ uniqueNameEntityHeaderMap.put(key, header);
}
- AtlasEntityHeader currentHeader = qualifiedNameHeaderMap.get(qualifiedName);
- for (AtlasClassification c : header.getClassifications()) {
- c.setEntityGuid(null);
-
- if (keyFound) {
- boolean found =
- currentHeader.getClassifications().stream().anyMatch(ox -> ox.getTypeName().equals(c.getTypeName()));
- if (!found) {
- currentHeader.getClassifications().add(c);
- } else {
- displayCrLf("Ignoring: " + c.toString());
- LOG.warn("Ignoring: {}", AtlasJson.toJson(c));
- }
+ updateClassificationsForHeader(header, uniqueNameEntityHeaderMap.get(key), keyFound);
+ displayCrLf("Processing: " + uniqueName);
+ }
+
+ displayCrLf("Processed: " + uniqueNameEntityHeaderMap.size());
+ headers.setGuidHeaderMap(uniqueNameEntityHeaderMap);
+ return headers;
+ }
+
+ private void updateClassificationsForHeader(AtlasEntityHeader header, AtlasEntityHeader currentHeader, boolean keyFound) {
+ for (AtlasClassification c : header.getClassifications()) {
+ c.setEntityGuid(null);
+
+ if (keyFound) {
+ boolean found =
+ currentHeader.getClassifications().stream().anyMatch(ox -> ox.getTypeName().equals(c.getTypeName()));
+ if (!found) {
+ currentHeader.getClassifications().add(c);
+ } else {
+ displayCrLf("Ignoring: " + c.toString());
+ LOG.warn("Ignoring: {}", AtlasJson.toJson(c));
}
}
+ }
+ }
- displayCrLf("Processing: " + qualifiedName);
+ private String getUniqueName(AtlasEntityHeader header) {
+ String uniqueAttributeName = ATTR_NAME_QUALIFIED_NAME;
+ if (!header.getAttributes().containsKey(ATTR_NAME_QUALIFIED_NAME)) {
+ uniqueAttributeName = getUniqueAttribute(header.getTypeName());
}
- displayCrLf("Processed: " + qualifiedNameHeaderMap.size());
- headers.setGuidHeaderMap(qualifiedNameHeaderMap);
- return headers;
+ Object attrValue = header.getAttribute(uniqueAttributeName);
+ if (attrValue == null) {
+ LOG.warn("Unique Attribute Value: empty: {}", AtlasJson.toJson(header));
+ return StringUtils.EMPTY;
+ }
+
+ return attrValue.toString();
}
- private String getQualifiedName(AtlasEntityHeader header) {
- return (String) header.getAttribute(ATTR_NAME_QUALIFIED_NAME);
+ private String getUniqueAttribute(String typeName) {
+ try {
+ if (typeNameUniqueAttributeNameMap.containsKey(typeName)) {
+ return typeNameUniqueAttributeNameMap.get(typeName);
+ }
+
+ AtlasEntityDef entityDef = atlasClientV2.getEntityDefByName(typeName);
+ for (AtlasStructDef.AtlasAttributeDef ad : entityDef.getAttributeDefs()) {
+ if (ad.getIsUnique()) {
+ typeNameUniqueAttributeNameMap.put(typeName, ad.getName());
+ return ad.getName();
+ }
+ }
+ } catch (AtlasServiceException e) {
+ LOG.error("Error fetching type: {}", typeName, e);
+ return null;
+ }
+
+ return null;
}
private List<AtlasClassificationDef> getAllClassificationsDefs() throws Exception {