You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/06/01 03:03:23 UTC

[airavata-data-lake] 16/42: Implementing generic resource handler and metadata ingestion

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git

commit b6b5cc43be4931b427958e2863d6865b45b5c6ca
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Wed Mar 24 13:03:53 2021 -0400

    Implementing generic resource handler and metadata ingestion
---
 .../drms/api/handlers/ResourceServiceHandler.java  | 99 +++++++++++++++++++++-
 .../handlers/StoragePreferenceServiceHandler.java  |  6 +-
 .../drms/api/handlers/StorageServiceHandler.java   |  2 +
 .../drms/core/constants/ResourceConstants.java     | 21 +++++
 .../deserializer/GenericResourceDeserializer.java  | 89 +++++++++++++++++++
 .../main/proto/resource/DRMSResourceService.proto  | 35 +++++++-
 6 files changed, 247 insertions(+), 5 deletions(-)

diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
index 4e34e7a..34efc05 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
@@ -17,15 +17,78 @@
 package org.apache.airavata.drms.api.handlers;
 
 import com.google.protobuf.Empty;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
 import io.grpc.stub.StreamObserver;
+import org.apache.airavata.datalake.drms.DRMSServiceAuthToken;
+import org.apache.airavata.datalake.drms.groups.FetchCurrentUserRequest;
+import org.apache.airavata.datalake.drms.groups.FetchCurrentUserResponse;
+import org.apache.airavata.datalake.drms.groups.GroupServiceGrpc;
+import org.apache.airavata.datalake.drms.groups.User;
+import org.apache.airavata.datalake.drms.resource.GenericResource;
 import org.apache.airavata.datalake.drms.storage.*;
+import org.apache.airavata.drms.core.Neo4JConnector;
+import org.apache.airavata.drms.core.constants.ResourceConstants;
+import org.apache.airavata.drms.core.constants.StorageConstants;
+import org.apache.airavata.drms.core.deserializer.AnyStoragePreferenceDeserializer;
+import org.apache.airavata.drms.core.deserializer.GenericResourceDeserializer;
 import org.lognet.springboot.grpc.GRpcService;
