You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@teaclave.apache.org by ms...@apache.org on 2020/05/13 17:24:42 UTC

[incubator-teaclave] branch master updated: [task] Bind Task state to static type (#293)

This is an automated email from the ASF dual-hosted git repository.

mssun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-teaclave.git


The following commit(s) were added to refs/heads/master by this push:
     new 756e21b  [task] Bind Task state to static type (#293)
756e21b is described below

commit 756e21b2d9c3bcb4ac2b2486cc55696b5a1abe1e
Author: Zhaofeng Chen <zf...@apache.org>
AuthorDate: Wed May 13 10:24:34 2020 -0700

    [task] Bind Task state to static type (#293)
---
 services/management/enclave/src/service.rs         | 106 +++--
 services/scheduler/enclave/src/service.rs          |  32 +-
 .../enclave/src/end_to_end/builtin_echo.rs         |   5 +-
 tests/functional/enclave/src/execution_service.rs  |   8 +-
 tests/functional/enclave/src/scheduler_service.rs  |  13 +-
 types/src/lib.rs                                   |   2 +
 types/src/task.rs                                  | 255 +----------
 types/src/task_state.rs                            | 499 +++++++++++++++++++++
 8 files changed, 594 insertions(+), 326 deletions(-)

diff --git a/services/management/enclave/src/service.rs b/services/management/enclave/src/service.rs
index 1ee15b6..35311a6 100644
--- a/services/management/enclave/src/service.rs
+++ b/services/management/enclave/src/service.rs
@@ -17,6 +17,7 @@
 
 use anyhow::{anyhow, Result};
 use std::collections::HashMap;
+use std::convert::TryInto;
 use std::prelude::v1::*;
 use std::sync::{Arc, SgxMutex as Mutex};
 use teaclave_proto::teaclave_frontend_service::{
@@ -36,11 +37,7 @@ use teaclave_proto::teaclave_storage_service::{
 use teaclave_rpc::endpoint::Endpoint;
 use teaclave_rpc::Request;
 use teaclave_service_enclave_utils::{ensure, teaclave_service};
-use teaclave_types::{
-    ExternalID, FileCrypto, Function, OwnerList, StagedTask, Storable, Task, TaskStatus,
-    TeaclaveInputFile, TeaclaveOutputFile, TeaclaveServiceResponseError,
-    TeaclaveServiceResponseResult, UserID,
-};
+use teaclave_types::*;
 use thiserror::Error;
 use url::Url;
 use uuid::Uuid;
@@ -266,7 +263,7 @@ impl TeaclaveManagement for TeaclaveManagementService {
             .read_from_db(&request.function_id)
             .map_err(|_| ServiceError::PermissionDenied)?;
 
-        let task = Task::new(
+        let task = Task::<Create>::new(
             user_id,
             request.executor,
             request.function_arguments,
@@ -278,10 +275,12 @@ impl TeaclaveManagement for TeaclaveManagementService {
 
         log::info!("CreateTask: {:?}", task);
 
-        self.write_to_db(&task)
+        let ts: TaskState = task.into();
+        self.write_to_db(&ts)
             .map_err(|_| ServiceError::StorageError)?;
 
-        Ok(CreateTaskResponse::new(task.external_id()))
+        let response = CreateTaskResponse::new(ts.external_id());
+        Ok(response)
     }
 
     // access control: task.participants.contains(&user_id)
@@ -291,31 +290,28 @@ impl TeaclaveManagement for TeaclaveManagementService {
     ) -> TeaclaveServiceResponseResult<GetTaskResponse> {
         let user_id = self.get_request_user_id(request.metadata())?;
 
-        let task: Task = self
+        let ts: TaskState = self
             .read_from_db(&request.message.task_id)
             .map_err(|_| ServiceError::PermissionDenied)?;
 
-        ensure!(
-            task.participants.contains(&user_id),
-            ServiceError::PermissionDenied
-        );
+        ensure!(ts.has_participant(&user_id), ServiceError::PermissionDenied);
 
-        log::info!("GetTask: {:?}", task);
+        log::info!("GetTask: {:?}", ts);
 
         let response = GetTaskResponse {
-            task_id: task.external_id(),
-            creator: task.creator,
-            function_id: task.function_id,
-            function_owner: task.function_owner,
-            function_arguments: task.function_arguments,
-            inputs_ownership: task.inputs_ownership,
-            outputs_ownership: task.outputs_ownership,
-            participants: task.participants,
-            approved_users: task.approved_users,
-            assigned_inputs: task.assigned_inputs.external_ids(),
-            assigned_outputs: task.assigned_outputs.external_ids(),
-            result: task.result,
-            status: task.status,
+            task_id: ts.external_id(),
+            creator: ts.creator,
+            function_id: ts.function_id,
+            function_owner: ts.function_owner,
+            function_arguments: ts.function_arguments,
+            inputs_ownership: ts.inputs_ownership,
+            outputs_ownership: ts.outputs_ownership,
+            participants: ts.participants,
+            approved_users: ts.approved_users,
+            assigned_inputs: ts.assigned_inputs.external_ids(),
+            assigned_outputs: ts.assigned_outputs.external_ids(),
+            result: ts.result,
+            status: ts.status,
         };
         Ok(response)
     }
@@ -338,14 +334,16 @@ impl TeaclaveManagement for TeaclaveManagementService {
 
         let request = request.message;
 
-        let mut task: Task = self
+        let ts: TaskState = self
             .read_from_db(&request.task_id)
             .map_err(|_| ServiceError::PermissionDenied)?;
 
-        ensure!(
-            task.participants.contains(&user_id),
+        ensure!(ts.has_participant(&user_id), ServiceError::PermissionDenied);
+
+        let mut task: Task<Assign> = ts.try_into().map_err(|e| {
+            log::warn!("Assign state error: {:?}", e);
             ServiceError::PermissionDenied
-        );
+        })?;
 
         for (data_name, data_id) in request.inputs.iter() {
             let file: TeaclaveInputFile = self
@@ -365,7 +363,8 @@ impl TeaclaveManagement for TeaclaveManagementService {
 
         log::info!("AssignData: {:?}", task);
 
-        self.write_to_db(&task)
+        let ts: TaskState = task.into();
+        self.write_to_db(&ts)
             .map_err(|_| ServiceError::StorageError)?;
 
         Ok(AssignDataResponse)
@@ -381,16 +380,22 @@ impl TeaclaveManagement for TeaclaveManagementService {
         let user_id = self.get_request_user_id(request.metadata())?;
 
         let request = request.message;
-        let mut task: Task = self
+        let ts: TaskState = self
             .read_from_db(&request.task_id)
             .map_err(|_| ServiceError::PermissionDenied)?;
 
+        let mut task: Task<Approve> = ts.try_into().map_err(|e| {
+            log::warn!("Approve state error: {:?}", e);
+            ServiceError::PermissionDenied
+        })?;
+
         task.approve(&user_id)
             .map_err(|_| ServiceError::PermissionDenied)?;
 
         log::info!("ApproveTask: approve:{:?}", task);
 
-        self.write_to_db(&task)
+        let ts: TaskState = task.into();
+        self.write_to_db(&ts)
             .map_err(|_| ServiceError::StorageError)?;
 
         Ok(ApproveTaskResponse)
@@ -406,32 +411,36 @@ impl TeaclaveManagement for TeaclaveManagementService {
         let user_id = self.get_request_user_id(request.metadata())?;
         let request = request.message;
 
-        let mut task: Task = self
+        let ts: TaskState = self
             .read_from_db(&request.task_id)
             .map_err(|_| ServiceError::PermissionDenied)?;
 
-        log::info!("InvokeTask: get task: {:?}", task);
-
         // Early validation
-        ensure!(task.creator == user_id, ServiceError::PermissionDenied);
-        ensure!(
-            task.status == TaskStatus::Approved,
-            ServiceError::PermissionDenied
-        );
+        ensure!(ts.has_creator(&user_id), ServiceError::PermissionDenied);
 
         let function: Function = self
-            .read_from_db(&task.function_id)
+            .read_from_db(&ts.function_id)
             .map_err(|_| ServiceError::PermissionDenied)?;
 
         log::info!("InvokeTask: get function: {:?}", function);
 
+        let mut task: Task<Stage> = ts.try_into().map_err(|e| {
+            log::warn!("Stage state error: {:?}", e);
+            ServiceError::PermissionDenied
+        })?;
+
+        log::info!("InvokeTask: get task: {:?}", task);
+
         let staged_task = task.stage_for_running(&user_id, function)?;
 
         log::info!("InvokeTask: staged task: {:?}", staged_task);
 
         self.enqueue_to_db(StagedTask::get_queue_key().as_bytes(), &staged_task)?;
-        self.write_to_db(&task)
+
+        let ts: TaskState = task.into();
+        self.write_to_db(&ts)
             .map_err(|_| ServiceError::StorageError)?;
+
         Ok(InvokeTaskResponse)
     }
 }
@@ -516,7 +525,6 @@ impl TeaclaveManagementService {
 
     #[cfg(test_mode)]
     fn add_mock_data(&self) -> Result<()> {
-        use teaclave_types::{FileAuthTag, FunctionInput, FunctionOutput};
         let mut output_file = self.create_fusion_data(vec!["mock_user1", "frontend_user"])?;
         output_file.uuid = Uuid::parse_str("00000000-0000-0000-0000-000000000001")?;
         output_file.cmac = Some(FileAuthTag::mock());
@@ -625,7 +633,7 @@ pub mod tests {
             .owner("mock_user");
         let function_arguments = FunctionArguments::new(hashmap!("arg" => "data"));
 
-        let task = Task::new(
+        let task = Task::<Create>::new(
             UserID::from("mock_user"),
             Executor::MesaPy,
             function_arguments,
@@ -635,9 +643,9 @@ pub mod tests {
         )
         .unwrap();
 
-        assert!(Task::match_prefix(&task.key_string()));
-        let value = task.to_vec().unwrap();
-        let deserialized_task = Task::from_slice(&value).unwrap();
+        let ts: TaskState = task.try_into().unwrap();
+        let value = ts.to_vec().unwrap();
+        let deserialized_task = TaskState::from_slice(&value).unwrap();
         info!("task: {:?}", deserialized_task);
     }
 
diff --git a/services/scheduler/enclave/src/service.rs b/services/scheduler/enclave/src/service.rs
index 01a88c9..fe03eb7 100644
--- a/services/scheduler/enclave/src/service.rs
+++ b/services/scheduler/enclave/src/service.rs
@@ -19,6 +19,7 @@
 #![allow(unused_variables)]
 
 use std::collections::VecDeque;
+use std::convert::TryInto;
 #[cfg(feature = "mesalock_sgx")]
 use std::prelude::v1::*;
 use std::sync::{Arc, SgxMutex as Mutex};
@@ -29,10 +30,7 @@ use teaclave_proto::teaclave_storage_service::*;
 use teaclave_rpc::endpoint::Endpoint;
 use teaclave_rpc::Request;
 use teaclave_service_enclave_utils::teaclave_service;
-use teaclave_types::{
-    ExternalID, OutputsTags, StagedTask, Storable, Task, TaskFiles, TaskResult, TaskStatus,
-    TeaclaveOutputFile, TeaclaveServiceResponseError, TeaclaveServiceResponseResult,
-};
+use teaclave_types::*;
 use uuid::Uuid;
 
 use anyhow::anyhow;
@@ -98,8 +96,8 @@ impl TeaclaveSchedulerService {
             .map_err(|_| TeaclaveSchedulerError::DataError.into())
     }
 
-    fn get_task(&self, task_id: &Uuid) -> Result<Task> {
-        let key = ExternalID::new(Task::key_prefix(), task_id.to_owned());
+    fn get_task_state(&self, task_id: &Uuid) -> Result<TaskState> {
+        let key = ExternalID::new(TaskState::key_prefix(), task_id.to_owned());
         self.get_from_db(&key)
     }
 
@@ -169,13 +167,14 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
         request: Request<UpdateTaskStatusRequest>,
     ) -> TeaclaveServiceResponseResult<UpdateTaskStatusResponse> {
         let request = request.message;
-        let mut task = self.get_task(&request.task_id)?;
-
-        // Only TaskStatus::Running is allowed here so far.
-        task.invoking_by_executor()?;
+        let ts = self.get_task_state(&request.task_id)?;
+        let task: Task<Run> = ts.try_into()?;
 
         log::info!("UpdateTaskStatus: Task {:?}", task);
-        self.put_into_db(&task)?;
+        // Only TaskStatus::Running is implicitly allowed here.
+
+        let ts = TaskState::from(task);
+        self.put_into_db(&ts)?;
         Ok(UpdateTaskStatusResponse {})
     }
 
@@ -184,19 +183,22 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
         request: Request<UpdateTaskResultRequest>,
     ) -> TeaclaveServiceResponseResult<UpdateTaskResultResponse> {
         let request = request.message;
-        let mut task = self.get_task(&request.task_id)?;
+        let ts = self.get_task_state(&request.task_id)?;
+        let mut task: Task<Finish> = ts.try_into()?;
 
         if let TaskResult::Ok(outputs) = &request.task_result {
             for (key, auth_tag) in outputs.tags_map.iter() {
-                let outfile = task.assigned_outputs.update_cmac(key, auth_tag)?;
+                let outfile = task.update_output_cmac(key, auth_tag)?;
                 self.put_into_db(outfile)?;
             }
         };
 
         // Updating task result means we have finished execution
-        task.finish(request.task_result)?;
+        task.update_result(request.task_result)?;
+        log::info!("UpdateTaskResult: Task {:?}", task);
 
-        self.put_into_db(&task)?;
+        let ts = TaskState::from(task);
+        self.put_into_db(&ts)?;
         Ok(UpdateTaskResultResponse {})
     }
 }
diff --git a/tests/functional/enclave/src/end_to_end/builtin_echo.rs b/tests/functional/enclave/src/end_to_end/builtin_echo.rs
index 5d39ba3..890fec0 100644
--- a/tests/functional/enclave/src/end_to_end/builtin_echo.rs
+++ b/tests/functional/enclave/src/end_to_end/builtin_echo.rs
@@ -51,10 +51,11 @@ pub fn test_echo_task_success() {
     let task_id = response.task_id;
 
     // Assign Data To Task
-    // This task does not have any input/output files, we can skip the assignment process.
+    // This task does not have any input/output files, we can opt to skip the assignment process.
 
     // Approve Task
-    approve_task(&mut client, &task_id).unwrap();
+    // This task is a single-user task, so we can opt to skip the approval process.
+    // approve_task(&mut client, &task_id).unwrap();
 
     // Invoke Task
     invoke_task(&mut client, &task_id).unwrap();
diff --git a/tests/functional/enclave/src/execution_service.rs b/tests/functional/enclave/src/execution_service.rs
index 2412197..19dfa45 100644
--- a/tests/functional/enclave/src/execution_service.rs
+++ b/tests/functional/enclave/src/execution_service.rs
@@ -26,7 +26,7 @@ use uuid::Uuid;
 #[test_case]
 fn test_execute_function() {
     let task_id = Uuid::new_v4();
-    let task = Task {
+    let ts = TaskState {
         task_id,
         status: TaskStatus::Staged,
         ..Default::default()
@@ -49,14 +49,14 @@ fn test_execute_function() {
         staged_task.to_vec().unwrap(),
     );
     let _enqueue_response = storage_client.enqueue(enqueue_request).unwrap();
-    let put_request = PutRequest::new(task.key().as_slice(), task.to_vec().unwrap().as_slice());
+    let put_request = PutRequest::new(ts.key().as_slice(), ts.to_vec().unwrap().as_slice());
     let _put_response = storage_client.put(put_request).unwrap();
 
     std::thread::sleep(std::time::Duration::from_secs(5));
 
-    let get_request = GetRequest::new(task.key().as_slice());
+    let get_request = GetRequest::new(ts.key().as_slice());
     let get_response = storage_client.get(get_request).unwrap();
-    let updated_task = Task::from_slice(get_response.value.as_slice()).unwrap();
+    let updated_task = TaskState::from_slice(get_response.value.as_slice()).unwrap();
     assert_eq!(
         updated_task.result.unwrap().return_value,
         b"Hello, Teaclave Tests!"
diff --git a/tests/functional/enclave/src/scheduler_service.rs b/tests/functional/enclave/src/scheduler_service.rs
index 226f273..70d56b4 100644
--- a/tests/functional/enclave/src/scheduler_service.rs
+++ b/tests/functional/enclave/src/scheduler_service.rs
@@ -51,12 +51,6 @@ fn test_pull_task() {
 #[test_case]
 fn test_update_task_status_result() {
     let task_id = Uuid::new_v4();
-    let task = Task {
-        task_id,
-        status: TaskStatus::Staged,
-        ..Default::default()
-    };
-
     let function_id = Uuid::new_v4();
 
     let staged_task = StagedTask::new()
@@ -72,7 +66,12 @@ fn test_update_task_status_result() {
     );
     let _enqueue_response = storage_client.enqueue(enqueue_request).unwrap();
 
-    let put_request = PutRequest::new(task.key().as_slice(), task.to_vec().unwrap().as_slice());
+    let ts = TaskState {
+        task_id,
+        status: TaskStatus::Staged,
+        ..Default::default()
+    };
+    let put_request = PutRequest::new(ts.key().as_slice(), ts.to_vec().unwrap().as_slice());
     let _put_response = storage_client.put(put_request).unwrap();
 
     let mut client = get_scheduler_client();
diff --git a/types/src/lib.rs b/types/src/lib.rs
index 4b404e6..7045d4a 100644
--- a/types/src/lib.rs
+++ b/types/src/lib.rs
@@ -49,6 +49,8 @@ mod storage;
 pub use storage::Storable;
 mod task;
 pub use task::*;
+mod task_state;
+pub use task_state::*;
 mod file_agent;
 pub use file_agent::*;
 mod macros;
diff --git a/types/src/task.rs b/types/src/task.rs
index eb24b5c..99a6a77 100644
--- a/types/src/task.rs
+++ b/types/src/task.rs
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::FunctionArguments;
-use crate::Storable;
 use crate::*;
 use anyhow::{anyhow, bail, ensure, Error, Result};
 use serde::{Deserialize, Serialize};
@@ -124,7 +122,7 @@ impl IntoIterator for OwnerList {
     }
 }
 
-#[derive(Debug, Deserialize, Serialize, std::cmp::PartialEq)]
+#[derive(Clone, Debug, Deserialize, Serialize, std::cmp::PartialEq)]
 pub enum TaskStatus {
     Created,
     DataAssigned,
@@ -140,7 +138,7 @@ impl Default for TaskStatus {
     }
 }
 
-#[derive(Debug, Default, Deserialize, Serialize)]
+#[derive(Debug, Clone, Default, Deserialize, Serialize)]
 pub struct OutputsTags {
     inner: HashMap<String, FileAuthTag>,
 }
@@ -197,7 +195,7 @@ impl std::iter::FromIterator<(String, FileAuthTag)> for OutputsTags {
     }
 }
 
-#[derive(Debug, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct TaskOutputs {
     pub return_value: Vec<u8>,
     pub tags_map: OutputsTags,
@@ -212,7 +210,7 @@ impl TaskOutputs {
     }
 }
 
-#[derive(Debug, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct TaskFailure {
     pub reason: String,
 }
@@ -285,7 +283,7 @@ impl std::convert::TryFrom<String> for ExternalID {
     }
 }
 
-#[derive(Debug, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
 pub enum TaskResult {
     NotReady,
     Ok(TaskOutputs),
@@ -350,7 +348,7 @@ where
     }
 }
 
-#[derive(Debug, Default, Deserialize, Serialize)]
+#[derive(Debug, Clone, Default, Deserialize, Serialize)]
 pub struct TaskFileOwners {
     inner: HashMap<String, OwnerList>,
 }
@@ -496,244 +494,3 @@ impl std::convert::From<TaskFiles<TeaclaveOutputFile>> for FunctionOutputFiles {
         files.into_iter().collect()
     }
 }
-
-const TASK_PREFIX: &str = "task";
-
-#[derive(Debug, Default, Deserialize, Serialize)]
-pub struct Task {
-    pub task_id: Uuid,
-    pub creator: UserID,
-    pub function_id: ExternalID,
-    pub function_arguments: FunctionArguments,
-    pub executor: Executor,
-    pub inputs_ownership: TaskFileOwners,
-    pub outputs_ownership: TaskFileOwners,
-    pub function_owner: UserID,
-    pub participants: UserList,
-    pub approved_users: UserList,
-    pub assigned_inputs: TaskFiles<TeaclaveInputFile>,
-    pub assigned_outputs: TaskFiles<TeaclaveOutputFile>,
-    pub result: TaskResult,
-    pub status: TaskStatus,
-}
-
-impl Storable for Task {
-    fn key_prefix() -> &'static str {
-        TASK_PREFIX
-    }
-
-    fn uuid(&self) -> Uuid {
-        self.task_id
-    }
-}
-
-impl Task {
-    pub fn new(
-        requester: UserID,
-        req_executor: Executor,
-        req_func_args: FunctionArguments,
-        req_input_owners: impl Into<TaskFileOwners>,
-        req_output_owners: impl Into<TaskFileOwners>,
-        function: Function,
-    ) -> Result<Self> {
-        let req_input_owners = req_input_owners.into();
-        let req_output_owners = req_output_owners.into();
-
-        // gather all participants
-        let input_owners = req_input_owners.all_owners();
-        let output_owners = req_output_owners.all_owners();
-        let mut participants = UserList::unions(vec![input_owners, output_owners]);
-        participants.insert(requester.clone());
-        if !function.public {
-            participants.insert(function.owner.clone());
-        }
-
-        //check function compatibility
-        let fn_args_spec: HashSet<&String> = function.arguments.iter().collect();
-        let req_args: HashSet<&String> = req_func_args.inner().keys().collect();
-        ensure!(fn_args_spec == req_args, "function_arguments mismatch");
-
-        // check input fkeys
-        let inputs_spec: HashSet<&String> = function.inputs.iter().map(|f| &f.name).collect();
-        let req_input_fkeys: HashSet<&String> = req_input_owners.keys().collect();
-        ensure!(inputs_spec == req_input_fkeys, "input keys mismatch");
-
-        // check output fkeys
-        let outputs_spec: HashSet<&String> = function.outputs.iter().map(|f| &f.name).collect();
-        let req_output_fkeys: HashSet<&String> = req_output_owners.keys().collect();
-        ensure!(outputs_spec == req_output_fkeys, "output keys mismatch");
-
-        // Skip the assignment if no file is required
-        let status = if req_input_owners.is_empty() && req_output_owners.is_empty() {
-            TaskStatus::DataAssigned
-        } else {
-            TaskStatus::Created
-        };
-
-        let task = Task {
-            task_id: Uuid::new_v4(),
-            creator: requester,
-            executor: req_executor,
-            function_id: function.external_id(),
-            function_owner: function.owner.clone(),
-            function_arguments: req_func_args,
-            inputs_ownership: req_input_owners,
-            outputs_ownership: req_output_owners,
-            participants,
-            status,
-            ..Default::default()
-        };
-
-        Ok(task)
-    }
-
-    pub fn approve(&mut self, requester: &UserID) -> Result<()> {
-        ensure!(
-            self.status == TaskStatus::DataAssigned,
-            "Unexpected task status when approving: {:?}",
-            self.status
-        );
-
-        ensure!(
-            self.participants.contains(requester),
-            "Unexpected user trying to approve a task: {:?}",
-            requester
-        );
-
-        self.approved_users.insert(requester.clone());
-        if self.participants == self.approved_users {
-            self.update_status(TaskStatus::Approved);
-        }
-
-        Ok(())
-    }
-
-    pub fn invoking_by_executor(&mut self) -> Result<()> {
-        ensure!(
-            self.status == TaskStatus::Staged,
-            "Unexpected task status when invoked: {:?}",
-            self.status
-        );
-        self.status = TaskStatus::Running;
-        Ok(())
-    }
-
-    pub fn finish(&mut self, result: TaskResult) -> Result<()> {
-        ensure!(
-            self.status == TaskStatus::Running,
-            "Unexpected task status when invoked: {:?}",
-            self.status
-        );
-        self.result = result;
-        self.status = TaskStatus::Finished;
-        Ok(())
-    }
-
-    pub fn stage_for_running(
-        &mut self,
-        requester: &UserID,
-        function: Function,
-    ) -> Result<StagedTask> {
-        ensure!(
-            &self.creator == requester,
-            "Unexpected user trying to invoke a task: {:?}",
-            requester
-        );
-        ensure!(
-            self.status == TaskStatus::Approved,
-            "Unexpected task status when invoked: {:?}",
-            self.status
-        );
-        let function_arguments = self.function_arguments.clone();
-        let staged_task = StagedTask {
-            task_id: self.task_id,
-            executor: self.executor,
-            executor_type: function.executor_type,
-            function_id: function.id,
-            function_name: function.name,
-            function_payload: function.payload,
-            function_arguments,
-            input_data: self.assigned_inputs.clone().into(),
-            output_data: self.assigned_outputs.clone().into(),
-        };
-
-        self.update_status(TaskStatus::Staged);
-        Ok(staged_task)
-    }
-
-    pub fn assign_input(
-        &mut self,
-        requester: &UserID,
-        fname: &str,
-        file: TeaclaveInputFile,
-    ) -> Result<()> {
-        ensure!(
-            self.status == TaskStatus::Created,
-            "Unexpected task status during input assignment: {:?}",
-            self.status
-        );
-
-        ensure!(
-            file.owner.contains(requester),
-            "Assign: requester is not in the owner list. {:?}.",
-            file.external_id()
-        );
-
-        self.inputs_ownership.check(fname, &file.owner)?;
-
-        self.assigned_inputs.assign(fname, file)?;
-
-        if self.all_data_assigned() {
-            self.update_status(TaskStatus::DataAssigned);
-        }
-        Ok(())
-    }
-
-    pub fn assign_output(
-        &mut self,
-        requester: &UserID,
-        fname: &str,
-        file: TeaclaveOutputFile,
-    ) -> Result<()> {
-        ensure!(
-            self.status == TaskStatus::Created,
-            "Unexpected task status during output assignment: {:?}",
-            self.status
-        );
-
-        ensure!(
-            file.owner.contains(requester),
-            "Assign: requester is not in the owner list. {:?}.",
-            file.external_id()
-        );
-
-        self.outputs_ownership.check(fname, &file.owner)?;
-
-        self.assigned_outputs.assign(fname, file)?;
-
-        if self.all_data_assigned() {
-            self.update_status(TaskStatus::DataAssigned);
-        }
-        Ok(())
-    }
-
-    fn update_status(&mut self, status: TaskStatus) {
-        self.status = status;
-    }
-
-    fn all_data_assigned(&self) -> bool {
-        let input_args: HashSet<&String> = self.inputs_ownership.keys().collect();
-        let assiged_inputs: HashSet<&String> = self.assigned_inputs.keys().collect();
-        if input_args != assiged_inputs {
-            return false;
-        }
-
-        let output_args: HashSet<&String> = self.outputs_ownership.keys().collect();
-        let assiged_outputs: HashSet<&String> = self.assigned_outputs.keys().collect();
-        if output_args != assiged_outputs {
-            return false;
-        }
-
-        true
-    }
-}
diff --git a/types/src/task_state.rs b/types/src/task_state.rs
new file mode 100644
index 0000000..76353e8
--- /dev/null
+++ b/types/src/task_state.rs
@@ -0,0 +1,499 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::*;
+use anyhow::{bail, ensure, Error, Result};
+use serde::{Deserialize, Serialize};
+use std::collections::HashSet;
+use std::convert::TryInto;
+use uuid::Uuid;
+
+const TASK_PREFIX: &str = "task";
+
+#[derive(Debug, Clone, Default, Deserialize, Serialize)]
+pub struct TaskState { // serializable task record; `Task<S>` wraps it with a phase tag
+    pub task_id: Uuid, // returned by `Storable::uuid`; set from `Uuid::new_v4()` on creation
+    pub creator: UserID, // requester that created the task; compared by `has_creator`
+    pub function_id: ExternalID, // `function.external_id()` captured at creation
+    pub function_arguments: FunctionArguments, // cloned into `StagedTask` when staging
+    pub executor: Executor, // executor requested at creation; copied into `StagedTask`
+    pub inputs_ownership: TaskFileOwners, // requested owners per input key
+    pub outputs_ownership: TaskFileOwners, // requested owners per output key
+    pub function_owner: UserID, // copy of `function.owner` taken at creation
+    pub participants: UserList, // file owners + creator (+ function owner if not public)
+    pub approved_users: UserList, // users recorded by `Task<Approve>::approve`
+    pub assigned_inputs: TaskFiles<TeaclaveInputFile>, // filled by `assign_input`
+    pub assigned_outputs: TaskFiles<TeaclaveOutputFile>, // filled by `assign_output`
+    pub result: TaskResult, // overwritten by `Task<Finish>::update_result`
+    pub status: TaskStatus, // written when a `Task<_>` is converted back into `TaskState`
+}
+
+/// Storage hooks for `TaskState`.
+impl Storable for TaskState {
+    /// Key prefix under which tasks are stored (`TASK_PREFIX`).
+    fn key_prefix() -> &'static str {
+        let prefix: &'static str = TASK_PREFIX;
+        prefix
+    }
+
+    /// The task id doubles as the storage UUID.
+    fn uuid(&self) -> Uuid {
+        let TaskState { task_id, .. } = self;
+        *task_id
+    }
+}
+
+impl TaskState {
+    /// Reports whether the approval requirement is met: either the task has
+    /// exactly one participant, or the approved users equal the participants.
+    pub fn everyone_approved(&self) -> bool {
+        // Single user task is by default approved by the creator
+        let single_user = self.participants.len() == 1;
+        single_user || self.participants == self.approved_users
+    }
+
+    /// Reports whether the assigned input keys and the assigned output keys
+    /// are exactly the key sets declared in the matching ownership tables.
+    pub fn all_data_assigned(&self) -> bool {
+        let inputs_complete = {
+            let declared: HashSet<&String> = self.inputs_ownership.keys().collect();
+            let assigned: HashSet<&String> = self.assigned_inputs.keys().collect();
+            declared == assigned
+        };
+
+        // Outputs are only inspected once the inputs are known to match.
+        inputs_complete && {
+            let declared: HashSet<&String> = self.outputs_ownership.keys().collect();
+            let assigned: HashSet<&String> = self.assigned_outputs.keys().collect();
+            declared == assigned
+        }
+    }
+
+    /// True when `user_id` appears in the participant list.
+    pub fn has_participant(&self, user_id: &UserID) -> bool {
+        let participants = &self.participants;
+        participants.contains(user_id)
+    }
+
+    /// True when `user_id` is the creator of this task.
+    pub fn has_creator(&self, user_id: &UserID) -> bool {
+        user_id == &self.creator
+    }
+}
+
+#[derive(Debug, Clone, Default, Deserialize, Serialize)]
+pub struct Task<S: StateTag> { // a `TaskState` paired with a tag type naming its phase
+    state: TaskState, // the task data, shared by every phase
+    extra: S, // phase tag; converted into `TaskStatus` when turned back into `TaskState`
+}
+
+pub trait StateTag {} // marker trait bounding the `S` parameter of `Task<S>`
+impl StateTag for Create {} // phase produced by `Task::<Create>::new`
+impl StateTag for Assign {} // phase offering `assign_input` / `assign_output`
+impl StateTag for Approve {} // phase offering `approve`
+impl StateTag for Stage {} // phase offering `stage_for_running`
+impl StateTag for Run {} // phase restored from a saved `Staged` status
+impl StateTag for Finish {} // phase offering `update_output_cmac` / `update_result`
+impl StateTag for Done {} // last phase; its tag converts to `TaskStatus::Finished`
+
+impl Task<Create> {
+    /// Builds a new task for `function` on behalf of `requester`.
+    ///
+    /// The participant list is the union of all input and output file owners
+    /// plus the requester, plus the function owner when the function is not
+    /// public. The task receives a freshly generated v4 UUID; every field not
+    /// set here keeps its `Default` value.
+    ///
+    /// # Errors
+    ///
+    /// Fails when the requested argument names, input keys, or output keys
+    /// are not exactly the sets declared by `function`.
+    pub fn new(
+        requester: UserID,
+        req_executor: Executor,
+        req_func_args: FunctionArguments,
+        req_input_owners: impl Into<TaskFileOwners>,
+        req_output_owners: impl Into<TaskFileOwners>,
+        function: Function,
+    ) -> Result<Self> {
+        let inputs_ownership: TaskFileOwners = req_input_owners.into();
+        let outputs_ownership: TaskFileOwners = req_output_owners.into();
+
+        // Collect everyone involved: file owners, the requester, and the
+        // owner of a non-public function.
+        let mut participants = UserList::unions(vec![
+            inputs_ownership.all_owners(),
+            outputs_ownership.all_owners(),
+        ]);
+        participants.insert(requester.clone());
+        if !function.public {
+            participants.insert(function.owner.clone());
+        }
+
+        // The supplied argument names must equal the declared ones.
+        let declared_args: HashSet<&String> = function.arguments.iter().collect();
+        let supplied_args: HashSet<&String> = req_func_args.inner().keys().collect();
+        ensure!(declared_args == supplied_args, "function_arguments mismatch");
+
+        // The requested input keys must equal the declared input names.
+        let declared_inputs: HashSet<&String> =
+            function.inputs.iter().map(|spec| &spec.name).collect();
+        let supplied_inputs: HashSet<&String> = inputs_ownership.keys().collect();
+        ensure!(declared_inputs == supplied_inputs, "input keys mismatch");
+
+        // The requested output keys must equal the declared output names.
+        let declared_outputs: HashSet<&String> =
+            function.outputs.iter().map(|spec| &spec.name).collect();
+        let supplied_outputs: HashSet<&String> = outputs_ownership.keys().collect();
+        ensure!(declared_outputs == supplied_outputs, "output keys mismatch");
+
+        let state = TaskState {
+            task_id: Uuid::new_v4(),
+            creator: requester,
+            executor: req_executor,
+            function_id: function.external_id(),
+            function_owner: function.owner.clone(),
+            function_arguments: req_func_args,
+            inputs_ownership,
+            outputs_ownership,
+            participants,
+            ..Default::default()
+        };
+
+        Ok(Task {
+            state,
+            extra: Create,
+        })
+    }
+}
+
+impl Task<Assign> {
+    /// Wraps `ts` as a task in the assignment phase. No checks are made and
+    /// the result is always `Ok`.
+    pub fn new(ts: TaskState) -> Result<Self> {
+        Ok(Task {
+            state: ts,
+            extra: Assign,
+        })
+    }
+
+    /// Assigns `file` to the input slot named `fname`.
+    ///
+    /// # Errors
+    ///
+    /// Fails when `requester` is not among the file's owners, when the
+    /// ownership check against `inputs_ownership` fails, or when the
+    /// assignment itself is rejected.
+    pub fn assign_input(
+        &mut self,
+        requester: &UserID,
+        fname: &str,
+        file: TeaclaveInputFile,
+    ) -> Result<()> {
+        if !file.owner.contains(requester) {
+            bail!(
+                "Assign: requester is not in the owner list. {:?}.",
+                file.external_id()
+            );
+        }
+
+        let state = &mut self.state;
+        state.inputs_ownership.check(fname, &file.owner)?;
+        state.assigned_inputs.assign(fname, file)?;
+        Ok(())
+    }
+
+    /// Assigns `file` to the output slot named `fname`.
+    ///
+    /// # Errors
+    ///
+    /// Fails when `requester` is not among the file's owners, when the
+    /// ownership check against `outputs_ownership` fails, or when the
+    /// assignment itself is rejected.
+    pub fn assign_output(
+        &mut self,
+        requester: &UserID,
+        fname: &str,
+        file: TeaclaveOutputFile,
+    ) -> Result<()> {
+        if !file.owner.contains(requester) {
+            bail!(
+                "Assign: requester is not in the owner list. {:?}.",
+                file.external_id()
+            );
+        }
+
+        let state = &mut self.state;
+        state.outputs_ownership.check(fname, &file.owner)?;
+        state.assigned_outputs.assign(fname, file)?;
+        Ok(())
+    }
+}
+
+impl Task<Approve> {
+    /// Wraps `ts` as a task in the approval phase. No checks are made and
+    /// the result is always `Ok`.
+    pub fn new(ts: TaskState) -> Result<Self> {
+        Ok(Task {
+            state: ts,
+            extra: Approve,
+        })
+    }
+
+    /// Records `requester` as having approved the task.
+    ///
+    /// # Errors
+    ///
+    /// Fails when `requester` is not one of the task's participants.
+    pub fn approve(&mut self, requester: &UserID) -> Result<()> {
+        let state = &mut self.state;
+        if !state.participants.contains(requester) {
+            bail!("Unexpected user trying to approve a task: {:?}", requester);
+        }
+
+        state.approved_users.insert(requester.clone());
+        Ok(())
+    }
+}
+impl Task<Stage> {
+    /// Wraps `ts` as a task in the staging phase. No checks are made and the
+    /// result is always `Ok`.
+    pub fn new(ts: TaskState) -> Result<Self> {
+        let task = Task::<Stage> {
+            state: ts,
+            extra: Stage,
+        };
+        Ok(task)
+    }
+
+    /// Builds the `StagedTask` that describes this task for execution.
+    ///
+    /// Task id, executor, function arguments and the assigned input/output
+    /// files come from the task state; executor type, function id, name and
+    /// payload come from `function`. The task state is only read: `&mut self`
+    /// is kept so existing callers continue to compile.
+    ///
+    /// # Errors
+    ///
+    /// Fails when `requester` is not the creator of the task.
+    pub fn stage_for_running(
+        &mut self,
+        requester: &UserID,
+        function: Function,
+    ) -> Result<StagedTask> {
+        // `requester` is already a reference; pass it through as is.
+        ensure!(
+            self.state.has_creator(requester),
+            "Requester is not the task creator"
+        );
+
+        let function_arguments = self.state.function_arguments.clone();
+        let staged_task = StagedTask {
+            task_id: self.state.task_id,
+            executor: self.state.executor,
+            executor_type: function.executor_type,
+            function_id: function.id,
+            function_name: function.name,
+            function_payload: function.payload,
+            function_arguments,
+            input_data: self.state.assigned_inputs.clone().into(),
+            output_data: self.state.assigned_outputs.clone().into(),
+        };
+        Ok(staged_task)
+    }
+}
+
+impl Task<Run> {
+    /// Wraps `ts` as a task in the run phase. No checks are made and the
+    /// result is always `Ok`.
+    pub fn new(ts: TaskState) -> Result<Self> {
+        Ok(Task {
+            state: ts,
+            extra: Run,
+        })
+    }
+}
+
+impl Task<Finish> {
+    /// Wraps `ts` as a task in the finish phase. No checks are made and the
+    /// result is always `Ok`.
+    pub fn new(ts: TaskState) -> Result<Self> {
+        Ok(Task {
+            state: ts,
+            extra: Finish,
+        })
+    }
+
+    /// Forwards `fname` and `auth_tag` to `update_cmac` on the assigned
+    /// outputs and returns its result unchanged.
+    pub fn update_output_cmac(
+        &mut self,
+        fname: &str,
+        auth_tag: &FileAuthTag,
+    ) -> Result<&TeaclaveOutputFile> {
+        let outputs = &mut self.state.assigned_outputs;
+        outputs.update_cmac(fname, auth_tag)
+    }
+
+    /// Replaces the stored task result with `result`; always returns `Ok`.
+    pub fn update_result(&mut self, result: TaskResult) -> Result<()> {
+        let state = &mut self.state;
+        state.result = result;
+        Ok(())
+    }
+}
+
+impl Task<Done> {
+    /// Wraps `ts` as a task in the done phase. No checks are made and the
+    /// result is always `Ok`.
+    pub fn new(ts: TaskState) -> Result<Self> {
+        Ok(Task {
+            state: ts,
+            extra: Done,
+        })
+    }
+}
+
+/// Advances a task from the assignment phase to the approval phase.
+/// The step is refused until `all_data_assigned()` holds.
+impl std::convert::TryFrom<Task<Assign>> for Task<Approve> {
+    type Error = Error;
+    fn try_from(task: Task<Assign>) -> Result<Task<Approve>> {
+        let Task { state, .. } = task;
+        if !state.all_data_assigned() {
+            bail!("Not ready: Assign -> Approve");
+        }
+        Task::<Approve>::new(state)
+    }
+}
+
+/// Advances a task from the approval phase to the staging phase.
+/// The step is refused until `everyone_approved()` holds.
+impl std::convert::TryFrom<Task<Approve>> for Task<Stage> {
+    type Error = Error;
+    fn try_from(task: Task<Approve>) -> Result<Task<Stage>> {
+        ensure!(
+            task.state.everyone_approved(),
+            "Not ready: Approve -> Stage"
+        );
+        Task::<Stage>::new(task.state)
+    }
+}
+
+/// Advances a task from the staging phase to the run phase; the state is
+/// carried over without any check.
+impl std::convert::TryFrom<Task<Stage>> for Task<Run> {
+    type Error = Error;
+    fn try_from(task: Task<Stage>) -> Result<Task<Run>> {
+        let Task { state, .. } = task;
+        Task::<Run>::new(state)
+    }
+}
+
+/// Advances a task from the run phase to the finish phase; the state is
+/// carried over without any check.
+impl std::convert::TryFrom<Task<Run>> for Task<Finish> {
+    type Error = Error;
+    fn try_from(task: Task<Run>) -> Result<Task<Finish>> {
+        let Task { state, .. } = task;
+        Task::<Finish>::new(state)
+    }
+}
+
+/// Advances a task from the finish phase to the done phase; the state is
+/// carried over without any check.
+impl std::convert::TryFrom<Task<Finish>> for Task<Done> {
+    type Error = Error;
+    fn try_from(task: Task<Finish>) -> Result<Task<Done>> {
+        let Task { state, .. } = task;
+        Task::<Done>::new(state)
+    }
+}
+
+/// Restores an assignment-phase task from saved state.
+/// Only a saved status of `Created` is accepted.
+impl std::convert::TryFrom<TaskState> for Task<Assign> {
+    type Error = Error;
+
+    fn try_from(ts: TaskState) -> Result<Self> {
+        let task = match ts.status {
+            TaskStatus::Created => Task::<Assign>::new(ts)?,
+            // Message carries no trailing whitespace, matching the other
+            // restore impls in this file.
+            _ => bail!("Cannot restore to Assign from saved state"),
+        };
+        Ok(task)
+    }
+}
+
+/// Restores an approval-phase task from saved state.
+///
+/// A saved status of `DataAssigned` is wrapped directly. A saved status of
+/// `Created` is first restored as `Task<Assign>` and then advanced, which
+/// fails unless all data has been assigned. Any other status is rejected.
+impl std::convert::TryFrom<TaskState> for Task<Approve> {
+    type Error = Error;
+
+    fn try_from(ts: TaskState) -> Result<Self> {
+        match ts.status {
+            TaskStatus::Created => {
+                let assigning: Task<Assign> = ts.try_into()?;
+                assigning.try_into()
+            }
+            TaskStatus::DataAssigned => Task::<Approve>::new(ts),
+            _ => bail!("Cannot restore to Approve from saved state"),
+        }
+    }
+}
+
+/// Restores a staging-phase task from saved state.
+///
+/// A saved status of `Approved` is wrapped directly. A saved status of
+/// `Created` or `DataAssigned` is first restored as `Task<Approve>` and then
+/// advanced, which fails unless everyone has approved. Any other status is
+/// rejected.
+impl std::convert::TryFrom<TaskState> for Task<Stage> {
+    type Error = Error;
+
+    fn try_from(ts: TaskState) -> Result<Self> {
+        match ts.status {
+            TaskStatus::Created | TaskStatus::DataAssigned => {
+                let approving: Task<Approve> = ts.try_into()?;
+                approving.try_into()
+            }
+            TaskStatus::Approved => Task::<Stage>::new(ts),
+            _ => bail!("Cannot restore to Stage from saved state"),
+        }
+    }
+}
+
+/// Restores a run-phase task from saved state.
+/// Only a saved status of `Staged` is accepted.
+impl std::convert::TryFrom<TaskState> for Task<Run> {
+    type Error = Error;
+
+    fn try_from(ts: TaskState) -> Result<Self> {
+        match ts.status {
+            TaskStatus::Staged => Task::<Run>::new(ts),
+            _ => bail!("Cannot restore to Run from saved state"),
+        }
+    }
+}
+
+/// Restores a finish-phase task from saved state.
+/// Only a saved status of `Running` is accepted.
+impl std::convert::TryFrom<TaskState> for Task<Finish> {
+    type Error = Error;
+
+    fn try_from(ts: TaskState) -> Result<Self> {
+        match ts.status {
+            TaskStatus::Running => Task::<Finish>::new(ts),
+            _ => bail!("Cannot restore to Finish from saved state"),
+        }
+    }
+}
+
+/// Unwraps a freshly created task into its state, stamping the status as
+/// `TaskStatus::Created`.
+impl std::convert::From<Task<Create>> for TaskState {
+    fn from(task: Task<Create>) -> TaskState {
+        let Task { mut state, .. } = task;
+        state.status = TaskStatus::Created;
+        state
+    }
+}
+
+// One `From<Task<phase>> for TaskState` impl per phase that has a successor.
+impl_transit_and_into_task_state!(Assign => Approve);
+impl_transit_and_into_task_state!(Approve => Stage);
+impl_transit_and_into_task_state!(Stage => Run);
+impl_transit_and_into_task_state!(Run => Finish);
+impl_transit_and_into_task_state!(Finish => Done);
+
+/// Implements `From<Task<$cur>> for TaskState`.
+///
+/// The conversion first tries to advance a clone of the task to `$next`.
+/// On success the advanced state is returned, with its status derived from
+/// the `$next` tag. On failure the original state is returned, with its
+/// status derived from the `$cur` tag.
+#[macro_export]
+macro_rules! impl_transit_and_into_task_state {
+    ( $cur:ty => $next:ty ) => {
+        impl std::convert::From<Task<$cur>> for TaskState {
+            fn from(task: Task<$cur>) -> TaskState {
+                // The attempt consumes its argument, so it runs on a clone
+                // and `task` stays available for the fallback.
+                let attempt: Result<Task<$next>> = task.clone().try_into();
+                match attempt {
+                    Ok(Task { mut state, extra }) => {
+                        state.status = extra.into();
+                        state
+                    }
+                    Err(_) => {
+                        let Task { mut state, extra } = task;
+                        state.status = extra.into();
+                        state
+                    }
+                }
+            }
+        }
+    };
+}
+
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct Create; // phase tag; converts to `TaskStatus::Created`
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct Assign; // phase tag; converts to `TaskStatus::Created` (same as `Create`)
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct Approve; // phase tag; converts to `TaskStatus::DataAssigned`
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct Stage; // phase tag; converts to `TaskStatus::Approved`
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct Run; // phase tag; converts to `TaskStatus::Staged`
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct Finish; // phase tag; converts to `TaskStatus::Running`
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct Done; // phase tag; converts to `TaskStatus::Finished`
+
+/// The `Create` tag is stored as `TaskStatus::Created`.
+impl std::convert::From<Create> for TaskStatus {
+    fn from(_: Create) -> Self {
+        Self::Created
+    }
+}
+
+/// The `Assign` tag is stored as `TaskStatus::Created`, the same status the
+/// `Create` tag maps to.
+impl std::convert::From<Assign> for TaskStatus {
+    fn from(_: Assign) -> Self {
+        Self::Created
+    }
+}
+
+/// The `Approve` tag is stored as `TaskStatus::DataAssigned`.
+impl std::convert::From<Approve> for TaskStatus {
+    fn from(_: Approve) -> Self {
+        Self::DataAssigned
+    }
+}
+
+/// The `Stage` tag is stored as `TaskStatus::Approved`.
+impl std::convert::From<Stage> for TaskStatus {
+    fn from(_: Stage) -> Self {
+        Self::Approved
+    }
+}
+
+/// The `Run` tag is stored as `TaskStatus::Staged`.
+impl std::convert::From<Run> for TaskStatus {
+    fn from(_: Run) -> Self {
+        Self::Staged
+    }
+}
+
+/// The `Finish` tag is stored as `TaskStatus::Running`.
+impl std::convert::From<Finish> for TaskStatus {
+    fn from(_: Finish) -> Self {
+        Self::Running
+    }
+}
+
+/// The `Done` tag is stored as `TaskStatus::Finished`.
+impl std::convert::From<Done> for TaskStatus {
+    fn from(_: Done) -> Self {
+        Self::Finished
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@teaclave.apache.org
For additional commands, e-mail: commits-help@teaclave.apache.org