You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/02/18 08:00:07 UTC

incubator-atlas git commit: ATLAS-1563: Entity change listener invocation in V2 Store

Repository: incubator-atlas
Updated Branches:
  refs/heads/master b6b6f9450 -> ea38942ba


ATLAS-1563: Entity change listener invocation in V2 Store

Signed-off-by: Madhan Neethiraj <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/ea38942b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/ea38942b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/ea38942b

Branch: refs/heads/master
Commit: ea38942baa73774f35004d36fc1805a2368c2ace
Parents: b6b6f94
Author: apoorvnaik <an...@hortonworks.com>
Authored: Fri Feb 17 17:21:37 2017 -0800
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Fri Feb 17 23:58:25 2017 -0800

----------------------------------------------------------------------
 .../converters/AtlasInstanceConverter.java      |  15 ++-
 .../graph/v1/AtlasEntityChangeNotifier.java     | 125 +++++++++++++++++++
 .../store/graph/v1/AtlasEntityStoreV1.java      |  43 +++++--
 .../graph/v1/AtlasDeleteHandlerV1Test.java      |   8 +-
 .../store/graph/v1/AtlasEntityStoreV1Test.java  |   5 +-
 .../atlas/listener/EntityChangeListener.java    |   1 +
 .../org/apache/atlas/web/rest/EntityREST.java   |  12 +-
 7 files changed, 185 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
index 9d475bf..e14fafb 100644
--- a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
@@ -76,17 +76,24 @@ public class AtlasInstanceConverter {
 
         Iterator<AtlasEntity> entityIterator = entities.iterator();
         for (int i = 0; i < entities.size(); i++) {
-            ITypedReferenceableInstance typedInstance = getITypedReferenceable(entityIterator.next(), ctx);
+            ITypedReferenceableInstance typedInstance = getITypedReferenceable(entityIterator.next());
             entitiesInOldFormat[i] = typedInstance;
         }
         return entitiesInOldFormat;
     }
 
