You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/02/13 07:44:00 UTC

[4/5] incubator-atlas git commit: ATLAS-1544: implementation of REST endpoints for entity create/update/bulk-get

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/ArrayVertexMapper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/ArrayVertexMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/ArrayVertexMapper.java
deleted file mode 100644
index ddd2242..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/ArrayVertexMapper.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.repository.store.graph.v1;
-
-import com.google.common.base.Optional;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
-import org.apache.atlas.repository.graph.GraphHelper;
-import org.apache.atlas.repository.graphdb.AtlasEdge;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.type.AtlasArrayType;
-import org.apache.atlas.type.AtlasStructType;
-import org.apache.atlas.type.AtlasType;
-import org.apache.commons.collections.CollectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.atlas.repository.graph.GraphHelper.string;
-
-@Singleton
-public class ArrayVertexMapper implements InstanceGraphMapper<List> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ArrayVertexMapper.class);
-
-    protected final DeleteHandlerV1 deleteHandler;
-
-    protected StructVertexMapper structVertexMapper;
-
-    @Inject
-    public ArrayVertexMapper(DeleteHandlerV1 deleteHandler) {
-        this.deleteHandler = deleteHandler;
-    }
-
-    void init(StructVertexMapper structVertexMapper) {
-        this.structVertexMapper = structVertexMapper;
-    }
-
-    @Override
-    public List toGraph(GraphMutationContext ctx) throws AtlasBaseException {
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Mapping instance to vertex {} for array attribute {}", string(ctx.getReferringVertex()), ctx.getAttrType().getTypeName());
-        }
-
-        List newElements = (List) ctx.getValue();
-        boolean newAttributeEmpty = (newElements == null || newElements.isEmpty());
-
-        AtlasArrayType arrType = (AtlasArrayType) ctx.getAttrType();
-        AtlasType elementType = arrType.getElementType();
-        List<Object> currentElements = getArrayElementsProperty(elementType, ctx.getReferringVertex(), ctx.getVertexPropertyKey());
-
-        List<Object> newElementsCreated = new ArrayList<>();
-
-        if (!newAttributeEmpty) {
-            for (int index = 0; index < newElements.size(); index++) {
-
-                LOG.debug("Adding/updating element at position {}, current element {}, new element {}", index,
-                    (currentElements != null && index < currentElements.size()) ? currentElements.get(index) : null, newElements.get(index));
-
-                Optional<AtlasEdge> existingEdge = getEdgeAt(currentElements, index, arrType.getElementType());
-
-                GraphMutationContext arrCtx = new GraphMutationContext.Builder(ctx.getOp(), ctx.getAttribute(),
-                    arrType.getElementType(), newElements.get(index))
-                    .referringVertex(ctx.getReferringVertex())
-                    .edge(existingEdge)
-                    .vertexProperty(ctx.getVertexPropertyKey()).build();
-
-                Object newEntry = structVertexMapper.mapCollectionElementsToVertex(arrCtx);
-                newElementsCreated.add(newEntry);
-            }
-        }
-
-        if (AtlasGraphUtilsV1.isReference(elementType)) {
-            List<AtlasEdge> additionalEdges = removeUnusedArrayEntries(ctx.getParentType(), ctx.getAttributeDef(), (List) currentElements, (List) newElementsCreated, elementType);
-            newElementsCreated.addAll(additionalEdges);
-        }
-
-        // for dereference on way out
-        setArrayElementsProperty(elementType, ctx.getReferringVertex(), ctx.getVertexPropertyKey(), newElementsCreated);
-        return newElementsCreated;
-    }
-
-    @Override
-    public void cleanUp() throws AtlasBaseException {
-
-    }
-
-    //Removes unused edges from the old collection, compared to the new collection
-    private List<AtlasEdge> removeUnusedArrayEntries(
-        AtlasStructType entityType,
-        AtlasAttributeDef attributeDef,
-        List<AtlasEdge> currentEntries,
-        List<AtlasEdge> newEntries,
-        AtlasType entryType) throws AtlasBaseException {
-        if (currentEntries != null && !currentEntries.isEmpty()) {
-            LOG.debug("Removing unused entries from the old collection");
-            if (AtlasGraphUtilsV1.isReference(entryType)) {
-
-                Collection<AtlasEdge> edgesToRemove = CollectionUtils.subtract(currentEntries, newEntries);
-
-                LOG.debug("Removing unused entries from the old collection - {}", edgesToRemove);
-
-                if (!edgesToRemove.isEmpty()) {
-                    //Remove the edges for (current edges - new edges)
-                    List<AtlasEdge> additionalElements = new ArrayList<>();
-
-                    for (AtlasEdge edge : edgesToRemove) {
-                        boolean deleteChildReferences = StructVertexMapper.shouldManageChildReferences(entityType, attributeDef.getName());
-                        boolean deleted = deleteHandler.deleteEdgeReference(edge, entryType.getTypeCategory(),
-                            deleteChildReferences, true);
-                        if (!deleted) {
-                            additionalElements.add(edge);
-                        }
-                    }
-
-                    return additionalElements;
-                }
-            }
-        }
-        return Collections.emptyList();
-    }
-
-    public static List<Object> getArrayElementsProperty(AtlasType elementType, AtlasVertex instanceVertex, String propertyName) {
-        String actualPropertyName = GraphHelper.encodePropertyKey(propertyName);
-        if (AtlasGraphUtilsV1.isReference(elementType)) {
-            return (List)instanceVertex.getListProperty(actualPropertyName, AtlasEdge.class);
-        }
-        else {
-            return (List)instanceVertex.getListProperty(actualPropertyName);
-        }
-    }
-
-    private Optional<AtlasEdge> getEdgeAt(List<Object> currentElements, int index, AtlasType elemType) {
-        Optional<AtlasEdge> existingEdge = Optional.absent();
-        if ( AtlasGraphUtilsV1.isReference(elemType) ) {
-            Object currentElement = (currentElements != null && index < currentElements.size()) ?
-                currentElements.get(index) : null;
-
-            if ( currentElement != null) {
-                existingEdge = Optional.of((AtlasEdge) currentElement);
-            }
-        }
-
-        return existingEdge;
-    }
-
-    private void setArrayElementsProperty(AtlasType elementType, AtlasVertex instanceVertex, String propertyName, List<Object> values) {
-        String actualPropertyName = GraphHelper.encodePropertyKey(propertyName);
-        if (AtlasGraphUtilsV1.isReference(elementType)) {
-            GraphHelper.setListPropertyFromElementIds(instanceVertex, actualPropertyName, (List) values);
-        }
-        else {
-            GraphHelper.setProperty(instanceVertex, actualPropertyName, values);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
index 2b0804f..7141911 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
@@ -17,10 +17,10 @@
  */
 package org.apache.atlas.repository.store.graph.v1;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,7 +31,6 @@ import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasStruct;
-import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
 import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
 import org.apache.atlas.repository.store.graph.EntityResolver;
@@ -41,38 +40,21 @@ import org.apache.atlas.type.AtlasMapType;
 import org.apache.atlas.type.AtlasStructType;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.commons.lang3.StringUtils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.inject.Inject;
-import com.google.inject.Provider;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery {
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityGraphDiscoveryV1.class);
 
     private final AtlasTypeRegistry           typeRegistry;
-    private final EntityGraphDiscoveryContext discoveredEntities;
-    private final Set<String>                 processedIds    = new HashSet<>();
-    private final Collection<EntityResolver>  entityResolvers = new LinkedHashSet<>();
-
-    @Inject
-    public AtlasEntityGraphDiscoveryV1(AtlasTypeRegistry typeRegistry, Collection<Provider<EntityResolver>> entityResolverProviders) {
-        this.typeRegistry       = typeRegistry;
-        this.discoveredEntities = new EntityGraphDiscoveryContext(typeRegistry);
+    private final EntityGraphDiscoveryContext discoveryContext;
 
-        for (Provider<EntityResolver> entityResolverProvider : entityResolverProviders) {
-             entityResolvers.add(entityResolverProvider.get());
-        }
-    }
-
-    @VisibleForTesting
-    public AtlasEntityGraphDiscoveryV1(AtlasTypeRegistry typeRegistry, List<EntityResolver> entityResolvers) {
-        this.typeRegistry       = typeRegistry;
-        this.discoveredEntities = new EntityGraphDiscoveryContext(typeRegistry);
-
-        for (EntityResolver entityResolver : entityResolvers) {
-            this.entityResolvers.add(entityResolver);
-        }
+    public AtlasEntityGraphDiscoveryV1(AtlasTypeRegistry typeRegistry, EntityStream entityStream) {
+        this.typeRegistry     = typeRegistry;
+        this.discoveryContext = new EntityGraphDiscoveryContext(typeRegistry, entityStream);
     }
 
     @Override
@@ -81,161 +63,239 @@ public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery {
     }
 
     @Override
-    public EntityGraphDiscoveryContext discoverEntities(final List<AtlasEntity> entities) throws AtlasBaseException {
+    public EntityGraphDiscoveryContext discoverEntities() throws AtlasBaseException {
 
-        //walk the graph and discover entity references
-        discover(entities);
+        // walk through entities in stream and validate them; record entity references
+        discoverAndValidate();
 
-        //resolve root and referred entities
+        // resolve entity references discovered in previous step
         resolveReferences();
 
-        return discoveredEntities;
+        return discoveryContext;
     }
 
     @Override
     public void cleanUp() throws AtlasBaseException {
-        processedIds.clear();
-        discoveredEntities.cleanUp();
-
-        for (EntityResolver resolver : entityResolvers) {
-            resolver.cleanUp();
-        }
+        discoveryContext.cleanUp();
     }
 
 
-    protected void discover(List<AtlasEntity> entities) throws AtlasBaseException {
-        for (AtlasEntity entity : entities) {
-            AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName());
+    protected void discoverAndValidate() throws AtlasBaseException {
+        EntityStream entityStream = discoveryContext.getEntityStream();
+
+        Set<String> walkedEntities = new HashSet<>();
+
+        // walk through top-level entities and find entity references
+        while (entityStream.hasNext()) {
+            AtlasEntity entity = entityStream.next();
+
+            if (entity != null) {
+                walkEntityGraph(entity);
+
+                walkedEntities.add(entity.getGuid());
+            }
+        }
+
+        // walk through entities referenced by other entities
+        // referencedGuids will be updated within this for() loop; avoid use of iterators
+        List<String> referencedGuids = discoveryContext.getReferencedGuids();
+        for (int i = 0; i < referencedGuids.size(); i++) {
+            String guid = referencedGuids.get(i);
 
-            if (type == null) {
-                throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName());
+            if (walkedEntities.contains(guid)) {
+                continue;
             }
 
-            discoveredEntities.addRootEntity(entity);
+            AtlasEntity entity = entityStream.getByGuid(guid);
 
-            walkEntityGraph(type, entity);
+            if (entity != null) {
+                walkEntityGraph(entity);
+
+                walkedEntities.add(entity.getGuid());
+            }
         }
     }
 
     protected void resolveReferences() throws AtlasBaseException {
-        for (EntityResolver resolver : entityResolvers) {
-            resolver.init(discoveredEntities);
+        EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(),
+                                                                  new UniqAttrBasedEntityResolver(typeRegistry)
+                                                                };
 
-            resolver.resolveEntityReferences();
+        for (EntityResolver resolver : entityResolvers) {
+            resolver.resolveEntityReferences(discoveryContext);
         }
+    }
 
-        if (discoveredEntities.hasUnresolvedReferences()) {
-            throw new AtlasBaseException(AtlasErrorCode.UNRESOLVED_REFERENCES_FOUND,
-                                                         discoveredEntities.getUnresolvedIds().toString(),
-                                                         discoveredEntities.getUnresolvedIdsByUniqAttribs().toString());
+    private void visitReference(AtlasEntityType type, Object val) throws AtlasBaseException {
+        if (type == null || val == null) {
+            return;
         }
-    }
 
-    private void visitReference(AtlasEntityType type, Object entity) throws AtlasBaseException {
-        if (entity != null) {
-            if (entity instanceof AtlasObjectId) {
-                AtlasObjectId objId = (AtlasObjectId)entity;
+        if (val instanceof AtlasObjectId) {
+            AtlasObjectId objId = (AtlasObjectId)val;
+
+            if (!objId.isValid()) {
+                throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, objId.toString());
+            }
 
-                if (!objId.isValid()) {
-                    throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, "Invalid object id " + objId);
-                }
+            recordObjectReference(objId);
+        } else if (val instanceof Map) {
+            AtlasObjectId objId = new AtlasObjectId((Map)val);
 
-                if (!StringUtils.isEmpty(objId.getGuid()) && (objId.isAssignedGuid() || objId.isUnAssignedGuid())) {
-                    discoveredEntities.addUnResolvedId(objId);
-                } else {
-                    discoveredEntities.addUnresolvedIdByUniqAttribs(objId);
-                }
-            } else if (entity instanceof AtlasEntity) {
-                throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, "Use AtlasObjectId to refer to another instance instead of AtlasEntity " + type.getTypeName());
-            } else {
-                throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, "Invalid object type " + entity.getClass());
+            if (!objId.isValid()) {
+                throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, objId.toString());
             }
+
+            recordObjectReference(objId);
+        } else if (val instanceof AtlasEntity) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, "found AtlasEntity");
+        } else {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString());
         }
     }
 
