You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/02/13 07:44:00 UTC
[4/5] incubator-atlas git commit: ATLAS-1544: implementation of REST endpoints for entity create/update/bulk-get
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/ArrayVertexMapper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/ArrayVertexMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/ArrayVertexMapper.java
deleted file mode 100644
index ddd2242..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/ArrayVertexMapper.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.repository.store.graph.v1;
-
-import com.google.common.base.Optional;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
-import org.apache.atlas.repository.graph.GraphHelper;
-import org.apache.atlas.repository.graphdb.AtlasEdge;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.type.AtlasArrayType;
-import org.apache.atlas.type.AtlasStructType;
-import org.apache.atlas.type.AtlasType;
-import org.apache.commons.collections.CollectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.atlas.repository.graph.GraphHelper.string;
-
-@Singleton
-public class ArrayVertexMapper implements InstanceGraphMapper<List> {
-
- private static final Logger LOG = LoggerFactory.getLogger(ArrayVertexMapper.class);
-
- protected final DeleteHandlerV1 deleteHandler;
-
- protected StructVertexMapper structVertexMapper;
-
- @Inject
- public ArrayVertexMapper(DeleteHandlerV1 deleteHandler) {
- this.deleteHandler = deleteHandler;
- }
-
- void init(StructVertexMapper structVertexMapper) {
- this.structVertexMapper = structVertexMapper;
- }
-
- @Override
- public List toGraph(GraphMutationContext ctx) throws AtlasBaseException {
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Mapping instance to vertex {} for array attribute {}", string(ctx.getReferringVertex()), ctx.getAttrType().getTypeName());
- }
-
- List newElements = (List) ctx.getValue();
- boolean newAttributeEmpty = (newElements == null || newElements.isEmpty());
-
- AtlasArrayType arrType = (AtlasArrayType) ctx.getAttrType();
- AtlasType elementType = arrType.getElementType();
- List<Object> currentElements = getArrayElementsProperty(elementType, ctx.getReferringVertex(), ctx.getVertexPropertyKey());
-
- List<Object> newElementsCreated = new ArrayList<>();
-
- if (!newAttributeEmpty) {
- for (int index = 0; index < newElements.size(); index++) {
-
- LOG.debug("Adding/updating element at position {}, current element {}, new element {}", index,
- (currentElements != null && index < currentElements.size()) ? currentElements.get(index) : null, newElements.get(index));
-
- Optional<AtlasEdge> existingEdge = getEdgeAt(currentElements, index, arrType.getElementType());
-
- GraphMutationContext arrCtx = new GraphMutationContext.Builder(ctx.getOp(), ctx.getAttribute(),
- arrType.getElementType(), newElements.get(index))
- .referringVertex(ctx.getReferringVertex())
- .edge(existingEdge)
- .vertexProperty(ctx.getVertexPropertyKey()).build();
-
- Object newEntry = structVertexMapper.mapCollectionElementsToVertex(arrCtx);
- newElementsCreated.add(newEntry);
- }
- }
-
- if (AtlasGraphUtilsV1.isReference(elementType)) {
- List<AtlasEdge> additionalEdges = removeUnusedArrayEntries(ctx.getParentType(), ctx.getAttributeDef(), (List) currentElements, (List) newElementsCreated, elementType);
- newElementsCreated.addAll(additionalEdges);
- }
-
- // for dereference on way out
- setArrayElementsProperty(elementType, ctx.getReferringVertex(), ctx.getVertexPropertyKey(), newElementsCreated);
- return newElementsCreated;
- }
-
- @Override
- public void cleanUp() throws AtlasBaseException {
-
- }
-
- //Removes unused edges from the old collection, compared to the new collection
- private List<AtlasEdge> removeUnusedArrayEntries(
- AtlasStructType entityType,
- AtlasAttributeDef attributeDef,
- List<AtlasEdge> currentEntries,
- List<AtlasEdge> newEntries,
- AtlasType entryType) throws AtlasBaseException {
- if (currentEntries != null && !currentEntries.isEmpty()) {
- LOG.debug("Removing unused entries from the old collection");
- if (AtlasGraphUtilsV1.isReference(entryType)) {
-
- Collection<AtlasEdge> edgesToRemove = CollectionUtils.subtract(currentEntries, newEntries);
-
- LOG.debug("Removing unused entries from the old collection - {}", edgesToRemove);
-
- if (!edgesToRemove.isEmpty()) {
- //Remove the edges for (current edges - new edges)
- List<AtlasEdge> additionalElements = new ArrayList<>();
-
- for (AtlasEdge edge : edgesToRemove) {
- boolean deleteChildReferences = StructVertexMapper.shouldManageChildReferences(entityType, attributeDef.getName());
- boolean deleted = deleteHandler.deleteEdgeReference(edge, entryType.getTypeCategory(),
- deleteChildReferences, true);
- if (!deleted) {
- additionalElements.add(edge);
- }
- }
-
- return additionalElements;
- }
- }
- }
- return Collections.emptyList();
- }
-
- public static List<Object> getArrayElementsProperty(AtlasType elementType, AtlasVertex instanceVertex, String propertyName) {
- String actualPropertyName = GraphHelper.encodePropertyKey(propertyName);
- if (AtlasGraphUtilsV1.isReference(elementType)) {
- return (List)instanceVertex.getListProperty(actualPropertyName, AtlasEdge.class);
- }
- else {
- return (List)instanceVertex.getListProperty(actualPropertyName);
- }
- }
-
- private Optional<AtlasEdge> getEdgeAt(List<Object> currentElements, int index, AtlasType elemType) {
- Optional<AtlasEdge> existingEdge = Optional.absent();
- if ( AtlasGraphUtilsV1.isReference(elemType) ) {
- Object currentElement = (currentElements != null && index < currentElements.size()) ?
- currentElements.get(index) : null;
-
- if ( currentElement != null) {
- existingEdge = Optional.of((AtlasEdge) currentElement);
- }
- }
-
- return existingEdge;
- }
-
- private void setArrayElementsProperty(AtlasType elementType, AtlasVertex instanceVertex, String propertyName, List<Object> values) {
- String actualPropertyName = GraphHelper.encodePropertyKey(propertyName);
- if (AtlasGraphUtilsV1.isReference(elementType)) {
- GraphHelper.setListPropertyFromElementIds(instanceVertex, actualPropertyName, (List) values);
- }
- else {
- GraphHelper.setProperty(instanceVertex, actualPropertyName, values);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
index 2b0804f..7141911 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
@@ -17,10 +17,10 @@
*/
package org.apache.atlas.repository.store.graph.v1;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -31,7 +31,6 @@ import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
-import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.EntityResolver;
@@ -41,38 +40,21 @@ import org.apache.atlas.type.AtlasMapType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.commons.lang3.StringUtils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.inject.Inject;
-import com.google.inject.Provider;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery {
+ private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityGraphDiscoveryV1.class);
private final AtlasTypeRegistry typeRegistry;
- private final EntityGraphDiscoveryContext discoveredEntities;
- private final Set<String> processedIds = new HashSet<>();
- private final Collection<EntityResolver> entityResolvers = new LinkedHashSet<>();
-
- @Inject
- public AtlasEntityGraphDiscoveryV1(AtlasTypeRegistry typeRegistry, Collection<Provider<EntityResolver>> entityResolverProviders) {
- this.typeRegistry = typeRegistry;
- this.discoveredEntities = new EntityGraphDiscoveryContext(typeRegistry);
+ private final EntityGraphDiscoveryContext discoveryContext;
- for (Provider<EntityResolver> entityResolverProvider : entityResolverProviders) {
- entityResolvers.add(entityResolverProvider.get());
- }
- }
-
- @VisibleForTesting
- public AtlasEntityGraphDiscoveryV1(AtlasTypeRegistry typeRegistry, List<EntityResolver> entityResolvers) {
- this.typeRegistry = typeRegistry;
- this.discoveredEntities = new EntityGraphDiscoveryContext(typeRegistry);
-
- for (EntityResolver entityResolver : entityResolvers) {
- this.entityResolvers.add(entityResolver);
- }
+ public AtlasEntityGraphDiscoveryV1(AtlasTypeRegistry typeRegistry, EntityStream entityStream) {
+ this.typeRegistry = typeRegistry;
+ this.discoveryContext = new EntityGraphDiscoveryContext(typeRegistry, entityStream);
}
@Override
@@ -81,161 +63,239 @@ public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery {
}
@Override
- public EntityGraphDiscoveryContext discoverEntities(final List<AtlasEntity> entities) throws AtlasBaseException {
+ public EntityGraphDiscoveryContext discoverEntities() throws AtlasBaseException {
- //walk the graph and discover entity references
- discover(entities);
+ // walk through entities in stream and validate them; record entity references
+ discoverAndValidate();
- //resolve root and referred entities
+ // resolve entity references discovered in previous step
resolveReferences();
- return discoveredEntities;
+ return discoveryContext;
}
@Override
public void cleanUp() throws AtlasBaseException {
- processedIds.clear();
- discoveredEntities.cleanUp();
-
- for (EntityResolver resolver : entityResolvers) {
- resolver.cleanUp();
- }
+ discoveryContext.cleanUp();
}
- protected void discover(List<AtlasEntity> entities) throws AtlasBaseException {
- for (AtlasEntity entity : entities) {
- AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName());
+ protected void discoverAndValidate() throws AtlasBaseException {
+ EntityStream entityStream = discoveryContext.getEntityStream();
+
+ Set<String> walkedEntities = new HashSet<>();
+
+ // walk through top-level entities and find entity references
+ while (entityStream.hasNext()) {
+ AtlasEntity entity = entityStream.next();
+
+ if (entity != null) {
+ walkEntityGraph(entity);
+
+ walkedEntities.add(entity.getGuid());
+ }
+ }
+
+ // walk through entities referenced by other entities
+ // referencedGuids will be updated within this for() loop; avoid use of iterators
+ List<String> referencedGuids = discoveryContext.getReferencedGuids();
+ for (int i = 0; i < referencedGuids.size(); i++) {
+ String guid = referencedGuids.get(i);
- if (type == null) {
- throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName());
+ if (walkedEntities.contains(guid)) {
+ continue;
}
- discoveredEntities.addRootEntity(entity);
+ AtlasEntity entity = entityStream.getByGuid(guid);
- walkEntityGraph(type, entity);
+ if (entity != null) {
+ walkEntityGraph(entity);
+
+ walkedEntities.add(entity.getGuid());
+ }
}
}
protected void resolveReferences() throws AtlasBaseException {
- for (EntityResolver resolver : entityResolvers) {
- resolver.init(discoveredEntities);
+ EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(),
+ new UniqAttrBasedEntityResolver(typeRegistry)
+ };
- resolver.resolveEntityReferences();
+ for (EntityResolver resolver : entityResolvers) {
+ resolver.resolveEntityReferences(discoveryContext);
}
+ }
- if (discoveredEntities.hasUnresolvedReferences()) {
- throw new AtlasBaseException(AtlasErrorCode.UNRESOLVED_REFERENCES_FOUND,
- discoveredEntities.getUnresolvedIds().toString(),
- discoveredEntities.getUnresolvedIdsByUniqAttribs().toString());
+ private void visitReference(AtlasEntityType type, Object val) throws AtlasBaseException {
+ if (type == null || val == null) {
+ return;
}
- }
- private void visitReference(AtlasEntityType type, Object entity) throws AtlasBaseException {
- if (entity != null) {
- if (entity instanceof AtlasObjectId) {
- AtlasObjectId objId = (AtlasObjectId)entity;
+ if (val instanceof AtlasObjectId) {
+ AtlasObjectId objId = (AtlasObjectId)val;
+
+ if (!objId.isValid()) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, objId.toString());
+ }
- if (!objId.isValid()) {
- throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, "Invalid object id " + objId);
- }
+ recordObjectReference(objId);
+ } else if (val instanceof Map) {
+ AtlasObjectId objId = new AtlasObjectId((Map)val);
- if (!StringUtils.isEmpty(objId.getGuid()) && (objId.isAssignedGuid() || objId.isUnAssignedGuid())) {
- discoveredEntities.addUnResolvedId(objId);
- } else {
- discoveredEntities.addUnresolvedIdByUniqAttribs(objId);
- }
- } else if (entity instanceof AtlasEntity) {
- throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, "Use AtlasObjectId to refer to another instance instead of AtlasEntity " + type.getTypeName());
- } else {
- throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, "Invalid object type " + entity.getClass());
+ if (!objId.isValid()) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, objId.toString());
}
+
+ recordObjectReference(objId);
+ } else if (val instanceof AtlasEntity) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, "found AtlasEntity");
+ } else {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString());
}
}
- void visitAttribute(AtlasStructType parentType, AtlasType attrType, AtlasAttributeDef attrDef, Object val) throws AtlasBaseException {
- if (val != null) {
- if ( isPrimitive(attrType.getTypeCategory()) ) {
- return;
- }
- if (attrType.getTypeCategory() == TypeCategory.ARRAY) {
- AtlasArrayType arrayType = (AtlasArrayType) attrType;
- AtlasType elemType = arrayType.getElementType();
-
- visitCollectionReferences(parentType, attrType, attrDef, elemType, val);
- } else if (attrType.getTypeCategory() == TypeCategory.MAP) {
- AtlasType keyType = ((AtlasMapType) attrType).getKeyType();
- AtlasType valueType = ((AtlasMapType) attrType).getValueType();
-
- visitMapReferences(parentType, attrType, attrDef, keyType, valueType, val);
- } else if (attrType.getTypeCategory() == TypeCategory.STRUCT) {
- visitStruct((AtlasStructType)attrType, val);
- } else if (attrType.getTypeCategory() == TypeCategory.ENTITY) {
- visitReference((AtlasEntityType) attrType, val);
- }
+ void visitAttribute(AtlasType attrType, Object val) throws AtlasBaseException {
+ if (attrType == null || val == null) {
+ return;
+ }
+
+ if (isPrimitive(attrType.getTypeCategory()) ) {
+ return;
+ }
+ if (attrType.getTypeCategory() == TypeCategory.ARRAY) {
+ AtlasArrayType arrayType = (AtlasArrayType) attrType;
+ AtlasType elemType = arrayType.getElementType();
+
+ visitCollectionReferences(elemType, val);
+ } else if (attrType.getTypeCategory() == TypeCategory.MAP) {
+ AtlasType keyType = ((AtlasMapType) attrType).getKeyType();
+ AtlasType valueType = ((AtlasMapType) attrType).getValueType();
+
+ visitMapReferences(keyType, valueType, val);
+ } else if (attrType.getTypeCategory() == TypeCategory.STRUCT) {
+ visitStruct((AtlasStructType)attrType, val);
+ } else if (attrType.getTypeCategory() == TypeCategory.ENTITY) {
+ visitReference((AtlasEntityType) attrType, val);
}
}
- void visitMapReferences(AtlasStructType parentType, final AtlasType attrType, AtlasAttributeDef attrDef, AtlasType keyType, AtlasType valueType, Object val) throws AtlasBaseException {
+ void visitMapReferences(AtlasType keyType, AtlasType valueType, Object val) throws AtlasBaseException {
+ if (keyType == null || valueType == null || val == null) {
+ return;
+ }
+
if (isPrimitive(keyType.getTypeCategory()) && isPrimitive(valueType.getTypeCategory())) {
return;
}
- if (val != null) {
- if (Map.class.isAssignableFrom(val.getClass())) {
- Iterator<Map.Entry> it = ((Map) val).entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry e = it.next();
- visitAttribute(parentType, keyType, attrDef, e.getKey());
- visitAttribute(parentType, valueType, attrDef, e.getValue());
- }
+ if (Map.class.isAssignableFrom(val.getClass())) {
+ Iterator<Map.Entry> it = ((Map) val).entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry e = it.next();
+ visitAttribute(keyType, e.getKey());
+ visitAttribute(valueType, e.getValue());
}
}
}
- void visitCollectionReferences(final AtlasStructType parentType, final AtlasType attrType, final AtlasAttributeDef attrDef, AtlasType elemType, Object val) throws AtlasBaseException {
- if (isPrimitive(elemType.getTypeCategory())) {
+ void visitCollectionReferences(AtlasType elemType, Object val) throws AtlasBaseException {
+ if (elemType == null || val == null || isPrimitive(elemType.getTypeCategory())) {
return;
}
- if (val != null) {
- Iterator it = null;
- if (val instanceof Collection) {
- it = ((Collection) val).iterator();
- } else if (val instanceof Iterable) {
- it = ((Iterable) val).iterator();
- } else if (val instanceof Iterator) {
- it = (Iterator) val;
- }
- if (it != null) {
- while (it.hasNext()) {
- Object elem = it.next();
- visitAttribute(parentType, elemType, attrDef, elem);
- }
+ Iterator it = null;
+
+ if (val instanceof Collection) {
+ it = ((Collection) val).iterator();
+ } else if (val instanceof Iterable) {
+ it = ((Iterable) val).iterator();
+ } else if (val instanceof Iterator) {
+ it = (Iterator) val;
+ }
+
+ if (it != null) {
+ while (it.hasNext()) {
+ Object elem = it.next();
+ visitAttribute(elemType, elem);
}
}
}
void visitStruct(AtlasStructType structType, Object val) throws AtlasBaseException {
- if (structType == null) {
+ if (structType == null || val == null) {
return;
}
- for (AtlasStructType.AtlasAttribute attribute : structType.getAllAttributes().values()) {
+ AtlasStruct struct;
+
+ if (val instanceof AtlasStruct) {
+ struct = (AtlasStruct) val;
+ } else if (val instanceof Map) {
+ Map attributes = AtlasTypeUtil.toStructAttributes((Map) val);
+
+ struct = new AtlasStruct(structType.getTypeName(), attributes);
+ } else {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_STRUCT_VALUE, val.toString());
+ }
+
+ for (AtlasAttribute attribute : structType.getAllAttributes().values()) {
AtlasType attrType = attribute.getAttributeType();
- Object attrVal = ((AtlasStruct) val).getAttribute(attribute.getName());
+ Object attrVal = struct.getAttribute(attribute.getName());
- visitAttribute(structType, attrType, attribute.getAttributeDef(), attrVal);
+ visitAttribute(attrType, attrVal);
}
}
- void walkEntityGraph(AtlasEntityType entityType, AtlasEntity entity) throws AtlasBaseException {
- visitStruct(entityType, entity);
+ void walkEntityGraph(AtlasEntity entity) throws AtlasBaseException {
+ if (entity == null) {
+ return;
+ }
+
+ validateAndNormalize(entity);
+ AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+ recordObjectReference(entity.getGuid());
+
+ visitStruct(type, entity);
}
boolean isPrimitive(TypeCategory typeCategory) {
return typeCategory == TypeCategory.PRIMITIVE || typeCategory == TypeCategory.ENUM;
}
+
+ private void validateAndNormalize(AtlasEntity entity) throws AtlasBaseException {
+ List<String> messages = new ArrayList<>();
+
+ if (!AtlasEntity.isAssigned(entity.getGuid()) && !AtlasEntity.isUnAssigned(entity.getGuid())) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, "invalid guid " + entity.getGuid());
+ }
+
+ AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+ if (type == null) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName());
+ }
+
+ type.validateValue(entity, entity.getTypeName(), messages);
+
+ if (!messages.isEmpty()) {
+ throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages);
+ }
+
+ type.getNormalizedValue(entity);
+ }
+
+ private void recordObjectReference(String guid) {
+ discoveryContext.addReferencedGuid(guid);
+ }
+
+ private void recordObjectReference(AtlasObjectId objId) {
+ if (objId.isValidGuid()) {
+ discoveryContext.addReferencedGuid(objId.getGuid());
+ } else {
+ discoveryContext.addReferencedByUniqAttribs(objId);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 566207b..1f4ad57 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -19,234 +19,217 @@ package org.apache.atlas.repository.store.graph.v1;
import com.google.inject.Inject;
+import com.google.inject.Singleton;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.GraphTransaction;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
-import org.apache.atlas.model.instance.AtlasEntity.Status;
-import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.apache.atlas.repository.graph.AtlasGraphProvider;
-import org.apache.atlas.repository.graph.GraphHelper;
-import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
-import org.apache.atlas.repository.store.graph.EntityResolver;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
+@Singleton
public class AtlasEntityStoreV1 implements AtlasEntityStore {
+ private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class);
- protected AtlasTypeRegistry typeRegistry;
-
- private final EntityGraphMapper graphMapper;
- private final AtlasGraph graph;
- private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class);
+ private final DeleteHandlerV1 deleteHandler;
+ private final AtlasTypeRegistry typeRegistry;
@Inject
- public AtlasEntityStoreV1(EntityGraphMapper vertexMapper) {
- this.graphMapper = vertexMapper;
- this.graph = AtlasGraphProvider.getGraphInstance();
+ public AtlasEntityStoreV1(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry) {
+ this.deleteHandler = deleteHandler;
+ this.typeRegistry = typeRegistry;
}
- @Inject
- public void init(AtlasTypeRegistry typeRegistry) throws AtlasBaseException {
- this.typeRegistry = typeRegistry;
+ @Override
+ @GraphTransaction
+ public AtlasEntityWithExtInfo getById(String guid) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> getById({})", guid);
+ }
+
+ EntityGraphRetriever entityRetriever = new EntityGraphRetriever(typeRegistry);
+
+ AtlasEntityWithExtInfo ret = entityRetriever.toAtlasEntityWithExtInfo(guid);
+
+ if (ret == null) {
+ throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== getById({}): {}", guid, ret);
+ }
+
+ return ret;
}
@Override
- public AtlasEntityWithExtInfo getById(final String guid) throws AtlasBaseException {
+ @GraphTransaction
+ public AtlasEntitiesWithExtInfo getByIds(List<String> guids) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
- LOG.debug("Retrieving entity with guid={}", guid);
+ LOG.debug("==> getByIds({})", guids);
}
EntityGraphRetriever entityRetriever = new EntityGraphRetriever(typeRegistry);
- return entityRetriever.toAtlasEntityWithExtInfo(guid);
+ AtlasEntitiesWithExtInfo ret = entityRetriever.toAtlasEntitiesWithExtInfo(guids);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== getByIds({}): {}", guids, ret);
+ }
+
+ return ret;
}
@Override
- public AtlasEntityWithExtInfo getByUniqueAttribute(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException {
- String entityTypeName = entityType.getTypeName();
-
+ @GraphTransaction
+ public AtlasEntityWithExtInfo getByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes)
+ throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
- LOG.debug("Retrieving entity with type={} and attributes={}: values={}", entityTypeName, uniqAttributes);
+ LOG.debug("==> getByUniqueAttribute({}, {})", entityType.getTypeName(), uniqAttributes);
}
AtlasVertex entityVertex = AtlasGraphUtilsV1.getVertexByUniqueAttributes(entityType, uniqAttributes);
EntityGraphRetriever entityRetriever = new EntityGraphRetriever(typeRegistry);
- return entityRetriever.toAtlasEntityWithExtInfo(entityVertex);
- }
+ AtlasEntityWithExtInfo ret = entityRetriever.toAtlasEntityWithExtInfo(entityVertex);
- @Override
- public EntityMutationResponse deleteById(final String guid) {
- return null;
+ if (ret == null) {
+ throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, entityType.getTypeName(),
+ uniqAttributes.toString());
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== getByUniqueAttribute({}, {}): {}", entityType.getTypeName(), uniqAttributes, ret);
+ }
+
+ return ret;
}
@Override
@GraphTransaction
- public EntityMutationResponse createOrUpdate(final Map<String, AtlasEntity> entities) throws AtlasBaseException {
-
+ public EntityMutationResponse createOrUpdate(EntityStream entityStream) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasEntityStoreV1.createOrUpdate({}, {})", entities);
+ LOG.debug("==> createOrUpdate()");
}
- //Validate
- List<AtlasEntity> normalizedEntities = validateAndNormalize(entities);
+ if (entityStream == null || !entityStream.hasNext()) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
+ }
- //Discover entities, create vertices
- EntityMutationContext ctx = preCreateOrUpdate(normalizedEntities);
+ EntityGraphMapper entityGraphMapper = new EntityGraphMapper(deleteHandler, typeRegistry);
+
+ // Create/Update entities
+ EntityMutationContext context = preCreateOrUpdate(entityStream, entityGraphMapper);
+
+ EntityMutationResponse ret = entityGraphMapper.mapAttributes(context);
+
+ ret.setGuidAssignments(context.getGuidAssignments());
if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasStructDefStoreV1.createOrUpdate({}, {}): {}", entities);
+ LOG.debug("<== createOrUpdate()");
}
- return graphMapper.mapAttributes(ctx);
+ return ret;
}
@Override
- public AtlasEntitiesWithExtInfo getByIds(final List<String> guids) throws AtlasBaseException {
- return null;
+ @GraphTransaction
+ public EntityMutationResponse updateByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes,
+ AtlasEntity entity) throws AtlasBaseException {
+ throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "updateByUniqueAttributes() not implemented yet");
}
@Override
- public EntityMutationResponse deleteByIds(final List<String> guid) throws AtlasBaseException {
- return null;
+ @GraphTransaction
+ public EntityMutationResponse deleteById(String guid) throws AtlasBaseException {
+ throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "deleteById() not implemented yet");
}
@Override
- public EntityMutationResponse updateByUniqueAttribute(final String typeName, final String attributeName, final String attributeValue, final AtlasEntity entity) throws AtlasBaseException {
- return null;
+ @GraphTransaction
+ public EntityMutationResponse deleteByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes)
+ throws AtlasBaseException {
+ throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "deleteByUniqueAttributes() not implemented yet");
}
@Override
- public EntityMutationResponse deleteByUniqueAttribute(final String typeName, final String attributeName, final String attributeValue) throws AtlasBaseException {
- return null;
+ @GraphTransaction
+ public EntityMutationResponse deleteByIds(List<String> guids) throws AtlasBaseException {
+ throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "deleteByIds() not implemented yet");
}
@Override
- public void addClassifications(final String guid, final List<AtlasClassification> classification) throws AtlasBaseException {
-
+ @GraphTransaction
+ public void addClassifications(String guid, List<AtlasClassification> classification) throws AtlasBaseException {
+ throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "addClassifications() not implemented yet");
}
@Override
- public void updateClassifications(final String guid, final List<AtlasClassification> classification) throws AtlasBaseException {
-
+ @GraphTransaction
+ public void updateClassifications(String guid, List<AtlasClassification> classification) throws AtlasBaseException {
+ throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "updateClassifications() not implemented yet");
}
@Override
- public void deleteClassifications(final String guid, final List<String> classificationNames) throws AtlasBaseException {
-
+ @GraphTransaction
+ public void deleteClassifications(String guid, List<String> classificationNames) throws AtlasBaseException {
+ throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "deleteClassifications() not implemented yet");
}
- private EntityMutationContext preCreateOrUpdate(final List<AtlasEntity> atlasEntities) throws AtlasBaseException {
- List<EntityResolver> entityResolvers = new ArrayList<>();
- entityResolvers.add(new IDBasedEntityResolver());
- entityResolvers.add(new UniqAttrBasedEntityResolver(typeRegistry));
+ private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper) throws AtlasBaseException {
+ EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV1(typeRegistry, entityStream);
+ EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities();
+ EntityMutationContext context = new EntityMutationContext(discoveryContext);
- EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV1(typeRegistry, entityResolvers);
- EntityGraphDiscoveryContext discoveredEntities = graphDiscoverer.discoverEntities(atlasEntities);
- EntityMutationContext context = new EntityMutationContext(discoveredEntities);
+ for (String guid : discoveryContext.getReferencedGuids()) {
+ AtlasVertex vertex = discoveryContext.getResolvedEntityVertex(guid);
+ AtlasEntity entity = entityStream.getByGuid(guid);
- for (AtlasEntity entity : discoveredEntities.getRootEntities()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasEntityStoreV1.preCreateOrUpdate({}): {}", entity);
- }
-
- AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
-
- if (entityType == null) {
- throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName());
- }
-
- final AtlasVertex vertex;
- AtlasObjectId objId = entity.getAtlasObjectId();
+ if (vertex != null) {
+ // entity would be null if guid is not in the stream but referenced by an entity in the stream
+ if (entity != null) {
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
- if (discoveredEntities.isResolvedId(objId) ) {
- vertex = discoveredEntities.getResolvedEntityVertex(objId);
+ context.addUpdated(entity, entityType, vertex);
- context.addUpdated(entity, entityType, vertex);
-
- String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex);
-
- RequestContextV1.get().recordEntityUpdate(new AtlasObjectId(entityType.getTypeName(), guid));
+ RequestContextV1.get().recordEntityUpdate(entity.getAtlasObjectId());
+ }
} else {
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+
//Create vertices which do not exist in the repository
- vertex = graphMapper.createVertexTemplate(entity, entityType);
+ vertex = entityGraphMapper.createVertex(entity);
- context.addCreated(entity, entityType, vertex);
+ discoveryContext.addResolvedGuid(guid, vertex);
- discoveredEntities.addResolvedId(objId, vertex);
- discoveredEntities.removeUnResolvedId(objId);
+ String generatedGuid = AtlasGraphUtilsV1.getIdFromVertex(vertex);
- String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex);
+ entity.setGuid(generatedGuid);
- RequestContextV1.get().recordEntityCreate(new AtlasObjectId(entityType.getTypeName(), guid));
- }
+ context.addCreated(guid, entity, entityType, vertex);
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasEntityStoreV1.preCreateOrUpdate({}): {}", entity, vertex);
+ RequestContextV1.get().recordEntityCreate(entity.getAtlasObjectId());
}
}
return context;
}
-
- private List<AtlasEntity> validateAndNormalize(final Map<String, AtlasEntity> entities) throws AtlasBaseException {
- List<AtlasEntity> normalizedEntities = new ArrayList<>();
- List<String> messages = new ArrayList<>();
-
- for (String entityId : entities.keySet()) {
- if ( !AtlasEntity.isAssigned(entityId) && !AtlasEntity.isUnAssigned(entityId)) {
- throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, ": Guid in map key is invalid " + entityId);
- }
-
- AtlasEntity entity = entities.get(entityId);
-
- if ( entity == null) {
- throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, ": Entity is null for guid " + entityId);
- }
-
- AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName());
- if (type == null) {
- throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName());
- }
-
- type.validateValue(entity, entity.getTypeName(), messages);
-
- if ( !messages.isEmpty()) {
- throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages);
- }
-
- AtlasEntity normalizedEntity = (AtlasEntity) type.getNormalizedValue(entity);
-
- normalizedEntities.add(normalizedEntity);
- }
-
- return normalizedEntities;
- }
-
- public void cleanUp() throws AtlasBaseException {
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java
new file mode 100644
index 0000000..010b626
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v1;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * {@link EntityStream} implementation backed by an in-memory {@link AtlasEntitiesWithExtInfo}.
+ * <p>
+ * Iteration walks the collection returned by {@code getEntities()} of the container; lookup by guid
+ * is delegated to {@code getEntity(guid)} of the same container.
+ */
+public class AtlasEntityStream implements EntityStream {
+    private AtlasEntitiesWithExtInfo entitiesWithExtInfo = new AtlasEntitiesWithExtInfo();
+    private Iterator<AtlasEntity> iterator;
+
+    /**
+     * Creates a stream over a default-constructed container.
+     */
+    public AtlasEntityStream() {
+        // the iterator has to be set up here too; otherwise hasNext()/next() dereference a null iterator
+        this.iterator = newIterator();
+    }
+
+    /**
+     * Creates a stream over a new container built from the given entity.
+     */
+    public AtlasEntityStream(AtlasEntity entity) {
+        this(new AtlasEntitiesWithExtInfo(entity));
+    }
+
+    /**
+     * Creates a stream over a new container built from the given {@link AtlasEntityWithExtInfo}.
+     */
+    public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo) {
+        this(new AtlasEntitiesWithExtInfo(entityWithExtInfo));
+    }
+
+    /**
+     * Creates a stream over the given container.
+     *
+     * @param entitiesWithExtInfo container that is iterated over and used for lookups by guid
+     */
+    public AtlasEntityStream(AtlasEntitiesWithExtInfo entitiesWithExtInfo) {
+        this.entitiesWithExtInfo = entitiesWithExtInfo;
+        this.iterator = newIterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return iterator.hasNext();
+    }
+
+    /**
+     * @return the next entity, or {@code null} when the iterator has no more elements
+     */
+    @Override
+    public AtlasEntity next() {
+        return iterator.hasNext() ? iterator.next() : null;
+    }
+
+    /**
+     * Restarts iteration by obtaining a fresh iterator from the container.
+     */
+    @Override
+    public void reset() {
+        this.iterator = newIterator();
+    }
+
+    @Override
+    public AtlasEntity getByGuid(String guid) {
+        return entitiesWithExtInfo.getEntity(guid);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("AtlasEntityStream{");
+
+        sb.append("entitiesWithExtInfo=").append(entitiesWithExtInfo);
+        sb.append(", iterator=").append(iterator);
+        sb.append('}');
+
+        return sb.toString();
+    }
+
+    /**
+     * Builds a fresh iterator over the container's entities; a null entity collection is treated as empty.
+     */
+    private Iterator<AtlasEntity> newIterator() {
+        if (entitiesWithExtInfo.getEntities() == null) {
+            return Collections.<AtlasEntity>emptyIterator();
+        }
+
+        return entitiesWithExtInfo.getEntities().iterator();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
index b17cf90..99f074b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
@@ -217,7 +217,7 @@ public class AtlasGraphUtilsV1 {
if (entityVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, entityType.getTypeName(),
- uniqAttributes.keySet().toString(), uniqAttributes.values().toString());
+ uniqAttributes.toString());
}
return entityVertex;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AttributeMutationContext.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AttributeMutationContext.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AttributeMutationContext.java
new file mode 100644
index 0000000..b6d82dd
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AttributeMutationContext.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v1;
+
+
+import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
+import org.apache.atlas.model.typedef.AtlasStructDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasStructType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasType;
+
+
+import java.util.Objects;
+
+/**
+ * Holder for the state of one attribute mutation: the operation, the referring vertex, the attribute, its value,
+ * the vertex property name, an optional overriding element type and an optional existing edge.
+ */
+public class AttributeMutationContext {
+    private EntityOperation op;
+
+    /**
+     * Atlas Attribute
+     */
+    private AtlasAttribute attribute;
+
+    /**
+     * Overriding type for which elements are being mapped
+     */
+    private AtlasType currentElementType;
+
+    /**
+     * Current attribute value/entity/Struct instance
+     */
+    private Object value;
+
+    private String vertexProperty;
+
+    /**
+     * The vertex which corresponds to the entity/struct for which we are mapping a complex attribute like struct, traits
+     */
+    AtlasVertex referringVertex;
+
+    /**
+     * The current edge (in case of updates) from the parent entity/struct to the complex attribute like struct, trait
+     */
+    AtlasEdge existingEdge;
+
+    /**
+     * Shorthand that takes the vertex property name from the attribute and passes null for the element type and edge.
+     */
+    public AttributeMutationContext(EntityOperation op, AtlasVertex referringVertex, AtlasAttribute attribute, Object value) {
+        this(op, referringVertex, attribute, value, attribute.getVertexPropertyName(), null, null);
+    }
+
+    public AttributeMutationContext(EntityOperation op, AtlasVertex referringVertex, AtlasAttribute attribute, Object value,
+                                    String vertexProperty, AtlasType currentElementType, AtlasEdge currentEdge) {
+        this.op                 = op;
+        this.referringVertex    = referringVertex;
+        this.attribute          = attribute;
+        this.value              = value;
+        this.vertexProperty     = vertexProperty;
+        this.currentElementType = currentElementType;
+        this.existingEdge       = currentEdge;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(op, referringVertex, attribute, value, vertexProperty, currentElementType, existingEdge);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+
+        if (obj == null || obj.getClass() != getClass()) {
+            return false;
+        }
+
+        AttributeMutationContext that = (AttributeMutationContext) obj;
+
+        return Objects.equals(op, that.op)
+                && Objects.equals(referringVertex, that.referringVertex)
+                && Objects.equals(attribute, that.attribute)
+                && Objects.equals(value, that.value)
+                && Objects.equals(vertexProperty, that.vertexProperty)
+                && Objects.equals(currentElementType, that.currentElementType)
+                && Objects.equals(existingEdge, that.existingEdge);
+    }
+
+    public AtlasStructType getParentType() {
+        return attribute.getDefinedInType();
+    }
+
+    public AtlasStructDef getStructDef() {
+        return attribute.getDefinedInDef();
+    }
+
+    public AtlasAttributeDef getAttributeDef() {
+        return attribute.getAttributeDef();
+    }
+
+    /**
+     * @return the overriding element type when one is set, otherwise the attribute's own type
+     */
+    public AtlasType getAttrType() {
+        if (currentElementType != null) {
+            return currentElementType;
+        }
+
+        return attribute.getAttributeType();
+    }
+
+    public AtlasType getCurrentElementType() {
+        return currentElementType;
+    }
+
+    public Object getValue() {
+        return value;
+    }
+
+    public String getVertexProperty() {
+        return vertexProperty;
+    }
+
+    public AtlasVertex getReferringVertex() {
+        return referringVertex;
+    }
+
+    public AtlasEdge getCurrentEdge() {
+        return existingEdge;
+    }
+
+    public void setElementType(final AtlasType attrType) {
+        this.currentElementType = attrType;
+    }
+
+    public AtlasAttribute getAttribute() {
+        return attribute;
+    }
+
+    public EntityOperation getOp() {
+        return op;
+    }
+
+    public void setExistingEdge(AtlasEdge existingEdge) {
+        this.existingEdge = existingEdge;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
index 14013fb..ff13ea5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
@@ -98,7 +98,7 @@ public abstract class DeleteHandlerV1 {
// Record all deletion candidate GUIDs in RequestContext
// and gather deletion candidate vertices.
for (GraphHelper.VertexInfo vertexInfo : compositeVertices) {
- requestContext.recordEntityDelete(new AtlasObjectId(vertexInfo.getTypeName(), vertexInfo.getGuid()));
+ requestContext.recordEntityDelete(new AtlasObjectId(vertexInfo.getGuid(), vertexInfo.getTypeName()));
deletionCandidateVertices.add(vertexInfo.getVertex());
}
}
@@ -324,7 +324,7 @@ public abstract class DeleteHandlerV1 {
String propertyName = AtlasGraphUtilsV1.getQualifiedAttributePropertyKey(structType, attributeInfo.getName());
if (AtlasGraphUtilsV1.isReference(valueTypeCategory)) {
- List<Object> keys = ArrayVertexMapper.getArrayElementsProperty(keyType, instanceVertex, propertyName);
+ List<Object> keys = EntityGraphMapper.getArrayElementsProperty(keyType, instanceVertex, propertyName);
if (keys != null) {
for (Object key : keys) {
String mapEdgeLabel = GraphHelper.getQualifiedNameForMapKey(edgeLabel, (String) key);
@@ -513,7 +513,7 @@ public abstract class DeleteHandlerV1 {
GraphHelper.setProperty(outVertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY,
requestContext.getRequestTime());
GraphHelper.setProperty(outVertex, Constants.MODIFIED_BY_KEY, requestContext.getUser());
- requestContext.recordEntityUpdate(new AtlasObjectId(typeName, outId));
+ requestContext.recordEntityUpdate(new AtlasObjectId(outId, typeName));
}
}