You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@teaclave.apache.org by ms...@apache.org on 2020/03/17 17:10:52 UTC

[incubator-teaclave] 01/03: [services] Rewrite and coordinate scheduler and execution services

This is an automated email from the ASF dual-hosted git repository.

mssun pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-teaclave.git

commit 9073e05f505220b1da535f4edcec5b7a071a36f5
Author: Mingshen Sun <bo...@mssun.me>
AuthorDate: Fri Mar 13 16:49:24 2020 -0700

    [services] Rewrite and coordinate scheduler and execution services
---
 binder/src/macros.rs                               |   6 +-
 cmake/scripts/test.sh                              |   2 +-
 services/execution/enclave/src/lib.rs              |  70 +++++-----
 services/execution/enclave/src/service.rs          | 144 +++++++++++++++------
 services/management/enclave/src/task.rs            |   2 +
 services/proto/build.rs                            |   1 -
 services/proto/src/lib.rs                          |   5 -
 .../src/proto/teaclave_execution_service.proto     |  32 -----
 .../src/proto/teaclave_scheduler_service.proto     |  18 ++-
 services/proto/src/teaclave_execution_service.rs   | 139 --------------------
 services/proto/src/teaclave_scheduler_service.rs   | 110 +++++++++++++---
 services/scheduler/enclave/src/service.rs          |  52 +++++++-
 tests/functional/enclave/src/lib.rs                |   2 -
 .../enclave/src/teaclave_execution_service.rs      |  75 -----------
 .../enclave/src/teaclave_frontend_service.rs       |  46 ++-----
 .../enclave/src/teaclave_scheduler_service.rs      |  20 ++-
 tests/unit/enclave/src/lib.rs                      |   3 -
 types/src/task.rs                                  |   2 +
 types/src/worker.rs                                |   5 +
 utils/service_enclave_utils/src/lib.rs             |  20 +++
 20 files changed, 355 insertions(+), 399 deletions(-)

