You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@teaclave.apache.org by ms...@apache.org on 2020/04/30 02:53:38 UTC

[incubator-teaclave] branch master updated: [task] Restrict task status transition (#280)

This is an automated email from the ASF dual-hosted git repository.

mssun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-teaclave.git


The following commit(s) were added to refs/heads/master by this push:
     new 52d55e1  [task] Restrict task status transition (#280)
52d55e1 is described below

commit 52d55e15c84a965c7446ccfc7bcd2a031023ca44
Author: Zhaofeng Chen <zf...@apache.org>
AuthorDate: Wed Apr 29 19:53:30 2020 -0700

    [task] Restrict task status transition (#280)
---
 services/execution/enclave/src/service.rs          |  14 +-
 services/management/enclave/src/service.rs         |  72 ++------
 services/proto/src/proto/teaclave_common.proto     |   5 +-
 .../src/proto/teaclave_scheduler_service.proto     |   1 -
 services/proto/src/teaclave_common.rs              |   2 -
 services/proto/src/teaclave_scheduler_service.rs   |   6 +-
 services/scheduler/enclave/src/service.rs          |  11 +-
 .../enclave/src/end_to_end/native_echo.rs          |   6 +-
 tests/functional/enclave/src/execution_service.rs  |   1 +
 tests/functional/enclave/src/frontend_service.rs   |   2 +-
 tests/functional/enclave/src/management_service.rs |   2 +-
 tests/functional/enclave/src/scheduler_service.rs  |   4 +-
 types/src/staged_function.rs                       |   2 +-
 types/src/task.rs                                  | 191 +++++++++++++++------
 14 files changed, 180 insertions(+), 139 deletions(-)

diff --git a/services/execution/enclave/src/service.rs b/services/execution/enclave/src/service.rs
index 3d46fd8..1b73311 100644
--- a/services/execution/enclave/src/service.rs
+++ b/services/execution/enclave/src/service.rs
@@ -97,17 +97,16 @@ impl TeaclaveExecutionService {
     }
 
     fn invoke_task(&mut self, task: &StagedTask) -> Result<TaskOutputs> {
+        self.update_task_status(&task.task_id, TaskStatus::Running)?;
+
         let file_mgr = TaskFileManager::new(
             WORKER_BASE_DIR,
             &task.task_id,
             &task.input_data,
             &task.output_data,
         )?;
-
-        self.update_task_status(&task.task_id, TaskStatus::DataPreparing, String::new())?;
         let invocation = prepare_task(&task, &file_mgr)?;
 
-        self.update_task_status(&task.task_id, TaskStatus::Running, String::new())?;
         log::info!("Invoke function: {:?}", invocation);
         let worker = Worker::default();
         let summary = worker.invoke_function(invocation)?;
@@ -134,13 +133,8 @@ impl TeaclaveExecutionService {
         Ok(())
     }
 
-    fn update_task_status(
-        &mut self,
-        task_id: &Uuid,
-        task_status: TaskStatus,
-        status_info: String,
-    ) -> Result<()> {
-        let request = UpdateTaskStatusRequest::new(task_id.to_owned(), task_status, status_info);
+    fn update_task_status(&mut self, task_id: &Uuid, task_status: TaskStatus) -> Result<()> {
+        let request = UpdateTaskStatusRequest::new(task_id.to_owned(), task_status);
         let _response = self
             .scheduler_client
             .clone()
diff --git a/services/management/enclave/src/service.rs b/services/management/enclave/src/service.rs
index 17f4c18..0ba09b4 100644
--- a/services/management/enclave/src/service.rs
+++ b/services/management/enclave/src/service.rs
@@ -266,19 +266,15 @@ impl TeaclaveManagement for TeaclaveManagementService {
             .read_from_db(&request.function_id)
             .map_err(|_| ServiceError::PermissionDenied)?;
 
-        let mut task = Task::new(
+        let task = Task::new(
             user_id,
             request.executor,
-            &function,
             request.function_arguments,
             request.input_owners_map,
             request.output_owners_map,
-        );
-
-        task.check_function_compatibility(&function)
-            .map_err(|_| ServiceError::BadTask)?;
-
-        task.update_status(TaskStatus::Created);
+            function,
+        )
+        .map_err(|_| ServiceError::BadTask)?;
 
         log::info!("CreateTask: {:?}", task);
 
@@ -351,11 +347,6 @@ impl TeaclaveManagement for TeaclaveManagementService {
             ServiceError::PermissionDenied
         );
 
-        ensure!(
-            task.status == TaskStatus::Created,
-            ServiceError::PermissionDenied
-        );
-
         for (data_name, data_id) in request.input_map.iter() {
             let file: TeaclaveInputFile = self
                 .read_from_db(&data_id)
@@ -372,10 +363,6 @@ impl TeaclaveManagement for TeaclaveManagementService {
                 .map_err(|_| ServiceError::PermissionDenied)?;
         }
 
-        if task.all_data_assigned() {
-            task.update_status(TaskStatus::DataAssigned);
-        }
-
         log::info!("AssignData: {:?}", task);
 
         self.write_to_db(&task)
@@ -398,20 +385,8 @@ impl TeaclaveManagement for TeaclaveManagementService {
             .read_from_db(&request.task_id)
             .map_err(|_| ServiceError::PermissionDenied)?;
 
-        ensure!(
-            task.participants.contains(&user_id),
-            ServiceError::PermissionDenied
-        );
-
-        ensure!(
-            task.status == TaskStatus::DataAssigned,
-            ServiceError::PermissionDenied,
-        );
-
-        task.approved_users.insert(user_id);
-        if task.all_approved() {
-            task.update_status(TaskStatus::Approved);
-        }
+        task.approve(&user_id)
+            .map_err(|_| ServiceError::PermissionDenied)?;
 
         log::info!("ApproveTask: approve:{:?}", task);
 
@@ -437,6 +412,7 @@ impl TeaclaveManagement for TeaclaveManagementService {
 
         log::info!("InvokeTask: get task: {:?}", task);
 
+        // Early validation
         ensure!(task.creator == user_id, ServiceError::PermissionDenied);
         ensure!(
             task.status == TaskStatus::Approved,
@@ -450,7 +426,6 @@ impl TeaclaveManagement for TeaclaveManagementService {
         log::info!("InvokeTask: get function: {:?}", function);
 
         let mut input_map: HashMap<String, FunctionInputFile> = HashMap::new();
-        let mut output_map: HashMap<String, FunctionOutputFile> = HashMap::new();
         for (data_name, data_id) in task.input_map.iter() {
             let input_file: TeaclaveInputFile = self
                 .read_from_db(&data_id)
@@ -459,6 +434,7 @@ impl TeaclaveManagement for TeaclaveManagementService {
             input_map.insert(data_name.to_string(), input_data);
         }
 
+        let mut output_map: HashMap<String, FunctionOutputFile> = HashMap::new();
         for (data_name, data_id) in task.output_map.iter() {
             let output_file: TeaclaveOutputFile = self
                 .read_from_db(&data_id)
@@ -468,23 +444,11 @@ impl TeaclaveManagement for TeaclaveManagementService {
             output_map.insert(data_name.to_string(), output_data);
         }
 
-        let function_arguments = task.function_arguments.clone();
+        let staged_task = task.stage_for_running(&user_id, function, input_map, output_map)?;
 
-        let staged_task = StagedTask::new()
-            .task_id(task.task_id)
-            .executor(task.executor)
-            .executor_type(function.executor_type)
-            .function_id(function.id)
-            .function_payload(function.payload)
-            .function_arguments(function_arguments)
-            .input_data(input_map)
-            .output_data(output_map);
+        log::info!("InvokeTask: staged task: {:?}", staged_task);
 
         self.enqueue_to_db(StagedTask::get_queue_key().as_bytes(), &staged_task)?;
-        task.status = TaskStatus::Running;
-
-        log::info!("InvokeTask: staged task: {:?}", task);
-
         self.write_to_db(&task)
             .map_err(|_| ServiceError::StorageError)?;
         Ok(InvokeTaskResponse)
@@ -519,7 +483,8 @@ impl TeaclaveManagementService {
 
         Ok(service)
     }
-    pub fn create_fusion_data(&self, owner: impl Into<OwnerList>) -> Result<TeaclaveOutputFile> {
+
+    pub fn create_fusion_data(&self, owners: impl Into<OwnerList>) -> Result<TeaclaveOutputFile> {
         let uuid = Uuid::new_v4();
         let url = format!(
             "file://{}/{}.fusion",
@@ -529,13 +494,7 @@ impl TeaclaveManagementService {
         let url = Url::parse(&url).map_err(|_| anyhow!("invalid url"))?;
         let crypto_info = FileCrypto::default();
 
-        Ok(TeaclaveOutputFile {
-            url,
-            cmac: None,
-            crypto_info,
-            owner: owner.into(),
-            uuid,
-        })
+        Ok(TeaclaveOutputFile::new(url, crypto_info, owners))
     }
 
     fn get_request_user_id(
@@ -694,11 +653,12 @@ pub mod tests {
         let task = Task::new(
             UserID::from("mock_user"),
             Executor::MesaPy,
-            &function,
             function_arguments,
             HashMap::new(),
             HashMap::new(),
-        );
+            function,
+        )
+        .unwrap();
 
         assert!(Task::match_prefix(&task.key_string()));
         let value = task.to_vec().unwrap();
diff --git a/services/proto/src/proto/teaclave_common.proto b/services/proto/src/proto/teaclave_common.proto
index 4969227..155deeb 100644
--- a/services/proto/src/proto/teaclave_common.proto
+++ b/services/proto/src/proto/teaclave_common.proto
@@ -26,9 +26,8 @@ enum TaskStatus {
   DataAssigned = 1;
   Approved = 2;
   Staged = 3;
-  DataPreparing = 4;
-  Running = 5;
-  Finished = 6;
+  Running = 4;
+  Finished = 10;
 }
 
 message TaskResult {
diff --git a/services/proto/src/proto/teaclave_scheduler_service.proto b/services/proto/src/proto/teaclave_scheduler_service.proto
index 8a82565..31001dd 100644
--- a/services/proto/src/proto/teaclave_scheduler_service.proto
+++ b/services/proto/src/proto/teaclave_scheduler_service.proto
@@ -16,7 +16,6 @@ message PullTaskResponse {
 message UpdateTaskStatusRequest {
   string task_id = 1;
   teaclave_common_proto.TaskStatus task_status = 2;
-  string status_info = 3;
 }
 message UpdateTaskStatusResponse {}
 
diff --git a/services/proto/src/teaclave_common.rs b/services/proto/src/teaclave_common.rs
index 9675dd0..5af0c61 100644
--- a/services/proto/src/teaclave_common.rs
+++ b/services/proto/src/teaclave_common.rs
@@ -109,7 +109,6 @@ pub fn i32_to_task_status(status: i32) -> Result<TaskStatus> {
         Some(proto::TaskStatus::DataAssigned) => TaskStatus::DataAssigned,
         Some(proto::TaskStatus::Approved) => TaskStatus::Approved,
         Some(proto::TaskStatus::Staged) => TaskStatus::Staged,
-        Some(proto::TaskStatus::DataPreparing) => TaskStatus::DataPreparing,
         Some(proto::TaskStatus::Running) => TaskStatus::Running,
         Some(proto::TaskStatus::Finished) => TaskStatus::Finished,
         None => bail!("invalid task status"),
@@ -123,7 +122,6 @@ pub fn i32_from_task_status(status: TaskStatus) -> i32 {
         TaskStatus::DataAssigned => proto::TaskStatus::DataAssigned as i32,
         TaskStatus::Approved => proto::TaskStatus::Approved as i32,
         TaskStatus::Staged => proto::TaskStatus::Staged as i32,
-        TaskStatus::DataPreparing => proto::TaskStatus::DataPreparing as i32,
         TaskStatus::Running => proto::TaskStatus::Running as i32,
         TaskStatus::Finished => proto::TaskStatus::Finished as i32,
     }
diff --git a/services/proto/src/teaclave_scheduler_service.rs b/services/proto/src/teaclave_scheduler_service.rs
index 3993940..585a9c4 100644
--- a/services/proto/src/teaclave_scheduler_service.rs
+++ b/services/proto/src/teaclave_scheduler_service.rs
@@ -84,15 +84,13 @@ pub struct UpdateTaskResultResponse {}
 pub struct UpdateTaskStatusRequest {
     pub task_id: Uuid,
     pub task_status: TaskStatus,
-    pub status_info: String,
 }
 
 impl UpdateTaskStatusRequest {
-    pub fn new(task_id: Uuid, task_status: TaskStatus, status_info: String) -> Self {
+    pub fn new(task_id: Uuid, task_status: TaskStatus) -> Self {
         Self {
             task_id,
             task_status,
-            status_info,
         }
     }
 }
@@ -211,7 +209,6 @@ impl std::convert::TryFrom<proto::UpdateTaskStatusRequest> for UpdateTaskStatusR
         let ret = Self {
             task_id: Uuid::parse_str(&proto.task_id)?,
             task_status,
-            status_info: proto.status_info,
         };
         Ok(ret)
     }
@@ -223,7 +220,6 @@ impl std::convert::From<UpdateTaskStatusRequest> for proto::UpdateTaskStatusRequ
         proto::UpdateTaskStatusRequest {
             task_id: req.task_id.to_string(),
             task_status,
-            status_info: req.status_info,
         }
     }
 }
diff --git a/services/scheduler/enclave/src/service.rs b/services/scheduler/enclave/src/service.rs
index bad3535..fd8a438 100644
--- a/services/scheduler/enclave/src/service.rs
+++ b/services/scheduler/enclave/src/service.rs
@@ -190,7 +190,10 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
     ) -> TeaclaveServiceResponseResult<UpdateTaskStatusResponse> {
         let request = request.message;
         let mut task = self.get_task(&request.task_id)?;
-        task.status = request.task_status;
+
+        // Only TaskStatus::Running is allowed here so far.
+        task.invoking_by_executor()?;
+
         log::info!("UpdateTaskStatus: Task {:?}", task);
         self.put_into_db(&task)?;
         Ok(UpdateTaskStatusResponse {})
@@ -207,12 +210,10 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
             self.update_outputs_cmac(&task.output_map, &outputs.tags_map)?;
         };
 
-        task.result = request.task_result;
-
         // Updating task result means we have finished execution
-        task.status = TaskStatus::Finished;
-        self.put_into_db(&task)?;
+        task.finish(request.task_result)?;
 
+        self.put_into_db(&task)?;
         Ok(UpdateTaskResultResponse {})
     }
 }
diff --git a/tests/functional/enclave/src/end_to_end/native_echo.rs b/tests/functional/enclave/src/end_to_end/native_echo.rs
index 8864559..bd67bc0 100644
--- a/tests/functional/enclave/src/end_to_end/native_echo.rs
+++ b/tests/functional/enclave/src/end_to_end/native_echo.rs
@@ -50,12 +50,10 @@ pub fn test_echo_task_success() {
 
     log::info!("Create task: {:?}", response);
 
-    // Assign Data To Task
     let task_id = response.task_id;
-    let request = AssignDataRequest::new(task_id.clone(), hashmap!(), hashmap!());
-    let response = client.assign_data(request).unwrap();
 
-    log::info!("Assign data: {:?}", response);
+    // Assign Data To Task
+    // This task does not have any input/output files, we can skip the assignment process.
 
     // Approve Task
     approve_task(&mut client, &task_id).unwrap();
diff --git a/tests/functional/enclave/src/execution_service.rs b/tests/functional/enclave/src/execution_service.rs
index 5ca9f64..9590daa 100644
--- a/tests/functional/enclave/src/execution_service.rs
+++ b/tests/functional/enclave/src/execution_service.rs
@@ -28,6 +28,7 @@ fn test_execute_function() {
     let task_id = Uuid::new_v4();
     let task = Task {
         task_id,
+        status: TaskStatus::Staged,
         ..Default::default()
     };
 
diff --git a/tests/functional/enclave/src/frontend_service.rs b/tests/functional/enclave/src/frontend_service.rs
index 594c269..1508c6d 100644
--- a/tests/functional/enclave/src/frontend_service.rs
+++ b/tests/functional/enclave/src/frontend_service.rs
@@ -306,7 +306,7 @@ fn test_invoke_task() {
 
     let request = GetTaskRequest::new(task_id);
     let response = client.get_task(request).unwrap();
-    assert_eq!(response.status, TaskStatus::Running);
+    assert_eq!(response.status, TaskStatus::Staged);
 
     let request = PullTaskRequest {};
     let mut scheduler_client = get_scheduler_client();
diff --git a/tests/functional/enclave/src/management_service.rs b/tests/functional/enclave/src/management_service.rs
index 62b83e1..acb0482 100644
--- a/tests/functional/enclave/src/management_service.rs
+++ b/tests/functional/enclave/src/management_service.rs
@@ -581,7 +581,7 @@ fn test_invoke_task() {
 
     let request = GetTaskRequest::new(task_id);
     let response = client2.get_task(request).unwrap();
-    assert_eq!(response.status, TaskStatus::Running);
+    assert_eq!(response.status, TaskStatus::Staged);
 
     let request = PullTaskRequest {};
     let mut scheduler_client = get_scheduler_client();
diff --git a/tests/functional/enclave/src/scheduler_service.rs b/tests/functional/enclave/src/scheduler_service.rs
index be052ae..24c4f7f 100644
--- a/tests/functional/enclave/src/scheduler_service.rs
+++ b/tests/functional/enclave/src/scheduler_service.rs
@@ -52,7 +52,7 @@ fn test_update_task_status_result() {
     let task_id = Uuid::new_v4();
     let task = Task {
         task_id,
-        status: TaskStatus::Running,
+        status: TaskStatus::Staged,
         ..Default::default()
     };
 
@@ -79,7 +79,7 @@ fn test_update_task_status_result() {
     log::debug!("response: {:?}", response);
     let task_id = response.staged_task.task_id;
 
-    let request = UpdateTaskStatusRequest::new(task_id, TaskStatus::Running, String::new());
+    let request = UpdateTaskStatusRequest::new(task_id, TaskStatus::Running);
     let response = client.update_task_status(request);
     assert!(response.is_ok());
 
diff --git a/types/src/staged_function.rs b/types/src/staged_function.rs
index 8483b7a..03a46b9 100644
--- a/types/src/staged_function.rs
+++ b/types/src/staged_function.rs
@@ -98,7 +98,7 @@ impl std::fmt::Display for ArgumentValue {
 #[derive(Clone, Serialize, Deserialize, Debug, Default)]
 pub struct FunctionArguments {
     #[serde(flatten)]
-    pub inner: HashMap<String, ArgumentValue>,
+    inner: HashMap<String, ArgumentValue>,
 }
 
 impl<S: core::default::Default + std::hash::BuildHasher> From<FunctionArguments>
diff --git a/types/src/task.rs b/types/src/task.rs
index f23a24d..c6f3090 100644
--- a/types/src/task.rs
+++ b/types/src/task.rs
@@ -130,7 +130,6 @@ pub enum TaskStatus {
     DataAssigned,
     Approved,
     Staged,
-    DataPreparing,
     Running,
     Finished,
 }
@@ -384,79 +383,133 @@ impl Storable for Task {
 impl Task {
     pub fn new(
         requester: UserID,
-        executor: Executor,
-        function: &Function,
-        function_arguments: FunctionArguments,
-        input_owners_map: HashMap<String, OwnerList>,
-        output_owners_map: HashMap<String, OwnerList>,
-    ) -> Self {
-        let input_owners = UserList::unions(input_owners_map.values().cloned());
-        let output_owners = UserList::unions(output_owners_map.values().cloned());
+        req_executor: Executor,
+        req_func_args: FunctionArguments,
+        req_input_owners: HashMap<String, OwnerList>,
+        req_output_owners: HashMap<String, OwnerList>,
+        function: Function,
+    ) -> Result<Self> {
+        // gather all participants
+        let input_owners = UserList::unions(req_input_owners.values().cloned());
+        let output_owners = UserList::unions(req_output_owners.values().cloned());
         let mut participants = UserList::unions(vec![input_owners, output_owners]);
         participants.insert(requester.clone());
         if !function.public {
             participants.insert(function.owner.clone());
         }
 
-        Task {
+        //check function compatibility
+        let fn_args_spec: HashSet<&String> = function.arguments.iter().collect();
+        let req_args: HashSet<&String> = req_func_args.inner().keys().collect();
+        ensure!(fn_args_spec == req_args, "function_arguments mismatch");
+
+        // check input fkeys
+        let inputs_spec: HashSet<&String> = function.inputs.iter().map(|f| &f.name).collect();
+        let req_input_fkeys: HashSet<&String> = req_input_owners.keys().collect();
+        ensure!(inputs_spec == req_input_fkeys, "input keys mismatch");
+
+        // check output fkeys
+        let outputs_spec: HashSet<&String> = function.outputs.iter().map(|f| &f.name).collect();
+        let req_output_fkeys: HashSet<&String> = req_output_owners.keys().collect();
+        ensure!(outputs_spec == req_output_fkeys, "output keys mismatch");
+
+        // Skip the assignment if no file is required
+        let status = if req_input_owners.is_empty() && req_output_owners.is_empty() {
+            TaskStatus::DataAssigned
+        } else {
+            TaskStatus::Created
+        };
+
+        let task = Task {
             task_id: Uuid::new_v4(),
             creator: requester,
-            executor,
+            executor: req_executor,
             function_id: function.external_id(),
             function_owner: function.owner.clone(),
-            function_arguments,
-            input_owners_map,
-            output_owners_map,
+            function_arguments: req_func_args,
+            input_owners_map: req_input_owners,
+            output_owners_map: req_output_owners,
             participants,
+            status,
             ..Default::default()
-        }
-    }
+        };
 
-    pub fn update_status(&mut self, status: TaskStatus) {
-        self.status = status;
+        Ok(task)
     }
 
-    pub fn check_function_compatibility(&self, function: &Function) -> Result<()> {
-        // check arguments
-        let function_arguments: HashSet<&String> = function.arguments.iter().collect();
-        let provide_args: HashSet<&String> = self.function_arguments.inner().keys().collect();
+    pub fn approve(&mut self, requester: &UserID) -> Result<()> {
         ensure!(
-            function_arguments == provide_args,
-            "function_arguments mismatch"
+            self.status == TaskStatus::DataAssigned,
+            "Unexpected task status when approving: {:?}",
+            self.status
         );
 
-        // check input
-        let input_args: HashSet<String> = function.inputs.iter().map(|f| f.name.clone()).collect();
-        let provide_args: HashSet<String> = self.input_owners_map.keys().cloned().collect();
-        ensure!(input_args == provide_args, "input keys mismatch");
+        ensure!(
+            self.participants.contains(requester),
+            "Unexpected user trying to approve a task: {:?}",
+            requester
+        );
 
-        // check output
-        let output_args: HashSet<String> =
-            function.outputs.iter().map(|f| f.name.clone()).collect();
-        let provide_args: HashSet<String> = self.output_owners_map.keys().cloned().collect();
-        ensure!(output_args == provide_args, "output keys mismatch");
+        self.approved_users.insert(requester.clone());
+        if self.participants == self.approved_users {
+            self.update_status(TaskStatus::Approved);
+        }
 
         Ok(())
     }
 
-    pub fn all_data_assigned(&self) -> bool {
-        let input_args: HashSet<String> = self.input_owners_map.keys().cloned().collect();
-        let assiged_inputs: HashSet<String> = self.input_map.keys().cloned().collect();
-        if input_args != assiged_inputs {
-            return false;
-        }
-
-        let output_args: HashSet<String> = self.output_owners_map.keys().cloned().collect();
-        let assiged_outputs: HashSet<String> = self.output_map.keys().cloned().collect();
-        if output_args != assiged_outputs {
-            return false;
-        }
+    pub fn invoking_by_executor(&mut self) -> Result<()> {
+        ensure!(
+            self.status == TaskStatus::Staged,
+            "Unexpected task status when invoked: {:?}",
+            self.status
+        );
+        self.status = TaskStatus::Running;
+        Ok(())
+    }
 
-        true
+    pub fn finish(&mut self, result: TaskResult) -> Result<()> {
+        ensure!(
+            self.status == TaskStatus::Running,
+            "Unexpected task status when invoked: {:?}",
+            self.status
+        );
+        self.result = result;
+        self.status = TaskStatus::Finished;
+        Ok(())
     }
 
-    pub fn all_approved(&self) -> bool {
-        self.participants == self.approved_users
+    pub fn stage_for_running(
+        &mut self,
+        requester: &UserID,
+        function: Function,
+        input_map: HashMap<String, FunctionInputFile>,
+        output_map: HashMap<String, FunctionOutputFile>,
+    ) -> Result<StagedTask> {
+        ensure!(
+            &self.creator == requester,
+            "Unexpected user trying to invoke a task: {:?}",
+            requester
+        );
+        ensure!(
+            self.status == TaskStatus::Approved,
+            "Unexpected task status when invoked: {:?}",
+            self.status
+        );
+        let function_arguments = self.function_arguments.clone();
+        let staged_task = StagedTask {
+            task_id: self.task_id,
+            executor: self.executor,
+            executor_type: function.executor_type,
+            function_id: function.id,
+            function_payload: function.payload,
+            function_arguments,
+            input_data: input_map.into(),
+            output_data: output_map.into(),
+        };
+
+        self.update_status(TaskStatus::Staged);
+        Ok(staged_task)
     }
 
     pub fn assign_input(
@@ -466,6 +519,12 @@ impl Task {
         file: &TeaclaveInputFile,
     ) -> Result<()> {
         ensure!(
+            self.status == TaskStatus::Created,
+            "Unexpected task status during input assignment: {:?}",
+            self.status
+        );
+
+        ensure!(
             file.owner.contains(requester),
             "Assign: requester is not in the owner list. {:?}.",
             file.external_id()
@@ -490,7 +549,12 @@ impl Task {
             "Assign: file already assigned. {:?}",
             fname
         );
+
         self.input_map.insert(fname.to_owned(), file.external_id());
+
+        if self.all_data_assigned() {
+            self.update_status(TaskStatus::DataAssigned);
+        }
         Ok(())
     }
 
@@ -501,6 +565,12 @@ impl Task {
         file: &TeaclaveOutputFile,
     ) -> Result<()> {
         ensure!(
+            self.status == TaskStatus::Created,
+            "Unexpected task status during output assignment: {:?}",
+            self.status
+        );
+
+        ensure!(
             file.owner.contains(requester),
             "Assign: requester is not in the owner list. {:?}.",
             file.external_id()
@@ -525,7 +595,32 @@ impl Task {
             "Assign: file already assigned. {:?}",
             fname
         );
+
         self.output_map.insert(fname.to_owned(), file.external_id());
+
+        if self.all_data_assigned() {
+            self.update_status(TaskStatus::DataAssigned);
+        }
         Ok(())
     }
+
+    fn update_status(&mut self, status: TaskStatus) {
+        self.status = status;
+    }
+
+    fn all_data_assigned(&self) -> bool {
+        let input_args: HashSet<&String> = self.input_owners_map.keys().collect();
+        let assiged_inputs: HashSet<&String> = self.input_map.keys().collect();
+        if input_args != assiged_inputs {
+            return false;
+        }
+
+        let output_args: HashSet<&String> = self.output_owners_map.keys().collect();
+        let assiged_outputs: HashSet<&String> = self.output_map.keys().collect();
+        if output_args != assiged_outputs {
+            return false;
+        }
+
+        true
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@teaclave.apache.org
For additional commands, e-mail: commits-help@teaclave.apache.org