You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@teaclave.apache.org by ms...@apache.org on 2020/03/04 23:01:09 UTC
[incubator-teaclave] branch develop updated: [scheduler] Redesign scheduler service APIs in the pub/sub model
This is an automated email from the ASF dual-hosted git repository.
mssun pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-teaclave.git
The following commit(s) were added to refs/heads/develop by this push:
new cae512b [scheduler] Redesign scheduler service APIs in the pub/sub model
cae512b is described below
commit cae512b26893fe1e8a7af270baadfaaf0f52a819
Author: Mingshen Sun <bo...@mssun.me>
AuthorDate: Wed Mar 4 14:35:01 2020 -0800
[scheduler] Redesign scheduler service APIs in the pub/sub model
---
config/build.config.toml | 6 +-
services/management/enclave/src/lib.rs | 1 -
.../src/proto/teaclave_scheduler_service.proto | 42 +++--
services/proto/src/teaclave_scheduler_service.rs | 172 +++++++++++++--------
services/scheduler/enclave/src/service.rs | 100 ++++++------
5 files changed, 169 insertions(+), 152 deletions(-)
diff --git a/config/build.config.toml b/config/build.config.toml
index 1976c5f..369be34 100644
--- a/config/build.config.toml
+++ b/config/build.config.toml
@@ -21,9 +21,9 @@ attestation_validity_secs = 3600
# Specify accepted inbound services to enforce incoming connections via mutual
# attestation. Below figure illustrates current topology of Teaclave services.
#
-# client => authentication <-+
-#                            |
-# client => frontend ----------> management --> storage <-- scheduler <-- execution
+# client => authentication <-+               +----> storage <----+
+#                            |               |                   |
+# client => frontend ----------> management -------> scheduler <-- execution
 #                            |
 #                            +--> access_control
 #
