You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ap...@apache.org on 2018/05/25 03:52:50 UTC
[12/20] atlas git commit: ATLAS-2490: updates to make usage of v1/v2
in class names consistent
http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
new file mode 100644
index 0000000..5e33cf5
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -0,0 +1,780 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.GraphTransactionInterceptor;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.annotation.GraphTransaction;
+import org.apache.atlas.authorize.AtlasEntityAccessRequest;
+import org.apache.atlas.authorize.AtlasPrivilege;
+import org.apache.atlas.authorize.AtlasAuthorizationUtils;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.*;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.EntityGraphDiscovery;
+import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
+import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.utils.AtlasEntityUtil;
+import org.apache.atlas.utils.AtlasPerfTracer;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.*;
+
+import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
+import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
+
+
+@Component
+public class AtlasEntityStoreV2 implements AtlasEntityStore {
+ private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class);
+ private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("store.EntityStore");
+
+
+ private final DeleteHandlerV1 deleteHandler;
+ private final AtlasTypeRegistry typeRegistry;
+ private final AtlasEntityChangeNotifier entityChangeNotifier;
+ private final EntityGraphMapper entityGraphMapper;
+ private final EntityGraphRetriever entityRetriever;
+
+ @Inject
+ public AtlasEntityStoreV2(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry,
+ AtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper) {
+ this.deleteHandler = deleteHandler;
+ this.typeRegistry = typeRegistry;
+ this.entityChangeNotifier = entityChangeNotifier;
+ this.entityGraphMapper = entityGraphMapper;
+ this.entityRetriever = new EntityGraphRetriever(typeRegistry);
+ }
+
+ @Override
+ @GraphTransaction
+ public List<String> getEntityGUIDS(final String typename) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> getEntityGUIDS({})", typename);
+ }
+
+ if (StringUtils.isEmpty(typename) || !typeRegistry.isRegisteredType(typename)) {
+ throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME);
+ }
+
+ List<String> ret = AtlasGraphUtilsV2.findEntityGUIDsByType(typename);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== getEntityGUIDS({})", typename);
+ }
+
+ return ret;
+ }
+
+ @Override
+ @GraphTransaction
+ public AtlasEntityWithExtInfo getById(String guid) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> getById({})", guid);
+ }
+
+ AtlasEntityWithExtInfo ret = entityRetriever.toAtlasEntityWithExtInfo(guid);
+
+ AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, new AtlasEntityHeader(ret.getEntity())), "read entity: guid=", guid);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== getById({}): {}", guid, ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ @GraphTransaction
+ public AtlasEntitiesWithExtInfo getByIds(List<String> guids) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> getByIds({})", guids);
+ }
+
+ AtlasEntitiesWithExtInfo ret = entityRetriever.toAtlasEntitiesWithExtInfo(guids);
+
+ // verify authorization to read the entities
+ if(ret != null){
+ for(String guid : guids){
+ AtlasEntity entity = ret.getEntity(guid);
+
+ AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, new AtlasEntityHeader(entity)), "read entity: guid=", guid);
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== getByIds({}): {}", guids, ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ @GraphTransaction
+ public AtlasEntityWithExtInfo getByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> getByUniqueAttribute({}, {})", entityType.getTypeName(), uniqAttributes);
+ }
+
+ AtlasVertex entityVertex = AtlasGraphUtilsV2.getVertexByUniqueAttributes(entityType, uniqAttributes);
+ AtlasEntityWithExtInfo ret = entityRetriever.toAtlasEntityWithExtInfo(entityVertex);
+
+ AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, new AtlasEntityHeader(ret.getEntity())), "read entity: typeName=", entityType.getTypeName(), ", uniqueAttributes=", uniqAttributes);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== getByUniqueAttribute({}, {}): {}", entityType.getTypeName(), uniqAttributes, ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ @GraphTransaction
+ public EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate) throws AtlasBaseException {
+ return createOrUpdate(entityStream, isPartialUpdate, false);
+ }
+
+ @Override
+ @GraphTransaction
+ public EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException {
+ return createOrUpdate(entityStream, false, true);
+ }
+
+ @Override
+ @GraphTransaction
+ public EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> updateEntity({}, {}, {})", objectId, updatedEntityInfo, isPartialUpdate);
+ }
+
+ if (objectId == null || updatedEntityInfo == null || updatedEntityInfo.getEntity() == null) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "null entity-id/entity");
+ }
+
+ final String guid;
+
+ if (AtlasTypeUtil.isAssignedGuid(objectId.getGuid())) {
+ guid = objectId.getGuid();
+ } else {
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(objectId.getTypeName());
+
+ if (entityType == null) {
+ throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, objectId.getTypeName());
+ }
+
+ guid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(typeRegistry.getEntityTypeByName(objectId.getTypeName()), objectId.getUniqueAttributes());
+ }
+
+ AtlasEntity entity = updatedEntityInfo.getEntity();
+
+ entity.setGuid(guid);
+
+ return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), isPartialUpdate, false);
+ }
+
+ @Override
+ @GraphTransaction
+ public EntityMutationResponse updateByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes,
+ AtlasEntityWithExtInfo updatedEntityInfo) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> updateByUniqueAttributes({}, {})", entityType.getTypeName(), uniqAttributes);
+ }
+
+ if (updatedEntityInfo == null || updatedEntityInfo.getEntity() == null) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entity to update.");
+ }
+
+ String guid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, uniqAttributes);
+ AtlasEntity entity = updatedEntityInfo.getEntity();
+
+ entity.setGuid(guid);
+
+ return createOrUpdate(new AtlasEntityStream(updatedEntityInfo), true, false);
+ }
+
+ @Override
+ @GraphTransaction
+ public EntityMutationResponse updateEntityAttributeByGuid(String guid, String attrName, Object attrValue)
+ throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> updateEntityAttributeByGuid({}, {}, {})", guid, attrName, attrValue);
+ }
+
+ AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
+ AtlasEntityType entityType = (AtlasEntityType) typeRegistry.getType(entity.getTypeName());
+ AtlasAttribute attr = entityType.getAttribute(attrName);
+
+ if (attr == null) {
+ throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_ATTRIBUTE, attrName, entity.getTypeName());
+ }
+
+ AtlasType attrType = attr.getAttributeType();
+ AtlasEntity updateEntity = new AtlasEntity();
+
+ updateEntity.setGuid(guid);
+ updateEntity.setTypeName(entity.getTypeName());
+
+ switch (attrType.getTypeCategory()) {
+ case PRIMITIVE:
+ updateEntity.setAttribute(attrName, attrValue);
+ break;
+ case OBJECT_ID_TYPE:
+ AtlasObjectId objId;
+
+ if (attrValue instanceof String) {
+ objId = new AtlasObjectId((String) attrValue, attr.getAttributeDef().getTypeName());
+ } else {
+ objId = (AtlasObjectId) attrType.getNormalizedValue(attrValue);
+ }
+
+ updateEntity.setAttribute(attrName, objId);
+ break;
+
+ default:
+ throw new AtlasBaseException(AtlasErrorCode.ATTRIBUTE_UPDATE_NOT_SUPPORTED, attrName, attrType.getTypeName());
+ }
+
+ return createOrUpdate(new AtlasEntityStream(updateEntity), true, false);
+ }
+
+ @Override
+ @GraphTransaction
+ public EntityMutationResponse deleteById(final String guid) throws AtlasBaseException {
+ if (StringUtils.isEmpty(guid)) {
+ throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+ }
+
+ Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
+ AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(guid);
+
+ if (vertex != null) {
+ AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex);
+
+ AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, entityHeader), "delete entity: guid=", guid);
+
+ deletionCandidates.add(vertex);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ // Entity does not exist - treat as non-error, since the caller
+ // wanted to delete the entity and it's already gone.
+ LOG.debug("Deletion request ignored for non-existent entity with guid " + guid);
+ }
+ }
+
+ EntityMutationResponse ret = deleteVertices(deletionCandidates);
+
+ // Notify the change listeners
+ entityChangeNotifier.onEntitiesMutated(ret, false);
+
+ return ret;
+ }
+
+ @Override
+ @GraphTransaction
+ public EntityMutationResponse deleteByIds(final List<String> guids) throws AtlasBaseException {
+ if (CollectionUtils.isEmpty(guids)) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
+ }
+
+ Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
+
+ for (String guid : guids) {
+ AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(guid);
+
+ if (vertex == null) {
+ if (LOG.isDebugEnabled()) {
+ // Entity does not exist - treat as non-error, since the caller
+ // wanted to delete the entity and it's already gone.
+ LOG.debug("Deletion request ignored for non-existent entity with guid " + guid);
+ }
+
+ continue;
+ }
+
+ AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex);
+
+ AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, entityHeader), "delete entity: guid=", guid);
+
+ deletionCandidates.add(vertex);
+ }
+
+ if (deletionCandidates.isEmpty()) {
+ LOG.info("No deletion candidate entities were found for guids %s", guids);
+ }
+
+ EntityMutationResponse ret = deleteVertices(deletionCandidates);
+
+ // Notify the change listeners
+ entityChangeNotifier.onEntitiesMutated(ret, false);
+
+ return ret;
+ }
+
+ @Override
+ @GraphTransaction
+ public EntityMutationResponse deleteByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException {
+ if (MapUtils.isEmpty(uniqAttributes)) {
+ throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, uniqAttributes.toString());
+ }
+
+ Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
+ AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(entityType, uniqAttributes);
+
+ if (vertex != null) {
+ AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex);
+
+ AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_DELETE, entityHeader), "delete entity: typeName=", entityType.getTypeName(), ", uniqueAttributes=", uniqAttributes);
+
+ deletionCandidates.add(vertex);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ // Entity does not exist - treat as non-error, since the caller
+ // wanted to delete the entity and it's already gone.
+ LOG.debug("Deletion request ignored for non-existent entity with uniqueAttributes " + uniqAttributes);
+ }
+ }
+
+ EntityMutationResponse ret = deleteVertices(deletionCandidates);
+
+ // Notify the change listeners
+ entityChangeNotifier.onEntitiesMutated(ret, false);
+
+ return ret;
+ }
+
+ @Override
+ @GraphTransaction
+ public String getGuidByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> uniqAttributes) throws AtlasBaseException{
+ return AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, uniqAttributes);
+ }
+
+ @Override
+ @GraphTransaction
+ public void addClassifications(final String guid, final List<AtlasClassification> classifications) throws AtlasBaseException {
+ if (StringUtils.isEmpty(guid)) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
+ }
+
+ if (CollectionUtils.isEmpty(classifications)) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified");
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding classifications={} to entity={}", classifications, guid);
+ }
+
+ AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
+
+ for (AtlasClassification classification : classifications) {
+ AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_ADD_CLASSIFICATION, entityHeader, classification),
+ "add classification: guid=", guid, ", classification=", classification.getTypeName());
+ }
+
+ GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
+ for (AtlasClassification classification : classifications) {
+ validateAndNormalize(classification);
+ }
+
+ // validate if entity, not already associated with classifications
+ validateEntityAssociations(guid, classifications);
+
+ entityGraphMapper.addClassifications(new EntityMutationContext(), guid, classifications);
+ }
+
+ @Override
+ @GraphTransaction
+ public void updateClassifications(String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Updating classifications={} for entity={}", classifications, guid);
+ }
+
+ if (StringUtils.isEmpty(guid)) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid not specified");
+ }
+
+ if (CollectionUtils.isEmpty(classifications)) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified");
+ }
+
+ AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
+
+ for (AtlasClassification classification : classifications) {
+ AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE_CLASSIFICATION, entityHeader, classification), "update classification: guid=", guid, ", classification=", classification.getTypeName());
+ }
+
+ GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
+
+ entityGraphMapper.updateClassifications(new EntityMutationContext(), guid, classifications);
+ }
+
+ @Override
+ @GraphTransaction
+ public void addClassification(final List<String> guids, final AtlasClassification classification) throws AtlasBaseException {
+ if (CollectionUtils.isEmpty(guids)) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
+ }
+ if (classification == null) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classification not specified");
+ }
+
+ for (String guid : guids) {
+ AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
+
+ AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_ADD_CLASSIFICATION, entityHeader, classification),
+ "add classification: guid=", guid, ", classification=", classification.getTypeName());
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding classification={} to entities={}", classification, guids);
+ }
+
+ GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guids);
+
+ validateAndNormalize(classification);
+
+ List<AtlasClassification> classifications = Collections.singletonList(classification);
+
+ for (String guid : guids) {
+ validateEntityAssociations(guid, classifications);
+
+ entityGraphMapper.addClassifications(new EntityMutationContext(), guid, classifications);
+ }
+ }
+
+ @Override
+ @GraphTransaction
+ public void deleteClassifications(final String guid, final List<String> classificationNames) throws AtlasBaseException {
+ if (StringUtils.isEmpty(guid)) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "Guid(s) not specified");
+ }
+ if (CollectionUtils.isEmpty(classificationNames)) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "classifications(s) not specified");
+ }
+
+ AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
+
+ for (String classification : classificationNames) {
+ AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_REMOVE_CLASSIFICATION, entityHeader, new AtlasClassification(classification)), "remove classification: guid=", guid, ", classification=", classification);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Deleting classifications={} from entity={}", classificationNames, guid);
+ }
+
+ GraphTransactionInterceptor.lockObjectAndReleasePostCommit(guid);
+
+ entityGraphMapper.deleteClassifications(guid, classificationNames);
+ }
+
+
+ @GraphTransaction
+ public List<AtlasClassification> retrieveClassifications(String guid) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Retriving classifications for entity={}", guid);
+ }
+
+ AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
+
+ return entityHeader.getClassifications();
+ }
+
+
+ @Override
+ @GraphTransaction
+ public List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Getting classifications for entity={}", guid);
+ }
+
+ AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
+
+ AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ_CLASSIFICATION, entityHeader), "get classifications: guid=", guid);
+
+ return entityHeader.getClassifications();
+ }
+
+ @Override
+ @GraphTransaction
+ public AtlasClassification getClassification(String guid, String classificationName) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Getting classifications for entities={}", guid);
+ }
+
+ AtlasClassification ret = null;
+ AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid);
+
+ if (CollectionUtils.isNotEmpty(entityHeader.getClassifications())) {
+ AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ_CLASSIFICATION, entityHeader), "get classification: guid=", guid, ", classification=", classificationName);
+
+ for (AtlasClassification classification : entityHeader.getClassifications()) {
+ if (!StringUtils.equalsIgnoreCase(classification.getTypeName(), classificationName)) {
+ continue;
+ }
+
+ if (StringUtils.isEmpty(classification.getEntityGuid()) || StringUtils.equalsIgnoreCase(classification.getEntityGuid(), guid)) {
+ ret = classification;
+ break;
+ } else if (ret == null) {
+ ret = classification;
+ }
+ }
+ }
+
+ if (ret == null) {
+ throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName);
+ }
+
+ return ret;
+ }
+
+ private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> createOrUpdate()");
+ }
+
+ if (entityStream == null || !entityStream.hasNext()) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
+ }
+
+ AtlasPerfTracer perf = null;
+
+ if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+ perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "createOrUpdate()");
+ }
+
+ try {
+ final boolean isImport = entityStream instanceof EntityImportStream;
+ final EntityMutationContext context = preCreateOrUpdate(entityStream, entityGraphMapper, isPartialUpdate);
+
+ // Check if authorized to create entities
+ if (!isImport && CollectionUtils.isNotEmpty(context.getCreatedEntities())) {
+ for (AtlasEntity entity : context.getCreatedEntities()) {
+ AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_CREATE, new AtlasEntityHeader(entity)),
+ "create entity: type=", entity.getTypeName());
+ }
+ }
+
+ // for existing entities, skip update if incoming entity doesn't have any change
+ if (CollectionUtils.isNotEmpty(context.getUpdatedEntities())) {
+ List<AtlasEntity> entitiesToSkipUpdate = null;
+
+ for (AtlasEntity entity : context.getUpdatedEntities()) {
+ String guid = entity.getGuid();
+ AtlasVertex vertex = context.getVertex(guid);
+ AtlasEntity entityInStore = entityRetriever.toAtlasEntity(vertex);
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+ if (!AtlasEntityUtil.hasAnyAttributeUpdate(entityType, entity, entityInStore)) {
+ // if classifications are to be replaced as well, then skip updates only when no change in classifications as well
+ if (!replaceClassifications || Objects.equals(entity.getClassifications(), entityInStore.getClassifications())) {
+ if (entitiesToSkipUpdate == null) {
+ entitiesToSkipUpdate = new ArrayList<>();
+ }
+
+ entitiesToSkipUpdate.add(entity);
+ }
+ }
+ }
+
+ if (entitiesToSkipUpdate != null) {
+ context.getUpdatedEntities().removeAll(entitiesToSkipUpdate);
+ }
+
+ // Check if authorized to update entities
+ if (!isImport) {
+ for (AtlasEntity entity : context.getUpdatedEntities()) {
+ AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_UPDATE, new AtlasEntityHeader(entity)),
+ "update entity: type=", entity.getTypeName());
+ }
+ }
+ }
+
+ EntityMutationResponse ret = entityGraphMapper.mapAttributesAndClassifications(context, isPartialUpdate, replaceClassifications);
+
+ ret.setGuidAssignments(context.getGuidAssignments());
+
+ // Notify the change listeners
+ entityChangeNotifier.onEntitiesMutated(ret, isImport);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== createOrUpdate()");
+ }
+
+ return ret;
+ } finally {
+ AtlasPerfTracer.log(perf);
+ }
+ }
+
+ private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException {
+ EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV2(typeRegistry, entityStream);
+ EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities();
+ EntityMutationContext context = new EntityMutationContext(discoveryContext);
+
+ for (String guid : discoveryContext.getReferencedGuids()) {
+ AtlasVertex vertex = discoveryContext.getResolvedEntityVertex(guid);
+ AtlasEntity entity = entityStream.getByGuid(guid);
+
+ if (entity != null) { // entity would be null if guid is not in the stream but referenced by an entity in the stream
+ if (vertex != null) {
+ if (!isPartialUpdate) {
+ graphDiscoverer.validateAndNormalize(entity);
+ } else {
+ graphDiscoverer.validateAndNormalizeForUpdate(entity);
+ }
+
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+ String guidVertex = AtlasGraphUtilsV2.getIdFromVertex(vertex);
+
+ if (!StringUtils.equals(guidVertex, guid)) { // if entity was found by unique attribute
+ entity.setGuid(guidVertex);
+ }
+
+ context.addUpdated(guid, entity, entityType, vertex);
+ } else {
+ graphDiscoverer.validateAndNormalize(entity);
+
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+
+ //Create vertices which do not exist in the repository
+ if ((entityStream instanceof EntityImportStream) && AtlasTypeUtil.isAssignedGuid(entity.getGuid())) {
+ vertex = entityGraphMapper.createVertexWithGuid(entity, entity.getGuid());
+ } else {
+ vertex = entityGraphMapper.createVertex(entity);
+ }
+
+ discoveryContext.addResolvedGuid(guid, vertex);
+
+ String generatedGuid = AtlasGraphUtilsV2.getIdFromVertex(vertex);
+
+ entity.setGuid(generatedGuid);
+
+ context.addCreated(guid, entity, entityType, vertex);
+ }
+
+ // during import, update the system attributes
+ if (entityStream instanceof EntityImportStream) {
+ entityGraphMapper.updateSystemAttributes(vertex, entity);
+ }
+ }
+ }
+
+ return context;
+ }
+
+ private EntityMutationResponse deleteVertices(Collection<AtlasVertex> deletionCandidates) throws AtlasBaseException {
+ EntityMutationResponse response = new EntityMutationResponse();
+ RequestContext req = RequestContext.get();
+
+ deleteHandler.deleteEntities(deletionCandidates); // this will update req with list of deleted/updated entities
+
+ for (AtlasObjectId entity : req.getDeletedEntities()) {
+ response.addEntity(DELETE, entity);
+ }
+
+ for (AtlasObjectId entity : req.getUpdatedEntities()) {
+ response.addEntity(UPDATE, entity);
+ }
+
+ return response;
+ }
+
+ private void validateAndNormalize(AtlasClassification classification) throws AtlasBaseException {
+ AtlasClassificationType type = typeRegistry.getClassificationTypeByName(classification.getTypeName());
+
+ if (type == null) {
+ throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classification.getTypeName());
+ }
+
+ List<String> messages = new ArrayList<>();
+
+ type.validateValue(classification, classification.getTypeName(), messages);
+
+ if (!messages.isEmpty()) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, messages);
+ }
+
+ type.getNormalizedValue(classification);
+ }
+
+ /**
+ * Validate if classification is not already associated with the entities
+ *
+ * @param guid unique entity id
+ * @param classifications list of classifications to be associated
+ */
+ private void validateEntityAssociations(String guid, List<AtlasClassification> classifications) throws AtlasBaseException {
+ List<String> entityClassifications = getClassificationNames(guid);
+ String entityTypeName = AtlasGraphUtilsV2.getTypeNameFromGuid(guid);
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
+
+ for (AtlasClassification classification : classifications) {
+ String newClassification = classification.getTypeName();
+
+ if (CollectionUtils.isNotEmpty(entityClassifications) && entityClassifications.contains(newClassification)) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "entity: " + guid +
+ ", already associated with classification: " + newClassification);
+ }
+
+ // for each classification, check whether there are entities it should be restricted to
+ AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(newClassification);
+
+ if (!classificationType.canApplyToEntityType(entityType)) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_ENTITY_FOR_CLASSIFICATION, guid, entityTypeName, newClassification);
+ }
+ }
+ }
+
+ private List<String> getClassificationNames(String guid) throws AtlasBaseException {
+ List<String> ret = null;
+ List<AtlasClassification> classifications = retrieveClassifications(guid);
+
+ if (CollectionUtils.isNotEmpty(classifications)) {
+ ret = new ArrayList<>();
+
+ for (AtlasClassification classification : classifications) {
+ String entityGuid = classification.getEntityGuid();
+
+ if (StringUtils.isEmpty(entityGuid) || StringUtils.equalsIgnoreCase(guid, entityGuid)) {
+ ret.add(classification.getTypeName());
+ }
+ }
+ }
+
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java
new file mode 100644
index 0000000..75a7e61
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStream.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+
+import java.util.Iterator;
+
+public class AtlasEntityStream implements EntityStream {
+ protected final AtlasEntitiesWithExtInfo entitiesWithExtInfo;
+ protected final EntityStream entityStream;
+ private Iterator<AtlasEntity> iterator;
+
+
+ public AtlasEntityStream(AtlasEntity entity) {
+ this(new AtlasEntitiesWithExtInfo(entity));
+ }
+
+ public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo) {
+ this(new AtlasEntitiesWithExtInfo(entityWithExtInfo));
+ }
+
+ public AtlasEntityStream(AtlasEntitiesWithExtInfo entitiesWithExtInfo) {
+ this.entitiesWithExtInfo = entitiesWithExtInfo;
+ this.iterator = this.entitiesWithExtInfo.getEntities().iterator();
+ this.entityStream = null;
+ }
+
+ public AtlasEntityStream(AtlasEntity entity, EntityStream entityStream) {
+ this.entitiesWithExtInfo = new AtlasEntitiesWithExtInfo(entity);
+ this.iterator = this.entitiesWithExtInfo.getEntities().iterator();
+ this.entityStream = entityStream;
+ }
+
+ public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo, EntityStream entityStream) {
+ this.entitiesWithExtInfo = new AtlasEntitiesWithExtInfo(entityWithExtInfo);
+ this.iterator = this.entitiesWithExtInfo.getEntities().iterator();
+ this.entityStream = entityStream;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public AtlasEntity next() {
+ return iterator.hasNext() ? iterator.next() : null;
+ }
+
+ @Override
+ public void reset() {
+ this.iterator = entitiesWithExtInfo.getEntities().iterator();
+ }
+
+ @Override
+ public AtlasEntity getByGuid(String guid) {
+ return entityStream != null ? entityStream.getByGuid(guid) : entitiesWithExtInfo.getEntity(guid);
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer("AtlasEntityStream{");
+
+ sb.append("entitiesWithExtInfo=").append(entitiesWithExtInfo);
+ sb.append(", iterator=").append(iterator);
+ sb.append('}');
+
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java
new file mode 100644
index 0000000..6bf962e
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStreamForImport.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+
+public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream {
+ private int currentPosition = 0;
+
+ public AtlasEntityStreamForImport(AtlasEntityWithExtInfo entityWithExtInfo, EntityStream entityStream) {
+ super(entityWithExtInfo, entityStream);
+ }
+
+ @Override
+ public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
+ currentPosition++;
+ AtlasEntity entity = next();
+
+ return entity != null ? new AtlasEntityWithExtInfo(entity, super.entitiesWithExtInfo) : null;
+ }
+
+ @Override
+ public AtlasEntity getByGuid(String guid) {
+ AtlasEntity ent = super.entitiesWithExtInfo.getEntity(guid);
+
+ if(ent == null && entityStream != null) {
+ return entityStream.getByGuid(guid);
+ }
+
+ return ent;
+ }
+
+ @Override
+ public int size() {
+ return 1;
+ }
+
+ @Override
+ public void setPosition(int position) {
+ // not applicable for a single entity stream
+ }
+
+ @Override
+ public int getPosition() {
+ return currentPosition;
+ }
+
+ @Override
+ public void setPositionUsingEntityGuid(String guid) {
+ }
+
+ @Override
+ public void onImportComplete(String guid) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
new file mode 100644
index 0000000..7d7233f
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.typedef.AtlasEnumDef;
+import org.apache.atlas.model.typedef.AtlasEnumDef.AtlasEnumElementDef;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.atlas.authorize.AtlasPrivilege;
+import org.apache.atlas.authorize.AtlasTypeAccessRequest;
+import org.apache.atlas.authorize.AtlasAuthorizationUtils;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * EnumDef store in v1 format.
+ */
+class AtlasEnumDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasEnumDef> {
+ private static final Logger LOG = LoggerFactory.getLogger(AtlasEnumDefStoreV2.class);
+
+ public AtlasEnumDefStoreV2(AtlasTypeDefGraphStoreV2 typeDefStore, AtlasTypeRegistry typeRegistry) {
+ super(typeDefStore, typeRegistry);
+ }
+
+ @Override
+ public AtlasVertex preCreate(AtlasEnumDef enumDef) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasEnumDefStoreV1.preCreate({})", enumDef);
+ }
+
+ validateType(enumDef);
+
+ AtlasVertex vertex = typeDefStore.findTypeVertexByName(enumDef.getName());
+
+ if (vertex != null) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_ALREADY_EXISTS, enumDef.getName());
+ }
+
+ vertex = typeDefStore.createTypeVertex(enumDef);
+
+ toVertex(enumDef, vertex);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasEnumDefStoreV1.preCreate({}): {}", enumDef, vertex);
+ }
+
+ return vertex;
+ }
+
+ @Override
+ public AtlasEnumDef create(AtlasEnumDef enumDef, AtlasVertex preCreateResult) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasEnumDefStoreV1.create({}, {})", enumDef, preCreateResult);
+ }
+
+ AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_CREATE, enumDef), "create enum-def ", enumDef.getName());
+
+ AtlasVertex vertex = (preCreateResult == null) ? preCreate(enumDef) : preCreateResult;
+
+ AtlasEnumDef ret = toEnumDef(vertex);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasEntityDefStoreV1.create({}, {}): {}", enumDef, preCreateResult, ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public List<AtlasEnumDef> getAll() throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasEnumDefStoreV1.getAll()");
+ }
+
+ List<AtlasEnumDef> ret = new ArrayList<>();
+
+ Iterator<AtlasVertex> vertices = typeDefStore.findTypeVerticesByCategory(TypeCategory.ENUM);
+ while (vertices.hasNext()) {
+ ret.add(toEnumDef(vertices.next()));
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasEnumDefStoreV1.getAll(): count={}", ret.size());
+ }
+
+ return ret;
+ }
+
+ @Override
+ public AtlasEnumDef getByName(String name) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasEnumDefStoreV1.getByName({})", name);
+ }
+
+ AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.ENUM);
+
+ if (vertex == null) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
+ }
+
+ vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY, TypeCategory.class);
+
+ AtlasEnumDef ret = toEnumDef(vertex);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasEnumDefStoreV1.getByName({}): {}", name, ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public AtlasEnumDef getByGuid(String guid) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasEnumDefStoreV1.getByGuid({})", guid);
+ }
+
+ AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.ENUM);
+
+ if (vertex == null) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
+ }
+
+ AtlasEnumDef ret = toEnumDef(vertex);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasEnumDefStoreV1.getByGuid({}): {}", guid, ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public AtlasEnumDef update(AtlasEnumDef enumDef) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasEnumDefStoreV1.update({})", enumDef);
+ }
+
+ validateType(enumDef);
+
+ AtlasEnumDef ret = StringUtils.isNotBlank(enumDef.getGuid()) ? updateByGuid(enumDef.getGuid(), enumDef)
+ : updateByName(enumDef.getName(), enumDef);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasEnumDefStoreV1.update({}): {}", enumDef, ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public AtlasEnumDef updateByName(String name, AtlasEnumDef enumDef) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasEnumDefStoreV1.updateByName({}, {})", name, enumDef);
+ }
+
+ AtlasEnumDef existingDef = typeRegistry.getEnumDefByName(name);
+
+ AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update enum-def ", name);
+
+ validateType(enumDef);
+
+ AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.ENUM);
+
+ if (vertex == null) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
+ }
+
+ typeDefStore.updateTypeVertex(enumDef, vertex);
+
+ toVertex(enumDef, vertex);
+
+ AtlasEnumDef ret = toEnumDef(vertex);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasEnumDefStoreV1.updateByName({}, {}): {}", name, enumDef, ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public AtlasEnumDef updateByGuid(String guid, AtlasEnumDef enumDef) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasEnumDefStoreV1.updateByGuid({})", guid);
+ }
+
+ AtlasEnumDef existingDef = typeRegistry.getEnumDefByGuid(guid);
+
+ AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update enum-def ", (existingDef != null ? existingDef.getName() : guid));
+
+ validateType(enumDef);
+
+ AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.ENUM);
+
+ if (vertex == null) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
+ }
+
+ typeDefStore.updateTypeVertex(enumDef, vertex);
+
+ toVertex(enumDef, vertex);
+
+ AtlasEnumDef ret = toEnumDef(vertex);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasEnumDefStoreV1.updateByGuid({}): {}", guid, ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public AtlasVertex preDeleteByName(String name) throws AtlasBaseException {
+ AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.ENUM);
+
+ if (vertex == null) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
+ }
+
+ AtlasEnumDef existingDef = typeRegistry.getEnumDefByName(name);
+
+ AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete enum-def ", (existingDef != null ? existingDef.getName() : name));
+
+ return vertex;
+ }
+
+ @Override
+ public AtlasVertex preDeleteByGuid(String guid) throws AtlasBaseException {
+ AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.ENUM);
+
+ if (vertex == null) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
+ }
+
+ AtlasEnumDef existingDef = typeRegistry.getEnumDefByGuid(guid);
+
+ AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete enum-def ", (existingDef != null ? existingDef.getName() : guid));
+
+ return vertex;
+ }
+
+ private void toVertex(AtlasEnumDef enumDef, AtlasVertex vertex) throws AtlasBaseException {
+ if (CollectionUtils.isEmpty(enumDef.getElementDefs())) {
+ throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE, enumDef.getName(), "values");
+ }
+
+ List<String> values = new ArrayList<>(enumDef.getElementDefs().size());
+
+ for (AtlasEnumElementDef element : enumDef.getElementDefs()) {
+ // Validate the enum element
+ if (StringUtils.isEmpty(element.getValue()) || null == element.getOrdinal()) {
+ throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE, enumDef.getName(), "elementValue");
+ }
+
+ String elemKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(enumDef, element.getValue());
+
+ AtlasGraphUtilsV2.setProperty(vertex, elemKey, element.getOrdinal());
+
+ if (StringUtils.isNotBlank(element.getDescription())) {
+ String descKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(elemKey, "description");
+
+ AtlasGraphUtilsV2.setProperty(vertex, descKey, element.getDescription());
+ }
+
+ values.add(element.getValue());
+ }
+ AtlasGraphUtilsV2.setProperty(vertex, AtlasGraphUtilsV2.getTypeDefPropertyKey(enumDef), values);
+ }
+
+ private AtlasEnumDef toEnumDef(AtlasVertex vertex) {
+ AtlasEnumDef ret = null;
+
+ if (vertex != null && typeDefStore.isTypeVertex(vertex, TypeCategory.ENUM)) {
+ ret = toEnumDef(vertex, new AtlasEnumDef(), typeDefStore);
+ }
+
+ return ret;
+ }
+
+ private static AtlasEnumDef toEnumDef(AtlasVertex vertex, AtlasEnumDef enumDef, AtlasTypeDefGraphStoreV2 typeDefStore) {
+ AtlasEnumDef ret = enumDef != null ? enumDef : new AtlasEnumDef();
+
+ typeDefStore.vertexToTypeDef(vertex, ret);
+
+ List<AtlasEnumElementDef> elements = new ArrayList<>();
+ List<String> elemValues = vertex.getProperty(AtlasGraphUtilsV2.getTypeDefPropertyKey(ret), List.class);
+ for (String elemValue : elemValues) {
+ String elemKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(ret, elemValue);
+ String descKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(elemKey, "description");
+
+ Integer ordinal = AtlasGraphUtilsV2.getProperty(vertex, elemKey, Integer.class);
+ String desc = AtlasGraphUtilsV2.getProperty(vertex, descKey, String.class);
+
+ elements.add(new AtlasEnumElementDef(elemValue, desc, ordinal));
+ }
+ ret.setElementDefs(elements);
+
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
new file mode 100644
index 0000000..e148aa7
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java
@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.SortOrder;
+import org.apache.atlas.discovery.SearchProcessor;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.TypeCategory;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.graph.GraphHelper;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasElement;
+import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
+import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_DEFAULT;
+import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_PROPERTY;
+import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.*;
+
+/**
+ * Utility methods for Graph.
+ */
+public class AtlasGraphUtilsV2 {
+ private static final Logger LOG = LoggerFactory.getLogger(AtlasGraphUtilsV2.class);
+
+ public static final String PROPERTY_PREFIX = Constants.INTERNAL_PROPERTY_KEY_PREFIX + "type.";
+ public static final String SUPERTYPE_EDGE_LABEL = PROPERTY_PREFIX + ".supertype";
+ public static final String ENTITYTYPE_EDGE_LABEL = PROPERTY_PREFIX + ".entitytype";
+ public static final String RELATIONSHIPTYPE_EDGE_LABEL = PROPERTY_PREFIX + ".relationshipType";
+ public static final String VERTEX_TYPE = "typeSystem";
+
+ private static boolean USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES = false;
+ private static String INDEX_SEARCH_PREFIX;
+
+ static {
+ try {
+ Configuration conf = ApplicationProperties.get();
+
+ USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES = conf.getBoolean("atlas.use.index.query.to.find.entity.by.unique.attributes", USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES);
+ INDEX_SEARCH_PREFIX = conf.getString(INDEX_SEARCH_VERTEX_PREFIX_PROPERTY, INDEX_SEARCH_VERTEX_PREFIX_DEFAULT);
+ } catch (Exception excp) {
+ LOG.error("Error reading configuration", excp);
+ } finally {
+ LOG.info("atlas.use.index.query.to.find.entity.by.unique.attributes=" + USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES);
+ }
+ }
+
+ public static String getTypeDefPropertyKey(AtlasBaseTypeDef typeDef) {
+ return getTypeDefPropertyKey(typeDef.getName());
+ }
+
+ public static String getTypeDefPropertyKey(AtlasBaseTypeDef typeDef, String child) {
+ return getTypeDefPropertyKey(typeDef.getName(), child);
+ }
+
+ public static String getTypeDefPropertyKey(String typeName) {
+ return PROPERTY_PREFIX + typeName;
+ }
+
+ public static String getTypeDefPropertyKey(String typeName, String child) {
+ return PROPERTY_PREFIX + typeName + "." + child;
+ }
+
+ public static String getIdFromVertex(AtlasVertex vertex) {
+ return vertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class);
+ }
+
+ public static String getIdFromEdge(AtlasEdge edge) {
+ return edge.getProperty(Constants.GUID_PROPERTY_KEY, String.class);
+ }
+
+ public static String getTypeName(AtlasElement element) {
+ return element.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class);
+ }
+
+ public static String getEdgeLabel(String fromNode, String toNode) {
+ return PROPERTY_PREFIX + "edge." + fromNode + "." + toNode;
+ }
+
+ public static String getEdgeLabel(String property) {
+ return GraphHelper.EDGE_LABEL_PREFIX + property;
+ }
+
+ public static String getAttributeEdgeLabel(AtlasStructType fromType, String attributeName) throws AtlasBaseException {
+ return getEdgeLabel(getQualifiedAttributePropertyKey(fromType, attributeName));
+ }
+
+ public static String getQualifiedAttributePropertyKey(AtlasStructType fromType, String attributeName) throws AtlasBaseException {
+ switch (fromType.getTypeCategory()) {
+ case ENTITY:
+ case STRUCT:
+ case CLASSIFICATION:
+ return fromType.getQualifiedAttributeName(attributeName);
+ default:
+ throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPE, fromType.getTypeCategory().name());
+ }
+ }
+
+ public static boolean isEntityVertex(AtlasVertex vertex) {
+ return StringUtils.isNotEmpty(getIdFromVertex(vertex)) && StringUtils.isNotEmpty(getTypeName(vertex));
+ }
+
+ public static boolean isReference(AtlasType type) {
+ return isReference(type.getTypeCategory());
+ }
+
+ public static boolean isReference(TypeCategory typeCategory) {
+ return typeCategory == TypeCategory.STRUCT ||
+ typeCategory == TypeCategory.ENTITY ||
+ typeCategory == TypeCategory.OBJECT_ID_TYPE;
+ }
+
+ public static String encodePropertyKey(String key) {
+ String ret = AtlasStructType.AtlasAttribute.encodePropertyKey(key);
+
+ return ret;
+ }
+
+ public static String decodePropertyKey(String key) {
+ String ret = AtlasStructType.AtlasAttribute.decodePropertyKey(key);
+
+ return ret;
+ }
+
+ /**
+ * Adds an additional value to a multi-property.
+ *
+ * @param propertyName
+ * @param value
+ */
+ public static AtlasVertex addProperty(AtlasVertex vertex, String propertyName, Object value) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> addProperty({}, {}, {})", toString(vertex), propertyName, value);
+ }
+ propertyName = encodePropertyKey(propertyName);
+ vertex.addProperty(propertyName, value);
+ return vertex;
+ }
+
+ public static <T extends AtlasElement> void setProperty(T element, String propertyName, Object value) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> setProperty({}, {}, {})", toString(element), propertyName, value);
+ }
+
+ propertyName = encodePropertyKey(propertyName);
+
+ Object existingValue = element.getProperty(propertyName, Object.class);
+
+ if (value == null || (value instanceof Collection && ((Collection)value).isEmpty())) {
+ if (existingValue != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing property {} from {}", propertyName, toString(element));
+ }
+
+ element.removeProperty(propertyName);
+ }
+ } else {
+ if (!value.equals(existingValue)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Setting property {} in {}", propertyName, toString(element));
+ }
+
+ if ( value instanceof Date) {
+ Long encodedValue = ((Date) value).getTime();
+ element.setProperty(propertyName, encodedValue);
+ } else {
+ element.setProperty(propertyName, value);
+ }
+ }
+ }
+ }
+
+ public static <T extends AtlasElement, O> O getProperty(T element, String propertyName, Class<O> returnType) {
+ Object property = element.getProperty(encodePropertyKey(propertyName), returnType);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getProperty({}, {}) ==> {}", toString(element), propertyName, returnType.cast(property));
+ }
+
+ return returnType.cast(property);
+ }
+
+ public static AtlasVertex getVertexByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) throws AtlasBaseException {
+ AtlasVertex vertex = findByUniqueAttributes(entityType, attrValues);
+
+ if (vertex == null) {
+ throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, entityType.getTypeName(),
+ attrValues.toString());
+ }
+
+ return vertex;
+ }
+
+ public static String getGuidByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) throws AtlasBaseException {
+ AtlasVertex vertexByUniqueAttributes = getVertexByUniqueAttributes(entityType, attrValues);
+ return getIdFromVertex(vertexByUniqueAttributes);
+ }
+
+ public static AtlasVertex findByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) {
+ AtlasVertex vertex = null;
+
+ final Map<String, AtlasAttribute> uniqueAttributes = entityType.getUniqAttributes();
+
+ if (MapUtils.isNotEmpty(uniqueAttributes) && MapUtils.isNotEmpty(attrValues)) {
+ for (AtlasAttribute attribute : uniqueAttributes.values()) {
+ Object attrValue = attrValues.get(attribute.getName());
+
+ if (attrValue == null) {
+ continue;
+ }
+
+ if (canUseIndexQuery(entityType, attribute.getName())) {
+ vertex = AtlasGraphUtilsV2.getAtlasVertexFromIndexQuery(entityType, attribute, attrValue);
+ } else {
+ vertex = AtlasGraphUtilsV2.findByTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue);
+
+ // if no instance of given typeName is found, try to find an instance of type's sub-type
+ if (vertex == null && !entityType.getAllSubTypes().isEmpty()) {
+ vertex = AtlasGraphUtilsV2.findBySuperTypeAndPropertyName(entityType.getTypeName(), attribute.getVertexPropertyName(), attrValue);
+ }
+ }
+
+ if (vertex != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("findByUniqueAttributes(type={}, attrName={}, attrValue={}: found vertex {}",
+ entityType.getTypeName(), attribute.getName(), attrValue, vertex);
+ }
+
+ break;
+ }
+ }
+ }
+
+ return vertex;
+ }
+
+ public static AtlasVertex findByGuid(String guid) {
+ AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
+ .has(Constants.GUID_PROPERTY_KEY, guid);
+
+ Iterator<AtlasVertex> results = query.vertices().iterator();
+
+ AtlasVertex vertex = results.hasNext() ? results.next() : null;
+
+ return vertex;
+ }
+
+ public static String getTypeNameFromGuid(String guid) {
+ String ret = null;
+
+ if (StringUtils.isNotEmpty(guid)) {
+ AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(guid);
+
+ ret = (vertex != null) ? AtlasGraphUtilsV2.getTypeName(vertex) : null;
+ }
+
+ return ret;
+ }
+
+ public static boolean typeHasInstanceVertex(String typeName) throws AtlasBaseException {
+ AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance()
+ .query()
+ .has(Constants.TYPE_NAME_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.EQUAL, typeName);
+
+ Iterator<AtlasVertex> results = query.vertices().iterator();
+
+ boolean hasInstanceVertex = results != null && results.hasNext();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("typeName {} has instance vertex {}", typeName, hasInstanceVertex);
+ }
+
+ return hasInstanceVertex;
+ }
+
+ public static AtlasVertex findByTypeAndPropertyName(String typeName, String propertyName, Object attrVal) {
+ AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
+ .has(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName)
+ .has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name())
+ .has(propertyName, attrVal);
+
+ Iterator<AtlasVertex> results = query.vertices().iterator();
+
+ AtlasVertex vertex = results.hasNext() ? results.next() : null;
+
+ return vertex;
+ }
+
+ public static AtlasVertex findBySuperTypeAndPropertyName(String typeName, String propertyName, Object attrVal) {
+ AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
+ .has(Constants.SUPER_TYPES_PROPERTY_KEY, typeName)
+ .has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name())
+ .has(propertyName, attrVal);
+
+ Iterator<AtlasVertex> results = query.vertices().iterator();
+
+ AtlasVertex vertex = results.hasNext() ? results.next() : null;
+
+ return vertex;
+ }
+
+ public static List<String> findEntityGUIDsByType(String typename, SortOrder sortOrder) {
+ AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
+ .has(Constants.ENTITY_TYPE_PROPERTY_KEY, typename);
+ if (sortOrder != null) {
+ AtlasGraphQuery.SortOrder qrySortOrder = sortOrder == SortOrder.ASCENDING ? ASC : DESC;
+ query.orderBy(Constants.QUALIFIED_NAME, qrySortOrder);
+ }
+
+ Iterator<AtlasVertex> results = query.vertices().iterator();
+ ArrayList<String> ret = new ArrayList<>();
+
+ if (!results.hasNext()) {
+ return Collections.emptyList();
+ }
+
+ while (results.hasNext()) {
+ ret.add(getIdFromVertex(results.next()));
+ }
+
+ return ret;
+ }
+
+ public static List<String> findEntityGUIDsByType(String typename) {
+ return findEntityGUIDsByType(typename, null);
+ }
+
+ public static boolean relationshipTypeHasInstanceEdges(String typeName) throws AtlasBaseException {
+ AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance()
+ .query()
+ .has(Constants.TYPE_NAME_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.EQUAL, typeName);
+
+ Iterator<AtlasEdge> results = query.edges().iterator();
+
+ boolean hasInstanceEdges = results != null && results.hasNext();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("relationshipType {} has instance edges {}", typeName, hasInstanceEdges);
+ }
+
+ return hasInstanceEdges;
+ }
+
+ private static String toString(AtlasElement element) {
+ if (element instanceof AtlasVertex) {
+ return toString((AtlasVertex) element);
+ } else if (element instanceof AtlasEdge) {
+ return toString((AtlasEdge)element);
+ }
+
+ return element.toString();
+ }
+
+ public static String toString(AtlasVertex vertex) {
+ if(vertex == null) {
+ return "vertex[null]";
+ } else {
+ if (LOG.isDebugEnabled()) {
+ return getVertexDetails(vertex);
+ } else {
+ return String.format("vertex[id=%s]", vertex.getId().toString());
+ }
+ }
+ }
+
+
+ public static String toString(AtlasEdge edge) {
+ if(edge == null) {
+ return "edge[null]";
+ } else {
+ if (LOG.isDebugEnabled()) {
+ return getEdgeDetails(edge);
+ } else {
+ return String.format("edge[id=%s]", edge.getId().toString());
+ }
+ }
+ }
+
+ public static String getVertexDetails(AtlasVertex vertex) {
+ return String.format("vertex[id=%s type=%s guid=%s]",
+ vertex.getId().toString(), getTypeName(vertex), getIdFromVertex(vertex));
+ }
+
+ public static String getEdgeDetails(AtlasEdge edge) {
+ return String.format("edge[id=%s label=%s from %s -> to %s]", edge.getId(), edge.getLabel(),
+ toString(edge.getOutVertex()), toString(edge.getInVertex()));
+ }
+
+ public static AtlasEntity.Status getState(AtlasElement element) {
+ String state = getStateAsString(element);
+ return state == null ? null : AtlasEntity.Status.valueOf(state);
+ }
+
+ public static String getStateAsString(AtlasElement element) {
+ return element.getProperty(Constants.STATE_PROPERTY_KEY, String.class);
+ }
+
+ private static boolean canUseIndexQuery(AtlasEntityType entityType, String attributeName) {
+ boolean ret = false;
+
+ if (USE_INDEX_QUERY_TO_FIND_ENTITY_BY_UNIQUE_ATTRIBUTES) {
+ final String typeAndSubTypesQryStr = entityType.getTypeAndAllSubTypesQryStr();
+
+ ret = typeAndSubTypesQryStr.length() <= SearchProcessor.MAX_QUERY_STR_LENGTH_TYPES;
+
+ if (ret) {
+ Set<String> indexSet = AtlasGraphProvider.getGraphInstance().getVertexIndexKeys();
+ try {
+ ret = indexSet.contains(entityType.getQualifiedAttributeName(attributeName));
+ }
+ catch (AtlasBaseException ex) {
+ ret = false;
+ }
+ }
+ }
+
+ return ret;
+ }
+
+ private static AtlasVertex getAtlasVertexFromIndexQuery(AtlasEntityType entityType, AtlasAttribute attribute, Object attrVal) {
+ String propertyName = attribute.getVertexPropertyName();
+ AtlasIndexQuery query = getIndexQuery(entityType, propertyName, attrVal.toString());
+
+ for (Iterator<AtlasIndexQuery.Result> iter = query.vertices(); iter.hasNext(); ) {
+ AtlasIndexQuery.Result result = iter.next();
+ AtlasVertex vertex = result.getVertex();
+
+ // skip non-entity vertices, if any got returned
+ if (vertex == null || !vertex.getPropertyKeys().contains(Constants.GUID_PROPERTY_KEY)) {
+ continue;
+ }
+
+ // verify the typeName
+ String typeNameInVertex = getTypeName(vertex);
+
+ if (!entityType.getTypeAndAllSubTypes().contains(typeNameInVertex)) {
+ LOG.warn("incorrect vertex type from index-query: expected='{}'; found='{}'", entityType.getTypeName(), typeNameInVertex);
+
+ continue;
+ }
+
+ if (attrVal.getClass() == String.class) {
+ String s = (String) attrVal;
+ String vertexVal = vertex.getProperty(propertyName, String.class);
+
+ if (!s.equalsIgnoreCase(vertexVal)) {
+ LOG.warn("incorrect match from index-query for property {}: expected='{}'; found='{}'", propertyName, s, vertexVal);
+
+ continue;
+ }
+ }
+
+ return vertex;
+ }
+
+ return null;
+ }
+
+ private static AtlasIndexQuery getIndexQuery(AtlasEntityType entityType, String propertyName, String value) {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append(INDEX_SEARCH_PREFIX + "\"").append(Constants.TYPE_NAME_PROPERTY_KEY).append("\":").append(entityType.getTypeAndAllSubTypesQryStr())
+ .append(" AND ")
+ .append(INDEX_SEARCH_PREFIX + "\"").append(propertyName).append("\":").append(AtlasAttribute.escapeIndexQueryValue(value))
+ .append(" AND ")
+ .append(INDEX_SEARCH_PREFIX + "\"").append(Constants.STATE_PROPERTY_KEY).append("\":ACTIVE");
+
+ return AtlasGraphProvider.getGraphInstance().indexQuery(Constants.VERTEX_INDEX, sb.toString());
+ }
+
+ public static String getIndexSearchPrefix() {
+ return INDEX_SEARCH_PREFIX;
+ }
+}