You are viewing a plain-text version of this content; the canonical link to the original message was not preserved in this copy.
Posted to commits@teaclave.apache.org by ms...@apache.org on 2020/03/04 23:01:09 UTC

[incubator-teaclave] branch develop updated: [scheduler] Redesign scheduler service APIs in the pub/sub model

This is an automated email from the ASF dual-hosted git repository.

mssun pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-teaclave.git


The following commit(s) were added to refs/heads/develop by this push:
     new cae512b  [scheduler] Redesign scheduler service APIs in the pub/sub model
cae512b is described below

commit cae512b26893fe1e8a7af270baadfaaf0f52a819
Author: Mingshen Sun <bo...@mssun.me>
AuthorDate: Wed Mar 4 14:35:01 2020 -0800

    [scheduler] Redesign scheduler service APIs in the pub/sub model
---
 config/build.config.toml                           |   6 +-
 services/management/enclave/src/lib.rs             |   1 -
 .../src/proto/teaclave_scheduler_service.proto     |  42 +++--
 services/proto/src/teaclave_scheduler_service.rs   | 172 +++++++++++++--------
 services/scheduler/enclave/src/service.rs          | 100 ++++++------
 5 files changed, 169 insertions(+), 152 deletions(-)

diff --git a/config/build.config.toml b/config/build.config.toml
index 1976c5f..369be34 100644
--- a/config/build.config.toml
+++ b/config/build.config.toml
@@ -21,9 +21,9 @@ attestation_validity_secs = 3600
 # Specify accepted inbound services to enforce incoming connections via mutual
 # attestation. Below figure illustrates current topology of Teaclave services.
 #
-# client => authentication <-+
-#                            |
-# client => frontend ----------> management --> storage <-- scheduler <-- execution
+# client => authentication <-+       +----> storage <----+
+#                            |       |                   |
+# client => frontend ----------> management -------> scheduler <-- execution
 #                                    |
 #                                    +--> access_control
 #
diff --git a/services/management/enclave/src/lib.rs b/services/management/enclave/src/lib.rs
index 5a8d8cd..afbc615 100644
--- a/services/management/enclave/src/lib.rs
+++ b/services/management/enclave/src/lib.rs
@@ -41,7 +41,6 @@ use teaclave_rpc::server::SgxTrustedTlsServer;
 use teaclave_service_enclave_utils::ServiceEnclave;
 use teaclave_types::{EnclaveInfo, TeeServiceError, TeeServiceResult};
 
-//mod fusion_data;
 mod service;
 mod task;
 
diff --git a/services/proto/src/proto/teaclave_scheduler_service.proto b/services/proto/src/proto/teaclave_scheduler_service.proto
index 8cce926..aba8e58 100644
--- a/services/proto/src/proto/teaclave_scheduler_service.proto
+++ b/services/proto/src/proto/teaclave_scheduler_service.proto
@@ -1,34 +1,28 @@
 syntax = "proto3";
 package teaclave_scheduler_service_proto;
 
