You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@teaclave.apache.org by ms...@apache.org on 2020/03/09 17:53:19 UTC
[incubator-teaclave] branch develop updated: [scheduler] Implement
pull_task
This is an automated email from the ASF dual-hosted git repository.
mssun pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-teaclave.git
The following commit(s) were added to refs/heads/develop by this push:
new 7a061ef [scheduler] Implement pull_task
7a061ef is described below
commit 7a061ef96c432d4c841f11bbbd99031635cba536
Author: Mingshen Sun <bo...@mssun.me>
AuthorDate: Mon Mar 9 10:36:10 2020 -0700
[scheduler] Implement pull_task
---
.../src/proto/teaclave_scheduler_service.proto | 4 +-
services/proto/src/teaclave_scheduler_service.rs | 18 ++++++--
services/scheduler/enclave/src/service.rs | 34 +++++++++++++-
tests/functional/enclave/src/lib.rs | 2 +
.../enclave/src/teaclave_scheduler_service.rs | 53 ++++++++++++++++++++++
5 files changed, 106 insertions(+), 5 deletions(-)
diff --git a/services/proto/src/proto/teaclave_scheduler_service.proto b/services/proto/src/proto/teaclave_scheduler_service.proto
index 5dc37e7..ea34ec0 100644
--- a/services/proto/src/proto/teaclave_scheduler_service.proto
+++ b/services/proto/src/proto/teaclave_scheduler_service.proto
@@ -9,7 +9,9 @@ message SubscribeResponse {
}
message PullTaskRequest {}
-message PullTaskResponse {}
+message PullTaskResponse {
+ bytes staged_task = 1;
+}
message UpdateTaskRequest {
string staged_task_id = 2;
diff --git a/services/proto/src/teaclave_scheduler_service.rs b/services/proto/src/teaclave_scheduler_service.rs
index b05625a..1d8870a 100644
--- a/services/proto/src/teaclave_scheduler_service.rs
+++ b/services/proto/src/teaclave_scheduler_service.rs
@@ -27,7 +27,16 @@ pub struct SubscribeResponse {
pub struct PullTaskRequest {}
#[into_request(TeaclaveSchedulerResponse::PullTask)]
-pub struct PullTaskResponse {}
+#[derive(Debug)]
+pub struct PullTaskResponse {
+ pub staged_task: StagedTask,
+}
+
+impl PullTaskResponse {
+ pub fn new(staged_task: StagedTask) -> Self {
+ Self { staged_task }
+ }
+}
#[into_request(TeaclaveSchedulerRequest::UpdateTask)]
pub struct UpdateTaskRequest {
@@ -94,14 +103,17 @@ impl std::convert::From<PullTaskRequest> for proto::PullTaskRequest {
impl std::convert::TryFrom<proto::PullTaskResponse> for PullTaskResponse {
type Error = Error;
fn try_from(proto: proto::PullTaskResponse) -> Result<Self> {
- let ret = Self {};
+ let staged_task = StagedTask::from_slice(&proto.staged_task)?;
+ let ret = Self { staged_task };
Ok(ret)
}
}
impl std::convert::From<PullTaskResponse> for proto::PullTaskResponse {
fn from(req: PullTaskResponse) -> Self {
- proto::PullTaskResponse {}
+ proto::PullTaskResponse {
+ staged_task: req.staged_task.to_vec().unwrap(),
+ }
}
}
diff --git a/services/scheduler/enclave/src/service.rs b/services/scheduler/enclave/src/service.rs
index 64371ea..76c0b2c 100644
--- a/services/scheduler/enclave/src/service.rs
+++ b/services/scheduler/enclave/src/service.rs
@@ -36,6 +36,22 @@ use anyhow::anyhow;
use anyhow::Result;
use thiserror::Error;
+#[derive(Error, Debug)]
+pub enum TeaclaveSchedulerError {
+ #[error("scheduler service error")]
+ SchedulerServiceErr,
+ #[error("data error")]
+ DataError,
+ #[error("storage error")]
+ StorageError,
+}
+
+impl From<TeaclaveSchedulerError> for TeaclaveServiceResponseError {
+ fn from(error: TeaclaveSchedulerError) -> Self {
+ TeaclaveServiceResponseError::RequestError(error.to_string())
+ }
+}
+
#[teaclave_service(teaclave_scheduler_service, TeaclaveScheduler, TeaclaveSchedulerError)]
#[derive(Clone)]
pub(crate) struct TeaclaveSchedulerService {
@@ -66,6 +82,18 @@ impl TeaclaveSchedulerService {
Ok(service)
}
+
+ fn pull_staged_task<T: Storable>(&self, key: &[u8]) -> TeaclaveServiceResponseResult<T> {
+ let dequeue_request = DequeueRequest::new(key);
+ let dequeue_response = self
+ .storage_client
+ .clone()
+ .lock()
+ .map_err(|_| TeaclaveSchedulerError::StorageError)?
+ .dequeue(dequeue_request)?;
+ T::from_slice(dequeue_response.value.as_slice())
+ .map_err(|_| TeaclaveSchedulerError::DataError.into())
+ }
}
impl TeaclaveScheduler for TeaclaveSchedulerService {
@@ -74,6 +102,7 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
&self,
request: Request<PublishTaskRequest>,
) -> TeaclaveServiceResponseResult<PublishTaskResponse> {
+ // XXX: Publisher is not implemented
let mut task_queue = self
.task_queue
.lock()
@@ -95,7 +124,10 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
&self,
request: Request<PullTaskRequest>,
) -> TeaclaveServiceResponseResult<PullTaskResponse> {
- unimplemented!()
+ let key = StagedTask::get_queue_key().as_bytes();
+ let staged_task = self.pull_staged_task(key)?;
+ let response = PullTaskResponse::new(staged_task);
+ Ok(response)
}
fn update_task(
diff --git a/tests/functional/enclave/src/lib.rs b/tests/functional/enclave/src/lib.rs
index a87e120..b768b3e 100644
--- a/tests/functional/enclave/src/lib.rs
+++ b/tests/functional/enclave/src/lib.rs
@@ -40,6 +40,7 @@ mod teaclave_authentication_service;
mod teaclave_execution_service;
mod teaclave_frontend_service;
mod teaclave_management_service;
+mod teaclave_scheduler_service;
mod teaclave_storage_service;
#[handle_ecall]
@@ -51,6 +52,7 @@ fn handle_run_test(_: &RunTestInput) -> TeeServiceResult<RunTestOutput> {
teaclave_execution_service::run_tests(),
teaclave_frontend_service::run_tests(),
teaclave_management_service::run_tests(),
+ teaclave_scheduler_service::run_tests(),
);
assert_eq!(ret, true);
diff --git a/tests/functional/enclave/src/teaclave_scheduler_service.rs b/tests/functional/enclave/src/teaclave_scheduler_service.rs
new file mode 100644
index 0000000..a4f0a22
--- /dev/null
+++ b/tests/functional/enclave/src/teaclave_scheduler_service.rs
@@ -0,0 +1,53 @@
+use std::collections::HashMap;
+use std::prelude::v1::*;
+use teaclave_attestation::verifier;
+use teaclave_config::RuntimeConfig;
+use teaclave_config::BUILD_CONFIG;
+use teaclave_proto::teaclave_scheduler_service::*;
+use teaclave_rpc::config::SgxTrustedTlsClientConfig;
+use teaclave_rpc::endpoint::Endpoint;
+use teaclave_types::*;
+
+pub fn run_tests() -> bool {
+ use teaclave_test_utils::*;
+
+ run_tests!(test_pull_task,)
+}
+
+fn get_client(user_id: &str) -> TeaclaveSchedulerClient {
+ let runtime_config = RuntimeConfig::from_toml("runtime.config.toml").expect("runtime");
+ let enclave_info =
+ EnclaveInfo::from_bytes(&runtime_config.audit.enclave_info_bytes.as_ref().unwrap());
+ let enclave_attr = enclave_info
+ .get_enclave_attr("teaclave_scheduler_service")
+ .expect("scheduler");
+ let config = SgxTrustedTlsClientConfig::new().attestation_report_verifier(
+ vec![enclave_attr],
+ BUILD_CONFIG.as_root_ca_cert,
+ verifier::universal_quote_verifier,
+ );
+
+ let channel = Endpoint::new(
+ &runtime_config
+ .internal_endpoints
+ .scheduler
+ .advertised_address,
+ )
+ .config(config)
+ .connect()
+ .unwrap();
+
+ let mut metadata = HashMap::new();
+ metadata.insert("id".to_string(), user_id.to_string());
+ metadata.insert("token".to_string(), "".to_string());
+
+ TeaclaveSchedulerClient::new_with_metadata(channel, metadata).unwrap()
+}
+
+fn test_pull_task() {
+ let mut client = get_client("mock_user");
+ let request = PullTaskRequest {};
+ let response = client.pull_task(request);
+ log::debug!("response: {:?}", response);
+ assert!(response.is_ok());
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@teaclave.apache.org
For additional commands, e-mail: commits-help@teaclave.apache.org