You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/02/18 08:00:07 UTC
incubator-atlas git commit: ATLAS-1563: Entity change listener
invocation in V2 Store
Repository: incubator-atlas
Updated Branches:
refs/heads/master b6b6f9450 -> ea38942ba
ATLAS-1563: Entity change listener invocation in V2 Store
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/ea38942b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/ea38942b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/ea38942b
Branch: refs/heads/master
Commit: ea38942baa73774f35004d36fc1805a2368c2ace
Parents: b6b6f94
Author: apoorvnaik <an...@hortonworks.com>
Authored: Fri Feb 17 17:21:37 2017 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Fri Feb 17 23:58:25 2017 -0800
----------------------------------------------------------------------
.../converters/AtlasInstanceConverter.java | 15 ++-
.../graph/v1/AtlasEntityChangeNotifier.java | 125 +++++++++++++++++++
.../store/graph/v1/AtlasEntityStoreV1.java | 43 +++++--
.../graph/v1/AtlasDeleteHandlerV1Test.java | 8 +-
.../store/graph/v1/AtlasEntityStoreV1Test.java | 5 +-
.../atlas/listener/EntityChangeListener.java | 1 +
.../org/apache/atlas/web/rest/EntityREST.java | 12 +-
7 files changed, 185 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
index 9d475bf..e14fafb 100644
--- a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
@@ -76,17 +76,24 @@ public class AtlasInstanceConverter {
Iterator<AtlasEntity> entityIterator = entities.iterator();
for (int i = 0; i < entities.size(); i++) {
- ITypedReferenceableInstance typedInstance = getITypedReferenceable(entityIterator.next(), ctx);
+ ITypedReferenceableInstance typedInstance = getITypedReferenceable(entityIterator.next());
entitiesInOldFormat[i] = typedInstance;
}
return entitiesInOldFormat;
}
- public ITypedReferenceableInstance getITypedReferenceable(AtlasEntity entity, AtlasFormatConverter.ConverterContext ctx) throws AtlasBaseException {
- Referenceable ref = getReferenceable(entity, ctx);
+ public ITypedReferenceableInstance getITypedReferenceable(AtlasEntity entity) throws AtlasBaseException {
+ try {
+ return metadataService.getEntityDefinition(entity.getGuid());
+ } catch (AtlasException e) {
+ LOG.error("Exception while getting a typed reference for the entity ", e);
+ throw toAtlasBaseException(e);
+ }
+ }
+ public ITypedReferenceableInstance getITypedReferenceable(String guid) throws AtlasBaseException {
try {
- return metadataService.getTypedReferenceableInstance(ref);
+ return metadataService.getEntityDefinition(guid);
} catch (AtlasException e) {
LOG.error("Exception while getting a typed reference for the entity ", e);
throw toAtlasBaseException(e);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
new file mode 100644
index 0000000..a532f31
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v1;
+
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
+import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.repository.converters.AtlasInstanceConverter;
+import org.apache.atlas.typesystem.ITypedReferenceableInstance;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.CREATE;
+import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
+import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
+import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
+
+
+@Singleton
+public class AtlasEntityChangeNotifier {
+ private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityChangeNotifier.class);
+
+ private final Set<EntityChangeListener> entityChangeListeners;
+ private final AtlasInstanceConverter instanceConverter;
+
+ @Inject
+ public AtlasEntityChangeNotifier(Set<EntityChangeListener> entityChangeListeners,
+ AtlasInstanceConverter instanceConverter) {
+ this.entityChangeListeners = entityChangeListeners;
+ this.instanceConverter = instanceConverter;
+ }
+
+ public void onEntitiesMutated(EntityMutationResponse entityMutationResponse) throws AtlasBaseException {
+ if (CollectionUtils.isEmpty(entityChangeListeners) || instanceConverter == null) {
+ return;
+ }
+
+ List<AtlasEntityHeader> createdEntities = entityMutationResponse.getCreatedEntities();
+ List<AtlasEntityHeader> updatedEntities = entityMutationResponse.getUpdatedEntities();
+ List<AtlasEntityHeader> partiallyUpdatedEntities = entityMutationResponse.getPartialUpdatedEntities();
+ List<AtlasEntityHeader> deletedEntities = entityMutationResponse.getDeletedEntities();
+
+ if (CollectionUtils.isNotEmpty(createdEntities)) {
+ List<ITypedReferenceableInstance> typedRefInst = toITypedReferenceable(createdEntities);
+
+ notifyListeners(typedRefInst, EntityOperation.CREATE);
+ }
+
+ if (CollectionUtils.isNotEmpty(updatedEntities)) {
+ List<ITypedReferenceableInstance> typedRefInst = toITypedReferenceable(updatedEntities);
+
+ notifyListeners(typedRefInst, EntityOperation.UPDATE);
+ }
+
+ if (CollectionUtils.isNotEmpty(partiallyUpdatedEntities)) {
+ List<ITypedReferenceableInstance> typedRefInst = toITypedReferenceable(partiallyUpdatedEntities);
+
+ notifyListeners(typedRefInst, EntityOperation.PARTIAL_UPDATE);
+ }
+
+ if (CollectionUtils.isNotEmpty(deletedEntities)) {
+ List<ITypedReferenceableInstance> typedRefInst = toITypedReferenceable(deletedEntities);
+
+ notifyListeners(typedRefInst, EntityOperation.DELETE);
+ }
+ }
+
+ private void notifyListeners(List<ITypedReferenceableInstance> typedRefInsts, EntityOperation operation) throws AtlasBaseException {
+ for (EntityChangeListener listener : entityChangeListeners) {
+ try {
+ switch (operation) {
+ case CREATE:
+ listener.onEntitiesAdded(typedRefInsts);
+ break;
+ case UPDATE:
+ case PARTIAL_UPDATE:
+ listener.onEntitiesUpdated(typedRefInsts);
+ break;
+ case DELETE:
+ listener.onEntitiesDeleted(typedRefInsts);
+ break;
+ }
+ } catch (AtlasException e) {
+ throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, operation.toString());
+ }
+ }
+ }
+
+ private List<ITypedReferenceableInstance> toITypedReferenceable(List<AtlasEntityHeader> entityHeaders) throws AtlasBaseException {
+ List<ITypedReferenceableInstance> ret = new ArrayList<>(entityHeaders.size());
+
+ for (AtlasEntityHeader entityHeader : entityHeaders) {
+ ret.add(instanceConverter.getITypedReferenceable(entityHeader.getGuid()));
+ }
+
+ return ret;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 31a5e8c..8a6501c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -25,9 +25,13 @@ import org.apache.atlas.GraphTransaction;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
-import org.apache.atlas.model.instance.*;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
@@ -49,13 +53,15 @@ import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.*;
public class AtlasEntityStoreV1 implements AtlasEntityStore {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class);
- private final DeleteHandlerV1 deleteHandler;
- private final AtlasTypeRegistry typeRegistry;
+ private final DeleteHandlerV1 deleteHandler;
+ private final AtlasTypeRegistry typeRegistry;
+ private final AtlasEntityChangeNotifier entityChangeNotifier;
@Inject
- public AtlasEntityStoreV1(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry) {
- this.deleteHandler = deleteHandler;
- this.typeRegistry = typeRegistry;
+ public AtlasEntityStoreV1(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry, AtlasEntityChangeNotifier entityChangeNotifier) {
+ this.deleteHandler = deleteHandler;
+ this.typeRegistry = typeRegistry;
+ this.entityChangeNotifier = entityChangeNotifier;
}
@Override
@@ -208,6 +214,9 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
LOG.debug("<== createOrUpdate()");
}
+ // Notify the change listeners
+ entityChangeNotifier.onEntitiesMutated(ret);
+
return ret;
}
@@ -252,7 +261,12 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
deletionCandidates.add(vertex);
- return deleteVertices(deletionCandidates);
+ EntityMutationResponse ret = deleteVertices(deletionCandidates);
+
+ // Notify the change listeners
+ entityChangeNotifier.onEntitiesMutated(ret);
+
+ return ret;
}
@Override
@@ -281,7 +295,13 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
if (deletionCandidates.isEmpty()) {
LOG.info("No deletion candidate entities were found for guids %s", guids);
}
- return deleteVertices(deletionCandidates);
+
+ EntityMutationResponse ret = deleteVertices(deletionCandidates);
+
+ // Notify the change listeners
+ entityChangeNotifier.onEntitiesMutated(ret);
+
+ return ret;
}
@Override
@@ -297,7 +317,12 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
deletionCandidates.add(vertex);
- return deleteVertices(deletionCandidates);
+ EntityMutationResponse ret = deleteVertices(deletionCandidates);
+
+ // Notify the change listeners
+ entityChangeNotifier.onEntitiesMutated(ret);
+
+ return ret;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
index 492abc4..c55e3f7 100644
--- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
@@ -19,7 +19,6 @@ package org.apache.atlas.repository.store.graph.v1;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.RequestContextV1;
@@ -59,7 +58,6 @@ import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeSystem;
-import org.apache.atlas.util.AtlasRepositoryConfiguration;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -69,7 +67,6 @@ import org.testng.annotations.Test;
import javax.inject.Inject;
-import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -82,6 +79,7 @@ import static org.apache.atlas.TestUtils.COLUMN_TYPE;
import static org.apache.atlas.TestUtils.DEPARTMENT_TYPE;
import static org.apache.atlas.TestUtils.NAME;
import static org.apache.atlas.TestUtils.TABLE_TYPE;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
@@ -107,6 +105,8 @@ public abstract class AtlasDeleteHandlerV1Test {
private TypeSystem typeSystem = TypeSystem.getInstance();
+ AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class);
+
@BeforeClass
public void setUp() throws Exception {
@@ -145,7 +145,7 @@ public abstract class AtlasDeleteHandlerV1Test {
@BeforeTest
public void init() throws Exception {
DeleteHandlerV1 deleteHandler = getDeleteHandler(typeRegistry);
- entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry);
+ entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry, mockChangeNotifier);
RequestContextV1.clear();
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java
index dd82cb2..7f76236 100644
--- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java
@@ -73,6 +73,7 @@ import static org.apache.atlas.TestUtils.COLUMN_TYPE;
import static org.apache.atlas.TestUtils.NAME;
import static org.apache.atlas.TestUtils.randomString;
import static org.apache.atlas.TestUtilsV2.TABLE_TYPE;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -98,6 +99,8 @@ public class AtlasEntityStoreV1Test {
private AtlasEntityWithExtInfo dbEntity;
private AtlasEntityWithExtInfo tblEntity;
+ AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class);
+
@BeforeClass
public void setUp() throws Exception {
@@ -128,7 +131,7 @@ public class AtlasEntityStoreV1Test {
@BeforeTest
public void init() throws Exception {
- entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry);
+ entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry, mockChangeNotifier);
RequestContextV1.clear();
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java b/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java
index 4bf1d05..e9a7d1a 100644
--- a/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java
+++ b/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java
@@ -19,6 +19,7 @@
package org.apache.atlas.listener;
import org.apache.atlas.AtlasException;
+import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.typesystem.IStruct;
import org.apache.atlas.typesystem.ITypedReferenceableInstance;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
index 2f7ba20..92ea93e 100644
--- a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
@@ -74,7 +74,7 @@ public class EntityREST {
public static final String PREFIX_ATTR = "attr:";
private final AtlasTypeRegistry typeRegistry;
- private final AtlasInstanceConverter restAdapters;
+ private final AtlasInstanceConverter instanceConverter;
private final MetadataService metadataService;
private final AtlasEntityStore entitiesStore;
@@ -82,7 +82,7 @@ public class EntityREST {
public EntityREST(AtlasTypeRegistry typeRegistry, AtlasInstanceConverter instanceConverter,
MetadataService metadataService, AtlasEntityStore entitiesStore) {
this.typeRegistry = typeRegistry;
- this.restAdapters = instanceConverter;
+ this.instanceConverter = instanceConverter;
this.metadataService = metadataService;
this.entitiesStore = entitiesStore;
}
@@ -204,7 +204,7 @@ public class EntityREST {
try {
IStruct trait = metadataService.getTraitDefinition(guid, typeName);
- return restAdapters.getClassification(trait);
+ return instanceConverter.getClassification(trait);
} catch (AtlasException e) {
throw toAtlasBaseException(e);
@@ -231,7 +231,7 @@ public class EntityREST {
List<AtlasClassification> clsList = new ArrayList<>();
for ( String traitName : metadataService.getTraitNames(guid) ) {
IStruct trait = metadataService.getTraitDefinition(guid, traitName);
- AtlasClassification cls = restAdapters.getClassification(trait);
+ AtlasClassification cls = instanceConverter.getClassification(trait);
clsList.add(cls);
}
@@ -258,7 +258,7 @@ public class EntityREST {
}
for (AtlasClassification classification: classifications) {
- final ITypedStruct trait = restAdapters.getTrait(classification);
+ final ITypedStruct trait = instanceConverter.getTrait(classification);
try {
metadataService.addTrait(guid, trait);
} catch (IllegalArgumentException e) {
@@ -378,7 +378,7 @@ public class EntityREST {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "empty entity list");
}
- final ITypedStruct trait = restAdapters.getTrait(classification);
+ final ITypedStruct trait = instanceConverter.getTrait(classification);
try {
metadataService.addTrait(entityGuids, trait);