You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/02/18 00:14:31 UTC
incubator-atlas git commit: ATLAS-1499: Notification processing using V2 Store (#2) - fixes in handling of partial-update notifications
Repository: incubator-atlas
Updated Branches:
refs/heads/master 1d85e95fa -> ce54e8a4d
ATLAS-1499: Notification processing using V2 Store (#2) - fixes in handling of partial-update notifications
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/ce54e8a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/ce54e8a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/ce54e8a4
Branch: refs/heads/master
Commit: ce54e8a4de04f410c3129ebe1dafc6864e1aa98c
Parents: 1d85e95
Author: apoorvnaik <an...@hortonworks.com>
Authored: Mon Feb 13 10:24:49 2017 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Fri Feb 17 16:13:48 2017 -0800
----------------------------------------------------------------------
.../converters/AtlasInstanceConverter.java | 8 +++---
.../store/graph/v1/AtlasEntityStoreV1.java | 6 ++---
.../store/graph/v1/AtlasGraphUtilsV1.java | 5 ++++
.../notification/NotificationHookConsumer.java | 27 +++++++++++++-------
.../NotificationHookConsumerKafkaTest.java | 2 +-
.../NotificationHookConsumerTest.java | 2 +-
6 files changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
index 95dcc7a..9d475bf 100644
--- a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
@@ -125,7 +125,7 @@ public class AtlasInstanceConverter {
return ret;
}
- public AtlasEntity.AtlasEntitiesWithExtInfo getAtlasEntity(IReferenceableInstance referenceable) throws AtlasBaseException {
+ public AtlasEntity.AtlasEntitiesWithExtInfo toAtlasEntity(IReferenceableInstance referenceable) throws AtlasBaseException {
AtlasEntityFormatConverter converter = (AtlasEntityFormatConverter) instanceFormatters.getConverter(TypeCategory.ENTITY);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(referenceable.getTypeName());
@@ -187,9 +187,9 @@ public class AtlasInstanceConverter {
return new AtlasBaseException(e);
}
- public AtlasEntity.AtlasEntitiesWithExtInfo getEntities(List<Referenceable> referenceables) throws AtlasBaseException {
+ public AtlasEntity.AtlasEntitiesWithExtInfo toAtlasEntities(List<Referenceable> referenceables) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
- LOG.debug("==> getEntities");
+ LOG.debug("==> toAtlasEntities");
}
AtlasFormatConverter.ConverterContext context = new AtlasFormatConverter.ConverterContext();
@@ -199,7 +199,7 @@ public class AtlasInstanceConverter {
context.addEntity(entity);
}
if (LOG.isDebugEnabled()) {
- LOG.debug("<== getEntities");
+ LOG.debug("<== toAtlasEntities");
}
return context.getEntities();
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 4684bfe..31a5e8c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -224,9 +224,9 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entity to update.");
}
- AtlasVertex entityVertex = AtlasGraphUtilsV1.getVertexByUniqueAttributes(entityType, uniqAttributes);
+ String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, uniqAttributes);
- updatedEntity.setGuid(AtlasGraphUtilsV1.getIdFromVertex(entityVertex));
+ updatedEntity.setGuid(guid);
return createOrUpdate(new AtlasEntityStream(updatedEntity), true);
}
@@ -249,7 +249,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
}
}
- Collection<AtlasVertex> deletionCandidates = new ArrayList<AtlasVertex>();
+ Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
deletionCandidates.add(vertex);
return deleteVertices(deletionCandidates);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
index efc50d3..49d5a08 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
@@ -190,6 +190,11 @@ public class AtlasGraphUtilsV1 {
return vertex;
}
+ public static String getGuidByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) throws AtlasBaseException {
+ AtlasVertex vertexByUniqueAttributes = getVertexByUniqueAttributes(entityType, attrValues);
+ return getIdFromVertex(vertexByUniqueAttributes);
+ }
+
public static AtlasVertex findByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) {
AtlasVertex vertex = null;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 891d7ac..c16fd66 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -29,9 +29,11 @@ import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
+import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
import org.apache.atlas.service.Service;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
@@ -45,7 +47,6 @@ import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -54,7 +55,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.atlas.notification.hook.HookNotification.*;
+import static org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest;
+import static org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest;
+import static org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
+import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
/**
* Consumer of notifications from hooks e.g., hive hook etc.
@@ -249,7 +253,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
EntityCreateRequest createRequest = (EntityCreateRequest) message;
audit(messageUser, AtlasClient.API.CREATE_ENTITY);
- entities = instanceConverter.getEntities(createRequest.getEntities());
+ entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
break;
@@ -262,11 +266,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
audit(messageUser, AtlasClient.API.UPDATE_ENTITY_PARTIAL);
Referenceable referenceable = partialUpdateRequest.getEntity();
- entities = instanceConverter.getEntities(Collections.singletonList(referenceable));
- // There should only be one root entity after the conversion
- AtlasEntity entity = entities.getEntities().get(0);
- // Need to set the attributes explicitly here as the qualified name might have changed during update
- entity.setAttribute(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue());
+ entities = instanceConverter.toAtlasEntity(referenceable);
+
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
+ String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>(){
+ { put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()); }
+ });
+
+ // There should only be one root entity
+ entities.getEntities().get(0).setGuid(guid);
+
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true);
break;
@@ -293,7 +302,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
audit(messageUser, AtlasClient.API.UPDATE_ENTITY);
- entities = instanceConverter.getEntities(updateRequest.getEntities());
+ entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
break;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 13747b2..e744e2e 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -77,7 +77,7 @@ public class NotificationHookConsumerKafkaTest {
AtlasType mockType = mock(AtlasType.class);
when(typeRegistry.getType(anyString())).thenReturn(mockType);
AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class);
- when(instanceConverter.getEntities(anyList())).thenReturn(mockEntity);
+ when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);
kafkaNotification = startKafkaServer();
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index b86c693..bdb60a2 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -75,7 +75,7 @@ public class NotificationHookConsumerTest {
AtlasType mockType = mock(AtlasType.class);
when(typeRegistry.getType(anyString())).thenReturn(mockType);
AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class);
- when(instanceConverter.getEntities(anyList())).thenReturn(mockEntity);
+ when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);
EntityMutationResponse mutationResponse = mock(EntityMutationResponse.class);
when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenReturn(mutationResponse);
}