You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2015/12/15 11:20:48 UTC

[2/2] incubator-atlas git commit: ATLAS-342 Atlas is sending an ENTITY_CREATE event to the ATLAS_ENTITIES topic even if the entity exists already (shwethags)

ATLAS-342 Atlas is sending an ENTITY_CREATE event to the ATLAS_ENTITIES topic even if the entity exists already (shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/8cbb2c13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/8cbb2c13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/8cbb2c13

Branch: refs/heads/master
Commit: 8cbb2c13bbe71957daf0de9222daaf4dd01199d3
Parents: dce31ab
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Tue Dec 15 15:50:33 2015 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Tue Dec 15 15:50:33 2015 +0530

----------------------------------------------------------------------
 release-log.txt                                 |   1 +
 .../atlas/repository/MetadataRepository.java    |  11 +-
 .../graph/GraphBackedMetadataRepository.java    |  13 ++-
 .../atlas/repository/graph/GraphHelper.java     |   8 +-
 .../graph/TypedInstanceToGraphMapper.java       | 100 +++++++++++--------
 .../atlas/services/DefaultMetadataService.java  |  80 ++++++++-------
 .../apache/atlas/BaseHiveRepositoryTest.java    |   7 +-
 .../GraphBackedMetadataRepositoryTest.java      |  27 +++--
 .../graph/GraphRepoMapperScaleTest.java         |   2 +-
 .../service/DefaultMetadataServiceTest.java     |   9 +-
 .../apache/atlas/services/MetadataService.java  |  17 ++--
 .../atlas/typesystem/types/TypeUtils.java       |  10 +-
 .../atlas/web/resources/EntityResource.java     |   8 +-
 .../notification/EntityNotificationIT.java      |  58 +----------
 .../atlas/web/resources/BaseResourceIT.java     |  54 +++++++++-
 .../web/resources/EntityJerseyResourceIT.java   |  52 +++++++++-
 16 files changed, 268 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 1ad4977..ce3337d 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -14,6 +14,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
 ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
 
 ALL CHANGES:
+ATLAS-342 Atlas is sending an ENTITY_CREATE event to the ATLAS_ENTITIES topic even if the entity exists already (shwethags)
 ATLAS-386 Handle hive rename Table (shwethags)
 ATLAS-374 Doc: Create a wiki for documenting fault tolerance and HA options for Atlas data (yhemanth via sumasai)
 ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhemanth via sumasai)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java
index 2091e71..f66a4e5 100755
--- a/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java
@@ -19,12 +19,13 @@
 package org.apache.atlas.repository;
 
 import org.apache.atlas.AtlasException;
-import org.apache.atlas.typesystem.exception.EntityExistsException;
-import org.apache.atlas.typesystem.exception.EntityNotFoundException;
 import org.apache.atlas.typesystem.ITypedReferenceableInstance;
 import org.apache.atlas.typesystem.ITypedStruct;
+import org.apache.atlas.typesystem.exception.EntityExistsException;
+import org.apache.atlas.typesystem.exception.EntityNotFoundException;
 import org.apache.atlas.typesystem.types.AttributeInfo;
 import org.apache.atlas.typesystem.types.IDataType;
+import org.apache.atlas.typesystem.types.TypeUtils;
 
 import java.util.List;
 
