You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@teaclave.apache.org by ms...@apache.org on 2020/04/24 06:12:09 UTC

[incubator-teaclave] branch master updated: [fusion] Support fusion data used as input with example (#274)

This is an automated email from the ASF dual-hosted git repository.

mssun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-teaclave.git


The following commit(s) were added to refs/heads/master by this push:
     new 00e898e  [fusion] Support fusion data used as input with example (#274)
00e898e is described below

commit 00e898e1355d785bd456d2cf2ad05e208a4d69e6
Author: Zhaofeng Chen <zf...@apache.org>
AuthorDate: Thu Apr 23 23:11:59 2020 -0700

    [fusion] Support fusion data used as input with example (#274)
---
 services/execution/enclave/src/service.rs          |   7 +-
 .../execution/enclave/src/task_file_manager.rs     | 402 ++++++++++++---------
 services/management/enclave/src/service.rs         |   2 +-
 services/proto/src/proto/teaclave_common.proto     |   2 +-
 services/proto/src/teaclave_common.rs              |   4 +-
 services/scheduler/enclave/Cargo.toml              |   1 +
 services/scheduler/enclave/src/service.rs          |  56 ++-
 .../enclave/src/end_to_end/mesapy_data_fusion.rs   | 104 +++++-
 .../enclave/src/end_to_end/mesapy_echo.rs          |   4 +-
 tests/functional/enclave/src/end_to_end/mod.rs     |  17 +-
 .../enclave/src/end_to_end/native_echo.rs          |   4 +-
 .../enclave/src/end_to_end/native_gbdt_training.rs |   4 +-
 tests/functional/enclave/src/scheduler_service.rs  |   4 +-
 types/src/file.rs                                  |   6 +
 types/src/staged_task.rs                           |  20 +-
 types/src/task.rs                                  |  27 +-
 types/src/worker.rs                                |   2 +-
 17 files changed, 452 insertions(+), 214 deletions(-)

diff --git a/services/execution/enclave/src/service.rs b/services/execution/enclave/src/service.rs
index ec89165..3d46fd8 100644
--- a/services/execution/enclave/src/service.rs
+++ b/services/execution/enclave/src/service.rs
@@ -153,8 +153,7 @@ impl TeaclaveExecutionService {
 }
 
 fn prepare_task(task: &StagedTask, file_mgr: &TaskFileManager) -> Result<StagedFunction> {
-    file_mgr.download_inputs()?;
-    let input_files = file_mgr.convert_downloaded_inputs()?;
+    let input_files = file_mgr.prepare_staged_inputs()?;
     let output_files = file_mgr.prepare_staged_outputs()?;
     let function_payload = String::from_utf8_lossy(&task.function_payload).to_string();
 
@@ -170,9 +169,7 @@ fn prepare_task(task: &StagedTask, file_mgr: &TaskFileManager) -> Result<StagedF
 }
 
 fn finalize_task(file_mgr: &TaskFileManager) -> Result<HashMap<String, FileAuthTag>> {
-    let outputs_tag = file_mgr.convert_staged_outputs()?;
-    file_mgr.upload_outputs()?;
-    Ok(outputs_tag)
+    file_mgr.upload_outputs()
 }
 
 #[cfg(feature = "enclave_unit_test")]
diff --git a/services/execution/enclave/src/task_file_manager.rs b/services/execution/enclave/src/task_file_manager.rs
index 298808f..e3e066a 100644
--- a/services/execution/enclave/src/task_file_manager.rs
+++ b/services/execution/enclave/src/task_file_manager.rs
@@ -27,12 +27,31 @@ use teaclave_types::*;
 use url::Url;
 use uuid::Uuid;
 
-#[derive(Default)]
 pub(crate) struct TaskFileManager {
-    cwd: PathBuf,
-    inputs: FunctionInputFiles,
-    outputs: FunctionOutputFiles,
-    staged_outputs: StagedFiles,
+    inter_inputs: InterInputs,
+    inter_outputs: InterOutputs,
+}
+
+struct InterInputs {
+    inner: Vec<InterInput>,
+}
+
+struct InterOutputs {
+    inner: Vec<InterOutput>,
+}
+
+pub(self) struct InterInput {
+    pub(self) funiq_key: String,
+    pub(self) file: FunctionInputFile,
+    pub(self) download_path: PathBuf,
+    pub(self) staged_path: PathBuf,
+}
+
+pub(self) struct InterOutput {
+    pub(self) funiq_key: String,
+    pub(self) file: FunctionOutputFile,
+    pub(self) upload_path: PathBuf,
+    pub(self) staged_info: StagedFileInfo,
 }
 
 impl TaskFileManager {
@@ -43,210 +62,258 @@ impl TaskFileManager {
         outputs: &FunctionOutputFiles,
     ) -> Result<Self> {
         let cwd = Path::new(base).join(task_id.to_string());
-        if !cwd.exists() {
-            std::untrusted::fs::create_dir_all(&cwd)?;
-        }
+        let inputs_base = cwd.clone().join("inputs");
+        let outputs_base = cwd.join("outputs");
 
-        let tfmgr = TaskFileManager {
-            cwd,
-            inputs: inputs.clone(),
-            outputs: outputs.clone(),
-            ..Default::default()
-        }
-        .create_staged_outputs();
+        let inter_inputs = InterInputs::new(&inputs_base, inputs.clone())?;
+        let inter_outputs = InterOutputs::new(&outputs_base, outputs.clone())?;
 
-        if !tfmgr.inputs_base_dir().exists() {
-            std::untrusted::fs::create_dir_all(tfmgr.inputs_base_dir())?;
-        }
+        let tfmgr = TaskFileManager {
+            inter_inputs,
+            inter_outputs,
+        };
 
-        if !tfmgr.outputs_base_dir().exists() {
-            std::untrusted::fs::create_dir_all(tfmgr.outputs_base_dir())?;
-        }
         Ok(tfmgr)
     }
 
-    pub(crate) fn inputs_base_dir(&self) -> PathBuf {
-        self.cwd.join("inputs")
+    pub(crate) fn prepare_staged_inputs(&self) -> Result<StagedFiles> {
+        self.inter_inputs.download()?;
+        self.inter_inputs.convert_to_staged_files()
     }
 
-    pub(crate) fn outputs_base_dir(&self) -> PathBuf {
-        self.cwd.join("outputs")
+    pub(crate) fn prepare_staged_outputs(&self) -> Result<StagedFiles> {
+        let staged_outputs = self.inter_outputs.generate_staged_files();
+        Ok(staged_outputs)
     }
 
-    // Original input file is downloaded to $inputs_base_dir/$funiq_key/$original_filename
-    fn make_input_download_path(&self, funiq_key: &str, url: &Url) -> Result<PathBuf> {
-        let url_path = url.path();
-        let original_name = Path::new(url_path)
-            .file_name()
-            .ok_or_else(|| anyhow::anyhow!("Cannot get filename from url: {:?}", url))?;
-        let download_dir = self.inputs_base_dir().join(funiq_key);
-        if !download_dir.exists() {
-            std::untrusted::fs::create_dir_all(&download_dir)?;
-        }
-        let local_dest = download_dir.join(original_name);
-        Ok(local_dest)
+    pub(crate) fn upload_outputs(&self) -> Result<HashMap<String, FileAuthTag>> {
+        let auth_tags = self.inter_outputs.convert_staged_files_for_upload()?;
+        self.inter_outputs.upload()?;
+        Ok(auth_tags)
     }
+}
 
-    // Staged input file is converted to $inputs_base_dir/${funiq_key}.staged_in
-    fn make_staged_input_path(&self, file_unique_key: &str) -> PathBuf {
-        let mut local_dest = self.inputs_base_dir().join(file_unique_key);
-        local_dest.set_extension("staged_in");
-        local_dest
-    }
+impl InterInput {
+    fn new(
+        base: impl AsRef<Path>,
+        funiq_key: String,
+        file: FunctionInputFile,
+    ) -> Result<InterInput> {
+        let download_path = make_intermediate_path(base.as_ref(), &funiq_key, &file.url)?;
+        let staged_path = make_staged_path(base.as_ref(), &funiq_key, &file.url)?;
 
-    // Staged output file is set to $outputs_base_dir/${funiq_key}.staged_out
-    fn make_staged_output_path(&self, file_unique_key: &str) -> PathBuf {
-        let dir = self.outputs_base_dir();
-        let mut local_dest = dir.join(file_unique_key);
-        local_dest.set_extension("staged_out");
-        local_dest
+        Ok(InterInput {
+            funiq_key,
+            file,
+            download_path,
+            staged_path,
+        })
     }
 
-    // Output file ready for upload is set to $outputs_base_dir/${funiq_key}.out
-    fn make_upload_output_path(&self, file_unique_key: &str) -> PathBuf {
-        let mut local_dest = self.outputs_base_dir().join(file_unique_key);
-        local_dest.set_extension("out");
-        local_dest
+    fn to_staged_file_entry(&self) -> Result<(String, StagedFileInfo)> {
+        let src = &self.download_path;
+        let dst = &self.staged_path;
+        let staged_file_info = match self.file.crypto_info {
+            FileCrypto::TeaclaveFile128(crypto) => {
+                std::untrusted::fs::soft_link(src, dst)?;
+                StagedFileInfo::new(&src, crypto, self.file.cmac)
+            }
+            FileCrypto::AesGcm128(crypto) => {
+                let mut bytes = read_all_bytes(src)?;
+                let n = bytes.len();
+                anyhow::ensure!(
+                    n > FILE_AUTH_TAG_LENGTH,
+                    "AesGcm128 File, invalid length: {:?}",
+                    src
+                );
+                anyhow::ensure!(
+                    self.file.cmac == bytes[n - FILE_AUTH_TAG_LENGTH..],
+                    "AesGcm128 File, invalid tag: {:?}",
+                    src
+                );
+                crypto.decrypt(&mut bytes)?;
+                StagedFileInfo::create_with_bytes(dst, &bytes)?
+            }
+            FileCrypto::AesGcm256(crypto) => {
+                let mut bytes = read_all_bytes(src)?;
+                let n = bytes.len();
+                anyhow::ensure!(
+                    n > FILE_AUTH_TAG_LENGTH,
+                    "AesGcm256 File, invalid length: {:?}",
+                    src
+                );
+                anyhow::ensure!(
+                    self.file.cmac == bytes[n - FILE_AUTH_TAG_LENGTH..],
+                    "AesGcm256 File, invalid tag: {:?}",
+                    src
+                );
+                crypto.decrypt(&mut bytes)?;
+                StagedFileInfo::create_with_bytes(dst, &bytes)?
+            }
+            FileCrypto::Raw => {
+                let bytes = read_all_bytes(src)?;
+                StagedFileInfo::create_with_bytes(dst, &bytes)?
+            }
+        };
+        Ok((self.funiq_key.clone(), staged_file_info))
     }
+}
 
-    pub(crate) fn download_inputs(&self) -> Result<()> {
-        let mut req_info = Vec::new();
-        for (fname, finfo) in self.inputs.iter() {
-            let local_dest = self.make_input_download_path(fname, &finfo.url)?;
-            req_info.push(HandleFileInfo::new(local_dest, &finfo.url));
+impl std::iter::FromIterator<InterInput> for InterInputs {
+    fn from_iter<T: IntoIterator<Item = InterInput>>(iter: T) -> Self {
+        InterInputs {
+            inner: Vec::from_iter(iter),
         }
+    }
+}
 
+impl InterInputs {
+    pub fn new(base: impl AsRef<Path>, inputs: FunctionInputFiles) -> Result<InterInputs> {
+        inputs
+            .into_iter()
+            .map(|(funiq_key, file)| InterInput::new(base.as_ref(), funiq_key, file))
+            .collect()
+    }
+
+    pub(crate) fn download(&self) -> Result<()> {
+        let req_info = self.inner.iter().map(|inter_input| {
+            HandleFileInfo::new(&inter_input.download_path, &inter_input.file.url)
+        });
         let request = FileAgentRequest::new(HandleFileCommand::Download, req_info);
         log::info!("Ocall file download request: {:?}", request);
         handle_file_request(request)?;
         Ok(())
     }
 
-    pub(crate) fn convert_downloaded_inputs(&self) -> Result<StagedFiles> {
-        let mut files: HashMap<String, StagedFileInfo> = HashMap::new();
-        for (fkey, finfo) in self.inputs.iter() {
-            let src = self.make_input_download_path(fkey, &finfo.url)?;
-            let staged_file_info = match finfo.crypto_info {
-                FileCrypto::TeaclaveFile128(crypto) => {
-                    StagedFileInfo::new(&src, crypto, finfo.cmac)
-                }
-                FileCrypto::AesGcm128(crypto) => {
-                    let dst = self.make_staged_input_path(fkey);
-                    let mut bytes = read_all_bytes(&src)?;
-                    let n = bytes.len();
-                    anyhow::ensure!(
-                        n > FILE_AUTH_TAG_LENGTH,
-                        "AesGcm128 File, invalid length: {:?}",
-                        src
-                    );
-                    anyhow::ensure!(
-                        finfo.cmac == bytes[n - FILE_AUTH_TAG_LENGTH..],
-                        "AesGcm128 File, invalid tag: {:?}",
-                        src
-                    );
-                    crypto.decrypt(&mut bytes)?;
-                    StagedFileInfo::create_with_bytes(&dst, &bytes)?
-                }
-                FileCrypto::AesGcm256(crypto) => {
-                    let dst = self.make_staged_input_path(fkey);
-                    let mut bytes = read_all_bytes(&src)?;
-                    let n = bytes.len();
-                    anyhow::ensure!(
-                        n > FILE_AUTH_TAG_LENGTH,
-                        "AesGcm256 File, invalid length: {:?}",
-                        src
-                    );
-                    anyhow::ensure!(
-                        finfo.cmac == bytes[n - FILE_AUTH_TAG_LENGTH..],
-                        "AesGcm256 File, invalid tag: {:?}",
-                        src
-                    );
-                    crypto.decrypt(&mut bytes)?;
-                    StagedFileInfo::create_with_bytes(&dst, &bytes)?
-                }
-                FileCrypto::Raw => {
-                    let dst = self.make_staged_input_path(fkey);
-                    let bytes = read_all_bytes(&src)?;
-                    StagedFileInfo::create_with_bytes(&dst, &bytes)?
-                }
-            };
-
-            files.insert(fkey.to_string(), staged_file_info);
+    pub(crate) fn convert_to_staged_files(&self) -> Result<StagedFiles> {
+        self.inner
+            .iter()
+            .map(|inter_file| inter_file.to_staged_file_entry())
+            .collect()
+    }
+}
+
+impl std::iter::FromIterator<InterOutput> for InterOutputs {
+    fn from_iter<T: IntoIterator<Item = InterOutput>>(iter: T) -> Self {
+        InterOutputs {
+            inner: Vec::from_iter(iter),
         }
-        Ok(StagedFiles::new(files))
+    }
+}
+
+impl InterOutput {
+    pub fn new(
+        base: impl AsRef<Path>,
+        funiq_key: String,
+        file: FunctionOutputFile,
+    ) -> Result<InterOutput> {
+        let upload_path = make_intermediate_path(base.as_ref(), &funiq_key, &file.url)?;
+        let staged_path = make_staged_path(base.as_ref(), &funiq_key, &file.url)?;
+        let random_key = TeaclaveFile128Key::random();
+        let staged_info = StagedFileInfo::new(&staged_path, random_key, FileAuthTag::default());
+
+        Ok(InterOutput {
+            funiq_key,
+            file,
+            upload_path,
+            staged_info,
+        })
+    }
+
+    fn convert_to_upload_file(&self) -> Result<FileAuthTag> {
+        let dest = &self.upload_path;
+        let outfile = match self.file.crypto_info {
+            FileCrypto::TeaclaveFile128(crypto) => {
+                self.staged_info.convert_file(dest, crypto.to_owned())?
+            }
+
+            FileCrypto::AesGcm128(_) => {
+                anyhow::bail!("OutputFile: unsupported type");
+            }
+            FileCrypto::AesGcm256(_) => {
+                anyhow::bail!("OutputFile: unsupported type");
+            }
+            FileCrypto::Raw => {
+                anyhow::bail!("OutputFile: unsupported type");
+            }
+        };
+        Ok(outfile.cmac)
+    }
+}
+
+impl InterOutputs {
+    pub fn new(base: impl AsRef<Path>, outputs: FunctionOutputFiles) -> Result<InterOutputs> {
+        outputs
+            .into_iter()
+            .map(|(funiq_key, file)| InterOutput::new(base.as_ref(), funiq_key, file))
+            .collect()
     }
 
-    // For each output file, we create a random key for the execution.
-    // TaskFileManager will convert these output files to the user
-    // selected key during the uploading process.
-    pub fn create_staged_outputs(self) -> Self {
-        let staged_outputs: StagedFiles = self
-            .outputs
+    pub fn generate_staged_files(&self) -> StagedFiles {
+        self.inner
             .iter()
-            .map(|(fkey, _)| {
-                let dest = self.make_staged_output_path(fkey);
-                let random_key = TeaclaveFile128Key::random();
+            .map(|inter_output| {
                 (
-                    fkey.to_string(),
-                    StagedFileInfo::new(dest, random_key, FileAuthTag::default()),
+                    inter_output.funiq_key.clone(),
+                    inter_output.staged_info.clone(),
                 )
             })
-            .collect();
-
-        Self {
-            staged_outputs,
-            ..self
-        }
-    }
-
-    pub(crate) fn prepare_staged_outputs(&self) -> Result<StagedFiles> {
-        anyhow::ensure!(
-            self.staged_outputs.len() == self.outputs.len(),
-            "Inconsistent between staged_outputs and outputs"
-        );
-        Ok(self.staged_outputs.clone())
+            .collect()
     }
 
-    pub(crate) fn convert_staged_outputs(&self) -> anyhow::Result<HashMap<String, FileAuthTag>> {
-        let mut outputs_tag: HashMap<String, FileAuthTag> = HashMap::new();
-
-        for (fkey, finfo) in self.outputs.iter() {
-            let dest = self.make_upload_output_path(fkey);
-            let staged_file = self
-                .staged_outputs
-                .get(fkey)
-                .ok_or_else(|| anyhow::anyhow!("Missing file in staged_output: {:?}", fkey))?;
-            let outfile = match finfo.crypto_info {
-                FileCrypto::TeaclaveFile128(crypto) => staged_file.convert_file(dest, crypto)?,
-
-                FileCrypto::AesGcm128(_) => {
-                    anyhow::bail!("OutputFile: unsupported type");
-                }
-                FileCrypto::AesGcm256(_) => {
-                    anyhow::bail!("OutputFile: unsupported type");
-                }
-                FileCrypto::Raw => {
-                    anyhow::bail!("OutputFile: unsupported type");
-                }
-            };
-            outputs_tag.insert(fkey.to_string(), outfile.cmac);
-        }
-        Ok(outputs_tag)
+    pub fn convert_staged_files_for_upload(&self) -> Result<HashMap<String, FileAuthTag>> {
+        self.inner
+            .iter()
+            .map(|inter_output| {
+                inter_output
+                    .convert_to_upload_file()
+                    .map(|cmac| (inter_output.funiq_key.clone(), cmac))
+            })
+            .collect()
     }
 
-    pub(crate) fn upload_outputs(&self) -> Result<()> {
-        let req_info = self.outputs.iter().map(|(fkey, value)| {
-            let local = self.make_upload_output_path(fkey);
-            HandleFileInfo::new(local, &value.url)
+    pub(crate) fn upload(&self) -> Result<()> {
+        let req_info = self.inner.iter().map(|inter_output| {
+            HandleFileInfo::new(&inter_output.upload_path, &inter_output.file.url)
         });
-
         let request = FileAgentRequest::new(HandleFileCommand::Upload, req_info);
-        log::info!("Ocall file upload request: {:?}", request);
+        log::info!("Ocall file download request: {:?}", request);
         handle_file_request(request)?;
         Ok(())
     }
 }
 
+// Staged file is put in $base_dir/${funiq_key}-staged/$original_name
+fn make_staged_path(base: impl AsRef<Path>, funiq_key: &str, url: &Url) -> Result<PathBuf> {
+    let url_path = url.path();
+    let original_name = Path::new(url_path)
+        .file_name()
+        .ok_or_else(|| anyhow::anyhow!("Cannot get filename from url: {:?}", url))?;
+
+    let staged_dir = format!("{}-{}", funiq_key, "staged");
+    let file_dir = base.as_ref().to_owned().join(&staged_dir);
+    if !file_dir.exists() {
+        std::untrusted::fs::create_dir_all(&file_dir)?;
+    }
+    let local_dest = file_dir.join(original_name);
+    Ok(local_dest)
+}
+
+// Intermediate file is converted to $base_dir/${funiq_key}/$original_name
+fn make_intermediate_path(base: impl AsRef<Path>, funiq_key: &str, url: &Url) -> Result<PathBuf> {
+    let url_path = url.path();
+    let original_name = Path::new(url_path)
+        .file_name()
+        .ok_or_else(|| anyhow::anyhow!("Cannot get filename from url: {:?}", url))?;
+
+    let file_dir = base.as_ref().to_owned().join(funiq_key);
+    if !file_dir.exists() {
+        std::untrusted::fs::create_dir_all(&file_dir)?;
+    }
+    let local_dest = file_dir.join(original_name);
+    Ok(local_dest)
+}
+
 #[cfg(feature = "enclave_unit_test")]
 pub mod tests {
     use super::*;
@@ -268,8 +335,7 @@ pub mod tests {
 
         let file_mgr =
             TaskFileManager::new("/tmp", &task_id, &inputs.into(), &outputs.into()).unwrap();
-        file_mgr.download_inputs().unwrap();
-        file_mgr.convert_downloaded_inputs().unwrap();
+        file_mgr.prepare_staged_inputs().unwrap();
         file_mgr.prepare_staged_outputs().unwrap();
     }
 }
diff --git a/services/management/enclave/src/service.rs b/services/management/enclave/src/service.rs
index 54c65b2..17f4c18 100644
--- a/services/management/enclave/src/service.rs
+++ b/services/management/enclave/src/service.rs
@@ -522,7 +522,7 @@ impl TeaclaveManagementService {
     pub fn create_fusion_data(&self, owner: impl Into<OwnerList>) -> Result<TeaclaveOutputFile> {
         let uuid = Uuid::new_v4();
         let url = format!(
-            "file://{}/{}",
+            "file://{}/{}.fusion",
             self.fusion_base_dir.display(),
             uuid.to_string()
         );
diff --git a/services/proto/src/proto/teaclave_common.proto b/services/proto/src/proto/teaclave_common.proto
index 97ca19a..4969227 100644
--- a/services/proto/src/proto/teaclave_common.proto
+++ b/services/proto/src/proto/teaclave_common.proto
@@ -14,7 +14,7 @@ message FileCryptoInfo {
 
 message TaskOutputs {
   bytes return_value = 1;
-  map<string, string> output_file_hash = 2;
+  map<string, string> tags_map = 2;
 }
 
 message TaskFailure {
diff --git a/services/proto/src/teaclave_common.rs b/services/proto/src/teaclave_common.rs
index d81ecc5..9675dd0 100644
--- a/services/proto/src/teaclave_common.rs
+++ b/services/proto/src/teaclave_common.rs
@@ -134,7 +134,7 @@ impl std::convert::TryFrom<proto::TaskOutputs> for TaskOutputs {
     fn try_from(proto: proto::TaskOutputs) -> Result<Self> {
         let ret = TaskOutputs {
             return_value: proto.return_value,
-            output_file_hash: proto.output_file_hash.try_into()?,
+            tags_map: proto.tags_map.try_into()?,
         };
         Ok(ret)
     }
@@ -143,7 +143,7 @@ impl std::convert::From<TaskOutputs> for proto::TaskOutputs {
     fn from(outputs: TaskOutputs) -> Self {
         proto::TaskOutputs {
             return_value: outputs.return_value,
-            output_file_hash: outputs.output_file_hash.into(),
+            tags_map: outputs.tags_map.into(),
         }
     }
 }
diff --git a/services/scheduler/enclave/Cargo.toml b/services/scheduler/enclave/Cargo.toml
index 69503d4..01e9445 100644
--- a/services/scheduler/enclave/Cargo.toml
+++ b/services/scheduler/enclave/Cargo.toml
@@ -33,6 +33,7 @@ serde_json    = { version = "1.0.39" }
 serde         = { version = "1.0.92", features = ["derive"] }
 thiserror     = { version = "1.0.9" }
 gbdt          = { version = "0.1.0", features = ["input", "enable_training"] }
+uuid          = { version = "0.8.1", features = ["v4"] }
 
 teaclave_attestation           = { path = "../../../attestation" }
 teaclave_config                = { path = "../../../config" }
diff --git a/services/scheduler/enclave/src/service.rs b/services/scheduler/enclave/src/service.rs
index 49baf50..bad3535 100644
--- a/services/scheduler/enclave/src/service.rs
+++ b/services/scheduler/enclave/src/service.rs
@@ -23,15 +23,17 @@ use std::collections::VecDeque;
 use std::prelude::v1::*;
 use std::sync::{Arc, SgxMutex as Mutex};
 
+use std::collections::HashMap;
 use teaclave_proto::teaclave_scheduler_service::*;
 use teaclave_proto::teaclave_storage_service::*;
 use teaclave_rpc::endpoint::Endpoint;
 use teaclave_rpc::Request;
 use teaclave_service_enclave_utils::teaclave_service;
 use teaclave_types::{
-    StagedTask, Storable, Task, TaskStatus, TeaclaveServiceResponseError,
-    TeaclaveServiceResponseResult,
+    ExternalID, OutputsTags, StagedTask, Storable, Task, TaskResult, TaskStatus,
+    TeaclaveOutputFile, TeaclaveServiceResponseError, TeaclaveServiceResponseResult,
 };
+use uuid::Uuid;
 
 use anyhow::anyhow;
 use anyhow::Result;
@@ -96,19 +98,24 @@ impl TeaclaveSchedulerService {
             .map_err(|_| TeaclaveSchedulerError::DataError.into())
     }
 
-    fn get_task(&self, key: &str) -> Result<Task> {
-        let key = format!("{}-{}", Task::key_prefix(), key);
-        let get_request = GetRequest::new(key.into_bytes());
-        let get_response = self
+    fn get_task(&self, task_id: &Uuid) -> Result<Task> {
+        let key = ExternalID::new(Task::key_prefix(), task_id.to_owned());
+        self.get_from_db(&key)
+    }
+
+    fn get_from_db<T: Storable>(&self, key: &ExternalID) -> Result<T> {
+        anyhow::ensure!(T::match_prefix(&key.prefix), "Key prefix doesn't match.");
+        let get_request = GetRequest::new(key.to_bytes());
+        let response = self
             .storage_client
             .clone()
             .lock()
             .map_err(|_| anyhow!("Cannot lock storage client"))?
             .get(get_request)?;
-        Task::from_slice(get_response.value.as_slice())
+        T::from_slice(response.value.as_slice())
     }
 
-    fn put_task(&self, item: &impl Storable) -> Result<()> {
+    fn put_into_db(&self, item: &impl Storable) -> Result<()> {
         let k = item.key();
         let v = item.to_vec()?;
         let put_request = PutRequest::new(k.as_slice(), v.as_slice());
@@ -120,6 +127,26 @@ impl TeaclaveSchedulerService {
             .put(put_request)?;
         Ok(())
     }
+
+    fn update_outputs_cmac(
+        &self,
+        task_output_map: &HashMap<String, ExternalID>,
+        tags_map: &OutputsTags,
+    ) -> Result<()> {
+        anyhow::ensure!(
+            task_output_map.len() == tags_map.len(),
+            "Error: task result output tags count"
+        );
+        for (key, id) in task_output_map.iter() {
+            let mut outfile: TeaclaveOutputFile = self.get_from_db(id)?;
+            let auth_tag = tags_map
+                .get(key)
+                .ok_or_else(|| anyhow::anyhow!("Missing result in task result outpt tags"))?;
+            outfile.assign_cmac(auth_tag)?;
+            self.put_into_db(&outfile)?;
+        }
+        Ok(())
+    }
 }
 
 impl TeaclaveScheduler for TeaclaveSchedulerService {
@@ -162,10 +189,10 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
         request: Request<UpdateTaskStatusRequest>,
     ) -> TeaclaveServiceResponseResult<UpdateTaskStatusResponse> {
         let request = request.message;
-        let mut task = self.get_task(&request.task_id.to_string())?;
+        let mut task = self.get_task(&request.task_id)?;
         task.status = request.task_status;
         log::info!("UpdateTaskStatus: Task {:?}", task);
-        self.put_task(&task)?;
+        self.put_into_db(&task)?;
         Ok(UpdateTaskStatusResponse {})
     }
 
@@ -174,13 +201,18 @@ impl TeaclaveScheduler for TeaclaveSchedulerService {
         request: Request<UpdateTaskResultRequest>,
     ) -> TeaclaveServiceResponseResult<UpdateTaskResultResponse> {
         let request = request.message;
-        let mut task = self.get_task(&request.task_id.to_string())?;
+        let mut task = self.get_task(&request.task_id)?;
+
+        if let TaskResult::Ok(outputs) = &request.task_result {
+            self.update_outputs_cmac(&task.output_map, &outputs.tags_map)?;
+        };
+
         task.result = request.task_result;
 
         // Updating task result means we have finished execution
         task.status = TaskStatus::Finished;
+        self.put_into_db(&task)?;
 
-        self.put_task(&task)?;
         Ok(UpdateTaskResultResponse {})
     }
 }
diff --git a/tests/functional/enclave/src/end_to_end/mesapy_data_fusion.rs b/tests/functional/enclave/src/end_to_end/mesapy_data_fusion.rs
index 27538c2..327ffd5 100644
--- a/tests/functional/enclave/src/end_to_end/mesapy_data_fusion.rs
+++ b/tests/functional/enclave/src/end_to_end/mesapy_data_fusion.rs
@@ -31,7 +31,7 @@ fn setup_client() -> anyhow::Result<(TeaclaveFrontendClient, TeaclaveFrontendCli
     Ok((client1, client2))
 }
 
-fn register_function(client: &mut TeaclaveFrontendClient) -> ExternalID {
+fn register_data_fusion_function(client: &mut TeaclaveFrontendClient) -> ExternalID {
     let script = r#"
 def readlines(fid):
     lines = None
@@ -91,7 +91,10 @@ fn register_fusion_output(
     response.data_id
 }
 
-fn create_task(client: &mut TeaclaveFrontendClient, function_id: &ExternalID) -> ExternalID {
+fn create_data_fusion_task(
+    client: &mut TeaclaveFrontendClient,
+    function_id: &ExternalID,
+) -> ExternalID {
     let request = CreateTaskRequest::new()
         .function_id(function_id.to_owned())
         .input_owners_map(hashmap!(
@@ -120,10 +123,10 @@ fn assign_data_for_task(
 pub fn test_data_fusion_success() {
     let (mut c1, mut c2) = setup_client().unwrap();
 
-    let function_id = register_function(&mut c1);
+    let function_id = register_data_fusion_function(&mut c1);
 
     // Create Task
-    let task_id = create_task(&mut c1, &function_id);
+    let task_id = create_data_fusion_task(&mut c1, &function_id);
 
     // Register Data and Assign Data To Task
     // input1 is owned by user1
@@ -159,13 +162,100 @@ pub fn test_data_fusion_success() {
     );
 
     // Approve Task
-    approve_task(&mut c1, &task_id);
-    approve_task(&mut c2, &task_id);
+    approve_task(&mut c1, &task_id).unwrap();
+    approve_task(&mut c2, &task_id).unwrap();
 
     // Invoke Task by the creator
-    invoke_task(&mut c1, &task_id);
+    invoke_task(&mut c1, &task_id).unwrap();
 
     // Get Task
     let ret_val = get_task_until(&mut c1, &task_id, TaskStatus::Finished);
     assert_eq!(&ret_val, "Mixed 5 lines of data");
+
+    let task = get_task(&mut c2, &task_id);
+    assert!(task.status == TaskStatus::Finished);
+
+    let fusion_id = task.output_map.get("OutFusionData").unwrap();
+    let fusion_owners = task.output_owners_map.get("OutFusionData").unwrap();
+
+    let fusion_input = register_fusion_input_from_output(&mut c2, &fusion_id);
+    let function_id = register_word_count_function(&mut c2);
+
+    let task_id = create_wlc_task(&mut c2, &function_id, &fusion_owners);
+    assign_data_for_task(
+        &mut c2,
+        &task_id,
+        hashmap!("InputData" => fusion_input),
+        hashmap!(),
+    );
+
+    approve_task(&mut c2, &task_id).unwrap();
+
+    // Invoke Task by the creator
+    assert!(invoke_task(&mut c2, &task_id).is_err());
+
+    approve_task(&mut c1, &task_id).unwrap();
+    invoke_task(&mut c2, &task_id).unwrap();
+    let ret_val = get_task_until(&mut c2, &task_id, TaskStatus::Finished);
+    assert_eq!(&ret_val, "2");
+}
+
+fn register_fusion_input_from_output(
+    client: &mut TeaclaveFrontendClient,
+    fusion_id: &ExternalID,
+) -> ExternalID {
+    let request = RegisterInputFromOutputRequest::new(fusion_id.clone());
+    let response = client.register_input_from_output(request).unwrap();
+    response.data_id
+}
+
+fn register_word_count_function(client: &mut TeaclaveFrontendClient) -> ExternalID {
+    let script = r#"
+def readlines(fid):
+    lines = None
+    with teaclave_open(fid, "rb") as f:
+        lines = f.readlines()
+    return lines
+
+def entrypoint(argv):
+    fid = "InputData"
+    assert len(argv) == 2
+    assert argv[0] == "query"
+    word = argv[1]
+    cnt = 0
+    for line in readlines(fid):
+        if word in line:
+            cnt += 1
+    return "%s" % cnt
+"#;
+
+    let input_spec = FunctionInput::new("InputData", "Lines of Data");
+    let request = RegisterFunctionRequest::new()
+        .name("wlc")
+        .description("Mesapy Word Line Count Function")
+        .arguments(vec!["query"])
+        .payload(script.into())
+        .executor_type(ExecutorType::Python)
+        .public(true)
+        .inputs(vec![input_spec]);
+    let response = client.register_function(request).unwrap();
+    log::info!("Resgister function: {:?}", response);
+    response.function_id
+}
+
+fn create_wlc_task(
+    client: &mut TeaclaveFrontendClient,
+    function_id: &ExternalID,
+    owners: &OwnerList,
+) -> ExternalID {
+    let request = CreateTaskRequest::new()
+        .function_id(function_id.to_owned())
+        .function_arguments(hashmap!("query" => "teaclave"))
+        .input_owners_map(hashmap!(
+            "InputData" => owners.to_owned()
+        ))
+        .executor(Executor::MesaPy);
+    let response = client.create_task(request).unwrap();
+    log::info!("Create task: {:?}", response);
+    response.task_id
 }
diff --git a/tests/functional/enclave/src/end_to_end/mesapy_echo.rs b/tests/functional/enclave/src/end_to_end/mesapy_echo.rs
index 420dd40..749931d 100644
--- a/tests/functional/enclave/src/end_to_end/mesapy_echo.rs
+++ b/tests/functional/enclave/src/end_to_end/mesapy_echo.rs
@@ -65,10 +65,10 @@ def entrypoint(argv):
     log::info!("Assign data: {:?}", response);
 
     // Approve Task
-    approve_task(&mut client, &task_id);
+    approve_task(&mut client, &task_id).unwrap();
 
     // Invoke Task
-    invoke_task(&mut client, &task_id);
+    invoke_task(&mut client, &task_id).unwrap();
 
     // Get Task
     let ret_val = get_task_until(&mut client, &task_id, TaskStatus::Finished);
diff --git a/tests/functional/enclave/src/end_to_end/mod.rs b/tests/functional/enclave/src/end_to_end/mod.rs
index eb807ea..64a7020 100644
--- a/tests/functional/enclave/src/end_to_end/mod.rs
+++ b/tests/functional/enclave/src/end_to_end/mod.rs
@@ -26,6 +26,13 @@ mod mesapy_echo;
 mod native_echo;
 mod native_gbdt_training;
 
+fn get_task(client: &mut TeaclaveFrontendClient, task_id: &ExternalID) -> GetTaskResponse {
+    let request = GetTaskRequest::new(task_id.clone());
+    let response = client.get_task(request).unwrap();
+    log::info!("Get task: {:?}", response);
+    response
+}
+
 fn get_task_until(
     client: &mut TeaclaveFrontendClient,
     task_id: &ExternalID,
@@ -55,14 +62,16 @@ fn get_task_until(
     }
 }
 
-fn approve_task(client: &mut TeaclaveFrontendClient, task_id: &ExternalID) {
+fn approve_task(client: &mut TeaclaveFrontendClient, task_id: &ExternalID) -> anyhow::Result<()> {
     let request = ApproveTaskRequest::new(task_id.clone());
-    let response = client.approve_task(request).unwrap();
+    let response = client.approve_task(request)?;
     log::info!("Approve task: {:?}", response);
+    Ok(())
 }
 
-fn invoke_task(client: &mut TeaclaveFrontendClient, task_id: &ExternalID) {
+fn invoke_task(client: &mut TeaclaveFrontendClient, task_id: &ExternalID) -> anyhow::Result<()> {
     let request = InvokeTaskRequest::new(task_id.clone());
-    let response = client.invoke_task(request).unwrap();
+    let response = client.invoke_task(request)?;
     log::info!("Invoke task: {:?}", response);
+    Ok(())
 }
diff --git a/tests/functional/enclave/src/end_to_end/native_echo.rs b/tests/functional/enclave/src/end_to_end/native_echo.rs
index 73c2090..8864559 100644
--- a/tests/functional/enclave/src/end_to_end/native_echo.rs
+++ b/tests/functional/enclave/src/end_to_end/native_echo.rs
@@ -58,10 +58,10 @@ pub fn test_echo_task_success() {
     log::info!("Assign data: {:?}", response);
 
     // Approve Task
-    approve_task(&mut client, &task_id);
+    approve_task(&mut client, &task_id).unwrap();
 
     // Invoke Task
-    invoke_task(&mut client, &task_id);
+    invoke_task(&mut client, &task_id).unwrap();
 
     // Get Task
     let ret_val = get_task_until(&mut client, &task_id, TaskStatus::Finished);
diff --git a/tests/functional/enclave/src/end_to_end/native_gbdt_training.rs b/tests/functional/enclave/src/end_to_end/native_gbdt_training.rs
index 5407668..3747596 100644
--- a/tests/functional/enclave/src/end_to_end/native_gbdt_training.rs
+++ b/tests/functional/enclave/src/end_to_end/native_gbdt_training.rs
@@ -30,8 +30,8 @@ pub fn test_gbdt_training_task() {
 
     let task_id = create_gbdt_training_task(&mut client, &function_id);
     assign_data_to_task(&mut client, &task_id, training_data_id, output_model_id);
-    approve_task(&mut client, &task_id);
-    invoke_task(&mut client, &task_id);
+    approve_task(&mut client, &task_id).unwrap();
+    invoke_task(&mut client, &task_id).unwrap();
 
     let ret_val = get_task_until(&mut client, &task_id, TaskStatus::Finished);
     assert_eq!(&ret_val, "Trained 120 lines of data.");
diff --git a/tests/functional/enclave/src/scheduler_service.rs b/tests/functional/enclave/src/scheduler_service.rs
index 82a0034..be052ae 100644
--- a/tests/functional/enclave/src/scheduler_service.rs
+++ b/tests/functional/enclave/src/scheduler_service.rs
@@ -79,11 +79,11 @@ fn test_update_task_status_result() {
     log::debug!("response: {:?}", response);
     let task_id = response.staged_task.task_id;
 
-    let request = UpdateTaskStatusRequest::new(task_id, TaskStatus::Finished, String::new());
+    let request = UpdateTaskStatusRequest::new(task_id, TaskStatus::Running, String::new());
     let response = client.update_task_status(request);
     assert!(response.is_ok());
 
-    let task_outputs = TaskOutputs::new("return", hashmap!());
+    let task_outputs = TaskOutputs::new("return value", hashmap!());
     let request = UpdateTaskResultRequest::new(task_id, Ok(task_outputs));
     let response = client.update_task_result(request);
 
diff --git a/types/src/file.rs b/types/src/file.rs
index 35fece2..c2185a1 100644
--- a/types/src/file.rs
+++ b/types/src/file.rs
@@ -102,6 +102,12 @@ impl TeaclaveOutputFile {
             uuid: create_uuid(),
         }
     }
+
+    pub fn assign_cmac(&mut self, cmac: &FileAuthTag) -> Result<()> {
+        anyhow::ensure!(self.cmac.is_none(), "Cannot overwrite output file cmac");
+        self.cmac = Some(cmac.to_owned());
+        Ok(())
+    }
 }
 
 impl Storable for TeaclaveOutputFile {
diff --git a/types/src/staged_task.rs b/types/src/staged_task.rs
index 42aad04..c2bf26e 100644
--- a/types/src/staged_task.rs
+++ b/types/src/staged_task.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::hash_map::{Iter, IterMut};
+use std::collections::hash_map::{IntoIter, Iter, IterMut};
 use std::collections::HashMap;
 use std::prelude::v1::*;
 
@@ -45,6 +45,15 @@ impl FunctionInputFiles {
     }
 }
 
+impl IntoIterator for FunctionInputFiles {
+    type Item = (String, FunctionInputFile);
+    type IntoIter = IntoIter<String, FunctionInputFile>;
+
+    fn into_iter(self) -> IntoIter<String, FunctionInputFile> {
+        self.inner.into_iter()
+    }
+}
+
 impl std::convert::From<HashMap<String, FunctionInputFile>> for FunctionInputFiles {
     fn from(entries: HashMap<String, FunctionInputFile>) -> FunctionInputFiles {
         FunctionInputFiles { inner: entries }
@@ -56,6 +65,15 @@ pub struct FunctionOutputFiles {
     inner: HashMap<String, FunctionOutputFile>,
 }
 
+impl IntoIterator for FunctionOutputFiles {
+    type Item = (String, FunctionOutputFile);
+    type IntoIter = IntoIter<String, FunctionOutputFile>;
+
+    fn into_iter(self) -> IntoIter<String, FunctionOutputFile> {
+        self.inner.into_iter()
+    }
+}
+
 impl FunctionOutputFiles {
     pub fn new(entries: HashMap<String, FunctionOutputFile>) -> Self {
         entries.into()
diff --git a/types/src/task.rs b/types/src/task.rs
index 29d747a..f23a24d 100644
--- a/types/src/task.rs
+++ b/types/src/task.rs
@@ -154,6 +154,18 @@ impl OutputsTags {
     pub fn iter(&self) -> Iter<String, FileAuthTag> {
         self.inner.iter()
     }
+
+    pub fn get(&self, key: &str) -> Option<&FileAuthTag> {
+        self.inner.get(key)
+    }
+
+    pub fn len(&self) -> usize {
+        self.inner.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
 }
 
 impl std::convert::TryFrom<HashMap<String, String>> for OutputsTags {
@@ -189,14 +201,14 @@ impl std::iter::FromIterator<(String, FileAuthTag)> for OutputsTags {
 #[derive(Debug, Deserialize, Serialize)]
 pub struct TaskOutputs {
     pub return_value: Vec<u8>,
-    pub output_file_hash: OutputsTags,
+    pub tags_map: OutputsTags,
 }
 
 impl TaskOutputs {
-    pub fn new(value: impl Into<Vec<u8>>, output_file_hash: HashMap<String, FileAuthTag>) -> Self {
+    pub fn new(value: impl Into<Vec<u8>>, tags_map: HashMap<String, FileAuthTag>) -> Self {
         TaskOutputs {
             return_value: value.into(),
-            output_file_hash: OutputsTags::new(output_file_hash),
+            tags_map: OutputsTags::new(tags_map),
         }
     }
 }
@@ -281,8 +293,15 @@ pub enum TaskResult {
     Err(TaskFailure),
 }
 
-#[cfg(test_mode)]
 impl TaskResult {
+    pub fn is_ok(&self) -> bool {
+        match self {
+            TaskResult::Ok(_) => true,
+            _ => false,
+        }
+    }
+
+    #[cfg(test_mode)]
     pub fn unwrap(self) -> TaskOutputs {
         match self {
             TaskResult::Ok(t) => t,
diff --git a/types/src/worker.rs b/types/src/worker.rs
index 6581d42..eebe592 100644
--- a/types/src/worker.rs
+++ b/types/src/worker.rs
@@ -146,7 +146,7 @@ pub struct WorkerCapability {
 #[derive(Debug, Default)]
 pub struct ExecutionResult {
     pub return_value: Vec<u8>,
-    pub output_file_hash: OutputsTags,
+    pub tags_map: OutputsTags,
 }
 
 #[cfg(feature = "enclave_unit_test")]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@teaclave.apache.org
For additional commands, e-mail: commits-help@teaclave.apache.org