You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2018/11/01 23:01:52 UTC
[06/50] [abbrv] atlas git commit: ATLAS-2843: AtlasClient updates for exportData and importData.
ATLAS-2843: AtlasClient updates for exportData and importData.
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/20aa9be0
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/20aa9be0
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/20aa9be0
Branch: refs/heads/branch-1.0
Commit: 20aa9be0af63e2c0d5022202fb9014ed861de86d
Parents: 0cd5bc5
Author: Ashutosh Mestry <am...@hortonworks.com>
Authored: Mon Aug 27 10:32:19 2018 -0700
Committer: Ashutosh Mestry <am...@hortonworks.com>
Committed: Thu Nov 1 15:42:54 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/atlas/AtlasBaseClient.java | 28 +--
.../repository/impexp/ImportTransforms.java | 170 +++++++++++--------
.../repository/impexp/ImportTransformsTest.java | 7 +
.../web/resources/AdminExportImportTestIT.java | 19 +--
4 files changed, 134 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/20aa9be0/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java
----------------------------------------------------------------------
diff --git a/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java b/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java
index c247902..ca772a7 100644
--- a/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java
+++ b/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java
@@ -59,10 +59,10 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
-import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.URI;
@@ -385,7 +385,7 @@ public abstract class AtlasBaseClient {
}
try {
if(api.getProduces().equals(MediaType.APPLICATION_OCTET_STREAM)) {
- return (T) IOUtils.toByteArray(clientResponse.getEntityInputStream());
+ return (T) clientResponse.getEntityInputStream();
} else if (responseType.getRawClass().equals(ObjectNode.class)) {
String stringEntity = clientResponse.getEntity(String.class);
try {
@@ -404,8 +404,6 @@ public abstract class AtlasBaseClient {
}
} catch (ClientHandlerException e) {
throw new AtlasServiceException(api, e);
- } catch (IOException e) {
- throw new AtlasServiceException(api, e);
}
} else if (clientResponse.getStatus() != ClientResponse.Status.SERVICE_UNAVAILABLE.getStatusCode()) {
break;
@@ -467,9 +465,9 @@ public abstract class AtlasBaseClient {
return configuration.getInt(AtlasBaseClient.ATLAS_CLIENT_HA_RETRIES_KEY, AtlasBaseClient.DEFAULT_NUM_RETRIES);
}
- public byte[] exportData(AtlasExportRequest request) throws AtlasServiceException {
+ public InputStream exportData(AtlasExportRequest request) throws AtlasServiceException {
try {
- return (byte[]) callAPI(EXPORT, Object.class, request);
+ return (InputStream) callAPI(EXPORT, Object.class, request);
} catch (Exception e) {
LOG.error("error writing to file", e);
throw new AtlasServiceException(e);
@@ -479,14 +477,22 @@ public abstract class AtlasBaseClient {
public void exportData(AtlasExportRequest request, String absolutePath) throws AtlasServiceException {
OutputStream fileOutputStream = null;
try {
- byte[] fileBytes = exportData(request);
+ InputStream inputStream = exportData(request);
fileOutputStream = new FileOutputStream(new File(absolutePath));
- IOUtils.write(fileBytes, fileOutputStream);
+ byte[] buffer = new byte[8 * 1024];
+ int bytesRead;
+ while ((bytesRead = inputStream.read(buffer)) != -1) {
+ fileOutputStream.write(buffer, 0, bytesRead);
+ }
+
+ IOUtils.closeQuietly(inputStream);
+ IOUtils.closeQuietly(fileOutputStream);
+
} catch (Exception e) {
LOG.error("error writing to file", e);
throw new AtlasServiceException(e);
} finally {
- if(fileOutputStream != null) {
+ if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
@@ -502,9 +508,9 @@ public abstract class AtlasBaseClient {
new FileDataBodyPart(IMPORT_DATA_PARAMETER, new File(absoluteFilePath)));
}
- public AtlasImportResult importData(AtlasImportRequest request, byte[] fileData) throws AtlasServiceException {
+ public AtlasImportResult importData(AtlasImportRequest request, InputStream stream) throws AtlasServiceException {
return performImportData(getImportRequestBodyPart(request),
- new StreamDataBodyPart(IMPORT_DATA_PARAMETER, new ByteArrayInputStream(fileData)));
+ new StreamDataBodyPart(IMPORT_DATA_PARAMETER, stream));
}
private AtlasImportResult performImportData(BodyPart requestPart, BodyPart filePart) throws AtlasServiceException {
http://git-wip-us.apache.org/repos/asf/atlas/blob/20aa9be0/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransforms.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransforms.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransforms.java
index 2f27448..72b684b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransforms.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransforms.java
@@ -35,17 +35,16 @@ import java.util.Set;
public class ImportTransforms {
private static final Logger LOG = LoggerFactory.getLogger(ImportTransforms.class);
- private Map<String, Map<String, List<ImportTransformer>>> transforms;
+ private static final String ALL_ATTRIBUTES = "*";
+ private Map<String, Map<String, List<ImportTransformer>>> transforms;
public static ImportTransforms fromJson(String jsonString) {
- ImportTransforms ret = null;
-
- if (StringUtils.isNotBlank(jsonString)) {
- ret = new ImportTransforms(jsonString);
+ if (StringUtils.isEmpty(jsonString)) {
+ return null;
}
- return ret;
+ return new ImportTransforms(jsonString);
}
public Map<String, Map<String, List<ImportTransformer>>> getTransforms() {
@@ -72,13 +71,15 @@ public class ImportTransforms {
}
public AtlasEntity.AtlasEntityWithExtInfo apply(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
- if (entityWithExtInfo != null) {
- apply(entityWithExtInfo.getEntity());
+ if (entityWithExtInfo == null) {
+ return entityWithExtInfo;
+ }
- if(MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
- for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
- apply(e);
- }
+ apply(entityWithExtInfo.getEntity());
+
+ if(MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
+ for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
+ apply(e);
}
}
@@ -86,30 +87,46 @@ public class ImportTransforms {
}
public AtlasEntity apply(AtlasEntity entity) throws AtlasBaseException {
- if(entity != null) {
- Map<String, List<ImportTransformer>> entityTransforms = getTransforms(entity.getTypeName());
+ if (entity == null) {
+ return entity;
+ }
+
+ Map<String, List<ImportTransformer>> entityTransforms = getTransforms(entity.getTypeName());
+ if (MapUtils.isEmpty(entityTransforms)) {
+ return entity;
+ }
+
+ applyEntitySpecific(entity, entityTransforms);
- if (MapUtils.isNotEmpty(entityTransforms)) {
- for (Map.Entry<String, List<ImportTransformer>> entry : entityTransforms.entrySet()) {
- String attributeName = entry.getKey();
- List<ImportTransformer> attrTransforms = entry.getValue();
+ applyAttributeSpecific(entity, entityTransforms);
- if (!entity.hasAttribute(attributeName)) {
- continue;
- }
+ return entity;
+ }
- Object transformedValue = entity.getAttribute(attributeName);
+ private void applyAttributeSpecific(AtlasEntity entity, Map<String, List<ImportTransformer>> entityTransforms) throws AtlasBaseException {
+ for (Map.Entry<String, List<ImportTransformer>> entry : entityTransforms.entrySet()) {
+ String attributeName = entry.getKey();
+ List<ImportTransformer> attrTransforms = entry.getValue();
- for (ImportTransformer attrTransform : attrTransforms) {
- transformedValue = attrTransform.apply(transformedValue);
- }
+ if (!entity.hasAttribute(attributeName)) {
+ continue;
+ }
- entity.setAttribute(attributeName, transformedValue);
- }
+ Object attributeValue = entity.getAttribute(attributeName);
+ for (ImportTransformer attrTransform : attrTransforms) {
+ attributeValue = attrTransform.apply(attributeValue);
}
+
+ entity.setAttribute(attributeName, attributeValue);
}
+ }
- return entity;
+ private void applyEntitySpecific(AtlasEntity entity, Map<String, List<ImportTransformer>> entityTransforms) throws AtlasBaseException {
+ if(entityTransforms.containsKey(ALL_ATTRIBUTES)) {
+ for (ImportTransformer attrTransform : entityTransforms.get(ALL_ATTRIBUTES)) {
+ attrTransform.apply(entity);
+ }
+ }
}
private ImportTransforms() {
@@ -119,38 +136,58 @@ public class ImportTransforms {
private ImportTransforms(String jsonString) {
this();
- if(jsonString != null) {
- Map typeTransforms = AtlasType.fromJson(jsonString, Map.class);
-
- if (MapUtils.isNotEmpty(typeTransforms)) {
- for (Object key : typeTransforms.keySet()) {
- Object value = typeTransforms.get(key);
- String entityType = (String) key;
- Map<String, Object> attributeTransforms = (Map<String, Object>)value;
-
- if (MapUtils.isNotEmpty(attributeTransforms)) {
- for (Map.Entry<String, Object> e : attributeTransforms.entrySet()) {
- String attributeName = e.getKey();
- List<String> transforms = (List<String>)e.getValue();
-
- if (CollectionUtils.isNotEmpty(transforms)) {
- for (String transform : transforms) {
- ImportTransformer transformers = null;
-
- try {
- transformers = ImportTransformer.getTransformer(transform);
- } catch (AtlasBaseException ex) {
- LOG.error("Error converting string to ImportTransformer: {}", transform, ex);
- }
-
- if (transformers != null) {
- add(entityType, attributeName, transformers);
- }
- }
- }
- }
- }
+ if (StringUtils.isEmpty(jsonString)) {
+ return;
+ }
+
+ Map typeTransforms = AtlasType.fromJson(jsonString, Map.class);
+ if (MapUtils.isEmpty(typeTransforms)) {
+ return;
+ }
+
+ addOuterMap(typeTransforms);
+ }
+
+ private void addOuterMap(Map typeTransforms) {
+ for (Object key : typeTransforms.keySet()) {
+ Object value = typeTransforms.get(key);
+ String entityType = (String) key;
+ Map<String, Object> attributeTransforms = (Map<String, Object>)value;
+
+ if (MapUtils.isEmpty(attributeTransforms)) {
+ continue;
+ }
+
+ addInnerMap(entityType, attributeTransforms);
+ }
+ }
+
+ private void addInnerMap(String entityType, Map<String, Object> attributeTransforms) {
+ for (Map.Entry<String, Object> e : attributeTransforms.entrySet()) {
+ String attributeName = e.getKey();
+ List<String> transforms = (List<String>)e.getValue();
+
+ if (CollectionUtils.isEmpty(transforms)) {
+ continue;
+ }
+
+ addTransforms(entityType, attributeName, transforms);
+ }
+ }
+
+ private void addTransforms(String entityType, String attributeName, List<String> transforms) {
+ for (String transform : transforms) {
+ ImportTransformer transformers = null;
+
+ try {
+ transformers = ImportTransformer.getTransformer(transform);
+ if (transformers == null) {
+ continue;
}
+
+ add(entityType, attributeName, transformers);
+ } catch (AtlasBaseException ex) {
+ LOG.error("Error converting string to ImportTransformer: {}", transform, ex);
}
}
}
@@ -158,21 +195,16 @@ public class ImportTransforms {
private void add(String typeName, String attributeName, ImportTransformer transformer) {
Map<String, List<ImportTransformer>> attrMap;
- if(transforms.containsKey(typeName)) {
- attrMap = transforms.get(typeName);
- } else {
+ if(!transforms.containsKey(typeName)) {
attrMap = new HashMap<>();
transforms.put(typeName, attrMap);
}
- List<ImportTransformer> list;
- if(attrMap.containsKey(attributeName)) {
- list = attrMap.get(attributeName);
- } else {
- list = new ArrayList<>();
- attrMap.put(attributeName, list);
+ attrMap = transforms.get(typeName);
+ if(!attrMap.containsKey(attributeName)) {
+ attrMap.put(attributeName, new ArrayList<ImportTransformer>());
}
- list.add(transformer);
+ attrMap.get(attributeName).add(transformer);
}
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/20aa9be0/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
index 1e8211a..cd623d0 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
@@ -128,6 +128,7 @@ public class ImportTransformsTest {
t.apply(entity);
+ assertEquals(entity.getClassifications().size(), 0);
assertNotNull(t);
assertEquals(entity.getAttribute(ATTR_NAME_QUALIFIED_NAME), expected_qualifiedName);
}
@@ -145,6 +146,7 @@ public class ImportTransformsTest {
assertNotNull(t);
assertEquals(entity.getAttribute(ATTR_NAME_QUALIFIED_NAME), expected_qualifiedName);
+ assertEquals(entity.getAttribute(HIVE_TABLE_ATTR_SYNC_INFO), new ArrayList<String>() {{ add(expected_syncInfo); }});
}
@@ -161,6 +163,8 @@ public class ImportTransformsTest {
t.apply(entity);
assertNotNull(t);
+ assertNull(entity.getAttribute(HIVE_TABLE_ATTR_REPLICATED_FROM));
+ assertNull(entity.getAttribute(HIVE_TABLE_ATTR_REPLICATED_TO));
}
@Test
@@ -173,6 +177,8 @@ public class ImportTransformsTest {
t.apply(entity);
assertNotNull(t);
+ assertNull(entity.getAttribute(HIVE_TABLE_ATTR_REPLICATED_FROM));
+ assertNull(entity.getAttribute(HIVE_TABLE_ATTR_REPLICATED_TO));
}
@Test
@@ -185,6 +191,7 @@ public class ImportTransformsTest {
t.apply(entity);
assertNotNull(t);
+ assertEquals(entity.getStatus(), AtlasEntity.Status.DELETED);
}
@Test
http://git-wip-us.apache.org/repos/asf/atlas/blob/20aa9be0/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java
index cc5d36b..d156054 100644
--- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java
@@ -32,15 +32,13 @@ import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
-import java.io.ByteArrayInputStream;
+import java.io.FileInputStream;
import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
+import java.io.InputStream;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
public class AdminExportImportTestIT extends BaseResourceIT {
@@ -69,10 +67,10 @@ public class AdminExportImportTestIT extends BaseResourceIT {
final int EXPECTED_CREATION_ORDER_SIZE = 10;
AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(".", EXPORT_REQUEST_FILE, AtlasExportRequest.class);
- byte[] exportedBytes = atlasClientV2.exportData(request);
- assertNotNull(exportedBytes);
+ InputStream exportedStream = atlasClientV2.exportData(request);
+ assertNotNull(exportedStream);
- ZipSource zs = new ZipSource(new ByteArrayInputStream(exportedBytes));
+ ZipSource zs = new ZipSource(exportedStream);
assertNotNull(zs.getExportResult());
assertTrue(zs.getCreationOrder().size() > EXPECTED_CREATION_ORDER_SIZE);
}
@@ -87,14 +85,15 @@ public class AdminExportImportTestIT extends BaseResourceIT {
private void performImport(String fileToImport, AtlasImportRequest request) throws AtlasServiceException {
- byte[] fileBytes = new byte[0];
+ FileInputStream fileInputStream = null;
+
try {
- fileBytes = Files.readAllBytes(Paths.get(TestResourceFileUtils.getTestFilePath(fileToImport)));
+ fileInputStream = new FileInputStream(TestResourceFileUtils.getTestFilePath(fileToImport));
} catch (IOException e) {
assertFalse(true, "Exception: " + e.getMessage());
}
- AtlasImportResult result = atlasClientV2.importData(request, fileBytes);
+ AtlasImportResult result = atlasClientV2.importData(request, fileInputStream);
assertNotNull(result);
assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
assertNotNull(result.getMetrics());