You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/06/01 03:03:23 UTC
[airavata-data-lake] 16/42: Implementing generic resource handler and metadata ingestion
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
commit b6b5cc43be4931b427958e2863d6865b45b5c6ca
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Wed Mar 24 13:03:53 2021 -0400
Implementing generic resource handler and metadata ingestion
---
.../drms/api/handlers/ResourceServiceHandler.java | 99 +++++++++++++++++++++-
.../handlers/StoragePreferenceServiceHandler.java | 6 +-
.../drms/api/handlers/StorageServiceHandler.java | 2 +
.../drms/core/constants/ResourceConstants.java | 21 +++++
.../deserializer/GenericResourceDeserializer.java | 89 +++++++++++++++++++
.../main/proto/resource/DRMSResourceService.proto | 35 +++++++-
6 files changed, 247 insertions(+), 5 deletions(-)
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
index 4e34e7a..34efc05 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java
@@ -17,15 +17,78 @@
package org.apache.airavata.drms.api.handlers;
import com.google.protobuf.Empty;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
+import org.apache.airavata.datalake.drms.DRMSServiceAuthToken;
+import org.apache.airavata.datalake.drms.groups.FetchCurrentUserRequest;
+import org.apache.airavata.datalake.drms.groups.FetchCurrentUserResponse;
+import org.apache.airavata.datalake.drms.groups.GroupServiceGrpc;
+import org.apache.airavata.datalake.drms.groups.User;
+import org.apache.airavata.datalake.drms.resource.GenericResource;
import org.apache.airavata.datalake.drms.storage.*;
+import org.apache.airavata.drms.core.Neo4JConnector;
+import org.apache.airavata.drms.core.constants.ResourceConstants;
+import org.apache.airavata.drms.core.constants.StorageConstants;
+import org.apache.airavata.drms.core.deserializer.AnyStoragePreferenceDeserializer;
+import org.apache.airavata.drms.core.deserializer.GenericResourceDeserializer;
import org.lognet.springboot.grpc.GRpcService;
+import org.neo4j.driver.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.List;
@GRpcService
public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceImplBase {
+
+ private static final Logger logger = LoggerFactory.getLogger(ResourceServiceHandler.class);
+
+ @Autowired
+ private Neo4JConnector neo4JConnector;
+
+ @org.springframework.beans.factory.annotation.Value("${group.service.host}")
+ private String groupServiceHost;
+
+ @org.springframework.beans.factory.annotation.Value("${group.service.port}")
+ private int groupServicePort;
+
+    /**
+     * Resolves the user behind the supplied auth token by calling the group service.
+     *
+     * <p>A plaintext channel to {@code groupServiceHost:groupServicePort} is opened for the
+     * call and shut down afterwards, so a channel is not leaked on every request.
+     *
+     * @param authToken token identifying the caller
+     * @return the user returned by the group service's {@code fetchCurrentUser} call
+     */
+    private User getUser(DRMSServiceAuthToken authToken) {
+        ManagedChannel channel = ManagedChannelBuilder.forAddress(groupServiceHost, groupServicePort).usePlaintext().build();
+        try {
+            GroupServiceGrpc.GroupServiceBlockingStub groupClient = GroupServiceGrpc.newBlockingStub(channel);
+            FetchCurrentUserResponse userResponse = groupClient.fetchCurrentUser(
+                    FetchCurrentUserRequest.newBuilder().setAuthToken(authToken).build());
+            return userResponse.getUser();
+        } finally {
+            // The blocking call has already returned (or failed), so an orderly shutdown is enough.
+            channel.shutdown();
+        }
+    }
+
+    /**
+     * Fetches a single resource by id, provided the calling user can reach it.
+     *
+     * <p>The query only matches when the user is a member of a group the owning storage is
+     * shared with and of a group the resource itself is shared with. The first resource that
+     * can be deserialized is returned; when nothing matches, or no matching record has the
+     * expected shape, the call is failed through the observer.
+     *
+     * @param request carries the auth token and the id of the resource to fetch
+     * @param responseObserver receives the resource or the error
+     */
@Override
public void fetchResource(ResourceFetchRequest request, StreamObserver<ResourceFetchResponse> responseObserver) {
- super.fetchResource(request, responseObserver);
+        User callUser = getUser(request.getAuthToken());
+
+        // NOTE(review): the request-supplied resource id is concatenated into the Cypher text.
+        // If Neo4JConnector offers a parameterised search, prefer it to rule out query injection.
+        // TODO review (u)-[r4:MEMBER_OF]->(g2:Group)<-[r5:SHARED_WITH]-(sp),
+        List<Record> records = this.neo4JConnector.searchNodes(
+                "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference)-[r6:HAS_RESOURCE]->(res:Resource), " +
+                        "(u)-[r7:MEMBER_OF]->(g3:Group)<-[r8:SHARED_WITH]-(res) " +
+                        "where res.resourceId = '" + request.getResourceId() + "' and u.userId = '"
+                        + callUser.getUserId() + "' return distinct res, sp, s");
+
+        try {
+            // The deserializer skips records that do not have the expected shape, so the list
+            // can be empty even when the query returned rows; that case is handled below.
+            List<GenericResource> genericResourceList = GenericResourceDeserializer.deserializeList(records);
+            if (!genericResourceList.isEmpty()) {
+                responseObserver.onNext(ResourceFetchResponse.newBuilder().setResource(genericResourceList.get(0)).build());
+                responseObserver.onCompleted();
+                return;
+            }
+        } catch (Exception e) {
+            logger.error("Errored while fetching resource with id {}", request.getResourceId(), e);
+            // Keep the original exception as the cause instead of only copying its message.
+            responseObserver.onError(new Exception("Errored while fetching resource with id "
+                    + request.getResourceId() + ". Msg " + e.getMessage(), e));
+            return;
+        }
+
+        logger.error("Could not find a generic resource with id {}", request.getResourceId());
+        responseObserver.onError(new Exception("Could not find a generic resource with id "
+                + request.getResourceId()));
}
@Override
@@ -45,6 +108,38 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI
@Override
public void searchResource(ResourceSearchRequest request, StreamObserver<ResourceSearchResponse> responseObserver) {
- super.searchResource(request, responseObserver);
+        // Resolve the caller first; the query below is scoped to this user's group memberships.
+        User caller = getUser(request.getAuthToken());
+
+        // TODO review (u)-[r4:MEMBER_OF]->(g2:Group)<-[r5:SHARED_WITH]-(sp),
+        String query =
+                "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference)-[r6:HAS_RESOURCE]->(res:Resource), " +
+                        "(u)-[r7:MEMBER_OF]->(g3:Group)<-[r8:SHARED_WITH]-(res) " +
+                        "where u.userId = '" + caller.getUserId() + "' return distinct res, sp, s";
+        List<Record> matches = this.neo4JConnector.searchNodes(query);
+
+        try {
+            // Every deserialized resource goes into a single response message.
+            List<GenericResource> resources = GenericResourceDeserializer.deserializeList(matches);
+            ResourceSearchResponse response = ResourceSearchResponse.newBuilder()
+                    .addAllResources(resources)
+                    .build();
+            responseObserver.onNext(response);
+            responseObserver.onCompleted();
+        } catch (Exception e) {
+            logger.error("Errored while searching generic resources; Message: {}", e.getMessage(), e);
+            responseObserver.onError(e);
+        }
+    }
+
+    /**
+     * Attaches the key/value metadata entry from the request to the resource it names.
+     *
+     * <p>The caller is resolved from the auth token and passed to the connector together with
+     * the resource id and the entry. Failures are logged and reported through the observer,
+     * in line with the other handlers of this service.
+     *
+     * @param request carries the auth token, the resource id and the metadata entry
+     * @param responseObserver receives an empty message on success, or the error
+     */
+    @Override
+    public void addResourceMetadata(AddResourceMetadataRequest request, StreamObserver<Empty> responseObserver) {
+        try {
+            User callUser = getUser(request.getAuthToken());
+            this.neo4JConnector.createMetadataNode(ResourceConstants.RESOURCE_LABEL, "resourceId",
+                    request.getResourceId(), callUser.getUserId(),
+                    request.getMetadata().getKey(), request.getMetadata().getValue());
+            responseObserver.onNext(Empty.getDefaultInstance());
+            responseObserver.onCompleted();
+        } catch (Exception e) {
+            logger.error("Errored while adding metadata to resource with id {}", request.getResourceId(), e);
+            responseObserver.onError(new Exception("Errored while adding metadata to resource with id "
+                    + request.getResourceId() + ". Msg " + e.getMessage(), e));
+        }
+    }
+
+ @Override
+ public void fetchResourceMetadata(FetchResourceMetadataRequest request, StreamObserver<FetchResourceMetadataResponse> responseObserver) { // Placeholder: metadata lookup is not implemented in this handler yet.
+ super.fetchResourceMetadata(request, responseObserver); // Defers to ResourceServiceImplBase; presumably answers UNIMPLEMENTED - TODO confirm.
}
}
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
index 3d7b44a..ab560c3 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
@@ -65,7 +65,8 @@ public class StoragePreferenceServiceHandler extends StoragePreferenceServiceGrp
User callUser = getUser(request.getAuthToken());
List<Record> records = this.neo4JConnector.searchNodes(
- "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference) " +
+ "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference), " +
+ "(u)-[r4:MEMBER_OF]->(g2:Group)<-[r5:SHARED_WITH]-(sp) " +
"where sp.storagePreferenceId = '" + request.getStoragePreferenceId() + "' and u.userId = '"
+ callUser.getUserId() + "' return distinct sp, s");
@@ -108,7 +109,8 @@ public class StoragePreferenceServiceHandler extends StoragePreferenceServiceGrp
User callUser = getUser(request.getAuthToken());
List<Record> records = this.neo4JConnector.searchNodes(
- "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference)" +
+ "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference), " +
+ "(u)-[r4:MEMBER_OF]->(g2:Group)<-[r5:SHARED_WITH]-(sp)" +
" where u.userId ='" + callUser.getUserId() + "' return distinct sp, s");
try {
List<AnyStoragePreference> storagePrefList = AnyStoragePreferenceDeserializer.deserializeList(records);
diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
index a806e70..a00b7ca 100644
--- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
+++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
@@ -113,6 +113,8 @@ public class StorageServiceHandler extends StorageServiceGrpc.StorageServiceImpl
this.neo4JConnector.createMetadataNode(StorageConstants.STORAGE_LABEL, "storageId",
request.getStorageId(), callUser.getUserId(),
request.getKey(), request.getValue());
+ responseObserver.onNext(Empty.getDefaultInstance());
+ responseObserver.onCompleted();
}
@Override
diff --git a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/ResourceConstants.java b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/ResourceConstants.java
new file mode 100644
index 0000000..2c7e0e2
--- /dev/null
+++ b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/ResourceConstants.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.airavata.drms.core.constants;
+
+/**
+ * Constants used when addressing generic resource nodes in the graph store.
+ */
+public final class ResourceConstants {
+
+    /** Label carried by resource nodes. */
+    public static final String RESOURCE_LABEL = "Resource";
+
+    private ResourceConstants() {
+        // Constants holder; not meant to be instantiated.
+    }
+}
diff --git a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/GenericResourceDeserializer.java b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/GenericResourceDeserializer.java
new file mode 100644
index 0000000..c4f4d97
--- /dev/null
+++ b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/GenericResourceDeserializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.drms.core.deserializer;
+
+import org.apache.airavata.datalake.drms.resource.GenericResource;
+import org.apache.airavata.datalake.drms.storage.AnyStorage;
+import org.apache.airavata.datalake.drms.storage.AnyStoragePreference;
+import org.apache.airavata.drms.core.constants.ResourceConstants;
+import org.apache.airavata.drms.core.constants.StorageConstants;
+import org.apache.airavata.drms.core.constants.StoragePreferenceConstants;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Value;
+import org.neo4j.driver.internal.InternalRecord;
+import org.neo4j.driver.types.Node;
+import org.springframework.beans.BeanWrapper;
+import org.springframework.beans.PropertyAccessorFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Converts Neo4j query results into {@link GenericResource} messages.
+ *
+ * <p>Each record is expected to hold exactly three nodes, in the order resource, storage
+ * preference, storage. Records with a different number of values, or whose nodes do not
+ * carry the expected labels, are skipped.
+ */
+public class GenericResourceDeserializer {
+
+    /**
+     * Builds one {@link GenericResource} per well-formed record.
+     *
+     * @param neo4jRecords records whose values are (resource, storage preference, storage) nodes
+     * @return the deserialized resources, in record order; empty when no record qualifies
+     * @throws Exception if a node cannot be converted by the underlying deserializers
+     */
+    public static List<GenericResource> deserializeList(List<Record> neo4jRecords) throws Exception {
+        List<GenericResource> resourceList = new ArrayList<>();
+        for (Record record : neo4jRecords) {
+            // Record#values() is public driver API, so no cast to the driver-internal
+            // InternalRecord is needed (and other Record implementations keep working).
+            List<Value> values = record.values();
+            if (values.size() != 3) {
+                continue;
+            }
+
+            Node resourceNode = values.get(0).asNode();
+            Node prefNode = values.get(1).asNode();
+            Node stoNode = values.get(2).asNode();
+            boolean labelsMatch = resourceNode.hasLabel(ResourceConstants.RESOURCE_LABEL)
+                    && prefNode.hasLabel(StoragePreferenceConstants.STORAGE_PREFERENCE_LABEL)
+                    && stoNode.hasLabel(StorageConstants.STORAGE_LABEL);
+            if (!labelsMatch) {
+                continue;
+            }
+
+            // Build from the outside in: storage, then its preference, then the resource.
+            AnyStorage storage = AnyStorageDeserializer.deriveStorageFromMap(stoNode.asMap());
+            AnyStoragePreference preference = AnyStoragePreferenceDeserializer.deriveStoragePrefFromMap(
+                    prefNode.asMap(), storage);
+            resourceList.add(deriveGenericResourceFromMap(resourceNode.asMap(), preference));
+        }
+        return resourceList;
+    }
+
+    /**
+     * Builds a {@link GenericResource} from a resource node's properties and its preference.
+     *
+     * @param fixedMap property map of the resource node; each key is set as a builder property
+     * @param preference storage preference to attach; only the S3 and SSH cases are copied
+     * @return the assembled resource
+     * @throws Exception if a property cannot be applied to the builder
+     */
+    public static GenericResource deriveGenericResourceFromMap(Map<String, Object> fixedMap,
+                                                               AnyStoragePreference preference) throws Exception {
+
+        GenericResource.Builder genericResourceBuilder = GenericResource.newBuilder();
+        setObjectFieldsUsingMap(genericResourceBuilder, fixedMap);
+        switch (preference.getStorageCase()) {
+            case S3STORAGEPREFERENCE:
+                genericResourceBuilder.setS3Preference(preference.getS3StoragePreference());
+                break;
+            case SSHSTORAGEPREFERENCE:
+                genericResourceBuilder.setSshPreference(preference.getSshStoragePreference());
+                break;
+            default:
+                // Any other case leaves the resource without a preference, as before.
+                break;
+        }
+
+        return genericResourceBuilder.build();
+    }
+
+    /**
+     * Copies every map entry onto the target through bean property access.
+     *
+     * @param target object whose properties are set
+     * @param values property names mapped to the values to assign
+     */
+    private static void setObjectFieldsUsingMap(Object target, Map<String, Object> values) {
+        // One wrapper serves all properties; there is no need to rebuild it per entry.
+        BeanWrapper beanWrapper = PropertyAccessorFactory.forBeanPropertyAccess(target);
+        for (Map.Entry<String, Object> entry : values.entrySet()) {
+            beanWrapper.setPropertyValue(entry.getKey(), entry.getValue());
+        }
+    }
+}
diff --git a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto
index 600974a..7251f12 100644
--- a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto
+++ b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto
@@ -72,9 +72,30 @@ message ResourceSearchRequest {
message ResourceSearchResponse {
org.apache.airavata.datalake.drms.DRMSServiceAuthToken authToken = 1;
- repeated org.apache.airavata.datalake.drms.resource.GenericResource storagesPreference = 2;
+ repeated org.apache.airavata.datalake.drms.resource.GenericResource resources = 2; // resources returned by searchResource
}
+message Metadata { // A single key/value metadata entry.
+ string key = 1;
+ string value = 2;
+}
+
+message AddResourceMetadataRequest { // Request of the addResourceMetadata rpc.
+ org.apache.airavata.datalake.drms.DRMSServiceAuthToken authToken = 1; // token used to resolve the calling user
+ string resourceId = 2; // id of the resource the entry is attached to
+ Metadata metadata = 3; // entry to add
+}
+
+message FetchResourceMetadataRequest { // Request of the fetchResourceMetadata rpc.
+ org.apache.airavata.datalake.drms.DRMSServiceAuthToken authToken = 1;
+ string resourceId = 2; // presumably the resource whose metadata is wanted - confirm once the handler is implemented
+}
+
+message FetchResourceMetadataResponse { // Response of the fetchResourceMetadata rpc.
+ repeated Metadata metadata = 1; // zero or more key/value entries
+}
+
+
service ResourceService {
rpc fetchResource (ResourceFetchRequest) returns (ResourceFetchResponse) {
@@ -106,4 +127,16 @@ service ResourceService {
post: "/v1.0/api/drms/resource/searchPreference"
};
}
+
+ rpc addResourceMetadata (AddResourceMetadataRequest) returns (google.protobuf.Empty) {
+ option (google.api.http) = {
+ post: "/v1.0/api/drms/resource/metadata"
+ };
+ }
+
+ rpc fetchResourceMetadata (FetchResourceMetadataRequest) returns (FetchResourceMetadataResponse) {
+ option (google.api.http) = {
+ get: "/v1.0/api/drms/resource/metadata"
+ };
+ }
}
\ No newline at end of file