You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ma...@apache.org on 2021/04/26 20:55:00 UTC
[airavata-django-portal-sdk] 02/07: AIRAVATA-3420 Implements DjangoFileSystemProvider, refactored user_storage module to use UserStorageProvider
This is an automated email from the ASF dual-hosted git repository.
machristie pushed a commit to branch mft-integration
in repository https://gitbox.apache.org/repos/asf/airavata-django-portal-sdk.git
commit 58ff63c4acff0034a70558005601d83888a52714
Author: Marcus Christie <ma...@apache.org>
AuthorDate: Wed Apr 7 18:17:16 2021 -0400
AIRAVATA-3420 Implements DjangoFileSystemProvider, refactored user_storage module to use UserStorageProvider
---
airavata_django_portal_sdk/user_storage.py | 1196 --------------------
.../user_storage/__init__.py | 51 +
airavata_django_portal_sdk/user_storage/api.py | 758 +++++++++++++
.../user_storage/backends/__init__.py | 3 +
.../user_storage/backends/base.py | 64 ++
.../backends/django_filesystem_provider.py | 306 +++++
.../user_storage/backends/mft_provider.py | 152 +++
.../user_storage/backends/remote_api_provider.py | 0
.../user_storage_provider.py | 3 +-
9 files changed, 1336 insertions(+), 1197 deletions(-)
diff --git a/airavata_django_portal_sdk/user_storage.py b/airavata_django_portal_sdk/user_storage.py
deleted file mode 100644
index 5fb07c5..0000000
--- a/airavata_django_portal_sdk/user_storage.py
+++ /dev/null
@@ -1,1196 +0,0 @@
-import cgi
-import copy
-import io
-import logging
-import mimetypes
-import os
-import shutil
-import warnings
-from datetime import datetime
-from http import HTTPStatus
-from urllib.parse import quote, unquote, urlparse
-
-import grpc
-import requests
-from airavata.model.data.replica.ttypes import (
- DataProductModel,
- DataProductType,
- DataReplicaLocationModel,
- ReplicaLocationCategory,
- ReplicaPersistentType
-)
-from django.conf import settings
-from django.core.exceptions import ObjectDoesNotExist, SuspiciousFileOperation
-from django.core.files import File
-from django.core.files.move import file_move_safe
-from django.core.files.storage import FileSystemStorage
-
-from . import MFTApi_pb2, MFTApi_pb2_grpc
-from .util import convert_iso8601_to_datetime
-
-logger = logging.getLogger(__name__)
-
-TMP_INPUT_FILE_UPLOAD_DIR = "tmp"
-
-
-def save(request, path, file, name=None, content_type=None):
- "Save file in path in the user's storage and return DataProduct."
- if _is_remote_api():
- if name is None and hasattr(file, 'name'):
- name = os.path.basename(file.name)
- files = {'file': (name, file, content_type)
- if content_type is not None else file, }
- resp = _call_remote_api(request,
- "/user-storage/~/{path}",
- path_params={"path": path},
- method="post",
- files=files)
- data = resp.json()
- product_uri = data['uploaded']['productUri']
- data_product = request.airavata_client.getDataProduct(
- request.authz_token, product_uri)
- return data_product
- else:
- username = request.user.username
- full_path = _Datastore().save(username, path, file, name=name)
- data_product = _save_data_product(
- request, full_path, name=name, content_type=content_type
- )
- return data_product
-
-
-def move_from_filepath(
- request,
- source_path,
- target_path,
- name=None,
- content_type=None):
- "Move a file from filesystem into user's storage."
- # TODO: deprecate this method
- username = request.user.username
- file_name = name if name is not None else os.path.basename(source_path)
- full_path = _Datastore().move_external(
- source_path, username, target_path, file_name)
- data_product = _save_data_product(
- request, full_path, name=file_name, content_type=content_type
- )
- return data_product
-
-
-def save_input_file(request, file, name=None, content_type=None):
- """Save input file in staging area for input files."""
- if _is_remote_api():
- if name is None and hasattr(file, 'name'):
- name = os.path.basename(file.name)
- files = {'file': (name, file, content_type)
- if content_type is not None else file, }
- resp = _call_remote_api(request,
- "/upload",
- method="post",
- files=files)
- data = resp.json()
- product_uri = data['data-product']['productUri']
- data_product = request.airavata_client.getDataProduct(
- request.authz_token, product_uri)
- return data_product
- else:
- username = request.user.username
- file_name = name if name is not None else os.path.basename(file.name)
- full_path = _Datastore().save(username, TMP_INPUT_FILE_UPLOAD_DIR, file)
- data_product = _save_data_product(
- request, full_path, name=file_name, content_type=content_type
- )
- return data_product
-
-
-def copy_input_file(request, data_product=None, data_product_uri=None):
- # TODO: we could probably deprecate this as well, since we do an open/save
- # to copy instead. Or at least, we don't need it in UserStorageProvider.
- if data_product is None:
- data_product = _get_data_product(request, data_product_uri)
- path = _get_replica_filepath(data_product)
- name = data_product.productName
- full_path = _Datastore().copy(
- data_product.ownerName,
- path,
- request.user.username,
- TMP_INPUT_FILE_UPLOAD_DIR,
- name=name,
- )
- return _save_copy_of_data_product(request, full_path, data_product)
-
-
-def is_input_file(request, data_product=None, data_product_uri=None):
- # TODO: don't need this in UserStorageProvider
- if data_product is None:
- data_product = _get_data_product(request, data_product_uri)
- if _is_remote_api():
- resp = _call_remote_api(
- request,
- "/data-products/",
- params={'product-uri': data_product.productUri})
- data = resp.json()
- return data['isInputFileUpload']
- # Check if file is one of user's files and in TMP_INPUT_FILE_UPLOAD_DIR
- path = _get_replica_filepath(data_product)
- if _Datastore().exists(request.user.username, path):
- rel_path = _Datastore().rel_path(request.user.username, path)
- return os.path.dirname(rel_path) == TMP_INPUT_FILE_UPLOAD_DIR
- else:
- return False
-
-
-def move_input_file(request, data_product=None, path=None, data_product_uri=None):
- # TODO: don't need this in UserStorageProvider
- if data_product is None:
- data_product = _get_data_product(request, data_product_uri)
- source_path = _get_replica_filepath(data_product)
- file_name = data_product.productName
- full_path = _Datastore().move(
- data_product.ownerName,
- source_path,
- request.user.username,
- path,
- file_name)
- _delete_data_product(data_product.ownerName, source_path)
- data_product = _save_copy_of_data_product(request, full_path, data_product)
- return data_product
-
-
-def move_input_file_from_filepath(
- request, source_path, name=None, content_type=None
-):
- # TODO: don't need this in UserStorageProvider
- "Move a file from filesystem into user's input file staging area."
- username = request.user.username
- file_name = name if name is not None else os.path.basename(source_path)
- full_path = _Datastore().move_external(
- source_path, username, TMP_INPUT_FILE_UPLOAD_DIR, file_name
- )
- data_product = _save_data_product(
- request, full_path, name=file_name, content_type=content_type
- )
- return data_product
-
-
-def open_file(request, data_product=None, data_product_uri=None):
- """
- Return file object for replica if it exists in user storage. One of
- `data_product` or `data_product_uri` is required.
- """
- if data_product is None:
- data_product = _get_data_product(request, data_product_uri)
- if _is_remote_api():
- resp = _call_remote_api(
- request,
- "/download",
- params={'data-product-uri': data_product.productUri})
- file = io.BytesIO(resp.content)
- disposition = resp.headers['Content-Disposition']
- disp_value, disp_params = cgi.parse_header(disposition)
- # Give the file object a name just like a real opened file object
- file.name = disp_params['filename']
- return file
- else:
- path = _get_replica_filepath(data_product)
- return _Datastore().open(data_product.ownerName, path)
-
-
-def exists(request, data_product=None, data_product_uri=None):
- """
- Return True if replica for data_product exists in user storage. One of
- `data_product` or `data_product_uri` is required.
- """
- if data_product is None:
- data_product = _get_data_product(request, data_product_uri)
- if _is_remote_api():
- resp = _call_remote_api(
- request,
- "/data-products/",
- params={'product-uri': data_product.productUri})
- data = resp.json()
- return data['downloadURL'] is not None
- else:
- path = _get_replica_filepath(data_product)
- return _Datastore().exists(data_product.ownerName, path)
-
-
-def dir_exists(request, path):
- "Return True if path exists in user's data store."
- if _is_remote_api():
- resp = _call_remote_api(request,
- "/user-storage/~/{path}",
- path_params={"path": path},
- raise_for_status=False)
- if resp.status_code == HTTPStatus.NOT_FOUND:
- return False
- resp.raise_for_status()
- return resp.json()['isDir']
- else:
- user_storage_provider = MFTApiUserStorageProvider()
- return user_storage_provider.dir_exists(request, path)
-
-
-def user_file_exists(request, path):
- """If file exists, return data product URI, else None."""
- if _is_remote_api():
- resp = _call_remote_api(request,
- "/user-storage/~/{path}",
- path_params={"path": path},
- raise_for_status=False)
- if resp.status_code == HTTPStatus.NOT_FOUND or resp.json()['isDir']:
- return None
- resp.raise_for_status()
- return resp.json()['files'][0]['dataProductURI']
- elif _Datastore().exists(request.user.username, path):
- full_path = _Datastore().path(request.user.username, path)
- data_product_uri = _get_data_product_uri(request, full_path)
- return data_product_uri
- else:
- return None
-
-
-def delete_dir(request, path):
- """Delete path in user's data store, if it exists."""
- if _is_remote_api():
- resp = _call_remote_api(request,
- "/user-storage/~/{path}",
- path_params={"path": path},
- method="delete",
- raise_for_status=False)
- _raise_404(resp, f"File path does not exist {path}")
- resp.raise_for_status()
- return
- _Datastore().delete_dir(request.user.username, path)
-
-
-def delete_user_file(request, path):
- """Delete file in user's data store, if it exists."""
- if _is_remote_api():
- resp = _call_remote_api(request,
- "/user-storage/~/{path}",
- path_params={"path": path},
- method="delete",
- raise_for_status=False)
- _raise_404(resp, f"File path does not exist {path}")
- resp.raise_for_status()
- return
- return _Datastore().delete(request.user.username, path)
-
-
-def update_file_content(request, path, fileContentText):
- if _is_remote_api():
- _call_remote_api(request,
- "/user-storage/~/{path}",
- path_params={"path": path},
- method="put",
- data={"fileContentText": fileContentText}
- )
- return
- else:
- full_path = _Datastore().path(request.user.username, path)
- with open(full_path, 'w') as f:
- myfile = File(f)
- myfile.write(fileContentText)
-
-
-def update_data_product_content(request, data_product=None, fileContentText="", data_product_uri=None):
- if data_product is None:
- data_product = _get_data_product(request, data_product_uri)
- # TODO: implement remote api support (DataProductView.put())
- path = _get_replica_filepath(data_product)
- full_path = _Datastore().path(request.user.username, path)
- with open(full_path, 'w') as f:
- myfile = File(f)
- myfile.write(fileContentText)
-
-
-def get_file(request, path):
- if _is_remote_api():
- resp = _call_remote_api(request,
- "/user-storage/~/{path}",
- path_params={"path": path},
- raise_for_status=False
- )
- _raise_404(resp, "User storage file path does not exist")
- data = resp.json()
- if data["isDir"]:
- raise Exception("User storage path is a directory, not a file")
- file = data['files'][0]
- file['created_time'] = convert_iso8601_to_datetime(file['createdTime'])
- file['mime_type'] = file['mimeType']
- file['data-product-uri'] = file['dataProductURI']
- return file
-
- user_storage_provider = MFTApiUserStorageProvider()
- return user_storage_provider.get_file(request, path)
-
-
-def delete(request, data_product=None, data_product_uri=None):
- """
- Delete replica for data product in this data store. One of `data_product`
- or `data_product_uri` is required.
- """
- if data_product is None:
- data_product = _get_data_product(request, data_product_uri)
- if _is_remote_api():
- _call_remote_api(
- request,
- "/delete-file",
- params={'data-product-uri': data_product.productUri},
- method="delete")
- return
- else:
- path = _get_replica_filepath(data_product)
- try:
- _Datastore().delete(data_product.ownerName, path)
- _delete_data_product(data_product.ownerName, path)
- except Exception:
- logger.exception(
- "Unable to delete file {} for data product uri {}".format(
- path, data_product.productUri
- )
- )
- raise
-
-
-def listdir(request, path):
- """Return a tuple of two lists, one for directories, the second for files."""
-
- if _is_remote_api():
- resp = _call_remote_api(request,
- "/user-storage/~/{path}",
- path_params={"path": path},
- )
- data = resp.json()
- for directory in data['directories']:
- # Convert JSON ISO8601 timestamp to datetime instance
- directory['created_time'] = convert_iso8601_to_datetime(
- directory['createdTime'])
- for file in data['files']:
- # Convert JSON ISO8601 timestamp to datetime instance
- file['created_time'] = convert_iso8601_to_datetime(
- file['createdTime'])
- file['mime_type'] = file['mimeType']
- file['data-product-uri'] = file['dataProductURI']
- return data['directories'], data['files']
-
- user_storage_provider = MFTApiUserStorageProvider()
- return user_storage_provider.listdir(request, path)
-
-
-def list_experiment_dir(request, experiment_id, path=""):
- """
- List files, directories in experiment data directory. Returns a tuple,
- see `listdir`.
- """
- if _is_remote_api():
- resp = _call_remote_api(request,
- "/experiment-storage/{experiment_id}/{path}",
- path_params={"path": path,
- "experiment_id": experiment_id},
- )
- data = resp.json()
- for directory in data['directories']:
- # Convert JSON ISO8601 timestamp to datetime instance
- directory['created_time'] = convert_iso8601_to_datetime(
- directory['createdTime'])
- for file in data['files']:
- # Convert JSON ISO8601 timestamp to datetime instance
- file['created_time'] = convert_iso8601_to_datetime(
- file['createdTime'])
- file['mime_type'] = file['mimeType']
- file['data-product-uri'] = file['dataProductURI']
- return data['directories'], data['files']
-
- experiment = request.airavata_client.getExperiment(
- request.authz_token, experiment_id)
- datastore = _Datastore()
- exp_data_path = experiment.userConfigurationData.experimentDataDir
- exp_data_path = os.path.join(exp_data_path, path)
- exp_owner = experiment.userName
- if datastore.dir_exists(exp_owner, exp_data_path):
- directories, files = datastore.list_user_dir(
- exp_owner, exp_data_path)
- directories_data = []
- for d in directories:
- dpath = os.path.join(exp_data_path, d)
- rel_path = os.path.join(path, d)
- created_time = datastore.get_created_time(
- exp_owner, dpath)
- size = datastore.size(exp_owner, dpath)
- directories_data.append(
- {
- "name": d,
- "path": rel_path,
- "created_time": created_time,
- "size": size,
- }
- )
- files_data = []
- for f in files:
- user_rel_path = os.path.join(exp_data_path, f)
- if not datastore.exists(exp_owner, user_rel_path):
- logger.warning(
- f"list_experiment_dir skipping {exp_owner}:{user_rel_path}, "
- "does not exist (broken symlink?)")
- continue
- created_time = datastore.get_created_time(
- exp_owner, user_rel_path
- )
- size = datastore.size(exp_owner, user_rel_path)
- full_path = datastore.path(exp_owner, user_rel_path)
- data_product_uri = _get_data_product_uri(request, full_path, owner=exp_owner)
-
- data_product = request.airavata_client.getDataProduct(
- request.authz_token, data_product_uri)
- mime_type = None
- if 'mime-type' in data_product.productMetadata:
- mime_type = data_product.productMetadata['mime-type']
- files_data.append(
- {
- "name": f,
- "path": user_rel_path,
- "data-product-uri": data_product_uri,
- "created_time": created_time,
- "mime_type": mime_type,
- "size": size,
- "hidden": False,
- }
- )
- return directories_data, files_data
- else:
- raise ObjectDoesNotExist("Experiment data directory does not exist")
-
-
-def experiment_dir_exists(request, experiment_id, path=""):
-
- if _is_remote_api():
- resp = _call_remote_api(request,
- "/experiment-storage/{experiment_id}/{path}",
- path_params={"path": path,
- "experiment_id": experiment_id},
- raise_for_status=False)
- if resp.status_code == HTTPStatus.NOT_FOUND:
- return False
- resp.raise_for_status()
- return resp.json()['isDir']
-
- experiment = request.airavata_client.getExperiment(
- request.authz_token, experiment_id)
- datastore = _Datastore()
- exp_data_path = experiment.userConfigurationData.experimentDataDir
- if exp_data_path is None:
- return False
- exp_data_path = os.path.join(exp_data_path, path)
- exp_owner = experiment.userName
- return datastore.dir_exists(exp_owner, exp_data_path)
-
-
-def get_experiment_dir(
- request,
- project_name=None,
- experiment_name=None,
- path=None):
- return _Datastore().get_experiment_dir(
- request.user.username, project_name, experiment_name, path
- )
-
-
-def create_user_dir(request, path):
- if _is_remote_api():
- logger.debug(f"path={path}")
- _call_remote_api(request,
- "/user-storage/~/{path}",
- path_params={"path": path},
- method="post")
- return
- _Datastore().create_user_dir(request.user.username, path)
-
-
-def get_rel_path(request, path):
- return _Datastore().rel_path(request.user.username, path)
-
-
-def get_rel_experiment_dir(request, experiment_id):
- """Return experiment data dir path relative to user's directory."""
- warnings.warn("Use list_experiment_dir instead.", DeprecationWarning)
- if _is_remote_api():
- resp = _call_remote_api(request,
- "/experiments/{experimentId}/",
- path_params={"experimentId": experiment_id})
- resp.raise_for_status()
- return resp.json()['relativeExperimentDataDir']
-
- experiment = request.airavata_client.getExperiment(
- request.authz_token, experiment_id)
- if (experiment.userConfigurationData and
- experiment.userConfigurationData.experimentDataDir):
- datastore = _Datastore()
- data_dir = experiment.userConfigurationData.experimentDataDir
- if datastore.dir_exists(request.user.username, data_dir):
- return datastore.rel_path(request.user.username, data_dir)
- else:
- return None
- else:
- return None
-
-
-def _get_data_product_uri(request, full_path, owner=None):
-
- from airavata_django_portal_sdk import models
- if owner is None:
- owner = request.user.username
- user_file = models.UserFiles.objects.filter(
- username=owner, file_path=full_path)
- if user_file.exists():
- product_uri = user_file[0].file_dpu
- else:
- data_product = _save_data_product(request, full_path, owner=owner)
- product_uri = data_product.productUri
- return product_uri
-
-
-def _get_data_product(request, data_product_uri):
- return request.airavata_client.getDataProduct(
- request.authz_token, data_product_uri)
-
-
-def _save_data_product(request, full_path, name=None, content_type=None, owner=None):
- "Create, register and record in DB a data product for full_path."
- if owner is None:
- owner = request.user.username
- data_product = _create_data_product(
- owner, full_path, name=name, content_type=content_type
- )
- product_uri = _register_data_product(request, full_path, data_product, owner=owner)
- data_product.productUri = product_uri
- return data_product
-
-
-def _register_data_product(request, full_path, data_product, owner=None):
- if owner is None:
- owner = request.user.username
- product_uri = request.airavata_client.registerDataProduct(
- request.authz_token, data_product
- )
- from airavata_django_portal_sdk import models
- user_file_instance = models.UserFiles(
- username=owner,
- file_path=full_path,
- file_dpu=product_uri)
- user_file_instance.save()
- return product_uri
-
-
-def _save_copy_of_data_product(request, full_path, data_product):
- """Save copy of a data product with a different path."""
- data_product_copy = _copy_data_product(request, data_product, full_path)
- product_uri = _register_data_product(request, full_path, data_product_copy)
- data_product_copy.productUri = product_uri
- return data_product_copy
-
-
-def _copy_data_product(request, data_product, full_path):
- """Create an unsaved copy of a data product with different path."""
- data_product_copy = copy.copy(data_product)
- data_product_copy.productUri = None
- data_product_copy.ownerName = request.user.username
- data_replica_location = _create_replica_location(
- full_path, data_product_copy.productName
- )
- data_product_copy.replicaLocations = [data_replica_location]
- return data_product_copy
-
-
-def _delete_data_product(username, full_path):
- # TODO: call API to delete data product from replica catalog when it is
- # available (not currently implemented)
- from airavata_django_portal_sdk import models
- user_file = models.UserFiles.objects.filter(
- username=username, file_path=full_path)
- if user_file.exists():
- user_file.delete()
-
-
-def _create_data_product(username, full_path, name=None, content_type=None):
- data_product = DataProductModel()
- data_product.gatewayId = settings.GATEWAY_ID
- data_product.ownerName = username
- if name is not None:
- file_name = name
- else:
- file_name = os.path.basename(full_path)
- data_product.productName = file_name
- data_product.dataProductType = DataProductType.FILE
- final_content_type = _determine_content_type(full_path, content_type)
- if final_content_type is not None:
- data_product.productMetadata = {"mime-type": final_content_type}
- data_replica_location = _create_replica_location(full_path, file_name)
- data_product.replicaLocations = [data_replica_location]
- return data_product
-
-
-def _determine_content_type(full_path, content_type=None):
- result = content_type
- if result is None:
- # Try to guess the content-type from file extension
- guessed_type, encoding = mimetypes.guess_type(full_path)
- result = guessed_type
- if result is None or result == "application/octet-stream":
- # Check if file is Unicode text by trying to read some of it
- try:
- open(full_path, "r").read(1024)
- result = "text/plain"
- except UnicodeDecodeError:
- logger.debug(f"Failed to read as Unicode text: {full_path}")
- return result
-
-
-def _create_replica_location(full_path, file_name):
- data_replica_location = DataReplicaLocationModel()
- data_replica_location.storageResourceId = settings.GATEWAY_DATA_STORE_RESOURCE_ID
- data_replica_location.replicaName = "{} gateway data store copy".format(
- file_name)
- data_replica_location.replicaLocationCategory = (
- ReplicaLocationCategory.GATEWAY_DATA_STORE
- )
- data_replica_location.replicaPersistentType = ReplicaPersistentType.TRANSIENT
- data_replica_location.filePath = "file://{}:{}".format(
- settings.GATEWAY_DATA_STORE_HOSTNAME, quote(full_path)
- )
- return data_replica_location
-
-
-def _get_replica_filepath(data_product):
- replica_filepaths = [
- rep.filePath
- for rep in data_product.replicaLocations
- if rep.replicaLocationCategory == ReplicaLocationCategory.GATEWAY_DATA_STORE
- ]
- replica_filepath = replica_filepaths[0] if len(
- replica_filepaths) > 0 else None
- if replica_filepath:
- return unquote(urlparse(replica_filepath).path)
- return None
-
-
-def _is_remote_api():
- return getattr(settings, 'GATEWAY_DATA_STORE_REMOTE_API', None) is not None
-
-
-def _call_remote_api(
- request,
- path,
- path_params=None,
- method="get",
- raise_for_status=True,
- **kwargs):
-
- headers = {
- 'Authorization': f'Bearer {request.authz_token.accessToken}'}
- encoded_path_params = {}
- if path_params is not None:
- for pk, pv in path_params.items():
- encoded_path_params[pk] = quote(pv)
- encoded_path = path.format(**encoded_path_params)
- logger.debug(f"encoded_path={encoded_path}")
- r = requests.request(
- method,
- f'{settings.GATEWAY_DATA_STORE_REMOTE_API}{encoded_path}',
- headers=headers,
- **kwargs,
- )
- if raise_for_status:
- r.raise_for_status()
- return r
-
-
-def _raise_404(response, msg, exception_class=ObjectDoesNotExist):
- if response.status_code == 404:
- raise exception_class(msg)
-
-
-class _Datastore:
- """Internal datastore abstraction."""
-
- def __init__(self, directory=None):
- if getattr(
- settings,
- 'GATEWAY_DATA_STORE_REMOTE_API',
- None) is not None:
- raise Exception(
- f"This Django portal instance is configured to connect to a "
- f"remote data store via API (settings.GATEWAY_DATA_STORE_REMOTE_API="
- f"{settings.GATEWAY_DATA_STORE_REMOTE_API}). This local "
- f"Datastore instance is not available in remote data store mode.")
- if directory:
- self.directory = directory
- else:
- self.directory = settings.GATEWAY_DATA_STORE_DIR
-
- def exists(self, username, path):
- """Check if file path exists in this data store."""
- try:
- return self._user_data_storage(username).exists(
- path) and os.path.isfile(self.path(username, path))
- except SuspiciousFileOperation as e:
- logger.warning(
- "Invalid path for user {}: {}".format(
- username, str(e)))
- return False
-
- def dir_exists(self, username, path):
- """Check if directory path exists in this data store."""
- try:
- return self._user_data_storage(username).exists(
- path) and os.path.isdir(self.path(username, path))
- except SuspiciousFileOperation as e:
- logger.warning(
- "Invalid path for user {}: {}".format(
- username, str(e)))
- return False
-
- def open(self, username, path):
- """Open path for user if it exists in this data store."""
- if self.exists(username, path):
- return self._user_data_storage(username).open(path)
- else:
- raise ObjectDoesNotExist(
- "File path does not exist: {}".format(path))
-
- def save(self, username, path, file, name=None):
- """Save file to username/path in data store."""
- # file.name may be full path, so get just the name of the file
- file_name = name if name is not None else os.path.basename(file.name)
- user_data_storage = self._user_data_storage(username)
- file_path = os.path.join(
- path, user_data_storage.get_valid_name(file_name))
- input_file_name = user_data_storage.save(file_path, file)
- input_file_fullpath = user_data_storage.path(input_file_name)
- return input_file_fullpath
-
- def move(
- self,
- source_username,
- source_path,
- target_username,
- target_dir,
- file_name):
- source_full_path = self.path(source_username, source_path)
- user_data_storage = self._user_data_storage(target_username)
- # Make file_name a valid filename
- target_path = os.path.join(
- target_dir, user_data_storage.get_valid_name(file_name)
- )
- # Get available file path: if there is an existing file at target_path
- # create a uniquely named path
- target_path = user_data_storage.get_available_name(target_path)
- target_full_path = self.path(target_username, target_path)
- file_move_safe(source_full_path, target_full_path)
- return target_full_path
-
- def move_external(
- self,
- external_path,
- target_username,
- target_dir,
- file_name):
- user_data_storage = self._user_data_storage(target_username)
- # Make file_name a valid filename
- target_path = os.path.join(
- target_dir, user_data_storage.get_valid_name(file_name)
- )
- # Get available file path: if there is an existing file at target_path
- # create a uniquely named path
- target_path = user_data_storage.get_available_name(target_path)
- if not self.dir_exists(target_username, target_dir):
- self.create_user_dir(target_username, target_dir)
- target_full_path = self.path(target_username, target_path)
- file_move_safe(external_path, target_full_path)
- return target_full_path
-
- def create_user_dir(self, username, path):
- user_data_storage = self._user_data_storage(username)
- if not user_data_storage.exists(path):
- self._makedirs(username, path)
- else:
- raise Exception("Directory {} already exists".format(path))
-
- def copy(
- self,
- source_username,
- source_path,
- target_username,
- target_path,
- name=None):
- """Copy a user file into target_path dir."""
- f = self.open(source_username, source_path)
- return self.save(target_username, target_path, f, name=name)
-
- def delete(self, username, path):
- """Delete file in this data store."""
- if self.exists(username, path):
- user_data_storage = self._user_data_storage(username)
- user_data_storage.delete(path)
- else:
- raise ObjectDoesNotExist(
- "File path does not exist: {}".format(path))
-
- def delete_dir(self, username, path):
- """Delete entire directory in this data store."""
- if self.dir_exists(username, path):
- user_path = self.path(username, path)
- shutil.rmtree(user_path)
- else:
- raise ObjectDoesNotExist(
- "File path does not exist: {}".format(path))
-
- def get_experiment_dir(
- self, username, project_name=None, experiment_name=None, path=None
- ):
- """Return an experiment directory (full path) for the given experiment."""
- user_experiment_data_storage = self._user_data_storage(username)
- if path is None:
- proj_dir_name = user_experiment_data_storage.get_valid_name(
- project_name)
- # AIRAVATA-3245 Make project directory with correct permissions
- if not user_experiment_data_storage.exists(proj_dir_name):
- self._makedirs(username, proj_dir_name)
- experiment_dir_name = os.path.join(
- proj_dir_name,
- user_experiment_data_storage.get_valid_name(experiment_name),
- )
- # Since there may already be another experiment with the same name in
- # this project, we need to check for available name
- experiment_dir_name = user_experiment_data_storage.get_available_name(
- experiment_dir_name)
- experiment_dir = user_experiment_data_storage.path(
- experiment_dir_name)
- else:
- # path can be relative to the user's storage space or absolute (as long
- # as it is still inside the user's storage space)
- # if path is passed in, assumption is that it has already been
- # created
- user_experiment_data_storage = self._user_data_storage(username)
- experiment_dir = user_experiment_data_storage.path(path)
- if not user_experiment_data_storage.exists(experiment_dir):
- self._makedirs(username, experiment_dir)
- return experiment_dir
-
- def _makedirs(self, username, dir_path):
- user_experiment_data_storage = self._user_data_storage(username)
- full_path = user_experiment_data_storage.path(dir_path)
- os.makedirs(
- full_path,
- mode=user_experiment_data_storage.directory_permissions_mode)
- # os.makedirs mode isn't always respected so need to chmod to be sure
- os.chmod(
- full_path,
- mode=user_experiment_data_storage.directory_permissions_mode)
-
- def list_user_dir(self, username, file_path):
- logger.debug("file_path={}".format(file_path))
- user_data_storage = self._user_data_storage(username)
- return user_data_storage.listdir(file_path)
-
- def get_created_time(self, username, file_path):
- user_data_storage = self._user_data_storage(username)
- return user_data_storage.get_created_time(file_path)
-
- def size(self, username, file_path):
- user_data_storage = self._user_data_storage(username)
- full_path = self.path(username, file_path)
- if os.path.isdir(full_path):
- return self._get_dir_size(full_path)
- else:
- return user_data_storage.size(file_path)
-
- def path(self, username, file_path):
- user_data_storage = self._user_data_storage(username)
- return user_data_storage.path(file_path)
-
- def rel_path(self, username, file_path):
- full_path = self.path(username, file_path)
- return os.path.relpath(full_path, self.path(username, ""))
-
- def _user_data_storage(self, username):
- return FileSystemStorage(
- location=os.path.join(
- self.directory, username))
-
- # from https://stackoverflow.com/a/1392549
- def _get_dir_size(self, start_path="."):
- total_size = 0
- for dirpath, dirnames, filenames in os.walk(start_path):
- for f in filenames:
- fp = os.path.join(dirpath, f)
- # Check for broken symlinks (.exists return False for broken
- # symlinks)
- if os.path.exists(fp):
- total_size += os.path.getsize(fp)
- return total_size
-
-
-class UserStorageProvider:
- def dir_exists(self, request, path):
- raise NotImplementedError()
-
- def listdir(self, request, path):
- raise NotImplementedError()
-
- def get_file(self, request, path):
- raise NotImplementedError()
-
-
-class FileSystemUserStorageProvider(UserStorageProvider):
- def dir_exists(self, request, path):
- return _Datastore().dir_exists(request.user.username, path)
-
- def listdir(self, request, path):
- datastore = _Datastore()
- if datastore.dir_exists(request.user.username, path):
- directories, files = datastore.list_user_dir(
- request.user.username, path)
- directories_data = []
- for d in directories:
- dpath = os.path.join(path, d)
- created_time = datastore.get_created_time(
- request.user.username, dpath)
- size = datastore.size(request.user.username, dpath)
- directories_data.append(
- {
- "name": d,
- "path": dpath,
- "created_time": created_time,
- "size": size,
- "hidden": dpath == TMP_INPUT_FILE_UPLOAD_DIR,
- }
- )
- files_data = []
- for f in files:
- user_rel_path = os.path.join(path, f)
- if not datastore.exists(request.user.username, user_rel_path):
- logger.warning(f"listdir skipping {request.user.username}:{user_rel_path}, "
- "does not exist (broken symlink?)")
- continue
- created_time = datastore.get_created_time(
- request.user.username, user_rel_path
- )
- size = datastore.size(request.user.username, user_rel_path)
- full_path = datastore.path(request.user.username, user_rel_path)
- data_product_uri = _get_data_product_uri(request, full_path)
-
- data_product = request.airavata_client.getDataProduct(
- request.authz_token, data_product_uri)
- mime_type = None
- if 'mime-type' in data_product.productMetadata:
- mime_type = data_product.productMetadata['mime-type']
- files_data.append(
- {
- "name": f,
- "path": user_rel_path,
- "data-product-uri": data_product_uri,
- "created_time": created_time,
- "mime_type": mime_type,
- "size": size,
- "hidden": False,
- }
- )
- return directories_data, files_data
- else:
- raise ObjectDoesNotExist("User storage path does not exist")
-
- def get_file(self, request, path):
-
- if _is_remote_api():
- resp = _call_remote_api(request,
- "/user-storage/~/{path}",
- path_params={"path": path},
- raise_for_status=False
- )
- _raise_404(resp, "User storage file path does not exist")
- data = resp.json()
- if data["isDir"]:
- raise Exception("User storage path is a directory, not a file")
- file = data['files'][0]
- file['created_time'] = convert_iso8601_to_datetime(file['createdTime'])
- file['mime_type'] = file['mimeType']
- file['data-product-uri'] = file['dataProductURI']
- return file
- datastore = _Datastore()
- if datastore.exists(request.user.username, path):
- created_time = datastore.get_created_time(
- request.user.username, path)
- size = datastore.size(request.user.username, path)
- full_path = datastore.path(request.user.username, path)
- data_product_uri = _get_data_product_uri(request, full_path)
- dir_path, file_name = os.path.split(path)
-
- data_product = request.airavata_client.getDataProduct(
- request.authz_token, data_product_uri)
- mime_type = None
- if 'mime-type' in data_product.productMetadata:
- mime_type = data_product.productMetadata['mime-type']
-
- return {
- 'name': full_path,
- 'path': dir_path,
- 'data-product-uri': data_product_uri,
- 'created_time': created_time,
- 'mime_type': mime_type,
- 'size': size,
- 'hidden': False
- }
- else:
- raise ObjectDoesNotExist("User storage file path does not exist")
-
-
-class MFTApiUserStorageProvider(UserStorageProvider):
- def __init__(self) -> None:
- super().__init__()
-
- def dir_exists(self, request, path):
- with grpc.insecure_channel('localhost:7004') as channel:
- # remove trailing slash and figure out parent path
- # FIXME remove the hard coded /tmp path
- parent_path, child_path = os.path.split(f"/tmp/{path}".rstrip("/"))
- logger.debug(f"parent_path={parent_path}, child_path={child_path}")
- stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel)
- # Get metadata for parent directory and see if child_path exists
- request = MFTApi_pb2.FetchResourceMetadataRequest(
- resourceId="remote-ssh-dir-resource",
- resourceType="SCP",
- resourceToken="local-ssh-cred",
- resourceBackend="FILE",
- resourceCredentialBackend="FILE",
- targetAgentId="agent0",
- childPath=parent_path,
- mftAuthorizationToken="user token")
- response = stub.getDirectoryResourceMetadata(request)
- # if not child_path, then return True since the response was
- # successful and we just need to confirm the existence of the root dir
- if child_path == '':
- return True
- return child_path in map(lambda f: f.friendlyName, response.directories)
-
- def listdir(self, request, path):
- # TODO setup resourceId, etc from __init__ arguments
- channel = grpc.insecure_channel('localhost:7004')
- stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel)
- request = MFTApi_pb2.FetchResourceMetadataRequest(
- resourceId="remote-ssh-dir-resource",
- resourceType="SCP",
- resourceToken="local-ssh-cred",
- resourceBackend="FILE",
- resourceCredentialBackend="FILE",
- targetAgentId="agent0",
- childPath=f"/tmp/{path}",
- mftAuthorizationToken="user token")
- response = stub.getDirectoryResourceMetadata(request)
- directories_data = []
- for d in response.directories:
-
- dpath = os.path.join(path, d.friendlyName)
- created_time = datetime.fromtimestamp(d.createdTime)
- # TODO MFT API doesn't report size
- size = 0
- directories_data.append(
- {
- "name": d.friendlyName,
- "path": dpath,
- "created_time": created_time,
- "size": size,
- # TODO how to handle hidden directories or directories for
- # staging input file uploads
- "hidden": False
- }
- )
- files_data = []
- for f in response.files:
- user_rel_path = os.path.join(path, f.friendlyName)
- # TODO do we need to check for broken symlinks?
- created_time = datetime.fromtimestamp(f.createdTime)
- # TODO get the size as well
- size = 0
- # full_path = datastore.path(request.user.username, user_rel_path)
- # TODO how do we register these as data products, do we need to?
- # data_product_uri = _get_data_product_uri(request, full_path)
-
- # data_product = request.airavata_client.getDataProduct(
- # request.authz_token, data_product_uri)
- # mime_type = None
- # if 'mime-type' in data_product.productMetadata:
- # mime_type = data_product.productMetadata['mime-type']
- files_data.append(
- {
- "name": f.friendlyName,
- "path": user_rel_path,
- "data-product-uri": None,
- "created_time": created_time,
- "mime_type": None,
- "size": size,
- "hidden": False,
- }
- )
- return directories_data, files_data
-
- def get_file(self, request, path):
- # FIXME remove hard coded /tmp path
- path = f"/tmp/{path}".rstrip("/")
- file_metadata = self._get_file(path)
- if file_metadata is not None:
- user_rel_path = os.path.join(path, file_metadata.friendlyName)
- created_time = datetime.fromtimestamp(file_metadata.createdTime)
- # TODO get the size as well
- size = 0
-
- return {
- "name": file_metadata.friendlyName,
- "path": user_rel_path,
- "data-product-uri": None,
- "created_time": created_time,
- "mime_type": None,
- "size": size,
- "hidden": False,
- }
- else:
- raise ObjectDoesNotExist("User storage file path does not exist")
-
- def _get_file(self, path):
- with grpc.insecure_channel('localhost:7004') as channel:
- stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel)
- # Get metadata for parent directory and see if child_path exists
- request = MFTApi_pb2.FetchResourceMetadataRequest(
- resourceId="remote-ssh-dir-resource",
- resourceType="SCP",
- resourceToken="local-ssh-cred",
- resourceBackend="FILE",
- resourceCredentialBackend="FILE",
- targetAgentId="agent0",
- childPath=path,
- mftAuthorizationToken="user token")
- try:
- # TODO is there a better way to check if file exists than catching exception?
- return stub.getFileResourceMetadata(request)
- except Exception:
- logger.exception(f"_get_file({path})")
- return None
-
- def _get_download_url(self, path):
-
- with grpc.insecure_channel('localhost:7004') as channel:
- stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel)
- download_request = MFTApi_pb2.HttpDownloadApiRequest(sourceStoreId="remote-ssh-storage",
- sourcePath="/tmp/a.txt",
- sourceToken="local-ssh-cred",
- sourceType="SCP",
- targetAgent="agent0",
- mftAuthorizationToken="")
- try:
- # TODO is there a better way to check if file exists than catching exception?
- # response stub.submitHttpDownload(request)
- pass
- except Exception:
- logger.exception(f"_get_file({path})")
- return None
diff --git a/airavata_django_portal_sdk/user_storage/__init__.py b/airavata_django_portal_sdk/user_storage/__init__.py
new file mode 100644
index 0000000..21a22b4
--- /dev/null
+++ b/airavata_django_portal_sdk/user_storage/__init__.py
@@ -0,0 +1,51 @@
+from .api import (
+ copy_input_file,
+ create_user_dir,
+ delete,
+ delete_dir,
+ delete_user_file,
+ dir_exists,
+ exists,
+ experiment_dir_exists,
+ get_experiment_dir,
+ get_file,
+ get_file_metadata,
+ get_rel_experiment_dir,
+ is_input_file,
+ list_experiment_dir,
+ listdir,
+ move,
+ move_input_file,
+ open_file,
+ save,
+ save_input_file,
+ update_data_product_content,
+ update_file_content,
+ user_file_exists
+)
+
+# Names re-exported from the .api module as this package's public interface.
+__all__ = [
+ 'copy_input_file',
+ 'create_user_dir',
+ 'delete',
+ 'delete_dir',
+ 'delete_user_file',
+ 'dir_exists',
+ 'exists',
+ 'experiment_dir_exists',
+ 'get_experiment_dir',
+ 'get_file',
+ 'get_file_metadata',
+ 'get_rel_experiment_dir',
+ 'is_input_file',
+ 'list_experiment_dir',
+ 'listdir',
+ 'move',
+ 'move_input_file',
+ 'open_file',
+ 'save',
+ 'save_input_file',
+ 'update_data_product_content',
+ 'update_file_content',
+ 'user_file_exists'
+]
diff --git a/airavata_django_portal_sdk/user_storage/api.py b/airavata_django_portal_sdk/user_storage/api.py
new file mode 100644
index 0000000..4a21498
--- /dev/null
+++ b/airavata_django_portal_sdk/user_storage/api.py
@@ -0,0 +1,758 @@
+import cgi
+import copy
+import importlib
+import io
+import logging
+import mimetypes
+import os
+import warnings
+from http import HTTPStatus
+from urllib.parse import quote, unquote, urlparse
+
+import requests
+from airavata.model.data.replica.ttypes import (
+ DataProductModel,
+ DataProductType,
+ DataReplicaLocationModel,
+ ReplicaLocationCategory,
+ ReplicaPersistentType
+)
+from django.conf import settings
+from django.core.exceptions import ObjectDoesNotExist
+
+from ..util import convert_iso8601_to_datetime
+
+logger = logging.getLogger(__name__)
+
+# Directory name under which save_input_file and copy_input_file stage
+# input files; is_input_file compares a file's directory against it.
+TMP_INPUT_FILE_UPLOAD_DIR = "tmp"
+
+
+def get_user_storage_provider(request, owner_username=None, storage_resource_id=None):
+ # TODO: default the module_class_name to MFT provider
+ module_class_name = None
+ options = {}
+ if storage_resource_id is None:
+ if not hasattr(settings, 'USER_STORAGES'):
+ # make this backward compatible with the older settings
+ module_class_name = 'airavata_django_portal_sdk.user_storage.backends.DjangoFileSystemProvider'
+ storage_resource_id = settings.GATEWAY_DATA_STORE_RESOURCE_ID
+ options = dict(directory=settings.GATEWAY_DATA_STORE_DIR)
+ logger.warning("Please add the USER_STORAGES setting. Using legacy GATEWAY_DATA_STORE_RESOURCE_ID and GATEWAY_DATA_STORE_DIR settings.")
+ else:
+ conf = settings.USER_STORAGES["default"]
+ module_class_name = conf['BACKEND']
+ storage_resource_id = conf['STORAGE_RESOURCE_ID']
+ options = conf.get('OPTIONS', {})
+ else:
+ if not hasattr(settings, 'USER_STORAGES'):
+ # make this backward compatible with the older settings
+ module_class_name = 'airavata_django_portal_sdk.user_storage.backends.DjangoFileSystemProvider'
+ storage_resource_id = settings.GATEWAY_DATA_STORE_RESOURCE_ID
+ options = dict(directory=settings.GATEWAY_DATA_STORE_DIR)
+ logger.warning("Please add the USER_STORAGES setting. Using legacy GATEWAY_DATA_STORE_RESOURCE_ID and GATEWAY_DATA_STORE_DIR settings.")
+ else:
+ for conf in settings.USER_STORAGES:
+ if conf['STORAGE_RESOURCE_ID'] == storage_resource_id:
+ module_class_name = conf['BACKEND']
+ options = conf.get('OPTIONS', {})
+ break
+ module_name, class_name = module_class_name.rsplit(".", 1)
+ module = importlib.import_module(module_name)
+ BackendClass = getattr(module, class_name)
+ authz_token = request.authz_token
+ context = {
+ 'request': request,
+ 'owner_username': owner_username,
+ }
+ instance = BackendClass(authz_token, storage_resource_id, context=context, **options)
+ return instance
+
+
+def save(request, path, file, name=None, content_type=None, storage_resource_id=None):
+ "Save file in path in the user's storage and return DataProduct."
+ if _is_remote_api():
+ if name is None and hasattr(file, 'name'):
+ name = os.path.basename(file.name)
+ files = {'file': (name, file, content_type)
+ if content_type is not None else file, }
+ resp = _call_remote_api(request,
+ "/user-storage/~/{path}",
+ path_params={"path": path},
+ method="post",
+ files=files)
+ data = resp.json()
+ product_uri = data['uploaded']['productUri']
+ data_product = request.airavata_client.getDataProduct(
+ request.authz_token, product_uri)
+ return data_product
+ backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+ storage_resource_id, resource_path = backend.save(path, file, name=name, content_type=content_type)
+ data_product = _save_data_product(
+ request, resource_path, storage_resource_id, name=name, content_type=content_type
+ )
+ return data_product
+
+
+def save_input_file(request, file, name=None, content_type=None, storage_resource_id=None):
+ """Save input file in staging area for input files."""
+ if _is_remote_api():
+ if name is None and hasattr(file, 'name'):
+ name = os.path.basename(file.name)
+ files = {'file': (name, file, content_type)
+ if content_type is not None else file, }
+ resp = _call_remote_api(request,
+ "/upload",
+ method="post",
+ files=files)
+ data = resp.json()
+ product_uri = data['data-product']['productUri']
+ data_product = request.airavata_client.getDataProduct(
+ request.authz_token, product_uri)
+ return data_product
+ else:
+ backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+ file_name = name if name is not None else os.path.basename(file.name)
+ storage_resource_id, resource_path = backend.save(
+ TMP_INPUT_FILE_UPLOAD_DIR, file, name=file_name)
+ data_product = _save_data_product(
+ request, resource_path, storage_resource_id, name=name, content_type=content_type
+ )
+ return data_product
+
+
+def copy_input_file(request, data_product=None, data_product_uri=None, storage_resource_id=None):
+ if data_product is None:
+ data_product = _get_data_product(request, data_product_uri)
+ source_storage_resource_id, source_resource_path = _get_replica_resource_id_and_filepath(data_product)
+ source_backend = get_user_storage_provider(request,
+ owner_username=data_product.ownerName,
+ storage_resource_id=source_storage_resource_id)
+ file = source_backend.open(source_resource_path)
+ name = data_product.productName
+ target_backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+ storage_resource_id, full_path = target_backend.save(TMP_INPUT_FILE_UPLOAD_DIR, file, name=name)
+ return _save_copy_of_data_product(request, full_path, data_product, storage_resource_id)
+
+
+def is_input_file(request, data_product=None, data_product_uri=None):
+ if data_product is None:
+ data_product = _get_data_product(request, data_product_uri)
+ if _is_remote_api():
+ resp = _call_remote_api(
+ request,
+ "/data-products/",
+ params={'product-uri': data_product.productUri})
+ data = resp.json()
+ return data['isInputFileUpload']
+ # Check if file is one of user's files and in TMP_INPUT_FILE_UPLOAD_DIR
+ storage_resource_id, path = _get_replica_resource_id_and_filepath(data_product)
+ backend = get_user_storage_provider(request,
+ owner_username=data_product.ownerName,
+ storage_resource_id=storage_resource_id)
+ if backend.exists(path):
+ directories, files = backend.get_metadata(path)
+ rel_path = files[0]['path']
+ return os.path.dirname(rel_path) == TMP_INPUT_FILE_UPLOAD_DIR
+ else:
+ return False
+
+
+def move(request, data_product=None, path=None, data_product_uri=None, storage_resource_id=None):
+ if data_product is None:
+ data_product = _get_data_product(request, data_product_uri)
+ source_storage_resource_id, source_path = _get_replica_resource_id_and_filepath(data_product)
+ source_backend = get_user_storage_provider(request,
+ owner_username=data_product.ownerName,
+ storage_resource_id=source_storage_resource_id)
+ file = source_backend.open(source_path)
+ file_name = data_product.productName
+ target_backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+ storage_resource_id, full_path = target_backend.save(path, file, name=file_name)
+ data_product_copy = _save_copy_of_data_product(request, full_path, data_product, storage_resource_id)
+ # Remove the source file and data product metadata
+ source_backend.delete(source_path)
+ _delete_data_product(data_product.ownerName, source_path)
+ return data_product_copy
+
+
+def move_input_file(request, data_product=None, path=None, data_product_uri=None, storage_resource_id=None):
+ warnings.warn("Use 'move' instead.", DeprecationWarning)
+ return move(request, data_product=data_product, path=path, data_product_uri=data_product_uri, storage_resource_id=storage_resource_id)
+
+
+def open_file(request, data_product=None, data_product_uri=None):
+ """
+ Return file object for replica if it exists in user storage. One of
+ `data_product` or `data_product_uri` is required.
+ """
+ if data_product is None:
+ data_product = _get_data_product(request, data_product_uri)
+ if _is_remote_api():
+ resp = _call_remote_api(
+ request,
+ "/download",
+ params={'data-product-uri': data_product.productUri})
+ file = io.BytesIO(resp.content)
+ disposition = resp.headers['Content-Disposition']
+ disp_value, disp_params = cgi.parse_header(disposition)
+ # Give the file object a name just like a real opened file object
+ file.name = disp_params['filename']
+ return file
+ else:
+ storage_resource_id, path = _get_replica_resource_id_and_filepath(data_product)
+ backend = get_user_storage_provider(request,
+ owner_username=data_product.ownerName,
+ storage_resource_id=storage_resource_id)
+ return backend.open(path)
+
+
+def exists(request, data_product=None, data_product_uri=None):
+ """
+ Return True if replica for data_product exists in user storage. One of
+ `data_product` or `data_product_uri` is required.
+ """
+ if data_product is None:
+ data_product = _get_data_product(request, data_product_uri)
+ if _is_remote_api():
+ resp = _call_remote_api(
+ request,
+ "/data-products/",
+ params={'product-uri': data_product.productUri})
+ data = resp.json()
+ return data['downloadURL'] is not None
+ else:
+ storage_resource_id, path = _get_replica_resource_id_and_filepath(data_product)
+ backend = get_user_storage_provider(request,
+ owner_username=data_product.ownerName,
+ storage_resource_id=storage_resource_id)
+ return backend.exists(path)
+
+
+def dir_exists(request, path, storage_resource_id=None):
+ "Return True if path exists in user's data store."
+ if _is_remote_api():
+ resp = _call_remote_api(request,
+ "/user-storage/~/{path}",
+ path_params={"path": path},
+ raise_for_status=False)
+ if resp.status_code == HTTPStatus.NOT_FOUND:
+ return False
+ resp.raise_for_status()
+ return resp.json()['isDir']
+ else:
+ backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+ return backend.exists(path)
+
+
+def user_file_exists(request, path, storage_resource_id=None):
+ """If file exists, return data product URI, else None."""
+ if _is_remote_api():
+ resp = _call_remote_api(request,
+ "/user-storage/~/{path}",
+ path_params={"path": path},
+ raise_for_status=False)
+ if resp.status_code == HTTPStatus.NOT_FOUND or resp.json()['isDir']:
+ return None
+ resp.raise_for_status()
+ return resp.json()['files'][0]['dataProductURI']
+ backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+ if backend.exists(path) and backend.is_file(path):
+ _, files = backend.get_metadata(path)
+ full_path = files[0]['resource_path']
+ data_product_uri = _get_data_product_uri(request, full_path, backend.resource_id)
+ return data_product_uri
+ else:
+ return None
+
+
+def delete_dir(request, path, storage_resource_id=None):
+ """Delete path in user's data store, if it exists."""
+ if _is_remote_api():
+ resp = _call_remote_api(request,
+ "/user-storage/~/{path}",
+ path_params={"path": path},
+ method="delete",
+ raise_for_status=False)
+ _raise_404(resp, f"File path does not exist {path}")
+ resp.raise_for_status()
+ return
+ backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+ backend.delete(path)
+
+
+def delete_user_file(request, path, storage_resource_id=None):
+ """Delete file in user's data store, if it exists."""
+ if _is_remote_api():
+ resp = _call_remote_api(request,
+ "/user-storage/~/{path}",
+ path_params={"path": path},
+ method="delete",
+ raise_for_status=False)
+ _raise_404(resp, f"File path does not exist {path}")
+ resp.raise_for_status()
+ return
+ backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+ backend.delete(path)
+
+
+def update_file_content(request, path, fileContentText, storage_resource_id=None):
+ if _is_remote_api():
+ _call_remote_api(request,
+ "/user-storage/~/{path}",
+ path_params={"path": path},
+ method="put",
+ data={"fileContentText": fileContentText}
+ )
+ return
+ else:
+ backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+ file = io.StringIO(fileContentText)
+ backend.update(path, file)
+
+
+def update_data_product_content(request, data_product=None, fileContentText="", data_product_uri=None):
+ if data_product is None:
+ data_product = _get_data_product(request, data_product_uri)
+ if _is_remote_api():
+ _call_remote_api(request,
+ "/data-products/",
+ params={'product-uri': data_product.productUri},
+ method="put",
+ data={"fileContentText": fileContentText},
+ )
+ return
+ storage_resource_id, path = _get_replica_resource_id_and_filepath(data_product)
+ update_file_content(request, path, fileContentText, storage_resource_id=storage_resource_id)
+
+
+def get_file_metadata(request, path, storage_resource_id=None):
+ if _is_remote_api():
+ resp = _call_remote_api(request,
+ "/user-storage/~/{path}",
+ path_params={"path": path},
+ raise_for_status=False
+ )
+ _raise_404(resp, "User storage file path does not exist")
+ data = resp.json()
+ if data["isDir"]:
+ raise Exception("User storage path is a directory, not a file")
+ file = data['files'][0]
+ file['created_time'] = convert_iso8601_to_datetime(file['createdTime'])
+ file['mime_type'] = file['mimeType']
+ file['data-product-uri'] = file['dataProductURI']
+ return file
+
+ backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+ if backend.exists(path) and backend.is_file(path):
+ _, files = backend.get_metadata(path)
+ file = files[0]
+ data_product_uri = _get_data_product_uri(request, file['resource_path'],
+ storage_resource_id=backend.resource_id)
+
+ data_product = request.airavata_client.getDataProduct(
+ request.authz_token, data_product_uri)
+ mime_type = None
+ if 'mime-type' in data_product.productMetadata:
+ mime_type = data_product.productMetadata['mime-type']
+ file['data-product-uri'] = data_product_uri
+ file['mime_type'] = mime_type
+ return file
+ else:
+ raise ObjectDoesNotExist("File does not exist at that path.")
+
+
+def get_file(request, path, storage_resource_id=None):
+ warnings.warn("Use 'get_file_metadata' instead.", DeprecationWarning)
+ return get_file_metadata(request, path, storage_resource_id)
+
+
+def delete(request, data_product=None, data_product_uri=None):
+ """
+ Delete replica for data product in this data store. One of `data_product`
+ or `data_product_uri` is required.
+ """
+ if data_product is None:
+ data_product = _get_data_product(request, data_product_uri)
+ if _is_remote_api():
+ _call_remote_api(
+ request,
+ "/delete-file",
+ params={'data-product-uri': data_product.productUri},
+ method="delete")
+ return
+ else:
+ storage_resource_id, path = _get_replica_resource_id_and_filepath(data_product)
+ backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+ try:
+ backend.delete(path)
+ _delete_data_product(data_product.ownerName, path)
+ except Exception:
+ logger.exception(
+ "Unable to delete file {} for data product uri {}".format(
+ path, data_product.productUri
+ )
+ )
+ raise
+
+
+def listdir(request, path, storage_resource_id=None):
+ """Return a tuple of two lists, one for directories, the second for files."""
+
+ if _is_remote_api():
+ resp = _call_remote_api(request,
+ "/user-storage/~/{path}",
+ path_params={"path": path},
+ )
+ data = resp.json()
+ for directory in data['directories']:
+ # Convert JSON ISO8601 timestamp to datetime instance
+ directory['created_time'] = convert_iso8601_to_datetime(
+ directory['createdTime'])
+ for file in data['files']:
+ # Convert JSON ISO8601 timestamp to datetime instance
+ file['created_time'] = convert_iso8601_to_datetime(
+ file['createdTime'])
+ file['mime_type'] = file['mimeType']
+ file['data-product-uri'] = file['dataProductURI']
+ return data['directories'], data['files']
+
+ backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+ directories, files = backend.get_metadata(path)
+ # for each file, lookup or register a data product and enrich the file
+ # metadata with data-product-uri and mime-type
+ for file in files:
+ data_product_uri = _get_data_product_uri(request, file['resource_path'],
+ storage_resource_id=backend.resource_id)
+
+ data_product = request.airavata_client.getDataProduct(
+ request.authz_token, data_product_uri)
+ mime_type = None
+ if 'mime-type' in data_product.productMetadata:
+ mime_type = data_product.productMetadata['mime-type']
+ file['data-product-uri'] = data_product_uri
+ file['mime_type'] = mime_type
+ return directories, files
+
+
+def list_experiment_dir(request, experiment_id, path="", storage_resource_id=None):
+ """
+ List files, directories in experiment data directory. Returns a tuple,
+ see `listdir`.
+ """
+ if _is_remote_api():
+ resp = _call_remote_api(request,
+ "/experiment-storage/{experiment_id}/{path}",
+ path_params={"path": path,
+ "experiment_id": experiment_id},
+ )
+ data = resp.json()
+ for directory in data['directories']:
+ # Convert JSON ISO8601 timestamp to datetime instance
+ directory['created_time'] = convert_iso8601_to_datetime(
+ directory['createdTime'])
+ for file in data['files']:
+ # Convert JSON ISO8601 timestamp to datetime instance
+ file['created_time'] = convert_iso8601_to_datetime(
+ file['createdTime'])
+ file['mime_type'] = file['mimeType']
+ file['data-product-uri'] = file['dataProductURI']
+ return data['directories'], data['files']
+
+ experiment = request.airavata_client.getExperiment(
+ request.authz_token, experiment_id)
+ backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+ exp_data_path = experiment.userConfigurationData.experimentDataDir
+ exp_data_path = os.path.join(exp_data_path, path)
+ # Implement username override with exp_owner
+ # exp_owner = experiment.userName
+ if backend.exists(exp_data_path):
+ directories, files = backend.get_metadata(exp_data_path)
+ # for each file, lookup or register a data product and enrich the file
+ # metadata with data-product-uri and mime-type
+ for file in files:
+ data_product_uri = _get_data_product_uri(request, file['resource_path'],
+ storage_resource_id=backend.resource_id)
+
+ data_product = request.airavata_client.getDataProduct(
+ request.authz_token, data_product_uri)
+ mime_type = None
+ if 'mime-type' in data_product.productMetadata:
+ mime_type = data_product.productMetadata['mime-type']
+ file['data-product-uri'] = data_product_uri
+ file['mime_type'] = mime_type
+ return directories, files
+ else:
+ raise ObjectDoesNotExist("Experiment data directory does not exist")
+
+
+def experiment_dir_exists(request, experiment_id, path="", storage_resource_id=None):
+
+ if _is_remote_api():
+ resp = _call_remote_api(request,
+ "/experiment-storage/{experiment_id}/{path}",
+ path_params={"path": path,
+ "experiment_id": experiment_id},
+ raise_for_status=False)
+ if resp.status_code == HTTPStatus.NOT_FOUND:
+ return False
+ resp.raise_for_status()
+ return resp.json()['isDir']
+
+ experiment = request.airavata_client.getExperiment(
+ request.authz_token, experiment_id)
+ exp_data_path = experiment.userConfigurationData.experimentDataDir
+ if exp_data_path is None:
+ return False
+ # Implement username overide with exp_owner
+ # exp_owner = experiment.userName
+ backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+ return backend.exists(exp_data_path)
+
+
+def get_experiment_dir(request, project_name=None, experiment_name=None, path=None, storage_resource_id=None):
+ warnings.warn("Use 'create_user_dir' instead.", DeprecationWarning)
+ storage_resource_id, resource_path = create_user_dir(request,
+ dir_names=[project_name, experiment_name],
+ create_unique=True,
+ storage_resource_id=storage_resource_id)
+ return resource_path
+
+
+def create_user_dir(request, path="", dir_names=(), create_unique=False, storage_resource_id=None):
+ if _is_remote_api():
+ logger.debug(f"path={path}")
+ _call_remote_api(request,
+ "/user-storage/~/{path}",
+ path_params={"path": path},
+ method="post")
+ return
+ backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+ # For backwards compatibility, manufacture the dir_names array as needed
+ if len(dir_names) == 0:
+ dir_names = []
+ while not backend.exists(path):
+ path, dir_name = os.path.split(path)
+ if dir_name == '':
+ raise Exception("Could not find a base directory in which to create directories.")
+ dir_names.insert(0, dir_name)
+ storage_resource_id, resource_path = backend.create_dirs(path, dir_names=dir_names, create_unique=create_unique)
+ return storage_resource_id, resource_path
+
+
+def get_rel_experiment_dir(request, experiment_id, storage_resource_id=None):
+ """Return experiment data dir path relative to user's directory."""
+ warnings.warn("Use 'list_experiment_dir' instead.", DeprecationWarning)
+ if _is_remote_api():
+ resp = _call_remote_api(request,
+ "/experiments/{experimentId}/",
+ path_params={"experimentId": experiment_id})
+ resp.raise_for_status()
+ return resp.json()['relativeExperimentDataDir']
+
+ experiment = request.airavata_client.getExperiment(
+ request.authz_token, experiment_id)
+ if (experiment.userConfigurationData and
+ experiment.userConfigurationData.experimentDataDir):
+ backend = get_user_storage_provider(request, storage_resource_id=storage_resource_id)
+ data_dir = experiment.userConfigurationData.experimentDataDir
+ if backend.exists(data_dir):
+ directories, _ = backend.get_metadata(os.path.dirname(data_dir))
+ for directory in directories:
+ if directory['name'] == os.path.basename(data_dir):
+ return directory['path']
+ raise Exception(f"Could not find relative path to experiment data dir {data_dir}")
+ else:
+ return None
+ else:
+ return None
+
+
+def _get_data_product_uri(request, full_path, storage_resource_id, owner=None):
+
+ from airavata_django_portal_sdk import models
+ if owner is None:
+ owner = request.user.username
+ user_file = models.UserFiles.objects.filter(
+ username=owner, file_path=full_path)
+ if user_file.exists():
+ product_uri = user_file[0].file_dpu
+ else:
+ data_product = _save_data_product(request, full_path, storage_resource_id, owner=owner)
+ product_uri = data_product.productUri
+ return product_uri
+
+
+def _get_data_product(request, data_product_uri):
+ return request.airavata_client.getDataProduct(
+ request.authz_token, data_product_uri)
+
+
+def _save_data_product(request, full_path, storage_resource_id, name=None, content_type=None, owner=None):
+    """
+    Build a data product for full_path, register it and record it in the DB.
+
+    The owner defaults to the requesting user. Returns the data product with
+    its productUri set to the registered URI.
+    """
+    product_owner = request.user.username if owner is None else owner
+    new_product = _create_data_product(
+        product_owner,
+        full_path,
+        storage_resource_id,
+        name=name,
+        content_type=content_type,
+    )
+    new_product.productUri = _register_data_product(
+        request, full_path, new_product, owner=product_owner)
+    return new_product
+
+
+def _register_data_product(request, full_path, data_product, owner=None):
+    """
+    Register data_product with Airavata and save a UserFiles record.
+
+    The record maps the owner (defaults to the requesting user) and full_path
+    to the product URI returned by registration. Returns that URI.
+    """
+    file_owner = request.user.username if owner is None else owner
+    registered_uri = request.airavata_client.registerDataProduct(
+        request.authz_token, data_product
+    )
+    from airavata_django_portal_sdk import models
+    models.UserFiles(
+        username=file_owner,
+        file_path=full_path,
+        file_dpu=registered_uri,
+    ).save()
+    return registered_uri
+
+
+def _save_copy_of_data_product(request, full_path, data_product, storage_resource_id):
+    """Register a copy of data_product located at full_path and return it."""
+    duplicate = _copy_data_product(request, data_product, full_path, storage_resource_id)
+    duplicate.productUri = _register_data_product(request, full_path, duplicate)
+    return duplicate
+
+
+def _copy_data_product(request, data_product, full_path, storage_resource_id):
+    """
+    Return an unregistered shallow copy of data_product.
+
+    The copy has no productUri, is owned by the requesting user and has a
+    single replica location pointing at full_path.
+    """
+    duplicate = copy.copy(data_product)
+    duplicate.productUri = None
+    duplicate.ownerName = request.user.username
+    duplicate.replicaLocations = [
+        _create_replica_location(
+            full_path, duplicate.productName, storage_resource_id)
+    ]
+    return duplicate
+
+
+def _delete_data_product(username, full_path):
+    """Delete the UserFiles record(s) for username and full_path, if any."""
+    # TODO: call API to delete data product from replica catalog when it is
+    # available (not currently implemented)
+    from airavata_django_portal_sdk import models
+    matches = models.UserFiles.objects.filter(
+        username=username, file_path=full_path)
+    if not matches.exists():
+        return
+    matches.delete()
+
+
+def _create_data_product(username, full_path, storage_resource_id, name=None, content_type=None):
+    """
+    Build an unregistered FILE data product for full_path owned by username.
+
+    The product name defaults to the base name of full_path. A "mime-type"
+    metadata entry is added only when a content type can be determined.
+    """
+    product = DataProductModel()
+    product.gatewayId = settings.GATEWAY_ID
+    product.ownerName = username
+    product_name = name if name is not None else os.path.basename(full_path)
+    product.productName = product_name
+    product.dataProductType = DataProductType.FILE
+    mime_type = _determine_content_type(full_path, content_type)
+    if mime_type is not None:
+        product.productMetadata = {"mime-type": mime_type}
+    product.replicaLocations = [
+        _create_replica_location(full_path, product_name, storage_resource_id)
+    ]
+    return product
+
+
+def _determine_content_type(full_path, content_type=None):
+    """
+    Determine the content type to record for the file at full_path.
+
+    Returns content_type unchanged when it is given. Otherwise the type is
+    guessed from the file extension; if that yields nothing or
+    "application/octet-stream", the start of the file is read as text and
+    "text/plain" is returned when that succeeds. When decoding fails the
+    extension-based guess (possibly None) is returned.
+    """
+    if content_type is not None:
+        return content_type
+    # Try to guess the content-type from file extension
+    result, _encoding = mimetypes.guess_type(full_path)
+    if result is None or result == "application/octet-stream":
+        # Check if file is Unicode text by trying to read some of it
+        try:
+            # Context manager closes the handle even when decoding fails
+            with open(full_path, "r") as text_file:
+                text_file.read(1024)
+            result = "text/plain"
+        except UnicodeDecodeError:
+            logger.debug(f"Failed to read as Unicode text: {full_path}")
+    return result
+
+
+def _create_replica_location(full_path, file_name, storage_resource_id):
+    """
+    Build a transient gateway data store replica location for full_path.
+
+    The stored filePath is the URL-quoted form of full_path.
+    """
+    replica = DataReplicaLocationModel()
+    replica.storageResourceId = storage_resource_id
+    replica.replicaName = f"{file_name} gateway data store copy"
+    replica.replicaLocationCategory = ReplicaLocationCategory.GATEWAY_DATA_STORE
+    replica.replicaPersistentType = ReplicaPersistentType.TRANSIENT
+    replica.filePath = quote(full_path)
+    return replica
+
+
+def _get_replica_filepath(data_product):
+    """
+    Return the unquoted path of data_product's first gateway data store
+    replica, or None if there is no such replica or its filePath is empty.
+    """
+    first_filepath = next(
+        (
+            replica.filePath
+            for replica in data_product.replicaLocations
+            if replica.replicaLocationCategory == ReplicaLocationCategory.GATEWAY_DATA_STORE
+        ),
+        None,
+    )
+    if not first_filepath:
+        return None
+    return unquote(urlparse(first_filepath).path)
+
+
+def _get_replica_location(data_product, category=ReplicaLocationCategory.GATEWAY_DATA_STORE):
+    """
+    Return the first replica location of data_product in the given category.
+
+    category defaults to ReplicaLocationCategory.GATEWAY_DATA_STORE. Returns
+    None when no replica location has that category.
+    """
+    # Compare against the requested category rather than a fixed one so that
+    # the parameter actually takes effect.
+    for replica in data_product.replicaLocations:
+        if replica.replicaLocationCategory == category:
+            return replica
+    return None
+
+
+def _get_replica_resource_id_and_filepath(data_product):
+    """
+    Return a (storage resource id, unquoted file path) tuple for the replica
+    found by _get_replica_location, or None when there is no such replica.
+    """
+    replica = _get_replica_location(data_product)
+    if replica is None:
+        return None
+    storage_resource_id = replica.storageResourceId
+    resource_path = unquote(urlparse(replica.filePath).path)
+    return storage_resource_id, resource_path
+
+
+def _is_remote_api():
+    """Report whether the GATEWAY_DATA_STORE_REMOTE_API setting is set."""
+    remote_api_url = getattr(settings, 'GATEWAY_DATA_STORE_REMOTE_API', None)
+    return remote_api_url is not None
+
+
+def _call_remote_api(
+        request,
+        path,
+        path_params=None,
+        method="get",
+        raise_for_status=True,
+        **kwargs):
+    """
+    Send an HTTP request to the remote data store API and return the response.
+
+    path is a format string whose placeholders are filled from path_params
+    after URL-quoting each value. The request carries the user's access token
+    as a Bearer Authorization header; extra kwargs are passed through to
+    requests.request. When raise_for_status is true, an error status raises.
+    """
+    auth_headers = {
+        'Authorization': f'Bearer {request.authz_token.accessToken}'}
+    if path_params is None:
+        quoted_params = {}
+    else:
+        quoted_params = {
+            key: quote(value) for key, value in path_params.items()
+        }
+    encoded_path = path.format(**quoted_params)
+    logger.debug(f"encoded_path={encoded_path}")
+    url = f'{settings.GATEWAY_DATA_STORE_REMOTE_API}{encoded_path}'
+    response = requests.request(method, url, headers=auth_headers, **kwargs)
+    if raise_for_status:
+        response.raise_for_status()
+    return response
+
+
+def _raise_404(response, msg, exception_class=ObjectDoesNotExist):
+    """Raise exception_class(msg) when response has a 404 status code."""
+    is_not_found = response.status_code == 404
+    if is_not_found:
+        raise exception_class(msg)
diff --git a/airavata_django_portal_sdk/user_storage/backends/__init__.py b/airavata_django_portal_sdk/user_storage/backends/__init__.py
new file mode 100644
index 0000000..eefe980
--- /dev/null
+++ b/airavata_django_portal_sdk/user_storage/backends/__init__.py
@@ -0,0 +1,3 @@
+from .django_filesystem_provider import DjangoFileSystemProvider
+
+__all__ = ['DjangoFileSystemProvider']
diff --git a/airavata_django_portal_sdk/user_storage/backends/base.py b/airavata_django_portal_sdk/user_storage/backends/base.py
new file mode 100644
index 0000000..409a794
--- /dev/null
+++ b/airavata_django_portal_sdk/user_storage/backends/base.py
@@ -0,0 +1,64 @@
+
+class UserStorageProvider:
+    """
+    Base class for user storage backends.
+
+    Every storage operation raises NotImplementedError here; concrete
+    providers override the operations they support.
+    """
+
+    def __init__(self, authz_token, resource_id, context=None, **kwargs):
+        """Store the authz token, storage resource id and optional context."""
+        self.authz_token = authz_token
+        self.resource_id = resource_id
+        # TODO probably don't need context for passing 'request'
+        self.context = context
+
+    # TODO remove content_type
+    def save(self, resource_path, file, name=None, content_type=None):
+        """
+        Return a tuple of storage resource id and path, if any, to file.
+        """
+        raise NotImplementedError()
+
+    def get_upload_url(self, resource_path):
+        """Not implemented in the base class."""
+        raise NotImplementedError()
+
+    def open(self, resource_path):
+        """Not implemented in the base class."""
+        raise NotImplementedError()
+
+    def get_download_url(self, resource_path):
+        """Not implemented in the base class."""
+        raise NotImplementedError()
+
+    def exists(self, resource_path):
+        """Not implemented in the base class."""
+        raise NotImplementedError()
+
+    def is_file(self, resource_path):
+        """Not implemented in the base class."""
+        # TODO: is this needed if we have get_metadata?
+        raise NotImplementedError()
+
+    def is_dir(self, resource_path):
+        """Not implemented in the base class."""
+        # TODO: is this needed if we have get_metadata?
+        raise NotImplementedError()
+
+    def get_metadata(self, resource_path):
+        """
+        Return a tuple of two sequences: directories and files for the given
+        resource_path. If the resource_path represents a file, then the
+        directories sequence should be empty and the files sequence will only
+        have the one file.
+        """
+        raise NotImplementedError()
+
+    def delete(self, resource_path):
+        """Not implemented in the base class."""
+        raise NotImplementedError()
+
+    def update(self, resource_path, file):
+        """Not implemented in the base class."""
+        raise NotImplementedError()
+
+    # dir_names defaults to an immutable empty tuple so that the default
+    # value cannot be shared and mutated across calls.
+    def create_dirs(self, resource_path, dir_names=(), create_unique=False):
+        """
+        Create one or more named subdirectories inside the resource_path.
+        resource_path must exist. dir_names will potentially be normalized as
+        needed. The intermediate directories may already exist, but if the
+        final directory already exists, this method will raise an Exception,
+        unless create_unique is True, in which case the name will be modified
+        until a unique directory name is found.
+        """
+        raise NotImplementedError()
+
+    @property
+    def username(self):
+        """The 'userName' entry of the authz token's claimsMap."""
+        return self.authz_token.claimsMap['userName']
diff --git a/airavata_django_portal_sdk/user_storage/backends/django_filesystem_provider.py b/airavata_django_portal_sdk/user_storage/backends/django_filesystem_provider.py
new file mode 100644
index 0000000..78812bd
--- /dev/null
+++ b/airavata_django_portal_sdk/user_storage/backends/django_filesystem_provider.py
@@ -0,0 +1,306 @@
+import logging
+import os
+import shutil
+
+from django.core.exceptions import ObjectDoesNotExist, SuspiciousFileOperation
+from django.core.files.storage import FileSystemStorage
+
+from .base import UserStorageProvider
+from django.core.files import File
+
+logger = logging.getLogger(__name__)
+
+TMP_INPUT_FILE_UPLOAD_DIR = "tmp"
+
+
+class DjangoFileSystemProvider(UserStorageProvider):
+    """
+    User storage provider that keeps files on the local file system.
+
+    All operations are delegated to a _Datastore rooted at a per-user
+    directory underneath the configured base directory.
+    """
+
+    def __init__(self, authz_token, resource_id, context=None, directory=None, storage_resource_id=None, **kwargs):
+        """
+        directory is the base directory that holds the per-user directories.
+        The storage_resource_id argument is accepted but not used: the
+        storage_resource_id attribute is set from resource_id.
+        """
+        super().__init__(authz_token, resource_id, context=context, **kwargs)
+        self.directory = directory
+        self.storage_resource_id = resource_id
+
+    def save(self, path, file, name=None, content_type=None):
+        """Save file under path; return (storage resource id, full path)."""
+        full_path = self.datastore.save(path, file, name=name)
+        return self.storage_resource_id, full_path
+
+    def get_upload_url(self, resource_path):
+        # TODO: implement
+        return super().get_upload_url(resource_path)
+
+    def open(self, resource_path):
+        """Open resource_path; raises ObjectDoesNotExist if it is missing."""
+        return self.datastore.open(resource_path)
+
+    def get_download_url(self, resource_path):
+        # TODO: implement
+        return super().get_download_url(resource_path)
+
+    def exists(self, resource_path):
+        """Return whether resource_path exists in the user's storage."""
+        return self.datastore.exists(resource_path)
+
+    def is_file(self, resource_path):
+        """Return whether resource_path exists and is a file."""
+        return self.datastore.file_exists(resource_path)
+
+    def is_dir(self, resource_path):
+        """Return whether resource_path exists and is a directory."""
+        return self.datastore.dir_exists(resource_path)
+
+    def get_metadata(self, resource_path):
+        """
+        Return (directories, files) metadata lists for resource_path.
+
+        For a directory both lists describe its immediate children; for a
+        file the directories list is empty and the files list has one entry.
+        Raises ObjectDoesNotExist when resource_path does not exist.
+        """
+        # TODO: also return an isDir boolean flag?
+        datastore = self.datastore
+        if datastore.dir_exists(resource_path):
+            directories, files = datastore.list_user_dir(resource_path)
+            directories_data = []
+            for d in directories:
+                dpath = os.path.join(resource_path, d)
+                directories_data.append(
+                    {
+                        "name": d,
+                        "path": datastore.rel_path(dpath),
+                        "created_time": datastore.get_created_time(dpath),
+                        "size": datastore.size(dpath),
+                        "hidden": dpath == TMP_INPUT_FILE_UPLOAD_DIR,
+                    }
+                )
+            files_data = []
+            for f in files:
+                user_rel_path = os.path.join(resource_path, f)
+                if not datastore.exists(user_rel_path):
+                    logger.warning(f"listdir skipping {user_rel_path}, "
+                                   "does not exist (broken symlink?)")
+                    continue
+                files_data.append(
+                    self._file_metadata(datastore, f, user_rel_path))
+            return directories_data, files_data
+        elif datastore.exists(resource_path):
+            file_name = os.path.basename(resource_path)
+            return [], [self._file_metadata(datastore, file_name, resource_path)]
+        else:
+            raise ObjectDoesNotExist(f"User storage path does not exist {resource_path}")
+
+    @staticmethod
+    def _file_metadata(datastore, name, resource_path):
+        """Build the metadata dictionary for a single file."""
+        full_path = datastore.path(resource_path)
+        return {
+            "name": name,
+            "path": datastore.rel_path(full_path),
+            "resource_path": full_path,
+            "created_time": datastore.get_created_time(resource_path),
+            "size": datastore.size(resource_path),
+            "hidden": False,
+        }
+
+    def delete(self, resource_path):
+        """
+        Delete the file or directory at resource_path.
+
+        Raises ObjectDoesNotExist when resource_path is neither.
+        """
+        datastore = self.datastore
+        if datastore.file_exists(resource_path):
+            datastore.delete(resource_path)
+        elif datastore.dir_exists(resource_path):
+            datastore.delete_dir(resource_path)
+        else:
+            raise ObjectDoesNotExist(f"User resource_path does not exist {resource_path}")
+
+    def update(self, resource_path, file):
+        """
+        Overwrite the content of resource_path with the content of file.
+
+        The source is read before the target is opened so that a failing
+        read does not leave the target truncated. Byte content is written
+        in binary mode, text content in text mode.
+        """
+        full_path = self.datastore.path(resource_path)
+        content = file.read()
+        mode = 'wb' if isinstance(content, (bytes, bytearray)) else 'w'
+        with open(full_path, mode) as f:
+            f.write(content)
+
+    # dir_names defaults to an immutable empty tuple so that the default
+    # value cannot be shared and mutated across calls.
+    def create_dirs(self, resource_path, dir_names=(), create_unique=False):
+        """
+        Create nested directories dir_names inside resource_path.
+
+        Each name is normalized to a valid name first. Raises
+        ObjectDoesNotExist when resource_path is missing and Exception when
+        the final directory exists and create_unique is False. Returns
+        (storage resource id, created path).
+        """
+        datastore = self.datastore
+        if not datastore.exists(resource_path):
+            raise ObjectDoesNotExist(f"User resource_path does not exist {resource_path}")
+        valid_dir_names = [
+            datastore.get_valid_name(dir_name) for dir_name in dir_names
+        ]
+        final_path = os.path.join(resource_path, *valid_dir_names)
+        if datastore.exists(final_path) and not create_unique:
+            raise Exception(f"Directory {final_path} already exists")
+        # Make sure path is unique if it already exists
+        final_path = datastore.get_available_name(final_path)
+        datastore.create_user_dir(final_path)
+        return self.storage_resource_id, final_path
+
+    @property
+    def datastore(self):
+        """
+        A _Datastore rooted at the storage directory of the relevant user.
+
+        That is the 'owner_username' from the context when present and
+        non-empty, otherwise the current user's username.
+        """
+        # context defaults to None, so guard before looking up the owner
+        context = self.context or {}
+        owner_username = context.get('owner_username')
+        # When the current user isn't the owner, set the directory based on the owner's username
+        username = owner_username if owner_username else self.username
+        return _Datastore(directory=os.path.join(self.directory, username))
+
+
+class _Datastore:
+    """Internal datastore abstraction over a FileSystemStorage directory."""
+
+    def __init__(self, directory=None):
+        """Create a datastore whose storage is rooted at directory."""
+        self.directory = directory
+        self.storage = self._user_data_storage(self.directory)
+
+    def exists(self, path):
+        """Check if path exists in this data store."""
+        try:
+            return self.storage.exists(path)
+        except SuspiciousFileOperation as e:
+            logger.warning(f"Invalid path: {e}")
+            return False
+
+    def file_exists(self, path):
+        """Check if file path exists in this data store."""
+        try:
+            return self.storage.exists(path) and os.path.isfile(self.path(path))
+        except SuspiciousFileOperation as e:
+            logger.warning(f"Invalid path: {e}")
+            return False
+
+    def dir_exists(self, path):
+        """Check if directory path exists in this data store."""
+        try:
+            # Resolving the path can itself raise SuspiciousFileOperation, so
+            # the debug logging has to happen inside the try block too.
+            logger.debug(f"dir_exists: {path}, {self.path(path)}")
+            return self.storage.exists(path) and os.path.isdir(self.path(path))
+        except SuspiciousFileOperation as e:
+            logger.warning(f"Invalid path: {e}")
+            return False
+
+    def open(self, path):
+        """Open path for user if it exists in this data store."""
+        if self.exists(path):
+            return self.storage.open(path)
+        else:
+            raise ObjectDoesNotExist(
+                "File path does not exist: {}".format(path))
+
+    def save(self, path, file, name=None):
+        """Save file to username/path in data store; return its full path."""
+        # file.name may be full path, so get just the name of the file
+        file_name = name if name is not None else os.path.basename(file.name)
+        user_data_storage = self.storage
+        file_path = os.path.join(
+            path, user_data_storage.get_valid_name(file_name))
+        input_file_name = user_data_storage.save(file_path, file)
+        return user_data_storage.path(input_file_name)
+
+    def create_user_dir(self, path):
+        """Create directory path; raise Exception if it already exists."""
+        if self.storage.exists(path):
+            raise Exception("Directory {} already exists".format(path))
+        self._makedirs(path)
+
+    def delete(self, path):
+        """Delete file in this data store."""
+        if self.file_exists(path):
+            self.storage.delete(path)
+        else:
+            raise ObjectDoesNotExist(
+                "File path does not exist: {}".format(path))
+
+    def delete_dir(self, path):
+        """Delete entire directory in this data store."""
+        if self.dir_exists(path):
+            shutil.rmtree(self.path(path))
+        else:
+            raise ObjectDoesNotExist(
+                "File path does not exist: {}".format(path))
+
+    def get_experiment_dir(
+        self, project_name=None, experiment_name=None, path=None
+    ):
+        """Return an experiment directory (full path) for the given experiment."""
+        storage = self.storage
+        if path is None:
+            proj_dir_name = storage.get_valid_name(project_name)
+            # AIRAVATA-3245 Make project directory with correct permissions
+            if not storage.exists(proj_dir_name):
+                self._makedirs(proj_dir_name)
+            experiment_dir_name = os.path.join(
+                proj_dir_name,
+                storage.get_valid_name(experiment_name),
+            )
+            # Since there may already be another experiment with the same name in
+            # this project, we need to check for available name
+            experiment_dir_name = storage.get_available_name(
+                experiment_dir_name)
+            experiment_dir = storage.path(experiment_dir_name)
+        else:
+            # path can be relative to the user's storage space or absolute (as long
+            # as it is still inside the user's storage space)
+            # if path is passed in, assumption is that it has already been
+            # created
+            experiment_dir = storage.path(path)
+        if not storage.exists(experiment_dir):
+            self._makedirs(experiment_dir)
+        return experiment_dir
+
+    def get_valid_name(self, name):
+        """Return name normalized by the storage's get_valid_name."""
+        return self.storage.get_valid_name(name)
+
+    def get_available_name(self, name):
+        """Return the storage's available (unused) variant of name."""
+        return self.storage.get_available_name(name)
+
+    def _makedirs(self, dir_path):
+        """Create dir_path using the storage's directory permissions mode."""
+        full_path = self.storage.path(dir_path)
+        mode = self.storage.directory_permissions_mode
+        if mode is None:
+            # No directory mode configured: os.makedirs and os.chmod reject
+            # None, so fall back to the os.makedirs default mode.
+            os.makedirs(full_path)
+            return
+        os.makedirs(full_path, mode=mode)
+        # os.makedirs mode isn't always respected so need to chmod to be sure
+        os.chmod(full_path, mode=mode)
+
+    def list_user_dir(self, file_path):
+        """Return the storage's (directories, files) listing of file_path."""
+        logger.debug("file_path={}".format(file_path))
+        return self.storage.listdir(file_path)
+
+    def get_created_time(self, file_path):
+        """Return the storage's created time for file_path."""
+        return self.storage.get_created_time(file_path)
+
+    def size(self, file_path):
+        """
+        Return the size of file_path; for a directory this is the summed
+        size of the files found beneath it.
+        """
+        full_path = self.path(file_path)
+        if os.path.isdir(full_path):
+            return self._get_dir_size(full_path)
+        return self.storage.size(file_path)
+
+    def path(self, file_path):
+        """Return the full file system path of file_path in the storage."""
+        return self.storage.path(file_path)
+
+    def rel_path(self, file_path):
+        """Return file_path relative to the root of this data store."""
+        full_path = self.path(file_path)
+        return os.path.relpath(full_path, self.path(""))
+
+    def _user_data_storage(self, directory):
+        """Create the FileSystemStorage located at directory."""
+        return FileSystemStorage(location=directory)
+
+    # from https://stackoverflow.com/a/1392549
+    def _get_dir_size(self, start_path="."):
+        """Sum the sizes of all files found by walking start_path."""
+        total_size = 0
+        for dirpath, dirnames, filenames in os.walk(start_path):
+            for f in filenames:
+                fp = os.path.join(dirpath, f)
+                # Check for broken symlinks (.exists return False for broken
+                # symlinks)
+                if os.path.exists(fp):
+                    total_size += os.path.getsize(fp)
+        return total_size
diff --git a/airavata_django_portal_sdk/user_storage/backends/mft_provider.py b/airavata_django_portal_sdk/user_storage/backends/mft_provider.py
new file mode 100644
index 0000000..6678572
--- /dev/null
+++ b/airavata_django_portal_sdk/user_storage/backends/mft_provider.py
@@ -0,0 +1,152 @@
+from .base import UserStorageProvider
+
+
+class MFTUserStorageProvider(UserStorageProvider):
+ # Storage provider stub for MFT. Both overrides below only delegate to
+ # the base class implementation (which raises NotImplementedError in
+ # .base); the commented-out blocks are a draft that queries an MFT API
+ # service over gRPC using hard-coded resource ids and paths.
+
+ # Delegates to UserStorageProvider.exists.
+ def exists(self, resource_path):
+ return super().exists(resource_path)
+# with grpc.insecure_channel('localhost:7004') as channel:
+# # remove trailing slash and figure out parent path
+# # FIXME remove the hard coded /tmp path
+# parent_path, child_path = os.path.split(f"/tmp/{path}".rstrip("/"))
+# logger.debug(f"parent_path={parent_path}, child_path={child_path}")
+# stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel)
+# # Get metadata for parent directory and see if child_path exists
+# request = MFTApi_pb2.FetchResourceMetadataRequest(
+# resourceId="remote-ssh-dir-resource",
+# resourceType="SCP",
+# resourceToken="local-ssh-cred",
+# resourceBackend="FILE",
+# resourceCredentialBackend="FILE",
+# targetAgentId="agent0",
+# childPath=parent_path,
+# mftAuthorizationToken="user token")
+# response = stub.getDirectoryResourceMetadata(request)
+# # if not child_path, then return True since the response was
+# # successful and we just need to confirm the existence of the root dir
+# if child_path == '':
+# return True
+# return child_path in map(lambda f: f.friendlyName, response.directories)
+
+ # Delegates to UserStorageProvider.get_metadata.
+ def get_metadata(self, resource_path):
+ return super().get_metadata(resource_path)
+# def listdir(self, request, path):
+# # TODO setup resourceId, etc from __init__ arguments
+# channel = grpc.insecure_channel('localhost:7004')
+# stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel)
+# request = MFTApi_pb2.FetchResourceMetadataRequest(
+# resourceId="remote-ssh-dir-resource",
+# resourceType="SCP",
+# resourceToken="local-ssh-cred",
+# resourceBackend="FILE",
+# resourceCredentialBackend="FILE",
+# targetAgentId="agent0",
+# childPath=f"/tmp/{path}",
+# mftAuthorizationToken="user token")
+# response = stub.getDirectoryResourceMetadata(request)
+# directories_data = []
+# for d in response.directories:
+
+# dpath = os.path.join(path, d.friendlyName)
+# created_time = datetime.fromtimestamp(d.createdTime)
+# # TODO MFT API doesn't report size
+# size = 0
+# directories_data.append(
+# {
+# "name": d.friendlyName,
+# "path": dpath,
+# "created_time": created_time,
+# "size": size,
+# # TODO how to handle hidden directories or directories for
+# # staging input file uploads
+# "hidden": False
+# }
+# )
+# files_data = []
+# for f in response.files:
+# user_rel_path = os.path.join(path, f.friendlyName)
+# # TODO do we need to check for broken symlinks?
+# created_time = datetime.fromtimestamp(f.createdTime)
+# # TODO get the size as well
+# size = 0
+# # full_path = datastore.path(request.user.username, user_rel_path)
+# # TODO how do we register these as data products, do we need to?
+# # data_product_uri = _get_data_product_uri(request, full_path)
+
+# # data_product = request.airavata_client.getDataProduct(
+# # request.authz_token, data_product_uri)
+# # mime_type = None
+# # if 'mime-type' in data_product.productMetadata:
+# # mime_type = data_product.productMetadata['mime-type']
+# files_data.append(
+# {
+# "name": f.friendlyName,
+# "path": user_rel_path,
+# "data-product-uri": None,
+# "created_time": created_time,
+# "mime_type": None,
+# "size": size,
+# "hidden": False,
+# }
+# )
+# return directories_data, files_data
+
+# def get_file(self, request, path):
+# # FIXME remove hard coded /tmp path
+# path = f"/tmp/{path}".rstrip("/")
+# file_metadata = self._get_file(path)
+# if file_metadata is not None:
+# user_rel_path = os.path.join(path, file_metadata.friendlyName)
+# created_time = datetime.fromtimestamp(file_metadata.createdTime)
+# # TODO get the size as well
+# size = 0
+
+# return {
+# "name": file_metadata.friendlyName,
+# "path": user_rel_path,
+# "data-product-uri": None,
+# "created_time": created_time,
+# "mime_type": None,
+# "size": size,
+# "hidden": False,
+# }
+# else:
+# raise ObjectDoesNotExist("User storage file path does not exist")
+
+# def _get_file(self, path):
+# with grpc.insecure_channel('localhost:7004') as channel:
+# stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel)
+# # Get metadata for parent directory and see if child_path exists
+# request = MFTApi_pb2.FetchResourceMetadataRequest(
+# resourceId="remote-ssh-dir-resource",
+# resourceType="SCP",
+# resourceToken="local-ssh-cred",
+# resourceBackend="FILE",
+# resourceCredentialBackend="FILE",
+# targetAgentId="agent0",
+# childPath=path,
+# mftAuthorizationToken="user token")
+# try:
+# # TODO is there a better way to check if file exists than catching exception?
+# return stub.getFileResourceMetadata(request)
+# except Exception:
+# logger.exception(f"_get_file({path})")
+# return None
+
+# def _get_download_url(self, path):
+
+# with grpc.insecure_channel('localhost:7004') as channel:
+# stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel)
+# download_request = MFTApi_pb2.HttpDownloadApiRequest(sourceStoreId="remote-ssh-storage",
+# sourcePath="/tmp/a.txt",
+# sourceToken="local-ssh-cred",
+# sourceType="SCP",
+# targetAgent="agent0",
+# mftAuthorizationToken="")
+# try:
+# # TODO is there a better way to check if file exists than catching exception?
+# # response stub.submitHttpDownload(request)
+# pass
+# except Exception:
+# logger.exception(f"_get_file({path})")
+# return None
diff --git a/airavata_django_portal_sdk/user_storage/backends/remote_api_provider.py b/airavata_django_portal_sdk/user_storage/backends/remote_api_provider.py
new file mode 100644
index 0000000..e69de29
diff --git a/airavata_django_portal_sdk/user_storage_provider.py b/airavata_django_portal_sdk/user_storage_provider.py
index cc214db..b6b6842 100644
--- a/airavata_django_portal_sdk/user_storage_provider.py
+++ b/airavata_django_portal_sdk/user_storage_provider.py
@@ -13,8 +13,9 @@ logger = logging.getLogger(__name__)
class UserStorageProvider:
- def __init__(self, authz_token, *args, **kwargs):
+ def __init__(self, authz_token, context=None, *args, **kwargs):
self.authz_token = authz_token
+ self.context = context
def save(self, authz_token, path, file, name=None, content_type=None):
raise NotImplementedError()