You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2018/10/11 05:57:43 UTC

atlas git commit: ATLAS-2843: AtlasClient updates for exportData and importData.

Repository: atlas
Updated Branches:
  refs/heads/master 0c3080152 -> c0a91c7e1


ATLAS-2843: AtlasClient updates for exportData and importData.


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/c0a91c7e
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/c0a91c7e
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/c0a91c7e

Branch: refs/heads/master
Commit: c0a91c7e1859c62b6fcb7eced5aed9679df12323
Parents: 0c30801
Author: Ashutosh Mestry <am...@hortonworks.com>
Authored: Mon Aug 27 10:32:19 2018 -0700
Committer: Ashutosh Mestry <am...@hortonworks.com>
Committed: Wed Oct 10 22:54:09 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/atlas/AtlasBaseClient.java  |  28 +--
 .../repository/impexp/ImportTransforms.java     | 170 +++++++++++--------
 .../repository/impexp/ImportTransformsTest.java |   7 +
 .../web/resources/AdminExportImportTestIT.java  |  19 +--
 4 files changed, 134 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/c0a91c7e/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java
----------------------------------------------------------------------
diff --git a/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java b/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java
index c247902..ca772a7 100644
--- a/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java
+++ b/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java
@@ -59,10 +59,10 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriBuilder;
-import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.ConnectException;
 import java.net.URI;
