You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by is...@apache.org on 2021/07/11 00:29:11 UTC

[airavata-data-lake] branch master updated: Move transfer mappings to storage

This is an automated email from the ASF dual-hosted git repository.

isjarana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git


The following commit(s) were added to refs/heads/master by this push:
     new f1200a1  Move transfer mappings to storage
     new 4bb9ee4  Merge pull request #23 from isururanawaka/workflow_invocation
f1200a1 is described below

commit f1200a1a77d1647b75ea28824eeddb1463b0fdcc
Author: Isuru Ranawaka <ir...@gmail.com>
AuthorDate: Sat Jul 10 20:28:23 2021 -0400

    Move transfer mappings to storage
---
 .../orchestrator/connectors/DRMSConnector.java     | 122 +++++++-------
 .../processor/OutboundEventProcessor.java          |   3 +-
 .../java/org/apache/airavata/drms/api/Client.java  |  32 ++--
 .../handlers/StoragePreferenceServiceHandler.java  | 169 --------------------
 .../drms/api/handlers/StorageServiceHandler.java   | 177 +++++++++++++++++++++
 .../deserializer/TransferMappingDeserializer.java  |  22 +--
 .../drms-rest-proxy/src/main/resources/drms.pb     | Bin 109388 -> 109344 bytes
 .../preference/StoragePreferenceService.proto      |  50 ------
 .../src/main/proto/storage/StorageService.proto    |  57 +++++++
 9 files changed, 324 insertions(+), 308 deletions(-)

diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
index d76ffa6..f87542e 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
@@ -54,67 +54,67 @@ public class DRMSConnector implements AbstractConnector<Configuration> {
         return !this.drmsChannel.isShutdown();
     }
 