-    public ITypedReferenceableInstance getITypedReferenceable(AtlasEntity entity, AtlasFormatConverter.ConverterContext ctx) throws AtlasBaseException {
-        Referenceable ref = getReferenceable(entity, ctx);
+    public ITypedReferenceableInstance getITypedReferenceable(AtlasEntity entity) throws AtlasBaseException {
+        try {
+            return metadataService.getEntityDefinition(entity.getGuid());
+        } catch (AtlasException e) {
+            LOG.error("Exception while getting a typed reference for the entity ", e);
+            throw toAtlasBaseException(e);
+        }
+    }
 
+    public ITypedReferenceableInstance getITypedReferenceable(String guid) throws AtlasBaseException {
         try {
-            return metadataService.getTypedReferenceableInstance(ref);
+            return metadataService.getEntityDefinition(guid);
         } catch (AtlasException e) {
             LOG.error("Exception while getting a typed reference for the entity ", e);
             throw toAtlasBaseException(e);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
new file mode 100644
index 0000000..a532f31
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v1;
+
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
+import org.apache.atlas.listener.EntityChangeListener;
+import org.apache.atlas.repository.converters.AtlasInstanceConverter;
+import org.apache.atlas.typesystem.ITypedReferenceableInstance;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.CREATE;
+import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
+import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
+import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
+
+
+@Singleton
+public class AtlasEntityChangeNotifier {
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityChangeNotifier.class);
+
+    private final Set<EntityChangeListener> entityChangeListeners;
+    private final AtlasInstanceConverter    instanceConverter;
+
+    @Inject
+    public AtlasEntityChangeNotifier(Set<EntityChangeListener> entityChangeListeners,
+                                     AtlasInstanceConverter    instanceConverter) {
+        this.entityChangeListeners = entityChangeListeners;
+        this.instanceConverter     = instanceConverter;
+    }
+
+    public void onEntitiesMutated(EntityMutationResponse entityMutationResponse) throws AtlasBaseException {
+        if (CollectionUtils.isEmpty(entityChangeListeners) || instanceConverter == null) {
+            return;
+        }
+
+        List<AtlasEntityHeader> createdEntities          = entityMutationResponse.getCreatedEntities();
+        List<AtlasEntityHeader> updatedEntities          = entityMutationResponse.getUpdatedEntities();
+        List<AtlasEntityHeader> partiallyUpdatedEntities = entityMutationResponse.getPartialUpdatedEntities();
+        List<AtlasEntityHeader> deletedEntities          = entityMutationResponse.getDeletedEntities();
+
+        if (CollectionUtils.isNotEmpty(createdEntities)) {
+            List<ITypedReferenceableInstance> typedRefInst = toITypedReferenceable(createdEntities);
+
+            notifyListeners(typedRefInst, EntityOperation.CREATE);
+        }
+
+        if (CollectionUtils.isNotEmpty(updatedEntities)) {
+            List<ITypedReferenceableInstance> typedRefInst = toITypedReferenceable(updatedEntities);
+
+            notifyListeners(typedRefInst, EntityOperation.UPDATE);
+        }
+
+        if (CollectionUtils.isNotEmpty(partiallyUpdatedEntities)) {
+            List<ITypedReferenceableInstance> typedRefInst = toITypedReferenceable(partiallyUpdatedEntities);
+
+            notifyListeners(typedRefInst, EntityOperation.PARTIAL_UPDATE);
+        }
+
+        if (CollectionUtils.isNotEmpty(deletedEntities)) {
+            List<ITypedReferenceableInstance> typedRefInst = toITypedReferenceable(deletedEntities);
+
+            notifyListeners(typedRefInst, EntityOperation.DELETE);
+        }
+    }
+
+    private void notifyListeners(List<ITypedReferenceableInstance> typedRefInsts, EntityOperation operation) throws AtlasBaseException {
+        for (EntityChangeListener listener : entityChangeListeners) {
+            try {
+                switch (operation) {
+                    case CREATE:
+                        listener.onEntitiesAdded(typedRefInsts);
+                        break;
+                    case UPDATE:
+                    case PARTIAL_UPDATE:
+                        listener.onEntitiesUpdated(typedRefInsts);
+                        break;
+                    case DELETE:
+                        listener.onEntitiesDeleted(typedRefInsts);
+                        break;
+                }
+            } catch (AtlasException e) {
+                throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, operation.toString());
+            }
+        }
+    }
+
+    private List<ITypedReferenceableInstance> toITypedReferenceable(List<AtlasEntityHeader> entityHeaders) throws AtlasBaseException {
+        List<ITypedReferenceableInstance> ret = new ArrayList<>(entityHeaders.size());
+
+        for (AtlasEntityHeader entityHeader : entityHeaders) {
+            ret.add(instanceConverter.getITypedReferenceable(entityHeader.getGuid()));
+        }
+
+        return ret;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 31a5e8c..8a6501c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -25,9 +25,13 @@ import org.apache.atlas.GraphTransaction;
 import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasImportResult;
-import org.apache.atlas.model.instance.*;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
@@ -49,13 +53,15 @@ import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.*;
 public class AtlasEntityStoreV1 implements AtlasEntityStore {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class);
 
-    private final DeleteHandlerV1   deleteHandler;
-    private final AtlasTypeRegistry typeRegistry;
+    private final DeleteHandlerV1           deleteHandler;
+    private final AtlasTypeRegistry         typeRegistry;
+    private final AtlasEntityChangeNotifier entityChangeNotifier;
 
     @Inject
-    public AtlasEntityStoreV1(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry) {
-        this.deleteHandler = deleteHandler;
-        this.typeRegistry  = typeRegistry;
+    public AtlasEntityStoreV1(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry, AtlasEntityChangeNotifier entityChangeNotifier) {
+        this.deleteHandler        = deleteHandler;
+        this.typeRegistry         = typeRegistry;
+        this.entityChangeNotifier = entityChangeNotifier;
     }
 
     @Override
@@ -208,6 +214,9 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
             LOG.debug("<== createOrUpdate()");
         }
 
+        // Notify the change listeners
+        entityChangeNotifier.onEntitiesMutated(ret);
+
         return ret;
     }
 
@@ -252,7 +261,12 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
         Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
         deletionCandidates.add(vertex);
 
-        return deleteVertices(deletionCandidates);
+        EntityMutationResponse ret = deleteVertices(deletionCandidates);
+
+        // Notify the change listeners
+        entityChangeNotifier.onEntitiesMutated(ret);
+
+        return ret;
     }
 
     @Override
@@ -281,7 +295,13 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
         if (deletionCandidates.isEmpty()) {
             LOG.info("No deletion candidate entities were found for guids %s", guids);
         }
-        return deleteVertices(deletionCandidates);
+
+        EntityMutationResponse ret = deleteVertices(deletionCandidates);
+
+        // Notify the change listeners
+        entityChangeNotifier.onEntitiesMutated(ret);
+
+        return ret;
     }
 
     @Override
@@ -297,7 +317,12 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
         Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
         deletionCandidates.add(vertex);
 
-        return deleteVertices(deletionCandidates);
+        EntityMutationResponse ret = deleteVertices(deletionCandidates);
+
+        // Notify the change listeners
+        entityChangeNotifier.onEntitiesMutated(ret);
+
+        return ret;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
index 492abc4..c55e3f7 100644
--- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java
@@ -19,7 +19,6 @@ package org.apache.atlas.repository.store.graph.v1;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.RepositoryMetadataModule;
 import org.apache.atlas.RequestContextV1;
@@ -59,7 +58,6 @@ import org.apache.atlas.typesystem.persistence.Id;
 import org.apache.atlas.typesystem.types.Multiplicity;
 import org.apache.atlas.typesystem.types.TraitType;
 import org.apache.atlas.typesystem.types.TypeSystem;
