You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@teaclave.apache.org by ms...@apache.org on 2020/04/30 02:53:38 UTC
[incubator-teaclave] branch master updated: [task] Restrict task
status transition (#280)
This is an automated email from the ASF dual-hosted git repository.
mssun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-teaclave.git
The following commit(s) were added to refs/heads/master by this push:
new 52d55e1 [task] Restrict task status transition (#280)
52d55e1 is described below
commit 52d55e15c84a965c7446ccfc7bcd2a031023ca44
Author: Zhaofeng Chen <zf...@apache.org>
AuthorDate: Wed Apr 29 19:53:30 2020 -0700
[task] Restrict task status transition (#280)
---
services/execution/enclave/src/service.rs | 14 +-
services/management/enclave/src/service.rs | 72 ++------
services/proto/src/proto/teaclave_common.proto | 5 +-
.../src/proto/teaclave_scheduler_service.proto | 1 -
services/proto/src/teaclave_common.rs | 2 -
services/proto/src/teaclave_scheduler_service.rs | 6 +-
services/scheduler/enclave/src/service.rs | 11 +-
.../enclave/src/end_to_end/native_echo.rs | 6 +-
tests/functional/enclave/src/execution_service.rs | 1 +
tests/functional/enclave/src/frontend_service.rs | 2 +-
tests/functional/enclave/src/management_service.rs | 2 +-
tests/functional/enclave/src/scheduler_service.rs | 4 +-
types/src/staged_function.rs | 2 +-
types/src/task.rs | 191 +++++++++++++++------
14 files changed, 180 insertions(+), 139 deletions(-)
diff --git a/services/execution/enclave/src/service.rs b/services/execution/enclave/src/service.rs
index 3d46fd8..1b73311 100644
--- a/services/execution/enclave/src/service.rs
+++ b/services/execution/enclave/src/service.rs
@@ -97,17 +97,16 @@ impl TeaclaveExecutionService {
}
fn invoke_task(&mut self, task: &StagedTask) -> Result<TaskOutputs> {
+ self.update_task_status(&task.task_id, TaskStatus::Running)?;
+
let file_mgr = TaskFileManager::new(
WORKER_BASE_DIR,
&task.task_id,
&task.input_data,
&task.output_data,
)?;
-
- self.update_task_status(&task.task_id, TaskStatus::DataPreparing, String::new())?;
let invocation = prepare_task(&task, &file_mgr)?;
- self.update_task_status(&task.task_id, TaskStatus::Running, String::new())?;
log::info!("Invoke function: {:?}", invocation);
let worker = Worker::default();
let summary = worker.invoke_function(invocation)?;
@@ -134,13 +133,8 @@ impl TeaclaveExecutionService {
Ok(())
}
- fn update_task_status(
- &mut self,
- task_id: &Uuid,
- task_status: TaskStatus,
- status_info: String,
- ) -> Result<()> {
- let request = UpdateTaskStatusRequest::new(task_id.to_owned(), task_status, status_info);
+ fn update_task_status(&mut self, task_id: &Uuid, task_status: TaskStatus) -> Result<()> {
+ let request = UpdateTaskStatusRequest::new(task_id.to_owned(), task_status);
let _response = self
.scheduler_client
.clone()
diff --git a/services/management/enclave/src/service.rs b/services/management/enclave/src/service.rs
index 17f4c18..0ba09b4 100644
--- a/services/management/enclave/src/service.rs
+++ b/services/management/enclave/src/service.rs
@@ -266,19 +266,15 @@ impl TeaclaveManagement for TeaclaveManagementService {
.read_from_db(&request.function_id)
.map_err(|_| ServiceError::PermissionDenied)?;
- let mut task = Task::new(
+ let task = Task::new(
user_id,
request.executor,
- &function,
request.function_arguments,
request.input_owners_map,
request.output_owners_map,
- );
-
- task.check_function_compatibility(&function)
- .map_err(|_| ServiceError::BadTask)?;
-
- task.update_status(TaskStatus::Created);
+ function,
+ )
+ .map_err(|_| ServiceError::BadTask)?;
log::info!("CreateTask: {:?}", task);
@@ -351,11 +347,6 @@ impl TeaclaveManagement for TeaclaveManagementService {
ServiceError::PermissionDenied
);
- ensure!(
- task.status == TaskStatus::Created,
- ServiceError::PermissionDenied
- );
-
for (data_name, data_id) in request.input_map.iter() {
let file: TeaclaveInputFile = self
.read_from_db(&data_id)
@@ -372,10 +363,6 @@ impl TeaclaveManagement for TeaclaveManagementService {
.map_err(|_| ServiceError::PermissionDenied)?;
}
- if task.all_data_assigned() {
- task.update_status(TaskStatus::DataAssigned);
- }
-
log::info!("AssignData: {:?}", task);
self.write_to_db(&task)
@@ -398,20 +385,8 @@ impl TeaclaveManagement for TeaclaveManagementService {
.read_from_db(&request.task_id)
.map_err(|_| ServiceError::PermissionDenied)?;
- ensure!(
- task.participants.contains(&user_id),
- ServiceError::PermissionDenied
- );
-
- ensure!(
- task.status == TaskStatus::DataAssigned,
- ServiceError::PermissionDenied,
- );
-
- task.approved_users.insert(user_id);
- if task.all_approved() {
- task.update_status(TaskStatus::Approved);
- }
+ task.approve(&user_id)
+ .map_err(|_| ServiceError::PermissionDenied)?;
log::info!("ApproveTask: approve:{:?}", task);
@@ -437,6 +412,7 @@ impl TeaclaveManagement for TeaclaveManagementService {
log::info!("InvokeTask: get task: {:?}", task);
+ // Early validation
ensure!(task.creator == user_id, ServiceError::PermissionDenied);
ensure!(
task.status == TaskStatus::Approved,
@@ -450,7 +426,6 @@ impl TeaclaveManagement for TeaclaveManagementService {
log::info!("InvokeTask: get function: {:?}", function);
let mut input_map: HashMap<String, FunctionInputFile> = HashMap::new();
- let mut output_map: HashMap<String, FunctionOutputFile> = HashMap::new();
for (data_name, data_id) in task.input_map.iter() {
let input_file: TeaclaveInputFile = self
.read_from_db(&data_id)
@@ -459,6 +434,7 @@ impl TeaclaveManagement for TeaclaveManagementService {
input_map.insert(data_name.to_string(), input_data);
}
+ let mut output_map: HashMap<String, FunctionOutputFile> = HashMap::new();
for (data_name, data_id) in task.output_map.iter() {
let output_file: TeaclaveOutputFile = self
.read_from_db(&data_id)
@@ -468,23 +444,11 @@ impl TeaclaveManagement for TeaclaveManagementService {
output_map.insert(data_name.to_string(), output_data);
}
- let function_arguments = task.function_arguments.clone();
+ let staged_task = task.stage_for_running(&user_id, function, input_map, output_map)?;
- let staged_task = StagedTask::new()
- .task_id(task.task_id)
- .executor(task.executor)
- .executor_type(function.executor_type)
- .function_id(function.id)
- .function_payload(function.payload)
- .function_arguments(function_arguments)
- .input_data(input_map)
- .output_data(output_map);
+ log::info!("InvokeTask: staged task: {:?}", staged_task);
self.enqueue_to_db(StagedTask::get_queue_key().as_bytes(), &staged_task)?;
- task.status = TaskStatus::Running;
-
- log::info!("InvokeTask: staged task: {:?}", task);
-
self.write_to_db(&task)
.map_err(|_| ServiceError::StorageError)?;
Ok(InvokeTaskResponse)
@@ -519,7 +483,8 @@ impl TeaclaveManagementService {
Ok(service)
}
- pub fn create_fusion_data(&self, owner: impl Into<OwnerList>) -> Result<TeaclaveOutputFile> {
+
+ pub fn create_fusion_data(&self, owners: impl Into<OwnerList>) -> Result<TeaclaveOutputFile> {
let uuid = Uuid::new_v4();
let url = format!(
"file://{}/{}.fusion",
@@ -529,13 +494,7 @@ impl TeaclaveManagementService {
let url = Url::parse(&url).map_err(|_| anyhow!("invalid url"))?;
let crypto_info = FileCrypto::default();
- Ok(TeaclaveOutputFile {
- url,
- cmac: None,
- crypto_info,
- owner: owner.into(),
- uuid,
- })
+ Ok(TeaclaveOutputFile::new(url, crypto_info, owners))
}
fn get_request_user_id(
@@ -694,11 +653,12 @@ pub mod tests {
let task = Task::new(
UserID::from("mock_user"),
Executor::MesaPy,
- &function,
function_arguments,
HashMap::new(),
HashMap::new(),
- );
+ function,
+ )
+ .unwrap();
assert!(Task::match_prefix(&task.key_string()));
let value = task.to_vec().unwrap();
diff --git a/services/proto/src/proto/teaclave_common.proto b/services/proto/src/proto/teaclave_common.proto
index 4969227..155deeb 100644
--- a/services/proto/src/proto/teaclave_common.proto
+++ b/services/proto/src/proto/teaclave_common.proto
@@ -26,9 +26,8 @@ enum TaskStatus {
DataAssigned = 1;
Approved = 2;
Staged = 3;
- DataPreparing = 4;
- Running = 5;
- Finished = 6;
+ Running = 4;
+ Finished = 10;
}
message TaskResult {
diff --git a/services/proto/src/proto/teaclave_scheduler_service.proto b/services/proto/src/proto/teaclave_scheduler_service.proto
index 8a82565..31001dd 100644
--- a/services/proto/src/proto/teaclave_scheduler_service.proto
+++ b/services/proto/src/proto/teaclave_scheduler_service.proto
@@ -16,7 +16,6 @@ message PullTaskResponse {
message UpdateTaskStatusRequest {
string task_id = 1;
teaclave_common_proto.TaskStatus task_status = 2;
- string status_info = 3;
}
message UpdateTaskStatusResponse {}
diff --git a/services/proto/src/teaclave_common.rs b/services/proto/src/teaclave_common.rs
index 9675dd0..5af0c61 100644
--- a/services/proto/src/teaclave_common.rs
+++ b/services/proto/src/teaclave_common.rs
@@ -109,7 +109,6 @@ pub fn i32_to_task_status(status: i32) -> Result<TaskStatus> {
Some(proto::TaskStatus::DataAssigned) => TaskStatus::DataAssigned,
Some(proto::TaskStatus::Approved) => TaskStatus::Approved,
Some(proto::TaskStatus::Staged) => TaskStatus::Staged,
- Some(proto::TaskStatus::DataPreparing) => TaskStatus::DataPreparing,
Some(proto::TaskStatus::Running) => TaskStatus::Running,
Some(proto::TaskStatus::Finished) => TaskStatus::Finished,
None => bail!("invalid task status"),
@@ -123,7 +122,6 @@ pub fn i32_from_task_status(status: TaskStatus) -> i32 {
TaskStatus::DataAssigned => proto::TaskStatus::DataAssigned as i32,
TaskStatus::Approved => proto::TaskStatus::Approved as i32,
TaskStatus::Staged => proto::TaskStatus::Staged as i32,
- TaskStatus::DataPreparing => proto::TaskStatus::DataPreparing as i32,
TaskStatus::Running => proto::TaskStatus::Running as i32,
TaskStatus::Finished => proto::TaskStatus::Finished as i32,
}
diff --git a/services/proto/src/teaclave_scheduler_service.rs b/services/proto/src/teaclave_scheduler_service.rs
index 3993940..585a9c4 100644
--- a/services/proto/src/teaclave_scheduler_service.rs
+++ b/services/proto/src/teaclave_scheduler_service.rs
@@ -84,15 +84,13 @@ pub struct UpdateTaskResultResponse {}
pub struct UpdateTaskStatusRequest {
pub task_id: Uuid,
pub task_status: TaskStatus,
- pub status_info: String,
}
impl UpdateTaskStatusRequest {
- pub fn new(task_id: Uuid, task_status: TaskStatus, status_info: String) -> Self {
+ pub fn new(task_id: Uuid, task_status: TaskStatus) -> Self {
Self {
task_id,
task_status,
- status_info,
}
}
}
@@ -211,7 +209,6 @@ impl std::convert::TryFrom<proto::UpdateTaskStatusRequest> for UpdateTaskStatusR
let ret = Self {
task_id: Uuid::parse_str(&proto.task_id)?,
task_status,
- status_info: proto.status_info,
};
Ok(ret)
}
@@ -223,7 +220,6 @@ impl std::convert::From<UpdateTaskStatusRequest> for proto::UpdateTaskStatusRequ
proto::UpdateTaskStatusRequest {
task_id: req.task_id.to_string(),
task_status,
- status_info: req.status_info,
}
}
}
diff --git a/services/scheduler/enclave/src/service.rs b/services/scheduler/enclave/src/service.rs
index bad3535..fd8a438 100644
--- a/services/scheduler/enclave/src/service.rs
+++ b/services/scheduler/enclave/src/service.rs
@@ -190,7 +190,10 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
) -> TeaclaveServiceResponseResult<UpdateTaskStatusResponse> {
let request = request.message;
let mut task = self.get_task(&request.task_id)?;
- task.status = request.task_status;
+
+ // Only TaskStatus::Running is allowed here so far.
+ task.invoking_by_executor()?;
+
log::info!("UpdateTaskStatus: Task {:?}", task);
self.put_into_db(&task)?;
Ok(UpdateTaskStatusResponse {})
@@ -207,12 +210,10 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
self.update_outputs_cmac(&task.output_map, &outputs.tags_map)?;
};
- task.result = request.task_result;
-
// Updating task result means we have finished execution
- task.status = TaskStatus::Finished;
- self.put_into_db(&task)?;
+ task.finish(request.task_result)?;
+ self.put_into_db(&task)?;
Ok(UpdateTaskResultResponse {})
}
}
diff --git a/tests/functional/enclave/src/end_to_end/native_echo.rs b/tests/functional/enclave/src/end_to_end/native_echo.rs
index 8864559..bd67bc0 100644
--- a/tests/functional/enclave/src/end_to_end/native_echo.rs
+++ b/tests/functional/enclave/src/end_to_end/native_echo.rs
@@ -50,12 +50,10 @@ pub fn test_echo_task_success() {
log::info!("Create task: {:?}", response);
- // Assign Data To Task
let task_id = response.task_id;
- let request = AssignDataRequest::new(task_id.clone(), hashmap!(), hashmap!());
- let response = client.assign_data(request).unwrap();
- log::info!("Assign data: {:?}", response);
+ // Assign Data To Task
+ // This task does not have any input/output files, we can skip the assignment process.
// Approve Task
approve_task(&mut client, &task_id).unwrap();
diff --git a/tests/functional/enclave/src/execution_service.rs b/tests/functional/enclave/src/execution_service.rs
index 5ca9f64..9590daa 100644
--- a/tests/functional/enclave/src/execution_service.rs
+++ b/tests/functional/enclave/src/execution_service.rs
@@ -28,6 +28,7 @@ fn test_execute_function() {
let task_id = Uuid::new_v4();
let task = Task {
task_id,
+ status: TaskStatus::Staged,
..Default::default()
};
diff --git a/tests/functional/enclave/src/frontend_service.rs b/tests/functional/enclave/src/frontend_service.rs
index 594c269..1508c6d 100644
--- a/tests/functional/enclave/src/frontend_service.rs
+++ b/tests/functional/enclave/src/frontend_service.rs
@@ -306,7 +306,7 @@ fn test_invoke_task() {
let request = GetTaskRequest::new(task_id);
let response = client.get_task(request).unwrap();
- assert_eq!(response.status, TaskStatus::Running);
+ assert_eq!(response.status, TaskStatus::Staged);
let request = PullTaskRequest {};
let mut scheduler_client = get_scheduler_client();
diff --git a/tests/functional/enclave/src/management_service.rs b/tests/functional/enclave/src/management_service.rs
index 62b83e1..acb0482 100644
--- a/tests/functional/enclave/src/management_service.rs
+++ b/tests/functional/enclave/src/management_service.rs
@@ -581,7 +581,7 @@ fn test_invoke_task() {
let request = GetTaskRequest::new(task_id);
let response = client2.get_task(request).unwrap();
- assert_eq!(response.status, TaskStatus::Running);
+ assert_eq!(response.status, TaskStatus::Staged);
let request = PullTaskRequest {};
let mut scheduler_client = get_scheduler_client();
diff --git a/tests/functional/enclave/src/scheduler_service.rs b/tests/functional/enclave/src/scheduler_service.rs
index be052ae..24c4f7f 100644
--- a/tests/functional/enclave/src/scheduler_service.rs
+++ b/tests/functional/enclave/src/scheduler_service.rs
@@ -52,7 +52,7 @@ fn test_update_task_status_result() {
let task_id = Uuid::new_v4();
let task = Task {
task_id,
- status: TaskStatus::Running,
+ status: TaskStatus::Staged,
..Default::default()
};
@@ -79,7 +79,7 @@ fn test_update_task_status_result() {
log::debug!("response: {:?}", response);
let task_id = response.staged_task.task_id;
- let request = UpdateTaskStatusRequest::new(task_id, TaskStatus::Running, String::new());
+ let request = UpdateTaskStatusRequest::new(task_id, TaskStatus::Running);
let response = client.update_task_status(request);
assert!(response.is_ok());
diff --git a/types/src/staged_function.rs b/types/src/staged_function.rs
index 8483b7a..03a46b9 100644
--- a/types/src/staged_function.rs
+++ b/types/src/staged_function.rs
@@ -98,7 +98,7 @@ impl std::fmt::Display for ArgumentValue {
#[derive(Clone, Serialize, Deserialize, Debug, Default)]
pub struct FunctionArguments {
#[serde(flatten)]
- pub inner: HashMap<String, ArgumentValue>,
+ inner: HashMap<String, ArgumentValue>,
}
impl<S: core::default::Default + std::hash::BuildHasher> From<FunctionArguments>
diff --git a/types/src/task.rs b/types/src/task.rs
index f23a24d..c6f3090 100644
--- a/types/src/task.rs
+++ b/types/src/task.rs
@@ -130,7 +130,6 @@ pub enum TaskStatus {
DataAssigned,
Approved,
Staged,
- DataPreparing,
Running,
Finished,
}
@@ -384,79 +383,133 @@ impl Storable for Task {
impl Task {
pub fn new(
requester: UserID,
- executor: Executor,
- function: &Function,
- function_arguments: FunctionArguments,
- input_owners_map: HashMap<String, OwnerList>,
- output_owners_map: HashMap<String, OwnerList>,
- ) -> Self {
- let input_owners = UserList::unions(input_owners_map.values().cloned());
- let output_owners = UserList::unions(output_owners_map.values().cloned());
+ req_executor: Executor,
+ req_func_args: FunctionArguments,
+ req_input_owners: HashMap<String, OwnerList>,
+ req_output_owners: HashMap<String, OwnerList>,
+ function: Function,
+ ) -> Result<Self> {
+ // gather all participants
+ let input_owners = UserList::unions(req_input_owners.values().cloned());
+ let output_owners = UserList::unions(req_output_owners.values().cloned());
let mut participants = UserList::unions(vec![input_owners, output_owners]);
participants.insert(requester.clone());
if !function.public {
participants.insert(function.owner.clone());
}
- Task {
+ //check function compatibility
+ let fn_args_spec: HashSet<&String> = function.arguments.iter().collect();
+ let req_args: HashSet<&String> = req_func_args.inner().keys().collect();
+ ensure!(fn_args_spec == req_args, "function_arguments mismatch");
+
+ // check input fkeys
+ let inputs_spec: HashSet<&String> = function.inputs.iter().map(|f| &f.name).collect();
+ let req_input_fkeys: HashSet<&String> = req_input_owners.keys().collect();
+ ensure!(inputs_spec == req_input_fkeys, "input keys mismatch");
+
+ // check output fkeys
+ let outputs_spec: HashSet<&String> = function.outputs.iter().map(|f| &f.name).collect();
+ let req_output_fkeys: HashSet<&String> = req_output_owners.keys().collect();
+ ensure!(outputs_spec == req_output_fkeys, "output keys mismatch");
+
+ // Skip the assignment if no file is required
+ let status = if req_input_owners.is_empty() && req_output_owners.is_empty() {
+ TaskStatus::DataAssigned
+ } else {
+ TaskStatus::Created
+ };
+
+ let task = Task {
task_id: Uuid::new_v4(),
creator: requester,
- executor,
+ executor: req_executor,
function_id: function.external_id(),
function_owner: function.owner.clone(),
- function_arguments,
- input_owners_map,
- output_owners_map,
+ function_arguments: req_func_args,
+ input_owners_map: req_input_owners,
+ output_owners_map: req_output_owners,
participants,
+ status,
..Default::default()
- }
- }
+ };
- pub fn update_status(&mut self, status: TaskStatus) {
- self.status = status;
+ Ok(task)
}
- pub fn check_function_compatibility(&self, function: &Function) -> Result<()> {
- // check arguments
- let function_arguments: HashSet<&String> = function.arguments.iter().collect();
- let provide_args: HashSet<&String> = self.function_arguments.inner().keys().collect();
+ pub fn approve(&mut self, requester: &UserID) -> Result<()> {
ensure!(
- function_arguments == provide_args,
- "function_arguments mismatch"
+ self.status == TaskStatus::DataAssigned,
+ "Unexpected task status when approving: {:?}",
+ self.status
);
- // check input
- let input_args: HashSet<String> = function.inputs.iter().map(|f| f.name.clone()).collect();
- let provide_args: HashSet<String> = self.input_owners_map.keys().cloned().collect();
- ensure!(input_args == provide_args, "input keys mismatch");
+ ensure!(
+ self.participants.contains(requester),
+ "Unexpected user trying to approve a task: {:?}",
+ requester
+ );
- // check output
- let output_args: HashSet<String> =
- function.outputs.iter().map(|f| f.name.clone()).collect();
- let provide_args: HashSet<String> = self.output_owners_map.keys().cloned().collect();
- ensure!(output_args == provide_args, "output keys mismatch");
+ self.approved_users.insert(requester.clone());
+ if self.participants == self.approved_users {
+ self.update_status(TaskStatus::Approved);
+ }
Ok(())
}
- pub fn all_data_assigned(&self) -> bool {
- let input_args: HashSet<String> = self.input_owners_map.keys().cloned().collect();
- let assiged_inputs: HashSet<String> = self.input_map.keys().cloned().collect();
- if input_args != assiged_inputs {
- return false;
- }
-
- let output_args: HashSet<String> = self.output_owners_map.keys().cloned().collect();
- let assiged_outputs: HashSet<String> = self.output_map.keys().cloned().collect();
- if output_args != assiged_outputs {
- return false;
- }
+ pub fn invoking_by_executor(&mut self) -> Result<()> {
+ ensure!(
+ self.status == TaskStatus::Staged,
+ "Unexpected task status when invoked: {:?}",
+ self.status
+ );
+ self.status = TaskStatus::Running;
+ Ok(())
+ }
- true
+ pub fn finish(&mut self, result: TaskResult) -> Result<()> {
+ ensure!(
+ self.status == TaskStatus::Running,
+ "Unexpected task status when invoked: {:?}",
+ self.status
+ );
+ self.result = result;
+ self.status = TaskStatus::Finished;
+ Ok(())
}
- pub fn all_approved(&self) -> bool {
- self.participants == self.approved_users
+ pub fn stage_for_running(
+ &mut self,
+ requester: &UserID,
+ function: Function,
+ input_map: HashMap<String, FunctionInputFile>,
+ output_map: HashMap<String, FunctionOutputFile>,
+ ) -> Result<StagedTask> {
+ ensure!(
+ &self.creator == requester,
+ "Unexpected user trying to invoke a task: {:?}",
+ requester
+ );
+ ensure!(
+ self.status == TaskStatus::Approved,
+ "Unexpected task status when invoked: {:?}",
+ self.status
+ );
+ let function_arguments = self.function_arguments.clone();
+ let staged_task = StagedTask {
+ task_id: self.task_id,
+ executor: self.executor,
+ executor_type: function.executor_type,
+ function_id: function.id,
+ function_payload: function.payload,
+ function_arguments,
+ input_data: input_map.into(),
+ output_data: output_map.into(),
+ };
+
+ self.update_status(TaskStatus::Staged);
+ Ok(staged_task)
}
pub fn assign_input(
@@ -466,6 +519,12 @@ impl Task {
file: &TeaclaveInputFile,
) -> Result<()> {
ensure!(
+ self.status == TaskStatus::Created,
+ "Unexpected task status during input assignment: {:?}",
+ self.status
+ );
+
+ ensure!(
file.owner.contains(requester),
"Assign: requester is not in the owner list. {:?}.",
file.external_id()
@@ -490,7 +549,12 @@ impl Task {
"Assign: file already assigned. {:?}",
fname
);
+
self.input_map.insert(fname.to_owned(), file.external_id());
+
+ if self.all_data_assigned() {
+ self.update_status(TaskStatus::DataAssigned);
+ }
Ok(())
}
@@ -501,6 +565,12 @@ impl Task {
file: &TeaclaveOutputFile,
) -> Result<()> {
ensure!(
+ self.status == TaskStatus::Created,
+ "Unexpected task status during output assignment: {:?}",
+ self.status
+ );
+
+ ensure!(
file.owner.contains(requester),
"Assign: requester is not in the owner list. {:?}.",
file.external_id()
@@ -525,7 +595,32 @@ impl Task {
"Assign: file already assigned. {:?}",
fname
);
+
self.output_map.insert(fname.to_owned(), file.external_id());
+
+ if self.all_data_assigned() {
+ self.update_status(TaskStatus::DataAssigned);
+ }
Ok(())
}
+
+ fn update_status(&mut self, status: TaskStatus) {
+ self.status = status;
+ }
+
+ fn all_data_assigned(&self) -> bool {
+ let input_args: HashSet<&String> = self.input_owners_map.keys().collect();
+ let assiged_inputs: HashSet<&String> = self.input_map.keys().collect();
+ if input_args != assiged_inputs {
+ return false;
+ }
+
+ let output_args: HashSet<&String> = self.output_owners_map.keys().collect();
+ let assiged_outputs: HashSet<&String> = self.output_map.keys().collect();
+ if output_args != assiged_outputs {
+ return false;
+ }
+
+ true
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@teaclave.apache.org
For additional commands, e-mail: commits-help@teaclave.apache.org