-    public Optional<String> getSourceStoragePreferenceId(DataOrchestratorEntity entity, String hostname) {
-        DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
-                .setAccessToken(entity.getAuthToken())
-                .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
-                .setAuthenticatedUser(AuthenticatedUser.newBuilder()
-                        .setUsername(entity.getOwnerId())
-                        .setTenantId(entity.getTenantId())
-                        .build())
-                .build();
-        FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
-                .setAuthToken(serviceAuthToken)
-                .build();
-        FindTransferMappingsResponse response = storagePreferenceServiceBlockingStub.getTransferMappings(request);
-        List<TransferMapping> transferMappingList = response.getMappingsList();
-        AtomicReference<String> storagePreferenceId = new AtomicReference<>(null);
-        if (!transferMappingList.isEmpty()) {
-            transferMappingList.forEach(transferMapping -> {
-                if (transferMapping.getSourceStoragePreference().getStorageCase()
-                        .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
-                    if (transferMapping.getSourceStoragePreference().getSshStoragePreference()
-                            .getStorage().getHostName().equals(hostname)) {
-                        storagePreferenceId
-                                .set(transferMapping.getSourceStoragePreference()
-                                        .getSshStoragePreference().getStoragePreferenceId());
-                    }
-                }
-            });
-        }
-        return Optional.ofNullable(storagePreferenceId.get());
-    }
-
-    public Optional<String> getDestinationStoragePreferenceId(DataOrchestratorEntity entity, String hostname) {
-        DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
-                .setAccessToken(entity.getAuthToken())
-                .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
-                .setAuthenticatedUser(AuthenticatedUser.newBuilder()
-                        .setUsername(entity.getOwnerId())
-                        .setTenantId(entity.getTenantId())
-                        .build())
-                .build();
-        FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
-                .setAuthToken(serviceAuthToken)
-                .build();
-        FindTransferMappingsResponse response = storagePreferenceServiceBlockingStub.getTransferMappings(request);
-        List<TransferMapping> transferMappingList = response.getMappingsList();
-        AtomicReference<String> storagePreferenceId = new AtomicReference<>(null);
-        if (!transferMappingList.isEmpty()) {
-            transferMappingList.forEach(transferMapping -> {
-                if (transferMapping.getDestinationStoragePreference().getStorageCase()
-                        .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
-                    if (transferMapping.getDestinationStoragePreference().getSshStoragePreference()
-                            .getStorage().getHostName().equals(hostname)) {
-                        storagePreferenceId
-                                .set(transferMapping.getDestinationStoragePreference()
-                                        .getSshStoragePreference().getStoragePreferenceId());
-                    }
-                }
-            });
-        }
-        return Optional.ofNullable(storagePreferenceId.get());
-    }
+//    public Optional<String> getSourceStoragePreferenceId(DataOrchestratorEntity entity, String hostname) {
+//        DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
+//                .setAccessToken(entity.getAuthToken())
+//                .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+//                .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+//                        .setUsername(entity.getOwnerId())
+//                        .setTenantId(entity.getTenantId())
+//                        .build())
+//                .build();
+//        FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
+//                .setAuthToken(serviceAuthToken)
+//                .build();
+//        FindTransferMappingsResponse response = storagePreferenceServiceBlockingStub.getTransferMappings(request);
+//        List<TransferMapping> transferMappingList = response.getMappingsList();
+//        AtomicReference<String> storagePreferenceId = new AtomicReference<>(null);
+//        if (!transferMappingList.isEmpty()) {
+//            transferMappingList.forEach(transferMapping -> {
+//                if (transferMapping.getSourceStoragePreference().getStorageCase()
+//                        .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
+//                    if (transferMapping.getSourceStoragePreference().getSshStoragePreference()
+//                            .getStorage().getHostName().equals(hostname)) {
+//                        storagePreferenceId
+//                                .set(transferMapping.getSourceStoragePreference()
+//                                        .getSshStoragePreference().getStoragePreferenceId());
+//                    }
+//                }
+//            });
+//        }
+//        return Optional.ofNullable(storagePreferenceId.get());
+//    }
+//
+//    public Optional<String> getDestinationStoragePreferenceId(DataOrchestratorEntity entity, String hostname) {
+//        DRMSServiceAuthToken serviceAuthToken = DRMSServiceAuthToken.newBuilder()
+//                .setAccessToken(entity.getAuthToken())
+//                .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+//                .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+//                        .setUsername(entity.getOwnerId())
+//                        .setTenantId(entity.getTenantId())
+//                        .build())
+//                .build();
+//        FindTransferMappingsRequest request = FindTransferMappingsRequest.newBuilder()
+//                .setAuthToken(serviceAuthToken)
+//                .build();
+//        FindTransferMappingsResponse response = storagePreferenceServiceBlockingStub.getTransferMappings(request);
+//        List<TransferMapping> transferMappingList = response.getMappingsList();
+//        AtomicReference<String> storagePreferenceId = new AtomicReference<>(null);
+//        if (!transferMappingList.isEmpty()) {
+//            transferMappingList.forEach(transferMapping -> {
+//                if (transferMapping.getDestinationStoragePreference().getStorageCase()
+//                        .equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
+//                    if (transferMapping.getDestinationStoragePreference().getSshStoragePreference()
+//                            .getStorage().getHostName().equals(hostname)) {
+//                        storagePreferenceId
+//                                .set(transferMapping.getDestinationStoragePreference()
+//                                        .getSshStoragePreference().getStoragePreferenceId());
+//                    }
+//                }
+//            });
+//        }
+//        return Optional.ofNullable(storagePreferenceId.get());
+//    }
 
 
     public Optional<GenericResource> createResource(DataOrchestratorEventRepository repository, DataOrchestratorEntity entity,
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
index 82e554c..013ee59 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
@@ -89,7 +89,8 @@ public class OutboundEventProcessor implements MessageProcessor<Configuration> {
             String tail = resourcePath.substring(resourcePath.indexOf(ownerId));
             String[] collections = tail.split("/");
 
-            Optional<String> optionalStorPref = drmsConnector.getSourceStoragePreferenceId(entity, entity.getHostName());
+//            Optional<String> optionalStorPref = drmsConnector.getSourceStoragePreferenceId(entity, entity.getHostName());
+            Optional<String> optionalStorPref = Optional.empty(); // lookup disabled above; empty (never null) so isEmpty() below is safe
             if (optionalStorPref.isEmpty()) {
                 entity.setEventStatus(EventStatus.ERRORED.name());
                 entity.setError("StoragePreference not found for host: " + entity.getHostName());
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/Client.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/Client.java
index e0e9428..9611fee 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/Client.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/Client.java
@@ -23,7 +23,7 @@ import io.grpc.ManagedChannelBuilder;
 import org.apache.airavata.datalake.drms.DRMSServiceAuthToken;
 import org.apache.airavata.datalake.drms.resource.GenericResource;
 import org.apache.airavata.datalake.drms.storage.*;
-import org.apache.airavata.datalake.drms.storage.preference.ssh.SSHStoragePreference;
+import org.apache.airavata.datalake.drms.storage.ssh.SSHStorage;
 import org.apache.custos.clients.CustosClientProvider;
 import org.apache.custos.identity.management.client.IdentityManagementClient;
 import org.json.JSONObject;
@@ -55,7 +55,7 @@ public class Client {
 
 //        StorageCreateRequest request = StorageCreateRequest.newBuilder().setAuthToken(authToken).
 //                setStorage(AnyStorage.newBuilder().setSshStorage(SSHStorage.newBuilder()
-//                        .setStorageId("qwerft-rftgyhu-oplmnj")
+//                        .setStorageId("testing.com")
 //                        .setHostName("localhost")
 //                        .setPort(3565)
 //                        .build())
@@ -116,31 +116,39 @@ public class Client {
         TransferMapping transferMapping = TransferMapping.newBuilder()
                 .setUserId("isjarana@iu.edu")
                 .setTransferScope(TransferScope.GLOBAL)
-                .setDestinationStoragePreference(AnyStoragePreference.newBuilder()
-                        .setSshStoragePreference(SSHStoragePreference.newBuilder()
-                                .setStoragePreferenceId("ssh_storage_preference").build()))
-                .setSourceStoragePreference(AnyStoragePreference.newBuilder()
-                        .setSshStoragePreference(SSHStoragePreference.newBuilder()
-                                .setStoragePreferenceId("ssh_storage_preference_2").build()))
+                .setSourceStorage(AnyStorage
+                        .newBuilder()
+                        .setSshStorage(SSHStorage.newBuilder().setStorageId("testing.com")
+                                .build())
+                        .build())
+                .setDestinationStorage(AnyStorage
+                        .newBuilder()
+                        .setSshStorage(SSHStorage.newBuilder().setStorageId("qwerft-rftgyhu-oplmnj")
+                                .build())
+                        .build())
                 .build();
-
+//
         CreateTransferMappingRequest request = CreateTransferMappingRequest.newBuilder()
                 .setAuthToken(authToken)
                 .setTransferMapping(transferMapping)
                 .build();
 
+//        resourceClient.createTransferMapping(request);
+//
         FindTransferMappingsRequest findTransferMappingsRequest = FindTransferMappingsRequest.newBuilder()
                 .setAuthToken(authToken)
                 .build();
-
+//        resourceClient.getTransferMappings(findTransferMappingsRequest);
+//
         DeleteTransferMappingRequest transferMappingRequest = DeleteTransferMappingRequest.newBuilder()
                 .setAuthToken(authToken)
-                .setId("ssh_storage_preference_2_ssh_storage_preference")
+                .setId("testing.com_qwerft-rftgyhu-oplmnj")
                 .build();
+        resourceClient.deleteTransferMappings(transferMappingRequest);
 
 //        storagePreferenceServiceBlockingStub.deleteTransferMappings(transferMappingRequest);
 
-        storagePreferenceServiceBlockingStub.createTransferMapping(request);
+//        storagePreferenceServiceBlockingStub.createTransferMapping(request);
         ResourceServiceGrpc.ResourceServiceBlockingStub resourceServiceBlockingStub = ResourceServiceGrpc.newBlockingStub(channel);
 
 //        ResourceSearchQuery query = ResourceSearchQuery.newBuilder().setField("type").setValue("COLLECTION").build();
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
index 7221e6d..57858f9 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
@@ -285,177 +285,8 @@ public class StoragePreferenceServiceHandler extends StoragePreferenceServiceGrp
         }
     }
 
-    @Override
-    public void createTransferMapping(CreateTransferMappingRequest request, StreamObserver<CreateTransferMappingResponse> responseObserver) {
-        try {
-            AuthenticatedUser authenticatedUser = request.getAuthToken().getAuthenticatedUser();
-            AnyStoragePreference sourceStoragePreference = request.getTransferMapping().getSourceStoragePreference();
-            AnyStoragePreference destinationStoragePreference = request.getTransferMapping().getDestinationStoragePreference();
-            String sourceId = getStorageId(sourceStoragePreference);
-            String destinationId = getStorageId(destinationStoragePreference);
-
-            TransferScope scope = request.getTransferMapping().getTransferScope();
-            Map<String, Object> properties = new HashMap<>();
-            Map<String, Object> props = new HashMap<>();
-            props.put("tenantId", authenticatedUser.getTenantId());
-            props.put("owner", authenticatedUser.getUsername());
-            props.put("srcStoragePreferenceId", sourceId);
-            props.put("dstStoragePreferenceId", destinationId);
-            String entityId = sourceId + "_" + destinationId;
-            if (scope.equals(TransferScope.GLOBAL)) {
-                props.put("scope", TransferScope.GLOBAL.name());
-            } else {
-                props.put("scope", TransferScope.USER.name());
-            }
-            properties.put("props", props);
-            properties.put("tenantId", authenticatedUser.getTenantId());
-            properties.put("entityId", entityId);
-            properties.put("username", authenticatedUser.getUsername());
-            properties.put("srcStoragePreferenceId", sourceId);
-            properties.put("dstStoragePreferenceId", destinationId);
-            properties.put("owner", authenticatedUser.getUsername());
-
-
-            if (hasAccess(authenticatedUser.getUsername(), authenticatedUser.getTenantId(), sourceId) &&
-                    hasAccess(authenticatedUser.getUsername(), authenticatedUser.getTenantId(), destinationId)) {
-                String query = " Match (u:User), (srcSp:StoragePreference), (dstSp:StoragePreference) where " +
-                        " u.username=$username AND u.tenantId=$tenantId AND " +
-                        "srcSp.storagePreferenceId=$srcStoragePreferenceId AND " +
-                        "srcSp.tenantId = $tenantId AND dstSp.storagePreferenceId=$dstStoragePreferenceId " +
-                        "AND dstSp.tenantId =$tenantId " +
-                        " Merge (u)-[:HAS_TRANSFER_MAPPING]->(tm:TransferMapping{entityId:$entityId, tenantId:$tenantId, " +
-                        "srcStoragePreferenceId:$srcStoragePreferenceId," +
-                        "dstStoragePreferenceId:$dstStoragePreferenceId,owner:$owner}) set tm += $props" +
-                        " Merge (tm)<-[:TRANSFER_OUT]-(srcSp)" +
-                        " Merge (tm)-[:TRANSFER_IN]->(dstSp) return (tm)";
-                this.neo4JConnector.runTransactionalQuery(properties, query);
-
-                String searchQuery = " Match (srcStr:Storage)<-[:CHILD_OF]-" +
-                        "(srcSp:StoragePreference)-[:TRANSFER_OUT]->(tm:TransferMapping)" +
-                        "-[:TRANSFER_IN]->(dstSp:StoragePreference)-[:CHILD_OF]->(dstStr:Storage)  where " +
-                        " tm.entityId=$entityId AND tm.tenantId=$tenantId return srcStr, srcSp, dstStr, dstSp, tm";
-                List<Record> records = this.neo4JConnector.searchNodes(properties, searchQuery);
-                if (!records.isEmpty()) {
-                    List<TransferMapping> transferMappings = TransferMappingDeserializer.deserializeList(records);
-                    if (!transferMappings.isEmpty()) {
-                        CreateTransferMappingResponse response = CreateTransferMappingResponse
-                                .newBuilder()
-                                .setTransferMapping(transferMappings.get(0))
-                                .build();
-                        responseObserver.onNext(response);
-                        responseObserver.onCompleted();
-                    } else {
-                        String msg = "Errored while creating transfer mapping; Message:";
-                        logger.error("Errored while creating transfer mapping; Message:");
-                        responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
-                    }
-                } else {
-                    String msg = "Errored while creating transfer mapping; Message:";
-                    logger.error("Errored while creating transfer mapping; Message:");
-                    responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
-                }
-            } else {
-                String msg = "User does not have permission to create mapping ";
-                logger.error("User does not have permission to create mapping ");
-                responseObserver.onError(Status.PERMISSION_DENIED.withDescription(msg).asRuntimeException());
-
-            }
-        } catch (Exception e) {
-            String msg = "Errored while creating transfer mapping; Message:" + e.getMessage();
-            logger.error("Errored while creating transfer mapping; Message: {}", e.getMessage(), e);
-            responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
-        }
-    }
-
-    @Override
-    public void getTransferMappings(FindTransferMappingsRequest request, StreamObserver<FindTransferMappingsResponse> responseObserver) {
-        try {
-            List<TransferMapping> transferMappings = new ArrayList<>();
-            AuthenticatedUser authenticatedUser = request.getAuthToken().getAuthenticatedUser();
-            Map<String, Object> properties = new HashMap<>();
-            properties.put("username", authenticatedUser.getUsername());
-            properties.put("tenantId", authenticatedUser.getTenantId());
-            properties.put("scope", TransferScope.USER.name());
-            String query = " MATCH (u:User)-[:HAS_TRANSFER_MAPPING]->(t:TransferMapping{scope:$scope}) where u.username = $username AND u.tenantId = $tenantId" +
-                    " Match (srcStr:Storage)<-[:CHILD_OF]-(srcSp:StoragePreference)-[:TRANSFER_OUT]->(t)-[:TRANSFER_IN]->(dstSp:StoragePreference)-[:CHILD_OF]->(dstStr:Storage)" +
-                    " return srcStr, srcSp, dstStr, dstSp, t";
-            List<Record> records = this.neo4JConnector.searchNodes(properties, query);
-            properties.put("scope", TransferScope.GLOBAL.name());
-            String queryFetchGlobal = "Match (srcStr:Storage)<-[:CHILD_OF]-" +
-                    "(srcSp:StoragePreference)-[:TRANSFER_OUT]->(t:TransferMapping{scope:$scope, tenantId:$tenantId})-[:TRANSFER_IN]->(dstSp:StoragePreference)-[:CHILD_OF]->(dstStr:Storage)" +
-                    " return srcStr, srcSp, dstStr, dstSp, t";
-            List<Record> globalRecords = this.neo4JConnector.searchNodes(properties, queryFetchGlobal);
-            if (!records.isEmpty()) {
-                transferMappings = TransferMappingDeserializer.deserializeList(records);
-            }
-            if (!globalRecords.isEmpty()) {
-                transferMappings.addAll(TransferMappingDeserializer.deserializeList(globalRecords));
-            }
-            FindTransferMappingsResponse findTransferMappingsResponse = FindTransferMappingsResponse
-                    .newBuilder()
-                    .addAllMappings(transferMappings)
-                    .build();
-            responseObserver.onNext(findTransferMappingsResponse);
-            responseObserver.onCompleted();
-
-        } catch (Exception ex) {
-            String msg = "Errored while fetching transfer mappings, Message:" + ex.getMessage();
-            logger.error("Errored while fetching transfer mappings, Message: {}", ex.getMessage(), ex);
-            responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
-        }
-    }
-
-    @Override
-    public void deleteTransferMappings(DeleteTransferMappingRequest request, StreamObserver<Empty> responseObserver) {
-        try {
-            AuthenticatedUser authenticatedUser = request.getAuthToken().getAuthenticatedUser();
-            String transferMappingId = request.getId();
-            Map<String, Object> properties = new HashMap<>();
-            properties.put("username", authenticatedUser.getUsername());
-            properties.put("tenantId", authenticatedUser.getTenantId());
-            properties.put("entityId", transferMappingId);
-            String query = " MATCH (u:User)-[:HAS_TRANSFER_MAPPING]->" +
-                    "(t:TransferMapping{entityId:$entityId})" +
-                    " where u.username = $username AND u.tenantId = $tenantId detach delete t";
-            this.neo4JConnector.runTransactionalQuery(properties, query);
-            responseObserver.onNext(Empty.newBuilder().build());
-            responseObserver.onCompleted();
 
-        } catch (Exception ex) {
-            String msg = "Errored while delete transfer mappings, Message:" + ex.getMessage();
-            logger.error("Errored while delete transfer mappings, Message: {}", ex.getMessage(), ex);
-            responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
-        }
-    }
 
-    private boolean hasAccess(String username, String tenantId, String storagePrefId) throws Exception {
-        Map<String, Object> userProps = new HashMap<>();
-        userProps.put("username", username);
-        userProps.put("tenantId", tenantId);
-        userProps.put("entityId", storagePrefId);
 
-        List<Record> records = this.neo4JConnector.searchNodes(userProps,
-                " MATCH (u:User) where u.username = $username AND u.tenantId = $tenantId" +
-                        " OPTIONAL MATCH (u)<-[:SHARED_WITH]-(s1:Storage)<-[:CHILD_OF]->(sp1:StoragePreference{entityId:$entityId})" +
-                        " OPTIONAL MATCH (cg:Group)-[:CHILD_OF*]->(g:Group)<-[:MEMBER_OF]-(u)" +
-                        " OPTIONAL MATCH (sp2:StoragePreference{entityId:$entityId})-[:CHILD_OF]->(s2:Storage)-[:SHARED_WITH]->(cg) " +
-                        " OPTIONAL MATCH (sp3:StoragePreference {entityId:$entityId})-[:CHILD_OF]->(s3:Storage)-[:SHARED_WITH]->(g)" +
-                        " OPTIONAL MATCH (s4:Storage)<-[:CHILD_OF]->(sp4:StoragePreference{entityId:$entityId})-[:SHARED_WITH]->(u)" +
-                        " OPTIONAL MATCH (s5:Storage)<-[:CHILD_OF]->(sp5:StoragePreference{entityId:$entityId})-[:SHARED_WITH]->(cg)" +
-                        " OPTIONAL MATCH (s6:Storage)<-[:CHILD_OF]->(sp6:StoragePreference{entityId:$entityId})-[:SHARED_WITH]->(g)" +
-                        " return distinct s1, sp1, s2, sp2, s3, sp3, s4,sp4, s5,sp5, s6,sp6");
-        if (!records.isEmpty()) {
-            return true;
-        }
-        return false;
-    }
 
-    private String getStorageId(AnyStoragePreference storage) {
-        if (storage.getStorageCase()
-                .equals(AnyStoragePreference.StorageCase.S3_STORAGE_PREFERENCE)) {
-            return storage.getS3StoragePreference().getStoragePreferenceId();
-        } else {
-            return storage.getSshStoragePreference().getStoragePreferenceId();
-        }
-    }
 }
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
index 14abb21..a9dc2d0 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
@@ -17,6 +17,7 @@
 package org.apache.airavata.drms.api.handlers;
 
 import com.google.protobuf.Empty;
+import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import org.apache.airavata.datalake.drms.AuthenticatedUser;
 import org.apache.airavata.datalake.drms.storage.*;
@@ -24,6 +25,7 @@ import org.apache.airavata.drms.api.utils.CustosUtils;
 import org.apache.airavata.drms.core.Neo4JConnector;
 import org.apache.airavata.drms.core.constants.StorageConstants;
 import org.apache.airavata.drms.core.deserializer.AnyStorageDeserializer;
+import org.apache.airavata.drms.core.deserializer.TransferMappingDeserializer;
 import org.apache.airavata.drms.core.serializer.AnyStorageSerializer;
 import org.apache.custos.clients.CustosClientProvider;
 import org.lognet.springboot.grpc.GRpcService;
@@ -32,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -187,4 +190,178 @@ public class StorageServiceHandler extends StorageServiceGrpc.StorageServiceImpl
             responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
         }
     }
