You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@teaclave.apache.org by ms...@apache.org on 2022/01/19 23:32:32 UTC

[incubator-teaclave] branch master updated: Add watchdog functionalities (#600)

This is an automated email from the ASF dual-hosted git repository.

mssun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-teaclave.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fdac8c  Add watchdog functionalities (#600)
8fdac8c is described below

commit 8fdac8ca29c518bb73483e801dcd07d9b85abec8
Author: Hongbo <12...@users.noreply.github.com>
AuthorDate: Wed Jan 19 15:32:23 2022 -0800

    Add watchdog functionalities (#600)
    
    Add executor service watchdog
    
    - executor now sends heartbeats to report its current status and receive
    commands from the scheduler
    - scheduler can now detect lost executor(s), monitor their status,
    and fail the corresponding task(s) when necessary
    - users can cancel a task; a new API has been added for this
    - new tests for task cancelation and dangling task detection
---
 cmake/UtilTargets.cmake                            |   3 +
 cmake/scripts/test.sh                              |  73 +++++-
 examples/python/builtin_gbdt_train.py              |   7 +
 examples/python/mesapy_deadloop_cancel.py          |  83 +++++++
 examples/python/mesapy_deadloop_payload.py         |  21 ++
 sdk/python/teaclave.py                             |  27 +++
 services/execution/enclave/src/lib.rs              |   9 +-
 services/execution/enclave/src/service.rs          | 160 +++++++++----
 services/frontend/enclave/src/service.rs           |  45 +++-
 services/management/enclave/src/service.rs         |  68 +++++-
 services/proto/src/proto/teaclave_common.proto     |  13 +
 .../src/proto/teaclave_frontend_service.proto      |   8 +-
 .../src/proto/teaclave_management_service.proto    |   1 +
 .../src/proto/teaclave_scheduler_service.proto     |  14 +-
 services/proto/src/teaclave_common.rs              | 105 +++++++++
 services/proto/src/teaclave_frontend_service.rs    |  49 ++++
 services/proto/src/teaclave_management_service.rs  |   2 +
 services/proto/src/teaclave_scheduler_service.rs   |  81 ++++++-
 services/scheduler/enclave/src/lib.rs              |  17 +-
 services/scheduler/enclave/src/service.rs          | 262 +++++++++++++++++++--
 services/utils/service_app_utils/src/lib.rs        |   3 +-
 .../functions/gbdt_training/e2e_output_model.enc   | Bin 0 -> 204800 bytes
 tests/functional/enclave/src/execution_service.rs  |   2 +-
 tests/functional/enclave/src/frontend_service.rs   | 139 ++++++++++-
 tests/functional/enclave/src/management_service.rs |   9 +-
 tests/functional/enclave/src/scheduler_service.rs  |  28 ++-
 types/src/error.rs                                 |   2 +
 types/src/storage.rs                               |   2 +
 types/src/task.rs                                  |   2 +
 types/src/task_state.rs                            |  99 ++++++++
 30 files changed, 1225 insertions(+), 109 deletions(-)

diff --git a/cmake/UtilTargets.cmake b/cmake/UtilTargets.cmake
index f2162d4..5c6cf1c 100644
--- a/cmake/UtilTargets.cmake
+++ b/cmake/UtilTargets.cmake
@@ -73,6 +73,9 @@ if(TEST_MODE)
   add_custom_target(
     run-sdk-tests COMMAND ${TEACLAVE_COMMON_ENVS}
                           ${MT_SCRIPT_DIR}/test.sh sdk)
+  add_custom_target(
+    run-cancel-test COMMAND ${TEACLAVE_COMMON_ENVS}
+                          ${MT_SCRIPT_DIR}/test.sh cancel)
 else()
   add_custom_target(
     run-tests
diff --git a/cmake/scripts/test.sh b/cmake/scripts/test.sh
index 6d6c61a..6308646 100755
--- a/cmake/scripts/test.sh
+++ b/cmake/scripts/test.sh
@@ -97,7 +97,7 @@ run_functional_tests() {
   pushd ${TEACLAVE_SERVICE_INSTALL_DIR}
   ./teaclave_authentication_service &
   ./teaclave_storage_service &
-  sleep 3    # wait for authentication and storage service
+  sleep 10    # wait for authentication and storage service
   ./teaclave_management_service &
   ./teaclave_scheduler_service &
   sleep 3    # wait for management service and scheduler_service
@@ -165,7 +165,7 @@ run_sdk_tests() {
   pushd ${TEACLAVE_SERVICE_INSTALL_DIR}
   ./teaclave_authentication_service &
   ./teaclave_storage_service &
-  sleep 3    # wait for authentication and storage service
+  sleep 10    # wait for authentication and storage service
   ./teaclave_management_service &
   ./teaclave_scheduler_service &
   sleep 3    # wait for management service and scheduler_service
@@ -258,6 +258,71 @@ run_examples() {
   cleanup
 }
 
+run_cancel_test() {
+  trap cleanup INT TERM ERR
+
+  echo_title "cancel"
+  mkdir -p /tmp/fusion_data
+  pushd ${TEACLAVE_CLI_INSTALL_DIR}
+  ./teaclave_cli verify \
+                 --enclave-info ../examples/enclave_info.toml \
+                 --public-keys $(find ../examples -name "*.public.pem") \
+                 --signatures $(find ../examples -name "*.sign.sha256")
+  popd
+
+  echo "initiating Teaclave with 2 executors..."
+
+  pushd ${TEACLAVE_SERVICE_INSTALL_DIR}
+  ./teaclave_authentication_service &
+  ./teaclave_storage_service &
+  sleep 3    # wait for authentication and storage service
+  ./teaclave_management_service &
+  ./teaclave_scheduler_service &
+  sleep 3    # wait for management service and scheduler_service
+  ./teaclave_access_control_service &
+  ./teaclave_frontend_service &
+  sleep 3    # wait for other services
+
+  start_storage_server
+
+  # Run of execution services separately
+  ./teaclave_execution_service & exe_pid1=$!
+  ./teaclave_execution_service & exe_pid2=$!
+  sleep 10    # wait for execution services
+  popd
+
+  echo "executor 1 pid: $exe_pid1"
+  echo "executor 2 pid: $exe_pid2"
+
+  pushd ${TEACLAVE_PROJECT_ROOT}/examples/python
+  export PYTHONPATH=${TEACLAVE_PROJECT_ROOT}/sdk/python
+  python3 mesapy_deadloop_cancel.py
+  popd
+
+  sleep 3
+
+  live_pids=0
+  if ps -p $exe_pid1 > /dev/null
+  then
+    live_pids=$((live_pids+1))
+  fi
+
+  if ps -p $exe_pid2 > /dev/null
+  then
+    live_pids=$((live_pids+1))
+  fi
+
+  if [ $live_pids -eq 1 ]
+  then
+    echo "only one executor is killed, test passed"
+  else
+    echo "Some unexpected happens, test failed"
+    false
+  fi
+
+  cleanup
+}
+
 case "$1" in
     "unit")
         run_unit_tests
@@ -274,11 +339,15 @@ case "$1" in
     "example")
         run_examples
         ;;
+    "cancel")
+        run_cancel_test
+        ;;
     *)
         run_unit_tests
         run_integration_tests
         run_functional_tests
         run_sdk_tests
         run_examples
+        run_cancel_test
         ;;
 esac
diff --git a/examples/python/builtin_gbdt_train.py b/examples/python/builtin_gbdt_train.py
index 2efc9c7..08b4b9a 100644
--- a/examples/python/builtin_gbdt_train.py
+++ b/examples/python/builtin_gbdt_train.py
@@ -103,6 +103,13 @@ class BuiltinGbdtExample:
         print("[+] invoking task")
         client.invoke_task(task_id)
 
+        # If you feel like you don't want to wait for the task to finish
+        # or something unexpected happens,
+        # you may uncomment the following lines to cancel the task
+        # time.sleep(3)
+        # print("[+] canceling task")
+        # client.cancel_task(task_id)
+
         print("[+] getting result")
         result = client.get_task_result(task_id)
         print("[+] done")
diff --git a/examples/python/mesapy_deadloop_cancel.py b/examples/python/mesapy_deadloop_cancel.py
new file mode 100644
index 0000000..2fee278
--- /dev/null
+++ b/examples/python/mesapy_deadloop_cancel.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+import time
+
+from teaclave import FunctionInput, FunctionOutput, OwnerList, DataMap
+from utils import USER_ID, USER_PASSWORD, connect_authentication_service, connect_frontend_service, PlatformAdmin
+
+
+class MesaPyEchoExample:
+    def __init__(self, user_id, user_password):
+        self.user_id = user_id
+        self.user_password = user_password
+
+    def deadloop(self, payload_file="mesapy_deadloop_payload.py"):
+        with connect_authentication_service() as client:
+            print(f"[+] {self.user_id} login")
+            token = client.user_login(self.user_id, self.user_password)
+
+        client = connect_frontend_service()
+        metadata = {"id": self.user_id, "token": token}
+        client.metadata = metadata
+
+        print("[+] registering function")
+
+        with open(payload_file, "rb") as f:
+            payload = f.read()
+        function_id = client.register_function(
+            name="mesapy-deadloop",
+            description="A deadloop function to test task cancellation",
+            executor_type="python",
+            payload=list(payload),
+            arguments=[])
+
+        print("[+] creating task")
+        task_id = client.create_task(function_id=function_id,
+                                     function_arguments={},
+                                     executor="mesapy")
+
+        print("[+] invoking task")
+        client.invoke_task(task_id)
+
+        print("[+] canceling task")
+        time.sleep(5)
+        client.cancel_task(task_id)
+
+        print("[+] getting result")
+
+        try:
+            result = client.get_task_result(task_id)
+        except Exception as e:
+            print(f"[+] result: {str(e)}")
+            result = str(e)
+
+        return result
+
+
+def main():
+    example = MesaPyEchoExample(USER_ID, USER_PASSWORD)
+    rv = example.deadloop()
+
+    print("[+] function return: ", rv)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/examples/python/mesapy_deadloop_payload.py b/examples/python/mesapy_deadloop_payload.py
new file mode 100644
index 0000000..b5a22e6
--- /dev/null
+++ b/examples/python/mesapy_deadloop_payload.py
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+def entrypoint(argv):
+    while True:
+        pass
diff --git a/sdk/python/teaclave.py b/sdk/python/teaclave.py
index 3e39f99..993d0a7 100644
--- a/sdk/python/teaclave.py
+++ b/sdk/python/teaclave.py
@@ -371,6 +371,13 @@ class InvokeTaskRequest(Request):
         self.task_id = task_id
 
 
+class CancelTaskRequest(Request):
+    def __init__(self, metadata: Metadata, task_id: str):
+        self.request = "cancel_task"
+        self.metadata = metadata
+        self.task_id = task_id
+
+
 class GetTaskRequest(Request):
     def __init__(self, metadata: Metadata, task_id: str):
         self.request = "get_task"
@@ -557,6 +564,17 @@ class FrontendService(TeaclaveService):
         else:
             raise TeaclaveException("Failed to invoke task")
 
+    def cancel_task(self, task_id: str):
+        self.check_metadata()
+        self.check_channel()
+        request = CancelTaskRequest(self.metadata, task_id)
+        _write_message(self.channel, request)
+        response = _read_message(self.channel)
+        if response["result"] == "ok":
+            pass
+        else:
+            raise TeaclaveException("Failed to cancel task")
+
     def get_task_result(self, task_id: str):
         self.check_metadata()
         self.check_channel()
@@ -568,6 +586,15 @@ class FrontendService(TeaclaveService):
             if response["result"] != "ok":
                 raise TeaclaveException("Failed to get task result")
             time.sleep(1)
+            if response["content"]["status"] == 20:
+                print(response["content"]["result"]["result"])
+                raise TeaclaveException(
+                    "Task Canceled, Error: " +
+                    response["content"]["result"]["result"]["Err"]["reason"])
+            if response["content"]["status"] == 99:
+                raise TeaclaveException(
+                    "Task Failed, Error: " +
+                    response["content"]["result"]["result"]["Err"]["reason"])
             if response["content"]["status"] == 10:
                 break
 
diff --git a/services/execution/enclave/src/lib.rs b/services/execution/enclave/src/lib.rs
index 3d46869..27b1b9b 100644
--- a/services/execution/enclave/src/lib.rs
+++ b/services/execution/enclave/src/lib.rs
@@ -77,18 +77,17 @@ fn start_service(config: &RuntimeConfig) -> Result<()> {
 
     let mut service =
         service::TeaclaveExecutionService::new(scheduler_service_endpoint, fusion_base)?;
-    let _ = service.start();
-
-    Ok(())
+    service.start()
 }
 
 #[handle_ecall]
 fn handle_start_service(input: &StartServiceInput) -> TeeServiceResult<StartServiceOutput> {
     match start_service(&input.config) {
         Ok(_) => Ok(StartServiceOutput),
+        // terminate the enclave for executor
         Err(e) => {
-            log::error!("Failed to start the service: {}", e);
-            Err(TeeServiceError::ServiceError)
+            log::error!("Service shutdown, reason: {}", e);
+            Err(TeeServiceError::EnclaveForceTermination)
         }
     }
 }
diff --git a/services/execution/enclave/src/service.rs b/services/execution/enclave/src/service.rs
index f5eaa9c..0aa7515 100644
--- a/services/execution/enclave/src/service.rs
+++ b/services/execution/enclave/src/service.rs
@@ -18,9 +18,12 @@
 use std::collections::HashMap;
 use std::path::{Path, PathBuf};
 use std::prelude::v1::*;
+use std::sync::mpsc;
 use std::sync::{Arc, SgxMutex as Mutex};
+use std::thread;
 
 use crate::task_file_manager::TaskFileManager;
+use teaclave_proto::teaclave_common::{ExecutorCommand, ExecutorStatus};
 use teaclave_proto::teaclave_scheduler_service::*;
 use teaclave_rpc::endpoint::Endpoint;
 use teaclave_types::*;
@@ -36,6 +39,8 @@ pub(crate) struct TeaclaveExecutionService {
     worker: Arc<Worker>,
     scheduler_client: Arc<Mutex<TeaclaveSchedulerClient>>,
     fusion_base: PathBuf,
+    id: Uuid,
+    status: ExecutorStatus,
 }
 
 impl TeaclaveExecutionService {
@@ -61,47 +66,102 @@ impl TeaclaveExecutionService {
             worker: Arc::new(Worker::default()),
             scheduler_client,
             fusion_base: fusion_base.as_ref().to_owned(),
+            id: Uuid::new_v4(),
+            status: ExecutorStatus::Idle,
         })
     }
 
     pub(crate) fn start(&mut self) -> Result<()> {
+        let (tx, rx) = mpsc::channel();
+        let mut current_task: Arc<Option<StagedTask>> = Arc::new(None);
+        let mut task_handle: Option<thread::JoinHandle<()>> = None;
+
         loop {
             std::thread::sleep(std::time::Duration::from_secs(3));
-            let staged_task = match self.pull_task() {
-                Ok(staged_task) => staged_task,
+
+            match self.heartbeat() {
+                Ok(ExecutorCommand::Stop) => {
+                    log::info!("Executor {} is stopped", self.id);
+                    return Err(anyhow::anyhow!("EnclaveForceTermination"));
+                }
+                Ok(ExecutorCommand::NewTask) if self.status == ExecutorStatus::Idle => {
+                    match self.pull_task() {
+                        Ok(task) => {
+                            self.status = ExecutorStatus::Executing;
+                            self.update_task_status(&task.task_id, TaskStatus::Running)?;
+                            let tx_task = tx.clone();
+                            let fusion_base = self.fusion_base.clone();
+                            current_task = Arc::new(Some(task));
+                            let task_copy = current_task.clone();
+                            let handle = thread::spawn(move || {
+                                let result = invoke_task(
+                                    &task_copy.as_ref().as_ref().unwrap(),
+                                    &fusion_base,
+                                );
+                                tx_task.send(result).unwrap();
+                            });
+                            task_handle = Some(handle);
+                            log::info!("Executor {} accepted a new task, executing...", self.id);
+                        }
+                        Err(e) => {
+                            log::error!("Executor {} failed to pull task: {}", self.id, e);
+                        }
+                    };
+                }
                 Err(e) => {
-                    log::debug!("PullTask Error: {:?}", e);
-                    continue;
+                    log::error!("Executor {} failed to heartbeat: {}", self.id, e);
+                    return Err(e);
                 }
-            };
-
-            let result = self.invoke_task(&staged_task);
-            match result {
-                Ok(_) => log::debug!(
-                    "InvokeTask: {:?}, {:?}, success",
-                    staged_task.user_id,
-                    staged_task.function_id
-                ),
-                Err(_) => log::debug!(
-                    "InvokeTask: {:?}, {:?}, failure",
-                    staged_task.user_id,
-                    staged_task.function_id
-                ),
+                _ => {}
             }
-            log::debug!("InvokeTask result: {:?}", result);
 
-            match self.update_task_result(&staged_task.task_id, result) {
-                Ok(_) => (),
-                Err(e) => {
-                    log::error!("UpdateResult Error: {:?}", e);
-                    continue;
+            match rx.try_recv() {
+                Ok(result) => {
+                    let task_unwrapped = current_task.as_ref().as_ref().unwrap();
+                    match result {
+                        Ok(_) => log::debug!(
+                            "InvokeTask: {:?}, {:?}, success",
+                            task_unwrapped.task_id,
+                            task_unwrapped.function_id
+                        ),
+                        Err(_) => log::debug!(
+                            "InvokeTask: {:?}, {:?}, failure",
+                            task_unwrapped.task_id,
+                            task_unwrapped.function_id
+                        ),
+                    }
+                    log::debug!("InvokeTask result: {:?}", result);
+                    let task_copy = current_task.clone();
+                    match self
+                        .update_task_result(&task_copy.as_ref().as_ref().unwrap().task_id, result)
+                    {
+                        Ok(_) => (),
+                        Err(e) => {
+                            log::error!("UpdateResult Error: {:?}", e);
+                            continue;
+                        }
+                    }
+                    current_task = Arc::new(None);
+                    task_handle.unwrap().join().unwrap();
+                    task_handle = None;
+                    self.status = ExecutorStatus::Idle;
                 }
+                Err(mpsc::TryRecvError::Disconnected) => {
+                    log::error!(
+                        "Executor {} failed to receive, sender disconnected",
+                        self.id
+                    );
+                }
+                // received nothing
+                Err(_) => {}
             }
         }
     }
 
     fn pull_task(&mut self) -> Result<StagedTask> {
-        let request = PullTaskRequest {};
+        let request = PullTaskRequest {
+            executor_id: self.id,
+        };
         let response = self
             .scheduler_client
             .clone()
@@ -113,25 +173,20 @@ impl TeaclaveExecutionService {
         Ok(response.staged_task)
     }
 
-    fn invoke_task(&mut self, task: &StagedTask) -> Result<TaskOutputs> {
-        self.update_task_status(&task.task_id, TaskStatus::Running)?;
-
-        let file_mgr = TaskFileManager::new(
-            WORKER_BASE_DIR,
-            &self.fusion_base,
-            &task.task_id,
-            &task.input_data,
-            &task.output_data,
-        )?;
-        let invocation = prepare_task(&task, &file_mgr)?;
-
-        log::debug!("Invoke function: {:?}", invocation);
-        let worker = Worker::default();
-        let summary = worker.invoke_function(invocation)?;
+    fn heartbeat(&mut self) -> Result<ExecutorCommand> {
+        let request = HeartbeatRequest {
+            executor_id: self.id,
+            status: self.status,
+        };
+        let response = self
+            .scheduler_client
+            .clone()
+            .lock()
+            .map_err(|_| anyhow::anyhow!("Cannot lock scheduler client"))?
+            .heartbeat(request)?;
 
-        let outputs_tag = finalize_task(&file_mgr)?;
-        let task_outputs = TaskOutputs::new(summary.as_bytes(), outputs_tag);
-        Ok(task_outputs)
+        log::debug!("heartbeat_with_result response: {:?}", response);
+        Ok(response.command)
     }
 
     fn update_task_result(
@@ -164,6 +219,25 @@ impl TeaclaveExecutionService {
     }
 }
 
+fn invoke_task(task: &StagedTask, fusion_base: &PathBuf) -> Result<TaskOutputs> {
+    let file_mgr = TaskFileManager::new(
+        WORKER_BASE_DIR,
+        fusion_base,
+        &task.task_id,
+        &task.input_data,
+        &task.output_data,
+    )?;
+    let invocation = prepare_task(&task, &file_mgr)?;
+
+    log::debug!("Invoke function: {:?}", invocation);
+    let worker = Worker::default();
+    let summary = worker.invoke_function(invocation)?;
+
+    let outputs_tag = finalize_task(&file_mgr)?;
+    let task_outputs = TaskOutputs::new(summary.as_bytes(), outputs_tag);
+    Ok(task_outputs)
+}
+
 fn prepare_task(task: &StagedTask, file_mgr: &TaskFileManager) -> Result<StagedFunction> {
     let input_files = file_mgr.prepare_staged_inputs()?;
     let output_files = file_mgr.prepare_staged_outputs()?;
diff --git a/services/frontend/enclave/src/service.rs b/services/frontend/enclave/src/service.rs
index d6d2b08..bd0d3bc 100644
--- a/services/frontend/enclave/src/service.rs
+++ b/services/frontend/enclave/src/service.rs
@@ -27,16 +27,16 @@ use teaclave_proto::teaclave_authentication_service::{
 use teaclave_proto::teaclave_common::UserCredential;
 use teaclave_proto::teaclave_frontend_service::{
     ApproveTaskRequest, ApproveTaskResponse, AssignDataRequest, AssignDataResponse,
-    CreateTaskRequest, CreateTaskResponse, DeleteFunctionRequest, DeleteFunctionResponse,
-    GetFunctionRequest, GetFunctionResponse, GetInputFileRequest, GetInputFileResponse,
-    GetOutputFileRequest, GetOutputFileResponse, GetTaskRequest, GetTaskResponse,
-    InvokeTaskRequest, InvokeTaskResponse, ListFunctionsRequest, ListFunctionsResponse,
-    RegisterFunctionRequest, RegisterFunctionResponse, RegisterFusionOutputRequest,
-    RegisterFusionOutputResponse, RegisterInputFileRequest, RegisterInputFileResponse,
-    RegisterInputFromOutputRequest, RegisterInputFromOutputResponse, RegisterOutputFileRequest,
-    RegisterOutputFileResponse, TeaclaveFrontend, UpdateFunctionRequest, UpdateFunctionResponse,
-    UpdateInputFileRequest, UpdateInputFileResponse, UpdateOutputFileRequest,
-    UpdateOutputFileResponse,
+    CancelTaskRequest, CancelTaskResponse, CreateTaskRequest, CreateTaskResponse,
+    DeleteFunctionRequest, DeleteFunctionResponse, GetFunctionRequest, GetFunctionResponse,
+    GetInputFileRequest, GetInputFileResponse, GetOutputFileRequest, GetOutputFileResponse,
+    GetTaskRequest, GetTaskResponse, InvokeTaskRequest, InvokeTaskResponse, ListFunctionsRequest,
+    ListFunctionsResponse, RegisterFunctionRequest, RegisterFunctionResponse,
+    RegisterFusionOutputRequest, RegisterFusionOutputResponse, RegisterInputFileRequest,
+    RegisterInputFileResponse, RegisterInputFromOutputRequest, RegisterInputFromOutputResponse,
+    RegisterOutputFileRequest, RegisterOutputFileResponse, TeaclaveFrontend, UpdateFunctionRequest,
+    UpdateFunctionResponse, UpdateInputFileRequest, UpdateInputFileResponse,
+    UpdateOutputFileRequest, UpdateOutputFileResponse,
 };
 use teaclave_proto::teaclave_management_service::TeaclaveManagementClient;
 use teaclave_rpc::endpoint::Endpoint;
@@ -58,10 +58,22 @@ macro_rules! authentication_and_forward_to_management {
                 if authorize(&claims, $endpoint) {
                     claims
                 } else {
+                    log::debug!(
+                        "User is not authorized to access endpoint: {}, func: {}",
+                        stringify!($endpoint),
+                        stringify!($func)
+                    );
                     bail!(TeaclaveFrontendError::AuthenticationError);
                 }
             }
-            _ => bail!(TeaclaveFrontendError::AuthenticationError),
+            _ => {
+                log::debug!(
+                    "User is not authenticated to access endpoint: {}, func: {}",
+                    stringify!($endpoint),
+                    stringify!($func)
+                );
+                bail!(TeaclaveFrontendError::AuthenticationError)
+            }
         };
 
         let client = $service.management_client.clone();
@@ -101,6 +113,7 @@ enum Endpoints {
     AssignData,
     ApproveTask,
     InvokeTask,
+    CancelTask,
 }
 
 fn authorize(claims: &UserAuthClaims, request: Endpoints) -> bool {
@@ -129,7 +142,8 @@ fn authorize(claims: &UserAuthClaims, request: Endpoints) -> bool {
         | Endpoints::GetTask
         | Endpoints::AssignData
         | Endpoints::ApproveTask
-        | Endpoints::InvokeTask => role.is_data_owner(),
+        | Endpoints::InvokeTask
+        | Endpoints::CancelTask => role.is_data_owner(),
         Endpoints::GetFunction | Endpoints::ListFunctions => {
             role.is_function_owner() || role.is_data_owner()
         }
@@ -375,6 +389,13 @@ impl TeaclaveFrontend for TeaclaveFrontendService {
     ) -> TeaclaveServiceResponseResult<InvokeTaskResponse> {
         authentication_and_forward_to_management!(self, request, invoke_task, Endpoints::InvokeTask)
     }
+
+    fn cancel_task(
+        &self,
+        request: Request<CancelTaskRequest>,
+    ) -> TeaclaveServiceResponseResult<CancelTaskResponse> {
+        authentication_and_forward_to_management!(self, request, cancel_task, Endpoints::CancelTask)
+    }
 }
 
 impl TeaclaveFrontendService {
diff --git a/services/management/enclave/src/service.rs b/services/management/enclave/src/service.rs
index a51dd6b..0293450 100644
--- a/services/management/enclave/src/service.rs
+++ b/services/management/enclave/src/service.rs
@@ -23,16 +23,16 @@ use std::prelude::v1::*;
 use std::sync::{Arc, SgxMutex as Mutex};
 use teaclave_proto::teaclave_frontend_service::{
     ApproveTaskRequest, ApproveTaskResponse, AssignDataRequest, AssignDataResponse,
-    CreateTaskRequest, CreateTaskResponse, DeleteFunctionRequest, DeleteFunctionResponse,
-    GetFunctionRequest, GetFunctionResponse, GetInputFileRequest, GetInputFileResponse,
-    GetOutputFileRequest, GetOutputFileResponse, GetTaskRequest, GetTaskResponse,
-    InvokeTaskRequest, InvokeTaskResponse, ListFunctionsRequest, ListFunctionsResponse,
-    RegisterFunctionRequest, RegisterFunctionResponse, RegisterFusionOutputRequest,
-    RegisterFusionOutputResponse, RegisterInputFileRequest, RegisterInputFileResponse,
-    RegisterInputFromOutputRequest, RegisterInputFromOutputResponse, RegisterOutputFileRequest,
-    RegisterOutputFileResponse, UpdateFunctionRequest, UpdateFunctionResponse,
-    UpdateInputFileRequest, UpdateInputFileResponse, UpdateOutputFileRequest,
-    UpdateOutputFileResponse,
+    CancelTaskRequest, CancelTaskResponse, CreateTaskRequest, CreateTaskResponse,
+    DeleteFunctionRequest, DeleteFunctionResponse, GetFunctionRequest, GetFunctionResponse,
+    GetInputFileRequest, GetInputFileResponse, GetOutputFileRequest, GetOutputFileResponse,
+    GetTaskRequest, GetTaskResponse, InvokeTaskRequest, InvokeTaskResponse, ListFunctionsRequest,
+    ListFunctionsResponse, RegisterFunctionRequest, RegisterFunctionResponse,
+    RegisterFusionOutputRequest, RegisterFusionOutputResponse, RegisterInputFileRequest,
+    RegisterInputFileResponse, RegisterInputFromOutputRequest, RegisterInputFromOutputResponse,
+    RegisterOutputFileRequest, RegisterOutputFileResponse, UpdateFunctionRequest,
+    UpdateFunctionResponse, UpdateInputFileRequest, UpdateInputFileResponse,
+    UpdateOutputFileRequest, UpdateOutputFileResponse,
 };
 use teaclave_proto::teaclave_management_service::TeaclaveManagement;
 use teaclave_proto::teaclave_storage_service::{
@@ -630,6 +630,54 @@ impl TeaclaveManagement for TeaclaveManagementService {
 
         Ok(InvokeTaskResponse)
     }
+
+    // access_control:
+    // 1) user_id == task.creator
+    // 2) user_role == admin
+    fn cancel_task(
+        &self,
+        request: Request<CancelTaskRequest>,
+    ) -> TeaclaveServiceResponseResult<CancelTaskResponse> {
+        let user_id = self.get_request_user_id(request.metadata())?;
+        let role = self.get_request_role(request.metadata())?;
+        let request = request.message;
+
+        let ts: TaskState = self
+            .read_from_db(&request.task_id)
+            .map_err(|_| TeaclaveManagementServiceError::PermissionDenied)?;
+
+        match role {
+            UserRole::PlatformAdmin => {}
+            _ => {
+                ensure!(
+                    ts.has_creator(&user_id),
+                    TeaclaveManagementServiceError::PermissionDenied
+                );
+            }
+        }
+
+        match ts.status {
+            // need scheduler to cancel the task
+            TaskStatus::Staged | TaskStatus::Running => {
+                self.enqueue_to_db(CANCEL_QUEUE_KEY.as_bytes(), &ts)?;
+            }
+            _ => {
+                // race will not affect correctness/privacy
+                let task: Task<Cancel> = ts.try_into().map_err(|e| {
+                    log::warn!("Cancel state error: {:?}", e);
+                    TeaclaveManagementServiceError::PermissionDenied
+                })?;
+
+                log::debug!("Canceled Task: {:?}", task);
+
+                let ts: TaskState = task.into();
+                self.write_to_db(&ts)
+                    .map_err(|_| TeaclaveManagementServiceError::StorageError)?;
+            }
+        }
+
+        Ok(CancelTaskResponse)
+    }
 }
 
 impl TeaclaveManagementService {
diff --git a/services/proto/src/proto/teaclave_common.proto b/services/proto/src/proto/teaclave_common.proto
index b2bc14f..4a7e88e 100644
--- a/services/proto/src/proto/teaclave_common.proto
+++ b/services/proto/src/proto/teaclave_common.proto
@@ -48,6 +48,19 @@ enum TaskStatus {
   Staged = 3;
   Running = 4;
   Finished = 10;
+  Canceled = 20;
+  Failed = 99;
+}
+
+enum ExecutorStatus {  // Executor state carried in HeartbeatRequest.status.
+  Idle = 0;
+  Executing = 1;
+}
+
+enum ExecutorCommand {  // Scheduler reply carried in HeartbeatResponse.command.
+  NoAction = 0;  // Reply when nothing else applies.
+  Stop = 1;  // Sent when the task assigned to the executor has been canceled.
+  NewTask = 2;  // Sent when the scheduler's task queue is not empty.
 }
 
 message TaskResult {
diff --git a/services/proto/src/proto/teaclave_frontend_service.proto b/services/proto/src/proto/teaclave_frontend_service.proto
index 8fcfba0..1dfb038 100644
--- a/services/proto/src/proto/teaclave_frontend_service.proto
+++ b/services/proto/src/proto/teaclave_frontend_service.proto
@@ -232,6 +232,12 @@ message InvokeTaskRequest {
 
 message InvokeTaskResponse { }
 
+message CancelTaskRequest {  // Asks the service to cancel one task.
+  string task_id = 1;  // External ID of the task, in its string form.
+}
+
+message CancelTaskResponse { }  // Empty reply; carries no fields.
+
 service TeaclaveFrontend {
   rpc RegisterInputFile (RegisterInputFileRequest) returns (RegisterInputFileResponse);
   rpc RegisterOutputFile (RegisterOutputFileRequest) returns (RegisterOutputFileResponse);
@@ -251,5 +257,5 @@ service TeaclaveFrontend {
   rpc AssignData (AssignDataRequest) returns (AssignDataResponse);
   rpc ApproveTask (ApproveTaskRequest) returns (ApproveTaskResponse);
   rpc InvokeTask (InvokeTaskRequest) returns (InvokeTaskResponse);
-
+  rpc CancelTask (CancelTaskRequest) returns (CancelTaskResponse);
 }
diff --git a/services/proto/src/proto/teaclave_management_service.proto b/services/proto/src/proto/teaclave_management_service.proto
index 2a71fd4..82c50a4 100644
--- a/services/proto/src/proto/teaclave_management_service.proto
+++ b/services/proto/src/proto/teaclave_management_service.proto
@@ -43,4 +43,5 @@ service TeaclaveManagement {
   rpc AssignData (teaclave_frontend_service_proto.AssignDataRequest) returns (teaclave_frontend_service_proto.AssignDataResponse);
   rpc ApproveTask (teaclave_frontend_service_proto.ApproveTaskRequest) returns (teaclave_frontend_service_proto.ApproveTaskResponse);
   rpc InvokeTask (teaclave_frontend_service_proto.InvokeTaskRequest) returns (teaclave_frontend_service_proto.InvokeTaskResponse);
+  rpc CancelTask (teaclave_frontend_service_proto.CancelTaskRequest) returns (teaclave_frontend_service_proto.CancelTaskResponse);
 }
diff --git a/services/proto/src/proto/teaclave_scheduler_service.proto b/services/proto/src/proto/teaclave_scheduler_service.proto
index f0b0771..9f9f3ad 100644
--- a/services/proto/src/proto/teaclave_scheduler_service.proto
+++ b/services/proto/src/proto/teaclave_scheduler_service.proto
@@ -28,7 +28,17 @@ message SubscribeResponse {
   bool success = 1;
 }
 
-message PullTaskRequest {}
+message HeartbeatRequest {  // Periodic status report from an executor to the scheduler.
+  string executor_id = 1;  // UUID of the reporting executor, as a string.
+  teaclave_common_proto.ExecutorStatus status = 2;  // Current executor state.
+}
+message HeartbeatResponse {  // Scheduler's answer to a heartbeat.
+  teaclave_common_proto.ExecutorCommand command = 1;  // What the executor should do next.
+}
+
+message PullTaskRequest {  // Request for the next staged task.
+  string executor_id = 1;  // UUID of the executor pulling the task, as a string.
+}
 message PullTaskResponse {
   bytes staged_task = 1;
 }
@@ -60,4 +70,6 @@ service TeaclaveScheduler {
 
   rpc UpdateTaskStatus(UpdateTaskStatusRequest) returns (UpdateTaskStatusResponse);
   rpc UpdateTaskResult(UpdateTaskResultRequest) returns (UpdateTaskResultResponse);
+
+  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
 }
diff --git a/services/proto/src/teaclave_common.rs b/services/proto/src/teaclave_common.rs
index 5af0c61..5937220 100644
--- a/services/proto/src/teaclave_common.rs
+++ b/services/proto/src/teaclave_common.rs
@@ -111,6 +111,8 @@ pub fn i32_to_task_status(status: i32) -> Result<TaskStatus> {
         Some(proto::TaskStatus::Staged) => TaskStatus::Staged,
         Some(proto::TaskStatus::Running) => TaskStatus::Running,
         Some(proto::TaskStatus::Finished) => TaskStatus::Finished,
+        Some(proto::TaskStatus::Failed) => TaskStatus::Failed,
+        Some(proto::TaskStatus::Canceled) => TaskStatus::Canceled,
         None => bail!("invalid task status"),
     };
     Ok(ret)
@@ -124,6 +126,8 @@ pub fn i32_from_task_status(status: TaskStatus) -> i32 {
         TaskStatus::Staged => proto::TaskStatus::Staged as i32,
         TaskStatus::Running => proto::TaskStatus::Running as i32,
         TaskStatus::Finished => proto::TaskStatus::Finished as i32,
+        TaskStatus::Failed => proto::TaskStatus::Failed as i32,
+        TaskStatus::Canceled => proto::TaskStatus::Canceled as i32,
     }
 }
 
@@ -194,3 +198,104 @@ impl std::convert::From<TaskResult> for proto::TaskResult {
         proto::TaskResult { result: opt_result }
     }
 }
+
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum ExecutorStatus { // Domain-side counterpart of `proto::ExecutorStatus`.
+    Idle,
+    Executing,
+}
+
+impl std::convert::TryFrom<proto::ExecutorStatus> for ExecutorStatus {
+    type Error = Error;
+
+    /// Maps the protobuf enum onto the domain enum. Every protobuf variant
+    /// has a counterpart, so this always returns `Ok`.
+    fn try_from(status: proto::ExecutorStatus) -> Result<Self> {
+        let converted = match status {
+            proto::ExecutorStatus::Idle => Self::Idle,
+            proto::ExecutorStatus::Executing => Self::Executing,
+        };
+        Ok(converted)
+    }
+}
+
+impl std::convert::TryFrom<i32> for ExecutorStatus {
+    type Error = Error;
+
+    /// Decodes a raw protobuf enum value; a value without a matching
+    /// variant is rejected with an error.
+    fn try_from(status: i32) -> Result<Self> {
+        let wire = match proto::ExecutorStatus::from_i32(status) {
+            Some(wire) => wire,
+            None => bail!("invalid executor status"),
+        };
+        Ok(match wire {
+            proto::ExecutorStatus::Idle => Self::Idle,
+            proto::ExecutorStatus::Executing => Self::Executing,
+        })
+    }
+}
+
+impl std::convert::From<ExecutorStatus> for i32 {
+    /// Encodes the status as the numeric value of its protobuf variant.
+    fn from(status: ExecutorStatus) -> Self {
+        // Go through the protobuf enum so the numbering is defined in one place.
+        proto::ExecutorStatus::from(status) as i32
+    }
+}
+
+impl std::convert::From<ExecutorStatus> for proto::ExecutorStatus {
+    /// Maps the domain enum onto the protobuf enum, variant by variant.
+    fn from(status: ExecutorStatus) -> Self {
+        match status {
+            ExecutorStatus::Idle => Self::Idle,
+            ExecutorStatus::Executing => Self::Executing,
+        }
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum ExecutorCommand { // Domain-side counterpart of `proto::ExecutorCommand`.
+    NoAction, // Also the `Default` value.
+    Stop,
+    NewTask,
+}
+
+impl Default for ExecutorCommand {
+    /// The default command is `NoAction`.
+    fn default() -> Self {
+        Self::NoAction
+    }
+}
+
+impl std::convert::TryFrom<proto::ExecutorCommand> for ExecutorCommand {
+    type Error = Error;
+
+    /// Maps the protobuf enum onto the domain enum. Every protobuf variant
+    /// has a counterpart, so this always returns `Ok`.
+    fn try_from(command: proto::ExecutorCommand) -> Result<Self> {
+        let converted = match command {
+            proto::ExecutorCommand::NoAction => Self::NoAction,
+            proto::ExecutorCommand::Stop => Self::Stop,
+            proto::ExecutorCommand::NewTask => Self::NewTask,
+        };
+        Ok(converted)
+    }
+}
+
+impl std::convert::From<ExecutorCommand> for proto::ExecutorCommand {
+    /// Maps the domain enum onto the protobuf enum, variant by variant.
+    fn from(command: ExecutorCommand) -> Self {
+        match command {
+            ExecutorCommand::NoAction => Self::NoAction,
+            ExecutorCommand::Stop => Self::Stop,
+            ExecutorCommand::NewTask => Self::NewTask,
+        }
+    }
+}
+
+impl std::convert::TryFrom<i32> for ExecutorCommand {
+    type Error = Error;
+
+    /// Decodes a raw protobuf enum value into an `ExecutorCommand`.
+    ///
+    /// Returns an error when `command` does not correspond to any
+    /// `proto::ExecutorCommand` variant.
+    fn try_from(command: i32) -> Result<Self> {
+        match proto::ExecutorCommand::from_i32(command) {
+            Some(proto::ExecutorCommand::NoAction) => Ok(ExecutorCommand::NoAction),
+            Some(proto::ExecutorCommand::Stop) => Ok(ExecutorCommand::Stop),
+            Some(proto::ExecutorCommand::NewTask) => Ok(ExecutorCommand::NewTask),
+            // The message names the command, not the status, so a failure
+            // here is not confused with an ExecutorStatus decoding failure.
+            _ => bail!("invalid executor command"),
+        }
+    }
+}
+
+impl std::convert::From<ExecutorCommand> for i32 {
+    /// Encodes the command as the numeric value of its protobuf variant.
+    fn from(command: ExecutorCommand) -> Self {
+        // Go through the protobuf enum so the numbering is defined in one place.
+        proto::ExecutorCommand::from(command) as i32
+    }
+}
diff --git a/services/proto/src/teaclave_frontend_service.rs b/services/proto/src/teaclave_frontend_service.rs
index 7756c41..ba5d870 100644
--- a/services/proto/src/teaclave_frontend_service.rs
+++ b/services/proto/src/teaclave_frontend_service.rs
@@ -702,6 +702,22 @@ impl InvokeTaskRequest {
 #[derive(Debug)]
 pub struct InvokeTaskResponse;
 
+#[into_request(TeaclaveManagementRequest::CancelTask)]
+#[into_request(TeaclaveFrontendRequest::CancelTask)]
+#[derive(Debug)]
+pub struct CancelTaskRequest { // Request to cancel a single task.
+    pub task_id: ExternalID, // ID of the task to cancel.
+}
+
+impl CancelTaskRequest {
+    /// Builds a cancel request for the task identified by `task_id`.
+    pub fn new(task_id: ExternalID) -> Self {
+        CancelTaskRequest { task_id }
+    }
+}
+
+#[derive(Debug)]
+pub struct CancelTaskResponse; // Unit reply to a cancel request; carries no data.
+
 impl std::convert::TryFrom<proto::RegisterInputFileRequest> for RegisterInputFileRequest {
     type Error = Error;
 
@@ -1664,3 +1680,36 @@ impl From<InvokeTaskResponse> for proto::InvokeTaskResponse {
         Self {}
     }
 }
+
+impl std::convert::TryFrom<proto::CancelTaskRequest> for CancelTaskRequest {
+    type Error = Error;
+
+    /// Converts the protobuf message; fails if `task_id` cannot be
+    /// converted into an `ExternalID`.
+    fn try_from(proto: proto::CancelTaskRequest) -> Result<Self> {
+        Ok(Self {
+            task_id: proto.task_id.try_into()?,
+        })
+    }
+}
+
+impl From<CancelTaskRequest> for proto::CancelTaskRequest {
+    /// Builds the protobuf message, rendering the task ID as a string.
+    fn from(request: CancelTaskRequest) -> Self {
+        let task_id = request.task_id.to_string();
+        Self { task_id }
+    }
+}
+
+impl std::convert::TryFrom<proto::CancelTaskResponse> for CancelTaskResponse {
+    type Error = Error;
+
+    /// The protobuf reply has no fields, so the conversion always succeeds.
+    fn try_from(_proto: proto::CancelTaskResponse) -> Result<Self> {
+        Ok(Self)
+    }
+}
+
+impl From<CancelTaskResponse> for proto::CancelTaskResponse {
+    /// Produces the empty protobuf reply.
+    fn from(_response: CancelTaskResponse) -> Self {
+        proto::CancelTaskResponse {}
+    }
+}
diff --git a/services/proto/src/teaclave_management_service.rs b/services/proto/src/teaclave_management_service.rs
index 4c4169d..c868d59 100644
--- a/services/proto/src/teaclave_management_service.rs
+++ b/services/proto/src/teaclave_management_service.rs
@@ -66,3 +66,5 @@ pub type ApproveTaskRequest = crate::teaclave_frontend_service::ApproveTaskReque
 pub type ApproveTaskResponse = crate::teaclave_frontend_service::ApproveTaskResponse;
 pub type InvokeTaskRequest = crate::teaclave_frontend_service::InvokeTaskRequest;
 pub type InvokeTaskResponse = crate::teaclave_frontend_service::InvokeTaskResponse;
+pub type CancelTaskRequest = crate::teaclave_frontend_service::CancelTaskRequest; // The management service uses the frontend's request type.
+pub type CancelTaskResponse = crate::teaclave_frontend_service::CancelTaskResponse; // The management service uses the frontend's response type.
diff --git a/services/proto/src/teaclave_scheduler_service.rs b/services/proto/src/teaclave_scheduler_service.rs
index 585a9c4..afcd1e6 100644
--- a/services/proto/src/teaclave_scheduler_service.rs
+++ b/services/proto/src/teaclave_scheduler_service.rs
@@ -21,7 +21,9 @@
 use std::collections::HashMap;
 use std::prelude::v1::*;
 
-use crate::teaclave_common::{i32_from_task_status, i32_to_task_status};
+use crate::teaclave_common::{
+    i32_from_task_status, i32_to_task_status, ExecutorCommand, ExecutorStatus,
+};
 use crate::teaclave_scheduler_service_proto as proto;
 use anyhow::{Error, Result};
 use core::convert::TryInto;
@@ -42,7 +44,9 @@ pub struct SubscribeResponse {
 }
 
 #[into_request(TeaclaveSchedulerRequest::PullTask)]
-pub struct PullTaskRequest {}
+pub struct PullTaskRequest {
+    pub executor_id: Uuid, // ID of the executor that is pulling a task.
+}
 
 #[into_request(TeaclaveSchedulerResponse::PullTask)]
 #[derive(Debug)]
@@ -50,6 +54,33 @@ pub struct PullTaskResponse {
     pub staged_task: StagedTask,
 }
 
+#[into_request(TeaclaveSchedulerRequest::Heartbeat)]
+pub struct HeartbeatRequest { // Status report sent by an executor to the scheduler.
+    pub executor_id: Uuid, // ID of the reporting executor.
+    pub status: ExecutorStatus, // State the executor reports for itself.
+}
+
+impl HeartbeatRequest {
+    /// Builds a heartbeat for `executor_id` reporting `status`.
+    pub fn new(executor_id: Uuid, status: ExecutorStatus) -> Self {
+        HeartbeatRequest {
+            executor_id,
+            status,
+        }
+    }
+}
+
+#[into_request(TeaclaveSchedulerResponse::Heartbeat)]
+#[derive(Debug)]
+pub struct HeartbeatResponse { // Scheduler's answer to a heartbeat.
+    pub command: ExecutorCommand, // Command the scheduler returns to the executor.
+}
+
+impl HeartbeatResponse {
+    /// Wraps `command` in a heartbeat response.
+    pub fn new(command: ExecutorCommand) -> Self {
+        HeartbeatResponse { command }
+    }
+}
+
 impl PullTaskResponse {
     pub fn new(staged_task: StagedTask) -> Self {
         Self { staged_task }
@@ -140,14 +171,16 @@ impl std::convert::From<SubscribeResponse> for proto::SubscribeResponse {
 impl std::convert::TryFrom<proto::PullTaskRequest> for PullTaskRequest {
     type Error = Error;
     fn try_from(proto: proto::PullTaskRequest) -> Result<Self> {
-        let ret = Self {};
+        let executor_id = Uuid::parse_str(&proto.executor_id)?; // fails when the string is not a valid UUID
+        let ret = Self { executor_id };
         Ok(ret)
     }
 }
 
 impl std::convert::From<PullTaskRequest> for proto::PullTaskRequest {
     fn from(req: PullTaskRequest) -> Self {
-        proto::PullTaskRequest {}
+        let executor_id = req.executor_id.to_string(); // the protobuf field carries the UUID as a string
+        proto::PullTaskRequest { executor_id }
     }
 }
 
@@ -168,6 +201,46 @@ impl std::convert::From<PullTaskResponse> for proto::PullTaskResponse {
     }
 }
 
+impl std::convert::TryFrom<proto::HeartbeatRequest> for HeartbeatRequest {
+    type Error = Error;
+
+    /// Converts the protobuf message; fails when `executor_id` is not a
+    /// valid UUID string or `status` cannot be converted.
+    fn try_from(proto: proto::HeartbeatRequest) -> Result<Self> {
+        Ok(Self {
+            executor_id: Uuid::parse_str(&proto.executor_id)?,
+            status: proto.status.try_into()?,
+        })
+    }
+}
+
+impl std::convert::From<HeartbeatRequest> for proto::HeartbeatRequest {
+    /// Builds the protobuf message, rendering the executor ID as a string.
+    fn from(req: HeartbeatRequest) -> Self {
+        let HeartbeatRequest {
+            executor_id,
+            status,
+        } = req;
+        Self {
+            executor_id: executor_id.to_string(),
+            status: status.into(),
+        }
+    }
+}
+
+impl std::convert::TryFrom<proto::HeartbeatResponse> for HeartbeatResponse {
+    type Error = Error;
+
+    /// Converts the protobuf message; fails when `command` cannot be converted.
+    fn try_from(proto: proto::HeartbeatResponse) -> Result<Self> {
+        Ok(Self {
+            command: proto.command.try_into()?,
+        })
+    }
+}
+
+impl std::convert::From<HeartbeatResponse> for proto::HeartbeatResponse {
+    /// Builds the protobuf message from the domain response.
+    fn from(req: HeartbeatResponse) -> Self {
+        let command = req.command.into();
+        Self { command }
+    }
+}
+
 impl std::convert::TryFrom<proto::UpdateTaskResultRequest> for UpdateTaskResultRequest {
     type Error = Error;
     fn try_from(proto: proto::UpdateTaskResultRequest) -> Result<Self> {
diff --git a/services/scheduler/enclave/src/lib.rs b/services/scheduler/enclave/src/lib.rs
index 1285fa3..202ab14 100644
--- a/services/scheduler/enclave/src/lib.rs
+++ b/services/scheduler/enclave/src/lib.rs
@@ -22,6 +22,7 @@ extern crate sgx_tstd as std;
 
 #[cfg(feature = "mesalock_sgx")]
 use std::prelude::v1::*;
+use std::sync::{Arc, SgxMutex as Mutex};
 
 #[macro_use]
 extern crate log;
@@ -96,15 +97,29 @@ fn start_service(config: &RuntimeConfig) -> Result<()> {
     )?;
     info!(" Starting Scheduler: setup storage endpoint finished ...");
 
-    let service = service::TeaclaveSchedulerService::new(storage_service_endpoint)?;
+    let service_resources = service::TeaclaveSchedulerResources::new(storage_service_endpoint)?;
+
+    let service_resources = Arc::new(Mutex::new(service_resources));
+
+    let service = service::TeaclaveSchedulerService::new(&service_resources);
+
+    let deamon = service::TeaclaveSchedulerDeamon::new(&service_resources);
+
+    let deamon_handle = std::thread::spawn(move || {
+        let _ = deamon.run();
+    });
 
     info!(" Starting Scheduler: start listening ...");
+
     match server.start(service) {
         Ok(_) => (),
         Err(e) => {
             error!("Service exit, error: {}.", e);
         }
     }
+
+    deamon_handle.join().unwrap();
+
     Ok(())
 }
 
diff --git a/services/scheduler/enclave/src/service.rs b/services/scheduler/enclave/src/service.rs
index f7beff6..0f835b0 100644
--- a/services/scheduler/enclave/src/service.rs
+++ b/services/scheduler/enclave/src/service.rs
@@ -17,11 +17,14 @@
 
 use crate::error::TeaclaveSchedulerError;
 
-use std::collections::VecDeque;
+use std::collections::{HashMap, HashSet, VecDeque};
 use std::convert::TryInto;
 use std::prelude::v1::*;
 use std::sync::{Arc, SgxMutex as Mutex};
+use std::time::{Duration, SystemTime};
+use std::untrusted::time::SystemTimeEx;
 
+use teaclave_proto::teaclave_common::{ExecutorCommand, ExecutorStatus};
 use teaclave_proto::teaclave_scheduler_service::*;
 use teaclave_proto::teaclave_storage_service::*;
 use teaclave_rpc::endpoint::Endpoint;
@@ -33,14 +36,114 @@ use uuid::Uuid;
 use anyhow::anyhow;
 use anyhow::Result;
 
+const EXECUTOR_TIMEOUT_SECS: u64 = 30; // An executor with no heartbeat for longer than this many seconds is treated as lost.
+
 #[teaclave_service(teaclave_scheduler_service, TeaclaveScheduler, TeaclaveSchedulerError)]
 #[derive(Clone)]
 pub(crate) struct TeaclaveSchedulerService {
+    resources: Arc<Mutex<TeaclaveSchedulerResources>>, // scheduler state behind one lock, shared with the daemon
+}
+
+pub struct TeaclaveSchedulerResources {
     storage_client: Arc<Mutex<TeaclaveStorageClient>>,
-    task_queue: Arc<Mutex<VecDeque<StagedTask>>>,
+    // staged tasks waiting to be pulled by an executor (the executor_id -> task_id map is `executors_tasks`)
+    task_queue: VecDeque<StagedTask>,
+    executors_tasks: HashMap<Uuid, Uuid>, // maps executor_id to the task_id it pulled
+    executors_last_heartbeat: HashMap<Uuid, SystemTime>, // executor_id -> time of its latest heartbeat
+    executors_status: HashMap<Uuid, ExecutorStatus>, // executor_id -> status from its latest heartbeat
+    tasks_to_cancel: HashSet<Uuid>, // task_ids taken from the cancel queue and not yet canceled
+}
+
+pub struct TeaclaveSchedulerDeamon { // Background worker; its loop is `run`.
+    resources: Arc<Mutex<TeaclaveSchedulerResources>>, // same shared state the RPC service uses
+}
+
+impl TeaclaveSchedulerDeamon {
+    /// Runs the scheduler's background loop.
+    ///
+    /// Every two seconds, while holding the resources lock, the loop:
+    /// 1. pulls entries from the cancel queue into `tasks_to_cancel` until a
+    ///    dequeue fails,
+    /// 2. pulls staged tasks into the in-memory `task_queue` until a dequeue
+    ///    fails,
+    /// 3. forgets every executor whose last heartbeat is older than
+    ///    `EXECUTOR_TIMEOUT_SECS` and marks the task it was assigned as failed.
+    ///
+    /// # Errors
+    ///
+    /// Returns only when the resources lock cannot be taken. A failure while
+    /// marking one lost executor's task as failed is logged and skipped, so
+    /// that a single bad task does not stop the queues from being served.
+    pub fn run(&self) -> Result<()> {
+        loop {
+            std::thread::sleep(std::time::Duration::from_secs(2));
+
+            let mut resources = self
+                .resources
+                .lock()
+                .map_err(|_| anyhow!("Cannot lock scheduler resources"))?;
+
+            let key = StagedTask::get_queue_key().as_bytes();
+
+            log::debug!("Pulling task/cancel queue");
+            while let Ok(canceled_task) = resources.pull_cancel_queue() {
+                resources.tasks_to_cancel.insert(canceled_task.task_id);
+            }
+
+            while let Ok(staged_task) = resources.pull_staged_task::<StagedTask>(key) {
+                log::debug!("deamon: Pulled staged task: {:?}", staged_task);
+                resources.task_queue.push_back(staged_task);
+            }
+
+            let current_time = SystemTime::now();
+            let timeout = Duration::from_secs(EXECUTOR_TIMEOUT_SECS);
+            let mut to_remove = Vec::new();
+            for (executor_id, last_heartbeat) in resources.executors_last_heartbeat.iter() {
+                // If the heartbeat timestamp lies in the future, duration_since
+                // fails; the fallback exceeds the timeout, so the executor is
+                // treated as lost.
+                let silent_for = current_time
+                    .duration_since(*last_heartbeat)
+                    .unwrap_or(Duration::from_secs(EXECUTOR_TIMEOUT_SECS + 1));
+                if silent_for > timeout {
+                    // executor lost
+                    to_remove.push(*executor_id);
+                    log::warn!("Executor {} lost", executor_id);
+                }
+            }
+
+            for executor_id in to_remove {
+                resources.executors_last_heartbeat.remove(&executor_id);
+                resources.executors_status.remove(&executor_id);
+                if let Some(task_id) = resources.executors_tasks.remove(&executor_id) {
+                    // report task failure; keep the daemon alive if that fails
+                    if let Err(e) = Self::fail_task_of_lost_executor(&resources, executor_id, task_id)
+                    {
+                        log::error!(
+                            "Cannot fail task {} of lost executor {}: {:?}",
+                            task_id,
+                            executor_id,
+                            e
+                        );
+                    }
+                }
+            }
+        }
+    }
+
+    /// Marks `task_id` as failed because `executor_id` stopped sending
+    /// heartbeats. Does nothing when the task has already ended.
+    fn fail_task_of_lost_executor(
+        resources: &TeaclaveSchedulerResources,
+        executor_id: Uuid,
+        task_id: Uuid,
+    ) -> Result<()> {
+        let ts = resources.get_task_state(&task_id)?;
+        if ts.is_ended() {
+            return Ok(());
+        }
+
+        log::warn!("Executor {} lost, canceling task {}", executor_id, task_id);
+
+        let mut task: Task<Fail> = ts.try_into()?;
+
+        log::debug!("Task failed because of Executor lost: Task {:?}", task);
+        // Only TaskStatus::Running/Staged is allowed here.
+        let result_err = TaskResult::Err(TaskFailure::new("Runtime Error: Executor Timeout"));
+
+        // Updating task result means we have finished execution
+        task.update_result(result_err)?;
+
+        let ts = TaskState::from(task);
+        resources.put_into_db(&ts)?;
+        Ok(())
+    }
 }
 
 impl TeaclaveSchedulerService {
+    /// Creates a service handle that shares `resources` with its caller.
+    pub fn new(resources: &Arc<Mutex<TeaclaveSchedulerResources>>) -> Self {
+        let resources = Arc::clone(resources);
+        Self { resources }
+    }
+}
+
+impl TeaclaveSchedulerDeamon {
+    /// Creates a daemon that shares `resources` with its caller.
+    pub fn new(resources: &Arc<Mutex<TeaclaveSchedulerResources>>) -> Self {
+        let resources = Arc::clone(resources);
+        Self { resources }
+    }
+}
+
+impl TeaclaveSchedulerResources {
     pub(crate) fn new(storage_service_endpoint: Endpoint) -> Result<Self> {
         let mut i = 0;
         let channel = loop {
@@ -55,13 +158,22 @@ impl TeaclaveSchedulerService {
             std::thread::sleep(std::time::Duration::from_secs(3));
         };
         let storage_client = Arc::new(Mutex::new(TeaclaveStorageClient::new(channel)?));
-        let task_queue = Arc::new(Mutex::new(VecDeque::new()));
-        let service = Self {
+        let task_queue = VecDeque::new();
+        let executors_tasks = HashMap::new();
+        let executors_status = HashMap::new();
+        let tasks_to_cancel = HashSet::new();
+        let executors_last_heartbeat = HashMap::new();
+
+        let resources = TeaclaveSchedulerResources {
             storage_client,
             task_queue,
+            executors_tasks,
+            executors_last_heartbeat,
+            executors_status,
+            tasks_to_cancel,
         };
 
-        Ok(service)
+        Ok(resources)
     }
 
     fn pull_staged_task<T: Storable>(&self, key: &[u8]) -> TeaclaveServiceResponseResult<T> {
@@ -76,6 +188,33 @@ impl TeaclaveSchedulerService {
             .map_err(|_| TeaclaveSchedulerError::DataError.into())
     }
 
+    /// Dequeues one entry from the cancel queue in storage and decodes it
+    /// as a `TaskState`.
+    fn pull_cancel_queue(&self) -> Result<TaskState> {
+        let request = DequeueRequest::new(CANCEL_QUEUE_KEY.as_bytes());
+        let response = {
+            // Hold the storage client lock only for the dequeue call.
+            let mut client = self
+                .storage_client
+                .lock()
+                .map_err(|_| TeaclaveSchedulerError::StorageError)?;
+            client.dequeue(request)?
+        };
+        let bytes = response.value.as_slice();
+        TaskState::from_slice(bytes).map_err(|_| TeaclaveSchedulerError::DataError.into())
+    }
+
+    /// Loads the state of `task_id`, moves it to the canceled state with a
+    /// "canceled by the user" failure result, and writes it back to storage.
+    fn cancel_task(&self, task_id: Uuid) -> Result<()> {
+        // Only TaskStatus::Running/Staged is allowed here.
+        let mut task: Task<Cancel> = self.get_task_state(&task_id)?.try_into()?;
+
+        let failure = TaskFailure::new("Task Canceled by the user");
+        task.update_result(TaskResult::Err(failure))?;
+
+        let canceled_state = TaskState::from(task);
+        self.put_into_db(&canceled_state)?;
+
+        Ok(())
+    }
+
     fn get_task_state(&self, task_id: &Uuid) -> Result<TaskState> {
         let key = ExternalID::new(TaskState::key_prefix(), task_id.to_owned());
         self.get_from_db(&key)
@@ -114,12 +253,14 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
         request: Request<PublishTaskRequest>,
     ) -> TeaclaveServiceResponseResult<PublishTaskResponse> {
         // XXX: Publisher is not implemented
-        let mut task_queue = self
-            .task_queue
+
+        let mut resources = self
+            .resources
             .lock()
-            .map_err(|_| anyhow!("Cannot lock task queue"))?;
+            .map_err(|_| anyhow!("Cannot lock scheduler resources"))?;
+
         let staged_task = request.message.staged_task;
-        task_queue.push_back(staged_task);
+        resources.task_queue.push_back(staged_task);
         Ok(PublishTaskResponse {})
     }
 
@@ -132,29 +273,107 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
         unimplemented!()
     }
 
+    /// Records an executor heartbeat and replies with the next command.
+    ///
+    /// The executor's status and heartbeat time are stored first. Then:
+    /// - `Stop` is returned when the executor reports `Executing` and the
+    ///   task assigned to it is in `tasks_to_cancel`; the task is canceled in
+    ///   storage before replying.
+    /// - When the executor reports `Idle`, its task assignment is dropped.
+    /// - Otherwise `NewTask` is returned if the task queue is not empty, and
+    ///   `NoAction` if it is.
+    ///
+    /// Fails when the resources lock cannot be taken or when canceling the
+    /// task fails.
+    fn heartbeat(
+        &self,
+        request: Request<HeartbeatRequest>,
+    ) -> TeaclaveServiceResponseResult<HeartbeatResponse> {
+        let mut resources = self
+            .resources
+            .lock()
+            .map_err(|_| anyhow!("Cannot lock scheduler resources"))?;
+
+        let executor_id = request.message.executor_id;
+        let status = request.message.status;
+
+        resources.executors_status.insert(executor_id, status);
+
+        resources
+            .executors_last_heartbeat
+            .insert(executor_id, SystemTime::now());
+
+        // check if the executor need to be stopped
+        if let Some(task_id) = resources.executors_tasks.get(&executor_id).copied() {
+            match status {
+                ExecutorStatus::Executing => {
+                    // `remove` returns true only if the task was marked for
+                    // cancellation, so the check and the removal are one step.
+                    if resources.tasks_to_cancel.remove(&task_id) {
+                        log::debug!(
+                            "Sending stop command to executor {}, killing task {} because of task cancelation",
+                            executor_id,
+                            task_id
+                        );
+                        resources.cancel_task(task_id)?;
+                        return Ok(HeartbeatResponse {
+                            command: ExecutorCommand::Stop,
+                        });
+                    }
+                }
+                ExecutorStatus::Idle => {
+                    resources.executors_tasks.remove(&executor_id);
+                }
+            }
+        }
+
+        let command = if resources.task_queue.is_empty() {
+            ExecutorCommand::NoAction
+        } else {
+            ExecutorCommand::NewTask
+        };
+
+        Ok(HeartbeatResponse { command })
+    }
+
     fn pull_task(
         &self,
-        _request: Request<PullTaskRequest>,
+        request: Request<PullTaskRequest>,
     ) -> TeaclaveServiceResponseResult<PullTaskResponse> {
-        let key = StagedTask::get_queue_key().as_bytes();
-        let staged_task = self.pull_staged_task(key)?;
-        let response = PullTaskResponse::new(staged_task);
-        Ok(response)
+        let request = request.message;
+        let mut resources = self
+            .resources
+            .lock()
+            .map_err(|_| anyhow!("Cannot lock scheduler resources"))?;
+
+        match resources.task_queue.pop_front() { // tasks now come from the in-memory queue, not straight from storage
+            Some(task) => match resources.tasks_to_cancel.take(&task.task_id) { // was this task canceled while still queued?
+                Some(task_id) => {
+                    resources.cancel_task(task_id)?; // record the cancellation; the popped task is not re-queued
+                    Err(TeaclaveServiceResponseError::InternalError(
+                        "Task to pull has been canceled".into(),
+                    ))
+                }
+                None => {
+                    resources
+                        .executors_tasks
+                        .insert(request.executor_id, task.task_id); // remember which executor holds this task
+                    Ok(PullTaskResponse::new(task))
+                }
+            },
+            None => Err(TeaclaveServiceResponseError::InternalError(
+                "No staged task in task_queue".into(),
+            )),
+        }
     }
 
     fn update_task_status(
         &self,
         request: Request<UpdateTaskStatusRequest>,
     ) -> TeaclaveServiceResponseResult<UpdateTaskStatusResponse> {
+        let resources = self
+            .resources
+            .lock() // the same lock is taken by the daemon loop and the other handlers
+            .map_err(|_| anyhow!("Cannot lock scheduler resources"))?;
+
         let request = request.message;
-        let ts = self.get_task_state(&request.task_id)?;
+        let ts = resources.get_task_state(&request.task_id)?;
         let task: Task<Run> = ts.try_into()?;
 
         log::debug!("UpdateTaskStatus: Task {:?}", task);
         // Only TaskStatus::Running is implicitly allowed here: the request's status field is not read.
 
         let ts = TaskState::from(task);
-        self.put_into_db(&ts)?;
+        resources.put_into_db(&ts)?;
         Ok(UpdateTaskStatusResponse {})
     }
 
@@ -162,14 +381,19 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
         &self,
         request: Request<UpdateTaskResultRequest>,
     ) -> TeaclaveServiceResponseResult<UpdateTaskResultResponse> {
+        let resources = self
+            .resources
+            .lock()
+            .map_err(|_| anyhow!("Cannot lock scheduler resources"))?;
+
         let request = request.message;
-        let ts = self.get_task_state(&request.task_id)?;
+        let ts = resources.get_task_state(&request.task_id)?;
         let mut task: Task<Finish> = ts.try_into()?;
 
         if let TaskResult::Ok(outputs) = &request.task_result {
             for (key, auth_tag) in outputs.tags_map.iter() {
                 let outfile = task.update_output_cmac(key, auth_tag)?;
-                self.put_into_db(outfile)?;
+                resources.put_into_db(outfile)?;
             }
         };
 
@@ -178,7 +402,7 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
         log::debug!("UpdateTaskResult: Task {:?}", task);
 
         let ts = TaskState::from(task);
-        self.put_into_db(&ts)?;
+        resources.put_into_db(&ts)?;
         Ok(UpdateTaskResultResponse {})
     }
 }
diff --git a/services/utils/service_app_utils/src/lib.rs b/services/utils/service_app_utils/src/lib.rs
index 1295472..1475f80 100644
--- a/services/utils/service_app_utils/src/lib.rs
+++ b/services/utils/service_app_utils/src/lib.rs
@@ -23,7 +23,7 @@ use std::thread;
 use teaclave_binder::proto::{ECallCommand, StartServiceInput, StartServiceOutput};
 use teaclave_binder::TeeBinder;
 use teaclave_config::RuntimeConfig;
-use teaclave_types::TeeServiceResult;
+use teaclave_types::{TeeServiceError, TeeServiceResult};
 
 struct TeaclaveServiceLauncher {
     tee: TeeBinder,
@@ -46,6 +46,7 @@ impl TeaclaveServiceLauncher {
             .invoke::<StartServiceInput, TeeServiceResult<StartServiceOutput>>(command, input)
         {
             Err(e) => bail!("TEE invocation error: {:?}", e),
+            Ok(Err(TeeServiceError::EnclaveForceTermination)) => std::process::exit(1),
             Ok(Err(e)) => bail!("Service exit with error: {:?}", e),
             _ => Ok(String::from("Service successfully exit")),
         }
diff --git a/tests/fixtures/functions/gbdt_training/e2e_output_model.enc b/tests/fixtures/functions/gbdt_training/e2e_output_model.enc
new file mode 100644
index 0000000..18f5224
Binary files /dev/null and b/tests/fixtures/functions/gbdt_training/e2e_output_model.enc differ
diff --git a/tests/functional/enclave/src/execution_service.rs b/tests/functional/enclave/src/execution_service.rs
index 443ca9e..67af16d 100644
--- a/tests/functional/enclave/src/execution_service.rs
+++ b/tests/functional/enclave/src/execution_service.rs
@@ -53,7 +53,7 @@ fn test_execute_function() {
     let put_request = PutRequest::new(ts.key().as_slice(), ts.to_vec().unwrap().as_slice());
     let _put_response = storage_client.put(put_request).unwrap();
 
-    std::thread::sleep(std::time::Duration::from_secs(5));
+    std::thread::sleep(std::time::Duration::from_secs(15));
 
     let get_request = GetRequest::new(ts.key().as_slice());
     let get_response = storage_client.get(get_request).unwrap();
diff --git a/tests/functional/enclave/src/frontend_service.rs b/tests/functional/enclave/src/frontend_service.rs
index 9a7f6c6..0653e56 100644
--- a/tests/functional/enclave/src/frontend_service.rs
+++ b/tests/functional/enclave/src/frontend_service.rs
@@ -19,11 +19,13 @@ use crate::utils::*;
 use std::convert::TryFrom;
 use std::prelude::v1::*;
 use teaclave_proto::teaclave_common::*;
+use teaclave_proto::teaclave_common::{ExecutorCommand, ExecutorStatus};
 use teaclave_proto::teaclave_frontend_service::*;
 use teaclave_proto::teaclave_scheduler_service::*;
 use teaclave_test_utils::test_case;
 use teaclave_types::*;
 use url::Url;
+use uuid::Uuid;
 
 fn authorized_client() -> TeaclaveFrontendClient {
     let mut api_client =
@@ -343,8 +345,141 @@ fn test_invoke_task() {
     let response = client.get_task(request).unwrap();
     assert_eq!(response.status, TaskStatus::Staged);
 
-    let request = PullTaskRequest {};
     let mut scheduler_client = get_scheduler_client();
-    let response = scheduler_client.pull_task(request);
+    let executor_id = Uuid::new_v4();
+
+    std::thread::sleep(std::time::Duration::from_secs(2));
+
+    let pull_task_request = PullTaskRequest { executor_id };
+    let response = scheduler_client.pull_task(pull_task_request);
     assert!(response.is_ok());
 }
+
+#[test_case]
+fn test_cancel_task() {
+    let mut client = authorized_client();
+    let function_id =
+        ExternalID::try_from("function-00000000-0000-0000-0000-000000000002").unwrap();
+    let external_outfile_url =
+        Url::parse("https://external-storage.com/filepath?presigned_token").unwrap();
+    let external_outfile_crypto = FileCrypto::default();
+
+    let request = CreateTaskRequest::new()
+        .function_id(function_id)
+        .function_arguments(hashmap!("arg1" => "arg1_value"))
+        .executor(Executor::MesaPy)
+        .outputs_ownership(hashmap!("output" => vec!["frontend_user"]));
+    let response = client.create_task(request).unwrap();
+    let task_id = response.task_id;
+
+    let request = RegisterOutputFileRequest::new(external_outfile_url, external_outfile_crypto);
+    let response = client.register_output_file(request).unwrap();
+    let output_id = response.data_id;
+
+    let request =
+        AssignDataRequest::new(task_id.clone(), hashmap!(), hashmap!("output" => output_id));
+    client.assign_data(request).unwrap();
+
+    let request = ApproveTaskRequest::new(task_id.clone());
+    client.approve_task(request).unwrap();
+
+    let request = InvokeTaskRequest::new(task_id.clone());
+    let response = client.invoke_task(request);
+    assert!(response.is_ok());
+
+    let mut scheduler_client = get_scheduler_client();
+
+    std::thread::sleep(std::time::Duration::from_secs(5));
+
+    let executor_id = Uuid::new_v4();
+    let request = HeartbeatRequest {
+        executor_id,
+        status: ExecutorStatus::Idle,
+    };
+
+    let response = scheduler_client.heartbeat(request).unwrap();
+    assert!(response.command == ExecutorCommand::NewTask);
+
+    let request = CancelTaskRequest::new(task_id.clone());
+    let response = client.cancel_task(request);
+    assert!(response.is_ok());
+
+    std::thread::sleep(std::time::Duration::from_secs(3));
+
+    let pull_task_request = PullTaskRequest { executor_id };
+    let response = scheduler_client.pull_task(pull_task_request);
+    log::debug!("response: {:?}", response);
+
+    assert!(response.is_err());
+
+    std::thread::sleep(std::time::Duration::from_secs(3));
+
+    let request = GetTaskRequest::new(task_id);
+    let response = client.get_task(request).unwrap();
+
+    assert_eq!(response.status, TaskStatus::Canceled);
+}
+
+#[test_case]
+fn test_fail_task() {
+    let mut client = authorized_client();
+    let function_id =
+        ExternalID::try_from("function-00000000-0000-0000-0000-000000000002").unwrap();
+    let external_outfile_url =
+        Url::parse("https://external-storage.com/filepath?presigned_token").unwrap();
+    let external_outfile_crypto = FileCrypto::default();
+
+    let request = CreateTaskRequest::new()
+        .function_id(function_id)
+        .function_arguments(hashmap!("arg1" => "arg1_value"))
+        .executor(Executor::MesaPy)
+        .outputs_ownership(hashmap!("output" => vec!["frontend_user"]));
+    let response = client.create_task(request).unwrap();
+    let task_id = response.task_id;
+
+    let request = RegisterOutputFileRequest::new(external_outfile_url, external_outfile_crypto);
+    let response = client.register_output_file(request).unwrap();
+    let output_id = response.data_id;
+
+    let request =
+        AssignDataRequest::new(task_id.clone(), hashmap!(), hashmap!("output" => output_id));
+    client.assign_data(request).unwrap();
+
+    let request = ApproveTaskRequest::new(task_id.clone());
+    client.approve_task(request).unwrap();
+
+    let request = InvokeTaskRequest::new(task_id.clone());
+    let response = client.invoke_task(request);
+    assert!(response.is_ok());
+
+    let mut scheduler_client = get_scheduler_client();
+
+    std::thread::sleep(std::time::Duration::from_secs(5));
+
+    let executor_id = Uuid::new_v4();
+    let request = HeartbeatRequest {
+        executor_id,
+        status: ExecutorStatus::Idle,
+    };
+    let response = scheduler_client.heartbeat(request).unwrap();
+    assert!(response.command == ExecutorCommand::NewTask);
+
+    let pull_task_request = PullTaskRequest { executor_id };
+    let response = scheduler_client.pull_task(pull_task_request).unwrap();
+    log::debug!("response: {:?}", response);
+
+    let request = HeartbeatRequest {
+        executor_id,
+        status: ExecutorStatus::Executing,
+    };
+    let response = scheduler_client.heartbeat(request).unwrap();
+    log::debug!("response: {:?}", response);
+    assert!(response.command == ExecutorCommand::NoAction);
+
+    std::thread::sleep(std::time::Duration::from_secs(33));
+
+    let request = GetTaskRequest::new(task_id);
+    let response = client.get_task(request).unwrap();
+
+    assert_eq!(response.status, TaskStatus::Failed);
+}
diff --git a/tests/functional/enclave/src/management_service.rs b/tests/functional/enclave/src/management_service.rs
index 2ec28e5..7001006 100644
--- a/tests/functional/enclave/src/management_service.rs
+++ b/tests/functional/enclave/src/management_service.rs
@@ -23,6 +23,7 @@ use teaclave_proto::teaclave_scheduler_service::*;
 use teaclave_test_utils::test_case;
 use teaclave_types::*;
 use url::Url;
+use uuid::Uuid;
 
 fn authorized_client(user_id: &str) -> TeaclaveManagementClient {
     get_management_client(user_id)
@@ -723,8 +724,12 @@ fn test_invoke_task() {
     let response = client2.get_task(request).unwrap();
     assert_eq!(response.status, TaskStatus::Staged);
 
-    let request = PullTaskRequest {};
     let mut scheduler_client = get_scheduler_client();
-    let response = scheduler_client.pull_task(request);
+    let executor_id = Uuid::new_v4();
+
+    std::thread::sleep(std::time::Duration::from_secs(2));
+
+    let pull_task_request = PullTaskRequest { executor_id };
+    let response = scheduler_client.pull_task(pull_task_request);
     assert!(response.is_ok());
 }
diff --git a/tests/functional/enclave/src/scheduler_service.rs b/tests/functional/enclave/src/scheduler_service.rs
index a96a08f..568928f 100644
--- a/tests/functional/enclave/src/scheduler_service.rs
+++ b/tests/functional/enclave/src/scheduler_service.rs
@@ -17,6 +17,7 @@
 
 use crate::utils::*;
 use std::prelude::v1::*;
+
 use teaclave_proto::teaclave_scheduler_service::*;
 use teaclave_proto::teaclave_storage_service::*;
 use teaclave_test_utils::test_case;
@@ -42,11 +43,23 @@ fn test_pull_task() {
     let _enqueue_response = storage_client.enqueue(enqueue_request).unwrap();
 
     let mut client = get_scheduler_client();
-    let request = PullTaskRequest {};
-    let response = client.pull_task(request);
+    let executor_id = Uuid::new_v4();
+
+    std::thread::sleep(std::time::Duration::from_secs(2));
+
+    let pull_task_request = PullTaskRequest { executor_id };
+    let response = client.pull_task(pull_task_request);
     log::debug!("response: {:?}", response);
+
     assert!(response.is_ok());
-    assert_eq!(response.unwrap().staged_task.function_id, function_id);
+
+    let response = response.unwrap();
+    log::info!(
+        "pulled staged_task function_id: {:?}",
+        response.staged_task.function_id
+    );
+
+    assert_eq!(response.staged_task.function_id, function_id);
 }
 
 #[test_case]
@@ -77,8 +90,13 @@ fn test_update_task_status_result() {
     let _put_response = storage_client.put(put_request).unwrap();
 
     let mut client = get_scheduler_client();
-    let request = PullTaskRequest {};
-    let response = client.pull_task(request).unwrap();
+
+    let executor_id = Uuid::new_v4();
+
+    std::thread::sleep(std::time::Duration::from_secs(2));
+
+    let pull_task_request = PullTaskRequest { executor_id };
+    let response = client.pull_task(pull_task_request).unwrap();
     log::debug!("response: {:?}", response);
     let task_id = response.staged_task.task_id;
 
diff --git a/types/src/error.rs b/types/src/error.rs
index f3b941d..c4bd12d 100644
--- a/types/src/error.rs
+++ b/types/src/error.rs
@@ -65,6 +65,8 @@ pub enum TeeServiceError {
     ServiceError,
     #[error("CommandNotRegistered")]
     CommandNotRegistered,
+    #[error("EnclaveForceTermination")]
+    EnclaveForceTermination,
 }
 
 pub type TeeServiceResult<T> = std::result::Result<T, TeeServiceError>;
diff --git a/types/src/storage.rs b/types/src/storage.rs
index 44be773..5fc2325 100644
--- a/types/src/storage.rs
+++ b/types/src/storage.rs
@@ -20,6 +20,8 @@ use serde::{Deserialize, Serialize};
 use std::prelude::v1::*;
 use uuid::Uuid;
 
+pub const CANCEL_QUEUE_KEY: &str = "cancel_queue";
+
 pub trait Storable: Serialize + for<'de> Deserialize<'de> {
     fn key_prefix() -> &'static str;
 
diff --git a/types/src/task.rs b/types/src/task.rs
index b5cb135..320d3d2 100644
--- a/types/src/task.rs
+++ b/types/src/task.rs
@@ -142,6 +142,8 @@ pub enum TaskStatus {
     Staged,
     Running,
     Finished,
+    Canceled,
+    Failed,
 }
 
 impl Default for TaskStatus {
diff --git a/types/src/task_state.rs b/types/src/task_state.rs
index c7e0ea4..da0ece6 100644
--- a/types/src/task_state.rs
+++ b/types/src/task_state.rs
@@ -81,6 +81,13 @@ impl TaskState {
     pub fn has_creator(&self, user_id: &UserID) -> bool {
         &self.creator == user_id
     }
+
+    pub fn is_ended(&self) -> bool {
+        match self.status {
+            TaskStatus::Finished | TaskStatus::Failed | TaskStatus::Canceled => true,
+            _ => false,
+        }
+    }
 }
 
 #[derive(Debug, Clone, Default, Deserialize, Serialize)]
@@ -97,6 +104,8 @@ impl StateTag for Stage {}
 impl StateTag for Run {}
 impl StateTag for Finish {}
 impl StateTag for Done {}
+impl StateTag for Cancel {}
+impl StateTag for Fail {}
 
 impl Task<Create> {
     pub fn new(
@@ -298,6 +307,50 @@ impl Task<Done> {
     }
 }
 
+impl Task<Fail> {
+    pub fn new(ts: TaskState) -> Result<Self> {
+        let task = Task::<Fail> {
+            state: ts,
+            extra: Fail,
+        };
+        Ok(task)
+    }
+
+    pub fn update_result(&mut self, result: TaskResult) -> Result<()> {
+        match &result {
+            TaskResult::Err(_) => {
+                self.state.result = result;
+                Ok(())
+            }
+            _ => Err(Error::msg(
+                "TaskResult::Err(TaskFailure) is expected for failed task",
+            )),
+        }
+    }
+}
+
+impl Task<Cancel> {
+    pub fn new(ts: TaskState) -> Result<Self> {
+        let task = Task::<Cancel> {
+            state: ts,
+            extra: Cancel,
+        };
+        Ok(task)
+    }
+
+    pub fn update_result(&mut self, result: TaskResult) -> Result<()> {
+        match &result {
+            TaskResult::Err(_) => {
+                self.state.result = result;
+                Ok(())
+            }
+            _ => Err(Error::msg(
+                "TaskResult::Err(TaskFailure) is expected for canceled task",
+            )),
+        }
+    }
+}
+
 trait TryTransitionTo<T>: Sized {
     type Error;
     fn try_transition_to(self) -> std::result::Result<T, Error>;
@@ -418,6 +471,34 @@ impl std::convert::TryFrom<TaskState> for Task<Finish> {
     }
 }
 
+impl std::convert::TryFrom<TaskState> for Task<Fail> {
+    type Error = Error;
+
+    fn try_from(ts: TaskState) -> Result<Self> {
+        let task = match ts.status {
+            TaskStatus::Running | TaskStatus::Staged => Task::<Fail>::new(ts)?,
+            _ => bail!("Cannot restore to Fail from saved state"),
+        };
+        Ok(task)
+    }
+}
+
+impl std::convert::TryFrom<TaskState> for Task<Cancel> {
+    type Error = Error;
+
+    fn try_from(ts: TaskState) -> Result<Self> {
+        let task = match ts.status {
+            TaskStatus::Running
+            | TaskStatus::Staged
+            | TaskStatus::Approved
+            | TaskStatus::Created
+            | TaskStatus::DataAssigned => Task::<Cancel>::new(ts)?,
+            _ => bail!("Cannot restore to Cancel from saved state"),
+        };
+        Ok(task)
+    }
+}
+
 impl std::convert::From<Task<Create>> for TaskState {
     fn from(mut task: Task<Create>) -> TaskState {
         task.state.status = TaskStatus::Created;
@@ -425,6 +506,20 @@ impl std::convert::From<Task<Create>> for TaskState {
     }
 }
 
+impl std::convert::From<Task<Fail>> for TaskState {
+    fn from(mut task: Task<Fail>) -> TaskState {
+        task.state.status = TaskStatus::Failed;
+        task.state
+    }
+}
+
+impl std::convert::From<Task<Cancel>> for TaskState {
+    fn from(mut task: Task<Cancel>) -> TaskState {
+        task.state.status = TaskStatus::Canceled;
+        task.state
+    }
+}
+
 impl_transit_and_into_task_state!(Assign => Approve);
 impl_transit_and_into_task_state!(Approve => Stage);
 impl_transit_and_into_task_state!(Stage => Run);
@@ -463,6 +558,10 @@ pub struct Run;
 pub struct Finish;
 #[derive(Debug, Default, Clone, Deserialize, Serialize)]
 pub struct Done;
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct Cancel;
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct Fail;
 
 impl std::convert::From<Create> for TaskStatus {
     fn from(_tag: Create) -> TaskStatus {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@teaclave.apache.org
For additional commands, e-mail: commits-help@teaclave.apache.org