@@ -82,7 +83,7 @@ public interface MetadataRepository {
      * @throws RepositoryException
      * @throws EntityExistsException
      */
-    String[] createEntities(ITypedReferenceableInstance... entities) throws RepositoryException, EntityExistsException;
+    List<String> createEntities(ITypedReferenceableInstance... entities) throws RepositoryException, EntityExistsException;
 
     /**
      * Fetch the complete definition of an entity given its GUID.
@@ -143,13 +144,13 @@ public interface MetadataRepository {
      * Adds/Updates the property to the entity that corresponds to the GUID
      * Supports only primitive attribute/Class Id updations.
      */
-    void updatePartial(ITypedReferenceableInstance entity) throws RepositoryException;
+    TypeUtils.Pair<List<String>, List<String>> updatePartial(ITypedReferenceableInstance entity) throws RepositoryException;
 
     /**
      * Adds the property to the entity that corresponds to the GUID
      * @param entitiesToBeUpdated The entities to be updated
      */
-    String[] updateEntities(ITypedReferenceableInstance... entitiesToBeUpdated) throws RepositoryException;
+    TypeUtils.Pair<List<String>, List<String>> updateEntities(ITypedReferenceableInstance... entitiesToBeUpdated) throws RepositoryException;
 
     /**
      * Returns the entity for the given type and qualified name

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
index fe1e576..dd64124 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
@@ -37,6 +37,7 @@ import org.apache.atlas.typesystem.types.AttributeInfo;
 import org.apache.atlas.typesystem.types.ClassType;
 import org.apache.atlas.typesystem.types.IDataType;
 import org.apache.atlas.typesystem.types.TypeSystem;
+import org.apache.atlas.typesystem.types.TypeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -114,12 +115,14 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
 
     @Override
     @GraphTransaction
-    public String[] createEntities(ITypedReferenceableInstance... entities) throws RepositoryException,
+    public List<String> createEntities(ITypedReferenceableInstance... entities) throws RepositoryException,
         EntityExistsException {
         LOG.info("adding entities={}", entities);
         try {
             TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper);
-            return instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.CREATE, entities);
+            TypeUtils.Pair<List<String>, List<String>> idPair =
+                    instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.CREATE, entities);
+            return idPair.left;
         } catch (EntityExistsException e) {
             throw e;
         } catch (AtlasException e) {
@@ -279,7 +282,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
 
     @Override
     @GraphTransaction
-    public String[] updateEntities(ITypedReferenceableInstance... entitiesUpdated) throws RepositoryException {
+    public TypeUtils.Pair<List<String>, List<String>> updateEntities(ITypedReferenceableInstance... entitiesUpdated) throws RepositoryException {
         LOG.info("updating entity {}", entitiesUpdated);
         try {
             TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper);
@@ -292,11 +295,11 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
 
     @Override
     @GraphTransaction
-    public void updatePartial(ITypedReferenceableInstance entity) throws RepositoryException {
+    public TypeUtils.Pair<List<String>, List<String>> updatePartial(ITypedReferenceableInstance entity) throws RepositoryException {
         LOG.info("updating entity {}", entity);
         try {
             TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper);
-            instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_PARTIAL, entity);
+            return instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_PARTIAL, entity);
         } catch (AtlasException e) {
             throw new RepositoryException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index 9ac2819..6b2d5d1 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -71,13 +71,13 @@ public final class GraphHelper {
         return INSTANCE;
     }
 
-    public Vertex createVertexWithIdentity(ITypedReferenceableInstance typedInstance,
-                                                  Set<String> superTypeNames) {
+    public Vertex createVertexWithIdentity(ITypedReferenceableInstance typedInstance, Set<String> superTypeNames) {
+        final String guid = UUID.randomUUID().toString();
+
         final Vertex vertexWithIdentity = createVertexWithoutIdentity(typedInstance.getTypeName(),
-                typedInstance.getId(), superTypeNames);
+                new Id(guid, 0 , typedInstance.getTypeName()), superTypeNames);
 
         // add identity
-        final String guid = UUID.randomUUID().toString();
         setProperty(vertexWithIdentity, Constants.GUID_PROPERTY_KEY, guid);
 
         // add version information

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
index 7ef5c50..996f31b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
@@ -41,8 +41,8 @@ import org.apache.atlas.typesystem.types.Multiplicity;
 import org.apache.atlas.typesystem.types.ObjectGraphWalker;
 import org.apache.atlas.typesystem.types.TraitType;
 import org.apache.atlas.typesystem.types.TypeSystem;
+import org.apache.atlas.typesystem.types.TypeUtils;
 import org.apache.atlas.utils.MD5Utils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,36 +79,40 @@ public final class TypedInstanceToGraphMapper {
         this.graphToTypedInstanceMapper = graphToTypedInstanceMapper;
     }
 
-    String[] mapTypedInstanceToGraph(Operation operation, ITypedReferenceableInstance... typedInstances)
+    TypeUtils.Pair<List<String>, List<String>> mapTypedInstanceToGraph(Operation operation, ITypedReferenceableInstance... typedInstances)
         throws AtlasException {
-        List<String> guids = new ArrayList<>();
+
+        List<String> createdIds = new ArrayList<>();
+        List<String> updatedIds = new ArrayList<>();
+
         for (ITypedReferenceableInstance typedInstance : typedInstances) {
             Collection<IReferenceableInstance> newInstances = walkClassInstances(typedInstance);
-            Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> instancesPair =
+            TypeUtils.Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> instancesPair =
                     createVerticesAndDiscoverInstances(newInstances);
 
             switch (operation) {
                 case CREATE:
-                    addOrUpdateAttributesAndTraits(operation, instancesPair.getLeft());
-                    addFullTextProperty(instancesPair.getLeft());
+                    List<String> ids = addOrUpdateAttributesAndTraits(operation, instancesPair.left);
+                    createdIds.addAll(ids);
+                    addFullTextProperty(instancesPair.left);
                     break;
 
                 case UPDATE_FULL:
                 case UPDATE_PARTIAL:
-                    List<ITypedReferenceableInstance> instancesForUpdate = instancesPair.getLeft();
-                    instancesForUpdate.addAll(instancesPair.getRight());
-                    addOrUpdateAttributesAndTraits(operation, instancesForUpdate);
-                    addFullTextProperty(instancesForUpdate);
+                    ids = addOrUpdateAttributesAndTraits(Operation.CREATE, instancesPair.left);
+                    createdIds.addAll(ids);
+                    ids = addOrUpdateAttributesAndTraits(operation, instancesPair.right);
+                    updatedIds.addAll(ids);
+
+                    addFullTextProperty(instancesPair.left);
+                    addFullTextProperty(instancesPair.right);
                     break;
 
                 case DELETE:
                     throw new UnsupportedOperationException("Not handled - " + operation);
             }
-
-            //Return guid for
-            addToGuids(typedInstance, guids);
         }
-        return guids.toArray(new String[guids.size()]);
+        return TypeUtils.Pair.of(createdIds, updatedIds);
     }
 
     private Collection<IReferenceableInstance> walkClassInstances(ITypedReferenceableInstance typedInstance)
@@ -126,18 +130,21 @@ public final class TypedInstanceToGraphMapper {
         return entityProcessor.getInstances();
     }
 
-    private void addOrUpdateAttributesAndTraits(Operation operation, List<ITypedReferenceableInstance> instances) throws AtlasException {
+    private List<String> addOrUpdateAttributesAndTraits(Operation operation, List<ITypedReferenceableInstance> instances) throws AtlasException {
+        List<String> guids = new ArrayList<>();
         for (ITypedReferenceableInstance instance : instances) {
             try {
                 //new vertex, set all the properties
-                addOrUpdateAttributesAndTraits(operation, instance);
+                String guid = addOrUpdateAttributesAndTraits(operation, instance);
+                guids.add(guid);
             } catch (SchemaViolationException e) {
                 throw new EntityExistsException(instance, e);
             }
         }
+        return guids;
     }
 
-    private void addOrUpdateAttributesAndTraits(Operation operation, ITypedReferenceableInstance typedInstance)
+    private String addOrUpdateAttributesAndTraits(Operation operation, ITypedReferenceableInstance typedInstance)
             throws AtlasException {
         LOG.debug("Adding/Updating typed instance {}", typedInstance.getTypeName());
 
@@ -158,6 +165,8 @@ public final class TypedInstanceToGraphMapper {
             //TODO - Handle Trait updates
             addTraits(typedInstance, instanceVertex, classType);
         }
+
+        return getId(typedInstance)._getId();
     }
 
     private void mapInstanceToVertex(ITypedInstance typedInstance, Vertex instanceVertex,
@@ -215,14 +224,16 @@ public final class TypedInstanceToGraphMapper {
         }
     }
 
-    private Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> createVerticesAndDiscoverInstances(
+    private TypeUtils.Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> createVerticesAndDiscoverInstances(
             Collection<IReferenceableInstance> instances) throws AtlasException {
 
         List<ITypedReferenceableInstance> instancesToCreate = new ArrayList<>();
         List<ITypedReferenceableInstance> instancesToUpdate = new ArrayList<>();
 
         for (IReferenceableInstance instance : instances) {
+            ITypedReferenceableInstance newInstance;
             Id id = instance.getId();
+
             if (!idToVertexMap.containsKey(id)) {
                 Vertex instanceVertex;
                 if (id.isAssigned()) {  // has a GUID
@@ -231,7 +242,9 @@ public final class TypedInstanceToGraphMapper {
                         throw new IllegalStateException(
                                 String.format("%s is not of type ITypedReferenceableInstance", instance));
                     }
-                    instancesToUpdate.add((ITypedReferenceableInstance) instance);
+                    newInstance = (ITypedReferenceableInstance) instance;
+                    instancesToUpdate.add(newInstance);
+
                 } else {
                     //Check if there is already an instance with the same unique attribute value
                     ClassType classType = typeSystem.getDataType(ClassType.class, instance.getTypeName());
@@ -239,31 +252,28 @@ public final class TypedInstanceToGraphMapper {
 
                     //no entity with the given unique attribute, create new
                     if (instanceVertex == null) {
-                        ITypedReferenceableInstance newInstance = classType.convert(instance, Multiplicity.REQUIRED);
+                        newInstance = classType.convert(instance, Multiplicity.REQUIRED);
                         instanceVertex = graphHelper.createVertexWithIdentity(newInstance, classType.getAllSuperTypeNames());
                         instancesToCreate.add(newInstance);
 
                         //Map only unique attributes for cases of circular references
                         mapInstanceToVertex(newInstance, instanceVertex, classType.fieldMapping().fields, true, Operation.CREATE);
+
                     } else {
                         if (!(instance instanceof ReferenceableInstance)) {
                             throw new IllegalStateException(
                                     String.format("%s is not of type ITypedReferenceableInstance", instance));
                         }
-                        instancesToUpdate.add((ITypedReferenceableInstance) instance);
+                        newInstance = (ITypedReferenceableInstance) instance;
+                        instancesToUpdate.add(newInstance);
                     }
                 }
 
+                //Set the id in the new instance
                 idToVertexMap.put(id, instanceVertex);
             }
         }
-        return Pair.of(instancesToCreate, instancesToUpdate);
-    }
-
-    private void addToGuids(ITypedReferenceableInstance typedInstance, List<String> guids) {
-        Vertex instanceVertex = idToVertexMap.get(typedInstance.getId());
-        String guid = instanceVertex.getProperty(Constants.GUID_PROPERTY_KEY);
-        guids.add(guid);
+        return TypeUtils.Pair.of(instancesToCreate, instancesToUpdate);
     }
 
     private void addFullTextProperty(List<ITypedReferenceableInstance> instances) throws AtlasException {
@@ -275,7 +285,8 @@ public final class TypedInstanceToGraphMapper {
         }
     }
 
-    private void addTraits(ITypedReferenceableInstance typedInstance, Vertex instanceVertex, ClassType classType) throws AtlasException {
+    private void addTraits(ITypedReferenceableInstance typedInstance, Vertex instanceVertex, ClassType classType)
+            throws AtlasException {
         for (String traitName : typedInstance.getTraits()) {
             LOG.debug("mapping trait {}", traitName);
             GraphHelper.addProperty(instanceVertex, Constants.TRAIT_NAMES_PROPERTY_KEY, traitName);
@@ -288,7 +299,8 @@ public final class TypedInstanceToGraphMapper {
 
     /******************************************** STRUCT **************************************************/
 
-    private Pair<Vertex, Edge> updateStructVertex(ITypedStruct structInstance, Edge relEdge, Operation operation) throws AtlasException {
+    private TypeUtils.Pair<Vertex, Edge> updateStructVertex(ITypedStruct structInstance, Edge relEdge,
+                                                            Operation operation) throws AtlasException {
         //Already existing vertex. Update
         Vertex structInstanceVertex = relEdge.getVertex(Direction.IN);
 
@@ -303,10 +315,11 @@ public final class TypedInstanceToGraphMapper {
             mapInstanceToVertex(structInstance, structInstanceVertex, structInstance.fieldMapping().fields, false, operation);
             GraphHelper.setProperty(structInstanceVertex, SIGNATURE_HASH_PROPERTY_KEY, String.valueOf(newSignature));
         }
-        return Pair.of(structInstanceVertex, relEdge);
+        return TypeUtils.Pair.of(structInstanceVertex, relEdge);
     }
 
-    private Pair<Vertex, Edge> addStructVertex(ITypedStruct structInstance, Vertex instanceVertex, AttributeInfo attributeInfo, String edgeLabel) throws AtlasException {
+    private TypeUtils.Pair<Vertex, Edge> addStructVertex(ITypedStruct structInstance, Vertex instanceVertex,
+                                                         AttributeInfo attributeInfo, String edgeLabel) throws AtlasException {
         // add a new vertex for the struct or trait instance
         Vertex structInstanceVertex = graphHelper.createVertexWithoutIdentity(structInstance.getTypeName(), null,
                 Collections.<String>emptySet()); // no super types for struct type
@@ -317,7 +330,7 @@ public final class TypedInstanceToGraphMapper {
         // add an edge to the newly created vertex from the parent
         Edge relEdge = graphHelper.addEdge(instanceVertex, structInstanceVertex, edgeLabel);
 
-        return Pair.of(structInstanceVertex, relEdge);
+        return TypeUtils.Pair.of(structInstanceVertex, relEdge);
     }
 
     /******************************************** ARRAY **************************************************/
@@ -443,7 +456,7 @@ public final class TypedInstanceToGraphMapper {
     private String addOrUpdateStruct(Vertex instanceVertex, AttributeInfo attributeInfo, IDataType elementType,
                                      ITypedStruct structAttr, String curVal,
                                      String edgeLabel, Operation operation) throws AtlasException {
-        Pair<Vertex, Edge> vertexEdgePair = null;
+        TypeUtils.Pair<Vertex, Edge> vertexEdgePair = null;
         if (curVal != null && structAttr == null) {
             //remove edge
             removeUnusedReference(curVal, attributeInfo, elementType);
@@ -456,7 +469,7 @@ public final class TypedInstanceToGraphMapper {
             vertexEdgePair = addStructVertex(structAttr, instanceVertex, attributeInfo, edgeLabel);
         }
 
-        return (vertexEdgePair != null) ? vertexEdgePair.getRight().getId().toString() : null;
+        return (vertexEdgePair != null) ? vertexEdgePair.right.getId().toString() : null;
     }
 
     private String addOrUpdateClassVertex(Vertex instanceVertex, AttributeInfo attributeInfo, IDataType elementType,
@@ -468,27 +481,28 @@ public final class TypedInstanceToGraphMapper {
             throw new EntityNotFoundException("Could not find vertex for Class Reference " + newVal);
         }
 
-        Pair<Vertex, Edge> vertexEdgePair = null;
+        TypeUtils.Pair<Vertex, Edge> vertexEdgePair = null;
         if (curVal != null && newVal == null) {
             //remove edge
             removeUnusedReference(curVal, attributeInfo, elementType);
         } else if (curVal != null && newVal != null) {
             Edge edge = graphHelper.getOutGoingEdgeById(curVal);
             Id classRefId = getId(newVal);
-            vertexEdgePair = updateClassEdge(classRefId, newVal, instanceVertex, edge, toVertex, attributeInfo, elementType, edgeLabel, operation);
+            vertexEdgePair = updateClassEdge(classRefId, newVal, instanceVertex, edge, toVertex, attributeInfo,
+                    elementType, edgeLabel, operation);
         } else if (newVal != null){
             vertexEdgePair = addClassEdge(instanceVertex, toVertex, edgeLabel);
         }
 
-        return (vertexEdgePair != null) ? vertexEdgePair.getRight().getId().toString() : null;
+        return (vertexEdgePair != null) ? vertexEdgePair.right.getId().toString() : null;
     }
 
     /******************************************** CLASS **************************************************/
 
-    private Pair<Vertex, Edge> addClassEdge(Vertex instanceVertex, Vertex toVertex, String edgeLabel) throws AtlasException {
+    private TypeUtils.Pair<Vertex, Edge> addClassEdge(Vertex instanceVertex, Vertex toVertex, String edgeLabel) throws AtlasException {
             // add an edge to the class vertex from the instance
           Edge edge = graphHelper.addEdge(instanceVertex, toVertex, edgeLabel);
-          return Pair.of(toVertex, edge);
+          return TypeUtils.Pair.of(toVertex, edge);
     }
 
     private Vertex getClassVertex(ITypedReferenceableInstance typedReference) throws EntityNotFoundException {
@@ -521,11 +535,11 @@ public final class TypedInstanceToGraphMapper {
     }
 
 
-    private Pair<Vertex, Edge> updateClassEdge(Id id, final ITypedReferenceableInstance typedInstance,
+    private TypeUtils.Pair<Vertex, Edge> updateClassEdge(Id id, final ITypedReferenceableInstance typedInstance,
                                                Vertex instanceVertex, Edge edge, Vertex toVertex,
                                                AttributeInfo attributeInfo, IDataType dataType,
                                                String edgeLabel, Operation operation) throws AtlasException {
-        Pair<Vertex, Edge> result = Pair.of(toVertex, edge);
+        TypeUtils.Pair<Vertex, Edge> result = TypeUtils.Pair.of(toVertex, edge);
         Edge newEdge = edge;
         // Update edge if it exists
         Vertex invertex = edge.getVertex(Direction.IN);
@@ -535,7 +549,7 @@ public final class TypedInstanceToGraphMapper {
              // add an edge to the class vertex from the instance
             if(toVertex != null) {
                 newEdge = graphHelper.addEdge(instanceVertex, toVertex, edgeLabel);
-                result = Pair.of(toVertex, newEdge);
+                result = TypeUtils.Pair.of(toVertex, newEdge);
             }
             removeUnusedReference(edge.getId().toString(), attributeInfo, dataType);
         }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
index f605c26..b38face 100755
--- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
+++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
@@ -23,16 +23,12 @@ import com.google.common.collect.ImmutableList;
 import com.google.inject.Provider;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
-import org.apache.atlas.repository.RepositoryException;
-import org.apache.atlas.typesystem.exception.EntityNotFoundException;
-import org.apache.atlas.typesystem.exception.TypeNotFoundException;
-import org.apache.atlas.typesystem.persistence.ReferenceableInstance;
-import org.apache.atlas.utils.ParamChecker;
 import org.apache.atlas.classification.InterfaceAudience;
 import org.apache.atlas.listener.EntityChangeListener;
 import org.apache.atlas.listener.TypesChangeListener;
 import org.apache.atlas.repository.IndexCreationException;
 import org.apache.atlas.repository.MetadataRepository;
+import org.apache.atlas.repository.RepositoryException;
 import org.apache.atlas.repository.typestore.ITypeStore;
 import org.apache.atlas.typesystem.IStruct;
 import org.apache.atlas.typesystem.ITypedReferenceableInstance;
@@ -40,9 +36,12 @@ import org.apache.atlas.typesystem.ITypedStruct;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
 import org.apache.atlas.typesystem.TypesDef;
+import org.apache.atlas.typesystem.exception.EntityNotFoundException;
+import org.apache.atlas.typesystem.exception.TypeNotFoundException;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
 import org.apache.atlas.typesystem.json.TypesSerialization;
 import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.typesystem.persistence.ReferenceableInstance;
 import org.apache.atlas.typesystem.types.AttributeDefinition;
 import org.apache.atlas.typesystem.types.AttributeInfo;
 import org.apache.atlas.typesystem.types.ClassType;
@@ -54,25 +53,24 @@ import org.apache.atlas.typesystem.types.Multiplicity;
 import org.apache.atlas.typesystem.types.StructTypeDefinition;
 import org.apache.atlas.typesystem.types.TraitType;
 import org.apache.atlas.typesystem.types.TypeSystem;
+import org.apache.atlas.typesystem.types.TypeUtils;
 import org.apache.atlas.typesystem.types.ValueConversionException;
 import org.apache.atlas.typesystem.types.utils.TypesUtil;
+import org.apache.atlas.utils.ParamChecker;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.actors.threadpool.Arrays;
 
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * Simple wrapper over TypeSystem and MetadataRepository services with hooks
@@ -279,17 +277,10 @@ public class DefaultMetadataService implements MetadataService {
 
         ITypedReferenceableInstance[] typedInstances = deserializeClassInstances(entityInstanceDefinition);
 
-        final String[] guids = repository.createEntities(typedInstances);
-
-        Set<ITypedReferenceableInstance> entitites = new HashSet<>();
+        final List<String> guids = repository.createEntities(typedInstances);
 
-        for (String guid : guids) {
-            entitites.add(repository.getEntityDefinition(guid));
-        }
-
-        onEntitiesAdded(entitites);
-
-        return new JSONArray(Arrays.asList(guids)).toString();
+        onEntitiesAdded(guids);
+        return new JSONArray(guids).toString();
     }
 
     private ITypedReferenceableInstance[] deserializeClassInstances(String entityInstanceDefinition)
@@ -390,14 +381,20 @@ public class DefaultMetadataService implements MetadataService {
         ParamChecker.notEmpty(entityInstanceDefinition, "Entity instance definition cannot be empty");
         ITypedReferenceableInstance[] typedInstances = deserializeClassInstances(entityInstanceDefinition);
 
-        String[] guids = repository.updateEntities(typedInstances);
-        onEntitiesAdded(Arrays.asList(typedInstances));
+        TypeUtils.Pair<List<String>, List<String>> guids = repository.updateEntities(typedInstances);
+        return onEntitiesAddedUpdated(guids);
+    }
+
+    private String onEntitiesAddedUpdated(TypeUtils.Pair<List<String>, List<String>> guids) throws AtlasException {
+        onEntitiesAdded(guids.left);
+        onEntitiesUpdated(guids.right);
 
-        return new JSONArray(Arrays.asList(guids)).toString();
+        guids.left.addAll(guids.right);
+        return new JSONArray(guids.left).toString();
     }
 
     @Override
-    public void updateEntityAttributeByGuid(final String guid, String attributeName, String value) throws AtlasException {
+    public String updateEntityAttributeByGuid(final String guid, String attributeName, String value) throws AtlasException {
         ParamChecker.notEmpty(guid, "guid cannot be null");
         ParamChecker.notEmpty(attributeName, "property cannot be null");
         ParamChecker.notEmpty(value, "property value cannot be null");
@@ -426,10 +423,8 @@ public class DefaultMetadataService implements MetadataService {
         }
 
         ((ReferenceableInstance)newInstance).replaceWithNewId(new Id(guid, 0, newInstance.getTypeName()));
-        repository.updatePartial(newInstance);
-        onEntitiesUpdated(new ArrayList<ITypedReferenceableInstance>() {{
-            add(repository.getEntityDefinition(guid));
-        }});
+        TypeUtils.Pair<List<String>, List<String>> guids = repository.updatePartial(newInstance);
+        return onEntitiesAddedUpdated(guids);
     }
 
     private ITypedReferenceableInstance validateEntityExists(String guid)
@@ -442,7 +437,7 @@ public class DefaultMetadataService implements MetadataService {
     }
 
     @Override
-    public void updateEntityPartialByGuid(final String guid, Referenceable newEntity) throws AtlasException {
+    public String updateEntityPartialByGuid(final String guid, Referenceable newEntity) throws AtlasException {
         ParamChecker.notEmpty(guid, "guid cannot be null");
         ParamChecker.notNull(newEntity, "updatedEntity cannot be null");
         ITypedReferenceableInstance existInstance = validateEntityExists(guid);
@@ -450,10 +445,8 @@ public class DefaultMetadataService implements MetadataService {
         ITypedReferenceableInstance newInstance = convertToTypedInstance(newEntity, existInstance.getTypeName());
         ((ReferenceableInstance)newInstance).replaceWithNewId(new Id(guid, 0, newInstance.getTypeName()));
 
-        repository.updatePartial(newInstance);
-        onEntitiesUpdated(new ArrayList<ITypedReferenceableInstance>() {{
-            add(repository.getEntityDefinition(guid));
-        }});
+        TypeUtils.Pair<List<String>, List<String>> guids = repository.updatePartial(newInstance);
+        return onEntitiesAddedUpdated(guids);
     }
 
     private ITypedReferenceableInstance convertToTypedInstance(Referenceable updatedEntity, String typeName) throws AtlasException {
@@ -511,13 +504,8 @@ public class DefaultMetadataService implements MetadataService {
         final ITypedReferenceableInstance newInstance = convertToTypedInstance(updatedEntity, typeName);
         ((ReferenceableInstance)newInstance).replaceWithNewId(oldInstance.getId());
 
-        repository.updatePartial(newInstance);
-
-        onEntitiesUpdated(new ArrayList<ITypedReferenceableInstance>() {{
-            add(newInstance);
-        }});
-
-        return newInstance.getId()._getId();
+        TypeUtils.Pair<List<String>, List<String>> guids = repository.updatePartial(newInstance);
+        return onEntitiesAddedUpdated(guids);
     }
 
     private void validateTypeExists(String entityType) throws AtlasException {
@@ -633,12 +621,22 @@ public class DefaultMetadataService implements MetadataService {
         }
     }
 
-    private void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
+    private void onEntitiesAdded(List<String> guids) throws AtlasException {
+        List<ITypedReferenceableInstance> entities = loadEntities(guids);
         for (EntityChangeListener listener : entityChangeListeners) {
             listener.onEntitiesAdded(entities);
         }
     }
 
+    private List<ITypedReferenceableInstance> loadEntities(List<String> guids) throws EntityNotFoundException,
+            RepositoryException {
+        List<ITypedReferenceableInstance> entities = new ArrayList<>();
+        for (String guid : guids) {
+            entities.add(repository.getEntityDefinition(guid));
+        }
+        return entities;
+    }
+
     private void onTypesUpdated(Map<String, IDataType> typesUpdated) throws AtlasException {
         Map<TypesChangeListener, Throwable> caughtExceptions = new HashMap<>();
         for (Provider<TypesChangeListener> indexerProvider : typeChangeListeners) {
@@ -656,8 +654,8 @@ public class DefaultMetadataService implements MetadataService {
         }
     }
 
-    private void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities)
-        throws AtlasException {
+    private void onEntitiesUpdated(List<String> guids) throws AtlasException {
+        List<ITypedReferenceableInstance> entities = loadEntities(guids);
         for (EntityChangeListener listener : entityChangeListeners) {
             listener.onEntitiesUpdated(entities);
         }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
index 1075d85..a49967f 100644
--- a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
+++ b/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
@@ -346,13 +346,10 @@ public class BaseHiveRepositoryTest {
         return createInstance(referenceable, clsType);
     }
     private Id createInstance(Referenceable referenceable, ClassType clsType) throws Exception {
-//        String entityJSON = InstanceSerialization.toJson(referenceable, true);
-
-
         ITypedReferenceableInstance typedInstance = clsType.convert(referenceable, Multiplicity.REQUIRED);
-        String guid = repository.createEntities(typedInstance)[0];
+        List<String> guids = repository.createEntities(typedInstance);
 
         // return the reference to created instance with guid
-        return new Id(guid, 0, referenceable.getTypeName());
+        return new Id(guids.get(guids.size() - 1), 0, referenceable.getTypeName());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
index 78af443..22ff1d6 100755
--- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
@@ -118,10 +118,10 @@ public class GraphBackedMetadataRepositoryTest {
         ClassType deptType = typeSystem.getDataType(ClassType.class, "Department");
         ITypedReferenceableInstance hrDept2 = deptType.convert(hrDept, Multiplicity.REQUIRED);
 
-        String[] guids = repositoryService.createEntities(hrDept2);
+        List<String> guids = repositoryService.createEntities(hrDept2);
         Assert.assertNotNull(guids);
-        Assert.assertEquals(guids.length, 1);
-        guid = guids[0];
+        Assert.assertEquals(guids.size(), 5);
+        guid = guids.get(4);
         Assert.assertNotNull(guid);
     }
 
@@ -173,14 +173,12 @@ public class GraphBackedMetadataRepositoryTest {
         ITypedReferenceableInstance db = dbType.convert(databaseInstance, Multiplicity.REQUIRED);
         System.out.println("db = " + db);
 
-        String dbGUID = repositoryService.createEntities(db)[0];
-        System.out.println("added db = " + dbGUID);
-
-        Referenceable dbInstance = new Referenceable(dbGUID, TestUtils.DATABASE_TYPE, databaseInstance.getValuesMap());
-
-        ITypedReferenceableInstance table = createHiveTableInstance(dbInstance);
-        String tableGUID = repositoryService.createEntities(table)[0];
-        System.out.println("added table = " + tableGUID);
+        //Reuse the same database instance without id, with the same unique attribute
+        ITypedReferenceableInstance table = createHiveTableInstance(databaseInstance);
+        List<String> guids = repositoryService.createEntities(db, table);
+        Assert.assertEquals(guids.size(), 7);   //1 db + 5 columns + 1 table. Shouldn't create db again
+        System.out.println("added db = " + guids.get(0));
+        System.out.println("added table = " + guids.get(6));
     }
 
     @Test(dependsOnMethods = "testCreateEntity")
@@ -600,9 +598,10 @@ public class GraphBackedMetadataRepositoryTest {
         ClassType deptType = typeSystem.getDataType(ClassType.class, "Department");
         ITypedReferenceableInstance hrDept2 = deptType.convert(hrDept, Multiplicity.REQUIRED);
 
-        String[] guids = repositoryService.createEntities(hrDept2);
+        List<String> guids = repositoryService.createEntities(hrDept2);
         Assert.assertNotNull(guids);
-        Assert.assertEquals(guids.length, 1);
-        Assert.assertNotNull(guids[0]);
+        Assert.assertEquals(guids.size(), 2);
+        Assert.assertNotNull(guids.get(0));
+        Assert.assertNotNull(guids.get(1));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java
index c25ccf8..2d01bbe 100755
--- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java
@@ -101,7 +101,7 @@ public class GraphRepoMapperScaleTest {
         ClassType dbType = typeSystem.getDataType(ClassType.class, TestUtils.DATABASE_TYPE);
         ITypedReferenceableInstance db = dbType.convert(databaseInstance, Multiplicity.REQUIRED);
 
-        dbGUID = repositoryService.createEntities(db)[0];
+        dbGUID = repositoryService.createEntities(db).get(0);
 
         Referenceable dbInstance = new Referenceable(dbGUID, TestUtils.DATABASE_TYPE, databaseInstance.getValuesMap());
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
index 0307fd4..8a80d8c 100644
--- a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
@@ -54,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
 @Guice(modules = RepositoryMetadataModule.class)
@@ -113,7 +114,11 @@ public class DefaultMetadataServiceTest {
         JSONArray entitiesJson = new JSONArray();
         entitiesJson.put(entityjson);
         String response = metadataService.createEntities(entitiesJson.toString());
-        return new JSONArray(response).getString(0);
+        JSONArray guids = new JSONArray(response);
+        if (guids != null && guids.length() > 0) {
+            return guids.getString(0);
+        }
+        return null;
     }
 
     private String updateInstance(Referenceable entity) throws Exception {
@@ -154,7 +159,7 @@ public class DefaultMetadataServiceTest {
 
         //using the same name should succeed, but not create another entity
         String newId = createInstance(entity);
-        Assert.assertEquals(newId, id);
+        assertNull(newId);
 
         //Same entity, but different qualified name should succeed
         entity.set("name", TestUtils.randomString());

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/server-api/src/main/java/org/apache/atlas/services/MetadataService.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/services/MetadataService.java b/server-api/src/main/java/org/apache/atlas/services/MetadataService.java
index 0cfed2e..a91c0a0 100644
--- a/server-api/src/main/java/org/apache/atlas/services/MetadataService.java
+++ b/server-api/src/main/java/org/apache/atlas/services/MetadataService.java
@@ -74,7 +74,7 @@ public interface MetadataService {
      * Creates an entity, instance of the type.
      *
      * @param entityDefinition definition
-     * @return guid
+     * @return json array of guids of entities created
      */
     String createEntities(String entityDefinition) throws AtlasException;
 
@@ -107,25 +107,28 @@ public interface MetadataService {
     /**
      * Adds the property to the given entity id(guid).
      * Currently supports updates only on PRIMITIVE, CLASS attribute types
-     *
-     * @param guid entity id
+     *  @param guid entity id
      * @param attribute property name
      * @param value    property value
+     * @return json array of guids of entities created/updated
      */
-    void updateEntityAttributeByGuid(String guid, String attribute, String value) throws AtlasException;
+    String updateEntityAttributeByGuid(String guid, String attribute, String value) throws AtlasException;
 
     /**
      * Supports Partial updates of an entity. Users can update a subset of attributes for an entity identified by its guid
      * Note however that it cannot be used to set attribute values to null or delete attrbute values
-     *
+     * @param guid entity id
+     * @param entity
+     * @return json array of guids of entities created/updated
+     * @throws AtlasException
      */
-    void updateEntityPartialByGuid(String guid, Referenceable entity) throws AtlasException;
+    String updateEntityPartialByGuid(String guid, Referenceable entity) throws AtlasException;
 
     /**
      * Batch API - Adds/Updates the given entity id(guid).
      *
      * @param entityJson entity json
-     * @return List of guids which were updated and ones which were newly created as part of the updated entity
+     * @return json array of guids of entities created/updated
      */
     String updateEntities(String entityJson) throws AtlasException;
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java
index 9d2480b..f5c2ce9 100755
--- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java
@@ -70,14 +70,18 @@ public class TypeUtils {
         return b;
     }
 
-    protected static class Pair<L, R> {
-        protected L left;
-        protected R right;
+    public static class Pair<L, R> {
+        public L left;
+        public R right;
 
         public Pair(L left, R right) {
             this.left = left;
             this.right = right;
         }
+
+        public static <L, R> Pair<L, R> of(L left, R right) {
+            return new Pair<>(left, right);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
index 2ee0027..f5ab4d8 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
@@ -118,8 +118,12 @@ public class EntityResource {
 
             JSONObject response = new JSONObject();
             response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
-            response.put(AtlasClient.GUID, new JSONArray(guids));
-            response.put(AtlasClient.DEFINITION, new JSONObject(metadataService.getEntityDefinition(new JSONArray(guids).getString(0))));
+            JSONArray guidArray = new JSONArray(guids);
+            response.put(AtlasClient.GUID, guidArray);
+            if (guidArray.length() > 0) {
+                response.put(AtlasClient.DEFINITION,
+                        new JSONObject(metadataService.getEntityDefinition(new JSONArray(guids).getString(0))));
+            }
 
             return Response.created(locationURI).entity(response).build();
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
index 4d2cce7..b2501ec 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
@@ -58,7 +58,6 @@ public class EntityNotificationIT extends BaseResourceIT {
 
     private static final String ENTITIES = "api/atlas/entities";
     private static final String TRAITS = "traits";
-    private static final int MAX_WAIT_TIME = 10000;
     private final String DATABASE_NAME = "db" + randomString();
     private final String TABLE_NAME = "table" + randomString();
     @Inject
@@ -98,7 +97,7 @@ public class EntityNotificationIT extends BaseResourceIT {
 
         final String guid = tableId._getId();
 
-        waitForNotification(MAX_WAIT_TIME);
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME);
 
         EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
 
@@ -120,7 +119,7 @@ public class EntityNotificationIT extends BaseResourceIT {
 
         serviceClient.updateEntityAttribute(guid, property, newValue);
 
-        waitForNotification(MAX_WAIT_TIME);
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME);
 
         EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
 
@@ -155,7 +154,7 @@ public class EntityNotificationIT extends BaseResourceIT {
         ClientResponse clientResponse = addTrait(guid, traitInstanceJSON);
         assertEquals(clientResponse.getStatus(), Response.Status.CREATED.getStatusCode());
 
-        waitForNotification(MAX_WAIT_TIME);
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME);
 
         EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
 
@@ -192,7 +191,7 @@ public class EntityNotificationIT extends BaseResourceIT {
         clientResponse = addTrait(guid, traitInstanceJSON);
         assertEquals(clientResponse.getStatus(), Response.Status.CREATED.getStatusCode());
 
-        waitForNotification(MAX_WAIT_TIME);
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME);
 
         entityNotification = notificationConsumer.getLastEntityNotification();
 
@@ -218,7 +217,7 @@ public class EntityNotificationIT extends BaseResourceIT {
         ClientResponse clientResponse = deleteTrait(guid, traitName);
         Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
 
-        waitForNotification(MAX_WAIT_TIME);
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME);
 
         EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
 
@@ -260,51 +259,4 @@ public class EntityNotificationIT extends BaseResourceIT {
         return resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
             .method(HttpMethod.DELETE, ClientResponse.class);
     }
-
-    private void waitForNotification(int maxWait) throws Exception {
-        waitFor(maxWait, new Predicate() {
-            @Override
-            public boolean evaluate() throws Exception {
-                return notificationConsumer.getLastEntityNotification() != null;
-            }
-        });
-    }
-
-
-    // ----- inner class : EntityNotificationConsumer --------------------------
-
-    private static class EntityNotificationConsumer implements Runnable {
-        private final NotificationConsumer<EntityNotification> consumerIterator;
-        private EntityNotification entityNotification = null;
-        private boolean run;
-
-        public EntityNotificationConsumer(NotificationConsumer<EntityNotification> consumerIterator) {
-            this.consumerIterator = consumerIterator;
-        }
-
-        @Override
-        public void run() {
-            while (run && consumerIterator.hasNext()) {
-                entityNotification = consumerIterator.next();
-            }
-        }
-
-        public void reset() {
-            entityNotification = null;
-        }
-
-        public void start() {
-            Thread thread = new Thread(this);
-            run = true;
-            thread.start();
-        }
-
-        public void stop() {
-            run = false;
-        }
-
-        public EntityNotification getLastEntityNotification() {
-            return entityNotification;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
index 361cece..aba191c 100755
--- a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
@@ -25,6 +25,8 @@ import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
 import com.sun.jersey.api.client.config.DefaultClientConfig;
 import org.apache.atlas.*;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.entity.EntityNotification;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
 import org.apache.atlas.typesystem.TypesDef;
@@ -68,6 +70,7 @@ public abstract class BaseResourceIT {
     protected WebResource service;
     protected AtlasClient serviceClient;
     public static final Logger LOG = LoggerFactory.getLogger(BaseResourceIT.class);
+    protected static final int MAX_WAIT_TIME = 1000;
 
     @BeforeClass
     public void setUp() throws Exception {
@@ -119,7 +122,10 @@ public abstract class BaseResourceIT {
         System.out.println("created instance for type " + typeName + ", guid: " + guids);
 
         // return the reference to created instance with guid
-        return new Id(guids.getString(0), 0, referenceable.getTypeName());
+        if (guids.length() > 0) {
+            return new Id(guids.getString(guids.length() - 1), 0, referenceable.getTypeName());
+        }
+        return null;
     }
 
     protected static final String DATABASE_TYPE = "hive_db";
@@ -285,4 +291,50 @@ public abstract class BaseResourceIT {
             throw new Exception("Waiting timed out after " + timeout + " msec");
         }
     }
+
+    // ----- inner class : EntityNotificationConsumer --------------------------
+
+    protected static class EntityNotificationConsumer implements Runnable {
+        private final NotificationConsumer<EntityNotification> consumerIterator;
+        private EntityNotification entityNotification = null;
+        private boolean run;
+
+        public EntityNotificationConsumer(NotificationConsumer<EntityNotification> consumerIterator) {
+            this.consumerIterator = consumerIterator;
+        }
+
+        @Override
+        public void run() {
+            while (run && consumerIterator.hasNext()) {
+                entityNotification = consumerIterator.next();
+            }
+        }
+
+        public void reset() {
+            entityNotification = null;
+        }
+
+        public void start() {
+            Thread thread = new Thread(this);
+            run = true;
+            thread.start();
+        }
+
+        public void stop() {
+            run = false;
+        }
+
+        public EntityNotification getLastEntityNotification() {
+            return entityNotification;
+        }
+    }
+
+    protected void waitForNotification(final EntityNotificationConsumer notificationConsumer, int maxWait) throws Exception {
+        waitFor(maxWait, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return notificationConsumer.getLastEntityNotification() != null;
+            }
+        });
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
index f476af3..a268196 100755
--- a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
@@ -19,10 +19,15 @@
 package org.apache.atlas.web.resources;
 
 import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.notification.NotificationModule;
+import org.apache.atlas.notification.entity.EntityNotification;
 import org.apache.atlas.typesystem.IStruct;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
@@ -43,11 +48,14 @@ import org.apache.atlas.web.util.Servlets;
 import org.apache.commons.lang.RandomStringUtils;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONObject;
+import org.junit.AfterClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
+import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import javax.ws.rs.HttpMethod;
@@ -59,10 +67,13 @@ import java.util.Map;
 import java.util.UUID;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
 
 /**
  * Integration tests for Entity Jersey Resource.
  */
+@Guice(modules = NotificationModule.class)
 public class EntityJerseyResourceIT extends BaseResourceIT {
 
     private static final Logger LOG = LoggerFactory.getLogger(EntityJerseyResourceIT.class);
@@ -76,11 +87,32 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
     private Id tableId;
     private String traitName;
 
+    @Inject
+    private NotificationInterface notificationInterface;
+    private EntityNotificationConsumer notificationConsumer;
+
     @BeforeClass
     public void setUp() throws Exception {
         super.setUp();
 
         createTypeDefinitions();
+
+        List<NotificationConsumer<EntityNotification>> consumers =
+                notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
+
+        NotificationConsumer<EntityNotification> consumer = consumers.iterator().next();
+        notificationConsumer = new EntityNotificationConsumer(consumer);
+        notificationConsumer.start();
+    }
+
+    @AfterClass
+    public void tearDown() {
+        notificationConsumer.stop();
+    }
+
+    @BeforeMethod
+    public void setupTest() {
+        notificationConsumer.reset();
     }
 
     @Test
@@ -119,18 +151,32 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
 
     @Test
     public void testEntityDeduping() throws Exception {
-        Referenceable db = new Referenceable(DATABASE_TYPE);
-        String dbName = "db" + randomString();
+        final Referenceable db = new Referenceable(DATABASE_TYPE);
+        final String dbName = "db" + randomString();
         db.set("name", dbName);
         db.set("description", randomString());
 
-        serviceClient.createEntity(db);
+        serviceClient.createEntity(db).getString(0);
+
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME);
+        EntityNotification notification = notificationConsumer.getLastEntityNotification();
+        assertNotNull(notification);
+        assertEquals(notification.getEntity().get("name"), dbName);
+
         JSONArray results =
                 serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
         assertEquals(results.length(), 1);
 
         //create entity again shouldn't create another instance with same unique attribute value
+        notificationConsumer.reset();
         serviceClient.createEntity(db);
+        try {
+            waitForNotification(notificationConsumer, MAX_WAIT_TIME);
+            fail("Expected time out exception");
+        } catch (Exception e) {
+            //expected timeout
+        }
+
         results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
         assertEquals(results.length(), 1);