You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/11/07 08:16:10 UTC
atlas git commit: ATLAS-2251: notification module updates (#3)
Repository: atlas
Updated Branches:
refs/heads/ATLAS-2251 48daa7cb5 -> 84f1349df
ATLAS-2251: notification module updates (#3)
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/84f1349d
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/84f1349d
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/84f1349d
Branch: refs/heads/ATLAS-2251
Commit: 84f1349dfabfbb0795c0d25ba2182e2f8b4a0343
Parents: 48daa7c
Author: Madhan Neethiraj <ma...@apache.org>
Authored: Mon Nov 6 22:10:50 2017 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Tue Nov 7 00:15:37 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/atlas/type/AtlasType.java | 4 --
.../apache/atlas/type/AtlasTypeRegistry.java | 4 +-
.../apache/atlas/v1/model/instance/Struct.java | 2 +
.../entity/EntityMessageDeserializer.java | 11 ++++
.../hook/HookMessageDeserializer.java | 18 ++++--
.../notification/hook/HookNotificationTest.java | 23 +++----
.../apache/atlas/GraphTransactionAdvisor.java | 63 ++++++++++++++++++++
.../converters/AtlasInstanceConverter.java | 34 ++++++++++-
.../graph/v1/AtlasEntityChangeNotifier.java | 11 +---
.../store/graph/v1/AtlasGraphUtilsV1.java | 2 +-
10 files changed, 138 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/84f1349d/intg/src/main/java/org/apache/atlas/type/AtlasType.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasType.java b/intg/src/main/java/org/apache/atlas/type/AtlasType.java
index 62b1df5..63d2a9d 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasType.java
@@ -239,10 +239,6 @@ public abstract class AtlasType {
ret = mapper.readValue(root, EntityDeleteRequest.class);
break;
}
-
- if (ret != null) {
- ret.normalize();
- }
}
return ret;
http://git-wip-us.apache.org/repos/asf/atlas/blob/84f1349d/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
index bd4b0e9..5f55b43 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
@@ -846,8 +846,8 @@ public class AtlasTypeRegistry {
boolean alreadyLockedByCurrentThread = typeRegistryUpdateLock.isHeldByCurrentThread();
if (!alreadyLockedByCurrentThread) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("lockTypeRegistryForUpdate(): waiting for lock to be released by thread {}", lockedByThread);
+ if (lockedByThread != null) {
+ LOG.info("lockTypeRegistryForUpdate(): waiting for lock to be released by thread {}", lockedByThread);
}
} else {
LOG.warn("lockTypeRegistryForUpdate(): already locked. currentLockCount={}",
http://git-wip-us.apache.org/repos/asf/atlas/blob/84f1349d/intg/src/main/java/org/apache/atlas/v1/model/instance/Struct.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/v1/model/instance/Struct.java b/intg/src/main/java/org/apache/atlas/v1/model/instance/Struct.java
index 48d4389..5f61f6c 100644
--- a/intg/src/main/java/org/apache/atlas/v1/model/instance/Struct.java
+++ b/intg/src/main/java/org/apache/atlas/v1/model/instance/Struct.java
@@ -161,6 +161,8 @@ public class Struct implements Serializable {
for (Object val : listValue) {
normalizedValue.add(normalizeAttributeValue(val));
}
+
+ value = normalizedValue;
}
return value;
http://git-wip-us.apache.org/repos/asf/atlas/blob/84f1349d/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
index 526aa93..f1e1992 100644
--- a/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
@@ -47,4 +47,15 @@ public class EntityMessageDeserializer extends AbstractMessageDeserializer<Entit
new TypeReference<AtlasNotificationMessage<EntityNotification>>() {},
AbstractNotification.CURRENT_MESSAGE_VERSION, NOTIFICATION_LOGGER);
}
+
+ @Override
+ public EntityNotification deserialize(String messageJson) {
+ final EntityNotification ret = super.deserialize(messageJson);
+
+ if (ret != null) {
+ ret.normalize();
+ }
+
+ return ret;
+ }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/84f1349d/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
index 1b337d4..6dff821 100644
--- a/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
@@ -21,7 +21,7 @@ package org.apache.atlas.notification.hook;
import org.apache.atlas.model.notification.AtlasNotificationMessage;
import org.apache.atlas.notification.AbstractMessageDeserializer;
import org.apache.atlas.notification.AbstractNotification;
-import org.apache.atlas.v1.model.notification.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage;
import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
/**
* Hook notification message deserializer.
*/
-public class HookMessageDeserializer extends AbstractMessageDeserializer<HookNotification.HookNotificationMessage> {
+public class HookMessageDeserializer extends AbstractMessageDeserializer<HookNotificationMessage> {
/**
* Logger for hook notification messages.
@@ -45,11 +45,19 @@ public class HookMessageDeserializer extends AbstractMessageDeserializer<HookNot
* Create a hook notification message deserializer.
*/
public HookMessageDeserializer() {
- super(new TypeReference<HookNotification.HookNotificationMessage>() {},
- new TypeReference<AtlasNotificationMessage<HookNotification.HookNotificationMessage>>() {},
+ super(new TypeReference<HookNotificationMessage>() {},
+ new TypeReference<AtlasNotificationMessage<HookNotificationMessage>>() {},
AbstractNotification.CURRENT_MESSAGE_VERSION, NOTIFICATION_LOGGER);
}
+ @Override
+ public HookNotificationMessage deserialize(String messageJson) {
+ final HookNotificationMessage ret = super.deserialize(messageJson);
- // ----- helper methods --------------------------------------------------
+ if (ret != null) {
+ ret.normalize();
+ }
+
+ return ret;
+ }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/84f1349d/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
index 3c87377..a8d4926 100644
--- a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java
@@ -19,13 +19,16 @@ package org.apache.atlas.notification.hook;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.type.AtlasType;
-import org.apache.atlas.v1.model.notification.HookNotification;
+import org.apache.atlas.v1.model.notification.HookNotification.EntityCreateRequest;
+import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage;
+import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationType;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
+
public class HookNotificationTest {
+ private HookMessageDeserializer deserializer = new HookMessageDeserializer();
@Test
public void testNewMessageSerDe() throws Exception {
@@ -34,15 +37,15 @@ public class HookNotificationTest {
entity1.set("complex", new Referenceable("othertype"));
Referenceable entity2 = new Referenceable("newtype");
String user = "user";
- HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(user, entity1, entity2);
+ EntityCreateRequest request = new EntityCreateRequest(user, entity1, entity2);
String notificationJson = AtlasType.toV1Json(request);
- HookNotification.HookNotificationMessage actualNotification = AtlasType.fromV1Json(notificationJson, HookNotification.HookNotificationMessage.class);
+ HookNotificationMessage actualNotification = deserializer.deserialize(notificationJson);
- assertEquals(actualNotification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);
+ assertEquals(actualNotification.getType(), HookNotificationType.ENTITY_CREATE);
assertEquals(actualNotification.getUser(), user);
- HookNotification.EntityCreateRequest createRequest = (HookNotification.EntityCreateRequest) actualNotification;
+ EntityCreateRequest createRequest = (EntityCreateRequest) actualNotification;
assertEquals(createRequest.getEntities().size(), 2);
Referenceable actualEntity1 = createRequest.getEntities().get(0);
@@ -56,7 +59,7 @@ public class HookNotificationTest {
//Code to generate the json, use it for hard-coded json used later in this test
Referenceable entity = new Referenceable("sometype");
entity.set("attr", "value");
- HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(null, entity);
+ EntityCreateRequest request = new EntityCreateRequest(null, entity);
String notificationJsonFromCode = AtlasType.toV1Json(request);
System.out.println(notificationJsonFromCode);
@@ -85,9 +88,9 @@ public class HookNotificationTest {
+ "}";
- HookNotification.HookNotificationMessage actualNotification = AtlasType.fromV1Json(notificationJson, HookNotification.HookNotificationMessage.class);
+ HookNotificationMessage actualNotification = deserializer.deserialize(notificationJson);
- assertEquals(actualNotification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE);
- assertEquals(actualNotification.getUser(), HookNotification.HookNotificationMessage.UNKNOW_USER);
+ assertEquals(actualNotification.getType(), HookNotificationType.ENTITY_CREATE);
+ assertEquals(actualNotification.getUser(), HookNotificationMessage.UNKNOW_USER);
}
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/84f1349d/repository/src/main/java/org/apache/atlas/GraphTransactionAdvisor.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionAdvisor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionAdvisor.java
new file mode 100644
index 0000000..9751a87
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionAdvisor.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas;
+
+import org.aopalliance.aop.Advice;
+import org.apache.atlas.annotation.GraphTransaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.aop.Pointcut;
+import org.springframework.aop.support.AbstractPointcutAdvisor;
+import org.springframework.aop.support.StaticMethodMatcherPointcut;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.lang.reflect.Method;
+
+@Component
+public class GraphTransactionAdvisor extends AbstractPointcutAdvisor {
+ private static final Logger LOG = LoggerFactory.getLogger(GraphTransactionAdvisor.class);
+
+ private final StaticMethodMatcherPointcut pointcut = new StaticMethodMatcherPointcut() {
+ @Override
+ public boolean matches(Method method, Class<?> targetClass) {
+ boolean annotationPresent = method.isAnnotationPresent(GraphTransaction.class);
+ if (annotationPresent) {
+ LOG.info("GraphTransaction intercept for {}.{}", targetClass.getName(), method.getName());
+ }
+ return annotationPresent;
+ }
+ };
+
+ private final GraphTransactionInterceptor interceptor;
+
+ @Inject
+ public GraphTransactionAdvisor(GraphTransactionInterceptor interceptor) {
+ this.interceptor = interceptor;
+ }
+
+ @Override
+ public Pointcut getPointcut() {
+ return pointcut;
+ }
+
+ @Override
+ public Advice getAdvice() {
+ return interceptor;
+ }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/84f1349d/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
index 87448ee..2884f8f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.converters;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.CreateUpdateEntitiesResult;
+import org.apache.atlas.RequestContextV1;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasClassification;
@@ -30,6 +31,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
import org.apache.atlas.model.instance.GuidMapping;
import org.apache.atlas.model.legacy.EntityResult;
+import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.repository.converters.AtlasFormatConverter.ConverterContext;
@@ -59,11 +61,13 @@ public class AtlasInstanceConverter {
private final AtlasTypeRegistry typeRegistry;
private final AtlasFormatConverters instanceFormatters;
+ private final EntityGraphRetriever entityGraphRetriever;
@Inject
public AtlasInstanceConverter(AtlasTypeRegistry typeRegistry, AtlasFormatConverters instanceFormatters) {
- this.typeRegistry = typeRegistry;
- this.instanceFormatters = instanceFormatters;
+ this.typeRegistry = typeRegistry;
+ this.instanceFormatters = instanceFormatters;
+ this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
}
public Referenceable[] getReferenceables(Collection<AtlasEntity> entities) throws AtlasBaseException {
@@ -87,6 +91,12 @@ public class AtlasInstanceConverter {
return getReferenceable(entity, new ConverterContext());
}
+ public Referenceable getReferenceable(String guid) throws AtlasBaseException {
+ AtlasEntity.AtlasEntityWithExtInfo entity = getAndCacheEntity(guid);
+
+ return getReferenceable(entity);
+ }
+
public Referenceable getReferenceable(AtlasEntity.AtlasEntityWithExtInfo entity) throws AtlasBaseException {
AtlasFormatConverter.ConverterContext ctx = new AtlasFormatConverter.ConverterContext();
@@ -278,4 +288,24 @@ public class AtlasInstanceConverter {
return ret;
}
+
+
+ private AtlasEntity.AtlasEntityWithExtInfo getAndCacheEntity(String guid) throws AtlasBaseException {
+ RequestContextV1 context = RequestContextV1.get();
+ AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = context.getInstanceV2(guid);
+
+ if (entityWithExtInfo == null) {
+ entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
+
+ if (entityWithExtInfo != null) {
+ context.cache(entityWithExtInfo);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cache miss -> GUID = {}", guid);
+ }
+ }
+ }
+
+ return entityWithExtInfo;
+ }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/84f1349d/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
index 1d5a6ac..4c511c1 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
@@ -18,6 +18,7 @@
package org.apache.atlas.repository.store.graph.v1;
+import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.EntityChangeListener;
@@ -97,7 +98,6 @@ public class AtlasEntityChangeNotifier {
return;
}
- /* TODO:
for (EntityChangeListener listener : entityChangeListeners) {
try {
listener.onTraitsAdded(entity, traits);
@@ -105,7 +105,6 @@ public class AtlasEntityChangeNotifier {
throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitAdd");
}
}
- */
}
public void onClassificationDeletedFromEntity(String entityId, List<String> traitNames) throws AtlasBaseException {
@@ -118,7 +117,6 @@ public class AtlasEntityChangeNotifier {
return;
}
- /* TODO:
for (EntityChangeListener listener : entityChangeListeners) {
try {
listener.onTraitsDeleted(entity, traitNames);
@@ -126,7 +124,6 @@ public class AtlasEntityChangeNotifier {
throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitDelete");
}
}
- */
}
public void onClassificationUpdatedToEntity(String entityId, List<AtlasClassification> classifications) throws AtlasBaseException {
@@ -140,7 +137,6 @@ public class AtlasEntityChangeNotifier {
return;
}
- /* TODO:
for (EntityChangeListener listener : entityChangeListeners) {
try {
listener.onTraitsUpdated(entity, traits);
@@ -148,7 +144,6 @@ public class AtlasEntityChangeNotifier {
throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitUpdate");
}
}
- */
}
private String getListenerName(EntityChangeListener listener) {
@@ -162,7 +157,6 @@ public class AtlasEntityChangeNotifier {
List<Referenceable> typedRefInsts = toReferenceables(entityHeaders);
- /* TODO:
for (EntityChangeListener listener : entityChangeListeners) {
try {
switch (operation) {
@@ -181,7 +175,6 @@ public class AtlasEntityChangeNotifier {
throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), operation.toString());
}
}
- */
}
private List<Referenceable> toReferenceables(List<AtlasEntityHeader> entityHeaders) throws AtlasBaseException {
@@ -197,11 +190,9 @@ public class AtlasEntityChangeNotifier {
private Referenceable toReferenceable(String entityId) throws AtlasBaseException {
Referenceable ret = null;
- /* TODO:
if (StringUtils.isNotEmpty(entityId)) {
ret = instanceConverter.getReferenceable(entityId);
}
- */
return ret;
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/84f1349d/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
index 583fc6f..1eb4183 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
@@ -58,7 +58,7 @@ public class AtlasGraphUtilsV1 {
public static final String PROPERTY_PREFIX = Constants.INTERNAL_PROPERTY_KEY_PREFIX + "type.";
public static final String SUPERTYPE_EDGE_LABEL = PROPERTY_PREFIX + ".supertype";
public static final String ENTITYTYPE_EDGE_LABEL = PROPERTY_PREFIX + ".entitytype";
- public static final String VERTEX_TYPE = "typeRegistry";
+ public static final String VERTEX_TYPE = "typeSystem";
public static final String RELATIONSHIPTYPE_EDGE_LABEL = PROPERTY_PREFIX + ".relationshipType";
private static boolean USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES = false;