-    void visitAttribute(AtlasStructType parentType, AtlasType attrType, AtlasAttributeDef attrDef, Object val) throws AtlasBaseException {
-        if (val != null) {
-            if ( isPrimitive(attrType.getTypeCategory()) ) {
-                return;
-            }
-            if (attrType.getTypeCategory() == TypeCategory.ARRAY) {
-                AtlasArrayType arrayType = (AtlasArrayType) attrType;
-                AtlasType      elemType  = arrayType.getElementType();
-
-                visitCollectionReferences(parentType, attrType, attrDef, elemType, val);
-            } else if (attrType.getTypeCategory() == TypeCategory.MAP) {
-                AtlasType keyType   = ((AtlasMapType) attrType).getKeyType();
-                AtlasType valueType = ((AtlasMapType) attrType).getValueType();
-
-                visitMapReferences(parentType, attrType, attrDef, keyType, valueType, val);
-            } else if (attrType.getTypeCategory() == TypeCategory.STRUCT) {
-                visitStruct((AtlasStructType)attrType, val);
-            } else if (attrType.getTypeCategory() == TypeCategory.ENTITY) {
-                visitReference((AtlasEntityType) attrType,  val);
-            }
+    void visitAttribute(AtlasType attrType, Object val) throws AtlasBaseException {
+        if (attrType == null || val == null) {
+            return;
+        }
+
+        if (isPrimitive(attrType.getTypeCategory()) ) {
+            return;
+        }
+        if (attrType.getTypeCategory() == TypeCategory.ARRAY) {
+            AtlasArrayType arrayType = (AtlasArrayType) attrType;
+            AtlasType      elemType  = arrayType.getElementType();
+
+            visitCollectionReferences(elemType, val);
+        } else if (attrType.getTypeCategory() == TypeCategory.MAP) {
+            AtlasType keyType   = ((AtlasMapType) attrType).getKeyType();
+            AtlasType valueType = ((AtlasMapType) attrType).getValueType();
+
+            visitMapReferences(keyType, valueType, val);
+        } else if (attrType.getTypeCategory() == TypeCategory.STRUCT) {
+            visitStruct((AtlasStructType)attrType, val);
+        } else if (attrType.getTypeCategory() == TypeCategory.ENTITY) {
+            visitReference((AtlasEntityType) attrType,  val);
         }
     }
 
-    void visitMapReferences(AtlasStructType parentType, final AtlasType attrType, AtlasAttributeDef attrDef, AtlasType keyType, AtlasType valueType, Object val) throws AtlasBaseException {
+    void visitMapReferences(AtlasType keyType, AtlasType valueType, Object val) throws AtlasBaseException {
+        if (keyType == null || valueType == null || val == null) {
+            return;
+        }
+
         if (isPrimitive(keyType.getTypeCategory()) && isPrimitive(valueType.getTypeCategory())) {
             return;
         }
 
-        if (val != null) {
-            if (Map.class.isAssignableFrom(val.getClass())) {
-                Iterator<Map.Entry> it = ((Map) val).entrySet().iterator();
-                while (it.hasNext()) {
-                    Map.Entry e = it.next();
-                    visitAttribute(parentType, keyType, attrDef, e.getKey());
-                    visitAttribute(parentType, valueType, attrDef, e.getValue());
-                }
+        if (Map.class.isAssignableFrom(val.getClass())) {
+            Iterator<Map.Entry> it = ((Map) val).entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry e = it.next();
+                visitAttribute(keyType, e.getKey());
+                visitAttribute(valueType, e.getValue());
             }
         }
     }
 
-    void visitCollectionReferences(final AtlasStructType parentType, final AtlasType attrType, final AtlasAttributeDef attrDef, AtlasType elemType, Object val) throws AtlasBaseException {
-        if (isPrimitive(elemType.getTypeCategory())) {
+    void visitCollectionReferences(AtlasType elemType, Object val) throws AtlasBaseException {
+        if (elemType == null || val == null || isPrimitive(elemType.getTypeCategory())) {
             return;
         }
 
-        if (val != null) {
-            Iterator it = null;
-            if (val instanceof Collection) {
-                it = ((Collection) val).iterator();
-            } else if (val instanceof Iterable) {
-                it = ((Iterable) val).iterator();
-            } else if (val instanceof Iterator) {
-                it = (Iterator) val;
-            }
-            if (it != null) {
-                while (it.hasNext()) {
-                    Object elem = it.next();
-                    visitAttribute(parentType, elemType, attrDef, elem);
-                }
+        Iterator it = null;
+
+        if (val instanceof Collection) {
+            it = ((Collection) val).iterator();
+        } else if (val instanceof Iterable) {
+            it = ((Iterable) val).iterator();
+        } else if (val instanceof Iterator) {
+            it = (Iterator) val;
+        }
+
+        if (it != null) {
+            while (it.hasNext()) {
+                Object elem = it.next();
+                visitAttribute(elemType, elem);
             }
         }
     }
 
     void visitStruct(AtlasStructType structType, Object val) throws AtlasBaseException {
-        if (structType == null) {
+        if (structType == null || val == null) {
             return;
         }
 
-        for (AtlasStructType.AtlasAttribute attribute : structType.getAllAttributes().values()) {
+        AtlasStruct struct;
+
+        if (val instanceof AtlasStruct) {
+            struct = (AtlasStruct) val;
+        } else if (val instanceof Map) {
+            Map attributes = AtlasTypeUtil.toStructAttributes((Map) val);
+
+            struct = new AtlasStruct(structType.getTypeName(), attributes);
+        } else {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_STRUCT_VALUE, val.toString());
+        }
+
+        for (AtlasAttribute attribute : structType.getAllAttributes().values()) {
             AtlasType attrType = attribute.getAttributeType();
-            Object    attrVal  = ((AtlasStruct) val).getAttribute(attribute.getName());
+            Object    attrVal  = struct.getAttribute(attribute.getName());
 
-            visitAttribute(structType, attrType, attribute.getAttributeDef(), attrVal);
+            visitAttribute(attrType, attrVal);
         }
     }
 
 
-    void walkEntityGraph(AtlasEntityType entityType, AtlasEntity entity) throws AtlasBaseException {
-        visitStruct(entityType, entity);
+    void walkEntityGraph(AtlasEntity entity) throws AtlasBaseException {
+        if (entity == null) {
+            return;
+        }
+
+        validateAndNormalize(entity);
+        AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+        recordObjectReference(entity.getGuid());
+
+        visitStruct(type, entity);
     }
 
 
     boolean isPrimitive(TypeCategory typeCategory) {
         return typeCategory == TypeCategory.PRIMITIVE || typeCategory == TypeCategory.ENUM;
     }
+
+    private void validateAndNormalize(AtlasEntity entity) throws AtlasBaseException {
+        List<String> messages = new ArrayList<>();
+
+        if (!AtlasEntity.isAssigned(entity.getGuid()) && !AtlasEntity.isUnAssigned(entity.getGuid())) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, "invalid guid " + entity.getGuid());
+        }
+
+        AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+        if (type == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName());
+        }
+
+        type.validateValue(entity, entity.getTypeName(), messages);
+
+        if (!messages.isEmpty()) {
+            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages);
+        }
+
+        type.getNormalizedValue(entity);
+    }
+
+    private void recordObjectReference(String guid) {
+        discoveryContext.addReferencedGuid(guid);
+    }
+
+    private void recordObjectReference(AtlasObjectId objId) {
+        if (objId.isValidGuid()) {
+            discoveryContext.addReferencedGuid(objId.getGuid());
+        } else {
+            discoveryContext.addReferencedByUniqAttribs(objId);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 566207b..1f4ad57 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -19,234 +19,217 @@ package org.apache.atlas.repository.store.graph.v1;
 
 
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.GraphTransaction;
 import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
-import org.apache.atlas.model.instance.AtlasEntity.Status;
-import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.apache.atlas.repository.graph.AtlasGraphProvider;
-import org.apache.atlas.repository.graph.GraphHelper;
-import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
 import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
-import org.apache.atlas.repository.store.graph.EntityResolver;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 
+@Singleton
 public class AtlasEntityStoreV1 implements AtlasEntityStore {
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class);
 
-    protected AtlasTypeRegistry typeRegistry;
-
-    private final EntityGraphMapper graphMapper;
-    private final AtlasGraph        graph;
 
-    private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class);
+    private final DeleteHandlerV1   deleteHandler;
+    private final AtlasTypeRegistry typeRegistry;
 
     @Inject
-    public AtlasEntityStoreV1(EntityGraphMapper vertexMapper) {
-        this.graphMapper  = vertexMapper;
-        this.graph        = AtlasGraphProvider.getGraphInstance();
+    public AtlasEntityStoreV1(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry) {
+        this.deleteHandler = deleteHandler;
+        this.typeRegistry  = typeRegistry;
     }
 
-    @Inject
-    public void init(AtlasTypeRegistry typeRegistry) throws AtlasBaseException {
-        this.typeRegistry = typeRegistry;
+    @Override
+    @GraphTransaction
+    public AtlasEntityWithExtInfo getById(String guid) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> getById({})", guid);
+        }
+
+        EntityGraphRetriever entityRetriever = new EntityGraphRetriever(typeRegistry);
+
+        AtlasEntityWithExtInfo ret = entityRetriever.toAtlasEntityWithExtInfo(guid);
+
+        if (ret == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== getById({}): {}", guid, ret);
+        }
+
+        return ret;
     }
 
     @Override
-    public AtlasEntityWithExtInfo getById(final String guid) throws AtlasBaseException {
+    @GraphTransaction
+    public AtlasEntitiesWithExtInfo getByIds(List<String> guids) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Retrieving entity with guid={}", guid);
+            LOG.debug("==> getByIds({})", guids);
         }
 
         EntityGraphRetriever entityRetriever = new EntityGraphRetriever(typeRegistry);
 
-        return entityRetriever.toAtlasEntityWithExtInfo(guid);
+        AtlasEntitiesWithExtInfo ret = entityRetriever.toAtlasEntitiesWithExtInfo(guids);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== getByIds({}): {}", guids, ret);
+        }
+
+        return ret;
     }
 
     @Override
-    public AtlasEntityWithExtInfo getByUniqueAttribute(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException {
-        String entityTypeName = entityType.getTypeName();
-
+    @GraphTransaction
+    public AtlasEntityWithExtInfo getByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes)
+                                                                                            throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Retrieving entity with type={} and attributes={}: values={}", entityTypeName, uniqAttributes);
+            LOG.debug("==> getByUniqueAttribute({}, {})", entityType.getTypeName(), uniqAttributes);
         }
 
         AtlasVertex entityVertex = AtlasGraphUtilsV1.getVertexByUniqueAttributes(entityType, uniqAttributes);
 
         EntityGraphRetriever entityRetriever = new EntityGraphRetriever(typeRegistry);
 
-        return entityRetriever.toAtlasEntityWithExtInfo(entityVertex);
-    }
+        AtlasEntityWithExtInfo ret = entityRetriever.toAtlasEntityWithExtInfo(entityVertex);
 
-    @Override
-    public EntityMutationResponse deleteById(final String guid) {
-        return null;
+        if (ret == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, entityType.getTypeName(),
+                                         uniqAttributes.toString());
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== getByUniqueAttribute({}, {}): {}", entityType.getTypeName(), uniqAttributes, ret);
+        }
+
+        return ret;
     }
 
     @Override
     @GraphTransaction
-    public EntityMutationResponse createOrUpdate(final Map<String, AtlasEntity> entities) throws AtlasBaseException {
-
+    public EntityMutationResponse createOrUpdate(EntityStream entityStream) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("==> AtlasEntityStoreV1.createOrUpdate({}, {})", entities);
+            LOG.debug("==> createOrUpdate()");
         }
 
-        //Validate
-        List<AtlasEntity> normalizedEntities = validateAndNormalize(entities);
+        if (entityStream == null || !entityStream.hasNext()) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
+        }
 
-        //Discover entities, create vertices
-        EntityMutationContext ctx = preCreateOrUpdate(normalizedEntities);
+        EntityGraphMapper entityGraphMapper = new EntityGraphMapper(deleteHandler, typeRegistry);
+
+        // Create/Update entities
+        EntityMutationContext context = preCreateOrUpdate(entityStream, entityGraphMapper);
+
+        EntityMutationResponse ret = entityGraphMapper.mapAttributes(context);
+
+        ret.setGuidAssignments(context.getGuidAssignments());
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("<== AtlasStructDefStoreV1.createOrUpdate({}, {}): {}", entities);
+            LOG.debug("<== createOrUpdate()");
         }
 
-        return graphMapper.mapAttributes(ctx);
+        return ret;
     }
 
     @Override
