You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2015/12/15 11:20:48 UTC
[2/2] incubator-atlas git commit: ATLAS-342 Atlas is sending an
ENTITY_CREATE event to the ATLAS_ENTITIES topic even if the entity exists
already (shwethags)
ATLAS-342 Atlas is sending an ENTITY_CREATE event to the ATLAS_ENTITIES topic even if the entity exists already (shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/8cbb2c13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/8cbb2c13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/8cbb2c13
Branch: refs/heads/master
Commit: 8cbb2c13bbe71957daf0de9222daaf4dd01199d3
Parents: dce31ab
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Tue Dec 15 15:50:33 2015 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Tue Dec 15 15:50:33 2015 +0530
----------------------------------------------------------------------
release-log.txt | 1 +
.../atlas/repository/MetadataRepository.java | 11 +-
.../graph/GraphBackedMetadataRepository.java | 13 ++-
.../atlas/repository/graph/GraphHelper.java | 8 +-
.../graph/TypedInstanceToGraphMapper.java | 100 +++++++++++--------
.../atlas/services/DefaultMetadataService.java | 80 ++++++++-------
.../apache/atlas/BaseHiveRepositoryTest.java | 7 +-
.../GraphBackedMetadataRepositoryTest.java | 27 +++--
.../graph/GraphRepoMapperScaleTest.java | 2 +-
.../service/DefaultMetadataServiceTest.java | 9 +-
.../apache/atlas/services/MetadataService.java | 17 ++--
.../atlas/typesystem/types/TypeUtils.java | 10 +-
.../atlas/web/resources/EntityResource.java | 8 +-
.../notification/EntityNotificationIT.java | 58 +----------
.../atlas/web/resources/BaseResourceIT.java | 54 +++++++++-
.../web/resources/EntityJerseyResourceIT.java | 52 +++++++++-
16 files changed, 268 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 1ad4977..ce3337d 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -14,6 +14,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
ALL CHANGES:
+ATLAS-342 Atlas is sending an ENTITY_CREATE event to the ATLAS_ENTITIES topic even if the entity exists already (shwethags)
ATLAS-386 Handle hive rename Table (shwethags)
ATLAS-374 Doc: Create a wiki for documenting fault tolerance and HA options for Atlas data (yhemanth via sumasai)
ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhemanth via sumasai)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java
index 2091e71..f66a4e5 100755
--- a/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java
@@ -19,12 +19,13 @@
package org.apache.atlas.repository;
import org.apache.atlas.AtlasException;
-import org.apache.atlas.typesystem.exception.EntityExistsException;
-import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
import org.apache.atlas.typesystem.ITypedStruct;
+import org.apache.atlas.typesystem.exception.EntityExistsException;
+import org.apache.atlas.typesystem.exception.EntityNotFoundException;
import org.apache.atlas.typesystem.types.AttributeInfo;
import org.apache.atlas.typesystem.types.IDataType;
+import org.apache.atlas.typesystem.types.TypeUtils;
import java.util.List;
@@ -82,7 +83,7 @@ public interface MetadataRepository {
* @throws RepositoryException
* @throws EntityExistsException
*/
- String[] createEntities(ITypedReferenceableInstance... entities) throws RepositoryException, EntityExistsException;
+ List<String> createEntities(ITypedReferenceableInstance... entities) throws RepositoryException, EntityExistsException;
/**
* Fetch the complete definition of an entity given its GUID.
@@ -143,13 +144,13 @@ public interface MetadataRepository {
* Adds/Updates the property to the entity that corresponds to the GUID
* Supports only primitive attribute/Class Id updations.
*/
- void updatePartial(ITypedReferenceableInstance entity) throws RepositoryException;
+ TypeUtils.Pair<List<String>, List<String>> updatePartial(ITypedReferenceableInstance entity) throws RepositoryException;
/**
* Adds the property to the entity that corresponds to the GUID
* @param entitiesToBeUpdated The entities to be updated
*/
- String[] updateEntities(ITypedReferenceableInstance... entitiesToBeUpdated) throws RepositoryException;
+ TypeUtils.Pair<List<String>, List<String>> updateEntities(ITypedReferenceableInstance... entitiesToBeUpdated) throws RepositoryException;
/**
* Returns the entity for the given type and qualified name
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
index fe1e576..dd64124 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
@@ -37,6 +37,7 @@ import org.apache.atlas.typesystem.types.AttributeInfo;
import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.IDataType;
import org.apache.atlas.typesystem.types.TypeSystem;
+import org.apache.atlas.typesystem.types.TypeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,12 +115,14 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
- public String[] createEntities(ITypedReferenceableInstance... entities) throws RepositoryException,
+ public List<String> createEntities(ITypedReferenceableInstance... entities) throws RepositoryException,
EntityExistsException {
LOG.info("adding entities={}", entities);
try {
TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper);
- return instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.CREATE, entities);
+ TypeUtils.Pair<List<String>, List<String>> idPair =
+ instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.CREATE, entities);
+ return idPair.left;
} catch (EntityExistsException e) {
throw e;
} catch (AtlasException e) {
@@ -279,7 +282,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
- public String[] updateEntities(ITypedReferenceableInstance... entitiesUpdated) throws RepositoryException {
+ public TypeUtils.Pair<List<String>, List<String>> updateEntities(ITypedReferenceableInstance... entitiesUpdated) throws RepositoryException {
LOG.info("updating entity {}", entitiesUpdated);
try {
TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper);
@@ -292,11 +295,11 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
@Override
@GraphTransaction
- public void updatePartial(ITypedReferenceableInstance entity) throws RepositoryException {
+ public TypeUtils.Pair<List<String>, List<String>> updatePartial(ITypedReferenceableInstance entity) throws RepositoryException {
LOG.info("updating entity {}", entity);
try {
TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper);
- instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_PARTIAL, entity);
+ return instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_PARTIAL, entity);
} catch (AtlasException e) {
throw new RepositoryException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index 9ac2819..6b2d5d1 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -71,13 +71,13 @@ public final class GraphHelper {
return INSTANCE;
}
- public Vertex createVertexWithIdentity(ITypedReferenceableInstance typedInstance,
- Set<String> superTypeNames) {
+ public Vertex createVertexWithIdentity(ITypedReferenceableInstance typedInstance, Set<String> superTypeNames) {
+ final String guid = UUID.randomUUID().toString();
+
final Vertex vertexWithIdentity = createVertexWithoutIdentity(typedInstance.getTypeName(),
- typedInstance.getId(), superTypeNames);
+ new Id(guid, 0 , typedInstance.getTypeName()), superTypeNames);
// add identity
- final String guid = UUID.randomUUID().toString();
setProperty(vertexWithIdentity, Constants.GUID_PROPERTY_KEY, guid);
// add version information
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
index 7ef5c50..996f31b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
@@ -41,8 +41,8 @@ import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.ObjectGraphWalker;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeSystem;
+import org.apache.atlas.typesystem.types.TypeUtils;
import org.apache.atlas.utils.MD5Utils;
-import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,36 +79,40 @@ public final class TypedInstanceToGraphMapper {
this.graphToTypedInstanceMapper = graphToTypedInstanceMapper;
}
- String[] mapTypedInstanceToGraph(Operation operation, ITypedReferenceableInstance... typedInstances)
+ TypeUtils.Pair<List<String>, List<String>> mapTypedInstanceToGraph(Operation operation, ITypedReferenceableInstance... typedInstances)
throws AtlasException {
- List<String> guids = new ArrayList<>();
+
+ List<String> createdIds = new ArrayList<>();
+ List<String> updatedIds = new ArrayList<>();
+
for (ITypedReferenceableInstance typedInstance : typedInstances) {
Collection<IReferenceableInstance> newInstances = walkClassInstances(typedInstance);
- Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> instancesPair =
+ TypeUtils.Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> instancesPair =
createVerticesAndDiscoverInstances(newInstances);
switch (operation) {
case CREATE:
- addOrUpdateAttributesAndTraits(operation, instancesPair.getLeft());
- addFullTextProperty(instancesPair.getLeft());
+ List<String> ids = addOrUpdateAttributesAndTraits(operation, instancesPair.left);
+ createdIds.addAll(ids);
+ addFullTextProperty(instancesPair.left);
break;
case UPDATE_FULL:
case UPDATE_PARTIAL:
- List<ITypedReferenceableInstance> instancesForUpdate = instancesPair.getLeft();
- instancesForUpdate.addAll(instancesPair.getRight());
- addOrUpdateAttributesAndTraits(operation, instancesForUpdate);
- addFullTextProperty(instancesForUpdate);
+ ids = addOrUpdateAttributesAndTraits(Operation.CREATE, instancesPair.left);
+ createdIds.addAll(ids);
+ ids = addOrUpdateAttributesAndTraits(operation, instancesPair.right);
+ updatedIds.addAll(ids);
+
+ addFullTextProperty(instancesPair.left);
+ addFullTextProperty(instancesPair.right);
break;
case DELETE:
throw new UnsupportedOperationException("Not handled - " + operation);
}
-
- //Return guid for
- addToGuids(typedInstance, guids);
}
- return guids.toArray(new String[guids.size()]);
+ return TypeUtils.Pair.of(createdIds, updatedIds);
}
private Collection<IReferenceableInstance> walkClassInstances(ITypedReferenceableInstance typedInstance)
@@ -126,18 +130,21 @@ public final class TypedInstanceToGraphMapper {
return entityProcessor.getInstances();
}
- private void addOrUpdateAttributesAndTraits(Operation operation, List<ITypedReferenceableInstance> instances) throws AtlasException {
+ private List<String> addOrUpdateAttributesAndTraits(Operation operation, List<ITypedReferenceableInstance> instances) throws AtlasException {
+ List<String> guids = new ArrayList<>();
for (ITypedReferenceableInstance instance : instances) {
try {
//new vertex, set all the properties
- addOrUpdateAttributesAndTraits(operation, instance);
+ String guid = addOrUpdateAttributesAndTraits(operation, instance);
+ guids.add(guid);
} catch (SchemaViolationException e) {
throw new EntityExistsException(instance, e);
}
}
+ return guids;
}
- private void addOrUpdateAttributesAndTraits(Operation operation, ITypedReferenceableInstance typedInstance)
+ private String addOrUpdateAttributesAndTraits(Operation operation, ITypedReferenceableInstance typedInstance)
throws AtlasException {
LOG.debug("Adding/Updating typed instance {}", typedInstance.getTypeName());
@@ -158,6 +165,8 @@ public final class TypedInstanceToGraphMapper {
//TODO - Handle Trait updates
addTraits(typedInstance, instanceVertex, classType);
}
+
+ return getId(typedInstance)._getId();
}
private void mapInstanceToVertex(ITypedInstance typedInstance, Vertex instanceVertex,
@@ -215,14 +224,16 @@ public final class TypedInstanceToGraphMapper {
}
}
- private Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> createVerticesAndDiscoverInstances(
+ private TypeUtils.Pair<List<ITypedReferenceableInstance>, List<ITypedReferenceableInstance>> createVerticesAndDiscoverInstances(
Collection<IReferenceableInstance> instances) throws AtlasException {
List<ITypedReferenceableInstance> instancesToCreate = new ArrayList<>();
List<ITypedReferenceableInstance> instancesToUpdate = new ArrayList<>();
for (IReferenceableInstance instance : instances) {
+ ITypedReferenceableInstance newInstance;
Id id = instance.getId();
+
if (!idToVertexMap.containsKey(id)) {
Vertex instanceVertex;
if (id.isAssigned()) { // has a GUID
@@ -231,7 +242,9 @@ public final class TypedInstanceToGraphMapper {
throw new IllegalStateException(
String.format("%s is not of type ITypedReferenceableInstance", instance));
}
- instancesToUpdate.add((ITypedReferenceableInstance) instance);
+ newInstance = (ITypedReferenceableInstance) instance;
+ instancesToUpdate.add(newInstance);
+
} else {
//Check if there is already an instance with the same unique attribute value
ClassType classType = typeSystem.getDataType(ClassType.class, instance.getTypeName());
@@ -239,31 +252,28 @@ public final class TypedInstanceToGraphMapper {
//no entity with the given unique attribute, create new
if (instanceVertex == null) {
- ITypedReferenceableInstance newInstance = classType.convert(instance, Multiplicity.REQUIRED);
+ newInstance = classType.convert(instance, Multiplicity.REQUIRED);
instanceVertex = graphHelper.createVertexWithIdentity(newInstance, classType.getAllSuperTypeNames());
instancesToCreate.add(newInstance);
//Map only unique attributes for cases of circular references
mapInstanceToVertex(newInstance, instanceVertex, classType.fieldMapping().fields, true, Operation.CREATE);
+
} else {
if (!(instance instanceof ReferenceableInstance)) {
throw new IllegalStateException(
String.format("%s is not of type ITypedReferenceableInstance", instance));
}
- instancesToUpdate.add((ITypedReferenceableInstance) instance);
+ newInstance = (ITypedReferenceableInstance) instance;
+ instancesToUpdate.add(newInstance);
}
}
+ //Remember the vertex resolved for this id so the same id is not processed again
idToVertexMap.put(id, instanceVertex);
}
}
- return Pair.of(instancesToCreate, instancesToUpdate);
- }
-
- private void addToGuids(ITypedReferenceableInstance typedInstance, List<String> guids) {
- Vertex instanceVertex = idToVertexMap.get(typedInstance.getId());
- String guid = instanceVertex.getProperty(Constants.GUID_PROPERTY_KEY);
- guids.add(guid);
+ return TypeUtils.Pair.of(instancesToCreate, instancesToUpdate);
}
private void addFullTextProperty(List<ITypedReferenceableInstance> instances) throws AtlasException {
@@ -275,7 +285,8 @@ public final class TypedInstanceToGraphMapper {
}
}
- private void addTraits(ITypedReferenceableInstance typedInstance, Vertex instanceVertex, ClassType classType) throws AtlasException {
+ private void addTraits(ITypedReferenceableInstance typedInstance, Vertex instanceVertex, ClassType classType)
+ throws AtlasException {
for (String traitName : typedInstance.getTraits()) {
LOG.debug("mapping trait {}", traitName);
GraphHelper.addProperty(instanceVertex, Constants.TRAIT_NAMES_PROPERTY_KEY, traitName);
@@ -288,7 +299,8 @@ public final class TypedInstanceToGraphMapper {
/******************************************** STRUCT **************************************************/
- private Pair<Vertex, Edge> updateStructVertex(ITypedStruct structInstance, Edge relEdge, Operation operation) throws AtlasException {
+ private TypeUtils.Pair<Vertex, Edge> updateStructVertex(ITypedStruct structInstance, Edge relEdge,
+ Operation operation) throws AtlasException {
//Already existing vertex. Update
Vertex structInstanceVertex = relEdge.getVertex(Direction.IN);
@@ -303,10 +315,11 @@ public final class TypedInstanceToGraphMapper {
mapInstanceToVertex(structInstance, structInstanceVertex, structInstance.fieldMapping().fields, false, operation);
GraphHelper.setProperty(structInstanceVertex, SIGNATURE_HASH_PROPERTY_KEY, String.valueOf(newSignature));
}
- return Pair.of(structInstanceVertex, relEdge);
+ return TypeUtils.Pair.of(structInstanceVertex, relEdge);
}
- private Pair<Vertex, Edge> addStructVertex(ITypedStruct structInstance, Vertex instanceVertex, AttributeInfo attributeInfo, String edgeLabel) throws AtlasException {
+ private TypeUtils.Pair<Vertex, Edge> addStructVertex(ITypedStruct structInstance, Vertex instanceVertex,
+ AttributeInfo attributeInfo, String edgeLabel) throws AtlasException {
// add a new vertex for the struct or trait instance
Vertex structInstanceVertex = graphHelper.createVertexWithoutIdentity(structInstance.getTypeName(), null,
Collections.<String>emptySet()); // no super types for struct type
@@ -317,7 +330,7 @@ public final class TypedInstanceToGraphMapper {
// add an edge to the newly created vertex from the parent
Edge relEdge = graphHelper.addEdge(instanceVertex, structInstanceVertex, edgeLabel);
- return Pair.of(structInstanceVertex, relEdge);
+ return TypeUtils.Pair.of(structInstanceVertex, relEdge);
}
/******************************************** ARRAY **************************************************/
@@ -443,7 +456,7 @@ public final class TypedInstanceToGraphMapper {
private String addOrUpdateStruct(Vertex instanceVertex, AttributeInfo attributeInfo, IDataType elementType,
ITypedStruct structAttr, String curVal,
String edgeLabel, Operation operation) throws AtlasException {
- Pair<Vertex, Edge> vertexEdgePair = null;
+ TypeUtils.Pair<Vertex, Edge> vertexEdgePair = null;
if (curVal != null && structAttr == null) {
//remove edge
removeUnusedReference(curVal, attributeInfo, elementType);
@@ -456,7 +469,7 @@ public final class TypedInstanceToGraphMapper {
vertexEdgePair = addStructVertex(structAttr, instanceVertex, attributeInfo, edgeLabel);
}
- return (vertexEdgePair != null) ? vertexEdgePair.getRight().getId().toString() : null;
+ return (vertexEdgePair != null) ? vertexEdgePair.right.getId().toString() : null;
}
private String addOrUpdateClassVertex(Vertex instanceVertex, AttributeInfo attributeInfo, IDataType elementType,
@@ -468,27 +481,28 @@ public final class TypedInstanceToGraphMapper {
throw new EntityNotFoundException("Could not find vertex for Class Reference " + newVal);
}
- Pair<Vertex, Edge> vertexEdgePair = null;
+ TypeUtils.Pair<Vertex, Edge> vertexEdgePair = null;
if (curVal != null && newVal == null) {
//remove edge
removeUnusedReference(curVal, attributeInfo, elementType);
} else if (curVal != null && newVal != null) {
Edge edge = graphHelper.getOutGoingEdgeById(curVal);
Id classRefId = getId(newVal);
- vertexEdgePair = updateClassEdge(classRefId, newVal, instanceVertex, edge, toVertex, attributeInfo, elementType, edgeLabel, operation);
+ vertexEdgePair = updateClassEdge(classRefId, newVal, instanceVertex, edge, toVertex, attributeInfo,
+ elementType, edgeLabel, operation);
} else if (newVal != null){
vertexEdgePair = addClassEdge(instanceVertex, toVertex, edgeLabel);
}
- return (vertexEdgePair != null) ? vertexEdgePair.getRight().getId().toString() : null;
+ return (vertexEdgePair != null) ? vertexEdgePair.right.getId().toString() : null;
}
/******************************************** CLASS **************************************************/
- private Pair<Vertex, Edge> addClassEdge(Vertex instanceVertex, Vertex toVertex, String edgeLabel) throws AtlasException {
+ private TypeUtils.Pair<Vertex, Edge> addClassEdge(Vertex instanceVertex, Vertex toVertex, String edgeLabel) throws AtlasException {
// add an edge to the class vertex from the instance
Edge edge = graphHelper.addEdge(instanceVertex, toVertex, edgeLabel);
- return Pair.of(toVertex, edge);
+ return TypeUtils.Pair.of(toVertex, edge);
}
private Vertex getClassVertex(ITypedReferenceableInstance typedReference) throws EntityNotFoundException {
@@ -521,11 +535,11 @@ public final class TypedInstanceToGraphMapper {
}
- private Pair<Vertex, Edge> updateClassEdge(Id id, final ITypedReferenceableInstance typedInstance,
+ private TypeUtils.Pair<Vertex, Edge> updateClassEdge(Id id, final ITypedReferenceableInstance typedInstance,
Vertex instanceVertex, Edge edge, Vertex toVertex,
AttributeInfo attributeInfo, IDataType dataType,
String edgeLabel, Operation operation) throws AtlasException {
- Pair<Vertex, Edge> result = Pair.of(toVertex, edge);
+ TypeUtils.Pair<Vertex, Edge> result = TypeUtils.Pair.of(toVertex, edge);
Edge newEdge = edge;
// Update edge if it exists
Vertex invertex = edge.getVertex(Direction.IN);
@@ -535,7 +549,7 @@ public final class TypedInstanceToGraphMapper {
// add an edge to the class vertex from the instance
if(toVertex != null) {
newEdge = graphHelper.addEdge(instanceVertex, toVertex, edgeLabel);
- result = Pair.of(toVertex, newEdge);
+ result = TypeUtils.Pair.of(toVertex, newEdge);
}
removeUnusedReference(edge.getId().toString(), attributeInfo, dataType);
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
index f605c26..b38face 100755
--- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
+++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
@@ -23,16 +23,12 @@ import com.google.common.collect.ImmutableList;
import com.google.inject.Provider;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
-import org.apache.atlas.repository.RepositoryException;
-import org.apache.atlas.typesystem.exception.EntityNotFoundException;
-import org.apache.atlas.typesystem.exception.TypeNotFoundException;
-import org.apache.atlas.typesystem.persistence.ReferenceableInstance;
-import org.apache.atlas.utils.ParamChecker;
import org.apache.atlas.classification.InterfaceAudience;
import org.apache.atlas.listener.EntityChangeListener;
import org.apache.atlas.listener.TypesChangeListener;
import org.apache.atlas.repository.IndexCreationException;
import org.apache.atlas.repository.MetadataRepository;
+import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.repository.typestore.ITypeStore;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
@@ -40,9 +36,12 @@ import org.apache.atlas.typesystem.ITypedStruct;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.TypesDef;
+import org.apache.atlas.typesystem.exception.EntityNotFoundException;
+import org.apache.atlas.typesystem.exception.TypeNotFoundException;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.typesystem.persistence.ReferenceableInstance;
import org.apache.atlas.typesystem.types.AttributeDefinition;
import org.apache.atlas.typesystem.types.AttributeInfo;
import org.apache.atlas.typesystem.types.ClassType;
@@ -54,25 +53,24 @@ import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeSystem;
+import org.apache.atlas.typesystem.types.TypeUtils;
import org.apache.atlas.typesystem.types.ValueConversionException;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
+import org.apache.atlas.utils.ParamChecker;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.actors.threadpool.Arrays;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
/**
* Simple wrapper over TypeSystem and MetadataRepository services with hooks
@@ -279,17 +277,10 @@ public class DefaultMetadataService implements MetadataService {
ITypedReferenceableInstance[] typedInstances = deserializeClassInstances(entityInstanceDefinition);
- final String[] guids = repository.createEntities(typedInstances);
-
- Set<ITypedReferenceableInstance> entitites = new HashSet<>();
+ final List<String> guids = repository.createEntities(typedInstances);
- for (String guid : guids) {
- entitites.add(repository.getEntityDefinition(guid));
- }
-
- onEntitiesAdded(entitites);
-
- return new JSONArray(Arrays.asList(guids)).toString();
+ onEntitiesAdded(guids);
+ return new JSONArray(guids).toString();
}
private ITypedReferenceableInstance[] deserializeClassInstances(String entityInstanceDefinition)
@@ -390,14 +381,20 @@ public class DefaultMetadataService implements MetadataService {
ParamChecker.notEmpty(entityInstanceDefinition, "Entity instance definition cannot be empty");
ITypedReferenceableInstance[] typedInstances = deserializeClassInstances(entityInstanceDefinition);
- String[] guids = repository.updateEntities(typedInstances);
- onEntitiesAdded(Arrays.asList(typedInstances));
+ TypeUtils.Pair<List<String>, List<String>> guids = repository.updateEntities(typedInstances);
+ return onEntitiesAddedUpdated(guids);
+ }
+
+ private String onEntitiesAddedUpdated(TypeUtils.Pair<List<String>, List<String>> guids) throws AtlasException {
+ onEntitiesAdded(guids.left);
+ onEntitiesUpdated(guids.right);
- return new JSONArray(Arrays.asList(guids)).toString();
+ guids.left.addAll(guids.right);
+ return new JSONArray(guids.left).toString();
}
@Override
- public void updateEntityAttributeByGuid(final String guid, String attributeName, String value) throws AtlasException {
+ public String updateEntityAttributeByGuid(final String guid, String attributeName, String value) throws AtlasException {
ParamChecker.notEmpty(guid, "guid cannot be null");
ParamChecker.notEmpty(attributeName, "property cannot be null");
ParamChecker.notEmpty(value, "property value cannot be null");
@@ -426,10 +423,8 @@ public class DefaultMetadataService implements MetadataService {
}
((ReferenceableInstance)newInstance).replaceWithNewId(new Id(guid, 0, newInstance.getTypeName()));
- repository.updatePartial(newInstance);
- onEntitiesUpdated(new ArrayList<ITypedReferenceableInstance>() {{
- add(repository.getEntityDefinition(guid));
- }});
+ TypeUtils.Pair<List<String>, List<String>> guids = repository.updatePartial(newInstance);
+ return onEntitiesAddedUpdated(guids);
}
private ITypedReferenceableInstance validateEntityExists(String guid)
@@ -442,7 +437,7 @@ public class DefaultMetadataService implements MetadataService {
}
@Override
- public void updateEntityPartialByGuid(final String guid, Referenceable newEntity) throws AtlasException {
+ public String updateEntityPartialByGuid(final String guid, Referenceable newEntity) throws AtlasException {
ParamChecker.notEmpty(guid, "guid cannot be null");
ParamChecker.notNull(newEntity, "updatedEntity cannot be null");
ITypedReferenceableInstance existInstance = validateEntityExists(guid);
@@ -450,10 +445,8 @@ public class DefaultMetadataService implements MetadataService {
ITypedReferenceableInstance newInstance = convertToTypedInstance(newEntity, existInstance.getTypeName());
((ReferenceableInstance)newInstance).replaceWithNewId(new Id(guid, 0, newInstance.getTypeName()));
- repository.updatePartial(newInstance);
- onEntitiesUpdated(new ArrayList<ITypedReferenceableInstance>() {{
- add(repository.getEntityDefinition(guid));
- }});
+ TypeUtils.Pair<List<String>, List<String>> guids = repository.updatePartial(newInstance);
+ return onEntitiesAddedUpdated(guids);
}
private ITypedReferenceableInstance convertToTypedInstance(Referenceable updatedEntity, String typeName) throws AtlasException {
@@ -511,13 +504,8 @@ public class DefaultMetadataService implements MetadataService {
final ITypedReferenceableInstance newInstance = convertToTypedInstance(updatedEntity, typeName);
((ReferenceableInstance)newInstance).replaceWithNewId(oldInstance.getId());
- repository.updatePartial(newInstance);
-
- onEntitiesUpdated(new ArrayList<ITypedReferenceableInstance>() {{
- add(newInstance);
- }});
-
- return newInstance.getId()._getId();
+ TypeUtils.Pair<List<String>, List<String>> guids = repository.updatePartial(newInstance);
+ return onEntitiesAddedUpdated(guids);
}
private void validateTypeExists(String entityType) throws AtlasException {
@@ -633,12 +621,22 @@ public class DefaultMetadataService implements MetadataService {
}
}
- private void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
+ private void onEntitiesAdded(List<String> guids) throws AtlasException {
+ List<ITypedReferenceableInstance> entities = loadEntities(guids);
for (EntityChangeListener listener : entityChangeListeners) {
listener.onEntitiesAdded(entities);
}
}
+ private List<ITypedReferenceableInstance> loadEntities(List<String> guids) throws EntityNotFoundException,
+ RepositoryException {
+ List<ITypedReferenceableInstance> entities = new ArrayList<>();
+ for (String guid : guids) {
+ entities.add(repository.getEntityDefinition(guid));
+ }
+ return entities;
+ }
+
private void onTypesUpdated(Map<String, IDataType> typesUpdated) throws AtlasException {
Map<TypesChangeListener, Throwable> caughtExceptions = new HashMap<>();
for (Provider<TypesChangeListener> indexerProvider : typeChangeListeners) {
@@ -656,8 +654,8 @@ public class DefaultMetadataService implements MetadataService {
}
}
- private void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities)
- throws AtlasException {
+ private void onEntitiesUpdated(List<String> guids) throws AtlasException {
+ List<ITypedReferenceableInstance> entities = loadEntities(guids);
for (EntityChangeListener listener : entityChangeListeners) {
listener.onEntitiesUpdated(entities);
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
index 1075d85..a49967f 100644
--- a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
+++ b/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
@@ -346,13 +346,10 @@ public class BaseHiveRepositoryTest {
return createInstance(referenceable, clsType);
}
private Id createInstance(Referenceable referenceable, ClassType clsType) throws Exception {
-// String entityJSON = InstanceSerialization.toJson(referenceable, true);
-
-
ITypedReferenceableInstance typedInstance = clsType.convert(referenceable, Multiplicity.REQUIRED);
- String guid = repository.createEntities(typedInstance)[0];
+ List<String> guids = repository.createEntities(typedInstance);
// return the reference to created instance with guid
- return new Id(guid, 0, referenceable.getTypeName());
+ return new Id(guids.get(guids.size() - 1), 0, referenceable.getTypeName());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
index 78af443..22ff1d6 100755
--- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
@@ -118,10 +118,10 @@ public class GraphBackedMetadataRepositoryTest {
ClassType deptType = typeSystem.getDataType(ClassType.class, "Department");
ITypedReferenceableInstance hrDept2 = deptType.convert(hrDept, Multiplicity.REQUIRED);
- String[] guids = repositoryService.createEntities(hrDept2);
+ List<String> guids = repositoryService.createEntities(hrDept2);
Assert.assertNotNull(guids);
- Assert.assertEquals(guids.length, 1);
- guid = guids[0];
+ Assert.assertEquals(guids.size(), 5);
+ guid = guids.get(4);
Assert.assertNotNull(guid);
}
@@ -173,14 +173,12 @@ public class GraphBackedMetadataRepositoryTest {
ITypedReferenceableInstance db = dbType.convert(databaseInstance, Multiplicity.REQUIRED);
System.out.println("db = " + db);
- String dbGUID = repositoryService.createEntities(db)[0];
- System.out.println("added db = " + dbGUID);
-
- Referenceable dbInstance = new Referenceable(dbGUID, TestUtils.DATABASE_TYPE, databaseInstance.getValuesMap());
-
- ITypedReferenceableInstance table = createHiveTableInstance(dbInstance);
- String tableGUID = repositoryService.createEntities(table)[0];
- System.out.println("added table = " + tableGUID);
+ //Reuse the same database instance without id, with the same unique attribute
+ ITypedReferenceableInstance table = createHiveTableInstance(databaseInstance);
+ List<String> guids = repositoryService.createEntities(db, table);
+ Assert.assertEquals(guids.size(), 7); //1 db + 5 columns + 1 table. Shouldn't create db again
+ System.out.println("added db = " + guids.get(0));
+ System.out.println("added table = " + guids.get(6));
}
@Test(dependsOnMethods = "testCreateEntity")
@@ -600,9 +598,10 @@ public class GraphBackedMetadataRepositoryTest {
ClassType deptType = typeSystem.getDataType(ClassType.class, "Department");
ITypedReferenceableInstance hrDept2 = deptType.convert(hrDept, Multiplicity.REQUIRED);
- String[] guids = repositoryService.createEntities(hrDept2);
+ List<String> guids = repositoryService.createEntities(hrDept2);
Assert.assertNotNull(guids);
- Assert.assertEquals(guids.length, 1);
- Assert.assertNotNull(guids[0]);
+ Assert.assertEquals(guids.size(), 2);
+ Assert.assertNotNull(guids.get(0));
+ Assert.assertNotNull(guids.get(1));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java
index c25ccf8..2d01bbe 100755
--- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java
@@ -101,7 +101,7 @@ public class GraphRepoMapperScaleTest {
ClassType dbType = typeSystem.getDataType(ClassType.class, TestUtils.DATABASE_TYPE);
ITypedReferenceableInstance db = dbType.convert(databaseInstance, Multiplicity.REQUIRED);
- dbGUID = repositoryService.createEntities(db)[0];
+ dbGUID = repositoryService.createEntities(db).get(0);
Referenceable dbInstance = new Referenceable(dbGUID, TestUtils.DATABASE_TYPE, databaseInstance.getValuesMap());
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
index 0307fd4..8a80d8c 100644
--- a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java
@@ -54,6 +54,7 @@ import java.util.List;
import java.util.Map;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@Guice(modules = RepositoryMetadataModule.class)
@@ -113,7 +114,11 @@ public class DefaultMetadataServiceTest {
JSONArray entitiesJson = new JSONArray();
entitiesJson.put(entityjson);
String response = metadataService.createEntities(entitiesJson.toString());
- return new JSONArray(response).getString(0);
+ JSONArray guids = new JSONArray(response);
+ if (guids != null && guids.length() > 0) {
+ return guids.getString(0);
+ }
+ return null;
}
private String updateInstance(Referenceable entity) throws Exception {
@@ -154,7 +159,7 @@ public class DefaultMetadataServiceTest {
//using the same name should succeed, but not create another entity
String newId = createInstance(entity);
- Assert.assertEquals(newId, id);
+ assertNull(newId);
//Same entity, but different qualified name should succeed
entity.set("name", TestUtils.randomString());
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/server-api/src/main/java/org/apache/atlas/services/MetadataService.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/services/MetadataService.java b/server-api/src/main/java/org/apache/atlas/services/MetadataService.java
index 0cfed2e..a91c0a0 100644
--- a/server-api/src/main/java/org/apache/atlas/services/MetadataService.java
+++ b/server-api/src/main/java/org/apache/atlas/services/MetadataService.java
@@ -74,7 +74,7 @@ public interface MetadataService {
* Creates an entity, instance of the type.
*
* @param entityDefinition definition
- * @return guid
+ * @return json array of guids of entities created
*/
String createEntities(String entityDefinition) throws AtlasException;
@@ -107,25 +107,28 @@ public interface MetadataService {
/**
* Adds the property to the given entity id(guid).
* Currently supports updates only on PRIMITIVE, CLASS attribute types
- *
- * @param guid entity id
+ * @param guid entity id
* @param attribute property name
* @param value property value
+ * @return json array of guids of entities created/updated
*/
- void updateEntityAttributeByGuid(String guid, String attribute, String value) throws AtlasException;
+ String updateEntityAttributeByGuid(String guid, String attribute, String value) throws AtlasException;
/**
* Supports Partial updates of an entity. Users can update a subset of attributes for an entity identified by its guid
* Note however that it cannot be used to set attribute values to null or delete attribute values
- *
+ * @param guid entity id
+ * @param entity entity holding the subset of attributes to update
+ * @return json array of guids of entities created/updated
+ * @throws AtlasException
*/
- void updateEntityPartialByGuid(String guid, Referenceable entity) throws AtlasException;
+ String updateEntityPartialByGuid(String guid, Referenceable entity) throws AtlasException;
/**
* Batch API - Adds/Updates the given entity id(guid).
*
* @param entityJson entity json
- * @return List of guids which were updated and ones which were newly created as part of the updated entity
+ * @return json array of guids of entities created/updated
*/
String updateEntities(String entityJson) throws AtlasException;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java
index 9d2480b..f5c2ce9 100755
--- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java
@@ -70,14 +70,18 @@ public class TypeUtils {
return b;
}
- protected static class Pair<L, R> {
- protected L left;
- protected R right;
+ public static class Pair<L, R> {
+ public L left;
+ public R right;
public Pair(L left, R right) {
this.left = left;
this.right = right;
}
+
+ public static <L, R> Pair<L, R> of(L left, R right) {
+ return new Pair<>(left, right);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
index 2ee0027..f5ab4d8 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java
@@ -118,8 +118,12 @@ public class EntityResource {
JSONObject response = new JSONObject();
response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId());
- response.put(AtlasClient.GUID, new JSONArray(guids));
- response.put(AtlasClient.DEFINITION, new JSONObject(metadataService.getEntityDefinition(new JSONArray(guids).getString(0))));
+ JSONArray guidArray = new JSONArray(guids);
+ response.put(AtlasClient.GUID, guidArray);
+ if (guidArray.length() > 0) {
+ response.put(AtlasClient.DEFINITION,
+ new JSONObject(metadataService.getEntityDefinition(new JSONArray(guids).getString(0))));
+ }
return Response.created(locationURI).entity(response).build();
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
index 4d2cce7..b2501ec 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
@@ -58,7 +58,6 @@ public class EntityNotificationIT extends BaseResourceIT {
private static final String ENTITIES = "api/atlas/entities";
private static final String TRAITS = "traits";
- private static final int MAX_WAIT_TIME = 10000;
private final String DATABASE_NAME = "db" + randomString();
private final String TABLE_NAME = "table" + randomString();
@Inject
@@ -98,7 +97,7 @@ public class EntityNotificationIT extends BaseResourceIT {
final String guid = tableId._getId();
- waitForNotification(MAX_WAIT_TIME);
+ waitForNotification(notificationConsumer, MAX_WAIT_TIME);
EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
@@ -120,7 +119,7 @@ public class EntityNotificationIT extends BaseResourceIT {
serviceClient.updateEntityAttribute(guid, property, newValue);
- waitForNotification(MAX_WAIT_TIME);
+ waitForNotification(notificationConsumer, MAX_WAIT_TIME);
EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
@@ -155,7 +154,7 @@ public class EntityNotificationIT extends BaseResourceIT {
ClientResponse clientResponse = addTrait(guid, traitInstanceJSON);
assertEquals(clientResponse.getStatus(), Response.Status.CREATED.getStatusCode());
- waitForNotification(MAX_WAIT_TIME);
+ waitForNotification(notificationConsumer, MAX_WAIT_TIME);
EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
@@ -192,7 +191,7 @@ public class EntityNotificationIT extends BaseResourceIT {
clientResponse = addTrait(guid, traitInstanceJSON);
assertEquals(clientResponse.getStatus(), Response.Status.CREATED.getStatusCode());
- waitForNotification(MAX_WAIT_TIME);
+ waitForNotification(notificationConsumer, MAX_WAIT_TIME);
entityNotification = notificationConsumer.getLastEntityNotification();
@@ -218,7 +217,7 @@ public class EntityNotificationIT extends BaseResourceIT {
ClientResponse clientResponse = deleteTrait(guid, traitName);
Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
- waitForNotification(MAX_WAIT_TIME);
+ waitForNotification(notificationConsumer, MAX_WAIT_TIME);
EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
@@ -260,51 +259,4 @@ public class EntityNotificationIT extends BaseResourceIT {
return resource.accept(Servlets.JSON_MEDIA_TYPE).type(Servlets.JSON_MEDIA_TYPE)
.method(HttpMethod.DELETE, ClientResponse.class);
}
-
- private void waitForNotification(int maxWait) throws Exception {
- waitFor(maxWait, new Predicate() {
- @Override
- public boolean evaluate() throws Exception {
- return notificationConsumer.getLastEntityNotification() != null;
- }
- });
- }
-
-
- // ----- inner class : EntityNotificationConsumer --------------------------
-
- private static class EntityNotificationConsumer implements Runnable {
- private final NotificationConsumer<EntityNotification> consumerIterator;
- private EntityNotification entityNotification = null;
- private boolean run;
-
- public EntityNotificationConsumer(NotificationConsumer<EntityNotification> consumerIterator) {
- this.consumerIterator = consumerIterator;
- }
-
- @Override
- public void run() {
- while (run && consumerIterator.hasNext()) {
- entityNotification = consumerIterator.next();
- }
- }
-
- public void reset() {
- entityNotification = null;
- }
-
- public void start() {
- Thread thread = new Thread(this);
- run = true;
- thread.start();
- }
-
- public void stop() {
- run = false;
- }
-
- public EntityNotification getLastEntityNotification() {
- return entityNotification;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
index 361cece..aba191c 100755
--- a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
@@ -25,6 +25,8 @@ import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import org.apache.atlas.*;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.TypesDef;
@@ -68,6 +70,7 @@ public abstract class BaseResourceIT {
protected WebResource service;
protected AtlasClient serviceClient;
public static final Logger LOG = LoggerFactory.getLogger(BaseResourceIT.class);
+ protected static final int MAX_WAIT_TIME = 1000;
@BeforeClass
public void setUp() throws Exception {
@@ -119,7 +122,10 @@ public abstract class BaseResourceIT {
System.out.println("created instance for type " + typeName + ", guid: " + guids);
// return the reference to created instance with guid
- return new Id(guids.getString(0), 0, referenceable.getTypeName());
+ if (guids.length() > 0) {
+ return new Id(guids.getString(guids.length() - 1), 0, referenceable.getTypeName());
+ }
+ return null;
}
protected static final String DATABASE_TYPE = "hive_db";
@@ -285,4 +291,50 @@ public abstract class BaseResourceIT {
throw new Exception("Waiting timed out after " + timeout + " msec");
}
}
+
+ // ----- inner class : EntityNotificationConsumer --------------------------
+
+ protected static class EntityNotificationConsumer implements Runnable {
+ private final NotificationConsumer<EntityNotification> consumerIterator;
+ private EntityNotification entityNotification = null;
+ private boolean run;
+
+ public EntityNotificationConsumer(NotificationConsumer<EntityNotification> consumerIterator) {
+ this.consumerIterator = consumerIterator;
+ }
+
+ @Override
+ public void run() {
+ while (run && consumerIterator.hasNext()) {
+ entityNotification = consumerIterator.next();
+ }
+ }
+
+ public void reset() {
+ entityNotification = null;
+ }
+
+ public void start() {
+ Thread thread = new Thread(this);
+ run = true;
+ thread.start();
+ }
+
+ public void stop() {
+ run = false;
+ }
+
+ public EntityNotification getLastEntityNotification() {
+ return entityNotification;
+ }
+ }
+
+ protected void waitForNotification(final EntityNotificationConsumer notificationConsumer, int maxWait) throws Exception {
+ waitFor(maxWait, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return notificationConsumer.getLastEntityNotification() != null;
+ }
+ });
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8cbb2c13/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
index f476af3..a268196 100755
--- a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
@@ -19,10 +19,15 @@
package org.apache.atlas.web.resources;
import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.notification.NotificationModule;
+import org.apache.atlas.notification.entity.EntityNotification;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
@@ -43,11 +48,14 @@ import org.apache.atlas.web.util.Servlets;
import org.apache.commons.lang.RandomStringUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
+import org.junit.AfterClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
+import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.ws.rs.HttpMethod;
@@ -59,10 +67,13 @@ import java.util.Map;
import java.util.UUID;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
/**
* Integration tests for Entity Jersey Resource.
*/
+@Guice(modules = NotificationModule.class)
public class EntityJerseyResourceIT extends BaseResourceIT {
private static final Logger LOG = LoggerFactory.getLogger(EntityJerseyResourceIT.class);
@@ -76,11 +87,32 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
private Id tableId;
private String traitName;
+ @Inject
+ private NotificationInterface notificationInterface;
+ private EntityNotificationConsumer notificationConsumer;
+
@BeforeClass
public void setUp() throws Exception {
super.setUp();
createTypeDefinitions();
+
+ List<NotificationConsumer<EntityNotification>> consumers =
+ notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
+
+ NotificationConsumer<EntityNotification> consumer = consumers.iterator().next();
+ notificationConsumer = new EntityNotificationConsumer(consumer);
+ notificationConsumer.start();
+ }
+
+ @AfterClass
+ public void tearDown() {
+ notificationConsumer.stop();
+ }
+
+ @BeforeMethod
+ public void setupTest() {
+ notificationConsumer.reset();
}
@Test
@@ -119,18 +151,32 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
@Test
public void testEntityDeduping() throws Exception {
- Referenceable db = new Referenceable(DATABASE_TYPE);
- String dbName = "db" + randomString();
+ final Referenceable db = new Referenceable(DATABASE_TYPE);
+ final String dbName = "db" + randomString();
db.set("name", dbName);
db.set("description", randomString());
- serviceClient.createEntity(db);
+ serviceClient.createEntity(db).getString(0);
+
+ waitForNotification(notificationConsumer, MAX_WAIT_TIME);
+ EntityNotification notification = notificationConsumer.getLastEntityNotification();
+ assertNotNull(notification);
+ assertEquals(notification.getEntity().get("name"), dbName);
+
JSONArray results =
serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
assertEquals(results.length(), 1);
//create entity again shouldn't create another instance with same unique attribute value
+ notificationConsumer.reset();
serviceClient.createEntity(db);
+ try {
+ waitForNotification(notificationConsumer, MAX_WAIT_TIME);
+ fail("Expected time out exception");
+ } catch (Exception e) {
+ //expected timeout
+ }
+
results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
assertEquals(results.length(), 1);