-import "teaclave_common.proto";
-import "teaclave_execution_service.proto";
-
-
-message QueryTaskRequest {
-    string worker_id = 1;
+message SubscribeRequest {}
+message SubscribeResponse {
+  bool success = 1;
 }
 
-message QueryTaskResponse {
-    teaclave_execution_service_proto.StagedFunctionExecuteRequest function_execute_request = 1; 
-    string staged_task_id = 2;
-}
+message PullTaskRequest {}
+message PullTaskResponse {}
 
-message OutputHash {
-    string output_arg_name = 1;
-    string hash = 2;
+message UpdateTaskRequest {
+  string staged_task_id = 2;
 }
+message UpdateTaskResponse {}
 
-message UploadTaskResultRequest {
-    bool success = 1;
-    string staged_task_id = 2;
-    string worker_id = 3;
-    repeated OutputHash output_results = 4;
-}
-
-message UploadTaskResultResponse { }
+message PublishTaskRequest {}
+message PublishTaskResponse {}
 
 service TeaclaveScheduler {
-  rpc QueryTask(QueryTaskRequest) returns (QueryTaskResponse);
-  rpc UploadTaskResult(UploadTaskResultRequest) returns (UploadTaskResultResponse);
-}
\ No newline at end of file
+  // Publisher
+  rpc PublishTask(PublishTaskRequest) returns (PublishTaskResponse);
+
+  // Subscriber
+  rpc Subscribe(SubscribeRequest) returns (SubscribeResponse);
+  rpc PullTask(PullTaskRequest) returns (PullTaskResponse);
+  rpc UpdateTask(UpdateTaskRequest) returns (UpdateTaskResponse);
+}
diff --git a/services/proto/src/teaclave_scheduler_service.rs b/services/proto/src/teaclave_scheduler_service.rs
index a2b7f6e..0584fed 100644
--- a/services/proto/src/teaclave_scheduler_service.rs
+++ b/services/proto/src/teaclave_scheduler_service.rs
@@ -1,3 +1,6 @@
+#![allow(unused_imports)]
+#![allow(unused_variables)]
+
 use std::collections::HashMap;
 use std::prelude::v1::*;
 
@@ -6,121 +9,154 @@ use crate::teaclave_scheduler_service_proto as proto;
 use anyhow::{Error, Result};
 use core::convert::TryInto;
 pub use proto::TeaclaveScheduler;
-pub use proto::TeaclaveSchedulerClient;
 pub use proto::TeaclaveSchedulerRequest;
 pub use proto::TeaclaveSchedulerResponse;
 use teaclave_rpc::into_request;
 
-#[into_request(TeaclaveSchedulerRequest::QueryTask)]
-#[derive(Debug)]
-pub struct QueryTaskRequest {
-    pub worker_id: String,
+#[into_request(TeaclaveSchedulerRequest::Subscribe)]
+pub struct SubscribeRequest {}
+
+#[into_request(TeaclaveSchedulerResponse::Subscribe)]
+pub struct SubscribeResponse {
+    pub success: bool,
 }
 
-#[into_request(TeaclaveSchedulerResponse::QueryTask)]
-#[derive(Debug)]
-pub struct QueryTaskResponse {
-    pub function_execute_request: Option<StagedFunctionExecuteRequest>,
+#[into_request(TeaclaveSchedulerRequest::PullTask)]
+pub struct PullTaskRequest {}
+
+#[into_request(TeaclaveSchedulerResponse::PullTask)]
+pub struct PullTaskResponse {}
+
+#[into_request(TeaclaveSchedulerRequest::UpdateTask)]
+pub struct UpdateTaskRequest {
     pub staged_task_id: String,
 }
 
-#[into_request(TeaclaveSchedulerRequest::UploadTaskResult)]
-pub struct UploadTaskResultRequest {
-    pub success: bool,
-    pub staged_task_id: String,
-    pub worker_id: String,
-    pub output_results: HashMap<String, String>,
+#[into_request(TeaclaveSchedulerResponse::UpdateTask)]
+pub struct UpdateTaskResponse {}
+
+#[into_request(TeaclaveSchedulerRequest::PublishTask)]
+pub struct PublishTaskRequest {}
+
+#[into_request(TeaclaveSchedulerResponse::PublishTask)]
+pub struct PublishTaskResponse {}
+
+impl std::convert::TryFrom<proto::SubscribeRequest> for SubscribeRequest {
+    type Error = Error;
+    fn try_from(proto: proto::SubscribeRequest) -> Result<Self> {
+        let ret = Self {};
+        Ok(ret)
+    }
 }
 
-#[into_request(TeaclaveSchedulerResponse::UploadTaskResult)]
-pub struct UploadTaskResultResponse;
+impl std::convert::From<SubscribeRequest> for proto::SubscribeRequest {
+    fn from(req: SubscribeRequest) -> Self {
+        proto::SubscribeRequest {}
+    }
+}
 
-impl std::convert::TryFrom<proto::QueryTaskRequest> for QueryTaskRequest {
+impl std::convert::TryFrom<proto::SubscribeResponse> for SubscribeResponse {
     type Error = Error;
-    fn try_from(proto: proto::QueryTaskRequest) -> Result<Self> {
+    fn try_from(proto: proto::SubscribeResponse) -> Result<Self> {
         let ret = Self {
-            worker_id: proto.worker_id,
+            success: proto.success,
         };
         Ok(ret)
     }
 }
 
-impl std::convert::From<QueryTaskRequest> for proto::QueryTaskRequest {
-    fn from(req: QueryTaskRequest) -> Self {
-        proto::QueryTaskRequest {
-            worker_id: req.worker_id,
+impl std::convert::From<SubscribeResponse> for proto::SubscribeResponse {
+    fn from(req: SubscribeResponse) -> Self {
+        proto::SubscribeResponse {
+            success: req.success,
         }
     }
 }
 
-impl std::convert::TryFrom<proto::QueryTaskResponse> for QueryTaskResponse {
+impl std::convert::TryFrom<proto::PullTaskRequest> for PullTaskRequest {
     type Error = Error;
-    fn try_from(proto: proto::QueryTaskResponse) -> Result<Self> {
-        let function_execute_request = match proto.function_execute_request {
-            Some(v) => Some(v.try_into()?),
-            None => None,
-        };
-        let ret = Self {
-            function_execute_request,
-            staged_task_id: proto.staged_task_id,
-        };
+    fn try_from(proto: proto::PullTaskRequest) -> Result<Self> {
+        let ret = Self {};
         Ok(ret)
     }
 }
 
-impl std::convert::From<QueryTaskResponse> for proto::QueryTaskResponse {
-    fn from(resp: QueryTaskResponse) -> Self {
-        proto::QueryTaskResponse {
-            function_execute_request: resp.function_execute_request.map(|v| v.into()),
-            staged_task_id: resp.staged_task_id,
-        }
+impl std::convert::From<PullTaskRequest> for proto::PullTaskRequest {
+    fn from(req: PullTaskRequest) -> Self {
+        proto::PullTaskRequest {}
     }
 }
 
-impl std::convert::TryFrom<proto::UploadTaskResultRequest> for UploadTaskResultRequest {
+impl std::convert::TryFrom<proto::PullTaskResponse> for PullTaskResponse {
     type Error = Error;
-    fn try_from(proto: proto::UploadTaskResultRequest) -> Result<Self> {
+    fn try_from(proto: proto::PullTaskResponse) -> Result<Self> {
+        let ret = Self {};
+        Ok(ret)
+    }
+}
+
+impl std::convert::From<PullTaskResponse> for proto::PullTaskResponse {
+    fn from(req: PullTaskResponse) -> Self {
+        proto::PullTaskResponse {}
+    }
+}
+
+impl std::convert::TryFrom<proto::UpdateTaskRequest> for UpdateTaskRequest {
+    type Error = Error;
+    fn try_from(proto: proto::UpdateTaskRequest) -> Result<Self> {
         let ret = Self {
-            success: proto.success,
             staged_task_id: proto.staged_task_id,
-            worker_id: proto.worker_id,
-            output_results: proto
-                .output_results
-                .into_iter()
-                .map(|output| (output.output_arg_name, output.hash))
-                .collect(),
         };
         Ok(ret)
     }
 }
 
-impl std::convert::From<UploadTaskResultRequest> for proto::UploadTaskResultRequest {
-    fn from(req: UploadTaskResultRequest) -> Self {
-        proto::UploadTaskResultRequest {
-            success: req.success,
+impl std::convert::From<UpdateTaskRequest> for proto::UpdateTaskRequest {
+    fn from(req: UpdateTaskRequest) -> Self {
+        proto::UpdateTaskRequest {
             staged_task_id: req.staged_task_id,
-            worker_id: req.worker_id,
-            output_results: req
-                .output_results
-                .into_iter()
-                .map(|(output_arg_name, hash)| proto::OutputHash {
-                    output_arg_name,
-                    hash,
-                })
-                .collect(),
         }
     }
 }
 
-impl std::convert::TryFrom<proto::UploadTaskResultResponse> for UploadTaskResultResponse {
+impl std::convert::TryFrom<proto::UpdateTaskResponse> for UpdateTaskResponse {
+    type Error = Error;
+    fn try_from(proto: proto::UpdateTaskResponse) -> Result<Self> {
+        let ret = Self {};
+        Ok(ret)
+    }
+}
+
+impl std::convert::From<UpdateTaskResponse> for proto::UpdateTaskResponse {
+    fn from(req: UpdateTaskResponse) -> Self {
+        proto::UpdateTaskResponse {}
+    }
+}
+
+impl std::convert::TryFrom<proto::PublishTaskRequest> for PublishTaskRequest {
+    type Error = Error;
+    fn try_from(proto: proto::PublishTaskRequest) -> Result<Self> {
+        let ret = Self {};
+        Ok(ret)
+    }
+}
+
+impl std::convert::From<PublishTaskRequest> for proto::PublishTaskRequest {
+    fn from(req: PublishTaskRequest) -> Self {
+        proto::PublishTaskRequest {}
+    }
+}
+
+impl std::convert::TryFrom<proto::PublishTaskResponse> for PublishTaskResponse {
     type Error = Error;
-    fn try_from(_proto: proto::UploadTaskResultResponse) -> Result<Self> {
-        Ok(UploadTaskResultResponse)
+    fn try_from(proto: proto::PublishTaskResponse) -> Result<Self> {
+        let ret = Self {};
+        Ok(ret)
     }
 }
 
-impl std::convert::From<UploadTaskResultResponse> for proto::UploadTaskResultResponse {
-    fn from(_resp: UploadTaskResultResponse) -> Self {
-        proto::UploadTaskResultResponse {}
+impl std::convert::From<PublishTaskResponse> for proto::PublishTaskResponse {
+    fn from(req: PublishTaskResponse) -> Self {
+        proto::PublishTaskResponse {}
     }
 }
diff --git a/services/scheduler/enclave/src/service.rs b/services/scheduler/enclave/src/service.rs
index 893411a..d972569 100644
--- a/services/scheduler/enclave/src/service.rs
+++ b/services/scheduler/enclave/src/service.rs
@@ -15,15 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#![allow(unused_imports)]
+#![allow(unused_variables)]
+
 #[cfg(feature = "mesalock_sgx")]
 use std::prelude::v1::*;
 use std::sync::{Arc, SgxMutex as Mutex};
 
-use teaclave_proto::teaclave_scheduler_service::{
-    QueryTaskRequest, QueryTaskResponse, TeaclaveScheduler, UploadTaskResultRequest,
-    UploadTaskResultResponse,
-};
-use teaclave_proto::teaclave_storage_service::{DequeueRequest, TeaclaveStorageClient};
+use teaclave_proto::teaclave_scheduler_service::*;
+use teaclave_proto::teaclave_storage_service::*;
 use teaclave_rpc::endpoint::Endpoint;
 use teaclave_rpc::Request;
 use teaclave_service_enclave_utils::teaclave_service;
@@ -34,22 +34,6 @@ use teaclave_types::{
 use anyhow::Result;
 use thiserror::Error;
 
-#[derive(Error, Debug)]
-pub enum TeaclaveSchedulerError {
-    #[error("scheduler service error")]
-    SchedulerServiceErr,
-    #[error("data error")]
-    DataError,
-    #[error("storage error")]
-    StorageError,
-}
-
-impl From<TeaclaveSchedulerError> for TeaclaveServiceResponseError {
-    fn from(error: TeaclaveSchedulerError) -> Self {
-        TeaclaveServiceResponseError::RequestError(error.to_string())
-    }
-}
-
 #[teaclave_service(teaclave_scheduler_service, TeaclaveScheduler, TeaclaveSchedulerError)]
 #[derive(Clone)]
 pub(crate) struct TeaclaveSchedulerService {
@@ -58,50 +42,54 @@ pub(crate) struct TeaclaveSchedulerService {
 
 impl TeaclaveSchedulerService {
     pub(crate) fn new(storage_service_endpoint: Endpoint) -> Result<Self> {
-        let channel = storage_service_endpoint.connect()?;
-        let client = TeaclaveStorageClient::new(channel)?;
-        let service = Self {
-            storage_client: Arc::new(Mutex::new(client)),
+        let mut i = 0;
+        let channel = loop {
+            match storage_service_endpoint.connect() {
+                Ok(channel) => break channel,
+                Err(_) => {
+                    anyhow::ensure!(i < 3, "failed to connect to storage service");
+                    log::debug!("Failed to connect to storage service, retry {}", i);
+                    i += 1;
+                }
+            }
+            std::thread::sleep(std::time::Duration::from_secs(1));
         };
-        Ok(service)
-    }
+        let storage_client = Arc::new(Mutex::new(TeaclaveStorageClient::new(channel)?));
+        let service = Self { storage_client };
 
-    fn dequeue_from_db<T: Storable>(&self, key: &[u8]) -> TeaclaveServiceResponseResult<T> {
-        let dequeue_request = DequeueRequest::new(key);
-        let dequeue_response = self
-            .storage_client
-            .clone()
-            .lock()
-            .map_err(|_| TeaclaveSchedulerError::StorageError)?
-            .dequeue(dequeue_request)?;
-        T::from_slice(dequeue_response.value.as_slice())
-            .map_err(|_| TeaclaveSchedulerError::DataError.into())
+        Ok(service)
     }
 }
 
 impl TeaclaveScheduler for TeaclaveSchedulerService {
-    fn query_task(
+    // Publisher
+    fn publish_task(
         &self,
-        request: Request<QueryTaskRequest>,
-    ) -> TeaclaveServiceResponseResult<QueryTaskResponse> {
-        let request = request.message;
-        let _worker_id = request.worker_id;
-        let key = StagedTask::get_queue_key().as_bytes();
-        let task: TeaclaveServiceResponseResult<StagedTask> = self.dequeue_from_db(key);
-        let response = match task {
-            Ok(_task) => unimplemented!(),
-            Err(_) => QueryTaskResponse {
-                function_execute_request: None,
-                staged_task_id: "".to_owned(),
-            },
-        };
-        Ok(response)
+        request: Request<PublishTaskRequest>,
+    ) -> TeaclaveServiceResponseResult<PublishTaskResponse> {
+        unimplemented!()
+    }
+
+    // Subscriber
+    fn subscribe(
+        &self,
+        request: Request<SubscribeRequest>,
+    ) -> TeaclaveServiceResponseResult<SubscribeResponse> {
+        unimplemented!()
     }
-    fn upload_task_result(
+
+    fn pull_task(
+        &self,
+        request: Request<PullTaskRequest>,
+    ) -> TeaclaveServiceResponseResult<PullTaskResponse> {
+        unimplemented!()
+    }
+
+    fn update_task(
         &self,
-        _request: Request<UploadTaskResultRequest>,
-    ) -> TeaclaveServiceResponseResult<UploadTaskResultResponse> {
-        unimplemented!();
+        request: Request<UpdateTaskRequest>,
+    ) -> TeaclaveServiceResponseResult<UpdateTaskResponse> {
+        unimplemented!()
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@teaclave.apache.org
For additional commands, e-mail: commits-help@teaclave.apache.org