You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/07/29 01:35:43 UTC
atlas git commit: ATLAS-1995: updated
entity-lookup-by-unique-attributes to use indexQuery
Repository: atlas
Updated Branches:
refs/heads/master d8b868339 -> 6fb2a0388
ATLAS-1995: updated entity-lookup-by-unique-attributes to use indexQuery
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/6fb2a038
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/6fb2a038
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/6fb2a038
Branch: refs/heads/master
Commit: 6fb2a0388a89369ceec57a74954d129df600e163
Parents: d8b8683
Author: ashutoshm <am...@hortonworks.com>
Authored: Thu Jul 27 15:10:36 2017 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Fri Jul 28 18:18:19 2017 -0700
----------------------------------------------------------------------
.../graph/GraphBackedMetadataRepository.java | 1 -
.../store/graph/AtlasTypeDefGraphStore.java | 6 +
.../store/graph/v1/AtlasEntityStoreV1.java | 4 +-
.../store/graph/v1/AtlasGraphUtilsV1.java | 106 ++++++++++-
.../GraphBackedMetadataRepositoryTest.java | 3 +-
.../graph/v1/AtlasDeleteHandlerV1Test.java | 7 +-
.../graph/v1/AtlasRelationshipStoreV1Test.java | 10 +-
.../notification/NotificationHookConsumer.java | 175 ++++++++++---------
8 files changed, 221 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
index 0f3b06b..50b7116 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java
@@ -168,7 +168,6 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
}
@Override
- @GraphTransaction
public ITypedReferenceableInstance getEntityDefinition(String guid) throws RepositoryException, EntityNotFoundException {
return getEntityDefinitions(guid).get(0);
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
index 517da68..3638e19 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java
@@ -378,6 +378,12 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ
}
}
+ try {
+ ttr.updateTypes(ret);
+ } catch (AtlasBaseException e) { // this shouldn't happen, as the types were already validated
+ LOG.error("failed to update the registry after updating the store", e);
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("<== AtlasTypeDefGraphStore.createUpdateTypesDef({}, {}): {}", typesToCreate, typesToUpdate, ret);
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index f340330..1c168b4 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -307,7 +307,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
entity.setGuid(guid);
- return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), true);
+ return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), true, false);
}
@Override
@@ -358,7 +358,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
throw new AtlasBaseException(AtlasErrorCode.ATTRIBUTE_UPDATE_NOT_SUPPORTED, attrName, attrType.getTypeName());
}
- return createOrUpdate(new AtlasEntityStream(updateEntity), true);
+ return createOrUpdate(new AtlasEntityStream(updateEntity), true, false);
}
@Override
http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
index 948d9dd..227f7cd 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
@@ -18,7 +18,9 @@
package org.apache.atlas.repository.store.graph.v1;
+import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.discovery.SearchProcessor;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity;
@@ -29,12 +31,14 @@ import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasElement;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
+import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +47,7 @@ import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
/**
* Utility methods for Graph.
@@ -55,6 +60,19 @@ public class AtlasGraphUtilsV1 {
public static final String VERTEX_TYPE = "typeSystem";
public static final String RELATIONSHIPTYPE_EDGE_LABEL = PROPERTY_PREFIX + ".relationshipType";
+ private static boolean USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES = false;
+
+ static {
+ try {
+ Configuration conf = ApplicationProperties.get();
+
+ USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES = conf.getBoolean("atlas.use.index.query.to.find.entity.by.unique.attributes", USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES);
+ } catch (Exception excp) {
+ LOG.error("Error reading configuration", excp);
+ } finally {
+ LOG.info("atlas.use.index.query.to.find.entity.by.unique.attributes=" + USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES);
+ }
+ }
public static String getTypeDefPropertyKey(AtlasBaseTypeDef typeDef) {
return getTypeDefPropertyKey(typeDef.getName());
@@ -217,13 +235,22 @@ public class AtlasGraphUtilsV1 {
continue;
}
- vertex = AtlasGraphUtilsV1.findByTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue);
+ if (canUseIndexQuery(entityType, attribute.getName())) {
+ vertex = AtlasGraphUtilsV1.getAtlasVertexFromIndexQuery(entityType, attribute, attrValue);
+ } else {
+ vertex = AtlasGraphUtilsV1.findByTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue);
- if (vertex == null) {
- vertex = AtlasGraphUtilsV1.findBySuperTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue);
+ if (vertex == null) {
+ vertex = AtlasGraphUtilsV1.findBySuperTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue);
+ }
}
if (vertex != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("findByUniqueAttributes(type={}, attrName={}, attrValue={}: found vertex {}",
+ entityType.getTypeName(), attribute.getName(), attrValue, vertex);
+ }
+
break;
}
}
@@ -366,4 +393,77 @@ public class AtlasGraphUtilsV1 {
public static String getStateAsString(AtlasElement element) {
return element.getProperty(Constants.STATE_PROPERTY_KEY, String.class);
}
+
+ private static boolean canUseIndexQuery(AtlasEntityType entityType, String attributeName) {
+ boolean ret = false;
+
+ if (USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES) {
+ final String typeAndSubTypesQryStr = entityType.getTypeAndAllSubTypesQryStr();
+
+ ret = typeAndSubTypesQryStr.length() <= SearchProcessor.MAX_QUERY_STR_LENGTH_TYPES;
+
+ if (ret) {
+ Set<String> indexSet = AtlasGraphProvider.getGraphInstance().getVertexIndexKeys();
+ try {
+ ret = indexSet.contains(entityType.getQualifiedAttributeName(attributeName));
+ }
+ catch (AtlasBaseException ex) {
+ ret = false;
+ }
+ }
+ }
+
+ return ret;
+ }
+
+ private static AtlasVertex getAtlasVertexFromIndexQuery(AtlasEntityType entityType, AtlasAttribute attribute, Object attrVal) {
+ String propertyName = attribute.getVertexPropertyName();
+ AtlasIndexQuery query = getIndexQuery(entityType, propertyName, attrVal.toString());
+
+ for (Iterator<AtlasIndexQuery.Result> iter = query.vertices(); iter.hasNext(); ) {
+ AtlasIndexQuery.Result result = iter.next();
+ AtlasVertex vertex = result.getVertex();
+
+ // skip non-entity vertices, if any got returned
+ if (vertex == null || !vertex.getPropertyKeys().contains(Constants.GUID_PROPERTY_KEY)) {
+ continue;
+ }
+
+ // verify the typeName
+ String typeNameInVertex = getTypeName(vertex);
+
+ if (!entityType.getTypeAndAllSubTypes().contains(typeNameInVertex)) {
+ LOG.warn("incorrect vertex type from index-query: expected='{}'; found='{}'", entityType.getTypeName(), typeNameInVertex);
+
+ continue;
+ }
+
+ if (attrVal.getClass() == String.class) {
+ String s = (String) attrVal;
+ String vertexVal = vertex.getProperty(propertyName, String.class);
+
+ if (!s.equalsIgnoreCase(vertexVal)) {
+ LOG.warn("incorrect match from index-query for property {}: expected='{}'; found='{}'", propertyName, s, vertexVal);
+
+ continue;
+ }
+ }
+
+ return vertex;
+ }
+
+ return null;
+ }
+
+ private static AtlasIndexQuery getIndexQuery(AtlasEntityType entityType, String propertyName, String value) {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("v.\"").append(Constants.TYPE_NAME_PROPERTY_KEY).append("\":").append(entityType.getTypeAndAllSubTypesQryStr())
+ .append(" AND ")
+ .append("v.\"").append(propertyName).append("\":").append(AtlasAttribute.escapeIndexQueryValue(value))
+ .append(" AND ")
+ .append("v.\"").append(Constants.STATE_PROPERTY_KEY).append("\":ACTIVE");
+
+ return AtlasGraphProvider.getGraphInstance().indexQuery(Constants.VERTEX_INDEX, sb.toString());
+ }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
index 8120aaa..f372891 100755
--- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
@@ -474,7 +474,6 @@ public class GraphBackedMetadataRepositoryTest {
return guid;
}
- @GraphTransaction
AtlasVertex getTableEntityVertex() {
AtlasGraph graph = TestUtils.getGraph();
AtlasGraphQuery query = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, ComparisionOperator.EQUAL, TestUtils.TABLE_TYPE);
@@ -651,6 +650,7 @@ public class GraphBackedMetadataRepositoryTest {
}
@Test(dependsOnMethods = "testCreateEntity")
+ @GraphTransaction
public void testGetIdFromVertex() throws Exception {
AtlasVertex tableVertex = getTableEntityVertex();
@@ -664,6 +664,7 @@ public class GraphBackedMetadataRepositoryTest {
}
@Test(dependsOnMethods = "testCreateEntity")
+ @GraphTransaction
public void testGetTypeName() throws Exception {
AtlasVertex tableVertex = getTableEntityVertex();
Assert.assertEquals(GraphHelper.getTypeName(tableVertex), TestUtils.TABLE_TYPE);
http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
index 9331e35..62ef21c 100644
--- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
@@ -41,6 +41,7 @@ import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.services.MetadataService;
import org.apache.atlas.store.AtlasTypeDefStore;
@@ -129,7 +130,11 @@ public abstract class AtlasDeleteHandlerV1Test {
ImmutableList.<AtlasClassificationDef>of(),
ImmutableList.of(mapValueDef, mapOwnerDef));
- typeDefStore.createTypesDef(typesDef);
+ AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(typesDef, typeRegistry);
+
+ if (!typesToCreate.isEmpty()) {
+ typeDefStore.createTypesDef(typesToCreate);
+ }
compositeMapOwnerType = typeRegistry.getEntityTypeByName("CompositeMapOwner");
compositeMapValueType = typeRegistry.getEntityTypeByName("CompositeMapValue");
http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java
index 3ebda0d..263ad5b 100644
--- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1Test.java
@@ -30,6 +30,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
+import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.store.AtlasTypeDefStore;
@@ -97,8 +98,13 @@ public abstract class AtlasRelationshipStoreV1Test {
}
init();
- AtlasTypesDef testTypes = getInverseReferenceTestTypes();
- typeDefStore.createTypesDef(testTypes);
+ AtlasTypesDef typesDef = getInverseReferenceTestTypes();
+
+ AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(typesDef, typeRegistry);
+
+ if (!typesToCreate.isEmpty()) {
+ typeDefStore.createTypesDef(typesToCreate);
+ }
}
@BeforeTest
http://git-wip-us.apache.org/repos/asf/atlas/blob/6fb2a038/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 51276d3..b8255b3 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -42,6 +42,7 @@ import org.apache.atlas.service.Service;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.service.ServiceState;
import org.apache.atlas.web.util.DateTimeHelper;
@@ -71,6 +72,7 @@ import static org.apache.atlas.AtlasClientV2.*;
@Order(4)
public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
+ private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class);
private static final String LOCALHOST = "localhost";
private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
@@ -236,113 +238,124 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
@VisibleForTesting
void handleMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) throws AtlasServiceException, AtlasException {
+ AtlasPerfTracer perf = null;
+
HookNotificationMessage message = kafkaMsg.getMessage();
String messageUser = message.getUser();
- // Used for intermediate conversions during create and update
- AtlasEntity.AtlasEntitiesWithExtInfo entities;
- for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
- }
- try {
- RequestContext requestContext = RequestContext.createContext();
- requestContext.setUser(messageUser);
- switch (message.getType()) {
- case ENTITY_CREATE:
- EntityCreateRequest createRequest = (EntityCreateRequest) message;
+ if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+ perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name());
+ }
- if (numRetries == 0) { // audit only on the first attempt
- audit(messageUser, CREATE_ENTITY.getMethod(), CREATE_ENTITY.getPath());
- }
+ try {
+ // Used for intermediate conversions during create and update
+ AtlasEntity.AtlasEntitiesWithExtInfo entities;
+ for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries);
+ }
+ try {
+ RequestContext requestContext = RequestContext.createContext();
+ requestContext.setUser(messageUser);
- entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
+ switch (message.getType()) {
+ case ENTITY_CREATE:
+ EntityCreateRequest createRequest = (EntityCreateRequest) message;
- atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
- break;
+ if (numRetries == 0) { // audit only on the first attempt
+ audit(messageUser, CREATE_ENTITY.getMethod(), CREATE_ENTITY.getPath());
+ }
- case ENTITY_PARTIAL_UPDATE:
- final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message;
+ entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
- if (numRetries == 0) { // audit only on the first attempt
- audit(messageUser, UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
- String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName()));
- }
+ atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
+ break;
- Referenceable referenceable = partialUpdateRequest.getEntity();
- entities = instanceConverter.toAtlasEntity(referenceable);
+ case ENTITY_PARTIAL_UPDATE:
+ final EntityPartialUpdateRequest partialUpdateRequest = (EntityPartialUpdateRequest) message;
- AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
- String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>() {
- {
- put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue());
+ if (numRetries == 0) { // audit only on the first attempt
+ audit(messageUser, UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
+ String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName()));
}
- });
- // There should only be one root entity
- entities.getEntities().get(0).setGuid(guid);
+ Referenceable referenceable = partialUpdateRequest.getEntity();
+ entities = instanceConverter.toAtlasEntity(referenceable);
- atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true);
- break;
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
+ String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>() {
+ {
+ put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue());
+ }
+ });
- case ENTITY_DELETE:
- final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message;
+ // There should only be one root entity
+ entities.getEntities().get(0).setGuid(guid);
- if (numRetries == 0) { // audit only on the first attempt
- audit(messageUser, DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
- String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName()));
- }
+ atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true);
+ break;
- try {
- AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
- atlasEntityStore.deleteByUniqueAttributes(type,
- new HashMap<String, Object>() {{
- put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue());
- }});
- } catch (ClassCastException cle) {
- LOG.error("Failed to do a partial update on Entity");
- }
- break;
+ case ENTITY_DELETE:
+ final EntityDeleteRequest deleteRequest = (EntityDeleteRequest) message;
- case ENTITY_FULL_UPDATE:
- EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
+ if (numRetries == 0) { // audit only on the first attempt
+ audit(messageUser, DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
+ String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName()));
+ }
- if (numRetries == 0) { // audit only on the first attempt
- audit(messageUser, UPDATE_ENTITY.getMethod(), UPDATE_ENTITY.getPath());
- }
+ try {
+ AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
+ atlasEntityStore.deleteByUniqueAttributes(type,
+ new HashMap<String, Object>() {{
+ put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue());
+ }});
+ } catch (ClassCastException cle) {
+ LOG.error("Failed to do a partial update on Entity");
+ }
+ break;
- entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
- atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
- break;
+ case ENTITY_FULL_UPDATE:
+ EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
- default:
- throw new IllegalStateException("Unknown notification type: " + message.getType().name());
- }
+ if (numRetries == 0) { // audit only on the first attempt
+ audit(messageUser, UPDATE_ENTITY.getMethod(), UPDATE_ENTITY.getPath());
+ }
- break;
- } catch (Throwable e) {
- LOG.warn("Error handling message", e);
- try {
- LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
- Thread.sleep(consumerRetryInterval);
- } catch (InterruptedException ie) {
- LOG.error("Notification consumer thread sleep interrupted");
- }
+ entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
+ atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
+ break;
+
+ default:
+ throw new IllegalStateException("Unknown notification type: " + message.getType().name());
+ }
- if (numRetries == (maxRetries - 1)) {
- LOG.warn("Max retries exceeded for message {}", message, e);
- failedMessages.add(message);
- if (failedMessages.size() >= failedMsgCacheSize) {
- recordFailedMessages();
+ break;
+ } catch (Throwable e) {
+ LOG.warn("Error handling message", e);
+ try {
+ LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
+ Thread.sleep(consumerRetryInterval);
+ } catch (InterruptedException ie) {
+ LOG.error("Notification consumer thread sleep interrupted");
+ }
+
+ if (numRetries == (maxRetries - 1)) {
+ LOG.warn("Max retries exceeded for message {}", message, e);
+ failedMessages.add(message);
+ if (failedMessages.size() >= failedMsgCacheSize) {
+ recordFailedMessages();
+ }
+ return;
}
- return;
+ } finally {
+ RequestContext.clear();
+ RequestContextV1.clear();
}
- } finally {
- RequestContext.clear();
- RequestContextV1.clear();
}
+ commit(kafkaMsg);
+ } finally {
+ AtlasPerfTracer.log(perf);
}
- commit(kafkaMsg);
}
private void recordFailedMessages() {