You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ap...@apache.org on 2018/05/25 03:52:49 UTC

[11/20] atlas git commit: ATLAS-2490: updates to make usage of v1/v2 in class names consistent

http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
new file mode 100644
index 0000000..bc03aeb
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
@@ -0,0 +1,508 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef.RelationshipCategory;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
+import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
+import org.apache.atlas.query.AtlasDSL;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasRelationshipType;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.atlas.authorize.AtlasPrivilege;
+import org.apache.atlas.authorize.AtlasTypeAccessRequest;
+import org.apache.atlas.authorize.AtlasAuthorizationUtils;
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * RelationshipDef store in v1 format.
+ */
+public class AtlasRelationshipDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasRelationshipDef> {
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasRelationshipDefStoreV2.class);
+
+    @Inject
+    public AtlasRelationshipDefStoreV2(AtlasTypeDefGraphStoreV2 typeDefStore, AtlasTypeRegistry typeRegistry) {
+        super(typeDefStore, typeRegistry);
+    }
+
+    @Override
+    public AtlasVertex preCreate(AtlasRelationshipDef relationshipDef) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV1.preCreate({})", relationshipDef);
+        }
+
+        validateType(relationshipDef);
+
+        AtlasType type = typeRegistry.getType(relationshipDef.getName());
+
+        if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.RELATIONSHIP) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, relationshipDef.getName(), TypeCategory.RELATIONSHIP.name());
+        }
+
+        AtlasVertex relationshipDefVertex = typeDefStore.findTypeVertexByName(relationshipDef.getName());
+
+        if (relationshipDefVertex != null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_ALREADY_EXISTS, relationshipDef.getName());
+        }
+
+        relationshipDefVertex = typeDefStore.createTypeVertex(relationshipDef);
+
+        updateVertexPreCreate(relationshipDef, (AtlasRelationshipType) type, relationshipDefVertex);
+
+        final AtlasRelationshipEndDef endDef1        = relationshipDef.getEndDef1();
+        final AtlasRelationshipEndDef endDef2        = relationshipDef.getEndDef2();
+        final String                  type1          = endDef1.getType();
+        final String                  type2          = endDef2.getType();
+        final String                  name1          = endDef1.getName();
+        final String                  name2          = endDef2.getName();
+        final AtlasVertex             end1TypeVertex = typeDefStore.findTypeVertexByName(type1);
+        final AtlasVertex             end2TypeVertex = typeDefStore.findTypeVertexByName(type2);
+
+        if (end1TypeVertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_END_TYPE_NAME_NOT_FOUND, relationshipDef.getName(), type1);
+        }
+
+        if (end2TypeVertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_END_TYPE_NAME_NOT_FOUND, relationshipDef.getName(), type2);
+        }
+
+        // create an edge between the relationshipDef and each of the entityDef vertices.
+        AtlasEdge edge1 = typeDefStore.getOrCreateEdge(relationshipDefVertex, end1TypeVertex, AtlasGraphUtilsV2.RELATIONSHIPTYPE_EDGE_LABEL);
+
+        /*
+        Where edge1 and edge2 have the same names and types we do not need a second edge.
+        We are not invoking the equals method on the AtlasRelationshipedDef, as we only want 1 edge even if propagateTags or other properties are different.
+        */
+
+        if (type1.equals(type2) && name1.equals(name2)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("AtlasRelationshipDefStoreV1.preCreate({}): created relationshipDef vertex {}," +
+                        " and one edge as {}, because end1 and end2 have the same type and name", relationshipDef, relationshipDefVertex, edge1);
+            }
+
+        } else {
+            AtlasEdge edge2 = typeDefStore.getOrCreateEdge(relationshipDefVertex, end2TypeVertex, AtlasGraphUtilsV2.RELATIONSHIPTYPE_EDGE_LABEL);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("AtlasRelationshipDefStoreV1.preCreate({}): created relationshipDef vertex {}," +
+                        " edge1 as {}, edge2 as {} ", relationshipDef, relationshipDefVertex, edge1, edge2);
+            }
+
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV1.preCreate({}): {}", relationshipDef, relationshipDefVertex);
+        }
+        return relationshipDefVertex;
+    }
+
+    @Override
+    public AtlasRelationshipDef create(AtlasRelationshipDef relationshipDef, AtlasVertex preCreateResult)
+            throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV1.create({}, {})", relationshipDef, preCreateResult);
+        }
+
+        AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_CREATE, relationshipDef), "create relationship-def ", relationshipDef.getName());
+
+        AtlasVertex vertex = (preCreateResult == null) ? preCreate(relationshipDef) : preCreateResult;
+
+        AtlasRelationshipDef ret = toRelationshipDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV1.create({}, {}): {}", relationshipDef, preCreateResult, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public List<AtlasRelationshipDef> getAll() throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV1.getAll()");
+        }
+
+        List<AtlasRelationshipDef> ret = new ArrayList<>();
+        Iterator<AtlasVertex> vertices = typeDefStore.findTypeVerticesByCategory(TypeCategory.RELATIONSHIP);
+
+        while (vertices.hasNext()) {
+            ret.add(toRelationshipDef(vertices.next()));
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV1.getAll(): count={}", ret.size());
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasRelationshipDef getByName(String name) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV1.getByName({})", name);
+        }
+
+        AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.RELATIONSHIP);
+
+        if (vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
+        }
+
+        vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY, TypeCategory.class);
+
+        AtlasRelationshipDef ret = toRelationshipDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV1.getByName({}): {}", name, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasRelationshipDef getByGuid(String guid) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV1.getByGuid({})", guid);
+        }
+
+        AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.RELATIONSHIP);
+
+        if (vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
+        }
+
+        AtlasRelationshipDef ret = toRelationshipDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV1.getByGuid({}): {}", guid, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasRelationshipDef update(AtlasRelationshipDef relationshipDef) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV1.update({})", relationshipDef);
+        }
+
+        validateType(relationshipDef);
+
+        AtlasRelationshipDef ret = StringUtils.isNotBlank(relationshipDef.getGuid())
+                ? updateByGuid(relationshipDef.getGuid(), relationshipDef)
+                : updateByName(relationshipDef.getName(), relationshipDef);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV1.update({}): {}", relationshipDef, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasRelationshipDef updateByName(String name, AtlasRelationshipDef relationshipDef)
+            throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV1.updateByName({}, {})", name, relationshipDef);
+        }
+
+        AtlasRelationshipDef existingDef = typeRegistry.getRelationshipDefByName(name);
+
+        AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update relationship-def ", name);
+
+        validateType(relationshipDef);
+
+        AtlasType type = typeRegistry.getType(relationshipDef.getName());
+
+        if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.RELATIONSHIP) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, relationshipDef.getName(), TypeCategory.RELATIONSHIP.name());
+        }
+
+        AtlasVertex vertex = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.RELATIONSHIP);
+
+        if (vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
+        }
+
+        preUpdateCheck(relationshipDef, (AtlasRelationshipType) type, vertex);
+
+        AtlasRelationshipDef ret = toRelationshipDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV1.updateByName({}, {}): {}", name, relationshipDef, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasRelationshipDef updateByGuid(String guid, AtlasRelationshipDef relationshipDef)
+            throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV1.updateByGuid({})", guid);
+        }
+
+        AtlasRelationshipDef existingDef = typeRegistry.getRelationshipDefByGuid(guid);
+
+        AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_UPDATE, existingDef), "update relationship-Def ", (existingDef != null ? existingDef.getName() : guid));
+
+        validateType(relationshipDef);
+
+        AtlasType type = typeRegistry.getTypeByGuid(guid);
+
+        if (type.getTypeCategory() != org.apache.atlas.model.TypeCategory.RELATIONSHIP) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_MATCH_FAILED, relationshipDef.getName(), TypeCategory.RELATIONSHIP.name());
+        }
+
+        AtlasVertex vertex = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.RELATIONSHIP);
+
+        if (vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
+        }
+
+        preUpdateCheck(relationshipDef, (AtlasRelationshipType) type, vertex);
+        // updates should not effect the edges between the types as we do not allow updates that change the endpoints.
+
+        AtlasRelationshipDef ret = toRelationshipDef(vertex);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV1.updateByGuid({}): {}", guid, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasVertex preDeleteByName(String name) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV1.preDeleteByName({})", name);
+        }
+
+        AtlasRelationshipDef existingDef = typeRegistry.getRelationshipDefByName(name);
+
+        AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete relationship-def ", name);
+
+        AtlasVertex ret = typeDefStore.findTypeVertexByNameAndCategory(name, TypeCategory.RELATIONSHIP);
+
+        if (ret == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, name);
+        }
+
+        if (AtlasGraphUtilsV2.relationshipTypeHasInstanceEdges(name)) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, name);
+        }
+
+        typeDefStore.deleteTypeVertexOutEdges(ret);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV1.preDeleteByName({}): {}", name, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasVertex preDeleteByGuid(String guid) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AtlasRelationshipDefStoreV1.preDeleteByGuid({})", guid);
+        }
+
+        AtlasRelationshipDef existingDef = typeRegistry.getRelationshipDefByGuid(guid);
+
+        AtlasAuthorizationUtils.verifyAccess(new AtlasTypeAccessRequest(AtlasPrivilege.TYPE_DELETE, existingDef), "delete relationship-def ", (existingDef != null ? existingDef.getName() : guid));
+
+        AtlasVertex ret = typeDefStore.findTypeVertexByGuidAndCategory(guid, TypeCategory.RELATIONSHIP);
+
+        if (ret == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_GUID_NOT_FOUND, guid);
+        }
+
+        String typeName = AtlasGraphUtilsV2.getProperty(ret, Constants.TYPENAME_PROPERTY_KEY, String.class);
+
+        if (AtlasGraphUtilsV2.relationshipTypeHasInstanceEdges(typeName)) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_HAS_REFERENCES, typeName);
+        }
+
+        typeDefStore.deleteTypeVertexOutEdges(ret);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== AtlasRelationshipDefStoreV1.preDeleteByGuid({}): {}", guid, ret);
+        }
+
+        return ret;
+    }
+
+    private void updateVertexPreCreate(AtlasRelationshipDef relationshipDef, AtlasRelationshipType relationshipType,
+                                       AtlasVertex vertex) throws AtlasBaseException {
+        AtlasRelationshipEndDef end1 = relationshipDef.getEndDef1();
+        AtlasRelationshipEndDef end2 = relationshipDef.getEndDef2();
+
+        // check whether the names added on the relationship Ends are reserved if required.
+        final boolean allowReservedKeywords;
+        try {
+            allowReservedKeywords = ApplicationProperties.get().getBoolean(ALLOW_RESERVED_KEYWORDS, true);
+        } catch (AtlasException e) {
+            throw new AtlasBaseException(e);
+        }
+
+        if (!allowReservedKeywords) {
+            if (AtlasDSL.Parser.isKeyword(end1.getName())) {
+                throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_END1_NAME_INVALID, end1.getName());
+            }
+
+            if (AtlasDSL.Parser.isKeyword(end2.getName())) {
+                throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_END2_NAME_INVALID, end2.getName());
+            }
+        }
+
+        AtlasStructDefStoreV2.updateVertexPreCreate(relationshipDef, relationshipType, vertex, typeDefStore);
+        // Update ends
+        setVertexPropertiesFromRelationshipDef(relationshipDef, vertex);
+    }
+
+    private void preUpdateCheck(AtlasRelationshipDef newRelationshipDef, AtlasRelationshipType relationshipType,
+                                AtlasVertex vertex) throws AtlasBaseException {
+        // We will not support an update to endpoints or category key
+        AtlasRelationshipDef existingRelationshipDef = toRelationshipDef(vertex);
+
+        preUpdateCheck(newRelationshipDef, existingRelationshipDef);
+        // we do allow change to tag propagation and the addition of new attributes.
+
+        AtlasStructDefStoreV2.updateVertexPreUpdate(newRelationshipDef, relationshipType, vertex, typeDefStore);
+
+        setVertexPropertiesFromRelationshipDef(newRelationshipDef, vertex);
+    }
+
+    /**
+     * Check ends are the same and relationshipCategory is the same.
+     *
+     * We do this by comparing 2 relationshipDefs to avoid exposing the AtlasVertex to unit testing.
+     *
+     * @param newRelationshipDef
+     * @param existingRelationshipDef
+     * @throws AtlasBaseException
+     */
+    public static void preUpdateCheck(AtlasRelationshipDef newRelationshipDef, AtlasRelationshipDef existingRelationshipDef) throws AtlasBaseException {
+        // do not allow renames of the Def.
+        String existingName = existingRelationshipDef.getName();
+        String newName      = newRelationshipDef.getName();
+
+        if (!existingName.equals(newName)) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_NAME_UPDATE,
+                    newRelationshipDef.getGuid(),existingName, newName);
+        }
+
+        RelationshipCategory existingRelationshipCategory = existingRelationshipDef.getRelationshipCategory();
+        RelationshipCategory newRelationshipCategory      = newRelationshipDef.getRelationshipCategory();
+
+        if ( !existingRelationshipCategory.equals(newRelationshipCategory)){
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_CATEGORY_UPDATE,
+                    newRelationshipDef.getName(),newRelationshipCategory.name(),
+                    existingRelationshipCategory.name() );
+        }
+
+        AtlasRelationshipEndDef existingEnd1 = existingRelationshipDef.getEndDef1();
+        AtlasRelationshipEndDef newEnd1      = newRelationshipDef.getEndDef1();
+
+        if ( !newEnd1.equals(existingEnd1) ) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END1_UPDATE,
+                                         newRelationshipDef.getName(), newEnd1.toString(), existingEnd1.toString());
+        }
+
+        AtlasRelationshipEndDef existingEnd2 = existingRelationshipDef.getEndDef2();
+        AtlasRelationshipEndDef newEnd2      = newRelationshipDef.getEndDef2();
+
+        if ( !newEnd2.equals(existingEnd2) ) {
+                throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END2_UPDATE,
+                                         newRelationshipDef.getName(), newEnd2.toString(), existingEnd2.toString());
+        }
+    }
+
+    public static void setVertexPropertiesFromRelationshipDef(AtlasRelationshipDef relationshipDef, AtlasVertex vertex) {
+        vertex.setProperty(Constants.RELATIONSHIPTYPE_END1_KEY, AtlasType.toJson(relationshipDef.getEndDef1()));
+        vertex.setProperty(Constants.RELATIONSHIPTYPE_END2_KEY, AtlasType.toJson(relationshipDef.getEndDef2()));
+        // default the relationship category to association if it has not been specified.
+        String relationshipCategory = RelationshipCategory.ASSOCIATION.name();
+        if (relationshipDef.getRelationshipCategory()!=null) {
+            relationshipCategory =relationshipDef.getRelationshipCategory().name();
+        }
+        // Update RelationshipCategory
+        vertex.setProperty(Constants.RELATIONSHIPTYPE_CATEGORY_KEY, relationshipCategory);
+
+        if (relationshipDef.getPropagateTags() == null) {
+            vertex.setProperty(Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, AtlasRelationshipDef.PropagateTags.NONE.name());
+        } else {
+            vertex.setProperty(Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, relationshipDef.getPropagateTags().name());
+        }
+    }
+
+    private AtlasRelationshipDef toRelationshipDef(AtlasVertex vertex) throws AtlasBaseException {
+        AtlasRelationshipDef ret = null;
+
+        if (vertex != null && typeDefStore.isTypeVertex(vertex, TypeCategory.RELATIONSHIP)) {
+            String name         = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY, String.class);
+            String description  = vertex.getProperty(Constants.TYPEDESCRIPTION_PROPERTY_KEY, String.class);
+            String version      = vertex.getProperty(Constants.TYPEVERSION_PROPERTY_KEY, String.class);
+            String end1Str = vertex.getProperty(Constants.RELATIONSHIPTYPE_END1_KEY, String.class);
+            String end2Str = vertex.getProperty(Constants.RELATIONSHIPTYPE_END2_KEY, String.class);
+            String relationStr  = vertex.getProperty(Constants.RELATIONSHIPTYPE_CATEGORY_KEY, String.class);
+            String propagateStr = vertex.getProperty(Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, String.class);
+
+            // set the ends
+            AtlasRelationshipEndDef endDef1 = AtlasType.fromJson(end1Str, AtlasRelationshipEndDef.class);
+            AtlasRelationshipEndDef endDef2 = AtlasType.fromJson(end2Str, AtlasRelationshipEndDef.class);
+
+            // set the relationship Category
+            RelationshipCategory relationshipCategory = null;
+            for (RelationshipCategory value : RelationshipCategory.values()) {
+                if (value.name().equals(relationStr)) {
+                    relationshipCategory = value;
+                }
+            }
+
+            // set the propagateTags
+            PropagateTags propagateTags = null;
+            for (PropagateTags value : PropagateTags.values()) {
+                if (value.name().equals(propagateStr)) {
+                    propagateTags = value;
+                }
+            }
+
+            ret = new AtlasRelationshipDef(name, description, version, relationshipCategory,  propagateTags, endDef1, endDef2);
+
+            // add in the attributes
+            AtlasStructDefStoreV2.toStructDef(vertex, ret, typeDefStore);
+        }
+
+        return ret;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/3d5b4880/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
new file mode 100644
index 0000000..eb1079c
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
@@ -0,0 +1,837 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.annotation.GraphTransaction;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.TypeCategory;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity.Status;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.AtlasRelationship.AtlasRelationshipWithExtInfo;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
+import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.RepositoryException;
+import org.apache.atlas.repository.graph.GraphHelper;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
+import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasRelationshipType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
+import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
+import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO;
+import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE;
+import static org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds;
+import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid;
+import static org.apache.atlas.repository.graph.GraphHelper.getClassificationName;
+import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertices;
+import static org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getState;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getTypeName;
+
+@Component
+public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasRelationshipStoreV2.class);
+
+    private static final Long DEFAULT_RELATIONSHIP_VERSION = 0L;
+
+    private final AtlasTypeRegistry         typeRegistry;
+    private final EntityGraphRetriever      entityRetriever;
+    private final DeleteHandlerV1 deleteHandler;
+    private final GraphHelper               graphHelper = GraphHelper.getInstance();
+    private final AtlasEntityChangeNotifier entityChangeNotifier;
+
+    @Inject
+    public AtlasRelationshipStoreV2(AtlasTypeRegistry typeRegistry, DeleteHandlerV1 deleteHandler, AtlasEntityChangeNotifier entityChangeNotifier) {
+        this.typeRegistry         = typeRegistry;
+        this.entityRetriever      = new EntityGraphRetriever(typeRegistry);
+        this.deleteHandler        = deleteHandler;
+        this.entityChangeNotifier = entityChangeNotifier;
+    }
+
+    @Override
+    @GraphTransaction
+    public AtlasRelationship create(AtlasRelationship relationship) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> create({})", relationship);
+        }
+
+        AtlasVertex end1Vertex = getVertexFromEndPoint(relationship.getEnd1());
+        AtlasVertex end2Vertex = getVertexFromEndPoint(relationship.getEnd2());
+
+        validateRelationship(end1Vertex, end2Vertex, relationship.getTypeName(), relationship.getAttributes());
+
+        AtlasEdge edge = createRelationship(end1Vertex, end2Vertex, relationship);
+
+        AtlasRelationship ret = edge != null ? entityRetriever.mapEdgeToAtlasRelationship(edge) : null;
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== create({}): {}", relationship, ret);
+        }
+
+        // notify entities for added/removed classification propagation
+        entityChangeNotifier.notifyPropagatedEntities();
+
+        return ret;
+    }
+
+    @Override
+    @GraphTransaction
+    public AtlasRelationship update(AtlasRelationship relationship) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> update({})", relationship);
+        }
+
+        String guid = relationship.getGuid();
+
+        if (StringUtils.isEmpty(guid)) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_GUID_NOT_FOUND, guid);
+        }
+
+        AtlasEdge   edge       = graphHelper.getEdgeForGUID(guid);
+        String      edgeType   = AtlasGraphUtilsV2.getTypeName(edge);
+        AtlasVertex end1Vertex = edge.getOutVertex();
+        AtlasVertex end2Vertex = edge.getInVertex();
+
+        // update shouldn't change endType
+        if (StringUtils.isNotEmpty(relationship.getTypeName()) && !StringUtils.equalsIgnoreCase(edgeType, relationship.getTypeName())) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_UPDATE_TYPE_CHANGE_NOT_ALLOWED, guid, edgeType, relationship.getTypeName());
+        }
+
+        // update shouldn't change ends
+        if (relationship.getEnd1() != null) {
+            String updatedEnd1Guid = relationship.getEnd1().getGuid();
+
+            if (updatedEnd1Guid == null) {
+                AtlasVertex updatedEnd1Vertex = getVertexFromEndPoint(relationship.getEnd1());
+
+                updatedEnd1Guid = updatedEnd1Vertex == null ? null : AtlasGraphUtilsV2.getIdFromVertex(updatedEnd1Vertex);
+            }
+
+            if (updatedEnd1Guid != null) {
+                String end1Guid = AtlasGraphUtilsV2.getIdFromVertex(end1Vertex);
+
+                if (!StringUtils.equalsIgnoreCase(relationship.getEnd1().getGuid(), end1Guid)) {
+                    throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_UPDATE_END_CHANGE_NOT_ALLOWED, edgeType, guid, end1Guid, relationship.getEnd1().getGuid());
+                }
+            }
+        }
+
+        // update shouldn't change ends
+        if (relationship.getEnd2() != null) {
+            String updatedEnd2Guid = relationship.getEnd2().getGuid();
+
+            if (updatedEnd2Guid == null) {
+                AtlasVertex updatedEnd2Vertex = getVertexFromEndPoint(relationship.getEnd2());
+
+                updatedEnd2Guid = updatedEnd2Vertex == null ? null : AtlasGraphUtilsV2.getIdFromVertex(updatedEnd2Vertex);
+            }
+
+            if (updatedEnd2Guid != null) {
+                String end2Guid = AtlasGraphUtilsV2.getIdFromVertex(end2Vertex);
+
+                if (!StringUtils.equalsIgnoreCase(relationship.getEnd2().getGuid(), end2Guid)) {
+                    throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_UPDATE_END_CHANGE_NOT_ALLOWED, AtlasGraphUtilsV2.getTypeName(edge), guid, end2Guid, relationship.getEnd2().getGuid());
+                }
+            }
+        }
+
+
+        validateRelationship(end1Vertex, end2Vertex, edgeType, relationship.getAttributes());
+
+        AtlasRelationship ret = updateRelationship(edge, relationship);
+
+        // notify entities for added/removed classification propagation
+        entityChangeNotifier.notifyPropagatedEntities();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== update({}): {}", relationship, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    @GraphTransaction
+    public AtlasRelationship getById(String guid) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> getById({})", guid);
+        }
+
+        AtlasEdge         edge = graphHelper.getEdgeForGUID(guid);
+        AtlasRelationship ret  = entityRetriever.mapEdgeToAtlasRelationship(edge);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== getById({}): {}", guid, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    @GraphTransaction
+    public AtlasRelationshipWithExtInfo getExtInfoById(String guid) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> getExtInfoById({})", guid);
+        }
+
+        AtlasEdge                    edge = graphHelper.getEdgeForGUID(guid);
+        AtlasRelationshipWithExtInfo ret  = entityRetriever.mapEdgeToAtlasRelationshipWithExtInfo(edge);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== getExtInfoById({}): {}", guid, ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    @GraphTransaction
+    public void deleteById(String guid) throws AtlasBaseException {
+        deleteById(guid, false);
+    }
+
+    @Override
+    @GraphTransaction
+    public void deleteById(String guid, boolean forceDelete) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> deleteById({}, {})", guid, forceDelete);
+        }
+
+        if (StringUtils.isEmpty(guid)) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_CRUD_INVALID_PARAMS, " empty/null guid");
+        }
+
+        AtlasEdge edge = graphHelper.getEdgeForGUID(guid);
+
+        if (edge == null) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_GUID_NOT_FOUND, guid);
+        }
+
+        if (getState(edge) == DELETED) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_DELETED, guid);
+        }
+
+        deleteHandler.deleteRelationships(Collections.singleton(edge), forceDelete);
+
+        // notify entities for added/removed classification propagation
+        entityChangeNotifier.notifyPropagatedEntities();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== deleteById({}): {}", guid);
+        }
+    }
+
+    @Override
+    public AtlasEdge getOrCreate(AtlasVertex end1Vertex, AtlasVertex end2Vertex, AtlasRelationship relationship) throws AtlasBaseException {
+        AtlasEdge ret = getRelationshipEdge(end1Vertex, end2Vertex, relationship.getTypeName());
+
+        if (ret == null) {
+            validateRelationship(end1Vertex, end2Vertex, relationship.getTypeName(), relationship.getAttributes());
+
+            ret = createRelationship(end1Vertex, end2Vertex, relationship);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public AtlasRelationship getOrCreate(AtlasRelationship relationship) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> getOrCreate({})", relationship);
+        }
+
+        validateRelationship(relationship);
+
+        AtlasVertex       end1Vertex = getVertexFromEndPoint(relationship.getEnd1());
+        AtlasVertex       end2Vertex = getVertexFromEndPoint(relationship.getEnd2());
+        AtlasRelationship ret        = null;
+
+        // check if relationship exists
+        AtlasEdge relationshipEdge = getRelationshipEdge(end1Vertex, end2Vertex, relationship.getTypeName());
+
+        if (relationshipEdge == null) {
+            validateRelationship(relationship);
+
+            relationshipEdge = createRelationship(end1Vertex, end2Vertex, relationship);
+        }
+
+        if (relationshipEdge != null){
+            ret = entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== getOrCreate({}): {}", relationship, ret);
+        }
+
+        return ret;
+    }
+
+    private AtlasEdge createRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex, AtlasRelationship relationship) throws AtlasBaseException {
+        AtlasEdge ret = null;
+
+        try {
+            ret = getRelationshipEdge(end1Vertex, end2Vertex, relationship.getTypeName());
+
+            if (ret == null) {
+                ret = createRelationshipEdge(end1Vertex, end2Vertex, relationship);
+
+                AtlasRelationshipType relationType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
+
+                if (MapUtils.isNotEmpty(relationType.getAllAttributes())) {
+                    for (AtlasAttribute attr : relationType.getAllAttributes().values()) {
+                        String attrName           = attr.getName();
+                        String attrVertexProperty = attr.getVertexPropertyName();
+                        Object attrValue          = relationship.getAttribute(attrName);
+
+                        AtlasGraphUtilsV2.setProperty(ret, attrVertexProperty, attrValue);
+                    }
+                }
+            } else {
+                throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_ALREADY_EXISTS, relationship.getTypeName(),
+                                             AtlasGraphUtilsV2.getIdFromVertex(end1Vertex), AtlasGraphUtilsV2.getIdFromVertex(end2Vertex));
+            }
+        } catch (RepositoryException e) {
+            throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
+        }
+
+        return ret;
+    }
+
+    private AtlasRelationship updateRelationship(AtlasEdge relationshipEdge, AtlasRelationship relationship) throws AtlasBaseException {
+        AtlasRelationshipType relationType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
+
+        updateTagPropagations(relationshipEdge, relationship);
+
+        if (MapUtils.isNotEmpty(relationType.getAllAttributes())) {
+            for (AtlasAttribute attr : relationType.getAllAttributes().values()) {
+                String attrName           = attr.getName();
+                String attrVertexProperty = attr.getVertexPropertyName();
+
+                if (relationship.hasAttribute(attrName)) {
+                    AtlasGraphUtilsV2.setProperty(relationshipEdge, attrVertexProperty, relationship.getAttribute(attrName));
+                }
+            }
+        }
+
+        return entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge);
+    }
+
+    private void handleBlockedClassifications(AtlasEdge edge, Set<AtlasClassification> blockedPropagatedClassifications) throws AtlasBaseException {
+        if (blockedPropagatedClassifications != null) {
+            List<AtlasVertex> propagatedClassificationVertices               = getClassificationVertices(edge);
+            List<String>      currentClassificationIds                       = getBlockedClassificationIds(edge);
+            List<AtlasVertex> currentBlockedPropagatedClassificationVertices = getBlockedClassificationVertices(propagatedClassificationVertices, currentClassificationIds);
+            List<AtlasVertex> updatedBlockedPropagatedClassificationVertices = new ArrayList<>();
+            List<String>      updatedClassificationIds                       = new ArrayList<>();
+
+            for (AtlasClassification classification : blockedPropagatedClassifications) {
+                AtlasVertex classificationVertex = validateBlockedPropagatedClassification(propagatedClassificationVertices, classification);
+
+                // ignore invalid blocked propagated classification
+                if (classificationVertex == null) {
+                    continue;
+                }
+
+                updatedBlockedPropagatedClassificationVertices.add(classificationVertex);
+
+                String classificationId = classificationVertex.getIdForDisplay();
+
+                updatedClassificationIds.add(classificationId);
+            }
+
+            addToBlockedClassificationIds(edge, updatedClassificationIds);
+
+            // remove propagated tag for added entry
+            List<AtlasVertex> addedBlockedClassifications = (List<AtlasVertex>) CollectionUtils.subtract(updatedBlockedPropagatedClassificationVertices, currentBlockedPropagatedClassificationVertices);
+
+            for (AtlasVertex classificationVertex : addedBlockedClassifications) {
+                List<AtlasVertex> removePropagationFromVertices = graphHelper.getPropagatedEntityVertices(classificationVertex);
+
+                deleteHandler.removeTagPropagation(classificationVertex, removePropagationFromVertices);
+            }
+
+            // add propagated tag for removed entry
+            List<AtlasVertex> removedBlockedClassifications = (List<AtlasVertex>) CollectionUtils.subtract(currentBlockedPropagatedClassificationVertices, updatedBlockedPropagatedClassificationVertices);
+
+            for (AtlasVertex classificationVertex : removedBlockedClassifications) {
+                List<AtlasVertex> addPropagationToVertices = graphHelper.getPropagatedEntityVertices(classificationVertex);
+
+                deleteHandler.addTagPropagation(classificationVertex, addPropagationToVertices);
+            }
+        }
+    }
+
+    private List<AtlasVertex> getBlockedClassificationVertices(List<AtlasVertex> classificationVertices, List<String> blockedClassificationIds) {
+        List<AtlasVertex> ret = new ArrayList<>();
+
+        if (CollectionUtils.isNotEmpty(blockedClassificationIds)) {
+            for (AtlasVertex classificationVertex : classificationVertices) {
+                String classificationId = classificationVertex.getIdForDisplay();
+
+                if (blockedClassificationIds.contains(classificationId)) {
+                    ret.add(classificationVertex);
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    // propagated classifications should contain blocked propagated classification
+    private AtlasVertex validateBlockedPropagatedClassification(List<AtlasVertex> classificationVertices, AtlasClassification classification) {
+        AtlasVertex ret = null;
+
+        for (AtlasVertex vertex : classificationVertices) {
+            String classificationName = getClassificationName(vertex);
+            String entityGuid         = getClassificationEntityGuid(vertex);
+
+            if (classificationName.equals(classification.getTypeName()) && entityGuid.equals(classification.getEntityGuid())) {
+                ret = vertex;
+                break;
+            }
+        }
+
+        return ret;
+    }
+
+    private void addToBlockedClassificationIds(AtlasEdge edge, List<String> classificationIds) {
+        if (edge != null) {
+            if (classificationIds.isEmpty()) {
+                edge.removeProperty(Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY);
+            } else {
+                edge.setListProperty(Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY, classificationIds);
+            }
+        }
+    }
+
+    private void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship) throws AtlasBaseException {
+        PropagateTags oldTagPropagation = getPropagateTags(edge);
+        PropagateTags newTagPropagation = relationship.getPropagateTags();
+
+        if (newTagPropagation != oldTagPropagation) {
+            List<AtlasVertex>                   currentClassificationVertices = getClassificationVertices(edge);
+            Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap     = graphHelper.getClassificationPropagatedEntitiesMapping(currentClassificationVertices);
+
+            // Update propagation edge
+            AtlasGraphUtilsV2.setProperty(edge, Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, newTagPropagation.name());
+
+            List<AtlasVertex>                   updatedClassificationVertices = getClassificationVertices(edge);
+            List<AtlasVertex>                   classificationVerticesUnion   = (List<AtlasVertex>) CollectionUtils.union(currentClassificationVertices, updatedClassificationVertices);
+            Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap     = graphHelper.getClassificationPropagatedEntitiesMapping(classificationVerticesUnion);
+
+            // compute add/remove propagations list
+            Map<AtlasVertex, List<AtlasVertex>> addPropagationsMap    = new HashMap<>();
+            Map<AtlasVertex, List<AtlasVertex>> removePropagationsMap = new HashMap<>();
+
+            if (MapUtils.isEmpty(currentClassificationsMap) && MapUtils.isNotEmpty(updatedClassificationsMap)) {
+                addPropagationsMap.putAll(updatedClassificationsMap);
+
+            } else if (MapUtils.isNotEmpty(currentClassificationsMap) && MapUtils.isEmpty(updatedClassificationsMap)) {
+                removePropagationsMap.putAll(currentClassificationsMap);
+
+            } else {
+                for (AtlasVertex classificationVertex : updatedClassificationsMap.keySet()) {
+                    List<AtlasVertex> currentPropagatingEntities = currentClassificationsMap.containsKey(classificationVertex) ? currentClassificationsMap.get(classificationVertex) : Collections.emptyList();
+                    List<AtlasVertex> updatedPropagatingEntities = updatedClassificationsMap.containsKey(classificationVertex) ? updatedClassificationsMap.get(classificationVertex) : Collections.emptyList();
+
+                    List<AtlasVertex> entitiesAdded   = (List<AtlasVertex>) CollectionUtils.subtract(updatedPropagatingEntities, currentPropagatingEntities);
+                    List<AtlasVertex> entitiesRemoved = (List<AtlasVertex>) CollectionUtils.subtract(currentPropagatingEntities, updatedPropagatingEntities);
+
+                    if (CollectionUtils.isNotEmpty(entitiesAdded)) {
+                        addPropagationsMap.put(classificationVertex, entitiesAdded);
+                    }
+
+                    if (CollectionUtils.isNotEmpty(entitiesRemoved)) {
+                        removePropagationsMap.put(classificationVertex, entitiesRemoved);
+                    }
+                }
+            }
+
+            for (AtlasVertex classificationVertex : addPropagationsMap.keySet()) {
+                deleteHandler.addTagPropagation(classificationVertex, addPropagationsMap.get(classificationVertex));
+            }
+
+            for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) {
+                deleteHandler.removeTagPropagation(classificationVertex, removePropagationsMap.get(classificationVertex));
+            }
+        } else {
+            // update blocked propagated classifications only if there is no change is tag propagation (don't update both)
+            handleBlockedClassifications(edge, relationship.getBlockedPropagatedClassifications());
+        }
+    }
+
+    private void validateRelationship(AtlasRelationship relationship) throws AtlasBaseException {
+        if (relationship == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "AtlasRelationship is null");
+        }
+
+        String                relationshipName = relationship.getTypeName();
+        String                end1TypeName     = getTypeNameFromObjectId(relationship.getEnd1());
+        String                end2TypeName     = getTypeNameFromObjectId(relationship.getEnd2());
+        AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipName);
+
+        if (relationshipType == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "unknown relationship type'" + relationshipName + "'");
+        }
+
+        if (relationship.getEnd1() == null || relationship.getEnd2() == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "end1/end2 is null");
+        }
+
+        boolean validEndTypes = false;
+
+        if (relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end1TypeName)) {
+            validEndTypes = relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end2TypeName);
+        } else if (relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end1TypeName)) {
+            validEndTypes = relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end2TypeName);
+        }
+
+        if (!validEndTypes) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_END_TYPE, relationshipName, relationshipType.getEnd2Type().getTypeName(), end1TypeName);
+        }
+
+        validateEnds(relationship);
+
+        validateAndNormalize(relationship);
+    }
+
+    private void validateRelationship(AtlasVertex end1Vertex, AtlasVertex end2Vertex, String relationshipName, Map<String, Object> attributes) throws AtlasBaseException {
+        AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipName);
+
+        if (relationshipType == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "unknown relationship type'" + relationshipName + "'");
+        }
+
+        if (end1Vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_END_IS_NULL, relationshipType.getEnd1Type().getTypeName());
+        }
+
+        if (end2Vertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_END_IS_NULL, relationshipType.getEnd2Type().getTypeName());
+        }
+
+        String                end1TypeName     = AtlasGraphUtilsV2.getTypeName(end1Vertex);
+        String                end2TypeName     = AtlasGraphUtilsV2.getTypeName(end2Vertex);
+
+        boolean validEndTypes = false;
+
+        if (relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end1TypeName)) {
+            validEndTypes = relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end2TypeName);
+        } else if (relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end1TypeName)) {
+            validEndTypes = relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end2TypeName);
+        }
+
+        if (!validEndTypes) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_END_TYPE, relationshipName, relationshipType.getEnd2Type().getTypeName(), end1TypeName);
+        }
+
+        List<String>      messages     = new ArrayList<>();
+        AtlasRelationship relationship = new AtlasRelationship(relationshipName, attributes);
+
+        relationshipType.validateValue(relationship, relationshipName, messages);
+
+        if (!messages.isEmpty()) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_CRUD_INVALID_PARAMS, messages);
+        }
+
+        relationshipType.getNormalizedValue(relationship);
+    }
+
+
+    /**
+     * Validate the ends of the passed relationship
+     * @param relationship
+     * @throws AtlasBaseException
+     */
+    private void validateEnds(AtlasRelationship relationship) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("validateEnds entry relationship:" + relationship);
+        }
+        List<AtlasObjectId>           ends                 = new ArrayList<>();
+        List<AtlasRelationshipEndDef> endDefs              = new ArrayList<>();
+        String                        relationshipTypeName = relationship.getTypeName();
+        AtlasRelationshipDef          relationshipDef      = typeRegistry.getRelationshipDefByName(relationshipTypeName);
+
+        ends.add(relationship.getEnd1());
+        ends.add(relationship.getEnd2());
+        endDefs.add(relationshipDef.getEndDef1());
+        endDefs.add(relationshipDef.getEndDef2());
+
+        for (int i = 0; i < ends.size(); i++) {
+            AtlasObjectId       end              = ends.get(i);
+            String              guid             = end.getGuid();
+            String              typeName         = end.getTypeName();
+            Map<String, Object> uniqueAttributes = end.getUniqueAttributes();
+            AtlasVertex         endVertex        = AtlasGraphUtilsV2.findByGuid(guid);
+
+            if (!AtlasTypeUtil.isValidGuid(guid) || endVertex == null) {
+                throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+
+            } else if (MapUtils.isNotEmpty(uniqueAttributes)) {
+                AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+
+                if (AtlasGraphUtilsV2.findByUniqueAttributes(entityType, uniqueAttributes) == null) {
+                    throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, typeName, uniqueAttributes.toString());
+                }
+            } else {
+                // check whether the guid is the correct type
+                String vertexTypeName = endVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
+
+                if (!Objects.equals(vertexTypeName, typeName)) {
+                    String attrName = endDefs.get(i).getName();
+
+                    throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_INVALID_ENDTYPE, attrName, guid, vertexTypeName, typeName);
+                }
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("validateEnds exit successfully validated relationship:" + relationship);
+        }
+    }
+
+    private void validateAndNormalize(AtlasRelationship relationship) throws AtlasBaseException {
+        List<String> messages = new ArrayList<>();
+
+        if (! AtlasTypeUtil.isValidGuid(relationship.getGuid())) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_GUID_NOT_FOUND, relationship.getGuid());
+        }
+
+        AtlasRelationshipType type = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
+
+        if (type == null) {
+            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.RELATIONSHIP.name(), relationship.getTypeName());
+        }
+
+        type.validateValue(relationship, relationship.getTypeName(), messages);
+
+        if (!messages.isEmpty()) {
+            throw new AtlasBaseException(AtlasErrorCode.RELATIONSHIP_CRUD_INVALID_PARAMS, messages);
+        }
+
+        type.getNormalizedValue(relationship);
+    }
+
+    public AtlasEdge getRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, String relationshipType) {
+        String              relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationshipType);
+        Iterator<AtlasEdge> edgesIterator     = getOutGoingEdgesByLabel(fromVertex, relationshipLabel);
+        AtlasEdge           ret               = null;
+
+        while (edgesIterator != null && edgesIterator.hasNext()) {
+            AtlasEdge edge = edgesIterator.next();
+
+            if (edge != null) {
+                Status status = graphHelper.getStatus(edge);
+
+                if ((status == null || status == ACTIVE) &&
+                        StringUtils.equals(getIdFromVertex(edge.getInVertex()), getIdFromVertex(toVertex))) {
+                    ret = edge;
+                    break;
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    private Long getRelationshipVersion(AtlasRelationship relationship) {
+        Long ret = relationship != null ? relationship.getVersion() : null;
+
+        return (ret != null) ? ret : DEFAULT_RELATIONSHIP_VERSION;
+    }
+
+    private AtlasVertex getVertexFromEndPoint(AtlasObjectId endPoint) {
+        AtlasVertex ret = null;
+
+        if (StringUtils.isNotEmpty(endPoint.getGuid())) {
+            ret = AtlasGraphUtilsV2.findByGuid(endPoint.getGuid());
+        } else if (StringUtils.isNotEmpty(endPoint.getTypeName()) && MapUtils.isNotEmpty(endPoint.getUniqueAttributes())) {
+            AtlasEntityType entityType = typeRegistry.getEntityTypeByName(endPoint.getTypeName());
+
+            ret = AtlasGraphUtilsV2.findByUniqueAttributes(entityType, endPoint.getUniqueAttributes());
+        }
+
+        return ret;
+    }
+
+    private AtlasEdge createRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) throws RepositoryException, AtlasBaseException {
+        String        relationshipLabel = getRelationshipEdgeLabel(fromVertex, toVertex, relationship.getTypeName());
+        PropagateTags tagPropagation    = getRelationshipTagPropagation(fromVertex, toVertex, relationship);
+        AtlasEdge     ret               = graphHelper.getOrCreateEdge(fromVertex, toVertex, relationshipLabel);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Created relationship edge from [{}] --> [{}] using edge label: [{}]", getTypeName(fromVertex), getTypeName(toVertex), relationshipLabel);
+        }
+
+        // map additional properties to relationship edge
+        if (ret != null) {
+            final String guid = UUID.randomUUID().toString();
+
+            AtlasGraphUtilsV2.setProperty(ret, Constants.ENTITY_TYPE_PROPERTY_KEY, relationship.getTypeName());
+            AtlasGraphUtilsV2.setProperty(ret, Constants.RELATIONSHIP_GUID_PROPERTY_KEY, guid);
+            AtlasGraphUtilsV2.setProperty(ret, Constants.VERSION_PROPERTY_KEY, getRelationshipVersion(relationship));
+            AtlasGraphUtilsV2.setProperty(ret, Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, tagPropagation.name());
+
+            // blocked propagated classifications
+            handleBlockedClassifications(ret, relationship.getBlockedPropagatedClassifications());
+
+            // propagate tags
+            deleteHandler.addTagPropagation(ret, tagPropagation);
+        }
+
+        return ret;
+    }
+
+    private PropagateTags getRelationshipTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasRelationship relationship) {
+        AtlasRelationshipType   relationshipType = typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
+        AtlasRelationshipEndDef endDef1          = relationshipType.getRelationshipDef().getEndDef1();
+        AtlasRelationshipEndDef endDef2          = relationshipType.getRelationshipDef().getEndDef2();
+        Set<String>             fromVertexTypes  = getTypeAndAllSuperTypes(getTypeName(fromVertex));
+        Set<String>             toVertexTypes    = getTypeAndAllSuperTypes(getTypeName(toVertex));
+        PropagateTags           ret              = relationshipType.getRelationshipDef().getPropagateTags();
+
+        // relationshipDef is defined as end1 (hive_db) and end2 (hive_table) and tagPropagation = ONE_TO_TWO
+        // relationship edge exists from [hive_table --> hive_db]
+        // swap the tagPropagation property for such cases.
+        if (fromVertexTypes.contains(endDef2.getType()) && toVertexTypes.contains(endDef1.getType())) {
+            if (ret == ONE_TO_TWO) {
+                ret = TWO_TO_ONE;
+            } else if (ret == TWO_TO_ONE) {
+                ret = ONE_TO_TWO;
+            }
+        }
+
+        return ret;
+    }
+
+    private String getRelationshipEdgeLabel(AtlasVertex fromVertex, AtlasVertex toVertex, String relationshipTypeName) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("getRelationshipEdgeLabel({})", relationshipTypeName);
+        }
+
+        AtlasRelationshipType   relationshipType   = typeRegistry.getRelationshipTypeByName(relationshipTypeName);
+        String                  ret                = relationshipType.getRelationshipDef().getRelationshipLabel();
+        AtlasRelationshipEndDef endDef1            = relationshipType.getRelationshipDef().getEndDef1();
+        AtlasRelationshipEndDef endDef2            = relationshipType.getRelationshipDef().getEndDef2();
+        Set<String>             fromVertexTypes    = getTypeAndAllSuperTypes(AtlasGraphUtilsV2.getTypeName(fromVertex));
+        Set<String>             toVertexTypes      = getTypeAndAllSuperTypes(AtlasGraphUtilsV2.getTypeName(toVertex));
+        AtlasAttribute          attribute          = null;
+
+        // validate entity type and all its supertypes contains relationshipDefs end type
+        // e.g. [hive_process -> hive_table] -> [Process -> DataSet]
+        if (fromVertexTypes.contains(endDef1.getType()) && toVertexTypes.contains(endDef2.getType())) {
+            String attributeName = endDef1.getName();
+
+            attribute = relationshipType.getEnd1Type().getRelationshipAttribute(attributeName);
+
+        } else if (fromVertexTypes.contains(endDef2.getType()) && toVertexTypes.contains(endDef1.getType())) {
+            String attributeName = endDef2.getName();
+
+            attribute = relationshipType.getEnd2Type().getRelationshipAttribute(attributeName);
+        }
+
+        if (attribute != null) {
+            ret = attribute.getRelationshipEdgeLabel();
+        }
+
+        return ret;
+    }
+
+    public Set<String> getTypeAndAllSuperTypes(String entityTypeName) {
+        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName);
+
+        return (entityType != null) ? entityType.getTypeAndAllSuperTypes() : new HashSet<String>();
+    }
+
+    private String getTypeNameFromObjectId(AtlasObjectId objectId) {
+        String typeName = objectId.getTypeName();
+
+        if (StringUtils.isBlank(typeName)) {
+            typeName = AtlasGraphUtilsV2.getTypeNameFromGuid(objectId.getGuid());
+        }
+
+        return typeName;
+    }
+
+    /**
+     * Check whether this vertex has a relationship associated with this relationship type.
+     * @param vertex
+     * @param relationshipTypeName
+     * @return true if found an edge with this relationship type in.
+     */
+    private boolean vertexHasRelationshipWithType(AtlasVertex vertex, String relationshipTypeName) {
+        String relationshipEdgeLabel = getRelationshipEdgeLabel(getTypeName(vertex), relationshipTypeName);
+        Iterator<AtlasEdge> iter     = graphHelper.getAdjacentEdgesByLabel(vertex, AtlasEdgeDirection.BOTH, relationshipEdgeLabel);
+
+        return (iter != null) ? iter.hasNext() : false;
+    }
+
+    private String getRelationshipEdgeLabel(String typeName, String relationshipTypeName) {
+        AtlasRelationshipType relationshipType = typeRegistry.getRelationshipTypeByName(relationshipTypeName);
+        AtlasRelationshipDef  relationshipDef  = relationshipType.getRelationshipDef();
+        AtlasEntityType       end1Type         = relationshipType.getEnd1Type();
+        AtlasEntityType       end2Type         = relationshipType.getEnd2Type();
+        Set<String>           vertexTypes      = getTypeAndAllSuperTypes(typeName);
+        AtlasAttribute        attribute        = null;
+
+        if (vertexTypes.contains(end1Type.getTypeName())) {
+            String attributeName = relationshipDef.getEndDef1().getName();
+
+            attribute = (attributeName != null) ? end1Type.getAttribute(attributeName) : null;
+        } else if (vertexTypes.contains(end2Type.getTypeName())) {
+            String attributeName = relationshipDef.getEndDef2().getName();
+
+            attribute = (attributeName != null) ? end2Type.getAttribute(attributeName) : null;
+        }
+
+        return (attribute != null) ? attribute.getRelationshipEdgeLabel() : null;
+    }
+}
\ No newline at end of file