You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/02/27 04:14:54 UTC
incubator-atlas git commit: ATLAS-1503: update export to specify objects-to-export using attribute value
Repository: incubator-atlas
Updated Branches:
refs/heads/master 0c1d599dd -> 515130ccd
ATLAS-1503: update export to specify objects-to-export using attribute value
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/515130cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/515130cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/515130cc
Branch: refs/heads/master
Commit: 515130ccd09003dbc0bbfd9436629c7d5648b15c
Parents: 0c1d599
Author: ashutoshm <am...@hortonworks.com>
Authored: Fri Feb 24 15:41:44 2017 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Sun Feb 26 20:13:46 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/atlas/AtlasErrorCode.java | 8 +-
.../graph/v1/AtlasEntityChangeNotifier.java | 22 +--
.../store/graph/v1/AtlasEntityStoreV1.java | 2 +-
webapp/pom.xml | 1 -
.../atlas/web/resources/ExportService.java | 176 +++++++++++++++----
.../apache/atlas/web/resources/ZipSource.java | 9 +-
6 files changed, 161 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/515130cc/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index 542b659..d58c514 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -20,11 +20,10 @@ package org.apache.atlas;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.ws.rs.core.Response;
import java.text.MessageFormat;
import java.util.Arrays;
-import javax.ws.rs.core.Response;
-
public enum AtlasErrorCode {
NO_SEARCH_RESULTS(204, "ATLAS2041E", "Given search filter {0} did not yield any results"),
@@ -90,7 +89,10 @@ public enum AtlasErrorCode {
DISCOVERY_QUERY_FAILED(500, "ATLAS5004E", "Discovery query failed {0}"),
FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK(500, "ATLAS5005E", "Failed to get the lock; another type update might be in progress. Please try again"),
FAILED_TO_OBTAIN_IMPORT_EXPORT_LOCK(500, "ATLAS5006E", "Another import or export is in progress. Please try again"),
- NOTIFICATION_FAILED(500, "ATLAS5007E", "Failed to notify for change {0}");
+ NOTIFICATION_FAILED(500, "ATLAS5007E", "Failed to notify for change {0}"),
+ GREMLIN_GROOVY_SCRIPT_ENGINE_FAILED(500, "ATLAS5008E", "scriptEngine cannot be initialized for: {0}"),
+ JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED(500, "ATLAS5009E", "ObjectMapper.readValue returned NULL for class: {0}"),
+ GREMLIN_SCRIPT_EXECUTION_FAILED(500, "ATLAS5010E", "Script execution failed for: {0}");
private String errorCode;
private String errorMessage;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/515130cc/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
index feada34..4ec2a7c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
@@ -23,18 +23,13 @@ import com.google.inject.Singleton;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
-import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
-import org.apache.atlas.repository.graph.AtlasGraphProvider;
-import org.apache.atlas.repository.graph.DeleteHandler;
-import org.apache.atlas.repository.graph.FullTextMapper;
-import org.apache.atlas.repository.graph.GraphHelper;
-import org.apache.atlas.repository.graph.GraphToTypedInstanceMapper;
-import org.apache.atlas.repository.graph.TypedInstanceToGraphMapper;
+import org.apache.atlas.repository.graph.*;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.util.AtlasRepositoryConfiguration;
@@ -46,11 +41,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.CREATE;
-import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
-import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
-import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
-
@Singleton
public class AtlasEntityChangeNotifier {
@@ -157,11 +147,17 @@ public class AtlasEntityChangeNotifier {
for (AtlasEntityHeader atlasEntityHeader : atlasEntityHeaders) {
AtlasVertex atlasVertex = AtlasGraphUtilsV1.findByGuid(atlasEntityHeader.getGuid());
+
+ if(atlasVertex == null) {
+ continue;
+ }
+
try {
String fullText = fullTextMapper.mapRecursive(atlasVertex, true);
+
GraphHelper.setProperty(atlasVertex, Constants.ENTITY_TEXT_PROPERTY_KEY, fullText);
} catch (AtlasException e) {
- LOG.error("FullText mapping failed for Vertex[ guid = {} ]", atlasEntityHeader.getGuid());
+ LOG.error("FullText mapping failed for Vertex[ guid = {} ]", atlasEntityHeader.getGuid(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/515130cc/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 587f3c7..c84f169 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -160,7 +160,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
while (entityStream.hasNext()) {
AtlasEntity entity = entityStream.next();
- if(processedGuids.contains(entity.getGuid())) {
+ if(entity == null || processedGuids.contains(entity.getGuid())) {
continue;
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/515130cc/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index a431e02..e7dce78 100755
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -377,7 +377,6 @@
<groupId>com.webcohesion.enunciate</groupId>
<artifactId>enunciate-core-annotations</artifactId>
</dependency>
-
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/515130cc/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
index bbd48bc..1e98232 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
@@ -17,48 +17,73 @@
*/
package org.apache.atlas.web.resources;
+import com.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.typedef.AtlasClassificationDef;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.AtlasServiceException;
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.impexp.*;
-import org.apache.atlas.model.typedef.AtlasClassificationDef;
-import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.script.*;
+import javax.script.Bindings;
+import javax.script.ScriptContext;
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
import java.util.*;
public class ExportService {
private static final Logger LOG = LoggerFactory.getLogger(ExportService.class);
+ public static final String OPTION_ATTR_MATCH_TYPE = "matchType";
+ public static final String MATCH_TYPE_STARTS_WITH = "startsWith";
+ public static final String MATCH_TYPE_ENDS_WITH = "endsWith";
+ public static final String MATCH_TYPE_CONTAINS = "contains";
+ public static final String MATCH_TYPE_MATCHES = "matches";
+
private final AtlasTypeRegistry typeRegistry;
private final AtlasGraph atlasGraph;
private final EntityGraphRetriever entityGraphRetriever;
// query engine support
- private ScriptEngineManager scriptEngineManager;
- private ScriptEngine scriptEngine;
- private Bindings bindings;
- private final String gremlinQuery = "g.V('__guid', startGuid).bothE().bothV().has('__guid').__guid.dedup().toList()";
-
- public ExportService(final AtlasTypeRegistry typeRegistry) {
+ private final ScriptEngine scriptEngine;
+ private final Bindings bindings;
+ private final String queryByGuid = "g.V('__guid', startGuid).bothE().bothV().has('__guid').__guid.dedup().toList()";
+ final private String queryByAttrEquals = "g.V().has('__typeName','%s').has('%s', attrValue).has('__guid').__guid.toList()";
+ final private String queryByAttrStartWith = "g.V().has('__typeName','%s').filter({it.'%s'.startsWith(attrValue)}).has('__guid').__guid.toList()";
+ final private String queryByAttrEndsWith = "g.V().has('__typeName','%s').filter({it.'%s'.endsWith(attrValue)}).has('__guid').__guid.toList()";
+ final private String queryByAttrContains = "g.V().has('__typeName','%s').filter({it.'%s'.contains(attrValue)}).has('__guid').__guid.toList()";
+ final private String queryByAttrMatches = "g.V().has('__typeName','%s').filter({it.'%s'.matches(attrValue)}).has('__guid').__guid.toList()";
+
+ public ExportService(final AtlasTypeRegistry typeRegistry) throws AtlasBaseException {
this.typeRegistry = typeRegistry;
this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry);
this.atlasGraph = AtlasGraphProvider.getGraphInstance();
- initScriptEngine();
+ this.scriptEngine = new GremlinGroovyScriptEngine();
+
+ //Do not cache script compilations due to memory implications
+ scriptEngine.getContext().setAttribute("#jsr223.groovy.engine.keep.globals", "phantom", ScriptContext.ENGINE_SCOPE);
+
+ bindings = scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE);
}
private class ExportContext {
@@ -109,16 +134,18 @@ public class ExportService {
}
try {
- AtlasEntity entity = entityGraphRetriever.toAtlasEntity(item);
+ List<AtlasEntity> entities = getStartingEntity(item, context);
- processEntity(entity, context);
+ for (AtlasEntity entity: entities) {
+ processEntity(entity, context);
+ }
while (!context.guidsToProcess.isEmpty()) {
String guid = context.guidsToProcess.remove(0);
- entity = entityGraphRetriever.toAtlasEntity(guid);
+ AtlasEntity e = entityGraphRetriever.toAtlasEntity(guid);
- processEntity(entity, context);
+ processEntity(e, context);
}
} catch (AtlasBaseException excp) {
context.result.setOperationStatus(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS);
@@ -131,19 +158,92 @@ public class ExportService {
}
}
+ private List<AtlasEntity> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException {
+ List<AtlasEntity> ret = new ArrayList<>();
+
+ if (StringUtils.isNotEmpty(item.getGuid())) {
+ AtlasEntity entity = entityGraphRetriever.toAtlasEntity(item);
+
+ if (entity != null) {
+ ret = Collections.singletonList(entity);
+ }
+ } else if (StringUtils.isNotEmpty(item.getTypeName()) && MapUtils.isNotEmpty(item.getUniqueAttributes())) {
+ String typeName = item.getTypeName();
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+
+ if (entityType == null) {
+ throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, typeName);
+ }
+
+ AtlasExportRequest request = context.result.getRequest();
+ String matchType = null;
+
+ if (MapUtils.isNotEmpty(request.getOptions())) {
+ if (request.getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) {
+ matchType = request.getOptions().get(OPTION_ATTR_MATCH_TYPE).toString();
+ }
+ }
+
+ final String queryTemplate;
+ if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_STARTS_WITH)) {
+ queryTemplate = queryByAttrStartWith;
+ } else if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_ENDS_WITH)) {
+ queryTemplate = queryByAttrEndsWith;
+ } else if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_CONTAINS)) {
+ queryTemplate = queryByAttrContains;
+ } else if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_MATCHES)) {
+ queryTemplate = queryByAttrMatches;
+ } else { // default
+ queryTemplate = queryByAttrEquals;
+ }
+
+ for (Map.Entry<String, Object> e : item.getUniqueAttributes().entrySet()) {
+ String attrName = e.getKey();
+ Object attrValue = e.getValue();
+
+ AtlasAttribute attribute = entityType.getAttribute(attrName);
+
+ if (attribute == null || attrValue == null) {
+ continue;
+ }
+
+ String query = String.format(queryTemplate, typeName, attribute.getQualifiedName());
+ List<String> guids = executeGremlinScriptFor(query, "attrValue", attrValue.toString());
+
+ if (CollectionUtils.isNotEmpty(guids)) {
+ for (String guid : guids) {
+ AtlasEntity entity = entityGraphRetriever.toAtlasEntity(guid);
+
+ if (entity == null) {
+ continue;
+ }
+
+ ret.add(entity);
+ }
+ }
+
+ break;
+ }
+
+ LOG.info("export(item={}; matchType={}): found {} entities", item, matchType, ret.size());
+ }
+
+ return ret;
+ }
+
private void processEntity(AtlasEntity entity, ExportContext context) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> processEntity({})", AtlasTypeUtil.getAtlasObjectId(entity));
}
if (!context.guidsProcessed.contains(entity.getGuid())) {
+ context.guidsProcessed.add(entity.getGuid());
+ context.result.getData().getEntityCreationOrder().add(entity.getGuid());
+
addTypesAsNeeded(entity.getTypeName(), context);
addClassificationsAsNeeded(entity, context);
addEntity(entity, context);
- context.guidsProcessed.add(entity.getGuid());
- context.result.getData().getEntityCreationOrder().add(entity.getGuid());
-
getConnectedEntityGuids(entity, context);
}
@@ -159,7 +259,11 @@ public class ExportService {
LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
}
- List<String> result = executeGremlinScriptFor(entity.getGuid());
+ List<String> result = executeGremlinScriptForHive(entity.getGuid());
+ if(result == null) {
+ return;
+ }
+
for (String guid : result) {
if (!context.guidsProcessed.contains(guid)) {
context.guidsToProcess.add(guid);
@@ -215,22 +319,20 @@ public class ExportService {
}
}
- private List<String> executeGremlinScriptFor(String guid) throws ScriptException {
-
- bindings.put("startGuid", guid);
- return (List<String>) atlasGraph.executeGremlinScript(this.scriptEngine, this.bindings, this.gremlinQuery, false);
+ private List<String> executeGremlinScriptForHive(String guid) throws ScriptException {
+ return executeGremlinScriptFor(this.queryByGuid, "startGuid", guid);
}
- private void initScriptEngine() {
- if (scriptEngineManager != null) {
- return;
+ private List<String> executeGremlinScriptFor(String query, String parameterName, String parameterValue) {
+ bindings.put(parameterName, parameterValue);
+ try {
+ return (List<String>) atlasGraph.executeGremlinScript(this.scriptEngine,
+ this.bindings,
+ query,
+ false);
+ } catch (ScriptException e) {
+ LOG.error("Script execution failed for query: ", query, e);
+ return null;
}
-
- scriptEngineManager = new ScriptEngineManager();
- scriptEngine = scriptEngineManager.getEngineByName("gremlin-groovy");
- bindings = scriptEngine.createBindings();
-
- //Do not cache script compilations due to memory implications
- scriptEngine.getContext().setAttribute("#jsr223.groovy.engine.keep.globals", "phantom", ScriptContext.ENGINE_SCOPE);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/515130cc/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java
index e69a139..4596084 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java
@@ -34,6 +34,8 @@ import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
+import static org.apache.atlas.AtlasErrorCode.JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED;
+
public class ZipSource implements EntityImportStream {
private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class);
@@ -80,7 +82,6 @@ public class ZipSource implements EntityImportStream {
String entryName = zipEntry.getName().replace(".json", "");
if (guidEntityJsonMap.containsKey(entryName)) continue;
- if (zipEntry == null) continue;
byte[] buf = new byte[1024];
@@ -111,8 +112,12 @@ public class ZipSource implements EntityImportStream {
try {
ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(jsonData, clazz);
+ T ret = mapper.readValue(jsonData, clazz);
+ if(ret == null) {
+ throw new AtlasBaseException(JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED, clazz.toString());
+ }
+ return ret;
} catch (Exception e) {
throw new AtlasBaseException("Error converting file to JSON.", e);
}