diff --git a/services/management/enclave/src/lib.rs b/services/management/enclave/src/lib.rs
index 5a8d8cd..afbc615 100644
--- a/services/management/enclave/src/lib.rs
+++ b/services/management/enclave/src/lib.rs
@@ -41,7 +41,6 @@ use teaclave_rpc::server::SgxTrustedTlsServer;
use teaclave_service_enclave_utils::ServiceEnclave;
use teaclave_types::{EnclaveInfo, TeeServiceError, TeeServiceResult};
-//mod fusion_data;
mod service;
mod task;
diff --git a/services/proto/src/proto/teaclave_scheduler_service.proto b/services/proto/src/proto/teaclave_scheduler_service.proto
index 8cce926..aba8e58 100644
--- a/services/proto/src/proto/teaclave_scheduler_service.proto
+++ b/services/proto/src/proto/teaclave_scheduler_service.proto
@@ -1,34 +1,28 @@
syntax = "proto3";
package teaclave_scheduler_service_proto;
-import "teaclave_common.proto";
-import "teaclave_execution_service.proto";
-
-
-message QueryTaskRequest {
- string worker_id = 1;
+message SubscribeRequest {}
+message SubscribeResponse {
+ bool success = 1;
}
-message QueryTaskResponse {
- teaclave_execution_service_proto.StagedFunctionExecuteRequest function_execute_request = 1;
- string staged_task_id = 2;
-}
+message PullTaskRequest {}
+message PullTaskResponse {}
-message OutputHash {
- string output_arg_name = 1;
- string hash = 2;
+message UpdateTaskRequest {
+ string staged_task_id = 2;
}
+message UpdateTaskResponse {}
-message UploadTaskResultRequest {
- bool success = 1;
- string staged_task_id = 2;
- string worker_id = 3;
- repeated OutputHash output_results = 4;
-}
-
-message UploadTaskResultResponse { }
+message PublishTaskRequest {}
+message PublishTaskResponse {}
service TeaclaveScheduler {
- rpc QueryTask(QueryTaskRequest) returns (QueryTaskResponse);
- rpc UploadTaskResult(UploadTaskResultRequest) returns (UploadTaskResultResponse);
-}
\ No newline at end of file
+ // Publisher
+ rpc PublishTask(PublishTaskRequest) returns (PublishTaskResponse);
+
+ // Subscriber
+ rpc Subscribe(SubscribeRequest) returns (SubscribeResponse);
+ rpc PullTask(PullTaskRequest) returns (PullTaskResponse);
+ rpc UpdateTask(UpdateTaskRequest) returns (UpdateTaskResponse);
+}
diff --git a/services/proto/src/teaclave_scheduler_service.rs b/services/proto/src/teaclave_scheduler_service.rs
index a2b7f6e..0584fed 100644
--- a/services/proto/src/teaclave_scheduler_service.rs
+++ b/services/proto/src/teaclave_scheduler_service.rs
@@ -1,3 +1,6 @@
+#![allow(unused_imports)]
+#![allow(unused_variables)]
+
use std::collections::HashMap;
use std::prelude::v1::*;
@@ -6,121 +9,154 @@ use crate::teaclave_scheduler_service_proto as proto;
use anyhow::{Error, Result};
use core::convert::TryInto;
pub use proto::TeaclaveScheduler;
-pub use proto::TeaclaveSchedulerClient;
pub use proto::TeaclaveSchedulerRequest;
pub use proto::TeaclaveSchedulerResponse;
use teaclave_rpc::into_request;
-#[into_request(TeaclaveSchedulerRequest::QueryTask)]
-#[derive(Debug)]
-pub struct QueryTaskRequest {
- pub worker_id: String,
+#[into_request(TeaclaveSchedulerRequest::Subscribe)]
+pub struct SubscribeRequest {}
+
+#[into_request(TeaclaveSchedulerResponse::Subscribe)]
+pub struct SubscribeResponse {
+ pub success: bool,
}
-#[into_request(TeaclaveSchedulerResponse::QueryTask)]
-#[derive(Debug)]
-pub struct QueryTaskResponse {
- pub function_execute_request: Option<StagedFunctionExecuteRequest>,
+#[into_request(TeaclaveSchedulerRequest::PullTask)]
+pub struct PullTaskRequest {}
+
+#[into_request(TeaclaveSchedulerResponse::PullTask)]
+pub struct PullTaskResponse {}
+
+#[into_request(TeaclaveSchedulerRequest::UpdateTask)]
+pub struct UpdateTaskRequest {
pub staged_task_id: String,
}
-#[into_request(TeaclaveSchedulerRequest::UploadTaskResult)]
-pub struct UploadTaskResultRequest {
- pub success: bool,
- pub staged_task_id: String,
- pub worker_id: String,
- pub output_results: HashMap<String, String>,
+#[into_request(TeaclaveSchedulerResponse::UpdateTask)]
+pub struct UpdateTaskResponse {}
+
+#[into_request(TeaclaveSchedulerRequest::PublishTask)]
+pub struct PublishTaskRequest {}
+
+#[into_request(TeaclaveSchedulerResponse::PublishTask)]
+pub struct PublishTaskResponse {}
+
+impl std::convert::TryFrom<proto::SubscribeRequest> for SubscribeRequest {
+ type Error = Error;
+ fn try_from(proto: proto::SubscribeRequest) -> Result<Self> {
+ let ret = Self {};
+ Ok(ret)
+ }
}
-#[into_request(TeaclaveSchedulerResponse::UploadTaskResult)]
-pub struct UploadTaskResultResponse;
+impl std::convert::From<SubscribeRequest> for proto::SubscribeRequest {
+ fn from(req: SubscribeRequest) -> Self {
+ proto::SubscribeRequest {}
+ }
+}
-impl std::convert::TryFrom<proto::QueryTaskRequest> for QueryTaskRequest {
+impl std::convert::TryFrom<proto::SubscribeResponse> for SubscribeResponse {
type Error = Error;
- fn try_from(proto: proto::QueryTaskRequest) -> Result<Self> {
+ fn try_from(proto: proto::SubscribeResponse) -> Result<Self> {
let ret = Self {
- worker_id: proto.worker_id,
+ success: proto.success,
};
Ok(ret)
}
}
-impl std::convert::From<QueryTaskRequest> for proto::QueryTaskRequest {
- fn from(req: QueryTaskRequest) -> Self {
- proto::QueryTaskRequest {
- worker_id: req.worker_id,
+impl std::convert::From<SubscribeResponse> for proto::SubscribeResponse {
+ fn from(req: SubscribeResponse) -> Self {
+ proto::SubscribeResponse {
+ success: req.success,
}
}
}
-impl std::convert::TryFrom<proto::QueryTaskResponse> for QueryTaskResponse {
+impl std::convert::TryFrom<proto::PullTaskRequest> for PullTaskRequest {
type Error = Error;
- fn try_from(proto: proto::QueryTaskResponse) -> Result<Self> {
- let function_execute_request = match proto.function_execute_request {
- Some(v) => Some(v.try_into()?),
- None => None,
- };
- let ret = Self {
- function_execute_request,
- staged_task_id: proto.staged_task_id,
- };
+ fn try_from(proto: proto::PullTaskRequest) -> Result<Self> {
+ let ret = Self {};
Ok(ret)
}
}
-impl std::convert::From<QueryTaskResponse> for proto::QueryTaskResponse {
- fn from(resp: QueryTaskResponse) -> Self {
- proto::QueryTaskResponse {
- function_execute_request: resp.function_execute_request.map(|v| v.into()),
- staged_task_id: resp.staged_task_id,
- }
+impl std::convert::From<PullTaskRequest> for proto::PullTaskRequest {
+ fn from(req: PullTaskRequest) -> Self {
+ proto::PullTaskRequest {}
}
}
-impl std::convert::TryFrom<proto::UploadTaskResultRequest> for UploadTaskResultRequest {
+impl std::convert::TryFrom<proto::PullTaskResponse> for PullTaskResponse {
type Error = Error;
- fn try_from(proto: proto::UploadTaskResultRequest) -> Result<Self> {
+ fn try_from(proto: proto::PullTaskResponse) -> Result<Self> {
+ let ret = Self {};
+ Ok(ret)
+ }
+}
+
+impl std::convert::From<PullTaskResponse> for proto::PullTaskResponse {
+ fn from(req: PullTaskResponse) -> Self {
+ proto::PullTaskResponse {}
+ }
+}
+
+impl std::convert::TryFrom<proto::UpdateTaskRequest> for UpdateTaskRequest {
+ type Error = Error;
+ fn try_from(proto: proto::UpdateTaskRequest) -> Result<Self> {
let ret = Self {
- success: proto.success,
staged_task_id: proto.staged_task_id,
- worker_id: proto.worker_id,
- output_results: proto
- .output_results
- .into_iter()
- .map(|output| (output.output_arg_name, output.hash))
- .collect(),
};
Ok(ret)
}
}
-impl std::convert::From<UploadTaskResultRequest> for proto::UploadTaskResultRequest {
- fn from(req: UploadTaskResultRequest) -> Self {
- proto::UploadTaskResultRequest {
- success: req.success,
+impl std::convert::From<UpdateTaskRequest> for proto::UpdateTaskRequest {
+ fn from(req: UpdateTaskRequest) -> Self {
+ proto::UpdateTaskRequest {
staged_task_id: req.staged_task_id,
- worker_id: req.worker_id,
- output_results: req
- .output_results
- .into_iter()
- .map(|(output_arg_name, hash)| proto::OutputHash {
- output_arg_name,
- hash,
- })
- .collect(),
}
}
}
-impl std::convert::TryFrom<proto::UploadTaskResultResponse> for UploadTaskResultResponse {
+impl std::convert::TryFrom<proto::UpdateTaskResponse> for UpdateTaskResponse {
+ type Error = Error;
+ fn try_from(proto: proto::UpdateTaskResponse) -> Result<Self> {
+ let ret = Self {};
+ Ok(ret)
+ }
+}
+
+impl std::convert::From<UpdateTaskResponse> for proto::UpdateTaskResponse {
+ fn from(req: UpdateTaskResponse) -> Self {
+ proto::UpdateTaskResponse {}
+ }
+}
+
+impl std::convert::TryFrom<proto::PublishTaskRequest> for PublishTaskRequest {
+ type Error = Error;
+ fn try_from(proto: proto::PublishTaskRequest) -> Result<Self> {
+ let ret = Self {};
+ Ok(ret)
+ }
+}
+
+impl std::convert::From<PublishTaskRequest> for proto::PublishTaskRequest {
+ fn from(req: PublishTaskRequest) -> Self {
+ proto::PublishTaskRequest {}
+ }
+}
+
+impl std::convert::TryFrom<proto::PublishTaskResponse> for PublishTaskResponse {
type Error = Error;
- fn try_from(_proto: proto::UploadTaskResultResponse) -> Result<Self> {
- Ok(UploadTaskResultResponse)
+ fn try_from(proto: proto::PublishTaskResponse) -> Result<Self> {
+ let ret = Self {};
+ Ok(ret)
}
}
-impl std::convert::From<UploadTaskResultResponse> for proto::UploadTaskResultResponse {
- fn from(_resp: UploadTaskResultResponse) -> Self {
- proto::UploadTaskResultResponse {}
+impl std::convert::From<PublishTaskResponse> for proto::PublishTaskResponse {
+ fn from(req: PublishTaskResponse) -> Self {
+ proto::PublishTaskResponse {}
}
}
diff --git a/services/scheduler/enclave/src/service.rs b/services/scheduler/enclave/src/service.rs
index 893411a..d972569 100644
--- a/services/scheduler/enclave/src/service.rs
+++ b/services/scheduler/enclave/src/service.rs
@@ -15,15 +15,15 @@
// specific language governing permissions and limitations
// under the License.
+#![allow(unused_imports)]
+#![allow(unused_variables)]
+
#[cfg(feature = "mesalock_sgx")]
use std::prelude::v1::*;
use std::sync::{Arc, SgxMutex as Mutex};
-use teaclave_proto::teaclave_scheduler_service::{
- QueryTaskRequest, QueryTaskResponse, TeaclaveScheduler, UploadTaskResultRequest,
- UploadTaskResultResponse,
-};
-use teaclave_proto::teaclave_storage_service::{DequeueRequest, TeaclaveStorageClient};
+use teaclave_proto::teaclave_scheduler_service::*;
+use teaclave_proto::teaclave_storage_service::*;
use teaclave_rpc::endpoint::Endpoint;
use teaclave_rpc::Request;
use teaclave_service_enclave_utils::teaclave_service;
@@ -34,22 +34,6 @@ use teaclave_types::{
use anyhow::Result;
use thiserror::Error;
-#[derive(Error, Debug)]
-pub enum TeaclaveSchedulerError {
- #[error("scheduler service error")]
- SchedulerServiceErr,
- #[error("data error")]
- DataError,
- #[error("storage error")]
- StorageError,
-}
-
-impl From<TeaclaveSchedulerError> for TeaclaveServiceResponseError {
- fn from(error: TeaclaveSchedulerError) -> Self {
- TeaclaveServiceResponseError::RequestError(error.to_string())
- }
-}
-
#[teaclave_service(teaclave_scheduler_service, TeaclaveScheduler, TeaclaveSchedulerError)]
#[derive(Clone)]
pub(crate) struct TeaclaveSchedulerService {
@@ -58,50 +42,54 @@ pub(crate) struct TeaclaveSchedulerService {
impl TeaclaveSchedulerService {
pub(crate) fn new(storage_service_endpoint: Endpoint) -> Result<Self> {
- let channel = storage_service_endpoint.connect()?;
- let client = TeaclaveStorageClient::new(channel)?;
- let service = Self {
- storage_client: Arc::new(Mutex::new(client)),
+ let mut i = 0;
+ let channel = loop {
+ match storage_service_endpoint.connect() {
+ Ok(channel) => break channel,
+ Err(_) => {
+ anyhow::ensure!(i < 3, "failed to connect to storage service");
+ log::debug!("Failed to connect to storage service, retry {}", i);
+ i += 1;
+ }
+ }
+ std::thread::sleep(std::time::Duration::from_secs(1));
};
- Ok(service)
- }
+ let storage_client = Arc::new(Mutex::new(TeaclaveStorageClient::new(channel)?));
+ let service = Self { storage_client };
- fn dequeue_from_db<T: Storable>(&self, key: &[u8]) -> TeaclaveServiceResponseResult<T> {
- let dequeue_request = DequeueRequest::new(key);
- let dequeue_response = self
- .storage_client
- .clone()
- .lock()
- .map_err(|_| TeaclaveSchedulerError::StorageError)?
- .dequeue(dequeue_request)?;
- T::from_slice(dequeue_response.value.as_slice())
- .map_err(|_| TeaclaveSchedulerError::DataError.into())
+ Ok(service)
}
}
impl TeaclaveScheduler for TeaclaveSchedulerService {
- fn query_task(
+ // Publisher
+ fn publish_task(
&self,
- request: Request<QueryTaskRequest>,
- ) -> TeaclaveServiceResponseResult<QueryTaskResponse> {
- let request = request.message;
- let _worker_id = request.worker_id;
- let key = StagedTask::get_queue_key().as_bytes();
- let task: TeaclaveServiceResponseResult<StagedTask> = self.dequeue_from_db(key);
- let response = match task {
- Ok(_task) => unimplemented!(),
- Err(_) => QueryTaskResponse {
- function_execute_request: None,
- staged_task_id: "".to_owned(),
- },
- };
- Ok(response)
+ request: Request<PublishTaskRequest>,
+ ) -> TeaclaveServiceResponseResult<PublishTaskResponse> {
+ unimplemented!()
+ }
+
+ // Subscriber
+ fn subscribe(
+ &self,
+ request: Request<SubscribeRequest>,
+ ) -> TeaclaveServiceResponseResult<SubscribeResponse> {
+ unimplemented!()
}
- fn upload_task_result(
+
+ fn pull_task(
+ &self,
+ request: Request<PullTaskRequest>,
+ ) -> TeaclaveServiceResponseResult<PullTaskResponse> {
+ unimplemented!()
+ }
+
+ fn update_task(
&self,
- _request: Request<UploadTaskResultRequest>,
- ) -> TeaclaveServiceResponseResult<UploadTaskResultResponse> {
- unimplemented!();
+ request: Request<UpdateTaskRequest>,
+ ) -> TeaclaveServiceResponseResult<UpdateTaskResponse> {
+ unimplemented!()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@teaclave.apache.org
For additional commands, e-mail: commits-help@teaclave.apache.org