+    /** Creates (MERGEs) a TransferMapping node linking two storages for the calling user, reads it back and returns it.
+     *  Replies PERMISSION_DENIED when hasAccess fails for either storage and INTERNAL on every other failure path. */
+    @Override
+    public void createTransferMapping(CreateTransferMappingRequest request, StreamObserver<CreateTransferMappingResponse> responseObserver) {
+        try {
+            AuthenticatedUser authenticatedUser = request.getAuthToken().getAuthenticatedUser();
+            AnyStorage sourceStoragePreference = request.getTransferMapping().getSourceStorage();
+            AnyStorage destinationStoragePreference = request.getTransferMapping().getDestinationStorage();
+            String sourceId = getStorageId(sourceStoragePreference);
+            String destinationId = getStorageId(destinationStoragePreference);
+            // "props" is copied onto the node via "set tm += $props"; "properties" holds the Cypher query parameters.
+            TransferScope scope = request.getTransferMapping().getTransferScope();
+            Map<String, Object> properties = new HashMap<>();
+            Map<String, Object> props = new HashMap<>();
+            props.put("tenantId", authenticatedUser.getTenantId());
+            props.put("owner", authenticatedUser.getUsername());
+            props.put("srcStorageId", sourceId);
+            props.put("dstStorageId", destinationId);
+            String entityId = sourceId + "_" + destinationId; // the same storage pair always produces the same entityId
+            if (scope.equals(TransferScope.GLOBAL)) {
+                props.put("scope", TransferScope.GLOBAL.name());
+            } else { // every non-GLOBAL scope, UNKNOWN included, is stored as USER
+                props.put("scope", TransferScope.USER.name());
+            }
+            properties.put("props", props);
+            properties.put("tenantId", authenticatedUser.getTenantId());
+            properties.put("entityId", entityId);
+            properties.put("username", authenticatedUser.getUsername());
+            properties.put("srcStorageId", sourceId);
+            properties.put("dstStorageId", destinationId);
+            properties.put("owner", authenticatedUser.getUsername());
+            // The caller must pass hasAccess for both the source and the destination storage before anything is written.
+
+            if (hasAccess(authenticatedUser.getUsername(), authenticatedUser.getTenantId(), sourceId) &&
+                    hasAccess(authenticatedUser.getUsername(), authenticatedUser.getTenantId(), destinationId)) {
+                String query = " Match (u:User), (srcSp:Storage), (dstSp:Storage) where " +
+                        " u.username=$username AND u.tenantId=$tenantId AND " +
+                        "srcSp.storageId=$srcStorageId AND " +
+                        "srcSp.tenantId = $tenantId AND dstSp.storageId=$dstStorageId " +
+                        "AND dstSp.tenantId =$tenantId " +
+                        " Merge (u)-[:HAS_TRANSFER_MAPPING]->(tm:TransferMapping{entityId:$entityId, tenantId:$tenantId, " +
+                        "srcStorageId:$srcStorageId," +
+                        "dstStorageId:$dstStorageId,owner:$owner}) set tm += $props" +
+                        " Merge (tm)<-[:TRANSFER_OUT]-(srcSp)" +
+                        " Merge (tm)-[:TRANSFER_IN]->(dstSp) return (tm)";
+                this.neo4JConnector.runTransactionalQuery(properties, query);
+                // Read the mapping back together with both storages; the response is built from these rows.
+                String searchQuery = " Match (srcStr:Storage)-[:TRANSFER_OUT]->(tm:TransferMapping)" +
+                        "-[:TRANSFER_IN]->(dstStr:Storage)  where " +
+                        " tm.entityId=$entityId AND tm.tenantId=$tenantId return srcStr,  dstStr,  tm";
+                List<Record> records = this.neo4JConnector.searchNodes(properties, searchQuery);
+                if (!records.isEmpty()) {
+                    List<TransferMapping> transferMappings = TransferMappingDeserializer.deserializeList(records);
+                    if (!transferMappings.isEmpty()) {
+                        CreateTransferMappingResponse response = CreateTransferMappingResponse
+                                .newBuilder()
+                                .setTransferMapping(transferMappings.get(0))
+                                .build();
+                        responseObserver.onNext(response);
+                        responseObserver.onCompleted();
+                    } else { // rows came back but none deserialized; NOTE(review): message ends at "Message:" with no detail
+                        String msg = "Errored while creating transfer mapping; Message:";
+                        logger.error("Errored while creating transfer mapping; Message:");
+                        responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
+                    }
+                } else { // read-back returned no rows; NOTE(review): same detail-less message as the branch above
+                    String msg = "Errored while creating transfer mapping; Message:";
+                    logger.error("Errored while creating transfer mapping; Message:");
+                    responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
+                }
+            } else {
+                String msg = "User does not have permission to create mapping ";
+                logger.error("User does not have permission to create mapping ");
+                responseObserver.onError(Status.PERMISSION_DENIED.withDescription(msg).asRuntimeException());
+
+            }
+        } catch (Exception e) {
+            String msg = "Errored while creating transfer mapping; Message:" + e.getMessage();
+            logger.error("Errored while creating transfer mapping; Message: {}", e.getMessage(), e);
+            responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
+        }
+    }
+    /** Lists the caller's own USER-scoped transfer mappings followed by every GLOBAL-scoped mapping of the caller's tenant. */
+    @Override
+    public void getTransferMappings(FindTransferMappingsRequest request, StreamObserver<FindTransferMappingsResponse> responseObserver) {
+        try {
+            List<TransferMapping> transferMappings = new ArrayList<>();
+            AuthenticatedUser authenticatedUser = request.getAuthToken().getAuthenticatedUser(); // only the auth token is read from the request
+            Map<String, Object> properties = new HashMap<>();
+            properties.put("username", authenticatedUser.getUsername());
+            properties.put("tenantId", authenticatedUser.getTenantId());
+            properties.put("scope", TransferScope.USER.name());
+            String query = " MATCH (u:User)-[:HAS_TRANSFER_MAPPING]->(t:TransferMapping{scope:$scope}) where u.username = $username AND u.tenantId = $tenantId" +
+                    " Match (srcStr:Storage)-[:TRANSFER_OUT]->(t)-[:TRANSFER_IN]->(dstStr:Storage)" +
+                    " return srcStr,  dstStr,  t";
+            List<Record> records = this.neo4JConnector.searchNodes(properties, query);
+            properties.put("scope", TransferScope.GLOBAL.name()); // same parameter map, scope switched for the tenant-wide query
+            String queryFetchGlobal = "Match (srcStr:Storage)-[:TRANSFER_OUT]->(t:TransferMapping{scope:$scope, tenantId:$tenantId})-[:TRANSFER_IN]->(dstStr:Storage)" +
+                    " return srcStr,  dstStr,  t";
+            List<Record> globalRecords = this.neo4JConnector.searchNodes(properties, queryFetchGlobal);
+            // Always append into the local ArrayList so the result never depends on the deserializer's list being mutable.
+            if (!records.isEmpty()) {
+                transferMappings.addAll(TransferMappingDeserializer.deserializeList(records));
+            }
+            if (!globalRecords.isEmpty()) {
+                transferMappings.addAll(TransferMappingDeserializer.deserializeList(globalRecords));
+            }
+            FindTransferMappingsResponse findTransferMappingsResponse = FindTransferMappingsResponse
+                    .newBuilder()
+                    .addAllMappings(transferMappings)
+                    .build();
+            responseObserver.onNext(findTransferMappingsResponse);
+            responseObserver.onCompleted();
+        } catch (Exception ex) {
+            String msg = "Errored while fetching transfer mappings, Message:" + ex.getMessage();
+            logger.error("Errored while fetching transfer mappings, Message: {}", ex.getMessage(), ex);
+            responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
+        }
+    }
+    /** Detach-deletes the TransferMapping with the requested id that the caller is linked to by HAS_TRANSFER_MAPPING, then replies with Empty. */
+    @Override
+    public void deleteTransferMappings(DeleteTransferMappingRequest request, StreamObserver<Empty> responseObserver) {
+        try {
+            AuthenticatedUser caller = request.getAuthToken().getAuthenticatedUser();
+            Map<String, Object> params = new HashMap<>();
+            params.put("username", caller.getUsername());
+            params.put("tenantId", caller.getTenantId());
+            params.put("entityId", request.getId());
+            // The mapping is reached through the caller's own User node, so a mapping not linked to the caller is never matched.
+            // Empty is sent back whenever the query completes; this method does not check whether a node was removed.
+            String deleteQuery = " MATCH (u:User)-[:HAS_TRANSFER_MAPPING]->(t:TransferMapping{entityId:$entityId})" +
+                    " where u.username = $username AND u.tenantId = $tenantId" +
+                    " detach delete t";
+            this.neo4JConnector.runTransactionalQuery(params, deleteQuery);
+            responseObserver.onNext(Empty.newBuilder().build());
+            responseObserver.onCompleted();
+        } catch (Exception ex) {
+            logger.error("Errored while delete transfer mappings, Message: {}", ex.getMessage(), ex);
+            String description = "Errored while delete transfer mappings, Message:" + ex.getMessage();
+            responseObserver.onError(Status.INTERNAL.withDescription(description).asRuntimeException());
+        }
+    }
+
+    /** Returns the storage id held by the given AnyStorage: the S3 storage's id when the S3 case is set, otherwise the SSH storage's id. */
+    private String getStorageId(AnyStorage storage) {
+        AnyStorage.StorageCase populatedCase = storage.getStorageCase();
+        if (populatedCase.equals(AnyStorage.StorageCase.S3_STORAGE)) {
+            return storage.getS3Storage().getStorageId();
+        }
+        // Every case other than S3_STORAGE falls through to the SSH getter.
+        return storage.getSshStorage().getStorageId();
+    }
+    /** Returns true only when at least one sharing path (direct, via group, or via a storage preference) was matched for the user and storage. */
+    private boolean hasAccess(String username, String tenantId, String storageId) throws Exception {
+        Map<String, Object> userProps = new HashMap<>();
+        userProps.put("username", username);
+        userProps.put("tenantId", tenantId);
+        userProps.put("entityId", storageId);
+
+        List<Record> records = this.neo4JConnector.searchNodes(userProps,
+                " MATCH (u:User) where u.username = $username AND u.tenantId = $tenantId" +
+                        " OPTIONAL MATCH (u)<-[:SHARED_WITH]-(s1:Storage{entityId:$entityId})<-[:CHILD_OF]->(sp1:StoragePreference)" +
+                        " OPTIONAL MATCH (cg:Group)-[:CHILD_OF*]->(g:Group)<-[:MEMBER_OF]-(u)" +
+                        " OPTIONAL MATCH (sp2:StoragePreference)-[:CHILD_OF]->(s2:Storage{entityId:$entityId})-[:SHARED_WITH]->(cg) " +
+                        " OPTIONAL MATCH (sp3:StoragePreference )-[:CHILD_OF]->(s3:Storage{entityId:$entityId})-[:SHARED_WITH]->(g)" +
+                        " OPTIONAL MATCH (s4:Storage{entityId:$entityId})<-[:CHILD_OF]->(sp4:StoragePreference)-[:SHARED_WITH]->(u)" +
+                        " OPTIONAL MATCH (s5:Storage{entityId:$entityId})<-[:CHILD_OF]->(sp5:StoragePreference)-[:SHARED_WITH]->(cg)" +
+                        " OPTIONAL MATCH (s6:Storage{entityId:$entityId})<-[:CHILD_OF]->(sp6:StoragePreference)-[:SHARED_WITH]->(g)" +
+                        " return distinct s1, sp1, s2, sp2, s3, sp3, s4,sp4, s5,sp5, s6,sp6");
+        // Every clause after the first MATCH is OPTIONAL, so an existing user yields a row even when nothing is shared
+        // (all columns null); a non-empty result proves nothing, only a non-null column shows a matched access path.
+        return records.stream()
+                .anyMatch(record -> record.values().stream().anyMatch(value -> !value.isNull()));
+    }
 }
