You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text rendering.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/02/27 04:14:54 UTC

incubator-atlas git commit: ATLAS-1503: update export to specify objects-to-export using attribute value

Repository: incubator-atlas
Updated Branches:
  refs/heads/master 0c1d599dd -> 515130ccd


ATLAS-1503: update export to specify objects-to-export using attribute value

Signed-off-by: Madhan Neethiraj <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/515130cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/515130cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/515130cc

Branch: refs/heads/master
Commit: 515130ccd09003dbc0bbfd9436629c7d5648b15c
Parents: 0c1d599
Author: ashutoshm <am...@hortonworks.com>
Authored: Fri Feb 24 15:41:44 2017 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Sun Feb 26 20:13:46 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/atlas/AtlasErrorCode.java   |   8 +-
 .../graph/v1/AtlasEntityChangeNotifier.java     |  22 +--
 .../store/graph/v1/AtlasEntityStoreV1.java      |   2 +-
 webapp/pom.xml                                  |   1 -
 .../atlas/web/resources/ExportService.java      | 176 +++++++++++++++----
 .../apache/atlas/web/resources/ZipSource.java   |   9 +-
 6 files changed, 161 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/515130cc/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index 542b659..d58c514 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -20,11 +20,10 @@ package org.apache.atlas;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.core.Response;
 import java.text.MessageFormat;
 import java.util.Arrays;
 