+import org.neo4j.driver.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.List;
 
 @GRpcService
 public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceImplBase {
+
+    private static final Logger logger = LoggerFactory.getLogger(ResourceServiceHandler.class);
+
+    @Autowired
+    private Neo4JConnector neo4JConnector;
+
+    @org.springframework.beans.factory.annotation.Value("${group.service.host}")
+    private String groupServiceHost;
+
+    @org.springframework.beans.factory.annotation.Value("${group.service.port}")
+    private int groupServicePort;
+
+    /**
+     * Resolves the user behind an auth token by asking the group service configured through
+     * {@code group.service.host} and {@code group.service.port}.
+     *
+     * @param authToken token taken from the incoming request
+     * @return the user the group service reports for that token
+     */
+    private User getUser(DRMSServiceAuthToken authToken) {
+        ManagedChannel channel = ManagedChannelBuilder.forAddress(groupServiceHost, groupServicePort).usePlaintext().build();
+        try {
+            GroupServiceGrpc.GroupServiceBlockingStub groupClient = GroupServiceGrpc.newBlockingStub(channel);
+            FetchCurrentUserResponse userResponse = groupClient.fetchCurrentUser(
+                    FetchCurrentUserRequest.newBuilder().setAuthToken(authToken).build());
+            return userResponse.getUser();
+        } finally {
+            // A new channel is built for every call, so it has to be released here; otherwise
+            // each request leaves an open channel behind.
+            channel.shutdown();
+        }
+    }
+
     @Override
     public void fetchResource(ResourceFetchRequest request, StreamObserver<ResourceFetchResponse> responseObserver) {
-        super.fetchResource(request, responseObserver);
+        User callUser = getUser(request.getAuthToken());
+
+        // TODO review (u)-[r4:MEMBER_OF]->(g2:Group)<-[r5:SHARED_WITH]-(sp),
+        List<Record> records = this.neo4JConnector.searchNodes(
+                "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference)-[r6:HAS_RESOURCE]->(res:Resource), " +
+                        "(u)-[r7:MEMBER_OF]->(g3:Group)<-[r8:SHARED_WITH]-(res) " +
+                        "where res.resourceId = '" + request.getResourceId() + "' and u.userId = '"
+                        + callUser.getUserId() + "' return distinct res, sp, s");
+
+        if (!records.isEmpty()) {
+            try {
+                List<GenericResource> genericResourceList = GenericResourceDeserializer.deserializeList(records);
+                responseObserver.onNext(ResourceFetchResponse.newBuilder().setResource(genericResourceList.get(0)).build());
+                responseObserver.onCompleted();
+            } catch (Exception e) {
+
+                logger.error("Errored while fetching resource with id {}", request.getResourceId(), e);
+                responseObserver.onError(new Exception("Errored while fetching resource with id "
+                        + request.getResourceId() + ". Msg " + e.getMessage()));
+            }
+        } else {
+            logger.error("Could not find a generic resource with id {}", request.getResourceId());
+            responseObserver.onError(new Exception("Could not find a generic resource with id "
+                    + request.getResourceId()));
+        }
     }
 
     @Override
@@ -45,6 +108,38 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
 
     @Override
     public void searchResource(ResourceSearchRequest request, StreamObserver<ResourceSearchResponse> responseObserver) {
-        super.searchResource(request, responseObserver);
+        // Lists every resource the caller can reach: the storage must be shared with one of the
+        // caller's groups and the resource itself must be shared with one of them as well.
+        final User caller = getUser(request.getAuthToken());
+
+        // TODO review (u)-[r4:MEMBER_OF]->(g2:Group)<-[r5:SHARED_WITH]-(sp),
+        final String cypher =
+                "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference)-[r6:HAS_RESOURCE]->(res:Resource), " +
+                        "(u)-[r7:MEMBER_OF]->(g3:Group)<-[r8:SHARED_WITH]-(res) " +
+                        "where u.userId = '" + caller.getUserId() + "' return distinct res, sp, s";
+        final List<Record> matches = this.neo4JConnector.searchNodes(cypher);
+
+        try {
+            List<GenericResource> found = GenericResourceDeserializer.deserializeList(matches);
+            // A single response message carries the whole result list.
+            responseObserver.onNext(ResourceSearchResponse.newBuilder().addAllResources(found).build());
+            responseObserver.onCompleted();
+        } catch (Exception failure) {
+            logger.error("Errored while searching generic resources; Message: {}", failure.getMessage(), failure);
+            responseObserver.onError(failure);
+        }
+    }
+
+    /**
+     * Records one metadata key/value pair for a resource and acknowledges with an empty reply.
+     * The resource id, the caller's user id and the pair are handed to
+     * {@code Neo4JConnector.createMetadataNode} under the resource label.
+     */
+    @Override
+    public void addResourceMetadata(AddResourceMetadataRequest request, StreamObserver<Empty> responseObserver) {
+        final User caller = getUser(request.getAuthToken());
+        final String metadataKey = request.getMetadata().getKey();
+        final String metadataValue = request.getMetadata().getValue();
+
+        this.neo4JConnector.createMetadataNode(
+                ResourceConstants.RESOURCE_LABEL, "resourceId", request.getResourceId(),
+                caller.getUserId(), metadataKey, metadataValue);
+
+        responseObserver.onNext(Empty.getDefaultInstance());
+        responseObserver.onCompleted();
+    }
+
+    /**
+     * Placeholder: no metadata lookup is implemented here; the call is passed straight to the
+     * generated base class.
+     */
+    @Override
+    public void fetchResourceMetadata(FetchResourceMetadataRequest request, StreamObserver<FetchResourceMetadataResponse> responseObserver) {
+        // NOTE(review): presumably the generated base replies UNIMPLEMENTED - confirm.
+        super.fetchResourceMetadata(request, responseObserver);
     }
 }
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
index 3d7b44a..ab560c3 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
@@ -65,7 +65,8 @@ public class StoragePreferenceServiceHandler extends StoragePreferenceServiceGrp
         User callUser = getUser(request.getAuthToken());
 
         List<Record> records = this.neo4JConnector.searchNodes(
-                "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference) " +
+                "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference), " +
+                        "(u)-[r4:MEMBER_OF]->(g2:Group)<-[r5:SHARED_WITH]-(sp) " +
                         "where sp.storagePreferenceId = '" + request.getStoragePreferenceId() + "' and u.userId = '"
                         + callUser.getUserId() + "' return distinct sp, s");
 