-    public AtlasEntitiesWithExtInfo getByIds(final List<String> guids) throws AtlasBaseException {
-        return null;
+    @GraphTransaction
+    public EntityMutationResponse updateByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes,
+                                                          AtlasEntity entity) throws AtlasBaseException {
+        throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "updateByUniqueAttributes() not implemented yet");
     }
 
     @Override
-    public EntityMutationResponse deleteByIds(final List<String> guid) throws AtlasBaseException {
-        return null;
+    @GraphTransaction
+    public EntityMutationResponse deleteById(String guid) throws AtlasBaseException {
+        throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "deleteById() not implemented yet");
     }
 
     @Override
-    public EntityMutationResponse updateByUniqueAttribute(final String typeName, final String attributeName, final String attributeValue, final AtlasEntity entity) throws AtlasBaseException {
-        return null;
+    @GraphTransaction
+    public EntityMutationResponse deleteByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes)
+            throws AtlasBaseException {
+        throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "deleteByUniqueAttributes() not implemented yet");
     }
 
     @Override
-    public EntityMutationResponse deleteByUniqueAttribute(final String typeName, final String attributeName, final String attributeValue) throws AtlasBaseException {
-        return null;
+    @GraphTransaction
+    public EntityMutationResponse deleteByIds(List<String> guids) throws AtlasBaseException {
+        throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "deleteByIds() not implemented yet");
     }
 
     @Override
