You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@teaclave.apache.org by ms...@apache.org on 2020/05/09 18:37:35 UTC
[incubator-teaclave] branch master updated: [fusion] Optimize fusion_base dir logic (#291)
This is an automated email from the ASF dual-hosted git repository.
mssun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-teaclave.git
The following commit(s) were added to refs/heads/master by this push:
new 3060efd [fusion] Optimize fusion_base dir logic (#291)
3060efd is described below
commit 3060efdc5bfc857b5e92e86526940b735bb8fca9
Author: Zhaofeng Chen <zf...@apache.org>
AuthorDate: Sat May 9 11:37:28 2020 -0700
[fusion] Optimize fusion_base dir logic (#291)
---
file_agent/src/agent.rs | 84 +++++++++++++++++-----
services/execution/enclave/src/lib.rs | 3 +-
services/execution/enclave/src/ocall.rs | 3 +-
services/execution/enclave/src/service.rs | 11 ++-
.../execution/enclave/src/task_file_manager.rs | 56 +++++++++------
services/management/enclave/src/lib.rs | 5 +-
services/management/enclave/src/service.rs | 18 +----
types/src/file_agent.rs | 10 ++-
8 files changed, 129 insertions(+), 61 deletions(-)
diff --git a/file_agent/src/agent.rs b/file_agent/src/agent.rs
index 98f8621..e366a71 100644
--- a/file_agent/src/agent.rs
+++ b/file_agent/src/agent.rs
@@ -21,6 +21,7 @@ use tokio::io::AsyncWriteExt;
use tokio_util::codec;
use url::Url;
+use std::path::{Component, Path, PathBuf};
use teaclave_types::{FileAgentRequest, HandleFileCommand, HandleFileInfo};
async fn download_remote_input_to_file(
@@ -79,23 +80,48 @@ async fn upload_output_file_to_remote(
}
}
-async fn handle_download(info: HandleFileInfo) -> anyhow::Result<()> {
+async fn handle_download(
+ info: HandleFileInfo,
+ fusion_base: impl AsRef<Path>,
+) -> anyhow::Result<()> {
anyhow::ensure!(
!info.local.exists(),
"[Download] Dest local file: {:?} already exists.",
info.local
);
let dst = info.local;
+ let remote = info.remote;
- match info.remote.scheme() {
+ match remote.scheme() {
"https" | "http" => {
- download_remote_input_to_file(info.remote, dst).await?;
+ download_remote_input_to_file(remote, dst).await?;
}
"file" => {
- let src = info
- .remote
+ let src = remote
.to_file_path()
- .map_err(|e| anyhow::anyhow!("Cannot convert to path: {:?}", e))?;
+ .map_err(|e| anyhow::anyhow!("Cannot convert file:// to path: {:?}", e))?;
+ anyhow::ensure!(
+ src.exists(),
+ "[Download] Src local file: {:?} doesn't exist.",
+ src
+ );
+ copy_file(src, dst).await?;
+ }
+ "fusion" => {
+ let path = remote
+ .to_file_path()
+ .map_err(|e| anyhow::anyhow!("Cannot convert fusion:// to path: {:?}", e))?;
+ let components = path.components().collect::<Vec<_>>();
+ anyhow::ensure!(
+ (components[0] == Component::RootDir)
+ && (components[1] == Component::Normal("TEACLAVE_FUSION_BASE".as_ref())),
+ "[Download] Fusion data format error: {:?}",
+ components
+ );
+
+ let relative_path: PathBuf = components[2..].iter().collect();
+ let src: PathBuf = fusion_base.as_ref().join(relative_path);
+
anyhow::ensure!(
src.exists(),
"[Download] Src local file: {:?} doesn't exist.",
@@ -108,7 +134,7 @@ async fn handle_download(info: HandleFileInfo) -> anyhow::Result<()> {
Ok(())
}
-async fn handle_upload(info: HandleFileInfo) -> anyhow::Result<()> {
+async fn handle_upload(info: HandleFileInfo, fusion_base: impl AsRef<Path>) -> anyhow::Result<()> {
anyhow::ensure!(
info.local.exists(),
"[Upload] Src local file: {:?} doesn't exist.",
@@ -125,9 +151,28 @@ async fn handle_upload(info: HandleFileInfo) -> anyhow::Result<()> {
.remote
.to_file_path()
.map_err(|e| anyhow::anyhow!("Cannot convert to path: {:?}", e))?;
+ anyhow::ensure!(!dst.exists(), "[Upload] Dest local file: {:?} exist.", dst);
+ copy_file(src, dst).await?;
+ }
+ "fusion" => {
+ let path = info
+ .remote
+ .to_file_path()
+ .map_err(|e| anyhow::anyhow!("Cannot convert fusion:// to path: {:?}", e))?;
+ let components = path.components().collect::<Vec<_>>();
+ anyhow::ensure!(
+ (components[0] == Component::RootDir)
+ && (components[1] == Component::Normal("TEACLAVE_FUSION_BASE".as_ref())),
+ "[Upload] Fusion data format error: {:?}",
+ components
+ );
+
+ let relative_path: PathBuf = components[2..].iter().collect();
+ let dst: PathBuf = fusion_base.as_ref().join(relative_path);
+
anyhow::ensure!(
!dst.exists(),
- "[Download] Dest local file: {:?} already exist.",
+ "[Upload] Dest fusion file: {:?} exists.",
dst
);
copy_file(src, dst).await?;
@@ -144,12 +189,16 @@ fn handle_file_request(bytes: &[u8]) -> anyhow::Result<()> {
.enable_all()
.build()?
.block_on(async {
+ let fusion_base = req.fusion_base.clone();
match req.cmd {
HandleFileCommand::Download => {
let futures: Vec<_> = req
.info
.into_iter()
- .map(|info| tokio::spawn(async { handle_download(info).await }))
+ .map(|info| {
+ let fusion_base = fusion_base.clone();
+ tokio::spawn(async { handle_download(info, fusion_base).await })
+ })
.collect();
join_all(futures).await
}
@@ -157,7 +206,10 @@ fn handle_file_request(bytes: &[u8]) -> anyhow::Result<()> {
let futures: Vec<_> = req
.info
.into_iter()
- .map(|info| tokio::spawn(async { handle_upload(info).await }))
+ .map(|info| {
+ let fusion_base = fusion_base.clone();
+ tokio::spawn(async { handle_upload(info, fusion_base).await })
+ })
.collect();
join_all(futures).await
}
@@ -214,7 +266,7 @@ mod tests {
let dest = PathBuf::from("/tmp/input_test.txt");
let info = HandleFileInfo::new(&dest, &url);
- let req = FileAgentRequest::new(HandleFileCommand::Download, vec![info]);
+ let req = FileAgentRequest::new(HandleFileCommand::Download, vec![info], "");
let bytes = serde_json::to_vec(&req).unwrap();
handle_file_request(&bytes).unwrap();
@@ -234,7 +286,7 @@ mod tests {
let url = Url::parse(s).unwrap();
let info = HandleFileInfo::new(&src, &url);
- let req = FileAgentRequest::new(HandleFileCommand::Upload, vec![info]);
+ let req = FileAgentRequest::new(HandleFileCommand::Upload, vec![info], "");
let bytes = serde_json::to_vec(&req).unwrap();
handle_file_request(&bytes).unwrap();
@@ -255,7 +307,7 @@ mod tests {
.iter()
.map(|fname| HandleFileInfo::new(base.join(fname), &url))
.collect();
- let req = FileAgentRequest::new(HandleFileCommand::Download, info_list);
+ let req = FileAgentRequest::new(HandleFileCommand::Download, info_list, "");
let bytes = serde_json::to_vec(&req).unwrap();
handle_file_request(&bytes).unwrap();
@@ -284,7 +336,7 @@ mod tests {
})
.collect();
- let req = FileAgentRequest::new(HandleFileCommand::Upload, info_list);
+ let req = FileAgentRequest::new(HandleFileCommand::Upload, info_list, "");
let bytes = serde_json::to_vec(&req).unwrap();
handle_file_request(&bytes).unwrap();
@@ -309,7 +361,7 @@ mod tests {
let url = Url::parse(&s).unwrap();
let info = HandleFileInfo::new(&src, &url);
- let req = FileAgentRequest::new(HandleFileCommand::Upload, vec![info]);
+ let req = FileAgentRequest::new(HandleFileCommand::Upload, vec![info], "");
let bytes = serde_json::to_vec(&req).unwrap();
handle_file_request(&bytes).unwrap();
@@ -317,7 +369,7 @@ mod tests {
// test local download
let dest = base.join("d2.txt");
let info = HandleFileInfo::new(&dest, &url);
- let req = FileAgentRequest::new(HandleFileCommand::Download, vec![info]);
+ let req = FileAgentRequest::new(HandleFileCommand::Download, vec![info], "");
let bytes = serde_json::to_vec(&req).unwrap();
handle_file_request(&bytes).unwrap();
diff --git a/services/execution/enclave/src/lib.rs b/services/execution/enclave/src/lib.rs
index 0903090..77487d3 100644
--- a/services/execution/enclave/src/lib.rs
+++ b/services/execution/enclave/src/lib.rs
@@ -69,7 +69,8 @@ fn start_service(config: &RuntimeConfig) -> Result<()> {
fusion_base.display()
);
- let mut service = service::TeaclaveExecutionService::new(scheduler_service_endpoint)?;
+ let mut service =
+ service::TeaclaveExecutionService::new(scheduler_service_endpoint, fusion_base)?;
let _ = service.start();
Ok(())
diff --git a/services/execution/enclave/src/ocall.rs b/services/execution/enclave/src/ocall.rs
index ffbc774..d06aa82 100644
--- a/services/execution/enclave/src/ocall.rs
+++ b/services/execution/enclave/src/ocall.rs
@@ -64,7 +64,8 @@ pub mod tests {
let dest = PathBuf::from("/tmp/execution_input_test.txt");
let info = HandleFileInfo::new(&dest, &url);
- let request = FileAgentRequest::new(HandleFileCommand::Download, vec![info]);
+ let request =
+ FileAgentRequest::new(HandleFileCommand::Download, vec![info], "/tmp/fusion_data");
handle_file_request(request).unwrap();
std::untrusted::fs::remove_file(&dest).unwrap();
diff --git a/services/execution/enclave/src/service.rs b/services/execution/enclave/src/service.rs
index a0e419d..18ce902 100644
--- a/services/execution/enclave/src/service.rs
+++ b/services/execution/enclave/src/service.rs
@@ -16,6 +16,7 @@
// under the License.
use std::collections::HashMap;
+use std::path::{Path, PathBuf};
use std::prelude::v1::*;
use std::sync::{Arc, SgxMutex as Mutex};
@@ -34,10 +35,14 @@ static WORKER_BASE_DIR: &str = "/tmp/teaclave_agent/";
pub(crate) struct TeaclaveExecutionService {
worker: Arc<Worker>,
scheduler_client: Arc<Mutex<TeaclaveSchedulerClient>>,
+ fusion_base: PathBuf,
}
impl TeaclaveExecutionService {
- pub(crate) fn new(scheduler_service_endpoint: Endpoint) -> Result<Self> {
+ pub(crate) fn new(
+ scheduler_service_endpoint: Endpoint,
+ fusion_base: impl AsRef<Path>,
+ ) -> Result<Self> {
let mut i = 0;
let channel = loop {
match scheduler_service_endpoint.connect() {
@@ -55,6 +60,7 @@ impl TeaclaveExecutionService {
Ok(TeaclaveExecutionService {
worker: Arc::new(Worker::default()),
scheduler_client,
+ fusion_base: fusion_base.as_ref().to_owned(),
})
}
@@ -101,6 +107,7 @@ impl TeaclaveExecutionService {
let file_mgr = TaskFileManager::new(
WORKER_BASE_DIR,
+ &self.fusion_base,
&task.task_id,
&task.input_data,
&task.output_data,
@@ -185,6 +192,7 @@ pub mod tests {
let file_mgr = TaskFileManager::new(
WORKER_BASE_DIR,
+ "/tmp/fusion_base",
&staged_task.task_id,
&staged_task.input_data,
&staged_task.output_data,
@@ -242,6 +250,7 @@ pub mod tests {
let file_mgr = TaskFileManager::new(
WORKER_BASE_DIR,
+ "/tmp/fusion_base",
&staged_task.task_id,
&staged_task.input_data,
&staged_task.output_data,
diff --git a/services/execution/enclave/src/task_file_manager.rs b/services/execution/enclave/src/task_file_manager.rs
index 6e27dda..908cbaf 100644
--- a/services/execution/enclave/src/task_file_manager.rs
+++ b/services/execution/enclave/src/task_file_manager.rs
@@ -30,6 +30,7 @@ use uuid::Uuid;
pub(crate) struct TaskFileManager {
inter_inputs: InterInputs,
inter_outputs: InterOutputs,
+ fusion_base: PathBuf,
}
struct InterInputs {
@@ -56,12 +57,13 @@ pub(self) struct InterOutput {
impl TaskFileManager {
pub(crate) fn new(
- base: &str,
+ inter_base: impl AsRef<Path>,
+ fusion_base: impl AsRef<Path>,
task_id: &Uuid,
inputs: &FunctionInputFiles,
outputs: &FunctionOutputFiles,
) -> Result<Self> {
- let cwd = Path::new(base).join(task_id.to_string());
+ let cwd = Path::new(inter_base.as_ref()).join(task_id.to_string());
let inputs_base = cwd.join("inputs");
let outputs_base = cwd.join("outputs");
@@ -71,13 +73,14 @@ impl TaskFileManager {
let tfmgr = TaskFileManager {
inter_inputs,
inter_outputs,
+ fusion_base: fusion_base.as_ref().to_owned(),
};
Ok(tfmgr)
}
pub(crate) fn prepare_staged_inputs(&self) -> Result<StagedFiles> {
- self.inter_inputs.download()?;
+ self.inter_inputs.download(&self.fusion_base)?;
self.inter_inputs.convert_to_staged_files()
}
@@ -88,19 +91,19 @@ impl TaskFileManager {
pub(crate) fn upload_outputs(&self) -> Result<HashMap<String, FileAuthTag>> {
let auth_tags = self.inter_outputs.convert_staged_files_for_upload()?;
- self.inter_outputs.upload()?;
+ self.inter_outputs.upload(&self.fusion_base)?;
Ok(auth_tags)
}
}
impl InterInput {
fn new(
- base: impl AsRef<Path>,
+ inter_base: impl AsRef<Path>,
funiq_key: String,
file: FunctionInputFile,
) -> Result<InterInput> {
- let download_path = make_intermediate_path(base.as_ref(), &funiq_key, &file.url)?;
- let staged_path = make_staged_path(base.as_ref(), &funiq_key, &file.url)?;
+ let download_path = make_intermediate_path(inter_base.as_ref(), &funiq_key, &file.url)?;
+ let staged_path = make_staged_path(inter_base.as_ref(), &funiq_key, &file.url)?;
Ok(InterInput {
funiq_key,
@@ -168,18 +171,19 @@ impl std::iter::FromIterator<InterInput> for InterInputs {
}
impl InterInputs {
- pub fn new(base: impl AsRef<Path>, inputs: FunctionInputFiles) -> Result<InterInputs> {
+ pub fn new(input_base: impl AsRef<Path>, inputs: FunctionInputFiles) -> Result<InterInputs> {
inputs
.into_iter()
- .map(|(funiq_key, file)| InterInput::new(base.as_ref(), funiq_key, file))
+ .map(|(funiq_key, file)| InterInput::new(input_base.as_ref(), funiq_key, file))
.collect()
}
- pub(crate) fn download(&self) -> Result<()> {
+ pub(crate) fn download(&self, fusion_base: impl AsRef<Path>) -> Result<()> {
let req_info = self.inner.iter().map(|inter_input| {
HandleFileInfo::new(&inter_input.download_path, &inter_input.file.url)
});
- let request = FileAgentRequest::new(HandleFileCommand::Download, req_info);
+ let request =
+ FileAgentRequest::new(HandleFileCommand::Download, req_info, fusion_base.as_ref());
log::info!("Ocall file download request: {:?}", request);
handle_file_request(request)?;
Ok(())
@@ -203,12 +207,12 @@ impl std::iter::FromIterator<InterOutput> for InterOutputs {
impl InterOutput {
pub fn new(
- base: impl AsRef<Path>,
+ inter_base: impl AsRef<Path>,
funiq_key: String,
file: FunctionOutputFile,
) -> Result<InterOutput> {
- let upload_path = make_intermediate_path(base.as_ref(), &funiq_key, &file.url)?;
- let staged_path = make_staged_path(base.as_ref(), &funiq_key, &file.url)?;
+ let upload_path = make_intermediate_path(inter_base.as_ref(), &funiq_key, &file.url)?;
+ let staged_path = make_staged_path(inter_base.as_ref(), &funiq_key, &file.url)?;
let random_key = TeaclaveFile128Key::random();
let staged_info = StagedFileInfo::new(&staged_path, random_key, FileAuthTag::default());
@@ -242,10 +246,13 @@ impl InterOutput {
}
impl InterOutputs {
- pub fn new(base: impl AsRef<Path>, outputs: FunctionOutputFiles) -> Result<InterOutputs> {
+ pub fn new(
+ output_base: impl AsRef<Path>,
+ outputs: FunctionOutputFiles,
+ ) -> Result<InterOutputs> {
outputs
.into_iter()
- .map(|(funiq_key, file)| InterOutput::new(base.as_ref(), funiq_key, file))
+ .map(|(funiq_key, file)| InterOutput::new(output_base.as_ref(), funiq_key, file))
.collect()
}
@@ -272,12 +279,13 @@ impl InterOutputs {
.collect()
}
- pub(crate) fn upload(&self) -> Result<()> {
+ pub(crate) fn upload(&self, fusion_base: impl AsRef<Path>) -> Result<()> {
let req_info = self.inner.iter().map(|inter_output| {
HandleFileInfo::new(&inter_output.upload_path, &inter_output.file.url)
});
- let request = FileAgentRequest::new(HandleFileCommand::Upload, req_info);
- log::info!("Ocall file download request: {:?}", request);
+ let request =
+ FileAgentRequest::new(HandleFileCommand::Upload, req_info, fusion_base.as_ref());
+ log::info!("Ocall file upload request: {:?}", request);
handle_file_request(request)?;
Ok(())
}
@@ -333,8 +341,14 @@ pub mod tests {
let outputs = hashmap!();
let task_id = Uuid::new_v4();
- let file_mgr =
- TaskFileManager::new("/tmp", &task_id, &inputs.into(), &outputs.into()).unwrap();
+ let file_mgr = TaskFileManager::new(
+ "/tmp",
+ "/tmp/fusion_base",
+ &task_id,
+ &inputs.into(),
+ &outputs.into(),
+ )
+ .unwrap();
file_mgr.prepare_staged_inputs().unwrap();
file_mgr.prepare_staged_outputs().unwrap();
}
diff --git a/services/management/enclave/src/lib.rs b/services/management/enclave/src/lib.rs
index bdba877..a5dbc11 100644
--- a/services/management/enclave/src/lib.rs
+++ b/services/management/enclave/src/lib.rs
@@ -82,10 +82,7 @@ fn start_service(config: &RuntimeConfig) -> Result<()> {
verifier::universal_quote_verifier,
);
- let service = service::TeaclaveManagementService::new(
- storage_service_endpoint,
- config.mount.fusion_base_dir.clone(),
- )?;
+ let service = service::TeaclaveManagementService::new(storage_service_endpoint)?;
match server.start(service) {
Ok(_) => (),
Err(e) => {
diff --git a/services/management/enclave/src/service.rs b/services/management/enclave/src/service.rs
index fd5fac1..1ee15b6 100644
--- a/services/management/enclave/src/service.rs
+++ b/services/management/enclave/src/service.rs
@@ -17,7 +17,6 @@
use anyhow::{anyhow, Result};
use std::collections::HashMap;
-use std::path::PathBuf;
use std::prelude::v1::*;
use std::sync::{Arc, SgxMutex as Mutex};
use teaclave_proto::teaclave_frontend_service::{
@@ -70,7 +69,6 @@ impl From<ServiceError> for TeaclaveServiceResponseError {
#[derive(Clone)]
pub(crate) struct TeaclaveManagementService {
storage_client: Arc<Mutex<TeaclaveStorageClient>>,
- fusion_base_dir: PathBuf,
}
impl TeaclaveManagement for TeaclaveManagementService {
@@ -439,10 +437,7 @@ impl TeaclaveManagement for TeaclaveManagementService {
}
impl TeaclaveManagementService {
- pub(crate) fn new(
- storage_service_endpoint: Endpoint,
- fusion_base_dir: PathBuf,
- ) -> Result<Self> {
+ pub(crate) fn new(storage_service_endpoint: Endpoint) -> Result<Self> {
let mut i = 0;
let channel = loop {
match storage_service_endpoint.connect() {
@@ -456,10 +451,7 @@ impl TeaclaveManagementService {
std::thread::sleep(std::time::Duration::from_secs(3));
};
let storage_client = Arc::new(Mutex::new(TeaclaveStorageClient::new(channel)?));
- let service = Self {
- storage_client,
- fusion_base_dir,
- };
+ let service = Self { storage_client };
#[cfg(test_mode)]
service.add_mock_data()?;
@@ -469,11 +461,7 @@ impl TeaclaveManagementService {
pub fn create_fusion_data(&self, owners: impl Into<OwnerList>) -> Result<TeaclaveOutputFile> {
let uuid = Uuid::new_v4();
- let url = format!(
- "file://{}/{}.fusion",
- self.fusion_base_dir.display(),
- uuid.to_string()
- );
+ let url = format!("fusion:///TEACLAVE_FUSION_BASE/{}.fusion", uuid.to_string());
let url = Url::parse(&url).map_err(|_| anyhow!("invalid url"))?;
let crypto_info = FileCrypto::default();
diff --git a/types/src/file_agent.rs b/types/src/file_agent.rs
index ec91399..2f3db92 100644
--- a/types/src/file_agent.rs
+++ b/types/src/file_agent.rs
@@ -16,7 +16,7 @@
// under the License.
use serde::{Deserialize, Serialize};
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
use std::prelude::v1::*;
#[derive(Debug, Serialize, Deserialize)]
@@ -29,16 +29,22 @@ pub enum HandleFileCommand {
pub struct FileAgentRequest {
pub cmd: HandleFileCommand,
pub info: Vec<HandleFileInfo>,
+ pub fusion_base: PathBuf,
}
impl FileAgentRequest {
- pub fn new<T: IntoIterator>(cmd: HandleFileCommand, info: T) -> Self
+ pub fn new<T: IntoIterator>(
+ cmd: HandleFileCommand,
+ info: T,
+ fusion_base: impl AsRef<Path>,
+ ) -> Self
where
<T as IntoIterator>::Item: Into<HandleFileInfo>,
{
FileAgentRequest {
cmd,
info: info.into_iter().map(|x| x.into()).collect(),
+ fusion_base: fusion_base.as_ref().to_owned(),
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@teaclave.apache.org
For additional commands, e-mail: commits-help@teaclave.apache.org