You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2023/01/20 12:23:36 UTC
[airavata-mft] branch master updated: Bootstrapping MFT from command line with standalone mode
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/master by this push:
new 1244f23 Bootstrapping MFT from command line with standalone mode
1244f23 is described below
commit 1244f2397a1df9a93ca93c260a56409318416f4e
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Fri Jan 20 07:23:28 2023 -0500
Bootstrapping MFT from command line with standalone mode
---
python-cli/README.md | 9 +-
.../{mft_cli => airavata_mft_cli}/__init__.py | 0
python-cli/mft_cli/airavata_mft_cli/base.py | 21 +++
python-cli/mft_cli/airavata_mft_cli/bootstrap.py | 111 ++++++++++++
python-cli/mft_cli/airavata_mft_cli/main.py | 10 ++
python-cli/mft_cli/airavata_mft_cli/operations.py | 190 +++++++++++++++++++++
.../storage/__init__.py | 14 +-
.../{mft_cli => airavata_mft_cli}/storage/azure.py | 8 +-
.../{mft_cli => airavata_mft_cli}/storage/gcs.py | 9 +-
.../{mft_cli => airavata_mft_cli}/storage/s3.py | 8 +-
python-cli/mft_cli/mft_cli/main.py | 180 -------------------
python-cli/mft_cli/pyproject.toml | 9 +-
python-sdk/src/airavata_mft_sdk/mft_client.py | 4 +-
13 files changed, 379 insertions(+), 194 deletions(-)
diff --git a/python-cli/README.md b/python-cli/README.md
index 79b6c52..7973f73 100644
--- a/python-cli/README.md
+++ b/python-cli/README.md
@@ -10,7 +10,7 @@ pip install pick
Load Poetry shell
```
-cd mft_cli
+cd airavata_mft_cli
poetry shell
```
@@ -24,5 +24,10 @@ pip install airavata_mft_sdk==0.0.1-alpha21
Build the binary
```
poetry install
-mft-cli
+mft --help
+```
+
+To publish the ditribution to pypi
+```
+ poetry publish --build
```
\ No newline at end of file
diff --git a/python-cli/mft_cli/mft_cli/__init__.py b/python-cli/mft_cli/airavata_mft_cli/__init__.py
similarity index 100%
rename from python-cli/mft_cli/mft_cli/__init__.py
rename to python-cli/mft_cli/airavata_mft_cli/__init__.py
diff --git a/python-cli/mft_cli/airavata_mft_cli/base.py b/python-cli/mft_cli/airavata_mft_cli/base.py
new file mode 100644
index 0000000..39882ac
--- /dev/null
+++ b/python-cli/mft_cli/airavata_mft_cli/base.py
@@ -0,0 +1,21 @@
+import typer
+import airavata_mft_cli.operations as operations
+import airavata_mft_cli.bootstrap as bootstrap
+
+app = typer.Typer()
+
+@app.command("ls")
+def list(storage_path):
+ operations.list(storage_path)
+
+@app.command("cp")
+def copy(source, destination):
+ operations.copy(source, destination)
+
+@app.command("init")
+def init_mft():
+ bootstrap.start_mft()
+
+@app.command("stop")
+def init_mft():
+ bootstrap.stop_mft()
\ No newline at end of file
diff --git a/python-cli/mft_cli/airavata_mft_cli/bootstrap.py b/python-cli/mft_cli/airavata_mft_cli/bootstrap.py
new file mode 100644
index 0000000..133ab15
--- /dev/null
+++ b/python-cli/mft_cli/airavata_mft_cli/bootstrap.py
@@ -0,0 +1,111 @@
+import typer
+import requests
+import os
+import zipfile
+from subprocess import call
+from subprocess import Popen
+from pathlib import Path
+
+def download_and_unarchive(url, download_path, extract_dir = os.path.join(os.path.expanduser('~'), ".mft/")):
+
+ response = requests.get(url, stream=True)
+ file_size = int(response.headers['Content-Length'])
+ with typer.progressbar(length=file_size) as progress:
+ with open(download_path, "wb") as handle:
+ for data in response.iter_content(chunk_size=8192 * 2):
+ progress.update(len(data))
+ handle.write(data)
+
+ print("Un archiving ....")
+ with zipfile.ZipFile(download_path,"r") as zip_ref:
+ zip_ref.extractall(extract_dir)
+
+ os.remove(download_path)
+
+def restart_service(bin_path, daemon_script_name):
+ current_dir = os.getcwd()
+ try:
+ os.chdir(bin_path)
+ os.chmod(daemon_script_name, 0o744)
+ rc = call(["./" + daemon_script_name, "stop"])
+ rc = call(["./" + daemon_script_name, "start"])
+ finally:
+ os.chdir(current_dir)
+
+def stop_service(bin_path, daemon_script_name):
+ current_dir = os.getcwd()
+ try:
+ os.chdir(bin_path)
+ os.chmod(daemon_script_name, 0o744)
+ rc = call(["./" + daemon_script_name, "stop"])
+ finally:
+ os.chdir(current_dir)
+
+def start_mft():
+ print("Setting up MFT Services")
+
+ path = os.path.join(os.path.expanduser('~'), ".mft/consul")
+ if not os.path.exists(path):
+ consul_macos_url = "https://releases.hashicorp.com/consul/1.7.1/consul_1.7.1_darwin_amd64.zip"
+ consul_linux_url = "https://releases.hashicorp.com/consul/1.7.1/consul_1.7.1_linux_amd64.zip"
+ print("Downloading Consul...")
+ zip_path = os.path.join(os.path.expanduser('~'), ".mft/consul.zip")
+ download_and_unarchive(consul_macos_url, zip_path, os.path.join(os.path.expanduser('~'), ".mft/"))
+
+ current_dir = os.getcwd()
+ try:
+ os.chdir(os.path.join(os.path.expanduser('~'), ".mft"))
+ os.chmod("consul", 0o744)
+
+ if os.path.exists("consul.pid"):
+ pid = Path('consul.pid').read_text()
+ call(["kill", "-9", pid])
+
+ consul_process = Popen(['nohup', './consul', "agent", "-dev"],
+ stdout=open('consul.log', 'w'),
+ stderr=open('consul.err.log', 'a'),
+ preexec_fn=os.setpgrp)
+
+ print("Consul process id: " + str(consul_process.pid))
+ with open("consul.pid", "w") as consul_pid:
+ consul_pid.write(str(consul_process.pid))
+ finally:
+ os.chdir(current_dir)
+
+ path = os.path.join(os.path.expanduser('~'), ".mft/Standalone-Service-0.01")
+ if not os.path.exists(path):
+ url = "https://github.com/apache/airavata-mft/releases/download/v0.0.1/Standalone-Service-0.01-bin.zip"
+ print("Downloading MFT Server...")
+ zip_path = os.path.join(os.path.expanduser('~'), ".mft/Standalone-Service-0.01-bin.zip")
+ download_and_unarchive(url, zip_path)
+
+ restart_service(path + "/bin", "standalone-service-daemon.sh")
+
+ print("MFT Started")
+
+
+def stop_mft():
+ print("Stopping up MFT Services")
+
+ path = os.path.join(os.path.expanduser('~'), ".mft/consul")
+ if os.path.exists(path):
+ current_dir = os.getcwd()
+ try:
+ os.chdir(os.path.join(os.path.expanduser('~'), ".mft"))
+ os.chmod("consul", 0o744)
+
+ if os.path.exists("consul.pid"):
+ pid = Path('consul.pid').read_text()
+ call(["kill", "-9", pid])
+ finally:
+ os.chdir(current_dir)
+
+ path = os.path.join(os.path.expanduser('~'), ".mft/Standalone-Service-0.01")
+ if os.path.exists(path):
+ stop_service(path + "/bin", "standalone-service-daemon.sh")
+
+ print("MFT Stopped....")
+
+
+
+
diff --git a/python-cli/mft_cli/airavata_mft_cli/main.py b/python-cli/mft_cli/airavata_mft_cli/main.py
new file mode 100644
index 0000000..b402a76
--- /dev/null
+++ b/python-cli/mft_cli/airavata_mft_cli/main.py
@@ -0,0 +1,10 @@
+import typer
+import airavata_mft_cli.storage
+import airavata_mft_cli.base
+
+app = airavata_mft_cli.base.app
+
+app.add_typer(airavata_mft_cli.storage.app, name="storage")
+
+if __name__ == "__main__":
+ app()
\ No newline at end of file
diff --git a/python-cli/mft_cli/airavata_mft_cli/operations.py b/python-cli/mft_cli/airavata_mft_cli/operations.py
new file mode 100644
index 0000000..4e9b83f
--- /dev/null
+++ b/python-cli/mft_cli/airavata_mft_cli/operations.py
@@ -0,0 +1,190 @@
+import typer
+from airavata_mft_sdk import mft_client
+from airavata_mft_sdk.common import StorageCommon_pb2
+from airavata_mft_sdk import MFTTransferApi_pb2
+from rich.console import Console
+from rich.table import Table
+import time
+
+def fetch_storage_and_secret_ids(storage_name):
+ client = mft_client.MFTClient(transfer_api_port = 7003,
+ transfer_api_secured = False,
+ resource_service_host = "localhost",
+ resource_service_port = 7003,
+ resource_service_secured = False,
+ secret_service_host = "localhost",
+ secret_service_port = 7003)
+ search_req = StorageCommon_pb2.StorageSearchRequest(storageName=storage_name)
+ storages = client.common_api.searchStorages(search_req)
+
+ if len(storages.storageList) == 0:
+ search_req = StorageCommon_pb2.StorageSearchRequest(storageId=storage_name)
+ storages = client.common_api.searchStorages(search_req)
+
+ if len(storages.storageList) == 0:
+ print("No storage with name or id " + storage_name + " was found. Please register the storage with command mft-cli storage add")
+ raise typer.Abort()
+
+ if len(storages.storageList) > 1:
+ print("More than one storage with nam " + storage_name + " was found. Please use the storage id. You can fetch it from mft-cli storage list")
+ raise typer.Abort()
+
+ storage = storages.storageList[0]
+ sec_req = StorageCommon_pb2.SecretForStorageGetRequest(storageId = storage.storageId)
+ sec_resp = client.common_api.getSecretForStorage(sec_req)
+ if sec_resp.error != 0:
+ print("Could not fetch the secret for storage " + storage.storageId)
+
+ return sec_resp.storageId, sec_resp.secretId
+
+def get_resource_metadata(storage_path, recursive_search = False):
+ storage_name = storage_path.split("/")[0]
+ resource_path = storage_path[len(storage_name) +1 :]
+
+ storage_id, secret_id = fetch_storage_and_secret_ids(storage_name)
+
+ id_req = MFTTransferApi_pb2.GetResourceMetadataFromIDsRequest(storageId = storage_id,
+ secretId = secret_id,
+ resourcePath = resource_path)
+ resource_medata_req = MFTTransferApi_pb2.FetchResourceMetadataRequest(idRequest = id_req)
+
+ client = mft_client.MFTClient(transfer_api_port = 7003,
+ transfer_api_secured = False,
+ resource_service_host = "localhost",
+ resource_service_port = 7003,
+ resource_service_secured = False,
+ secret_service_host = "localhost",
+ secret_service_port = 7003)
+
+ metadata_resp = client.transfer_api.resourceMetadata(resource_medata_req)
+ return metadata_resp
+
+def list(storage_path):
+
+ metadata_resp = get_resource_metadata(storage_path)
+
+ console = Console()
+ table = Table()
+
+ table.add_column('Name', justify='left')
+ table.add_column('Type', justify='center')
+ table.add_column('Size', justify='center')
+
+ if (metadata_resp.WhichOneof('metadata') == 'directory') :
+ for dir in metadata_resp.directory.directories:
+ table.add_row('[bold]' + dir.friendlyName + '[/bold]', 'DIR', '')
+
+ for file in metadata_resp.directory.files:
+ table.add_row('[bold]' + file.friendlyName + '[/bold]', 'FILE', str(file.resourceSize))
+
+ elif (metadata_resp.WhichOneof('metadata') == 'file'):
+ table.add_row('[bold]' + metadata_resp.file.friendlyName + '[/bold]', 'FILE', str(metadata_resp.file.resourceSize))
+
+ elif (metadata_resp.WhichOneof('metadata') == 'error'):
+ print(metadata_resp.error)
+
+ console.print(table)
+
+def flatten_directories(directory, parent_path, file_list):
+ for dir in directory.directories:
+ flatten_directories(dir, parent_path + dir.friendlyName + "/", file_list)
+
+ for file in directory.files:
+ file_list.append((file, parent_path + file.friendlyName))
+
+def copy(source, destination):
+
+ source_storage_id, source_secret_id = fetch_storage_and_secret_ids(source.split("/")[0])
+ dest_storage_id, dest_secret_id = fetch_storage_and_secret_ids(destination.split("/")[0])
+
+ ## TODO : Check agent availability and deploy cloud agents if required
+
+ file_list = []
+ source_metadata = get_resource_metadata(source)
+ endpoint_paths = []
+ total_volume = 0
+
+ transfer_request = MFTTransferApi_pb2.TransferApiRequest(sourceStorageId = source_storage_id,
+ sourceSecretId = source_secret_id,
+ destinationStorageId = dest_storage_id,
+ destinationSecretId = dest_secret_id,
+ optimizeTransferPath = False)
+
+ if (source_metadata.WhichOneof('metadata') == 'directory') :
+ if (destination[-1] != "/"):
+ print("Source is a directory path so destination path should end with /")
+ raise typer.Abort()
+
+ flatten_directories(source_metadata.directory, "", file_list)
+ for file_entry in file_list:
+ file = file_entry[0]
+ relative_path = file_entry[1]
+ endpoint_paths.append(MFTTransferApi_pb2.EndpointPaths(
+ sourcePath = file.resourcePath,
+ destinationPath = destination[len(destination.split("/")[0]) +1 :] + relative_path))
+ total_volume += file.resourceSize
+
+ elif (source_metadata.WhichOneof('metadata') == 'file'):
+ file_list.append((source_metadata.file, source_metadata.file.friendlyName))
+
+ if destination[-1] == "/":
+ destination = destination + source_metadata.file.friendlyName
+
+ endpoint_paths.append(MFTTransferApi_pb2.EndpointPaths(
+ sourcePath = source_metadata.file.resourcePath,
+ destinationPath = destination[len(destination.split("/")[0]) +1 :]))
+
+ total_volume += source_metadata.file.resourceSize
+
+ elif (source_metadata.WhichOneof('metadata') == 'error'):
+ print("Failed while fetching source details")
+ print(metadata_resp.error)
+ raise typer.Abort()
+
+ transfer_request.endpointPaths.extend(endpoint_paths)
+
+ confirm = typer.confirm("Total number of " + str(len(endpoint_paths)) +
+ " files to be transferred. Total volume is " + str(total_volume)
+ + " bytes. Do you want to start the transfer? ", True)
+
+ client = mft_client.MFTClient(transfer_api_port = 7003,
+ transfer_api_secured = False,
+ resource_service_host = "localhost",
+ resource_service_port = 7003,
+ resource_service_secured = False,
+ secret_service_host = "localhost",
+ secret_service_port = 7003)
+
+ transfer_resp = client.transfer_api.submitTransfer(transfer_request)
+
+ if not confirm:
+ raise typer.Abort()
+
+ transfer_id = transfer_resp.transferId
+
+ state_request = MFTTransferApi_pb2.TransferStateApiRequest(transferId=transfer_id)
+
+ ## TODO: This has to be optimized and avoid frequent polling of all transfer ids in each iteration
+ ## Possible fix is to introduce a parent batch transfer id at the API level and fetch child trnasfer id
+ # summaries in a single API call
+
+ completed = 0
+ failed = 0
+
+ with typer.progressbar(length=100) as progress:
+
+ while 1:
+ state_resp = client.transfer_api.getTransferStateSummary(state_request)
+
+ progress.update(int(state_resp.percentage * 100))
+ if (state_resp.percentage == 1.0):
+ completed = len(state_resp.completed)
+ failed = len(state_resp.failed)
+ break
+
+ if (state_resp.state == "FAILED"):
+ print("Transfer failed. Reason: " + state_resp.description)
+ raise typer.Abort()
+ time.sleep(1)
+
+ print(f"Processed {completed + failed} files. Completed {completed}, Failed {failed}.")
diff --git a/python-cli/mft_cli/mft_cli/storage/__init__.py b/python-cli/mft_cli/airavata_mft_cli/storage/__init__.py
similarity index 68%
rename from python-cli/mft_cli/mft_cli/storage/__init__.py
rename to python-cli/mft_cli/airavata_mft_cli/storage/__init__.py
index 4a9ac56..effb3a7 100644
--- a/python-cli/mft_cli/mft_cli/storage/__init__.py
+++ b/python-cli/mft_cli/airavata_mft_cli/storage/__init__.py
@@ -1,8 +1,8 @@
import typer
from pick import pick
-import mft_cli.storage.s3 as s3
-import mft_cli.storage.azure as azure
-import mft_cli.storage.gcs as gcs
+import airavata_mft_cli.storage.s3 as s3
+import airavata_mft_cli.storage.azure as azure
+import airavata_mft_cli.storage.gcs as gcs
from airavata_mft_sdk import mft_client
from airavata_mft_sdk.common import StorageCommon_pb2
from rich.console import Console
@@ -24,7 +24,13 @@ def add_storage():
@app.command("list")
def list_storage():
- client = mft_client.MFTClient()
+ client = mft_client.MFTClient(transfer_api_port = 7003,
+ transfer_api_secured = False,
+ resource_service_host = "localhost",
+ resource_service_port = 7003,
+ resource_service_secured = False,
+ secret_service_host = "localhost",
+ secret_service_port = 7003)
list_req = StorageCommon_pb2.StorageListRequest()
list_response = client.common_api.listStorages(list_req)
diff --git a/python-cli/mft_cli/mft_cli/storage/azure.py b/python-cli/mft_cli/airavata_mft_cli/storage/azure.py
similarity index 85%
rename from python-cli/mft_cli/mft_cli/storage/azure.py
rename to python-cli/mft_cli/airavata_mft_cli/storage/azure.py
index c83b0bb..879bbf0 100644
--- a/python-cli/mft_cli/mft_cli/storage/azure.py
+++ b/python-cli/mft_cli/airavata_mft_cli/storage/azure.py
@@ -16,7 +16,13 @@ def handle_add_storage():
if index == 1: # Manual configuration
connection_string = typer.prompt("Connection String")
- client = mft_client.MFTClient()
+ client = mft_client.MFTClient(transfer_api_port = 7003,
+ transfer_api_secured = False,
+ resource_service_host = "localhost",
+ resource_service_port = 7003,
+ resource_service_secured = False,
+ secret_service_host = "localhost",
+ secret_service_port = 7003)
azure_secret = AzureCredential_pb2.AzureSecret(connectionString = connection_string)
secret_wrapper = MFTAgentStubs_pb2.SecretWrapper(azure=azure_secret)
diff --git a/python-cli/mft_cli/mft_cli/storage/gcs.py b/python-cli/mft_cli/airavata_mft_cli/storage/gcs.py
similarity index 96%
rename from python-cli/mft_cli/mft_cli/storage/gcs.py
rename to python-cli/mft_cli/airavata_mft_cli/storage/gcs.py
index 8adc673..a479f51 100644
--- a/python-cli/mft_cli/mft_cli/storage/gcs.py
+++ b/python-cli/mft_cli/airavata_mft_cli/storage/gcs.py
@@ -114,7 +114,14 @@ def handle_add_storage():
else:
print("No credential found in ~/" + gcs_key_path + " file")
exit()
- client = mft_client.MFTClient()
+
+ client = mft_client.MFTClient(transfer_api_port = 7003,
+ transfer_api_secured = False,
+ resource_service_host = "localhost",
+ resource_service_port = 7003,
+ resource_service_secured = False,
+ secret_service_host = "localhost",
+ secret_service_port = 7003)
gcs_secret = GCSCredential_pb2.GCSSecret(clientEmail=client_email, privateKey=client_secret, projectId=project_id)
secret_wrapper = MFTAgentStubs_pb2.SecretWrapper(gcs=gcs_secret)
diff --git a/python-cli/mft_cli/mft_cli/storage/s3.py b/python-cli/mft_cli/airavata_mft_cli/storage/s3.py
similarity index 91%
rename from python-cli/mft_cli/mft_cli/storage/s3.py
rename to python-cli/mft_cli/airavata_mft_cli/storage/s3.py
index 5b8881e..092a728 100644
--- a/python-cli/mft_cli/mft_cli/storage/s3.py
+++ b/python-cli/mft_cli/airavata_mft_cli/storage/s3.py
@@ -61,7 +61,13 @@ def handle_add_storage():
region, index = pick(aws_regions, "Select the AWS Region", indicator="=>")
endpoint = "https://s3." + region + ".amazonaws.com"
- client = mft_client.MFTClient()
+ client = mft_client.MFTClient(transfer_api_port = 7003,
+ transfer_api_secured = False,
+ resource_service_host = "localhost",
+ resource_service_port = 7003,
+ resource_service_secured = False,
+ secret_service_host = "localhost",
+ secret_service_port = 7003)
s3_secret = S3Credential_pb2.S3Secret(accessKey=client_id, secretKey=client_secret, sessionToken = session_token)
secret_wrapper = MFTAgentStubs_pb2.SecretWrapper(s3=s3_secret)
diff --git a/python-cli/mft_cli/mft_cli/main.py b/python-cli/mft_cli/mft_cli/main.py
deleted file mode 100644
index c89065f..0000000
--- a/python-cli/mft_cli/mft_cli/main.py
+++ /dev/null
@@ -1,180 +0,0 @@
-import typer
-import mft_cli.storage
-from airavata_mft_sdk import mft_client
-from airavata_mft_sdk.common import StorageCommon_pb2
-from airavata_mft_sdk import MFTTransferApi_pb2
-from rich.console import Console
-from rich.table import Table
-from rich.progress import track
-import time
-
-app = typer.Typer()
-
-app.add_typer(mft_cli.storage.app, name="storage")
-
-def fetch_storage_and_secret_ids(storage_name):
- client = mft_client.MFTClient()
- search_req = StorageCommon_pb2.StorageSearchRequest(storageName=storage_name)
- storages = client.common_api.searchStorages(search_req)
-
- if len(storages.storageList) == 0:
- search_req = StorageCommon_pb2.StorageSearchRequest(storageId=storage_name)
- storages = client.common_api.searchStorages(search_req)
-
- if len(storages.storageList) == 0:
- print("No storage with name or id " + storage_name + " was found. Please register the storage with command mft-cli storage add")
- raise typer.Abort()
-
- if len(storages.storageList) > 1:
- print("More than one storage with nam " + storage_name + " was found. Please use the storage id. You can fetch it from mft-cli storage list")
- raise typer.Abort()
-
- storage = storages.storageList[0]
- sec_req = StorageCommon_pb2.SecretForStorageGetRequest(storageId = storage.storageId)
- sec_resp = client.common_api.getSecretForStorage(sec_req)
- if sec_resp.error != 0:
- print("Could not fetch the secret for storage " + storage.storageId)
-
- return sec_resp.storageId, sec_resp.secretId
-def get_resource_metadata(storage_path, recursive_search = False):
- storage_name = storage_path.split("/")[0]
- resource_path = storage_path[len(storage_name) +1 :]
-
- storage_id, secret_id = fetch_storage_and_secret_ids(storage_name)
-
- id_req = MFTTransferApi_pb2.GetResourceMetadataFromIDsRequest(storageId = storage_id,
- secretId = secret_id,
- resourcePath = resource_path)
- resource_medata_req = MFTTransferApi_pb2.FetchResourceMetadataRequest(idRequest = id_req)
-
- client = mft_client.MFTClient()
-
- metadata_resp = client.transfer_api.resourceMetadata(resource_medata_req)
- return metadata_resp
-@app.command("ls")
-def list(storage_path):
-
- metadata_resp = get_resource_metadata(storage_path)
-
- console = Console()
- table = Table()
-
- table.add_column('Name', justify='left')
- table.add_column('Type', justify='center')
- table.add_column('Size', justify='center')
-
- if (metadata_resp.WhichOneof('metadata') == 'directory') :
- for dir in metadata_resp.directory.directories:
- table.add_row('[bold]' + dir.friendlyName + '[/bold]', 'DIR', '')
-
- for file in metadata_resp.directory.files:
- table.add_row('[bold]' + file.friendlyName + '[/bold]', 'FILE', str(file.resourceSize))
-
- elif (metadata_resp.WhichOneof('metadata') == 'file'):
- table.add_row('[bold]' + metadata_resp.file.friendlyName + '[/bold]', 'FILE', str(metadata_resp.file.resourceSize))
-
- elif (metadata_resp.WhichOneof('metadata') == 'error'):
- print(metadata_resp.error)
-
- console.print(table)
-
-def flatten_directories(directory, parent_path, file_list):
- for dir in directory.directories:
- flatten_directories(dir, parent_path + dir.friendlyName + "/", file_list)
-
- for file in directory.files:
- file_list.append((file, parent_path + file.friendlyName))
-
-@app.command("cp")
-def copy(source, destination):
-
- source_storage_id, source_secret_id = fetch_storage_and_secret_ids(source.split("/")[0])
- dest_storage_id, dest_secret_id = fetch_storage_and_secret_ids(destination.split("/")[0])
-
- ## TODO : Check agent availability and deploy cloud agents if required
-
- file_list = []
- source_metadata = get_resource_metadata(source)
- endpoint_paths = []
- total_volume = 0
-
- transfer_request = MFTTransferApi_pb2.TransferApiRequest(sourceStorageId = source_storage_id,
- sourceSecretId = source_secret_id,
- destinationStorageId = dest_storage_id,
- destinationSecretId = dest_secret_id,
- optimizeTransferPath = False)
-
- if (source_metadata.WhichOneof('metadata') == 'directory') :
- if (destination[-1] != "/"):
- print("Source is a directory path so destination path should end with /")
- raise typer.Abort()
-
- flatten_directories(source_metadata.directory, "", file_list)
- for file_entry in file_list:
- file = file_entry[0]
- relative_path = file_entry[1]
- endpoint_paths.append(MFTTransferApi_pb2.EndpointPaths(
- sourcePath = file.resourcePath,
- destinationPath = destination[len(destination.split("/")[0]) +1 :] + relative_path))
- total_volume += file.resourceSize
-
- elif (source_metadata.WhichOneof('metadata') == 'file'):
- file_list.append((source_metadata.file, source_metadata.file.friendlyName))
-
- if destination[-1] == "/":
- destination = destination + source_metadata.file.friendlyName
-
- endpoint_paths.append(MFTTransferApi_pb2.EndpointPaths(
- sourcePath = source_metadata.file.resourcePath,
- destinationPath = destination[len(destination.split("/")[0]) +1 :]))
-
- total_volume += source_metadata.file.resourceSize
-
- elif (source_metadata.WhichOneof('metadata') == 'error'):
- print("Failed while fetching source details")
- print(metadata_resp.error)
- raise typer.Abort()
-
- transfer_request.endpointPaths.extend(endpoint_paths)
-
- confirm = typer.confirm("Total number of " + str(len(endpoint_paths)) +
- " files to be transferred. Total volume is " + str(total_volume)
- + " bytes. Do you want to start the transfer? ", True)
-
- client = mft_client.MFTClient()
- transfer_resp = client.transfer_api.submitTransfer(transfer_request)
-
- if not confirm:
- raise typer.Abort()
-
- transfer_id = transfer_resp.transferId
-
- state_request = MFTTransferApi_pb2.TransferStateApiRequest(transferId=transfer_id)
-
- ## TODO: This has to be optimized and avoid frequent polling of all transfer ids in each iteration
- ## Possible fix is to introduce a parent batch transfer id at the API level and fetch child trnasfer id
- # summaries in a single API call
-
- completed = 0
- failed = 0
-
- with typer.progressbar(length=100) as progress:
-
- while 1:
- state_resp = client.transfer_api.getTransferStateSummary(state_request)
-
- progress.update(int(state_resp.percentage * 100))
- if (state_resp.percentage == 1.0):
- completed = len(state_resp.completed)
- failed = len(state_resp.failed)
- break
-
- if (state_resp.state == "FAILED"):
- print("Transfer failed. Reason: " + state_resp.description)
- raise typer.Abort()
- time.sleep(1)
-
- print(f"Processed {completed + failed} files. Completed {completed}, Failed {failed}.")
-
-if __name__ == "__main__":
- app()
\ No newline at end of file
diff --git a/python-cli/mft_cli/pyproject.toml b/python-cli/mft_cli/pyproject.toml
index efc0fad..3bf7e17 100644
--- a/python-cli/mft_cli/pyproject.toml
+++ b/python-cli/mft_cli/pyproject.toml
@@ -1,17 +1,20 @@
[tool.poetry]
-name = "mft-cli"
-version = "0.1.0"
+name = "airavata-mft-cli"
+version = "0.1.3"
description = "Command Line Client for Airavata MFT data transfer framework"
authors = ["Dimuthu Wannipurage <di...@gmail.com>"]
readme = "README.md"
[tool.poetry.scripts]
-mft-cli = "mft_cli.main:app"
+mft = "airavata_mft_cli.main:app"
[tool.poetry.dependencies]
python = "^3.10"
typer = {extras = ["all"], version = "^0.7.0"}
pick = {version= "2.2.0"}
+grpcio= {version="1.46.3"}
+grpcio-tools = {version="1.46.3"}
+airavata_mft_sdk= {version="0.0.1-alpha21"}
[build-system]
diff --git a/python-sdk/src/airavata_mft_sdk/mft_client.py b/python-sdk/src/airavata_mft_sdk/mft_client.py
index 498da53..4463263 100644
--- a/python-sdk/src/airavata_mft_sdk/mft_client.py
+++ b/python-sdk/src/airavata_mft_sdk/mft_client.py
@@ -22,10 +22,10 @@ from airavata_mft_sdk.scp import SCPSecretService_pb2_grpc
class MFTClient:
def __init__(self, transfer_api_host = "localhost",
- transfer_api_port = 7004,
+ transfer_api_port = 7003,
transfer_api_secured = False,
resource_service_host = "localhost",
- resource_service_port = 7002,
+ resource_service_port = 7003,
resource_service_secured = False,
secret_service_host = "localhost",
secret_service_port = 7003,