-    public void addClassifications(final String guid, final List<AtlasClassification> classification) throws AtlasBaseException {
-
+    @GraphTransaction
+    public void addClassifications(String guid, List<AtlasClassification> classification) throws AtlasBaseException {
+        throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "addClassifications() not implemented yet");
     }
 
     @Override
-    public void updateClassifications(final String guid, final List<AtlasClassification> classification) throws AtlasBaseException {
-
+    @GraphTransaction
+    public void updateClassifications(String guid, List<AtlasClassification> classification) throws AtlasBaseException {
+        throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "updateClassifications() not implemented yet");
     }
 
     @Override
-    public void deleteClassifications(final String guid, final List<String> classificationNames) throws AtlasBaseException {
-
+    @GraphTransaction
+    public void deleteClassifications(String guid, List<String> classificationNames) throws AtlasBaseException {
+        throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "deleteClassifications() not implemented yet");
     }
 
-    private EntityMutationContext preCreateOrUpdate(final List<AtlasEntity> atlasEntities) throws AtlasBaseException {
-        List<EntityResolver> entityResolvers = new ArrayList<>();
 
-        entityResolvers.add(new IDBasedEntityResolver());
-        entityResolvers.add(new UniqAttrBasedEntityResolver(typeRegistry));
+    private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper) throws AtlasBaseException {
+        EntityGraphDiscovery        graphDiscoverer  = new AtlasEntityGraphDiscoveryV1(typeRegistry, entityStream);
+        EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities();
+        EntityMutationContext       context          = new EntityMutationContext(discoveryContext);
 
-        EntityGraphDiscovery        graphDiscoverer    = new AtlasEntityGraphDiscoveryV1(typeRegistry, entityResolvers);
-        EntityGraphDiscoveryContext discoveredEntities = graphDiscoverer.discoverEntities(atlasEntities);
-        EntityMutationContext       context            = new EntityMutationContext(discoveredEntities);
+        for (String guid : discoveryContext.getReferencedGuids()) {
+            AtlasVertex vertex = discoveryContext.getResolvedEntityVertex(guid);
+            AtlasEntity entity = entityStream.getByGuid(guid);
 
-        for (AtlasEntity entity : discoveredEntities.getRootEntities()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("==> AtlasEntityStoreV1.preCreateOrUpdate({}): {}", entity);
-            }
-
-            AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
-
-            if (entityType == null) {
-                throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName());
-            }
-
-            final AtlasVertex vertex;
-            AtlasObjectId     objId = entity.getAtlasObjectId();
+            if (vertex != null) {
+                // entity would be null if guid is not in the stream but referenced by an entity in the stream
+                if (entity != null) {
+                    AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
 
-            if (discoveredEntities.isResolvedId(objId) ) {
-                vertex = discoveredEntities.getResolvedEntityVertex(objId);
+                    context.addUpdated(entity, entityType, vertex);
 
-                context.addUpdated(entity, entityType, vertex);
-
-                String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex);
-
-                RequestContextV1.get().recordEntityUpdate(new AtlasObjectId(entityType.getTypeName(), guid));
+                    RequestContextV1.get().recordEntityUpdate(entity.getAtlasObjectId());
+                }
             } else {
+                AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+
                 //Create vertices which do not exist in the repository
-                vertex = graphMapper.createVertexTemplate(entity, entityType);
+                vertex = entityGraphMapper.createVertex(entity);
 
-                context.addCreated(entity, entityType, vertex);
+                discoveryContext.addResolvedGuid(guid, vertex);
 
-                discoveredEntities.addResolvedId(objId, vertex);
-                discoveredEntities.removeUnResolvedId(objId);
+                String generatedGuid = AtlasGraphUtilsV1.getIdFromVertex(vertex);
 
-                String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex);
+                entity.setGuid(generatedGuid);
 
-                RequestContextV1.get().recordEntityCreate(new AtlasObjectId(entityType.getTypeName(), guid));
-            }
+                context.addCreated(guid, entity, entityType, vertex);
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("<== AtlasEntityStoreV1.preCreateOrUpdate({}): {}", entity, vertex);
+                RequestContextV1.get().recordEntityCreate(entity.getAtlasObjectId());
             }
         }
 
         return context;
     }
