You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/02/18 00:14:31 UTC

incubator-atlas git commit: ATLAS-1499: Notification processing using V2 Store (#2) - fixes in handling of partial-update notifications

Repository: incubator-atlas
Updated Branches:
  refs/heads/master 1d85e95fa -> ce54e8a4d


ATLAS-1499: Notification processing using V2 Store (#2) - fixes in handling of partial-update notifications

Signed-off-by: Madhan Neethiraj <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/ce54e8a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/ce54e8a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/ce54e8a4

Branch: refs/heads/master
Commit: ce54e8a4de04f410c3129ebe1dafc6864e1aa98c
Parents: 1d85e95
Author: apoorvnaik <an...@hortonworks.com>
Authored: Mon Feb 13 10:24:49 2017 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Fri Feb 17 16:13:48 2017 -0800

----------------------------------------------------------------------
 .../converters/AtlasInstanceConverter.java      |  8 +++---
 .../store/graph/v1/AtlasEntityStoreV1.java      |  6 ++---
 .../store/graph/v1/AtlasGraphUtilsV1.java       |  5 ++++
 .../notification/NotificationHookConsumer.java  | 27 +++++++++++++-------
 .../NotificationHookConsumerKafkaTest.java      |  2 +-
 .../NotificationHookConsumerTest.java           |  2 +-
 6 files changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
index 95dcc7a..9d475bf 100644
--- a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
@@ -125,7 +125,7 @@ public class AtlasInstanceConverter {
         return ret;
     }
 
-    public AtlasEntity.AtlasEntitiesWithExtInfo getAtlasEntity(IReferenceableInstance referenceable) throws AtlasBaseException {
+    public AtlasEntity.AtlasEntitiesWithExtInfo toAtlasEntity(IReferenceableInstance referenceable) throws AtlasBaseException {
 
         AtlasEntityFormatConverter converter  = (AtlasEntityFormatConverter) instanceFormatters.getConverter(TypeCategory.ENTITY);
         AtlasEntityType      entityType = typeRegistry.getEntityTypeByName(referenceable.getTypeName());
@@ -187,9 +187,9 @@ public class AtlasInstanceConverter {
         return new AtlasBaseException(e);
     }
 
-    public AtlasEntity.AtlasEntitiesWithExtInfo getEntities(List<Referenceable> referenceables) throws AtlasBaseException {
+    public AtlasEntity.AtlasEntitiesWithExtInfo toAtlasEntities(List<Referenceable> referenceables) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("==> getEntities");
+            LOG.debug("==> toAtlasEntities");
         }
 
         AtlasFormatConverter.ConverterContext context = new AtlasFormatConverter.ConverterContext();
@@ -199,7 +199,7 @@ public class AtlasInstanceConverter {
             context.addEntity(entity);
         }
         if (LOG.isDebugEnabled()) {
-            LOG.debug("<== getEntities");
+            LOG.debug("<== toAtlasEntities");
         }
 
         return context.getEntities();

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 4684bfe..31a5e8c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -224,9 +224,9 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entity to update.");
         }
 
-        AtlasVertex entityVertex = AtlasGraphUtilsV1.getVertexByUniqueAttributes(entityType, uniqAttributes);
+        String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, uniqAttributes);
 
-        updatedEntity.setGuid(AtlasGraphUtilsV1.getIdFromVertex(entityVertex));
+        updatedEntity.setGuid(guid);
 
         return createOrUpdate(new AtlasEntityStream(updatedEntity), true);
     }
@@ -249,7 +249,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
             }
         }
 
-        Collection<AtlasVertex> deletionCandidates = new ArrayList<AtlasVertex>();
+        Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
         deletionCandidates.add(vertex);
 
         return deleteVertices(deletionCandidates);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
index efc50d3..49d5a08 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
@@ -190,6 +190,11 @@ public class AtlasGraphUtilsV1 {
         return vertex;
     }
 
+    public static String getGuidByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) throws AtlasBaseException {
+        AtlasVertex vertexByUniqueAttributes = getVertexByUniqueAttributes(entityType, attrValues);
+        return getIdFromVertex(vertexByUniqueAttributes);
+    }
+
     public static AtlasVertex findByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) {
         AtlasVertex vertex = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 891d7ac..c16fd66 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -29,9 +29,11 @@ import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
+import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
 import org.apache.atlas.service.Service;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
@@ -45,7 +47,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -54,7 +55,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.atlas.notification.hook.HookNotification.*;
+import static org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest;
+import static org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest;
+import static org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
+import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
 
 /**
  * Consumer of notifications from hooks e.g., hive hook etc.
@@ -249,7 +253,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                             EntityCreateRequest createRequest = (EntityCreateRequest) message;
                             audit(messageUser, AtlasClient.API.CREATE_ENTITY);
 
-                            entities = instanceConverter.getEntities(createRequest.getEntities());
+                            entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
 
                             atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
                             break;
@@ -262,11 +266,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                             audit(messageUser, AtlasClient.API.UPDATE_ENTITY_PARTIAL);
 
                             Referenceable referenceable = partialUpdateRequest.getEntity();
-                            entities = instanceConverter.getEntities(Collections.singletonList(referenceable));
-                            // There should only be one root entity after the conversion
-                            AtlasEntity entity = entities.getEntities().get(0);
-                            // Need to set the attributes explicitly here as the qualified name might have changed during update
-                            entity.setAttribute(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue());
+                            entities = instanceConverter.toAtlasEntity(referenceable);
+
+                            AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
+                            String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>(){
+                                { put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()); }
+                            });
+
+                            // There should only be one root entity
+                            entities.getEntities().get(0).setGuid(guid);
+
                             atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true);
                             break;
 
@@ -293,7 +302,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                             EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
                             audit(messageUser, AtlasClient.API.UPDATE_ENTITY);
 
-                            entities = instanceConverter.getEntities(updateRequest.getEntities());
+                            entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
                             atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
                             break;
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 13747b2..e744e2e 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -77,7 +77,7 @@ public class NotificationHookConsumerKafkaTest {
         AtlasType mockType = mock(AtlasType.class);
         when(typeRegistry.getType(anyString())).thenReturn(mockType);
         AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class);
-        when(instanceConverter.getEntities(anyList())).thenReturn(mockEntity);
+        when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);
         kafkaNotification = startKafkaServer();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index b86c693..bdb60a2 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -75,7 +75,7 @@ public class NotificationHookConsumerTest {
         AtlasType mockType = mock(AtlasType.class);
         when(typeRegistry.getType(anyString())).thenReturn(mockType);
         AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class);
-        when(instanceConverter.getEntities(anyList())).thenReturn(mockEntity);
+        when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);
         EntityMutationResponse mutationResponse = mock(EntityMutationResponse.class);
         when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenReturn(mutationResponse);
     }