You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@teaclave.apache.org by ms...@apache.org on 2020/03/17 17:10:52 UTC
[incubator-teaclave] 01/03: [services] Rewrite and coordinate
scheduler and execution services
This is an automated email from the ASF dual-hosted git repository.
mssun pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-teaclave.git
commit 9073e05f505220b1da535f4edcec5b7a071a36f5
Author: Mingshen Sun <bo...@mssun.me>
AuthorDate: Fri Mar 13 16:49:24 2020 -0700
[services] Rewrite and coordinate scheduler and execution services
---
binder/src/macros.rs | 6 +-
cmake/scripts/test.sh | 2 +-
services/execution/enclave/src/lib.rs | 70 +++++-----
services/execution/enclave/src/service.rs | 144 +++++++++++++++------
services/management/enclave/src/task.rs | 2 +
services/proto/build.rs | 1 -
services/proto/src/lib.rs | 5 -
.../src/proto/teaclave_execution_service.proto | 32 -----
.../src/proto/teaclave_scheduler_service.proto | 18 ++-
services/proto/src/teaclave_execution_service.rs | 139 --------------------
services/proto/src/teaclave_scheduler_service.rs | 110 +++++++++++++---
services/scheduler/enclave/src/service.rs | 52 +++++++-
tests/functional/enclave/src/lib.rs | 2 -
.../enclave/src/teaclave_execution_service.rs | 75 -----------
.../enclave/src/teaclave_frontend_service.rs | 46 ++-----
.../enclave/src/teaclave_scheduler_service.rs | 20 ++-
tests/unit/enclave/src/lib.rs | 3 -
types/src/task.rs | 2 +
types/src/worker.rs | 5 +
utils/service_enclave_utils/src/lib.rs | 20 +++
20 files changed, 355 insertions(+), 399 deletions(-)
diff --git a/binder/src/macros.rs b/binder/src/macros.rs
index effb9ee..d81fa1e 100644
--- a/binder/src/macros.rs
+++ b/binder/src/macros.rs
@@ -94,13 +94,13 @@ macro_rules! register_ecall_handler {
// The last argument could be either * mut usize, or &mut usize
let input_buf: &[u8] = unsafe { std::slice::from_raw_parts(in_buf, in_len) };
- trace!("tee receive cmd: {:x}, input_buf = {:?}", cmd, input_buf);
+ log::trace!("tee receive cmd: {:x}, input_buf = {:?}", cmd, input_buf);
let inner_vec = unsafe {
match ecall_ipc_lib_dispatcher(cmd, input_buf) {
Ok(out) => out,
Err(e) => {
- error!("tee execute cmd: {:x}, error: {}", cmd, e);
+ log::error!("tee execute cmd: {:x}, error: {}", cmd, e);
return teaclave_types::EnclaveStatus(1);
}
}
@@ -112,7 +112,7 @@ macro_rules! register_ecall_handler {
*out_len = inner_len;
if inner_len > out_max {
- debug!("tee before copy out_buf check: out_max={:x} < inner={:x}", out_max, inner_len);
+ log::debug!("tee before copy out_buf check: out_max={:x} < inner={:x}", out_max, inner_len);
return teaclave_types::EnclaveStatus(0x0000_000c);
}
diff --git a/cmake/scripts/test.sh b/cmake/scripts/test.sh
index 6993e42..1b320c1 100755
--- a/cmake/scripts/test.sh
+++ b/cmake/scripts/test.sh
@@ -73,7 +73,7 @@ run_functional_tests() {
sleep 3 # wait for management service and scheduler_service
./teaclave_access_control_service &
./teaclave_frontend_service &
- ./teaclave_execution_service &
+ # ./teaclave_execution_service &
popd
sleep 3 # wait for other services
./teaclave_functional_tests
diff --git a/services/execution/enclave/src/lib.rs b/services/execution/enclave/src/lib.rs
index 3584f92..4df60bc 100644
--- a/services/execution/enclave/src/lib.rs
+++ b/services/execution/enclave/src/lib.rs
@@ -22,55 +22,49 @@ extern crate sgx_tstd as std;
#[cfg(feature = "mesalock_sgx")]
use std::prelude::v1::*;
-#[macro_use]
-extern crate log;
-
-use teaclave_attestation::{AttestationConfig, RemoteAttestation};
+use teaclave_attestation::verifier;
use teaclave_binder::proto::{
ECallCommand, FinalizeEnclaveInput, FinalizeEnclaveOutput, InitEnclaveInput, InitEnclaveOutput,
StartServiceInput, StartServiceOutput,
};
use teaclave_binder::{handle_ecall, register_ecall_handler};
use teaclave_config::RuntimeConfig;
-use teaclave_proto::teaclave_execution_service::{
- TeaclaveExecutionRequest, TeaclaveExecutionResponse,
-};
-use teaclave_rpc::config::SgxTrustedTlsServerConfig;
-use teaclave_rpc::server::SgxTrustedTlsServer;
+use teaclave_config::BUILD_CONFIG;
+use teaclave_service_enclave_utils::create_trusted_scheduler_endpoint;
use teaclave_service_enclave_utils::ServiceEnclave;
-use teaclave_types::{TeeServiceError, TeeServiceResult};
+use teaclave_types::{EnclaveInfo, TeeServiceError, TeeServiceResult};
mod service;
+const AS_ROOT_CA_CERT: &[u8] = BUILD_CONFIG.as_root_ca_cert;
+const AUDITOR_PUBLIC_KEYS_LEN: usize = BUILD_CONFIG.auditor_public_keys.len();
+const AUDITOR_PUBLIC_KEYS: &[&[u8]; AUDITOR_PUBLIC_KEYS_LEN] = BUILD_CONFIG.auditor_public_keys;
+
fn start_service(config: &RuntimeConfig) -> anyhow::Result<()> {
- let listen_address = config.internal_endpoints.execution.listen_address;
- let as_config = &config.attestation;
- let attestation_config = AttestationConfig::new(
- &as_config.algorithm,
- &as_config.url,
- &as_config.key,
- &as_config.spid,
+ let enclave_info = EnclaveInfo::verify_and_new(
+ config
+ .audit
+ .enclave_info_bytes
+ .as_ref()
+ .expect("enclave_info"),
+ AUDITOR_PUBLIC_KEYS,
+ config
+ .audit
+ .auditor_signatures_bytes
+ .as_ref()
+ .expect("auditor signatures"),
+ )?;
+ let scheduler_service_address = &config.internal_endpoints.scheduler.advertised_address;
+ let scheduler_service_endpoint = create_trusted_scheduler_endpoint(
+ &scheduler_service_address,
+ &enclave_info,
+ AS_ROOT_CA_CERT,
+ verifier::universal_quote_verifier,
);
- let attested_tls_config = RemoteAttestation::new()
- .config(attestation_config)
- .generate_and_endorse()
- .unwrap()
- .attested_tls_config()
- .unwrap();
- let server_config =
- SgxTrustedTlsServerConfig::from_attested_tls_config(attested_tls_config).unwrap();
- let mut server =
- SgxTrustedTlsServer::<TeaclaveExecutionResponse, TeaclaveExecutionRequest>::new(
- listen_address,
- server_config,
- );
- match server.start(service::TeaclaveExecutionService::new()) {
- Ok(_) => (),
- Err(e) => {
- error!("Service exit, error: {}.", e);
- }
- }
+ let mut service = service::TeaclaveExecutionService::new(scheduler_service_endpoint).unwrap();
+ let _ = service.start();
+
Ok(())
}
@@ -101,8 +95,10 @@ register_ecall_handler!(
#[cfg(feature = "enclave_unit_test")]
pub mod tests {
+ use super::*;
+ use teaclave_test_utils::*;
pub fn run_tests() -> bool {
- true
+ run_tests!(service::tests::test_invoke_function)
}
}
diff --git a/services/execution/enclave/src/service.rs b/services/execution/enclave/src/service.rs
index ce451ef..f939c15 100644
--- a/services/execution/enclave/src/service.rs
+++ b/services/execution/enclave/src/service.rs
@@ -18,64 +18,128 @@
#[cfg(feature = "mesalock_sgx")]
use std::prelude::v1::*;
-use std::sync::Arc;
+use std::sync::{Arc, SgxMutex as Mutex};
-use teaclave_proto::teaclave_execution_service::{
- StagedFunctionExecuteRequest, StagedFunctionExecuteResponse, TeaclaveExecution,
-};
-use teaclave_service_enclave_utils::teaclave_service;
-use teaclave_types::{TeaclaveServiceResponseError, TeaclaveServiceResponseResult};
-
-use teaclave_rpc::Request;
+use teaclave_proto::teaclave_scheduler_service::*;
+use teaclave_rpc::endpoint::Endpoint;
+use teaclave_types::{StagedTask, WorkerInvocationResult};
use teaclave_worker::Worker;
-use thiserror::Error;
-
-#[derive(Error, Debug)]
-pub enum TeaclaveExecutionError {
- #[error("woker running spec error")]
- WorkerRunningSpecError,
-}
-impl From<TeaclaveExecutionError> for TeaclaveServiceResponseError {
- fn from(error: TeaclaveExecutionError) -> Self {
- TeaclaveServiceResponseError::RequestError(error.to_string())
- }
-}
+use anyhow::Result;
-#[teaclave_service(teaclave_execution_service, TeaclaveExecution, TeaclaveExecutionError)]
#[derive(Clone)]
pub(crate) struct TeaclaveExecutionService {
worker: Arc<Worker>,
+ scheduler_client: Arc<Mutex<TeaclaveSchedulerClient>>,
}
impl TeaclaveExecutionService {
- pub(crate) fn new() -> Self {
- TeaclaveExecutionService {
+ pub(crate) fn new(scheduler_service_endpoint: Endpoint) -> Result<Self> {
+ let mut i = 0;
+ let channel = loop {
+ match scheduler_service_endpoint.connect() {
+ Ok(channel) => break channel,
+ Err(_) => {
+ anyhow::ensure!(i < 3, "failed to connect to storage service");
+ log::debug!("Failed to connect to storage service, retry {}", i);
+ i += 1;
+ }
+ }
+ std::thread::sleep(std::time::Duration::from_secs(1));
+ };
+ let scheduler_client = Arc::new(Mutex::new(TeaclaveSchedulerClient::new(channel)?));
+ Ok(TeaclaveExecutionService {
worker: Arc::new(Worker::default()),
- }
+ scheduler_client,
+ })
}
-}
-impl TeaclaveExecution for TeaclaveExecutionService {
- fn invoke_function(
- &self,
- request: Request<StagedFunctionExecuteRequest>,
- ) -> TeaclaveServiceResponseResult<StagedFunctionExecuteResponse> {
- let request = request.message;
- match self.worker.invoke_function(request.into()) {
- Ok(summary) => {
- info!("[+] Invoking function ok: {}", summary);
- Ok(summary.into())
- }
- Err(e) => {
- error!("[+] Invoking function failed: {}", e);
- Err(TeaclaveExecutionError::WorkerRunningSpecError.into())
- }
+ pub(crate) fn start(&mut self) -> Result<()> {
+ loop {
+ std::thread::sleep(std::time::Duration::from_secs(3));
+ let scheduler_client = self.scheduler_client.clone();
+ let mut client = match scheduler_client.lock() {
+ Ok(client) => client,
+ Err(e) => {
+ log::error!("Error: {:?}", e);
+ continue;
+ }
+ };
+
+ let request = PullTaskRequest {};
+ log::debug!("pull_task");
+ let response = match client.pull_task(request) {
+ Ok(response) => response,
+ Err(e) => {
+ log::error!("Error: {:?}", e);
+ continue;
+ }
+ };
+ log::debug!("response: {:?}", response);
+ let result = self.invoke_task(response.staged_task);
+ self.update_task(result);
}
}
+
+ fn invoke_task(&mut self, _task: StagedTask) -> WorkerInvocationResult {
+ // TODO: convert task to function, i.e., needs help from agent
+ unimplemented!()
+ }
+
+ fn update_task(&mut self, _result: WorkerInvocationResult) {
+ unimplemented!()
+ }
}
#[cfg(test_mode)]
mod test_mode {
use super::*;
}
+
+#[cfg(feature = "enclave_unit_test")]
+pub mod tests {
+ use super::*;
+ use std::convert::TryInto;
+ use teaclave_types::*;
+
+ pub fn test_invoke_function() {
+ let function_args = TeaclaveFunctionArguments::new(&hashmap!(
+ "feature_size" => "4",
+ "max_depth" => "4",
+ "iterations" => "100",
+ "shrinkage" => "0.1",
+ "feature_sample_ratio" => "1.0",
+ "data_sample_ratio" => "1.0",
+ "min_leaf_size" => "1",
+ "loss" => "LAD",
+ "training_optimization_level" => "2"
+ ));
+
+ let plain_input = "fixtures/functions/gbdt_training/train.txt";
+ let enc_output = "fixtures/functions/gbdt_training/model.enc.out";
+
+ let input_info =
+ TeaclaveWorkerInputFileInfo::create_with_plaintext_file(plain_input).unwrap();
+ let input_files = TeaclaveWorkerFileRegistry::new(hashmap!(
+ "training_data".to_string() => input_info));
+
+ let output_info =
+ TeaclaveWorkerOutputFileInfo::new(enc_output, TeaclaveFileRootKey128::default());
+ let output_files = TeaclaveWorkerFileRegistry::new(hashmap!(
+ "trained_model".to_string() => output_info));
+ let invocation = WorkerInvocation {
+ runtime_name: "default".to_string(),
+ executor_type: "native".try_into().unwrap(),
+ function_name: "gbdt_training".to_string(),
+ function_payload: String::new(),
+ function_args,
+ input_files,
+ output_files,
+ };
+
+ let worker = Worker::default();
+ let result = worker.invoke_function(invocation);
+ assert!(result.is_ok());
+ log::debug!("summary: {:?}", result.unwrap());
+ }
+}
diff --git a/services/management/enclave/src/task.rs b/services/management/enclave/src/task.rs
index d6e3729..2d3a584 100644
--- a/services/management/enclave/src/task.rs
+++ b/services/management/enclave/src/task.rs
@@ -59,6 +59,8 @@ pub(crate) fn create_task(
approved_user_list: HashSet::new(),
input_map: HashMap::new(),
output_map: HashMap::new(),
+ return_value: None,
+ output_file_hash: HashMap::new(),
status: TaskStatus::Created,
};
// check arguments
diff --git a/services/proto/build.rs b/services/proto/build.rs
index b1f4897..fc17b99 100644
--- a/services/proto/build.rs
+++ b/services/proto/build.rs
@@ -26,7 +26,6 @@ fn main() {
"src/proto/teaclave_authentication_service.proto",
"src/proto/teaclave_common.proto",
"src/proto/teaclave_storage_service.proto",
- "src/proto/teaclave_execution_service.proto",
"src/proto/teaclave_frontend_service.proto",
"src/proto/teaclave_management_service.proto",
"src/proto/teaclave_scheduler_service.proto",
diff --git a/services/proto/src/lib.rs b/services/proto/src/lib.rs
index b7a3af2..18c285f 100644
--- a/services/proto/src/lib.rs
+++ b/services/proto/src/lib.rs
@@ -22,7 +22,6 @@ extern crate sgx_tstd as std;
pub mod teaclave_access_control_service;
pub mod teaclave_authentication_service;
pub mod teaclave_common;
-pub mod teaclave_execution_service;
pub mod teaclave_frontend_service;
pub mod teaclave_management_service;
pub mod teaclave_scheduler_service;
@@ -46,10 +45,6 @@ pub mod teaclave_storage_service_proto {
include_proto!("teaclave_storage_service_proto");
}
-pub mod teaclave_execution_service_proto {
- include_proto!("teaclave_execution_service_proto");
-}
-
pub mod teaclave_frontend_service_proto {
include_proto!("teaclave_frontend_service_proto");
}
diff --git a/services/proto/src/proto/teaclave_execution_service.proto b/services/proto/src/proto/teaclave_execution_service.proto
deleted file mode 100644
index f0a57ba..0000000
--- a/services/proto/src/proto/teaclave_execution_service.proto
+++ /dev/null
@@ -1,32 +0,0 @@
-syntax = "proto3";
-package teaclave_execution_service_proto;
-
-import "teaclave_common.proto";
-
-message WorkerInputFileInfo {
- string path = 1;
- teaclave_common_proto.FileCryptoInfo crypto_info = 2;
-}
-
-message WorkerOutputFileInfo {
- string path = 1;
- teaclave_common_proto.FileCryptoInfo crypto_info = 2;
-}
-
-message StagedFunctionExecuteRequest {
- string runtime_name = 1;
- string executor_type = 2;
- string function_name = 3;
- string function_payload = 4;
- map<string, string> function_args = 11;
- map<string, WorkerInputFileInfo> input_files = 21;
- map<string, WorkerOutputFileInfo> output_files = 22;
-}
-
-message StagedFunctionExecuteResponse {
- string summary = 1;
-}
-
-service TeaclaveExecution {
- rpc InvokeFunction(StagedFunctionExecuteRequest) returns (StagedFunctionExecuteResponse);
-}
\ No newline at end of file
diff --git a/services/proto/src/proto/teaclave_scheduler_service.proto b/services/proto/src/proto/teaclave_scheduler_service.proto
index ea34ec0..151efe7 100644
--- a/services/proto/src/proto/teaclave_scheduler_service.proto
+++ b/services/proto/src/proto/teaclave_scheduler_service.proto
@@ -13,10 +13,18 @@ message PullTaskResponse {
bytes staged_task = 1;
}
-message UpdateTaskRequest {
- string staged_task_id = 2;
+message UpdateTaskStatusRequest {
+ string task_id = 1;
+ teaclave_common_proto.TaskStatus task_status = 2;
}
-message UpdateTaskResponse {}
+message UpdateTaskStatusResponse {}
+
+message UpdateTaskResultRequest {
+ string task_id = 1;
+ bytes return_value = 2;
+ map<string, string> output_file_hash = 3;
+}
+message UpdateTaskResultResponse {}
message PublishTaskRequest {
bytes staged_task = 1;
@@ -30,5 +38,7 @@ service TeaclaveScheduler {
// Subscriber
rpc Subscribe(SubscribeRequest) returns (SubscribeResponse);
rpc PullTask(PullTaskRequest) returns (PullTaskResponse);
- rpc UpdateTask(UpdateTaskRequest) returns (UpdateTaskResponse);
+
+ rpc UpdateTaskStatus(UpdateTaskStatusRequest) returns (UpdateTaskStatusResponse);
+ rpc UpdateTaskResult(UpdateTaskResultRequest) returns (UpdateTaskResultResponse);
}
diff --git a/services/proto/src/teaclave_execution_service.rs b/services/proto/src/teaclave_execution_service.rs
deleted file mode 100644
index 4713c1b..0000000
--- a/services/proto/src/teaclave_execution_service.rs
+++ /dev/null
@@ -1,139 +0,0 @@
-use std::prelude::v1::*;
-
-use anyhow::{anyhow, Error, Result};
-use core::convert::TryInto;
-use teaclave_rpc::into_request;
-use teaclave_types::{
- TeaclaveFunctionArguments, TeaclaveWorkerInputFileInfo, TeaclaveWorkerOutputFileInfo,
- WorkerInvocation,
-};
-
-use crate::teaclave_execution_service_proto as proto;
-pub use proto::TeaclaveExecution;
-pub use proto::TeaclaveExecutionClient;
-pub use proto::TeaclaveExecutionRequest;
-pub use proto::TeaclaveExecutionResponse;
-
-#[into_request(TeaclaveExecutionRequest::InvokeFunction)]
-#[derive(Debug)]
-pub struct StagedFunctionExecuteRequest {
- pub invocation: WorkerInvocation,
-}
-
-#[into_request(TeaclaveExecutionResponse::InvokeFunction)]
-#[derive(Debug)]
-pub struct StagedFunctionExecuteResponse {
- pub summary: std::string::String,
-}
-
-impl std::convert::TryFrom<proto::WorkerInputFileInfo> for TeaclaveWorkerInputFileInfo {
- type Error = Error;
- fn try_from(proto: proto::WorkerInputFileInfo) -> Result<Self> {
- let path = std::path::Path::new(&proto.path).to_path_buf();
- let crypto_info = proto
- .crypto_info
- .ok_or_else(|| anyhow!("Missing field: crypto_info"))?
- .try_into()?;
- Ok(TeaclaveWorkerInputFileInfo { path, crypto_info })
- }
-}
-
-impl std::convert::TryFrom<proto::WorkerOutputFileInfo> for TeaclaveWorkerOutputFileInfo {
- type Error = Error;
- fn try_from(proto: proto::WorkerOutputFileInfo) -> Result<Self> {
- let path = std::path::Path::new(&proto.path).to_path_buf();
- let crypto_info = proto
- .crypto_info
- .ok_or_else(|| anyhow!("Missing field: crypto_info"))?
- .try_into()?;
- Ok(TeaclaveWorkerOutputFileInfo { path, crypto_info })
- }
-}
-
-// For server side
-impl std::convert::TryFrom<proto::StagedFunctionExecuteRequest> for StagedFunctionExecuteRequest {
- type Error = Error;
-
- fn try_from(proto: proto::StagedFunctionExecuteRequest) -> Result<Self> {
- let ret = Self {
- invocation: WorkerInvocation {
- runtime_name: proto.runtime_name,
- executor_type: proto.executor_type.as_str().try_into()?,
- function_name: proto.function_name,
- function_payload: proto.function_payload,
- function_args: TeaclaveFunctionArguments {
- args: proto.function_args,
- },
- input_files: proto.input_files.try_into()?,
- output_files: proto.output_files.try_into()?,
- },
- };
-
- Ok(ret)
- }
-}
-
-impl std::convert::TryFrom<proto::StagedFunctionExecuteResponse> for StagedFunctionExecuteResponse {
- type Error = Error;
-
- fn try_from(proto: proto::StagedFunctionExecuteResponse) -> Result<Self> {
- let ret = Self {
- summary: proto.summary,
- };
-
- Ok(ret)
- }
-}
-
-// For client side
-impl std::convert::From<TeaclaveWorkerInputFileInfo> for proto::WorkerInputFileInfo {
- fn from(info: TeaclaveWorkerInputFileInfo) -> Self {
- proto::WorkerInputFileInfo {
- path: info.path.to_string_lossy().to_string(),
- crypto_info: Some(info.crypto_info.into()),
- }
- }
-}
-
-impl std::convert::From<TeaclaveWorkerOutputFileInfo> for proto::WorkerOutputFileInfo {
- fn from(info: TeaclaveWorkerOutputFileInfo) -> Self {
- proto::WorkerOutputFileInfo {
- path: info.path.to_string_lossy().to_string(),
- crypto_info: Some(info.crypto_info.into()),
- }
- }
-}
-
-impl From<StagedFunctionExecuteRequest> for WorkerInvocation {
- fn from(request: StagedFunctionExecuteRequest) -> Self {
- request.invocation
- }
-}
-
-impl From<StagedFunctionExecuteRequest> for proto::StagedFunctionExecuteRequest {
- fn from(request: StagedFunctionExecuteRequest) -> Self {
- Self {
- runtime_name: request.invocation.runtime_name,
- executor_type: request.invocation.executor_type.to_string(),
- function_name: request.invocation.function_name,
- function_payload: request.invocation.function_payload,
- function_args: request.invocation.function_args.args,
- input_files: request.invocation.input_files.into(),
- output_files: request.invocation.output_files.into(),
- }
- }
-}
-
-impl From<StagedFunctionExecuteResponse> for proto::StagedFunctionExecuteResponse {
- fn from(response: StagedFunctionExecuteResponse) -> Self {
- Self {
- summary: response.summary,
- }
- }
-}
-
-impl From<String> for StagedFunctionExecuteResponse {
- fn from(summary: String) -> Self {
- Self { summary }
- }
-}
diff --git a/services/proto/src/teaclave_scheduler_service.rs b/services/proto/src/teaclave_scheduler_service.rs
index 1d8870a..2677367 100644
--- a/services/proto/src/teaclave_scheduler_service.rs
+++ b/services/proto/src/teaclave_scheduler_service.rs
@@ -4,7 +4,7 @@
use std::collections::HashMap;
use std::prelude::v1::*;
-use crate::teaclave_execution_service::StagedFunctionExecuteRequest;
+use crate::teaclave_common::{i32_from_task_status, i32_to_task_status};
use crate::teaclave_scheduler_service_proto as proto;
use anyhow::{Error, Result};
use core::convert::TryInto;
@@ -13,7 +13,7 @@ pub use proto::TeaclaveSchedulerClient;
pub use proto::TeaclaveSchedulerRequest;
pub use proto::TeaclaveSchedulerResponse;
use teaclave_rpc::into_request;
-use teaclave_types::StagedTask;
+use teaclave_types::{StagedTask, TaskStatus};
#[into_request(TeaclaveSchedulerRequest::Subscribe)]
pub struct SubscribeRequest {}
@@ -38,13 +38,46 @@ impl PullTaskResponse {
}
}
-#[into_request(TeaclaveSchedulerRequest::UpdateTask)]
-pub struct UpdateTaskRequest {
- pub staged_task_id: String,
+#[into_request(TeaclaveSchedulerRequest::UpdateTaskResult)]
+pub struct UpdateTaskResultRequest {
+ pub task_id: String,
+ pub return_value: Vec<u8>,
+ pub output_file_hash: HashMap<String, String>,
}
-#[into_request(TeaclaveSchedulerResponse::UpdateTask)]
-pub struct UpdateTaskResponse {}
+impl UpdateTaskResultRequest {
+ pub fn new(
+ task_id: impl Into<String>,
+ return_value: &[u8],
+ output_file_hash: HashMap<String, String>,
+ ) -> Self {
+ Self {
+ task_id: task_id.into(),
+ return_value: return_value.to_vec(),
+ output_file_hash,
+ }
+ }
+}
+
+#[into_request(TeaclaveSchedulerResponse::UpdateTaskResult)]
+pub struct UpdateTaskResultResponse {}
+
+#[into_request(TeaclaveSchedulerRequest::UpdateTaskStatus)]
+pub struct UpdateTaskStatusRequest {
+ pub task_id: String,
+ pub task_status: TaskStatus,
+}
+
+impl UpdateTaskStatusRequest {
+ pub fn new(task_id: impl Into<String>, task_status: TaskStatus) -> Self {
+ Self {
+ task_id: task_id.into(),
+ task_status,
+ }
+ }
+}
+#[into_request(TeaclaveSchedulerResponse::UpdateTaskStatus)]
+pub struct UpdateTaskStatusResponse {}
#[into_request(TeaclaveSchedulerRequest::PublishTask)]
pub struct PublishTaskRequest {
@@ -116,36 +149,75 @@ impl std::convert::From<PullTaskResponse> for proto::PullTaskResponse {
}
}
}
+impl std::convert::TryFrom<proto::UpdateTaskResultRequest> for UpdateTaskResultRequest {
+ type Error = Error;
+ fn try_from(proto: proto::UpdateTaskResultRequest) -> Result<Self> {
+ let ret = Self {
+ task_id: proto.task_id,
+ return_value: proto.return_value,
+ output_file_hash: proto.output_file_hash,
+ };
+ Ok(ret)
+ }
+}
+
+impl std::convert::From<UpdateTaskResultRequest> for proto::UpdateTaskResultRequest {
+ fn from(req: UpdateTaskResultRequest) -> Self {
+ proto::UpdateTaskResultRequest {
+ task_id: req.task_id,
+ return_value: req.return_value,
+ output_file_hash: req.output_file_hash,
+ }
+ }
+}
+
+impl std::convert::TryFrom<proto::UpdateTaskResultResponse> for UpdateTaskResultResponse {
+ type Error = Error;
+ fn try_from(proto: proto::UpdateTaskResultResponse) -> Result<Self> {
+ let ret = Self {};
+ Ok(ret)
+ }
+}
+
+impl std::convert::From<UpdateTaskResultResponse> for proto::UpdateTaskResultResponse {
+ fn from(req: UpdateTaskResultResponse) -> Self {
+ proto::UpdateTaskResultResponse {}
+ }
+}
-impl std::convert::TryFrom<proto::UpdateTaskRequest> for UpdateTaskRequest {
+impl std::convert::TryFrom<proto::UpdateTaskStatusRequest> for UpdateTaskStatusRequest {
type Error = Error;
- fn try_from(proto: proto::UpdateTaskRequest) -> Result<Self> {
+ fn try_from(proto: proto::UpdateTaskStatusRequest) -> Result<Self> {
+ let task_status = i32_to_task_status(proto.task_status)?;
let ret = Self {
- staged_task_id: proto.staged_task_id,
+ task_id: proto.task_id,
+ task_status,
};
Ok(ret)
}
}
-impl std::convert::From<UpdateTaskRequest> for proto::UpdateTaskRequest {
- fn from(req: UpdateTaskRequest) -> Self {
- proto::UpdateTaskRequest {
- staged_task_id: req.staged_task_id,
+impl std::convert::From<UpdateTaskStatusRequest> for proto::UpdateTaskStatusRequest {
+ fn from(req: UpdateTaskStatusRequest) -> Self {
+ let task_status = i32_from_task_status(req.task_status);
+ proto::UpdateTaskStatusRequest {
+ task_id: req.task_id,
+ task_status,
}
}
}
-impl std::convert::TryFrom<proto::UpdateTaskResponse> for UpdateTaskResponse {
+impl std::convert::TryFrom<proto::UpdateTaskStatusResponse> for UpdateTaskStatusResponse {
type Error = Error;
- fn try_from(proto: proto::UpdateTaskResponse) -> Result<Self> {
+ fn try_from(proto: proto::UpdateTaskStatusResponse) -> Result<Self> {
let ret = Self {};
Ok(ret)
}
}
-impl std::convert::From<UpdateTaskResponse> for proto::UpdateTaskResponse {
- fn from(req: UpdateTaskResponse) -> Self {
- proto::UpdateTaskResponse {}
+impl std::convert::From<UpdateTaskStatusResponse> for proto::UpdateTaskStatusResponse {
+ fn from(req: UpdateTaskStatusResponse) -> Self {
+ proto::UpdateTaskStatusResponse {}
}
}
diff --git a/services/scheduler/enclave/src/service.rs b/services/scheduler/enclave/src/service.rs
index 76c0b2c..5a843d7 100644
--- a/services/scheduler/enclave/src/service.rs
+++ b/services/scheduler/enclave/src/service.rs
@@ -29,7 +29,7 @@ use teaclave_rpc::endpoint::Endpoint;
use teaclave_rpc::Request;
use teaclave_service_enclave_utils::teaclave_service;
use teaclave_types::{
- StagedTask, Storable, TeaclaveServiceResponseError, TeaclaveServiceResponseResult,
+ StagedTask, Storable, Task, TeaclaveServiceResponseError, TeaclaveServiceResponseResult,
};
use anyhow::anyhow;
@@ -94,6 +94,31 @@ impl TeaclaveSchedulerService {
T::from_slice(dequeue_response.value.as_slice())
.map_err(|_| TeaclaveSchedulerError::DataError.into())
}
+
+ fn get_task(&self, key: &str) -> Result<Task> {
+ let key = format!("{}-{}", Task::key_prefix(), key);
+ let get_request = GetRequest::new(key.into_bytes());
+ let get_response = self
+ .storage_client
+ .clone()
+ .lock()
+ .map_err(|_| anyhow!("Cannot lock storage client"))?
+ .get(get_request)?;
+ Task::from_slice(get_response.value.as_slice())
+ }
+
+ fn put_task(&self, item: &impl Storable) -> Result<()> {
+ let k = item.key();
+ let v = item.to_vec()?;
+ let put_request = PutRequest::new(k.as_slice(), v.as_slice());
+ let _put_response = self
+ .storage_client
+ .clone()
+ .lock()
+ .map_err(|_| anyhow!("Cannot lock storage client"))?
+ .put(put_request)?;
+ Ok(())
+ }
}
impl TeaclaveScheduler for TeaclaveSchedulerService {
@@ -117,6 +142,7 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
&self,
request: Request<SubscribeRequest>,
) -> TeaclaveServiceResponseResult<SubscribeResponse> {
+ // TODO: subscribe a specific topic
unimplemented!()
}
@@ -130,11 +156,27 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
Ok(response)
}
- fn update_task(
+ fn update_task_status(
&self,
- request: Request<UpdateTaskRequest>,
- ) -> TeaclaveServiceResponseResult<UpdateTaskResponse> {
- unimplemented!()
+ request: Request<UpdateTaskStatusRequest>,
+ ) -> TeaclaveServiceResponseResult<UpdateTaskStatusResponse> {
+ let request = request.message;
+ let mut task = self.get_task(&request.task_id)?;
+ task.status = request.task_status;
+ self.put_task(&task)?;
+ Ok(UpdateTaskStatusResponse {})
+ }
+
+ fn update_task_result(
+ &self,
+ request: Request<UpdateTaskResultRequest>,
+ ) -> TeaclaveServiceResponseResult<UpdateTaskResultResponse> {
+ let request = request.message;
+ let mut task = self.get_task(&request.task_id)?;
+ task.return_value = Some(request.return_value);
+ task.output_file_hash = request.output_file_hash;
+ self.put_task(&task)?;
+ Ok(UpdateTaskResultResponse {})
}
}
diff --git a/tests/functional/enclave/src/lib.rs b/tests/functional/enclave/src/lib.rs
index b768b3e..891c552 100644
--- a/tests/functional/enclave/src/lib.rs
+++ b/tests/functional/enclave/src/lib.rs
@@ -37,7 +37,6 @@ use teaclave_types::TeeServiceResult;
mod teaclave_access_control_service;
mod teaclave_authentication_service;
-mod teaclave_execution_service;
mod teaclave_frontend_service;
mod teaclave_management_service;
mod teaclave_scheduler_service;
@@ -49,7 +48,6 @@ fn handle_run_test(_: &RunTestInput) -> TeeServiceResult<RunTestOutput> {
teaclave_access_control_service::run_tests(),
teaclave_authentication_service::run_tests(),
teaclave_storage_service::run_tests(),
- teaclave_execution_service::run_tests(),
teaclave_frontend_service::run_tests(),
teaclave_management_service::run_tests(),
teaclave_scheduler_service::run_tests(),
diff --git a/tests/functional/enclave/src/teaclave_execution_service.rs b/tests/functional/enclave/src/teaclave_execution_service.rs
deleted file mode 100644
index e86019a..0000000
--- a/tests/functional/enclave/src/teaclave_execution_service.rs
+++ /dev/null
@@ -1,75 +0,0 @@
-use std::prelude::v1::*;
-
-use std::convert::TryInto;
-use teaclave_proto::teaclave_execution_service::*;
-use teaclave_rpc::endpoint::Endpoint;
-
-use teaclave_config::RuntimeConfig;
-use teaclave_types::hashmap;
-use teaclave_types::TeaclaveFileRootKey128;
-use teaclave_types::TeaclaveFunctionArguments;
-use teaclave_types::TeaclaveWorkerFileRegistry;
-use teaclave_types::TeaclaveWorkerInputFileInfo;
-use teaclave_types::TeaclaveWorkerOutputFileInfo;
-use teaclave_types::WorkerInvocation;
-
-pub fn run_tests() -> bool {
- use teaclave_test_utils::*;
- run_tests!(test_invoke_success,)
-}
-
-fn get_client() -> TeaclaveExecutionClient {
- let runtime_config = RuntimeConfig::from_toml("runtime.config.toml").expect("runtime");
- let channel = Endpoint::new(
- &runtime_config
- .internal_endpoints
- .execution
- .advertised_address,
- )
- .connect()
- .expect("channel");
- TeaclaveExecutionClient::new(channel).expect("client")
-}
-
-fn test_invoke_success() {
- let mut client = get_client();
-
- let function_args = TeaclaveFunctionArguments::new(&hashmap!(
- "feature_size" => "4",
- "max_depth" => "4",
- "iterations" => "100",
- "shrinkage" => "0.1",
- "feature_sample_ratio" => "1.0",
- "data_sample_ratio" => "1.0",
- "min_leaf_size" => "1",
- "loss" => "LAD",
- "training_optimization_level" => "2"
- ));
-
- let plain_input = "fixtures/functions/gbdt_training/train.txt";
- let enc_output = "fixtures/functions/gbdt_training/model.enc.out";
-
- let input_info = TeaclaveWorkerInputFileInfo::create_with_plaintext_file(plain_input).unwrap();
- let input_files = TeaclaveWorkerFileRegistry::new(hashmap!(
- "training_data".to_string() => input_info));
-
- let output_info =
- TeaclaveWorkerOutputFileInfo::new(enc_output, TeaclaveFileRootKey128::default());
- let output_files = TeaclaveWorkerFileRegistry::new(hashmap!(
- "trained_model".to_string() => output_info));
-
- let request = StagedFunctionExecuteRequest {
- invocation: WorkerInvocation {
- runtime_name: "default".to_string(),
- executor_type: "native".try_into().unwrap(),
- function_name: "gbdt_training".to_string(),
- function_payload: String::new(),
- function_args,
- input_files,
- output_files,
- },
- };
-
- let response_result = client.invoke_function(request);
- assert!(response_result.is_ok());
-}
diff --git a/tests/functional/enclave/src/teaclave_frontend_service.rs b/tests/functional/enclave/src/teaclave_frontend_service.rs
index a67e75d..b670133 100644
--- a/tests/functional/enclave/src/teaclave_frontend_service.rs
+++ b/tests/functional/enclave/src/teaclave_frontend_service.rs
@@ -180,14 +180,12 @@ fn test_get_output_file() {
let response = client.register_output_file(request);
let data_id = response.unwrap().data_id;
- let request = GetOutputFileRequest {
- data_id: data_id.clone(),
- };
+ let request = GetOutputFileRequest::new(&data_id);
let response = client.get_output_file(request);
assert!(response.is_ok());
assert!(response.unwrap().hash.is_empty());
- let request = GetOutputFileRequest { data_id };
+ let request = GetOutputFileRequest::new(&data_id);
client
.metadata_mut()
.insert("token".to_string(), "wrong token".to_string());
@@ -206,14 +204,12 @@ fn test_get_input_file() {
let response = client.register_input_file(request);
let data_id = response.unwrap().data_id;
- let request = GetInputFileRequest {
- data_id: data_id.clone(),
- };
+ let request = GetInputFileRequest::new(&data_id);
let response = client.get_input_file(request);
assert!(response.is_ok());
assert!(!response.unwrap().hash.is_empty());
- let request = GetInputFileRequest { data_id };
+ let request = GetInputFileRequest::new(&data_id);
client
.metadata_mut()
.insert("token".to_string(), "wrong token".to_string());
@@ -256,16 +252,12 @@ fn test_register_function() {
fn test_get_function() {
let mut client = get_client();
- let request = GetFunctionRequest {
- function_id: "function-00000000-0000-0000-0000-000000000001".to_string(),
- };
+ let request = GetFunctionRequest::new("function-00000000-0000-0000-0000-000000000001");
let response = client.get_function(request);
assert!(response.is_ok());
assert!(!response.unwrap().name.is_empty());
- let request = GetFunctionRequest {
- function_id: "function-00000000-0000-0000-0000-000000000001".to_string(),
- };
+ let request = GetFunctionRequest::new("function-00000000-0000-0000-0000-000000000001");
client
.metadata_mut()
.insert("token".to_string(), "wrong token".to_string());
@@ -332,14 +324,12 @@ fn test_get_task() {
let response = client.create_task(request);
let task_id = response.unwrap().task_id;
- let request = GetTaskRequest {
- task_id: task_id.clone(),
- };
+ let request = GetTaskRequest::new(&task_id);
let response = client.get_task(request);
assert!(response.is_ok());
assert!(!response.unwrap().function_id.is_empty());
- let request = GetTaskRequest { task_id };
+ let request = GetTaskRequest::new(task_id);
client
.metadata_mut()
.insert("token".to_string(), "wrong token".to_string());
@@ -436,9 +426,7 @@ fn test_approve_task() {
};
let _response = client.assign_data(request);
- let request = ApproveTaskRequest {
- task_id: task_id.clone(),
- };
+ let request = ApproveTaskRequest::new(&task_id);
let correct_token = client.metadata().get("token").unwrap().to_string();
client
.metadata_mut()
@@ -446,7 +434,7 @@ fn test_approve_task() {
let response = client.approve_task(request);
assert!(response.is_err());
- let request = ApproveTaskRequest { task_id };
+ let request = ApproveTaskRequest::new(&task_id);
client
.metadata_mut()
.insert("token".to_string(), correct_token);
@@ -489,14 +477,10 @@ fn test_invoke_task() {
};
let _response = client.assign_data(request);
- let request = ApproveTaskRequest {
- task_id: task_id.clone(),
- };
+ let request = ApproveTaskRequest::new(&task_id);
let _response = client.approve_task(request);
- let request = InvokeTaskRequest {
- task_id: task_id.clone(),
- };
+ let request = InvokeTaskRequest::new(&task_id);
let correct_token = client.metadata().get("token").unwrap().to_string();
client
.metadata_mut()
@@ -504,16 +488,14 @@ fn test_invoke_task() {
let response = client.invoke_task(request);
assert!(response.is_err());
- let request = InvokeTaskRequest {
- task_id: task_id.clone(),
- };
+ let request = InvokeTaskRequest::new(&task_id);
client
.metadata_mut()
.insert("token".to_string(), correct_token);
let response = client.invoke_task(request);
assert!(response.is_ok());
- let request = GetTaskRequest { task_id };
+ let request = GetTaskRequest::new(&task_id);
let response = client.get_task(request);
assert_eq!(response.unwrap().status, TaskStatus::Running);
}
diff --git a/tests/functional/enclave/src/teaclave_scheduler_service.rs b/tests/functional/enclave/src/teaclave_scheduler_service.rs
index a4f0a22..5ef46d1 100644
--- a/tests/functional/enclave/src/teaclave_scheduler_service.rs
+++ b/tests/functional/enclave/src/teaclave_scheduler_service.rs
@@ -11,7 +11,7 @@ use teaclave_types::*;
pub fn run_tests() -> bool {
use teaclave_test_utils::*;
- run_tests!(test_pull_task,)
+ run_tests!(test_pull_task, test_update_task_status_result)
}
fn get_client(user_id: &str) -> TeaclaveSchedulerClient {
@@ -51,3 +51,21 @@ fn test_pull_task() {
log::debug!("response: {:?}", response);
assert!(response.is_ok());
}
+
+fn test_update_task_status_result() {
+ let mut client = get_client("mock_user");
+ let request = PullTaskRequest {};
+ let response = client.pull_task(request).unwrap();
+ log::debug!("response: {:?}", response);
+ let task_id = response.staged_task.task_id.to_string();
+
+ let request = UpdateTaskStatusRequest::new(&task_id, TaskStatus::Finished);
+ let response = client.update_task_status(request);
+ assert!(response.is_ok());
+
+ let request =
+ UpdateTaskResultRequest::new(&task_id, "return".to_string().as_bytes(), HashMap::new());
+ let response = client.update_task_result(request);
+
+ assert!(response.is_ok());
+}
diff --git a/tests/unit/enclave/src/lib.rs b/tests/unit/enclave/src/lib.rs
index 7a12fa7..c498810 100644
--- a/tests/unit/enclave/src/lib.rs
+++ b/tests/unit/enclave/src/lib.rs
@@ -19,9 +19,6 @@
#[cfg(feature = "mesalock_sgx")]
extern crate sgx_tstd as std;
-#[macro_use]
-extern crate log;
-
use std::prelude::v1::*;
use teaclave_access_control_service_enclave;
diff --git a/types/src/task.rs b/types/src/task.rs
index 013cf71..b83af0c 100644
--- a/types/src/task.rs
+++ b/types/src/task.rs
@@ -34,6 +34,8 @@ pub struct Task {
pub approved_user_list: HashSet<String>,
pub input_map: HashMap<String, String>,
pub output_map: HashMap<String, String>,
+ pub return_value: Option<Vec<u8>>,
+ pub output_file_hash: HashMap<String, String>,
pub status: TaskStatus,
}
diff --git a/types/src/worker.rs b/types/src/worker.rs
index dc60c02..2dea81d 100644
--- a/types/src/worker.rs
+++ b/types/src/worker.rs
@@ -267,6 +267,11 @@ pub struct WorkerInvocation {
pub output_files: TeaclaveWorkerFileRegistry<TeaclaveWorkerOutputFileInfo>,
}
+pub struct WorkerInvocationResult {
+ pub return_value: Vec<u8>,
+ pub output_file_hash: HashMap<String, String>,
+}
+
#[cfg(feature = "enclave_unit_test")]
pub mod tests {
use super::*;
diff --git a/utils/service_enclave_utils/src/lib.rs b/utils/service_enclave_utils/src/lib.rs
index da3ad2b..eb2bcd9 100644
--- a/utils/service_enclave_utils/src/lib.rs
+++ b/utils/service_enclave_utils/src/lib.rs
@@ -70,3 +70,23 @@ pub fn create_trusted_storage_endpoint(
Endpoint::new(storage_service_address).config(storage_service_client_config)
}
+
+pub fn create_trusted_scheduler_endpoint(
+ advertised_address: &str,
+ enclave_info: &EnclaveInfo,
+ as_root_ca_cert: &[u8],
+ verifier: AttestationReportVerificationFn,
+) -> Endpoint {
+ let scheduler_service_enclave_attrs = enclave_info
+ .get_enclave_attr("teaclave_scheduler_service")
+ .expect("enclave_info");
+ let scheduler_service_client_config = SgxTrustedTlsClientConfig::new()
+ .attestation_report_verifier(
+ vec![scheduler_service_enclave_attrs],
+ as_root_ca_cert,
+ verifier,
+ );
+ let scheduler_service_address = &advertised_address;
+
+ Endpoint::new(scheduler_service_address).config(scheduler_service_client_config)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@teaclave.apache.org
For additional commands, e-mail: commits-help@teaclave.apache.org