@@ -108,7 +109,8 @@ public class StoragePreferenceServiceHandler extends StoragePreferenceServiceGrp
         User callUser = getUser(request.getAuthToken());
 
         List<Record> records = this.neo4JConnector.searchNodes(
-                "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference)" +
+                "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference), " +
+                        "(u)-[r4:MEMBER_OF]->(g2:Group)<-[r5:SHARED_WITH]-(sp)" +
                         " where u.userId ='" + callUser.getUserId() + "' return distinct sp, s");
         try {
             List<AnyStoragePreference> storagePrefList = AnyStoragePreferenceDeserializer.deserializeList(records);
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
index a806e70..a00b7ca 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
@@ -113,6 +113,8 @@ public class StorageServiceHandler extends StorageServiceGrpc.StorageServiceImpl
         this.neo4JConnector.createMetadataNode(StorageConstants.STORAGE_LABEL, "storageId",
                 request.getStorageId(), callUser.getUserId(),
                 request.getKey(), request.getValue());
+        responseObserver.onNext(Empty.getDefaultInstance());
+        responseObserver.onCompleted();
     }
 
     @Override
diff --git a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/ResourceConstants.java b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/ResourceConstants.java
new file mode 100644
index 0000000..2c7e0e2
--- /dev/null
+++ b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/ResourceConstants.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.airavata.drms.core.constants;
+
+/**
+ * Constants for resource nodes in the graph store.
+ */
+public final class ResourceConstants {
+    /** Node label carried by resource nodes. */
+    public static final String RESOURCE_LABEL = "Resource";
+
+    private ResourceConstants() {
+        // Constants holder; never instantiated.
+    }
+}
diff --git a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/GenericResourceDeserializer.java b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/GenericResourceDeserializer.java
new file mode 100644
index 0000000..c4f4d97
--- /dev/null
+++ b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/GenericResourceDeserializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.drms.core.deserializer;
+
+import org.apache.airavata.datalake.drms.resource.GenericResource;
+import org.apache.airavata.datalake.drms.storage.AnyStorage;
+import org.apache.airavata.datalake.drms.storage.AnyStoragePreference;
+import org.apache.airavata.drms.core.constants.ResourceConstants;
+import org.apache.airavata.drms.core.constants.StorageConstants;
+import org.apache.airavata.drms.core.constants.StoragePreferenceConstants;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Value;
+import org.neo4j.driver.internal.InternalRecord;
+import org.neo4j.driver.types.Node;
+import org.springframework.beans.BeanWrapper;
+import org.springframework.beans.PropertyAccessorFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Turns Neo4j query results into {@link GenericResource} messages.
+ */
+public class GenericResourceDeserializer {
+
+    /**
+     * Builds one {@link GenericResource} for each record that holds exactly three node values in
+     * the order resource, storage preference, storage. Records with a different number of values,
+     * or whose nodes do not carry the expected labels, are skipped.
+     *
+     * @param neo4jRecords records returned by the resource queries
+     * @return the deserialized resources in record order; empty when no record qualified
+     * @throws Exception propagated from deriving the storage, the preference or the resource
+     */
+    public static List<GenericResource> deserializeList(List<Record> neo4jRecords) throws Exception {
+        List<GenericResource> resourceList = new ArrayList<>();
+        for (Record record : neo4jRecords) {
+            // Record exposes values() itself, so no cast to the driver-internal InternalRecord
+            // is needed (the cast would fail for any other Record implementation).
+            List<Value> values = record.values();
+            if (values.size() != 3) {
+                continue;
+            }
+
+            Node resourceNode = values.get(0).asNode();
+            Node prefNode = values.get(1).asNode();
+            Node stoNode = values.get(2).asNode();
+            if (resourceNode.hasLabel(ResourceConstants.RESOURCE_LABEL) &&
+                    prefNode.hasLabel(StoragePreferenceConstants.STORAGE_PREFERENCE_LABEL) &&
+                    stoNode.hasLabel(StorageConstants.STORAGE_LABEL)) {
+
+                // Built inside out: the storage feeds the preference, which feeds the resource.
+                AnyStorage storage = AnyStorageDeserializer.deriveStorageFromMap(stoNode.asMap());
+                AnyStoragePreference preference = AnyStoragePreferenceDeserializer.deriveStoragePrefFromMap(
+                        prefNode.asMap(), storage);
+                resourceList.add(deriveGenericResourceFromMap(resourceNode.asMap(), preference));
+            }
+        }
+        return resourceList;
+    }
+
+    /**
+     * Creates a {@link GenericResource} from a resource node's properties and attaches the
+     * S3 or SSH storage preference, whichever the given preference holds.
+     *
+     * @param fixedMap   property map of the resource node; each key is set as a builder property
+     * @param preference storage preference the resource belongs to
+     * @return the populated resource
+     * @throws Exception declared for callers; property binding failures surface as runtime exceptions
+     */
+    public static GenericResource deriveGenericResourceFromMap(Map<String, Object> fixedMap,
+                                                               AnyStoragePreference preference) throws Exception {
+
+        GenericResource.Builder genericResourceBuilder = GenericResource.newBuilder();
+        setObjectFieldsUsingMap(genericResourceBuilder, fixedMap);
+        switch (preference.getStorageCase()) {
+            case S3STORAGEPREFERENCE:
+                genericResourceBuilder.setS3Preference(preference.getS3StoragePreference());
+                break;
+            case SSHSTORAGEPREFERENCE:
+                genericResourceBuilder.setSshPreference(preference.getSshStoragePreference());
+                break;
+            default:
+                // Any other case leaves the resource without a preference, as before.
+                break;
+        }
+
+        return genericResourceBuilder.build();
+    }
+
+    /**
+     * Copies every map entry onto the target through its bean property of the same name.
+     */
+    private static void setObjectFieldsUsingMap(Object target, Map<String, Object> values) {
+        // One wrapper serves all fields of the same target; no need to rebuild it per entry.
+        BeanWrapper beanWrapper = PropertyAccessorFactory.forBeanPropertyAccess(target);
+        for (Map.Entry<String, Object> entry : values.entrySet()) {
+            beanWrapper.setPropertyValue(entry.getKey(), entry.getValue());
+        }
+    }
+}
diff --git a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto
index 600974a..7251f12 100644
--- a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto
+++ b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto
@@ -72,9 +72,30 @@ message ResourceSearchRequest {
 
 message ResourceSearchResponse {
   org.apache.airavata.datalake.drms.DRMSServiceAuthToken authToken = 1;
-  repeated org.apache.airavata.datalake.drms.resource.GenericResource storagesPreference = 2;
+  repeated org.apache.airavata.datalake.drms.resource.GenericResource resources = 2;
 }
 
+// A single metadata entry: a key and the value stored under it.
+message Metadata {
+  string key = 1;
+  string value = 2;
+}
+
+// Request of ResourceService.addResourceMetadata: one metadata entry to record
+// for the resource identified by resourceId, on behalf of the token's user.
+message AddResourceMetadataRequest {
+  org.apache.airavata.datalake.drms.DRMSServiceAuthToken authToken = 1;
+  string resourceId = 2;
+  Metadata metadata = 3;
+}
+
+// Request of ResourceService.fetchResourceMetadata: names the resource whose
+// metadata is wanted.
+message FetchResourceMetadataRequest {
+  org.apache.airavata.datalake.drms.DRMSServiceAuthToken authToken = 1;
+  string resourceId = 2;
+}
+
+// Response of ResourceService.fetchResourceMetadata: zero or more metadata entries.
+message FetchResourceMetadataResponse {
+  repeated Metadata metadata = 1;
+}
+
+
 service ResourceService {
 
   rpc fetchResource (ResourceFetchRequest) returns (ResourceFetchResponse) {
@@ -106,4 +127,16 @@ service ResourceService {
       post: "/v1.0/api/drms/resource/searchPreference"
     };
   }
+
+  // Records one metadata entry for a resource; replies with an empty message.
+  // Mapped to HTTP POST on the path below.
+  rpc addResourceMetadata (AddResourceMetadataRequest) returns (google.protobuf.Empty) {
+    option (google.api.http) = {
+      post: "/v1.0/api/drms/resource/metadata"
+    };
+  }
+
+  // Fetches the metadata entries of a resource.
+  // Mapped to HTTP GET on the same path that addResourceMetadata uses for POST.
+  rpc fetchResourceMetadata (FetchResourceMetadataRequest) returns (FetchResourceMetadataResponse) {
+    option (google.api.http) = {
+      get: "/v1.0/api/drms/resource/metadata"
+    };
+  }
 }
\ No newline at end of file