You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2018/11/01 23:02:00 UTC
[14/50] [abbrv] atlas git commit: ATLAS-2874: Include handling of
Atlas Entity Transformers in current Import logic
ATLAS-2874: Include handling of Atlas Entity Transformers in current Import logic
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/e33b8bf1
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/e33b8bf1
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/e33b8bf1
Branch: refs/heads/branch-1.0
Commit: e33b8bf1faf478d8d1ae09eae9969d3012208cbc
Parents: dde9355
Author: Sarath Subramanian <ss...@hortonworks.com>
Authored: Mon Sep 17 21:57:40 2018 -0700
Committer: Ashutosh Mestry <am...@hortonworks.com>
Committed: Thu Nov 1 15:42:55 2018 -0700
----------------------------------------------------------------------
.../atlas/model/impexp/AtlasImportRequest.java | 1 +
.../atlas/repository/impexp/ImportService.java | 50 +++++++++++++++++--
.../atlas/repository/impexp/ZipSource.java | 51 ++++++++++++++++----
3 files changed, 89 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/e33b8bf1/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
index 2989fbe..06bc231 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
@@ -40,6 +40,7 @@ public class AtlasImportRequest implements Serializable {
private static final long serialVersionUID = 1L;
public static final String TRANSFORMS_KEY = "transforms";
+ public static final String TRANSFORMERS_KEY = "transformers";
public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom";
private static final String START_POSITION_KEY = "startPosition";
private static final String START_GUID_KEY = "startGuid";
http://git-wip-us.apache.org/repos/asf/atlas/blob/e33b8bf1/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index a88ba2b..a09385e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -19,15 +19,17 @@ package org.apache.atlas.repository.impexp;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.entitytransform.BaseEntityHandler;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AttributeTransform;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.BulkImporter;
import org.apache.atlas.store.AtlasTypeDefStore;
-import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
@@ -40,6 +42,11 @@ import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMERS_KEY;
+import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMS_KEY;
@Component
public class ImportService {
@@ -82,9 +89,12 @@ public class ImportService {
try {
LOG.info("==> import(user={}, from={}, request={})", userName, requestingIP, request);
- String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY) : null;
-
+ String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMS_KEY) : null;
setImportTransform(source, transforms);
+
+ String transformers = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMERS_KEY) : null;
+ setEntityTransformerHandlers(source, transformers);
+
startTimestamp = System.currentTimeMillis();
processTypes(source.getTypesDef(), result);
setStartPosition(request, source);
@@ -121,6 +131,38 @@ public class ImportService {
}
+ private void setEntityTransformerHandlers(ZipSource source, String transformersString) {
+ if (StringUtils.isEmpty(transformersString)) {
+ return;
+ }
+
+ Object transformersObj = AtlasType.fromJson(transformersString, Object.class);
+ List transformers = (transformersObj != null && transformersObj instanceof List) ? (List) transformersObj : null;
+
+ List<AttributeTransform> attributeTransforms = new ArrayList<>();
+
+ if (CollectionUtils.isNotEmpty(transformers)) {
+ for (Object transformer : transformers) {
+ String transformerStr = AtlasType.toJson(transformer);
+ AttributeTransform attributeTransform = AtlasType.fromJson(transformerStr, AttributeTransform.class);
+
+ if (attributeTransform == null) {
+ continue;
+ }
+
+ attributeTransforms.add(attributeTransform);
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(attributeTransforms)) {
+ List<BaseEntityHandler> entityHandlers = BaseEntityHandler.createEntityHandlers(attributeTransforms);
+
+ if (CollectionUtils.isNotEmpty(entityHandlers)) {
+ source.setEntityHandlers(entityHandlers);
+ }
+ }
+ }
+
private void debugLog(String s, Object... params) {
if(!LOG.isDebugEnabled()) return;
@@ -148,7 +190,7 @@ public class ImportService {
try {
LOG.info("==> import(user={}, from={}, fileName={})", userName, requestingIP, fileName);
- String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY) : null;
+ String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMS_KEY) : null;
File file = new File(fileName);
ZipSource source = new ZipSource(new ByteArrayInputStream(FileUtils.readFileToByteArray(file)), ImportTransforms.fromJson(transforms));
result = run(source, request, userName, hostName, requestingIP);
http://git-wip-us.apache.org/repos/asf/atlas/blob/e33b8bf1/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
index 60cd9f8..1f436ce 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
@@ -17,6 +17,7 @@
*/
package org.apache.atlas.repository.impexp;
+import org.apache.atlas.entitytransform.BaseEntityHandler;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
@@ -24,6 +25,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.atlas.type.AtlasType;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,12 +44,13 @@ import java.util.zip.ZipInputStream;
public class ZipSource implements EntityImportStream {
private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class);
- private final InputStream inputStream;
- private List<String> creationOrder;
- private Iterator<String> iterator;
- private Map<String, String> guidEntityJsonMap;
- private ImportTransforms importTransform;
- private int currentPosition;
+ private final InputStream inputStream;
+ private List<String> creationOrder;
+ private Iterator<String> iterator;
+ private Map<String, String> guidEntityJsonMap;
+ private ImportTransforms importTransform;
+ private List<BaseEntityHandler> entityHandlers;
+ private int currentPosition;
public ZipSource(InputStream inputStream) throws IOException {
this(inputStream, null);
@@ -68,6 +71,14 @@ public class ZipSource implements EntityImportStream {
this.importTransform = importTransform;
}
+ public List<BaseEntityHandler> getEntityHandlers() {
+ return entityHandlers;
+ }
+
+ public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
+ this.entityHandlers = entityHandlers;
+ }
+
public AtlasTypesDef getTypesDef() throws AtlasBaseException {
final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString();
@@ -123,17 +134,39 @@ public class ZipSource implements EntityImportStream {
return this.creationOrder;
}
- public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException {
+ public AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException {
String s = getFromCache(guid);
- AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, s);
+ AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntityWithExtInfo.class, s);
- if (importTransform != null) {
+ if (entityHandlers != null) {
+ applyTransformers(entityWithExtInfo);
+ } else if (importTransform != null) {
entityWithExtInfo = importTransform.apply(entityWithExtInfo);
}
return entityWithExtInfo;
}
+ private void applyTransformers(AtlasEntityWithExtInfo entityWithExtInfo) {
+ if (entityWithExtInfo == null) {
+ return;
+ }
+
+ transform(entityWithExtInfo.getEntity());
+
+ if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
+ for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
+ transform(e);
+ }
+ }
+ }
+
+ private void transform(AtlasEntity e) {
+ for (BaseEntityHandler handler : entityHandlers) {
+ handler.transform(e);
+ }
+ }
+
private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException {
T t;
try {