diff --git a/binder/src/macros.rs b/binder/src/macros.rs
index effb9ee..d81fa1e 100644
--- a/binder/src/macros.rs
+++ b/binder/src/macros.rs
@@ -94,13 +94,13 @@ macro_rules! register_ecall_handler {
             // The last argument could be either * mut usize, or &mut usize
             let input_buf: &[u8] = unsafe { std::slice::from_raw_parts(in_buf, in_len) };
 
-            trace!("tee receive cmd: {:x}, input_buf = {:?}", cmd, input_buf);
+            log::trace!("tee receive cmd: {:x}, input_buf = {:?}", cmd, input_buf);
 
             let inner_vec = unsafe {
                 match ecall_ipc_lib_dispatcher(cmd, input_buf) {
                     Ok(out) => out,
                     Err(e) => {
-                        error!("tee execute cmd: {:x}, error: {}", cmd, e);
+                        log::error!("tee execute cmd: {:x}, error: {}", cmd, e);
                         return teaclave_types::EnclaveStatus(1);
                     }
                 }
@@ -112,7 +112,7 @@ macro_rules! register_ecall_handler {
             *out_len = inner_len;
 
             if inner_len > out_max {
-                debug!("tee before copy out_buf check: out_max={:x} < inner={:x}", out_max, inner_len);
+                log::debug!("tee before copy out_buf check: out_max={:x} < inner={:x}", out_max, inner_len);
                 return teaclave_types::EnclaveStatus(0x0000_000c);
             }
 
diff --git a/cmake/scripts/test.sh b/cmake/scripts/test.sh
index 6993e42..1b320c1 100755
--- a/cmake/scripts/test.sh
+++ b/cmake/scripts/test.sh
@@ -73,7 +73,7 @@ run_functional_tests() {
   sleep 3    # wait for management service and scheduler_service
   ./teaclave_access_control_service &
   ./teaclave_frontend_service &
-  ./teaclave_execution_service &
+  # ./teaclave_execution_service &
   popd
   sleep 3    # wait for other services
   ./teaclave_functional_tests
diff --git a/services/execution/enclave/src/lib.rs b/services/execution/enclave/src/lib.rs
index 3584f92..4df60bc 100644
--- a/services/execution/enclave/src/lib.rs
+++ b/services/execution/enclave/src/lib.rs
@@ -22,55 +22,49 @@ extern crate sgx_tstd as std;
 #[cfg(feature = "mesalock_sgx")]
 use std::prelude::v1::*;
 
-#[macro_use]
-extern crate log;
-
-use teaclave_attestation::{AttestationConfig, RemoteAttestation};
+use teaclave_attestation::verifier;
 use teaclave_binder::proto::{
     ECallCommand, FinalizeEnclaveInput, FinalizeEnclaveOutput, InitEnclaveInput, InitEnclaveOutput,
     StartServiceInput, StartServiceOutput,
 };
 use teaclave_binder::{handle_ecall, register_ecall_handler};
 use teaclave_config::RuntimeConfig;
-use teaclave_proto::teaclave_execution_service::{
-    TeaclaveExecutionRequest, TeaclaveExecutionResponse,
-};
-use teaclave_rpc::config::SgxTrustedTlsServerConfig;
-use teaclave_rpc::server::SgxTrustedTlsServer;
+use teaclave_config::BUILD_CONFIG;
+use teaclave_service_enclave_utils::create_trusted_scheduler_endpoint;
 use teaclave_service_enclave_utils::ServiceEnclave;
-use teaclave_types::{TeeServiceError, TeeServiceResult};
+use teaclave_types::{EnclaveInfo, TeeServiceError, TeeServiceResult};
 
 mod service;
 
+const AS_ROOT_CA_CERT: &[u8] = BUILD_CONFIG.as_root_ca_cert;
+const AUDITOR_PUBLIC_KEYS_LEN: usize = BUILD_CONFIG.auditor_public_keys.len();
+const AUDITOR_PUBLIC_KEYS: &[&[u8]; AUDITOR_PUBLIC_KEYS_LEN] = BUILD_CONFIG.auditor_public_keys;
+
 fn start_service(config: &RuntimeConfig) -> anyhow::Result<()> {
-    let listen_address = config.internal_endpoints.execution.listen_address;
-    let as_config = &config.attestation;
-    let attestation_config = AttestationConfig::new(
-        &as_config.algorithm,
-        &as_config.url,
-        &as_config.key,
-        &as_config.spid,
+    let enclave_info = EnclaveInfo::verify_and_new(
+        config
+            .audit
+            .enclave_info_bytes
+            .as_ref()
+            .expect("enclave_info"),
+        AUDITOR_PUBLIC_KEYS,
+        config
+            .audit
+            .auditor_signatures_bytes
+            .as_ref()
+            .expect("auditor signatures"),
+    )?;
+    let scheduler_service_address = &config.internal_endpoints.scheduler.advertised_address;
+    let scheduler_service_endpoint = create_trusted_scheduler_endpoint(
+        &scheduler_service_address,
+        &enclave_info,
+        AS_ROOT_CA_CERT,
+        verifier::universal_quote_verifier,
     );
-    let attested_tls_config = RemoteAttestation::new()
-        .config(attestation_config)
-        .generate_and_endorse()
-        .unwrap()
-        .attested_tls_config()
-        .unwrap();
-    let server_config =
-        SgxTrustedTlsServerConfig::from_attested_tls_config(attested_tls_config).unwrap();
 
-    let mut server =
-        SgxTrustedTlsServer::<TeaclaveExecutionResponse, TeaclaveExecutionRequest>::new(
-            listen_address,
-            server_config,
-        );
-    match server.start(service::TeaclaveExecutionService::new()) {
-        Ok(_) => (),
-        Err(e) => {
-            error!("Service exit, error: {}.", e);
-        }
-    }
+    let mut service = service::TeaclaveExecutionService::new(scheduler_service_endpoint).unwrap();
+    let _ = service.start();
+
     Ok(())
 }
 
@@ -101,8 +95,10 @@ register_ecall_handler!(
 
 #[cfg(feature = "enclave_unit_test")]
 pub mod tests {
+    use super::*;
+    use teaclave_test_utils::*;
 
     pub fn run_tests() -> bool {
-        true
+        run_tests!(service::tests::test_invoke_function)
     }
 }
diff --git a/services/execution/enclave/src/service.rs b/services/execution/enclave/src/service.rs
index ce451ef..f939c15 100644
--- a/services/execution/enclave/src/service.rs
+++ b/services/execution/enclave/src/service.rs
@@ -18,64 +18,128 @@
 #[cfg(feature = "mesalock_sgx")]
 use std::prelude::v1::*;
 
-use std::sync::Arc;
+use std::sync::{Arc, SgxMutex as Mutex};
 
-use teaclave_proto::teaclave_execution_service::{
-    StagedFunctionExecuteRequest, StagedFunctionExecuteResponse, TeaclaveExecution,
-};
-use teaclave_service_enclave_utils::teaclave_service;
-use teaclave_types::{TeaclaveServiceResponseError, TeaclaveServiceResponseResult};
-
-use teaclave_rpc::Request;
+use teaclave_proto::teaclave_scheduler_service::*;
+use teaclave_rpc::endpoint::Endpoint;
+use teaclave_types::{StagedTask, WorkerInvocationResult};
 use teaclave_worker::Worker;
-use thiserror::Error;
-
-#[derive(Error, Debug)]
-pub enum TeaclaveExecutionError {
-    #[error("woker running spec error")]
-    WorkerRunningSpecError,
-}
 
-impl From<TeaclaveExecutionError> for TeaclaveServiceResponseError {
-    fn from(error: TeaclaveExecutionError) -> Self {
-        TeaclaveServiceResponseError::RequestError(error.to_string())
-    }
-}
+use anyhow::Result;
 
-#[teaclave_service(teaclave_execution_service, TeaclaveExecution, TeaclaveExecutionError)]
 #[derive(Clone)]
 pub(crate) struct TeaclaveExecutionService {
     worker: Arc<Worker>,
+    scheduler_client: Arc<Mutex<TeaclaveSchedulerClient>>,
 }
 
 impl TeaclaveExecutionService {
-    pub(crate) fn new() -> Self {
-        TeaclaveExecutionService {
+    pub(crate) fn new(scheduler_service_endpoint: Endpoint) -> Result<Self> {
+        let mut i = 0;
+        let channel = loop {
+            match scheduler_service_endpoint.connect() {
+                Ok(channel) => break channel,
+                Err(_) => {
+                    anyhow::ensure!(i < 3, "failed to connect to storage service");
+                    log::debug!("Failed to connect to storage service, retry {}", i);
+                    i += 1;
+                }
+            }
+            std::thread::sleep(std::time::Duration::from_secs(1));
+        };
+        let scheduler_client = Arc::new(Mutex::new(TeaclaveSchedulerClient::new(channel)?));
+        Ok(TeaclaveExecutionService {
             worker: Arc::new(Worker::default()),
-        }
+            scheduler_client,
+        })
     }
-}
 
-impl TeaclaveExecution for TeaclaveExecutionService {
-    fn invoke_function(
-        &self,
-        request: Request<StagedFunctionExecuteRequest>,
-    ) -> TeaclaveServiceResponseResult<StagedFunctionExecuteResponse> {
-        let request = request.message;
-        match self.worker.invoke_function(request.into()) {
-            Ok(summary) => {
-                info!("[+] Invoking function ok: {}", summary);
-                Ok(summary.into())
-            }
-            Err(e) => {
-                error!("[+] Invoking function failed: {}", e);
-                Err(TeaclaveExecutionError::WorkerRunningSpecError.into())
-            }
+    pub(crate) fn start(&mut self) -> Result<()> {
+        loop {
+            std::thread::sleep(std::time::Duration::from_secs(3));
+            let scheduler_client = self.scheduler_client.clone();
+            let mut client = match scheduler_client.lock() {
+                Ok(client) => client,
+                Err(e) => {
+                    log::error!("Error: {:?}", e);
+                    continue;
+                }
+            };
+
+            let request = PullTaskRequest {};
+            log::debug!("pull_task");
+            let response = match client.pull_task(request) {
+                Ok(response) => response,
+                Err(e) => {
+                    log::error!("Error: {:?}", e);
+                    continue;
+                }
+            };
+            log::debug!("response: {:?}", response);
+            let result = self.invoke_task(response.staged_task);
+            self.update_task(result);
         }
     }
+
+    fn invoke_task(&mut self, _task: StagedTask) -> WorkerInvocationResult {
+        // TODO: convert task to function, i.e., needs help from agent
+        unimplemented!()
+    }
+
+    fn update_task(&mut self, _result: WorkerInvocationResult) {
+        unimplemented!()
+    }
 }
 
 #[cfg(test_mode)]
 mod test_mode {
     use super::*;
 }
+
+#[cfg(feature = "enclave_unit_test")]
+pub mod tests {
+    use super::*;
+    use std::convert::TryInto;
+    use teaclave_types::*;
+
+    pub fn test_invoke_function() {
+        let function_args = TeaclaveFunctionArguments::new(&hashmap!(
+            "feature_size"  => "4",
+            "max_depth"     => "4",
+            "iterations"    => "100",
+            "shrinkage"     => "0.1",
+            "feature_sample_ratio" => "1.0",
+            "data_sample_ratio" => "1.0",
+            "min_leaf_size" => "1",
+            "loss"          => "LAD",
+            "training_optimization_level" => "2"
+        ));
+
+        let plain_input = "fixtures/functions/gbdt_training/train.txt";
+        let enc_output = "fixtures/functions/gbdt_training/model.enc.out";
+
+        let input_info =
+            TeaclaveWorkerInputFileInfo::create_with_plaintext_file(plain_input).unwrap();
+        let input_files = TeaclaveWorkerFileRegistry::new(hashmap!(
+            "training_data".to_string() => input_info));
+
+        let output_info =
+            TeaclaveWorkerOutputFileInfo::new(enc_output, TeaclaveFileRootKey128::default());
+        let output_files = TeaclaveWorkerFileRegistry::new(hashmap!(
+            "trained_model".to_string() => output_info));
+        let invocation = WorkerInvocation {
+            runtime_name: "default".to_string(),
+            executor_type: "native".try_into().unwrap(),
+            function_name: "gbdt_training".to_string(),
+            function_payload: String::new(),
+            function_args,
+            input_files,
+            output_files,
+        };
+
+        let worker = Worker::default();
+        let result = worker.invoke_function(invocation);
+        assert!(result.is_ok());
+        log::debug!("summary: {:?}", result.unwrap());
+    }
+}
diff --git a/services/management/enclave/src/task.rs b/services/management/enclave/src/task.rs
index d6e3729..2d3a584 100644
--- a/services/management/enclave/src/task.rs
+++ b/services/management/enclave/src/task.rs
@@ -59,6 +59,8 @@ pub(crate) fn create_task(
         approved_user_list: HashSet::new(),
         input_map: HashMap::new(),
         output_map: HashMap::new(),
+        return_value: None,
+        output_file_hash: HashMap::new(),
         status: TaskStatus::Created,
     };
     // check arguments
diff --git a/services/proto/build.rs b/services/proto/build.rs
index b1f4897..fc17b99 100644
--- a/services/proto/build.rs
+++ b/services/proto/build.rs
@@ -26,7 +26,6 @@ fn main() {
         "src/proto/teaclave_authentication_service.proto",
         "src/proto/teaclave_common.proto",
         "src/proto/teaclave_storage_service.proto",
-        "src/proto/teaclave_execution_service.proto",
         "src/proto/teaclave_frontend_service.proto",
         "src/proto/teaclave_management_service.proto",
         "src/proto/teaclave_scheduler_service.proto",
diff --git a/services/proto/src/lib.rs b/services/proto/src/lib.rs
index b7a3af2..18c285f 100644
--- a/services/proto/src/lib.rs
+++ b/services/proto/src/lib.rs
@@ -22,7 +22,6 @@ extern crate sgx_tstd as std;
 pub mod teaclave_access_control_service;
 pub mod teaclave_authentication_service;
 pub mod teaclave_common;
-pub mod teaclave_execution_service;
 pub mod teaclave_frontend_service;
 pub mod teaclave_management_service;
 pub mod teaclave_scheduler_service;
@@ -46,10 +45,6 @@ pub mod teaclave_storage_service_proto {
     include_proto!("teaclave_storage_service_proto");
 }
 
-pub mod teaclave_execution_service_proto {
-    include_proto!("teaclave_execution_service_proto");
-}
-
 pub mod teaclave_frontend_service_proto {
     include_proto!("teaclave_frontend_service_proto");
 }
diff --git a/services/proto/src/proto/teaclave_execution_service.proto b/services/proto/src/proto/teaclave_execution_service.proto
deleted file mode 100644
index f0a57ba..0000000
--- a/services/proto/src/proto/teaclave_execution_service.proto
+++ /dev/null
@@ -1,32 +0,0 @@
-syntax = "proto3";
-package teaclave_execution_service_proto;
-
-import "teaclave_common.proto";
-
-message WorkerInputFileInfo {
-  string path = 1;
-  teaclave_common_proto.FileCryptoInfo crypto_info = 2;
-}
-
-message WorkerOutputFileInfo {
-  string path = 1;
-  teaclave_common_proto.FileCryptoInfo crypto_info = 2;
-}
-
-message StagedFunctionExecuteRequest {
-  string runtime_name = 1;
-  string executor_type = 2;
-  string function_name = 3;
-  string function_payload = 4;
-  map<string, string> function_args = 11;
-  map<string, WorkerInputFileInfo> input_files = 21;
-  map<string, WorkerOutputFileInfo> output_files = 22;
-}
-
-message StagedFunctionExecuteResponse {
-  string summary = 1;
-}
-
-service TeaclaveExecution {
-  rpc InvokeFunction(StagedFunctionExecuteRequest) returns (StagedFunctionExecuteResponse);
-}
\ No newline at end of file
diff --git a/services/proto/src/proto/teaclave_scheduler_service.proto b/services/proto/src/proto/teaclave_scheduler_service.proto
index ea34ec0..151efe7 100644
--- a/services/proto/src/proto/teaclave_scheduler_service.proto
+++ b/services/proto/src/proto/teaclave_scheduler_service.proto
@@ -13,10 +13,18 @@ message PullTaskResponse {
   bytes staged_task = 1;
 }
 
-message UpdateTaskRequest {
-  string staged_task_id = 2;
+message UpdateTaskStatusRequest {
+  string task_id = 1;
+  teaclave_common_proto.TaskStatus task_status = 2;
 }
-message UpdateTaskResponse {}
+message UpdateTaskStatusResponse {}
+
+message UpdateTaskResultRequest {
+  string task_id = 1;
+  bytes return_value = 2;
+  map<string, string> output_file_hash = 3;
+}
+message UpdateTaskResultResponse {}
 
 message PublishTaskRequest {
   bytes staged_task = 1;
@@ -30,5 +38,7 @@ service TeaclaveScheduler {
   // Subscriber
   rpc Subscribe(SubscribeRequest) returns (SubscribeResponse);
   rpc PullTask(PullTaskRequest) returns (PullTaskResponse);
-  rpc UpdateTask(UpdateTaskRequest) returns (UpdateTaskResponse);
+
+  rpc UpdateTaskStatus(UpdateTaskStatusRequest) returns (UpdateTaskStatusResponse);
+  rpc UpdateTaskResult(UpdateTaskResultRequest) returns (UpdateTaskResultResponse);
 }
diff --git a/services/proto/src/teaclave_execution_service.rs b/services/proto/src/teaclave_execution_service.rs
deleted file mode 100644
index 4713c1b..0000000
--- a/services/proto/src/teaclave_execution_service.rs
+++ /dev/null
@@ -1,139 +0,0 @@
-use std::prelude::v1::*;
-
-use anyhow::{anyhow, Error, Result};
-use core::convert::TryInto;
-use teaclave_rpc::into_request;
-use teaclave_types::{
-    TeaclaveFunctionArguments, TeaclaveWorkerInputFileInfo, TeaclaveWorkerOutputFileInfo,
-    WorkerInvocation,
-};
-
-use crate::teaclave_execution_service_proto as proto;
-pub use proto::TeaclaveExecution;
-pub use proto::TeaclaveExecutionClient;
-pub use proto::TeaclaveExecutionRequest;
-pub use proto::TeaclaveExecutionResponse;
-
-#[into_request(TeaclaveExecutionRequest::InvokeFunction)]
-#[derive(Debug)]
-pub struct StagedFunctionExecuteRequest {
-    pub invocation: WorkerInvocation,
-}
-
-#[into_request(TeaclaveExecutionResponse::InvokeFunction)]
-#[derive(Debug)]
-pub struct StagedFunctionExecuteResponse {
-    pub summary: std::string::String,
-}
-
-impl std::convert::TryFrom<proto::WorkerInputFileInfo> for TeaclaveWorkerInputFileInfo {
-    type Error = Error;
-    fn try_from(proto: proto::WorkerInputFileInfo) -> Result<Self> {
-        let path = std::path::Path::new(&proto.path).to_path_buf();
-        let crypto_info = proto
-            .crypto_info
-            .ok_or_else(|| anyhow!("Missing field: crypto_info"))?
-            .try_into()?;
-        Ok(TeaclaveWorkerInputFileInfo { path, crypto_info })
-    }
-}
-
-impl std::convert::TryFrom<proto::WorkerOutputFileInfo> for TeaclaveWorkerOutputFileInfo {
-    type Error = Error;
-    fn try_from(proto: proto::WorkerOutputFileInfo) -> Result<Self> {
-        let path = std::path::Path::new(&proto.path).to_path_buf();
-        let crypto_info = proto
-            .crypto_info
-            .ok_or_else(|| anyhow!("Missing field: crypto_info"))?
-            .try_into()?;
-        Ok(TeaclaveWorkerOutputFileInfo { path, crypto_info })
-    }
-}
-
-// For server side
-impl std::convert::TryFrom<proto::StagedFunctionExecuteRequest> for StagedFunctionExecuteRequest {
-    type Error = Error;
-
-    fn try_from(proto: proto::StagedFunctionExecuteRequest) -> Result<Self> {
-        let ret = Self {
-            invocation: WorkerInvocation {
-                runtime_name: proto.runtime_name,
-                executor_type: proto.executor_type.as_str().try_into()?,
-                function_name: proto.function_name,
-                function_payload: proto.function_payload,
-                function_args: TeaclaveFunctionArguments {
-                    args: proto.function_args,
-                },
-                input_files: proto.input_files.try_into()?,
-                output_files: proto.output_files.try_into()?,
-            },
-        };
-
-        Ok(ret)
-    }
-}
-
-impl std::convert::TryFrom<proto::StagedFunctionExecuteResponse> for StagedFunctionExecuteResponse {
-    type Error = Error;
-
-    fn try_from(proto: proto::StagedFunctionExecuteResponse) -> Result<Self> {
-        let ret = Self {
-            summary: proto.summary,
-        };
-
-        Ok(ret)
-    }
-}
-
-// For client side
-impl std::convert::From<TeaclaveWorkerInputFileInfo> for proto::WorkerInputFileInfo {
-    fn from(info: TeaclaveWorkerInputFileInfo) -> Self {
-        proto::WorkerInputFileInfo {
-            path: info.path.to_string_lossy().to_string(),
-            crypto_info: Some(info.crypto_info.into()),
-        }
-    }
-}
-
-impl std::convert::From<TeaclaveWorkerOutputFileInfo> for proto::WorkerOutputFileInfo {
-    fn from(info: TeaclaveWorkerOutputFileInfo) -> Self {
-        proto::WorkerOutputFileInfo {
-            path: info.path.to_string_lossy().to_string(),
-            crypto_info: Some(info.crypto_info.into()),
-        }
-    }
-}
-
-impl From<StagedFunctionExecuteRequest> for WorkerInvocation {
-    fn from(request: StagedFunctionExecuteRequest) -> Self {
-        request.invocation
-    }
-}
-
-impl From<StagedFunctionExecuteRequest> for proto::StagedFunctionExecuteRequest {
-    fn from(request: StagedFunctionExecuteRequest) -> Self {
-        Self {
-            runtime_name: request.invocation.runtime_name,
-            executor_type: request.invocation.executor_type.to_string(),
-            function_name: request.invocation.function_name,
-            function_payload: request.invocation.function_payload,
-            function_args: request.invocation.function_args.args,
-            input_files: request.invocation.input_files.into(),
-            output_files: request.invocation.output_files.into(),
-        }
-    }
-}
-
-impl From<StagedFunctionExecuteResponse> for proto::StagedFunctionExecuteResponse {
-    fn from(response: StagedFunctionExecuteResponse) -> Self {
-        Self {
-            summary: response.summary,
-        }
-    }
-}
-
-impl From<String> for StagedFunctionExecuteResponse {
-    fn from(summary: String) -> Self {
-        Self { summary }
-    }
-}
diff --git a/services/proto/src/teaclave_scheduler_service.rs b/services/proto/src/teaclave_scheduler_service.rs
index 1d8870a..2677367 100644
--- a/services/proto/src/teaclave_scheduler_service.rs
+++ b/services/proto/src/teaclave_scheduler_service.rs
@@ -4,7 +4,7 @@
 use std::collections::HashMap;
 use std::prelude::v1::*;
 
-use crate::teaclave_execution_service::StagedFunctionExecuteRequest;
+use crate::teaclave_common::{i32_from_task_status, i32_to_task_status};
 use crate::teaclave_scheduler_service_proto as proto;
 use anyhow::{Error, Result};
 use core::convert::TryInto;
@@ -13,7 +13,7 @@ pub use proto::TeaclaveSchedulerClient;
 pub use proto::TeaclaveSchedulerRequest;
 pub use proto::TeaclaveSchedulerResponse;
 use teaclave_rpc::into_request;
-use teaclave_types::StagedTask;
+use teaclave_types::{StagedTask, TaskStatus};
 
 #[into_request(TeaclaveSchedulerRequest::Subscribe)]
 pub struct SubscribeRequest {}
@@ -38,13 +38,46 @@ impl PullTaskResponse {
     }
 }
 
-#[into_request(TeaclaveSchedulerRequest::UpdateTask)]
-pub struct UpdateTaskRequest {
-    pub staged_task_id: String,
+#[into_request(TeaclaveSchedulerRequest::UpdateTaskResult)]
+pub struct UpdateTaskResultRequest {
+    pub task_id: String,
+    pub return_value: Vec<u8>,
+    pub output_file_hash: HashMap<String, String>,
 }
 
-#[into_request(TeaclaveSchedulerResponse::UpdateTask)]
-pub struct UpdateTaskResponse {}
+impl UpdateTaskResultRequest {
+    pub fn new(
+        task_id: impl Into<String>,
+        return_value: &[u8],
+        output_file_hash: HashMap<String, String>,
+    ) -> Self {
+        Self {
+            task_id: task_id.into(),
+            return_value: return_value.to_vec(),
+            output_file_hash,
+        }
+    }
+}
+
+#[into_request(TeaclaveSchedulerResponse::UpdateTaskResult)]
+pub struct UpdateTaskResultResponse {}
+
+#[into_request(TeaclaveSchedulerRequest::UpdateTaskStatus)]
+pub struct UpdateTaskStatusRequest {
+    pub task_id: String,
+    pub task_status: TaskStatus,
+}
+
+impl UpdateTaskStatusRequest {
+    pub fn new(task_id: impl Into<String>, task_status: TaskStatus) -> Self {
+        Self {
+            task_id: task_id.into(),
+            task_status,
+        }
+    }
+}
+#[into_request(TeaclaveSchedulerResponse::UpdateTaskStatus)]
+pub struct UpdateTaskStatusResponse {}
 
 #[into_request(TeaclaveSchedulerRequest::PublishTask)]
 pub struct PublishTaskRequest {
@@ -116,36 +149,75 @@ impl std::convert::From<PullTaskResponse> for proto::PullTaskResponse {
         }
     }
 }
+impl std::convert::TryFrom<proto::UpdateTaskResultRequest> for UpdateTaskResultRequest {
+    type Error = Error;
+    fn try_from(proto: proto::UpdateTaskResultRequest) -> Result<Self> {
+        let ret = Self {
+            task_id: proto.task_id,
+            return_value: proto.return_value,
+            output_file_hash: proto.output_file_hash,
+        };
+        Ok(ret)
+    }
+}
+
+impl std::convert::From<UpdateTaskResultRequest> for proto::UpdateTaskResultRequest {
+    fn from(req: UpdateTaskResultRequest) -> Self {
+        proto::UpdateTaskResultRequest {
+            task_id: req.task_id,
+            return_value: req.return_value,
+            output_file_hash: req.output_file_hash,
+        }
+    }
+}
+
+impl std::convert::TryFrom<proto::UpdateTaskResultResponse> for UpdateTaskResultResponse {
+    type Error = Error;
+    fn try_from(proto: proto::UpdateTaskResultResponse) -> Result<Self> {
+        let ret = Self {};
+        Ok(ret)
+    }
+}
+
+impl std::convert::From<UpdateTaskResultResponse> for proto::UpdateTaskResultResponse {
+    fn from(req: UpdateTaskResultResponse) -> Self {
+        proto::UpdateTaskResultResponse {}
+    }
+}
 
-impl std::convert::TryFrom<proto::UpdateTaskRequest> for UpdateTaskRequest {
+impl std::convert::TryFrom<proto::UpdateTaskStatusRequest> for UpdateTaskStatusRequest {
     type Error = Error;
-    fn try_from(proto: proto::UpdateTaskRequest) -> Result<Self> {
+    fn try_from(proto: proto::UpdateTaskStatusRequest) -> Result<Self> {
+        let task_status = i32_to_task_status(proto.task_status)?;
         let ret = Self {
-            staged_task_id: proto.staged_task_id,
+            task_id: proto.task_id,
+            task_status,
         };
         Ok(ret)
     }
 }
 
-impl std::convert::From<UpdateTaskRequest> for proto::UpdateTaskRequest {
-    fn from(req: UpdateTaskRequest) -> Self {
-        proto::UpdateTaskRequest {
-            staged_task_id: req.staged_task_id,
+impl std::convert::From<UpdateTaskStatusRequest> for proto::UpdateTaskStatusRequest {
+    fn from(req: UpdateTaskStatusRequest) -> Self {
+        let task_status = i32_from_task_status(req.task_status);
+        proto::UpdateTaskStatusRequest {
+            task_id: req.task_id,
+            task_status,
         }
     }
 }
 
-impl std::convert::TryFrom<proto::UpdateTaskResponse> for UpdateTaskResponse {
+impl std::convert::TryFrom<proto::UpdateTaskStatusResponse> for UpdateTaskStatusResponse {
     type Error = Error;
-    fn try_from(proto: proto::UpdateTaskResponse) -> Result<Self> {
+    fn try_from(proto: proto::UpdateTaskStatusResponse) -> Result<Self> {
         let ret = Self {};
         Ok(ret)
     }
 }
 
-impl std::convert::From<UpdateTaskResponse> for proto::UpdateTaskResponse {
-    fn from(req: UpdateTaskResponse) -> Self {
-        proto::UpdateTaskResponse {}
+impl std::convert::From<UpdateTaskStatusResponse> for proto::UpdateTaskStatusResponse {
+    fn from(req: UpdateTaskStatusResponse) -> Self {
+        proto::UpdateTaskStatusResponse {}
     }
 }
 
diff --git a/services/scheduler/enclave/src/service.rs b/services/scheduler/enclave/src/service.rs
index 76c0b2c..5a843d7 100644
--- a/services/scheduler/enclave/src/service.rs
+++ b/services/scheduler/enclave/src/service.rs
@@ -29,7 +29,7 @@ use teaclave_rpc::endpoint::Endpoint;
 use teaclave_rpc::Request;
 use teaclave_service_enclave_utils::teaclave_service;
 use teaclave_types::{
-    StagedTask, Storable, TeaclaveServiceResponseError, TeaclaveServiceResponseResult,
+    StagedTask, Storable, Task, TeaclaveServiceResponseError, TeaclaveServiceResponseResult,
 };
 
 use anyhow::anyhow;
@@ -94,6 +94,31 @@ impl TeaclaveSchedulerService {
         T::from_slice(dequeue_response.value.as_slice())
             .map_err(|_| TeaclaveSchedulerError::DataError.into())
     }
+
+    fn get_task(&self, key: &str) -> Result<Task> {
+        let key = format!("{}-{}", Task::key_prefix(), key);
+        let get_request = GetRequest::new(key.into_bytes());
+        let get_response = self
+            .storage_client
+            .clone()
+            .lock()
+            .map_err(|_| anyhow!("Cannot lock storage client"))?
+            .get(get_request)?;
+        Task::from_slice(get_response.value.as_slice())
+    }
+
+    fn put_task(&self, item: &impl Storable) -> Result<()> {
+        let k = item.key();
+        let v = item.to_vec()?;
+        let put_request = PutRequest::new(k.as_slice(), v.as_slice());
+        let _put_response = self
+            .storage_client
+            .clone()
+            .lock()
+            .map_err(|_| anyhow!("Cannot lock storage client"))?
+            .put(put_request)?;
+        Ok(())
+    }
 }
 
 impl TeaclaveScheduler for TeaclaveSchedulerService {
@@ -117,6 +142,7 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
         &self,
         request: Request<SubscribeRequest>,
     ) -> TeaclaveServiceResponseResult<SubscribeResponse> {
+        // TODO: subscribe a specific topic
         unimplemented!()
     }
 
@@ -130,11 +156,27 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
         Ok(response)
     }
 
-    fn update_task(
+    fn update_task_status(
         &self,
-        request: Request<UpdateTaskRequest>,
-    ) -> TeaclaveServiceResponseResult<UpdateTaskResponse> {
-        unimplemented!()
+        request: Request<UpdateTaskStatusRequest>,
+    ) -> TeaclaveServiceResponseResult<UpdateTaskStatusResponse> {
+        let request = request.message;
+        let mut task = self.get_task(&request.task_id)?;
+        task.status = request.task_status;
+        self.put_task(&task)?;
+        Ok(UpdateTaskStatusResponse {})
+    }
+
+    fn update_task_result(
+        &self,
+        request: Request<UpdateTaskResultRequest>,
+    ) -> TeaclaveServiceResponseResult<UpdateTaskResultResponse> {
+        let request = request.message;
+        let mut task = self.get_task(&request.task_id)?;
+        task.return_value = Some(request.return_value);
+        task.output_file_hash = request.output_file_hash;
+        self.put_task(&task)?;
+        Ok(UpdateTaskResultResponse {})
     }
 }
 
diff --git a/tests/functional/enclave/src/lib.rs b/tests/functional/enclave/src/lib.rs
index b768b3e..891c552 100644
--- a/tests/functional/enclave/src/lib.rs
+++ b/tests/functional/enclave/src/lib.rs
@@ -37,7 +37,6 @@ use teaclave_types::TeeServiceResult;
 
 mod teaclave_access_control_service;
 mod teaclave_authentication_service;
-mod teaclave_execution_service;
 mod teaclave_frontend_service;
 mod teaclave_management_service;
 mod teaclave_scheduler_service;
@@ -49,7 +48,6 @@ fn handle_run_test(_: &RunTestInput) -> TeeServiceResult<RunTestOutput> {
         teaclave_access_control_service::run_tests(),
         teaclave_authentication_service::run_tests(),
         teaclave_storage_service::run_tests(),
-        teaclave_execution_service::run_tests(),
         teaclave_frontend_service::run_tests(),
         teaclave_management_service::run_tests(),
         teaclave_scheduler_service::run_tests(),
diff --git a/tests/functional/enclave/src/teaclave_execution_service.rs b/tests/functional/enclave/src/teaclave_execution_service.rs
deleted file mode 100644
index e86019a..0000000
--- a/tests/functional/enclave/src/teaclave_execution_service.rs
+++ /dev/null
@@ -1,75 +0,0 @@
-use std::prelude::v1::*;
-
-use std::convert::TryInto;
-use teaclave_proto::teaclave_execution_service::*;
-use teaclave_rpc::endpoint::Endpoint;
-
-use teaclave_config::RuntimeConfig;
-use teaclave_types::hashmap;
-use teaclave_types::TeaclaveFileRootKey128;
-use teaclave_types::TeaclaveFunctionArguments;
-use teaclave_types::TeaclaveWorkerFileRegistry;
-use teaclave_types::TeaclaveWorkerInputFileInfo;
-use teaclave_types::TeaclaveWorkerOutputFileInfo;
-use teaclave_types::WorkerInvocation;
-
-pub fn run_tests() -> bool {
-    use teaclave_test_utils::*;
-    run_tests!(test_invoke_success,)
-}
-
-fn get_client() -> TeaclaveExecutionClient {
-    let runtime_config = RuntimeConfig::from_toml("runtime.config.toml").expect("runtime");
-    let channel = Endpoint::new(
-        &runtime_config
-            .internal_endpoints
-            .execution
-            .advertised_address,
-    )
-    .connect()
-    .expect("channel");
-    TeaclaveExecutionClient::new(channel).expect("client")
-}
-
-fn test_invoke_success() {
-    let mut client = get_client();
-
-    let function_args = TeaclaveFunctionArguments::new(&hashmap!(
-        "feature_size"  => "4",
-        "max_depth"     => "4",
-        "iterations"    => "100",
-        "shrinkage"     => "0.1",
-        "feature_sample_ratio" => "1.0",
-        "data_sample_ratio" => "1.0",
-        "min_leaf_size" => "1",
-        "loss"          => "LAD",
-        "training_optimization_level" => "2"
-    ));
-
-    let plain_input = "fixtures/functions/gbdt_training/train.txt";
-    let enc_output = "fixtures/functions/gbdt_training/model.enc.out";
-
-    let input_info = TeaclaveWorkerInputFileInfo::create_with_plaintext_file(plain_input).unwrap();
-    let input_files = TeaclaveWorkerFileRegistry::new(hashmap!(
-        "training_data".to_string() => input_info));
-
-    let output_info =
-        TeaclaveWorkerOutputFileInfo::new(enc_output, TeaclaveFileRootKey128::default());
-    let output_files = TeaclaveWorkerFileRegistry::new(hashmap!(
-        "trained_model".to_string() => output_info));
-
-    let request = StagedFunctionExecuteRequest {
-        invocation: WorkerInvocation {
-            runtime_name: "default".to_string(),
-            executor_type: "native".try_into().unwrap(),
-            function_name: "gbdt_training".to_string(),
-            function_payload: String::new(),
-            function_args,
-            input_files,
-            output_files,
-        },
-    };
-
-    let response_result = client.invoke_function(request);
-    assert!(response_result.is_ok());
-}
diff --git a/tests/functional/enclave/src/teaclave_frontend_service.rs b/tests/functional/enclave/src/teaclave_frontend_service.rs
index a67e75d..b670133 100644
--- a/tests/functional/enclave/src/teaclave_frontend_service.rs
+++ b/tests/functional/enclave/src/teaclave_frontend_service.rs
@@ -180,14 +180,12 @@ fn test_get_output_file() {
     let response = client.register_output_file(request);
     let data_id = response.unwrap().data_id;
 
-    let request = GetOutputFileRequest {
-        data_id: data_id.clone(),
-    };
+    let request = GetOutputFileRequest::new(&data_id);
     let response = client.get_output_file(request);
     assert!(response.is_ok());
     assert!(response.unwrap().hash.is_empty());
 
-    let request = GetOutputFileRequest { data_id };
+    let request = GetOutputFileRequest::new(&data_id);
     client
         .metadata_mut()
         .insert("token".to_string(), "wrong token".to_string());
@@ -206,14 +204,12 @@ fn test_get_input_file() {
     let response = client.register_input_file(request);
     let data_id = response.unwrap().data_id;
 
-    let request = GetInputFileRequest {
-        data_id: data_id.clone(),
-    };
+    let request = GetInputFileRequest::new(&data_id);
     let response = client.get_input_file(request);
     assert!(response.is_ok());
     assert!(!response.unwrap().hash.is_empty());
 
-    let request = GetInputFileRequest { data_id };
+    let request = GetInputFileRequest::new(&data_id);
     client
         .metadata_mut()
         .insert("token".to_string(), "wrong token".to_string());
@@ -256,16 +252,12 @@ fn test_register_function() {
 fn test_get_function() {
     let mut client = get_client();
 
-    let request = GetFunctionRequest {
-        function_id: "function-00000000-0000-0000-0000-000000000001".to_string(),
-    };
+    let request = GetFunctionRequest::new("function-00000000-0000-0000-0000-000000000001");
     let response = client.get_function(request);
     assert!(response.is_ok());
     assert!(!response.unwrap().name.is_empty());
 
-    let request = GetFunctionRequest {
-        function_id: "function-00000000-0000-0000-0000-000000000001".to_string(),
-    };
+    let request = GetFunctionRequest::new("function-00000000-0000-0000-0000-000000000001");
     client
         .metadata_mut()
         .insert("token".to_string(), "wrong token".to_string());
@@ -332,14 +324,12 @@ fn test_get_task() {
     let response = client.create_task(request);
     let task_id = response.unwrap().task_id;
 
-    let request = GetTaskRequest {
-        task_id: task_id.clone(),
-    };
+    let request = GetTaskRequest::new(&task_id);
     let response = client.get_task(request);
     assert!(response.is_ok());
     assert!(!response.unwrap().function_id.is_empty());
 
-    let request = GetTaskRequest { task_id };
+    let request = GetTaskRequest::new(task_id);
     client
         .metadata_mut()
         .insert("token".to_string(), "wrong token".to_string());
@@ -436,9 +426,7 @@ fn test_approve_task() {
     };
     let _response = client.assign_data(request);
 
-    let request = ApproveTaskRequest {
-        task_id: task_id.clone(),
-    };
+    let request = ApproveTaskRequest::new(&task_id);
     let correct_token = client.metadata().get("token").unwrap().to_string();
     client
         .metadata_mut()
@@ -446,7 +434,7 @@ fn test_approve_task() {
     let response = client.approve_task(request);
     assert!(response.is_err());
 
-    let request = ApproveTaskRequest { task_id };
+    let request = ApproveTaskRequest::new(&task_id);
     client
         .metadata_mut()
         .insert("token".to_string(), correct_token);
@@ -489,14 +477,10 @@ fn test_invoke_task() {
     };
     let _response = client.assign_data(request);
 
-    let request = ApproveTaskRequest {
-        task_id: task_id.clone(),
-    };
+    let request = ApproveTaskRequest::new(&task_id);
     let _response = client.approve_task(request);
 
-    let request = InvokeTaskRequest {
-        task_id: task_id.clone(),
-    };
+    let request = InvokeTaskRequest::new(&task_id);
     let correct_token = client.metadata().get("token").unwrap().to_string();
     client
         .metadata_mut()
@@ -504,16 +488,14 @@ fn test_invoke_task() {
     let response = client.invoke_task(request);
     assert!(response.is_err());
 
-    let request = InvokeTaskRequest {
-        task_id: task_id.clone(),
-    };
+    let request = InvokeTaskRequest::new(&task_id);
     client
         .metadata_mut()
         .insert("token".to_string(), correct_token);
     let response = client.invoke_task(request);
     assert!(response.is_ok());
 
-    let request = GetTaskRequest { task_id };
+    let request = GetTaskRequest::new(&task_id);
     let response = client.get_task(request);
     assert_eq!(response.unwrap().status, TaskStatus::Running);
 }
diff --git a/tests/functional/enclave/src/teaclave_scheduler_service.rs b/tests/functional/enclave/src/teaclave_scheduler_service.rs
index a4f0a22..5ef46d1 100644
--- a/tests/functional/enclave/src/teaclave_scheduler_service.rs
+++ b/tests/functional/enclave/src/teaclave_scheduler_service.rs
@@ -11,7 +11,7 @@ use teaclave_types::*;
 pub fn run_tests() -> bool {
     use teaclave_test_utils::*;
 
-    run_tests!(test_pull_task,)
+    run_tests!(test_pull_task, test_update_task_status_result)
 }
 
 fn get_client(user_id: &str) -> TeaclaveSchedulerClient {
@@ -51,3 +51,21 @@ fn test_pull_task() {
     log::debug!("response: {:?}", response);
     assert!(response.is_ok());
 }
+
+// Pulls a staged task, marks it finished, then reports a result for it.
+fn test_update_task_status_result() {
+    let mut client = get_client("mock_user");
+    let request = PullTaskRequest {};
+    let response = client.pull_task(request).unwrap();
+    log::debug!("response: {:?}", response);
+    let task_id = response.staged_task.task_id.to_string();
+
+    let request = UpdateTaskStatusRequest::new(&task_id, TaskStatus::Finished);
+    let response = client.update_task_status(request);
+    assert!(response.is_ok());
+
+    // Borrow the literal's bytes directly; no temporary `String` is needed.
+    let request = UpdateTaskResultRequest::new(&task_id, "return".as_bytes(), HashMap::new());
+    let response = client.update_task_result(request);
+    assert!(response.is_ok());
+}
diff --git a/tests/unit/enclave/src/lib.rs b/tests/unit/enclave/src/lib.rs
index 7a12fa7..c498810 100644
--- a/tests/unit/enclave/src/lib.rs
+++ b/tests/unit/enclave/src/lib.rs
@@ -19,9 +19,6 @@
 #[cfg(feature = "mesalock_sgx")]
 extern crate sgx_tstd as std;
 
-#[macro_use]
-extern crate log;
-
 use std::prelude::v1::*;
 
 use teaclave_access_control_service_enclave;
diff --git a/types/src/task.rs b/types/src/task.rs
index 013cf71..b83af0c 100644
--- a/types/src/task.rs
+++ b/types/src/task.rs
@@ -34,6 +34,8 @@ pub struct Task {
     pub approved_user_list: HashSet<String>,
     pub input_map: HashMap<String, String>,
     pub output_map: HashMap<String, String>,
+    pub return_value: Option<Vec<u8>>,
+    pub output_file_hash: HashMap<String, String>,
     pub status: TaskStatus,
 }
 
diff --git a/types/src/worker.rs b/types/src/worker.rs
index dc60c02..2dea81d 100644
--- a/types/src/worker.rs
+++ b/types/src/worker.rs
@@ -267,6 +267,11 @@ pub struct WorkerInvocation {
     pub output_files: TeaclaveWorkerFileRegistry<TeaclaveWorkerOutputFileInfo>,
 }
 
+pub struct WorkerInvocationResult {
+    pub return_value: Vec<u8>,
+    pub output_file_hash: HashMap<String, String>,
+}
+
 #[cfg(feature = "enclave_unit_test")]
 pub mod tests {
     use super::*;
diff --git a/utils/service_enclave_utils/src/lib.rs b/utils/service_enclave_utils/src/lib.rs
index da3ad2b..eb2bcd9 100644
--- a/utils/service_enclave_utils/src/lib.rs
+++ b/utils/service_enclave_utils/src/lib.rs
@@ -70,3 +70,23 @@ pub fn create_trusted_storage_endpoint(
 
     Endpoint::new(storage_service_address).config(storage_service_client_config)
 }
+
+/// Creates an `Endpoint` to `advertised_address` that verifies the scheduler's attestation
+/// report. Panics if looking up the `teaclave_scheduler_service` enclave attributes fails.
+pub fn create_trusted_scheduler_endpoint(
+    advertised_address: &str,
+    enclave_info: &EnclaveInfo,
+    as_root_ca_cert: &[u8],
+    verifier: AttestationReportVerificationFn,
+) -> Endpoint {
+    let scheduler_service_enclave_attrs = enclave_info
+        .get_enclave_attr("teaclave_scheduler_service")
+        .expect("enclave_info");
+    let scheduler_service_client_config = SgxTrustedTlsClientConfig::new()
+        .attestation_report_verifier(
+            vec![scheduler_service_enclave_attrs],
+            as_root_ca_cert,
+            verifier,
+        );
+    Endpoint::new(advertised_address).config(scheduler_service_client_config)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@teaclave.apache.org
For additional commands, e-mail: commits-help@teaclave.apache.org