-
-    private List<AtlasEntity> validateAndNormalize(final Map<String, AtlasEntity> entities) throws AtlasBaseException {
-        List<AtlasEntity> normalizedEntities = new ArrayList<>();
-        List<String>      messages           = new ArrayList<>();
-
-        for (String entityId : entities.keySet()) {
-            if ( !AtlasEntity.isAssigned(entityId) && !AtlasEntity.isUnAssigned(entityId)) {
-                throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, ": Guid in map key is invalid " + entityId);
-            }
-
-            AtlasEntity entity = entities.get(entityId);
-
-            if ( entity == null) {
-                throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, ": Entity is null for guid " + entityId);
-            }
-
-            AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName());
-            if (type == null) {
-                throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName());
-            }
-
-            type.validateValue(entity, entity.getTypeName(), messages);
-
-            if ( !messages.isEmpty()) {
-                throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages);
-            }
-
-            AtlasEntity normalizedEntity = (AtlasEntity) type.getNormalizedValue(entity);
-
-            normalizedEntities.add(normalizedEntity);
-        }
-
-        return normalizedEntities;
-    }
-
-    public void cleanUp() throws AtlasBaseException {
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java
new file mode 100644
index 0000000..010b626
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v1;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+
+import java.util.Iterator;
+
+public class AtlasEntityStream implements EntityStream {
+    private AtlasEntitiesWithExtInfo entitiesWithExtInfo = new AtlasEntitiesWithExtInfo();
+    private Iterator<AtlasEntity>    iterator;
+
+    public AtlasEntityStream() {
+    }
+
+    public AtlasEntityStream(AtlasEntity entity) {
+        this(new AtlasEntitiesWithExtInfo(entity));
+    }
+
+    public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo) {
+        this(new AtlasEntitiesWithExtInfo(entityWithExtInfo));
+    }
+
+    public AtlasEntityStream(AtlasEntitiesWithExtInfo entitiesWithExtInfo) {
+        this.entitiesWithExtInfo = entitiesWithExtInfo;
+        this.iterator            = this.entitiesWithExtInfo.getEntities().iterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return iterator.hasNext();
+    }
+
+    @Override
+    public AtlasEntity next() {
+        return iterator.hasNext() ? iterator.next() : null;
+    }
+
+    @Override
+    public void reset() {
+        this.iterator = entitiesWithExtInfo.getEntities().iterator();
+    }
+
+    @Override
+    public AtlasEntity getByGuid(String guid) {
+        return entitiesWithExtInfo.getEntity(guid);
+    }
+
+    @Override
+    public String toString() {
+        StringBuffer sb = new StringBuffer("AtlasEntityStream{");
+
+        sb.append("entitiesWithExtInfo=").append(entitiesWithExtInfo);
+        sb.append(", iterator=").append(iterator);
+        sb.append('}');
+
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
index b17cf90..99f074b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
@@ -217,7 +217,7 @@ public class AtlasGraphUtilsV1 {
 
         if (entityVertex == null) {
             throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, entityType.getTypeName(),
-                                         uniqAttributes.keySet().toString(), uniqAttributes.values().toString());
+                                         uniqAttributes.toString());
         }
 
         return entityVertex;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AttributeMutationContext.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AttributeMutationContext.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AttributeMutationContext.java
new file mode 100644
index 0000000..b6d82dd
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AttributeMutationContext.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v1;
+
+
+import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
+import org.apache.atlas.model.typedef.AtlasStructDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasStructType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasType;
+
+
+import java.util.Objects;
+
+public class AttributeMutationContext {
+    private EntityOperation op;
+    /**
+     * Atlas Attribute
+     */
+
+    private AtlasAttribute attribute;
+
+    /**
+     * Overriding type for which elements are being mapped
+     */
+    private AtlasType currentElementType;
+
+    /**
+     * Current attribute value/entity/Struct instance
+     */
+    private Object value;
+
+    private String vertexProperty;
+
+    /**
+     *
+     * The vertex which corresponds to the entity/struct for which we are mapping a complex attribute such as a struct or trait
+     */
+    AtlasVertex referringVertex;
+
+    /**
+     * The current edge (in case of updates) from the parent entity/struct to a complex attribute such as a struct or trait
+     */
+    AtlasEdge existingEdge;
+
+    public AttributeMutationContext(EntityOperation op, AtlasVertex referringVertex, AtlasAttribute attribute, Object value) {
+        this(op, referringVertex, attribute, value, attribute.getVertexPropertyName(), null, null);
+    }
+
+    public AttributeMutationContext(EntityOperation op, AtlasVertex referringVertex, AtlasAttribute attribute, Object value,
+                                    String vertexProperty, AtlasType currentElementType, AtlasEdge currentEdge) {
+        this.op                 = op;
+        this.referringVertex    = referringVertex;
+        this.attribute          = attribute;
+        this.value              = value;
+        this.vertexProperty     = vertexProperty;
+        this.currentElementType = currentElementType;
+        this.existingEdge       = currentEdge;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(op, referringVertex, attribute, value, vertexProperty, currentElementType, existingEdge);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null) {
+            return false;
+        } else if (obj == this) {
+            return true;
+        } else if (obj.getClass() != getClass()) {
+            return false;
+        } else {
+            AttributeMutationContext rhs = (AttributeMutationContext) obj;
+            return Objects.equals(op, rhs.op)
+                    && Objects.equals(referringVertex, rhs.referringVertex)
+                    && Objects.equals(attribute, rhs.attribute)
+                    && Objects.equals(value, rhs.value)
+                    && Objects.equals(vertexProperty, rhs.vertexProperty)
+                    && Objects.equals(currentElementType, rhs.currentElementType)
+                    && Objects.equals(existingEdge, rhs.existingEdge);
+        }
+    }
+
+
+    public AtlasStructType getParentType() {
+        return attribute.getDefinedInType();
+    }
+
+    public AtlasStructDef getStructDef() {
+        return attribute.getDefinedInDef();
+    }
+
+    public AtlasAttributeDef getAttributeDef() {
+        return attribute.getAttributeDef();
+    }
+
+    public AtlasType getAttrType() {
+        return currentElementType == null ? attribute.getAttributeType() : currentElementType;
+    }
+
+    public AtlasType getCurrentElementType() {
+        return currentElementType;
+    }
+
+    public Object getValue() {
+        return value;
+    }
+
+    public String getVertexProperty() { return vertexProperty; }
+
+    public AtlasVertex getReferringVertex() { return referringVertex; }
+
+    public AtlasEdge getCurrentEdge() {
+        return existingEdge;
+    }
+
+    public void setElementType(final AtlasType attrType) {
+        this.currentElementType = attrType;
+    }
+
+    public AtlasAttribute getAttribute() {
+        return attribute;
+    }
+
+    public EntityOperation getOp() {
+        return op;
+    }
+
+    public void setExistingEdge(AtlasEdge existingEdge) { this.existingEdge = existingEdge; }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce20d6f5/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
index 14013fb..ff13ea5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
@@ -98,7 +98,7 @@ public abstract class DeleteHandlerV1 {
             // Record all deletion candidate GUIDs in RequestContext
             // and gather deletion candidate vertices.
             for (GraphHelper.VertexInfo vertexInfo : compositeVertices) {
-                requestContext.recordEntityDelete(new AtlasObjectId(vertexInfo.getTypeName(), vertexInfo.getGuid()));
+                requestContext.recordEntityDelete(new AtlasObjectId(vertexInfo.getGuid(), vertexInfo.getTypeName()));
                 deletionCandidateVertices.add(vertexInfo.getVertex());
             }
         }
@@ -324,7 +324,7 @@ public abstract class DeleteHandlerV1 {
                     String propertyName = AtlasGraphUtilsV1.getQualifiedAttributePropertyKey(structType, attributeInfo.getName());
 
                     if (AtlasGraphUtilsV1.isReference(valueTypeCategory)) {
-                        List<Object> keys = ArrayVertexMapper.getArrayElementsProperty(keyType, instanceVertex, propertyName);
+                        List<Object> keys = EntityGraphMapper.getArrayElementsProperty(keyType, instanceVertex, propertyName);
                         if (keys != null) {
                             for (Object key : keys) {
                                 String mapEdgeLabel = GraphHelper.getQualifiedNameForMapKey(edgeLabel, (String) key);
@@ -513,7 +513,7 @@ public abstract class DeleteHandlerV1 {
             GraphHelper.setProperty(outVertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY,
                 requestContext.getRequestTime());
             GraphHelper.setProperty(outVertex, Constants.MODIFIED_BY_KEY, requestContext.getUser());
-            requestContext.recordEntityUpdate(new AtlasObjectId(typeName, outId));
+            requestContext.recordEntityUpdate(new AtlasObjectId(outId, typeName));
         }
     }