You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@teaclave.apache.org by ms...@apache.org on 2020/05/13 17:24:42 UTC
[incubator-teaclave] branch master updated: [task] Bind Task state to static type (#293)
This is an automated email from the ASF dual-hosted git repository.
mssun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-teaclave.git
The following commit(s) were added to refs/heads/master by this push:
new 756e21b [task] Bind Task state to static type (#293)
756e21b is described below
commit 756e21b2d9c3bcb4ac2b2486cc55696b5a1abe1e
Author: Zhaofeng Chen <zf...@apache.org>
AuthorDate: Wed May 13 10:24:34 2020 -0700
[task] Bind Task state to static type (#293)
---
services/management/enclave/src/service.rs | 106 +++--
services/scheduler/enclave/src/service.rs | 32 +-
.../enclave/src/end_to_end/builtin_echo.rs | 5 +-
tests/functional/enclave/src/execution_service.rs | 8 +-
tests/functional/enclave/src/scheduler_service.rs | 13 +-
types/src/lib.rs | 2 +
types/src/task.rs | 255 +----------
types/src/task_state.rs | 499 +++++++++++++++++++++
8 files changed, 594 insertions(+), 326 deletions(-)
diff --git a/services/management/enclave/src/service.rs b/services/management/enclave/src/service.rs
index 1ee15b6..35311a6 100644
--- a/services/management/enclave/src/service.rs
+++ b/services/management/enclave/src/service.rs
@@ -17,6 +17,7 @@
use anyhow::{anyhow, Result};
use std::collections::HashMap;
+use std::convert::TryInto;
use std::prelude::v1::*;
use std::sync::{Arc, SgxMutex as Mutex};
use teaclave_proto::teaclave_frontend_service::{
@@ -36,11 +37,7 @@ use teaclave_proto::teaclave_storage_service::{
use teaclave_rpc::endpoint::Endpoint;
use teaclave_rpc::Request;
use teaclave_service_enclave_utils::{ensure, teaclave_service};
-use teaclave_types::{
- ExternalID, FileCrypto, Function, OwnerList, StagedTask, Storable, Task, TaskStatus,
- TeaclaveInputFile, TeaclaveOutputFile, TeaclaveServiceResponseError,
- TeaclaveServiceResponseResult, UserID,
-};
+use teaclave_types::*;
use thiserror::Error;
use url::Url;
use uuid::Uuid;
@@ -266,7 +263,7 @@ impl TeaclaveManagement for TeaclaveManagementService {
.read_from_db(&request.function_id)
.map_err(|_| ServiceError::PermissionDenied)?;
- let task = Task::new(
+ let task = Task::<Create>::new(
user_id,
request.executor,
request.function_arguments,
@@ -278,10 +275,12 @@ impl TeaclaveManagement for TeaclaveManagementService {
log::info!("CreateTask: {:?}", task);
- self.write_to_db(&task)
+ let ts: TaskState = task.into();
+ self.write_to_db(&ts)
.map_err(|_| ServiceError::StorageError)?;
- Ok(CreateTaskResponse::new(task.external_id()))
+ let response = CreateTaskResponse::new(ts.external_id());
+ Ok(response)
}
// access control: task.participants.contains(&user_id)
@@ -291,31 +290,28 @@ impl TeaclaveManagement for TeaclaveManagementService {
) -> TeaclaveServiceResponseResult<GetTaskResponse> {
let user_id = self.get_request_user_id(request.metadata())?;
- let task: Task = self
+ let ts: TaskState = self
.read_from_db(&request.message.task_id)
.map_err(|_| ServiceError::PermissionDenied)?;
- ensure!(
- task.participants.contains(&user_id),
- ServiceError::PermissionDenied
- );
+ ensure!(ts.has_participant(&user_id), ServiceError::PermissionDenied);
- log::info!("GetTask: {:?}", task);
+ log::info!("GetTask: {:?}", ts);
let response = GetTaskResponse {
- task_id: task.external_id(),
- creator: task.creator,
- function_id: task.function_id,
- function_owner: task.function_owner,
- function_arguments: task.function_arguments,
- inputs_ownership: task.inputs_ownership,
- outputs_ownership: task.outputs_ownership,
- participants: task.participants,
- approved_users: task.approved_users,
- assigned_inputs: task.assigned_inputs.external_ids(),
- assigned_outputs: task.assigned_outputs.external_ids(),
- result: task.result,
- status: task.status,
+ task_id: ts.external_id(),
+ creator: ts.creator,
+ function_id: ts.function_id,
+ function_owner: ts.function_owner,
+ function_arguments: ts.function_arguments,
+ inputs_ownership: ts.inputs_ownership,
+ outputs_ownership: ts.outputs_ownership,
+ participants: ts.participants,
+ approved_users: ts.approved_users,
+ assigned_inputs: ts.assigned_inputs.external_ids(),
+ assigned_outputs: ts.assigned_outputs.external_ids(),
+ result: ts.result,
+ status: ts.status,
};
Ok(response)
}
@@ -338,14 +334,16 @@ impl TeaclaveManagement for TeaclaveManagementService {
let request = request.message;
- let mut task: Task = self
+ let ts: TaskState = self
.read_from_db(&request.task_id)
.map_err(|_| ServiceError::PermissionDenied)?;
- ensure!(
- task.participants.contains(&user_id),
+ ensure!(ts.has_participant(&user_id), ServiceError::PermissionDenied);
+
+ let mut task: Task<Assign> = ts.try_into().map_err(|e| {
+ log::warn!("Assign state error: {:?}", e);
ServiceError::PermissionDenied
- );
+ })?;
for (data_name, data_id) in request.inputs.iter() {
let file: TeaclaveInputFile = self
@@ -365,7 +363,8 @@ impl TeaclaveManagement for TeaclaveManagementService {
log::info!("AssignData: {:?}", task);
- self.write_to_db(&task)
+ let ts: TaskState = task.into();
+ self.write_to_db(&ts)
.map_err(|_| ServiceError::StorageError)?;
Ok(AssignDataResponse)
@@ -381,16 +380,22 @@ impl TeaclaveManagement for TeaclaveManagementService {
let user_id = self.get_request_user_id(request.metadata())?;
let request = request.message;
- let mut task: Task = self
+ let ts: TaskState = self
.read_from_db(&request.task_id)
.map_err(|_| ServiceError::PermissionDenied)?;
+ let mut task: Task<Approve> = ts.try_into().map_err(|e| {
+ log::warn!("Approve state error: {:?}", e);
+ ServiceError::PermissionDenied
+ })?;
+
task.approve(&user_id)
.map_err(|_| ServiceError::PermissionDenied)?;
log::info!("ApproveTask: approve:{:?}", task);
- self.write_to_db(&task)
+ let ts: TaskState = task.into();
+ self.write_to_db(&ts)
.map_err(|_| ServiceError::StorageError)?;
Ok(ApproveTaskResponse)
@@ -406,32 +411,36 @@ impl TeaclaveManagement for TeaclaveManagementService {
let user_id = self.get_request_user_id(request.metadata())?;
let request = request.message;
- let mut task: Task = self
+ let ts: TaskState = self
.read_from_db(&request.task_id)
.map_err(|_| ServiceError::PermissionDenied)?;
- log::info!("InvokeTask: get task: {:?}", task);
-
// Early validation
- ensure!(task.creator == user_id, ServiceError::PermissionDenied);
- ensure!(
- task.status == TaskStatus::Approved,
- ServiceError::PermissionDenied
- );
+ ensure!(ts.has_creator(&user_id), ServiceError::PermissionDenied);
let function: Function = self
- .read_from_db(&task.function_id)
+ .read_from_db(&ts.function_id)
.map_err(|_| ServiceError::PermissionDenied)?;
log::info!("InvokeTask: get function: {:?}", function);
+ let mut task: Task<Stage> = ts.try_into().map_err(|e| {
+ log::warn!("Stage state error: {:?}", e);
+ ServiceError::PermissionDenied
+ })?;
+
+ log::info!("InvokeTask: get task: {:?}", task);
+
let staged_task = task.stage_for_running(&user_id, function)?;
log::info!("InvokeTask: staged task: {:?}", staged_task);
self.enqueue_to_db(StagedTask::get_queue_key().as_bytes(), &staged_task)?;
- self.write_to_db(&task)
+
+ let ts: TaskState = task.into();
+ self.write_to_db(&ts)
.map_err(|_| ServiceError::StorageError)?;
+
Ok(InvokeTaskResponse)
}
}
@@ -516,7 +525,6 @@ impl TeaclaveManagementService {
#[cfg(test_mode)]
fn add_mock_data(&self) -> Result<()> {
- use teaclave_types::{FileAuthTag, FunctionInput, FunctionOutput};
let mut output_file = self.create_fusion_data(vec!["mock_user1", "frontend_user"])?;
output_file.uuid = Uuid::parse_str("00000000-0000-0000-0000-000000000001")?;
output_file.cmac = Some(FileAuthTag::mock());
@@ -625,7 +633,7 @@ pub mod tests {
.owner("mock_user");
let function_arguments = FunctionArguments::new(hashmap!("arg" => "data"));
- let task = Task::new(
+ let task = Task::<Create>::new(
UserID::from("mock_user"),
Executor::MesaPy,
function_arguments,
@@ -635,9 +643,9 @@ pub mod tests {
)
.unwrap();
- assert!(Task::match_prefix(&task.key_string()));
- let value = task.to_vec().unwrap();
- let deserialized_task = Task::from_slice(&value).unwrap();
+ let ts: TaskState = task.try_into().unwrap();
+ let value = ts.to_vec().unwrap();
+ let deserialized_task = TaskState::from_slice(&value).unwrap();
info!("task: {:?}", deserialized_task);
}
diff --git a/services/scheduler/enclave/src/service.rs b/services/scheduler/enclave/src/service.rs
index 01a88c9..fe03eb7 100644
--- a/services/scheduler/enclave/src/service.rs
+++ b/services/scheduler/enclave/src/service.rs
@@ -19,6 +19,7 @@
#![allow(unused_variables)]
use std::collections::VecDeque;
+use std::convert::TryInto;
#[cfg(feature = "mesalock_sgx")]
use std::prelude::v1::*;
use std::sync::{Arc, SgxMutex as Mutex};
@@ -29,10 +30,7 @@ use teaclave_proto::teaclave_storage_service::*;
use teaclave_rpc::endpoint::Endpoint;
use teaclave_rpc::Request;
use teaclave_service_enclave_utils::teaclave_service;
-use teaclave_types::{
- ExternalID, OutputsTags, StagedTask, Storable, Task, TaskFiles, TaskResult, TaskStatus,
- TeaclaveOutputFile, TeaclaveServiceResponseError, TeaclaveServiceResponseResult,
-};
+use teaclave_types::*;
use uuid::Uuid;
use anyhow::anyhow;
@@ -98,8 +96,8 @@ impl TeaclaveSchedulerService {
.map_err(|_| TeaclaveSchedulerError::DataError.into())
}
- fn get_task(&self, task_id: &Uuid) -> Result<Task> {
- let key = ExternalID::new(Task::key_prefix(), task_id.to_owned());
+ fn get_task_state(&self, task_id: &Uuid) -> Result<TaskState> {
+ let key = ExternalID::new(TaskState::key_prefix(), task_id.to_owned());
self.get_from_db(&key)
}
@@ -169,13 +167,14 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
request: Request<UpdateTaskStatusRequest>,
) -> TeaclaveServiceResponseResult<UpdateTaskStatusResponse> {
let request = request.message;
- let mut task = self.get_task(&request.task_id)?;
-
- // Only TaskStatus::Running is allowed here so far.
- task.invoking_by_executor()?;
+ let ts = self.get_task_state(&request.task_id)?;
+ let task: Task<Run> = ts.try_into()?;
log::info!("UpdateTaskStatus: Task {:?}", task);
- self.put_into_db(&task)?;
+ // Only TaskStatus::Running is implicitly allowed here.
+
+ let ts = TaskState::from(task);
+ self.put_into_db(&ts)?;
Ok(UpdateTaskStatusResponse {})
}
@@ -184,19 +183,22 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
request: Request<UpdateTaskResultRequest>,
) -> TeaclaveServiceResponseResult<UpdateTaskResultResponse> {
let request = request.message;
- let mut task = self.get_task(&request.task_id)?;
+ let ts = self.get_task_state(&request.task_id)?;
+ let mut task: Task<Finish> = ts.try_into()?;
if let TaskResult::Ok(outputs) = &request.task_result {
for (key, auth_tag) in outputs.tags_map.iter() {
- let outfile = task.assigned_outputs.update_cmac(key, auth_tag)?;
+ let outfile = task.update_output_cmac(key, auth_tag)?;
self.put_into_db(outfile)?;
}
};
// Updating task result means we have finished execution
- task.finish(request.task_result)?;
+ task.update_result(request.task_result)?;
+ log::info!("UpdateTaskResult: Task {:?}", task);
- self.put_into_db(&task)?;
+ let ts = TaskState::from(task);
+ self.put_into_db(&ts)?;
Ok(UpdateTaskResultResponse {})
}
}
diff --git a/tests/functional/enclave/src/end_to_end/builtin_echo.rs b/tests/functional/enclave/src/end_to_end/builtin_echo.rs
index 5d39ba3..890fec0 100644
--- a/tests/functional/enclave/src/end_to_end/builtin_echo.rs
+++ b/tests/functional/enclave/src/end_to_end/builtin_echo.rs
@@ -51,10 +51,11 @@ pub fn test_echo_task_success() {
let task_id = response.task_id;
// Assign Data To Task
- // This task does not have any input/output files, we can skip the assignment process.
+ // This task does not have any input/output files, we can opt to skip the assignment process.
// Approve Task
- approve_task(&mut client, &task_id).unwrap();
+ // This task is a single user task, we can opt to skip the approvement process.
+ // approve_task(&mut client, &task_id).unwrap();
// Invoke Task
invoke_task(&mut client, &task_id).unwrap();
diff --git a/tests/functional/enclave/src/execution_service.rs b/tests/functional/enclave/src/execution_service.rs
index 2412197..19dfa45 100644
--- a/tests/functional/enclave/src/execution_service.rs
+++ b/tests/functional/enclave/src/execution_service.rs
@@ -26,7 +26,7 @@ use uuid::Uuid;
#[test_case]
fn test_execute_function() {
let task_id = Uuid::new_v4();
- let task = Task {
+ let ts = TaskState {
task_id,
status: TaskStatus::Staged,
..Default::default()
@@ -49,14 +49,14 @@ fn test_execute_function() {
staged_task.to_vec().unwrap(),
);
let _enqueue_response = storage_client.enqueue(enqueue_request).unwrap();
- let put_request = PutRequest::new(task.key().as_slice(), task.to_vec().unwrap().as_slice());
+ let put_request = PutRequest::new(ts.key().as_slice(), ts.to_vec().unwrap().as_slice());
let _put_response = storage_client.put(put_request).unwrap();
std::thread::sleep(std::time::Duration::from_secs(5));
- let get_request = GetRequest::new(task.key().as_slice());
+ let get_request = GetRequest::new(ts.key().as_slice());
let get_response = storage_client.get(get_request).unwrap();
- let updated_task = Task::from_slice(get_response.value.as_slice()).unwrap();
+ let updated_task = TaskState::from_slice(get_response.value.as_slice()).unwrap();
assert_eq!(
updated_task.result.unwrap().return_value,
b"Hello, Teaclave Tests!"
diff --git a/tests/functional/enclave/src/scheduler_service.rs b/tests/functional/enclave/src/scheduler_service.rs
index 226f273..70d56b4 100644
--- a/tests/functional/enclave/src/scheduler_service.rs
+++ b/tests/functional/enclave/src/scheduler_service.rs
@@ -51,12 +51,6 @@ fn test_pull_task() {
#[test_case]
fn test_update_task_status_result() {
let task_id = Uuid::new_v4();
- let task = Task {
- task_id,
- status: TaskStatus::Staged,
- ..Default::default()
- };
-
let function_id = Uuid::new_v4();
let staged_task = StagedTask::new()
@@ -72,7 +66,12 @@ fn test_update_task_status_result() {
);
let _enqueue_response = storage_client.enqueue(enqueue_request).unwrap();
- let put_request = PutRequest::new(task.key().as_slice(), task.to_vec().unwrap().as_slice());
+ let ts = TaskState {
+ task_id,
+ status: TaskStatus::Staged,
+ ..Default::default()
+ };
+ let put_request = PutRequest::new(ts.key().as_slice(), ts.to_vec().unwrap().as_slice());
let _put_response = storage_client.put(put_request).unwrap();
let mut client = get_scheduler_client();
diff --git a/types/src/lib.rs b/types/src/lib.rs
index 4b404e6..7045d4a 100644
--- a/types/src/lib.rs
+++ b/types/src/lib.rs
@@ -49,6 +49,8 @@ mod storage;
pub use storage::Storable;
mod task;
pub use task::*;
+mod task_state;
+pub use task_state::*;
mod file_agent;
pub use file_agent::*;
mod macros;
diff --git a/types/src/task.rs b/types/src/task.rs
index eb24b5c..99a6a77 100644
--- a/types/src/task.rs
+++ b/types/src/task.rs
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use crate::FunctionArguments;
-use crate::Storable;
use crate::*;
use anyhow::{anyhow, bail, ensure, Error, Result};
use serde::{Deserialize, Serialize};
@@ -124,7 +122,7 @@ impl IntoIterator for OwnerList {
}
}
-#[derive(Debug, Deserialize, Serialize, std::cmp::PartialEq)]
+#[derive(Clone, Debug, Deserialize, Serialize, std::cmp::PartialEq)]
pub enum TaskStatus {
Created,
DataAssigned,
@@ -140,7 +138,7 @@ impl Default for TaskStatus {
}
}
-#[derive(Debug, Default, Deserialize, Serialize)]
+#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct OutputsTags {
inner: HashMap<String, FileAuthTag>,
}
@@ -197,7 +195,7 @@ impl std::iter::FromIterator<(String, FileAuthTag)> for OutputsTags {
}
}
-#[derive(Debug, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TaskOutputs {
pub return_value: Vec<u8>,
pub tags_map: OutputsTags,
@@ -212,7 +210,7 @@ impl TaskOutputs {
}
}
-#[derive(Debug, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TaskFailure {
pub reason: String,
}
@@ -285,7 +283,7 @@ impl std::convert::TryFrom<String> for ExternalID {
}
}
-#[derive(Debug, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
pub enum TaskResult {
NotReady,
Ok(TaskOutputs),
@@ -350,7 +348,7 @@ where
}
}
-#[derive(Debug, Default, Deserialize, Serialize)]
+#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct TaskFileOwners {
inner: HashMap<String, OwnerList>,
}
@@ -496,244 +494,3 @@ impl std::convert::From<TaskFiles<TeaclaveOutputFile>> for FunctionOutputFiles {
files.into_iter().collect()
}
}
-
-const TASK_PREFIX: &str = "task";
-
-#[derive(Debug, Default, Deserialize, Serialize)]
-pub struct Task {
- pub task_id: Uuid,
- pub creator: UserID,
- pub function_id: ExternalID,
- pub function_arguments: FunctionArguments,
- pub executor: Executor,
- pub inputs_ownership: TaskFileOwners,
- pub outputs_ownership: TaskFileOwners,
- pub function_owner: UserID,
- pub participants: UserList,
- pub approved_users: UserList,
- pub assigned_inputs: TaskFiles<TeaclaveInputFile>,
- pub assigned_outputs: TaskFiles<TeaclaveOutputFile>,
- pub result: TaskResult,
- pub status: TaskStatus,
-}
-
-impl Storable for Task {
- fn key_prefix() -> &'static str {
- TASK_PREFIX
- }
-
- fn uuid(&self) -> Uuid {
- self.task_id
- }
-}
-
-impl Task {
- pub fn new(
- requester: UserID,
- req_executor: Executor,
- req_func_args: FunctionArguments,
- req_input_owners: impl Into<TaskFileOwners>,
- req_output_owners: impl Into<TaskFileOwners>,
- function: Function,
- ) -> Result<Self> {
- let req_input_owners = req_input_owners.into();
- let req_output_owners = req_output_owners.into();
-
- // gather all participants
- let input_owners = req_input_owners.all_owners();
- let output_owners = req_output_owners.all_owners();
- let mut participants = UserList::unions(vec![input_owners, output_owners]);
- participants.insert(requester.clone());
- if !function.public {
- participants.insert(function.owner.clone());
- }
-
- //check function compatibility
- let fn_args_spec: HashSet<&String> = function.arguments.iter().collect();
- let req_args: HashSet<&String> = req_func_args.inner().keys().collect();
- ensure!(fn_args_spec == req_args, "function_arguments mismatch");
-
- // check input fkeys
- let inputs_spec: HashSet<&String> = function.inputs.iter().map(|f| &f.name).collect();
- let req_input_fkeys: HashSet<&String> = req_input_owners.keys().collect();
- ensure!(inputs_spec == req_input_fkeys, "input keys mismatch");
-
- // check output fkeys
- let outputs_spec: HashSet<&String> = function.outputs.iter().map(|f| &f.name).collect();
- let req_output_fkeys: HashSet<&String> = req_output_owners.keys().collect();
- ensure!(outputs_spec == req_output_fkeys, "output keys mismatch");
-
- // Skip the assignment if no file is required
- let status = if req_input_owners.is_empty() && req_output_owners.is_empty() {
- TaskStatus::DataAssigned
- } else {
- TaskStatus::Created
- };
-
- let task = Task {
- task_id: Uuid::new_v4(),
- creator: requester,
- executor: req_executor,
- function_id: function.external_id(),
- function_owner: function.owner.clone(),
- function_arguments: req_func_args,
- inputs_ownership: req_input_owners,
- outputs_ownership: req_output_owners,
- participants,
- status,
- ..Default::default()
- };
-
- Ok(task)
- }
-
- pub fn approve(&mut self, requester: &UserID) -> Result<()> {
- ensure!(
- self.status == TaskStatus::DataAssigned,
- "Unexpected task status when approving: {:?}",
- self.status
- );
-
- ensure!(
- self.participants.contains(requester),
- "Unexpected user trying to approve a task: {:?}",
- requester
- );
-
- self.approved_users.insert(requester.clone());
- if self.participants == self.approved_users {
- self.update_status(TaskStatus::Approved);
- }
-
- Ok(())
- }
-
- pub fn invoking_by_executor(&mut self) -> Result<()> {
- ensure!(
- self.status == TaskStatus::Staged,
- "Unexpected task status when invoked: {:?}",
- self.status
- );
- self.status = TaskStatus::Running;
- Ok(())
- }
-
- pub fn finish(&mut self, result: TaskResult) -> Result<()> {
- ensure!(
- self.status == TaskStatus::Running,
- "Unexpected task status when invoked: {:?}",
- self.status
- );
- self.result = result;
- self.status = TaskStatus::Finished;
- Ok(())
- }
-
- pub fn stage_for_running(
- &mut self,
- requester: &UserID,
- function: Function,
- ) -> Result<StagedTask> {
- ensure!(
- &self.creator == requester,
- "Unexpected user trying to invoke a task: {:?}",
- requester
- );
- ensure!(
- self.status == TaskStatus::Approved,
- "Unexpected task status when invoked: {:?}",
- self.status
- );
- let function_arguments = self.function_arguments.clone();
- let staged_task = StagedTask {
- task_id: self.task_id,
- executor: self.executor,
- executor_type: function.executor_type,
- function_id: function.id,
- function_name: function.name,
- function_payload: function.payload,
- function_arguments,
- input_data: self.assigned_inputs.clone().into(),
- output_data: self.assigned_outputs.clone().into(),
- };
-
- self.update_status(TaskStatus::Staged);
- Ok(staged_task)
- }
-
- pub fn assign_input(
- &mut self,
- requester: &UserID,
- fname: &str,
- file: TeaclaveInputFile,
- ) -> Result<()> {
- ensure!(
- self.status == TaskStatus::Created,
- "Unexpected task status during input assignment: {:?}",
- self.status
- );
-
- ensure!(
- file.owner.contains(requester),
- "Assign: requester is not in the owner list. {:?}.",
- file.external_id()
- );
-
- self.inputs_ownership.check(fname, &file.owner)?;
-
- self.assigned_inputs.assign(fname, file)?;
-
- if self.all_data_assigned() {
- self.update_status(TaskStatus::DataAssigned);
- }
- Ok(())
- }
-
- pub fn assign_output(
- &mut self,
- requester: &UserID,
- fname: &str,
- file: TeaclaveOutputFile,
- ) -> Result<()> {
- ensure!(
- self.status == TaskStatus::Created,
- "Unexpected task status during output assignment: {:?}",
- self.status
- );
-
- ensure!(
- file.owner.contains(requester),
- "Assign: requester is not in the owner list. {:?}.",
- file.external_id()
- );
-
- self.outputs_ownership.check(fname, &file.owner)?;
-
- self.assigned_outputs.assign(fname, file)?;
-
- if self.all_data_assigned() {
- self.update_status(TaskStatus::DataAssigned);
- }
- Ok(())
- }
-
- fn update_status(&mut self, status: TaskStatus) {
- self.status = status;
- }
-
- fn all_data_assigned(&self) -> bool {
- let input_args: HashSet<&String> = self.inputs_ownership.keys().collect();
- let assiged_inputs: HashSet<&String> = self.assigned_inputs.keys().collect();
- if input_args != assiged_inputs {
- return false;
- }
-
- let output_args: HashSet<&String> = self.outputs_ownership.keys().collect();
- let assiged_outputs: HashSet<&String> = self.assigned_outputs.keys().collect();
- if output_args != assiged_outputs {
- return false;
- }
-
- true
- }
-}
diff --git a/types/src/task_state.rs b/types/src/task_state.rs
new file mode 100644
index 0000000..76353e8
--- /dev/null
+++ b/types/src/task_state.rs
@@ -0,0 +1,499 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::*;
+use anyhow::{bail, ensure, Error, Result};
+use serde::{Deserialize, Serialize};
+use std::collections::HashSet;
+use std::convert::TryInto;
+use uuid::Uuid;
+
+const TASK_PREFIX: &str = "task";
+
+#[derive(Debug, Clone, Default, Deserialize, Serialize)]
+pub struct TaskState {
+ pub task_id: Uuid,
+ pub creator: UserID,
+ pub function_id: ExternalID,
+ pub function_arguments: FunctionArguments,
+ pub executor: Executor,
+ pub inputs_ownership: TaskFileOwners,
+ pub outputs_ownership: TaskFileOwners,
+ pub function_owner: UserID,
+ pub participants: UserList,
+ pub approved_users: UserList,
+ pub assigned_inputs: TaskFiles<TeaclaveInputFile>,
+ pub assigned_outputs: TaskFiles<TeaclaveOutputFile>,
+ pub result: TaskResult,
+ pub status: TaskStatus,
+}
+
+impl Storable for TaskState {
+ fn key_prefix() -> &'static str {
+ TASK_PREFIX
+ }
+
+ fn uuid(&self) -> Uuid {
+ self.task_id
+ }
+}
+
+impl TaskState {
+ pub fn everyone_approved(&self) -> bool {
+ // Single user task is by default approved by the creator
+ (self.participants.len() == 1) || (self.participants == self.approved_users)
+ }
+
+ pub fn all_data_assigned(&self) -> bool {
+ let input_args: HashSet<&String> = self.inputs_ownership.keys().collect();
+ let assiged_inputs: HashSet<&String> = self.assigned_inputs.keys().collect();
+ if input_args != assiged_inputs {
+ return false;
+ }
+
+ let output_args: HashSet<&String> = self.outputs_ownership.keys().collect();
+ let assiged_outputs: HashSet<&String> = self.assigned_outputs.keys().collect();
+ if output_args != assiged_outputs {
+ return false;
+ }
+
+ true
+ }
+
+ pub fn has_participant(&self, user_id: &UserID) -> bool {
+ self.participants.contains(user_id)
+ }
+
+ pub fn has_creator(&self, user_id: &UserID) -> bool {
+ &self.creator == user_id
+ }
+}
+
+#[derive(Debug, Clone, Default, Deserialize, Serialize)]
+pub struct Task<S: StateTag> {
+ state: TaskState,
+ extra: S,
+}
+
+pub trait StateTag {}
+impl StateTag for Create {}
+impl StateTag for Assign {}
+impl StateTag for Approve {}
+impl StateTag for Stage {}
+impl StateTag for Run {}
+impl StateTag for Finish {}
+impl StateTag for Done {}
+
+impl Task<Create> {
+ pub fn new(
+ requester: UserID,
+ req_executor: Executor,
+ req_func_args: FunctionArguments,
+ req_input_owners: impl Into<TaskFileOwners>,
+ req_output_owners: impl Into<TaskFileOwners>,
+ function: Function,
+ ) -> Result<Self> {
+ let req_input_owners = req_input_owners.into();
+ let req_output_owners = req_output_owners.into();
+
+ // gather all participants
+ let input_owners = req_input_owners.all_owners();
+ let output_owners = req_output_owners.all_owners();
+ let mut participants = UserList::unions(vec![input_owners, output_owners]);
+ participants.insert(requester.clone());
+ if !function.public {
+ participants.insert(function.owner.clone());
+ }
+
+ //check function compatibility
+ let fn_args_spec: HashSet<&String> = function.arguments.iter().collect();
+ let req_args: HashSet<&String> = req_func_args.inner().keys().collect();
+ ensure!(fn_args_spec == req_args, "function_arguments mismatch");
+
+ // check input fkeys
+ let inputs_spec: HashSet<&String> = function.inputs.iter().map(|f| &f.name).collect();
+ let req_input_fkeys: HashSet<&String> = req_input_owners.keys().collect();
+ ensure!(inputs_spec == req_input_fkeys, "input keys mismatch");
+
+ // check output fkeys
+ let outputs_spec: HashSet<&String> = function.outputs.iter().map(|f| &f.name).collect();
+ let req_output_fkeys: HashSet<&String> = req_output_owners.keys().collect();
+ ensure!(outputs_spec == req_output_fkeys, "output keys mismatch");
+
+ let ts = TaskState {
+ task_id: Uuid::new_v4(),
+ creator: requester,
+ executor: req_executor,
+ function_id: function.external_id(),
+ function_owner: function.owner.clone(),
+ function_arguments: req_func_args,
+ inputs_ownership: req_input_owners,
+ outputs_ownership: req_output_owners,
+ participants,
+ ..Default::default()
+ };
+
+ Ok(Task {
+ state: ts,
+ extra: Create,
+ })
+ }
+}
+
+impl Task<Assign> {
+ pub fn new(ts: TaskState) -> Result<Self> {
+ let task = Task::<Assign> {
+ state: ts,
+ extra: Assign,
+ };
+ Ok(task)
+ }
+
+ pub fn assign_input(
+ &mut self,
+ requester: &UserID,
+ fname: &str,
+ file: TeaclaveInputFile,
+ ) -> Result<()> {
+ ensure!(
+ file.owner.contains(requester),
+ "Assign: requester is not in the owner list. {:?}.",
+ file.external_id()
+ );
+
+ self.state.inputs_ownership.check(fname, &file.owner)?;
+ self.state.assigned_inputs.assign(fname, file)?;
+ Ok(())
+ }
+
+ pub fn assign_output(
+ &mut self,
+ requester: &UserID,
+ fname: &str,
+ file: TeaclaveOutputFile,
+ ) -> Result<()> {
+ ensure!(
+ file.owner.contains(requester),
+ "Assign: requester is not in the owner list. {:?}.",
+ file.external_id()
+ );
+
+ self.state.outputs_ownership.check(fname, &file.owner)?;
+ self.state.assigned_outputs.assign(fname, file)?;
+ Ok(())
+ }
+}
+
+impl Task<Approve> {
+ pub fn new(ts: TaskState) -> Result<Self> {
+ let task = Task::<Approve> {
+ state: ts,
+ extra: Approve,
+ };
+ Ok(task)
+ }
+
+ pub fn approve(&mut self, requester: &UserID) -> Result<()> {
+ ensure!(
+ self.state.participants.contains(requester),
+ "Unexpected user trying to approve a task: {:?}",
+ requester
+ );
+
+ self.state.approved_users.insert(requester.clone());
+ Ok(())
+ }
+}
+impl Task<Stage> {
+ pub fn new(ts: TaskState) -> Result<Self> {
+ let task = Task::<Stage> {
+ state: ts,
+ extra: Stage,
+ };
+ Ok(task)
+ }
+
+ pub fn stage_for_running(
+ &mut self,
+ requester: &UserID,
+ function: Function,
+ ) -> Result<StagedTask> {
+ ensure!(
+ self.state.has_creator(&requester),
+ "Requestor is not the task creater"
+ );
+
+ let function_arguments = self.state.function_arguments.clone();
+ let staged_task = StagedTask {
+ task_id: self.state.task_id,
+ executor: self.state.executor,
+ executor_type: function.executor_type,
+ function_id: function.id,
+ function_name: function.name,
+ function_payload: function.payload,
+ function_arguments,
+ input_data: self.state.assigned_inputs.clone().into(),
+ output_data: self.state.assigned_outputs.clone().into(),
+ };
+ Ok(staged_task)
+ }
+}
+
+impl Task<Run> {
+ pub fn new(ts: TaskState) -> Result<Self> {
+ let task = Task::<Run> {
+ state: ts,
+ extra: Run,
+ };
+ Ok(task)
+ }
+}
+
+impl Task<Finish> {
+ pub fn new(ts: TaskState) -> Result<Self> {
+ let task = Task::<Finish> {
+ state: ts,
+ extra: Finish,
+ };
+ Ok(task)
+ }
+
+ pub fn update_output_cmac(
+ &mut self,
+ fname: &str,
+ auth_tag: &FileAuthTag,
+ ) -> Result<&TeaclaveOutputFile> {
+ self.state.assigned_outputs.update_cmac(fname, auth_tag)
+ }
+
+ pub fn update_result(&mut self, result: TaskResult) -> Result<()> {
+ self.state.result = result;
+ Ok(())
+ }
+}
+
+impl Task<Done> {
+ pub fn new(ts: TaskState) -> Result<Self> {
+ let task = Task::<Done> {
+ state: ts,
+ extra: Done,
+ };
+ Ok(task)
+ }
+}
+
+impl std::convert::TryFrom<Task<Assign>> for Task<Approve> {
+ type Error = Error;
+ fn try_from(task: Task<Assign>) -> Result<Task<Approve>> {
+ ensure!(
+ task.state.all_data_assigned(),
+ "Not ready: Assign -> Approve"
+ );
+ Task::<Approve>::new(task.state)
+ }
+}
+
+impl std::convert::TryFrom<Task<Approve>> for Task<Stage> {
+ type Error = Error;
+ fn try_from(task: Task<Approve>) -> Result<Task<Stage>> {
+ ensure!(
+ task.state.everyone_approved(),
+ "Not ready: Apporve -> Stage"
+ );
+ Task::<Stage>::new(task.state)
+ }
+}
+
+impl std::convert::TryFrom<Task<Stage>> for Task<Run> {
+ type Error = Error;
+ fn try_from(task: Task<Stage>) -> Result<Task<Run>> {
+ Task::<Run>::new(task.state)
+ }
+}
+
+impl std::convert::TryFrom<Task<Run>> for Task<Finish> {
+ type Error = Error;
+ fn try_from(task: Task<Run>) -> Result<Task<Finish>> {
+ Task::<Finish>::new(task.state)
+ }
+}
+
+impl std::convert::TryFrom<Task<Finish>> for Task<Done> {
+ type Error = Error;
+ fn try_from(task: Task<Finish>) -> Result<Task<Done>> {
+ Task::<Done>::new(task.state)
+ }
+}
+
+/// Rebuilds a `Task<Assign>` from a saved `TaskState`.
+///
+/// Only a state whose status is `TaskStatus::Created` is accepted; any other
+/// status is rejected with an error.
+impl std::convert::TryFrom<TaskState> for Task<Assign> {
+    type Error = Error;
+
+    fn try_from(ts: TaskState) -> Result<Self> {
+        match ts.status {
+            TaskStatus::Created => {
+                let restored = Task::<Assign>::new(ts)?;
+                Ok(restored)
+            }
+            _ => bail!("Cannot restore to Assign from saved state "),
+        }
+    }
+}
+
+/// Rebuilds a `Task<Approve>` from a saved `TaskState`.
+///
+/// * `TaskStatus::Created`: the state is first restored as `Task<Assign>` and
+///   then pushed through the `Assign` -> `Approve` transition, which may fail.
+/// * `TaskStatus::DataAssigned`: the state is wrapped directly.
+/// * Any other status is rejected with an error.
+impl std::convert::TryFrom<TaskState> for Task<Approve> {
+    type Error = Error;
+
+    fn try_from(ts: TaskState) -> Result<Self> {
+        match ts.status {
+            TaskStatus::Created => {
+                let assigning: Task<Assign> = ts.try_into()?;
+                let approving: Task<Approve> = assigning.try_into()?;
+                Ok(approving)
+            }
+            TaskStatus::DataAssigned => Task::<Approve>::new(ts),
+            _ => bail!("Cannot restore to Approve from saved state"),
+        }
+    }
+}
+
+/// Rebuilds a `Task<Stage>` from a saved `TaskState`.
+///
+/// * `TaskStatus::Created` or `TaskStatus::DataAssigned`: the state is first
+///   restored as `Task<Approve>` and then pushed through the
+///   `Approve` -> `Stage` transition, which may fail.
+/// * `TaskStatus::Approved`: the state is wrapped directly.
+/// * Any other status is rejected with an error.
+impl std::convert::TryFrom<TaskState> for Task<Stage> {
+    type Error = Error;
+
+    fn try_from(ts: TaskState) -> Result<Self> {
+        match ts.status {
+            TaskStatus::Created | TaskStatus::DataAssigned => {
+                let approving: Task<Approve> = ts.try_into()?;
+                let staging: Task<Stage> = approving.try_into()?;
+                Ok(staging)
+            }
+            TaskStatus::Approved => Task::<Stage>::new(ts),
+            _ => bail!("Cannot restore to Stage from saved state"),
+        }
+    }
+}
+
+/// Rebuilds a `Task<Run>` from a saved `TaskState`.
+///
+/// Only a state whose status is `TaskStatus::Staged` is accepted; any other
+/// status is rejected with an error.
+impl std::convert::TryFrom<TaskState> for Task<Run> {
+    type Error = Error;
+
+    fn try_from(ts: TaskState) -> Result<Self> {
+        match ts.status {
+            TaskStatus::Staged => Task::<Run>::new(ts),
+            _ => bail!("Cannot restore to Run from saved state"),
+        }
+    }
+}
+
+/// Rebuilds a `Task<Finish>` from a saved `TaskState`.
+///
+/// Only a state whose status is `TaskStatus::Running` is accepted; any other
+/// status is rejected with an error.
+impl std::convert::TryFrom<TaskState> for Task<Finish> {
+    type Error = Error;
+
+    fn try_from(ts: TaskState) -> Result<Self> {
+        match ts.status {
+            TaskStatus::Running => Task::<Finish>::new(ts),
+            _ => bail!("Cannot restore to Finish from saved state"),
+        }
+    }
+}
+
+/// Extracts the `TaskState` from a `Task<Create>`, stamping its status as
+/// `TaskStatus::Created`.
+impl std::convert::From<Task<Create>> for TaskState {
+    fn from(task: Task<Create>) -> Self {
+        let mut state = task.state;
+        state.status = TaskStatus::Created;
+        state
+    }
+}
+
+// Generate `From<Task<Cur>> for TaskState` for each state listed on the left,
+// paired with the state it would transition to next (on the right).
+impl_transit_and_into_task_state!(Assign => Approve);
+impl_transit_and_into_task_state!(Approve => Stage);
+impl_transit_and_into_task_state!(Stage => Run);
+impl_transit_and_into_task_state!(Run => Finish);
+impl_transit_and_into_task_state!(Finish => Done);
+
+/// Implements `From<Task<$cur>> for TaskState`.
+///
+/// The generated conversion clones the task and attempts the
+/// `Task<$cur>` -> `Task<$next>` transition on the clone. If the transition
+/// succeeds, the returned state carries the status derived from `$next`;
+/// if it fails, the error is discarded and the original task's state is
+/// returned with the status derived from `$cur`.
+#[macro_export]
+macro_rules! impl_transit_and_into_task_state {
+    ( $cur:ty => $next:ty ) => {
+        impl std::convert::From<Task<$cur>> for TaskState {
+            fn from(mut task: Task<$cur>) -> TaskState {
+                let attempt: Result<Task<$next>> = task.clone().try_into();
+                if let Ok(mut advanced) = attempt {
+                    // The transition is allowed: persist the next state's status.
+                    advanced.state.status = advanced.extra.into();
+                    return advanced.state;
+                }
+                // The transition was refused: stay in the current state.
+                task.state.status = task.extra.into();
+                task.state
+            }
+        }
+    };
+}
+
+/// Marker type for `Task<Create>`; converts to `TaskStatus::Created`.
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct Create;
+/// Marker type for `Task<Assign>`; converts to `TaskStatus::Created`.
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct Assign;
+/// Marker type for `Task<Approve>`; converts to `TaskStatus::DataAssigned`.
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct Approve;
+/// Marker type for `Task<Stage>`; converts to `TaskStatus::Approved`.
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct Stage;
+/// Marker type for `Task<Run>`; converts to `TaskStatus::Staged`.
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct Run;
+/// Marker type for `Task<Finish>`; converts to `TaskStatus::Running`.
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct Finish;
+/// Marker type for `Task<Done>`; converts to `TaskStatus::Finished`.
+#[derive(Debug, Default, Clone, Deserialize, Serialize)]
+pub struct Done;
+
+/// The `Create` marker corresponds to `TaskStatus::Created`.
+impl std::convert::From<Create> for TaskStatus {
+    fn from(_: Create) -> Self {
+        TaskStatus::Created
+    }
+}
+
+/// The `Assign` marker corresponds to `TaskStatus::Created`.
+impl std::convert::From<Assign> for TaskStatus {
+    fn from(_: Assign) -> Self {
+        TaskStatus::Created
+    }
+}
+
+/// The `Approve` marker corresponds to `TaskStatus::DataAssigned`.
+impl std::convert::From<Approve> for TaskStatus {
+    fn from(_: Approve) -> Self {
+        TaskStatus::DataAssigned
+    }
+}
+
+/// The `Stage` marker corresponds to `TaskStatus::Approved`.
+impl std::convert::From<Stage> for TaskStatus {
+    fn from(_: Stage) -> Self {
+        TaskStatus::Approved
+    }
+}
+
+/// The `Run` marker corresponds to `TaskStatus::Staged`.
+impl std::convert::From<Run> for TaskStatus {
+    fn from(_: Run) -> Self {
+        TaskStatus::Staged
+    }
+}
+
+/// The `Finish` marker corresponds to `TaskStatus::Running`.
+impl std::convert::From<Finish> for TaskStatus {
+    fn from(_: Finish) -> Self {
+        TaskStatus::Running
+    }
+}
+
+/// The `Done` marker corresponds to `TaskStatus::Finished`.
+impl std::convert::From<Done> for TaskStatus {
+    fn from(_: Done) -> Self {
+        TaskStatus::Finished
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@teaclave.apache.org
For additional commands, e-mail: commits-help@teaclave.apache.org