You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@teaclave.apache.org by ms...@apache.org on 2022/01/19 23:32:32 UTC
[incubator-teaclave] branch master updated: Add watchdog functionalities (#600)
This is an automated email from the ASF dual-hosted git repository.
mssun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-teaclave.git
The following commit(s) were added to refs/heads/master by this push:
new 8fdac8c Add watchdog functionalities (#600)
8fdac8c is described below
commit 8fdac8ca29c518bb73483e801dcd07d9b85abec8
Author: Hongbo <12...@users.noreply.github.com>
AuthorDate: Wed Jan 19 15:32:23 2022 -0800
Add watchdog functionalities (#600)
Add executor service watchdog
- executor now sends heartbeats to report its current status and receive
commands from the scheduler
- scheduler can now detect lost executor(s), monitor their status,
and fail the corresponding task(s) when necessary
- users can now cancel a task; a new API has been added for this
- new tests for task cancellation and dangling task detection
---
cmake/UtilTargets.cmake | 3 +
cmake/scripts/test.sh | 73 +++++-
examples/python/builtin_gbdt_train.py | 7 +
examples/python/mesapy_deadloop_cancel.py | 83 +++++++
examples/python/mesapy_deadloop_payload.py | 21 ++
sdk/python/teaclave.py | 27 +++
services/execution/enclave/src/lib.rs | 9 +-
services/execution/enclave/src/service.rs | 160 +++++++++----
services/frontend/enclave/src/service.rs | 45 +++-
services/management/enclave/src/service.rs | 68 +++++-
services/proto/src/proto/teaclave_common.proto | 13 +
.../src/proto/teaclave_frontend_service.proto | 8 +-
.../src/proto/teaclave_management_service.proto | 1 +
.../src/proto/teaclave_scheduler_service.proto | 14 +-
services/proto/src/teaclave_common.rs | 105 +++++++++
services/proto/src/teaclave_frontend_service.rs | 49 ++++
services/proto/src/teaclave_management_service.rs | 2 +
services/proto/src/teaclave_scheduler_service.rs | 81 ++++++-
services/scheduler/enclave/src/lib.rs | 17 +-
services/scheduler/enclave/src/service.rs | 262 +++++++++++++++++++--
services/utils/service_app_utils/src/lib.rs | 3 +-
.../functions/gbdt_training/e2e_output_model.enc | Bin 0 -> 204800 bytes
tests/functional/enclave/src/execution_service.rs | 2 +-
tests/functional/enclave/src/frontend_service.rs | 139 ++++++++++-
tests/functional/enclave/src/management_service.rs | 9 +-
tests/functional/enclave/src/scheduler_service.rs | 28 ++-
types/src/error.rs | 2 +
types/src/storage.rs | 2 +
types/src/task.rs | 2 +
types/src/task_state.rs | 99 ++++++++
30 files changed, 1225 insertions(+), 109 deletions(-)
diff --git a/cmake/UtilTargets.cmake b/cmake/UtilTargets.cmake
index f2162d4..5c6cf1c 100644
--- a/cmake/UtilTargets.cmake
+++ b/cmake/UtilTargets.cmake
@@ -73,6 +73,9 @@ if(TEST_MODE)
add_custom_target(
run-sdk-tests COMMAND ${TEACLAVE_COMMON_ENVS}
${MT_SCRIPT_DIR}/test.sh sdk)
+ add_custom_target(
+ run-cancel-test COMMAND ${TEACLAVE_COMMON_ENVS}
+ ${MT_SCRIPT_DIR}/test.sh cancel)
else()
add_custom_target(
run-tests
diff --git a/cmake/scripts/test.sh b/cmake/scripts/test.sh
index 6d6c61a..6308646 100755
--- a/cmake/scripts/test.sh
+++ b/cmake/scripts/test.sh
@@ -97,7 +97,7 @@ run_functional_tests() {
pushd ${TEACLAVE_SERVICE_INSTALL_DIR}
./teaclave_authentication_service &
./teaclave_storage_service &
- sleep 3 # wait for authentication and storage service
+ sleep 10 # wait for authentication and storage service
./teaclave_management_service &
./teaclave_scheduler_service &
sleep 3 # wait for management service and scheduler_service
@@ -165,7 +165,7 @@ run_sdk_tests() {
pushd ${TEACLAVE_SERVICE_INSTALL_DIR}
./teaclave_authentication_service &
./teaclave_storage_service &
- sleep 3 # wait for authentication and storage service
+ sleep 10 # wait for authentication and storage service
./teaclave_management_service &
./teaclave_scheduler_service &
sleep 3 # wait for management service and scheduler_service
@@ -258,6 +258,71 @@ run_examples() {
cleanup
}
+run_cancel_test() {
+ trap cleanup INT TERM ERR
+
+ echo_title "cancel"
+ mkdir -p /tmp/fusion_data
+ pushd ${TEACLAVE_CLI_INSTALL_DIR}
+ ./teaclave_cli verify \
+ --enclave-info ../examples/enclave_info.toml \
+ --public-keys $(find ../examples -name "*.public.pem") \
+ --signatures $(find ../examples -name "*.sign.sha256")
+ popd
+
+ echo "initiating Teaclave with 2 executors..."
+
+ pushd ${TEACLAVE_SERVICE_INSTALL_DIR}
+ ./teaclave_authentication_service &
+ ./teaclave_storage_service &
+ sleep 3 # wait for authentication and storage service
+ ./teaclave_management_service &
+ ./teaclave_scheduler_service &
+ sleep 3 # wait for management service and scheduler_service
+ ./teaclave_access_control_service &
+ ./teaclave_frontend_service &
+ sleep 3 # wait for other services
+
+ start_storage_server
+
+ # Run of execution services separately
+ ./teaclave_execution_service & exe_pid1=$!
+ ./teaclave_execution_service & exe_pid2=$!
+ sleep 10 # wait for execution services
+ popd
+
+ echo "executor 1 pid: $exe_pid1"
+ echo "executor 2 pid: $exe_pid2"
+
+ pushd ${TEACLAVE_PROJECT_ROOT}/examples/python
+ export PYTHONPATH=${TEACLAVE_PROJECT_ROOT}/sdk/python
+ python3 mesapy_deadloop_cancel.py
+ popd
+
+ sleep 3
+
+ live_pids=0
+ if ps -p $exe_pid1 > /dev/null
+ then
+ live_pids=$((live_pids+1))
+ fi
+
+ if ps -p $exe_pid2 > /dev/null
+ then
+ live_pids=$((live_pids+1))
+ fi
+
+ if [ $live_pids -eq 1 ]
+ then
+ echo "only one executor is killed, test passed"
+ else
+ echo "Some unexpected happens, test failed"
+ false
+ fi
+
+ cleanup
+}
+
case "$1" in
"unit")
run_unit_tests
@@ -274,11 +339,15 @@ case "$1" in
"example")
run_examples
;;
+ "cancel")
+ run_cancel_test
+ ;;
*)
run_unit_tests
run_integration_tests
run_functional_tests
run_sdk_tests
run_examples
+ run_cancel_test
;;
esac
diff --git a/examples/python/builtin_gbdt_train.py b/examples/python/builtin_gbdt_train.py
index 2efc9c7..08b4b9a 100644
--- a/examples/python/builtin_gbdt_train.py
+++ b/examples/python/builtin_gbdt_train.py
@@ -103,6 +103,13 @@ class BuiltinGbdtExample:
print("[+] invoking task")
client.invoke_task(task_id)
+ # If you feel like you don't want to wait for the task to finish
+ # or something unexpected happens,
+ # you may uncomment the following lines to cancel the task
+ # time.sleep(3)
+ # print("[+] canceling task")
+ # client.cancel_task(task_id)
+
print("[+] getting result")
result = client.get_task_result(task_id)
print("[+] done")
diff --git a/examples/python/mesapy_deadloop_cancel.py b/examples/python/mesapy_deadloop_cancel.py
new file mode 100644
index 0000000..2fee278
--- /dev/null
+++ b/examples/python/mesapy_deadloop_cancel.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+import time
+
+from teaclave import FunctionInput, FunctionOutput, OwnerList, DataMap
+from utils import USER_ID, USER_PASSWORD, connect_authentication_service, connect_frontend_service, PlatformAdmin
+
+
+class MesaPyEchoExample:
+ def __init__(self, user_id, user_password):
+ self.user_id = user_id
+ self.user_password = user_password
+
+ def deadloop(self, payload_file="mesapy_deadloop_payload.py"):
+ with connect_authentication_service() as client:
+ print(f"[+] {self.user_id} login")
+ token = client.user_login(self.user_id, self.user_password)
+
+ client = connect_frontend_service()
+ metadata = {"id": self.user_id, "token": token}
+ client.metadata = metadata
+
+ print("[+] registering function")
+
+ with open(payload_file, "rb") as f:
+ payload = f.read()
+ function_id = client.register_function(
+ name="mesapy-deadloop",
+ description="A deadloop function to test task cancellation",
+ executor_type="python",
+ payload=list(payload),
+ arguments=[])
+
+ print("[+] creating task")
+ task_id = client.create_task(function_id=function_id,
+ function_arguments={},
+ executor="mesapy")
+
+ print("[+] invoking task")
+ client.invoke_task(task_id)
+
+ print("[+] canceling task")
+ time.sleep(5)
+ client.cancel_task(task_id)
+
+ print("[+] getting result")
+
+ try:
+ result = client.get_task_result(task_id)
+ except Exception as e:
+ print(f"[+] result: {str(e)}")
+ result = str(e)
+
+ return result
+
+
+def main():
+ example = MesaPyEchoExample(USER_ID, USER_PASSWORD)
+ rv = example.deadloop()
+
+ print("[+] function return: ", rv)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/examples/python/mesapy_deadloop_payload.py b/examples/python/mesapy_deadloop_payload.py
new file mode 100644
index 0000000..b5a22e6
--- /dev/null
+++ b/examples/python/mesapy_deadloop_payload.py
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+def entrypoint(argv):
+ while True:
+ pass
diff --git a/sdk/python/teaclave.py b/sdk/python/teaclave.py
index 3e39f99..993d0a7 100644
--- a/sdk/python/teaclave.py
+++ b/sdk/python/teaclave.py
@@ -371,6 +371,13 @@ class InvokeTaskRequest(Request):
self.task_id = task_id
+class CancelTaskRequest(Request):
+ def __init__(self, metadata: Metadata, task_id: str):
+ self.request = "cancel_task"
+ self.metadata = metadata
+ self.task_id = task_id
+
+
class GetTaskRequest(Request):
def __init__(self, metadata: Metadata, task_id: str):
self.request = "get_task"
@@ -557,6 +564,17 @@ class FrontendService(TeaclaveService):
else:
raise TeaclaveException("Failed to invoke task")
+ def cancel_task(self, task_id: str):
+ self.check_metadata()
+ self.check_channel()
+ request = CancelTaskRequest(self.metadata, task_id)
+ _write_message(self.channel, request)
+ response = _read_message(self.channel)
+ if response["result"] == "ok":
+ pass
+ else:
+ raise TeaclaveException("Failed to cancel task")
+
def get_task_result(self, task_id: str):
self.check_metadata()
self.check_channel()
@@ -568,6 +586,15 @@ class FrontendService(TeaclaveService):
if response["result"] != "ok":
raise TeaclaveException("Failed to get task result")
time.sleep(1)
+ if response["content"]["status"] == 20:
+ print(response["content"]["result"]["result"])
+ raise TeaclaveException(
+ "Task Canceled, Error: " +
+ response["content"]["result"]["result"]["Err"]["reason"])
+ if response["content"]["status"] == 99:
+ raise TeaclaveException(
+ "Task Failed, Error: " +
+ response["content"]["result"]["result"]["Err"]["reason"])
if response["content"]["status"] == 10:
break
diff --git a/services/execution/enclave/src/lib.rs b/services/execution/enclave/src/lib.rs
index 3d46869..27b1b9b 100644
--- a/services/execution/enclave/src/lib.rs
+++ b/services/execution/enclave/src/lib.rs
@@ -77,18 +77,17 @@ fn start_service(config: &RuntimeConfig) -> Result<()> {
let mut service =
service::TeaclaveExecutionService::new(scheduler_service_endpoint, fusion_base)?;
- let _ = service.start();
-
- Ok(())
+ service.start()
}
#[handle_ecall]
fn handle_start_service(input: &StartServiceInput) -> TeeServiceResult<StartServiceOutput> {
match start_service(&input.config) {
Ok(_) => Ok(StartServiceOutput),
+ // terminate the enclave for executor
Err(e) => {
- log::error!("Failed to start the service: {}", e);
- Err(TeeServiceError::ServiceError)
+ log::error!("Service shutdown, reason: {}", e);
+ Err(TeeServiceError::EnclaveForceTermination)
}
}
}
diff --git a/services/execution/enclave/src/service.rs b/services/execution/enclave/src/service.rs
index f5eaa9c..0aa7515 100644
--- a/services/execution/enclave/src/service.rs
+++ b/services/execution/enclave/src/service.rs
@@ -18,9 +18,12 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::prelude::v1::*;
+use std::sync::mpsc;
use std::sync::{Arc, SgxMutex as Mutex};
+use std::thread;
use crate::task_file_manager::TaskFileManager;
+use teaclave_proto::teaclave_common::{ExecutorCommand, ExecutorStatus};
use teaclave_proto::teaclave_scheduler_service::*;
use teaclave_rpc::endpoint::Endpoint;
use teaclave_types::*;
@@ -36,6 +39,8 @@ pub(crate) struct TeaclaveExecutionService {
worker: Arc<Worker>,
scheduler_client: Arc<Mutex<TeaclaveSchedulerClient>>,
fusion_base: PathBuf,
+ id: Uuid,
+ status: ExecutorStatus,
}
impl TeaclaveExecutionService {
@@ -61,47 +66,102 @@ impl TeaclaveExecutionService {
worker: Arc::new(Worker::default()),
scheduler_client,
fusion_base: fusion_base.as_ref().to_owned(),
+ id: Uuid::new_v4(),
+ status: ExecutorStatus::Idle,
})
}
pub(crate) fn start(&mut self) -> Result<()> {
+ let (tx, rx) = mpsc::channel();
+ let mut current_task: Arc<Option<StagedTask>> = Arc::new(None);
+ let mut task_handle: Option<thread::JoinHandle<()>> = None;
+
loop {
std::thread::sleep(std::time::Duration::from_secs(3));
- let staged_task = match self.pull_task() {
- Ok(staged_task) => staged_task,
+
+ match self.heartbeat() {
+ Ok(ExecutorCommand::Stop) => {
+ log::info!("Executor {} is stopped", self.id);
+ return Err(anyhow::anyhow!("EnclaveForceTermination"));
+ }
+ Ok(ExecutorCommand::NewTask) if self.status == ExecutorStatus::Idle => {
+ match self.pull_task() {
+ Ok(task) => {
+ self.status = ExecutorStatus::Executing;
+ self.update_task_status(&task.task_id, TaskStatus::Running)?;
+ let tx_task = tx.clone();
+ let fusion_base = self.fusion_base.clone();
+ current_task = Arc::new(Some(task));
+ let task_copy = current_task.clone();
+ let handle = thread::spawn(move || {
+ let result = invoke_task(
+ &task_copy.as_ref().as_ref().unwrap(),
+ &fusion_base,
+ );
+ tx_task.send(result).unwrap();
+ });
+ task_handle = Some(handle);
+ log::info!("Executor {} accepted a new task, executing...", self.id);
+ }
+ Err(e) => {
+ log::error!("Executor {} failed to pull task: {}", self.id, e);
+ }
+ };
+ }
Err(e) => {
- log::debug!("PullTask Error: {:?}", e);
- continue;
+ log::error!("Executor {} failed to heartbeat: {}", self.id, e);
+ return Err(e);
}
- };
-
- let result = self.invoke_task(&staged_task);
- match result {
- Ok(_) => log::debug!(
- "InvokeTask: {:?}, {:?}, success",
- staged_task.user_id,
- staged_task.function_id
- ),
- Err(_) => log::debug!(
- "InvokeTask: {:?}, {:?}, failure",
- staged_task.user_id,
- staged_task.function_id
- ),
+ _ => {}
}
- log::debug!("InvokeTask result: {:?}", result);
- match self.update_task_result(&staged_task.task_id, result) {
- Ok(_) => (),
- Err(e) => {
- log::error!("UpdateResult Error: {:?}", e);
- continue;
+ match rx.try_recv() {
+ Ok(result) => {
+ let task_unwrapped = current_task.as_ref().as_ref().unwrap();
+ match result {
+ Ok(_) => log::debug!(
+ "InvokeTask: {:?}, {:?}, success",
+ task_unwrapped.task_id,
+ task_unwrapped.function_id
+ ),
+ Err(_) => log::debug!(
+ "InvokeTask: {:?}, {:?}, failure",
+ task_unwrapped.task_id,
+ task_unwrapped.function_id
+ ),
+ }
+ log::debug!("InvokeTask result: {:?}", result);
+ let task_copy = current_task.clone();
+ match self
+ .update_task_result(&task_copy.as_ref().as_ref().unwrap().task_id, result)
+ {
+ Ok(_) => (),
+ Err(e) => {
+ log::error!("UpdateResult Error: {:?}", e);
+ continue;
+ }
+ }
+ current_task = Arc::new(None);
+ task_handle.unwrap().join().unwrap();
+ task_handle = None;
+ self.status = ExecutorStatus::Idle;
}
+ Err(mpsc::TryRecvError::Disconnected) => {
+ log::error!(
+ "Executor {} failed to receive, sender disconnected",
+ self.id
+ );
+ }
+ // received nothing
+ Err(_) => {}
}
}
}
fn pull_task(&mut self) -> Result<StagedTask> {
- let request = PullTaskRequest {};
+ let request = PullTaskRequest {
+ executor_id: self.id,
+ };
let response = self
.scheduler_client
.clone()
@@ -113,25 +173,20 @@ impl TeaclaveExecutionService {
Ok(response.staged_task)
}
- fn invoke_task(&mut self, task: &StagedTask) -> Result<TaskOutputs> {
- self.update_task_status(&task.task_id, TaskStatus::Running)?;
-
- let file_mgr = TaskFileManager::new(
- WORKER_BASE_DIR,
- &self.fusion_base,
- &task.task_id,
- &task.input_data,
- &task.output_data,
- )?;
- let invocation = prepare_task(&task, &file_mgr)?;
-
- log::debug!("Invoke function: {:?}", invocation);
- let worker = Worker::default();
- let summary = worker.invoke_function(invocation)?;
+ fn heartbeat(&mut self) -> Result<ExecutorCommand> {
+ let request = HeartbeatRequest {
+ executor_id: self.id,
+ status: self.status,
+ };
+ let response = self
+ .scheduler_client
+ .clone()
+ .lock()
+ .map_err(|_| anyhow::anyhow!("Cannot lock scheduler client"))?
+ .heartbeat(request)?;
- let outputs_tag = finalize_task(&file_mgr)?;
- let task_outputs = TaskOutputs::new(summary.as_bytes(), outputs_tag);
- Ok(task_outputs)
+ log::debug!("heartbeat_with_result response: {:?}", response);
+ Ok(response.command)
}
fn update_task_result(
@@ -164,6 +219,25 @@ impl TeaclaveExecutionService {
}
}
+fn invoke_task(task: &StagedTask, fusion_base: &PathBuf) -> Result<TaskOutputs> {
+ let file_mgr = TaskFileManager::new(
+ WORKER_BASE_DIR,
+ fusion_base,
+ &task.task_id,
+ &task.input_data,
+ &task.output_data,
+ )?;
+ let invocation = prepare_task(&task, &file_mgr)?;
+
+ log::debug!("Invoke function: {:?}", invocation);
+ let worker = Worker::default();
+ let summary = worker.invoke_function(invocation)?;
+
+ let outputs_tag = finalize_task(&file_mgr)?;
+ let task_outputs = TaskOutputs::new(summary.as_bytes(), outputs_tag);
+ Ok(task_outputs)
+}
+
fn prepare_task(task: &StagedTask, file_mgr: &TaskFileManager) -> Result<StagedFunction> {
let input_files = file_mgr.prepare_staged_inputs()?;
let output_files = file_mgr.prepare_staged_outputs()?;
diff --git a/services/frontend/enclave/src/service.rs b/services/frontend/enclave/src/service.rs
index d6d2b08..bd0d3bc 100644
--- a/services/frontend/enclave/src/service.rs
+++ b/services/frontend/enclave/src/service.rs
@@ -27,16 +27,16 @@ use teaclave_proto::teaclave_authentication_service::{
use teaclave_proto::teaclave_common::UserCredential;
use teaclave_proto::teaclave_frontend_service::{
ApproveTaskRequest, ApproveTaskResponse, AssignDataRequest, AssignDataResponse,
- CreateTaskRequest, CreateTaskResponse, DeleteFunctionRequest, DeleteFunctionResponse,
- GetFunctionRequest, GetFunctionResponse, GetInputFileRequest, GetInputFileResponse,
- GetOutputFileRequest, GetOutputFileResponse, GetTaskRequest, GetTaskResponse,
- InvokeTaskRequest, InvokeTaskResponse, ListFunctionsRequest, ListFunctionsResponse,
- RegisterFunctionRequest, RegisterFunctionResponse, RegisterFusionOutputRequest,
- RegisterFusionOutputResponse, RegisterInputFileRequest, RegisterInputFileResponse,
- RegisterInputFromOutputRequest, RegisterInputFromOutputResponse, RegisterOutputFileRequest,
- RegisterOutputFileResponse, TeaclaveFrontend, UpdateFunctionRequest, UpdateFunctionResponse,
- UpdateInputFileRequest, UpdateInputFileResponse, UpdateOutputFileRequest,
- UpdateOutputFileResponse,
+ CancelTaskRequest, CancelTaskResponse, CreateTaskRequest, CreateTaskResponse,
+ DeleteFunctionRequest, DeleteFunctionResponse, GetFunctionRequest, GetFunctionResponse,
+ GetInputFileRequest, GetInputFileResponse, GetOutputFileRequest, GetOutputFileResponse,
+ GetTaskRequest, GetTaskResponse, InvokeTaskRequest, InvokeTaskResponse, ListFunctionsRequest,
+ ListFunctionsResponse, RegisterFunctionRequest, RegisterFunctionResponse,
+ RegisterFusionOutputRequest, RegisterFusionOutputResponse, RegisterInputFileRequest,
+ RegisterInputFileResponse, RegisterInputFromOutputRequest, RegisterInputFromOutputResponse,
+ RegisterOutputFileRequest, RegisterOutputFileResponse, TeaclaveFrontend, UpdateFunctionRequest,
+ UpdateFunctionResponse, UpdateInputFileRequest, UpdateInputFileResponse,
+ UpdateOutputFileRequest, UpdateOutputFileResponse,
};
use teaclave_proto::teaclave_management_service::TeaclaveManagementClient;
use teaclave_rpc::endpoint::Endpoint;
@@ -58,10 +58,22 @@ macro_rules! authentication_and_forward_to_management {
if authorize(&claims, $endpoint) {
claims
} else {
+ log::debug!(
+ "User is not authorized to access endpoint: {}, func: {}",
+ stringify!($endpoint),
+ stringify!($func)
+ );
bail!(TeaclaveFrontendError::AuthenticationError);
}
}
- _ => bail!(TeaclaveFrontendError::AuthenticationError),
+ _ => {
+ log::debug!(
+ "User is not authenticated to access endpoint: {}, func: {}",
+ stringify!($endpoint),
+ stringify!($func)
+ );
+ bail!(TeaclaveFrontendError::AuthenticationError)
+ }
};
let client = $service.management_client.clone();
@@ -101,6 +113,7 @@ enum Endpoints {
AssignData,
ApproveTask,
InvokeTask,
+ CancelTask,
}
fn authorize(claims: &UserAuthClaims, request: Endpoints) -> bool {
@@ -129,7 +142,8 @@ fn authorize(claims: &UserAuthClaims, request: Endpoints) -> bool {
| Endpoints::GetTask
| Endpoints::AssignData
| Endpoints::ApproveTask
- | Endpoints::InvokeTask => role.is_data_owner(),
+ | Endpoints::InvokeTask
+ | Endpoints::CancelTask => role.is_data_owner(),
Endpoints::GetFunction | Endpoints::ListFunctions => {
role.is_function_owner() || role.is_data_owner()
}
@@ -375,6 +389,13 @@ impl TeaclaveFrontend for TeaclaveFrontendService {
) -> TeaclaveServiceResponseResult<InvokeTaskResponse> {
authentication_and_forward_to_management!(self, request, invoke_task, Endpoints::InvokeTask)
}
+
+ fn cancel_task(
+ &self,
+ request: Request<CancelTaskRequest>,
+ ) -> TeaclaveServiceResponseResult<CancelTaskResponse> {
+ authentication_and_forward_to_management!(self, request, cancel_task, Endpoints::CancelTask)
+ }
}
impl TeaclaveFrontendService {
diff --git a/services/management/enclave/src/service.rs b/services/management/enclave/src/service.rs
index a51dd6b..0293450 100644
--- a/services/management/enclave/src/service.rs
+++ b/services/management/enclave/src/service.rs
@@ -23,16 +23,16 @@ use std::prelude::v1::*;
use std::sync::{Arc, SgxMutex as Mutex};
use teaclave_proto::teaclave_frontend_service::{
ApproveTaskRequest, ApproveTaskResponse, AssignDataRequest, AssignDataResponse,
- CreateTaskRequest, CreateTaskResponse, DeleteFunctionRequest, DeleteFunctionResponse,
- GetFunctionRequest, GetFunctionResponse, GetInputFileRequest, GetInputFileResponse,
- GetOutputFileRequest, GetOutputFileResponse, GetTaskRequest, GetTaskResponse,
- InvokeTaskRequest, InvokeTaskResponse, ListFunctionsRequest, ListFunctionsResponse,
- RegisterFunctionRequest, RegisterFunctionResponse, RegisterFusionOutputRequest,
- RegisterFusionOutputResponse, RegisterInputFileRequest, RegisterInputFileResponse,
- RegisterInputFromOutputRequest, RegisterInputFromOutputResponse, RegisterOutputFileRequest,
- RegisterOutputFileResponse, UpdateFunctionRequest, UpdateFunctionResponse,
- UpdateInputFileRequest, UpdateInputFileResponse, UpdateOutputFileRequest,
- UpdateOutputFileResponse,
+ CancelTaskRequest, CancelTaskResponse, CreateTaskRequest, CreateTaskResponse,
+ DeleteFunctionRequest, DeleteFunctionResponse, GetFunctionRequest, GetFunctionResponse,
+ GetInputFileRequest, GetInputFileResponse, GetOutputFileRequest, GetOutputFileResponse,
+ GetTaskRequest, GetTaskResponse, InvokeTaskRequest, InvokeTaskResponse, ListFunctionsRequest,
+ ListFunctionsResponse, RegisterFunctionRequest, RegisterFunctionResponse,
+ RegisterFusionOutputRequest, RegisterFusionOutputResponse, RegisterInputFileRequest,
+ RegisterInputFileResponse, RegisterInputFromOutputRequest, RegisterInputFromOutputResponse,
+ RegisterOutputFileRequest, RegisterOutputFileResponse, UpdateFunctionRequest,
+ UpdateFunctionResponse, UpdateInputFileRequest, UpdateInputFileResponse,
+ UpdateOutputFileRequest, UpdateOutputFileResponse,
};
use teaclave_proto::teaclave_management_service::TeaclaveManagement;
use teaclave_proto::teaclave_storage_service::{
@@ -630,6 +630,54 @@ impl TeaclaveManagement for TeaclaveManagementService {
Ok(InvokeTaskResponse)
}
+
+ // access_control:
+ // 1) user_id == task.creator
+ // 2) user_role == admin
+ fn cancel_task(
+ &self,
+ request: Request<CancelTaskRequest>,
+ ) -> TeaclaveServiceResponseResult<CancelTaskResponse> {
+ let user_id = self.get_request_user_id(request.metadata())?;
+ let role = self.get_request_role(request.metadata())?;
+ let request = request.message;
+
+ let ts: TaskState = self
+ .read_from_db(&request.task_id)
+ .map_err(|_| TeaclaveManagementServiceError::PermissionDenied)?;
+
+ match role {
+ UserRole::PlatformAdmin => {}
+ _ => {
+ ensure!(
+ ts.has_creator(&user_id),
+ TeaclaveManagementServiceError::PermissionDenied
+ );
+ }
+ }
+
+ match ts.status {
+ // need scheduler to cancel the task
+ TaskStatus::Staged | TaskStatus::Running => {
+ self.enqueue_to_db(CANCEL_QUEUE_KEY.as_bytes(), &ts)?;
+ }
+ _ => {
+ // race will not affect correctness/privacy
+ let task: Task<Cancel> = ts.try_into().map_err(|e| {
+ log::warn!("Cancel state error: {:?}", e);
+ TeaclaveManagementServiceError::PermissionDenied
+ })?;
+
+ log::debug!("Canceled Task: {:?}", task);
+
+ let ts: TaskState = task.into();
+ self.write_to_db(&ts)
+ .map_err(|_| TeaclaveManagementServiceError::StorageError)?;
+ }
+ }
+
+ Ok(CancelTaskResponse)
+ }
}
impl TeaclaveManagementService {
diff --git a/services/proto/src/proto/teaclave_common.proto b/services/proto/src/proto/teaclave_common.proto
index b2bc14f..4a7e88e 100644
--- a/services/proto/src/proto/teaclave_common.proto
+++ b/services/proto/src/proto/teaclave_common.proto
@@ -48,6 +48,19 @@ enum TaskStatus {
Staged = 3;
Running = 4;
Finished = 10;
+ Canceled = 20;
+ Failed = 99;
+}
+
+enum ExecutorStatus {
+ Idle = 0;
+ Executing = 1;
+}
+
+enum ExecutorCommand {
+ NoAction = 0;
+ Stop = 1;
+ NewTask = 2;
}
message TaskResult {
diff --git a/services/proto/src/proto/teaclave_frontend_service.proto b/services/proto/src/proto/teaclave_frontend_service.proto
index 8fcfba0..1dfb038 100644
--- a/services/proto/src/proto/teaclave_frontend_service.proto
+++ b/services/proto/src/proto/teaclave_frontend_service.proto
@@ -232,6 +232,12 @@ message InvokeTaskRequest {
message InvokeTaskResponse { }
+message CancelTaskRequest {
+ string task_id = 1;
+}
+
+message CancelTaskResponse { }
+
service TeaclaveFrontend {
rpc RegisterInputFile (RegisterInputFileRequest) returns (RegisterInputFileResponse);
rpc RegisterOutputFile (RegisterOutputFileRequest) returns (RegisterOutputFileResponse);
@@ -251,5 +257,5 @@ service TeaclaveFrontend {
rpc AssignData (AssignDataRequest) returns (AssignDataResponse);
rpc ApproveTask (ApproveTaskRequest) returns (ApproveTaskResponse);
rpc InvokeTask (InvokeTaskRequest) returns (InvokeTaskResponse);
-
+ rpc CancelTask (CancelTaskRequest) returns (CancelTaskResponse);
}
diff --git a/services/proto/src/proto/teaclave_management_service.proto b/services/proto/src/proto/teaclave_management_service.proto
index 2a71fd4..82c50a4 100644
--- a/services/proto/src/proto/teaclave_management_service.proto
+++ b/services/proto/src/proto/teaclave_management_service.proto
@@ -43,4 +43,5 @@ service TeaclaveManagement {
rpc AssignData (teaclave_frontend_service_proto.AssignDataRequest) returns (teaclave_frontend_service_proto.AssignDataResponse);
rpc ApproveTask (teaclave_frontend_service_proto.ApproveTaskRequest) returns (teaclave_frontend_service_proto.ApproveTaskResponse);
rpc InvokeTask (teaclave_frontend_service_proto.InvokeTaskRequest) returns (teaclave_frontend_service_proto.InvokeTaskResponse);
+ rpc CancelTask (teaclave_frontend_service_proto.CancelTaskRequest) returns (teaclave_frontend_service_proto.CancelTaskResponse);
}
diff --git a/services/proto/src/proto/teaclave_scheduler_service.proto b/services/proto/src/proto/teaclave_scheduler_service.proto
index f0b0771..9f9f3ad 100644
--- a/services/proto/src/proto/teaclave_scheduler_service.proto
+++ b/services/proto/src/proto/teaclave_scheduler_service.proto
@@ -28,7 +28,17 @@ message SubscribeResponse {
bool success = 1;
}
-message PullTaskRequest {}
+message HeartbeatRequest {
+ string executor_id = 1;
+ teaclave_common_proto.ExecutorStatus status = 2;
+}
+message HeartbeatResponse {
+ teaclave_common_proto.ExecutorCommand command = 1;
+}
+
+message PullTaskRequest {
+ string executor_id = 1;
+}
message PullTaskResponse {
bytes staged_task = 1;
}
@@ -60,4 +70,6 @@ service TeaclaveScheduler {
rpc UpdateTaskStatus(UpdateTaskStatusRequest) returns (UpdateTaskStatusResponse);
rpc UpdateTaskResult(UpdateTaskResultRequest) returns (UpdateTaskResultResponse);
+
+ rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
}
diff --git a/services/proto/src/teaclave_common.rs b/services/proto/src/teaclave_common.rs
index 5af0c61..5937220 100644
--- a/services/proto/src/teaclave_common.rs
+++ b/services/proto/src/teaclave_common.rs
@@ -111,6 +111,8 @@ pub fn i32_to_task_status(status: i32) -> Result<TaskStatus> {
Some(proto::TaskStatus::Staged) => TaskStatus::Staged,
Some(proto::TaskStatus::Running) => TaskStatus::Running,
Some(proto::TaskStatus::Finished) => TaskStatus::Finished,
+ Some(proto::TaskStatus::Failed) => TaskStatus::Failed,
+ Some(proto::TaskStatus::Canceled) => TaskStatus::Canceled,
None => bail!("invalid task status"),
};
Ok(ret)
@@ -124,6 +126,8 @@ pub fn i32_from_task_status(status: TaskStatus) -> i32 {
TaskStatus::Staged => proto::TaskStatus::Staged as i32,
TaskStatus::Running => proto::TaskStatus::Running as i32,
TaskStatus::Finished => proto::TaskStatus::Finished as i32,
+ TaskStatus::Failed => proto::TaskStatus::Failed as i32,
+ TaskStatus::Canceled => proto::TaskStatus::Canceled as i32,
}
}
@@ -194,3 +198,104 @@ impl std::convert::From<TaskResult> for proto::TaskResult {
proto::TaskResult { result: opt_result }
}
}
+
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum ExecutorStatus {
+ Idle,
+ Executing,
+}
+
+impl std::convert::TryFrom<proto::ExecutorStatus> for ExecutorStatus {
+ type Error = Error;
+ fn try_from(status: proto::ExecutorStatus) -> Result<Self> {
+ match status {
+ proto::ExecutorStatus::Idle => Ok(ExecutorStatus::Idle),
+ proto::ExecutorStatus::Executing => Ok(ExecutorStatus::Executing),
+ }
+ }
+}
+
+impl std::convert::TryFrom<i32> for ExecutorStatus {
+ type Error = Error;
+ fn try_from(status: i32) -> Result<Self> {
+ match proto::ExecutorStatus::from_i32(status) {
+ Some(proto::ExecutorStatus::Idle) => Ok(ExecutorStatus::Idle),
+ Some(proto::ExecutorStatus::Executing) => Ok(ExecutorStatus::Executing),
+ _ => bail!("invalid executor status"),
+ }
+ }
+}
+
+impl std::convert::From<ExecutorStatus> for i32 {
+ fn from(status: ExecutorStatus) -> Self {
+ match status {
+ ExecutorStatus::Idle => proto::ExecutorStatus::Idle as i32,
+ ExecutorStatus::Executing => proto::ExecutorStatus::Executing as i32,
+ }
+ }
+}
+
+impl std::convert::From<ExecutorStatus> for proto::ExecutorStatus {
+ fn from(status: ExecutorStatus) -> Self {
+ match status {
+ ExecutorStatus::Idle => proto::ExecutorStatus::Idle,
+ ExecutorStatus::Executing => proto::ExecutorStatus::Executing,
+ }
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum ExecutorCommand {
+ NoAction,
+ Stop,
+ NewTask,
+}
+
+impl Default for ExecutorCommand {
+ fn default() -> Self {
+ ExecutorCommand::NoAction
+ }
+}
+
+impl std::convert::TryFrom<proto::ExecutorCommand> for ExecutorCommand {
+ type Error = Error;
+ fn try_from(command: proto::ExecutorCommand) -> Result<Self> {
+ match command {
+ proto::ExecutorCommand::NoAction => Ok(ExecutorCommand::NoAction),
+ proto::ExecutorCommand::Stop => Ok(ExecutorCommand::Stop),
+ proto::ExecutorCommand::NewTask => Ok(ExecutorCommand::NewTask),
+ }
+ }
+}
+
+impl std::convert::From<ExecutorCommand> for proto::ExecutorCommand {
+ fn from(command: ExecutorCommand) -> Self {
+ match command {
+ ExecutorCommand::NoAction => proto::ExecutorCommand::NoAction,
+ ExecutorCommand::Stop => proto::ExecutorCommand::Stop,
+ ExecutorCommand::NewTask => proto::ExecutorCommand::NewTask,
+ }
+ }
+}
+
+impl std::convert::TryFrom<i32> for ExecutorCommand {
+ type Error = Error;
+ fn try_from(command: i32) -> Result<Self> {
+ match proto::ExecutorCommand::from_i32(command) {
+ Some(proto::ExecutorCommand::NoAction) => Ok(ExecutorCommand::NoAction),
+ Some(proto::ExecutorCommand::Stop) => Ok(ExecutorCommand::Stop),
+ Some(proto::ExecutorCommand::NewTask) => Ok(ExecutorCommand::NewTask),
+ _ => bail!("invalid executor status"),
+ }
+ }
+}
+
+impl std::convert::From<ExecutorCommand> for i32 {
+ fn from(command: ExecutorCommand) -> Self {
+ match command {
+ ExecutorCommand::NoAction => proto::ExecutorCommand::NoAction as i32,
+ ExecutorCommand::Stop => proto::ExecutorCommand::Stop as i32,
+ ExecutorCommand::NewTask => proto::ExecutorCommand::NewTask as i32,
+ }
+ }
+}
diff --git a/services/proto/src/teaclave_frontend_service.rs b/services/proto/src/teaclave_frontend_service.rs
index 7756c41..ba5d870 100644
--- a/services/proto/src/teaclave_frontend_service.rs
+++ b/services/proto/src/teaclave_frontend_service.rs
@@ -702,6 +702,22 @@ impl InvokeTaskRequest {
#[derive(Debug)]
pub struct InvokeTaskResponse;
+#[into_request(TeaclaveManagementRequest::CancelTask)]
+#[into_request(TeaclaveFrontendRequest::CancelTask)]
+#[derive(Debug)]
+pub struct CancelTaskRequest {
+ pub task_id: ExternalID,
+}
+
+impl CancelTaskRequest {
+ pub fn new(task_id: ExternalID) -> Self {
+ Self { task_id }
+ }
+}
+
+#[derive(Debug)]
+pub struct CancelTaskResponse;
+
impl std::convert::TryFrom<proto::RegisterInputFileRequest> for RegisterInputFileRequest {
type Error = Error;
@@ -1664,3 +1680,36 @@ impl From<InvokeTaskResponse> for proto::InvokeTaskResponse {
Self {}
}
}
+
+impl std::convert::TryFrom<proto::CancelTaskRequest> for CancelTaskRequest {
+ type Error = Error;
+
+ fn try_from(proto: proto::CancelTaskRequest) -> Result<Self> {
+ let task_id = proto.task_id.try_into()?;
+ let ret = Self { task_id };
+
+ Ok(ret)
+ }
+}
+
+impl From<CancelTaskRequest> for proto::CancelTaskRequest {
+ fn from(request: CancelTaskRequest) -> Self {
+ Self {
+ task_id: request.task_id.to_string(),
+ }
+ }
+}
+
+impl std::convert::TryFrom<proto::CancelTaskResponse> for CancelTaskResponse {
+ type Error = Error;
+
+ fn try_from(_proto: proto::CancelTaskResponse) -> Result<Self> {
+ Ok(CancelTaskResponse)
+ }
+}
+
+impl From<CancelTaskResponse> for proto::CancelTaskResponse {
+ fn from(_response: CancelTaskResponse) -> Self {
+ Self {}
+ }
+}
diff --git a/services/proto/src/teaclave_management_service.rs b/services/proto/src/teaclave_management_service.rs
index 4c4169d..c868d59 100644
--- a/services/proto/src/teaclave_management_service.rs
+++ b/services/proto/src/teaclave_management_service.rs
@@ -66,3 +66,5 @@ pub type ApproveTaskRequest = crate::teaclave_frontend_service::ApproveTaskReque
pub type ApproveTaskResponse = crate::teaclave_frontend_service::ApproveTaskResponse;
pub type InvokeTaskRequest = crate::teaclave_frontend_service::InvokeTaskRequest;
pub type InvokeTaskResponse = crate::teaclave_frontend_service::InvokeTaskResponse;
+pub type CancelTaskRequest = crate::teaclave_frontend_service::CancelTaskRequest;
+pub type CancelTaskResponse = crate::teaclave_frontend_service::CancelTaskResponse;
diff --git a/services/proto/src/teaclave_scheduler_service.rs b/services/proto/src/teaclave_scheduler_service.rs
index 585a9c4..afcd1e6 100644
--- a/services/proto/src/teaclave_scheduler_service.rs
+++ b/services/proto/src/teaclave_scheduler_service.rs
@@ -21,7 +21,9 @@
use std::collections::HashMap;
use std::prelude::v1::*;
-use crate::teaclave_common::{i32_from_task_status, i32_to_task_status};
+use crate::teaclave_common::{
+ i32_from_task_status, i32_to_task_status, ExecutorCommand, ExecutorStatus,
+};
use crate::teaclave_scheduler_service_proto as proto;
use anyhow::{Error, Result};
use core::convert::TryInto;
@@ -42,7 +44,9 @@ pub struct SubscribeResponse {
}
#[into_request(TeaclaveSchedulerRequest::PullTask)]
-pub struct PullTaskRequest {}
+pub struct PullTaskRequest {
+ pub executor_id: Uuid,
+}
#[into_request(TeaclaveSchedulerResponse::PullTask)]
#[derive(Debug)]
@@ -50,6 +54,33 @@ pub struct PullTaskResponse {
pub staged_task: StagedTask,
}
+#[into_request(TeaclaveSchedulerRequest::Heartbeat)]
+pub struct HeartbeatRequest {
+ pub executor_id: Uuid,
+ pub status: ExecutorStatus,
+}
+
+impl HeartbeatRequest {
+ pub fn new(executor_id: Uuid, status: ExecutorStatus) -> Self {
+ Self {
+ executor_id,
+ status,
+ }
+ }
+}
+
+#[into_request(TeaclaveSchedulerResponse::Heartbeat)]
+#[derive(Debug)]
+pub struct HeartbeatResponse {
+ pub command: ExecutorCommand,
+}
+
+impl HeartbeatResponse {
+ pub fn new(command: ExecutorCommand) -> Self {
+ Self { command }
+ }
+}
+
impl PullTaskResponse {
pub fn new(staged_task: StagedTask) -> Self {
Self { staged_task }
@@ -140,14 +171,16 @@ impl std::convert::From<SubscribeResponse> for proto::SubscribeResponse {
impl std::convert::TryFrom<proto::PullTaskRequest> for PullTaskRequest {
type Error = Error;
fn try_from(proto: proto::PullTaskRequest) -> Result<Self> {
- let ret = Self {};
+ let executor_id = Uuid::parse_str(&proto.executor_id)?;
+ let ret = Self { executor_id };
Ok(ret)
}
}
impl std::convert::From<PullTaskRequest> for proto::PullTaskRequest {
fn from(req: PullTaskRequest) -> Self {
- proto::PullTaskRequest {}
+ let executor_id = req.executor_id.to_string();
+ proto::PullTaskRequest { executor_id }
}
}
@@ -168,6 +201,46 @@ impl std::convert::From<PullTaskResponse> for proto::PullTaskResponse {
}
}
+impl std::convert::TryFrom<proto::HeartbeatRequest> for HeartbeatRequest {
+ type Error = Error;
+ fn try_from(proto: proto::HeartbeatRequest) -> Result<Self> {
+ let executor_id = Uuid::parse_str(&proto.executor_id)?;
+ let status = proto.status.try_into()?;
+ let ret = Self {
+ executor_id,
+ status,
+ };
+ Ok(ret)
+ }
+}
+
+impl std::convert::From<HeartbeatRequest> for proto::HeartbeatRequest {
+ fn from(req: HeartbeatRequest) -> Self {
+ let executor_id = req.executor_id.to_string();
+ proto::HeartbeatRequest {
+ executor_id,
+ status: req.status.into(),
+ }
+ }
+}
+
+impl std::convert::TryFrom<proto::HeartbeatResponse> for HeartbeatResponse {
+ type Error = Error;
+ fn try_from(proto: proto::HeartbeatResponse) -> Result<Self> {
+ let command = proto.command.try_into()?;
+ let ret = Self { command };
+ Ok(ret)
+ }
+}
+
+impl std::convert::From<HeartbeatResponse> for proto::HeartbeatResponse {
+ fn from(req: HeartbeatResponse) -> Self {
+ proto::HeartbeatResponse {
+ command: req.command.into(),
+ }
+ }
+}
+
impl std::convert::TryFrom<proto::UpdateTaskResultRequest> for UpdateTaskResultRequest {
type Error = Error;
fn try_from(proto: proto::UpdateTaskResultRequest) -> Result<Self> {
diff --git a/services/scheduler/enclave/src/lib.rs b/services/scheduler/enclave/src/lib.rs
index 1285fa3..202ab14 100644
--- a/services/scheduler/enclave/src/lib.rs
+++ b/services/scheduler/enclave/src/lib.rs
@@ -22,6 +22,7 @@ extern crate sgx_tstd as std;
#[cfg(feature = "mesalock_sgx")]
use std::prelude::v1::*;
+use std::sync::{Arc, SgxMutex as Mutex};
#[macro_use]
extern crate log;
@@ -96,15 +97,29 @@ fn start_service(config: &RuntimeConfig) -> Result<()> {
)?;
info!(" Starting Scheduler: setup storage endpoint finished ...");
- let service = service::TeaclaveSchedulerService::new(storage_service_endpoint)?;
+ let service_resources = service::TeaclaveSchedulerResources::new(storage_service_endpoint)?;
+
+ let service_resources = Arc::new(Mutex::new(service_resources));
+
+ let service = service::TeaclaveSchedulerService::new(&service_resources);
+
+ let deamon = service::TeaclaveSchedulerDeamon::new(&service_resources);
+
+ let deamon_handle = std::thread::spawn(move || {
+ let _ = deamon.run();
+ });
info!(" Starting Scheduler: start listening ...");
+
match server.start(service) {
Ok(_) => (),
Err(e) => {
error!("Service exit, error: {}.", e);
}
}
+
+ deamon_handle.join().unwrap();
+
Ok(())
}
diff --git a/services/scheduler/enclave/src/service.rs b/services/scheduler/enclave/src/service.rs
index f7beff6..0f835b0 100644
--- a/services/scheduler/enclave/src/service.rs
+++ b/services/scheduler/enclave/src/service.rs
@@ -17,11 +17,14 @@
use crate::error::TeaclaveSchedulerError;
-use std::collections::VecDeque;
+use std::collections::{HashMap, HashSet, VecDeque};
use std::convert::TryInto;
use std::prelude::v1::*;
use std::sync::{Arc, SgxMutex as Mutex};
+use std::time::{Duration, SystemTime};
+use std::untrusted::time::SystemTimeEx;
+use teaclave_proto::teaclave_common::{ExecutorCommand, ExecutorStatus};
use teaclave_proto::teaclave_scheduler_service::*;
use teaclave_proto::teaclave_storage_service::*;
use teaclave_rpc::endpoint::Endpoint;
@@ -33,14 +36,114 @@ use uuid::Uuid;
use anyhow::anyhow;
use anyhow::Result;
+const EXECUTOR_TIMEOUT_SECS: u64 = 30;
+
#[teaclave_service(teaclave_scheduler_service, TeaclaveScheduler, TeaclaveSchedulerError)]
#[derive(Clone)]
pub(crate) struct TeaclaveSchedulerService {
+ resources: Arc<Mutex<TeaclaveSchedulerResources>>,
+}
+
+pub struct TeaclaveSchedulerResources {
storage_client: Arc<Mutex<TeaclaveStorageClient>>,
- task_queue: Arc<Mutex<VecDeque<StagedTask>>>,
+ // executors_tasks maps executor_id to task_id
+ task_queue: VecDeque<StagedTask>,
+ executors_tasks: HashMap<Uuid, Uuid>,
+ executors_last_heartbeat: HashMap<Uuid, SystemTime>,
+ executors_status: HashMap<Uuid, ExecutorStatus>,
+ tasks_to_cancel: HashSet<Uuid>,
+}
+
+pub struct TeaclaveSchedulerDeamon {
+ resources: Arc<Mutex<TeaclaveSchedulerResources>>,
+}
+
+impl TeaclaveSchedulerDeamon {
+ pub fn run(&self) -> Result<()> {
+ loop {
+ std::thread::sleep(std::time::Duration::from_secs(2));
+
+ let mut resources = self
+ .resources
+ .lock()
+ .map_err(|_| anyhow!("Cannot lock scheduler resources"))?;
+
+ let key = StagedTask::get_queue_key().as_bytes();
+
+ log::debug!("Pulling task/cancel queue");
+ while let Ok(canceled_task) = resources.pull_cancel_queue() {
+ resources.tasks_to_cancel.insert(canceled_task.task_id);
+ }
+
+ while let Ok(staged_task) = resources.pull_staged_task::<StagedTask>(key) {
+ log::debug!("deamon: Pulled staged task: {:?}", staged_task);
+ resources.task_queue.push_back(staged_task);
+ }
+
+ let current_time = SystemTime::now();
+ let mut to_remove = Vec::new();
+ for (executor_id, last_heartbeat) in resources.executors_last_heartbeat.iter() {
+ if current_time
+ .duration_since(*last_heartbeat)
+ .unwrap_or(Duration::from_secs(EXECUTOR_TIMEOUT_SECS + 1))
+ > Duration::from_secs(EXECUTOR_TIMEOUT_SECS)
+ {
+ // executor lost
+ to_remove.push(*executor_id);
+ log::warn!("Executor {} lost", executor_id);
+ }
+ }
+
+ for executor_id in to_remove {
+ resources.executors_last_heartbeat.remove(&executor_id);
+ resources.executors_status.remove(&executor_id);
+ match resources.executors_tasks.remove(&executor_id) {
+ Some(task_id) => {
+ // report task failure
+ let ts = resources.get_task_state(&task_id)?;
+ if ts.is_ended() {
+ continue;
+ }
+
+ log::warn!("Executor {} lost, canceling task {}", executor_id, task_id);
+
+ let mut task: Task<Fail> = ts.try_into()?;
+
+ log::debug!("Task failed because of Executor lost: Task {:?}", task);
+ // Only TaskStatus::Running/Staged is allowed here.
+ let result_err =
+ TaskResult::Err(TaskFailure::new("Runtime Error: Executor Timeout"));
+
+ // Updating task result means we have finished execution
+ task.update_result(result_err)?;
+
+ let ts = TaskState::from(task);
+ resources.put_into_db(&ts)?;
+ }
+ None => {}
+ }
+ }
+ }
+ }
}
impl TeaclaveSchedulerService {
+ pub fn new(resources: &Arc<Mutex<TeaclaveSchedulerResources>>) -> Self {
+ Self {
+ resources: resources.clone(),
+ }
+ }
+}
+
+impl TeaclaveSchedulerDeamon {
+ pub fn new(resources: &Arc<Mutex<TeaclaveSchedulerResources>>) -> Self {
+ Self {
+ resources: resources.clone(),
+ }
+ }
+}
+
+impl TeaclaveSchedulerResources {
pub(crate) fn new(storage_service_endpoint: Endpoint) -> Result<Self> {
let mut i = 0;
let channel = loop {
@@ -55,13 +158,22 @@ impl TeaclaveSchedulerService {
std::thread::sleep(std::time::Duration::from_secs(3));
};
let storage_client = Arc::new(Mutex::new(TeaclaveStorageClient::new(channel)?));
- let task_queue = Arc::new(Mutex::new(VecDeque::new()));
- let service = Self {
+ let task_queue = VecDeque::new();
+ let executors_tasks = HashMap::new();
+ let executors_status = HashMap::new();
+ let tasks_to_cancel = HashSet::new();
+ let executors_last_heartbeat = HashMap::new();
+
+ let resources = TeaclaveSchedulerResources {
storage_client,
task_queue,
+ executors_tasks,
+ executors_last_heartbeat,
+ executors_status,
+ tasks_to_cancel,
};
- Ok(service)
+ Ok(resources)
}
fn pull_staged_task<T: Storable>(&self, key: &[u8]) -> TeaclaveServiceResponseResult<T> {
@@ -76,6 +188,33 @@ impl TeaclaveSchedulerService {
.map_err(|_| TeaclaveSchedulerError::DataError.into())
}
+ fn pull_cancel_queue(&self) -> Result<TaskState> {
+ let dequeue_request = DequeueRequest::new(CANCEL_QUEUE_KEY.as_bytes());
+ let dequeue_response = self
+ .storage_client
+ .clone()
+ .lock()
+ .map_err(|_| TeaclaveSchedulerError::StorageError)?
+ .dequeue(dequeue_request)?;
+ TaskState::from_slice(dequeue_response.value.as_slice())
+ .map_err(|_| TeaclaveSchedulerError::DataError.into())
+ }
+
+ fn cancel_task(&self, task_id: Uuid) -> Result<()> {
+ let ts = self.get_task_state(&task_id)?;
+ let mut task: Task<Cancel> = ts.try_into()?;
+
+ // Only TaskStatus::Running/Staged is allowed here.
+ let result_err = TaskResult::Err(TaskFailure::new("Task Canceled by the user"));
+
+ task.update_result(result_err)?;
+
+ let ts = TaskState::from(task);
+ self.put_into_db(&ts)?;
+
+ Ok(())
+ }
+
fn get_task_state(&self, task_id: &Uuid) -> Result<TaskState> {
let key = ExternalID::new(TaskState::key_prefix(), task_id.to_owned());
self.get_from_db(&key)
@@ -114,12 +253,14 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
request: Request<PublishTaskRequest>,
) -> TeaclaveServiceResponseResult<PublishTaskResponse> {
// XXX: Publisher is not implemented
- let mut task_queue = self
- .task_queue
+
+ let mut resources = self
+ .resources
.lock()
- .map_err(|_| anyhow!("Cannot lock task queue"))?;
+ .map_err(|_| anyhow!("Cannot lock scheduler resources"))?;
+
let staged_task = request.message.staged_task;
- task_queue.push_back(staged_task);
+ resources.task_queue.push_back(staged_task);
Ok(PublishTaskResponse {})
}
@@ -132,29 +273,107 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
unimplemented!()
}
+ fn heartbeat(
+ &self,
+ request: Request<HeartbeatRequest>,
+ ) -> TeaclaveServiceResponseResult<HeartbeatResponse> {
+ let mut resources = self
+ .resources
+ .lock()
+ .map_err(|_| anyhow!("Cannot lock scheduler resources"))?;
+
+ let mut command = ExecutorCommand::NoAction;
+
+ let executor_id = request.message.executor_id;
+ let status = request.message.status;
+
+ resources.executors_status.insert(executor_id, status);
+
+ resources
+ .executors_last_heartbeat
+ .insert(executor_id, SystemTime::now());
+
+ // check if the executor needs to be stopped
+ match resources.executors_tasks.get(&executor_id) {
+ Some(task_id) => match status {
+ ExecutorStatus::Executing => {
+ if resources.tasks_to_cancel.contains(task_id) {
+ command = ExecutorCommand::Stop;
+ let task_id = task_id.to_owned();
+ resources.tasks_to_cancel.remove(&task_id);
+ log::debug!(
+ "Sending stop command to executor {}, killing executor {} because of task cancelation",
+ executor_id,
+ task_id
+ );
+ resources.cancel_task(task_id)?;
+ return Ok(HeartbeatResponse { command });
+ }
+ }
+ ExecutorStatus::Idle => {
+ resources.executors_tasks.remove(&executor_id);
+ }
+ },
+ None => {}
+ };
+
+ if resources.task_queue.len() > 0 {
+ command = ExecutorCommand::NewTask;
+ };
+
+ let response = HeartbeatResponse { command };
+ Ok(response)
+ }
+
fn pull_task(
&self,
- _request: Request<PullTaskRequest>,
+ request: Request<PullTaskRequest>,
) -> TeaclaveServiceResponseResult<PullTaskResponse> {
- let key = StagedTask::get_queue_key().as_bytes();
- let staged_task = self.pull_staged_task(key)?;
- let response = PullTaskResponse::new(staged_task);
- Ok(response)
+ let request = request.message;
+ let mut resources = self
+ .resources
+ .lock()
+ .map_err(|_| anyhow!("Cannot lock scheduler resources"))?;
+
+ match resources.task_queue.pop_front() {
+ Some(task) => match resources.tasks_to_cancel.take(&task.task_id) {
+ Some(task_id) => {
+ resources.cancel_task(task_id)?;
+ Err(TeaclaveServiceResponseError::InternalError(
+ "Task to pull has been canceled".into(),
+ ))
+ }
+ None => {
+ resources
+ .executors_tasks
+ .insert(request.executor_id, task.task_id);
+ Ok(PullTaskResponse::new(task))
+ }
+ },
+ None => Err(TeaclaveServiceResponseError::InternalError(
+ "No staged task in task_queue".into(),
+ )),
+ }
}
fn update_task_status(
&self,
request: Request<UpdateTaskStatusRequest>,
) -> TeaclaveServiceResponseResult<UpdateTaskStatusResponse> {
+ let resources = self
+ .resources
+ .lock()
+ .map_err(|_| anyhow!("Cannot lock scheduler resources"))?;
+
let request = request.message;
- let ts = self.get_task_state(&request.task_id)?;
+ let ts = resources.get_task_state(&request.task_id)?;
let task: Task<Run> = ts.try_into()?;
log::debug!("UpdateTaskStatus: Task {:?}", task);
// Only TaskStatus::Running is implicitly allowed here.
let ts = TaskState::from(task);
- self.put_into_db(&ts)?;
+ resources.put_into_db(&ts)?;
Ok(UpdateTaskStatusResponse {})
}
@@ -162,14 +381,19 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
&self,
request: Request<UpdateTaskResultRequest>,
) -> TeaclaveServiceResponseResult<UpdateTaskResultResponse> {
+ let resources = self
+ .resources
+ .lock()
+ .map_err(|_| anyhow!("Cannot lock scheduler resources"))?;
+
let request = request.message;
- let ts = self.get_task_state(&request.task_id)?;
+ let ts = resources.get_task_state(&request.task_id)?;
let mut task: Task<Finish> = ts.try_into()?;
if let TaskResult::Ok(outputs) = &request.task_result {
for (key, auth_tag) in outputs.tags_map.iter() {
let outfile = task.update_output_cmac(key, auth_tag)?;
- self.put_into_db(outfile)?;
+ resources.put_into_db(outfile)?;
}
};
@@ -178,7 +402,7 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
log::debug!("UpdateTaskResult: Task {:?}", task);
let ts = TaskState::from(task);
- self.put_into_db(&ts)?;
+ resources.put_into_db(&ts)?;
Ok(UpdateTaskResultResponse {})
}
}
diff --git a/services/utils/service_app_utils/src/lib.rs b/services/utils/service_app_utils/src/lib.rs
index 1295472..1475f80 100644
--- a/services/utils/service_app_utils/src/lib.rs
+++ b/services/utils/service_app_utils/src/lib.rs
@@ -23,7 +23,7 @@ use std::thread;
use teaclave_binder::proto::{ECallCommand, StartServiceInput, StartServiceOutput};
use teaclave_binder::TeeBinder;
use teaclave_config::RuntimeConfig;
-use teaclave_types::TeeServiceResult;
+use teaclave_types::{TeeServiceError, TeeServiceResult};
struct TeaclaveServiceLauncher {
tee: TeeBinder,
@@ -46,6 +46,7 @@ impl TeaclaveServiceLauncher {
.invoke::<StartServiceInput, TeeServiceResult<StartServiceOutput>>(command, input)
{
Err(e) => bail!("TEE invocation error: {:?}", e),
+ Ok(Err(TeeServiceError::EnclaveForceTermination)) => std::process::exit(1),
Ok(Err(e)) => bail!("Service exit with error: {:?}", e),
_ => Ok(String::from("Service successfully exit")),
}
diff --git a/tests/fixtures/functions/gbdt_training/e2e_output_model.enc b/tests/fixtures/functions/gbdt_training/e2e_output_model.enc
new file mode 100644
index 0000000..18f5224
Binary files /dev/null and b/tests/fixtures/functions/gbdt_training/e2e_output_model.enc differ
diff --git a/tests/functional/enclave/src/execution_service.rs b/tests/functional/enclave/src/execution_service.rs
index 443ca9e..67af16d 100644
--- a/tests/functional/enclave/src/execution_service.rs
+++ b/tests/functional/enclave/src/execution_service.rs
@@ -53,7 +53,7 @@ fn test_execute_function() {
let put_request = PutRequest::new(ts.key().as_slice(), ts.to_vec().unwrap().as_slice());
let _put_response = storage_client.put(put_request).unwrap();
- std::thread::sleep(std::time::Duration::from_secs(5));
+ std::thread::sleep(std::time::Duration::from_secs(15));
let get_request = GetRequest::new(ts.key().as_slice());
let get_response = storage_client.get(get_request).unwrap();
diff --git a/tests/functional/enclave/src/frontend_service.rs b/tests/functional/enclave/src/frontend_service.rs
index 9a7f6c6..0653e56 100644
--- a/tests/functional/enclave/src/frontend_service.rs
+++ b/tests/functional/enclave/src/frontend_service.rs
@@ -19,11 +19,13 @@ use crate::utils::*;
use std::convert::TryFrom;
use std::prelude::v1::*;
use teaclave_proto::teaclave_common::*;
+use teaclave_proto::teaclave_common::{ExecutorCommand, ExecutorStatus};
use teaclave_proto::teaclave_frontend_service::*;
use teaclave_proto::teaclave_scheduler_service::*;
use teaclave_test_utils::test_case;
use teaclave_types::*;
use url::Url;
+use uuid::Uuid;
fn authorized_client() -> TeaclaveFrontendClient {
let mut api_client =
@@ -343,8 +345,141 @@ fn test_invoke_task() {
let response = client.get_task(request).unwrap();
assert_eq!(response.status, TaskStatus::Staged);
- let request = PullTaskRequest {};
let mut scheduler_client = get_scheduler_client();
- let response = scheduler_client.pull_task(request);
+ let executor_id = Uuid::new_v4();
+
+ std::thread::sleep(std::time::Duration::from_secs(2));
+
+ let pull_task_request = PullTaskRequest { executor_id };
+ let response = scheduler_client.pull_task(pull_task_request);
assert!(response.is_ok());
}
+
+#[test_case]
+fn test_cancel_task() {
+ let mut client = authorized_client();
+ let function_id =
+ ExternalID::try_from("function-00000000-0000-0000-0000-000000000002").unwrap();
+ let external_outfile_url =
+ Url::parse("https://external-storage.com/filepath?presigned_token").unwrap();
+ let external_outfile_crypto = FileCrypto::default();
+
+ let request = CreateTaskRequest::new()
+ .function_id(function_id)
+ .function_arguments(hashmap!("arg1" => "arg1_value"))
+ .executor(Executor::MesaPy)
+ .outputs_ownership(hashmap!("output" => vec!["frontend_user"]));
+ let response = client.create_task(request).unwrap();
+ let task_id = response.task_id;
+
+ let request = RegisterOutputFileRequest::new(external_outfile_url, external_outfile_crypto);
+ let response = client.register_output_file(request).unwrap();
+ let output_id = response.data_id;
+
+ let request =
+ AssignDataRequest::new(task_id.clone(), hashmap!(), hashmap!("output" => output_id));
+ client.assign_data(request).unwrap();
+
+ let request = ApproveTaskRequest::new(task_id.clone());
+ client.approve_task(request).unwrap();
+
+ let request = InvokeTaskRequest::new(task_id.clone());
+ let response = client.invoke_task(request);
+ assert!(response.is_ok());
+
+ let mut scheduler_client = get_scheduler_client();
+
+ std::thread::sleep(std::time::Duration::from_secs(5));
+
+ let executor_id = Uuid::new_v4();
+ let request = HeartbeatRequest {
+ executor_id,
+ status: ExecutorStatus::Idle,
+ };
+
+ let response = scheduler_client.heartbeat(request).unwrap();
+ assert!(response.command == ExecutorCommand::NewTask);
+
+ let request = CancelTaskRequest::new(task_id.clone());
+ let response = client.cancel_task(request);
+ assert!(response.is_ok());
+
+ std::thread::sleep(std::time::Duration::from_secs(3));
+
+ let pull_task_request = PullTaskRequest { executor_id };
+ let response = scheduler_client.pull_task(pull_task_request);
+ log::debug!("response: {:?}", response);
+
+ assert!(response.is_err());
+
+ std::thread::sleep(std::time::Duration::from_secs(3));
+
+ let request = GetTaskRequest::new(task_id);
+ let response = client.get_task(request).unwrap();
+
+ assert_eq!(response.status, TaskStatus::Canceled);
+}
+
+#[test_case]
+fn test_fail_task() {
+ let mut client = authorized_client();
+ let function_id =
+ ExternalID::try_from("function-00000000-0000-0000-0000-000000000002").unwrap();
+ let external_outfile_url =
+ Url::parse("https://external-storage.com/filepath?presigned_token").unwrap();
+ let external_outfile_crypto = FileCrypto::default();
+
+ let request = CreateTaskRequest::new()
+ .function_id(function_id)
+ .function_arguments(hashmap!("arg1" => "arg1_value"))
+ .executor(Executor::MesaPy)
+ .outputs_ownership(hashmap!("output" => vec!["frontend_user"]));
+ let response = client.create_task(request).unwrap();
+ let task_id = response.task_id;
+
+ let request = RegisterOutputFileRequest::new(external_outfile_url, external_outfile_crypto);
+ let response = client.register_output_file(request).unwrap();
+ let output_id = response.data_id;
+
+ let request =
+ AssignDataRequest::new(task_id.clone(), hashmap!(), hashmap!("output" => output_id));
+ client.assign_data(request).unwrap();
+
+ let request = ApproveTaskRequest::new(task_id.clone());
+ client.approve_task(request).unwrap();
+
+ let request = InvokeTaskRequest::new(task_id.clone());
+ let response = client.invoke_task(request);
+ assert!(response.is_ok());
+
+ let mut scheduler_client = get_scheduler_client();
+
+ std::thread::sleep(std::time::Duration::from_secs(5));
+
+ let executor_id = Uuid::new_v4();
+ let request = HeartbeatRequest {
+ executor_id,
+ status: ExecutorStatus::Idle,
+ };
+ let response = scheduler_client.heartbeat(request).unwrap();
+ assert!(response.command == ExecutorCommand::NewTask);
+
+ let pull_task_request = PullTaskRequest { executor_id };
+ let response = scheduler_client.pull_task(pull_task_request).unwrap();
+ log::debug!("response: {:?}", response);
+
+ let request = HeartbeatRequest {
+ executor_id,
+ status: ExecutorStatus::Executing,
+ };
+ let response = scheduler_client.heartbeat(request).unwrap();
+ log::debug!("response: {:?}", response);
+ assert!(response.command == ExecutorCommand::NoAction);
+
+ std::thread::sleep(std::time::Duration::from_secs(33));
+
+ let request = GetTaskRequest::new(task_id);
+ let response = client.get_task(request).unwrap();
+
+ assert_eq!(response.status, TaskStatus::Failed);
+}
diff --git a/tests/functional/enclave/src/management_service.rs b/tests/functional/enclave/src/management_service.rs
index 2ec28e5..7001006 100644
--- a/tests/functional/enclave/src/management_service.rs
+++ b/tests/functional/enclave/src/management_service.rs
@@ -23,6 +23,7 @@ use teaclave_proto::teaclave_scheduler_service::*;
use teaclave_test_utils::test_case;
use teaclave_types::*;
use url::Url;
+use uuid::Uuid;
fn authorized_client(user_id: &str) -> TeaclaveManagementClient {
get_management_client(user_id)
@@ -723,8 +724,12 @@ fn test_invoke_task() {
let response = client2.get_task(request).unwrap();
assert_eq!(response.status, TaskStatus::Staged);
- let request = PullTaskRequest {};
let mut scheduler_client = get_scheduler_client();
- let response = scheduler_client.pull_task(request);
+ let executor_id = Uuid::new_v4();
+
+ std::thread::sleep(std::time::Duration::from_secs(2));
+
+ let pull_task_request = PullTaskRequest { executor_id };
+ let response = scheduler_client.pull_task(pull_task_request);
assert!(response.is_ok());
}
diff --git a/tests/functional/enclave/src/scheduler_service.rs b/tests/functional/enclave/src/scheduler_service.rs
index a96a08f..568928f 100644
--- a/tests/functional/enclave/src/scheduler_service.rs
+++ b/tests/functional/enclave/src/scheduler_service.rs
@@ -17,6 +17,7 @@
use crate::utils::*;
use std::prelude::v1::*;
+
use teaclave_proto::teaclave_scheduler_service::*;
use teaclave_proto::teaclave_storage_service::*;
use teaclave_test_utils::test_case;
@@ -42,11 +43,23 @@ fn test_pull_task() {
let _enqueue_response = storage_client.enqueue(enqueue_request).unwrap();
let mut client = get_scheduler_client();
- let request = PullTaskRequest {};
- let response = client.pull_task(request);
+ let executor_id = Uuid::new_v4();
+
+ std::thread::sleep(std::time::Duration::from_secs(2));
+
+ let pull_task_request = PullTaskRequest { executor_id };
+ let response = client.pull_task(pull_task_request);
log::debug!("response: {:?}", response);
+
assert!(response.is_ok());
- assert_eq!(response.unwrap().staged_task.function_id, function_id);
+
+ let response = response.unwrap();
+ log::info!(
+ "pulled staged_task function_id: {:?}",
+ response.staged_task.function_id
+ );
+
+ assert_eq!(response.staged_task.function_id, function_id);
}
#[test_case]
@@ -77,8 +90,13 @@ fn test_update_task_status_result() {
let _put_response = storage_client.put(put_request).unwrap();
let mut client = get_scheduler_client();
- let request = PullTaskRequest {};
- let response = client.pull_task(request).unwrap();
+
+ let executor_id = Uuid::new_v4();
+
+ std::thread::sleep(std::time::Duration::from_secs(2));
+
+ let pull_task_request = PullTaskRequest { executor_id };
+ let response = client.pull_task(pull_task_request).unwrap();
log::debug!("response: {:?}", response);
let task_id = response.staged_task.task_id;
diff --git a/types/src/error.rs b/types/src/error.rs
index f3b941d..c4bd12d 100644
--- a/types/src/error.rs
+++ b/types/src/error.rs
@@ -65,6 +65,8 @@ pub enum TeeServiceError {
ServiceError,
#[error("CommandNotRegistered")]
CommandNotRegistered,
+ #[error("EnclaveForceTermination")]
+ EnclaveForceTermination,
}
pub type TeeServiceResult<T> = std::result::Result<T, TeeServiceError>;
diff --git a/types/src/storage.rs b/types/src/storage.rs
index 44be773..5fc2325 100644
--- a/types/src/storage.rs
+++ b/types/src/storage.rs
@@ -20,6 +20,8 @@ use serde::{Deserialize, Serialize};
use std::prelude::v1::*;
use uuid::Uuid;
+pub const CANCEL_QUEUE_KEY: &str = "cancel_queue";
+
pub trait Storable: Serialize + for<'de> Deserialize<'de> {
fn key_prefix() -> &'static str;
diff --git a/types/src/task.rs b/types/src/task.rs
index b5cb135..320d3d2 100644
--- a/types/src/task.rs
+++ b/types/src/task.rs
@@ -142,6 +142,8 @@ pub enum TaskStatus {
Staged,
Running,
Finished,
+ Canceled,
+ Failed,
}
impl Default for TaskStatus {
diff --git a/types/src/task_state.rs b/types/src/task_state.rs
index c7e0ea4..da0ece6 100644
--- a/types/src/task_state.rs
+++ b/types/src/task_state.rs
@@ -81,6 +81,13 @@ impl TaskState {
pub fn has_creator(&self, user_id: &UserID) -> bool {
&self.creator == user_id
}
+
+ /// Returns `true` when the task status is `Finished`, `Failed` or
+ /// `Canceled`, and `false` for every other status.
+ pub fn is_ended(&self) -> bool {
+     matches!(
+         self.status,
+         TaskStatus::Finished | TaskStatus::Failed | TaskStatus::Canceled
+     )
+ }
}
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
@@ -97,6 +104,8 @@ impl StateTag for Stage {}
impl StateTag for Run {}
impl StateTag for Finish {}
impl StateTag for Done {}
+// Allow the `Cancel` and `Fail` markers to be used as task state tags.
+impl StateTag for Cancel {}
+impl StateTag for Fail {}
impl Task<Create> {
pub fn new(
@@ -298,6 +307,50 @@ impl Task<Done> {
}
}
+impl Task<Fail> {
+    /// Wraps a saved `TaskState` in a task tagged with the `Fail` marker.
+    ///
+    /// The state is stored as given; this constructor always returns `Ok`.
+    pub fn new(ts: TaskState) -> Result<Self> {
+        Ok(Self {
+            state: ts,
+            extra: Fail,
+        })
+    }
+
+    /// Stores `result` as the result of this failed task.
+    ///
+    /// Only `TaskResult::Err(_)` is accepted. Any other variant yields an
+    /// error and leaves the stored result untouched.
+    pub fn update_result(&mut self, result: TaskResult) -> Result<()> {
+        if !matches!(&result, TaskResult::Err(_)) {
+            return Err(Error::msg(
+                "TaskResult::Err(TaskFailure) is expected for failed task",
+            ));
+        }
+        self.state.result = result;
+        Ok(())
+    }
+}
+
+impl Task<Cancel> {
+    /// Wraps a saved `TaskState` in a task tagged with the `Cancel` marker.
+    ///
+    /// The state is stored as given; this constructor always returns `Ok`.
+    pub fn new(ts: TaskState) -> Result<Self> {
+        Ok(Self {
+            state: ts,
+            extra: Cancel,
+        })
+    }
+
+    /// Stores `result` as the result of this canceled task.
+    ///
+    /// Only `TaskResult::Err(_)` is accepted. Any other variant yields an
+    /// error and leaves the stored result untouched.
+    pub fn update_result(&mut self, result: TaskResult) -> Result<()> {
+        if !matches!(&result, TaskResult::Err(_)) {
+            return Err(Error::msg(
+                "TaskResult::Err(TaskFailure) is expected for canceled task",
+            ));
+        }
+        self.state.result = result;
+        Ok(())
+    }
+}
+
trait TryTransitionTo<T>: Sized {
type Error;
fn try_transition_to(self) -> std::result::Result<T, Error>;
@@ -418,6 +471,34 @@ impl std::convert::TryFrom<TaskState> for Task<Finish> {
}
}
+impl std::convert::TryFrom<TaskState> for Task<Fail> {
+    type Error = Error;
+
+    /// Builds a `Task<Fail>` from a saved state.
+    ///
+    /// Succeeds only when the saved status is `Running` or `Staged`; every
+    /// other status is rejected with an error.
+    fn try_from(ts: TaskState) -> Result<Self> {
+        if !matches!(ts.status, TaskStatus::Running | TaskStatus::Staged) {
+            bail!("Cannot restore to Fail from saved state");
+        }
+        Task::<Fail>::new(ts)
+    }
+}
+
+impl std::convert::TryFrom<TaskState> for Task<Cancel> {
+    type Error = Error;
+
+    /// Builds a `Task<Cancel>` from a saved state.
+    ///
+    /// Succeeds when the saved status is `Running`, `Staged`, `Approved`,
+    /// `Created` or `DataAssigned`; every other status is rejected with an
+    /// error.
+    fn try_from(ts: TaskState) -> Result<Self> {
+        let cancelable = matches!(
+            ts.status,
+            TaskStatus::Running
+                | TaskStatus::Staged
+                | TaskStatus::Approved
+                | TaskStatus::Created
+                | TaskStatus::DataAssigned
+        );
+        if !cancelable {
+            bail!("Cannot restore to Cancel from saved state");
+        }
+        Task::<Cancel>::new(ts)
+    }
+}
+
impl std::convert::From<Task<Create>> for TaskState {
fn from(mut task: Task<Create>) -> TaskState {
task.state.status = TaskStatus::Created;
@@ -425,6 +506,20 @@ impl std::convert::From<Task<Create>> for TaskState {
}
}
+impl std::convert::From<Task<Fail>> for TaskState {
+    /// Unwraps the task into its `TaskState`, with the status set to `Failed`.
+    fn from(task: Task<Fail>) -> TaskState {
+        let mut state = task.state;
+        state.status = TaskStatus::Failed;
+        state
+    }
+}
+
+impl std::convert::From<Task<Cancel>> for TaskState {
+    /// Unwraps the task into its `TaskState`, with the status set to `Canceled`.
+    fn from(task: Task<Cancel>) -> TaskState {
+        let mut state = task.state;
+        state.status = TaskStatus::Canceled;
+        state
+    }
+}
+
impl_transit_and_into_task_state!(Assign => Approve);
impl_transit_and_into_task_state!(Approve => Stage);
impl_transit_and_into_task_state!(Stage => Run);
@@ -463,6 +558,10 @@ pub struct Run;
pub struct Finish;
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
pub struct Done;
+/// State tag carried by `Task<Cancel>`; converting such a task into a
+/// `TaskState` sets its status to `TaskStatus::Canceled`.
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct Cancel;
+/// State tag carried by `Task<Fail>`; converting such a task into a
+/// `TaskState` sets its status to `TaskStatus::Failed`.
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct Fail;
impl std::convert::From<Create> for TaskStatus {
fn from(_tag: Create) -> TaskStatus {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@teaclave.apache.org
For additional commands, e-mail: commits-help@teaclave.apache.org