You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@teaclave.apache.org by ms...@apache.org on 2020/04/24 06:12:09 UTC
[incubator-teaclave] branch master updated: [fusion] Support fusion
data used as input with example (#274)
This is an automated email from the ASF dual-hosted git repository.
mssun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-teaclave.git
The following commit(s) were added to refs/heads/master by this push:
new 00e898e [fusion] Support fusion data used as input with example (#274)
00e898e is described below
commit 00e898e1355d785bd456d2cf2ad05e208a4d69e6
Author: Zhaofeng Chen <zf...@apache.org>
AuthorDate: Thu Apr 23 23:11:59 2020 -0700
[fusion] Support fusion data used as input with example (#274)
---
services/execution/enclave/src/service.rs | 7 +-
.../execution/enclave/src/task_file_manager.rs | 402 ++++++++++++---------
services/management/enclave/src/service.rs | 2 +-
services/proto/src/proto/teaclave_common.proto | 2 +-
services/proto/src/teaclave_common.rs | 4 +-
services/scheduler/enclave/Cargo.toml | 1 +
services/scheduler/enclave/src/service.rs | 56 ++-
.../enclave/src/end_to_end/mesapy_data_fusion.rs | 104 +++++-
.../enclave/src/end_to_end/mesapy_echo.rs | 4 +-
tests/functional/enclave/src/end_to_end/mod.rs | 17 +-
.../enclave/src/end_to_end/native_echo.rs | 4 +-
.../enclave/src/end_to_end/native_gbdt_training.rs | 4 +-
tests/functional/enclave/src/scheduler_service.rs | 4 +-
types/src/file.rs | 6 +
types/src/staged_task.rs | 20 +-
types/src/task.rs | 27 +-
types/src/worker.rs | 2 +-
17 files changed, 452 insertions(+), 214 deletions(-)
diff --git a/services/execution/enclave/src/service.rs b/services/execution/enclave/src/service.rs
index ec89165..3d46fd8 100644
--- a/services/execution/enclave/src/service.rs
+++ b/services/execution/enclave/src/service.rs
@@ -153,8 +153,7 @@ impl TeaclaveExecutionService {
}
fn prepare_task(task: &StagedTask, file_mgr: &TaskFileManager) -> Result<StagedFunction> {
- file_mgr.download_inputs()?;
- let input_files = file_mgr.convert_downloaded_inputs()?;
+ let input_files = file_mgr.prepare_staged_inputs()?;
let output_files = file_mgr.prepare_staged_outputs()?;
let function_payload = String::from_utf8_lossy(&task.function_payload).to_string();
@@ -170,9 +169,7 @@ fn prepare_task(task: &StagedTask, file_mgr: &TaskFileManager) -> Result<StagedF
}
fn finalize_task(file_mgr: &TaskFileManager) -> Result<HashMap<String, FileAuthTag>> {
- let outputs_tag = file_mgr.convert_staged_outputs()?;
- file_mgr.upload_outputs()?;
- Ok(outputs_tag)
+ file_mgr.upload_outputs()
}
#[cfg(feature = "enclave_unit_test")]
diff --git a/services/execution/enclave/src/task_file_manager.rs b/services/execution/enclave/src/task_file_manager.rs
index 298808f..e3e066a 100644
--- a/services/execution/enclave/src/task_file_manager.rs
+++ b/services/execution/enclave/src/task_file_manager.rs
@@ -27,12 +27,31 @@ use teaclave_types::*;
use url::Url;
use uuid::Uuid;
-#[derive(Default)]
pub(crate) struct TaskFileManager {
- cwd: PathBuf,
- inputs: FunctionInputFiles,
- outputs: FunctionOutputFiles,
- staged_outputs: StagedFiles,
+ inter_inputs: InterInputs,
+ inter_outputs: InterOutputs,
+}
+
+struct InterInputs {
+ inner: Vec<InterInput>,
+}
+
+struct InterOutputs {
+ inner: Vec<InterOutput>,
+}
+
+pub(self) struct InterInput {
+ pub(self) funiq_key: String,
+ pub(self) file: FunctionInputFile,
+ pub(self) download_path: PathBuf,
+ pub(self) staged_path: PathBuf,
+}
+
+pub(self) struct InterOutput {
+ pub(self) funiq_key: String,
+ pub(self) file: FunctionOutputFile,
+ pub(self) upload_path: PathBuf,
+ pub(self) staged_info: StagedFileInfo,
}
impl TaskFileManager {
@@ -43,210 +62,258 @@ impl TaskFileManager {
outputs: &FunctionOutputFiles,
) -> Result<Self> {
let cwd = Path::new(base).join(task_id.to_string());
- if !cwd.exists() {
- std::untrusted::fs::create_dir_all(&cwd)?;
- }
+ let inputs_base = cwd.clone().join("inputs");
+ let outputs_base = cwd.join("outputs");
- let tfmgr = TaskFileManager {
- cwd,
- inputs: inputs.clone(),
- outputs: outputs.clone(),
- ..Default::default()
- }
- .create_staged_outputs();
+ let inter_inputs = InterInputs::new(&inputs_base, inputs.clone())?;
+ let inter_outputs = InterOutputs::new(&outputs_base, outputs.clone())?;
- if !tfmgr.inputs_base_dir().exists() {
- std::untrusted::fs::create_dir_all(tfmgr.inputs_base_dir())?;
- }
+ let tfmgr = TaskFileManager {
+ inter_inputs,
+ inter_outputs,
+ };
- if !tfmgr.outputs_base_dir().exists() {
- std::untrusted::fs::create_dir_all(tfmgr.outputs_base_dir())?;
- }
Ok(tfmgr)
}
- pub(crate) fn inputs_base_dir(&self) -> PathBuf {
- self.cwd.join("inputs")
+ pub(crate) fn prepare_staged_inputs(&self) -> Result<StagedFiles> {
+ self.inter_inputs.download()?;
+ self.inter_inputs.convert_to_staged_files()
}
- pub(crate) fn outputs_base_dir(&self) -> PathBuf {
- self.cwd.join("outputs")
+ pub(crate) fn prepare_staged_outputs(&self) -> Result<StagedFiles> {
+ let staged_outputs = self.inter_outputs.generate_staged_files();
+ Ok(staged_outputs)
}
- // Original input file is downloaded to $inputs_base_dir/$funiq_key/$original_filename
- fn make_input_download_path(&self, funiq_key: &str, url: &Url) -> Result<PathBuf> {
- let url_path = url.path();
- let original_name = Path::new(url_path)
- .file_name()
- .ok_or_else(|| anyhow::anyhow!("Cannot get filename from url: {:?}", url))?;
- let download_dir = self.inputs_base_dir().join(funiq_key);
- if !download_dir.exists() {
- std::untrusted::fs::create_dir_all(&download_dir)?;
- }
- let local_dest = download_dir.join(original_name);
- Ok(local_dest)
+ pub(crate) fn upload_outputs(&self) -> Result<HashMap<String, FileAuthTag>> {
+ let auth_tags = self.inter_outputs.convert_staged_files_for_upload()?;
+ self.inter_outputs.upload()?;
+ Ok(auth_tags)
}
+}
- // Staged input file is converted to $inputs_base_dir/${funiq_key}.staged_in
- fn make_staged_input_path(&self, file_unique_key: &str) -> PathBuf {
- let mut local_dest = self.inputs_base_dir().join(file_unique_key);
- local_dest.set_extension("staged_in");
- local_dest
- }
+impl InterInput {
+ fn new(
+ base: impl AsRef<Path>,
+ funiq_key: String,
+ file: FunctionInputFile,
+ ) -> Result<InterInput> {
+ let download_path = make_intermediate_path(base.as_ref(), &funiq_key, &file.url)?;
+ let staged_path = make_staged_path(base.as_ref(), &funiq_key, &file.url)?;
- // Staged output file is set to $outputs_base_dir/${funiq_key}.staged_out
- fn make_staged_output_path(&self, file_unique_key: &str) -> PathBuf {
- let dir = self.outputs_base_dir();
- let mut local_dest = dir.join(file_unique_key);
- local_dest.set_extension("staged_out");
- local_dest
+ Ok(InterInput {
+ funiq_key,
+ file,
+ download_path,
+ staged_path,
+ })
}
- // Output file ready for upload is set to $outputs_base_dir/${funiq_key}.out
- fn make_upload_output_path(&self, file_unique_key: &str) -> PathBuf {
- let mut local_dest = self.outputs_base_dir().join(file_unique_key);
- local_dest.set_extension("out");
- local_dest
+ fn to_staged_file_entry(&self) -> Result<(String, StagedFileInfo)> {
+ let src = &self.download_path;
+ let dst = &self.staged_path;
+ let staged_file_info = match self.file.crypto_info {
+ FileCrypto::TeaclaveFile128(crypto) => {
+ std::untrusted::fs::soft_link(src, dst)?;
+ StagedFileInfo::new(&src, crypto, self.file.cmac)
+ }
+ FileCrypto::AesGcm128(crypto) => {
+ let mut bytes = read_all_bytes(src)?;
+ let n = bytes.len();
+ anyhow::ensure!(
+ n > FILE_AUTH_TAG_LENGTH,
+ "AesGcm128 File, invalid length: {:?}",
+ src
+ );
+ anyhow::ensure!(
+ self.file.cmac == bytes[n - FILE_AUTH_TAG_LENGTH..],
+ "AesGcm128 File, invalid tag: {:?}",
+ src
+ );
+ crypto.decrypt(&mut bytes)?;
+ StagedFileInfo::create_with_bytes(dst, &bytes)?
+ }
+ FileCrypto::AesGcm256(crypto) => {
+ let mut bytes = read_all_bytes(src)?;
+ let n = bytes.len();
+ anyhow::ensure!(
+ n > FILE_AUTH_TAG_LENGTH,
+ "AesGcm256 File, invalid length: {:?}",
+ src
+ );
+ anyhow::ensure!(
+ self.file.cmac == bytes[n - FILE_AUTH_TAG_LENGTH..],
+ "AesGcm256 File, invalid tag: {:?}",
+ src
+ );
+ crypto.decrypt(&mut bytes)?;
+ StagedFileInfo::create_with_bytes(dst, &bytes)?
+ }
+ FileCrypto::Raw => {
+ let bytes = read_all_bytes(src)?;
+ StagedFileInfo::create_with_bytes(dst, &bytes)?
+ }
+ };
+ Ok((self.funiq_key.clone(), staged_file_info))
}
+}
- pub(crate) fn download_inputs(&self) -> Result<()> {
- let mut req_info = Vec::new();
- for (fname, finfo) in self.inputs.iter() {
- let local_dest = self.make_input_download_path(fname, &finfo.url)?;
- req_info.push(HandleFileInfo::new(local_dest, &finfo.url));
+impl std::iter::FromIterator<InterInput> for InterInputs {
+ fn from_iter<T: IntoIterator<Item = InterInput>>(iter: T) -> Self {
+ InterInputs {
+ inner: Vec::from_iter(iter),
}
+ }
+}
+impl InterInputs {
+ pub fn new(base: impl AsRef<Path>, inputs: FunctionInputFiles) -> Result<InterInputs> {
+ inputs
+ .into_iter()
+ .map(|(funiq_key, file)| InterInput::new(base.as_ref(), funiq_key, file))
+ .collect()
+ }
+
+ pub(crate) fn download(&self) -> Result<()> {
+ let req_info = self.inner.iter().map(|inter_input| {
+ HandleFileInfo::new(&inter_input.download_path, &inter_input.file.url)
+ });
let request = FileAgentRequest::new(HandleFileCommand::Download, req_info);
log::info!("Ocall file download request: {:?}", request);
handle_file_request(request)?;
Ok(())
}
- pub(crate) fn convert_downloaded_inputs(&self) -> Result<StagedFiles> {
- let mut files: HashMap<String, StagedFileInfo> = HashMap::new();
- for (fkey, finfo) in self.inputs.iter() {
- let src = self.make_input_download_path(fkey, &finfo.url)?;
- let staged_file_info = match finfo.crypto_info {
- FileCrypto::TeaclaveFile128(crypto) => {
- StagedFileInfo::new(&src, crypto, finfo.cmac)
- }
- FileCrypto::AesGcm128(crypto) => {
- let dst = self.make_staged_input_path(fkey);
- let mut bytes = read_all_bytes(&src)?;
- let n = bytes.len();
- anyhow::ensure!(
- n > FILE_AUTH_TAG_LENGTH,
- "AesGcm128 File, invalid length: {:?}",
- src
- );
- anyhow::ensure!(
- finfo.cmac == bytes[n - FILE_AUTH_TAG_LENGTH..],
- "AesGcm128 File, invalid tag: {:?}",
- src
- );
- crypto.decrypt(&mut bytes)?;
- StagedFileInfo::create_with_bytes(&dst, &bytes)?
- }
- FileCrypto::AesGcm256(crypto) => {
- let dst = self.make_staged_input_path(fkey);
- let mut bytes = read_all_bytes(&src)?;
- let n = bytes.len();
- anyhow::ensure!(
- n > FILE_AUTH_TAG_LENGTH,
- "AesGcm256 File, invalid length: {:?}",
- src
- );
- anyhow::ensure!(
- finfo.cmac == bytes[n - FILE_AUTH_TAG_LENGTH..],
- "AesGcm256 File, invalid tag: {:?}",
- src
- );
- crypto.decrypt(&mut bytes)?;
- StagedFileInfo::create_with_bytes(&dst, &bytes)?
- }
- FileCrypto::Raw => {
- let dst = self.make_staged_input_path(fkey);
- let bytes = read_all_bytes(&src)?;
- StagedFileInfo::create_with_bytes(&dst, &bytes)?
- }
- };
-
- files.insert(fkey.to_string(), staged_file_info);
+ pub(crate) fn convert_to_staged_files(&self) -> Result<StagedFiles> {
+ self.inner
+ .iter()
+ .map(|inter_file| inter_file.to_staged_file_entry())
+ .collect()
+ }
+}
+
+impl std::iter::FromIterator<InterOutput> for InterOutputs {
+ fn from_iter<T: IntoIterator<Item = InterOutput>>(iter: T) -> Self {
+ InterOutputs {
+ inner: Vec::from_iter(iter),
}
- Ok(StagedFiles::new(files))
+ }
+}
+
+impl InterOutput {
+ pub fn new(
+ base: impl AsRef<Path>,
+ funiq_key: String,
+ file: FunctionOutputFile,
+ ) -> Result<InterOutput> {
+ let upload_path = make_intermediate_path(base.as_ref(), &funiq_key, &file.url)?;
+ let staged_path = make_staged_path(base.as_ref(), &funiq_key, &file.url)?;
+ let random_key = TeaclaveFile128Key::random();
+ let staged_info = StagedFileInfo::new(&staged_path, random_key, FileAuthTag::default());
+
+ Ok(InterOutput {
+ funiq_key,
+ file,
+ upload_path,
+ staged_info,
+ })
+ }
+
+ fn convert_to_upload_file(&self) -> Result<FileAuthTag> {
+ let dest = &self.upload_path;
+ let outfile = match self.file.crypto_info {
+ FileCrypto::TeaclaveFile128(crypto) => {
+ self.staged_info.convert_file(dest, crypto.to_owned())?
+ }
+
+ FileCrypto::AesGcm128(_) => {
+ anyhow::bail!("OutputFile: unsupported type");
+ }
+ FileCrypto::AesGcm256(_) => {
+ anyhow::bail!("OutputFile: unsupported type");
+ }
+ FileCrypto::Raw => {
+ anyhow::bail!("OutputFile: unsupported type");
+ }
+ };
+ Ok(outfile.cmac)
+ }
+}
+
+impl InterOutputs {
+ pub fn new(base: impl AsRef<Path>, outputs: FunctionOutputFiles) -> Result<InterOutputs> {
+ outputs
+ .into_iter()
+ .map(|(funiq_key, file)| InterOutput::new(base.as_ref(), funiq_key, file))
+ .collect()
}
- // For each output file, we create a random key for the execution.
- // TaskFileManager will convert these output files to the user
- // selected key during the uploading process.
- pub fn create_staged_outputs(self) -> Self {
- let staged_outputs: StagedFiles = self
- .outputs
+ pub fn generate_staged_files(&self) -> StagedFiles {
+ self.inner
.iter()
- .map(|(fkey, _)| {
- let dest = self.make_staged_output_path(fkey);
- let random_key = TeaclaveFile128Key::random();
+ .map(|inter_output| {
(
- fkey.to_string(),
- StagedFileInfo::new(dest, random_key, FileAuthTag::default()),
+ inter_output.funiq_key.clone(),
+ inter_output.staged_info.clone(),
)
})
- .collect();
-
- Self {
- staged_outputs,
- ..self
- }
- }
-
- pub(crate) fn prepare_staged_outputs(&self) -> Result<StagedFiles> {
- anyhow::ensure!(
- self.staged_outputs.len() == self.outputs.len(),
- "Inconsistent between staged_outputs and outputs"
- );
- Ok(self.staged_outputs.clone())
+ .collect()
}
- pub(crate) fn convert_staged_outputs(&self) -> anyhow::Result<HashMap<String, FileAuthTag>> {
- let mut outputs_tag: HashMap<String, FileAuthTag> = HashMap::new();
-
- for (fkey, finfo) in self.outputs.iter() {
- let dest = self.make_upload_output_path(fkey);
- let staged_file = self
- .staged_outputs
- .get(fkey)
- .ok_or_else(|| anyhow::anyhow!("Missing file in staged_output: {:?}", fkey))?;
- let outfile = match finfo.crypto_info {
- FileCrypto::TeaclaveFile128(crypto) => staged_file.convert_file(dest, crypto)?,
-
- FileCrypto::AesGcm128(_) => {
- anyhow::bail!("OutputFile: unsupported type");
- }
- FileCrypto::AesGcm256(_) => {
- anyhow::bail!("OutputFile: unsupported type");
- }
- FileCrypto::Raw => {
- anyhow::bail!("OutputFile: unsupported type");
- }
- };
- outputs_tag.insert(fkey.to_string(), outfile.cmac);
- }
- Ok(outputs_tag)
+ pub fn convert_staged_files_for_upload(&self) -> Result<HashMap<String, FileAuthTag>> {
+ self.inner
+ .iter()
+ .map(|inter_output| {
+ inter_output
+ .convert_to_upload_file()
+ .map(|cmac| (inter_output.funiq_key.clone(), cmac))
+ })
+ .collect()
}
- pub(crate) fn upload_outputs(&self) -> Result<()> {
- let req_info = self.outputs.iter().map(|(fkey, value)| {
- let local = self.make_upload_output_path(fkey);
- HandleFileInfo::new(local, &value.url)
+ pub(crate) fn upload(&self) -> Result<()> {
+ let req_info = self.inner.iter().map(|inter_output| {
+ HandleFileInfo::new(&inter_output.upload_path, &inter_output.file.url)
});
-
let request = FileAgentRequest::new(HandleFileCommand::Upload, req_info);
- log::info!("Ocall file upload request: {:?}", request);
+ log::info!("Ocall file download request: {:?}", request);
handle_file_request(request)?;
Ok(())
}
}
+// Staged file is put in $base_dir/${funiq_key}-staged/$original_name
+fn make_staged_path(base: impl AsRef<Path>, funiq_key: &str, url: &Url) -> Result<PathBuf> {
+ let url_path = url.path();
+ let original_name = Path::new(url_path)
+ .file_name()
+ .ok_or_else(|| anyhow::anyhow!("Cannot get filename from url: {:?}", url))?;
+
+ let staged_dir = format!("{}-{}", funiq_key, "staged");
+ let file_dir = base.as_ref().to_owned().join(&staged_dir);
+ if !file_dir.exists() {
+ std::untrusted::fs::create_dir_all(&file_dir)?;
+ }
+ let local_dest = file_dir.join(original_name);
+ Ok(local_dest)
+}
+
+// Intermediate file is converted to $base_dir/${funiq_key}/$original_name
+fn make_intermediate_path(base: impl AsRef<Path>, funiq_key: &str, url: &Url) -> Result<PathBuf> {
+ let url_path = url.path();
+ let original_name = Path::new(url_path)
+ .file_name()
+ .ok_or_else(|| anyhow::anyhow!("Cannot get filename from url: {:?}", url))?;
+
+ let file_dir = base.as_ref().to_owned().join(funiq_key);
+ if !file_dir.exists() {
+ std::untrusted::fs::create_dir_all(&file_dir)?;
+ }
+ let local_dest = file_dir.join(original_name);
+ Ok(local_dest)
+}
+
#[cfg(feature = "enclave_unit_test")]
pub mod tests {
use super::*;
@@ -268,8 +335,7 @@ pub mod tests {
let file_mgr =
TaskFileManager::new("/tmp", &task_id, &inputs.into(), &outputs.into()).unwrap();
- file_mgr.download_inputs().unwrap();
- file_mgr.convert_downloaded_inputs().unwrap();
+ file_mgr.prepare_staged_inputs().unwrap();
file_mgr.prepare_staged_outputs().unwrap();
}
}
diff --git a/services/management/enclave/src/service.rs b/services/management/enclave/src/service.rs
index 54c65b2..17f4c18 100644
--- a/services/management/enclave/src/service.rs
+++ b/services/management/enclave/src/service.rs
@@ -522,7 +522,7 @@ impl TeaclaveManagementService {
pub fn create_fusion_data(&self, owner: impl Into<OwnerList>) -> Result<TeaclaveOutputFile> {
let uuid = Uuid::new_v4();
let url = format!(
- "file://{}/{}",
+ "file://{}/{}.fusion",
self.fusion_base_dir.display(),
uuid.to_string()
);
diff --git a/services/proto/src/proto/teaclave_common.proto b/services/proto/src/proto/teaclave_common.proto
index 97ca19a..4969227 100644
--- a/services/proto/src/proto/teaclave_common.proto
+++ b/services/proto/src/proto/teaclave_common.proto
@@ -14,7 +14,7 @@ message FileCryptoInfo {
message TaskOutputs {
bytes return_value = 1;
- map<string, string> output_file_hash = 2;
+ map<string, string> tags_map = 2;
}
message TaskFailure {
diff --git a/services/proto/src/teaclave_common.rs b/services/proto/src/teaclave_common.rs
index d81ecc5..9675dd0 100644
--- a/services/proto/src/teaclave_common.rs
+++ b/services/proto/src/teaclave_common.rs
@@ -134,7 +134,7 @@ impl std::convert::TryFrom<proto::TaskOutputs> for TaskOutputs {
fn try_from(proto: proto::TaskOutputs) -> Result<Self> {
let ret = TaskOutputs {
return_value: proto.return_value,
- output_file_hash: proto.output_file_hash.try_into()?,
+ tags_map: proto.tags_map.try_into()?,
};
Ok(ret)
}
@@ -143,7 +143,7 @@ impl std::convert::From<TaskOutputs> for proto::TaskOutputs {
fn from(outputs: TaskOutputs) -> Self {
proto::TaskOutputs {
return_value: outputs.return_value,
- output_file_hash: outputs.output_file_hash.into(),
+ tags_map: outputs.tags_map.into(),
}
}
}
diff --git a/services/scheduler/enclave/Cargo.toml b/services/scheduler/enclave/Cargo.toml
index 69503d4..01e9445 100644
--- a/services/scheduler/enclave/Cargo.toml
+++ b/services/scheduler/enclave/Cargo.toml
@@ -33,6 +33,7 @@ serde_json = { version = "1.0.39" }
serde = { version = "1.0.92", features = ["derive"] }
thiserror = { version = "1.0.9" }
gbdt = { version = "0.1.0", features = ["input", "enable_training"] }
+uuid = { version = "0.8.1", features = ["v4"] }
teaclave_attestation = { path = "../../../attestation" }
teaclave_config = { path = "../../../config" }
diff --git a/services/scheduler/enclave/src/service.rs b/services/scheduler/enclave/src/service.rs
index 49baf50..bad3535 100644
--- a/services/scheduler/enclave/src/service.rs
+++ b/services/scheduler/enclave/src/service.rs
@@ -23,15 +23,17 @@ use std::collections::VecDeque;
use std::prelude::v1::*;
use std::sync::{Arc, SgxMutex as Mutex};
+use std::collections::HashMap;
use teaclave_proto::teaclave_scheduler_service::*;
use teaclave_proto::teaclave_storage_service::*;
use teaclave_rpc::endpoint::Endpoint;
use teaclave_rpc::Request;
use teaclave_service_enclave_utils::teaclave_service;
use teaclave_types::{
- StagedTask, Storable, Task, TaskStatus, TeaclaveServiceResponseError,
- TeaclaveServiceResponseResult,
+ ExternalID, OutputsTags, StagedTask, Storable, Task, TaskResult, TaskStatus,
+ TeaclaveOutputFile, TeaclaveServiceResponseError, TeaclaveServiceResponseResult,
};
+use uuid::Uuid;
use anyhow::anyhow;
use anyhow::Result;
@@ -96,19 +98,24 @@ impl TeaclaveSchedulerService {
.map_err(|_| TeaclaveSchedulerError::DataError.into())
}
- fn get_task(&self, key: &str) -> Result<Task> {
- let key = format!("{}-{}", Task::key_prefix(), key);
- let get_request = GetRequest::new(key.into_bytes());
- let get_response = self
+ fn get_task(&self, task_id: &Uuid) -> Result<Task> {
+ let key = ExternalID::new(Task::key_prefix(), task_id.to_owned());
+ self.get_from_db(&key)
+ }
+
+ fn get_from_db<T: Storable>(&self, key: &ExternalID) -> Result<T> {
+ anyhow::ensure!(T::match_prefix(&key.prefix), "Key prefix doesn't match.");
+ let get_request = GetRequest::new(key.to_bytes());
+ let response = self
.storage_client
.clone()
.lock()
.map_err(|_| anyhow!("Cannot lock storage client"))?
.get(get_request)?;
- Task::from_slice(get_response.value.as_slice())
+ T::from_slice(response.value.as_slice())
}
- fn put_task(&self, item: &impl Storable) -> Result<()> {
+ fn put_into_db(&self, item: &impl Storable) -> Result<()> {
let k = item.key();
let v = item.to_vec()?;
let put_request = PutRequest::new(k.as_slice(), v.as_slice());
@@ -120,6 +127,26 @@ impl TeaclaveSchedulerService {
.put(put_request)?;
Ok(())
}
+
+ fn update_outputs_cmac(
+ &self,
+ task_output_map: &HashMap<String, ExternalID>,
+ tags_map: &OutputsTags,
+ ) -> Result<()> {
+ anyhow::ensure!(
+ task_output_map.len() == tags_map.len(),
+ "Error: task result output tags count"
+ );
+ for (key, id) in task_output_map.iter() {
+ let mut outfile: TeaclaveOutputFile = self.get_from_db(id)?;
+ let auth_tag = tags_map
+ .get(key)
+ .ok_or_else(|| anyhow::anyhow!("Missing result in task result outpt tags"))?;
+ outfile.assign_cmac(auth_tag)?;
+ self.put_into_db(&outfile)?;
+ }
+ Ok(())
+ }
}
impl TeaclaveScheduler for TeaclaveSchedulerService {
@@ -162,10 +189,10 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
request: Request<UpdateTaskStatusRequest>,
) -> TeaclaveServiceResponseResult<UpdateTaskStatusResponse> {
let request = request.message;
- let mut task = self.get_task(&request.task_id.to_string())?;
+ let mut task = self.get_task(&request.task_id)?;
task.status = request.task_status;
log::info!("UpdateTaskStatus: Task {:?}", task);
- self.put_task(&task)?;
+ self.put_into_db(&task)?;
Ok(UpdateTaskStatusResponse {})
}
@@ -174,13 +201,18 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
request: Request<UpdateTaskResultRequest>,
) -> TeaclaveServiceResponseResult<UpdateTaskResultResponse> {
let request = request.message;
- let mut task = self.get_task(&request.task_id.to_string())?;
+ let mut task = self.get_task(&request.task_id)?;
+
+ if let TaskResult::Ok(outputs) = &request.task_result {
+ self.update_outputs_cmac(&task.output_map, &outputs.tags_map)?;
+ };
+
task.result = request.task_result;
// Updating task result means we have finished execution
task.status = TaskStatus::Finished;
+ self.put_into_db(&task)?;
- self.put_task(&task)?;
Ok(UpdateTaskResultResponse {})
}
}
diff --git a/tests/functional/enclave/src/end_to_end/mesapy_data_fusion.rs b/tests/functional/enclave/src/end_to_end/mesapy_data_fusion.rs
index 27538c2..327ffd5 100644
--- a/tests/functional/enclave/src/end_to_end/mesapy_data_fusion.rs
+++ b/tests/functional/enclave/src/end_to_end/mesapy_data_fusion.rs
@@ -31,7 +31,7 @@ fn setup_client() -> anyhow::Result<(TeaclaveFrontendClient, TeaclaveFrontendCli
Ok((client1, client2))
}
-fn register_function(client: &mut TeaclaveFrontendClient) -> ExternalID {
+fn register_data_fusion_function(client: &mut TeaclaveFrontendClient) -> ExternalID {
let script = r#"
def readlines(fid):
lines = None
@@ -91,7 +91,10 @@ fn register_fusion_output(
response.data_id
}
-fn create_task(client: &mut TeaclaveFrontendClient, function_id: &ExternalID) -> ExternalID {
+fn create_data_fusion_task(
+ client: &mut TeaclaveFrontendClient,
+ function_id: &ExternalID,
+) -> ExternalID {
let request = CreateTaskRequest::new()
.function_id(function_id.to_owned())
.input_owners_map(hashmap!(
@@ -120,10 +123,10 @@ fn assign_data_for_task(
pub fn test_data_fusion_success() {
let (mut c1, mut c2) = setup_client().unwrap();
- let function_id = register_function(&mut c1);
+ let function_id = register_data_fusion_function(&mut c1);
// Create Task
- let task_id = create_task(&mut c1, &function_id);
+ let task_id = create_data_fusion_task(&mut c1, &function_id);
// Register Data and Assign Data To Task
// input1 is owned by user1
@@ -159,13 +162,100 @@ pub fn test_data_fusion_success() {
);
// Approve Task
- approve_task(&mut c1, &task_id);
- approve_task(&mut c2, &task_id);
+ approve_task(&mut c1, &task_id).unwrap();
+ approve_task(&mut c2, &task_id).unwrap();
// Invoke Task by the creator
- invoke_task(&mut c1, &task_id);
+ invoke_task(&mut c1, &task_id).unwrap();
// Get Task
let ret_val = get_task_until(&mut c1, &task_id, TaskStatus::Finished);
assert_eq!(&ret_val, "Mixed 5 lines of data");
+
+ let task = get_task(&mut c2, &task_id);
+ assert!(task.status == TaskStatus::Finished);
+
+ let fusion_id = task.output_map.get("OutFusionData").unwrap();
+ let fusion_owners = task.output_owners_map.get("OutFusionData").unwrap();
+
+ let fusion_input = register_fusion_input_from_output(&mut c2, &fusion_id);
+ let function_id = register_word_count_function(&mut c2);
+
+ let task_id = create_wlc_task(&mut c2, &function_id, &fusion_owners);
+ assign_data_for_task(
+ &mut c2,
+ &task_id,
+ hashmap!("InputData" => fusion_input),
+ hashmap!(),
+ );
+
+ approve_task(&mut c2, &task_id).unwrap();
+
+ // Invoke Task by the creator
+ assert!(invoke_task(&mut c2, &task_id).is_err());
+
+ approve_task(&mut c1, &task_id).unwrap();
+ invoke_task(&mut c2, &task_id).unwrap();
+ let ret_val = get_task_until(&mut c2, &task_id, TaskStatus::Finished);
+ assert_eq!(&ret_val, "2");
+}
+
+fn register_fusion_input_from_output(
+ client: &mut TeaclaveFrontendClient,
+ fusion_id: &ExternalID,
+) -> ExternalID {
+ let request = RegisterInputFromOutputRequest::new(fusion_id.clone());
+ let response = client.register_input_from_output(request).unwrap();
+ response.data_id
+}
+
+fn register_word_count_function(client: &mut TeaclaveFrontendClient) -> ExternalID {
+ let script = r#"
+def readlines(fid):
+ lines = None
+ with teaclave_open(fid, "rb") as f:
+ lines = f.readlines()
+ return lines
+
+def entrypoint(argv):
+ fid = "InputData"
+ assert len(argv) == 2
+ assert argv[0] == "query"
+ word = argv[1]
+ cnt = 0
+ for line in readlines(fid):
+ if word in line:
+ cnt += 1
+ return "%s" % cnt
+"#;
+
+ let input_spec = FunctionInput::new("InputData", "Lines of Data");
+ let request = RegisterFunctionRequest::new()
+ .name("wlc")
+ .description("Mesapy Word Line Count Function")
+ .arguments(vec!["query"])
+ .payload(script.into())
+ .executor_type(ExecutorType::Python)
+ .public(true)
+ .inputs(vec![input_spec]);
+ let response = client.register_function(request).unwrap();
+ log::info!("Resgister function: {:?}", response);
+ response.function_id
+}
+
+fn create_wlc_task(
+ client: &mut TeaclaveFrontendClient,
+ function_id: &ExternalID,
+ owners: &OwnerList,
+) -> ExternalID {
+ let request = CreateTaskRequest::new()
+ .function_id(function_id.to_owned())
+ .function_arguments(hashmap!("query" => "teaclave"))
+ .input_owners_map(hashmap!(
+ "InputData" => owners.to_owned()
+ ))
+ .executor(Executor::MesaPy);
+ let response = client.create_task(request).unwrap();
+ log::info!("Create task: {:?}", response);
+ response.task_id
}
diff --git a/tests/functional/enclave/src/end_to_end/mesapy_echo.rs b/tests/functional/enclave/src/end_to_end/mesapy_echo.rs
index 420dd40..749931d 100644
--- a/tests/functional/enclave/src/end_to_end/mesapy_echo.rs
+++ b/tests/functional/enclave/src/end_to_end/mesapy_echo.rs
@@ -65,10 +65,10 @@ def entrypoint(argv):
log::info!("Assign data: {:?}", response);
// Approve Task
- approve_task(&mut client, &task_id);
+ approve_task(&mut client, &task_id).unwrap();
// Invoke Task
- invoke_task(&mut client, &task_id);
+ invoke_task(&mut client, &task_id).unwrap();
// Get Task
let ret_val = get_task_until(&mut client, &task_id, TaskStatus::Finished);
diff --git a/tests/functional/enclave/src/end_to_end/mod.rs b/tests/functional/enclave/src/end_to_end/mod.rs
index eb807ea..64a7020 100644
--- a/tests/functional/enclave/src/end_to_end/mod.rs
+++ b/tests/functional/enclave/src/end_to_end/mod.rs
@@ -26,6 +26,13 @@ mod mesapy_echo;
mod native_echo;
mod native_gbdt_training;
+fn get_task(client: &mut TeaclaveFrontendClient, task_id: &ExternalID) -> GetTaskResponse {
+ let request = GetTaskRequest::new(task_id.clone());
+ let response = client.get_task(request).unwrap();
+ log::info!("Get task: {:?}", response);
+ response
+}
+
fn get_task_until(
client: &mut TeaclaveFrontendClient,
task_id: &ExternalID,
@@ -55,14 +62,16 @@ fn get_task_until(
}
}
-fn approve_task(client: &mut TeaclaveFrontendClient, task_id: &ExternalID) {
+fn approve_task(client: &mut TeaclaveFrontendClient, task_id: &ExternalID) -> anyhow::Result<()> {
let request = ApproveTaskRequest::new(task_id.clone());
- let response = client.approve_task(request).unwrap();
+ let response = client.approve_task(request)?;
log::info!("Approve task: {:?}", response);
+ Ok(())
}
-fn invoke_task(client: &mut TeaclaveFrontendClient, task_id: &ExternalID) {
+fn invoke_task(client: &mut TeaclaveFrontendClient, task_id: &ExternalID) -> anyhow::Result<()> {
let request = InvokeTaskRequest::new(task_id.clone());
- let response = client.invoke_task(request).unwrap();
+ let response = client.invoke_task(request)?;
log::info!("Invoke task: {:?}", response);
+ Ok(())
}
diff --git a/tests/functional/enclave/src/end_to_end/native_echo.rs b/tests/functional/enclave/src/end_to_end/native_echo.rs
index 73c2090..8864559 100644
--- a/tests/functional/enclave/src/end_to_end/native_echo.rs
+++ b/tests/functional/enclave/src/end_to_end/native_echo.rs
@@ -58,10 +58,10 @@ pub fn test_echo_task_success() {
log::info!("Assign data: {:?}", response);
// Approve Task
- approve_task(&mut client, &task_id);
+ approve_task(&mut client, &task_id).unwrap();
// Invoke Task
- invoke_task(&mut client, &task_id);
+ invoke_task(&mut client, &task_id).unwrap();
// Get Task
let ret_val = get_task_until(&mut client, &task_id, TaskStatus::Finished);
diff --git a/tests/functional/enclave/src/end_to_end/native_gbdt_training.rs b/tests/functional/enclave/src/end_to_end/native_gbdt_training.rs
index 5407668..3747596 100644
--- a/tests/functional/enclave/src/end_to_end/native_gbdt_training.rs
+++ b/tests/functional/enclave/src/end_to_end/native_gbdt_training.rs
@@ -30,8 +30,8 @@ pub fn test_gbdt_training_task() {
let task_id = create_gbdt_training_task(&mut client, &function_id);
assign_data_to_task(&mut client, &task_id, training_data_id, output_model_id);
- approve_task(&mut client, &task_id);
- invoke_task(&mut client, &task_id);
+ approve_task(&mut client, &task_id).unwrap();
+ invoke_task(&mut client, &task_id).unwrap();
let ret_val = get_task_until(&mut client, &task_id, TaskStatus::Finished);
assert_eq!(&ret_val, "Trained 120 lines of data.");
diff --git a/tests/functional/enclave/src/scheduler_service.rs b/tests/functional/enclave/src/scheduler_service.rs
index 82a0034..be052ae 100644
--- a/tests/functional/enclave/src/scheduler_service.rs
+++ b/tests/functional/enclave/src/scheduler_service.rs
@@ -79,11 +79,11 @@ fn test_update_task_status_result() {
log::debug!("response: {:?}", response);
let task_id = response.staged_task.task_id;
- let request = UpdateTaskStatusRequest::new(task_id, TaskStatus::Finished, String::new());
+ let request = UpdateTaskStatusRequest::new(task_id, TaskStatus::Running, String::new());
let response = client.update_task_status(request);
assert!(response.is_ok());
- let task_outputs = TaskOutputs::new("return", hashmap!());
+ let task_outputs = TaskOutputs::new("return value", hashmap!());
let request = UpdateTaskResultRequest::new(task_id, Ok(task_outputs));
let response = client.update_task_result(request);
diff --git a/types/src/file.rs b/types/src/file.rs
index 35fece2..c2185a1 100644
--- a/types/src/file.rs
+++ b/types/src/file.rs
@@ -102,6 +102,12 @@ impl TeaclaveOutputFile {
uuid: create_uuid(),
}
}
+
+ pub fn assign_cmac(&mut self, cmac: &FileAuthTag) -> Result<()> {
+ anyhow::ensure!(self.cmac.is_none(), "Cannot overwrite output file cmac");
+ self.cmac = Some(cmac.to_owned());
+ Ok(())
+ }
}
impl Storable for TeaclaveOutputFile {
diff --git a/types/src/staged_task.rs b/types/src/staged_task.rs
index 42aad04..c2bf26e 100644
--- a/types/src/staged_task.rs
+++ b/types/src/staged_task.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use std::collections::hash_map::{Iter, IterMut};
+use std::collections::hash_map::{IntoIter, Iter, IterMut};
use std::collections::HashMap;
use std::prelude::v1::*;
@@ -45,6 +45,15 @@ impl FunctionInputFiles {
}
}
+impl IntoIterator for FunctionInputFiles {
+ type Item = (String, FunctionInputFile);
+ type IntoIter = IntoIter<String, FunctionInputFile>;
+
+ fn into_iter(self) -> IntoIter<String, FunctionInputFile> {
+ self.inner.into_iter()
+ }
+}
+
impl std::convert::From<HashMap<String, FunctionInputFile>> for FunctionInputFiles {
fn from(entries: HashMap<String, FunctionInputFile>) -> FunctionInputFiles {
FunctionInputFiles { inner: entries }
@@ -56,6 +65,15 @@ pub struct FunctionOutputFiles {
inner: HashMap<String, FunctionOutputFile>,
}
+impl IntoIterator for FunctionOutputFiles {
+ type Item = (String, FunctionOutputFile);
+ type IntoIter = IntoIter<String, FunctionOutputFile>;
+
+ fn into_iter(self) -> IntoIter<String, FunctionOutputFile> {
+ self.inner.into_iter()
+ }
+}
+
impl FunctionOutputFiles {
pub fn new(entries: HashMap<String, FunctionOutputFile>) -> Self {
entries.into()
diff --git a/types/src/task.rs b/types/src/task.rs
index 29d747a..f23a24d 100644
--- a/types/src/task.rs
+++ b/types/src/task.rs
@@ -154,6 +154,18 @@ impl OutputsTags {
pub fn iter(&self) -> Iter<String, FileAuthTag> {
self.inner.iter()
}
+
+ pub fn get(&self, key: &str) -> Option<&FileAuthTag> {
+ self.inner.get(key)
+ }
+
+ pub fn len(&self) -> usize {
+ self.inner.len()
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
}
impl std::convert::TryFrom<HashMap<String, String>> for OutputsTags {
@@ -189,14 +201,14 @@ impl std::iter::FromIterator<(String, FileAuthTag)> for OutputsTags {
#[derive(Debug, Deserialize, Serialize)]
pub struct TaskOutputs {
pub return_value: Vec<u8>,
- pub output_file_hash: OutputsTags,
+ pub tags_map: OutputsTags,
}
impl TaskOutputs {
- pub fn new(value: impl Into<Vec<u8>>, output_file_hash: HashMap<String, FileAuthTag>) -> Self {
+ pub fn new(value: impl Into<Vec<u8>>, tags_map: HashMap<String, FileAuthTag>) -> Self {
TaskOutputs {
return_value: value.into(),
- output_file_hash: OutputsTags::new(output_file_hash),
+ tags_map: OutputsTags::new(tags_map),
}
}
}
@@ -281,8 +293,15 @@ pub enum TaskResult {
Err(TaskFailure),
}
-#[cfg(test_mode)]
impl TaskResult {
+ pub fn is_ok(&self) -> bool {
+ match self {
+ TaskResult::Ok(_) => true,
+ _ => false,
+ }
+ }
+
+ #[cfg(test_mode)]
pub fn unwrap(self) -> TaskOutputs {
match self {
TaskResult::Ok(t) => t,
diff --git a/types/src/worker.rs b/types/src/worker.rs
index 6581d42..eebe592 100644
--- a/types/src/worker.rs
+++ b/types/src/worker.rs
@@ -146,7 +146,7 @@ pub struct WorkerCapability {
#[derive(Debug, Default)]
pub struct ExecutionResult {
pub return_value: Vec<u8>,
- pub output_file_hash: OutputsTags,
+ pub tags_map: OutputsTags,
}
#[cfg(feature = "enclave_unit_test")]
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@teaclave.apache.org
For additional commands, e-mail: commits-help@teaclave.apache.org