You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ni...@apache.org on 2019/03/05 14:36:30 UTC

[atlas] 05/07: ATLAS-3046: Classification Updater tool. Unique name used.

This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch branch-1.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 3a8cda4d2da7b1bd08c8233cc9d7c5076314dfc9
Author: Ashutosh Mestry <am...@hortonworks.com>
AuthorDate: Thu Feb 7 22:17:18 2019 -0800

    ATLAS-3046: Classification Updater tool. Unique name used.
---
 .../org/apache/atlas/tools/BulkFetchAndUpdate.java | 99 ++++++++++++++++------
 1 file changed, 71 insertions(+), 28 deletions(-)

diff --git a/tools/classification-updater/src/main/java/org/apache/atlas/tools/BulkFetchAndUpdate.java b/tools/classification-updater/src/main/java/org/apache/atlas/tools/BulkFetchAndUpdate.java
index 1e0b66d..19f8325 100644
--- a/tools/classification-updater/src/main/java/org/apache/atlas/tools/BulkFetchAndUpdate.java
+++ b/tools/classification-updater/src/main/java/org/apache/atlas/tools/BulkFetchAndUpdate.java
@@ -28,6 +28,8 @@ import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasEntityHeaders;
 import org.apache.atlas.model.typedef.AtlasClassificationDef;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasStructDef;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.utils.AtlasJson;