diff --git a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/TransferMappingDeserializer.java b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/TransferMappingDeserializer.java
index 461bbe8..51e9a57 100644
--- a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/TransferMappingDeserializer.java
+++ b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/TransferMappingDeserializer.java
@@ -1,7 +1,6 @@
 package org.apache.airavata.drms.core.deserializer;
 
 import org.apache.airavata.datalake.drms.storage.AnyStorage;
-import org.apache.airavata.datalake.drms.storage.AnyStoragePreference;
 import org.apache.airavata.datalake.drms.storage.TransferMapping;
 import org.apache.airavata.datalake.drms.storage.TransferScope;
 import org.neo4j.driver.Record;
@@ -20,29 +19,22 @@ public class TransferMappingDeserializer {
             InternalRecord internalRecord = (InternalRecord) record;
             List<Value> values = internalRecord.values();
 
-            if (values.size() == 5) {
+            if (values.size() == 3) {
                 Value srcStr = values.get(0);
-                Value srcSpr = values.get(1);
-                Value dstStr = values.get(2);
-                Value dstSp = values.get(3);
-                Value tm = values.get(4);
+                Value dstStr = values.get(1);
+                Value tm = values.get(2);
 
-                if (!srcStr.isNull() && !srcSpr.isNull() && !tm.isNull()
-                        && !dstStr.isNull() && !dstSp.isNull()) {
+                if (!srcStr.isNull() && !tm.isNull()
+                        && !dstStr.isNull()) {
                     AnyStorage storage = AnyStorageDeserializer.deriveStorageFromMap(srcStr.asMap());
-                    AnyStoragePreference srcPreference = AnyStoragePreferenceDeserializer
-                            .deriveStoragePrefFromMap(srcSpr.asMap(), storage);
-
                     AnyStorage dstStorage = AnyStorageDeserializer.deriveStorageFromMap(dstStr.asMap());
-                    AnyStoragePreference dstPreference = AnyStoragePreferenceDeserializer
-                            .deriveStoragePrefFromMap(dstSp.asMap(), dstStorage);
 
                     Map<String, Object> map = tm.asMap();
                     TransferMapping transferMapping = TransferMapping.newBuilder()
                             .setTransferScope(TransferScope.valueOf(map.get("scope").toString()))
                             .setId(map.get("entityId").toString())
-                            .setSourceStoragePreference(srcPreference)
-                            .setDestinationStoragePreference(dstPreference)
+                            .setSourceStorage(storage)
+                            .setDestinationStorage(dstStorage)
                             .setUserId(map.get("owner").toString())
                             .build();
 
diff --git a/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb b/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb
index 21a39a1..0227833 100644
Binary files a/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb and b/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb differ
diff --git a/data-resource-management-service/drms-stubs/src/main/proto/preference/StoragePreferenceService.proto b/data-resource-management-service/drms-stubs/src/main/proto/preference/StoragePreferenceService.proto
index 6c2c6ef..0b0bad2 100644
--- a/data-resource-management-service/drms-stubs/src/main/proto/preference/StoragePreferenceService.proto
+++ b/data-resource-management-service/drms-stubs/src/main/proto/preference/StoragePreferenceService.proto
@@ -81,43 +81,9 @@ message StoragePreferenceSearchResponse {
     repeated AnyStoragePreference storages_preference = 2;
 }
 
-enum TransferScope {
-    UNKNOWN = 0;
-    GLOBAL = 1;
-    USER = 2;
-}
-
-message TransferMapping {
-    string id = 1;
-    string user_id = 2;
-    AnyStoragePreference source_storage_preference = 3;
-    AnyStoragePreference destination_storage_preference = 4;
-    TransferScope transfer_scope = 5;
-}
-
-message CreateTransferMappingRequest {
-    org.apache.airavata.datalake.drms.DRMSServiceAuthToken auth_token = 1;
-    TransferMapping transfer_mapping = 2;
-}
 
-message CreateTransferMappingResponse {
-    TransferMapping transfer_mapping = 1;
-}
 
-message FindTransferMappingsRequest {
-    org.apache.airavata.datalake.drms.DRMSServiceAuthToken auth_token = 1;
-    string id = 2;
-    TransferScope transfer_scope = 3;
-}
 
-message FindTransferMappingsResponse {
-    repeated TransferMapping mappings = 1;
-}
-
-message DeleteTransferMappingRequest {
-    org.apache.airavata.datalake.drms.DRMSServiceAuthToken auth_token = 1;
-    string id = 2;
-}
 
 
 service StoragePreferenceService {
@@ -152,21 +118,5 @@ service StoragePreferenceService {
     };
     }
 
-    rpc createTransferMapping (CreateTransferMappingRequest) returns (CreateTransferMappingResponse) {
-        option (google.api.http) = {
-      post: "/v1.0/api/drms/storagePreference/transferMapping"
-    };
-    }
 
-    rpc getTransferMappings (FindTransferMappingsRequest) returns (FindTransferMappingsResponse) {
-        option (google.api.http) = {
-      get: "/v1.0/api/drms/storagePreference/transferMapping"
-    };
-    }
-
-    rpc deleteTransferMappings (DeleteTransferMappingRequest) returns (google.protobuf.Empty) {
-        option (google.api.http) = {
-      delete: "/v1.0/api/drms/storagePreference/transferMapping"
-    };
-    }
 }
\ No newline at end of file
diff --git a/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto b/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto
index abab38a..3f5b3a2 100644
--- a/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto
+++ b/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto
@@ -88,6 +88,45 @@ message AddStorageMetadataRequest {
   string value = 4;
 }
 
+enum TransferScope { // Visibility of a transfer mapping.
+  UNKNOWN = 0; // default value; StorageServiceHandler.createTransferMapping stores it as USER
+  GLOBAL = 1; // listed by getTransferMappings for every caller of the mapping's tenant
+  USER = 2; // listed only for the user linked to the mapping by HAS_TRANSFER_MAPPING
+}
+// A transfer route from a source storage to a destination storage.
+message TransferMapping {
+  string id = 1; // StorageServiceHandler stores "<source storage id>_<destination storage id>"
+  string user_id = 2; // filled from the mapping's "owner" property, i.e. the creating username
+  AnyStorage source_storage = 3;
+  AnyStorage destination_storage = 4;
+  TransferScope transfer_scope = 5;
+}
+
+// Asks for a mapping to be created for the user in auth_token; the handler reads the two storages and the scope from transfer_mapping.
+message CreateTransferMappingRequest {
+  org.apache.airavata.datalake.drms.DRMSServiceAuthToken auth_token = 1;
+  TransferMapping transfer_mapping = 2;
+}
+// Carries the mapping as read back from the store after creation.
+message CreateTransferMappingResponse {
+  TransferMapping transfer_mapping = 1;
+}
+// Asks for the mappings visible to the user in auth_token.
+message FindTransferMappingsRequest {
+  org.apache.airavata.datalake.drms.DRMSServiceAuthToken auth_token = 1;
+  string id = 2; // NOTE(review): not read by StorageServiceHandler.getTransferMappings
+  TransferScope transfer_scope = 3; // NOTE(review): not read by StorageServiceHandler.getTransferMappings
+}
+// The caller's USER-scoped mappings followed by the tenant's GLOBAL-scoped mappings.
+message FindTransferMappingsResponse {
+  repeated TransferMapping mappings = 1;
+}
+// Asks for the mapping with the given id to be deleted on behalf of the user in auth_token.
+message DeleteTransferMappingRequest {
+  org.apache.airavata.datalake.drms.DRMSServiceAuthToken auth_token = 1;
+  string id = 2; // matched against the mapping's entityId by the handler
+}
+
 service StorageService {
 
   rpc fetchStorage (StorageFetchRequest) returns (StorageFetchResponse) {
@@ -125,4 +164,22 @@ service StorageService {
       post: "/v1.0/api/drms/storage/metadata"
     };
   }
+  // Creates a transfer mapping. NOTE(review): the route still sits under /storagePreference/ although the rpc is declared on StorageService.
+  rpc createTransferMapping (CreateTransferMappingRequest) returns (CreateTransferMappingResponse) {
+    option (google.api.http) = {
+      post: "/v1.0/api/drms/storagePreference/transferMapping"
+    };
+  }
+  // Lists the transfer mappings visible to the caller. NOTE(review): the route still sits under /storagePreference/.
+  rpc getTransferMappings (FindTransferMappingsRequest) returns (FindTransferMappingsResponse) {
+    option (google.api.http) = {
+      get: "/v1.0/api/drms/storagePreference/transferMapping"
+    };
+  }
+  // Deletes one transfer mapping by id. NOTE(review): the route still sits under /storagePreference/.
+  rpc deleteTransferMappings (DeleteTransferMappingRequest) returns (google.protobuf.Empty) {
+    option (google.api.http) = {
+      delete: "/v1.0/api/drms/storagePreference/transferMapping"
+    };
+  }
 }
\ No newline at end of file