-import javax.ws.rs.core.Response;
-
 public enum AtlasErrorCode {
     NO_SEARCH_RESULTS(204, "ATLAS2041E", "Given search filter {0} did not yield any results"),
 
@@ -90,7 +89,10 @@ public enum AtlasErrorCode {
     DISCOVERY_QUERY_FAILED(500, "ATLAS5004E", "Discovery query failed {0}"),
     FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK(500, "ATLAS5005E", "Failed to get the lock; another type update might be in progress. Please try again"),
     FAILED_TO_OBTAIN_IMPORT_EXPORT_LOCK(500, "ATLAS5006E", "Another import or export is in progress. Please try again"),
-    NOTIFICATION_FAILED(500, "ATLAS5007E", "Failed to notify for change {0}");
+    NOTIFICATION_FAILED(500, "ATLAS5007E", "Failed to notify for change {0}"),
+    GREMLIN_GROOVY_SCRIPT_ENGINE_FAILED(500, "ATLAS5008E", "scriptEngine cannot be initialized for: {0}"),
+    JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED(500, "ATLAS5009E", "ObjectMapper.readValue returned NULL for class: {0}"),
+    GREMLIN_SCRIPT_EXECUTION_FAILED(500, "ATLAS5010E", "Script execution failed for: {0}");
 
     private String errorCode;
     private String errorMessage;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/515130cc/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
index feada34..4ec2a7c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
@@ -23,18 +23,13 @@ import com.google.inject.Singleton;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.listener.EntityChangeListener;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
-import org.apache.atlas.listener.EntityChangeListener;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
-import org.apache.atlas.repository.graph.AtlasGraphProvider;
-import org.apache.atlas.repository.graph.DeleteHandler;
-import org.apache.atlas.repository.graph.FullTextMapper;
-import org.apache.atlas.repository.graph.GraphHelper;
-import org.apache.atlas.repository.graph.GraphToTypedInstanceMapper;
-import org.apache.atlas.repository.graph.TypedInstanceToGraphMapper;
+import org.apache.atlas.repository.graph.*;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.typesystem.ITypedReferenceableInstance;
 import org.apache.atlas.util.AtlasRepositoryConfiguration;
@@ -46,11 +41,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
-import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.CREATE;
-import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
-import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
-import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
-
 
 @Singleton
 public class AtlasEntityChangeNotifier {
@@ -157,11 +147,17 @@ public class AtlasEntityChangeNotifier {
 
         for (AtlasEntityHeader atlasEntityHeader : atlasEntityHeaders) {
             AtlasVertex atlasVertex = AtlasGraphUtilsV1.findByGuid(atlasEntityHeader.getGuid());
+
+            if(atlasVertex == null) {
+                continue;
+            }
+
             try {
                 String fullText = fullTextMapper.mapRecursive(atlasVertex, true);
+
                 GraphHelper.setProperty(atlasVertex, Constants.ENTITY_TEXT_PROPERTY_KEY, fullText);
             } catch (AtlasException e) {
-                LOG.error("FullText mapping failed for Vertex[ guid = {} ]", atlasEntityHeader.getGuid());
+                LOG.error("FullText mapping failed for Vertex[ guid = {} ]", atlasEntityHeader.getGuid(), e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/515130cc/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 587f3c7..c84f169 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -160,7 +160,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
         while (entityStream.hasNext()) {
             AtlasEntity entity = entityStream.next();
 
-            if(processedGuids.contains(entity.getGuid())) {
+            if(entity == null || processedGuids.contains(entity.getGuid())) {
                 continue;
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/515130cc/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index a431e02..e7dce78 100755
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -377,7 +377,6 @@
             <groupId>com.webcohesion.enunciate</groupId>
             <artifactId>enunciate-core-annotations</artifactId>
         </dependency>
-
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/515130cc/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
index bbd48bc..1e98232 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java
@@ -17,48 +17,73 @@
  */
 package org.apache.atlas.web.resources;
 
+import com.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasExportResult;
 import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.typedef.AtlasClassificationDef;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.AtlasServiceException;
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.impexp.*;
-import org.apache.atlas.model.typedef.AtlasClassificationDef;
-import org.apache.atlas.model.typedef.AtlasEntityDef;
 import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.script.*;
+import javax.script.Bindings;
+import javax.script.ScriptContext;
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
 import java.util.*;
 
 
 public class ExportService {
     private static final Logger LOG = LoggerFactory.getLogger(ExportService.class);
 
+    public static final String OPTION_ATTR_MATCH_TYPE = "matchType";
+    public static final String MATCH_TYPE_STARTS_WITH = "startsWith";
+    public static final String MATCH_TYPE_ENDS_WITH   = "endsWith";
+    public static final String MATCH_TYPE_CONTAINS    = "contains";
+    public static final String MATCH_TYPE_MATCHES     = "matches";
+
     private final AtlasTypeRegistry    typeRegistry;
     private final AtlasGraph           atlasGraph;
     private final EntityGraphRetriever entityGraphRetriever;
 
     // query engine support
-    private ScriptEngineManager scriptEngineManager;
-    private ScriptEngine scriptEngine;
-    private Bindings bindings;
-    private final String gremlinQuery = "g.V('__guid', startGuid).bothE().bothV().has('__guid').__guid.dedup().toList()";
-
-    public ExportService(final AtlasTypeRegistry typeRegistry) {
+    private final ScriptEngine scriptEngine;
+    private final Bindings     bindings;
+    private final String queryByGuid          = "g.V('__guid', startGuid).bothE().bothV().has('__guid').__guid.dedup().toList()";
+    final private String queryByAttrEquals    = "g.V().has('__typeName','%s').has('%s', attrValue).has('__guid').__guid.toList()";
+    final private String queryByAttrStartWith = "g.V().has('__typeName','%s').filter({it.'%s'.startsWith(attrValue)}).has('__guid').__guid.toList()";
+    final private String queryByAttrEndsWith  = "g.V().has('__typeName','%s').filter({it.'%s'.endsWith(attrValue)}).has('__guid').__guid.toList()";
+    final private String queryByAttrContains  = "g.V().has('__typeName','%s').filter({it.'%s'.contains(attrValue)}).has('__guid').__guid.toList()";
+    final private String queryByAttrMatches   = "g.V().has('__typeName','%s').filter({it.'%s'.matches(attrValue)}).has('__guid').__guid.toList()";
+
+    public ExportService(final AtlasTypeRegistry typeRegistry) throws AtlasBaseException {
         this.typeRegistry         = typeRegistry;
         this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry);
         this.atlasGraph           = AtlasGraphProvider.getGraphInstance();
 
-        initScriptEngine();
+        this.scriptEngine  = new GremlinGroovyScriptEngine();
+
+        //Do not cache script compilations due to memory implications
+        scriptEngine.getContext().setAttribute("#jsr223.groovy.engine.keep.globals", "phantom",  ScriptContext.ENGINE_SCOPE);
+
+        bindings = scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE);
     }
 
     private class ExportContext {
@@ -109,16 +134,18 @@ public class ExportService {
         }
 
         try {
-            AtlasEntity entity = entityGraphRetriever.toAtlasEntity(item);
+            List<AtlasEntity> entities = getStartingEntity(item, context);
 
-            processEntity(entity, context);
+            for (AtlasEntity entity: entities) {
+                processEntity(entity, context);
+            }
 
             while (!context.guidsToProcess.isEmpty()) {
                 String guid = context.guidsToProcess.remove(0);
 
-                entity = entityGraphRetriever.toAtlasEntity(guid);
+                AtlasEntity e = entityGraphRetriever.toAtlasEntity(guid);
 
-                processEntity(entity, context);
+                processEntity(e, context);
             }
         } catch (AtlasBaseException excp) {
             context.result.setOperationStatus(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS);
@@ -131,19 +158,92 @@ public class ExportService {
         }
     }
 
+    private List<AtlasEntity> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException {
+        List<AtlasEntity> ret = new ArrayList<>();
+
+        if (StringUtils.isNotEmpty(item.getGuid())) {
+            AtlasEntity entity = entityGraphRetriever.toAtlasEntity(item);
+
+            if (entity != null) {
+                ret = Collections.singletonList(entity);
+            }
+        } else if (StringUtils.isNotEmpty(item.getTypeName()) && MapUtils.isNotEmpty(item.getUniqueAttributes())) {
+            String          typeName   = item.getTypeName();
+            AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+
+            if (entityType == null) {
+                throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, typeName);
+            }
+
+            AtlasExportRequest request = context.result.getRequest();
+            String matchType = null;
+
+            if (MapUtils.isNotEmpty(request.getOptions())) {
+                if (request.getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) {
+                    matchType = request.getOptions().get(OPTION_ATTR_MATCH_TYPE).toString();
+                }
+            }
+
+            final String queryTemplate;
+            if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_STARTS_WITH)) {
+                queryTemplate = queryByAttrStartWith;
+            } else if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_ENDS_WITH)) {
+                queryTemplate = queryByAttrEndsWith;
+            } else if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_CONTAINS)) {
+                queryTemplate = queryByAttrContains;
+            } else if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_MATCHES)) {
+                queryTemplate = queryByAttrMatches;
+            } else { // default
+                queryTemplate = queryByAttrEquals;
+            }
+
+            for (Map.Entry<String, Object> e : item.getUniqueAttributes().entrySet()) {
+                String attrName  = e.getKey();
+                Object attrValue = e.getValue();
+
+                AtlasAttribute attribute = entityType.getAttribute(attrName);
+
+                if (attribute == null || attrValue == null) {
+                    continue;
+                }
+
+                String       query = String.format(queryTemplate, typeName, attribute.getQualifiedName());
+                List<String> guids = executeGremlinScriptFor(query, "attrValue", attrValue.toString());
+
+                if (CollectionUtils.isNotEmpty(guids)) {
+                    for (String guid : guids) {
+                        AtlasEntity entity = entityGraphRetriever.toAtlasEntity(guid);
+
+                        if (entity == null) {
+                            continue;
+                        }
+
+                        ret.add(entity);
+                    }
+                }
+
+                break;
+            }
+
+            LOG.info("export(item={}; matchType={}): found {} entities", item, matchType, ret.size());
+        }
+
+        return ret;
+    }
+
     private void processEntity(AtlasEntity entity, ExportContext context) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> processEntity({})", AtlasTypeUtil.getAtlasObjectId(entity));
         }
 
         if (!context.guidsProcessed.contains(entity.getGuid())) {
+            context.guidsProcessed.add(entity.getGuid());
+            context.result.getData().getEntityCreationOrder().add(entity.getGuid());
+
             addTypesAsNeeded(entity.getTypeName(), context);
             addClassificationsAsNeeded(entity, context);
             addEntity(entity, context);
 
-            context.guidsProcessed.add(entity.getGuid());
-            context.result.getData().getEntityCreationOrder().add(entity.getGuid());
-
             getConnectedEntityGuids(entity, context);
         }
 
@@ -159,7 +259,11 @@ public class ExportService {
                 LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
             }
 
-            List<String> result = executeGremlinScriptFor(entity.getGuid());
+            List<String> result = executeGremlinScriptForHive(entity.getGuid());
+            if(result == null) {
+                return;
+            }
+
             for (String guid : result) {
                 if (!context.guidsProcessed.contains(guid)) {
                     context.guidsToProcess.add(guid);
@@ -215,22 +319,20 @@ public class ExportService {
         }
     }
 
-    private List<String> executeGremlinScriptFor(String guid) throws ScriptException {
-
-        bindings.put("startGuid", guid);
-        return (List<String>) atlasGraph.executeGremlinScript(this.scriptEngine, this.bindings, this.gremlinQuery, false);
+    private List<String> executeGremlinScriptForHive(String guid) throws ScriptException {
+        return executeGremlinScriptFor(this.queryByGuid, "startGuid", guid);
     }
 
-    private void initScriptEngine() {
-        if (scriptEngineManager != null) {
-            return;
+    private List<String> executeGremlinScriptFor(String query, String parameterName, String parameterValue) {
+        bindings.put(parameterName, parameterValue);
+        try {
+            return (List<String>) atlasGraph.executeGremlinScript(this.scriptEngine,
+                    this.bindings,
+                    query,
+                    false);
+        } catch (ScriptException e) {
+            LOG.error("Script execution failed for query: ", query, e);
+            return null;
         }
-
-        scriptEngineManager = new ScriptEngineManager();
-        scriptEngine = scriptEngineManager.getEngineByName("gremlin-groovy");
-        bindings = scriptEngine.createBindings();
-
-        //Do not cache script compilations due to memory implications
-        scriptEngine.getContext().setAttribute("#jsr223.groovy.engine.keep.globals", "phantom", ScriptContext.ENGINE_SCOPE);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/515130cc/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java
index e69a139..4596084 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java
@@ -34,6 +34,8 @@ import java.util.Map;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 
+import static org.apache.atlas.AtlasErrorCode.JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED;
+
 
 public class ZipSource implements EntityImportStream {
     private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class);
@@ -80,7 +82,6 @@ public class ZipSource implements EntityImportStream {
             String entryName = zipEntry.getName().replace(".json", "");
 
             if (guidEntityJsonMap.containsKey(entryName)) continue;
-            if (zipEntry == null) continue;
 
             byte[] buf = new byte[1024];
 
@@ -111,8 +112,12 @@ public class ZipSource implements EntityImportStream {
         try {
             ObjectMapper mapper = new ObjectMapper();
 
-            return mapper.readValue(jsonData, clazz);
+            T ret = mapper.readValue(jsonData, clazz);
+            if(ret == null) {
+                throw new AtlasBaseException(JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED, clazz.toString());
+            }
 
+            return ret;
         } catch (Exception e) {
             throw new AtlasBaseException("Error converting file to JSON.", e);
         }