@@ -300,6 +302,7 @@ public class BulkFetchAndUpdate {
         private static final String  ATTR_NAME_QUALIFIED_NAME = "qualifiedName";
 
         private AtlasClientV2 atlasClientV2;
+        private Map<String, String> typeNameUniqueAttributeNameMap = new HashMap<>();
 
         public Preparer(AtlasClientV2 atlasClientV2) {
             this.atlasClientV2 = atlasClientV2;
@@ -322,7 +325,7 @@ public class BulkFetchAndUpdate {
                     try {
                         classificationDef.setGuid(null);
                         String json = AtlasType.toJson(classificationDef);
-                        fileWriter.write(json + "\n");
+                        fileWriter.write(json);
                     } catch (Exception e) {
                         LOG.error("Error writing classifications: {}", e);
                         displayCrLf("Error writing classifications.");
@@ -348,11 +351,11 @@ public class BulkFetchAndUpdate {
             }
 
             try {
-                AtlasEntityHeaders response = atlasClientV2.getEntityHeaders(fromTimestamp);
-                int guidHeaderMapSize = response.getGuidHeaderMap().size();
+                AtlasEntityHeaders entityHeaders = atlasClientV2.getEntityHeaders(fromTimestamp);
+                int guidHeaderMapSize = entityHeaders.getGuidHeaderMap().size();
                 try {
                     displayCrLf("Read entities: " + guidHeaderMapSize);
-                    AtlasEntityHeaders updatedHeaders = removeEntityGuids(response);
+                    AtlasEntityHeaders updatedHeaders = removeEntityGuids(entityHeaders);
                     fileWriter.write(AtlasType.toJson(updatedHeaders));
 
                     displayCrLf("Writing entities: " + updatedHeaders.getGuidHeaderMap().size());
@@ -368,12 +371,17 @@ public class BulkFetchAndUpdate {
         }
 
         private AtlasEntityHeaders removeEntityGuids(AtlasEntityHeaders headers) {
-            Map<String, AtlasEntityHeader> qualifiedNameHeaderMap = new HashMap<>();
+            Map<String, AtlasEntityHeader> uniqueNameEntityHeaderMap = new HashMap<>();
 
             for (AtlasEntityHeader header : headers.getGuidHeaderMap().values()) {
-                String qualifiedName = getQualifiedName(header);
-                displayCrLf("Processing: " + qualifiedName);
+                String uniqueName = getUniqueName(header);
+                if (StringUtils.isEmpty(uniqueName)) {
+                    displayCrLf("UniqueName is empty.  Ignoring: " + header.getGuid());
+                    LOG.warn("UniqueName is empty. Ignoring: {}", AtlasJson.toJson(header));
+                    continue;
+                }
 
+                displayCrLf("Processing: " + uniqueName);
                 if (header.getStatus() == DELETED) {
                     continue;
                 }
@@ -383,37 +391,72 @@ public class BulkFetchAndUpdate {
                     continue;
                 }
 
-                boolean keyFound = qualifiedNameHeaderMap.containsKey(qualifiedName);
+                String key = String.format("%s:%s", header.getTypeName(), uniqueName);
+                boolean keyFound = uniqueNameEntityHeaderMap.containsKey(key);
                 if (!keyFound) {
-                    qualifiedNameHeaderMap.put(qualifiedName, header);
+                    uniqueNameEntityHeaderMap.put(key, header);
                 }
 
-                AtlasEntityHeader currentHeader = qualifiedNameHeaderMap.get(qualifiedName);
-                for (AtlasClassification c : header.getClassifications()) {
-                    c.setEntityGuid(null);
-
-                    if (keyFound) {
-                        boolean found =
-                                currentHeader.getClassifications().stream().anyMatch(ox -> ox.getTypeName().equals(c.getTypeName()));
-                        if (!found) {
-                            currentHeader.getClassifications().add(c);
-                        } else {
-                            displayCrLf("Ignoring: " + c.toString());
-                            LOG.warn("Ignoring: {}", AtlasJson.toJson(c));
-                        }
+                updateClassificationsForHeader(header, uniqueNameEntityHeaderMap.get(key), keyFound);
+                displayCrLf("Processing: " + uniqueName);
+            }
+
+            displayCrLf("Processed: " + uniqueNameEntityHeaderMap.size());
+            headers.setGuidHeaderMap(uniqueNameEntityHeaderMap);
+            return headers;
+        }
+
+        private void updateClassificationsForHeader(AtlasEntityHeader header, AtlasEntityHeader currentHeader, boolean keyFound) {
+            for (AtlasClassification c : header.getClassifications()) {
+                c.setEntityGuid(null);
+
+                if (keyFound) {
+                    boolean found =
+                            currentHeader.getClassifications().stream().anyMatch(ox -> ox.getTypeName().equals(c.getTypeName()));
+                    if (!found) {
+                        currentHeader.getClassifications().add(c);
+                    } else {
+                        displayCrLf("Ignoring: " + c.toString());
+                        LOG.warn("Ignoring: {}", AtlasJson.toJson(c));
                     }
                 }
+            }
+        }
 
-                displayCrLf("Processing: " + qualifiedName);
+        private String getUniqueName(AtlasEntityHeader header) {
+            String uniqueAttributeName = ATTR_NAME_QUALIFIED_NAME;
+            if (!header.getAttributes().containsKey(ATTR_NAME_QUALIFIED_NAME)) {
+                uniqueAttributeName = getUniqueAttribute(header.getTypeName());
             }
 
-            displayCrLf("Processed: " + qualifiedNameHeaderMap.size());
-            headers.setGuidHeaderMap(qualifiedNameHeaderMap);
-            return headers;
+            Object attrValue = header.getAttribute(uniqueAttributeName);
+            if (attrValue == null) {
+                LOG.warn("Unique Attribute Value: empty: {}", AtlasJson.toJson(header));
+                return StringUtils.EMPTY;
+            }
+
+            return attrValue.toString();
         }
 
-        private String getQualifiedName(AtlasEntityHeader header) {
-            return (String) header.getAttribute(ATTR_NAME_QUALIFIED_NAME);
+        private String getUniqueAttribute(String typeName) {
+            try {
+                if (typeNameUniqueAttributeNameMap.containsKey(typeName)) {
+                    return typeNameUniqueAttributeNameMap.get(typeName);
+                }
+
+                AtlasEntityDef entityDef = atlasClientV2.getEntityDefByName(typeName);
+                for (AtlasStructDef.AtlasAttributeDef ad : entityDef.getAttributeDefs()) {
+                    if (ad.getIsUnique()) {
+                        typeNameUniqueAttributeNameMap.put(typeName, ad.getName());
+                        return ad.getName();
+                    }
+                }
+            } catch (AtlasServiceException e) {
+                LOG.error("Error fetching type: {}", typeName, e);
+                return null;
+            }
+
+            return null;
         }
 
         private List<AtlasClassificationDef> getAllClassificationsDefs() throws Exception {