-import org.apache.atlas.util.AtlasRepositoryConfiguration;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -69,7 +67,6 @@ import org.testng.annotations.Test;
 
 import javax.inject.Inject;
 
-import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -82,6 +79,7 @@ import static org.apache.atlas.TestUtils.COLUMN_TYPE;
 import static org.apache.atlas.TestUtils.DEPARTMENT_TYPE;
 import static org.apache.atlas.TestUtils.NAME;
 import static org.apache.atlas.TestUtils.TABLE_TYPE;
+import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
@@ -107,6 +105,8 @@ public abstract class AtlasDeleteHandlerV1Test {
 
     private TypeSystem typeSystem = TypeSystem.getInstance();
 
+    AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class);
+
 
     @BeforeClass
     public void setUp() throws Exception {
@@ -145,7 +145,7 @@ public abstract class AtlasDeleteHandlerV1Test {
     @BeforeTest
     public void init() throws Exception {
         DeleteHandlerV1 deleteHandler = getDeleteHandler(typeRegistry);
-        entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry);
+        entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry, mockChangeNotifier);
         RequestContextV1.clear();
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java
index dd82cb2..7f76236 100644
--- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java
@@ -73,6 +73,7 @@ import static org.apache.atlas.TestUtils.COLUMN_TYPE;
 import static org.apache.atlas.TestUtils.NAME;
 import static org.apache.atlas.TestUtils.randomString;
 import static org.apache.atlas.TestUtilsV2.TABLE_TYPE;
+import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
@@ -98,6 +99,8 @@ public class AtlasEntityStoreV1Test {
     private AtlasEntityWithExtInfo   dbEntity;
     private AtlasEntityWithExtInfo   tblEntity;
 
+    AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class);
+
 
     @BeforeClass
     public void setUp() throws Exception {
@@ -128,7 +131,7 @@ public class AtlasEntityStoreV1Test {
 
     @BeforeTest
     public void init() throws Exception {
-        entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry);
+        entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry, mockChangeNotifier);
         RequestContextV1.clear();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java b/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java
index 4bf1d05..e9a7d1a 100644
--- a/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java
+++ b/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java
@@ -19,6 +19,7 @@
 package org.apache.atlas.listener;
 
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.typesystem.IStruct;
 import org.apache.atlas.typesystem.ITypedReferenceableInstance;
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
index 2f7ba20..92ea93e 100644
--- a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java
@@ -74,7 +74,7 @@ public class EntityREST {
     public static final String PREFIX_ATTR = "attr:";
 
     private final AtlasTypeRegistry         typeRegistry;
-    private final AtlasInstanceConverter restAdapters;
+    private final AtlasInstanceConverter    instanceConverter;
     private final MetadataService           metadataService;
     private final AtlasEntityStore          entitiesStore;
 
@@ -82,7 +82,7 @@ public class EntityREST {
     public EntityREST(AtlasTypeRegistry typeRegistry, AtlasInstanceConverter instanceConverter,
                       MetadataService metadataService, AtlasEntityStore entitiesStore) {
         this.typeRegistry    = typeRegistry;
-        this.restAdapters    = instanceConverter;
+        this.instanceConverter = instanceConverter;
         this.metadataService = metadataService;
         this.entitiesStore   = entitiesStore;
     }
@@ -204,7 +204,7 @@ public class EntityREST {
 
         try {
             IStruct trait = metadataService.getTraitDefinition(guid, typeName);
-            return restAdapters.getClassification(trait);
+            return instanceConverter.getClassification(trait);
 
         } catch (AtlasException e) {
             throw toAtlasBaseException(e);
@@ -231,7 +231,7 @@ public class EntityREST {
             List<AtlasClassification> clsList = new ArrayList<>();
             for ( String traitName : metadataService.getTraitNames(guid) ) {
                 IStruct trait = metadataService.getTraitDefinition(guid, traitName);
-                AtlasClassification cls = restAdapters.getClassification(trait);
+                AtlasClassification cls = instanceConverter.getClassification(trait);
                 clsList.add(cls);
             }
 
@@ -258,7 +258,7 @@ public class EntityREST {
         }
 
         for (AtlasClassification classification:  classifications) {
-            final ITypedStruct trait = restAdapters.getTrait(classification);
+            final ITypedStruct trait = instanceConverter.getTrait(classification);
             try {
                 metadataService.addTrait(guid, trait);
             } catch (IllegalArgumentException e) {
@@ -378,7 +378,7 @@ public class EntityREST {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "empty entity list");
         }
 
-        final ITypedStruct trait = restAdapters.getTrait(classification);
+        final ITypedStruct trait = instanceConverter.getTrait(classification);
 
         try {
             metadataService.addTrait(entityGuids, trait);