@@ -385,7 +385,7 @@ public abstract class AtlasBaseClient {
                 }
                 try {
                     if(api.getProduces().equals(MediaType.APPLICATION_OCTET_STREAM)) {
-                        return (T) IOUtils.toByteArray(clientResponse.getEntityInputStream());
+                        return (T) clientResponse.getEntityInputStream();
                     } else if (responseType.getRawClass().equals(ObjectNode.class)) {
                         String stringEntity = clientResponse.getEntity(String.class);
                         try {
@@ -404,8 +404,6 @@ public abstract class AtlasBaseClient {
                     }
                 } catch (ClientHandlerException e) {
                     throw new AtlasServiceException(api, e);
-                } catch (IOException e) {
-                    throw new AtlasServiceException(api, e);
                 }
             } else if (clientResponse.getStatus() != ClientResponse.Status.SERVICE_UNAVAILABLE.getStatusCode()) {
                 break;
@@ -467,9 +465,9 @@ public abstract class AtlasBaseClient {
         return configuration.getInt(AtlasBaseClient.ATLAS_CLIENT_HA_RETRIES_KEY, AtlasBaseClient.DEFAULT_NUM_RETRIES);
     }
 
-    public byte[] exportData(AtlasExportRequest request) throws AtlasServiceException {
+    public InputStream exportData(AtlasExportRequest request) throws AtlasServiceException {
         try {
-            return (byte[]) callAPI(EXPORT, Object.class, request);
+            return (InputStream) callAPI(EXPORT, Object.class, request);
         } catch (Exception e) {
             LOG.error("error writing to file", e);
             throw new AtlasServiceException(e);
@@ -479,14 +477,22 @@ public abstract class AtlasBaseClient {
     public void exportData(AtlasExportRequest request, String absolutePath) throws AtlasServiceException {
         OutputStream fileOutputStream = null;
         try {
-            byte[] fileBytes = exportData(request);
+            InputStream inputStream = exportData(request);
             fileOutputStream = new FileOutputStream(new File(absolutePath));
-            IOUtils.write(fileBytes, fileOutputStream);
+            byte[] buffer = new byte[8 * 1024];
+            int bytesRead;
+            while ((bytesRead = inputStream.read(buffer)) != -1) {
+                fileOutputStream.write(buffer, 0, bytesRead);
+            }
+
+            IOUtils.closeQuietly(inputStream);
+            IOUtils.closeQuietly(fileOutputStream);
+
         } catch (Exception e) {
             LOG.error("error writing to file", e);
             throw new AtlasServiceException(e);
         } finally {
-            if(fileOutputStream != null) {
+            if (fileOutputStream != null) {
                 try {
                     fileOutputStream.close();
                 } catch (IOException e) {
@@ -502,9 +508,9 @@ public abstract class AtlasBaseClient {
                             new FileDataBodyPart(IMPORT_DATA_PARAMETER, new File(absoluteFilePath)));
     }
 
-    public AtlasImportResult importData(AtlasImportRequest request, byte[] fileData) throws AtlasServiceException {
+    public AtlasImportResult importData(AtlasImportRequest request, InputStream stream) throws AtlasServiceException {
         return performImportData(getImportRequestBodyPart(request),
-                                new StreamDataBodyPart(IMPORT_DATA_PARAMETER, new ByteArrayInputStream(fileData)));
+                                new StreamDataBodyPart(IMPORT_DATA_PARAMETER, stream));
     }
 
     private AtlasImportResult performImportData(BodyPart requestPart, BodyPart filePart) throws AtlasServiceException {

http://git-wip-us.apache.org/repos/asf/atlas/blob/c0a91c7e/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransforms.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransforms.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransforms.java
index 2f27448..72b684b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransforms.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransforms.java
@@ -35,17 +35,16 @@ import java.util.Set;
 public class ImportTransforms {
     private static final Logger LOG = LoggerFactory.getLogger(ImportTransforms.class);
 
-    private Map<String, Map<String, List<ImportTransformer>>> transforms;
+    private static final String ALL_ATTRIBUTES = "*";
 
+    private Map<String, Map<String, List<ImportTransformer>>> transforms;
 
     public static ImportTransforms fromJson(String jsonString) {
-        ImportTransforms ret = null;
-
-        if (StringUtils.isNotBlank(jsonString)) {
-            ret = new ImportTransforms(jsonString);
+        if (StringUtils.isEmpty(jsonString)) {
+            return null;
         }
 
-        return ret;
+        return new ImportTransforms(jsonString);
     }
 
     public Map<String, Map<String, List<ImportTransformer>>> getTransforms() {
@@ -72,13 +71,15 @@ public class ImportTransforms {
     }
 
     public AtlasEntity.AtlasEntityWithExtInfo apply(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
-        if (entityWithExtInfo != null) {
-            apply(entityWithExtInfo.getEntity());
+        if (entityWithExtInfo == null) {
+            return entityWithExtInfo;
+        }
 
-            if(MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
-                for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
-                    apply(e);
-                }
+        apply(entityWithExtInfo.getEntity());
+
+        if(MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
+            for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
+                apply(e);
             }
         }
 
@@ -86,30 +87,46 @@ public class ImportTransforms {
     }
 
     public  AtlasEntity apply(AtlasEntity entity) throws AtlasBaseException {
-        if(entity != null) {
-            Map<String, List<ImportTransformer>> entityTransforms = getTransforms(entity.getTypeName());
+        if (entity == null) {
+            return entity;
+        }
+
+        Map<String, List<ImportTransformer>> entityTransforms = getTransforms(entity.getTypeName());
+        if (MapUtils.isEmpty(entityTransforms)) {
+            return entity;
+        }
+
+        applyEntitySpecific(entity, entityTransforms);
 
-            if (MapUtils.isNotEmpty(entityTransforms)) {
-                for (Map.Entry<String, List<ImportTransformer>> entry : entityTransforms.entrySet()) {
-                    String                   attributeName  = entry.getKey();
-                    List<ImportTransformer> attrTransforms = entry.getValue();
+        applyAttributeSpecific(entity, entityTransforms);
 
-                    if (!entity.hasAttribute(attributeName)) {
-                        continue;
-                    }
+        return entity;
+    }
 
-                    Object transformedValue = entity.getAttribute(attributeName);
+    private void applyAttributeSpecific(AtlasEntity entity, Map<String, List<ImportTransformer>> entityTransforms) throws AtlasBaseException {
+        for (Map.Entry<String, List<ImportTransformer>> entry : entityTransforms.entrySet()) {
+            String                   attributeName  = entry.getKey();
+            List<ImportTransformer> attrTransforms = entry.getValue();
 
-                    for (ImportTransformer attrTransform : attrTransforms) {
-                        transformedValue = attrTransform.apply(transformedValue);
-                    }
+            if (!entity.hasAttribute(attributeName)) {
+                continue;
+            }
 
-                    entity.setAttribute(attributeName, transformedValue);
-                }
+            Object attributeValue = entity.getAttribute(attributeName);
+            for (ImportTransformer attrTransform : attrTransforms) {
+                attributeValue = attrTransform.apply(attributeValue);
             }
+
+            entity.setAttribute(attributeName, attributeValue);
         }
+    }
 
-        return entity;
+    private void applyEntitySpecific(AtlasEntity entity, Map<String, List<ImportTransformer>> entityTransforms) throws AtlasBaseException {
+        if(entityTransforms.containsKey(ALL_ATTRIBUTES)) {
+            for (ImportTransformer attrTransform : entityTransforms.get(ALL_ATTRIBUTES)) {
+                attrTransform.apply(entity);
+            }
+        }
     }
 
     private ImportTransforms() {
@@ -119,38 +136,58 @@ public class ImportTransforms {
     private ImportTransforms(String jsonString) {
         this();
 
-        if(jsonString != null) {
-            Map typeTransforms = AtlasType.fromJson(jsonString, Map.class);
-
-            if (MapUtils.isNotEmpty(typeTransforms)) {
-                for (Object key : typeTransforms.keySet()) {
-                    Object              value               = typeTransforms.get(key);
-                    String              entityType          = (String) key;
-                    Map<String, Object> attributeTransforms = (Map<String, Object>)value;
-
-                    if (MapUtils.isNotEmpty(attributeTransforms)) {
-                        for (Map.Entry<String, Object> e : attributeTransforms.entrySet()) {
-                            String       attributeName = e.getKey();
-                            List<String> transforms    = (List<String>)e.getValue();
-
-                            if (CollectionUtils.isNotEmpty(transforms)) {
-                                for (String transform : transforms) {
-                                    ImportTransformer transformers = null;
-
-                                    try {
-                                        transformers = ImportTransformer.getTransformer(transform);
-                                    } catch (AtlasBaseException ex) {
-                                        LOG.error("Error converting string to ImportTransformer: {}", transform, ex);
-                                    }
-
-                                    if (transformers != null) {
-                                        add(entityType, attributeName, transformers);
-                                    }
-                                }
-                            }
-                        }
-                    }
+        if (StringUtils.isEmpty(jsonString)) {
+            return;
+        }
+
+        Map typeTransforms = AtlasType.fromJson(jsonString, Map.class);
+        if (MapUtils.isEmpty(typeTransforms)) {
+            return;
+        }
+
+        addOuterMap(typeTransforms);
+    }
+
+    private void addOuterMap(Map typeTransforms) {
+        for (Object key : typeTransforms.keySet()) {
+            Object              value               = typeTransforms.get(key);
+            String              entityType          = (String) key;
+            Map<String, Object> attributeTransforms = (Map<String, Object>)value;
+
+            if (MapUtils.isEmpty(attributeTransforms)) {
+                continue;
+            }
+
+            addInnerMap(entityType, attributeTransforms);
+        }
+    }
+
+    private void addInnerMap(String entityType, Map<String, Object> attributeTransforms) {
+        for (Map.Entry<String, Object> e : attributeTransforms.entrySet()) {
+            String       attributeName = e.getKey();
+            List<String> transforms    = (List<String>)e.getValue();
+
+            if (CollectionUtils.isEmpty(transforms)) {
+                continue;
+            }
+
+            addTransforms(entityType, attributeName, transforms);
+        }
+    }
+
+    private void addTransforms(String entityType, String attributeName, List<String> transforms) {
+        for (String transform : transforms) {
+            ImportTransformer transformers = null;
+
+            try {
+                transformers = ImportTransformer.getTransformer(transform);
+                if (transformers == null) {
+                    continue;
                 }
+
+                add(entityType, attributeName, transformers);
+            } catch (AtlasBaseException ex) {
+                LOG.error("Error converting string to ImportTransformer: {}", transform, ex);
             }
         }
     }
@@ -158,21 +195,16 @@ public class ImportTransforms {
     private void add(String typeName, String attributeName, ImportTransformer transformer) {
         Map<String, List<ImportTransformer>> attrMap;
 
-        if(transforms.containsKey(typeName)) {
-            attrMap = transforms.get(typeName);
-        } else {
+        if(!transforms.containsKey(typeName)) {
             attrMap = new HashMap<>();
             transforms.put(typeName, attrMap);
         }
 
-        List<ImportTransformer> list;
-        if(attrMap.containsKey(attributeName)) {
-            list = attrMap.get(attributeName);
-        } else {
-            list = new ArrayList<>();
-            attrMap.put(attributeName, list);
+        attrMap = transforms.get(typeName);
+        if(!attrMap.containsKey(attributeName)) {
+            attrMap.put(attributeName, new ArrayList<ImportTransformer>());
         }
 
-        list.add(transformer);
+        attrMap.get(attributeName).add(transformer);
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/c0a91c7e/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
index 1e8211a..cd623d0 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
@@ -128,6 +128,7 @@ public class ImportTransformsTest {
 
         t.apply(entity);
 
+        assertEquals(entity.getClassifications().size(), 0);
         assertNotNull(t);
         assertEquals(entity.getAttribute(ATTR_NAME_QUALIFIED_NAME), expected_qualifiedName);
     }
@@ -145,6 +146,7 @@ public class ImportTransformsTest {
 
         assertNotNull(t);
         assertEquals(entity.getAttribute(ATTR_NAME_QUALIFIED_NAME), expected_qualifiedName);
+        assertEquals(entity.getAttribute(HIVE_TABLE_ATTR_SYNC_INFO), new ArrayList<String>() {{ add(expected_syncInfo); }});
     }
 
 
@@ -161,6 +163,8 @@ public class ImportTransformsTest {
         t.apply(entity);
 
         assertNotNull(t);
+        assertNull(entity.getAttribute(HIVE_TABLE_ATTR_REPLICATED_FROM));
+        assertNull(entity.getAttribute(HIVE_TABLE_ATTR_REPLICATED_TO));
     }
 
     @Test
@@ -173,6 +177,8 @@ public class ImportTransformsTest {
         t.apply(entity);
 
         assertNotNull(t);
+        assertNull(entity.getAttribute(HIVE_TABLE_ATTR_REPLICATED_FROM));
+        assertNull(entity.getAttribute(HIVE_TABLE_ATTR_REPLICATED_TO));
     }
 
     @Test
@@ -185,6 +191,7 @@ public class ImportTransformsTest {
 
         t.apply(entity);
         assertNotNull(t);
+        assertEquals(entity.getStatus(),  AtlasEntity.Status.DELETED);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/atlas/blob/c0a91c7e/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java
index cc5d36b..d156054 100644
--- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java
@@ -32,15 +32,13 @@ import org.testng.SkipException;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 
-import java.io.ByteArrayInputStream;
+import java.io.FileInputStream;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
+import java.io.InputStream;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
 public class AdminExportImportTestIT extends BaseResourceIT {
@@ -69,10 +67,10 @@ public class AdminExportImportTestIT extends BaseResourceIT {
         final int EXPECTED_CREATION_ORDER_SIZE = 10;
 
         AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(".", EXPORT_REQUEST_FILE, AtlasExportRequest.class);
-        byte[] exportedBytes = atlasClientV2.exportData(request);
-        assertNotNull(exportedBytes);
+        InputStream exportedStream = atlasClientV2.exportData(request);
+        assertNotNull(exportedStream);
 
-        ZipSource zs = new ZipSource(new ByteArrayInputStream(exportedBytes));
+        ZipSource zs = new ZipSource(exportedStream);
         assertNotNull(zs.getExportResult());
         assertTrue(zs.getCreationOrder().size() > EXPECTED_CREATION_ORDER_SIZE);
     }
@@ -87,14 +85,15 @@ public class AdminExportImportTestIT extends BaseResourceIT {
 
     private void performImport(String fileToImport, AtlasImportRequest request) throws AtlasServiceException {
 
-        byte[] fileBytes = new byte[0];
+        FileInputStream fileInputStream = null;
+
         try {
-            fileBytes = Files.readAllBytes(Paths.get(TestResourceFileUtils.getTestFilePath(fileToImport)));
+            fileInputStream = new FileInputStream(TestResourceFileUtils.getTestFilePath(fileToImport));
         } catch (IOException e) {
             assertFalse(true, "Exception: " + e.getMessage());
         }
-        AtlasImportResult result = atlasClientV2.importData(request, fileBytes);
 
+        AtlasImportResult result = atlasClientV2.importData(request, fileInputStream);
         assertNotNull(result);
         assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
         assertNotNull(result.getMetrics());