You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@teaclave.apache.org by ms...@apache.org on 2020/05/09 18:37:35 UTC

[incubator-teaclave] branch master updated: [fusion] Optimize fusion_base dir logic (#291)

This is an automated email from the ASF dual-hosted git repository.

mssun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-teaclave.git


The following commit(s) were added to refs/heads/master by this push:
     new 3060efd  [fusion] Optimize fusion_base dir logic (#291)
3060efd is described below

commit 3060efdc5bfc857b5e92e86526940b735bb8fca9
Author: Zhaofeng Chen <zf...@apache.org>
AuthorDate: Sat May 9 11:37:28 2020 -0700

    [fusion] Optimize fusion_base dir logic (#291)
---
 file_agent/src/agent.rs                            | 84 +++++++++++++++++-----
 services/execution/enclave/src/lib.rs              |  3 +-
 services/execution/enclave/src/ocall.rs            |  3 +-
 services/execution/enclave/src/service.rs          | 11 ++-
 .../execution/enclave/src/task_file_manager.rs     | 56 +++++++++------
 services/management/enclave/src/lib.rs             |  5 +-
 services/management/enclave/src/service.rs         | 18 +----
 types/src/file_agent.rs                            | 10 ++-
 8 files changed, 129 insertions(+), 61 deletions(-)

diff --git a/file_agent/src/agent.rs b/file_agent/src/agent.rs
index 98f8621..e366a71 100644
--- a/file_agent/src/agent.rs
+++ b/file_agent/src/agent.rs
@@ -21,6 +21,7 @@ use tokio::io::AsyncWriteExt;
 use tokio_util::codec;
 use url::Url;
 
+use std::path::{Component, Path, PathBuf};
 use teaclave_types::{FileAgentRequest, HandleFileCommand, HandleFileInfo};
 
 async fn download_remote_input_to_file(
@@ -79,23 +80,48 @@ async fn upload_output_file_to_remote(
     }
 }
 
-async fn handle_download(info: HandleFileInfo) -> anyhow::Result<()> {
+async fn handle_download(
+    info: HandleFileInfo,
+    fusion_base: impl AsRef<Path>,
+) -> anyhow::Result<()> {
     anyhow::ensure!(
         !info.local.exists(),
         "[Download] Dest local file: {:?} already exists.",
         info.local
     );
     let dst = info.local;
+    let remote = info.remote;
 
-    match info.remote.scheme() {
+    match remote.scheme() {
         "https" | "http" => {
-            download_remote_input_to_file(info.remote, dst).await?;
+            download_remote_input_to_file(remote, dst).await?;
         }
         "file" => {
-            let src = info
-                .remote
+            let src = remote
                 .to_file_path()
-                .map_err(|e| anyhow::anyhow!("Cannot convert to path: {:?}", e))?;
+                .map_err(|e| anyhow::anyhow!("Cannot convert file:// to path: {:?}", e))?;
+            anyhow::ensure!(
+                src.exists(),
+                "[Download] Src local file: {:?} doesn't exist.",
+                src
+            );
+            copy_file(src, dst).await?;
+        }
+        "fusion" => {
+            let path = remote
+                .to_file_path()
+                .map_err(|e| anyhow::anyhow!("Cannot convert fusion:// to path: {:?}", e))?;
+            let components = path.components().collect::<Vec<_>>();
+            anyhow::ensure!(
+                (components[0] == Component::RootDir)
+                    && (components[1] == Component::Normal("TEACLAVE_FUSION_BASE".as_ref())),
+                "[Download] Fusion data format error: {:?}",
+                components
+            );
+
+            let relative_path: PathBuf = components[2..].iter().collect();
+            let src: PathBuf = fusion_base.as_ref().join(relative_path);
+
             anyhow::ensure!(
                 src.exists(),
                 "[Download] Src local file: {:?} doesn't exist.",
@@ -108,7 +134,7 @@ async fn handle_download(info: HandleFileInfo) -> anyhow::Result<()> {
     Ok(())
 }
 
-async fn handle_upload(info: HandleFileInfo) -> anyhow::Result<()> {
+async fn handle_upload(info: HandleFileInfo, fusion_base: impl AsRef<Path>) -> anyhow::Result<()> {
     anyhow::ensure!(
         info.local.exists(),
         "[Upload] Src local file: {:?} doesn't exist.",
@@ -125,9 +151,28 @@ async fn handle_upload(info: HandleFileInfo) -> anyhow::Result<()> {
                 .remote
                 .to_file_path()
                 .map_err(|e| anyhow::anyhow!("Cannot convert to path: {:?}", e))?;
+            anyhow::ensure!(!dst.exists(), "[Upload] Dest local file: {:?} exist.", dst);
+            copy_file(src, dst).await?;
+        }
+        "fusion" => {
+            let path = info
+                .remote
+                .to_file_path()
+                .map_err(|e| anyhow::anyhow!("Cannot convert fusion:// to path: {:?}", e))?;
+            let components = path.components().collect::<Vec<_>>();
+            anyhow::ensure!(
+                (components[0] == Component::RootDir)
+                    && (components[1] == Component::Normal("TEACLAVE_FUSION_BASE".as_ref())),
+                "[Upload] Fusion data format error: {:?}",
+                components
+            );
+
+            let relative_path: PathBuf = components[2..].iter().collect();
+            let dst: PathBuf = fusion_base.as_ref().join(relative_path);
+
             anyhow::ensure!(
                 !dst.exists(),
-                "[Download] Dest local file: {:?} already exist.",
+                "[Upload] Dest fusion file: {:?} exists.",
                 dst
             );
             copy_file(src, dst).await?;
@@ -144,12 +189,16 @@ fn handle_file_request(bytes: &[u8]) -> anyhow::Result<()> {
         .enable_all()
         .build()?
         .block_on(async {
+            let fusion_base = req.fusion_base.clone();
             match req.cmd {
                 HandleFileCommand::Download => {
                     let futures: Vec<_> = req
                         .info
                         .into_iter()
-                        .map(|info| tokio::spawn(async { handle_download(info).await }))
+                        .map(|info| {
+                            let fusion_base = fusion_base.clone();
+                            tokio::spawn(async { handle_download(info, fusion_base).await })
+                        })
                         .collect();
                     join_all(futures).await
                 }
@@ -157,7 +206,10 @@ fn handle_file_request(bytes: &[u8]) -> anyhow::Result<()> {
                     let futures: Vec<_> = req
                         .info
                         .into_iter()
-                        .map(|info| tokio::spawn(async { handle_upload(info).await }))
+                        .map(|info| {
+                            let fusion_base = fusion_base.clone();
+                            tokio::spawn(async { handle_upload(info, fusion_base).await })
+                        })
                         .collect();
                     join_all(futures).await
                 }
@@ -214,7 +266,7 @@ mod tests {
         let dest = PathBuf::from("/tmp/input_test.txt");
 
         let info = HandleFileInfo::new(&dest, &url);
-        let req = FileAgentRequest::new(HandleFileCommand::Download, vec![info]);
+        let req = FileAgentRequest::new(HandleFileCommand::Download, vec![info], "");
 
         let bytes = serde_json::to_vec(&req).unwrap();
         handle_file_request(&bytes).unwrap();
@@ -234,7 +286,7 @@ mod tests {
         let url = Url::parse(s).unwrap();
 
         let info = HandleFileInfo::new(&src, &url);
-        let req = FileAgentRequest::new(HandleFileCommand::Upload, vec![info]);
+        let req = FileAgentRequest::new(HandleFileCommand::Upload, vec![info], "");
 
         let bytes = serde_json::to_vec(&req).unwrap();
         handle_file_request(&bytes).unwrap();
@@ -255,7 +307,7 @@ mod tests {
             .iter()
             .map(|fname| HandleFileInfo::new(base.join(fname), &url))
             .collect();
-        let req = FileAgentRequest::new(HandleFileCommand::Download, info_list);
+        let req = FileAgentRequest::new(HandleFileCommand::Download, info_list, "");
 
         let bytes = serde_json::to_vec(&req).unwrap();
         handle_file_request(&bytes).unwrap();
@@ -284,7 +336,7 @@ mod tests {
             })
             .collect();
 
-        let req = FileAgentRequest::new(HandleFileCommand::Upload, info_list);
+        let req = FileAgentRequest::new(HandleFileCommand::Upload, info_list, "");
 
         let bytes = serde_json::to_vec(&req).unwrap();
         handle_file_request(&bytes).unwrap();
@@ -309,7 +361,7 @@ mod tests {
         let url = Url::parse(&s).unwrap();
 
         let info = HandleFileInfo::new(&src, &url);
-        let req = FileAgentRequest::new(HandleFileCommand::Upload, vec![info]);
+        let req = FileAgentRequest::new(HandleFileCommand::Upload, vec![info], "");
 
         let bytes = serde_json::to_vec(&req).unwrap();
         handle_file_request(&bytes).unwrap();
@@ -317,7 +369,7 @@ mod tests {
         // test local download
         let dest = base.join("d2.txt");
         let info = HandleFileInfo::new(&dest, &url);
-        let req = FileAgentRequest::new(HandleFileCommand::Download, vec![info]);
+        let req = FileAgentRequest::new(HandleFileCommand::Download, vec![info], "");
 
         let bytes = serde_json::to_vec(&req).unwrap();
         handle_file_request(&bytes).unwrap();
diff --git a/services/execution/enclave/src/lib.rs b/services/execution/enclave/src/lib.rs
index 0903090..77487d3 100644
--- a/services/execution/enclave/src/lib.rs
+++ b/services/execution/enclave/src/lib.rs
@@ -69,7 +69,8 @@ fn start_service(config: &RuntimeConfig) -> Result<()> {
         fusion_base.display()
     );
 
-    let mut service = service::TeaclaveExecutionService::new(scheduler_service_endpoint)?;
+    let mut service =
+        service::TeaclaveExecutionService::new(scheduler_service_endpoint, fusion_base)?;
     let _ = service.start();
 
     Ok(())
diff --git a/services/execution/enclave/src/ocall.rs b/services/execution/enclave/src/ocall.rs
index ffbc774..d06aa82 100644
--- a/services/execution/enclave/src/ocall.rs
+++ b/services/execution/enclave/src/ocall.rs
@@ -64,7 +64,8 @@ pub mod tests {
         let dest = PathBuf::from("/tmp/execution_input_test.txt");
 
         let info = HandleFileInfo::new(&dest, &url);
-        let request = FileAgentRequest::new(HandleFileCommand::Download, vec![info]);
+        let request =
+            FileAgentRequest::new(HandleFileCommand::Download, vec![info], "/tmp/fusion_data");
 
         handle_file_request(request).unwrap();
         std::untrusted::fs::remove_file(&dest).unwrap();
diff --git a/services/execution/enclave/src/service.rs b/services/execution/enclave/src/service.rs
index a0e419d..18ce902 100644
--- a/services/execution/enclave/src/service.rs
+++ b/services/execution/enclave/src/service.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::collections::HashMap;
+use std::path::{Path, PathBuf};
 use std::prelude::v1::*;
 use std::sync::{Arc, SgxMutex as Mutex};
 
@@ -34,10 +35,14 @@ static WORKER_BASE_DIR: &str = "/tmp/teaclave_agent/";
 pub(crate) struct TeaclaveExecutionService {
     worker: Arc<Worker>,
     scheduler_client: Arc<Mutex<TeaclaveSchedulerClient>>,
+    fusion_base: PathBuf,
 }
 
 impl TeaclaveExecutionService {
-    pub(crate) fn new(scheduler_service_endpoint: Endpoint) -> Result<Self> {
+    pub(crate) fn new(
+        scheduler_service_endpoint: Endpoint,
+        fusion_base: impl AsRef<Path>,
+    ) -> Result<Self> {
         let mut i = 0;
         let channel = loop {
             match scheduler_service_endpoint.connect() {
@@ -55,6 +60,7 @@ impl TeaclaveExecutionService {
         Ok(TeaclaveExecutionService {
             worker: Arc::new(Worker::default()),
             scheduler_client,
+            fusion_base: fusion_base.as_ref().to_owned(),
         })
     }
 
@@ -101,6 +107,7 @@ impl TeaclaveExecutionService {
 
         let file_mgr = TaskFileManager::new(
             WORKER_BASE_DIR,
+            &self.fusion_base,
             &task.task_id,
             &task.input_data,
             &task.output_data,
@@ -185,6 +192,7 @@ pub mod tests {
 
         let file_mgr = TaskFileManager::new(
             WORKER_BASE_DIR,
+            "/tmp/fusion_base",
             &staged_task.task_id,
             &staged_task.input_data,
             &staged_task.output_data,
@@ -242,6 +250,7 @@ pub mod tests {
 
         let file_mgr = TaskFileManager::new(
             WORKER_BASE_DIR,
+            "/tmp/fusion_base",
             &staged_task.task_id,
             &staged_task.input_data,
             &staged_task.output_data,
diff --git a/services/execution/enclave/src/task_file_manager.rs b/services/execution/enclave/src/task_file_manager.rs
index 6e27dda..908cbaf 100644
--- a/services/execution/enclave/src/task_file_manager.rs
+++ b/services/execution/enclave/src/task_file_manager.rs
@@ -30,6 +30,7 @@ use uuid::Uuid;
 pub(crate) struct TaskFileManager {
     inter_inputs: InterInputs,
     inter_outputs: InterOutputs,
+    fusion_base: PathBuf,
 }
 
 struct InterInputs {
@@ -56,12 +57,13 @@ pub(self) struct InterOutput {
 
 impl TaskFileManager {
     pub(crate) fn new(
-        base: &str,
+        inter_base: impl AsRef<Path>,
+        fusion_base: impl AsRef<Path>,
         task_id: &Uuid,
         inputs: &FunctionInputFiles,
         outputs: &FunctionOutputFiles,
     ) -> Result<Self> {
-        let cwd = Path::new(base).join(task_id.to_string());
+        let cwd = Path::new(inter_base.as_ref()).join(task_id.to_string());
         let inputs_base = cwd.join("inputs");
         let outputs_base = cwd.join("outputs");
 
@@ -71,13 +73,14 @@ impl TaskFileManager {
         let tfmgr = TaskFileManager {
             inter_inputs,
             inter_outputs,
+            fusion_base: fusion_base.as_ref().to_owned(),
         };
 
         Ok(tfmgr)
     }
 
     pub(crate) fn prepare_staged_inputs(&self) -> Result<StagedFiles> {
-        self.inter_inputs.download()?;
+        self.inter_inputs.download(&self.fusion_base)?;
         self.inter_inputs.convert_to_staged_files()
     }
 
@@ -88,19 +91,19 @@ impl TaskFileManager {
 
     pub(crate) fn upload_outputs(&self) -> Result<HashMap<String, FileAuthTag>> {
         let auth_tags = self.inter_outputs.convert_staged_files_for_upload()?;
-        self.inter_outputs.upload()?;
+        self.inter_outputs.upload(&self.fusion_base)?;
         Ok(auth_tags)
     }
 }
 
 impl InterInput {
     fn new(
-        base: impl AsRef<Path>,
+        inter_base: impl AsRef<Path>,
         funiq_key: String,
         file: FunctionInputFile,
     ) -> Result<InterInput> {
-        let download_path = make_intermediate_path(base.as_ref(), &funiq_key, &file.url)?;
-        let staged_path = make_staged_path(base.as_ref(), &funiq_key, &file.url)?;
+        let download_path = make_intermediate_path(inter_base.as_ref(), &funiq_key, &file.url)?;
+        let staged_path = make_staged_path(inter_base.as_ref(), &funiq_key, &file.url)?;
 
         Ok(InterInput {
             funiq_key,
@@ -168,18 +171,19 @@ impl std::iter::FromIterator<InterInput> for InterInputs {
 }
 
 impl InterInputs {
-    pub fn new(base: impl AsRef<Path>, inputs: FunctionInputFiles) -> Result<InterInputs> {
+    pub fn new(input_base: impl AsRef<Path>, inputs: FunctionInputFiles) -> Result<InterInputs> {
         inputs
             .into_iter()
-            .map(|(funiq_key, file)| InterInput::new(base.as_ref(), funiq_key, file))
+            .map(|(funiq_key, file)| InterInput::new(input_base.as_ref(), funiq_key, file))
             .collect()
     }
 
-    pub(crate) fn download(&self) -> Result<()> {
+    pub(crate) fn download(&self, fusion_base: impl AsRef<Path>) -> Result<()> {
         let req_info = self.inner.iter().map(|inter_input| {
             HandleFileInfo::new(&inter_input.download_path, &inter_input.file.url)
         });
-        let request = FileAgentRequest::new(HandleFileCommand::Download, req_info);
+        let request =
+            FileAgentRequest::new(HandleFileCommand::Download, req_info, fusion_base.as_ref());
         log::info!("Ocall file download request: {:?}", request);
         handle_file_request(request)?;
         Ok(())
@@ -203,12 +207,12 @@ impl std::iter::FromIterator<InterOutput> for InterOutputs {
 
 impl InterOutput {
     pub fn new(
-        base: impl AsRef<Path>,
+        inter_base: impl AsRef<Path>,
         funiq_key: String,
         file: FunctionOutputFile,
     ) -> Result<InterOutput> {
-        let upload_path = make_intermediate_path(base.as_ref(), &funiq_key, &file.url)?;
-        let staged_path = make_staged_path(base.as_ref(), &funiq_key, &file.url)?;
+        let upload_path = make_intermediate_path(inter_base.as_ref(), &funiq_key, &file.url)?;
+        let staged_path = make_staged_path(inter_base.as_ref(), &funiq_key, &file.url)?;
         let random_key = TeaclaveFile128Key::random();
         let staged_info = StagedFileInfo::new(&staged_path, random_key, FileAuthTag::default());
 
@@ -242,10 +246,13 @@ impl InterOutput {
 }
 
 impl InterOutputs {
-    pub fn new(base: impl AsRef<Path>, outputs: FunctionOutputFiles) -> Result<InterOutputs> {
+    pub fn new(
+        output_base: impl AsRef<Path>,
+        outputs: FunctionOutputFiles,
+    ) -> Result<InterOutputs> {
         outputs
             .into_iter()
-            .map(|(funiq_key, file)| InterOutput::new(base.as_ref(), funiq_key, file))
+            .map(|(funiq_key, file)| InterOutput::new(output_base.as_ref(), funiq_key, file))
             .collect()
     }
 
@@ -272,12 +279,13 @@ impl InterOutputs {
             .collect()
     }
 
-    pub(crate) fn upload(&self) -> Result<()> {
+    pub(crate) fn upload(&self, fusion_base: impl AsRef<Path>) -> Result<()> {
         let req_info = self.inner.iter().map(|inter_output| {
             HandleFileInfo::new(&inter_output.upload_path, &inter_output.file.url)
         });
-        let request = FileAgentRequest::new(HandleFileCommand::Upload, req_info);
-        log::info!("Ocall file download request: {:?}", request);
+        let request =
+            FileAgentRequest::new(HandleFileCommand::Upload, req_info, fusion_base.as_ref());
+        log::info!("Ocall file upload request: {:?}", request);
         handle_file_request(request)?;
         Ok(())
     }
@@ -333,8 +341,14 @@ pub mod tests {
         let outputs = hashmap!();
         let task_id = Uuid::new_v4();
 
-        let file_mgr =
-            TaskFileManager::new("/tmp", &task_id, &inputs.into(), &outputs.into()).unwrap();
+        let file_mgr = TaskFileManager::new(
+            "/tmp",
+            "/tmp/fusion_base",
+            &task_id,
+            &inputs.into(),
+            &outputs.into(),
+        )
+        .unwrap();
         file_mgr.prepare_staged_inputs().unwrap();
         file_mgr.prepare_staged_outputs().unwrap();
     }
diff --git a/services/management/enclave/src/lib.rs b/services/management/enclave/src/lib.rs
index bdba877..a5dbc11 100644
--- a/services/management/enclave/src/lib.rs
+++ b/services/management/enclave/src/lib.rs
@@ -82,10 +82,7 @@ fn start_service(config: &RuntimeConfig) -> Result<()> {
         verifier::universal_quote_verifier,
     );
 
-    let service = service::TeaclaveManagementService::new(
-        storage_service_endpoint,
-        config.mount.fusion_base_dir.clone(),
-    )?;
+    let service = service::TeaclaveManagementService::new(storage_service_endpoint)?;
     match server.start(service) {
         Ok(_) => (),
         Err(e) => {
diff --git a/services/management/enclave/src/service.rs b/services/management/enclave/src/service.rs
index fd5fac1..1ee15b6 100644
--- a/services/management/enclave/src/service.rs
+++ b/services/management/enclave/src/service.rs
@@ -17,7 +17,6 @@
 
 use anyhow::{anyhow, Result};
 use std::collections::HashMap;
-use std::path::PathBuf;
 use std::prelude::v1::*;
 use std::sync::{Arc, SgxMutex as Mutex};
 use teaclave_proto::teaclave_frontend_service::{
@@ -70,7 +69,6 @@ impl From<ServiceError> for TeaclaveServiceResponseError {
 #[derive(Clone)]
 pub(crate) struct TeaclaveManagementService {
     storage_client: Arc<Mutex<TeaclaveStorageClient>>,
-    fusion_base_dir: PathBuf,
 }
 
 impl TeaclaveManagement for TeaclaveManagementService {
@@ -439,10 +437,7 @@ impl TeaclaveManagement for TeaclaveManagementService {
 }
 
 impl TeaclaveManagementService {
-    pub(crate) fn new(
-        storage_service_endpoint: Endpoint,
-        fusion_base_dir: PathBuf,
-    ) -> Result<Self> {
+    pub(crate) fn new(storage_service_endpoint: Endpoint) -> Result<Self> {
         let mut i = 0;
         let channel = loop {
             match storage_service_endpoint.connect() {
@@ -456,10 +451,7 @@ impl TeaclaveManagementService {
             std::thread::sleep(std::time::Duration::from_secs(3));
         };
         let storage_client = Arc::new(Mutex::new(TeaclaveStorageClient::new(channel)?));
-        let service = Self {
-            storage_client,
-            fusion_base_dir,
-        };
+        let service = Self { storage_client };
 
         #[cfg(test_mode)]
         service.add_mock_data()?;
@@ -469,11 +461,7 @@ impl TeaclaveManagementService {
 
     pub fn create_fusion_data(&self, owners: impl Into<OwnerList>) -> Result<TeaclaveOutputFile> {
         let uuid = Uuid::new_v4();
-        let url = format!(
-            "file://{}/{}.fusion",
-            self.fusion_base_dir.display(),
-            uuid.to_string()
-        );
+        let url = format!("fusion:///TEACLAVE_FUSION_BASE/{}.fusion", uuid.to_string());
         let url = Url::parse(&url).map_err(|_| anyhow!("invalid url"))?;
         let crypto_info = FileCrypto::default();
 
diff --git a/types/src/file_agent.rs b/types/src/file_agent.rs
index ec91399..2f3db92 100644
--- a/types/src/file_agent.rs
+++ b/types/src/file_agent.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use serde::{Deserialize, Serialize};
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
 use std::prelude::v1::*;
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -29,16 +29,22 @@ pub enum HandleFileCommand {
 pub struct FileAgentRequest {
     pub cmd: HandleFileCommand,
     pub info: Vec<HandleFileInfo>,
+    pub fusion_base: PathBuf,
 }
 
 impl FileAgentRequest {
-    pub fn new<T: IntoIterator>(cmd: HandleFileCommand, info: T) -> Self
+    pub fn new<T: IntoIterator>(
+        cmd: HandleFileCommand,
+        info: T,
+        fusion_base: impl AsRef<Path>,
+    ) -> Self
     where
         <T as IntoIterator>::Item: Into<HandleFileInfo>,
     {
         FileAgentRequest {
             cmd,
             info: info.into_iter().map(|x| x.into()).collect(),
+            fusion_base: fusion_base.as_ref().to_owned(),
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@teaclave.apache.org
For additional commands, e-mail: commits-help@teaclave.apache.org