You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ap...@apache.org on 2018/05/25 03:52:49 UTC
[11/20] atlas git commit: ATLAS-2490: updates to make usage of v1/v2
in class names consistent
http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
new file mode 100644
index 0000000..bc03aeb
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
@@ -0,0 +1,508 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef.RelationshipCategory;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
+import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
+import org.apache.atlas.query.AtlasDSL;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasRelationshipType;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.atlas.authorize.AtlasPrivilege;
+import org.apache.atlas.authorize.AtlasTypeAccessRequest;
+import org.apache.atlas.authorize.AtlasAuthorizationUtils;
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * RelationshipDef store in v1 format.
+ */
+public class AtlasRelationshipDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasRelationshipDef> {
+ private static final Logger LOG = LoggerFactory.getLogger(AtlasRelationshipDefStoreV2.class);
+
+ @Inject
+ public AtlasRelationshipDefStoreV2(AtlasTypeDefGraphStoreV2 typeDefStore, AtlasTypeRegistry typeRegistry) {
+ super(typeDefStore, typeRegistry);
+ }
+
+ @Override
+ public AtlasVertex preCreate(AtlasRelationshipDef relationshipDef) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasRelationshipDefStoreV1.preCreate({})", relationshipDef);
+ }
+
+ validateType(relationshipDef);
+
+ AtlasType type = typeRegistry.getType(relationshipDef.getName());
+
+ if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.RELATIONSHIP) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, relationshipDef.getName(), TypeCategory.RELATIONSHIP.name());
+ }
+
+ AtlasVertex relationshipDefVertex = typeDefStore.findTypeVertexByName(relationshipDef.getName());
+
+ if (relationshipDefVertex != null) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_ALREADY_EXISTS, relationshipDef.getName());
+ }
+
+ relationshipDefVertex = typeDefStore.createTypeVertex(relationshipDef);
+
+ updateVertexPreCreate(relationshipDef, (AtlasRelationshipType) type, relationshipDefVertex);
+
+ final AtlasRelationshipEndDef endDef1 = relationshipDef.getEndDef1();
+ final AtlasRelationshipEndDef endDef2 = relationshipDef.getEndDef2();
+ final String type1 = endDef1.getType();
+ final String type2 = endDef2.getType();
+ final String name1 = endDef1.getName();
+ final String name2 = endDef2.getName();
+ final AtlasVertex end1TypeVertex = typeDefStore.findTypeVertexByName(type1);
+ final AtlasVertex end2TypeVertex = typeDefStore.findTypeVertexByName(type2);
+
+ if (end1TypeVertex == null) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_END_TYPE_NAME_NOT_FOUND, relationshipDef.getName(), type1);
+ }
+
+ if (end2TypeVertex == null) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_END_TYPE_NAME_NOT_FOUND, relationshipDef.getName(), type2);
+ }
+
+ // create an edge between the relationshipDef and each of the entityDef vertices.
+ AtlasEdge edge1 = typeDefStore.getOrCreateEdge(relationshipDefVertex, end1TypeVertex, AtlasGraphUtilsV2.RELATIONSHIPTYPE_EDGE_LABEL);
+
+ /*
+ Where edge1 and edge2 have the same names and types we do not need a second edge.
+ We are not invoking the equals method on the AtlasRelationshipedDef, as we only want 1 edge even if propagateTags or other properties are different.
+ */
+
+ if (type1.equals(type2) && name1.equals(name2)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("AtlasRelationshipDefStoreV1.preCreate({}): created relationshipDef vertex {}," +
+ " and one edge as {}, because end1 and end2 have the same type and name", relationshipDef, relationshipDefVertex, edge1);
+ }
+
+ } else {
+ AtlasEdge edge2 = typeDefStore.getOrCreateEdge(relationshipDefVertex, end2TypeVertex, AtlasGraphUtilsV2.RELATIONSHIPTYPE_EDGE_LABEL);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("AtlasRelationshipDefStoreV1.preCreate({}): created relationshipDef vertex {}," +
+ " edge1 as {}, edge2 as {} ", relationshipDef, relationshipDefVertex, edge1, edge2);
+ }
+
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasRelationshipDefStoreV1.preCreate({}): {}", relationshipDef, relationshipDefVertex);
+ }
+ return relationshipDefVertex;
+ }
+
+ @Override
+ public AtlasRelationshipDef create(AtlasRelationshipDef relationshipDef, AtlasVertex preCreateResult)
+ throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasRelationshipDefStoreV1.create({}, {})", relationshipDef, preCreateResult);
+ }
+
+ AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_CREATE, relationshipDef), "create relationship-def ", relationshipDef.getName());
+
+ AtlasVertex vertex = (preCreateResult == null) ? preCreate(relationshipDef) : preCreateResult;
+
+ AtlasRelationshipDef ret = toRelationshipDef(vertex);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasRelationshipDefStoreV1.create({}, {}): {}", relationshipDef, preCreateResult, ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public List<AtlasRelationshipDef> getAll() throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasRelationshipDefStoreV1.getAll()");
+ }
+
+ List<AtlasRelationshipDef> ret = new ArrayList<>();
+ Iterator<AtlasVertex> vertices = typeDefStore.findTypeVerticesByCategory(TypeCategory.RELATIONSHIP);
+
+ while (vertices.hasNext()) {
+ ret.add(toRelationshipDef(vertices.next()));
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasRelationshipDefStoreV1.getAll(): count={}", ret.size());
+ }
+
+ return ret;
+ }
+
+ @Override
+ public AtlasRelationshipDef getByName(String name) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasRelationshipDefStoreV1.getByName({})", name);
+ }
+
+ AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.RELATIONSHIP);
+
+ if (vertex == null) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
+ }
+
+ vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY, TypeCategory.class);
+
+ AtlasRelationshipDef ret = toRelationshipDef(vertex);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasRelationshipDefStoreV1.getByName({}): {}", name, ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public AtlasRelationshipDef getByGuid(String guid) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasRelationshipDefStoreV1.getByGuid({})", guid);
+ }
+
+ AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.RELATIONSHIP);
+
+ if (vertex == null) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
+ }
+
+ AtlasRelationshipDef ret = toRelationshipDef(vertex);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasRelationshipDefStoreV1.getByGuid({}): {}", guid, ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public AtlasRelationshipDef update(AtlasRelationshipDef relationshipDef) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasRelationshipDefStoreV1.update({})", relationshipDef);
+ }
+
+ validateType(relationshipDef);
+
+ AtlasRelationshipDef ret = StringUtils.isNotBlank(relationshipDef.getGuid())
+ ? updateByGuid(relationshipDef.getGuid(), relationshipDef)
+ : updateByName(relationshipDef.getName(), relationshipDef);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasRelationshipDefStoreV1.update({}): {}", relationshipDef, ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public AtlasRelationshipDef updateByName(String name, AtlasRelationshipDef relationshipDef)
+ throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasRelationshipDefStoreV1.updateByName({}, {})", name, relationshipDef);
+ }
+
+ AtlasRelationshipDef existingDef = typeRegistry.getRelationshipDefByName(name);
+
+ AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update relationship-def ", name);
+
+ validateType(relationshipDef);
+
+ AtlasType type = typeRegistry.getType(relationshipDef.getName());
+
+ if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.RELATIONSHIP) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, relationshipDef.getName(), TypeCategory.RELATIONSHIP.name());
+ }
+
+ AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.RELATIONSHIP);
+
+ if (vertex == null) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
+ }
+
+ preUpdateCheck(relationshipDef, (AtlasRelationshipType) type, vertex);
+
+ AtlasRelationshipDef ret = toRelationshipDef(vertex);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasRelationshipDefStoreV1.updateByName({}, {}): {}", name, relationshipDef, ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public AtlasRelationshipDef updateByGuid(String guid, AtlasRelationshipDef relationshipDef)
+ throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasRelationshipDefStoreV1.updateByGuid({})", guid);
+ }
+
+ AtlasRelationshipDef existingDef = typeRegistry.getRelationshipDefByGuid(guid);
+
+ AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update relationship-Def ", (existingDef != null ? existingDef.getName() : guid));
+
+ validateType(relationshipDef);
+
+ AtlasType type = typeRegistry.getTypeByGuid(guid);
+
+ if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.RELATIONSHIP) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, relationshipDef.getName(), TypeCategory.RELATIONSHIP.name());
+ }
+
+ AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.RELATIONSHIP);
+
+ if (vertex == null) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
+ }
+
+ preUpdateCheck(relationshipDef, (AtlasRelationshipType) type, vertex);
+ // updates should not effect the edges between the types as we do not allow updates that change the endpoints.
+
+ AtlasRelationshipDef ret = toRelationshipDef(vertex);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasRelationshipDefStoreV1.updateByGuid({}): {}", guid, ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public AtlasVertex preDeleteByName(String name) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasRelationshipDefStoreV1.preDeleteByName({})", name);
+ }
+
+ AtlasRelationshipDef existingDef = typeRegistry.getRelationshipDefByName(name);
+
+ AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete relationship-def ", name);
+
+ AtlasVertex ret = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.RELATIONSHIP);
+
+ if (ret == null) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
+ }
+
+ if (AtlasGraphUtilsV2.relationshipTypeHasInstanceEdges(name)) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, name);
+ }
+
+ typeDefStore.deleteTypeVertexOutEdges(ret);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasRelationshipDefStoreV1.preDeleteByName({}): {}", name, ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public AtlasVertex preDeleteByGuid(String guid) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> AtlasRelationshipDefStoreV1.preDeleteByGuid({})", guid);
+ }
+
+ AtlasRelationshipDef existingDef = typeRegistry.getRelationshipDefByGuid(guid);
+
+ AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete relationship-def ", (existingDef != null ? existingDef.getName() : guid));
+
+ AtlasVertex ret = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.RELATIONSHIP);
+
+ if (ret == null) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
+ }
+
+ String typeName = AtlasGraphUtilsV2.getProperty(ret, Constants.TYPENAME_PROPERTY_KEY, String.class);
+
+ if (AtlasGraphUtilsV2.relationshipTypeHasInstanceEdges(typeName)) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, typeName);
+ }
+
+ typeDefStore.deleteTypeVertexOutEdges(ret);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== AtlasRelationshipDefStoreV1.preDeleteByGuid({}): {}", guid, ret);
+ }
+
+ return ret;
+ }
+
+ private void updateVertexPreCreate(AtlasRelationshipDef relationshipDef, AtlasRelationshipType relationshipType,
+ AtlasVertex vertex) throws AtlasBaseException {
+ AtlasRelationshipEndDef end1 = relationshipDef.getEndDef1();
+ AtlasRelationshipEndDef end2 = relationshipDef.getEndDef2();
+
+ // check whether the names added on the relationship Ends are reserved if required.
+ final boolean allowReservedKeywords;
+ try {
+ allowReservedKeywords = ApplicationProperties.get().getBoolean(ALLOW_RESERVED_KEYWORDS, true);
+ } catch (AtlasException e) {
+ throw new AtlasBaseException(e);
+ }
+
+ if (!allowReservedKeywords) {
+ if (AtlasDSL.Parser.isKeyword(end1.getName())) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_END1_NAME_INVALID, end1.getName());
+ }
+
+ if (AtlasDSL.Parser.isKeyword(end2.getName())) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_END2_NAME_INVALID, end2.getName());
+ }
+ }
+
+ AtlasStructDefStoreV2.updateVertexPreCreate(relationshipDef, relationshipType, vertex, typeDefStore);
+ // Update ends
+ setVertexPropertiesFromRelationshipDef(relationshipDef, vertex);
+ }
+
+ private void preUpdateCheck(AtlasRelationshipDef newRelationshipDef, AtlasRelationshipType relationshipType,
+ AtlasVertex vertex) throws AtlasBaseException {
+ // We will not support an update to endpoints or category key
+ AtlasRelationshipDef existingRelationshipDef = toRelationshipDef(vertex);
+
+ preUpdateCheck(newRelationshipDef, existingRelationshipDef);
+ // we do allow change to tag propagation and the addition of new attributes.
+
+ AtlasStructDefStoreV2.updateVertexPreUpdate(newRelationshipDef, relationshipType, vertex, typeDefStore);
+
+ setVertexPropertiesFromRelationshipDef(newRelationshipDef, vertex);
+ }
+
+ /**
+ * Check ends are the same and relationshipCategory is the same.
+ *
+ * We do this by comparing 2 relationshipDefs to avoid exposing the AtlasVertex to unit testing.
+ *
+ * @param newRelationshipDef
+ * @param existingRelationshipDef
+ * @throws AtlasBaseException
+ */
+ public static void preUpdateCheck(AtlasRelationshipDef newRelationshipDef, AtlasRelationshipDef existingRelationshipDef) throws AtlasBaseException {
+ // do not allow renames of the Def.
+ String existingName = existingRelationshipDef.getName();
+ String newName = newRelationshipDef.getName();
+
+ if (!existingName.equals(newName)) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_NAME_UPDATE,
+ newRelationshipDef.getGuid(),existingName, newName);
+ }
+
+ RelationshipCategory existingRelationshipCategory = existingRelationshipDef.getRelationshipCategory();
+ RelationshipCategory newRelationshipCategory = newRelationshipDef.getRelationshipCategory();
+
+ if ( !existingRelationshipCategory.equals(newRelationshipCategory)){
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_CATEGORY_UPDATE,
+ newRelationshipDef.getName(),newRelationshipCategory.name(),
+ existingRelationshipCategory.name() );
+ }
+
+ AtlasRelationshipEndDef existingEnd1 = existingRelationshipDef.getEndDef1();
+ AtlasRelationshipEndDef newEnd1 = newRelationshipDef.getEndDef1();
+
+ if ( !newEnd1.equals(existingEnd1) ) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END1_UPDATE,
+ newRelationshipDef.getName(), newEnd1.toString(), existingEnd1.toString());
+ }
+
+ AtlasRelationshipEndDef existingEnd2 = existingRelationshipDef.getEndDef2();
+ AtlasRelationshipEndDef newEnd2 = newRelationshipDef.getEndDef2();
+
+ if ( !newEnd2.equals(existingEnd2) ) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END2_UPDATE,
+ newRelationshipDef.getName(), newEnd2.toString(), existingEnd2.toString());
+ }
+ }
+
+ public static void setVertexPropertiesFromRelationshipDef(AtlasRelationshipDef relationshipDef, AtlasVertex vertex) {
+ vertex.setProperty(Constants.RELATIONSHIPTYPE_END1_KEY, AtlasType.toJson(relationshipDef.getEndDef1()));
+ vertex.setProperty(Constants.RELATIONSHIPTYPE_END2_KEY, AtlasType.toJson(relationshipDef.getEndDef2()));
+ // default the relationship category to association if it has not been specified.
+ String relationshipCategory = RelationshipCategory.ASSOCIATION.name();
+ if (relationshipDef.getRelationshipCategory()!=null) {
+ relationshipCategory =relationshipDef.getRelationshipCategory().name();
+ }
+ // Update RelationshipCategory
+ vertex.setProperty(Constants.RELATIONSHIPTYPE_CATEGORY_KEY, relationshipCategory);
+
+ if (relationshipDef.getPropagateTags() == null) {
+ vertex.setProperty(Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, AtlasRelationshipDef.PropagateTags.NONE.name());
+ } else {
+ vertex.setProperty(Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, relationshipDef.getPropagateTags().name());
+ }
+ }
+
+ private AtlasRelationshipDef toRelationshipDef(AtlasVertex vertex) throws AtlasBaseException {
+ AtlasRelationshipDef ret = null;
+
+ if (vertex != null && typeDefStore.isTypeVertex(vertex, TypeCategory.RELATIONSHIP)) {
+ String name = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY, String.class);
+ String description = vertex.getProperty(Constants.TYPEDESCRIPTION_PROPERTY_KEY, String.class);
+ String version = vertex.getProperty(Constants.TYPEVERSION_PROPERTY_KEY, String.class);
+ String end1Str = vertex.getProperty(Constants.RELATIONSHIPTYPE_END1_KEY, String.class);
+ String end2Str = vertex.getProperty(Constants.RELATIONSHIPTYPE_END2_KEY, String.class);
+ String relationStr = vertex.getProperty(Constants.RELATIONSHIPTYPE_CATEGORY_KEY, String.class);
+ String propagateStr = vertex.getProperty(Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, String.class);
+
+ // set the ends
+ AtlasRelationshipEndDef endDef1 = AtlasType.fromJson(end1Str, AtlasRelationshipEndDef.class);
+ AtlasRelationshipEndDef endDef2 = AtlasType.fromJson(end2Str, AtlasRelationshipEndDef.class);
+
+ // set the relationship Category
+ RelationshipCategory relationshipCategory = null;
+ for (RelationshipCategory value : RelationshipCategory.values()) {
+ if (value.name().equals(relationStr)) {
+ relationshipCategory = value;
+ }
+ }
+
+ // set the propagateTags
+ PropagateTags propagateTags = null;
+ for (PropagateTags value : PropagateTags.values()) {
+ if (value.name().equals(propagateStr)) {
+ propagateTags = value;
+ }
+ }
+
+ ret = new AtlasRelationshipDef(name, description, version, relationshipCategory, propagateTags, endDef1, endDef2);
+
+ // add in the attributes
+ AtlasStructDefStoreV2.toStructDef(vertex, ret, typeDefStore);
+ }
+
+ return ret;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
new file mode 100644
index 0000000..eb1079c
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
@@ -0,0 +1,837 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.annotation.GraphTransaction;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.TypeCategory;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity.Status;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.AtlasRelationship.AtlasRelationshipWithExtInfo;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
+import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.RepositoryException;
+import org.apache.atlas.repository.graph.GraphHelper;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
+import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasRelationshipType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
+import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
+import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO;
+import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE;
+import static org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds;
+import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid;
+import static org.apache.atlas.repository.graph.GraphHelper.getClassificationName;
+import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertices;
+import static org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getState;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getTypeName;
+
+@Component
+public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
+ private static final Logger LOG = LoggerFactory.getLogger(AtlasRelationshipStoreV2.class);
+
+ private static final Long DEFAULT_RELATIONSHIP_VERSION = 0L;
+
+ private final AtlasTypeRegistry typeRegistry;
+ private final EntityGraphRetriever entityRetriever;
+ private final DeleteHandlerV1 deleteHandler;
+ private final GraphHelper graphHelper = GraphHelper.getInstance();
+ private final AtlasEntityChangeNotifier entityChangeNotifier;
+
+ @Inject
+ public AtlasRelationshipStoreV2(AtlasTypeRegistry typeRegistry, DeleteHandlerV1 deleteHandler, AtlasEntityChangeNotifier entityChangeNotifier) {
+ this.typeRegistry = typeRegistry;
+ this.entityRetriever = new EntityGraphRetriever(typeRegistry);
+ this.deleteHandler = deleteHandler;
+ this.entityChangeNotifier = entityChangeNotifier;
+ }
+
+ @Override
+ @GraphTransaction
+ public AtlasRelationship create(AtlasRelationship relationship) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> create({})", relationship);
+ }
+
+ AtlasVertex end1Vertex = getVertexFromEndPoint(relationship.getEnd1());
+ AtlasVertex end2Vertex = getVertexFromEndPoint(relationship.getEnd2());
+
+ validateRelationship(end1Vertex, end2Vertex, relationship.getTypeName(), relationship.getAttributes());
+
+ AtlasEdge edge = createRelationship(end1Vertex, end2Vertex, relationship);
+
+ AtlasRelationship ret = edge != null ? entityRetriever.mapEdgeToAtlasRelationship(edge) : null;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== create({}): {}", relationship, ret);
+ }
+
+ // notify entities for added/removed classification propagation
+ entityChangeNotifier.notifyPropagatedEntities();
+
+ return ret;
+ }
+
+ @Override
+ @GraphTransaction
+ public AtlasRelationship update(AtlasRelationship relationship) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> update({})", relationship);
+ }
+
+ String guid = relationship.getGuid();
+
+ if (StringUtils.isEmpty(guid)) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_GUID_NOT_FOUND, guid);
+ }
+
+ AtlasEdge edge = graphHelper.getEdgeForGUID(guid);
+ String edgeType = AtlasGraphUtilsV2.getTypeName(edge);
+ AtlasVertex end1Vertex = edge.getOutVertex();
+ AtlasVertex end2Vertex = edge.getInVertex();
+
+ // update shouldn't change endType
+ if (StringUtils.isNotEmpty(relationship.getTypeName()) && !StringUtils.equalsIgnoreCase(edgeType, relationship.getTypeName())) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_UPDATE_TYPE_CHANGE_NOT_ALLOWED, guid, edgeType, relationship.getTypeName());
+ }
+
+ // update shouldn't change ends
+ if (relationship.getEnd1() != null) {
+ String updatedEnd1Guid = relationship.getEnd1().getGuid();
+
+ if (updatedEnd1Guid == null) {
+ AtlasVertex updatedEnd1Vertex = getVertexFromEndPoint(relationship.getEnd1());
+
+ updatedEnd1Guid = updatedEnd1Vertex == null ? null : AtlasGraphUtilsV2.getIdFromVertex(updatedEnd1Vertex);
+ }
+
+ if (updatedEnd1Guid != null) {
+ String end1Guid = AtlasGraphUtilsV2.getIdFromVertex(end1Vertex);
+
+ if (!StringUtils.equalsIgnoreCase(relationship.getEnd1().getGuid(), end1Guid)) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_UPDATE_END_CHANGE_NOT_ALLOWED, edgeType, guid, end1Guid, relationship.getEnd1().getGuid());
+ }
+ }
+ }
+
+ // update shouldn't change ends
+ if (relationship.getEnd2() != null) {
+ String updatedEnd2Guid = relationship.getEnd2().getGuid();
+
+ if (updatedEnd2Guid == null) {
+ AtlasVertex updatedEnd2Vertex = getVertexFromEndPoint(relationship.getEnd2());
+
+ updatedEnd2Guid = updatedEnd2Vertex == null ? null : AtlasGraphUtilsV2.getIdFromVertex(updatedEnd2Vertex);
+ }
+
+ if (updatedEnd2Guid != null) {
+ String end2Guid = AtlasGraphUtilsV2.getIdFromVertex(end2Vertex);
+
+ if (!StringUtils.equalsIgnoreCase(relationship.getEnd2().getGuid(), end2Guid)) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_UPDATE_END_CHANGE_NOT_ALLOWED, AtlasGraphUtilsV2.getTypeName(edge), guid, end2Guid, relationship.getEnd2().getGuid());
+ }
+ }
+ }
+
+
+ validateRelationship(end1Vertex, end2Vertex, edgeType, relationship.getAttributes());
+
+ AtlasRelationship ret = updateRelationship(edge, relationship);
+
+ // notify entities for added/removed classification propagation
+ entityChangeNotifier.notifyPropagatedEntities();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== update({}): {}", relationship, ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ @GraphTransaction
+ public AtlasRelationship getById(String guid) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> getById({})", guid);
+ }
+
+ AtlasEdge edge = graphHelper.getEdgeForGUID(guid);
+ AtlasRelationship ret = entityRetriever.mapEdgeToAtlasRelationship(edge);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== getById({}): {}", guid, ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ @GraphTransaction
+ public AtlasRelationshipWithExtInfo getExtInfoById(String guid) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> getExtInfoById({})", guid);
+ }
+
+ AtlasEdge edge = graphHelper.getEdgeForGUID(guid);
+ AtlasRelationshipWithExtInfo ret = entityRetriever.mapEdgeToAtlasRelationshipWithExtInfo(edge);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== getExtInfoById({}): {}", guid, ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ @GraphTransaction
+ public void deleteById(String guid) throws AtlasBaseException {
+ deleteById(guid, false);
+ }
+
+ @Override
+ @GraphTransaction
+ public void deleteById(String guid, boolean forceDelete) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> deleteById({}, {})", guid, forceDelete);
+ }
+
+ if (StringUtils.isEmpty(guid)) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_CRUD_INVALID_PARAMS, " empty/null guid");
+ }
+
+ AtlasEdge edge = graphHelper.getEdgeForGUID(guid);
+
+ if (edge == null) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_GUID_NOT_FOUND, guid);
+ }
+
+ if (getState(edge) == DELETED) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_DELETED, guid);
+ }
+
+ deleteHandler.deleteRelationships(Collections.singleton(edge), forceDelete);
+
+ // notify entities for added/removed classification propagation
+ entityChangeNotifier.notifyPropagatedEntities();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== deleteById({}): {}", guid);
+ }
+ }
+
+ @Override
+ public AtlasEdge getOrCreate(AtlasVertex end1Vertex, AtlasVertex end2Vertex, AtlasRelationship relationship) throws AtlasBaseException {
+ AtlasEdge ret = getRelationshipEdge(end1Vertex, end2Vertex, relationship.getTypeName());
+
+ if (ret == null) {
+ validateRelationship(end1Vertex, end2Vertex, relationship.getTypeName(), relationship.getAttributes());
+
+ ret = createRelationship(end1Vertex, end2Vertex, relationship);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public AtlasRelationship getOrCreate(AtlasRelationship relationship) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> getOrCreate({})", relationship);
+ }
+
+ validateRelationship(relationship);
+
+ AtlasVertex end1Vertex = getVertexFromEndPoint(relationship.getEnd1());
+ AtlasVertex end2Vertex = getVertexFromEndPoint(relationship.getEnd2());
+ AtlasRelationship ret = null;
+
+ // check if relationship exists
+ AtlasEdge relationshipEdge = getRelationshipEdge(end1Vertex, end2Vertex, relationship.getTypeName());
+
+ if (relationshipEdge == null) {
+ validateRelationship(relationship);
+
+ relationshipEdge = createRelationship(end1Vertex, end2Vertex, relationship);
+ }
+
+ if (relationshipEdge != null){
+ ret = entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== getOrCreate({}): {}", relationship, ret);
+ }
+
+ return ret;
+ }
+
+ private AtlasEdge createRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex, AtlasRelationship relationship) throws AtlasBaseException {
+ AtlasEdge ret = null;
+
+ try {
+ ret = getRelationshipEdge(end1Vertex, end2Vertex, relationship.getTypeName());
+
+ if (ret == null) {
+ ret = createRelationshipEdge(end1Vertex, end2Vertex, relationship);
+
+ AtlasRelationshipType relationType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
+
+ if (MapUtils.isNotEmpty(relationType.getAllAttributes())) {
+ for (AtlasAttribute attr : relationType.getAllAttributes().values()) {
+ String attrName = attr.getName();
+ String attrVertexProperty = attr.getVertexPropertyName();
+ Object attrValue = relationship.getAttribute(attrName);
+
+ AtlasGraphUtilsV2.setProperty(ret, attrVertexProperty, attrValue);
+ }
+ }
+ } else {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_EXISTS, relationship.getTypeName(),
+ AtlasGraphUtilsV2.getIdFromVertex(end1Vertex), AtlasGraphUtilsV2.getIdFromVertex(end2Vertex));
+ }
+ } catch (RepositoryException e) {
+ throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
+ }
+
+ return ret;
+ }
+
+ private AtlasRelationship updateRelationship(AtlasEdge relationshipEdge, AtlasRelationship relationship) throws AtlasBaseException {
+ AtlasRelationshipType relationType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
+
+ updateTagPropagations(relationshipEdge, relationship);
+
+ if (MapUtils.isNotEmpty(relationType.getAllAttributes())) {
+ for (AtlasAttribute attr : relationType.getAllAttributes().values()) {
+ String attrName = attr.getName();
+ String attrVertexProperty = attr.getVertexPropertyName();
+
+ if (relationship.hasAttribute(attrName)) {
+ AtlasGraphUtilsV2.setProperty(relationshipEdge, attrVertexProperty, relationship.getAttribute(attrName));
+ }
+ }
+ }
+
+ return entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge);
+ }
+
+ private void handleBlockedClassifications(AtlasEdge edge, Set<AtlasClassification> blockedPropagatedClassifications) throws AtlasBaseException {
+ if (blockedPropagatedClassifications != null) {
+ List<AtlasVertex> propagatedClassificationVertices = getClassificationVertices(edge);
+ List<String> currentClassificationIds = getBlockedClassificationIds(edge);
+ List<AtlasVertex> currentBlockedPropagatedClassificationVertices = getBlockedClassificationVertices(propagatedClassificationVertices, currentClassificationIds);
+ List<AtlasVertex> updatedBlockedPropagatedClassificationVertices = new ArrayList<>();
+ List<String> updatedClassificationIds = new ArrayList<>();
+
+ for (AtlasClassification classification : blockedPropagatedClassifications) {
+ AtlasVertex classificationVertex = validateBlockedPropagatedClassification(propagatedClassificationVertices, classification);
+
+ // ignore invalid blocked propagated classification
+ if (classificationVertex == null) {
+ continue;
+ }
+
+ updatedBlockedPropagatedClassificationVertices.add(classificationVertex);
+
+ String classificationId = classificationVertex.getIdForDisplay();
+
+ updatedClassificationIds.add(classificationId);
+ }
+
+ addToBlockedClassificationIds(edge, updatedClassificationIds);
+
+ // remove propagated tag for added entry
+ List<AtlasVertex> addedBlockedClassifications = (List<AtlasVertex>) CollectionUtils.subtract(updatedBlockedPropagatedClassificationVertices, currentBlockedPropagatedClassificationVertices);
+
+ for (AtlasVertex classificationVertex : addedBlockedClassifications) {
+ List<AtlasVertex> removePropagationFromVertices = graphHelper.getPropagatedEntityVertices(classificationVertex);
+
+ deleteHandler.removeTagPropagation(classificationVertex, removePropagationFromVertices);
+ }
+
+ // add propagated tag for removed entry
+ List<AtlasVertex> removedBlockedClassifications = (List<AtlasVertex>) CollectionUtils.subtract(currentBlockedPropagatedClassificationVertices, updatedBlockedPropagatedClassificationVertices);
+
+ for (AtlasVertex classificationVertex : removedBlockedClassifications) {
+ List<AtlasVertex> addPropagationToVertices = graphHelper.getPropagatedEntityVertices(classificationVertex);
+
+ deleteHandler.addTagPropagation(classificationVertex, addPropagationToVertices);
+ }
+ }
+ }
+
+ private List<AtlasVertex> getBlockedClassificationVertices(List<AtlasVertex> classificationVertices, List<String> blockedClassificationIds) {
+ List<AtlasVertex> ret = new ArrayList<>();
+
+ if (CollectionUtils.isNotEmpty(blockedClassificationIds)) {
+ for (AtlasVertex classificationVertex : classificationVertices) {
+ String classificationId = classificationVertex.getIdForDisplay();
+
+ if (blockedClassificationIds.contains(classificationId)) {
+ ret.add(classificationVertex);
+ }
+ }
+ }
+
+ return ret;
+ }
+
+ // propagated classifications should contain blocked propagated classification
+ private AtlasVertex validateBlockedPropagatedClassification(List<AtlasVertex> classificationVertices, AtlasClassification classification) {
+ AtlasVertex ret = null;
+
+ for (AtlasVertex vertex : classificationVertices) {
+ String classificationName = getClassificationName(vertex);
+ String entityGuid = getClassificationEntityGuid(vertex);
+
+ if (classificationName.equals(classification.getTypeName()) && entityGuid.equals(classification.getEntityGuid())) {
+ ret = vertex;
+ break;
+ }
+ }
+
+ return ret;
+ }
+
+ private void addToBlockedClassificationIds(AtlasEdge edge, List<String> classificationIds) {
+ if (edge != null) {
+ if (classificationIds.isEmpty()) {
+ edge.removeProperty(Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY);
+ } else {
+ edge.setListProperty(Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY, classificationIds);
+ }
+ }
+ }
+
+ private void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship) throws AtlasBaseException {
+ PropagateTags oldTagPropagation = getPropagateTags(edge);
+ PropagateTags newTagPropagation = relationship.getPropagateTags();
+
+ if (newTagPropagation != oldTagPropagation) {
+ List<AtlasVertex> currentClassificationVertices = getClassificationVertices(edge);
+ Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap = graphHelper.getClassificationPropagatedEntitiesMapping(currentClassificationVertices);
+
+ // Update propagation edge
+ AtlasGraphUtilsV2.setProperty(edge, Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, newTagPropagation.name());
+
+ List<AtlasVertex> updatedClassificationVertices = getClassificationVertices(edge);
+ List<AtlasVertex> classificationVerticesUnion = (List<AtlasVertex>) CollectionUtils.union(currentClassificationVertices, updatedClassificationVertices);
+ Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap = graphHelper.getClassificationPropagatedEntitiesMapping(classificationVerticesUnion);
+
+ // compute add/remove propagations list
+ Map<AtlasVertex, List<AtlasVertex>> addPropagationsMap = new HashMap<>();
+ Map<AtlasVertex, List<AtlasVertex>> removePropagationsMap = new HashMap<>();
+
+ if (MapUtils.isEmpty(currentClassificationsMap) && MapUtils.isNotEmpty(updatedClassificationsMap)) {
+ addPropagationsMap.putAll(updatedClassificationsMap);
+
+ } else if (MapUtils.isNotEmpty(currentClassificationsMap) && MapUtils.isEmpty(updatedClassificationsMap)) {
+ removePropagationsMap.putAll(currentClassificationsMap);
+
+ } else {
+ for (AtlasVertex classificationVertex : updatedClassificationsMap.keySet()) {
+ List<AtlasVertex> currentPropagatingEntities = currentClassificationsMap.containsKey(classificationVertex) ? currentClassificationsMap.get(classificationVertex) : Collections.emptyList();
+ List<AtlasVertex> updatedPropagatingEntities = updatedClassificationsMap.containsKey(classificationVertex) ? updatedClassificationsMap.get(classificationVertex) : Collections.emptyList();
+
+ List<AtlasVertex> entitiesAdded = (List<AtlasVertex>) CollectionUtils.subtract(updatedPropagatingEntities, currentPropagatingEntities);
+ List<AtlasVertex> entitiesRemoved = (List<AtlasVertex>) CollectionUtils.subtract(currentPropagatingEntities, updatedPropagatingEntities);
+
+ if (CollectionUtils.isNotEmpty(entitiesAdded)) {
+ addPropagationsMap.put(classificationVertex, entitiesAdded);
+ }
+
+ if (CollectionUtils.isNotEmpty(entitiesRemoved)) {
+ removePropagationsMap.put(classificationVertex, entitiesRemoved);
+ }
+ }
+ }
+
+ for (AtlasVertex classificationVertex : addPropagationsMap.keySet()) {
+ deleteHandler.addTagPropagation(classificationVertex, addPropagationsMap.get(classificationVertex));
+ }
+
+ for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) {
+ deleteHandler.removeTagPropagation(classificationVertex, removePropagationsMap.get(classificationVertex));
+ }
+ } else {
+ // update blocked propagated classifications only if there is no change is tag propagation (don't update both)
+ handleBlockedClassifications(edge, relationship.getBlockedPropagatedClassifications());
+ }
+ }
+
+ private void validateRelationship(AtlasRelationship relationship) throws AtlasBaseException {
+ if (relationship == null) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "AtlasRelationship is null");
+ }
+
+ String relationshipName = relationship.getTypeName();
+ String end1TypeName = getTypeNameFromObjectId(relationship.getEnd1());
+ String end2TypeName = getTypeNameFromObjectId(relationship.getEnd2());
+ AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipName);
+
+ if (relationshipType == null) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "unknown relationship type'" + relationshipName + "'");
+ }
+
+ if (relationship.getEnd1() == null || relationship.getEnd2() == null) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "end1/end2 is null");
+ }
+
+ boolean validEndTypes = false;
+
+ if (relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end1TypeName)) {
+ validEndTypes = relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end2TypeName);
+ } else if (relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end1TypeName)) {
+ validEndTypes = relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end2TypeName);
+ }
+
+ if (!validEndTypes) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_END_TYPE, relationshipName, relationshipType.getEnd2Type().getTypeName(), end1TypeName);
+ }
+
+ validateEnds(relationship);
+
+ validateAndNormalize(relationship);
+ }
+
+ private void validateRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex, String relationshipName, Map<String, Object> attributes) throws AtlasBaseException {
+ AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipName);
+
+ if (relationshipType == null) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "unknown relationship type'" + relationshipName + "'");
+ }
+
+ if (end1Vertex == null) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_END_IS_NULL, relationshipType.getEnd1Type().getTypeName());
+ }
+
+ if (end2Vertex == null) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_END_IS_NULL, relationshipType.getEnd2Type().getTypeName());
+ }
+
+ String end1TypeName = AtlasGraphUtilsV2.getTypeName(end1Vertex);
+ String end2TypeName = AtlasGraphUtilsV2.getTypeName(end2Vertex);
+
+ boolean validEndTypes = false;
+
+ if (relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end1TypeName)) {
+ validEndTypes = relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end2TypeName);
+ } else if (relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end1TypeName)) {
+ validEndTypes = relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end2TypeName);
+ }
+
+ if (!validEndTypes) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_END_TYPE, relationshipName, relationshipType.getEnd2Type().getTypeName(), end1TypeName);
+ }
+
+ List<String> messages = new ArrayList<>();
+ AtlasRelationship relationship = new AtlasRelationship(relationshipName, attributes);
+
+ relationshipType.validateValue(relationship, relationshipName, messages);
+
+ if (!messages.isEmpty()) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_CRUD_INVALID_PARAMS, messages);
+ }
+
+ relationshipType.getNormalizedValue(relationship);
+ }
+
+
+ /**
+ * Validate the ends of the passed relationship
+ * @param relationship
+ * @throws AtlasBaseException
+ */
+ private void validateEnds(AtlasRelationship relationship) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("validateEnds entry relationship:" + relationship);
+ }
+ List<AtlasObjectId> ends = new ArrayList<>();
+ List<AtlasRelationshipEndDef> endDefs = new ArrayList<>();
+ String relationshipTypeName = relationship.getTypeName();
+ AtlasRelationshipDef relationshipDef = typeRegistry.getRelationshipDefByName(relationshipTypeName);
+
+ ends.add(relationship.getEnd1());
+ ends.add(relationship.getEnd2());
+ endDefs.add(relationshipDef.getEndDef1());
+ endDefs.add(relationshipDef.getEndDef2());
+
+ for (int i = 0; i < ends.size(); i++) {
+ AtlasObjectId end = ends.get(i);
+ String guid = end.getGuid();
+ String typeName = end.getTypeName();
+ Map<String, Object> uniqueAttributes = end.getUniqueAttributes();
+ AtlasVertex endVertex = AtlasGraphUtilsV2.findByGuid(guid);
+
+ if (!AtlasTypeUtil.isValidGuid(guid) || endVertex == null) {
+ throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+
+ } else if (MapUtils.isNotEmpty(uniqueAttributes)) {
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+
+ if (AtlasGraphUtilsV2.findByUniqueAttributes(entityType, uniqueAttributes) == null) {
+ throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, typeName, uniqueAttributes.toString());
+ }
+ } else {
+ // check whether the guid is the correct type
+ String vertexTypeName = endVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
+
+ if (!Objects.equals(vertexTypeName, typeName)) {
+ String attrName = endDefs.get(i).getName();
+
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_INVALID_ENDTYPE, attrName, guid, vertexTypeName, typeName);
+ }
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("validateEnds exit successfully validated relationship:" + relationship);
+ }
+ }
+
+ private void validateAndNormalize(AtlasRelationship relationship) throws AtlasBaseException {
+ List<String> messages = new ArrayList<>();
+
+ if (! AtlasTypeUtil.isValidGuid(relationship.getGuid())) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_GUID_NOT_FOUND, relationship.getGuid());
+ }
+
+ AtlasRelationshipType type = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
+
+ if (type == null) {
+ throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.RELATIONSHIP.name(), relationship.getTypeName());
+ }
+
+ type.validateValue(relationship, relationship.getTypeName(), messages);
+
+ if (!messages.isEmpty()) {
+ throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_CRUD_INVALID_PARAMS, messages);
+ }
+
+ type.getNormalizedValue(relationship);
+ }
+
+ public AtlasEdge getRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, String relationshipType) {
+ String relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationshipType);
+ Iterator<AtlasEdge> edgesIterator = getOutGoingEdgesByLabel(fromVertex, relationshipLabel);
+ AtlasEdge ret = null;
+
+ while (edgesIterator != null && edgesIterator.hasNext()) {
+ AtlasEdge edge = edgesIterator.next();
+
+ if (edge != null) {
+ Status status = graphHelper.getStatus(edge);
+
+ if ((status == null || status == ACTIVE) &&
+ StringUtils.equals(getIdFromVertex(edge.getInVertex()), getIdFromVertex(toVertex))) {
+ ret = edge;
+ break;
+ }
+ }
+ }
+
+ return ret;
+ }
+
+ private Long getRelationshipVersion(AtlasRelationship relationship) {
+ Long ret = relationship != null ? relationship.getVersion() : null;
+
+ return (ret != null) ? ret : DEFAULT_RELATIONSHIP_VERSION;
+ }
+
+ private AtlasVertex getVertexFromEndPoint(AtlasObjectId endPoint) {
+ AtlasVertex ret = null;
+
+ if (StringUtils.isNotEmpty(endPoint.getGuid())) {
+ ret = AtlasGraphUtilsV2.findByGuid(endPoint.getGuid());
+ } else if (StringUtils.isNotEmpty(endPoint.getTypeName()) && MapUtils.isNotEmpty(endPoint.getUniqueAttributes())) {
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(endPoint.getTypeName());
+
+ ret = AtlasGraphUtilsV2.findByUniqueAttributes(entityType, endPoint.getUniqueAttributes());
+ }
+
+ return ret;
+ }
+
+ private AtlasEdge createRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) throws RepositoryException, AtlasBaseException {
+ String relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationship.getTypeName());
+ PropagateTags tagPropagation = getRelationshipTagPropagation(fromVertex, toVertex, relationship);
+ AtlasEdge ret = graphHelper.getOrCreateEdge(fromVertex, toVertex, relationshipLabel);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created relationship edge from [{}] --> [{}] using edge label: [{}]", getTypeName(fromVertex), getTypeName(toVertex), relationshipLabel);
+ }
+
+ // map additional properties to relationship edge
+ if (ret != null) {
+ final String guid = UUID.randomUUID().toString();
+
+ AtlasGraphUtilsV2.setProperty(ret, Constants.ENTITY_TYPE_PROPERTY_KEY, relationship.getTypeName());
+ AtlasGraphUtilsV2.setProperty(ret, Constants.RELATIONSHIP_GUID_PROPERTY_KEY, guid);
+ AtlasGraphUtilsV2.setProperty(ret, Constants.VERSION_PROPERTY_KEY, getRelationshipVersion(relationship));
+ AtlasGraphUtilsV2.setProperty(ret, Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, tagPropagation.name());
+
+ // blocked propagated classifications
+ handleBlockedClassifications(ret, relationship.getBlockedPropagatedClassifications());
+
+ // propagate tags
+ deleteHandler.addTagPropagation(ret, tagPropagation);
+ }
+
+ return ret;
+ }
+
+ private PropagateTags getRelationshipTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) {
+ AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
+ AtlasRelationshipEndDef endDef1 = relationshipType.getRelationshipDef().getEndDef1();
+ AtlasRelationshipEndDef endDef2 = relationshipType.getRelationshipDef().getEndDef2();
+ Set<String> fromVertexTypes = getTypeAndAllSuperTypes(getTypeName(fromVertex));
+ Set<String> toVertexTypes = getTypeAndAllSuperTypes(getTypeName(toVertex));
+ PropagateTags ret = relationshipType.getRelationshipDef().getPropagateTags();
+
+ // relationshipDef is defined as end1 (hive_db) and end2 (hive_table) and tagPropagation = ONE_TO_TWO
+ // relationship edge exists from [hive_table --> hive_db]
+ // swap the tagPropagation property for such cases.
+ if (fromVertexTypes.contains(endDef2.getType()) && toVertexTypes.contains(endDef1.getType())) {
+ if (ret == ONE_TO_TWO) {
+ ret = TWO_TO_ONE;
+ } else if (ret == TWO_TO_ONE) {
+ ret = ONE_TO_TWO;
+ }
+ }
+
+ return ret;
+ }
+
+ private String getRelationshipEdgeLabel(AtlasVertex fromVertex, AtlasVertex toVertex, String relationshipTypeName) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getRelationshipEdgeLabel({})", relationshipTypeName);
+ }
+
+ AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipTypeName);
+ String ret = relationshipType.getRelationshipDef().getRelationshipLabel();
+ AtlasRelationshipEndDef endDef1 = relationshipType.getRelationshipDef().getEndDef1();
+ AtlasRelationshipEndDef endDef2 = relationshipType.getRelationshipDef().getEndDef2();
+ Set<String> fromVertexTypes = getTypeAndAllSuperTypes(AtlasGraphUtilsV2.getTypeName(fromVertex));
+ Set<String> toVertexTypes = getTypeAndAllSuperTypes(AtlasGraphUtilsV2.getTypeName(toVertex));
+ AtlasAttribute attribute = null;
+
+ // validate entity type and all its supertypes contains relationshipDefs end type
+ // e.g. [hive_process -> hive_table] -> [Process -> DataSet]
+ if (fromVertexTypes.contains(endDef1.getType()) && toVertexTypes.contains(endDef2.getType())) {
+ String attributeName = endDef1.getName();
+
+ attribute = relationshipType.getEnd1Type().getRelationshipAttribute(attributeName);
+
+ } else if (fromVertexTypes.contains(endDef2.getType()) && toVertexTypes.contains(endDef1.getType())) {
+ String attributeName = endDef2.getName();
+
+ attribute = relationshipType.getEnd2Type().getRelationshipAttribute(attributeName);
+ }
+
+ if (attribute != null) {
+ ret = attribute.getRelationshipEdgeLabel();
+ }
+
+ return ret;
+ }
+
+ public Set<String> getTypeAndAllSuperTypes(String entityTypeName) {
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
+
+ return (entityType != null) ? entityType.getTypeAndAllSuperTypes() : new HashSet<String>();
+ }
+
+ private String getTypeNameFromObjectId(AtlasObjectId objectId) {
+ String typeName = objectId.getTypeName();
+
+ if (StringUtils.isBlank(typeName)) {
+ typeName = AtlasGraphUtilsV2.getTypeNameFromGuid(objectId.getGuid());
+ }
+
+ return typeName;
+ }
+
+ /**
+ * Check whether this vertex has a relationship associated with this relationship type.
+ * @param vertex
+ * @param relationshipTypeName
+ * @return true if found an edge with this relationship type in.
+ */
+ private boolean vertexHasRelationshipWithType(AtlasVertex vertex, String relationshipTypeName) {
+ String relationshipEdgeLabel = getRelationshipEdgeLabel(getTypeName(vertex), relationshipTypeName);
+ Iterator<AtlasEdge> iter = graphHelper.getAdjacentEdgesByLabel(vertex, AtlasEdgeDirection.BOTH, relationshipEdgeLabel);
+
+ return (iter != null) ? iter.hasNext() : false;
+ }
+
+ private String getRelationshipEdgeLabel(String typeName, String relationshipTypeName) {
+ AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipTypeName);
+ AtlasRelationshipDef relationshipDef = relationshipType.getRelationshipDef();
+ AtlasEntityType end1Type = relationshipType.getEnd1Type();
+ AtlasEntityType end2Type = relationshipType.getEnd2Type();
+ Set<String> vertexTypes = getTypeAndAllSuperTypes(typeName);
+ AtlasAttribute attribute = null;
+
+ if (vertexTypes.contains(end1Type.getTypeName())) {
+ String attributeName = relationshipDef.getEndDef1().getName();
+
+ attribute = (attributeName != null) ? end1Type.getAttribute(attributeName) : null;
+ } else if (vertexTypes.contains(end2Type.getTypeName())) {
+ String attributeName = relationshipDef.getEndDef2().getName();
+
+ attribute = (attributeName != null) ? end2Type.getAttribute(attributeName) : null;
+ }
+
+ return (attribute != null) ? attribute.getRelationshipEdgeLabel() : null;
+ }
+}
\ No newline at end of file