You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ap...@apache.org on 2018/05/25 03:52:57 UTC
[19/20] atlas git commit: ATLAS-2490: updates to make usage of v1/v2
in class names consistent
http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
deleted file mode 100644
index 0e90336..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.repository.store.graph.v1;
-
-
-import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.RequestContextV1;
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.listener.EntityChangeListener;
-import org.apache.atlas.listener.EntityChangeListenerV2;
-import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2;
-import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
-import org.apache.atlas.model.instance.AtlasClassification;
-import org.apache.atlas.model.instance.AtlasEntity;
-
-import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
-import org.apache.atlas.model.instance.AtlasEntityHeader;
-import org.apache.atlas.model.instance.AtlasRelatedObjectId;
-import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
-import org.apache.atlas.type.AtlasEntityType;
-import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.instance.Struct;
-import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.converters.AtlasInstanceConverter;
-import org.apache.atlas.repository.graph.FullTextMapperV2;
-import org.apache.atlas.repository.graph.GraphHelper;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.util.AtlasRepositoryConfiguration;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
-import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD;
-import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE;
-import static org.apache.atlas.util.AtlasRepositoryConfiguration.isV2EntityNotificationEnabled;
-
-
-@Component
-public class AtlasEntityChangeNotifier {
- private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityChangeNotifier.class);
-
- private final Set<EntityChangeListener> entityChangeListeners;
- private final Set<EntityChangeListenerV2> entityChangeListenersV2;
- private final AtlasInstanceConverter instanceConverter;
- private final FullTextMapperV2 fullTextMapperV2;
- private final AtlasTypeRegistry atlasTypeRegistry;
-
-
- @Inject
- public AtlasEntityChangeNotifier(Set<EntityChangeListener> entityChangeListeners,
- Set<EntityChangeListenerV2> entityChangeListenersV2,
- AtlasInstanceConverter instanceConverter,
- FullTextMapperV2 fullTextMapperV2,
- AtlasTypeRegistry atlasTypeRegistry) {
- this.entityChangeListeners = entityChangeListeners;
- this.entityChangeListenersV2 = entityChangeListenersV2;
- this.instanceConverter = instanceConverter;
- this.fullTextMapperV2 = fullTextMapperV2;
- this.atlasTypeRegistry = atlasTypeRegistry;
- }
-
- public void onEntitiesMutated(EntityMutationResponse entityMutationResponse, boolean isImport) throws AtlasBaseException {
- if (CollectionUtils.isEmpty(entityChangeListeners) || instanceConverter == null) {
- return;
- }
-
- List<AtlasEntityHeader> createdEntities = entityMutationResponse.getCreatedEntities();
- List<AtlasEntityHeader> updatedEntities = entityMutationResponse.getUpdatedEntities();
- List<AtlasEntityHeader> partiallyUpdatedEntities = entityMutationResponse.getPartialUpdatedEntities();
- List<AtlasEntityHeader> deletedEntities = entityMutationResponse.getDeletedEntities();
-
- // complete full text mapping before calling toReferenceables(), from notifyListners(), to
- // include all vertex updates in the current graph-transaction
- doFullTextMapping(createdEntities);
- doFullTextMapping(updatedEntities);
- doFullTextMapping(partiallyUpdatedEntities);
-
- notifyListeners(createdEntities, EntityOperation.CREATE, isImport);
- notifyListeners(updatedEntities, EntityOperation.UPDATE, isImport);
- notifyListeners(partiallyUpdatedEntities, EntityOperation.PARTIAL_UPDATE, isImport);
- notifyListeners(deletedEntities, EntityOperation.DELETE, isImport);
-
- notifyPropagatedEntities();
- }
-
- public void onClassificationAddedToEntity(AtlasEntity entity, List<AtlasClassification> addedClassifications) throws AtlasBaseException {
- if (isV2EntityNotificationEnabled()) {
- doFullTextMapping(entity.getGuid());
-
- for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
- listener.onClassificationsAdded(entity, addedClassifications);
- }
- } else {
- updateFullTextMapping(entity.getGuid(), addedClassifications);
-
- Referenceable entityRef = toReferenceable(entity.getGuid());
- List<Struct> traits = toStruct(addedClassifications);
-
- if (entity == null || CollectionUtils.isEmpty(traits)) {
- return;
- }
-
- for (EntityChangeListener listener : entityChangeListeners) {
- try {
- listener.onTraitsAdded(entityRef, traits);
- } catch (AtlasException e) {
- throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitAdd");
- }
- }
- }
- }
-
- public void onClassificationUpdatedToEntity(AtlasEntity entity, List<AtlasClassification> updatedClassifications) throws AtlasBaseException {
- if (isV2EntityNotificationEnabled()) {
- doFullTextMapping(entity.getGuid());
-
- for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
- listener.onClassificationsUpdated(entity, updatedClassifications);
- }
- } else {
- doFullTextMapping(entity.getGuid());
-
- Referenceable entityRef = toReferenceable(entity.getGuid());
- List<Struct> traits = toStruct(updatedClassifications);
-
- if (entityRef == null || CollectionUtils.isEmpty(traits)) {
- return;
- }
-
- for (EntityChangeListener listener : entityChangeListeners) {
- try {
- listener.onTraitsUpdated(entityRef, traits);
- } catch (AtlasException e) {
- throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitUpdate");
- }
- }
- }
- }
-
- public void onClassificationDeletedFromEntity(AtlasEntity entity, List<AtlasClassification> deletedClassifications) throws AtlasBaseException {
- if (isV2EntityNotificationEnabled()) {
- doFullTextMapping(entity.getGuid());
-
- for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
- listener.onClassificationsDeleted(entity, deletedClassifications);
- }
- } else {
- doFullTextMapping(entity.getGuid());
-
- Referenceable entityRef = toReferenceable(entity.getGuid());
- List<Struct> traits = toStruct(deletedClassifications);
-
- if (entityRef == null || CollectionUtils.isEmpty(deletedClassifications)) {
- return;
- }
-
- for (EntityChangeListener listener : entityChangeListeners) {
- try {
- listener.onTraitsDeleted(entityRef, traits);
- } catch (AtlasException e) {
- throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TraitDelete");
- }
- }
-
- }
- }
-
- public void onTermAddedToEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
- // listeners notified on term-entity association only if v2 notifications are enabled
- if (isV2EntityNotificationEnabled()) {
- for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
- listener.onTermAdded(term, entityIds);
- }
- } else {
- List<Referenceable> entityRefs = toReferenceables(entityIds);
-
- for (EntityChangeListener listener : entityChangeListeners) {
- try {
- listener.onTermAdded(entityRefs, term);
- } catch (AtlasException e) {
- throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TermAdd");
- }
- }
- }
- }
-
- public void onTermDeletedFromEntities(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
- // listeners notified on term-entity disassociation only if v2 notifications are enabled
- if (isV2EntityNotificationEnabled()) {
- for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
- listener.onTermDeleted(term, entityIds);
- }
- } else {
- List<Referenceable> entityRefs = toReferenceables(entityIds);
-
- for (EntityChangeListener listener : entityChangeListeners) {
- try {
- listener.onTermDeleted(entityRefs, term);
- } catch (AtlasException e) {
- throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), "TermDelete");
- }
- }
- }
- }
-
- public void notifyPropagatedEntities() throws AtlasBaseException {
- RequestContextV1 context = RequestContextV1.get();
- Map<String, List<AtlasClassification>> addedPropagations = context.getAddedPropagations();
- Map<String, List<AtlasClassification>> removedPropagations = context.getRemovedPropagations();
-
- notifyPropagatedEntities(addedPropagations, PROPAGATED_CLASSIFICATION_ADD);
- notifyPropagatedEntities(removedPropagations, PROPAGATED_CLASSIFICATION_DELETE);
- }
-
- private void notifyPropagatedEntities(Map<String, List<AtlasClassification>> entityPropagationMap, EntityAuditActionV2 action) throws AtlasBaseException {
- if (MapUtils.isEmpty(entityPropagationMap) || action == null) {
- return;
- }
-
- for (String guid : entityPropagationMap.keySet()) {
- AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(guid);
- AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
-
- if (entity == null) {
- continue;
- }
-
- if (action == PROPAGATED_CLASSIFICATION_ADD) {
- onClassificationAddedToEntity(entity, entityPropagationMap.get(guid));
- } else if (action == PROPAGATED_CLASSIFICATION_DELETE) {
- onClassificationDeletedFromEntity(entity, entityPropagationMap.get(guid));
- }
- }
- }
-
- private String getListenerName(EntityChangeListener listener) {
- return listener.getClass().getSimpleName();
- }
-
- private void notifyListeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException {
- if (CollectionUtils.isEmpty(entityHeaders)) {
- return;
- }
-
- if (isV2EntityNotificationEnabled()) {
- notifyV2Listeners(entityHeaders, operation, isImport);
- } else {
- notifyV1Listeners(entityHeaders, operation, isImport);
- }
- }
-
- private void notifyV1Listeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException {
- List<Referenceable> typedRefInsts = toReferenceables(entityHeaders, operation);
-
- for (EntityChangeListener listener : entityChangeListeners) {
- try {
- switch (operation) {
- case CREATE:
- listener.onEntitiesAdded(typedRefInsts, isImport);
- break;
- case UPDATE:
- case PARTIAL_UPDATE:
- listener.onEntitiesUpdated(typedRefInsts, isImport);
- break;
- case DELETE:
- listener.onEntitiesDeleted(typedRefInsts, isImport);
- break;
- }
- } catch (AtlasException e) {
- throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, getListenerName(listener), operation.toString());
- }
- }
- }
-
- private void notifyV2Listeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException {
- List<AtlasEntity> entities = toAtlasEntities(entityHeaders);
-
- for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
- switch (operation) {
- case CREATE:
- listener.onEntitiesAdded(entities, isImport);
- break;
- case UPDATE:
- case PARTIAL_UPDATE:
- listener.onEntitiesUpdated(entities, isImport);
- break;
- case DELETE:
- listener.onEntitiesDeleted(entities, isImport);
- break;
- }
- }
- }
-
- private List<Referenceable> toReferenceables(List<AtlasEntityHeader> entityHeaders, EntityOperation operation) throws AtlasBaseException {
- List<Referenceable> ret = new ArrayList<>(entityHeaders.size());
-
- // delete notifications don't need all attributes. Hence the special handling for delete operation
- if (operation == EntityOperation.DELETE) {
- for (AtlasEntityHeader entityHeader : entityHeaders) {
- ret.add(new Referenceable(entityHeader.getGuid(), entityHeader.getTypeName(), entityHeader.getAttributes()));
- }
- } else {
- for (AtlasEntityHeader entityHeader : entityHeaders) {
- ret.add(toReferenceable(entityHeader.getGuid()));
- }
- }
-
- return ret;
- }
-
- private List<Referenceable> toReferenceables(List<AtlasRelatedObjectId> entityIds) throws AtlasBaseException {
- List<Referenceable> ret = new ArrayList<>();
-
- if (CollectionUtils.isNotEmpty(entityIds)) {
- for (AtlasRelatedObjectId relatedObjectId : entityIds) {
- String entityGuid = relatedObjectId.getGuid();
-
- ret.add(toReferenceable(entityGuid));
- }
- }
-
- return ret;
- }
-
- private Referenceable toReferenceable(String entityId) throws AtlasBaseException {
- Referenceable ret = null;
-
- if (StringUtils.isNotEmpty(entityId)) {
- ret = instanceConverter.getReferenceable(entityId);
- }
-
- return ret;
- }
-
- private List<Struct> toStruct(List<AtlasClassification> classifications) throws AtlasBaseException {
- List<Struct> ret = null;
-
- if (classifications != null) {
- ret = new ArrayList<>(classifications.size());
-
- for (AtlasClassification classification : classifications) {
- if (classification != null) {
- ret.add(instanceConverter.getTrait(classification));
- }
- }
- }
-
- return ret;
- }
-
- private List<AtlasEntity> toAtlasEntities(List<AtlasEntityHeader> entityHeaders) throws AtlasBaseException {
- List<AtlasEntity> ret = new ArrayList<>();
-
- if (CollectionUtils.isNotEmpty(entityHeaders)) {
- for (AtlasEntityHeader entityHeader : entityHeaders) {
- String entityGuid = entityHeader.getGuid();
- String typeName = entityHeader.getTypeName();
-
- // Skip all internal types as the HARD DELETE will cause lookup errors
- AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(typeName);
- if (Objects.nonNull(entityType) && entityType.isInternalType()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping internal type = {}", typeName);
- }
- continue;
- }
-
- AtlasEntityWithExtInfo entityWithExtInfo = instanceConverter.getAndCacheEntity(entityGuid);
-
- if (entityWithExtInfo != null) {
- ret.add(entityWithExtInfo.getEntity());
- }
- }
- }
-
- return ret;
- }
-
- private void doFullTextMapping(List<AtlasEntityHeader> entityHeaders) {
- if (CollectionUtils.isEmpty(entityHeaders)) {
- return;
- }
-
- try {
- if(!AtlasRepositoryConfiguration.isFullTextSearchEnabled()) {
- return;
- }
- } catch (AtlasException e) {
- LOG.warn("Unable to determine if FullText is disabled. Proceeding with FullText mapping");
- }
-
- for (AtlasEntityHeader entityHeader : entityHeaders) {
- if(GraphHelper.isInternalType(entityHeader.getTypeName())) {
- continue;
- }
-
- String guid = entityHeader.getGuid();
- AtlasVertex vertex = AtlasGraphUtilsV1.findByGuid(guid);
-
- if(vertex == null) {
- continue;
- }
-
- try {
- String fullText = fullTextMapperV2.getIndexTextForEntity(guid);
-
- GraphHelper.setProperty(vertex, Constants.ENTITY_TEXT_PROPERTY_KEY, fullText);
- } catch (AtlasBaseException e) {
- LOG.error("FullText mapping failed for Vertex[ guid = {} ]", guid, e);
- }
- }
- }
-
- private void updateFullTextMapping(String entityId, List<AtlasClassification> classifications) {
- try {
- if(!AtlasRepositoryConfiguration.isFullTextSearchEnabled()) {
- return;
- }
- } catch (AtlasException e) {
- LOG.warn("Unable to determine if FullText is disabled. Proceeding with FullText mapping");
- }
-
- if (StringUtils.isEmpty(entityId) || CollectionUtils.isEmpty(classifications)) {
- return;
- }
-
- AtlasVertex atlasVertex = AtlasGraphUtilsV1.findByGuid(entityId);
- if(atlasVertex == null || GraphHelper.isInternalType(atlasVertex)) {
- return;
- }
-
- try {
- String classificationFullText = fullTextMapperV2.getIndexTextForClassifications(entityId, classifications);
- String existingFullText = (String) GraphHelper.getProperty(atlasVertex, Constants.ENTITY_TEXT_PROPERTY_KEY);
-
- String newFullText = existingFullText + " " + classificationFullText;
- GraphHelper.setProperty(atlasVertex, Constants.ENTITY_TEXT_PROPERTY_KEY, newFullText);
- } catch (AtlasBaseException e) {
- LOG.error("FullText mapping failed for Vertex[ guid = {} ]", entityId, e);
- }
- }
-
- private void doFullTextMapping(String guid) {
- AtlasEntityHeader entityHeader = new AtlasEntityHeader();
- entityHeader.setGuid(guid);
-
- doFullTextMapping(Collections.singletonList(entityHeader));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityDefStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityDefStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityDefStoreV1.java
deleted file mode 100644
index ebdba3b..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityDefStoreV1.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.repository.store.graph.v1;
-
-import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.authorize.AtlasPrivilege;
-import org.apache.atlas.authorize.AtlasTypeAccessRequest;
-import org.apache.atlas.authorize.AtlasAuthorizationUtils;
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.typedef.AtlasEntityDef;
-import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.type.AtlasEntityType;
-import org.apache.atlas.type.AtlasType;
-import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * EntityDef store in v1 format.
- */
-public class AtlasEntityDefStoreV1 extends AtlasAbstractDefStoreV1<AtlasEntityDef> {
- private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityDefStoreV1.class);
-
- @Inject
- public AtlasEntityDefStoreV1(AtlasTypeDefGraphStoreV1 typeDefStore, AtlasTypeRegistry typeRegistry) {
- super(typeDefStore, typeRegistry);
- }
-
- @Override
- public AtlasVertex preCreate(AtlasEntityDef entityDef) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasEntityDefStoreV1.preCreate({})", entityDef);
- }
-
- validateType(entityDef);
-
- AtlasType type = typeRegistry.getType(entityDef.getName());
-
- if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.ENTITY) {
- throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, entityDef.getName(), TypeCategory.CLASS.name());
- }
-
-
-
- AtlasVertex ret = typeDefStore.findTypeVertexByName(entityDef.getName());
-
- if (ret != null) {
- throw new AtlasBaseException(AtlasErrorCode.TYPE_ALREADY_EXISTS, entityDef.getName());
- }
-
- ret = typeDefStore.createTypeVertex(entityDef);
-
- updateVertexPreCreate(entityDef, (AtlasEntityType)type, ret);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasEntityDefStoreV1.preCreate({}): {}", entityDef, ret);
- }
-
- return ret;
- }
-
- @Override
- public AtlasEntityDef create(AtlasEntityDef entityDef, AtlasVertex preCreateResult) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasEntityDefStoreV1.create({}, {})", entityDef, preCreateResult);
- }
-
- AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_CREATE, entityDef), "create entity-def ", entityDef.getName());
-
- AtlasVertex vertex = (preCreateResult == null) ? preCreate(entityDef) : preCreateResult;
-
- updateVertexAddReferences(entityDef, vertex);
-
- AtlasEntityDef ret = toEntityDef(vertex);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasEntityDefStoreV1.create({}, {}): {}", entityDef, preCreateResult, ret);
- }
-
- return ret;
- }
-
- @Override
- public List<AtlasEntityDef> getAll() throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasEntityDefStoreV1.getAll()");
- }
-
- List<AtlasEntityDef> ret = new ArrayList<>();
- Iterator<AtlasVertex> vertices = typeDefStore.findTypeVerticesByCategory(TypeCategory.CLASS);
-
- while (vertices.hasNext()) {
- ret.add(toEntityDef(vertices.next()));
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasEntityDefStoreV1.getAll(): count={}", ret.size());
- }
-
- return ret;
- }
-
- @Override
- public AtlasEntityDef getByName(String name) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasEntityDefStoreV1.getByName({})", name);
- }
-
- AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.CLASS);
-
- if (vertex == null) {
- throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
- }
-
- vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY, TypeCategory.class);
-
- AtlasEntityDef ret = toEntityDef(vertex);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasEntityDefStoreV1.getByName({}): {}", name, ret);
- }
-
- return ret;
- }
-
- @Override
- public AtlasEntityDef getByGuid(String guid) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasEntityDefStoreV1.getByGuid({})", guid);
- }
-
- AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.CLASS);
-
- if (vertex == null) {
- throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
- }
-
- AtlasEntityDef ret = toEntityDef(vertex);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasEntityDefStoreV1.getByGuid({}): {}", guid, ret);
- }
-
- return ret;
- }
-
- @Override
- public AtlasEntityDef update(AtlasEntityDef entityDef) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasEntityDefStoreV1.update({})", entityDef);
- }
-
- validateType(entityDef);
-
- AtlasEntityDef ret = StringUtils.isNotBlank(entityDef.getGuid()) ? updateByGuid(entityDef.getGuid(), entityDef)
- : updateByName(entityDef.getName(), entityDef);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasEntityDefStoreV1.update({}): {}", entityDef, ret);
- }
-
- return ret;
- }
-
- @Override
- public AtlasEntityDef updateByName(String name, AtlasEntityDef entityDef) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasEntityDefStoreV1.updateByName({}, {})", name, entityDef);
- }
-
- AtlasEntityDef existingDef = typeRegistry.getEntityDefByName(name);
-
- AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update entity-def ", name);
-
- validateType(entityDef);
-
- AtlasType type = typeRegistry.getType(entityDef.getName());
-
- if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.ENTITY) {
- throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, entityDef.getName(), TypeCategory.CLASS.name());
- }
-
- AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.CLASS);
-
- if (vertex == null) {
- throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
- }
-
- updateVertexPreUpdate(entityDef, (AtlasEntityType)type, vertex);
- updateVertexAddReferences(entityDef, vertex);
-
- AtlasEntityDef ret = toEntityDef(vertex);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasEntityDefStoreV1.updateByName({}, {}): {}", name, entityDef, ret);
- }
-
- return ret;
- }
-
- @Override
- public AtlasEntityDef updateByGuid(String guid, AtlasEntityDef entityDef) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasEntityDefStoreV1.updateByGuid({})", guid);
- }
-
- AtlasEntityDef existingDef = typeRegistry.getEntityDefByGuid(guid);
-
- AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update entity-def ", (existingDef != null ? existingDef.getName() : guid));
-
- validateType(entityDef);
-
- AtlasType type = typeRegistry.getTypeByGuid(guid);
-
- if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.ENTITY) {
- throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, entityDef.getName(), TypeCategory.CLASS.name());
- }
-
- AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.CLASS);
-
- if (vertex == null) {
- throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
- }
-
- updateVertexPreUpdate(entityDef, (AtlasEntityType)type, vertex);
- updateVertexAddReferences(entityDef, vertex);
-
- AtlasEntityDef ret = toEntityDef(vertex);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasEntityDefStoreV1.updateByGuid({}): {}", guid, ret);
- }
-
- return ret;
- }
-
- @Override
- public AtlasVertex preDeleteByName(String name) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasEntityDefStoreV1.preDeleteByName({})", name);
- }
-
- AtlasEntityDef existingDef = typeRegistry.getEntityDefByName(name);
-
- AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete entity-def ", name);
-
- AtlasVertex ret = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.CLASS);
-
- if (AtlasGraphUtilsV1.typeHasInstanceVertex(name)) {
- throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, name);
- }
-
- if (ret == null) {
- throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
- }
-
- // error if we are trying to delete an entityDef that has a relationshipDef
- if (typeDefStore.hasIncomingEdgesWithLabel(ret, AtlasGraphUtilsV1.RELATIONSHIPTYPE_EDGE_LABEL)){
- throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_RELATIONSHIPS, name);
- }
-
- typeDefStore.deleteTypeVertexOutEdges(ret);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasEntityDefStoreV1.preDeleteByName({}): {}", name, ret);
- }
-
- return ret;
- }
-
- @Override
- public AtlasVertex preDeleteByGuid(String guid) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> AtlasEntityDefStoreV1.preDeleteByGuid({})", guid);
- }
-
- AtlasEntityDef existingDef = typeRegistry.getEntityDefByGuid(guid);
-
- AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete entity-def ", (existingDef != null ? existingDef.getName() : guid));
-
- AtlasVertex ret = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.CLASS);
-
- String typeName = AtlasGraphUtilsV1.getProperty(ret, Constants.TYPENAME_PROPERTY_KEY, String.class);
-
- if (AtlasGraphUtilsV1.typeHasInstanceVertex(typeName)) {
- throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, typeName);
- }
-
- if (ret == null) {
- throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
- }
-
- // error if we are trying to delete an entityDef that has a relationshipDef
- if (typeDefStore.hasIncomingEdgesWithLabel(ret, AtlasGraphUtilsV1.RELATIONSHIPTYPE_EDGE_LABEL)){
- throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_RELATIONSHIPS, typeName);
- }
-
- typeDefStore.deleteTypeVertexOutEdges(ret);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== AtlasEntityDefStoreV1.preDeleteByGuid({}): {}", guid, ret);
- }
-
- return ret;
- }
-
- private void updateVertexPreCreate(AtlasEntityDef entityDef, AtlasEntityType entityType, AtlasVertex vertex) throws AtlasBaseException {
- AtlasStructDefStoreV1.updateVertexPreCreate(entityDef, entityType, vertex, typeDefStore);
- }
-
- private void updateVertexPreUpdate(AtlasEntityDef entityDef, AtlasEntityType entityType, AtlasVertex vertex)
- throws AtlasBaseException {
- AtlasStructDefStoreV1.updateVertexPreUpdate(entityDef, entityType, vertex, typeDefStore);
- }
-
- private void updateVertexAddReferences(AtlasEntityDef entityDef, AtlasVertex vertex) throws AtlasBaseException {
- AtlasStructDefStoreV1.updateVertexAddReferences(entityDef, vertex, typeDefStore);
-
- typeDefStore.createSuperTypeEdges(vertex, entityDef.getSuperTypes(), TypeCategory.CLASS);
- }
-
- private AtlasEntityDef toEntityDef(AtlasVertex vertex) throws AtlasBaseException {
- AtlasEntityDef ret = null;
-
- if (vertex != null && typeDefStore.isTypeVertex(vertex, TypeCategory.CLASS)) {
- ret = new AtlasEntityDef();
-
- AtlasStructDefStoreV1.toStructDef(vertex, ret, typeDefStore);
-
- ret.setSuperTypes(typeDefStore.getSuperTypeNames(vertex));
- }
-
- return ret;
- }
-}
http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
deleted file mode 100644
index e31ca4d..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.repository.store.graph.v1;
-
-import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.TypeCategory;
-import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.model.instance.AtlasStruct;
-import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
-import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
-import org.apache.atlas.repository.store.graph.EntityResolver;
-import org.apache.atlas.type.AtlasArrayType;
-import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
-import org.apache.atlas.type.AtlasEntityType;
-import org.apache.atlas.type.AtlasMapType;
-import org.apache.atlas.type.AtlasStructType;
-import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
-import org.apache.atlas.type.AtlasType;
-import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.type.AtlasTypeUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-
-public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery {
- private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityGraphDiscoveryV1.class);
-
- private final AtlasTypeRegistry typeRegistry;
- private final EntityGraphDiscoveryContext discoveryContext;
-
- public AtlasEntityGraphDiscoveryV1(AtlasTypeRegistry typeRegistry, EntityStream entityStream) {
- this.typeRegistry = typeRegistry;
- this.discoveryContext = new EntityGraphDiscoveryContext(typeRegistry, entityStream);
- }
-
- @Override
- public void init() throws AtlasBaseException {
- //Nothing to do
- }
-
- @Override
- public EntityGraphDiscoveryContext discoverEntities() throws AtlasBaseException {
- // walk through entities in stream and validate them; record entity references
- discover();
-
- // resolve entity references discovered in previous step
- resolveReferences();
-
- return discoveryContext;
- }
-
- @Override
- public void validateAndNormalize(AtlasEntity entity) throws AtlasBaseException {
- List<String> messages = new ArrayList<>();
-
- if (! AtlasTypeUtil.isValidGuid(entity.getGuid())) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, "invalid guid " + entity.getGuid());
- }
-
- AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName());
-
- if (type == null) {
- throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName());
- }
-
- type.validateValue(entity, entity.getTypeName(), messages);
-
- if (!messages.isEmpty()) {
- throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages);
- }
-
- type.getNormalizedValue(entity);
- }
-
- @Override
- public void validateAndNormalizeForUpdate(AtlasEntity entity) throws AtlasBaseException {
- List<String> messages = new ArrayList<>();
-
- if (! AtlasTypeUtil.isValidGuid(entity.getGuid())) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, "invalid guid " + entity.getGuid());
- }
-
- AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName());
-
- if (type == null) {
- throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName());
- }
-
- type.validateValueForUpdate(entity, entity.getTypeName(), messages);
-
- if (!messages.isEmpty()) {
- throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages);
- }
-
- type.getNormalizedValueForUpdate(entity);
- }
-
- @Override
- public void cleanUp() throws AtlasBaseException {
- discoveryContext.cleanUp();
- }
-
-
- protected void discover() throws AtlasBaseException {
- EntityStream entityStream = discoveryContext.getEntityStream();
-
- Set<String> walkedEntities = new HashSet<>();
-
- // walk through top-level entities and find entity references
- while (entityStream.hasNext()) {
- AtlasEntity entity = entityStream.next();
-
- if (entity == null) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "found null entity");
- }
-
- walkEntityGraph(entity);
-
- walkedEntities.add(entity.getGuid());
- }
-
- // walk through entities referenced by other entities
- // referencedGuids will be updated within this for() loop; avoid use of iterators
- List<String> referencedGuids = discoveryContext.getReferencedGuids();
- for (int i = 0; i < referencedGuids.size(); i++) {
- String guid = referencedGuids.get(i);
-
- if (walkedEntities.contains(guid)) {
- continue;
- }
-
- AtlasEntity entity = entityStream.getByGuid(guid);
-
- if (entity != null) {
- walkEntityGraph(entity);
-
- walkedEntities.add(entity.getGuid());
- }
- }
- }
-
- protected void resolveReferences() throws AtlasBaseException {
- EntityResolver[] entityResolvers = new EntityResolver[] { new IDBasedEntityResolver(typeRegistry),
- new UniqAttrBasedEntityResolver(typeRegistry)
- };
-
- for (EntityResolver resolver : entityResolvers) {
- resolver.resolveEntityReferences(discoveryContext);
- }
- }
-
- private void visitReference(AtlasObjectIdType type, Object val) throws AtlasBaseException {
- if (type == null || val == null) {
- return;
- }
-
- if (val instanceof AtlasObjectId) {
- AtlasObjectId objId = (AtlasObjectId)val;
-
- if (!AtlasTypeUtil.isValid(objId)) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, objId.toString());
- }
-
- recordObjectReference(objId);
- } else if (val instanceof Map) {
- AtlasObjectId objId = new AtlasObjectId((Map)val);
-
- if (!AtlasTypeUtil.isValid(objId)) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, objId.toString());
- }
-
- recordObjectReference(objId);
- } else {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString());
- }
- }
-
- void visitAttribute(AtlasType attrType, Object val) throws AtlasBaseException {
- if (attrType == null || val == null) {
- return;
- }
-
- switch (attrType.getTypeCategory()) {
- case PRIMITIVE:
- case ENUM:
- return;
-
- case ARRAY: {
- AtlasArrayType arrayType = (AtlasArrayType) attrType;
- AtlasType elemType = arrayType.getElementType();
-
- visitCollectionReferences(elemType, val);
- }
- break;
-
- case MAP: {
- AtlasType keyType = ((AtlasMapType) attrType).getKeyType();
- AtlasType valueType = ((AtlasMapType) attrType).getValueType();
-
- visitMapReferences(keyType, valueType, val);
- }
- break;
-
- case STRUCT:
- visitStruct((AtlasStructType)attrType, val);
- break;
-
- case OBJECT_ID_TYPE:
- visitReference((AtlasObjectIdType) attrType, val);
- break;
-
- default:
- throw new AtlasBaseException(AtlasErrorCode.TYPE_CATEGORY_INVALID, attrType.getTypeCategory().name());
- }
- }
-
- void visitMapReferences(AtlasType keyType, AtlasType valueType, Object val) throws AtlasBaseException {
- if (keyType == null || valueType == null || val == null) {
- return;
- }
-
- if (isPrimitive(keyType.getTypeCategory()) && isPrimitive(valueType.getTypeCategory())) {
- return;
- }
-
- if (Map.class.isAssignableFrom(val.getClass())) {
- Iterator<Map.Entry> it = ((Map) val).entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry e = it.next();
- visitAttribute(keyType, e.getKey());
- visitAttribute(valueType, e.getValue());
- }
- }
- }
-
- void visitCollectionReferences(AtlasType elemType, Object val) throws AtlasBaseException {
- if (elemType == null || val == null || isPrimitive(elemType.getTypeCategory())) {
- return;
- }
-
- Iterator it = null;
-
- if (val instanceof Collection) {
- it = ((Collection) val).iterator();
- } else if (val instanceof Iterable) {
- it = ((Iterable) val).iterator();
- } else if (val instanceof Iterator) {
- it = (Iterator) val;
- }
-
- if (it != null) {
- while (it.hasNext()) {
- Object elem = it.next();
- visitAttribute(elemType, elem);
- }
- }
- }
-
- void visitStruct(AtlasStructType structType, Object val) throws AtlasBaseException {
- if (structType == null || val == null) {
- return;
- }
-
- final AtlasStruct struct;
-
- if (val instanceof AtlasStruct) {
- struct = (AtlasStruct) val;
- } else if (val instanceof Map) {
- Map attributes = AtlasTypeUtil.toStructAttributes((Map) val);
-
- struct = new AtlasStruct(structType.getTypeName(), attributes);
- } else {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_STRUCT_VALUE, val.toString());
- }
-
- visitStruct(structType, struct);
- }
-
- void visitEntity(AtlasEntityType entityType, AtlasEntity entity) throws AtlasBaseException {
- List<String> visitedAttributes = new ArrayList<>();
-
- // visit relationship attributes
- visitRelationships(entityType, entity, visitedAttributes);
-
- // visit struct attributes
- for (AtlasAttribute attribute : entityType.getAllAttributes().values()) {
- AtlasType attrType = attribute.getAttributeType();
- String attrName = attribute.getName();
- Object attrVal = entity.getAttribute(attrName);
-
- if (entity.hasAttribute(attrName) && !visitedAttributes.contains(attrName)) {
- visitAttribute(attrType, attrVal);
- }
- }
- }
-
- private void visitRelationships(AtlasEntityType entityType, AtlasEntity entity, List<String> visitedAttributes) throws AtlasBaseException {
- for (AtlasAttribute attribute : entityType.getRelationshipAttributes().values()) {
- AtlasType attrType = attribute.getAttributeType();
- String attrName = attribute.getName();
- Object attrVal = entity.getRelationshipAttribute(attrName);
-
- if (entity.hasRelationshipAttribute(attrName)) {
- visitAttribute(attrType, attrVal);
-
- visitedAttributes.add(attrName);
- }
- }
- }
-
- void visitStruct(AtlasStructType structType, AtlasStruct struct) throws AtlasBaseException {
- for (AtlasAttribute attribute : structType.getAllAttributes().values()) {
- AtlasType attrType = attribute.getAttributeType();
- Object attrVal = struct.getAttribute(attribute.getName());
-
- visitAttribute(attrType, attrVal);
- }
- }
-
- void walkEntityGraph(AtlasEntity entity) throws AtlasBaseException {
- if (entity == null) {
- return;
- }
-
- AtlasEntityType type = typeRegistry.getEntityTypeByName(entity.getTypeName());
-
- if (type == null) {
- throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName());
- }
-
- recordObjectReference(entity.getGuid());
-
- visitEntity(type, entity);
- }
-
-
- boolean isPrimitive(TypeCategory typeCategory) {
- return typeCategory == TypeCategory.PRIMITIVE || typeCategory == TypeCategory.ENUM;
- }
-
- private void recordObjectReference(String guid) {
- discoveryContext.addReferencedGuid(guid);
- }
-
- private void recordObjectReference(AtlasObjectId objId) {
- if (AtlasTypeUtil.isValidGuid(objId)) {
- discoveryContext.addReferencedGuid(objId.getGuid());
- } else {
- discoveryContext.addReferencedByUniqAttribs(objId);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
deleted file mode 100644
index 528065c..0000000
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ /dev/null
@@ -1,781 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.repository.store.graph.v1;
-
-
-import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.GraphTransactionInterceptor;
-import org.apache.atlas.RequestContextV1;
-import org.apache.atlas.annotation.GraphTransaction;
-import org.apache.atlas.authorize.AtlasEntityAccessRequest;
-import org.apache.atlas.authorize.AtlasPrivilege;
-import org.apache.atlas.authorize.AtlasAuthorizationUtils;
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.instance.*;
-import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
-import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
-import org.apache.atlas.repository.store.graph.AtlasEntityStore;
-import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
-import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
-import org.apache.atlas.type.AtlasClassificationType;
-import org.apache.atlas.type.AtlasEntityType;
-import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
-import org.apache.atlas.type.AtlasType;
-import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.type.AtlasTypeUtil;
-import org.apache.atlas.utils.AtlasEntityUtil;
-import org.apache.atlas.utils.AtlasPerfTracer;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
-import javax.inject.Inject;
-import java.util.*;
-
-import static org.apache.atlas.AtlasConstants.ATLAS_MIGRATION_MODE_FILENAME;
-import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
-import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
-
-
-@Component
-public class AtlasEntityStoreV1 implements AtlasEntityStore {
- private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class);
- private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("store.EntityStore");
-
-
- private final DeleteHandlerV1 deleteHandler;
- private final AtlasTypeRegistry typeRegistry;
- private final AtlasEntityChangeNotifier entityChangeNotifier;
- private final EntityGraphMapper entityGraphMapper;
- private final EntityGraphRetriever entityRetriever;
-
- @Inject
- public AtlasEntityStoreV1(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry,
- AtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper) {
- this.deleteHandler = deleteHandler;
- this.typeRegistry = typeRegistry;
- this.entityChangeNotifier = entityChangeNotifier;
- this.entityGraphMapper = entityGraphMapper;
- this.entityRetriever = new EntityGraphRetriever(typeRegistry);
- }
-
- @Override
- @GraphTransaction
- public List<String> getEntityGUIDS(final String typename) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> getEntityGUIDS({})", typename);
- }
-
- if (StringUtils.isEmpty(typename) || !typeRegistry.isRegisteredType(typename)) {
- throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME);
- }
-
- List<String> ret = AtlasGraphUtilsV1.findEntityGUIDsByType(typename);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== getEntityGUIDS({})", typename);
- }
-
- return ret;
- }
-
- @Override
- @GraphTransaction
- public AtlasEntityWithExtInfo getById(String guid) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> getById({})", guid);
- }
-
- AtlasEntityWithExtInfo ret = entityRetriever.toAtlasEntityWithExtInfo(guid);
-
- AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, new AtlasEntityHeader(ret.getEntity())), "read entity: guid=", guid);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== getById({}): {}", guid, ret);
- }
-
- return ret;
- }
-
- @Override
- @GraphTransaction
- public AtlasEntitiesWithExtInfo getByIds(List<String> guids) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> getByIds({})", guids);
- }
-
- AtlasEntitiesWithExtInfo ret = entityRetriever.toAtlasEntitiesWithExtInfo(guids);
-
- // verify authorization to read the entities
- if(ret != null){
- for(String guid : guids){
- AtlasEntity entity = ret.getEntity(guid);
-
- AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, new AtlasEntityHeader(entity)), "read entity: guid=", guid);
- }
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== getByIds({}): {}", guids, ret);
- }
-
- return ret;
- }
-
- @Override
- @GraphTransaction
- public AtlasEntityWithExtInfo getByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> getByUniqueAttribute({}, {})", entityType.getTypeName(), uniqAttributes);
- }
-
- AtlasVertex entityVertex = AtlasGraphUtilsV1.getVertexByUniqueAttributes(entityType, uniqAttributes);
- AtlasEntityWithExtInfo ret = entityRetriever.toAtlasEntityWithExtInfo(entityVertex);
-
- AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, new AtlasEntityHeader(ret.getEntity())), "read entity: typeName=", entityType.getTypeName(), ", uniqueAttributes=", uniqAttributes);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== getByUniqueAttribute({}, {}): {}", entityType.getTypeName(), uniqAttributes, ret);
- }
-
- return ret;
- }
-
- @Override
- @GraphTransaction
- public EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate) throws AtlasBaseException {
- return createOrUpdate(entityStream, isPartialUpdate, false);
- }
-
- @Override
- @GraphTransaction
- public EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException {
- return createOrUpdate(entityStream, false, true);
- }
-
- @Override
- @GraphTransaction
- public EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> updateEntity({}, {}, {})", objectId, updatedEntityInfo, isPartialUpdate);
- }
-
- if (objectId == null || updatedEntityInfo == null || updatedEntityInfo.getEntity() == null) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "null entity-id/entity");
- }
-
- final String guid;
-
- if (AtlasTypeUtil.isAssignedGuid(objectId.getGuid())) {
- guid = objectId.getGuid();
- } else {
- AtlasEntityType entityType = typeRegistry.getEntityTypeByName(objectId.getTypeName());
-
- if (entityType == null) {
- throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, objectId.getTypeName());
- }
-
- guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(typeRegistry.getEntityTypeByName(objectId.getTypeName()), objectId.getUniqueAttributes());
- }
-
- AtlasEntity entity = updatedEntityInfo.getEntity();
-
- entity.setGuid(guid);
-
- return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), isPartialUpdate, false);
- }
-
- @Override
- @GraphTransaction
- public EntityMutationResponse updateByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes,
- AtlasEntityWithExtInfo updatedEntityInfo) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> updateByUniqueAttributes({}, {})", entityType.getTypeName(), uniqAttributes);
- }
-
- if (updatedEntityInfo == null || updatedEntityInfo.getEntity() == null) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entity to update.");
- }
-
- String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, uniqAttributes);
- AtlasEntity entity = updatedEntityInfo.getEntity();
-
- entity.setGuid(guid);
-
- return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), true, false);
- }
-
- @Override
- @GraphTransaction
- public EntityMutationResponse updateEntityAttributeByGuid(String guid, String attrName, Object attrValue)
- throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> updateEntityAttributeByGuid({}, {}, {})", guid, attrName, attrValue);
- }
-
- AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
- AtlasEntityType entityType = (AtlasEntityType) typeRegistry.getType(entity.getTypeName());
- AtlasAttribute attr = entityType.getAttribute(attrName);
-
- if (attr == null) {
- throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_ATTRIBUTE, attrName, entity.getTypeName());
- }
-
- AtlasType attrType = attr.getAttributeType();
- AtlasEntity updateEntity = new AtlasEntity();
-
- updateEntity.setGuid(guid);
- updateEntity.setTypeName(entity.getTypeName());
-
- switch (attrType.getTypeCategory()) {
- case PRIMITIVE:
- updateEntity.setAttribute(attrName, attrValue);
- break;
- case OBJECT_ID_TYPE:
- AtlasObjectId objId;
-
- if (attrValue instanceof String) {
- objId = new AtlasObjectId((String) attrValue, attr.getAttributeDef().getTypeName());
- } else {
- objId = (AtlasObjectId) attrType.getNormalizedValue(attrValue);
- }
-
- updateEntity.setAttribute(attrName, objId);
- break;
-
- default:
- throw new AtlasBaseException(AtlasErrorCode.ATTRIBUTE_UPDATE_NOT_SUPPORTED, attrName, attrType.getTypeName());
- }
-
- return createOrUpdate(new AtlasEntityStream(updateEntity), true, false);
- }
-
- @Override
- @GraphTransaction
- public EntityMutationResponse deleteById(final String guid) throws AtlasBaseException {
- if (StringUtils.isEmpty(guid)) {
- throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
- }
-
- Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
- AtlasVertex vertex = AtlasGraphUtilsV1.findByGuid(guid);
-
- if (vertex != null) {
- AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex);
-
- AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, entityHeader), "delete entity: guid=", guid);
-
- deletionCandidates.add(vertex);
- } else {
- if (LOG.isDebugEnabled()) {
- // Entity does not exist - treat as non-error, since the caller
- // wanted to delete the entity and it's already gone.
- LOG.debug("Deletion request ignored for non-existent entity with guid " + guid);
- }
- }
-
- EntityMutationResponse ret = deleteVertices(deletionCandidates);
-
- // Notify the change listeners
- entityChangeNotifier.onEntitiesMutated(ret, false);
-
- return ret;
- }
-
- @Override
- @GraphTransaction
- public EntityMutationResponse deleteByIds(final List<String> guids) throws AtlasBaseException {
- if (CollectionUtils.isEmpty(guids)) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
- }
-
- Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
-
- for (String guid : guids) {
- AtlasVertex vertex = AtlasGraphUtilsV1.findByGuid(guid);
-
- if (vertex == null) {
- if (LOG.isDebugEnabled()) {
- // Entity does not exist - treat as non-error, since the caller
- // wanted to delete the entity and it's already gone.
- LOG.debug("Deletion request ignored for non-existent entity with guid " + guid);
- }
-
- continue;
- }
-
- AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex);
-
- AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, entityHeader), "delete entity: guid=", guid);
-
- deletionCandidates.add(vertex);
- }
-
- if (deletionCandidates.isEmpty()) {
- LOG.info("No deletion candidate entities were found for guids %s", guids);
- }
-
- EntityMutationResponse ret = deleteVertices(deletionCandidates);
-
- // Notify the change listeners
- entityChangeNotifier.onEntitiesMutated(ret, false);
-
- return ret;
- }
-
- @Override
- @GraphTransaction
- public EntityMutationResponse deleteByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException {
- if (MapUtils.isEmpty(uniqAttributes)) {
- throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, uniqAttributes.toString());
- }
-
- Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
- AtlasVertex vertex = AtlasGraphUtilsV1.findByUniqueAttributes(entityType, uniqAttributes);
-
- if (vertex != null) {
- AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex);
-
- AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, entityHeader), "delete entity: typeName=", entityType.getTypeName(), ", uniqueAttributes=", uniqAttributes);
-
- deletionCandidates.add(vertex);
- } else {
- if (LOG.isDebugEnabled()) {
- // Entity does not exist - treat as non-error, since the caller
- // wanted to delete the entity and it's already gone.
- LOG.debug("Deletion request ignored for non-existent entity with uniqueAttributes " + uniqAttributes);
- }
- }
-
- EntityMutationResponse ret = deleteVertices(deletionCandidates);
-
- // Notify the change listeners
- entityChangeNotifier.onEntitiesMutated(ret, false);
-
- return ret;
- }
-
- @Override
- @GraphTransaction
- public String getGuidByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException{
- return AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, uniqAttributes);
- }
-
- @Override
- @GraphTransaction
- public void addClassifications(final String guid, final List<AtlasClassification> classifications) throws AtlasBaseException {
- if (StringUtils.isEmpty(guid)) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
- }
-
- if (CollectionUtils.isEmpty(classifications)) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified");
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding classifications={} to entity={}", classifications, guid);
- }
-
- AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
-
- for (AtlasClassification classification : classifications) {
- AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_ADD_CLASSIFICATION, entityHeader, classification),
- "add classification: guid=", guid, ", classification=", classification.getTypeName());
- }
-
- GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
- for (AtlasClassification classification : classifications) {
- validateAndNormalize(classification);
- }
-
- // validate if entity, not already associated with classifications
- validateEntityAssociations(guid, classifications);
-
- entityGraphMapper.addClassifications(new EntityMutationContext(), guid, classifications);
- }
-
- @Override
- @GraphTransaction
- public void updateClassifications(String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Updating classifications={} for entity={}", classifications, guid);
- }
-
- if (StringUtils.isEmpty(guid)) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid not specified");
- }
-
- if (CollectionUtils.isEmpty(classifications)) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified");
- }
-
- AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
-
- for (AtlasClassification classification : classifications) {
- AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE_CLASSIFICATION, entityHeader, classification), "update classification: guid=", guid, ", classification=", classification.getTypeName());
- }
-
- GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
-
- entityGraphMapper.updateClassifications(new EntityMutationContext(), guid, classifications);
- }
-
- @Override
- @GraphTransaction
- public void addClassification(final List<String> guids, final AtlasClassification classification) throws AtlasBaseException {
- if (CollectionUtils.isEmpty(guids)) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
- }
- if (classification == null) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classification not specified");
- }
-
- for (String guid : guids) {
- AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
-
- AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_ADD_CLASSIFICATION, entityHeader, classification),
- "add classification: guid=", guid, ", classification=", classification.getTypeName());
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding classification={} to entities={}", classification, guids);
- }
-
- GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guids);
-
- validateAndNormalize(classification);
-
- List<AtlasClassification> classifications = Collections.singletonList(classification);
-
- for (String guid : guids) {
- validateEntityAssociations(guid, classifications);
-
- entityGraphMapper.addClassifications(new EntityMutationContext(), guid, classifications);
- }
- }
-
- @Override
- @GraphTransaction
- public void deleteClassifications(final String guid, final List<String> classificationNames) throws AtlasBaseException {
- if (StringUtils.isEmpty(guid)) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
- }
- if (CollectionUtils.isEmpty(classificationNames)) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified");
- }
-
- AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
-
- for (String classification : classificationNames) {
- AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_REMOVE_CLASSIFICATION, entityHeader, new AtlasClassification(classification)), "remove classification: guid=", guid, ", classification=", classification);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Deleting classifications={} from entity={}", classificationNames, guid);
- }
-
- GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
-
- entityGraphMapper.deleteClassifications(guid, classificationNames);
- }
-
-
- /**
-  * Returns the classifications carried by the entity header that the entity retriever
-  * builds for the given guid. This method performs no authorization check of its own.
-  *
-  * @param guid unique id of the entity
-  * @return whatever {@code getClassifications()} of the retrieved header returns
-  * @throws AtlasBaseException if raised while retrieving the entity header
-  */
- @GraphTransaction
- public List<AtlasClassification> retrieveClassifications(String guid) throws AtlasBaseException {
-     if (LOG.isDebugEnabled()) {
-         LOG.debug("Retriving classifications for entity={}", guid);
-     }
-
-     return entityRetriever.toAtlasEntityHeaderWithClassifications(guid).getClassifications();
- }
-
-
- /**
-  * Returns the classifications of an entity after verifying that the caller may read them.
-  *
-  * @param guid unique id of the entity
-  * @return whatever {@code getClassifications()} of the retrieved header returns
-  * @throws AtlasBaseException if raised while retrieving the header or verifying access
-  */
- @Override
- @GraphTransaction
- public List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException {
-     if (LOG.isDebugEnabled()) {
-         LOG.debug("Getting classifications for entity={}", guid);
-     }
-
-     final AtlasEntityHeader        header      = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
-     final AtlasEntityAccessRequest readRequest = new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ_CLASSIFICATION, header);
-
-     AtlasAuthorizationUtils.verifyAccess(readRequest, "get classifications: guid=", guid);
-
-     return header.getClassifications();
- }
-
- @Override
- @GraphTransaction
- public AtlasClassification getClassification(String guid, String classificationName) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Getting classifications for entities={}", guid);
- }
-
- AtlasClassification ret = null;
- AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
-
- if (CollectionUtils.isNotEmpty(entityHeader.getClassifications())) {
- AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ_CLASSIFICATION, entityHeader), "get classification: guid=", guid, ", classification=", classificationName);
-
- for (AtlasClassification classification : entityHeader.getClassifications()) {
- if (!StringUtils.equalsIgnoreCase(classification.getTypeName(), classificationName)) {
- continue;
- }
-
- if (StringUtils.isEmpty(classification.getEntityGuid()) || StringUtils.equalsIgnoreCase(classification.getEntityGuid(), guid)) {
- ret = classification;
- break;
- } else if (ret == null) {
- ret = classification;
- }
- }
- }
-
- if (ret == null) {
- throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName);
- }
-
- return ret;
- }
-
- /**
- * Creates and/or updates the entities supplied by the given stream.
- *
- * Steps, in order: reject a null or empty stream; let preCreateOrUpdate() build the mutation
- * context; verify ENTITY_CREATE access for the created entities; drop updated entities that show
- * no change; verify ENTITY_UPDATE access for the remaining updated entities; map attributes and
- * classifications through entityGraphMapper; notify entityChangeNotifier. Both authorization
- * checks are skipped when the stream is an EntityImportStream.
- *
- * @param entityStream source of the entities; must be non-null and have at least one entity
- * @param isPartialUpdate passed on to preCreateOrUpdate() and to the mapper
- * @param replaceClassifications passed on to the mapper; when true, an unchanged entity is skipped
- * only if its classifications also equal those of the entity read back from the vertex
- * @return the mapper's mutation response, with guid assignments copied from the mutation context
- * @throws AtlasBaseException with INVALID_PARAMETERS when the stream is null or empty; exceptions
- * raised by the calls made here are not caught
- */
- private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> createOrUpdate()");
- }
-
- if (entityStream == null || !entityStream.hasNext()) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
- }
-
- // stays null unless perf tracing is enabled; handed to AtlasPerfTracer.log() in the finally block
- AtlasPerfTracer perf = null;
-
- if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
- perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "createOrUpdate()");
- }
-
- try {
- final boolean isImport = entityStream instanceof EntityImportStream;
- final EntityMutationContext context = preCreateOrUpdate(entityStream, entityGraphMapper, isPartialUpdate);
-
- // Check if authorized to create entities
- if (!isImport && CollectionUtils.isNotEmpty(context.getCreatedEntities())) {
- for (AtlasEntity entity : context.getCreatedEntities()) {
- AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, new AtlasEntityHeader(entity)),
- "create entity: type=", entity.getTypeName());
- }
- }
-
- // for existing entities, skip update if incoming entity doesn't have any change
- if (CollectionUtils.isNotEmpty(context.getUpdatedEntities())) {
- // allocated lazily, only once the first entity to skip is found
- List<AtlasEntity> entitiesToSkipUpdate = null;
-
- for (AtlasEntity entity : context.getUpdatedEntities()) {
- String guid = entity.getGuid();
- AtlasVertex vertex = context.getVertex(guid);
- // entity as read back from the vertex, used as the baseline for the comparison below
- AtlasEntity entityInStore = entityRetriever.toAtlasEntity(vertex);
- AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
-
- if (!AtlasEntityUtil.hasAnyAttributeUpdate(entityType, entity, entityInStore)) {
- // if classifications are to be replaced as well, then skip updates only when no change in classifications as well
- if (!replaceClassifications || Objects.equals(entity.getClassifications(), entityInStore.getClassifications())) {
- if (entitiesToSkipUpdate == null) {
- entitiesToSkipUpdate = new ArrayList<>();
- }
-
- entitiesToSkipUpdate.add(entity);
- }
- }
- }
-
- // removal happens after the loop above, which iterates over context.getUpdatedEntities()
- if (entitiesToSkipUpdate != null) {
- context.getUpdatedEntities().removeAll(entitiesToSkipUpdate);
- }
-
- // Check if authorized to update entities
- // (runs after the removal above, so skipped entities are not checked)
- if (!isImport) {
- for (AtlasEntity entity : context.getUpdatedEntities()) {
- AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, new AtlasEntityHeader(entity)),
- "update entity: type=", entity.getTypeName());
- }
- }
- }
-
- EntityMutationResponse ret = entityGraphMapper.mapAttributesAndClassifications(context, isPartialUpdate, replaceClassifications);
-
- ret.setGuidAssignments(context.getGuidAssignments());
-
- // Notify the change listeners
- entityChangeNotifier.onEntitiesMutated(ret, isImport);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== createOrUpdate()");
- }
-
- return ret;
- } finally {
- // runs on both normal return and exception; perf may still be null here
- AtlasPerfTracer.log(perf);
- }
- }
-
- private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException {
- EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV1(typeRegistry, entityStream);
- EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities();
- EntityMutationContext context = new EntityMutationContext(discoveryContext);
-
- for (String guid : discoveryContext.getReferencedGuids()) {
- AtlasVertex vertex = discoveryContext.getResolvedEntityVertex(guid);
- AtlasEntity entity = entityStream.getByGuid(guid);
-
- if (entity != null) { // entity would be null if guid is not in the stream but referenced by an entity in the stream
- if (vertex != null) {
- if (!isPartialUpdate) {
- graphDiscoverer.validateAndNormalize(entity);
- } else {
- graphDiscoverer.validateAndNormalizeForUpdate(entity);
- }
-
- AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
-
- String guidVertex = AtlasGraphUtilsV1.getIdFromVertex(vertex);
-
- if (!StringUtils.equals(guidVertex, guid)) { // if entity was found by unique attribute
- entity.setGuid(guidVertex);
- }
-
- context.addUpdated(guid, entity, entityType, vertex);
- } else {
- graphDiscoverer.validateAndNormalize(entity);
-
- AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
-
-
- //Create vertices which do not exist in the repository
- if ((entityStream instanceof EntityImportStream) && AtlasTypeUtil.isAssignedGuid(entity.getGuid())) {
- vertex = entityGraphMapper.createVertexWithGuid(entity, entity.getGuid());
- } else {
- vertex = entityGraphMapper.createVertex(entity);
- }
-
- discoveryContext.addResolvedGuid(guid, vertex);
-
- String generatedGuid = AtlasGraphUtilsV1.getIdFromVertex(vertex);
-
- entity.setGuid(generatedGuid);
-
- context.addCreated(guid, entity, entityType, vertex);
- }
-
- // during import, update the system attributes
- if (entityStream instanceof EntityImportStream) {
- entityGraphMapper.updateSystemAttributes(vertex, entity);
- }
- }
- }
-
- return context;
- }
-
- private EntityMutationResponse deleteVertices(Collection<AtlasVertex> deletionCandidates) throws AtlasBaseException {
- EntityMutationResponse response = new EntityMutationResponse();
- RequestContextV1 req = RequestContextV1.get();
-
- deleteHandler.deleteEntities(deletionCandidates); // this will update req with list of deleted/updated entities
-
- for (AtlasObjectId entity : req.getDeletedEntities()) {
- response.addEntity(DELETE, entity);
- }
-
- for (AtlasObjectId entity : req.getUpdatedEntities()) {
- response.addEntity(UPDATE, entity);
- }
-
- return response;
- }
-
- private void validateAndNormalize(AtlasClassification classification) throws AtlasBaseException {
- AtlasClassificationType type = typeRegistry.getClassificationTypeByName(classification.getTypeName());
-
- if (type == null) {
- throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classification.getTypeName());
- }
-
- List<String> messages = new ArrayList<>();
-
- type.validateValue(classification, classification.getTypeName(), messages);
-
- if (!messages.isEmpty()) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, messages);
- }
-
- type.getNormalizedValue(classification);
- }
-
- /**
-  * Validates that each of the given classifications may be newly associated with the entity.
-  *
-  * A classification is rejected when the entity already carries a classification of the same
-  * type name (as reported by getClassificationNames()), when its type is not registered in the
-  * type registry, or when its type cannot be applied to the entity's type.
-  *
-  * @param guid            unique entity id
-  * @param classifications list of classifications to be associated
-  * @throws AtlasBaseException with INVALID_PARAMETERS when a classification is already associated,
-  *                            CLASSIFICATION_NOT_FOUND when its type is not registered, or
-  *                            INVALID_ENTITY_FOR_CLASSIFICATION when it cannot be applied to the entity type
-  */
- private void validateEntityAssociations(String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
-     List<String>    entityClassifications = getClassificationNames(guid);
-     String          entityTypeName        = AtlasGraphUtilsV1.getTypeNameFromGuid(guid);
-     AtlasEntityType entityType            = typeRegistry.getEntityTypeByName(entityTypeName);
-
-     for (AtlasClassification classification : classifications) {
-         String newClassification = classification.getTypeName();
-
-         // getClassificationNames() returns null when the entity has no classifications
-         if (CollectionUtils.isNotEmpty(entityClassifications) && entityClassifications.contains(newClassification)) {
-             throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "entity: " + guid +
-                     ", already associated with classification: " + newClassification);
-         }
-
-         // for each classification, check whether there are entities it should be restricted to
-         AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(newClassification);
-
-         // the lookup yields null for an unregistered type (validateAndNormalize() guards the same way);
-         // report that explicitly instead of failing with a NullPointerException below
-         if (classificationType == null) {
-             throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, newClassification);
-         }
-
-         if (!classificationType.canApplyToEntityType(entityType)) {
-             throw new AtlasBaseException(AtlasErrorCode.INVALID_ENTITY_FOR_CLASSIFICATION, guid, entityTypeName, newClassification);
-         }
-     }
- }
-
- private List<String> getClassificationNames(String guid) throws AtlasBaseException {
- List<String> ret = null;
- List<AtlasClassification> classifications = retrieveClassifications(guid);
-
- if (CollectionUtils.isNotEmpty(classifications)) {
- ret = new ArrayList<>();
-
- for (AtlasClassification classification : classifications) {
- String entityGuid = classification.getEntityGuid();
-
- if (StringUtils.isEmpty(entityGuid) || StringUtils.equalsIgnoreCase(guid, entityGuid)) {
- ret.add(classification.getTypeName());
- }
- }
- }
-
- return ret;
- }
-}